PostgreSQL Source Code  git master
read_stream.h File Reference
#include "storage/bufmgr.h"
Include dependency graph for read_stream.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

/*
 * Flag bits for the `flags` argument of read_stream_begin_relation().  Each
 * is a distinct bit tested individually with `&`, so they may be OR'd.
 */
/* No special behavior. */
#define READ_STREAM_DEFAULT   0x00
 
/*
 * Take the I/O concurrency from get_tablespace_maintenance_io_concurrency()
 * instead of get_tablespace_io_concurrency().  Has no effect when the
 * effective_io_concurrency fallback applies (no database yet, or catalog
 * relations).
 */
#define READ_STREAM_MAINTENANCE   0x01
 
/*
 * Caller promises sequential access, overriding the stream's own detection
 * heuristics; prefetch advice is never enabled for such a stream.
 */
#define READ_STREAM_SEQUENTIAL   0x02
 
/*
 * Caller will read the whole relation: skip the initial ramp-up and start
 * with a look-ahead distance of Min(max_pinned_buffers, io_combine_limit)
 * instead of 1.
 */
#define READ_STREAM_FULL   0x04
 

Typedefs

/* Opaque stream handle; its members are defined privately in read_stream.c. */
typedef struct ReadStream ReadStream
 
/*
 * Callback that supplies the block numbers the stream should read.
 * callback_private_data is presumably the pointer that was passed to
 * read_stream_begin_relation() (the stream stores it) -- confirm.
 * per_buffer_data presumably points at the per_buffer_data_size-byte slot
 * that read_stream_next_buffer() later hands back for that block -- confirm
 * against read_stream_fill_blocknums()/read_stream_look_ahead(), not shown.
 * NOTE(review): an InvalidBlockNumber in the block sequence ends the stream
 * (see the fast path of read_stream_next_buffer()); presumably that is what
 * the callback returns when it has no more blocks.
 */
typedef BlockNumber(* ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 

Functions

ReadStreamread_stream_begin_relation (int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
Buffer read_stream_next_buffer (ReadStream *stream, void **per_buffer_private)
 
void read_stream_reset (ReadStream *stream)
 
void read_stream_end (ReadStream *stream)
 

Macro Definition Documentation

◆ READ_STREAM_DEFAULT

#define READ_STREAM_DEFAULT   0x00

Definition at line 20 of file read_stream.h.

◆ READ_STREAM_FULL

#define READ_STREAM_FULL   0x04

Definition at line 42 of file read_stream.h.

◆ READ_STREAM_MAINTENANCE

#define READ_STREAM_MAINTENANCE   0x01

Definition at line 27 of file read_stream.h.

◆ READ_STREAM_SEQUENTIAL

#define READ_STREAM_SEQUENTIAL   0x02

Definition at line 35 of file read_stream.h.

Typedef Documentation

◆ ReadStream

typedef struct ReadStream ReadStream

Definition at line 1 of file read_stream.h.

◆ ReadStreamBlockNumberCB

typedef BlockNumber(* ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data)

Definition at line 48 of file read_stream.h.

Function Documentation

◆ read_stream_begin_relation()

ReadStream* read_stream_begin_relation ( int  flags,
BufferAccessStrategy  strategy,
Relation  rel,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 410 of file read_stream.c.

/*
 * Listing of read_stream_begin_relation().
 * - Chooses max_ios from the tablespace's I/O concurrency settings.
 * - Sizes the buffer queue from max_ios, bounded by the strategy's pin limit
 *   and the backend's share of buffers.
 * - Allocates the stream, its buffer queue, its I/O slots and the optional
 *   per-buffer data in one palloc'd chunk.
 * - Initializes the fields that stay constant for the stream's lifetime.
 */
417 {
418  ReadStream *stream;
419  size_t size;
420  int16 queue_size;
421  int16 max_ios;
422  int strategy_pin_limit;
423  uint32 max_pinned_buffers;
424  Oid tablespace_id;
425  SMgrRelation smgr;
426 
427  smgr = RelationGetSmgr(rel);
428 
429  /*
430  * Decide how many I/Os we will allow to run at the same time. That
431  * currently means advice to the kernel to tell it that we will soon read.
432  * This number also affects how far we look ahead for opportunities to
433  * start more I/Os.
434  */
435  tablespace_id = smgr->smgr_rlocator.locator.spcOid;
436  if (!OidIsValid(MyDatabaseId) ||
437  IsCatalogRelation(rel) ||
438  IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
439  {
440  /*
441  * Avoid circularity while trying to look up tablespace settings or
442  * before spccache.c is ready.
443  */
444  max_ios = effective_io_concurrency;
445  }
446  else if (flags & READ_STREAM_MAINTENANCE)
447  max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
448  else
449  max_ios = get_tablespace_io_concurrency(tablespace_id);
450  max_ios = Min(max_ios, PG_INT16_MAX);
451 
452  /*
453  * Choose the maximum number of buffers we're prepared to pin. We try to
454  * pin fewer if we can, though. We clamp it to at least io_combine_limit
455  * so that we can have a chance to build up a full io_combine_limit sized
456  * read, even when max_ios is zero. Be careful not to allow int16 to
457  * overflow (even though that's not possible with the current GUC range
458  * limits), allowing also for the spare entry and the overflow space.
459  */
460  max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
461  max_pinned_buffers = Min(max_pinned_buffers,
462  PG_INT16_MAX - io_combine_limit - 1);
463 
464  /* Give the strategy a chance to limit the number of buffers we pin. */
465  strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
466  max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
467 
468  /* Don't allow this backend to pin more than its share of buffers. */
469  if (SmgrIsTemp(smgr))
470  LimitAdditionalLocalPins(&max_pinned_buffers);
471  else
472  LimitAdditionalPins(&max_pinned_buffers);
473  Assert(max_pinned_buffers > 0);
474 
475  /*
476  * We need one extra entry for buffers and per-buffer data, because users
477  * of per-buffer data have access to the object until the next call to
478  * read_stream_next_buffer(), so we need a gap between the head and tail
479  * of the queue so that we don't clobber it.
480  */
481  queue_size = max_pinned_buffers + 1;
482 
483  /*
484  * Allocate the object, the buffers, the ios and per_buffer_data space in
485  * one big chunk. Though we have queue_size buffers, we want to be able
486  * to assume that all the buffers for a single read are contiguous (i.e.
487  * don't wrap around halfway through), so we allow temporary overflows of
488  * up to the maximum possible read size by allocating an extra
489  * io_combine_limit - 1 elements.
490  */
491  size = offsetof(ReadStream, buffers);
492  size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
493  size += sizeof(InProgressIO) * Max(1, max_ios);
494  size += per_buffer_data_size * queue_size;
495  size += MAXIMUM_ALIGNOF * 2;
496  stream = (ReadStream *) palloc(size);
497  memset(stream, 0, offsetof(ReadStream, buffers));
498  stream->ios = (InProgressIO *)
499  MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
500  if (per_buffer_data_size > 0)
501  stream->per_buffer_data = (void *)
502  MAXALIGN(&stream->ios[Max(1, max_ios)]);
503 
504 #ifdef USE_PREFETCH
505 
506  /*
507  * This system supports prefetching advice. We can use it as long as
508  * direct I/O isn't enabled, the caller hasn't promised sequential access
509  * (overriding our detection heuristics), and max_ios hasn't been set to
510  * zero.
511  */
512  if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
513  (flags & READ_STREAM_SEQUENTIAL) == 0 &&
514  max_ios > 0)
515  stream->advice_enabled = true;
516 #endif
517 
518  /*
519  * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
520  * above. If we had real asynchronous I/O we might need a slightly
521  * different definition.
522  */
523  if (max_ios == 0)
524  max_ios = 1;
525 
526  stream->max_ios = max_ios;
527  stream->per_buffer_data_size = per_buffer_data_size;
528  stream->max_pinned_buffers = max_pinned_buffers;
529  stream->queue_size = queue_size;
530  stream->callback = callback;
531  stream->callback_private_data = callback_private_data;
532 
533  /*
534  * Skip the initial ramp-up phase if the caller says we're going to be
535  * reading the whole relation. This way we start out assuming we'll be
536  * doing full io_combine_limit sized reads (behavior B).
537  */
538  if (flags & READ_STREAM_FULL)
539  stream->distance = Min(max_pinned_buffers, io_combine_limit);
540  else
541  stream->distance = 1;
542 
543  /*
544  * Since we always access the same relation, we can initialize parts of
545  * the ReadBuffersOperation objects and leave them that way, to avoid
546  * wasting CPU cycles writing to them for each read.
547  */
548  for (int i = 0; i < max_ios; ++i)
549  {
550  stream->ios[i].op.rel = rel;
551  stream->ios[i].op.smgr = RelationGetSmgr(rel);
552  stream->ios[i].op.smgr_persistence = 0;
553  stream->ios[i].op.forknum = forknum;
554  stream->ios[i].op.strategy = strategy;
555  }
556 
557  return stream;
558 }
int Buffer
Definition: buf.h:23
void LimitAdditionalPins(uint32 *additional_pins)
Definition: bufmgr.c:2058
int effective_io_concurrency
Definition: bufmgr.c:150
int io_combine_limit
Definition: bufmgr.c:164
unsigned int uint32
Definition: c.h:506
#define Min(x, y)
Definition: c.h:1004
signed short int16
Definition: c.h:493
#define MAXALIGN(LEN)
Definition: c.h:811
#define Max(x, y)
Definition: c.h:998
#define Assert(condition)
Definition: c.h:858
#define PG_INT16_MAX
Definition: c.h:586
#define OidIsValid(objectId)
Definition: c.h:775
bool IsCatalogRelation(Relation relation)
Definition: catalog.c:103
bool IsCatalogRelationOid(Oid relid)
Definition: catalog.c:120
int io_direct_flags
Definition: fd.c:168
#define IO_DIRECT_DATA
Definition: fd.h:54
int GetAccessStrategyPinLimit(BufferAccessStrategy strategy)
Definition: freelist.c:647
Oid MyDatabaseId
Definition: globals.c:91
int i
Definition: isn.c:73
void LimitAdditionalLocalPins(uint32 *additional_pins)
Definition: localbuf.c:290
void * palloc(Size size)
Definition: mcxt.c:1316
unsigned int Oid
Definition: postgres_ext.h:31
struct InProgressIO InProgressIO
#define READ_STREAM_MAINTENANCE
Definition: read_stream.h:27
#define READ_STREAM_FULL
Definition: read_stream.h:42
#define READ_STREAM_SEQUENTIAL
Definition: read_stream.h:35
static SMgrRelation RelationGetSmgr(Relation rel)
Definition: rel.h:567
static pg_noinline void Size size
Definition: slab.c:607
#define SmgrIsTemp(smgr)
Definition: smgr.h:73
int get_tablespace_io_concurrency(Oid spcid)
Definition: spccache.c:215
int get_tablespace_maintenance_io_concurrency(Oid spcid)
Definition: spccache.c:229
ReadBuffersOperation op
Definition: read_stream.c:103
ForkNumber forknum
Definition: bufmgr.h:129
BufferAccessStrategy strategy
Definition: bufmgr.h:130
struct SMgrRelationData * smgr
Definition: bufmgr.h:127
int16 distance
Definition: read_stream.c:116
void * per_buffer_data
Definition: read_stream.c:144
int16 max_ios
Definition: read_stream.c:111
bool advice_enabled
Definition: read_stream.c:117
int16 max_pinned_buffers
Definition: read_stream.c:114
InProgressIO * ios
Definition: read_stream.c:147
int16 queue_size
Definition: read_stream.c:113
size_t per_buffer_data_size
Definition: read_stream.c:143
ReadStreamBlockNumberCB callback
Definition: read_stream.c:132
void * callback_private_data
Definition: read_stream.c:133
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]
Definition: read_stream.c:156
RelFileLocator locator
RelFileNumber relNumber
RelFileLocatorBackend smgr_rlocator
Definition: smgr.h:37
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46

References ReadStream::advice_enabled, Assert, ReadStream::buffers, ReadStream::callback, callback(), ReadStream::callback_private_data, ReadStream::distance, effective_io_concurrency, ReadBuffersOperation::forknum, get_tablespace_io_concurrency(), get_tablespace_maintenance_io_concurrency(), GetAccessStrategyPinLimit(), i, io_combine_limit, IO_DIRECT_DATA, io_direct_flags, ReadStream::ios, IsCatalogRelation(), IsCatalogRelationOid(), LimitAdditionalLocalPins(), LimitAdditionalPins(), RelFileLocatorBackend::locator, Max, ReadStream::max_ios, ReadStream::max_pinned_buffers, MAXALIGN, Min, MyDatabaseId, OidIsValid, InProgressIO::op, palloc(), ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, PG_INT16_MAX, ReadStream::queue_size, READ_STREAM_FULL, READ_STREAM_MAINTENANCE, READ_STREAM_SEQUENTIAL, ReadBuffersOperation::rel, RelationGetSmgr(), RelFileLocator::relNumber, size, ReadBuffersOperation::smgr, ReadBuffersOperation::smgr_persistence, SMgrRelationData::smgr_rlocator, SmgrIsTemp, RelFileLocator::spcOid, and ReadBuffersOperation::strategy.

Referenced by acquire_sample_rows(), heap_beginscan(), and pg_prewarm().

◆ read_stream_end()

void read_stream_end ( ReadStream * stream)

Definition at line 800 of file read_stream.c.

/*
 * Listing of read_stream_end(): releases everything the stream still holds,
 * then frees it.  The order matters, because the reset reads the stream.
 */
801 {
802  read_stream_reset(stream); /* unpins any buffers not yet returned to the caller */
803  pfree(stream); /* buffers, I/O slots and per-buffer data share this one palloc'd chunk */
804 }
void pfree(void *pointer)
Definition: mcxt.c:1520
void read_stream_reset(ReadStream *stream)
Definition: read_stream.c:773

References pfree(), and read_stream_reset().

Referenced by acquire_sample_rows(), heap_endscan(), and pg_prewarm().

◆ read_stream_next_buffer()

Buffer read_stream_next_buffer ( ReadStream * stream,
void **  per_buffer_private 
)

Definition at line 570 of file read_stream.c.

/*
 * Listing of read_stream_next_buffer().
 * - Returns the oldest pinned buffer and transfers its pin to the caller,
 *   first waiting for any read still in progress for it.
 * - If per_buffer_data is non-NULL, points it at that buffer's per-buffer
 *   data.
 * - Returns InvalidBuffer once the stream is exhausted.
 */
571 {
572  Buffer buffer;
573  int16 oldest_buffer_index;
574 
575 #ifndef READ_STREAM_DISABLE_FAST_PATH
576 
577  /*
578  * A fast path for all-cached scans (behavior A). This is the same as the
579  * usual algorithm, but it is specialized for no I/O and no per-buffer
580  * data, so we can skip the queue management code, stay in the same buffer
581  * slot and use singular StartReadBuffer().
582  */
583  if (likely(stream->fast_path))
584  {
585  BlockNumber next_blocknum;
586 
587  /* Fast path assumptions. */
588  Assert(stream->ios_in_progress == 0);
589  Assert(stream->pinned_buffers == 1);
590  Assert(stream->distance == 1);
591  Assert(stream->pending_read_nblocks == 0);
592  Assert(stream->per_buffer_data_size == 0);
593 
594  /* We're going to return the buffer we pinned last time. */
595  oldest_buffer_index = stream->oldest_buffer_index;
596  Assert((oldest_buffer_index + 1) % stream->queue_size ==
597  stream->next_buffer_index);
598  buffer = stream->buffers[oldest_buffer_index];
599  Assert(buffer != InvalidBuffer);
600 
601  /* Choose the next block to pin. */
602  if (unlikely(stream->blocknums_next == stream->blocknums_count))
603  read_stream_fill_blocknums(stream);
604  next_blocknum = stream->blocknums[stream->blocknums_next++];
605 
606  if (likely(next_blocknum != InvalidBlockNumber))
607  {
608  /*
609  * Pin a buffer for the next call. Same buffer entry, and
610  * arbitrary I/O entry (they're all free). We don't have to
611  * adjust pinned_buffers because we're transferring one to caller
612  * but pinning one more.
613  */
614  if (likely(!StartReadBuffer(&stream->ios[0].op,
615  &stream->buffers[oldest_buffer_index],
616  next_blocknum,
617  stream->advice_enabled ?
618  READ_BUFFERS_ISSUE_ADVICE : 0)))
619  {
620  /* Fast return. */
621  return buffer;
622  }
623 
624  /* Next call must wait for I/O for the newly pinned buffer. */
625  stream->oldest_io_index = 0;
626  stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
627  stream->ios_in_progress = 1;
628  stream->ios[0].buffer_index = oldest_buffer_index;
629  stream->seq_blocknum = next_blocknum + 1;
630  }
631  else
632  {
633  /* No more blocks, end of stream. */
634  stream->distance = 0;
635  stream->oldest_buffer_index = stream->next_buffer_index;
636  stream->pinned_buffers = 0;
637  }
638 
639  stream->fast_path = false;
640  return buffer;
641  }
642 #endif
643 
644  if (unlikely(stream->pinned_buffers == 0))
645  {
646  Assert(stream->oldest_buffer_index == stream->next_buffer_index);
647 
648  /* End of stream reached? */
649  if (stream->distance == 0)
650  return InvalidBuffer;
651 
652  /*
653  * The usual order of operations is that we look ahead at the bottom
654  * of this function after potentially finishing an I/O and making
655  * space for more, but if we're just starting up we'll need to crank
656  * the handle to get started.
657  */
658  read_stream_look_ahead(stream, true);
659 
660  /* End of stream reached? */
661  if (stream->pinned_buffers == 0)
662  {
663  Assert(stream->distance == 0);
664  return InvalidBuffer;
665  }
666  }
667 
668  /* Grab the oldest pinned buffer and associated per-buffer data. */
669  Assert(stream->pinned_buffers > 0);
670  oldest_buffer_index = stream->oldest_buffer_index;
671  Assert(oldest_buffer_index >= 0 &&
672  oldest_buffer_index < stream->queue_size);
673  buffer = stream->buffers[oldest_buffer_index];
674  if (per_buffer_data)
675  *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
676 
677  Assert(BufferIsValid(buffer));
678 
679  /* Do we have to wait for an associated I/O first? */
680  if (stream->ios_in_progress > 0 &&
681  stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
682  {
683  int16 io_index = stream->oldest_io_index;
684  int16 distance;
685 
686  /* Sanity check that we still agree on the buffers. */
687  Assert(stream->ios[io_index].op.buffers ==
688  &stream->buffers[oldest_buffer_index]);
689 
690  WaitReadBuffers(&stream->ios[io_index].op);
691 
692  Assert(stream->ios_in_progress > 0);
693  stream->ios_in_progress--;
694  if (++stream->oldest_io_index == stream->max_ios)
695  stream->oldest_io_index = 0;
696 
697  if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
698  {
699  /* Distance ramps up fast (behavior C). */
700  distance = stream->distance * 2;
701  distance = Min(distance, stream->max_pinned_buffers);
702  stream->distance = distance;
703  }
704  else
705  {
706  /* No advice; move towards io_combine_limit (behavior B). */
707  if (stream->distance > io_combine_limit)
708  {
709  stream->distance--;
710  }
711  else
712  {
713  distance = stream->distance * 2;
714  distance = Min(distance, io_combine_limit);
715  distance = Min(distance, stream->max_pinned_buffers);
716  stream->distance = distance;
717  }
718  }
719  }
720 
721 #ifdef CLOBBER_FREED_MEMORY
722  /* Clobber old buffer and per-buffer data for debugging purposes. */
723  stream->buffers[oldest_buffer_index] = InvalidBuffer;
724 
725  /*
726  * The caller will get access to the per-buffer data, until the next call.
727  * We wipe the one before, which is never occupied because queue_size
728  * allowed one extra element. This will hopefully trip up client code
729  * that is holding a dangling pointer to it.
730  */
731  if (stream->per_buffer_data)
732  wipe_mem(get_per_buffer_data(stream,
733  oldest_buffer_index == 0 ?
734  stream->queue_size - 1 :
735  oldest_buffer_index - 1),
736  stream->per_buffer_data_size);
737 #endif
738 
739  /* Pin transferred to caller. */
740  Assert(stream->pinned_buffers > 0);
741  stream->pinned_buffers--;
742 
743  /* Advance oldest buffer, with wrap-around. */
744  stream->oldest_buffer_index++;
745  if (stream->oldest_buffer_index == stream->queue_size)
746  stream->oldest_buffer_index = 0;
747 
748  /* Prepare for the next call. */
749  read_stream_look_ahead(stream, false);
750 
751 #ifndef READ_STREAM_DISABLE_FAST_PATH
752  /* See if we can take the fast path for all-cached scans next time. */
753  if (stream->ios_in_progress == 0 &&
754  stream->pinned_buffers == 1 &&
755  stream->distance == 1 &&
756  stream->pending_read_nblocks == 0 &&
757  stream->per_buffer_data_size == 0)
758  {
759  stream->fast_path = true;
760  }
761 #endif
762 
763  return buffer;
764 }
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
#define InvalidBuffer
Definition: buf.h:25
void WaitReadBuffers(ReadBuffersOperation *operation)
Definition: bufmgr.c:1349
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
Definition: bufmgr.c:1321
@ READ_BUFFERS_ISSUE_ADVICE
Definition: bufmgr.h:116
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:359
#define likely(x)
Definition: c.h:310
#define unlikely(x)
Definition: c.h:311
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
Definition: read_stream.c:163
static void read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
Definition: read_stream.c:323
static void read_stream_fill_blocknums(ReadStream *stream)
Definition: read_stream.c:214
int16 buffer_index
Definition: read_stream.c:102
Buffer * buffers
Definition: bufmgr.h:137
int16 blocknums_next
Definition: read_stream.c:126
int16 ios_in_progress
Definition: read_stream.c:112
int16 pinned_buffers
Definition: read_stream.c:115
int16 oldest_buffer_index
Definition: read_stream.c:154
BlockNumber seq_blocknum
Definition: read_stream.c:136
int16 oldest_io_index
Definition: read_stream.c:148
int16 blocknums_count
Definition: read_stream.c:125
BlockNumber blocknums[16]
Definition: read_stream.c:124
int16 next_buffer_index
Definition: read_stream.c:155
int16 next_io_index
Definition: read_stream.c:149
bool fast_path
Definition: read_stream.c:151
int16 pending_read_nblocks
Definition: read_stream.c:140

References ReadStream::advice_enabled, Assert, ReadStream::blocknums, ReadStream::blocknums_count, ReadStream::blocknums_next, InProgressIO::buffer_index, BufferIsValid(), ReadStream::buffers, ReadBuffersOperation::buffers, ReadStream::distance, ReadStream::fast_path, ReadBuffersOperation::flags, get_per_buffer_data(), InvalidBlockNumber, InvalidBuffer, io_combine_limit, ReadStream::ios, ReadStream::ios_in_progress, likely, ReadStream::max_ios, ReadStream::max_pinned_buffers, Min, ReadStream::next_buffer_index, ReadStream::next_io_index, ReadStream::oldest_buffer_index, ReadStream::oldest_io_index, InProgressIO::op, ReadStream::pending_read_nblocks, ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, ReadStream::pinned_buffers, ReadStream::queue_size, READ_BUFFERS_ISSUE_ADVICE, read_stream_fill_blocknums(), read_stream_look_ahead(), ReadStream::seq_blocknum, StartReadBuffer(), unlikely, and WaitReadBuffers().

Referenced by heap_fetch_next_buffer(), heapam_scan_analyze_next_block(), pg_prewarm(), and read_stream_reset().

◆ read_stream_reset()

void read_stream_reset ( ReadStream * stream)

Definition at line 773 of file read_stream.c.

/*
 * Listing of read_stream_reset(): drains the stream, releasing every buffer
 * it still holds, and then restores a starting look-ahead distance so the
 * stream can be read from again.
 */
774 {
775  Buffer buffer;
776 
777  /* Stop looking ahead: at distance 0, read_stream_next_buffer() returns InvalidBuffer once nothing is left pinned. */
778  stream->distance = 0;
779 
780  /* Forget buffered block numbers and fast path state (the fast path asserts distance == 1 and would pin another block). */
781  stream->blocknums_next = 0;
782  stream->blocknums_count = 0;
783  stream->fast_path = false;
784 
785  /* Unpin anything that wasn't consumed: each returned buffer's pin has been transferred to us, so release it. */
786  while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
787  ReleaseBuffer(buffer);
788 
789  Assert(stream->pinned_buffers == 0);
790  Assert(stream->ios_in_progress == 0);
791 
792  /* Start off assuming data is cached. NOTE(review): this is 1 even for streams begun with READ_STREAM_FULL, which read_stream_begin_relation() starts at Min(max_pinned_buffers, io_combine_limit) -- confirm intended. */
793  stream->distance = 1;
794 }
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4850
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Definition: read_stream.c:570

References Assert, ReadStream::blocknums_count, ReadStream::blocknums_next, ReadStream::distance, ReadStream::fast_path, InvalidBuffer, ReadStream::ios_in_progress, ReadStream::pinned_buffers, read_stream_next_buffer(), and ReleaseBuffer().

Referenced by heap_fetch_next_buffer(), heap_rescan(), and read_stream_end().