PostgreSQL Source Code git master
Loading...
Searching...
No Matches
xlogprefetcher.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * xlogprefetcher.c
4 * Prefetching support for recovery.
5 *
6 * Portions Copyright (c) 2022-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/access/transam/xlogprefetcher.c
12 *
13 * This module provides a drop-in replacement for an XLogReader that tries to
14 * minimize I/O stalls by looking ahead in the WAL. If blocks that will be
15 * accessed in the near future are not already in the buffer pool, it initiates
16 * I/Os that might complete before the caller eventually needs the data. When
17 * referenced blocks are found in the buffer pool already, the buffer is
18 * recorded in the decoded record so that XLogReadBufferForRedo() can try to
19 * avoid a second buffer mapping table lookup.
20 *
21 * Currently, only the main fork is considered for prefetching. Also,
22 * prefetching is only effective on systems where PrefetchBuffer() does
23 * something useful (mainly Linux).
24 *
25 *-------------------------------------------------------------------------
26 */
27
28#include "postgres.h"
29
31#include "access/xlogreader.h"
32#include "catalog/pg_control.h"
35#include "funcapi.h"
36#include "miscadmin.h"
37#include "port/atomics.h"
38#include "storage/bufmgr.h"
39#include "storage/fd.h"
40#include "storage/shmem.h"
41#include "storage/smgr.h"
42#include "utils/fmgrprotos.h"
43#include "utils/guc_hooks.h"
44#include "utils/hsearch.h"
45#include "utils/timestamp.h"
46#include "utils/tuplestore.h"
47
48/*
49 * Every time we process this much WAL, we'll update the values in
50 * pg_stat_recovery_prefetch.
51 */
52#define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
53
54/*
55 * To detect repeated access to the same block and skip useless extra system
56 * calls, we remember a small window of recently prefetched blocks.
57 */
58#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
59
60/*
61 * When maintenance_io_concurrency is not saturated, we're prepared to look
62 * ahead up to N times that number of block references.
63 */
64#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
65
66/* Define to log internal debugging messages. */
67/* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
68
69/* GUCs */
71
72#ifdef USE_PREFETCH
73#define RecoveryPrefetchEnabled() \
74 (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
75 maintenance_io_concurrency > 0)
76#else
77#define RecoveryPrefetchEnabled() false
78#endif
79
81
82/*
83 * Enum used to report whether an IO should be started.
84 */
91
92/*
93 * Type of callback that can decide which block to prefetch next. For now
94 * there is only one.
95 */
97 XLogRecPtr *lsn);
98
99/*
100 * A simple circular queue of LSNs, used to control the number of
101 * (potentially) inflight IOs. This stands in for a later more general IO
102 * control mechanism, which is why it has the apparently unnecessary
103 * indirection through a function pointer.
104 */
121
122/*
123 * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching
124 * blocks that will soon be referenced, to try to avoid IO stalls.
125 */
127{
128 /* WAL reader and current reading state. */
132
133 /* When to publish stats. */
135
136 /* Book-keeping to avoid accessing blocks that don't exist yet. */
139
140 /* Book-keeping to avoid repeat prefetches. */
144
145 /* Book-keeping to disable prefetching temporarily. */
147
148 /* IO depth manager. */
150
152
154};
155
156/*
157 * A temporary filter used to track block ranges that haven't been created
158 * yet, whole relations that haven't been created yet, and whole relations
159 * that (we assume) have already been dropped, or will be created by bulk WAL
160 * operators.
161 */
169
170/*
171 * Counters exposed in shared memory for pg_stat_recovery_prefetch.
172 */
173typedef struct XLogPrefetchStats
174{
175 pg_atomic_uint64 reset_time; /* Time of last reset. */
176 pg_atomic_uint64 prefetch; /* Prefetches initiated. */
177 pg_atomic_uint64 hit; /* Blocks already in cache. */
178 pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
179 pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
180 pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
181 pg_atomic_uint64 skip_rep; /* Repeat accesses skipped. */
182
183 /* Dynamic values */
184 int wal_distance; /* Number of WAL bytes ahead. */
185 int block_distance; /* Number of block references ahead. */
186 int io_depth; /* Number of I/Os in progress. */
188
190 RelFileLocator rlocator,
191 BlockNumber blockno,
192 XLogRecPtr lsn);
194 RelFileLocator rlocator,
195 BlockNumber blockno);
199 XLogRecPtr *lsn);
200
202
203static inline LsnReadQueue *
205 uint32 max_inflight,
206 uintptr_t lrq_private,
208{
210 uint32 size;
211
212 Assert(max_distance >= max_inflight);
213
214 size = max_distance + 1; /* full ring buffer has a gap */
215 lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
216 lrq->lrq_private = lrq_private;
217 lrq->max_inflight = max_inflight;
218 lrq->size = size;
219 lrq->next = next;
220 lrq->head = 0;
221 lrq->tail = 0;
222 lrq->inflight = 0;
223 lrq->completed = 0;
224
225 return lrq;
226}
227
228static inline void
233
234static inline uint32
236{
237 return lrq->inflight;
238}
239
240static inline uint32
242{
243 return lrq->completed;
244}
245
246static inline void
248{
249 /* Try to start as many IOs as we can within our limits. */
250 while (lrq->inflight < lrq->max_inflight &&
251 lrq->inflight + lrq->completed < lrq->size - 1)
252 {
253 Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
254 switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
255 {
256 case LRQ_NEXT_AGAIN:
257 return;
258 case LRQ_NEXT_IO:
259 lrq->queue[lrq->head].io = true;
260 lrq->inflight++;
261 break;
262 case LRQ_NEXT_NO_IO:
263 lrq->queue[lrq->head].io = false;
264 lrq->completed++;
265 break;
266 }
267 lrq->head++;
268 if (lrq->head == lrq->size)
269 lrq->head = 0;
270 }
271}
272
273static inline void
275{
276 /*
277 * We know that LSNs before 'lsn' have been replayed, so we can now assume
278 * that any IOs that were started before then have finished.
279 */
280 while (lrq->tail != lrq->head &&
281 lrq->queue[lrq->tail].lsn < lsn)
282 {
283 if (lrq->queue[lrq->tail].io)
284 lrq->inflight--;
285 else
286 lrq->completed--;
287 lrq->tail++;
288 if (lrq->tail == lrq->size)
289 lrq->tail = 0;
290 }
293}
294
295size_t
297{
298 return sizeof(XLogPrefetchStats);
299}
300
301/*
302 * Reset all counters to zero.
303 */
304void
315
316void
337
338/*
339 * Called when any GUC is changed that affects prefetching.
340 */
341void
346
347/*
348 * Increment a counter in shared memory. This is equivalent to (*counter)++ on a
349 * plain uint64 without any memory barrier or locking, except on platforms
350 * where readers can't read uint64 without possibly observing a torn value.
351 */
352static inline void
358
359/*
360 * Create a prefetcher that is ready to begin prefetching blocks referenced by
361 * WAL records.
362 */
365{
367 HASHCTL ctl;
368
370 prefetcher->reader = reader;
371
372 ctl.keysize = sizeof(RelFileLocator);
373 ctl.entrysize = sizeof(XLogPrefetcherFilter);
374 prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
376 dlist_init(&prefetcher->filter_queue);
377
381
382 /* First usage will cause streaming_read to be allocated. */
383 prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
384
385 return prefetcher;
386}
387
388/*
389 * Destroy a prefetcher and release all resources.
390 */
391void
393{
394 lrq_free(prefetcher->streaming_read);
395 hash_destroy(prefetcher->filter_table);
397}
398
399/*
400 * Provide access to the reader.
401 */
407
408/*
409 * Update the statistics visible in the pg_stat_recovery_prefetch view.
410 */
411void
413{
414 uint32 io_depth;
415 uint32 completed;
416 int64 wal_distance;
417
418
419 /* How far ahead of replay are we now? */
420 if (prefetcher->reader->decode_queue_tail)
421 {
422 wal_distance =
424 prefetcher->reader->decode_queue_head->lsn;
425 }
426 else
427 {
428 wal_distance = 0;
429 }
430
431 /* How many IOs are currently in flight and completed? */
432 io_depth = lrq_inflight(prefetcher->streaming_read);
433 completed = lrq_completed(prefetcher->streaming_read);
434
435 /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
436 SharedStats->io_depth = io_depth;
437 SharedStats->block_distance = io_depth + completed;
438 SharedStats->wal_distance = wal_distance;
439
440 prefetcher->next_stats_shm_lsn =
441 prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
442}
443
444/*
445 * A callback that examines the next block reference in the WAL, and possibly
446 * starts an IO so that a later read will be fast.
447 *
448 * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
449 *
450 * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
451 * that isn't in the buffer pool, and the kernel has been asked to start
452 * reading it to make a future read system call faster. An LSN is written to
453 * *lsn, and the I/O will be considered to have completed once that LSN is
454 * replayed.
455 *
456 * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
457 * that it was already in the buffer pool, or we decided for various reasons
458 * not to prefetch.
459 */
462{
464 XLogReaderState *reader = prefetcher->reader;
466
467 /*
468 * We keep track of the record and block we're up to between calls with
469 * prefetcher->record and prefetcher->next_block_id.
470 */
471 for (;;)
472 {
473 DecodedXLogRecord *record;
474
475 /* Try to read a new future record, if we don't already have one. */
476 if (prefetcher->record == NULL)
477 {
478 bool nonblocking;
479
480 /*
481 * If there are already records or an error queued up that could
482 * be replayed, we don't want to block here. Otherwise, it's OK
483 * to block waiting for more data: presumably the caller has
484 * nothing else to do.
485 */
486 nonblocking = XLogReaderHasQueuedRecordOrError(reader);
487
488 /* Readahead is disabled until we replay past a certain point. */
489 if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
490 return LRQ_NEXT_AGAIN;
491
492 record = XLogReadAhead(prefetcher->reader, nonblocking);
493 if (record == NULL)
494 {
495 /*
496 * We can't read any more, due to an error or lack of data in
497 * nonblocking mode. Don't try to read ahead again until
498 * we've replayed everything already decoded.
499 */
500 if (nonblocking && prefetcher->reader->decode_queue_tail)
501 prefetcher->no_readahead_until =
502 prefetcher->reader->decode_queue_tail->lsn;
503
504 return LRQ_NEXT_AGAIN;
505 }
506
507 /*
508 * If prefetching is disabled, we don't need to analyze the record
509 * or issue any prefetches. We just need to cause one record to
510 * be decoded.
511 */
513 {
514 *lsn = InvalidXLogRecPtr;
515 return LRQ_NEXT_NO_IO;
516 }
517
518 /* We have a new record to process. */
519 prefetcher->record = record;
520 prefetcher->next_block_id = 0;
521 }
522 else
523 {
524 /* Continue to process from last call, or last loop. */
525 record = prefetcher->record;
526 }
527
528 /*
529 * Check for operations that require us to filter out block ranges, or
530 * pause readahead completely.
531 */
533 {
534 uint8 rmid = record->header.xl_rmid;
535 uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
536
537 if (rmid == RM_XLOG_ID)
538 {
539 if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
540 record_type == XLOG_END_OF_RECOVERY)
541 {
542 /*
543 * These records might change the TLI. Avoid potential
544 * bugs if we were to allow "read TLI" and "replay TLI" to
545 * differ without more analysis.
546 */
547 prefetcher->no_readahead_until = record->lsn;
548
549#ifdef XLOGPREFETCHER_DEBUG_LEVEL
551 "suppressing all readahead until %X/%08X is replayed due to possible TLI change",
552 LSN_FORMAT_ARGS(record->lsn));
553#endif
554
555 /* Fall through so we move past this record. */
556 }
557 }
558 else if (rmid == RM_DBASE_ID)
559 {
560 /*
561 * When databases are created with the file-copy strategy,
562 * there are no WAL records to tell us about the creation of
563 * individual relations.
564 */
565 if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
566 {
569 RelFileLocator rlocator =
571
572 /*
573 * Don't try to prefetch anything in this database until
574 * it has been created, or we might confuse the blocks of
575 * different generations, if a database OID or
576 * relfilenumber is reused. It's also more efficient than
577 * discovering that relations don't exist on disk yet with
578 * ENOENT errors.
579 */
580 XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
581
582#ifdef XLOGPREFETCHER_DEBUG_LEVEL
584 "suppressing prefetch in database %u until %X/%08X is replayed due to raw file copy",
585 rlocator.dbOid,
586 LSN_FORMAT_ARGS(record->lsn));
587#endif
588 }
589 }
590 else if (rmid == RM_SMGR_ID)
591 {
592 if (record_type == XLOG_SMGR_CREATE)
593 {
595 record->main_data;
596
597 if (xlrec->forkNum == MAIN_FORKNUM)
598 {
599 /*
600 * Don't prefetch anything for this whole relation
601 * until it has been created. Otherwise we might
602 * confuse the blocks of different generations, if a
603 * relfilenumber is reused. This also avoids the need
604 * to discover the problem via extra syscalls that
605 * report ENOENT.
606 */
608 record->lsn);
609
610#ifdef XLOGPREFETCHER_DEBUG_LEVEL
612 "suppressing prefetch in relation %u/%u/%u until %X/%08X is replayed, which creates the relation",
613 xlrec->rlocator.spcOid,
614 xlrec->rlocator.dbOid,
615 xlrec->rlocator.relNumber,
616 LSN_FORMAT_ARGS(record->lsn));
617#endif
618 }
619 }
620 else if (record_type == XLOG_SMGR_TRUNCATE)
621 {
623 record->main_data;
624
625 /*
626 * Don't consider prefetching anything in the truncated
627 * range until the truncation has been performed.
628 */
630 xlrec->blkno,
631 record->lsn);
632
633#ifdef XLOGPREFETCHER_DEBUG_LEVEL
635 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, which truncates the relation",
636 xlrec->rlocator.spcOid,
637 xlrec->rlocator.dbOid,
638 xlrec->rlocator.relNumber,
639 xlrec->blkno,
640 LSN_FORMAT_ARGS(record->lsn));
641#endif
642 }
643 }
644 }
645
646 /* Scan the block references, starting where we left off last time. */
647 while (prefetcher->next_block_id <= record->max_block_id)
648 {
649 int block_id = prefetcher->next_block_id++;
650 DecodedBkpBlock *block = &record->blocks[block_id];
653
654 if (!block->in_use)
655 continue;
656
658
659 /*
660 * Record the LSN of this record. When it's replayed,
661 * LsnReadQueue will consider any IOs submitted for earlier LSNs
662 * to be finished.
663 */
664 *lsn = record->lsn;
665
666 /* We don't try to prefetch anything but the main fork for now. */
667 if (block->forknum != MAIN_FORKNUM)
668 {
669 return LRQ_NEXT_NO_IO;
670 }
671
672 /*
673 * If there is a full page image attached, we won't be reading the
674 * page, so don't bother trying to prefetch.
675 */
676 if (block->has_image)
677 {
679 return LRQ_NEXT_NO_IO;
680 }
681
682 /* There is no point in reading a page that will be zeroed. */
683 if (block->flags & BKPBLOCK_WILL_INIT)
684 {
686 return LRQ_NEXT_NO_IO;
687 }
688
689 /* Should we skip prefetching this block due to a filter? */
691 {
693 return LRQ_NEXT_NO_IO;
694 }
695
696 /* There is no point in repeatedly prefetching the same block. */
697 for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
698 {
699 if (block->blkno == prefetcher->recent_block[i] &&
700 RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
701 {
702 /*
703 * XXX If we also remembered where it was, we could set
704 * recent_buffer so that recovery could skip smgropen()
705 * and a buffer table lookup.
706 */
708 return LRQ_NEXT_NO_IO;
709 }
710 }
711 prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
712 prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
713 prefetcher->recent_idx =
714 (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
715
716 /*
717 * We could try to have a fast path for repeated references to the
718 * same relation (with some scheme to handle invalidations
719 * safely), but for now we'll call smgropen() every time.
720 */
722
723 /*
724 * If the relation file doesn't exist on disk, for example because
725 * we're replaying after a crash and the file will be created and
726 * then unlinked by WAL that hasn't been replayed yet, suppress
727 * further prefetching in the relation until this record is
728 * replayed.
729 */
731 {
732#ifdef XLOGPREFETCHER_DEBUG_LEVEL
734 "suppressing all prefetch in relation %u/%u/%u until %X/%08X is replayed, because the relation does not exist on disk",
735 reln->smgr_rlocator.locator.spcOid,
736 reln->smgr_rlocator.locator.dbOid,
737 reln->smgr_rlocator.locator.relNumber,
738 LSN_FORMAT_ARGS(record->lsn));
739#endif
741 record->lsn);
743 return LRQ_NEXT_NO_IO;
744 }
745
746 /*
747 * If the relation isn't big enough to contain the referenced
748 * block yet, suppress prefetching of this block and higher until
749 * this record is replayed.
750 */
751 if (block->blkno >= smgrnblocks(reln, block->forknum))
752 {
753#ifdef XLOGPREFETCHER_DEBUG_LEVEL
755 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, because the relation is too small",
756 reln->smgr_rlocator.locator.spcOid,
757 reln->smgr_rlocator.locator.dbOid,
758 reln->smgr_rlocator.locator.relNumber,
759 block->blkno,
760 LSN_FORMAT_ARGS(record->lsn));
761#endif
763 record->lsn);
765 return LRQ_NEXT_NO_IO;
766 }
767
768 /* Try to initiate prefetching. */
769 result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
770 if (BufferIsValid(result.recent_buffer))
771 {
772 /* Cache hit, nothing to do. */
774 block->prefetch_buffer = result.recent_buffer;
775 return LRQ_NEXT_NO_IO;
776 }
777 else if (result.initiated_io)
778 {
779 /* Cache miss, I/O (presumably) started. */
782 return LRQ_NEXT_IO;
783 }
784 else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
785 {
786 /*
787 * This shouldn't be possible, because we already determined
788 * that the relation exists on disk and is big enough.
789 * Something is wrong with the cache invalidation for
790 * smgrexists(), smgrnblocks(), or the file was unlinked or
791 * truncated beneath our feet?
792 */
793 elog(ERROR,
794 "could not prefetch relation %u/%u/%u block %u",
795 reln->smgr_rlocator.locator.spcOid,
796 reln->smgr_rlocator.locator.dbOid,
797 reln->smgr_rlocator.locator.relNumber,
798 block->blkno);
799 }
800 }
801
802 /*
803 * Several callsites need to be able to read exactly one record
804 * without any internal readahead. Examples: xlog.c reading
805 * checkpoint records with emode set to PANIC, which might otherwise
806 * cause XLogPageRead() to panic on some future page, and xlog.c
807 * determining where to start writing WAL next, which depends on the
808 * contents of the reader's internal buffer after reading one record.
809 * Therefore, don't even think about prefetching until the first
810 * record after XLogPrefetcherBeginRead() has been consumed.
811 */
812 if (prefetcher->reader->decode_queue_tail &&
813 prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
814 return LRQ_NEXT_AGAIN;
815
816 /* Advance to the next record. */
817 prefetcher->record = NULL;
818 }
820}
821
822/*
823 * Expose statistics about recovery prefetching.
824 */
825Datum
852
853/*
854 * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
855 * has been replayed.
856 */
857static inline void
859 BlockNumber blockno, XLogRecPtr lsn)
860{
861 XLogPrefetcherFilter *filter;
862 bool found;
863
864 filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
865 if (!found)
866 {
867 /*
868 * Don't allow any prefetching of this block or higher until replayed.
869 */
870 filter->filter_until_replayed = lsn;
871 filter->filter_from_block = blockno;
872 dlist_push_head(&prefetcher->filter_queue, &filter->link);
873 }
874 else
875 {
876 /*
877 * We were already filtering this rlocator. Extend the filter's
878 * lifetime to cover this WAL record, but leave the lower of the block
879 * numbers there because we don't want to have to track individual
880 * blocks.
881 */
882 filter->filter_until_replayed = lsn;
883 dlist_delete(&filter->link);
884 dlist_push_head(&prefetcher->filter_queue, &filter->link);
885 filter->filter_from_block = Min(filter->filter_from_block, blockno);
886 }
887}
888
889/*
890 * Have we replayed any records that caused us to begin filtering a block
891 * range? That means that relations should have been created, extended or
892 * dropped as required, so we can stop filtering out accesses to a given
893 * relfilenumber.
894 */
895static inline void
897{
898 while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
899 {
901 link,
902 &prefetcher->filter_queue);
903
905 break;
906
907 dlist_delete(&filter->link);
908 hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
909 }
910}
911
912/*
913 * Check if a given block should be skipped due to a filter.
914 */
915static inline bool
917 BlockNumber blockno)
918{
919 /*
920 * Test for empty queue first, because we expect it to be empty most of
921 * the time and we can avoid the hash table lookup in that case.
922 */
923 if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
924 {
925 XLogPrefetcherFilter *filter;
926
927 /* See if the block range is filtered. */
928 filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
929 if (filter && filter->filter_from_block <= blockno)
930 {
931#ifdef XLOGPREFETCHER_DEBUG_LEVEL
933 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (blocks >= %u filtered)",
934 rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
936 filter->filter_from_block);
937#endif
938 return true;
939 }
940
941 /* See if the whole database is filtered. */
943 rlocator.spcOid = InvalidOid;
944 filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
945 if (filter)
946 {
947#ifdef XLOGPREFETCHER_DEBUG_LEVEL
949 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (whole database)",
950 rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
952#endif
953 return true;
954 }
955 }
956
957 return false;
958}
959
960/*
961 * A wrapper for XLogBeginRead() that also resets the prefetcher.
962 */
963void
965{
966 /* This will forget about any in-flight IO. */
967 prefetcher->reconfigure_count--;
968
969 /* Book-keeping to avoid readahead on first read. */
970 prefetcher->begin_ptr = recPtr;
971
972 prefetcher->no_readahead_until = InvalidXLogRecPtr;
973
974 /* This will forget about any queued up records in the decoder. */
976}
977
978/*
979 * A wrapper for XLogReadRecord() that provides the same interface, but also
980 * tries to initiate I/O for blocks referenced in future WAL records.
981 */
984{
985 DecodedXLogRecord *record;
987
988 /*
989 * See if it's time to reset the prefetching machinery, because a relevant
990 * GUC was changed.
991 */
992 if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
993 {
995 uint32 max_inflight;
996
997 if (prefetcher->streaming_read)
998 lrq_free(prefetcher->streaming_read);
999
1001 {
1003 max_inflight = maintenance_io_concurrency;
1005 }
1006 else
1007 {
1008 max_inflight = 1;
1009 max_distance = 1;
1010 }
1011
1012 prefetcher->streaming_read = lrq_alloc(max_distance,
1013 max_inflight,
1016
1017 prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
1018 }
1019
1020 /*
1021 * Release last returned record, if there is one, as it's now been
1022 * replayed.
1023 */
1025
1026 /*
1027 * Can we drop any filters yet? If we were waiting for a relation to be
1028 * created or extended, it is now OK to access blocks in the covered
1029 * range.
1030 */
1032
1033 /*
1034 * All IO initiated by earlier WAL is now completed. This might trigger
1035 * further prefetching.
1036 */
1037 lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1038
1039 /*
1040 * If there's nothing queued yet, then start prefetching to cause at least
1041 * one record to be queued.
1042 */
1044 {
1045 Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1046 Assert(lrq_completed(prefetcher->streaming_read) == 0);
1047 lrq_prefetch(prefetcher->streaming_read);
1048 }
1049
1050 /* Read the next record. */
1051 record = XLogNextRecord(prefetcher->reader, errmsg);
1052 if (!record)
1053 return NULL;
1054
1055 /*
1056 * The record we just got is the "current" one, for the benefit of the
1057 * XLogRecXXX() macros.
1058 */
1059 Assert(record == prefetcher->reader->record);
1060
1061 /*
1062 * If maintenance_io_concurrency is set very low, we might have started
1063 * prefetching some but not all of the blocks referenced in the record
1064 * we're about to return. Forget about the rest of the blocks in this
1065 * record by dropping the prefetcher's reference to it.
1066 */
1067 if (record == prefetcher->record)
1068 prefetcher->record = NULL;
1069
1070 /*
1071 * See if it's time to compute some statistics, because enough WAL has
1072 * been processed.
1073 */
1074 if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1076
1077 Assert(record == prefetcher->reader->record);
1078
1079 return &record->header;
1080}
1081
1082bool
1084{
1085#ifndef USE_PREFETCH
1087 {
1088 GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
1089 return false;
1090 }
1091#endif
1092
1093 return true;
1094}
1095
1096void
1098{
1099 /* Reconfigure prefetching, because a setting it depends on changed. */
1101 if (AmStartupProcess())
1103}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1636
uint32 BlockNumber
Definition block.h:31
static int32 next
Definition blutils.c:225
static Datum values[MAXATTR]
Definition bootstrap.c:188
#define InvalidBuffer
Definition buf.h:25
PrefetchBufferResult PrefetchSharedBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum)
Definition bufmgr.c:692
int maintenance_io_concurrency
Definition bufmgr.c:207
static bool BufferIsValid(Buffer bufnum)
Definition bufmgr.h:421
#define Min(x, y)
Definition c.h:1093
uint8_t uint8
Definition c.h:616
#define Assert(condition)
Definition c.h:945
int64_t int64
Definition c.h:615
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:552
#define pg_unreachable()
Definition c.h:361
#define unlikely(x)
Definition c.h:432
uint32_t uint32
Definition c.h:618
#define XLOG_DBASE_CREATE_FILE_COPY
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:358
void hash_destroy(HTAB *hashp)
Definition dynahash.c:865
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
int io_direct_flags
Definition fd.c:172
#define IO_DIRECT_DATA
Definition fd.h:54
#define palloc0_object(type)
Definition fe_memutils.h:75
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition funcapi.c:76
bool IsUnderPostmaster
Definition globals.c:120
#define GUC_check_errdetail
Definition guc.h:507
GucSource
Definition guc.h:112
@ HASH_FIND
Definition hsearch.h:113
@ HASH_REMOVE
Definition hsearch.h:115
@ HASH_ENTER
Definition hsearch.h:114
#define HASH_ELEM
Definition hsearch.h:95
#define HASH_BLOBS
Definition hsearch.h:97
static void dlist_init(dlist_head *head)
Definition ilist.h:314
static void dlist_delete(dlist_node *node)
Definition ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition ilist.h:612
static void dlist_push_head(dlist_head *head, dlist_node *node)
Definition ilist.h:347
static bool dlist_is_empty(const dlist_head *head)
Definition ilist.h:336
int i
Definition isn.c:77
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc(Size size)
Definition mcxt.c:1387
#define AmStartupProcess()
Definition miscadmin.h:390
static char * errmsg
#define XLOG_CHECKPOINT_SHUTDOWN
Definition pg_control.h:69
#define XLOG_END_OF_RECOVERY
Definition pg_control.h:78
static rewind_source * source
Definition pg_rewind.c:89
static Datum Int64GetDatum(int64 X)
Definition postgres.h:413
uint64_t Datum
Definition postgres.h:70
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
#define InvalidOid
static int fb(int x)
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
tree ctl
Definition radixtree.h:1838
#define RelFileLocatorEquals(locator1, locator2)
@ MAIN_FORKNUM
Definition relpath.h:58
#define InvalidRelFileNumber
Definition relpath.h:26
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
Definition smgr.c:819
SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend)
Definition smgr.c:240
bool smgrexists(SMgrRelation reln, ForkNumber forknum)
Definition smgr.c:462
#define XLOG_SMGR_CREATE
#define XLOG_SMGR_TRUNCATE
Buffer prefetch_buffer
Definition xlogreader.h:130
RelFileLocator rlocator
Definition xlogreader.h:125
BlockNumber blkno
Definition xlogreader.h:127
ForkNumber forknum
Definition xlogreader.h:126
XLogRecord header
Definition xlogreader.h:165
DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER]
Definition xlogreader.h:171
XLogRecPtr lsn
LsnReadQueueNextFun next
uintptr_t lrq_private
struct LsnReadQueue::@17 queue[FLEXIBLE_ARRAY_MEMBER]
Buffer recent_buffer
Definition bufmgr.h:61
RelFileNumber relNumber
pg_atomic_uint64 skip_fpw
pg_atomic_uint64 skip_init
pg_atomic_uint64 reset_time
pg_atomic_uint64 hit
pg_atomic_uint64 prefetch
pg_atomic_uint64 skip_rep
pg_atomic_uint64 skip_new
RelFileLocator rlocator
XLogRecPtr filter_until_replayed
BlockNumber filter_from_block
dlist_head filter_queue
XLogRecPtr no_readahead_until
XLogReaderState * reader
XLogRecPtr begin_ptr
RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
LsnReadQueue * streaming_read
DecodedXLogRecord * record
XLogRecPtr next_stats_shm_lsn
BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
XLogRecPtr ReadRecPtr
Definition xlogreader.h:205
DecodedXLogRecord * decode_queue_tail
Definition xlogreader.h:260
uint8 xl_info
Definition xlogrecord.h:46
RmgrId xl_rmid
Definition xlogrecord.h:47
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:785
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
void XLogPrefetchResetStats(void)
static bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)
void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
int recovery_prefetch
#define RecoveryPrefetchEnabled()
static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
LsnReadQueueNextStatus(* LsnReadQueueNextFun)(uintptr_t lrq_private, XLogRecPtr *lsn)
static void lrq_free(LsnReadQueue *lrq)
static void lrq_prefetch(LsnReadQueue *lrq)
static int XLogPrefetchReconfigureCount
Datum pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
XLogPrefetcher * XLogPrefetcherAllocate(XLogReaderState *reader)
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
static uint32 lrq_completed(LsnReadQueue *lrq)
static XLogPrefetchStats * SharedStats
static uint32 lrq_inflight(LsnReadQueue *lrq)
void XLogPrefetchReconfigure(void)
size_t XLogPrefetchShmemSize(void)
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS
XLogRecord * XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
XLogReaderState * XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
void XLogPrefetchShmemInit(void)
void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
void assign_recovery_prefetch(int new_value, void *extra)
static void XLogPrefetchIncrement(pg_atomic_uint64 *counter)
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE
static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
#define XLOGPREFETCHER_STATS_DISTANCE
static void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER
void XLogPrefetcherFree(XLogPrefetcher *prefetcher)
bool check_recovery_prefetch(int *new_value, void **extra, GucSource source)
LsnReadQueueNextStatus
@ LRQ_NEXT_NO_IO
@ LRQ_NEXT_IO
@ LRQ_NEXT_AGAIN
@ RECOVERY_PREFETCH_ON
@ RECOVERY_PREFETCH_TRY
DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)
Definition xlogreader.c:978
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
Definition xlogreader.c:327
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition xlogreader.c:233
XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)
Definition xlogreader.c:251
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
Definition xlogreader.h:324
#define BKPBLOCK_WILL_INIT
Definition xlogrecord.h:199