38 #include "utils/fmgrprotos.h"
54 #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
60 #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
66 #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
75 #define RecoveryPrefetchEnabled() \
76 (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
77 maintenance_io_concurrency > 0)
79 #define RecoveryPrefetchEnabled() false
208 uintptr_t lrq_private,
214 Assert(max_distance >= max_inflight);
216 size = max_distance + 1;
369 static HASHCTL hash_table_ctl = {
376 prefetcher->
reader = reader;
409 return prefetcher->
reader;
480 if (prefetcher->
record == NULL)
493 if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
523 prefetcher->
record = record;
529 record = prefetcher->
record;
536 if (replaying_lsn < record->lsn)
541 if (rmid == RM_XLOG_ID)
553 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
554 elog(XLOGPREFETCHER_DEBUG_LEVEL,
555 "suppressing all readahead until %X/%X is replayed due to possible TLI change",
562 else if (rmid == RM_DBASE_ID)
586 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
587 elog(XLOGPREFETCHER_DEBUG_LEVEL,
588 "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
594 else if (rmid == RM_SMGR_ID)
614 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
615 elog(XLOGPREFETCHER_DEBUG_LEVEL,
616 "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
637 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
638 elog(XLOGPREFETCHER_DEBUG_LEVEL,
639 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
736 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
737 elog(XLOGPREFETCHER_DEBUG_LEVEL,
738 "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
757 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
758 elog(XLOGPREFETCHER_DEBUG_LEVEL,
759 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
798 "could not prefetch relation %u/%u/%u block %u",
821 prefetcher->
record = NULL;
832 #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
935 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
936 elog(XLOGPREFETCHER_DEBUG_LEVEL,
937 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
951 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
952 elog(XLOGPREFETCHER_DEBUG_LEVEL,
953 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
1018 (uintptr_t) prefetcher,
1071 if (record == prefetcher->
record)
1072 prefetcher->
record = NULL;
1089 #ifndef USE_PREFETCH
1092 GUC_check_errdetail(
"recovery_prefetch is not supported on platforms that lack posix_fadvise().");
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
TimestampTz GetCurrentTimestamp(void)
static Datum values[MAXATTR]
PrefetchBufferResult PrefetchSharedBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum)
int maintenance_io_concurrency
static bool BufferIsValid(Buffer bufnum)
#define FLEXIBLE_ARRAY_MEMBER
#define XLOG_DBASE_CREATE_FILE_COPY
elog(ERROR, "%s: %s", p2, msg)
void hash_destroy(HTAB *hashp)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
int errmsg(const char *fmt,...)
Datum Int64GetDatum(int64 X)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
#define GUC_check_errdetail
static void dlist_init(dlist_head *head)
static void dlist_delete(dlist_node *node)
#define dlist_tail_element(type, membername, lhead)
static void dlist_push_head(dlist_head *head, dlist_node *node)
static bool dlist_is_empty(const dlist_head *head)
if(TABLE==NULL||TABLE_index==NULL)
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
void * palloc0(Size size)
#define AmStartupProcess()
#define XLOG_CHECKPOINT_SHUTDOWN
#define XLOG_END_OF_RECOVERY
static rewind_source * source
static Datum Int32GetDatum(int32 X)
struct RelFileLocator RelFileLocator
#define RelFileLocatorEquals(locator1, locator2)
#define InvalidRelFileNumber
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
SMgrRelation smgropen(RelFileLocator rlocator, BackendId backend)
bool smgrexists(SMgrRelation reln, ForkNumber forknum)
#define XLOG_SMGR_TRUNCATE
struct LsnReadQueue::@14 queue[FLEXIBLE_ARRAY_MEMBER]
Tuplestorestate * setResult
RelFileLocatorBackend smgr_rlocator
pg_atomic_uint64 skip_fpw
pg_atomic_uint64 skip_init
pg_atomic_uint64 reset_time
pg_atomic_uint64 prefetch
pg_atomic_uint64 skip_rep
pg_atomic_uint64 skip_new
XLogRecPtr filter_until_replayed
BlockNumber filter_from_block
XLogRecPtr no_readahead_until
RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
LsnReadQueue * streaming_read
DecodedXLogRecord * record
XLogRecPtr next_stats_shm_lsn
BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
DecodedXLogRecord * record
DecodedXLogRecord * decode_queue_head
DecodedXLogRecord * decode_queue_tail
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
static Datum TimestampTzGetDatum(TimestampTz X)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
void XLogPrefetchResetStats(void)
static bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)
void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
struct LsnReadQueue LsnReadQueue
#define RecoveryPrefetchEnabled()
static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
LsnReadQueueNextStatus(* LsnReadQueueNextFun)(uintptr_t lrq_private, XLogRecPtr *lsn)
static void lrq_free(LsnReadQueue *lrq)
struct XLogPrefetchStats XLogPrefetchStats
XLogRecord * XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
static void lrq_prefetch(LsnReadQueue *lrq)
static int XLogPrefetchReconfigureCount
Datum pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
static uint32 lrq_completed(LsnReadQueue *lrq)
static XLogPrefetchStats * SharedStats
static uint32 lrq_inflight(LsnReadQueue *lrq)
void XLogPrefetchReconfigure(void)
size_t XLogPrefetchShmemSize(void)
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS
XLogPrefetcher * XLogPrefetcherAllocate(XLogReaderState *reader)
void XLogPrefetchShmemInit(void)
void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
void assign_recovery_prefetch(int new_value, void *extra)
static void XLogPrefetchIncrement(pg_atomic_uint64 *counter)
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE
struct XLogPrefetcherFilter XLogPrefetcherFilter
static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
#define XLOGPREFETCHER_STATS_DISTANCE
static void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER
void XLogPrefetcherFree(XLogPrefetcher *prefetcher)
bool check_recovery_prefetch(int *new_value, void **extra, GucSource source)
XLogReaderState * XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
#define BKPBLOCK_WILL_INIT