41 #include "utils/fmgrprotos.h"
50 #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
56 #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
62 #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
71 #define RecoveryPrefetchEnabled() \
72 (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
73 maintenance_io_concurrency > 0)
75 #define RecoveryPrefetchEnabled() false
204 uintptr_t lrq_private,
210 Assert(max_distance >= max_inflight);
212 size = max_distance + 1;
368 prefetcher->
reader = reader;
403 return prefetcher->
reader;
474 if (prefetcher->
record == NULL)
487 if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
517 prefetcher->
record = record;
523 record = prefetcher->
record;
530 if (replaying_lsn < record->lsn)
535 if (rmid == RM_XLOG_ID)
547 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
548 elog(XLOGPREFETCHER_DEBUG_LEVEL,
549 "suppressing all readahead until %X/%X is replayed due to possible TLI change",
556 else if (rmid == RM_DBASE_ID)
580 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
581 elog(XLOGPREFETCHER_DEBUG_LEVEL,
582 "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
588 else if (rmid == RM_SMGR_ID)
608 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
609 elog(XLOGPREFETCHER_DEBUG_LEVEL,
610 "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
631 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
632 elog(XLOGPREFETCHER_DEBUG_LEVEL,
633 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
730 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
731 elog(XLOGPREFETCHER_DEBUG_LEVEL,
732 "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
751 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
752 elog(XLOGPREFETCHER_DEBUG_LEVEL,
753 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
792 "could not prefetch relation %u/%u/%u block %u",
815 prefetcher->
record = NULL;
826 #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
929 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
930 elog(XLOGPREFETCHER_DEBUG_LEVEL,
931 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
945 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
946 elog(XLOGPREFETCHER_DEBUG_LEVEL,
947 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
1012 (uintptr_t) prefetcher,
1065 if (record == prefetcher->
record)
1066 prefetcher->
record = NULL;
1083 #ifndef USE_PREFETCH
1086 GUC_check_errdetail(
"\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
TimestampTz GetCurrentTimestamp(void)
static Datum values[MAXATTR]
PrefetchBufferResult PrefetchSharedBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum)
int maintenance_io_concurrency
static bool BufferIsValid(Buffer bufnum)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
#define XLOG_DBASE_CREATE_FILE_COPY
void hash_destroy(HTAB *hashp)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
int errmsg(const char *fmt,...)
Datum Int64GetDatum(int64 X)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
#define GUC_check_errdetail
static void dlist_init(dlist_head *head)
static void dlist_delete(dlist_node *node)
#define dlist_tail_element(type, membername, lhead)
static void dlist_push_head(dlist_head *head, dlist_node *node)
static bool dlist_is_empty(const dlist_head *head)
if(TABLE==NULL||TABLE_index==NULL)
void pfree(void *pointer)
void * palloc0(Size size)
#define AmStartupProcess()
#define XLOG_CHECKPOINT_SHUTDOWN
#define XLOG_END_OF_RECOVERY
static rewind_source * source
static Datum Int32GetDatum(int32 X)
#define INVALID_PROC_NUMBER
struct RelFileLocator RelFileLocator
#define RelFileLocatorEquals(locator1, locator2)
#define InvalidRelFileNumber
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
static pg_noinline void Size size
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend)
bool smgrexists(SMgrRelation reln, ForkNumber forknum)
#define XLOG_SMGR_TRUNCATE
struct LsnReadQueue::@17 queue[FLEXIBLE_ARRAY_MEMBER]
Tuplestorestate * setResult
RelFileLocatorBackend smgr_rlocator
pg_atomic_uint64 skip_fpw
pg_atomic_uint64 skip_init
pg_atomic_uint64 reset_time
pg_atomic_uint64 prefetch
pg_atomic_uint64 skip_rep
pg_atomic_uint64 skip_new
XLogRecPtr filter_until_replayed
BlockNumber filter_from_block
XLogRecPtr no_readahead_until
RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
LsnReadQueue * streaming_read
DecodedXLogRecord * record
XLogRecPtr next_stats_shm_lsn
BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
DecodedXLogRecord * record
DecodedXLogRecord * decode_queue_head
DecodedXLogRecord * decode_queue_tail
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
static Datum TimestampTzGetDatum(TimestampTz X)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
void XLogPrefetchResetStats(void)
static bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)
void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
struct LsnReadQueue LsnReadQueue
#define RecoveryPrefetchEnabled()
static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
LsnReadQueueNextStatus(* LsnReadQueueNextFun)(uintptr_t lrq_private, XLogRecPtr *lsn)
static void lrq_free(LsnReadQueue *lrq)
struct XLogPrefetchStats XLogPrefetchStats
XLogRecord * XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
static void lrq_prefetch(LsnReadQueue *lrq)
static int XLogPrefetchReconfigureCount
Datum pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
static uint32 lrq_completed(LsnReadQueue *lrq)
static XLogPrefetchStats * SharedStats
static uint32 lrq_inflight(LsnReadQueue *lrq)
void XLogPrefetchReconfigure(void)
size_t XLogPrefetchShmemSize(void)
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS
XLogPrefetcher * XLogPrefetcherAllocate(XLogReaderState *reader)
void XLogPrefetchShmemInit(void)
void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
void assign_recovery_prefetch(int new_value, void *extra)
static void XLogPrefetchIncrement(pg_atomic_uint64 *counter)
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE
struct XLogPrefetcherFilter XLogPrefetcherFilter
static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
#define XLOGPREFETCHER_STATS_DISTANCE
static void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER
void XLogPrefetcherFree(XLogPrefetcher *prefetcher)
bool check_recovery_prefetch(int *new_value, void **extra, GucSource source)
XLogReaderState * XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
#define BKPBLOCK_WILL_INIT