PostgreSQL Source Code  git master
xlogprefetcher.h File Reference
Include dependency graph for xlogprefetcher.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Typedefs

typedef struct XLogPrefetcher XLogPrefetcher
 

Enumerations

enum  RecoveryPrefetchValue { RECOVERY_PREFETCH_OFF , RECOVERY_PREFETCH_ON , RECOVERY_PREFETCH_TRY }
 

Functions

void XLogPrefetchReconfigure (void)
 
size_t XLogPrefetchShmemSize (void)
 
void XLogPrefetchShmemInit (void)
 
void XLogPrefetchResetStats (void)
 
XLogPrefetcherXLogPrefetcherAllocate (XLogReaderState *reader)
 
void XLogPrefetcherFree (XLogPrefetcher *prefetcher)
 
XLogReaderStateXLogPrefetcherGetReader (XLogPrefetcher *prefetcher)
 
void XLogPrefetcherBeginRead (XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
 
XLogRecordXLogPrefetcherReadRecord (XLogPrefetcher *prefetcher, char **errmsg)
 
void XLogPrefetcherComputeStats (XLogPrefetcher *prefetcher)
 

Variables

PGDLLIMPORT int recovery_prefetch
 

Typedef Documentation

◆ XLogPrefetcher

Definition at line 21 of file xlogprefetcher.h.

Enumeration Type Documentation

◆ RecoveryPrefetchValue

Enumerator
RECOVERY_PREFETCH_OFF 
RECOVERY_PREFETCH_ON 
RECOVERY_PREFETCH_TRY 

Definition at line 24 of file xlogprefetcher.h.

25 {
RecoveryPrefetchValue
@ RECOVERY_PREFETCH_OFF
@ RECOVERY_PREFETCH_ON
@ RECOVERY_PREFETCH_TRY

Function Documentation

◆ XLogPrefetcherAllocate()

XLogPrefetcher* XLogPrefetcherAllocate ( XLogReaderState reader)

Definition at line 362 of file xlogprefetcher.c.

363 {
364  XLogPrefetcher *prefetcher;
365  static HASHCTL hash_table_ctl = {
366  .keysize = sizeof(RelFileLocator),
367  .entrysize = sizeof(XLogPrefetcherFilter)
368  };
369 
370  prefetcher = palloc0(sizeof(XLogPrefetcher));
371 
372  prefetcher->reader = reader;
373  prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
374  &hash_table_ctl,
376  dlist_init(&prefetcher->filter_queue);
377 
380  SharedStats->io_depth = 0;
381 
382  /* First usage will cause streaming_read to be allocated. */
384 
385  return prefetcher;
386 }
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * palloc0(Size size)
Definition: mcxt.c:1346
struct RelFileLocator RelFileLocator
Size keysize
Definition: hsearch.h:75
dlist_head filter_queue
XLogReaderState * reader
static int XLogPrefetchReconfigureCount
static XLogPrefetchStats * SharedStats

References XLogPrefetchStats::block_distance, dlist_init(), XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, HASH_BLOBS, hash_create(), HASH_ELEM, XLogPrefetchStats::io_depth, HASHCTL::keysize, palloc0(), XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, SharedStats, XLogPrefetchStats::wal_distance, and XLogPrefetchReconfigureCount.

Referenced by InitWalRecovery().

◆ XLogPrefetcherBeginRead()

void XLogPrefetcherBeginRead ( XLogPrefetcher prefetcher,
XLogRecPtr  recPtr 
)

Definition at line 964 of file xlogprefetcher.c.

965 {
966  /* This will forget about any in-flight IO. */
967  prefetcher->reconfigure_count--;
968 
969  /* Book-keeping to avoid readahead on first read. */
970  prefetcher->begin_ptr = recPtr;
971 
972  prefetcher->no_readahead_until = 0;
973 
974  /* This will forget about any queued up records in the decoder. */
975  XLogBeginRead(prefetcher->reader, recPtr);
976 }
XLogRecPtr no_readahead_until
XLogRecPtr begin_ptr
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:231

References XLogPrefetcher::begin_ptr, XLogPrefetcher::no_readahead_until, XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, and XLogBeginRead().

Referenced by FinishWalRecovery(), InitWalRecovery(), PerformWalRecovery(), and ReadCheckpointRecord().

◆ XLogPrefetcherComputeStats()

void XLogPrefetcherComputeStats ( XLogPrefetcher prefetcher)

Definition at line 412 of file xlogprefetcher.c.

413 {
414  uint32 io_depth;
415  uint32 completed;
416  int64 wal_distance;
417 
418 
419  /* How far ahead of replay are we now? */
420  if (prefetcher->reader->decode_queue_tail)
421  {
422  wal_distance =
423  prefetcher->reader->decode_queue_tail->lsn -
424  prefetcher->reader->decode_queue_head->lsn;
425  }
426  else
427  {
428  wal_distance = 0;
429  }
430 
431  /* How many IOs are currently in flight and completed? */
432  io_depth = lrq_inflight(prefetcher->streaming_read);
433  completed = lrq_completed(prefetcher->streaming_read);
434 
435  /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
436  SharedStats->io_depth = io_depth;
437  SharedStats->block_distance = io_depth + completed;
438  SharedStats->wal_distance = wal_distance;
439 
440  prefetcher->next_stats_shm_lsn =
442 }
unsigned int uint32
Definition: c.h:506
XLogRecPtr lsn
Definition: xlogreader.h:164
LsnReadQueue * streaming_read
XLogRecPtr next_stats_shm_lsn
DecodedXLogRecord * decode_queue_head
Definition: xlogreader.h:260
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206
DecodedXLogRecord * decode_queue_tail
Definition: xlogreader.h:261
static uint32 lrq_completed(LsnReadQueue *lrq)
static uint32 lrq_inflight(LsnReadQueue *lrq)
#define XLOGPREFETCHER_STATS_DISTANCE

References XLogPrefetchStats::block_distance, XLogReaderState::decode_queue_head, XLogReaderState::decode_queue_tail, XLogPrefetchStats::io_depth, lrq_completed(), lrq_inflight(), DecodedXLogRecord::lsn, XLogPrefetcher::next_stats_shm_lsn, XLogPrefetcher::reader, XLogReaderState::ReadRecPtr, SharedStats, XLogPrefetcher::streaming_read, XLogPrefetchStats::wal_distance, and XLOGPREFETCHER_STATS_DISTANCE.

Referenced by ShutdownWalRecovery(), WaitForWALToBecomeAvailable(), and XLogPrefetcherReadRecord().

◆ XLogPrefetcherFree()

void XLogPrefetcherFree ( XLogPrefetcher prefetcher)

Definition at line 392 of file xlogprefetcher.c.

393 {
394  lrq_free(prefetcher->streaming_read);
395  hash_destroy(prefetcher->filter_table);
396  pfree(prefetcher);
397 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
void pfree(void *pointer)
Definition: mcxt.c:1520
static void lrq_free(LsnReadQueue *lrq)

References XLogPrefetcher::filter_table, hash_destroy(), lrq_free(), pfree(), and XLogPrefetcher::streaming_read.

Referenced by ShutdownWalRecovery().

◆ XLogPrefetcherGetReader()

XLogReaderState* XLogPrefetcherGetReader ( XLogPrefetcher prefetcher)

Definition at line 403 of file xlogprefetcher.c.

404 {
405  return prefetcher->reader;
406 }

References XLogPrefetcher::reader.

Referenced by ReadRecord().

◆ XLogPrefetcherReadRecord()

XLogRecord* XLogPrefetcherReadRecord ( XLogPrefetcher prefetcher,
char **  errmsg 
)

Definition at line 983 of file xlogprefetcher.c.

984 {
985  DecodedXLogRecord *record;
986  XLogRecPtr replayed_up_to;
987 
988  /*
989  * See if it's time to reset the prefetching machinery, because a relevant
990  * GUC was changed.
991  */
993  {
994  uint32 max_distance;
995  uint32 max_inflight;
996 
997  if (prefetcher->streaming_read)
998  lrq_free(prefetcher->streaming_read);
999 
1001  {
1003  max_inflight = maintenance_io_concurrency;
1004  max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1005  }
1006  else
1007  {
1008  max_inflight = 1;
1009  max_distance = 1;
1010  }
1011 
1012  prefetcher->streaming_read = lrq_alloc(max_distance,
1013  max_inflight,
1014  (uintptr_t) prefetcher,
1016 
1018  }
1019 
1020  /*
1021  * Release last returned record, if there is one, as it's now been
1022  * replayed.
1023  */
1024  replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1025 
1026  /*
1027  * Can we drop any filters yet? If we were waiting for a relation to be
1028  * created or extended, it is now OK to access blocks in the covered
1029  * range.
1030  */
1031  XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1032 
1033  /*
1034  * All IO initiated by earlier WAL is now completed. This might trigger
1035  * further prefetching.
1036  */
1037  lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1038 
1039  /*
1040  * If there's nothing queued yet, then start prefetching to cause at least
1041  * one record to be queued.
1042  */
1043  if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1044  {
1045  Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1046  Assert(lrq_completed(prefetcher->streaming_read) == 0);
1047  lrq_prefetch(prefetcher->streaming_read);
1048  }
1049 
1050  /* Read the next record. */
1051  record = XLogNextRecord(prefetcher->reader, errmsg);
1052  if (!record)
1053  return NULL;
1054 
1055  /*
1056  * The record we just got is the "current" one, for the benefit of the
1057  * XLogRecXXX() macros.
1058  */
1059  Assert(record == prefetcher->reader->record);
1060 
1061  /*
1062  * If maintenance_io_concurrency is set very low, we might have started
1063  * prefetching some but not all of the blocks referenced in the record
1064  * we're about to return. Forget about the rest of the blocks in this
1065  * record by dropping the prefetcher's reference to it.
1066  */
1067  if (record == prefetcher->record)
1068  prefetcher->record = NULL;
1069 
1070  /*
1071  * See if it's time to compute some statistics, because enough WAL has
1072  * been processed.
1073  */
1074  if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1075  XLogPrefetcherComputeStats(prefetcher);
1076 
1077  Assert(record == prefetcher->reader->record);
1078 
1079  return &record->header;
1080 }
int maintenance_io_concurrency
Definition: bufmgr.c:157
#define Assert(condition)
Definition: c.h:858
#define unlikely(x)
Definition: c.h:311
int errmsg(const char *fmt,...)
Definition: elog.c:1072
XLogRecord header
Definition: xlogreader.h:166
DecodedXLogRecord * record
DecodedXLogRecord * record
Definition: xlogreader.h:236
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
#define RecoveryPrefetchEnabled()
static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
static void lrq_prefetch(LsnReadQueue *lrq)
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:325
XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)
Definition: xlogreader.c:249
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
Definition: xlogreader.h:325

References Assert, errmsg(), DecodedXLogRecord::header, lrq_alloc(), lrq_complete_lsn(), lrq_completed(), lrq_free(), lrq_inflight(), lrq_prefetch(), DecodedXLogRecord::lsn, maintenance_io_concurrency, XLogPrefetcher::next_stats_shm_lsn, XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, XLogPrefetcher::record, XLogReaderState::record, RecoveryPrefetchEnabled, XLogPrefetcher::streaming_read, unlikely, XLogNextRecord(), XLOGPREFETCHER_DISTANCE_MULTIPLIER, XLogPrefetcherCompleteFilters(), XLogPrefetcherComputeStats(), XLogPrefetcherNextBlock(), XLogPrefetchReconfigureCount, XLogReaderHasQueuedRecordOrError(), and XLogReleasePreviousRecord().

Referenced by ReadRecord().

◆ XLogPrefetchReconfigure()

void XLogPrefetchReconfigure ( void  )

◆ XLogPrefetchResetStats()

void XLogPrefetchResetStats ( void  )

Definition at line 303 of file xlogprefetcher.c.

304 {
312 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:480
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1654
pg_atomic_uint64 skip_fpw
pg_atomic_uint64 skip_init
pg_atomic_uint64 reset_time
pg_atomic_uint64 hit
pg_atomic_uint64 prefetch
pg_atomic_uint64 skip_rep
pg_atomic_uint64 skip_new

References GetCurrentTimestamp(), XLogPrefetchStats::hit, pg_atomic_write_u64(), XLogPrefetchStats::prefetch, XLogPrefetchStats::reset_time, SharedStats, XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, and XLogPrefetchStats::skip_rep.

Referenced by pg_stat_reset_shared().

◆ XLogPrefetchShmemInit()

void XLogPrefetchShmemInit ( void  )

Definition at line 315 of file xlogprefetcher.c.

316 {
317  bool found;
318 
320  ShmemInitStruct("XLogPrefetchStats",
321  sizeof(XLogPrefetchStats),
322  &found);
323 
324  if (!found)
325  {
333  }
334 }
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:448
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387

References GetCurrentTimestamp(), XLogPrefetchStats::hit, pg_atomic_init_u64(), XLogPrefetchStats::prefetch, XLogPrefetchStats::reset_time, SharedStats, ShmemInitStruct(), XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, and XLogPrefetchStats::skip_rep.

Referenced by CreateOrAttachShmemStructs().

◆ XLogPrefetchShmemSize()

size_t XLogPrefetchShmemSize ( void  )

Definition at line 294 of file xlogprefetcher.c.

295 {
296  return sizeof(XLogPrefetchStats);
297 }
struct XLogPrefetchStats XLogPrefetchStats

Referenced by CalculateShmemSize().

Variable Documentation

◆ recovery_prefetch

PGDLLIMPORT int recovery_prefetch
extern

Definition at line 68 of file xlogprefetcher.c.

Referenced by assign_recovery_prefetch().