xlogprefetcher.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlogprefetcher.c
4  * Prefetching support for recovery.
5  *
6  * Portions Copyright (c) 2022-2024, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/access/transam/xlogprefetcher.c
12  *
13  * This module provides a drop-in replacement for an XLogReader that tries to
14  * minimize I/O stalls by looking ahead in the WAL. If blocks that will be
15  * accessed in the near future are not already in the buffer pool, it initiates
16  * I/Os that might complete before the caller eventually needs the data. When
17  * referenced blocks are found in the buffer pool already, the buffer is
18  * recorded in the decoded record so that XLogReadBufferForRedo() can try to
19  * avoid a second buffer mapping table lookup.
20  *
21  * Currently, only the main fork is considered for prefetching, and
22  * prefetching is only effective on systems where PrefetchBuffer() does
23  * something useful (mainly Linux).
24  *
25  *-------------------------------------------------------------------------
26  */
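/*
 * Editor's note: a minimal sketch (not part of the original file) of how a
 * redo loop might drive this module, using only functions defined below;
 * "reader" and "startPtr" stand for the caller's XLogReaderState and starting
 * LSN. The real caller is the recovery loop in xlogrecovery.c.
 *
 *     XLogPrefetcher *prefetcher = XLogPrefetcherAllocate(reader);
 *
 *     XLogPrefetcherBeginRead(prefetcher, startPtr);
 *     for (;;)
 *     {
 *         char       *errmsg;
 *         XLogRecord *record = XLogPrefetcherReadRecord(prefetcher, &errmsg);
 *
 *         if (record == NULL)
 *             break;              (end of available WAL, or a decoding error)
 *         ... apply the record ...
 *     }
 *     XLogPrefetcherFree(prefetcher);
 */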
27 
28 #include "postgres.h"
29 
30 #include "access/xlogprefetcher.h"
31 #include "access/xlogreader.h"
32 #include "catalog/pg_control.h"
33 #include "catalog/storage_xlog.h"
34 #include "commands/dbcommands_xlog.h"
35 #include "funcapi.h"
36 #include "miscadmin.h"
37 #include "port/atomics.h"
38 #include "storage/bufmgr.h"
39 #include "storage/shmem.h"
40 #include "storage/smgr.h"
41 #include "utils/fmgrprotos.h"
42 #include "utils/guc_hooks.h"
43 #include "utils/hsearch.h"
44 #include "utils/timestamp.h"
45 
46 /*
47  * Every time we process this much WAL, we'll update the values in
48  * pg_stat_recovery_prefetch.
49  */
50 #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
51 
52 /*
53  * To detect repeated access to the same block and skip useless extra system
54  * calls, we remember a small window of recently prefetched blocks.
55  */
56 #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
57 
58 /*
59  * When maintenance_io_concurrency is not saturated, we're prepared to look
60  * ahead up to N times that number of block references.
61  */
62 #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
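/*
 * Editor's note (illustration, not part of the original file): with the
 * default maintenance_io_concurrency of 10, XLogPrefetcherReadRecord() below
 * sizes its LsnReadQueue with max_inflight = 10 and max_distance = 10 * 4 =
 * 40 queued block references.
 */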
63 
64 /* Define to log internal debugging messages. */
65 /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
66 
67 /* GUCs */
68 int recovery_prefetch = RECOVERY_PREFETCH_TRY;
69 
70 #ifdef USE_PREFETCH
71 #define RecoveryPrefetchEnabled() \
72  (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
73  maintenance_io_concurrency > 0)
74 #else
75 #define RecoveryPrefetchEnabled() false
76 #endif
77 
78 static int XLogPrefetchReconfigureCount = 0;
79 
80 /*
81  * Enum used to report whether an IO should be started.
82  */
83 typedef enum
84 {
85  LRQ_NEXT_NO_IO,
86  LRQ_NEXT_IO,
87  LRQ_NEXT_AGAIN,
88 } LsnReadQueueNextStatus;
89 
90 /*
91  * Type of callback that can decide which block to prefetch next. For now
92  * there is only one.
93  */
94 typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
95  XLogRecPtr *lsn);
96 
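/*
 * Editor's note: an illustrative callback (not part of the original file)
 * satisfying this contract; it never starts I/O and reports that nothing
 * further can be examined yet:
 *
 *     static LsnReadQueueNextStatus
 *     idle_next(uintptr_t lrq_private, XLogRecPtr *lsn)
 *     {
 *         return LRQ_NEXT_AGAIN;
 *     }
 *
 * The real callback, XLogPrefetcherNextBlock(), is defined later in this
 * file.
 */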
97 /*
98  * A simple circular queue of LSNs, used to control the number of
99  * (potentially) inflight IOs. This stands in for a later more general IO
100  * control mechanism, which is why it has the apparently unnecessary
101  * indirection through a function pointer.
102  */
103 typedef struct LsnReadQueue
104 {
105  LsnReadQueueNextFun next;
106  uintptr_t lrq_private;
107  uint32 max_inflight;
108  uint32 size;
109  uint32 head;
110  uint32 tail;
111  uint32 inflight;
112  uint32 completed;
113  struct
114  {
115  bool io;
116  XLogRecPtr lsn;
117  } queue[FLEXIBLE_ARRAY_MEMBER];
118 } LsnReadQueue;
119 
120 /*
121  * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching
122  * blocks that will soon be referenced, to try to avoid IO stalls.
123  */
124 struct XLogPrefetcher
125 {
126  /* WAL reader and current reading state. */
127  XLogReaderState *reader;
128  DecodedXLogRecord *record;
129  int next_block_id;
130 
131  /* When to publish stats. */
132  XLogRecPtr next_stats_shm_lsn;
133 
134  /* Book-keeping to avoid accessing blocks that don't exist yet. */
135  HTAB *filter_table;
136  dlist_head filter_queue;
137 
138  /* Book-keeping to avoid repeat prefetches. */
139  RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
140  BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
141  int recent_idx;
142 
143  /* Book-keeping to disable prefetching temporarily. */
144  XLogRecPtr no_readahead_until;
145 
146  /* IO depth manager. */
147  LsnReadQueue *streaming_read;
148 
149  XLogRecPtr begin_ptr;
150 
151  int reconfigure_count;
152 };
153 
154 /*
155  * A temporary filter used to track block ranges that haven't been created
156  * yet, whole relations that haven't been created yet, and whole relations
157  * that (we assume) have already been dropped, or will be created by bulk WAL
158  * operators.
159  */
160 typedef struct XLogPrefetcherFilter
161 {
162  RelFileLocator rlocator;
163  XLogRecPtr filter_until_replayed;
164  BlockNumber filter_from_block;
165  dlist_node link;
166 } XLogPrefetcherFilter;
167 
168 /*
169  * Counters exposed in shared memory for pg_stat_recovery_prefetch.
170  */
171 typedef struct XLogPrefetchStats
172 {
173  pg_atomic_uint64 reset_time; /* Time of last reset. */
174  pg_atomic_uint64 prefetch; /* Prefetches initiated. */
175  pg_atomic_uint64 hit; /* Blocks already in cache. */
176  pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
177  pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
178  pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
179  pg_atomic_uint64 skip_rep; /* Repeat accesses skipped. */
180 
181  /* Dynamic values */
182  int wal_distance; /* Number of WAL bytes ahead. */
183  int block_distance; /* Number of block references ahead. */
184  int io_depth; /* Number of I/Os in progress. */
185 } XLogPrefetchStats;
186 
187 static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
188  RelFileLocator rlocator,
189  BlockNumber blockno,
190  XLogRecPtr lsn);
191 static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
192  RelFileLocator rlocator,
193  BlockNumber blockno);
194 static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
195  XLogRecPtr replaying_lsn);
196 static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
197  XLogRecPtr *lsn);
198 
199 static XLogPrefetchStats *SharedStats;
200 
201 static inline LsnReadQueue *
202 lrq_alloc(uint32 max_distance,
203  uint32 max_inflight,
204  uintptr_t lrq_private,
205  LsnReadQueueNextFun next)
206 {
207  LsnReadQueue *lrq;
208  uint32 size;
209 
210  Assert(max_distance >= max_inflight);
211 
212  size = max_distance + 1; /* full ring buffer has a gap */
213  lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
214  lrq->lrq_private = lrq_private;
215  lrq->max_inflight = max_inflight;
216  lrq->size = size;
217  lrq->next = next;
218  lrq->head = 0;
219  lrq->tail = 0;
220  lrq->inflight = 0;
221  lrq->completed = 0;
222 
223  return lrq;
224 }
225 
226 static inline void
227 lrq_free(LsnReadQueue *lrq)
228 {
229  pfree(lrq);
230 }
231 
232 static inline uint32
233 lrq_inflight(LsnReadQueue *lrq)
234 {
235  return lrq->inflight;
236 }
237 
238 static inline uint32
239 lrq_completed(LsnReadQueue *lrq)
240 {
241  return lrq->completed;
242 }
243 
244 static inline void
245 lrq_prefetch(LsnReadQueue *lrq)
246 {
247  /* Try to start as many IOs as we can within our limits. */
248  while (lrq->inflight < lrq->max_inflight &&
249  lrq->inflight + lrq->completed < lrq->size - 1)
250  {
251  Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
252  switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
253  {
254  case LRQ_NEXT_AGAIN:
255  return;
256  case LRQ_NEXT_IO:
257  lrq->queue[lrq->head].io = true;
258  lrq->inflight++;
259  break;
260  case LRQ_NEXT_NO_IO:
261  lrq->queue[lrq->head].io = false;
262  lrq->completed++;
263  break;
264  }
265  lrq->head++;
266  if (lrq->head == lrq->size)
267  lrq->head = 0;
268  }
269 }
270 
271 static inline void
272 lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
273 {
274  /*
275  * We know that LSNs before 'lsn' have been replayed, so we can now assume
276  * that any IOs that were started before then have finished.
277  */
278  while (lrq->tail != lrq->head &&
279  lrq->queue[lrq->tail].lsn < lsn)
280  {
281  if (lrq->queue[lrq->tail].io)
282  lrq->inflight--;
283  else
284  lrq->completed--;
285  lrq->tail++;
286  if (lrq->tail == lrq->size)
287  lrq->tail = 0;
288  }
290  lrq_prefetch(lrq);
291 }
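/*
 * Editor's note: a sketch (not part of the original file) of the LsnReadQueue
 * lifecycle, assuming a caller-supplied callback my_next() of type
 * LsnReadQueueNextFun and some opaque state my_state:
 *
 *     LsnReadQueue *lrq = lrq_alloc(40, 10, (uintptr_t) my_state, my_next);
 *
 *     lrq_prefetch(lrq);                      start work, within the limits
 *     ...
 *     lrq_complete_lsn(lrq, replayed_lsn);    retire entries whose LSNs have
 *                                             been replayed, then look ahead
 *     ...
 *     lrq_free(lrq);
 *
 * XLogPrefetcherReadRecord() below drives its queue in roughly this way,
 * using XLogPrefetcherNextBlock() as the callback.
 */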
292 
293 size_t
294 XLogPrefetchShmemSize(void)
295 {
296  return sizeof(XLogPrefetchStats);
297 }
298 
299 /*
300  * Reset all counters to zero.
301  */
302 void
303 XLogPrefetchResetStats(void)
304 {
305  pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
306  pg_atomic_write_u64(&SharedStats->prefetch, 0);
307  pg_atomic_write_u64(&SharedStats->hit, 0);
308  pg_atomic_write_u64(&SharedStats->skip_init, 0);
309  pg_atomic_write_u64(&SharedStats->skip_new, 0);
310  pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
311  pg_atomic_write_u64(&SharedStats->skip_rep, 0);
312 }
313 
314 void
315 XLogPrefetchShmemInit(void)
316 {
317  bool found;
318 
319  SharedStats = (XLogPrefetchStats *)
320  ShmemInitStruct("XLogPrefetchStats",
321  sizeof(XLogPrefetchStats),
322  &found);
323 
324  if (!found)
325  {
326  pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
327  pg_atomic_init_u64(&SharedStats->prefetch, 0);
328  pg_atomic_init_u64(&SharedStats->hit, 0);
329  pg_atomic_init_u64(&SharedStats->skip_init, 0);
330  pg_atomic_init_u64(&SharedStats->skip_new, 0);
331  pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
332  pg_atomic_init_u64(&SharedStats->skip_rep, 0);
333  }
334 }
335 
336 /*
337  * Called when any GUC is changed that affects prefetching.
338  */
339 void
340 XLogPrefetchReconfigure(void)
341 {
342  XLogPrefetchReconfigureCount++;
343 }
344 
345 /*
346  * Increment a counter in shared memory. This is equivalent to *counter++ on a
347  * plain uint64 without any memory barrier or locking, except on platforms
348  * where readers can't read uint64 without possibly observing a torn value.
349  */
350 static inline void
351 XLogPrefetchIncrement(pg_atomic_uint64 *counter)
352 {
353  Assert(AmStartupProcess() || !IsUnderPostmaster);
354  pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
355 }
356 
357 /*
358  * Create a prefetcher that is ready to begin prefetching blocks referenced by
359  * WAL records.
360  */
361 XLogPrefetcher *
362 XLogPrefetcherAllocate(XLogReaderState *reader)
363 {
364  XLogPrefetcher *prefetcher;
365  HASHCTL ctl;
366 
367  prefetcher = palloc0(sizeof(XLogPrefetcher));
368  prefetcher->reader = reader;
369 
370  ctl.keysize = sizeof(RelFileLocator);
371  ctl.entrysize = sizeof(XLogPrefetcherFilter);
372  prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
373  &ctl, HASH_ELEM | HASH_BLOBS);
374  dlist_init(&prefetcher->filter_queue);
375 
376  SharedStats->wal_distance = 0;
377  SharedStats->block_distance = 0;
378  SharedStats->io_depth = 0;
379 
380  /* First usage will cause streaming_read to be allocated. */
381  prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
382 
383  return prefetcher;
384 }
385 
386 /*
387  * Destroy a prefetcher and release all resources.
388  */
389 void
390 XLogPrefetcherFree(XLogPrefetcher *prefetcher)
391 {
392  lrq_free(prefetcher->streaming_read);
393  hash_destroy(prefetcher->filter_table);
394  pfree(prefetcher);
395 }
396 
397 /*
398  * Provide access to the reader.
399  */
400 XLogReaderState *
401 XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
402 {
403  return prefetcher->reader;
404 }
405 
406 /*
407  * Update the statistics visible in the pg_stat_recovery_prefetch view.
408  */
409 void
410 XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
411 {
412  uint32 io_depth;
413  uint32 completed;
414  int64 wal_distance;
415 
416 
417  /* How far ahead of replay are we now? */
418  if (prefetcher->reader->decode_queue_tail)
419  {
420  wal_distance =
421  prefetcher->reader->decode_queue_tail->lsn -
422  prefetcher->reader->decode_queue_head->lsn;
423  }
424  else
425  {
426  wal_distance = 0;
427  }
428 
429  /* How many IOs are currently in flight and completed? */
430  io_depth = lrq_inflight(prefetcher->streaming_read);
431  completed = lrq_completed(prefetcher->streaming_read);
432 
433  /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
434  SharedStats->io_depth = io_depth;
435  SharedStats->block_distance = io_depth + completed;
436  SharedStats->wal_distance = wal_distance;
437 
438  prefetcher->next_stats_shm_lsn =
439  prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
440 }
441 
442 /*
443  * A callback that examines the next block reference in the WAL, and possibly
444  * starts an IO so that a later read will be fast.
445  *
446  * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
447  *
448  * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
449  * that isn't in the buffer pool, and the kernel has been asked to start
450  * reading it to make a future read system call faster. An LSN is written to
451  * *lsn, and the I/O will be considered to have completed once that LSN is
452  * replayed.
453  *
454  * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
455  * that it was already in the buffer pool, or we decided for various reasons
456  * not to prefetch.
457  */
458 static LsnReadQueueNextStatus
459 XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
460 {
461  XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
462  XLogReaderState *reader = prefetcher->reader;
463  XLogRecPtr replaying_lsn = reader->ReadRecPtr;
464 
465  /*
466  * We keep track of the record and block we're up to between calls with
467  * prefetcher->record and prefetcher->next_block_id.
468  */
469  for (;;)
470  {
471  DecodedXLogRecord *record;
472 
473  /* Try to read a new future record, if we don't already have one. */
474  if (prefetcher->record == NULL)
475  {
476  bool nonblocking;
477 
478  /*
479  * If there are already records or an error queued up that could
480  * be replayed, we don't want to block here. Otherwise, it's OK
481  * to block waiting for more data: presumably the caller has
482  * nothing else to do.
483  */
484  nonblocking = XLogReaderHasQueuedRecordOrError(reader);
485 
486  /* Readahead is disabled until we replay past a certain point. */
487  if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
488  return LRQ_NEXT_AGAIN;
489 
490  record = XLogReadAhead(prefetcher->reader, nonblocking);
491  if (record == NULL)
492  {
493  /*
494  * We can't read any more, due to an error or lack of data in
495  * nonblocking mode. Don't try to read ahead again until
496  * we've replayed everything already decoded.
497  */
498  if (nonblocking && prefetcher->reader->decode_queue_tail)
499  prefetcher->no_readahead_until =
500  prefetcher->reader->decode_queue_tail->lsn;
501 
502  return LRQ_NEXT_AGAIN;
503  }
504 
505  /*
506  * If prefetching is disabled, we don't need to analyze the record
507  * or issue any prefetches. We just need to cause one record to
508  * be decoded.
509  */
510  if (!RecoveryPrefetchEnabled())
511  {
512  *lsn = InvalidXLogRecPtr;
513  return LRQ_NEXT_NO_IO;
514  }
515 
516  /* We have a new record to process. */
517  prefetcher->record = record;
518  prefetcher->next_block_id = 0;
519  }
520  else
521  {
522  /* Continue to process from last call, or last loop. */
523  record = prefetcher->record;
524  }
525 
526  /*
527  * Check for operations that require us to filter out block ranges, or
528  * pause readahead completely.
529  */
530  if (replaying_lsn < record->lsn)
531  {
532  uint8 rmid = record->header.xl_rmid;
533  uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
534 
535  if (rmid == RM_XLOG_ID)
536  {
537  if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
538  record_type == XLOG_END_OF_RECOVERY)
539  {
540  /*
541  * These records might change the TLI. Avoid potential
542  * bugs if we were to allow "read TLI" and "replay TLI" to
543  * differ without more analysis.
544  */
545  prefetcher->no_readahead_until = record->lsn;
546 
547 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
548  elog(XLOGPREFETCHER_DEBUG_LEVEL,
549  "suppressing all readahead until %X/%X is replayed due to possible TLI change",
550  LSN_FORMAT_ARGS(record->lsn));
551 #endif
552 
553  /* Fall through so we move past this record. */
554  }
555  }
556  else if (rmid == RM_DBASE_ID)
557  {
558  /*
559  * When databases are created with the file-copy strategy,
560  * there are no WAL records to tell us about the creation of
561  * individual relations.
562  */
563  if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
564  {
565  xl_dbase_create_file_copy_rec *xlrec =
566  (xl_dbase_create_file_copy_rec *) record->main_data;
567  RelFileLocator rlocator =
568  {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
569 
570  /*
571  * Don't try to prefetch anything in this database until
572  * it has been created, or we might confuse the blocks of
573  * different generations, if a database OID or
574  * relfilenumber is reused. It's also more efficient than
575  * discovering that relations don't exist on disk yet with
576  * ENOENT errors.
577  */
578  XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
579 
580 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
581  elog(XLOGPREFETCHER_DEBUG_LEVEL,
582  "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
583  rlocator.dbOid,
584  LSN_FORMAT_ARGS(record->lsn));
585 #endif
586  }
587  }
588  else if (rmid == RM_SMGR_ID)
589  {
590  if (record_type == XLOG_SMGR_CREATE)
591  {
592  xl_smgr_create *xlrec = (xl_smgr_create *)
593  record->main_data;
594 
595  if (xlrec->forkNum == MAIN_FORKNUM)
596  {
597  /*
598  * Don't prefetch anything for this whole relation
599  * until it has been created. Otherwise we might
600  * confuse the blocks of different generations, if a
601  * relfilenumber is reused. This also avoids the need
602  * to discover the problem via extra syscalls that
603  * report ENOENT.
604  */
605  XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
606  record->lsn);
607 
608 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
609  elog(XLOGPREFETCHER_DEBUG_LEVEL,
610  "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
611  xlrec->rlocator.spcOid,
612  xlrec->rlocator.dbOid,
613  xlrec->rlocator.relNumber,
614  LSN_FORMAT_ARGS(record->lsn));
615 #endif
616  }
617  }
618  else if (record_type == XLOG_SMGR_TRUNCATE)
619  {
620  xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
621  record->main_data;
622 
623  /*
624  * Don't consider prefetching anything in the truncated
625  * range until the truncation has been performed.
626  */
627  XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
628  xlrec->blkno,
629  record->lsn);
630 
631 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
632  elog(XLOGPREFETCHER_DEBUG_LEVEL,
633  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
634  xlrec->rlocator.spcOid,
635  xlrec->rlocator.dbOid,
636  xlrec->rlocator.relNumber,
637  xlrec->blkno,
638  LSN_FORMAT_ARGS(record->lsn));
639 #endif
640  }
641  }
642  }
643 
644  /* Scan the block references, starting where we left off last time. */
645  while (prefetcher->next_block_id <= record->max_block_id)
646  {
647  int block_id = prefetcher->next_block_id++;
648  DecodedBkpBlock *block = &record->blocks[block_id];
649  SMgrRelation reln;
650  PrefetchBufferResult result;
651 
652  if (!block->in_use)
653  continue;
654 
655  Assert(!BufferIsValid(block->prefetch_buffer));
656 
657  /*
658  * Record the LSN of this record. When it's replayed,
659  * LsnReadQueue will consider any IOs submitted for earlier LSNs
660  * to be finished.
661  */
662  *lsn = record->lsn;
663 
664  /* We don't try to prefetch anything but the main fork for now. */
665  if (block->forknum != MAIN_FORKNUM)
666  {
667  return LRQ_NEXT_NO_IO;
668  }
669 
670  /*
671  * If there is a full page image attached, we won't be reading the
672  * page, so don't bother trying to prefetch.
673  */
674  if (block->has_image)
675  {
676  XLogPrefetchIncrement(&SharedStats->skip_fpw);
677  return LRQ_NEXT_NO_IO;
678  }
679 
680  /* There is no point in reading a page that will be zeroed. */
681  if (block->flags & BKPBLOCK_WILL_INIT)
682  {
683  XLogPrefetchIncrement(&SharedStats->skip_init);
684  return LRQ_NEXT_NO_IO;
685  }
686 
687  /* Should we skip prefetching this block due to a filter? */
688  if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
689  {
690  XLogPrefetchIncrement(&SharedStats->skip_new);
691  return LRQ_NEXT_NO_IO;
692  }
693 
694  /* There is no point in repeatedly prefetching the same block. */
695  for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
696  {
697  if (block->blkno == prefetcher->recent_block[i] &&
698  RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
699  {
700  /*
701  * XXX If we also remembered where it was, we could set
702  * recent_buffer so that recovery could skip smgropen()
703  * and a buffer table lookup.
704  */
705  XLogPrefetchIncrement(&SharedStats->skip_rep);
706  return LRQ_NEXT_NO_IO;
707  }
708  }
709  prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
710  prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
711  prefetcher->recent_idx =
712  (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
713 
714  /*
715  * We could try to have a fast path for repeated references to the
716  * same relation (with some scheme to handle invalidations
717  * safely), but for now we'll call smgropen() every time.
718  */
719  reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
720 
721  /*
722  * If the relation file doesn't exist on disk, for example because
723  * we're replaying after a crash and the file will be created and
724  * then unlinked by WAL that hasn't been replayed yet, suppress
725  * further prefetching in the relation until this record is
726  * replayed.
727  */
728  if (!smgrexists(reln, MAIN_FORKNUM))
729  {
730 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
731  elog(XLOGPREFETCHER_DEBUG_LEVEL,
732  "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
733  reln->smgr_rlocator.locator.spcOid,
734  reln->smgr_rlocator.locator.dbOid,
735  reln->smgr_rlocator.locator.relNumber,
736  LSN_FORMAT_ARGS(record->lsn));
737 #endif
738  XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
739  record->lsn);
740  XLogPrefetchIncrement(&SharedStats->skip_new);
741  return LRQ_NEXT_NO_IO;
742  }
743 
744  /*
745  * If the relation isn't big enough to contain the referenced
746  * block yet, suppress prefetching of this block and higher until
747  * this record is replayed.
748  */
749  if (block->blkno >= smgrnblocks(reln, block->forknum))
750  {
751 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
752  elog(XLOGPREFETCHER_DEBUG_LEVEL,
753  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
754  reln->smgr_rlocator.locator.spcOid,
755  reln->smgr_rlocator.locator.dbOid,
756  reln->smgr_rlocator.locator.relNumber,
757  block->blkno,
758  LSN_FORMAT_ARGS(record->lsn));
759 #endif
760  XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
761  record->lsn);
762  XLogPrefetchIncrement(&SharedStats->skip_new);
763  return LRQ_NEXT_NO_IO;
764  }
765 
766  /* Try to initiate prefetching. */
767  result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
768  if (BufferIsValid(result.recent_buffer))
769  {
770  /* Cache hit, nothing to do. */
771  XLogPrefetchIncrement(&SharedStats->hit);
772  block->prefetch_buffer = result.recent_buffer;
773  return LRQ_NEXT_NO_IO;
774  }
775  else if (result.initiated_io)
776  {
777  /* Cache miss, I/O (presumably) started. */
778  XLogPrefetchIncrement(&SharedStats->prefetch);
779  block->prefetch_buffer = InvalidBuffer;
780  return LRQ_NEXT_IO;
781  }
782  else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
783  {
784  /*
785  * This shouldn't be possible, because we already determined
786  * that the relation exists on disk and is big enough.
787  * Something is wrong with the cache invalidation for
788  * smgrexists(), smgrnblocks(), or the file was unlinked or
789  * truncated beneath our feet?
790  */
791  elog(ERROR,
792  "could not prefetch relation %u/%u/%u block %u",
793  reln->smgr_rlocator.locator.spcOid,
794  reln->smgr_rlocator.locator.dbOid,
795  reln->smgr_rlocator.locator.relNumber,
796  block->blkno);
797  }
798  }
799 
800  /*
801  * Several callsites need to be able to read exactly one record
802  * without any internal readahead. Examples: xlog.c reading
803  * checkpoint records with emode set to PANIC, which might otherwise
804  * cause XLogPageRead() to panic on some future page, and xlog.c
805  * determining where to start writing WAL next, which depends on the
806  * contents of the reader's internal buffer after reading one record.
807  * Therefore, don't even think about prefetching until the first
808  * record after XLogPrefetcherBeginRead() has been consumed.
809  */
810  if (prefetcher->reader->decode_queue_tail &&
811  prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
812  return LRQ_NEXT_AGAIN;
813 
814  /* Advance to the next record. */
815  prefetcher->record = NULL;
816  }
817  pg_unreachable();
818 }
819 
820 /*
821  * Expose statistics about recovery prefetching.
822  */
823 Datum
824 pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
825 {
826 #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
827  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
828  Datum values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
829  bool nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
830 
831  InitMaterializedSRF(fcinfo, 0);
832 
833  for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
834  nulls[i] = false;
835 
836  values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
837  values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
838  values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
839  values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
840  values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
841  values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
842  values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
843  values[7] = Int32GetDatum(SharedStats->wal_distance);
844  values[8] = Int32GetDatum(SharedStats->block_distance);
845  values[9] = Int32GetDatum(SharedStats->io_depth);
846  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
847 
848  return (Datum) 0;
849 }
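/*
 * Editor's note: while recovery is running, the counters above can be
 * inspected with "SELECT * FROM pg_stat_recovery_prefetch" and reset with
 * pg_stat_reset_shared('recovery_prefetch'), which ends up calling
 * XLogPrefetchResetStats() above.
 */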
850 
851 /*
852  * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
853  * has been replayed.
854  */
855 static inline void
856 XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
857  BlockNumber blockno, XLogRecPtr lsn)
858 {
859  XLogPrefetcherFilter *filter;
860  bool found;
861 
862  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
863  if (!found)
864  {
865  /*
866  * Don't allow any prefetching of this block or higher until replayed.
867  */
868  filter->filter_until_replayed = lsn;
869  filter->filter_from_block = blockno;
870  dlist_push_head(&prefetcher->filter_queue, &filter->link);
871  }
872  else
873  {
874  /*
875  * We were already filtering this rlocator. Extend the filter's
876  * lifetime to cover this WAL record, but leave the lower of the block
877  * numbers there because we don't want to have to track individual
878  * blocks.
879  */
880  filter->filter_until_replayed = lsn;
881  dlist_delete(&filter->link);
882  dlist_push_head(&prefetcher->filter_queue, &filter->link);
883  filter->filter_from_block = Min(filter->filter_from_block, blockno);
884  }
885 }
886 
887 /*
888  * Have we replayed any records that caused us to begin filtering a block
889  * range? That means that relations should have been created, extended or
890  * dropped as required, so we can stop filtering out accesses to a given
891  * relfilenumber.
892  */
893 static inline void
894 XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
895 {
896  while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
897  {
898  XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
899  link,
900  &prefetcher->filter_queue);
901 
902  if (filter->filter_until_replayed >= replaying_lsn)
903  break;
904 
905  dlist_delete(&filter->link);
906  hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
907  }
908 }
909 
910 /*
911  * Check if a given block should be skipped due to a filter.
912  */
913 static inline bool
914 XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
915  BlockNumber blockno)
916 {
917  /*
918  * Test for empty queue first, because we expect it to be empty most of
919  * the time and we can avoid the hash table lookup in that case.
920  */
921  if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
922  {
923  XLogPrefetcherFilter *filter;
924 
925  /* See if the block range is filtered. */
926  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
927  if (filter && filter->filter_from_block <= blockno)
928  {
929 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
930  elog(XLOGPREFETCHER_DEBUG_LEVEL,
931  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
932  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
933  LSN_FORMAT_ARGS(filter->filter_until_replayed),
934  filter->filter_from_block);
935 #endif
936  return true;
937  }
938 
939  /* See if the whole database is filtered. */
940  rlocator.relNumber = InvalidRelFileNumber;
941  rlocator.spcOid = InvalidOid;
942  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
943  if (filter)
944  {
945 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
946  elog(XLOGPREFETCHER_DEBUG_LEVEL,
947  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
948  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
949  LSN_FORMAT_ARGS(filter->filter_until_replayed));
950 #endif
951  return true;
952  }
953  }
954 
955  return false;
956 }
957 
958 /*
959  * A wrapper for XLogBeginRead() that also resets the prefetcher.
960  */
961 void
962 XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
963 {
964  /* This will forget about any in-flight IO. */
965  prefetcher->reconfigure_count--;
966 
967  /* Book-keeping to avoid readahead on first read. */
968  prefetcher->begin_ptr = recPtr;
969 
970  prefetcher->no_readahead_until = 0;
971 
972  /* This will forget about any queued up records in the decoder. */
973  XLogBeginRead(prefetcher->reader, recPtr);
974 }
975 
976 /*
977  * A wrapper for XLogReadRecord() that provides the same interface, but also
978  * tries to initiate I/O for blocks referenced in future WAL records.
979  */
980 XLogRecord *
981 XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
982 {
983  DecodedXLogRecord *record;
984  XLogRecPtr replayed_up_to;
985 
986  /*
987  * See if it's time to reset the prefetching machinery, because a relevant
988  * GUC was changed.
989  */
990  if (prefetcher->reconfigure_count != XLogPrefetchReconfigureCount)
991  {
992  uint32 max_distance;
993  uint32 max_inflight;
994 
995  if (prefetcher->streaming_read)
996  lrq_free(prefetcher->streaming_read);
997 
998  if (RecoveryPrefetchEnabled())
999  {
1000  Assert(maintenance_io_concurrency > 0);
1001  max_inflight = maintenance_io_concurrency;
1002  max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1003  }
1004  else
1005  {
1006  max_inflight = 1;
1007  max_distance = 1;
1008  }
1009 
1010  prefetcher->streaming_read = lrq_alloc(max_distance,
1011  max_inflight,
1012  (uintptr_t) prefetcher,
1013  XLogPrefetcherNextBlock);
1014 
1015  prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
1016  }
1017 
1018  /*
1019  * Release last returned record, if there is one, as it's now been
1020  * replayed.
1021  */
1022  replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1023 
1024  /*
1025  * Can we drop any filters yet? If we were waiting for a relation to be
1026  * created or extended, it is now OK to access blocks in the covered
1027  * range.
1028  */
1029  XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1030 
1031  /*
1032  * All IO initiated by earlier WAL is now completed. This might trigger
1033  * further prefetching.
1034  */
1035  lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1036 
1037  /*
1038  * If there's nothing queued yet, then start prefetching to cause at least
1039  * one record to be queued.
1040  */
1041  if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1042  {
1043  Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1044  Assert(lrq_completed(prefetcher->streaming_read) == 0);
1045  lrq_prefetch(prefetcher->streaming_read);
1046  }
1047 
1048  /* Read the next record. */
1049  record = XLogNextRecord(prefetcher->reader, errmsg);
1050  if (!record)
1051  return NULL;
1052 
1053  /*
1054  * The record we just got is the "current" one, for the benefit of the
1055  * XLogRecXXX() macros.
1056  */
1057  Assert(record == prefetcher->reader->record);
1058 
1059  /*
1060  * If maintenance_io_concurrency is set very low, we might have started
1061  * prefetching some but not all of the blocks referenced in the record
1062  * we're about to return. Forget about the rest of the blocks in this
1063  * record by dropping the prefetcher's reference to it.
1064  */
1065  if (record == prefetcher->record)
1066  prefetcher->record = NULL;
1067 
1068  /*
1069  * See if it's time to compute some statistics, because enough WAL has
1070  * been processed.
1071  */
1072  if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1073  XLogPrefetcherComputeStats(prefetcher);
1074 
1075  Assert(record == prefetcher->reader->record);
1076 
1077  return &record->header;
1078 }
1079 
1080 bool
1081 check_recovery_prefetch(int *new_value, void **extra, GucSource source)
1082 {
1083 #ifndef USE_PREFETCH
1084  if (*new_value == RECOVERY_PREFETCH_ON)
1085  {
1086  GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
1087  return false;
1088  }
1089 #endif
1090 
1091  return true;
1092 }
1093 
1094 void
1095 assign_recovery_prefetch(int new_value, void *extra)
1096 {
1097  /* Reconfigure prefetching, because a setting it depends on changed. */
1098  recovery_prefetch = new_value;
1099  if (AmStartupProcess())
1100  XLogPrefetchReconfigure();
1101 }
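/*
 * Editor's note: an illustrative configuration sketch (values are examples,
 * not recommendations) for enabling this machinery on a standby:
 *
 *     recovery_prefetch = try              off | on | try (default: try)
 *     maintenance_io_concurrency = 10      bounds concurrent prefetch requests
 *     wal_decode_buffer_size = 512kB       bounds how far ahead WAL is decoded
 *
 * With "try" (the default), prefetching is used only where the build and
 * platform support issuing read-ahead advice, as checked by
 * check_recovery_prefetch() above.
 */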