xlogprefetcher.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlogprefetcher.c
4  * Prefetching support for recovery.
5  *
6  * Portions Copyright (c) 2022-2024, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/access/transam/xlogprefetcher.c
12  *
13  * This module provides a drop-in replacement for an XLogReader that tries to
14  * minimize I/O stalls by looking ahead in the WAL. If blocks that will be
15  * accessed in the near future are not already in the buffer pool, it initiates
16  * I/Os that might complete before the caller eventually needs the data. When
17  * referenced blocks are found in the buffer pool already, the buffer is
18  * recorded in the decoded record so that XLogReadBufferForRedo() can try to
19  * avoid a second buffer mapping table lookup.
20  *
21  * Currently, only the main fork is considered for prefetching, and
22  * prefetching is only effective on systems where PrefetchBuffer() does
23  * something useful (mainly Linux).
24  *
25  *-------------------------------------------------------------------------
26  */
27 
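/*
 * Illustrative sketch only (not taken from xlog.c): roughly how a recovery
 * loop is expected to drive this module.  The loop, variable names and redo
 * dispatch below are simplified assumptions; only the XLogPrefetcher* calls
 * are the entry points defined in this file.
 *
 *		XLogPrefetcher *prefetcher = XLogPrefetcherAllocate(xlogreader);
 *		char	   *errormsg;
 *
 *		XLogPrefetcherBeginRead(prefetcher, startPtr);
 *		for (;;)
 *		{
 *			XLogRecord *record = XLogPrefetcherReadRecord(prefetcher, &errormsg);
 *
 *			if (record == NULL)
 *				break;			(end of available WAL, or error in errormsg)
 *			... apply the record with the resource manager's redo routine ...
 *		}
 *		XLogPrefetcherFree(prefetcher);
 */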
28 #include "postgres.h"
29 
30 #include "access/xlogprefetcher.h"
31 #include "access/xlogreader.h"
32 #include "catalog/pg_control.h"
33 #include "catalog/storage_xlog.h"
34 #include "commands/dbcommands_xlog.h"
35 #include "funcapi.h"
36 #include "miscadmin.h"
37 #include "port/atomics.h"
38 #include "storage/bufmgr.h"
39 #include "storage/shmem.h"
40 #include "storage/smgr.h"
41 #include "utils/fmgrprotos.h"
42 #include "utils/guc_hooks.h"
43 #include "utils/hsearch.h"
44 #include "utils/timestamp.h"
45 
46 /*
47  * Every time we process this much WAL, we'll update the values in
48  * pg_stat_recovery_prefetch.
49  */
50 #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
51 
52 /*
53  * To detect repeated access to the same block and skip useless extra system
54  * calls, we remember a small window of recently prefetched blocks.
55  */
56 #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
57 
58 /*
59  * When maintenance_io_concurrency is not saturated, we're prepared to look
60  * ahead up to N times that number of block references.
61  */
62 #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
63 
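/*
 * Worked example (illustrative): with the default multiplier above and
 * maintenance_io_concurrency = 10, at most 10 prefetch I/Os are in flight at
 * any time, while the prefetcher may look ahead over up to 4 * 10 = 40
 * queued block references.
 */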
64 /* Define to log internal debugging messages. */
65 /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
66 
67 /* GUCs */
68 int recovery_prefetch = RECOVERY_PREFETCH_TRY;
69 
70 #ifdef USE_PREFETCH
71 #define RecoveryPrefetchEnabled() \
72  (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
73  maintenance_io_concurrency > 0)
74 #else
75 #define RecoveryPrefetchEnabled() false
76 #endif
77 
78 static int XLogPrefetchReconfigureCount = 0;
79 
80 /*
81  * Enum used to report whether an IO should be started.
82  */
83 typedef enum
84 {
85  LRQ_NEXT_NO_IO,
86  LRQ_NEXT_IO,
87  LRQ_NEXT_AGAIN,
88 } LsnReadQueueNextStatus;
89 
90 /*
91  * Type of callback that can decide which block to prefetch next. For now
92  * there is only one.
93  */
94 typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
95  XLogRecPtr *lsn);
96 
97 /*
98  * A simple circular queue of LSNs, used to control the number of
99  * (potentially) inflight IOs. This stands in for a later more general IO
100  * control mechanism, which is why it has the apparently unnecessary
101  * indirection through a function pointer.
102  */
103 typedef struct LsnReadQueue
104 {
105  LsnReadQueueNextFun next;
106  uintptr_t lrq_private;
107  uint32 max_inflight;
108  uint32 size;
109  uint32 head;
110  uint32 tail;
111  uint32 inflight;
112  uint32 completed;
113  struct
114  {
115  bool io;
116  XLogRecPtr lsn;
117  } queue[FLEXIBLE_ARRAY_MEMBER];
118 } LsnReadQueue;
119 
120 /*
121  * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching
122  * blocks that will soon be referenced, to try to avoid IO stalls.
123  */
124 struct XLogPrefetcher
125 {
126  /* WAL reader and current reading state. */
127  XLogReaderState *reader;
128  DecodedXLogRecord *record;
129  int next_block_id;
130 
131  /* When to publish stats. */
132  XLogRecPtr next_stats_shm_lsn;
133 
134  /* Book-keeping to avoid accessing blocks that don't exist yet. */
135  HTAB *filter_table;
136  dlist_head filter_queue;
137 
138  /* Book-keeping to avoid repeat prefetches. */
139  RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
140  BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
141  int recent_idx;
142 
143  /* Book-keeping to disable prefetching temporarily. */
144  XLogRecPtr no_readahead_until;
145 
146  /* IO depth manager. */
147  LsnReadQueue *streaming_read;
148 
149  XLogRecPtr begin_ptr;
150 
151  int reconfigure_count;
152 };
153 
154 /*
155  * A temporary filter used to track block ranges that haven't been created
156  * yet, whole relations that haven't been created yet, and whole relations
157  * that (we assume) have already been dropped, or will be created by bulk WAL
158  * operators.
159  */
160 typedef struct XLogPrefetcherFilter
161 {
162  RelFileLocator rlocator;
163  XLogRecPtr filter_until_replayed;
164  BlockNumber filter_from_block;
165  dlist_node link;
166 } XLogPrefetcherFilter;
167 
168 /*
169  * Counters exposed in shared memory for pg_stat_recovery_prefetch.
170  */
171 typedef struct XLogPrefetchStats
172 {
173  pg_atomic_uint64 reset_time; /* Time of last reset. */
174  pg_atomic_uint64 prefetch; /* Prefetches initiated. */
175  pg_atomic_uint64 hit; /* Blocks already in cache. */
176  pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
177  pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
178  pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
179  pg_atomic_uint64 skip_rep; /* Repeat accesses skipped. */
180 
181  /* Dynamic values */
182  int wal_distance; /* Number of WAL bytes ahead. */
183  int block_distance; /* Number of block references ahead. */
184  int io_depth; /* Number of I/Os in progress. */
185 } XLogPrefetchStats;
186 
187 static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
188  RelFileLocator rlocator,
189  BlockNumber blockno,
190  XLogRecPtr lsn);
191 static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
192  RelFileLocator rlocator,
193  BlockNumber blockno);
194 static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
195  XLogRecPtr replaying_lsn);
196 static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
197  XLogRecPtr *lsn);
198 
199 static XLogPrefetchStats *SharedStats;
200 
201 static inline LsnReadQueue *
202 lrq_alloc(uint32 max_distance,
203  uint32 max_inflight,
204  uintptr_t lrq_private,
205  LsnReadQueueNextFun next)
206 {
207  LsnReadQueue *lrq;
208  uint32 size;
209 
210  Assert(max_distance >= max_inflight);
211 
212  size = max_distance + 1; /* full ring buffer has a gap */
213  lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
214  lrq->lrq_private = lrq_private;
215  lrq->max_inflight = max_inflight;
216  lrq->size = size;
217  lrq->next = next;
218  lrq->head = 0;
219  lrq->tail = 0;
220  lrq->inflight = 0;
221  lrq->completed = 0;
222 
223  return lrq;
224 }
225 
226 static inline void
227 lrq_free(LsnReadQueue *lrq)
228 {
229  pfree(lrq);
230 }
231 
232 static inline uint32
233 lrq_inflight(LsnReadQueue *lrq)
234 {
235  return lrq->inflight;
236 }
237 
238 static inline uint32
239 lrq_completed(LsnReadQueue *lrq)
240 {
241  return lrq->completed;
242 }
243 
244 static inline void
245 lrq_prefetch(LsnReadQueue *lrq)
246 {
247  /* Try to start as many IOs as we can within our limits. */
248  while (lrq->inflight < lrq->max_inflight &&
249  lrq->inflight + lrq->completed < lrq->size - 1)
250  {
251  Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
252  switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
253  {
254  case LRQ_NEXT_AGAIN:
255  return;
256  case LRQ_NEXT_IO:
257  lrq->queue[lrq->head].io = true;
258  lrq->inflight++;
259  break;
260  case LRQ_NEXT_NO_IO:
261  lrq->queue[lrq->head].io = false;
262  lrq->completed++;
263  break;
264  }
265  lrq->head++;
266  if (lrq->head == lrq->size)
267  lrq->head = 0;
268  }
269 }
270 
271 static inline void
272 lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
273 {
274  /*
275  * We know that LSNs before 'lsn' have been replayed, so we can now assume
276  * that any IOs that were started before then have finished.
277  */
278  while (lrq->tail != lrq->head &&
279  lrq->queue[lrq->tail].lsn < lsn)
280  {
281  if (lrq->queue[lrq->tail].io)
282  lrq->inflight--;
283  else
284  lrq->completed--;
285  lrq->tail++;
286  if (lrq->tail == lrq->size)
287  lrq->tail = 0;
288  }
289  if (RecoveryPrefetchEnabled())
290  lrq_prefetch(lrq);
291 }
292 
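/*
 * Illustrative sketch only: how the LsnReadQueue primitives above fit
 * together.  The callback and LSN values are invented for the example; the
 * real callback used in this file is XLogPrefetcherNextBlock().
 *
 *		static LsnReadQueueNextStatus
 *		demo_next(uintptr_t lrq_private, XLogRecPtr *lsn)
 *		{
 *			*lsn = 0x1000;		(pretend this block needed no I/O)
 *			return LRQ_NEXT_NO_IO;
 *		}
 *
 *		LsnReadQueue *lrq = lrq_alloc(4, 2, (uintptr_t) NULL, demo_next);
 *
 *		lrq_prefetch(lrq);				(look ahead, filling the queue)
 *		lrq_complete_lsn(lrq, 0x2000);	(entries with lsn < 0x2000 retire,
 *										 then the queue is topped up again)
 *		lrq_free(lrq);
 */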
293 size_t
295 {
296  return sizeof(XLogPrefetchStats);
297 }
298 
299 /*
300  * Reset all counters to zero.
301  */
302 void
303 XLogPrefetchResetStats(void)
304 {
305  pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
306  pg_atomic_write_u64(&SharedStats->prefetch, 0);
307  pg_atomic_write_u64(&SharedStats->hit, 0);
308  pg_atomic_write_u64(&SharedStats->skip_init, 0);
309  pg_atomic_write_u64(&SharedStats->skip_new, 0);
310  pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
311  pg_atomic_write_u64(&SharedStats->skip_rep, 0);
312 }
313 
314 void
315 XLogPrefetchShmemInit(void)
316 {
317  bool found;
318 
319  SharedStats = (XLogPrefetchStats *)
320  ShmemInitStruct("XLogPrefetchStats",
321  sizeof(XLogPrefetchStats),
322  &found);
323 
324  if (!found)
325  {
326  pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
327  pg_atomic_init_u64(&SharedStats->prefetch, 0);
328  pg_atomic_init_u64(&SharedStats->hit, 0);
329  pg_atomic_init_u64(&SharedStats->skip_init, 0);
330  pg_atomic_init_u64(&SharedStats->skip_new, 0);
331  pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
332  pg_atomic_init_u64(&SharedStats->skip_rep, 0);
333  }
334 }
335 
336 /*
337  * Called when any GUC is changed that affects prefetching.
338  */
339 void
340 XLogPrefetchReconfigure(void)
341 {
342  XLogPrefetchReconfigureCount++;
343 }
344 
345 /*
346  * Increment a counter in shared memory. This is equivalent to *counter++ on a
347  * plain uint64 without any memory barrier or locking, except on platforms
348  * where readers can't read uint64 without possibly observing a torn value.
349  */
350 static inline void
351 XLogPrefetchIncrement(pg_atomic_uint64 *counter)
352 {
353  Assert(AmStartupProcess() || !IsUnderPostmaster);
354  pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
355 }
356 
357 /*
358  * Create a prefetcher that is ready to begin prefetching blocks referenced by
359  * WAL records.
360  */
361 XLogPrefetcher *
362 XLogPrefetcherAllocate(XLogReaderState *reader)
363 {
364  XLogPrefetcher *prefetcher;
365  static HASHCTL hash_table_ctl = {
366  .keysize = sizeof(RelFileLocator),
367  .entrysize = sizeof(XLogPrefetcherFilter)
368  };
369 
370  prefetcher = palloc0(sizeof(XLogPrefetcher));
371 
372  prefetcher->reader = reader;
373  prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
374  &hash_table_ctl,
375  HASH_ELEM | HASH_BLOBS);
376  dlist_init(&prefetcher->filter_queue);
377 
378  SharedStats->wal_distance = 0;
379  SharedStats->block_distance = 0;
380  SharedStats->io_depth = 0;
381 
382  /* First usage will cause streaming_read to be allocated. */
383  prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
384 
385  return prefetcher;
386 }
387 
388 /*
389  * Destroy a prefetcher and release all resources.
390  */
391 void
392 XLogPrefetcherFree(XLogPrefetcher *prefetcher)
393 {
394  lrq_free(prefetcher->streaming_read);
395  hash_destroy(prefetcher->filter_table);
396  pfree(prefetcher);
397 }
398 
399 /*
400  * Provide access to the reader.
401  */
402 XLogReaderState *
403 XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
404 {
405  return prefetcher->reader;
406 }
407 
408 /*
409  * Update the statistics visible in the pg_stat_recovery_prefetch view.
410  */
411 void
412 XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
413 {
414  uint32 io_depth;
415  uint32 completed;
416  int64 wal_distance;
417 
418 
419  /* How far ahead of replay are we now? */
420  if (prefetcher->reader->decode_queue_tail)
421  {
422  wal_distance =
423  prefetcher->reader->decode_queue_tail->lsn -
424  prefetcher->reader->decode_queue_head->lsn;
425  }
426  else
427  {
428  wal_distance = 0;
429  }
430 
431  /* How many IOs are currently in flight and completed? */
432  io_depth = lrq_inflight(prefetcher->streaming_read);
433  completed = lrq_completed(prefetcher->streaming_read);
434 
435  /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
436  SharedStats->io_depth = io_depth;
437  SharedStats->block_distance = io_depth + completed;
438  SharedStats->wal_distance = wal_distance;
439 
440  prefetcher->next_stats_shm_lsn =
441  prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
442 }
443 
444 /*
445  * A callback that examines the next block reference in the WAL, and possibly
446  * starts an IO so that a later read will be fast.
447  *
448  * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
449  *
450  * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
451  * that isn't in the buffer pool, and the kernel has been asked to start
452  * reading it to make a future read system call faster. An LSN is written to
453  * *lsn, and the I/O will be considered to have completed once that LSN is
454  * replayed.
455  *
456  * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
457  * that it was already in the buffer pool, or we decided for various reasons
458  * not to prefetch.
459  */
460 static LsnReadQueueNextStatus
461 XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
462 {
463  XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
464  XLogReaderState *reader = prefetcher->reader;
465  XLogRecPtr replaying_lsn = reader->ReadRecPtr;
466 
467  /*
468  * We keep track of the record and block we're up to between calls with
469  * prefetcher->record and prefetcher->next_block_id.
470  */
471  for (;;)
472  {
473  DecodedXLogRecord *record;
474 
475  /* Try to read a new future record, if we don't already have one. */
476  if (prefetcher->record == NULL)
477  {
478  bool nonblocking;
479 
480  /*
481  * If there are already records or an error queued up that could
482  * be replayed, we don't want to block here. Otherwise, it's OK
483  * to block waiting for more data: presumably the caller has
484  * nothing else to do.
485  */
486  nonblocking = XLogReaderHasQueuedRecordOrError(reader);
487 
488  /* Readahead is disabled until we replay past a certain point. */
489  if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
490  return LRQ_NEXT_AGAIN;
491 
492  record = XLogReadAhead(prefetcher->reader, nonblocking);
493  if (record == NULL)
494  {
495  /*
496  * We can't read any more, due to an error or lack of data in
497  * nonblocking mode. Don't try to read ahead again until
498  * we've replayed everything already decoded.
499  */
500  if (nonblocking && prefetcher->reader->decode_queue_tail)
501  prefetcher->no_readahead_until =
502  prefetcher->reader->decode_queue_tail->lsn;
503 
504  return LRQ_NEXT_AGAIN;
505  }
506 
507  /*
508  * If prefetching is disabled, we don't need to analyze the record
509  * or issue any prefetches. We just need to cause one record to
510  * be decoded.
511  */
512  if (!RecoveryPrefetchEnabled())
513  {
514  *lsn = InvalidXLogRecPtr;
515  return LRQ_NEXT_NO_IO;
516  }
517 
518  /* We have a new record to process. */
519  prefetcher->record = record;
520  prefetcher->next_block_id = 0;
521  }
522  else
523  {
524  /* Continue to process from last call, or last loop. */
525  record = prefetcher->record;
526  }
527 
528  /*
529  * Check for operations that require us to filter out block ranges, or
530  * pause readahead completely.
531  */
532  if (replaying_lsn < record->lsn)
533  {
534  uint8 rmid = record->header.xl_rmid;
535  uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
536 
537  if (rmid == RM_XLOG_ID)
538  {
539  if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
540  record_type == XLOG_END_OF_RECOVERY)
541  {
542  /*
543  * These records might change the TLI. Avoid potential
544  * bugs if we were to allow "read TLI" and "replay TLI" to
545  * differ without more analysis.
546  */
547  prefetcher->no_readahead_until = record->lsn;
548 
549 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
550  elog(XLOGPREFETCHER_DEBUG_LEVEL,
551  "suppressing all readahead until %X/%X is replayed due to possible TLI change",
552  LSN_FORMAT_ARGS(record->lsn));
553 #endif
554 
555  /* Fall through so we move past this record. */
556  }
557  }
558  else if (rmid == RM_DBASE_ID)
559  {
560  /*
561  * When databases are created with the file-copy strategy,
562  * there are no WAL records to tell us about the creation of
563  * individual relations.
564  */
565  if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
566  {
567  xl_dbase_create_file_copy_rec *xlrec =
568  (xl_dbase_create_file_copy_rec *) record->main_data;
569  RelFileLocator rlocator =
570  {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
571 
572  /*
573  * Don't try to prefetch anything in this database until
574  * it has been created, or we might confuse the blocks of
575  * different generations, if a database OID or
576  * relfilenumber is reused. It's also more efficient than
577  * discovering that relations don't exist on disk yet with
578  * ENOENT errors.
579  */
580  XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
581 
582 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
583  elog(XLOGPREFETCHER_DEBUG_LEVEL,
584  "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
585  rlocator.dbOid,
586  LSN_FORMAT_ARGS(record->lsn));
587 #endif
588  }
589  }
590  else if (rmid == RM_SMGR_ID)
591  {
592  if (record_type == XLOG_SMGR_CREATE)
593  {
594  xl_smgr_create *xlrec = (xl_smgr_create *)
595  record->main_data;
596 
597  if (xlrec->forkNum == MAIN_FORKNUM)
598  {
599  /*
600  * Don't prefetch anything for this whole relation
601  * until it has been created. Otherwise we might
602  * confuse the blocks of different generations, if a
603  * relfilenumber is reused. This also avoids the need
604  * to discover the problem via extra syscalls that
605  * report ENOENT.
606  */
607  XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
608  record->lsn);
609 
610 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
611  elog(XLOGPREFETCHER_DEBUG_LEVEL,
612  "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
613  xlrec->rlocator.spcOid,
614  xlrec->rlocator.dbOid,
615  xlrec->rlocator.relNumber,
616  LSN_FORMAT_ARGS(record->lsn));
617 #endif
618  }
619  }
620  else if (record_type == XLOG_SMGR_TRUNCATE)
621  {
622  xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
623  record->main_data;
624 
625  /*
626  * Don't consider prefetching anything in the truncated
627  * range until the truncation has been performed.
628  */
629  XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
630  xlrec->blkno,
631  record->lsn);
632 
633 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
634  elog(XLOGPREFETCHER_DEBUG_LEVEL,
635  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
636  xlrec->rlocator.spcOid,
637  xlrec->rlocator.dbOid,
638  xlrec->rlocator.relNumber,
639  xlrec->blkno,
640  LSN_FORMAT_ARGS(record->lsn));
641 #endif
642  }
643  }
644  }
645 
646  /* Scan the block references, starting where we left off last time. */
647  while (prefetcher->next_block_id <= record->max_block_id)
648  {
649  int block_id = prefetcher->next_block_id++;
650  DecodedBkpBlock *block = &record->blocks[block_id];
651  SMgrRelation reln;
652  PrefetchBufferResult result;
653 
654  if (!block->in_use)
655  continue;
656 
657  Assert(!BufferIsValid(block->prefetch_buffer));
658 
659  /*
660  * Record the LSN of this record. When it's replayed,
661  * LsnReadQueue will consider any IOs submitted for earlier LSNs
662  * to be finished.
663  */
664  *lsn = record->lsn;
665 
666  /* We don't try to prefetch anything but the main fork for now. */
667  if (block->forknum != MAIN_FORKNUM)
668  {
669  return LRQ_NEXT_NO_IO;
670  }
671 
672  /*
673  * If there is a full page image attached, we won't be reading the
674  * page, so don't bother trying to prefetch.
675  */
676  if (block->has_image)
677  {
678  XLogPrefetchIncrement(&SharedStats->skip_fpw);
679  return LRQ_NEXT_NO_IO;
680  }
681 
682  /* There is no point in reading a page that will be zeroed. */
683  if (block->flags & BKPBLOCK_WILL_INIT)
684  {
685  XLogPrefetchIncrement(&SharedStats->skip_init);
686  return LRQ_NEXT_NO_IO;
687  }
688 
689  /* Should we skip prefetching this block due to a filter? */
690  if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
691  {
692  XLogPrefetchIncrement(&SharedStats->skip_new);
693  return LRQ_NEXT_NO_IO;
694  }
695 
696  /* There is no point in repeatedly prefetching the same block. */
697  for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
698  {
699  if (block->blkno == prefetcher->recent_block[i] &&
700  RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
701  {
702  /*
703  * XXX If we also remembered where it was, we could set
704  * recent_buffer so that recovery could skip smgropen()
705  * and a buffer table lookup.
706  */
707  XLogPrefetchIncrement(&SharedStats->skip_rep);
708  return LRQ_NEXT_NO_IO;
709  }
710  }
711  prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
712  prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
713  prefetcher->recent_idx =
714  (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
715 
716  /*
717  * We could try to have a fast path for repeated references to the
718  * same relation (with some scheme to handle invalidations
719  * safely), but for now we'll call smgropen() every time.
720  */
721  reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
722 
723  /*
724  * If the relation file doesn't exist on disk, for example because
725  * we're replaying after a crash and the file will be created and
726  * then unlinked by WAL that hasn't been replayed yet, suppress
727  * further prefetching in the relation until this record is
728  * replayed.
729  */
730  if (!smgrexists(reln, MAIN_FORKNUM))
731  {
732 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
733  elog(XLOGPREFETCHER_DEBUG_LEVEL,
734  "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
735  reln->smgr_rlocator.locator.spcOid,
736  reln->smgr_rlocator.locator.dbOid,
737  reln->smgr_rlocator.locator.relNumber,
738  LSN_FORMAT_ARGS(record->lsn));
739 #endif
740  XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
741  record->lsn);
742  XLogPrefetchIncrement(&SharedStats->skip_new);
743  return LRQ_NEXT_NO_IO;
744  }
745 
746  /*
747  * If the relation isn't big enough to contain the referenced
748  * block yet, suppress prefetching of this block and higher until
749  * this record is replayed.
750  */
751  if (block->blkno >= smgrnblocks(reln, block->forknum))
752  {
753 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
754  elog(XLOGPREFETCHER_DEBUG_LEVEL,
755  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
756  reln->smgr_rlocator.locator.spcOid,
757  reln->smgr_rlocator.locator.dbOid,
758  reln->smgr_rlocator.locator.relNumber,
759  block->blkno,
760  LSN_FORMAT_ARGS(record->lsn));
761 #endif
762  XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
763  record->lsn);
764  XLogPrefetchIncrement(&SharedStats->skip_new);
765  return LRQ_NEXT_NO_IO;
766  }
767 
768  /* Try to initiate prefetching. */
769  result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
770  if (BufferIsValid(result.recent_buffer))
771  {
772  /* Cache hit, nothing to do. */
773  XLogPrefetchIncrement(&SharedStats->hit);
774  block->prefetch_buffer = result.recent_buffer;
775  return LRQ_NEXT_NO_IO;
776  }
777  else if (result.initiated_io)
778  {
779  /* Cache miss, I/O (presumably) started. */
780  XLogPrefetchIncrement(&SharedStats->prefetch);
781  block->prefetch_buffer = InvalidBuffer;
782  return LRQ_NEXT_IO;
783  }
784  else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
785  {
786  /*
787  * This shouldn't be possible, because we already determined
788  * that the relation exists on disk and is big enough.
789  * Something is wrong with the cache invalidation for
790  * smgrexists(), smgrnblocks(), or the file was unlinked or
791  * truncated beneath our feet?
792  */
793  elog(ERROR,
794  "could not prefetch relation %u/%u/%u block %u",
795  reln->smgr_rlocator.locator.spcOid,
796  reln->smgr_rlocator.locator.dbOid,
797  reln->smgr_rlocator.locator.relNumber,
798  block->blkno);
799  }
800  }
801 
802  /*
803  * Several callsites need to be able to read exactly one record
804  * without any internal readahead. Examples: xlog.c reading
805  * checkpoint records with emode set to PANIC, which might otherwise
806  * cause XLogPageRead() to panic on some future page, and xlog.c
807  * determining where to start writing WAL next, which depends on the
808  * contents of the reader's internal buffer after reading one record.
809  * Therefore, don't even think about prefetching until the first
810  * record after XLogPrefetcherBeginRead() has been consumed.
811  */
812  if (prefetcher->reader->decode_queue_tail &&
813  prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
814  return LRQ_NEXT_AGAIN;
815 
816  /* Advance to the next record. */
817  prefetcher->record = NULL;
818  }
819  pg_unreachable();
820 }
821 
822 /*
823  * Expose statistics about recovery prefetching.
824  */
825 Datum
826 pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
827 {
828 #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
829  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
830  Datum values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
831  bool nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
832 
833  InitMaterializedSRF(fcinfo, 0);
834 
835  for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
836  nulls[i] = false;
837 
838  values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
839  values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
840  values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
841  values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
842  values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
843  values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
844  values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
845  values[7] = Int32GetDatum(SharedStats->wal_distance);
846  values[8] = Int32GetDatum(SharedStats->block_distance);
847  values[9] = Int32GetDatum(SharedStats->io_depth);
848  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
849 
850  return (Datum) 0;
851 }
852 
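/*
 * For reference: the columns returned above correspond position-for-position
 * to the pg_stat_recovery_prefetch view, i.e. values[0..9] are stats_reset,
 * prefetch, hit, skip_init, skip_new, skip_fpw, skip_rep, wal_distance,
 * block_distance and io_depth.
 */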
853 /*
854  * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
855  * has been replayed.
856  */
857 static inline void
858 XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
859  BlockNumber blockno, XLogRecPtr lsn)
860 {
861  XLogPrefetcherFilter *filter;
862  bool found;
863 
864  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
865  if (!found)
866  {
867  /*
868  * Don't allow any prefetching of this block or higher until replayed.
869  */
870  filter->filter_until_replayed = lsn;
871  filter->filter_from_block = blockno;
872  dlist_push_head(&prefetcher->filter_queue, &filter->link);
873  }
874  else
875  {
876  /*
877  * We were already filtering this rlocator. Extend the filter's
878  * lifetime to cover this WAL record, but leave the lower of the block
879  * numbers there because we don't want to have to track individual
880  * blocks.
881  */
882  filter->filter_until_replayed = lsn;
883  dlist_delete(&filter->link);
884  dlist_push_head(&prefetcher->filter_queue, &filter->link);
885  filter->filter_from_block = Min(filter->filter_from_block, blockno);
886  }
887 }
888 
889 /*
890  * Have we replayed any records that caused us to begin filtering a block
891  * range? That means that relations should have been created, extended or
892  * dropped as required, so we can stop filtering out accesses to a given
893  * relfilenumber.
894  */
895 static inline void
896 XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
897 {
898  while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
899  {
900  XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
901  link,
902  &prefetcher->filter_queue);
903 
904  if (filter->filter_until_replayed >= replaying_lsn)
905  break;
906 
907  dlist_delete(&filter->link);
908  hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
909  }
910 }
911 
912 /*
913  * Check if a given block should be skipped due to a filter.
914  */
915 static inline bool
916 XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
917  BlockNumber blockno)
918 {
919  /*
920  * Test for empty queue first, because we expect it to be empty most of
921  * the time and we can avoid the hash table lookup in that case.
922  */
923  if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
924  {
925  XLogPrefetcherFilter *filter;
926 
927  /* See if the block range is filtered. */
928  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
929  if (filter && filter->filter_from_block <= blockno)
930  {
931 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
932  elog(XLOGPREFETCHER_DEBUG_LEVEL,
933  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
934  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
935  LSN_FORMAT_ARGS(filter->filter_until_replayed),
936  filter->filter_from_block);
937 #endif
938  return true;
939  }
940 
941  /* See if the whole database is filtered. */
942  rlocator.relNumber = InvalidRelFileNumber;
943  rlocator.spcOid = InvalidOid;
944  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
945  if (filter)
946  {
947 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
948  elog(XLOGPREFETCHER_DEBUG_LEVEL,
949  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
950  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
951  LSN_FORMAT_ARGS(filter->filter_until_replayed));
952 #endif
953  return true;
954  }
955  }
956 
957  return false;
958 }
959 
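/*
 * Illustrative sketch only: the intended lifecycle of the filter machinery
 * above.  The locator values and LSNs are invented for the example.
 *
 *		RelFileLocator rlocator = {.spcOid = 1663, .dbOid = 5, .relNumber = 16384};
 *
 *		(An XLOG_SMGR_CREATE record was seen ahead of replay at LSN 0x5000.)
 *		XLogPrefetcherAddFilter(prefetcher, rlocator, 0, (XLogRecPtr) 0x5000);
 *
 *		(Until that record is replayed, blocks of the relation are skipped.)
 *		Assert(XLogPrefetcherIsFiltered(prefetcher, rlocator, 10));
 *
 *		(Once replay has passed it, the filter is dropped again.)
 *		XLogPrefetcherCompleteFilters(prefetcher, (XLogRecPtr) 0x5001);
 *		Assert(!XLogPrefetcherIsFiltered(prefetcher, rlocator, 10));
 */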
960 /*
961  * A wrapper for XLogBeginRead() that also resets the prefetcher.
962  */
963 void
964 XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
965 {
966  /* This will forget about any in-flight IO. */
967  prefetcher->reconfigure_count--;
968 
969  /* Book-keeping to avoid readahead on first read. */
970  prefetcher->begin_ptr = recPtr;
971 
972  prefetcher->no_readahead_until = 0;
973 
974  /* This will forget about any queued up records in the decoder. */
975  XLogBeginRead(prefetcher->reader, recPtr);
976 }
977 
978 /*
979  * A wrapper for XLogReadRecord() that provides the same interface, but also
980  * tries to initiate I/O for blocks referenced in future WAL records.
981  */
982 XLogRecord *
983 XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
984 {
985  DecodedXLogRecord *record;
986  XLogRecPtr replayed_up_to;
987 
988  /*
989  * See if it's time to reset the prefetching machinery, because a relevant
990  * GUC was changed.
991  */
992  if (prefetcher->reconfigure_count != XLogPrefetchReconfigureCount)
993  {
994  uint32 max_distance;
995  uint32 max_inflight;
996 
997  if (prefetcher->streaming_read)
998  lrq_free(prefetcher->streaming_read);
999 
1000  if (RecoveryPrefetchEnabled())
1001  {
1002  Assert(maintenance_io_concurrency > 0);
1003  max_inflight = maintenance_io_concurrency;
1004  max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1005  }
1006  else
1007  {
1008  max_inflight = 1;
1009  max_distance = 1;
1010  }
1011 
1012  prefetcher->streaming_read = lrq_alloc(max_distance,
1013  max_inflight,
1014  (uintptr_t) prefetcher,
1015  XLogPrefetcherNextBlock);
1016 
1017  prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
1018  }
1019 
1020  /*
1021  * Release last returned record, if there is one, as it's now been
1022  * replayed.
1023  */
1024  replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1025 
1026  /*
1027  * Can we drop any filters yet? If we were waiting for a relation to be
1028  * created or extended, it is now OK to access blocks in the covered
1029  * range.
1030  */
1031  XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1032 
1033  /*
1034  * All IO initiated by earlier WAL is now completed. This might trigger
1035  * further prefetching.
1036  */
1037  lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1038 
1039  /*
1040  * If there's nothing queued yet, then start prefetching to cause at least
1041  * one record to be queued.
1042  */
1043  if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1044  {
1045  Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1046  Assert(lrq_completed(prefetcher->streaming_read) == 0);
1047  lrq_prefetch(prefetcher->streaming_read);
1048  }
1049 
1050  /* Read the next record. */
1051  record = XLogNextRecord(prefetcher->reader, errmsg);
1052  if (!record)
1053  return NULL;
1054 
1055  /*
1056  * The record we just got is the "current" one, for the benefit of the
1057  * XLogRecXXX() macros.
1058  */
1059  Assert(record == prefetcher->reader->record);
1060 
1061  /*
1062  * If maintenance_io_concurrency is set very low, we might have started
1063  * prefetching some but not all of the blocks referenced in the record
1064  * we're about to return. Forget about the rest of the blocks in this
1065  * record by dropping the prefetcher's reference to it.
1066  */
1067  if (record == prefetcher->record)
1068  prefetcher->record = NULL;
1069 
1070  /*
1071  * See if it's time to compute some statistics, because enough WAL has
1072  * been processed.
1073  */
1074  if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1075  XLogPrefetcherComputeStats(prefetcher);
1076 
1077  Assert(record == prefetcher->reader->record);
1078 
1079  return &record->header;
1080 }
1081 
1082 bool
1083 check_recovery_prefetch(int *new_value, void **extra, GucSource source)
1084 {
1085 #ifndef USE_PREFETCH
1086  if (*new_value == RECOVERY_PREFETCH_ON)
1087  {
1088  GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
1089  return false;
1090  }
1091 #endif
1092 
1093  return true;
1094 }
1095 
1096 void
1097 assign_recovery_prefetch(int new_value, void *extra)
1098 {
1099  /* Reconfigure prefetching, because a setting it depends on changed. */
1100  recovery_prefetch = new_value;
1101  if (AmStartupProcess())
1102  XLogPrefetchReconfigure();
1103 }