43#include "utils/fmgrprotos.h"
53#define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
59#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
65#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
74#define RecoveryPrefetchEnabled() \
75 (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
76 maintenance_io_concurrency > 0)
78#define RecoveryPrefetchEnabled() false
225 lrq->lrq_private = lrq_private;
226 lrq->max_inflight = max_inflight;
246 return lrq->inflight;
252 return lrq->completed;
259 while (
lrq->inflight <
lrq->max_inflight &&
260 lrq->inflight +
lrq->completed <
lrq->size - 1)
263 switch (
lrq->next(
lrq->lrq_private, &
lrq->queue[
lrq->head].lsn))
268 lrq->queue[
lrq->head].io =
true;
272 lrq->queue[
lrq->head].io =
false;
277 if (
lrq->head ==
lrq->size)
289 while (
lrq->tail !=
lrq->head &&
290 lrq->queue[
lrq->tail].lsn < lsn)
292 if (
lrq->queue[
lrq->tail].io)
297 if (
lrq->tail ==
lrq->size)
503 if (nonblocking &&
prefetcher->reader->decode_queue_tail)
552#ifdef XLOGPREFETCHER_DEBUG_LEVEL
554 "suppressing all readahead until %X/%08X is replayed due to possible TLI change",
585#ifdef XLOGPREFETCHER_DEBUG_LEVEL
587 "suppressing prefetch in database %u until %X/%08X is replayed due to raw file copy",
613#ifdef XLOGPREFETCHER_DEBUG_LEVEL
615 "suppressing prefetch in relation %u/%u/%u until %X/%08X is replayed, which creates the relation",
616 xlrec->rlocator.spcOid,
617 xlrec->rlocator.dbOid,
618 xlrec->rlocator.relNumber,
636#ifdef XLOGPREFETCHER_DEBUG_LEVEL
638 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, which truncates the relation",
639 xlrec->rlocator.spcOid,
640 xlrec->rlocator.dbOid,
641 xlrec->rlocator.relNumber,
735#ifdef XLOGPREFETCHER_DEBUG_LEVEL
737 "suppressing all prefetch in relation %u/%u/%u until %X/%08X is replayed, because the relation does not exist on disk",
738 reln->smgr_rlocator.locator.spcOid,
739 reln->smgr_rlocator.locator.dbOid,
740 reln->smgr_rlocator.locator.relNumber,
756#ifdef XLOGPREFETCHER_DEBUG_LEVEL
758 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, because the relation is too small",
759 reln->smgr_rlocator.locator.spcOid,
760 reln->smgr_rlocator.locator.dbOid,
761 reln->smgr_rlocator.locator.relNumber,
780 else if (
result.initiated_io)
797 "could not prefetch relation %u/%u/%u block %u",
798 reln->smgr_rlocator.locator.spcOid,
799 reln->smgr_rlocator.locator.dbOid,
800 reln->smgr_rlocator.locator.relNumber,
831#define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
934#ifdef XLOGPREFETCHER_DEBUG_LEVEL
936 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (blocks >= %u filtered)",
950#ifdef XLOGPREFETCHER_DEBUG_LEVEL
952 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (whole database)",
1091 GUC_check_errdetail(
"\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
TimestampTz GetCurrentTimestamp(void)
static Datum values[MAXATTR]
PrefetchBufferResult PrefetchSharedBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum)
int maintenance_io_concurrency
static bool BufferIsValid(Buffer bufnum)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
#define XLOG_DBASE_CREATE_FILE_COPY
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
void hash_destroy(HTAB *hashp)
#define palloc0_object(type)
void InitMaterializedSRF(FunctionCallInfo fcinfo, uint32 flags)
#define GUC_check_errdetail
static void dlist_init(dlist_head *head)
static void dlist_delete(dlist_node *node)
#define dlist_tail_element(type, membername, lhead)
static void dlist_push_head(dlist_head *head, dlist_node *node)
static bool dlist_is_empty(const dlist_head *head)
void pfree(void *pointer)
#define AmStartupProcess()
#define XLOG_CHECKPOINT_SHUTDOWN
#define XLOG_END_OF_RECOVERY
static rewind_source * source
static Datum Int64GetDatum(int64 X)
static Datum Int32GetDatum(int32 X)
#define INVALID_PROC_NUMBER
#define RelFileLocatorEquals(locator1, locator2)
#define InvalidRelFileNumber
#define ShmemRequestStruct(...)
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend)
bool smgrexists(SMgrRelation reln, ForkNumber forknum)
#define XLOG_SMGR_TRUNCATE
DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER]
struct LsnReadQueue::@17 queue[FLEXIBLE_ARRAY_MEMBER]
ShmemRequestCallback request_fn
pg_atomic_uint64 skip_fpw
pg_atomic_uint64 skip_init
pg_atomic_uint64 reset_time
pg_atomic_uint64 prefetch
pg_atomic_uint64 skip_rep
pg_atomic_uint64 skip_new
XLogRecPtr filter_until_replayed
BlockNumber filter_from_block
XLogRecPtr no_readahead_until
RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
LsnReadQueue * streaming_read
DecodedXLogRecord * record
XLogRecPtr next_stats_shm_lsn
BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
DecodedXLogRecord * decode_queue_tail
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
static Datum TimestampTzGetDatum(TimestampTz X)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
void XLogPrefetchResetStats(void)
static bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)
void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
#define RecoveryPrefetchEnabled()
static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
LsnReadQueueNextStatus(* LsnReadQueueNextFun)(uintptr_t lrq_private, XLogRecPtr *lsn)
static void lrq_free(LsnReadQueue *lrq)
static void lrq_prefetch(LsnReadQueue *lrq)
static int XLogPrefetchReconfigureCount
Datum pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
static void XLogPrefetchShmemRequest(void *arg)
XLogPrefetcher * XLogPrefetcherAllocate(XLogReaderState *reader)
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
static uint32 lrq_completed(LsnReadQueue *lrq)
static XLogPrefetchStats * SharedStats
static uint32 lrq_inflight(LsnReadQueue *lrq)
void XLogPrefetchReconfigure(void)
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS
XLogRecord * XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
const ShmemCallbacks XLogPrefetchShmemCallbacks
XLogReaderState * XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
void assign_recovery_prefetch(int new_value, void *extra)
static void XLogPrefetchIncrement(pg_atomic_uint64 *counter)
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE
static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
static void XLogPrefetchShmemInit(void *arg)
#define XLOGPREFETCHER_STATS_DISTANCE
static void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER
void XLogPrefetcherFree(XLogPrefetcher *prefetcher)
bool check_recovery_prefetch(int *new_value, void **extra, GucSource source)
DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
#define BKPBLOCK_WILL_INIT