PostgreSQL Source Code  git master
xlogprefetcher.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * xlogprefetcher.c
4  * Prefetching support for recovery.
5  *
6  * Portions Copyright (c) 2022, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/access/transam/xlogprefetcher.c
12  *
13  * This module provides a drop-in replacement for an XLogReader that tries to
14  * minimize I/O stalls by looking ahead in the WAL. If blocks that will be
15  * accessed in the near future are not already in the buffer pool, it initiates
16  * I/Os that might complete before the caller eventually needs the data. When
17  * referenced blocks are found in the buffer pool already, the buffer is
18  * recorded in the decoded record so that XLogReadBufferForRedo() can try to
19  * avoid a second buffer mapping table lookup.
20  *
21  * Currently, only the main fork is considered for prefetching, and
22  * prefetching is only effective on systems where PrefetchBuffer() does
23  * something useful (mainly Linux).
24  *
25  *-------------------------------------------------------------------------
26  */
27 
28 #include "postgres.h"
29 
30 #include "access/xlog.h"
31 #include "access/xlogprefetcher.h"
32 #include "access/xlogreader.h"
33 #include "access/xlogutils.h"
34 #include "catalog/pg_class.h"
35 #include "catalog/pg_control.h"
36 #include "catalog/storage_xlog.h"
38 #include "utils/fmgrprotos.h"
39 #include "utils/timestamp.h"
40 #include "funcapi.h"
41 #include "pgstat.h"
42 #include "miscadmin.h"
43 #include "port/atomics.h"
44 #include "storage/bufmgr.h"
45 #include "storage/shmem.h"
46 #include "storage/smgr.h"
47 #include "utils/guc_hooks.h"
48 #include "utils/hsearch.h"
49 
50 /*
51  * Every time we process this much WAL, we'll update the values in
52  * pg_stat_recovery_prefetch.
53  */
54 #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
55 
56 /*
57  * To detect repeated access to the same block and skip useless extra system
58  * calls, we remember a small window of recently prefetched blocks.
59  */
60 #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
61 
62 /*
63  * When maintenance_io_concurrency is not saturated, we're prepared to look
64  * ahead up to N times that number of block references.
65  */
66 #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
67 
68 /* Define to log internal debugging messages. */
69 /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
70 
71 /* GUCs */
73 
74 #ifdef USE_PREFETCH
75 #define RecoveryPrefetchEnabled() \
76  (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
77  maintenance_io_concurrency > 0)
78 #else
79 #define RecoveryPrefetchEnabled() false
80 #endif
81 
83 
84 /*
85  * Enum used to report whether an IO should be started.
86  */
87 typedef enum
88 {
93 
94 /*
95  * Type of callback that can decide which block to prefetch next. For now
96  * there is only one.
97  */
98 typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
99  XLogRecPtr *lsn);
100 
101 /*
102  * A simple circular queue of LSNs, used to control the number of
103  * (potentially) inflight IOs. This stands in for a later more general IO
104  * control mechanism, which is why it has the apparently unnecessary
105  * indirection through a function pointer.
106  */
107 typedef struct LsnReadQueue
108 {
110  uintptr_t lrq_private;
117  struct
118  {
119  bool io;
123 
124 /*
125  * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching
126  * blocks that will soon be referenced, to try to avoid IO stalls.
127  */
129 {
130  /* WAL reader and current reading state. */
134 
135  /* When to publish stats. */
137 
138  /* Book-keeping to avoid accessing blocks that don't exist yet. */
141 
142  /* Book-keeping to avoid repeat prefetches. */
146 
147  /* Book-keeping to disable prefetching temporarily. */
149 
150  /* IO depth manager. */
152 
154 
156 };
157 
158 /*
159  * A temporary filter used to track block ranges that haven't been created
160  * yet, whole relations that haven't been created yet, and whole relations
161  * that (we assume) have already been dropped, or will be created by bulk WAL
162  * operators.
163  */
164 typedef struct XLogPrefetcherFilter
165 {
171 
172 /*
173  * Counters exposed in shared memory for pg_stat_recovery_prefetch.
174  */
175 typedef struct XLogPrefetchStats
176 {
177  pg_atomic_uint64 reset_time; /* Time of last reset. */
178  pg_atomic_uint64 prefetch; /* Prefetches initiated. */
179  pg_atomic_uint64 hit; /* Blocks already in cache. */
180  pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
181  pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
182  pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
183  pg_atomic_uint64 skip_rep; /* Repeat accesses skipped. */
184 
185  /* Dynamic values */
186  int wal_distance; /* Number of WAL bytes ahead. */
187  int block_distance; /* Number of block references ahead. */
188  int io_depth; /* Number of I/Os in progress. */
190 
191 static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
192  RelFileLocator rlocator,
193  BlockNumber blockno,
194  XLogRecPtr lsn);
195 static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
196  RelFileLocator rlocator,
197  BlockNumber blockno);
198 static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
199  XLogRecPtr replaying_lsn);
200 static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
201  XLogRecPtr *lsn);
202 
204 
205 static inline LsnReadQueue *
206 lrq_alloc(uint32 max_distance,
207  uint32 max_inflight,
208  uintptr_t lrq_private,
210 {
211  LsnReadQueue *lrq;
212  uint32 size;
213 
214  Assert(max_distance >= max_inflight);
215 
216  size = max_distance + 1; /* full ring buffer has a gap */
217  lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
218  lrq->lrq_private = lrq_private;
219  lrq->max_inflight = max_inflight;
220  lrq->size = size;
221  lrq->next = next;
222  lrq->head = 0;
223  lrq->tail = 0;
224  lrq->inflight = 0;
225  lrq->completed = 0;
226 
227  return lrq;
228 }
229 
230 static inline void
232 {
233  pfree(lrq);
234 }
235 
236 static inline uint32
238 {
239  return lrq->inflight;
240 }
241 
242 static inline uint32
244 {
245  return lrq->completed;
246 }
247 
248 static inline void
250 {
251  /* Try to start as many IOs as we can within our limits. */
252  while (lrq->inflight < lrq->max_inflight &&
253  lrq->inflight + lrq->completed < lrq->size - 1)
254  {
255  Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
256  switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
257  {
258  case LRQ_NEXT_AGAIN:
259  return;
260  case LRQ_NEXT_IO:
261  lrq->queue[lrq->head].io = true;
262  lrq->inflight++;
263  break;
264  case LRQ_NEXT_NO_IO:
265  lrq->queue[lrq->head].io = false;
266  lrq->completed++;
267  break;
268  }
269  lrq->head++;
270  if (lrq->head == lrq->size)
271  lrq->head = 0;
272  }
273 }
274 
275 static inline void
277 {
278  /*
279  * We know that LSNs before 'lsn' have been replayed, so we can now assume
280  * that any IOs that were started before then have finished.
281  */
282  while (lrq->tail != lrq->head &&
283  lrq->queue[lrq->tail].lsn < lsn)
284  {
285  if (lrq->queue[lrq->tail].io)
286  lrq->inflight--;
287  else
288  lrq->completed--;
289  lrq->tail++;
290  if (lrq->tail == lrq->size)
291  lrq->tail = 0;
292  }
294  lrq_prefetch(lrq);
295 }
296 
297 size_t
299 {
300  return sizeof(XLogPrefetchStats);
301 }
302 
303 /*
304  * Reset all counters to zero.
305  */
306 void
308 {
316 }
317 
318 void
320 {
321  bool found;
322 
324  ShmemInitStruct("XLogPrefetchStats",
325  sizeof(XLogPrefetchStats),
326  &found);
327 
328  if (!found)
329  {
337  }
338 }
339 
340 /*
341  * Called when any GUC is changed that affects prefetching.
342  */
343 void
345 {
347 }
348 
349 /*
350  * Increment a counter in shared memory. This is equivalent to (*counter)++ on a
351  * plain uint64 without any memory barrier or locking, except on platforms
352  * where readers can't read uint64 without possibly observing a torn value.
353  */
354 static inline void
356 {
358  pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
359 }
360 
361 /*
362  * Create a prefetcher that is ready to begin prefetching blocks referenced by
363  * WAL records.
364  */
367 {
368  XLogPrefetcher *prefetcher;
369  static HASHCTL hash_table_ctl = {
370  .keysize = sizeof(RelFileLocator),
371  .entrysize = sizeof(XLogPrefetcherFilter)
372  };
373 
374  prefetcher = palloc0(sizeof(XLogPrefetcher));
375 
376  prefetcher->reader = reader;
377  prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
378  &hash_table_ctl,
380  dlist_init(&prefetcher->filter_queue);
381 
384  SharedStats->io_depth = 0;
385 
386  /* First usage will cause streaming_read to be allocated. */
388 
389  return prefetcher;
390 }
391 
392 /*
393  * Destroy a prefetcher and release all resources.
394  */
395 void
397 {
398  lrq_free(prefetcher->streaming_read);
399  hash_destroy(prefetcher->filter_table);
400  pfree(prefetcher);
401 }
402 
403 /*
404  * Provide access to the reader.
405  */
408 {
409  return prefetcher->reader;
410 }
411 
412 /*
413  * Update the statistics visible in the pg_stat_recovery_prefetch view.
414  */
415 void
417 {
418  uint32 io_depth;
419  uint32 completed;
420  int64 wal_distance;
421 
422 
423  /* How far ahead of replay are we now? */
424  if (prefetcher->reader->decode_queue_tail)
425  {
426  wal_distance =
427  prefetcher->reader->decode_queue_tail->lsn -
428  prefetcher->reader->decode_queue_head->lsn;
429  }
430  else
431  {
432  wal_distance = 0;
433  }
434 
435  /* How many IOs are currently in flight and completed? */
436  io_depth = lrq_inflight(prefetcher->streaming_read);
437  completed = lrq_completed(prefetcher->streaming_read);
438 
439  /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
440  SharedStats->io_depth = io_depth;
441  SharedStats->block_distance = io_depth + completed;
442  SharedStats->wal_distance = wal_distance;
443 
444  prefetcher->next_stats_shm_lsn =
446 }
447 
448 /*
449  * A callback that examines the next block reference in the WAL, and possibly
450  * starts an IO so that a later read will be fast.
451  *
452  * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
453  *
454  * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
455  * that isn't in the buffer pool, and the kernel has been asked to start
456  * reading it to make a future read system call faster. An LSN is written to
457  * *lsn, and the I/O will be considered to have completed once that LSN is
458  * replayed.
459  *
460  * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found that it
461  * was already in the buffer pool, or we decided for various reasons not to
462  * prefetch.
463  */
465 XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
466 {
467  XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
468  XLogReaderState *reader = prefetcher->reader;
469  XLogRecPtr replaying_lsn = reader->ReadRecPtr;
470 
471  /*
472  * We keep track of the record and block we're up to between calls with
473  * prefetcher->record and prefetcher->next_block_id.
474  */
475  for (;;)
476  {
477  DecodedXLogRecord *record;
478 
479  /* Try to read a new future record, if we don't already have one. */
480  if (prefetcher->record == NULL)
481  {
482  bool nonblocking;
483 
484  /*
485  * If there are already records or an error queued up that could
486  * be replayed, we don't want to block here. Otherwise, it's OK
487  * to block waiting for more data: presumably the caller has
488  * nothing else to do.
489  */
490  nonblocking = XLogReaderHasQueuedRecordOrError(reader);
491 
492  /* Readahead is disabled until we replay past a certain point. */
493  if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
494  return LRQ_NEXT_AGAIN;
495 
496  record = XLogReadAhead(prefetcher->reader, nonblocking);
497  if (record == NULL)
498  {
499  /*
500  * We can't read any more, due to an error or lack of data in
501  * nonblocking mode. Don't try to read ahead again until
502  * we've replayed everything already decoded.
503  */
504  if (nonblocking && prefetcher->reader->decode_queue_tail)
505  prefetcher->no_readahead_until =
506  prefetcher->reader->decode_queue_tail->lsn;
507 
508  return LRQ_NEXT_AGAIN;
509  }
510 
511  /*
512  * If prefetching is disabled, we don't need to analyze the record
513  * or issue any prefetches. We just need to cause one record to
514  * be decoded.
515  */
517  {
518  *lsn = InvalidXLogRecPtr;
519  return LRQ_NEXT_NO_IO;
520  }
521 
522  /* We have a new record to process. */
523  prefetcher->record = record;
524  prefetcher->next_block_id = 0;
525  }
526  else
527  {
528  /* Continue to process from last call, or last loop. */
529  record = prefetcher->record;
530  }
531 
532  /*
533  * Check for operations that require us to filter out block ranges, or
534  * pause readahead completely.
535  */
536  if (replaying_lsn < record->lsn)
537  {
538  uint8 rmid = record->header.xl_rmid;
539  uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
540 
541  if (rmid == RM_XLOG_ID)
542  {
543  if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
544  record_type == XLOG_END_OF_RECOVERY)
545  {
546  /*
547  * These records might change the TLI. Avoid potential
548  * bugs if we were to allow "read TLI" and "replay TLI" to
549  * differ without more analysis.
550  */
551  prefetcher->no_readahead_until = record->lsn;
552 
553 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
554  elog(XLOGPREFETCHER_DEBUG_LEVEL,
555  "suppressing all readahead until %X/%X is replayed due to possible TLI change",
556  LSN_FORMAT_ARGS(record->lsn));
557 #endif
558 
559  /* Fall through so we move past this record. */
560  }
561  }
562  else if (rmid == RM_DBASE_ID)
563  {
564  /*
565  * When databases are created with the file-copy strategy,
566  * there are no WAL records to tell us about the creation of
567  * individual relations.
568  */
569  if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
570  {
573  RelFileLocator rlocator =
575 
576  /*
577  * Don't try to prefetch anything in this database until
578  * it has been created, or we might confuse the blocks of
579  * different generations, if a database OID or
580  * relfilenumber is reused. It's also more efficient than
581  * discovering that relations don't exist on disk yet with
582  * ENOENT errors.
583  */
584  XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
585 
586 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
587  elog(XLOGPREFETCHER_DEBUG_LEVEL,
588  "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
589  rlocator.dbOid,
590  LSN_FORMAT_ARGS(record->lsn));
591 #endif
592  }
593  }
594  else if (rmid == RM_SMGR_ID)
595  {
596  if (record_type == XLOG_SMGR_CREATE)
597  {
598  xl_smgr_create *xlrec = (xl_smgr_create *)
599  record->main_data;
600 
601  if (xlrec->forkNum == MAIN_FORKNUM)
602  {
603  /*
604  * Don't prefetch anything for this whole relation
605  * until it has been created. Otherwise we might
606  * confuse the blocks of different generations, if a
607  * relfilenumber is reused. This also avoids the need
608  * to discover the problem via extra syscalls that
609  * report ENOENT.
610  */
611  XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
612  record->lsn);
613 
614 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
615  elog(XLOGPREFETCHER_DEBUG_LEVEL,
616  "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
617  xlrec->rlocator.spcOid,
618  xlrec->rlocator.dbOid,
619  xlrec->rlocator.relNumber,
620  LSN_FORMAT_ARGS(record->lsn));
621 #endif
622  }
623  }
624  else if (record_type == XLOG_SMGR_TRUNCATE)
625  {
627  record->main_data;
628 
629  /*
630  * Don't consider prefetching anything in the truncated
631  * range until the truncation has been performed.
632  */
633  XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
634  xlrec->blkno,
635  record->lsn);
636 
637 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
638  elog(XLOGPREFETCHER_DEBUG_LEVEL,
639  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
640  xlrec->rlocator.spcOid,
641  xlrec->rlocator.dbOid,
642  xlrec->rlocator.relNumber,
643  xlrec->blkno,
644  LSN_FORMAT_ARGS(record->lsn));
645 #endif
646  }
647  }
648  }
649 
650  /* Scan the block references, starting where we left off last time. */
651  while (prefetcher->next_block_id <= record->max_block_id)
652  {
653  int block_id = prefetcher->next_block_id++;
654  DecodedBkpBlock *block = &record->blocks[block_id];
655  SMgrRelation reln;
656  PrefetchBufferResult result;
657 
658  if (!block->in_use)
659  continue;
660 
662 
663  /*
664  * Record the LSN of this record. When it's replayed,
665  * LsnReadQueue will consider any IOs submitted for earlier LSNs
666  * to be finished.
667  */
668  *lsn = record->lsn;
669 
670  /* We don't try to prefetch anything but the main fork for now. */
671  if (block->forknum != MAIN_FORKNUM)
672  {
673  return LRQ_NEXT_NO_IO;
674  }
675 
676  /*
677  * If there is a full page image attached, we won't be reading the
678  * page, so don't bother trying to prefetch.
679  */
680  if (block->has_image)
681  {
683  return LRQ_NEXT_NO_IO;
684  }
685 
686  /* There is no point in reading a page that will be zeroed. */
687  if (block->flags & BKPBLOCK_WILL_INIT)
688  {
690  return LRQ_NEXT_NO_IO;
691  }
692 
693  /* Should we skip prefetching this block due to a filter? */
694  if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
695  {
697  return LRQ_NEXT_NO_IO;
698  }
699 
700  /* There is no point in repeatedly prefetching the same block. */
701  for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
702  {
703  if (block->blkno == prefetcher->recent_block[i] &&
704  RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
705  {
706  /*
707  * XXX If we also remembered where it was, we could set
708  * recent_buffer so that recovery could skip smgropen()
709  * and a buffer table lookup.
710  */
712  return LRQ_NEXT_NO_IO;
713  }
714  }
715  prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
716  prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
717  prefetcher->recent_idx =
718  (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
719 
720  /*
721  * We could try to have a fast path for repeated references to the
722  * same relation (with some scheme to handle invalidations
723  * safely), but for now we'll call smgropen() every time.
724  */
725  reln = smgropen(block->rlocator, InvalidBackendId);
726 
727  /*
728  * If the relation file doesn't exist on disk, for example because
729  * we're replaying after a crash and the file will be created and
730  * then unlinked by WAL that hasn't been replayed yet, suppress
731  * further prefetching in the relation until this record is
732  * replayed.
733  */
734  if (!smgrexists(reln, MAIN_FORKNUM))
735  {
736 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
737  elog(XLOGPREFETCHER_DEBUG_LEVEL,
738  "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
742  LSN_FORMAT_ARGS(record->lsn));
743 #endif
744  XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
745  record->lsn);
747  return LRQ_NEXT_NO_IO;
748  }
749 
750  /*
751  * If the relation isn't big enough to contain the referenced
752  * block yet, suppress prefetching of this block and higher until
753  * this record is replayed.
754  */
755  if (block->blkno >= smgrnblocks(reln, block->forknum))
756  {
757 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
758  elog(XLOGPREFETCHER_DEBUG_LEVEL,
759  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
763  block->blkno,
764  LSN_FORMAT_ARGS(record->lsn));
765 #endif
766  XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
767  record->lsn);
769  return LRQ_NEXT_NO_IO;
770  }
771 
772  /* Try to initiate prefetching. */
773  result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
774  if (BufferIsValid(result.recent_buffer))
775  {
776  /* Cache hit, nothing to do. */
778  block->prefetch_buffer = result.recent_buffer;
779  return LRQ_NEXT_NO_IO;
780  }
781  else if (result.initiated_io)
782  {
783  /* Cache miss, I/O (presumably) started. */
786  return LRQ_NEXT_IO;
787  }
788  else
789  {
790  /*
791  * This shouldn't be possible, because we already determined
792  * that the relation exists on disk and is big enough.
793  * Something is wrong with the cache invalidation for
794  * smgrexists(), smgrnblocks(), or the file was unlinked or
795  * truncated beneath our feet?
796  */
797  elog(ERROR,
798  "could not prefetch relation %u/%u/%u block %u",
802  block->blkno);
803  }
804  }
805 
806  /*
807  * Several callsites need to be able to read exactly one record
808  * without any internal readahead. Examples: xlog.c reading
809  * checkpoint records with emode set to PANIC, which might otherwise
810  * cause XLogPageRead() to panic on some future page, and xlog.c
811  * determining where to start writing WAL next, which depends on the
812  * contents of the reader's internal buffer after reading one record.
813  * Therefore, don't even think about prefetching until the first
814  * record after XLogPrefetcherBeginRead() has been consumed.
815  */
816  if (prefetcher->reader->decode_queue_tail &&
817  prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
818  return LRQ_NEXT_AGAIN;
819 
820  /* Advance to the next record. */
821  prefetcher->record = NULL;
822  }
823  pg_unreachable();
824 }
825 
826 /*
827  * Expose statistics about recovery prefetching.
828  */
829 Datum
831 {
832 #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
833  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
836 
837  InitMaterializedSRF(fcinfo, 0);
838 
839  for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
840  nulls[i] = false;
841 
852  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
853 
854  return (Datum) 0;
855 }
856 
857 /*
858  * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
859  * has been replayed.
860  */
861 static inline void
863  BlockNumber blockno, XLogRecPtr lsn)
864 {
865  XLogPrefetcherFilter *filter;
866  bool found;
867 
868  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
869  if (!found)
870  {
871  /*
872  * Don't allow any prefetching of this block or higher until replayed.
873  */
874  filter->filter_until_replayed = lsn;
875  filter->filter_from_block = blockno;
876  dlist_push_head(&prefetcher->filter_queue, &filter->link);
877  }
878  else
879  {
880  /*
881  * We were already filtering this rlocator. Extend the filter's
882  * lifetime to cover this WAL record, but leave the lower of the block
883  * numbers there because we don't want to have to track individual
884  * blocks.
885  */
886  filter->filter_until_replayed = lsn;
887  dlist_delete(&filter->link);
888  dlist_push_head(&prefetcher->filter_queue, &filter->link);
889  filter->filter_from_block = Min(filter->filter_from_block, blockno);
890  }
891 }
892 
893 /*
894  * Have we replayed any records that caused us to begin filtering a block
895  * range? That means that relations should have been created, extended or
896  * dropped as required, so we can stop filtering out accesses to a given
897  * relfilenumber.
898  */
899 static inline void
901 {
902  while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
903  {
905  link,
906  &prefetcher->filter_queue);
907 
908  if (filter->filter_until_replayed >= replaying_lsn)
909  break;
910 
911  dlist_delete(&filter->link);
912  hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
913  }
914 }
915 
916 /*
917  * Check if a given block should be skipped due to a filter.
918  */
919 static inline bool
921  BlockNumber blockno)
922 {
923  /*
924  * Test for empty queue first, because we expect it to be empty most of
925  * the time and we can avoid the hash table lookup in that case.
926  */
927  if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
928  {
929  XLogPrefetcherFilter *filter;
930 
931  /* See if the block range is filtered. */
932  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
933  if (filter && filter->filter_from_block <= blockno)
934  {
935 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
936  elog(XLOGPREFETCHER_DEBUG_LEVEL,
937  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
938  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
940  filter->filter_from_block);
941 #endif
942  return true;
943  }
944 
945  /* See if the whole database is filtered. */
946  rlocator.relNumber = InvalidRelFileNumber;
947  rlocator.spcOid = InvalidOid;
948  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
949  if (filter)
950  {
951 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
952  elog(XLOGPREFETCHER_DEBUG_LEVEL,
953  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
954  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
956 #endif
957  return true;
958  }
959  }
960 
961  return false;
962 }
963 
964 /*
965  * A wrapper for XLogBeginRead() that also resets the prefetcher.
966  */
967 void
969 {
970  /* This will forget about any in-flight IO. */
971  prefetcher->reconfigure_count--;
972 
973  /* Book-keeping to avoid readahead on first read. */
974  prefetcher->begin_ptr = recPtr;
975 
976  prefetcher->no_readahead_until = 0;
977 
978  /* This will forget about any queued up records in the decoder. */
979  XLogBeginRead(prefetcher->reader, recPtr);
980 }
981 
982 /*
983  * A wrapper for XLogReadRecord() that provides the same interface, but also
984  * tries to initiate I/O for blocks referenced in future WAL records.
985  */
986 XLogRecord *
988 {
989  DecodedXLogRecord *record;
990  XLogRecPtr replayed_up_to;
991 
992  /*
993  * See if it's time to reset the prefetching machinery, because a relevant
994  * GUC was changed.
995  */
997  {
998  uint32 max_distance;
999  uint32 max_inflight;
1000 
1001  if (prefetcher->streaming_read)
1002  lrq_free(prefetcher->streaming_read);
1003 
1005  {
1007  max_inflight = maintenance_io_concurrency;
1008  max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1009  }
1010  else
1011  {
1012  max_inflight = 1;
1013  max_distance = 1;
1014  }
1015 
1016  prefetcher->streaming_read = lrq_alloc(max_distance,
1017  max_inflight,
1018  (uintptr_t) prefetcher,
1020 
1021  prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
1022  }
1023 
1024  /*
1025  * Release last returned record, if there is one, as it's now been
1026  * replayed.
1027  */
1028  replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1029 
1030  /*
1031  * Can we drop any filters yet? If we were waiting for a relation to be
1032  * created or extended, it is now OK to access blocks in the covered
1033  * range.
1034  */
1035  XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1036 
1037  /*
1038  * All IO initiated by earlier WAL is now completed. This might trigger
1039  * further prefetching.
1040  */
1041  lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1042 
1043  /*
1044  * If there's nothing queued yet, then start prefetching to cause at least
1045  * one record to be queued.
1046  */
1047  if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1048  {
1049  Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1050  Assert(lrq_completed(prefetcher->streaming_read) == 0);
1051  lrq_prefetch(prefetcher->streaming_read);
1052  }
1053 
1054  /* Read the next record. */
1055  record = XLogNextRecord(prefetcher->reader, errmsg);
1056  if (!record)
1057  return NULL;
1058 
1059  /*
1060  * The record we just got is the "current" one, for the benefit of the
1061  * XLogRecXXX() macros.
1062  */
1063  Assert(record == prefetcher->reader->record);
1064 
1065  /*
1066  * If maintenance_io_concurrency is set very low, we might have started
1067  * prefetching some but not all of the blocks referenced in the record
1068  * we're about to return. Forget about the rest of the blocks in this
1069  * record by dropping the prefetcher's reference to it.
1070  */
1071  if (record == prefetcher->record)
1072  prefetcher->record = NULL;
1073 
1074  /*
1075  * See if it's time to compute some statistics, because enough WAL has
1076  * been processed.
1077  */
1078  if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1079  XLogPrefetcherComputeStats(prefetcher);
1080 
1081  Assert(record == prefetcher->reader->record);
1082 
1083  return &record->header;
1084 }
1085 
/*
 * Validate a proposed value for recovery_prefetch.
 *
 * new_value points at the candidate setting.  In builds without
 * USE_PREFETCH, a request for RECOVERY_PREFETCH_ON is rejected: an error
 * detail is registered with GUC_check_errdetail() and false is returned.
 * Every other value, and every value in builds with USE_PREFETCH, is
 * accepted by returning true.
 *
 * extra and source are not used by this function.
 */
1086 bool
1087 check_recovery_prefetch(int *new_value, void **extra, GucSource source)
1088 {
1089 #ifndef USE_PREFETCH
/* Only the explicit "on" value is refused here; other values pass through. */
1090  if (*new_value == RECOVERY_PREFETCH_ON)
1091  {
1092  GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
1093  return false;
1094  }
1095 #endif
1096 
1097  return true;
1098 }
1099 
1100 void
1101 assign_recovery_prefetch(int new_value, void *extra)
1102 {
1103  /* Reconfigure prefetching, because a setting it depends on changed. */
1104  recovery_prefetch = new_value;
1105  if (AmStartupProcess())
1107 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:433
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:410
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:424
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1573
#define InvalidBackendId
Definition: backendid.h:23
uint32 BlockNumber
Definition: block.h:31
static int32 next
Definition: blutils.c:219
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define InvalidBuffer
Definition: buf.h:25
PrefetchBufferResult PrefetchSharedBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum)
Definition: bufmgr.c:505
int maintenance_io_concurrency
Definition: bufmgr.c:152
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:228
unsigned int uint32
Definition: c.h:442
#define Min(x, y)
Definition: c.h:937
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:362
#define pg_unreachable()
Definition: c.h:280
#define unlikely(x)
Definition: c.h:295
unsigned char uint8
Definition: c.h:440
#define XLOG_DBASE_CREATE_FILE_COPY
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:863
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
int errmsg(const char *fmt,...)
Definition: elog.c:906
#define ERROR
Definition: elog.h:35
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1683
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
bool IsUnderPostmaster
Definition: globals.c:113
#define GUC_check_errdetail
Definition: guc.h:434
GucSource
Definition: guc.h:108
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_REMOVE
Definition: hsearch.h:115
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:325
static void dlist_delete(dlist_node *node)
Definition: ilist.h:394
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:562
static void dlist_push_head(dlist_head *head, dlist_node *node)
Definition: ilist.h:336
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
Definition: mcxt.c:1306
void * palloc0(Size size)
Definition: mcxt.c:1230
void * palloc(Size size)
Definition: mcxt.c:1199
#define AmStartupProcess()
Definition: miscadmin.h:440
#define XLOG_CHECKPOINT_SHUTDOWN
Definition: pg_control.h:67
#define XLOG_END_OF_RECOVERY
Definition: pg_control.h:76
static rewind_source * source
Definition: pg_rewind.c:81
uintptr_t Datum
Definition: postgres.h:412
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:560
#define InvalidOid
Definition: postgres_ext.h:36
struct RelFileLocator RelFileLocator
#define RelFileLocatorEquals(locator1, locator2)
@ MAIN_FORKNUM
Definition: relpath.h:50
#define InvalidRelFileNumber
Definition: relpath.h:26
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:579
SMgrRelation smgropen(RelFileLocator rlocator, BackendId backend)
Definition: smgr.c:146
bool smgrexists(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:247
#define XLOG_SMGR_CREATE
Definition: storage_xlog.h:30
#define XLOG_SMGR_TRUNCATE
Definition: storage_xlog.h:31
Buffer prefetch_buffer
Definition: xlogreader.h:130
RelFileLocator rlocator
Definition: xlogreader.h:125
BlockNumber blkno
Definition: xlogreader.h:127
ForkNumber forknum
Definition: xlogreader.h:126
XLogRecord header
Definition: xlogreader.h:166
XLogRecPtr lsn
Definition: xlogreader.h:164
Size keysize
Definition: hsearch.h:75
Definition: dynahash.c:220
XLogRecPtr lsn
struct LsnReadQueue::@14 queue[FLEXIBLE_ARRAY_MEMBER]
uint32 max_inflight
LsnReadQueueNextFun next
uintptr_t lrq_private
Buffer recent_buffer
Definition: bufmgr.h:54
RelFileLocator locator
RelFileNumber relNumber
TupleDesc setDesc
Definition: execnodes.h:332
Tuplestorestate * setResult
Definition: execnodes.h:331
RelFileLocatorBackend smgr_rlocator
Definition: smgr.h:42
pg_atomic_uint64 skip_fpw
pg_atomic_uint64 skip_init
pg_atomic_uint64 reset_time
pg_atomic_uint64 hit
pg_atomic_uint64 prefetch
pg_atomic_uint64 skip_rep
pg_atomic_uint64 skip_new
RelFileLocator rlocator
XLogRecPtr filter_until_replayed
BlockNumber filter_from_block
dlist_head filter_queue
XLogRecPtr no_readahead_until
XLogReaderState * reader
XLogRecPtr begin_ptr
RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
LsnReadQueue * streaming_read
DecodedXLogRecord * record
XLogRecPtr next_stats_shm_lsn
BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
DecodedXLogRecord * record
Definition: xlogreader.h:236
DecodedXLogRecord * decode_queue_head
Definition: xlogreader.h:260
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206
DecodedXLogRecord * decode_queue_tail
Definition: xlogreader.h:261
uint8 xl_info
Definition: xlogrecord.h:46
RmgrId xl_rmid
Definition: xlogrecord.h:47
ForkNumber forkNum
Definition: storage_xlog.h:36
RelFileLocator rlocator
Definition: storage_xlog.h:35
RelFileLocator rlocator
Definition: storage_xlog.h:49
BlockNumber blkno
Definition: storage_xlog.h:48
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void XLogPrefetchResetStats(void)
static bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)
void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
int recovery_prefetch
struct LsnReadQueue LsnReadQueue
#define RecoveryPrefetchEnabled()
static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
LsnReadQueueNextStatus(* LsnReadQueueNextFun)(uintptr_t lrq_private, XLogRecPtr *lsn)
static void lrq_free(LsnReadQueue *lrq)
struct XLogPrefetchStats XLogPrefetchStats
XLogRecord * XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
static void lrq_prefetch(LsnReadQueue *lrq)
static int XLogPrefetchReconfigureCount
Datum pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
static uint32 lrq_completed(LsnReadQueue *lrq)
static XLogPrefetchStats * SharedStats
static uint32 lrq_inflight(LsnReadQueue *lrq)
void XLogPrefetchReconfigure(void)
size_t XLogPrefetchShmemSize(void)
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS
XLogPrefetcher * XLogPrefetcherAllocate(XLogReaderState *reader)
void XLogPrefetchShmemInit(void)
void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
void assign_recovery_prefetch(int new_value, void *extra)
static void XLogPrefetchIncrement(pg_atomic_uint64 *counter)
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE
struct XLogPrefetcherFilter XLogPrefetcherFilter
static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
#define XLOGPREFETCHER_STATS_DISTANCE
static void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER
void XLogPrefetcherFree(XLogPrefetcher *prefetcher)
bool check_recovery_prefetch(int *new_value, void **extra, GucSource source)
LsnReadQueueNextStatus
@ LRQ_NEXT_NO_IO
@ LRQ_NEXT_IO
@ LRQ_NEXT_AGAIN
XLogReaderState * XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
@ RECOVERY_PREFETCH_ON
@ RECOVERY_PREFETCH_TRY
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:358
DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)
Definition: xlogreader.c:954
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:264
XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)
Definition: xlogreader.c:282
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
Definition: xlogreader.h:325
#define BKPBLOCK_WILL_INIT
Definition: xlogrecord.h:188
#define XLR_INFO_MASK
Definition: xlogrecord.h:62