PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
xlogprefetcher.h File Reference
Include dependency graph for xlogprefetcher.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Typedefs

typedef struct XLogPrefetcher XLogPrefetcher
 

Enumerations

enum  RecoveryPrefetchValue { RECOVERY_PREFETCH_OFF , RECOVERY_PREFETCH_ON , RECOVERY_PREFETCH_TRY }
 

Functions

void XLogPrefetchReconfigure (void)
 
size_t XLogPrefetchShmemSize (void)
 
void XLogPrefetchShmemInit (void)
 
void XLogPrefetchResetStats (void)
 
XLogPrefetcherXLogPrefetcherAllocate (XLogReaderState *reader)
 
void XLogPrefetcherFree (XLogPrefetcher *prefetcher)
 
XLogReaderStateXLogPrefetcherGetReader (XLogPrefetcher *prefetcher)
 
void XLogPrefetcherBeginRead (XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
 
XLogRecordXLogPrefetcherReadRecord (XLogPrefetcher *prefetcher, char **errmsg)
 
void XLogPrefetcherComputeStats (XLogPrefetcher *prefetcher)
 

Variables

PGDLLIMPORT int recovery_prefetch
 

Typedef Documentation

◆ XLogPrefetcher

Definition at line 32 of file xlogprefetcher.h.

Enumeration Type Documentation

◆ RecoveryPrefetchValue

Enumerator
RECOVERY_PREFETCH_OFF 
RECOVERY_PREFETCH_ON 
RECOVERY_PREFETCH_TRY 

Definition at line 24 of file xlogprefetcher.h.

25{
RecoveryPrefetchValue
@ RECOVERY_PREFETCH_OFF
@ RECOVERY_PREFETCH_ON
@ RECOVERY_PREFETCH_TRY

Function Documentation

◆ XLogPrefetcherAllocate()

XLogPrefetcher * XLogPrefetcherAllocate ( XLogReaderState reader)

Definition at line 362 of file xlogprefetcher.c.

363{
364 XLogPrefetcher *prefetcher;
365 HASHCTL ctl;
366
367 prefetcher = palloc0(sizeof(XLogPrefetcher));
368 prefetcher->reader = reader;
369
370 ctl.keysize = sizeof(RelFileLocator);
371 ctl.entrysize = sizeof(XLogPrefetcherFilter);
372 prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
374 dlist_init(&prefetcher->filter_queue);
375
379
380 /* First usage will cause streaming_read to be allocated. */
382
383 return prefetcher;
384}
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * palloc0(Size size)
Definition: mcxt.c:1347
tree ctl
Definition: radixtree.h:1838
struct RelFileLocator RelFileLocator
dlist_head filter_queue
XLogReaderState * reader
static int XLogPrefetchReconfigureCount
static XLogPrefetchStats * SharedStats
struct XLogPrefetcherFilter XLogPrefetcherFilter

References XLogPrefetchStats::block_distance, ctl, dlist_init(), XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, HASH_BLOBS, hash_create(), HASH_ELEM, XLogPrefetchStats::io_depth, palloc0(), XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, SharedStats, XLogPrefetchStats::wal_distance, and XLogPrefetchReconfigureCount.

Referenced by InitWalRecovery().

◆ XLogPrefetcherBeginRead()

void XLogPrefetcherBeginRead ( XLogPrefetcher prefetcher,
XLogRecPtr  recPtr 
)

Definition at line 962 of file xlogprefetcher.c.

963{
964 /* This will forget about any in-flight IO. */
965 prefetcher->reconfigure_count--;
966
967 /* Book-keeping to avoid readahead on first read. */
968 prefetcher->begin_ptr = recPtr;
969
970 prefetcher->no_readahead_until = 0;
971
972 /* This will forget about any queued up records in the decoder. */
973 XLogBeginRead(prefetcher->reader, recPtr);
974}
XLogRecPtr no_readahead_until
XLogRecPtr begin_ptr
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:231

References XLogPrefetcher::begin_ptr, XLogPrefetcher::no_readahead_until, XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, and XLogBeginRead().

Referenced by FinishWalRecovery(), InitWalRecovery(), PerformWalRecovery(), and ReadCheckpointRecord().

◆ XLogPrefetcherComputeStats()

void XLogPrefetcherComputeStats ( XLogPrefetcher prefetcher)

Definition at line 410 of file xlogprefetcher.c.

411{
412 uint32 io_depth;
413 uint32 completed;
414 int64 wal_distance;
415
416
417 /* How far ahead of replay are we now? */
418 if (prefetcher->reader->decode_queue_tail)
419 {
420 wal_distance =
421 prefetcher->reader->decode_queue_tail->lsn -
422 prefetcher->reader->decode_queue_head->lsn;
423 }
424 else
425 {
426 wal_distance = 0;
427 }
428
429 /* How many IOs are currently in flight and completed? */
430 io_depth = lrq_inflight(prefetcher->streaming_read);
431 completed = lrq_completed(prefetcher->streaming_read);
432
433 /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
434 SharedStats->io_depth = io_depth;
435 SharedStats->block_distance = io_depth + completed;
436 SharedStats->wal_distance = wal_distance;
437
438 prefetcher->next_stats_shm_lsn =
440}
int64_t int64
Definition: c.h:482
uint32_t uint32
Definition: c.h:485
XLogRecPtr lsn
Definition: xlogreader.h:164
LsnReadQueue * streaming_read
XLogRecPtr next_stats_shm_lsn
DecodedXLogRecord * decode_queue_head
Definition: xlogreader.h:260
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206
DecodedXLogRecord * decode_queue_tail
Definition: xlogreader.h:261
static uint32 lrq_completed(LsnReadQueue *lrq)
static uint32 lrq_inflight(LsnReadQueue *lrq)
#define XLOGPREFETCHER_STATS_DISTANCE

References XLogPrefetchStats::block_distance, XLogReaderState::decode_queue_head, XLogReaderState::decode_queue_tail, XLogPrefetchStats::io_depth, lrq_completed(), lrq_inflight(), DecodedXLogRecord::lsn, XLogPrefetcher::next_stats_shm_lsn, XLogPrefetcher::reader, XLogReaderState::ReadRecPtr, SharedStats, XLogPrefetcher::streaming_read, XLogPrefetchStats::wal_distance, and XLOGPREFETCHER_STATS_DISTANCE.

Referenced by ShutdownWalRecovery(), WaitForWALToBecomeAvailable(), and XLogPrefetcherReadRecord().

◆ XLogPrefetcherFree()

void XLogPrefetcherFree ( XLogPrefetcher prefetcher)

Definition at line 390 of file xlogprefetcher.c.

391{
392 lrq_free(prefetcher->streaming_read);
393 hash_destroy(prefetcher->filter_table);
394 pfree(prefetcher);
395}
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
void pfree(void *pointer)
Definition: mcxt.c:1521
static void lrq_free(LsnReadQueue *lrq)

References XLogPrefetcher::filter_table, hash_destroy(), lrq_free(), pfree(), and XLogPrefetcher::streaming_read.

Referenced by ShutdownWalRecovery().

◆ XLogPrefetcherGetReader()

XLogReaderState * XLogPrefetcherGetReader ( XLogPrefetcher prefetcher)

Definition at line 401 of file xlogprefetcher.c.

402{
403 return prefetcher->reader;
404}

References XLogPrefetcher::reader.

Referenced by ReadRecord().

◆ XLogPrefetcherReadRecord()

XLogRecord * XLogPrefetcherReadRecord ( XLogPrefetcher prefetcher,
char **  errmsg 
)

Definition at line 981 of file xlogprefetcher.c.

982{
983 DecodedXLogRecord *record;
984 XLogRecPtr replayed_up_to;
985
986 /*
987 * See if it's time to reset the prefetching machinery, because a relevant
988 * GUC was changed.
989 */
991 {
992 uint32 max_distance;
993 uint32 max_inflight;
994
995 if (prefetcher->streaming_read)
996 lrq_free(prefetcher->streaming_read);
997
999 {
1001 max_inflight = maintenance_io_concurrency;
1002 max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1003 }
1004 else
1005 {
1006 max_inflight = 1;
1007 max_distance = 1;
1008 }
1009
1010 prefetcher->streaming_read = lrq_alloc(max_distance,
1011 max_inflight,
1012 (uintptr_t) prefetcher,
1014
1016 }
1017
1018 /*
1019 * Release last returned record, if there is one, as it's now been
1020 * replayed.
1021 */
1022 replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1023
1024 /*
1025 * Can we drop any filters yet? If we were waiting for a relation to be
1026 * created or extended, it is now OK to access blocks in the covered
1027 * range.
1028 */
1029 XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1030
1031 /*
1032 * All IO initiated by earlier WAL is now completed. This might trigger
1033 * further prefetching.
1034 */
1035 lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1036
1037 /*
1038 * If there's nothing queued yet, then start prefetching to cause at least
1039 * one record to be queued.
1040 */
1041 if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1042 {
1043 Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1044 Assert(lrq_completed(prefetcher->streaming_read) == 0);
1045 lrq_prefetch(prefetcher->streaming_read);
1046 }
1047
1048 /* Read the next record. */
1049 record = XLogNextRecord(prefetcher->reader, errmsg);
1050 if (!record)
1051 return NULL;
1052
1053 /*
1054 * The record we just got is the "current" one, for the benefit of the
1055 * XLogRecXXX() macros.
1056 */
1057 Assert(record == prefetcher->reader->record);
1058
1059 /*
1060 * If maintenance_io_concurrency is set very low, we might have started
1061 * prefetching some but not all of the blocks referenced in the record
1062 * we're about to return. Forget about the rest of the blocks in this
1063 * record by dropping the prefetcher's reference to it.
1064 */
1065 if (record == prefetcher->record)
1066 prefetcher->record = NULL;
1067
1068 /*
1069 * See if it's time to compute some statistics, because enough WAL has
1070 * been processed.
1071 */
1072 if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1073 XLogPrefetcherComputeStats(prefetcher);
1074
1075 Assert(record == prefetcher->reader->record);
1076
1077 return &record->header;
1078}
int maintenance_io_concurrency
Definition: bufmgr.c:158
#define Assert(condition)
Definition: c.h:812
#define unlikely(x)
Definition: c.h:330
int errmsg(const char *fmt,...)
Definition: elog.c:1070
XLogRecord header
Definition: xlogreader.h:166
DecodedXLogRecord * record
DecodedXLogRecord * record
Definition: xlogreader.h:236
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
#define RecoveryPrefetchEnabled()
static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
static void lrq_prefetch(LsnReadQueue *lrq)
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:325
XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)
Definition: xlogreader.c:249
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
Definition: xlogreader.h:325

References Assert, errmsg(), DecodedXLogRecord::header, lrq_alloc(), lrq_complete_lsn(), lrq_completed(), lrq_free(), lrq_inflight(), lrq_prefetch(), DecodedXLogRecord::lsn, maintenance_io_concurrency, XLogPrefetcher::next_stats_shm_lsn, XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, XLogPrefetcher::record, XLogReaderState::record, RecoveryPrefetchEnabled, XLogPrefetcher::streaming_read, unlikely, XLogNextRecord(), XLOGPREFETCHER_DISTANCE_MULTIPLIER, XLogPrefetcherCompleteFilters(), XLogPrefetcherComputeStats(), XLogPrefetcherNextBlock(), XLogPrefetchReconfigureCount, XLogReaderHasQueuedRecordOrError(), and XLogReleasePreviousRecord().

Referenced by ReadRecord().

◆ XLogPrefetchReconfigure()

void XLogPrefetchReconfigure ( void  )

◆ XLogPrefetchResetStats()

void XLogPrefetchResetStats ( void  )

Definition at line 303 of file xlogprefetcher.c.

304{
312}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:485
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
pg_atomic_uint64 skip_fpw
pg_atomic_uint64 skip_init
pg_atomic_uint64 reset_time
pg_atomic_uint64 hit
pg_atomic_uint64 prefetch
pg_atomic_uint64 skip_rep
pg_atomic_uint64 skip_new

References GetCurrentTimestamp(), XLogPrefetchStats::hit, pg_atomic_write_u64(), XLogPrefetchStats::prefetch, XLogPrefetchStats::reset_time, SharedStats, XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, and XLogPrefetchStats::skip_rep.

Referenced by pg_stat_reset_shared().

◆ XLogPrefetchShmemInit()

void XLogPrefetchShmemInit ( void  )

Definition at line 315 of file xlogprefetcher.c.

316{
317 bool found;
318
320 ShmemInitStruct("XLogPrefetchStats",
321 sizeof(XLogPrefetchStats),
322 &found);
323
324 if (!found)
325 {
333 }
334}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:453
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382

References GetCurrentTimestamp(), XLogPrefetchStats::hit, pg_atomic_init_u64(), XLogPrefetchStats::prefetch, XLogPrefetchStats::reset_time, SharedStats, ShmemInitStruct(), XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, and XLogPrefetchStats::skip_rep.

Referenced by CreateOrAttachShmemStructs().

◆ XLogPrefetchShmemSize()

size_t XLogPrefetchShmemSize ( void  )

Definition at line 294 of file xlogprefetcher.c.

295{
296 return sizeof(XLogPrefetchStats);
297}
struct XLogPrefetchStats XLogPrefetchStats

Referenced by CalculateShmemSize().

Variable Documentation

◆ recovery_prefetch

PGDLLIMPORT int recovery_prefetch
extern

Definition at line 68 of file xlogprefetcher.c.

Referenced by assign_recovery_prefetch().