/*-------------------------------------------------------------------------
 *
 * read_stream.c
 *    Mechanism for accessing buffered relation data with look-ahead
 *
 * Code that needs to access relation data typically pins blocks one at a
 * time, often in a predictable order that might be sequential or data-driven.
 * Calling the simple ReadBuffer() function for each block is inefficient,
 * because blocks that are not yet in the buffer pool require I/O operations
 * that are small and might stall waiting for storage. This mechanism looks
 * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
 * neighboring blocks together and ahead of time, with an adaptive look-ahead
 * distance.
 *
 * A user-provided callback generates a stream of block numbers that is used
 * to form reads of up to io_combine_limit, by attempting to merge them with a
 * pending read. When that isn't possible, the existing pending read is sent
 * to StartReadBuffers() so that a new one can begin to form.
 *
 * The algorithm for controlling the look-ahead distance tries to classify the
 * stream into three ideal behaviors:
 *
 * A) No I/O is necessary, because the requested blocks are fully cached
 * already. There is no benefit to looking ahead more than one block, so
 * distance is 1. This is the default initial assumption.
 *
 * B) I/O is necessary, but read-ahead advice is undesirable because the
 * access is sequential and we can rely on the kernel's read-ahead heuristics,
 * or impossible because direct I/O is enabled, or the system doesn't support
 * read-ahead advice. There is no benefit in looking ahead more than
 * io_combine_limit, because in this case the only goal is larger read system
 * calls. Looking further ahead would pin many buffers and perform
 * speculative work for no benefit.
 *
 * C) I/O is necessary, it appears to be random, and this system supports
 * read-ahead advice. We'll look further ahead in order to reach the
 * configured level of I/O concurrency.
 *
 * The distance increases rapidly and decays slowly, so that it moves towards
 * those levels as different I/O patterns are discovered. For example, a
 * sequential scan of fully cached data doesn't bother looking ahead, but a
 * sequential scan that hits a region of uncached blocks will start issuing
 * increasingly wide read calls until it plateaus at io_combine_limit.
 *
 * The main data structure is a circular queue of buffers of size
 * max_pinned_buffers plus some extra space for technical reasons, ready to be
 * returned by read_stream_next_buffer(). Each buffer also has an optional
 * variable sized object that is passed from the callback to the consumer of
 * buffers.
 *
 * Parallel to the queue of buffers, there is a circular queue of in-progress
 * I/Os that have been started with StartReadBuffers(), and for which
 * WaitReadBuffers() must be called before returning the buffer.
 *
 * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
 * successive calls, then these data structures might appear as follows:
 *
 *                          buffers buf/data       ios
 *
 *                          +----+  +-----+       +--------+
 *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
 *                          +----+  +-----+  |    +--------+
 *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
 *                          +----+  +-----+  | |  +--------+
 *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
 *                          +----+  +-----+    |  +--------+
 *                          | 43 |  |  ?  |    |  |        |
 *                          +----+  +-----+    |  +--------+
 *                          | 44 |  |  ?  |    |  |        |
 *                          +----+  +-----+    |  +--------+
 *                          | 60 |  |  ?  |<---+
 *                          +----+  +-----+
 *     next_buffer_index -> |    |  |     |
 *                          +----+  +-----+
 *
 * In the example, 5 buffers are pinned, and the next buffer to be streamed to
 * the client is block 10. Block 10 was a hit and has no associated I/O, but
 * the range 42..44 requires an I/O wait before its buffers are returned, as
 * does block 60.
 *
 *
 * Portions Copyright (c) 2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/storage/aio/read_stream.c
 *
 *-------------------------------------------------------------------------
 */
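
/*
 * Illustrative sketch (editorial addition, not part of the original file):
 * a caller with a data-driven block sequence supplies its own
 * ReadStreamBlockNumberCB. If the stream was created with
 * per_buffer_data_size = sizeof(int), the callback may also pass an int
 * payload through to the consumer of each buffer. "MyScanState" and
 * "my_scan_cb" are hypothetical names used only for this sketch.
 *
 *     typedef struct MyScanState
 *     {
 *         BlockNumber *blocks;   // data-driven sequence to visit
 *         int          nblocks;
 *         int          i;
 *     } MyScanState;
 *
 *     static BlockNumber
 *     my_scan_cb(ReadStream *stream, void *callback_private_data,
 *                void *per_buffer_data)
 *     {
 *         MyScanState *state = callback_private_data;
 *
 *         // Returning InvalidBlockNumber signals end-of-stream.
 *         if (state->i >= state->nblocks)
 *             return InvalidBlockNumber;
 *
 *         // Stashed here, readable by the consumer of this buffer.
 *         *(int *) per_buffer_data = state->i;
 *         return state->blocks[state->i++];
 *     }
 */
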
#include "postgres.h"

#include "catalog/pg_tablespace.h"
#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "storage/read_stream.h"
#include "utils/memdebug.h"
#include "utils/rel.h"
#include "utils/spccache.h"

typedef struct InProgressIO
{
    int16       buffer_index;
    ReadBuffersOperation op;
} InProgressIO;

/*
 * State for managing a stream of reads.
 */
struct ReadStream
{
    int16       max_ios;
    int16       ios_in_progress;
    int16       queue_size;
    int16       max_pinned_buffers;
    int16       pinned_buffers;
    int16       distance;
    bool        advice_enabled;

    /*
     * One-block buffer to support 'ungetting' a block number, to resolve flow
     * control problems when I/Os are split.
     */
    BlockNumber buffered_blocknum;

    /*
     * The callback that will tell us which block numbers to read, and an
     * opaque pointer that will be passed to it for its own purposes.
     */
    ReadStreamBlockNumberCB callback;
    void       *callback_private_data;

    /* Next expected block, for detecting sequential access. */
    BlockNumber seq_blocknum;

    /* The read operation we are currently preparing. */
    BlockNumber pending_read_blocknum;
    int16       pending_read_nblocks;

    /* Space for buffers and optional per-buffer private data. */
    size_t      per_buffer_data_size;
    void       *per_buffer_data;

    /* Read operations that have been started but not waited for yet. */
    InProgressIO *ios;
    int16       oldest_io_index;
    int16       next_io_index;

    bool        fast_path;

    /* Circular queue of buffers. */
    int16       oldest_buffer_index;    /* Next pinned buffer to return */
    int16       next_buffer_index;      /* Index of next buffer to pin */
    Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * Return a pointer to the per-buffer data by index.
 */
static inline void *
get_per_buffer_data(ReadStream *stream, int16 buffer_index)
{
    return (char *) stream->per_buffer_data +
        stream->per_buffer_data_size * buffer_index;
}

/*
 * General-use ReadStreamBlockNumberCB for block range scans. Loops over the
 * blocks [current_blocknum, last_exclusive).
 */
BlockNumber
block_range_read_stream_cb(ReadStream *stream,
                           void *callback_private_data,
                           void *per_buffer_data)
{
    BlockRangeReadStreamPrivate *p = callback_private_data;

    if (p->current_blocknum < p->last_exclusive)
        return p->current_blocknum++;

    return InvalidBlockNumber;
}
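
/*
 * Illustrative usage sketch (editorial addition, not part of the original
 * file): driving a whole-relation scan with the helper above, in the style
 * of callers like pg_prewarm. "rel" is assumed to be a valid, locked
 * Relation; NULL selects the default buffer access strategy and the final 0
 * requests no per-buffer data.
 *
 *     BlockRangeReadStreamPrivate p;
 *     ReadStream *stream;
 *     Buffer      buf;
 *
 *     p.current_blocknum = 0;
 *     p.last_exclusive = RelationGetNumberOfBlocks(rel);
 *     stream = read_stream_begin_relation(READ_STREAM_FULL, NULL, rel,
 *                                         MAIN_FORKNUM,
 *                                         block_range_read_stream_cb,
 *                                         &p, 0);
 *     while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 *         ReleaseBuffer(buf);      // pin is transferred to the caller
 *     read_stream_end(stream);
 */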

/*
 * Ask the callback which block it would like us to read next, with a one block
 * buffer in front to allow read_stream_unget_block() to work.
 */
static inline BlockNumber
read_stream_get_block(ReadStream *stream, void *per_buffer_data)
{
    BlockNumber blocknum;

    blocknum = stream->buffered_blocknum;
    if (blocknum != InvalidBlockNumber)
        stream->buffered_blocknum = InvalidBlockNumber;
    else
        blocknum = stream->callback(stream,
                                    stream->callback_private_data,
                                    per_buffer_data);

    return blocknum;
}

/*
 * In order to deal with short reads in StartReadBuffers(), we sometimes need
 * to defer handling of a block until later.
 */
static inline void
read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
{
    /* We shouldn't ever unget more than one block. */
    Assert(stream->buffered_blocknum == InvalidBlockNumber);
    Assert(blocknum != InvalidBlockNumber);
    stream->buffered_blocknum = blocknum;
}

static void
read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
{
    bool        need_wait;
    int         nblocks;
    int         flags;
    int16       io_index;
    int16       overflow;
    int16       buffer_index;

    /* This should only be called with a pending read. */
    Assert(stream->pending_read_nblocks > 0);
    Assert(stream->pending_read_nblocks <= io_combine_limit);

    /* We had better not exceed the pin limit by starting this read. */
    Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
           stream->max_pinned_buffers);

    /* We had better not be overwriting an existing pinned buffer. */
    if (stream->pinned_buffers > 0)
        Assert(stream->next_buffer_index != stream->oldest_buffer_index);
    else
        Assert(stream->next_buffer_index == stream->oldest_buffer_index);

    /*
     * If advice hasn't been suppressed, this system supports it, and this
     * isn't a strictly sequential pattern, then we'll issue advice.
     */
    if (!suppress_advice &&
        stream->advice_enabled &&
        stream->pending_read_blocknum != stream->seq_blocknum)
        flags = READ_BUFFERS_ISSUE_ADVICE;
    else
        flags = 0;

    /* We say how many blocks we want to read, but the number may be smaller on return. */
    buffer_index = stream->next_buffer_index;
    io_index = stream->next_io_index;
    nblocks = stream->pending_read_nblocks;
    need_wait = StartReadBuffers(&stream->ios[io_index].op,
                                 &stream->buffers[buffer_index],
                                 stream->pending_read_blocknum,
                                 &nblocks,
                                 flags);
    stream->pinned_buffers += nblocks;

    /* Remember whether we need to wait before returning this buffer. */
    if (!need_wait)
    {
        /* Look-ahead distance decays, no I/O necessary (behavior A). */
        if (stream->distance > 1)
            stream->distance--;
    }
    else
    {
        /*
         * Remember to call WaitReadBuffers() before returning head buffer.
         * Look-ahead distance will be adjusted after waiting.
         */
        stream->ios[io_index].buffer_index = buffer_index;
        if (++stream->next_io_index == stream->max_ios)
            stream->next_io_index = 0;
        Assert(stream->ios_in_progress < stream->max_ios);
        stream->ios_in_progress++;
        stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
    }

    /*
     * We gave a contiguous range of buffer space to StartReadBuffers(), but
     * we want it to wrap around at queue_size. Slide overflowing buffers to
     * the front of the array.
     */
    overflow = (buffer_index + nblocks) - stream->queue_size;
    if (overflow > 0)
        memmove(&stream->buffers[0],
                &stream->buffers[stream->queue_size],
                sizeof(stream->buffers[0]) * overflow);

    /* Compute location of start of next read, without using % operator. */
    buffer_index += nblocks;
    if (buffer_index >= stream->queue_size)
        buffer_index -= stream->queue_size;
    Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
    stream->next_buffer_index = buffer_index;

    /* Adjust the pending read to cover the remaining portion, if any. */
    stream->pending_read_blocknum += nblocks;
    stream->pending_read_nblocks -= nblocks;
}
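
/*
 * Worked example (editorial addition, not part of the original file):
 * suppose the pending read covers blocks 100..107, so nblocks enters
 * StartReadBuffers() as 8. If StartReadBuffers() shortens the read, say
 * because block 104 turns out to be cached already, it may report back
 * nblocks = 4. Buffers for 100..103 are then pinned and tied to one
 * InProgressIO entry, and the adjustment at the bottom of the function
 * leaves a pending read of 104..107 for a later call. The loop in
 * read_stream_look_ahead() restarts such remainders, ungetting the
 * already-fetched next block number if max_ios is reached in the meantime.
 */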

static void
read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
{
    while (stream->ios_in_progress < stream->max_ios &&
           stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
    {
        BlockNumber blocknum;
        int16       buffer_index;
        void       *per_buffer_data;

        if (stream->pending_read_nblocks == io_combine_limit)
        {
            read_stream_start_pending_read(stream, suppress_advice);
            suppress_advice = false;
            continue;
        }

        /*
         * See which block the callback wants next in the stream. We need to
         * compute the index of the Nth block of the pending read including
         * wrap-around, but we don't want to use the expensive % operator.
         */
        buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
        if (buffer_index >= stream->queue_size)
            buffer_index -= stream->queue_size;
        Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
        per_buffer_data = get_per_buffer_data(stream, buffer_index);
        blocknum = read_stream_get_block(stream, per_buffer_data);
        if (blocknum == InvalidBlockNumber)
        {
            /* End of stream. */
            stream->distance = 0;
            break;
        }

        /* Can we merge it with the pending read? */
        if (stream->pending_read_nblocks > 0 &&
            stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
        {
            stream->pending_read_nblocks++;
            continue;
        }

        /* We have to start the pending read before we can build another. */
        while (stream->pending_read_nblocks > 0)
        {
            read_stream_start_pending_read(stream, suppress_advice);
            suppress_advice = false;
            if (stream->ios_in_progress == stream->max_ios)
            {
                /* And we've hit the limit. Rewind, and stop here. */
                read_stream_unget_block(stream, blocknum);
                return;
            }
        }

        /* This is the start of a new pending read. */
        stream->pending_read_blocknum = blocknum;
        stream->pending_read_nblocks = 1;
    }

    /*
     * We don't start the pending read just because we've hit the distance
     * limit, preferring to give it another chance to grow to full
     * io_combine_limit size once more buffers have been consumed. However,
     * if we've already reached io_combine_limit, or we've reached the
     * distance limit and there isn't anything pinned yet, or the callback has
     * signaled end-of-stream, we start the read immediately.
     */
    if (stream->pending_read_nblocks > 0 &&
        (stream->pending_read_nblocks == io_combine_limit ||
         (stream->pending_read_nblocks == stream->distance &&
          stream->pinned_buffers == 0) ||
         stream->distance == 0) &&
        stream->ios_in_progress < stream->max_ios)
        read_stream_start_pending_read(stream, suppress_advice);
}

/*
 * Create a new read stream object that can be used to perform the equivalent
 * of a series of ReadBuffer() calls for one fork of one relation.
 * Internally, it generates larger vectored reads where possible by looking
 * ahead. The callback should return block numbers or InvalidBlockNumber to
 * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
 * write extra data for each block into the space provided to it. It will
 * also receive callback_private_data for its own purposes.
 */
static ReadStream *
read_stream_begin_impl(int flags,
                       BufferAccessStrategy strategy,
                       Relation rel,
                       SMgrRelation smgr,
                       char persistence,
                       ForkNumber forknum,
                       ReadStreamBlockNumberCB callback,
                       void *callback_private_data,
                       size_t per_buffer_data_size)
{
    ReadStream *stream;
    size_t      size;
    int16       queue_size;
    int         max_ios;
    int         strategy_pin_limit;
    uint32      max_pinned_buffers;
    Oid         tablespace_id;

    /*
     * Decide how many I/Os we will allow to run at the same time. That
     * currently means advice to the kernel to tell it that we will soon read.
     * This number also affects how far we look ahead for opportunities to
     * start more I/Os.
     */
    tablespace_id = smgr->smgr_rlocator.locator.spcOid;
    if (!OidIsValid(MyDatabaseId) ||
        (rel && IsCatalogRelation(rel)) ||
        IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
    {
        /*
         * Avoid circularity while trying to look up tablespace settings or
         * before spccache.c is ready.
         */
        max_ios = effective_io_concurrency;
    }
    else if (flags & READ_STREAM_MAINTENANCE)
        max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
    else
        max_ios = get_tablespace_io_concurrency(tablespace_id);

    /* Cap to INT16_MAX to avoid overflowing below */
    max_ios = Min(max_ios, PG_INT16_MAX);

    /*
     * Choose the maximum number of buffers we're prepared to pin. We try to
     * pin fewer if we can, though. We clamp it to at least io_combine_limit
     * so that we can have a chance to build up a full io_combine_limit sized
     * read, even when max_ios is zero. Be careful not to allow int16 to
     * overflow (even though that's not possible with the current GUC range
     * limits), allowing also for the spare entry and the overflow space.
     */
    max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
    max_pinned_buffers = Min(max_pinned_buffers,
                             PG_INT16_MAX - io_combine_limit - 1);

    /* Give the strategy a chance to limit the number of buffers we pin. */
    strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
    max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);

    /* Don't allow this backend to pin more than its share of buffers. */
    if (SmgrIsTemp(smgr))
        LimitAdditionalLocalPins(&max_pinned_buffers);
    else
        LimitAdditionalPins(&max_pinned_buffers);
    Assert(max_pinned_buffers > 0);

    /*
     * We need one extra entry for buffers and per-buffer data, because users
     * of per-buffer data have access to the object until the next call to
     * read_stream_next_buffer(), so we need a gap between the head and tail
     * of the queue so that we don't clobber it.
     */
    queue_size = max_pinned_buffers + 1;

    /*
     * Allocate the object, the buffers, the ios and per_buffer_data space in
     * one big chunk. Though we have queue_size buffers, we want to be able
     * to assume that all the buffers for a single read are contiguous (i.e.
     * don't wrap around halfway through), so we allow temporary overflows of
     * up to the maximum possible read size by allocating an extra
     * io_combine_limit - 1 elements.
     */
    size = offsetof(ReadStream, buffers);
    size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
    size += sizeof(InProgressIO) * Max(1, max_ios);
    size += per_buffer_data_size * queue_size;
    size += MAXIMUM_ALIGNOF * 2;
    stream = (ReadStream *) palloc(size);
    memset(stream, 0, offsetof(ReadStream, buffers));
    stream->ios = (InProgressIO *)
        MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
    if (per_buffer_data_size > 0)
        stream->per_buffer_data = (void *)
            MAXALIGN(&stream->ios[Max(1, max_ios)]);
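
    /*
     * Worked example of the layout above (editorial addition, not part of
     * the original file): with io_combine_limit = 32, max_ios = 16 and
     * per_buffer_data_size = 0, and before the strategy and pin-share clamps
     * bite, max_pinned_buffers = Max(16 * 4, 32) = 64 and queue_size = 65,
     * so the single palloc() holds 65 + 32 - 1 = 96 Buffer slots, 16
     * InProgressIO entries, and two MAXALIGN pads.
     */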

#ifdef USE_PREFETCH

    /*
     * This system supports prefetching advice. We can use it as long as
     * direct I/O isn't enabled, the caller hasn't promised sequential access
     * (overriding our detection heuristics), and max_ios hasn't been set to
     * zero.
     */
    if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
        (flags & READ_STREAM_SEQUENTIAL) == 0 &&
        max_ios > 0)
        stream->advice_enabled = true;
#endif

    /*
     * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
     * above. If we had real asynchronous I/O we might need a slightly
     * different definition.
     */
    if (max_ios == 0)
        max_ios = 1;

    stream->max_ios = max_ios;
    stream->per_buffer_data_size = per_buffer_data_size;
    stream->max_pinned_buffers = max_pinned_buffers;
    stream->queue_size = queue_size;
    stream->callback = callback;
    stream->callback_private_data = callback_private_data;
    stream->buffered_blocknum = InvalidBlockNumber;

    /*
     * Skip the initial ramp-up phase if the caller says we're going to be
     * reading the whole relation. This way we start out assuming we'll be
     * doing full io_combine_limit sized reads (behavior B).
     */
    if (flags & READ_STREAM_FULL)
        stream->distance = Min(max_pinned_buffers, io_combine_limit);
    else
        stream->distance = 1;

    /*
     * Since we always access the same relation, we can initialize parts of
     * the ReadBuffersOperation objects and leave them that way, to avoid
     * wasting CPU cycles writing to them for each read.
     */
    for (int i = 0; i < max_ios; ++i)
    {
        stream->ios[i].op.rel = rel;
        stream->ios[i].op.smgr = smgr;
        stream->ios[i].op.persistence = persistence;
        stream->ios[i].op.forknum = forknum;
        stream->ios[i].op.strategy = strategy;
    }

    return stream;
}

/*
 * Create a new read stream for reading a relation.
 * See read_stream_begin_impl() for the detailed explanation.
 */
ReadStream *
read_stream_begin_relation(int flags,
                           BufferAccessStrategy strategy,
                           Relation rel,
                           ForkNumber forknum,
                           ReadStreamBlockNumberCB callback,
                           void *callback_private_data,
                           size_t per_buffer_data_size)
{
    return read_stream_begin_impl(flags,
                                  strategy,
                                  rel,
                                  RelationGetSmgr(rel),
                                  rel->rd_rel->relpersistence,
                                  forknum,
                                  callback,
                                  callback_private_data,
                                  per_buffer_data_size);
}

/*
 * Create a new read stream for reading a SMgr relation.
 * See read_stream_begin_impl() for the detailed explanation.
 */
ReadStream *
read_stream_begin_smgr_relation(int flags,
                                BufferAccessStrategy strategy,
                                SMgrRelation smgr,
                                char smgr_persistence,
                                ForkNumber forknum,
                                ReadStreamBlockNumberCB callback,
                                void *callback_private_data,
                                size_t per_buffer_data_size)
{
    return read_stream_begin_impl(flags,
                                  strategy,
                                  NULL,
                                  smgr,
                                  smgr_persistence,
                                  forknum,
                                  callback,
                                  callback_private_data,
                                  per_buffer_data_size);
}

/*
 * Pull one pinned buffer out of a stream. Each call returns successive
 * blocks in the order specified by the callback. If per_buffer_data_size was
 * set to a non-zero size, *per_buffer_data receives a pointer to the extra
 * per-buffer data that the callback had a chance to populate, which remains
 * valid until the next call to read_stream_next_buffer(). When the stream
 * runs out of data, InvalidBuffer is returned. The caller may decide to end
 * the stream early at any time by calling read_stream_end().
 */
Buffer
read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
{
    Buffer      buffer;
    int16       oldest_buffer_index;

#ifndef READ_STREAM_DISABLE_FAST_PATH

    /*
     * A fast path for all-cached scans (behavior A). This is the same as the
     * usual algorithm, but it is specialized for no I/O and no per-buffer
     * data, so we can skip the queue management code, stay in the same buffer
     * slot and use singular StartReadBuffer().
     */
    if (likely(stream->fast_path))
    {
        BlockNumber next_blocknum;

        /* Fast path assumptions. */
        Assert(stream->ios_in_progress == 0);
        Assert(stream->pinned_buffers == 1);
        Assert(stream->distance == 1);
        Assert(stream->pending_read_nblocks == 0);
        Assert(stream->per_buffer_data_size == 0);

        /* We're going to return the buffer we pinned last time. */
        oldest_buffer_index = stream->oldest_buffer_index;
        Assert((oldest_buffer_index + 1) % stream->queue_size ==
               stream->next_buffer_index);
        buffer = stream->buffers[oldest_buffer_index];
        Assert(buffer != InvalidBuffer);

        /* Choose the next block to pin. */
        next_blocknum = read_stream_get_block(stream, NULL);

        if (likely(next_blocknum != InvalidBlockNumber))
        {
            /*
             * Pin a buffer for the next call. Same buffer entry, and
             * arbitrary I/O entry (they're all free). We don't have to
             * adjust pinned_buffers because we're transferring one to caller
             * but pinning one more.
             */
            if (likely(!StartReadBuffer(&stream->ios[0].op,
                                        &stream->buffers[oldest_buffer_index],
                                        next_blocknum,
                                        stream->advice_enabled ?
                                        READ_BUFFERS_ISSUE_ADVICE : 0)))
            {
                /* Fast return. */
                return buffer;
            }

            /* Next call must wait for I/O for the newly pinned buffer. */
            stream->oldest_io_index = 0;
            stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
            stream->ios_in_progress = 1;
            stream->ios[0].buffer_index = oldest_buffer_index;
            stream->seq_blocknum = next_blocknum + 1;
        }
        else
        {
            /* No more blocks, end of stream. */
            stream->distance = 0;
            stream->oldest_buffer_index = stream->next_buffer_index;
            stream->pinned_buffers = 0;
        }

        stream->fast_path = false;
        return buffer;
    }
#endif

    if (unlikely(stream->pinned_buffers == 0))
    {
        Assert(stream->oldest_buffer_index == stream->next_buffer_index);

        /* End of stream reached? */
        if (stream->distance == 0)
            return InvalidBuffer;

        /*
         * The usual order of operations is that we look ahead at the bottom
         * of this function after potentially finishing an I/O and making
         * space for more, but if we're just starting up we'll need to crank
         * the handle to get started.
         */
        read_stream_look_ahead(stream, true);

        /* End of stream reached? */
        if (stream->pinned_buffers == 0)
        {
            Assert(stream->distance == 0);
            return InvalidBuffer;
        }
    }

    /* Grab the oldest pinned buffer and associated per-buffer data. */
    Assert(stream->pinned_buffers > 0);
    oldest_buffer_index = stream->oldest_buffer_index;
    Assert(oldest_buffer_index >= 0 &&
           oldest_buffer_index < stream->queue_size);
    buffer = stream->buffers[oldest_buffer_index];
    if (per_buffer_data)
        *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);

    Assert(BufferIsValid(buffer));

    /* Do we have to wait for an associated I/O first? */
    if (stream->ios_in_progress > 0 &&
        stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
    {
        int16       io_index = stream->oldest_io_index;
        int16       distance;

        /* Sanity check that we still agree on the buffers. */
        Assert(stream->ios[io_index].op.buffers ==
               &stream->buffers[oldest_buffer_index]);

        WaitReadBuffers(&stream->ios[io_index].op);

        Assert(stream->ios_in_progress > 0);
        stream->ios_in_progress--;
        if (++stream->oldest_io_index == stream->max_ios)
            stream->oldest_io_index = 0;

        if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
        {
            /* Distance ramps up fast (behavior C). */
            distance = stream->distance * 2;
            distance = Min(distance, stream->max_pinned_buffers);
            stream->distance = distance;
        }
        else
        {
            /* No advice; move towards io_combine_limit (behavior B). */
            if (stream->distance > io_combine_limit)
            {
                stream->distance--;
            }
            else
            {
                distance = stream->distance * 2;
                distance = Min(distance, io_combine_limit);
                distance = Min(distance, stream->max_pinned_buffers);
                stream->distance = distance;
            }
        }
    }

#ifdef CLOBBER_FREED_MEMORY
    /* Clobber old buffer and per-buffer data for debugging purposes. */
    stream->buffers[oldest_buffer_index] = InvalidBuffer;

    /*
     * The caller will get access to the per-buffer data, until the next call.
     * We wipe the one before, which is never occupied because queue_size
     * allowed one extra element. This will hopefully trip up client code
     * that is holding a dangling pointer to it.
     */
    if (stream->per_buffer_data)
        wipe_mem(get_per_buffer_data(stream,
                                     oldest_buffer_index == 0 ?
                                     stream->queue_size - 1 :
                                     oldest_buffer_index - 1),
                 stream->per_buffer_data_size);
#endif

    /* Pin transferred to caller. */
    Assert(stream->pinned_buffers > 0);
    stream->pinned_buffers--;

    /* Advance oldest buffer, with wrap-around. */
    stream->oldest_buffer_index++;
    if (stream->oldest_buffer_index == stream->queue_size)
        stream->oldest_buffer_index = 0;

    /* Prepare for the next call. */
    read_stream_look_ahead(stream, false);

#ifndef READ_STREAM_DISABLE_FAST_PATH
    /* See if we can take the fast path for all-cached scans next time. */
    if (stream->ios_in_progress == 0 &&
        stream->pinned_buffers == 1 &&
        stream->distance == 1 &&
        stream->pending_read_nblocks == 0 &&
        stream->per_buffer_data_size == 0)
    {
        stream->fast_path = true;
    }
#endif

    return buffer;
}
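
/*
 * Illustrative sketch (editorial addition, not part of the original file):
 * a consumer that asked for per-buffer data at begin time, say with
 * per_buffer_data_size = sizeof(int), reads it back alongside each buffer.
 * As noted above, the pointer is only valid until the next call.
 *
 *     void       *pbd;
 *     Buffer      buf;
 *
 *     while ((buf = read_stream_next_buffer(stream, &pbd)) != InvalidBuffer)
 *     {
 *         int         tag = *(int *) pbd;
 *
 *         ... use buf and tag, then release the transferred pin ...
 *         ReleaseBuffer(buf);
 *     }
 */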

/*
 * Transitional support for code that would like to perform or skip reads
 * itself, without using the stream. Returns, and consumes, the next block
 * number that would be read by the stream's look-ahead algorithm, or
 * InvalidBlockNumber if the end of the stream is reached. Also reports the
 * strategy that would be used to read it.
 */
BlockNumber
read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
{
    *strategy = stream->ios[0].op.strategy;
    return read_stream_get_block(stream, NULL);
}

/*
 * Reset a read stream by releasing any queued up buffers, allowing the stream
 * to be used again for different blocks. This can be used to clear an
 * end-of-stream condition and start again, or to throw away blocks that were
 * speculatively read and read some different blocks instead.
 */
void
read_stream_reset(ReadStream *stream)
{
    Buffer      buffer;

    /* Stop looking ahead. */
    stream->distance = 0;

    /* Forget buffered block number and fast path state. */
    stream->buffered_blocknum = InvalidBlockNumber;
    stream->fast_path = false;

    /* Unpin anything that wasn't consumed. */
    while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
        ReleaseBuffer(buffer);

    Assert(stream->pinned_buffers == 0);
    Assert(stream->ios_in_progress == 0);

    /* Start off assuming data is cached. */
    stream->distance = 1;
}
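
/*
 * Illustrative sketch (editorial addition, not part of the original file):
 * clearing an end-of-stream condition and reusing the same stream for a new
 * block range, assuming the callback state "p" from the earlier
 * block_range_read_stream_cb sketch.
 *
 *     p.current_blocknum = new_start;
 *     p.last_exclusive = new_end;
 *     read_stream_reset(stream);
 *     while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 *         ReleaseBuffer(buf);
 */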

/*
 * Release and free a read stream.
 */
void
read_stream_end(ReadStream *stream)
{
    read_stream_reset(stream);
    pfree(stream);
}