PostgreSQL Source Code  git master
xlogprefetcher.c File Reference
#include "postgres.h"
#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "catalog/storage_xlog.h"
#include "commands/dbcommands_xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "port/atomics.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
#include "utils/fmgrprotos.h"
#include "utils/guc_hooks.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"
Include dependency graph for xlogprefetcher.c:

Go to the source code of this file.

Data Structures

struct  LsnReadQueue
 
struct  XLogPrefetcher
 
struct  XLogPrefetcherFilter
 
struct  XLogPrefetchStats
 

Macros

#define XLOGPREFETCHER_STATS_DISTANCE   BLCKSZ
 
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE   4
 
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER   4
 
#define RecoveryPrefetchEnabled()   false
 
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS   10
 

Typedefs

typedef LsnReadQueueNextStatus(* LsnReadQueueNextFun) (uintptr_t lrq_private, XLogRecPtr *lsn)
 
typedef struct LsnReadQueue LsnReadQueue
 
typedef struct XLogPrefetcherFilter XLogPrefetcherFilter
 
typedef struct XLogPrefetchStats XLogPrefetchStats
 

Enumerations

enum  LsnReadQueueNextStatus { LRQ_NEXT_NO_IO , LRQ_NEXT_IO , LRQ_NEXT_AGAIN }
 

Functions

static void XLogPrefetcherAddFilter (XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)
 
static bool XLogPrefetcherIsFiltered (XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)
 
static void XLogPrefetcherCompleteFilters (XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
 
static LsnReadQueueNextStatus XLogPrefetcherNextBlock (uintptr_t pgsr_private, XLogRecPtr *lsn)
 
static LsnReadQueuelrq_alloc (uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
 
static void lrq_free (LsnReadQueue *lrq)
 
static uint32 lrq_inflight (LsnReadQueue *lrq)
 
static uint32 lrq_completed (LsnReadQueue *lrq)
 
static void lrq_prefetch (LsnReadQueue *lrq)
 
static void lrq_complete_lsn (LsnReadQueue *lrq, XLogRecPtr lsn)
 
size_t XLogPrefetchShmemSize (void)
 
void XLogPrefetchResetStats (void)
 
void XLogPrefetchShmemInit (void)
 
void XLogPrefetchReconfigure (void)
 
static void XLogPrefetchIncrement (pg_atomic_uint64 *counter)
 
XLogPrefetcherXLogPrefetcherAllocate (XLogReaderState *reader)
 
void XLogPrefetcherFree (XLogPrefetcher *prefetcher)
 
XLogReaderStateXLogPrefetcherGetReader (XLogPrefetcher *prefetcher)
 
void XLogPrefetcherComputeStats (XLogPrefetcher *prefetcher)
 
Datum pg_stat_get_recovery_prefetch (PG_FUNCTION_ARGS)
 
void XLogPrefetcherBeginRead (XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
 
XLogRecordXLogPrefetcherReadRecord (XLogPrefetcher *prefetcher, char **errmsg)
 
bool check_recovery_prefetch (int *new_value, void **extra, GucSource source)
 
void assign_recovery_prefetch (int new_value, void *extra)
 

Variables

int recovery_prefetch = RECOVERY_PREFETCH_TRY
 
static int XLogPrefetchReconfigureCount = 0
 
static XLogPrefetchStatsSharedStats
 

Macro Definition Documentation

◆ PG_STAT_GET_RECOVERY_PREFETCH_COLS

#define PG_STAT_GET_RECOVERY_PREFETCH_COLS   10

◆ RecoveryPrefetchEnabled

#define RecoveryPrefetchEnabled ( )    false

Definition at line 75 of file xlogprefetcher.c.

◆ XLOGPREFETCHER_DISTANCE_MULTIPLIER

#define XLOGPREFETCHER_DISTANCE_MULTIPLIER   4

Definition at line 62 of file xlogprefetcher.c.

◆ XLOGPREFETCHER_SEQ_WINDOW_SIZE

#define XLOGPREFETCHER_SEQ_WINDOW_SIZE   4

Definition at line 56 of file xlogprefetcher.c.

◆ XLOGPREFETCHER_STATS_DISTANCE

#define XLOGPREFETCHER_STATS_DISTANCE   BLCKSZ

Definition at line 50 of file xlogprefetcher.c.

Typedef Documentation

◆ LsnReadQueue

typedef struct LsnReadQueue LsnReadQueue

◆ LsnReadQueueNextFun

typedef LsnReadQueueNextStatus(* LsnReadQueueNextFun) (uintptr_t lrq_private, XLogRecPtr *lsn)

Definition at line 94 of file xlogprefetcher.c.

◆ XLogPrefetcherFilter

◆ XLogPrefetchStats

Enumeration Type Documentation

◆ LsnReadQueueNextStatus

Enumerator
LRQ_NEXT_NO_IO 
LRQ_NEXT_IO 
LRQ_NEXT_AGAIN 

Definition at line 83 of file xlogprefetcher.c.

84 {
LsnReadQueueNextStatus
@ LRQ_NEXT_NO_IO
@ LRQ_NEXT_IO
@ LRQ_NEXT_AGAIN

Function Documentation

◆ assign_recovery_prefetch()

void assign_recovery_prefetch ( int  new_value,
void *  extra 
)

Definition at line 1097 of file xlogprefetcher.c.

1098 {
1099  /* Reconfigure prefetching, because a setting it depends on changed. */
1100  recovery_prefetch = new_value;
1101  if (AmStartupProcess())
1103 }
#define AmStartupProcess()
Definition: miscadmin.h:382
int recovery_prefetch
void XLogPrefetchReconfigure(void)

References AmStartupProcess, recovery_prefetch, and XLogPrefetchReconfigure().

◆ check_recovery_prefetch()

bool check_recovery_prefetch ( int *  new_value,
void **  extra,
GucSource  source 
)

Definition at line 1083 of file xlogprefetcher.c.

1084 {
1085 #ifndef USE_PREFETCH
1086  if (*new_value == RECOVERY_PREFETCH_ON)
1087  {
1088  GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
1089  return false;
1090  }
1091 #endif
1092 
1093  return true;
1094 }
#define GUC_check_errdetail
Definition: guc.h:448
@ RECOVERY_PREFETCH_ON

References GUC_check_errdetail, and RECOVERY_PREFETCH_ON.

◆ lrq_alloc()

static LsnReadQueue* lrq_alloc ( uint32  max_distance,
uint32  max_inflight,
uintptr_t  lrq_private,
LsnReadQueueNextFun  next 
)
inlinestatic

Definition at line 202 of file xlogprefetcher.c.

206 {
207  LsnReadQueue *lrq;
208  uint32 size;
209 
210  Assert(max_distance >= max_inflight);
211 
212  size = max_distance + 1; /* full ring buffer has a gap */
213  lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
214  lrq->lrq_private = lrq_private;
215  lrq->max_inflight = max_inflight;
216  lrq->size = size;
217  lrq->next = next;
218  lrq->head = 0;
219  lrq->tail = 0;
220  lrq->inflight = 0;
221  lrq->completed = 0;
222 
223  return lrq;
224 }
static int32 next
Definition: blutils.c:221
unsigned int uint32
Definition: c.h:506
#define Assert(condition)
Definition: c.h:858
void * palloc(Size size)
Definition: mcxt.c:1316
static pg_noinline void Size size
Definition: slab.c:607
uint32 max_inflight
LsnReadQueueNextFun next
uintptr_t lrq_private
struct LsnReadQueue::@17 queue[FLEXIBLE_ARRAY_MEMBER]

References Assert, LsnReadQueue::completed, LsnReadQueue::head, LsnReadQueue::inflight, LsnReadQueue::lrq_private, LsnReadQueue::max_inflight, next, LsnReadQueue::next, palloc(), LsnReadQueue::queue, LsnReadQueue::size, size, and LsnReadQueue::tail.

Referenced by XLogPrefetcherReadRecord().

◆ lrq_complete_lsn()

static void lrq_complete_lsn ( LsnReadQueue lrq,
XLogRecPtr  lsn 
)
inlinestatic

Definition at line 272 of file xlogprefetcher.c.

273 {
274  /*
275  * We know that LSNs before 'lsn' have been replayed, so we can now assume
276  * that any IOs that were started before then have finished.
277  */
278  while (lrq->tail != lrq->head &&
279  lrq->queue[lrq->tail].lsn < lsn)
280  {
281  if (lrq->queue[lrq->tail].io)
282  lrq->inflight--;
283  else
284  lrq->completed--;
285  lrq->tail++;
286  if (lrq->tail == lrq->size)
287  lrq->tail = 0;
288  }
290  lrq_prefetch(lrq);
291 }
XLogRecPtr lsn
#define RecoveryPrefetchEnabled()
static void lrq_prefetch(LsnReadQueue *lrq)

References LsnReadQueue::completed, LsnReadQueue::head, LsnReadQueue::inflight, LsnReadQueue::io, lrq_prefetch(), LsnReadQueue::lsn, LsnReadQueue::queue, RecoveryPrefetchEnabled, LsnReadQueue::size, and LsnReadQueue::tail.

Referenced by XLogPrefetcherReadRecord().

◆ lrq_completed()

static uint32 lrq_completed ( LsnReadQueue lrq)
inlinestatic

Definition at line 239 of file xlogprefetcher.c.

240 {
241  return lrq->completed;
242 }

References LsnReadQueue::completed.

Referenced by XLogPrefetcherComputeStats(), and XLogPrefetcherReadRecord().

◆ lrq_free()

static void lrq_free ( LsnReadQueue lrq)
inlinestatic

Definition at line 227 of file xlogprefetcher.c.

228 {
229  pfree(lrq);
230 }
void pfree(void *pointer)
Definition: mcxt.c:1520

References pfree().

Referenced by XLogPrefetcherFree(), and XLogPrefetcherReadRecord().

◆ lrq_inflight()

static uint32 lrq_inflight ( LsnReadQueue lrq)
inlinestatic

Definition at line 233 of file xlogprefetcher.c.

234 {
235  return lrq->inflight;
236 }

References LsnReadQueue::inflight.

Referenced by XLogPrefetcherComputeStats(), and XLogPrefetcherReadRecord().

◆ lrq_prefetch()

static void lrq_prefetch ( LsnReadQueue lrq)
inlinestatic

Definition at line 245 of file xlogprefetcher.c.

246 {
247  /* Try to start as many IOs as we can within our limits. */
248  while (lrq->inflight < lrq->max_inflight &&
249  lrq->inflight + lrq->completed < lrq->size - 1)
250  {
251  Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
252  switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
253  {
254  case LRQ_NEXT_AGAIN:
255  return;
256  case LRQ_NEXT_IO:
257  lrq->queue[lrq->head].io = true;
258  lrq->inflight++;
259  break;
260  case LRQ_NEXT_NO_IO:
261  lrq->queue[lrq->head].io = false;
262  lrq->completed++;
263  break;
264  }
265  lrq->head++;
266  if (lrq->head == lrq->size)
267  lrq->head = 0;
268  }
269 }

References Assert, LsnReadQueue::completed, LsnReadQueue::head, LsnReadQueue::inflight, LsnReadQueue::io, LRQ_NEXT_AGAIN, LRQ_NEXT_IO, LRQ_NEXT_NO_IO, LsnReadQueue::lrq_private, LsnReadQueue::lsn, LsnReadQueue::max_inflight, LsnReadQueue::next, LsnReadQueue::queue, LsnReadQueue::size, and LsnReadQueue::tail.

Referenced by lrq_complete_lsn(), and XLogPrefetcherReadRecord().

◆ pg_stat_get_recovery_prefetch()

Datum pg_stat_get_recovery_prefetch ( PG_FUNCTION_ARGS  )

Definition at line 826 of file xlogprefetcher.c.

827 {
828 #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
829  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
832 
833  InitMaterializedSRF(fcinfo, 0);
834 
835  for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
836  nulls[i] = false;
837 
848  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
849 
850  return (Datum) 0;
851 }
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:462
static Datum values[MAXATTR]
Definition: bootstrap.c:152
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1807
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
int i
Definition: isn.c:73
uintptr_t Datum
Definition: postgres.h:64
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
TupleDesc setDesc
Definition: execnodes.h:340
Tuplestorestate * setResult
Definition: execnodes.h:339
pg_atomic_uint64 skip_fpw
pg_atomic_uint64 skip_init
pg_atomic_uint64 reset_time
pg_atomic_uint64 hit
pg_atomic_uint64 prefetch
pg_atomic_uint64 skip_rep
pg_atomic_uint64 skip_new
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:750
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static XLogPrefetchStats * SharedStats
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS

References XLogPrefetchStats::block_distance, XLogPrefetchStats::hit, i, InitMaterializedSRF(), Int32GetDatum(), Int64GetDatum(), XLogPrefetchStats::io_depth, pg_atomic_read_u64(), PG_STAT_GET_RECOVERY_PREFETCH_COLS, XLogPrefetchStats::prefetch, XLogPrefetchStats::reset_time, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SharedStats, XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, XLogPrefetchStats::skip_rep, TimestampTzGetDatum(), tuplestore_putvalues(), values, and XLogPrefetchStats::wal_distance.

◆ XLogPrefetcherAddFilter()

static void XLogPrefetcherAddFilter ( XLogPrefetcher prefetcher,
RelFileLocator  rlocator,
BlockNumber  blockno,
XLogRecPtr  lsn 
)
inlinestatic

Definition at line 858 of file xlogprefetcher.c.

860 {
861  XLogPrefetcherFilter *filter;
862  bool found;
863 
864  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
865  if (!found)
866  {
867  /*
868  * Don't allow any prefetching of this block or higher until replayed.
869  */
870  filter->filter_until_replayed = lsn;
871  filter->filter_from_block = blockno;
872  dlist_push_head(&prefetcher->filter_queue, &filter->link);
873  }
874  else
875  {
876  /*
877  * We were already filtering this rlocator. Extend the filter's
878  * lifetime to cover this WAL record, but leave the lower of the block
879  * numbers there because we don't want to have to track individual
880  * blocks.
881  */
882  filter->filter_until_replayed = lsn;
883  dlist_delete(&filter->link);
884  dlist_push_head(&prefetcher->filter_queue, &filter->link);
885  filter->filter_from_block = Min(filter->filter_from_block, blockno);
886  }
887 }
#define Min(x, y)
Definition: c.h:1004
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
@ HASH_ENTER
Definition: hsearch.h:114
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void dlist_push_head(dlist_head *head, dlist_node *node)
Definition: ilist.h:347
XLogRecPtr filter_until_replayed
BlockNumber filter_from_block
dlist_head filter_queue

References dlist_delete(), dlist_push_head(), XLogPrefetcherFilter::filter_from_block, XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, XLogPrefetcherFilter::filter_until_replayed, HASH_ENTER, hash_search(), XLogPrefetcherFilter::link, and Min.

Referenced by XLogPrefetcherNextBlock().

◆ XLogPrefetcherAllocate()

XLogPrefetcher* XLogPrefetcherAllocate ( XLogReaderState reader)

Definition at line 362 of file xlogprefetcher.c.

363 {
364  XLogPrefetcher *prefetcher;
365  static HASHCTL hash_table_ctl = {
366  .keysize = sizeof(RelFileLocator),
367  .entrysize = sizeof(XLogPrefetcherFilter)
368  };
369 
370  prefetcher = palloc0(sizeof(XLogPrefetcher));
371 
372  prefetcher->reader = reader;
373  prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
374  &hash_table_ctl,
376  dlist_init(&prefetcher->filter_queue);
377 
380  SharedStats->io_depth = 0;
381 
382  /* First usage will cause streaming_read to be allocated. */
384 
385  return prefetcher;
386 }
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * palloc0(Size size)
Definition: mcxt.c:1346
struct RelFileLocator RelFileLocator
Size keysize
Definition: hsearch.h:75
XLogReaderState * reader
static int XLogPrefetchReconfigureCount

References XLogPrefetchStats::block_distance, dlist_init(), XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, HASH_BLOBS, hash_create(), HASH_ELEM, XLogPrefetchStats::io_depth, HASHCTL::keysize, palloc0(), XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, SharedStats, XLogPrefetchStats::wal_distance, and XLogPrefetchReconfigureCount.

Referenced by InitWalRecovery().

◆ XLogPrefetcherBeginRead()

void XLogPrefetcherBeginRead ( XLogPrefetcher prefetcher,
XLogRecPtr  recPtr 
)

Definition at line 964 of file xlogprefetcher.c.

965 {
966  /* This will forget about any in-flight IO. */
967  prefetcher->reconfigure_count--;
968 
969  /* Book-keeping to avoid readahead on first read. */
970  prefetcher->begin_ptr = recPtr;
971 
972  prefetcher->no_readahead_until = 0;
973 
974  /* This will forget about any queued up records in the decoder. */
975  XLogBeginRead(prefetcher->reader, recPtr);
976 }
XLogRecPtr no_readahead_until
XLogRecPtr begin_ptr
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:231

References XLogPrefetcher::begin_ptr, XLogPrefetcher::no_readahead_until, XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, and XLogBeginRead().

Referenced by FinishWalRecovery(), InitWalRecovery(), PerformWalRecovery(), and ReadCheckpointRecord().

◆ XLogPrefetcherCompleteFilters()

static void XLogPrefetcherCompleteFilters ( XLogPrefetcher prefetcher,
XLogRecPtr  replaying_lsn 
)
inlinestatic

Definition at line 896 of file xlogprefetcher.c.

897 {
898  while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
899  {
901  link,
902  &prefetcher->filter_queue);
903 
904  if (filter->filter_until_replayed >= replaying_lsn)
905  break;
906 
907  dlist_delete(&filter->link);
908  hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
909  }
910 }
#define unlikely(x)
Definition: c.h:311
@ HASH_REMOVE
Definition: hsearch.h:115
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:612
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336

References dlist_delete(), dlist_is_empty(), dlist_tail_element, XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, XLogPrefetcherFilter::filter_until_replayed, HASH_REMOVE, hash_search(), XLogPrefetcherFilter::link, and unlikely.

Referenced by XLogPrefetcherReadRecord().

◆ XLogPrefetcherComputeStats()

void XLogPrefetcherComputeStats ( XLogPrefetcher prefetcher)

Definition at line 412 of file xlogprefetcher.c.

413 {
414  uint32 io_depth;
415  uint32 completed;
416  int64 wal_distance;
417 
418 
419  /* How far ahead of replay are we now? */
420  if (prefetcher->reader->decode_queue_tail)
421  {
422  wal_distance =
423  prefetcher->reader->decode_queue_tail->lsn -
424  prefetcher->reader->decode_queue_head->lsn;
425  }
426  else
427  {
428  wal_distance = 0;
429  }
430 
431  /* How many IOs are currently in flight and completed? */
432  io_depth = lrq_inflight(prefetcher->streaming_read);
433  completed = lrq_completed(prefetcher->streaming_read);
434 
435  /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
436  SharedStats->io_depth = io_depth;
437  SharedStats->block_distance = io_depth + completed;
438  SharedStats->wal_distance = wal_distance;
439 
440  prefetcher->next_stats_shm_lsn =
442 }
XLogRecPtr lsn
Definition: xlogreader.h:164
LsnReadQueue * streaming_read
XLogRecPtr next_stats_shm_lsn
DecodedXLogRecord * decode_queue_head
Definition: xlogreader.h:260
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206
DecodedXLogRecord * decode_queue_tail
Definition: xlogreader.h:261
static uint32 lrq_completed(LsnReadQueue *lrq)
static uint32 lrq_inflight(LsnReadQueue *lrq)
#define XLOGPREFETCHER_STATS_DISTANCE

References XLogPrefetchStats::block_distance, XLogReaderState::decode_queue_head, XLogReaderState::decode_queue_tail, XLogPrefetchStats::io_depth, lrq_completed(), lrq_inflight(), DecodedXLogRecord::lsn, XLogPrefetcher::next_stats_shm_lsn, XLogPrefetcher::reader, XLogReaderState::ReadRecPtr, SharedStats, XLogPrefetcher::streaming_read, XLogPrefetchStats::wal_distance, and XLOGPREFETCHER_STATS_DISTANCE.

Referenced by ShutdownWalRecovery(), WaitForWALToBecomeAvailable(), and XLogPrefetcherReadRecord().

◆ XLogPrefetcherFree()

void XLogPrefetcherFree ( XLogPrefetcher prefetcher)

Definition at line 392 of file xlogprefetcher.c.

393 {
394  lrq_free(prefetcher->streaming_read);
395  hash_destroy(prefetcher->filter_table);
396  pfree(prefetcher);
397 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
static void lrq_free(LsnReadQueue *lrq)

References XLogPrefetcher::filter_table, hash_destroy(), lrq_free(), pfree(), and XLogPrefetcher::streaming_read.

Referenced by ShutdownWalRecovery().

◆ XLogPrefetcherGetReader()

XLogReaderState* XLogPrefetcherGetReader ( XLogPrefetcher prefetcher)

Definition at line 403 of file xlogprefetcher.c.

404 {
405  return prefetcher->reader;
406 }

References XLogPrefetcher::reader.

Referenced by ReadRecord().

◆ XLogPrefetcherIsFiltered()

static bool XLogPrefetcherIsFiltered ( XLogPrefetcher prefetcher,
RelFileLocator  rlocator,
BlockNumber  blockno 
)
inlinestatic

Definition at line 916 of file xlogprefetcher.c.

918 {
919  /*
920  * Test for empty queue first, because we expect it to be empty most of
921  * the time and we can avoid the hash table lookup in that case.
922  */
923  if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
924  {
925  XLogPrefetcherFilter *filter;
926 
927  /* See if the block range is filtered. */
928  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
929  if (filter && filter->filter_from_block <= blockno)
930  {
931 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
932  elog(XLOGPREFETCHER_DEBUG_LEVEL,
933  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
934  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
936  filter->filter_from_block);
937 #endif
938  return true;
939  }
940 
941  /* See if the whole database is filtered. */
942  rlocator.relNumber = InvalidRelFileNumber;
943  rlocator.spcOid = InvalidOid;
944  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
945  if (filter)
946  {
947 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
948  elog(XLOGPREFETCHER_DEBUG_LEVEL,
949  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
950  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
952 #endif
953  return true;
954  }
955  }
956 
957  return false;
958 }
#define elog(elevel,...)
Definition: elog.h:224
@ HASH_FIND
Definition: hsearch.h:113
#define InvalidOid
Definition: postgres_ext.h:36
#define InvalidRelFileNumber
Definition: relpath.h:26
RelFileNumber relNumber
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References RelFileLocator::dbOid, dlist_is_empty(), elog, XLogPrefetcherFilter::filter_from_block, XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, XLogPrefetcherFilter::filter_until_replayed, HASH_FIND, hash_search(), InvalidOid, InvalidRelFileNumber, LSN_FORMAT_ARGS, RelFileLocator::relNumber, RelFileLocator::spcOid, and unlikely.

Referenced by XLogPrefetcherNextBlock().

◆ XLogPrefetcherNextBlock()

static LsnReadQueueNextStatus XLogPrefetcherNextBlock ( uintptr_t  pgsr_private,
XLogRecPtr lsn 
)
static

Definition at line 461 of file xlogprefetcher.c.

462 {
463  XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
464  XLogReaderState *reader = prefetcher->reader;
465  XLogRecPtr replaying_lsn = reader->ReadRecPtr;
466 
467  /*
468  * We keep track of the record and block we're up to between calls with
469  * prefetcher->record and prefetcher->next_block_id.
470  */
471  for (;;)
472  {
473  DecodedXLogRecord *record;
474 
475  /* Try to read a new future record, if we don't already have one. */
476  if (prefetcher->record == NULL)
477  {
478  bool nonblocking;
479 
480  /*
481  * If there are already records or an error queued up that could
482  * be replayed, we don't want to block here. Otherwise, it's OK
483  * to block waiting for more data: presumably the caller has
484  * nothing else to do.
485  */
486  nonblocking = XLogReaderHasQueuedRecordOrError(reader);
487 
488  /* Readahead is disabled until we replay past a certain point. */
489  if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
490  return LRQ_NEXT_AGAIN;
491 
492  record = XLogReadAhead(prefetcher->reader, nonblocking);
493  if (record == NULL)
494  {
495  /*
496  * We can't read any more, due to an error or lack of data in
497  * nonblocking mode. Don't try to read ahead again until
498  * we've replayed everything already decoded.
499  */
500  if (nonblocking && prefetcher->reader->decode_queue_tail)
501  prefetcher->no_readahead_until =
502  prefetcher->reader->decode_queue_tail->lsn;
503 
504  return LRQ_NEXT_AGAIN;
505  }
506 
507  /*
508  * If prefetching is disabled, we don't need to analyze the record
509  * or issue any prefetches. We just need to cause one record to
510  * be decoded.
511  */
513  {
514  *lsn = InvalidXLogRecPtr;
515  return LRQ_NEXT_NO_IO;
516  }
517 
518  /* We have a new record to process. */
519  prefetcher->record = record;
520  prefetcher->next_block_id = 0;
521  }
522  else
523  {
524  /* Continue to process from last call, or last loop. */
525  record = prefetcher->record;
526  }
527 
528  /*
529  * Check for operations that require us to filter out block ranges, or
530  * pause readahead completely.
531  */
532  if (replaying_lsn < record->lsn)
533  {
534  uint8 rmid = record->header.xl_rmid;
535  uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
536 
537  if (rmid == RM_XLOG_ID)
538  {
539  if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
540  record_type == XLOG_END_OF_RECOVERY)
541  {
542  /*
543  * These records might change the TLI. Avoid potential
544  * bugs if we were to allow "read TLI" and "replay TLI" to
545  * differ without more analysis.
546  */
547  prefetcher->no_readahead_until = record->lsn;
548 
549 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
550  elog(XLOGPREFETCHER_DEBUG_LEVEL,
551  "suppressing all readahead until %X/%X is replayed due to possible TLI change",
552  LSN_FORMAT_ARGS(record->lsn));
553 #endif
554 
555  /* Fall through so we move past this record. */
556  }
557  }
558  else if (rmid == RM_DBASE_ID)
559  {
560  /*
561  * When databases are created with the file-copy strategy,
562  * there are no WAL records to tell us about the creation of
563  * individual relations.
564  */
565  if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
566  {
569  RelFileLocator rlocator =
571 
572  /*
573  * Don't try to prefetch anything in this database until
574  * it has been created, or we might confuse the blocks of
575  * different generations, if a database OID or
576  * relfilenumber is reused. It's also more efficient than
577  * discovering that relations don't exist on disk yet with
578  * ENOENT errors.
579  */
580  XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
581 
582 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
583  elog(XLOGPREFETCHER_DEBUG_LEVEL,
584  "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
585  rlocator.dbOid,
586  LSN_FORMAT_ARGS(record->lsn));
587 #endif
588  }
589  }
590  else if (rmid == RM_SMGR_ID)
591  {
592  if (record_type == XLOG_SMGR_CREATE)
593  {
594  xl_smgr_create *xlrec = (xl_smgr_create *)
595  record->main_data;
596 
597  if (xlrec->forkNum == MAIN_FORKNUM)
598  {
599  /*
600  * Don't prefetch anything for this whole relation
601  * until it has been created. Otherwise we might
602  * confuse the blocks of different generations, if a
603  * relfilenumber is reused. This also avoids the need
604  * to discover the problem via extra syscalls that
605  * report ENOENT.
606  */
607  XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
608  record->lsn);
609 
610 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
611  elog(XLOGPREFETCHER_DEBUG_LEVEL,
612  "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
613  xlrec->rlocator.spcOid,
614  xlrec->rlocator.dbOid,
615  xlrec->rlocator.relNumber,
616  LSN_FORMAT_ARGS(record->lsn));
617 #endif
618  }
619  }
620  else if (record_type == XLOG_SMGR_TRUNCATE)
621  {
623  record->main_data;
624 
625  /*
626  * Don't consider prefetching anything in the truncated
627  * range until the truncation has been performed.
628  */
629  XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
630  xlrec->blkno,
631  record->lsn);
632 
633 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
634  elog(XLOGPREFETCHER_DEBUG_LEVEL,
635  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
636  xlrec->rlocator.spcOid,
637  xlrec->rlocator.dbOid,
638  xlrec->rlocator.relNumber,
639  xlrec->blkno,
640  LSN_FORMAT_ARGS(record->lsn));
641 #endif
642  }
643  }
644  }
645 
646  /* Scan the block references, starting where we left off last time. */
647  while (prefetcher->next_block_id <= record->max_block_id)
648  {
649  int block_id = prefetcher->next_block_id++;
650  DecodedBkpBlock *block = &record->blocks[block_id];
651  SMgrRelation reln;
652  PrefetchBufferResult result;
653 
654  if (!block->in_use)
655  continue;
656 
658 
659  /*
660  * Record the LSN of this record. When it's replayed,
661  * LsnReadQueue will consider any IOs submitted for earlier LSNs
662  * to be finished.
663  */
664  *lsn = record->lsn;
665 
666  /* We don't try to prefetch anything but the main fork for now. */
667  if (block->forknum != MAIN_FORKNUM)
668  {
669  return LRQ_NEXT_NO_IO;
670  }
671 
672  /*
673  * If there is a full page image attached, we won't be reading the
674  * page, so don't bother trying to prefetch.
675  */
676  if (block->has_image)
677  {
679  return LRQ_NEXT_NO_IO;
680  }
681 
682  /* There is no point in reading a page that will be zeroed. */
683  if (block->flags & BKPBLOCK_WILL_INIT)
684  {
686  return LRQ_NEXT_NO_IO;
687  }
688 
689  /* Should we skip prefetching this block due to a filter? */
690  if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
691  {
693  return LRQ_NEXT_NO_IO;
694  }
695 
696  /* There is no point in repeatedly prefetching the same block. */
697  for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
698  {
699  if (block->blkno == prefetcher->recent_block[i] &&
700  RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
701  {
702  /*
703  * XXX If we also remembered where it was, we could set
704  * recent_buffer so that recovery could skip smgropen()
705  * and a buffer table lookup.
706  */
708  return LRQ_NEXT_NO_IO;
709  }
710  }
711  prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
712  prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
713  prefetcher->recent_idx =
714  (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
715 
716  /*
717  * We could try to have a fast path for repeated references to the
718  * same relation (with some scheme to handle invalidations
719  * safely), but for now we'll call smgropen() every time.
720  */
721  reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
722 
723  /*
724  * If the relation file doesn't exist on disk, for example because
725  * we're replaying after a crash and the file will be created and
726  * then unlinked by WAL that hasn't been replayed yet, suppress
727  * further prefetching in the relation until this record is
728  * replayed.
729  */
730  if (!smgrexists(reln, MAIN_FORKNUM))
731  {
732 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
733  elog(XLOGPREFETCHER_DEBUG_LEVEL,
734  "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
738  LSN_FORMAT_ARGS(record->lsn));
739 #endif
740  XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
741  record->lsn);
743  return LRQ_NEXT_NO_IO;
744  }
745 
746  /*
747  * If the relation isn't big enough to contain the referenced
748  * block yet, suppress prefetching of this block and higher until
749  * this record is replayed.
750  */
751  if (block->blkno >= smgrnblocks(reln, block->forknum))
752  {
753 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
754  elog(XLOGPREFETCHER_DEBUG_LEVEL,
755  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
759  block->blkno,
760  LSN_FORMAT_ARGS(record->lsn));
761 #endif
762  XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
763  record->lsn);
765  return LRQ_NEXT_NO_IO;
766  }
767 
768  /* Try to initiate prefetching. */
769  result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
770  if (BufferIsValid(result.recent_buffer))
771  {
772  /* Cache hit, nothing to do. */
774  block->prefetch_buffer = result.recent_buffer;
775  return LRQ_NEXT_NO_IO;
776  }
777  else if (result.initiated_io)
778  {
779  /* Cache miss, I/O (presumably) started. */
782  return LRQ_NEXT_IO;
783  }
784  else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
785  {
786  /*
787  * This shouldn't be possible, because we already determined
788  * that the relation exists on disk and is big enough.
789  * Something is wrong with the cache invalidation for
790  * smgrexists(), smgrnblocks(), or the file was unlinked or
791  * truncated beneath our feet?
792  */
793  elog(ERROR,
794  "could not prefetch relation %u/%u/%u block %u",
798  block->blkno);
799  }
800  }
801 
802  /*
803  * Several callsites need to be able to read exactly one record
804  * without any internal readahead. Examples: xlog.c reading
805  * checkpoint records with emode set to PANIC, which might otherwise
806  * cause XLogPageRead() to panic on some future page, and xlog.c
807  * determining where to start writing WAL next, which depends on the
808  * contents of the reader's internal buffer after reading one record.
809  * Therefore, don't even think about prefetching until the first
810  * record after XLogPrefetcherBeginRead() has been consumed.
811  */
812  if (prefetcher->reader->decode_queue_tail &&
813  prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
814  return LRQ_NEXT_AGAIN;
815 
816  /* Advance to the next record. */
817  prefetcher->record = NULL;
818  }
819  pg_unreachable();
820 }
#define InvalidBuffer
Definition: buf.h:25
PrefetchBufferResult PrefetchSharedBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum)
Definition: bufmgr.c:548
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:359
#define pg_unreachable()
Definition: c.h:296
unsigned char uint8
Definition: c.h:504
#define XLOG_DBASE_CREATE_FILE_COPY
#define ERROR
Definition: elog.h:39
int io_direct_flags
Definition: fd.c:168
#define IO_DIRECT_DATA
Definition: fd.h:54
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
#define XLOG_CHECKPOINT_SHUTDOWN
Definition: pg_control.h:67
#define XLOG_END_OF_RECOVERY
Definition: pg_control.h:76
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
#define RelFileLocatorEquals(locator1, locator2)
@ MAIN_FORKNUM
Definition: relpath.h:50
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:655
SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend)
Definition: smgr.c:198
bool smgrexists(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:398
#define XLOG_SMGR_CREATE
Definition: storage_xlog.h:30
#define XLOG_SMGR_TRUNCATE
Definition: storage_xlog.h:31
Buffer prefetch_buffer
Definition: xlogreader.h:130
RelFileLocator rlocator
Definition: xlogreader.h:125
BlockNumber blkno
Definition: xlogreader.h:127
ForkNumber forknum
Definition: xlogreader.h:126
XLogRecord header
Definition: xlogreader.h:166
DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER]
Definition: xlogreader.h:172
Buffer recent_buffer
Definition: bufmgr.h:60
RelFileLocator locator
RelFileLocatorBackend smgr_rlocator
Definition: smgr.h:37
RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
DecodedXLogRecord * record
BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
uint8 xl_info
Definition: xlogrecord.h:46
RmgrId xl_rmid
Definition: xlogrecord.h:47
ForkNumber forkNum
Definition: storage_xlog.h:36
RelFileLocator rlocator
Definition: storage_xlog.h:35
RelFileLocator rlocator
Definition: storage_xlog.h:49
BlockNumber blkno
Definition: storage_xlog.h:48
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)
static void XLogPrefetchIncrement(pg_atomic_uint64 *counter)
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE
static void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)
DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)
Definition: xlogreader.c:966
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
Definition: xlogreader.h:325
#define BKPBLOCK_WILL_INIT
Definition: xlogrecord.h:199
#define XLR_INFO_MASK
Definition: xlogrecord.h:62

References Assert, XLogPrefetcher::begin_ptr, BKPBLOCK_WILL_INIT, DecodedBkpBlock::blkno, xl_smgr_truncate::blkno, BufferIsValid(), xl_dbase_create_file_copy_rec::db_id, RelFileLocator::dbOid, XLogReaderState::decode_queue_tail, elog, ERROR, DecodedBkpBlock::flags, DecodedBkpBlock::forknum, xl_smgr_create::forkNum, DecodedBkpBlock::has_image, DecodedXLogRecord::header, XLogPrefetchStats::hit, i, if(), DecodedBkpBlock::in_use, PrefetchBufferResult::initiated_io, INVALID_PROC_NUMBER, InvalidBuffer, InvalidOid, InvalidRelFileNumber, InvalidXLogRecPtr, IO_DIRECT_DATA, io_direct_flags, RelFileLocatorBackend::locator, LRQ_NEXT_AGAIN, LRQ_NEXT_IO, LRQ_NEXT_NO_IO, DecodedXLogRecord::lsn, LSN_FORMAT_ARGS, DecodedXLogRecord::main_data, MAIN_FORKNUM, XLogPrefetcher::next_block_id, XLogPrefetcher::no_readahead_until, XLogPrefetchStats::prefetch, DecodedBkpBlock::prefetch_buffer, PrefetchSharedBuffer(), XLogPrefetcher::reader, XLogReaderState::ReadRecPtr, XLogPrefetcher::recent_block, PrefetchBufferResult::recent_buffer, XLogPrefetcher::recent_idx, XLogPrefetcher::recent_rlocator, XLogPrefetcher::record, RecoveryPrefetchEnabled, RelFileLocatorEquals, RelFileLocator::relNumber, DecodedBkpBlock::rlocator, xl_smgr_create::rlocator, xl_smgr_truncate::rlocator, SharedStats, XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, XLogPrefetchStats::skip_rep, SMgrRelationData::smgr_rlocator, smgrexists(), smgrnblocks(), smgropen(), RelFileLocator::spcOid, XLogRecord::xl_info, XLogRecord::xl_rmid, XLOG_CHECKPOINT_SHUTDOWN, XLOG_DBASE_CREATE_FILE_COPY, XLOG_END_OF_RECOVERY, XLOG_SMGR_CREATE, XLOG_SMGR_TRUNCATE, XLOGPREFETCHER_SEQ_WINDOW_SIZE, XLogPrefetcherAddFilter(), XLogPrefetcherIsFiltered(), XLogPrefetchIncrement(), XLogReadAhead(), XLogReaderHasQueuedRecordOrError(), and XLR_INFO_MASK.

Referenced by XLogPrefetcherReadRecord().

◆ XLogPrefetcherReadRecord()

XLogRecord* XLogPrefetcherReadRecord ( XLogPrefetcher prefetcher,
char **  errmsg 
)

Definition at line 983 of file xlogprefetcher.c.

984 {
985  DecodedXLogRecord *record;
986  XLogRecPtr replayed_up_to;
987 
988  /*
989  * See if it's time to reset the prefetching machinery, because a relevant
990  * GUC was changed.
991  */
993  {
994  uint32 max_distance;
995  uint32 max_inflight;
996 
997  if (prefetcher->streaming_read)
998  lrq_free(prefetcher->streaming_read);
999 
1001  {
1003  max_inflight = maintenance_io_concurrency;
1004  max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1005  }
1006  else
1007  {
1008  max_inflight = 1;
1009  max_distance = 1;
1010  }
1011 
1012  prefetcher->streaming_read = lrq_alloc(max_distance,
1013  max_inflight,
1014  (uintptr_t) prefetcher,
1016 
1018  }
1019 
1020  /*
1021  * Release last returned record, if there is one, as it's now been
1022  * replayed.
1023  */
1024  replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1025 
1026  /*
1027  * Can we drop any filters yet? If we were waiting for a relation to be
1028  * created or extended, it is now OK to access blocks in the covered
1029  * range.
1030  */
1031  XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1032 
1033  /*
1034  * All IO initiated by earlier WAL is now completed. This might trigger
1035  * further prefetching.
1036  */
1037  lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1038 
1039  /*
1040  * If there's nothing queued yet, then start prefetching to cause at least
1041  * one record to be queued.
1042  */
1043  if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1044  {
1045  Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1046  Assert(lrq_completed(prefetcher->streaming_read) == 0);
1047  lrq_prefetch(prefetcher->streaming_read);
1048  }
1049 
1050  /* Read the next record. */
1051  record = XLogNextRecord(prefetcher->reader, errmsg);
1052  if (!record)
1053  return NULL;
1054 
1055  /*
1056  * The record we just got is the "current" one, for the benefit of the
1057  * XLogRecXXX() macros.
1058  */
1059  Assert(record == prefetcher->reader->record);
1060 
1061  /*
1062  * If maintenance_io_concurrency is set very low, we might have started
1063  * prefetching some but not all of the blocks referenced in the record
1064  * we're about to return. Forget about the rest of the blocks in this
1065  * record by dropping the prefetcher's reference to it.
1066  */
1067  if (record == prefetcher->record)
1068  prefetcher->record = NULL;
1069 
1070  /*
1071  * See if it's time to compute some statistics, because enough WAL has
1072  * been processed.
1073  */
1074  if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1075  XLogPrefetcherComputeStats(prefetcher);
1076 
1077  Assert(record == prefetcher->reader->record);
1078 
1079  return &record->header;
1080 }
int maintenance_io_concurrency
Definition: bufmgr.c:157
int errmsg(const char *fmt,...)
Definition: elog.c:1072
DecodedXLogRecord * record
Definition: xlogreader.h:236
void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:325
XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)
Definition: xlogreader.c:249

References Assert, errmsg(), DecodedXLogRecord::header, lrq_alloc(), lrq_complete_lsn(), lrq_completed(), lrq_free(), lrq_inflight(), lrq_prefetch(), DecodedXLogRecord::lsn, maintenance_io_concurrency, XLogPrefetcher::next_stats_shm_lsn, XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, XLogPrefetcher::record, XLogReaderState::record, RecoveryPrefetchEnabled, XLogPrefetcher::streaming_read, unlikely, XLogNextRecord(), XLOGPREFETCHER_DISTANCE_MULTIPLIER, XLogPrefetcherCompleteFilters(), XLogPrefetcherComputeStats(), XLogPrefetcherNextBlock(), XLogPrefetchReconfigureCount, XLogReaderHasQueuedRecordOrError(), and XLogReleasePreviousRecord().

Referenced by ReadRecord().

◆ XLogPrefetchIncrement()

static void XLogPrefetchIncrement ( pg_atomic_uint64 counter)
inlinestatic

Definition at line 351 of file xlogprefetcher.c.

352 {
354  pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
355 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:480
bool IsUnderPostmaster
Definition: globals.c:117

References AmStartupProcess, Assert, IsUnderPostmaster, pg_atomic_read_u64(), and pg_atomic_write_u64().

Referenced by XLogPrefetcherNextBlock().

◆ XLogPrefetchReconfigure()

void XLogPrefetchReconfigure ( void  )

◆ XLogPrefetchResetStats()

◆ XLogPrefetchShmemInit()

void XLogPrefetchShmemInit ( void  )

Definition at line 315 of file xlogprefetcher.c.

316 {
317  bool found;
318 
320  ShmemInitStruct("XLogPrefetchStats",
321  sizeof(XLogPrefetchStats),
322  &found);
323 
324  if (!found)
325  {
333  }
334 }
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:448
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387

References GetCurrentTimestamp(), XLogPrefetchStats::hit, pg_atomic_init_u64(), XLogPrefetchStats::prefetch, XLogPrefetchStats::reset_time, SharedStats, ShmemInitStruct(), XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, and XLogPrefetchStats::skip_rep.

Referenced by CreateOrAttachShmemStructs().

◆ XLogPrefetchShmemSize()

size_t XLogPrefetchShmemSize ( void  )

Definition at line 294 of file xlogprefetcher.c.

295 {
296  return sizeof(XLogPrefetchStats);
297 }
struct XLogPrefetchStats XLogPrefetchStats

Referenced by CalculateShmemSize().

Variable Documentation

◆ recovery_prefetch

int recovery_prefetch = RECOVERY_PREFETCH_TRY

Definition at line 68 of file xlogprefetcher.c.

Referenced by assign_recovery_prefetch().

◆ SharedStats

◆ XLogPrefetchReconfigureCount

int XLogPrefetchReconfigureCount = 0
static