PostgreSQL Source Code  git master
read_stream.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * read_stream.c
4  * Mechanism for accessing buffered relation data with look-ahead
5  *
6  * Code that needs to access relation data typically pins blocks one at a
7  * time, often in a predictable order that might be sequential or data-driven.
8  * Calling the simple ReadBuffer() function for each block is inefficient,
9  * because blocks that are not yet in the buffer pool require I/O operations
10  * that are small and might stall waiting for storage. This mechanism looks
11  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12  * neighboring blocks together and ahead of time, with an adaptive look-ahead
13  * distance.
14  *
15  * A user-provided callback generates a stream of block numbers that is used
16  * to form reads of up to io_combine_limit, by attempting to merge them with a
17  * pending read. When that isn't possible, the existing pending read is sent
18  * to StartReadBuffers() so that a new one can begin to form.
19  *
20  * The algorithm for controlling the look-ahead distance tries to classify the
21  * stream into three ideal behaviors:
22  *
23  * A) No I/O is necessary, because the requested blocks are fully cached
24  * already. There is no benefit to looking ahead more than one block, so
25  * distance is 1. This is the default initial assumption.
26  *
27  * B) I/O is necessary, but fadvise is undesirable because the access is
28  * sequential, or impossible because direct I/O is enabled or the system
29  * doesn't support fadvise. There is no benefit in looking ahead more than
30  * io_combine_limit, because in this case the only goal is larger read system
31  * calls. Looking further ahead would pin many buffers and perform
32  * speculative work looking ahead for no benefit.
33  *
34  * C) I/O is necessary, it appears random, and this system supports fadvise.
35  * We'll look further ahead in order to reach the configured level of I/O
36  * concurrency.
37  *
38  * The distance increases rapidly and decays slowly, so that it moves towards
39  * those levels as different I/O patterns are discovered. For example, a
40  * sequential scan of fully cached data doesn't bother looking ahead, but a
41  * sequential scan that hits a region of uncached blocks will start issuing
42  * increasingly wide read calls until it plateaus at io_combine_limit.
43  *
44  * The main data structure is a circular queue of buffers of size
45  * max_pinned_buffers plus some extra space for technical reasons, ready to be
46  * returned by read_stream_next_buffer(). Each buffer also has an optional
47  * variable sized object that is passed from the callback to the consumer of
48  * buffers.
49  *
50  * Parallel to the queue of buffers, there is a circular queue of in-progress
51  * I/Os that have been started with StartReadBuffers(), and for which
52  * WaitReadBuffers() must be called before returning the buffer.
53  *
54  * For example, if the callback returns block numbers 10, 42, 43, 60 in
55  * successive calls, then these data structures might appear as follows:
56  *
57  * buffers buf/data ios
58  *
59  * +----+ +-----+ +--------+
60  * | | | | +----+ 42..44 | <- oldest_io_index
61  * +----+ +-----+ | +--------+
62  * oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 |
63  * +----+ +-----+ | | +--------+
64  * | 42 | | ? |<-+ | | | <- next_io_index
65  * +----+ +-----+ | +--------+
66  * | 43 | | ? | | | |
67  * +----+ +-----+ | +--------+
68  * | 44 | | ? | | | |
69  * +----+ +-----+ | +--------+
70  * | 60 | | ? |<---+
71  * +----+ +-----+
72  * next_buffer_index -> | | | |
73  * +----+ +-----+
74  *
75  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
76  * the client is block 10. Block 10 was a hit and has no associated I/O, but
77  * the range 42..44 requires an I/O wait before its buffers are returned, as
78  * does block 60.
79  *
80  *
81  * Portions Copyright (c) 2024, PostgreSQL Global Development Group
82  * Portions Copyright (c) 1994, Regents of the University of California
83  *
84  * IDENTIFICATION
85  * src/backend/storage/aio/read_stream.c
86  *
87  *-------------------------------------------------------------------------
88  */
89 #include "postgres.h"
90 
91 #include "catalog/pg_tablespace.h"
92 #include "miscadmin.h"
93 #include "storage/fd.h"
94 #include "storage/smgr.h"
95 #include "storage/read_stream.h"
96 #include "utils/memdebug.h"
97 #include "utils/rel.h"
98 #include "utils/spccache.h"
99 
100 typedef struct InProgressIO
101 {
105 
106 /*
107  * State for managing a stream of reads.
108  */
110 {
118 
119  /*
120  * Small buffer of block numbers, useful for 'ungetting' to resolve flow
121  * control problems when I/Os are split. Also useful for batch-loading
122  * block numbers in the fast path.
123  */
127 
128  /*
129  * The callback that will tell us which block numbers to read, and an
130  * opaque pointer that will be pass to it for its own purposes.
131  */
134 
135  /* Next expected block, for detecting sequential access. */
137 
138  /* The read operation we are currently preparing. */
141 
142  /* Space for buffers and optional per-buffer private data. */
145 
146  /* Read operations that have been started but not waited for yet. */
150 
151  bool fast_path;
152 
153  /* Circular queue of buffers. */
154  int16 oldest_buffer_index; /* Next pinned buffer to return */
155  int16 next_buffer_index; /* Index of next buffer to pin */
157 };
158 
159 /*
160  * Return a pointer to the per-buffer data by index.
161  */
162 static inline void *
163 get_per_buffer_data(ReadStream *stream, int16 buffer_index)
164 {
165  return (char *) stream->per_buffer_data +
166  stream->per_buffer_data_size * buffer_index;
167 }
168 
169 /*
170  * Ask the callback which block it would like us to read next, with a small
171  * buffer in front to allow read_stream_unget_block() to work and to allow the
172  * fast path to skip this function and work directly from the array.
173  */
174 static inline BlockNumber
175 read_stream_get_block(ReadStream *stream, void *per_buffer_data)
176 {
177  if (stream->blocknums_next < stream->blocknums_count)
178  return stream->blocknums[stream->blocknums_next++];
179 
180  /*
181  * We only bother to fetch one at a time here (but see the fast path which
182  * uses more).
183  */
184  return stream->callback(stream,
185  stream->callback_private_data,
186  per_buffer_data);
187 }
188 
189 /*
190  * In order to deal with short reads in StartReadBuffers(), we sometimes need
191  * to defer handling of a block until later.
192  */
193 static inline void
195 {
196  if (stream->blocknums_next == stream->blocknums_count)
197  {
198  /* Never initialized or entirely consumed. Re-initialize. */
199  stream->blocknums[0] = blocknum;
200  stream->blocknums_count = 1;
201  stream->blocknums_next = 0;
202  }
203  else
204  {
205  /* Must be the last value return from blocknums array. */
206  Assert(stream->blocknums_next > 0);
207  stream->blocknums_next--;
208  Assert(stream->blocknums[stream->blocknums_next] == blocknum);
209  }
210 }
211 
212 #ifndef READ_STREAM_DISABLE_FAST_PATH
213 static void
215 {
216  BlockNumber blocknum;
217  int i = 0;
218 
219  do
220  {
221  blocknum = stream->callback(stream,
222  stream->callback_private_data,
223  NULL);
224  stream->blocknums[i++] = blocknum;
225  } while (i < lengthof(stream->blocknums) &&
226  blocknum != InvalidBlockNumber);
227  stream->blocknums_count = i;
228  stream->blocknums_next = 0;
229 }
230 #endif
231 
232 static void
233 read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
234 {
235  bool need_wait;
236  int nblocks;
237  int flags;
238  int16 io_index;
239  int16 overflow;
240  int16 buffer_index;
241 
242  /* This should only be called with a pending read. */
243  Assert(stream->pending_read_nblocks > 0);
245 
246  /* We had better not exceed the pin limit by starting this read. */
247  Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
248  stream->max_pinned_buffers);
249 
250  /* We had better not be overwriting an existing pinned buffer. */
251  if (stream->pinned_buffers > 0)
252  Assert(stream->next_buffer_index != stream->oldest_buffer_index);
253  else
254  Assert(stream->next_buffer_index == stream->oldest_buffer_index);
255 
256  /*
257  * If advice hasn't been suppressed, this system supports it, and this
258  * isn't a strictly sequential pattern, then we'll issue advice.
259  */
260  if (!suppress_advice &&
261  stream->advice_enabled &&
262  stream->pending_read_blocknum != stream->seq_blocknum)
264  else
265  flags = 0;
266 
267  /* We say how many blocks we want to read, but may be smaller on return. */
268  buffer_index = stream->next_buffer_index;
269  io_index = stream->next_io_index;
270  nblocks = stream->pending_read_nblocks;
271  need_wait = StartReadBuffers(&stream->ios[io_index].op,
272  &stream->buffers[buffer_index],
273  stream->pending_read_blocknum,
274  &nblocks,
275  flags);
276  stream->pinned_buffers += nblocks;
277 
278  /* Remember whether we need to wait before returning this buffer. */
279  if (!need_wait)
280  {
281  /* Look-ahead distance decays, no I/O necessary (behavior A). */
282  if (stream->distance > 1)
283  stream->distance--;
284  }
285  else
286  {
287  /*
288  * Remember to call WaitReadBuffers() before returning head buffer.
289  * Look-ahead distance will be adjusted after waiting.
290  */
291  stream->ios[io_index].buffer_index = buffer_index;
292  if (++stream->next_io_index == stream->max_ios)
293  stream->next_io_index = 0;
294  Assert(stream->ios_in_progress < stream->max_ios);
295  stream->ios_in_progress++;
296  stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
297  }
298 
299  /*
300  * We gave a contiguous range of buffer space to StartReadBuffers(), but
301  * we want it to wrap around at queue_size. Slide overflowing buffers to
302  * the front of the array.
303  */
304  overflow = (buffer_index + nblocks) - stream->queue_size;
305  if (overflow > 0)
306  memmove(&stream->buffers[0],
307  &stream->buffers[stream->queue_size],
308  sizeof(stream->buffers[0]) * overflow);
309 
310  /* Compute location of start of next read, without using % operator. */
311  buffer_index += nblocks;
312  if (buffer_index >= stream->queue_size)
313  buffer_index -= stream->queue_size;
314  Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
315  stream->next_buffer_index = buffer_index;
316 
317  /* Adjust the pending read to cover the remaining portion, if any. */
318  stream->pending_read_blocknum += nblocks;
319  stream->pending_read_nblocks -= nblocks;
320 }
321 
322 static void
323 read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
324 {
325  while (stream->ios_in_progress < stream->max_ios &&
326  stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
327  {
328  BlockNumber blocknum;
329  int16 buffer_index;
330  void *per_buffer_data;
331 
332  if (stream->pending_read_nblocks == io_combine_limit)
333  {
334  read_stream_start_pending_read(stream, suppress_advice);
335  suppress_advice = false;
336  continue;
337  }
338 
339  /*
340  * See which block the callback wants next in the stream. We need to
341  * compute the index of the Nth block of the pending read including
342  * wrap-around, but we don't want to use the expensive % operator.
343  */
344  buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
345  if (buffer_index >= stream->queue_size)
346  buffer_index -= stream->queue_size;
347  Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
348  per_buffer_data = get_per_buffer_data(stream, buffer_index);
349  blocknum = read_stream_get_block(stream, per_buffer_data);
350  if (blocknum == InvalidBlockNumber)
351  {
352  /* End of stream. */
353  stream->distance = 0;
354  break;
355  }
356 
357  /* Can we merge it with the pending read? */
358  if (stream->pending_read_nblocks > 0 &&
359  stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
360  {
361  stream->pending_read_nblocks++;
362  continue;
363  }
364 
365  /* We have to start the pending read before we can build another. */
366  while (stream->pending_read_nblocks > 0)
367  {
368  read_stream_start_pending_read(stream, suppress_advice);
369  suppress_advice = false;
370  if (stream->ios_in_progress == stream->max_ios)
371  {
372  /* And we've hit the limit. Rewind, and stop here. */
373  read_stream_unget_block(stream, blocknum);
374  return;
375  }
376  }
377 
378  /* This is the start of a new pending read. */
379  stream->pending_read_blocknum = blocknum;
380  stream->pending_read_nblocks = 1;
381  }
382 
383  /*
384  * We don't start the pending read just because we've hit the distance
385  * limit, preferring to give it another chance to grow to full
386  * io_combine_limit size once more buffers have been consumed. However,
387  * if we've already reached io_combine_limit, or we've reached the
388  * distance limit and there isn't anything pinned yet, or the callback has
389  * signaled end-of-stream, we start the read immediately.
390  */
391  if (stream->pending_read_nblocks > 0 &&
393  (stream->pending_read_nblocks == stream->distance &&
394  stream->pinned_buffers == 0) ||
395  stream->distance == 0) &&
396  stream->ios_in_progress < stream->max_ios)
397  read_stream_start_pending_read(stream, suppress_advice);
398 }
399 
400 /*
401  * Create a new read stream object that can be used to perform the equivalent
402  * of a series of ReadBuffer() calls for one fork of one relation.
403  * Internally, it generates larger vectored reads where possible by looking
404  * ahead. The callback should return block numbers or InvalidBlockNumber to
405  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
406  * write extra data for each block into the space provided to it. It will
407  * also receive callback_private_data for its own purposes.
408  */
409 ReadStream *
411  BufferAccessStrategy strategy,
412  Relation rel,
413  ForkNumber forknum,
415  void *callback_private_data,
416  size_t per_buffer_data_size)
417 {
418  ReadStream *stream;
419  size_t size;
420  int16 queue_size;
421  int max_ios;
422  int strategy_pin_limit;
423  uint32 max_pinned_buffers;
424  Oid tablespace_id;
425  SMgrRelation smgr;
426 
427  smgr = RelationGetSmgr(rel);
428 
429  /*
430  * Decide how many I/Os we will allow to run at the same time. That
431  * currently means advice to the kernel to tell it that we will soon read.
432  * This number also affects how far we look ahead for opportunities to
433  * start more I/Os.
434  */
435  tablespace_id = smgr->smgr_rlocator.locator.spcOid;
436  if (!OidIsValid(MyDatabaseId) ||
437  IsCatalogRelation(rel) ||
439  {
440  /*
441  * Avoid circularity while trying to look up tablespace settings or
442  * before spccache.c is ready.
443  */
444  max_ios = effective_io_concurrency;
445  }
446  else if (flags & READ_STREAM_MAINTENANCE)
447  max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
448  else
449  max_ios = get_tablespace_io_concurrency(tablespace_id);
450 
451  /* Cap to INT16_MAX to avoid overflowing below */
452  max_ios = Min(max_ios, PG_INT16_MAX);
453 
454  /*
455  * Choose the maximum number of buffers we're prepared to pin. We try to
456  * pin fewer if we can, though. We clamp it to at least io_combine_limit
457  * so that we can have a chance to build up a full io_combine_limit sized
458  * read, even when max_ios is zero. Be careful not to allow int16 to
459  * overflow (even though that's not possible with the current GUC range
460  * limits), allowing also for the spare entry and the overflow space.
461  */
462  max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
463  max_pinned_buffers = Min(max_pinned_buffers,
465 
466  /* Give the strategy a chance to limit the number of buffers we pin. */
467  strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
468  max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
469 
470  /* Don't allow this backend to pin more than its share of buffers. */
471  if (SmgrIsTemp(smgr))
472  LimitAdditionalLocalPins(&max_pinned_buffers);
473  else
474  LimitAdditionalPins(&max_pinned_buffers);
475  Assert(max_pinned_buffers > 0);
476 
477  /*
478  * We need one extra entry for buffers and per-buffer data, because users
479  * of per-buffer data have access to the object until the next call to
480  * read_stream_next_buffer(), so we need a gap between the head and tail
481  * of the queue so that we don't clobber it.
482  */
483  queue_size = max_pinned_buffers + 1;
484 
485  /*
486  * Allocate the object, the buffers, the ios and per_data_data space in
487  * one big chunk. Though we have queue_size buffers, we want to be able
488  * to assume that all the buffers for a single read are contiguous (i.e.
489  * don't wrap around halfway through), so we allow temporary overflows of
490  * up to the maximum possible read size by allocating an extra
491  * io_combine_limit - 1 elements.
492  */
493  size = offsetof(ReadStream, buffers);
494  size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
495  size += sizeof(InProgressIO) * Max(1, max_ios);
496  size += per_buffer_data_size * queue_size;
497  size += MAXIMUM_ALIGNOF * 2;
498  stream = (ReadStream *) palloc(size);
499  memset(stream, 0, offsetof(ReadStream, buffers));
500  stream->ios = (InProgressIO *)
501  MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
502  if (per_buffer_data_size > 0)
503  stream->per_buffer_data = (void *)
504  MAXALIGN(&stream->ios[Max(1, max_ios)]);
505 
506 #ifdef USE_PREFETCH
507 
508  /*
509  * This system supports prefetching advice. We can use it as long as
510  * direct I/O isn't enabled, the caller hasn't promised sequential access
511  * (overriding our detection heuristics), and max_ios hasn't been set to
512  * zero.
513  */
514  if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
515  (flags & READ_STREAM_SEQUENTIAL) == 0 &&
516  max_ios > 0)
517  stream->advice_enabled = true;
518 #endif
519 
520  /*
521  * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
522  * above. If we had real asynchronous I/O we might need a slightly
523  * different definition.
524  */
525  if (max_ios == 0)
526  max_ios = 1;
527 
528  stream->max_ios = max_ios;
529  stream->per_buffer_data_size = per_buffer_data_size;
530  stream->max_pinned_buffers = max_pinned_buffers;
531  stream->queue_size = queue_size;
532  stream->callback = callback;
533  stream->callback_private_data = callback_private_data;
534 
535  /*
536  * Skip the initial ramp-up phase if the caller says we're going to be
537  * reading the whole relation. This way we start out assuming we'll be
538  * doing full io_combine_limit sized reads (behavior B).
539  */
540  if (flags & READ_STREAM_FULL)
541  stream->distance = Min(max_pinned_buffers, io_combine_limit);
542  else
543  stream->distance = 1;
544 
545  /*
546  * Since we always access the same relation, we can initialize parts of
547  * the ReadBuffersOperation objects and leave them that way, to avoid
548  * wasting CPU cycles writing to them for each read.
549  */
550  for (int i = 0; i < max_ios; ++i)
551  {
552  stream->ios[i].op.rel = rel;
553  stream->ios[i].op.smgr = RelationGetSmgr(rel);
554  stream->ios[i].op.smgr_persistence = 0;
555  stream->ios[i].op.forknum = forknum;
556  stream->ios[i].op.strategy = strategy;
557  }
558 
559  return stream;
560 }
561 
562 /*
563  * Pull one pinned buffer out of a stream. Each call returns successive
564  * blocks in the order specified by the callback. If per_buffer_data_size was
565  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
566  * per-buffer data that the callback had a chance to populate, which remains
567  * valid until the next call to read_stream_next_buffer(). When the stream
568  * runs out of data, InvalidBuffer is returned. The caller may decide to end
569  * the stream early at any time by calling read_stream_end().
570  */
571 Buffer
572 read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
573 {
574  Buffer buffer;
575  int16 oldest_buffer_index;
576 
577 #ifndef READ_STREAM_DISABLE_FAST_PATH
578 
579  /*
580  * A fast path for all-cached scans (behavior A). This is the same as the
581  * usual algorithm, but it is specialized for no I/O and no per-buffer
582  * data, so we can skip the queue management code, stay in the same buffer
583  * slot and use singular StartReadBuffer().
584  */
585  if (likely(stream->fast_path))
586  {
587  BlockNumber next_blocknum;
588 
589  /* Fast path assumptions. */
590  Assert(stream->ios_in_progress == 0);
591  Assert(stream->pinned_buffers == 1);
592  Assert(stream->distance == 1);
593  Assert(stream->pending_read_nblocks == 0);
594  Assert(stream->per_buffer_data_size == 0);
595 
596  /* We're going to return the buffer we pinned last time. */
597  oldest_buffer_index = stream->oldest_buffer_index;
598  Assert((oldest_buffer_index + 1) % stream->queue_size ==
599  stream->next_buffer_index);
600  buffer = stream->buffers[oldest_buffer_index];
601  Assert(buffer != InvalidBuffer);
602 
603  /* Choose the next block to pin. */
604  if (unlikely(stream->blocknums_next == stream->blocknums_count))
606  next_blocknum = stream->blocknums[stream->blocknums_next++];
607 
608  if (likely(next_blocknum != InvalidBlockNumber))
609  {
610  /*
611  * Pin a buffer for the next call. Same buffer entry, and
612  * arbitrary I/O entry (they're all free). We don't have to
613  * adjust pinned_buffers because we're transferring one to caller
614  * but pinning one more.
615  */
616  if (likely(!StartReadBuffer(&stream->ios[0].op,
617  &stream->buffers[oldest_buffer_index],
618  next_blocknum,
619  stream->advice_enabled ?
621  {
622  /* Fast return. */
623  return buffer;
624  }
625 
626  /* Next call must wait for I/O for the newly pinned buffer. */
627  stream->oldest_io_index = 0;
628  stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
629  stream->ios_in_progress = 1;
630  stream->ios[0].buffer_index = oldest_buffer_index;
631  stream->seq_blocknum = next_blocknum + 1;
632  }
633  else
634  {
635  /* No more blocks, end of stream. */
636  stream->distance = 0;
637  stream->oldest_buffer_index = stream->next_buffer_index;
638  stream->pinned_buffers = 0;
639  }
640 
641  stream->fast_path = false;
642  return buffer;
643  }
644 #endif
645 
646  if (unlikely(stream->pinned_buffers == 0))
647  {
648  Assert(stream->oldest_buffer_index == stream->next_buffer_index);
649 
650  /* End of stream reached? */
651  if (stream->distance == 0)
652  return InvalidBuffer;
653 
654  /*
655  * The usual order of operations is that we look ahead at the bottom
656  * of this function after potentially finishing an I/O and making
657  * space for more, but if we're just starting up we'll need to crank
658  * the handle to get started.
659  */
660  read_stream_look_ahead(stream, true);
661 
662  /* End of stream reached? */
663  if (stream->pinned_buffers == 0)
664  {
665  Assert(stream->distance == 0);
666  return InvalidBuffer;
667  }
668  }
669 
670  /* Grab the oldest pinned buffer and associated per-buffer data. */
671  Assert(stream->pinned_buffers > 0);
672  oldest_buffer_index = stream->oldest_buffer_index;
673  Assert(oldest_buffer_index >= 0 &&
674  oldest_buffer_index < stream->queue_size);
675  buffer = stream->buffers[oldest_buffer_index];
676  if (per_buffer_data)
677  *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
678 
679  Assert(BufferIsValid(buffer));
680 
681  /* Do we have to wait for an associated I/O first? */
682  if (stream->ios_in_progress > 0 &&
683  stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
684  {
685  int16 io_index = stream->oldest_io_index;
686  int16 distance;
687 
688  /* Sanity check that we still agree on the buffers. */
689  Assert(stream->ios[io_index].op.buffers ==
690  &stream->buffers[oldest_buffer_index]);
691 
692  WaitReadBuffers(&stream->ios[io_index].op);
693 
694  Assert(stream->ios_in_progress > 0);
695  stream->ios_in_progress--;
696  if (++stream->oldest_io_index == stream->max_ios)
697  stream->oldest_io_index = 0;
698 
699  if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
700  {
701  /* Distance ramps up fast (behavior C). */
702  distance = stream->distance * 2;
703  distance = Min(distance, stream->max_pinned_buffers);
704  stream->distance = distance;
705  }
706  else
707  {
708  /* No advice; move towards io_combine_limit (behavior B). */
709  if (stream->distance > io_combine_limit)
710  {
711  stream->distance--;
712  }
713  else
714  {
715  distance = stream->distance * 2;
716  distance = Min(distance, io_combine_limit);
717  distance = Min(distance, stream->max_pinned_buffers);
718  stream->distance = distance;
719  }
720  }
721  }
722 
723 #ifdef CLOBBER_FREED_MEMORY
724  /* Clobber old buffer and per-buffer data for debugging purposes. */
725  stream->buffers[oldest_buffer_index] = InvalidBuffer;
726 
727  /*
728  * The caller will get access to the per-buffer data, until the next call.
729  * We wipe the one before, which is never occupied because queue_size
730  * allowed one extra element. This will hopefully trip up client code
731  * that is holding a dangling pointer to it.
732  */
733  if (stream->per_buffer_data)
734  wipe_mem(get_per_buffer_data(stream,
735  oldest_buffer_index == 0 ?
736  stream->queue_size - 1 :
737  oldest_buffer_index - 1),
738  stream->per_buffer_data_size);
739 #endif
740 
741  /* Pin transferred to caller. */
742  Assert(stream->pinned_buffers > 0);
743  stream->pinned_buffers--;
744 
745  /* Advance oldest buffer, with wrap-around. */
746  stream->oldest_buffer_index++;
747  if (stream->oldest_buffer_index == stream->queue_size)
748  stream->oldest_buffer_index = 0;
749 
750  /* Prepare for the next call. */
751  read_stream_look_ahead(stream, false);
752 
753 #ifndef READ_STREAM_DISABLE_FAST_PATH
754  /* See if we can take the fast path for all-cached scans next time. */
755  if (stream->ios_in_progress == 0 &&
756  stream->pinned_buffers == 1 &&
757  stream->distance == 1 &&
758  stream->pending_read_nblocks == 0 &&
759  stream->per_buffer_data_size == 0)
760  {
761  stream->fast_path = true;
762  }
763 #endif
764 
765  return buffer;
766 }
767 
768 /*
769  * Reset a read stream by releasing any queued up buffers, allowing the stream
770  * to be used again for different blocks. This can be used to clear an
771  * end-of-stream condition and start again, or to throw away blocks that were
772  * speculatively read and read some different blocks instead.
773  */
774 void
776 {
777  Buffer buffer;
778 
779  /* Stop looking ahead. */
780  stream->distance = 0;
781 
782  /* Forget buffered block numbers and fast path state. */
783  stream->blocknums_next = 0;
784  stream->blocknums_count = 0;
785  stream->fast_path = false;
786 
787  /* Unpin anything that wasn't consumed. */
788  while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
789  ReleaseBuffer(buffer);
790 
791  Assert(stream->pinned_buffers == 0);
792  Assert(stream->ios_in_progress == 0);
793 
794  /* Start off assuming data is cached. */
795  stream->distance = 1;
796 }
797 
798 /*
799  * Release and free a read stream.
800  */
801 void
803 {
804  read_stream_reset(stream);
805  pfree(stream);
806 }
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
bool StartReadBuffers(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, int flags)
Definition: bufmgr.c:1306
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4850
void WaitReadBuffers(ReadBuffersOperation *operation)
Definition: bufmgr.c:1349
void LimitAdditionalPins(uint32 *additional_pins)
Definition: bufmgr.c:2058
int effective_io_concurrency
Definition: bufmgr.c:150
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
Definition: bufmgr.c:1321
int io_combine_limit
Definition: bufmgr.c:164
@ READ_BUFFERS_ISSUE_ADVICE
Definition: bufmgr.h:116
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:359
unsigned int uint32
Definition: c.h:506
#define Min(x, y)
Definition: c.h:1004
#define likely(x)
Definition: c.h:310
signed short int16
Definition: c.h:493
#define MAXALIGN(LEN)
Definition: c.h:811
#define Max(x, y)
Definition: c.h:998
#define Assert(condition)
Definition: c.h:858
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:398
#define unlikely(x)
Definition: c.h:311
#define lengthof(array)
Definition: c.h:788
#define PG_INT16_MAX
Definition: c.h:586
#define OidIsValid(objectId)
Definition: c.h:775
bool IsCatalogRelation(Relation relation)
Definition: catalog.c:103
bool IsCatalogRelationOid(Oid relid)
Definition: catalog.c:120
int io_direct_flags
Definition: fd.c:168
#define IO_DIRECT_DATA
Definition: fd.h:54
int GetAccessStrategyPinLimit(BufferAccessStrategy strategy)
Definition: freelist.c:647
Oid MyDatabaseId
Definition: globals.c:91
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
void LimitAdditionalLocalPins(uint32 *additional_pins)
Definition: localbuf.c:290
void pfree(void *pointer)
Definition: mcxt.c:1520
void * palloc(Size size)
Definition: mcxt.c:1316
unsigned int Oid
Definition: postgres_ext.h:31
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
Definition: read_stream.c:163
ReadStream * read_stream_begin_relation(int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
Definition: read_stream.c:410
static void read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
Definition: read_stream.c:323
void read_stream_reset(ReadStream *stream)
Definition: read_stream.c:775
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Definition: read_stream.c:572
static void read_stream_fill_blocknums(ReadStream *stream)
Definition: read_stream.c:214
struct InProgressIO InProgressIO
void read_stream_end(ReadStream *stream)
Definition: read_stream.c:802
static BlockNumber read_stream_get_block(ReadStream *stream, void *per_buffer_data)
Definition: read_stream.c:175
static void read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
Definition: read_stream.c:194
static void read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
Definition: read_stream.c:233
#define READ_STREAM_MAINTENANCE
Definition: read_stream.h:27
BlockNumber(* ReadStreamBlockNumberCB)(ReadStream *stream, void *callback_private_data, void *per_buffer_data)
Definition: read_stream.h:48
#define READ_STREAM_FULL
Definition: read_stream.h:42
#define READ_STREAM_SEQUENTIAL
Definition: read_stream.h:35
static SMgrRelation RelationGetSmgr(Relation rel)
Definition: rel.h:567
ForkNumber
Definition: relpath.h:48
static pg_noinline void Size size
Definition: slab.c:607
#define SmgrIsTemp(smgr)
Definition: smgr.h:73
int get_tablespace_io_concurrency(Oid spcid)
Definition: spccache.c:215
int get_tablespace_maintenance_io_concurrency(Oid spcid)
Definition: spccache.c:229
int16 buffer_index
Definition: read_stream.c:102
ReadBuffersOperation op
Definition: read_stream.c:103
ForkNumber forknum
Definition: bufmgr.h:129
Buffer * buffers
Definition: bufmgr.h:137
BufferAccessStrategy strategy
Definition: bufmgr.h:130
struct SMgrRelationData * smgr
Definition: bufmgr.h:127
int16 distance
Definition: read_stream.c:116
int16 blocknums_next
Definition: read_stream.c:126
int16 ios_in_progress
Definition: read_stream.c:112
void * per_buffer_data
Definition: read_stream.c:144
int16 pinned_buffers
Definition: read_stream.c:115
int16 max_ios
Definition: read_stream.c:111
int16 oldest_buffer_index
Definition: read_stream.c:154
BlockNumber seq_blocknum
Definition: read_stream.c:136
bool advice_enabled
Definition: read_stream.c:117
BlockNumber pending_read_blocknum
Definition: read_stream.c:139
int16 max_pinned_buffers
Definition: read_stream.c:114
InProgressIO * ios
Definition: read_stream.c:147
int16 oldest_io_index
Definition: read_stream.c:148
int16 queue_size
Definition: read_stream.c:113
int16 blocknums_count
Definition: read_stream.c:125
BlockNumber blocknums[16]
Definition: read_stream.c:124
int16 next_buffer_index
Definition: read_stream.c:155
size_t per_buffer_data_size
Definition: read_stream.c:143
ReadStreamBlockNumberCB callback
Definition: read_stream.c:132
int16 next_io_index
Definition: read_stream.c:149
bool fast_path
Definition: read_stream.c:151
int16 pending_read_nblocks
Definition: read_stream.c:140
void * callback_private_data
Definition: read_stream.c:133
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]
Definition: read_stream.c:156
RelFileLocator locator
RelFileNumber relNumber
RelFileLocatorBackend smgr_rlocator
Definition: smgr.h:37
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46