PostgreSQL Source Code  git master
read_stream.h File Reference
#include "storage/bufmgr.h"
#include "storage/smgr.h"
Include dependency graph for read_stream.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define READ_STREAM_DEFAULT   0x00
 
#define READ_STREAM_MAINTENANCE   0x01
 
#define READ_STREAM_SEQUENTIAL   0x02
 
#define READ_STREAM_FULL   0x04
 

Typedefs

typedef struct ReadStream ReadStream
 
typedef BlockNumber(* ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 

Functions

ReadStream * read_stream_begin_relation (int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
Buffer read_stream_next_buffer (ReadStream *stream, void **per_buffer_data)
 
ReadStream * read_stream_begin_smgr_relation (int flags, BufferAccessStrategy strategy, SMgrRelation smgr, char smgr_persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
void read_stream_reset (ReadStream *stream)
 
void read_stream_end (ReadStream *stream)
 

Macro Definition Documentation

◆ READ_STREAM_DEFAULT

#define READ_STREAM_DEFAULT   0x00

Definition at line 21 of file read_stream.h.

◆ READ_STREAM_FULL

#define READ_STREAM_FULL   0x04

Definition at line 43 of file read_stream.h.

◆ READ_STREAM_MAINTENANCE

#define READ_STREAM_MAINTENANCE   0x01

Definition at line 28 of file read_stream.h.

◆ READ_STREAM_SEQUENTIAL

#define READ_STREAM_SEQUENTIAL   0x02

Definition at line 36 of file read_stream.h.

Typedef Documentation

◆ ReadStream

typedef struct ReadStream ReadStream

Definition at line 1 of file read_stream.h.

◆ ReadStreamBlockNumberCB

typedef BlockNumber(* ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data)

Definition at line 49 of file read_stream.h.

Function Documentation

◆ read_stream_begin_relation()

ReadStream* read_stream_begin_relation ( int  flags,
BufferAccessStrategy  strategy,
Relation  rel,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 566 of file read_stream.c.

573 {
574  return read_stream_begin_impl(flags,
575  strategy,
576  rel,
577  RelationGetSmgr(rel),
578  rel->rd_rel->relpersistence,
579  forknum,
580  callback,
581  callback_private_data,
582  per_buffer_data_size);
583 }
static ReadStream * read_stream_begin_impl(int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
Definition: read_stream.c:410
static SMgrRelation RelationGetSmgr(Relation rel)
Definition: rel.h:567
Form_pg_class rd_rel
Definition: rel.h:111
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46

References callback(), RelationData::rd_rel, read_stream_begin_impl(), and RelationGetSmgr().

Referenced by acquire_sample_rows(), heap_beginscan(), and pg_prewarm().

◆ read_stream_begin_smgr_relation()

ReadStream* read_stream_begin_smgr_relation ( int  flags,
BufferAccessStrategy  strategy,
SMgrRelation  smgr,
char  smgr_persistence,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 590 of file read_stream.c.

598 {
599  return read_stream_begin_impl(flags,
600  strategy,
601  NULL,
602  smgr,
603  smgr_persistence,
604  forknum,
605  callback,
606  callback_private_data,
607  per_buffer_data_size);
608 }

References callback(), and read_stream_begin_impl().

Referenced by RelationCopyStorageUsingBuffer().

◆ read_stream_end()

void read_stream_end ( ReadStream *  stream)

Definition at line 850 of file read_stream.c.

851 {
852  read_stream_reset(stream);
853  pfree(stream);
854 }
void pfree(void *pointer)
Definition: mcxt.c:1521
void read_stream_reset(ReadStream *stream)
Definition: read_stream.c:823

References pfree(), and read_stream_reset().

Referenced by acquire_sample_rows(), heap_endscan(), pg_prewarm(), and RelationCopyStorageUsingBuffer().

◆ read_stream_next_buffer()

Buffer read_stream_next_buffer ( ReadStream *  stream,
void **  per_buffer_data 
)

Definition at line 620 of file read_stream.c.

621 {
622  Buffer buffer;
623  int16 oldest_buffer_index;
624 
625 #ifndef READ_STREAM_DISABLE_FAST_PATH
626 
627  /*
628  * A fast path for all-cached scans (behavior A). This is the same as the
629  * usual algorithm, but it is specialized for no I/O and no per-buffer
630  * data, so we can skip the queue management code, stay in the same buffer
631  * slot and use singular StartReadBuffer().
632  */
633  if (likely(stream->fast_path))
634  {
635  BlockNumber next_blocknum;
636 
637  /* Fast path assumptions. */
638  Assert(stream->ios_in_progress == 0);
639  Assert(stream->pinned_buffers == 1);
640  Assert(stream->distance == 1);
641  Assert(stream->pending_read_nblocks == 0);
642  Assert(stream->per_buffer_data_size == 0);
643 
644  /* We're going to return the buffer we pinned last time. */
645  oldest_buffer_index = stream->oldest_buffer_index;
646  Assert((oldest_buffer_index + 1) % stream->queue_size ==
647  stream->next_buffer_index);
648  buffer = stream->buffers[oldest_buffer_index];
649  Assert(buffer != InvalidBuffer);
650 
651  /* Choose the next block to pin. */
652  if (unlikely(stream->blocknums_next == stream->blocknums_count))
653  read_stream_fill_blocknums(stream);
654  next_blocknum = stream->blocknums[stream->blocknums_next++];
655 
656  if (likely(next_blocknum != InvalidBlockNumber))
657  {
658  /*
659  * Pin a buffer for the next call. Same buffer entry, and
660  * arbitrary I/O entry (they're all free). We don't have to
661  * adjust pinned_buffers because we're transferring one to caller
662  * but pinning one more.
663  */
664  if (likely(!StartReadBuffer(&stream->ios[0].op,
665  &stream->buffers[oldest_buffer_index],
666  next_blocknum,
667  stream->advice_enabled ?
668  READ_BUFFERS_ISSUE_ADVICE : 0)))
669  {
670  /* Fast return. */
671  return buffer;
672  }
673 
674  /* Next call must wait for I/O for the newly pinned buffer. */
675  stream->oldest_io_index = 0;
676  stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
677  stream->ios_in_progress = 1;
678  stream->ios[0].buffer_index = oldest_buffer_index;
679  stream->seq_blocknum = next_blocknum + 1;
680  }
681  else
682  {
683  /* No more blocks, end of stream. */
684  stream->distance = 0;
685  stream->oldest_buffer_index = stream->next_buffer_index;
686  stream->pinned_buffers = 0;
687  }
688 
689  stream->fast_path = false;
690  return buffer;
691  }
692 #endif
693 
694  if (unlikely(stream->pinned_buffers == 0))
695  {
696  Assert(stream->oldest_buffer_index == stream->next_buffer_index);
697 
698  /* End of stream reached? */
699  if (stream->distance == 0)
700  return InvalidBuffer;
701 
702  /*
703  * The usual order of operations is that we look ahead at the bottom
704  * of this function after potentially finishing an I/O and making
705  * space for more, but if we're just starting up we'll need to crank
706  * the handle to get started.
707  */
708  read_stream_look_ahead(stream, true);
709 
710  /* End of stream reached? */
711  if (stream->pinned_buffers == 0)
712  {
713  Assert(stream->distance == 0);
714  return InvalidBuffer;
715  }
716  }
717 
718  /* Grab the oldest pinned buffer and associated per-buffer data. */
719  Assert(stream->pinned_buffers > 0);
720  oldest_buffer_index = stream->oldest_buffer_index;
721  Assert(oldest_buffer_index >= 0 &&
722  oldest_buffer_index < stream->queue_size);
723  buffer = stream->buffers[oldest_buffer_index];
724  if (per_buffer_data)
725  *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
726 
727  Assert(BufferIsValid(buffer));
728 
729  /* Do we have to wait for an associated I/O first? */
730  if (stream->ios_in_progress > 0 &&
731  stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
732  {
733  int16 io_index = stream->oldest_io_index;
734  int16 distance;
735 
736  /* Sanity check that we still agree on the buffers. */
737  Assert(stream->ios[io_index].op.buffers ==
738  &stream->buffers[oldest_buffer_index]);
739 
740  WaitReadBuffers(&stream->ios[io_index].op);
741 
742  Assert(stream->ios_in_progress > 0);
743  stream->ios_in_progress--;
744  if (++stream->oldest_io_index == stream->max_ios)
745  stream->oldest_io_index = 0;
746 
747  if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
748  {
749  /* Distance ramps up fast (behavior C). */
750  distance = stream->distance * 2;
751  distance = Min(distance, stream->max_pinned_buffers);
752  stream->distance = distance;
753  }
754  else
755  {
756  /* No advice; move towards io_combine_limit (behavior B). */
757  if (stream->distance > io_combine_limit)
758  {
759  stream->distance--;
760  }
761  else
762  {
763  distance = stream->distance * 2;
764  distance = Min(distance, io_combine_limit);
765  distance = Min(distance, stream->max_pinned_buffers);
766  stream->distance = distance;
767  }
768  }
769  }
770 
771 #ifdef CLOBBER_FREED_MEMORY
772  /* Clobber old buffer and per-buffer data for debugging purposes. */
773  stream->buffers[oldest_buffer_index] = InvalidBuffer;
774 
775  /*
776  * The caller will get access to the per-buffer data, until the next call.
777  * We wipe the one before, which is never occupied because queue_size
778  * allowed one extra element. This will hopefully trip up client code
779  * that is holding a dangling pointer to it.
780  */
781  if (stream->per_buffer_data)
782  wipe_mem(get_per_buffer_data(stream,
783  oldest_buffer_index == 0 ?
784  stream->queue_size - 1 :
785  oldest_buffer_index - 1),
786  stream->per_buffer_data_size);
787 #endif
788 
789  /* Pin transferred to caller. */
790  Assert(stream->pinned_buffers > 0);
791  stream->pinned_buffers--;
792 
793  /* Advance oldest buffer, with wrap-around. */
794  stream->oldest_buffer_index++;
795  if (stream->oldest_buffer_index == stream->queue_size)
796  stream->oldest_buffer_index = 0;
797 
798  /* Prepare for the next call. */
799  read_stream_look_ahead(stream, false);
800 
801 #ifndef READ_STREAM_DISABLE_FAST_PATH
802  /* See if we can take the fast path for all-cached scans next time. */
803  if (stream->ios_in_progress == 0 &&
804  stream->pinned_buffers == 1 &&
805  stream->distance == 1 &&
806  stream->pending_read_nblocks == 0 &&
807  stream->per_buffer_data_size == 0)
808  {
809  stream->fast_path = true;
810  }
811 #endif
812 
813  return buffer;
814 }
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
void WaitReadBuffers(ReadBuffersOperation *operation)
Definition: bufmgr.c:1420
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
Definition: bufmgr.c:1392
int io_combine_limit
Definition: bufmgr.c:192
#define READ_BUFFERS_ISSUE_ADVICE
Definition: bufmgr.h:113
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:351
#define Min(x, y)
Definition: c.h:1004
#define likely(x)
Definition: c.h:310
signed short int16
Definition: c.h:493
#define Assert(condition)
Definition: c.h:858
#define unlikely(x)
Definition: c.h:311
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
Definition: read_stream.c:163
static void read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
Definition: read_stream.c:323
static void read_stream_fill_blocknums(ReadStream *stream)
Definition: read_stream.c:214
int16 buffer_index
Definition: read_stream.c:102
ReadBuffersOperation op
Definition: read_stream.c:103
Buffer * buffers
Definition: bufmgr.h:129
int16 distance
Definition: read_stream.c:116
int16 blocknums_next
Definition: read_stream.c:126
int16 ios_in_progress
Definition: read_stream.c:112
void * per_buffer_data
Definition: read_stream.c:144
int16 pinned_buffers
Definition: read_stream.c:115
int16 max_ios
Definition: read_stream.c:111
int16 oldest_buffer_index
Definition: read_stream.c:154
BlockNumber seq_blocknum
Definition: read_stream.c:136
bool advice_enabled
Definition: read_stream.c:117
int16 max_pinned_buffers
Definition: read_stream.c:114
InProgressIO * ios
Definition: read_stream.c:147
int16 oldest_io_index
Definition: read_stream.c:148
int16 queue_size
Definition: read_stream.c:113
int16 blocknums_count
Definition: read_stream.c:125
BlockNumber blocknums[16]
Definition: read_stream.c:124
int16 next_buffer_index
Definition: read_stream.c:155
size_t per_buffer_data_size
Definition: read_stream.c:143
int16 next_io_index
Definition: read_stream.c:149
bool fast_path
Definition: read_stream.c:151
int16 pending_read_nblocks
Definition: read_stream.c:140
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]
Definition: read_stream.c:156

References ReadStream::advice_enabled, Assert, ReadStream::blocknums, ReadStream::blocknums_count, ReadStream::blocknums_next, InProgressIO::buffer_index, BufferIsValid(), ReadStream::buffers, ReadBuffersOperation::buffers, ReadStream::distance, ReadStream::fast_path, ReadBuffersOperation::flags, get_per_buffer_data(), InvalidBlockNumber, InvalidBuffer, io_combine_limit, ReadStream::ios, ReadStream::ios_in_progress, likely, ReadStream::max_ios, ReadStream::max_pinned_buffers, Min, ReadStream::next_buffer_index, ReadStream::next_io_index, ReadStream::oldest_buffer_index, ReadStream::oldest_io_index, InProgressIO::op, ReadStream::pending_read_nblocks, ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, ReadStream::pinned_buffers, ReadStream::queue_size, READ_BUFFERS_ISSUE_ADVICE, read_stream_fill_blocknums(), read_stream_look_ahead(), ReadStream::seq_blocknum, StartReadBuffer(), unlikely, and WaitReadBuffers().

Referenced by heap_fetch_next_buffer(), heapam_scan_analyze_next_block(), pg_prewarm(), read_stream_reset(), and RelationCopyStorageUsingBuffer().

◆ read_stream_reset()

void read_stream_reset ( ReadStream *  stream)

Definition at line 823 of file read_stream.c.

824 {
825  Buffer buffer;
826 
827  /* Stop looking ahead. */
828  stream->distance = 0;
829 
830  /* Forget buffered block numbers and fast path state. */
831  stream->blocknums_next = 0;
832  stream->blocknums_count = 0;
833  stream->fast_path = false;
834 
835  /* Unpin anything that wasn't consumed. */
836  while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
837  ReleaseBuffer(buffer);
838 
839  Assert(stream->pinned_buffers == 0);
840  Assert(stream->ios_in_progress == 0);
841 
842  /* Start off assuming data is cached. */
843  stream->distance = 1;
844 }
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4936
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Definition: read_stream.c:620

References Assert, ReadStream::blocknums_count, ReadStream::blocknums_next, ReadStream::distance, ReadStream::fast_path, InvalidBuffer, ReadStream::ios_in_progress, ReadStream::pinned_buffers, read_stream_next_buffer(), and ReleaseBuffer().

Referenced by heap_fetch_next_buffer(), heap_rescan(), and read_stream_end().