PostgreSQL Source Code  git master
read_stream.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "storage/read_stream.h"
#include "utils/memdebug.h"
#include "utils/rel.h"
#include "utils/spccache.h"
Include dependency graph for read_stream.c:

Go to the source code of this file.

Data Structures

struct  InProgressIO
 
struct  ReadStream
 

Typedefs

typedef struct InProgressIO InProgressIO
 

Functions

static void * get_per_buffer_data (ReadStream *stream, int16 buffer_index)
 
BlockNumber block_range_read_stream_cb (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 
static BlockNumber read_stream_get_block (ReadStream *stream, void *per_buffer_data)
 
static void read_stream_unget_block (ReadStream *stream, BlockNumber blocknum)
 
static void read_stream_start_pending_read (ReadStream *stream, bool suppress_advice)
 
static void read_stream_look_ahead (ReadStream *stream, bool suppress_advice)
 
static ReadStream * read_stream_begin_impl (int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
ReadStream * read_stream_begin_relation (int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
ReadStream * read_stream_begin_smgr_relation (int flags, BufferAccessStrategy strategy, SMgrRelation smgr, char smgr_persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
Buffer read_stream_next_buffer (ReadStream *stream, void **per_buffer_data)
 
BlockNumber read_stream_next_block (ReadStream *stream, BufferAccessStrategy *strategy)
 
void read_stream_reset (ReadStream *stream)
 
void read_stream_end (ReadStream *stream)
 

Typedef Documentation

◆ InProgressIO

typedef struct InProgressIO InProgressIO

Function Documentation

◆ block_range_read_stream_cb()

BlockNumber block_range_read_stream_cb ( ReadStream * stream,
void *  callback_private_data,
void *  per_buffer_data 
)

Definition at line 171 of file read_stream.c.

174 {  /* Yields current_blocknum, current_blocknum + 1, ... up to (not including) last_exclusive. */
175  BlockRangeReadStreamPrivate *p = callback_private_data;
176 
177  if (p->current_blocknum < p->last_exclusive)
178  return p->current_blocknum++;  /* hand out the current block, then advance the cursor */
179 
180  return InvalidBlockNumber;  /* range exhausted; the stream treats this as end of stream */
181 }
#define InvalidBlockNumber
Definition: block.h:33

References BlockRangeReadStreamPrivate::current_blocknum, InvalidBlockNumber, and BlockRangeReadStreamPrivate::last_exclusive.

Referenced by collect_visibility_data(), pg_prewarm(), and RelationCopyStorageUsingBuffer().

◆ get_per_buffer_data()

static void* get_per_buffer_data ( ReadStream * stream,
int16  buffer_index 
)
inline static

Definition at line 160 of file read_stream.c.

161 {  /* Address of the per_buffer_data_size-byte slot belonging to queue entry buffer_index. */
162  return (char *) stream->per_buffer_data +  /* char * so the offset below is counted in bytes */
163  stream->per_buffer_data_size * buffer_index;
164 }
void * per_buffer_data
Definition: read_stream.c:141
size_t per_buffer_data_size
Definition: read_stream.c:140

References ReadStream::per_buffer_data, and ReadStream::per_buffer_data_size.

Referenced by read_stream_look_ahead(), and read_stream_next_buffer().

◆ read_stream_begin_impl()

static ReadStream* read_stream_begin_impl ( int  flags,
BufferAccessStrategy  strategy,
Relation  rel,
SMgrRelation  smgr,
char  persistence,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)
static

Definition at line 394 of file read_stream.c.

403 {
404  ReadStream *stream;
405  size_t size;
406  int16 queue_size;
407  int max_ios;
408  int strategy_pin_limit;
409  uint32 max_pinned_buffers;
410  Oid tablespace_id;
411 
412  /*
413  * Decide how many I/Os we will allow to run at the same time. That
414  * currently means advice to the kernel to tell it that we will soon read.
415  * This number also affects how far we look ahead for opportunities to
416  * start more I/Os.
417  */
418  tablespace_id = smgr->smgr_rlocator.locator.spcOid;
419  if (!OidIsValid(MyDatabaseId) ||
420  (rel && IsCatalogRelation(rel)) ||
421  IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
422  {
423  /*
424  * Avoid circularity while trying to look up tablespace settings or
425  * before spccache.c is ready.
426  */
427  max_ios = effective_io_concurrency;
428  }
429  else if (flags & READ_STREAM_MAINTENANCE)
430  max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
431  else
432  max_ios = get_tablespace_io_concurrency(tablespace_id);
433 
434  /* Cap to INT16_MAX to avoid overflowing below */
435  max_ios = Min(max_ios, PG_INT16_MAX);
436 
437  /*
438  * Choose the maximum number of buffers we're prepared to pin. We try to
439  * pin fewer if we can, though. We clamp it to at least io_combine_limit
440  * so that we can have a chance to build up a full io_combine_limit sized
441  * read, even when max_ios is zero. Be careful not to allow int16 to
442  * overflow (even though that's not possible with the current GUC range
443  * limits), allowing also for the spare entry and the overflow space.
444  */
445  max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
446  max_pinned_buffers = Min(max_pinned_buffers,
447  PG_INT16_MAX - io_combine_limit - 1);
448 
449  /* Give the strategy a chance to limit the number of buffers we pin. */
450  strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
451  max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
452 
453  /* Don't allow this backend to pin more than its share of buffers. */
454  if (SmgrIsTemp(smgr))
455  LimitAdditionalLocalPins(&max_pinned_buffers);
456  else
457  LimitAdditionalPins(&max_pinned_buffers);
458  Assert(max_pinned_buffers > 0);
459 
460  /*
461  * We need one extra entry for buffers and per-buffer data, because users
462  * of per-buffer data have access to the object until the next call to
463  * read_stream_next_buffer(), so we need a gap between the head and tail
464  * of the queue so that we don't clobber it.
465  */
466  queue_size = max_pinned_buffers + 1;
467 
468  /*
469  * Allocate the object, the buffers, the ios and per_buffer_data space in
470  * one big chunk. Though we have queue_size buffers, we want to be able
471  * to assume that all the buffers for a single read are contiguous (i.e.
472  * don't wrap around halfway through), so we allow temporary overflows of
473  * up to the maximum possible read size by allocating an extra
474  * io_combine_limit - 1 elements.
475  */
476  size = offsetof(ReadStream, buffers);
477  size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
478  size += sizeof(InProgressIO) * Max(1, max_ios);
479  size += per_buffer_data_size * queue_size;
480  size += MAXIMUM_ALIGNOF * 2;
481  stream = (ReadStream *) palloc(size);
482  memset(stream, 0, offsetof(ReadStream, buffers));
483  stream->ios = (InProgressIO *)
484  MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
485  if (per_buffer_data_size > 0)
486  stream->per_buffer_data = (void *)
487  MAXALIGN(&stream->ios[Max(1, max_ios)]);
488 
489 #ifdef USE_PREFETCH
490 
491  /*
492  * This system supports prefetching advice. We can use it as long as
493  * direct I/O isn't enabled, the caller hasn't promised sequential access
494  * (overriding our detection heuristics), and max_ios hasn't been set to
495  * zero.
496  */
497  if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
498  (flags & READ_STREAM_SEQUENTIAL) == 0 &&
499  max_ios > 0)
500  stream->advice_enabled = true;
501 #endif
502 
503  /*
504  * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
505  * above. If we had real asynchronous I/O we might need a slightly
506  * different definition.
507  */
508  if (max_ios == 0)
509  max_ios = 1;
510 
511  stream->max_ios = max_ios;
512  stream->per_buffer_data_size = per_buffer_data_size;
513  stream->max_pinned_buffers = max_pinned_buffers;
514  stream->queue_size = queue_size;
515  stream->callback = callback;
516  stream->callback_private_data = callback_private_data;
517  stream->buffered_blocknum = InvalidBlockNumber;
518 
519  /*
520  * Skip the initial ramp-up phase if the caller says we're going to be
521  * reading the whole relation. This way we start out assuming we'll be
522  * doing full io_combine_limit sized reads (behavior B).
523  */
524  if (flags & READ_STREAM_FULL)
525  stream->distance = Min(max_pinned_buffers, io_combine_limit);
526  else
527  stream->distance = 1;
528 
529  /*
530  * Since we always access the same relation, we can initialize parts of
531  * the ReadBuffersOperation objects and leave them that way, to avoid
532  * wasting CPU cycles writing to them for each read.
533  */
534  for (int i = 0; i < max_ios; ++i)
535  {
536  stream->ios[i].op.rel = rel;
537  stream->ios[i].op.smgr = smgr;
538  stream->ios[i].op.persistence = persistence;
539  stream->ios[i].op.forknum = forknum;
540  stream->ios[i].op.strategy = strategy;
541  }
542 
543  return stream;
544 }
int Buffer
Definition: buf.h:23
void LimitAdditionalPins(uint32 *additional_pins)
Definition: bufmgr.c:2116
int effective_io_concurrency
Definition: bufmgr.c:151
int io_combine_limit
Definition: bufmgr.c:165
#define Min(x, y)
Definition: c.h:958
#define MAXALIGN(LEN)
Definition: c.h:765
#define Max(x, y)
Definition: c.h:952
#define Assert(condition)
Definition: c.h:812
int16_t int16
Definition: c.h:480
uint32_t uint32
Definition: c.h:485
#define PG_INT16_MAX
Definition: c.h:540
#define OidIsValid(objectId)
Definition: c.h:729
bool IsCatalogRelation(Relation relation)
Definition: catalog.c:103
bool IsCatalogRelationOid(Oid relid)
Definition: catalog.c:120
int io_direct_flags
Definition: fd.c:167
#define IO_DIRECT_DATA
Definition: fd.h:54
int GetAccessStrategyPinLimit(BufferAccessStrategy strategy)
Definition: freelist.c:647
Oid MyDatabaseId
Definition: globals.c:93
int i
Definition: isn.c:72
void LimitAdditionalLocalPins(uint32 *additional_pins)
Definition: localbuf.c:290
void * palloc(Size size)
Definition: mcxt.c:1317
unsigned int Oid
Definition: postgres_ext.h:31
struct InProgressIO InProgressIO
#define READ_STREAM_MAINTENANCE
Definition: read_stream.h:28
#define READ_STREAM_FULL
Definition: read_stream.h:43
#define READ_STREAM_SEQUENTIAL
Definition: read_stream.h:36
static pg_noinline void Size size
Definition: slab.c:607
#define SmgrIsTemp(smgr)
Definition: smgr.h:73
int get_tablespace_io_concurrency(Oid spcid)
Definition: spccache.c:215
int get_tablespace_maintenance_io_concurrency(Oid spcid)
Definition: spccache.c:229
ReadBuffersOperation op
Definition: read_stream.c:103
ForkNumber forknum
Definition: bufmgr.h:121
BufferAccessStrategy strategy
Definition: bufmgr.h:122
struct SMgrRelationData * smgr
Definition: bufmgr.h:119
int16 distance
Definition: read_stream.c:116
int16 max_ios
Definition: read_stream.c:111
bool advice_enabled
Definition: read_stream.c:117
int16 max_pinned_buffers
Definition: read_stream.c:114
InProgressIO * ios
Definition: read_stream.c:144
BlockNumber buffered_blocknum
Definition: read_stream.c:123
int16 queue_size
Definition: read_stream.c:113
ReadStreamBlockNumberCB callback
Definition: read_stream.c:129
void * callback_private_data
Definition: read_stream.c:130
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]
Definition: read_stream.c:153
RelFileLocator locator
RelFileNumber relNumber
RelFileLocatorBackend smgr_rlocator
Definition: smgr.h:37
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46

References ReadStream::advice_enabled, Assert, ReadStream::buffered_blocknum, ReadStream::buffers, ReadStream::callback, callback(), ReadStream::callback_private_data, ReadStream::distance, effective_io_concurrency, ReadBuffersOperation::forknum, get_tablespace_io_concurrency(), get_tablespace_maintenance_io_concurrency(), GetAccessStrategyPinLimit(), i, InvalidBlockNumber, io_combine_limit, IO_DIRECT_DATA, io_direct_flags, ReadStream::ios, IsCatalogRelation(), IsCatalogRelationOid(), LimitAdditionalLocalPins(), LimitAdditionalPins(), RelFileLocatorBackend::locator, Max, ReadStream::max_ios, ReadStream::max_pinned_buffers, MAXALIGN, Min, MyDatabaseId, OidIsValid, InProgressIO::op, palloc(), ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, ReadBuffersOperation::persistence, PG_INT16_MAX, ReadStream::queue_size, READ_STREAM_FULL, READ_STREAM_MAINTENANCE, READ_STREAM_SEQUENTIAL, ReadBuffersOperation::rel, RelFileLocator::relNumber, size, ReadBuffersOperation::smgr, SMgrRelationData::smgr_rlocator, SmgrIsTemp, RelFileLocator::spcOid, and ReadBuffersOperation::strategy.

Referenced by read_stream_begin_relation(), and read_stream_begin_smgr_relation().

◆ read_stream_begin_relation()

ReadStream* read_stream_begin_relation ( int  flags,
BufferAccessStrategy  strategy,
Relation  rel,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 551 of file read_stream.c.

558 {  /* Derives smgr and persistence from rel, then delegates to read_stream_begin_impl(). */
559  return read_stream_begin_impl(flags,
560  strategy,
561  rel,
562  RelationGetSmgr(rel),  /* storage manager handle for rel */
563  rel->rd_rel->relpersistence,  /* persistence taken from the pg_class entry */
564  forknum,
565  callback,
566  callback_private_data,
567  per_buffer_data_size);
568 }
static ReadStream * read_stream_begin_impl(int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
Definition: read_stream.c:394
static SMgrRelation RelationGetSmgr(Relation rel)
Definition: rel.h:567
Form_pg_class rd_rel
Definition: rel.h:111

References callback(), RelationData::rd_rel, read_stream_begin_impl(), and RelationGetSmgr().

Referenced by acquire_sample_rows(), collect_corrupt_items(), collect_visibility_data(), heap_beginscan(), and pg_prewarm().

◆ read_stream_begin_smgr_relation()

ReadStream* read_stream_begin_smgr_relation ( int  flags,
BufferAccessStrategy  strategy,
SMgrRelation  smgr,
char  smgr_persistence,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 575 of file read_stream.c.

583 {  /* For callers holding only an SMgrRelation: no Relation, caller-supplied persistence. */
584  return read_stream_begin_impl(flags,
585  strategy,
586  NULL,  /* rel: none, so begin_impl skips its IsCatalogRelation(rel) test */
587  smgr,
588  smgr_persistence,
589  forknum,
590  callback,
591  callback_private_data,
592  per_buffer_data_size);
593 }

References callback(), and read_stream_begin_impl().

Referenced by RelationCopyStorageUsingBuffer().

◆ read_stream_end()

void read_stream_end ( ReadStream * stream)

Definition at line 846 of file read_stream.c.

847 {
848  read_stream_reset(stream);  /* unpin any buffers the caller never consumed */
849  pfree(stream);  /* one palloc'd chunk holds the stream, its buffers, ios and per-buffer data */
850 }
void pfree(void *pointer)
Definition: mcxt.c:1521
void read_stream_reset(ReadStream *stream)
Definition: read_stream.c:820

References pfree(), and read_stream_reset().

Referenced by acquire_sample_rows(), collect_corrupt_items(), collect_visibility_data(), heap_endscan(), pg_prewarm(), and RelationCopyStorageUsingBuffer().

◆ read_stream_get_block()

static BlockNumber read_stream_get_block ( ReadStream * stream,
void *  per_buffer_data 
)
inline static

Definition at line 188 of file read_stream.c.

189 {  /* Next block: a previously ungotten block if one is buffered, otherwise the callback's answer. */
190  BlockNumber blocknum;
191 
192  blocknum = stream->buffered_blocknum;
193  if (blocknum != InvalidBlockNumber)
194  stream->buffered_blocknum = InvalidBlockNumber;
195  else
196  blocknum = stream->callback(stream,
197  stream->callback_private_data,
198  per_buffer_data);
199 
200  return blocknum;
201 }
uint32 BlockNumber
Definition: block.h:31

References ReadStream::buffered_blocknum, ReadStream::callback, ReadStream::callback_private_data, and InvalidBlockNumber.

Referenced by read_stream_look_ahead(), read_stream_next_block(), and read_stream_next_buffer().

◆ read_stream_look_ahead()

static void read_stream_look_ahead ( ReadStream * stream,
bool  suppress_advice 
)
static

Definition at line 307 of file read_stream.c.

308 {
309  while (stream->ios_in_progress < stream->max_ios &&
310  stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
311  {
312  BlockNumber blocknum;
313  int16 buffer_index;
314  void *per_buffer_data;
315 
316  if (stream->pending_read_nblocks == io_combine_limit)
317  {
318  read_stream_start_pending_read(stream, suppress_advice);
319  suppress_advice = false;
320  continue;
321  }
322 
323  /*
324  * See which block the callback wants next in the stream. We need to
325  * compute the index of the Nth block of the pending read including
326  * wrap-around, but we don't want to use the expensive % operator.
327  */
328  buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
329  if (buffer_index >= stream->queue_size)
330  buffer_index -= stream->queue_size;
331  Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
332  per_buffer_data = get_per_buffer_data(stream, buffer_index);
333  blocknum = read_stream_get_block(stream, per_buffer_data);
334  if (blocknum == InvalidBlockNumber)
335  {
336  /* End of stream. */
337  stream->distance = 0;
338  break;
339  }
340 
341  /* Can we merge it with the pending read? */
342  if (stream->pending_read_nblocks > 0 &&
343  stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
344  {
345  stream->pending_read_nblocks++;
346  continue;
347  }
348 
349  /* We have to start the pending read before we can build another. */
350  while (stream->pending_read_nblocks > 0)
351  {
352  read_stream_start_pending_read(stream, suppress_advice);
353  suppress_advice = false;
354  if (stream->ios_in_progress == stream->max_ios)
355  {
356  /* And we've hit the limit. Rewind, and stop here. */
357  read_stream_unget_block(stream, blocknum);
358  return;
359  }
360  }
361 
362  /* This is the start of a new pending read. */
363  stream->pending_read_blocknum = blocknum;
364  stream->pending_read_nblocks = 1;
365  }
366 
367  /*
368  * We don't start the pending read just because we've hit the distance
369  * limit, preferring to give it another chance to grow to full
370  * io_combine_limit size once more buffers have been consumed. However,
371  * if we've already reached io_combine_limit, or we've reached the
372  * distance limit and there isn't anything pinned yet, or the callback has
373  * signaled end-of-stream, we start the read immediately.
374  */
375  if (stream->pending_read_nblocks > 0 &&
376  (stream->pending_read_nblocks == io_combine_limit ||
377  (stream->pending_read_nblocks == stream->distance &&
378  stream->pinned_buffers == 0) ||
379  stream->distance == 0) &&
380  stream->ios_in_progress < stream->max_ios)
381  read_stream_start_pending_read(stream, suppress_advice);
382 }
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
Definition: read_stream.c:160
static BlockNumber read_stream_get_block(ReadStream *stream, void *per_buffer_data)
Definition: read_stream.c:188
static void read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
Definition: read_stream.c:208
static void read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
Definition: read_stream.c:217
int16 ios_in_progress
Definition: read_stream.c:112
int16 pinned_buffers
Definition: read_stream.c:115
BlockNumber pending_read_blocknum
Definition: read_stream.c:136
int16 next_buffer_index
Definition: read_stream.c:152
int16 pending_read_nblocks
Definition: read_stream.c:137

References Assert, ReadStream::distance, get_per_buffer_data(), InvalidBlockNumber, io_combine_limit, ReadStream::ios_in_progress, ReadStream::max_ios, ReadStream::next_buffer_index, ReadStream::pending_read_blocknum, ReadStream::pending_read_nblocks, ReadStream::pinned_buffers, ReadStream::queue_size, read_stream_get_block(), read_stream_start_pending_read(), and read_stream_unget_block().

Referenced by read_stream_next_buffer().

◆ read_stream_next_block()

BlockNumber read_stream_next_block ( ReadStream * stream,
BufferAccessStrategy * strategy 
)

Definition at line 807 of file read_stream.c.

808 {  /* Consume and return the stream's next block number without reading it; also report the strategy. */
809  *strategy = stream->ios[0].op.strategy;  /* every ios[] entry carries the same strategy, set in read_stream_begin_impl() */
810  return read_stream_get_block(stream, NULL);  /* InvalidBlockNumber once the callback reports end of stream */
811 }

References ReadStream::ios, InProgressIO::op, read_stream_get_block(), and ReadBuffersOperation::strategy.

◆ read_stream_next_buffer()

Buffer read_stream_next_buffer ( ReadStream * stream,
void **  per_buffer_data 
)

Definition at line 605 of file read_stream.c.

606 {
607  Buffer buffer;
608  int16 oldest_buffer_index;
609 
610 #ifndef READ_STREAM_DISABLE_FAST_PATH
611 
612  /*
613  * A fast path for all-cached scans (behavior A). This is the same as the
614  * usual algorithm, but it is specialized for no I/O and no per-buffer
615  * data, so we can skip the queue management code, stay in the same buffer
616  * slot and use singular StartReadBuffer().
617  */
618  if (likely(stream->fast_path))
619  {
620  BlockNumber next_blocknum;
621 
622  /* Fast path assumptions. */
623  Assert(stream->ios_in_progress == 0);
624  Assert(stream->pinned_buffers == 1);
625  Assert(stream->distance == 1);
626  Assert(stream->pending_read_nblocks == 0);
627  Assert(stream->per_buffer_data_size == 0);
628 
629  /* We're going to return the buffer we pinned last time. */
630  oldest_buffer_index = stream->oldest_buffer_index;
631  Assert((oldest_buffer_index + 1) % stream->queue_size ==
632  stream->next_buffer_index);
633  buffer = stream->buffers[oldest_buffer_index];
634  Assert(buffer != InvalidBuffer);
635 
636  /* Choose the next block to pin. */
637  next_blocknum = read_stream_get_block(stream, NULL);
638 
639  if (likely(next_blocknum != InvalidBlockNumber))
640  {
641  /*
642  * Pin a buffer for the next call. Same buffer entry, and
643  * arbitrary I/O entry (they're all free). We don't have to
644  * adjust pinned_buffers because we're transferring one to caller
645  * but pinning one more.
646  */
647  if (likely(!StartReadBuffer(&stream->ios[0].op,
648  &stream->buffers[oldest_buffer_index],
649  next_blocknum,
650  stream->advice_enabled ?
651  READ_BUFFERS_ISSUE_ADVICE : 0)))
652  {
653  /* Fast return. */
654  return buffer;
655  }
656 
657  /* Next call must wait for I/O for the newly pinned buffer. */
658  stream->oldest_io_index = 0;
659  stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
660  stream->ios_in_progress = 1;
661  stream->ios[0].buffer_index = oldest_buffer_index;
662  stream->seq_blocknum = next_blocknum + 1;
663  }
664  else
665  {
666  /* No more blocks, end of stream. */
667  stream->distance = 0;
668  stream->oldest_buffer_index = stream->next_buffer_index;
669  stream->pinned_buffers = 0;
670  }
671 
672  stream->fast_path = false;
673  return buffer;
674  }
675 #endif
676 
677  if (unlikely(stream->pinned_buffers == 0))
678  {
679  Assert(stream->oldest_buffer_index == stream->next_buffer_index);
680 
681  /* End of stream reached? */
682  if (stream->distance == 0)
683  return InvalidBuffer;
684 
685  /*
686  * The usual order of operations is that we look ahead at the bottom
687  * of this function after potentially finishing an I/O and making
688  * space for more, but if we're just starting up we'll need to crank
689  * the handle to get started.
690  */
691  read_stream_look_ahead(stream, true);
692 
693  /* End of stream reached? */
694  if (stream->pinned_buffers == 0)
695  {
696  Assert(stream->distance == 0);
697  return InvalidBuffer;
698  }
699  }
700 
701  /* Grab the oldest pinned buffer and associated per-buffer data. */
702  Assert(stream->pinned_buffers > 0);
703  oldest_buffer_index = stream->oldest_buffer_index;
704  Assert(oldest_buffer_index >= 0 &&
705  oldest_buffer_index < stream->queue_size);
706  buffer = stream->buffers[oldest_buffer_index];
707  if (per_buffer_data)
708  *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
709 
710  Assert(BufferIsValid(buffer));
711 
712  /* Do we have to wait for an associated I/O first? */
713  if (stream->ios_in_progress > 0 &&
714  stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
715  {
716  int16 io_index = stream->oldest_io_index;
717  int16 distance;
718 
719  /* Sanity check that we still agree on the buffers. */
720  Assert(stream->ios[io_index].op.buffers ==
721  &stream->buffers[oldest_buffer_index]);
722 
723  WaitReadBuffers(&stream->ios[io_index].op);
724 
725  Assert(stream->ios_in_progress > 0);
726  stream->ios_in_progress--;
727  if (++stream->oldest_io_index == stream->max_ios)
728  stream->oldest_io_index = 0;
729 
730  if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
731  {
732  /* Distance ramps up fast (behavior C). */
733  distance = stream->distance * 2;
734  distance = Min(distance, stream->max_pinned_buffers);
735  stream->distance = distance;
736  }
737  else
738  {
739  /* No advice; move towards io_combine_limit (behavior B). */
740  if (stream->distance > io_combine_limit)
741  {
742  stream->distance--;
743  }
744  else
745  {
746  distance = stream->distance * 2;
747  distance = Min(distance, io_combine_limit);
748  distance = Min(distance, stream->max_pinned_buffers);
749  stream->distance = distance;
750  }
751  }
752  }
753 
754 #ifdef CLOBBER_FREED_MEMORY
755  /* Clobber old buffer and per-buffer data for debugging purposes. */
756  stream->buffers[oldest_buffer_index] = InvalidBuffer;
757 
758  /*
759  * The caller will get access to the per-buffer data, until the next call.
760  * We wipe the one before, which is never occupied because queue_size
761  * allowed one extra element. This will hopefully trip up client code
762  * that is holding a dangling pointer to it.
763  */
764  if (stream->per_buffer_data)
765  wipe_mem(get_per_buffer_data(stream,
766  oldest_buffer_index == 0 ?
767  stream->queue_size - 1 :
768  oldest_buffer_index - 1),
769  stream->per_buffer_data_size);
770 #endif
771 
772  /* Pin transferred to caller. */
773  Assert(stream->pinned_buffers > 0);
774  stream->pinned_buffers--;
775 
776  /* Advance oldest buffer, with wrap-around. */
777  stream->oldest_buffer_index++;
778  if (stream->oldest_buffer_index == stream->queue_size)
779  stream->oldest_buffer_index = 0;
780 
781  /* Prepare for the next call. */
782  read_stream_look_ahead(stream, false);
783 
784 #ifndef READ_STREAM_DISABLE_FAST_PATH
785  /* See if we can take the fast path for all-cached scans next time. */
786  if (stream->ios_in_progress == 0 &&
787  stream->pinned_buffers == 1 &&
788  stream->distance == 1 &&
789  stream->pending_read_nblocks == 0 &&
790  stream->per_buffer_data_size == 0)
791  {
792  stream->fast_path = true;
793  }
794 #endif
795 
796  return buffer;
797 }
#define InvalidBuffer
Definition: buf.h:25
void WaitReadBuffers(ReadBuffersOperation *operation)
Definition: bufmgr.c:1410
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
Definition: bufmgr.c:1382
#define READ_BUFFERS_ISSUE_ADVICE
Definition: bufmgr.h:113
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:351
#define likely(x)
Definition: c.h:329
#define unlikely(x)
Definition: c.h:330
static void read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
Definition: read_stream.c:307
int16 buffer_index
Definition: read_stream.c:102
Buffer * buffers
Definition: bufmgr.h:129
int16 oldest_buffer_index
Definition: read_stream.c:151
BlockNumber seq_blocknum
Definition: read_stream.c:133
int16 oldest_io_index
Definition: read_stream.c:145
int16 next_io_index
Definition: read_stream.c:146
bool fast_path
Definition: read_stream.c:148

References ReadStream::advice_enabled, Assert, InProgressIO::buffer_index, BufferIsValid(), ReadStream::buffers, ReadBuffersOperation::buffers, ReadStream::distance, ReadStream::fast_path, ReadBuffersOperation::flags, get_per_buffer_data(), InvalidBlockNumber, InvalidBuffer, io_combine_limit, ReadStream::ios, ReadStream::ios_in_progress, likely, ReadStream::max_ios, ReadStream::max_pinned_buffers, Min, ReadStream::next_buffer_index, ReadStream::next_io_index, ReadStream::oldest_buffer_index, ReadStream::oldest_io_index, InProgressIO::op, ReadStream::pending_read_nblocks, ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, ReadStream::pinned_buffers, ReadStream::queue_size, READ_BUFFERS_ISSUE_ADVICE, read_stream_get_block(), read_stream_look_ahead(), ReadStream::seq_blocknum, StartReadBuffer(), unlikely, and WaitReadBuffers().

Referenced by collect_corrupt_items(), collect_visibility_data(), heap_fetch_next_buffer(), heapam_scan_analyze_next_block(), pg_prewarm(), read_stream_reset(), and RelationCopyStorageUsingBuffer().

◆ read_stream_reset()

void read_stream_reset ( ReadStream * stream)

Definition at line 820 of file read_stream.c.

821 {
822  Buffer buffer;
823 
824  /* Stop looking ahead. */
825  stream->distance = 0;
826 
827  /* Forget buffered block number and fast path state. */
828  stream->buffered_blocknum = InvalidBlockNumber;
829  stream->fast_path = false;
830 
831  /* Unpin anything that wasn't consumed. */
832  while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
833  ReleaseBuffer(buffer);
834 
835  Assert(stream->pinned_buffers == 0);
836  Assert(stream->ios_in_progress == 0);
837 
838  /* Start off assuming data is cached. */
839  stream->distance = 1;
840 }
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4924
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Definition: read_stream.c:605

References Assert, ReadStream::buffered_blocknum, ReadStream::distance, ReadStream::fast_path, InvalidBlockNumber, InvalidBuffer, ReadStream::ios_in_progress, ReadStream::pinned_buffers, read_stream_next_buffer(), and ReleaseBuffer().

Referenced by heap_fetch_next_buffer(), heap_rescan(), and read_stream_end().

◆ read_stream_start_pending_read()

static void read_stream_start_pending_read ( ReadStream * stream,
bool  suppress_advice 
)
static

Definition at line 217 of file read_stream.c.

218 {
219  bool need_wait;
220  int nblocks;
221  int flags;
222  int16 io_index;
223  int16 overflow;
224  int16 buffer_index;
225 
226  /* This should only be called with a pending read. */
227  Assert(stream->pending_read_nblocks > 0);
228  Assert(stream->pending_read_nblocks <= io_combine_limit);
229 
230  /* We had better not exceed the pin limit by starting this read. */
231  Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
232  stream->max_pinned_buffers);
233 
234  /* We had better not be overwriting an existing pinned buffer. */
235  if (stream->pinned_buffers > 0)
236  Assert(stream->next_buffer_index != stream->oldest_buffer_index);
237  else
238  Assert(stream->next_buffer_index == stream->oldest_buffer_index);
239 
240  /*
241  * If advice hasn't been suppressed, this system supports it, and this
242  * isn't a strictly sequential pattern, then we'll issue advice.
243  */
244  if (!suppress_advice &&
245  stream->advice_enabled &&
246  stream->pending_read_blocknum != stream->seq_blocknum)
247  flags = READ_BUFFERS_ISSUE_ADVICE;
248  else
249  flags = 0;
250 
251  /* We say how many blocks we want to read, but may be smaller on return. */
252  buffer_index = stream->next_buffer_index;
253  io_index = stream->next_io_index;
254  nblocks = stream->pending_read_nblocks;
255  need_wait = StartReadBuffers(&stream->ios[io_index].op,
256  &stream->buffers[buffer_index],
257  stream->pending_read_blocknum,
258  &nblocks,
259  flags);
260  stream->pinned_buffers += nblocks;
261 
262  /* Remember whether we need to wait before returning this buffer. */
263  if (!need_wait)
264  {
265  /* Look-ahead distance decays, no I/O necessary (behavior A). */
266  if (stream->distance > 1)
267  stream->distance--;
268  }
269  else
270  {
271  /*
272  * Remember to call WaitReadBuffers() before returning head buffer.
273  * Look-ahead distance will be adjusted after waiting.
274  */
275  stream->ios[io_index].buffer_index = buffer_index;
276  if (++stream->next_io_index == stream->max_ios)
277  stream->next_io_index = 0;
278  Assert(stream->ios_in_progress < stream->max_ios);
279  stream->ios_in_progress++;
280  stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
281  }
282 
283  /*
284  * We gave a contiguous range of buffer space to StartReadBuffers(), but
285  * we want it to wrap around at queue_size. Slide overflowing buffers to
286  * the front of the array.
287  */
288  overflow = (buffer_index + nblocks) - stream->queue_size;
289  if (overflow > 0)
290  memmove(&stream->buffers[0],
291  &stream->buffers[stream->queue_size],
292  sizeof(stream->buffers[0]) * overflow);
293 
294  /* Compute location of start of next read, without using % operator. */
295  buffer_index += nblocks;
296  if (buffer_index >= stream->queue_size)
297  buffer_index -= stream->queue_size;
298  Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
299  stream->next_buffer_index = buffer_index;
300 
301  /* Adjust the pending read to cover the remaining portion, if any. */
302  stream->pending_read_blocknum += nblocks;
303  stream->pending_read_nblocks -= nblocks;
304 }
bool StartReadBuffers(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, int flags)
Definition: bufmgr.c:1367
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76

References ReadStream::advice_enabled, Assert, InProgressIO::buffer_index, ReadStream::buffers, ReadStream::distance, if(), io_combine_limit, ReadStream::ios, ReadStream::ios_in_progress, ReadStream::max_ios, ReadStream::max_pinned_buffers, ReadStream::next_buffer_index, ReadStream::next_io_index, ReadStream::oldest_buffer_index, InProgressIO::op, ReadStream::pending_read_blocknum, ReadStream::pending_read_nblocks, ReadStream::pinned_buffers, ReadStream::queue_size, READ_BUFFERS_ISSUE_ADVICE, ReadStream::seq_blocknum, and StartReadBuffers().

Referenced by read_stream_look_ahead().

◆ read_stream_unget_block()

static void read_stream_unget_block ( ReadStream * stream,
BlockNumber  blocknum 
)
inline static

Definition at line 208 of file read_stream.c.

209 {  /* Push blocknum back so the next read_stream_get_block() call returns it instead of calling the callback. */
210  /* We shouldn't ever unget more than one block. */
211  Assert(stream->buffered_blocknum == InvalidBlockNumber);
212  Assert(blocknum != InvalidBlockNumber);
213  stream->buffered_blocknum = blocknum;
214 }

References Assert, ReadStream::buffered_blocknum, and InvalidBlockNumber.

Referenced by read_stream_look_ahead().