PostgreSQL Source Code  git master
xlogprefetcher.c File Reference
#include "postgres.h"
#include "access/xlog.h"
#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/pg_class.h"
#include "catalog/pg_control.h"
#include "catalog/storage_xlog.h"
#include "commands/dbcommands_xlog.h"
#include "utils/fmgrprotos.h"
#include "utils/timestamp.h"
#include "funcapi.h"
#include "pgstat.h"
#include "miscadmin.h"
#include "port/atomics.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
#include "utils/guc_hooks.h"
#include "utils/hsearch.h"
Include dependency graph for xlogprefetcher.c:

Go to the source code of this file.

Data Structures

struct  LsnReadQueue
 
struct  XLogPrefetcher
 
struct  XLogPrefetcherFilter
 
struct  XLogPrefetchStats
 

Macros

#define XLOGPREFETCHER_STATS_DISTANCE   BLCKSZ
 
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE   4
 
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER   4
 
#define RecoveryPrefetchEnabled()   false
 
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS   10
 

Typedefs

typedef LsnReadQueueNextStatus(* LsnReadQueueNextFun) (uintptr_t lrq_private, XLogRecPtr *lsn)
 
typedef struct LsnReadQueue LsnReadQueue
 
typedef struct XLogPrefetcherFilter XLogPrefetcherFilter
 
typedef struct XLogPrefetchStats XLogPrefetchStats
 

Enumerations

enum  LsnReadQueueNextStatus { LRQ_NEXT_NO_IO , LRQ_NEXT_IO , LRQ_NEXT_AGAIN }
 

Functions

static void XLogPrefetcherAddFilter (XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)
 
static bool XLogPrefetcherIsFiltered (XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)
 
static void XLogPrefetcherCompleteFilters (XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
 
static LsnReadQueueNextStatus XLogPrefetcherNextBlock (uintptr_t pgsr_private, XLogRecPtr *lsn)
 
static LsnReadQueuelrq_alloc (uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
 
static void lrq_free (LsnReadQueue *lrq)
 
static uint32 lrq_inflight (LsnReadQueue *lrq)
 
static uint32 lrq_completed (LsnReadQueue *lrq)
 
static void lrq_prefetch (LsnReadQueue *lrq)
 
static void lrq_complete_lsn (LsnReadQueue *lrq, XLogRecPtr lsn)
 
size_t XLogPrefetchShmemSize (void)
 
void XLogPrefetchResetStats (void)
 
void XLogPrefetchShmemInit (void)
 
void XLogPrefetchReconfigure (void)
 
static void XLogPrefetchIncrement (pg_atomic_uint64 *counter)
 
XLogPrefetcherXLogPrefetcherAllocate (XLogReaderState *reader)
 
void XLogPrefetcherFree (XLogPrefetcher *prefetcher)
 
XLogReaderStateXLogPrefetcherGetReader (XLogPrefetcher *prefetcher)
 
void XLogPrefetcherComputeStats (XLogPrefetcher *prefetcher)
 
Datum pg_stat_get_recovery_prefetch (PG_FUNCTION_ARGS)
 
void XLogPrefetcherBeginRead (XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
 
XLogRecordXLogPrefetcherReadRecord (XLogPrefetcher *prefetcher, char **errmsg)
 
bool check_recovery_prefetch (int *new_value, void **extra, GucSource source)
 
void assign_recovery_prefetch (int new_value, void *extra)
 

Variables

int recovery_prefetch = RECOVERY_PREFETCH_TRY
 
static int XLogPrefetchReconfigureCount = 0
 
static XLogPrefetchStatsSharedStats
 

Macro Definition Documentation

◆ PG_STAT_GET_RECOVERY_PREFETCH_COLS

#define PG_STAT_GET_RECOVERY_PREFETCH_COLS   10

◆ RecoveryPrefetchEnabled

#define RecoveryPrefetchEnabled ( )    false

Definition at line 79 of file xlogprefetcher.c.

◆ XLOGPREFETCHER_DISTANCE_MULTIPLIER

#define XLOGPREFETCHER_DISTANCE_MULTIPLIER   4

Definition at line 66 of file xlogprefetcher.c.

◆ XLOGPREFETCHER_SEQ_WINDOW_SIZE

#define XLOGPREFETCHER_SEQ_WINDOW_SIZE   4

Definition at line 60 of file xlogprefetcher.c.

◆ XLOGPREFETCHER_STATS_DISTANCE

#define XLOGPREFETCHER_STATS_DISTANCE   BLCKSZ

Definition at line 54 of file xlogprefetcher.c.

Typedef Documentation

◆ LsnReadQueue

typedef struct LsnReadQueue LsnReadQueue

◆ LsnReadQueueNextFun

typedef LsnReadQueueNextStatus(* LsnReadQueueNextFun) (uintptr_t lrq_private, XLogRecPtr *lsn)

Definition at line 98 of file xlogprefetcher.c.

◆ XLogPrefetcherFilter

◆ XLogPrefetchStats

Enumeration Type Documentation

◆ LsnReadQueueNextStatus

Enumerator
LRQ_NEXT_NO_IO 
LRQ_NEXT_IO 
LRQ_NEXT_AGAIN 

Definition at line 87 of file xlogprefetcher.c.

88 {
LsnReadQueueNextStatus
@ LRQ_NEXT_NO_IO
@ LRQ_NEXT_IO
@ LRQ_NEXT_AGAIN

Function Documentation

◆ assign_recovery_prefetch()

void assign_recovery_prefetch ( int  new_value,
void *  extra 
)

Definition at line 1101 of file xlogprefetcher.c.

1102 {
1103  /* Reconfigure prefetching, because a setting it depends on changed. */
1104  recovery_prefetch = new_value;
1105  if (AmStartupProcess())
1107 }
#define AmStartupProcess()
Definition: miscadmin.h:452
int recovery_prefetch
void XLogPrefetchReconfigure(void)

References AmStartupProcess, recovery_prefetch, and XLogPrefetchReconfigure().

◆ check_recovery_prefetch()

bool check_recovery_prefetch ( int *  new_value,
void **  extra,
GucSource  source 
)

Definition at line 1087 of file xlogprefetcher.c.

1088 {
1089 #ifndef USE_PREFETCH
1090  if (*new_value == RECOVERY_PREFETCH_ON)
1091  {
1092  GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
1093  return false;
1094  }
1095 #endif
1096 
1097  return true;
1098 }
#define GUC_check_errdetail
Definition: guc.h:436
@ RECOVERY_PREFETCH_ON

References GUC_check_errdetail, and RECOVERY_PREFETCH_ON.

◆ lrq_alloc()

static LsnReadQueue* lrq_alloc ( uint32  max_distance,
uint32  max_inflight,
uintptr_t  lrq_private,
LsnReadQueueNextFun  next 
)
inlinestatic

Definition at line 206 of file xlogprefetcher.c.

210 {
211  LsnReadQueue *lrq;
212  uint32 size;
213 
214  Assert(max_distance >= max_inflight);
215 
216  size = max_distance + 1; /* full ring buffer has a gap */
217  lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
218  lrq->lrq_private = lrq_private;
219  lrq->max_inflight = max_inflight;
220  lrq->size = size;
221  lrq->next = next;
222  lrq->head = 0;
223  lrq->tail = 0;
224  lrq->inflight = 0;
225  lrq->completed = 0;
226 
227  return lrq;
228 }
static int32 next
Definition: blutils.c:219
unsigned int uint32
Definition: c.h:495
Assert(fmt[strlen(fmt) - 1] !='\n')
void * palloc(Size size)
Definition: mcxt.c:1226
struct LsnReadQueue::@14 queue[FLEXIBLE_ARRAY_MEMBER]
uint32 max_inflight
LsnReadQueueNextFun next
uintptr_t lrq_private

References Assert(), LsnReadQueue::completed, LsnReadQueue::head, LsnReadQueue::inflight, LsnReadQueue::lrq_private, LsnReadQueue::max_inflight, next, LsnReadQueue::next, palloc(), LsnReadQueue::queue, LsnReadQueue::size, and LsnReadQueue::tail.

Referenced by XLogPrefetcherReadRecord().

◆ lrq_complete_lsn()

static void lrq_complete_lsn ( LsnReadQueue lrq,
XLogRecPtr  lsn 
)
inlinestatic

Definition at line 276 of file xlogprefetcher.c.

277 {
278  /*
279  * We know that LSNs before 'lsn' have been replayed, so we can now assume
280  * that any IOs that were started before then have finished.
281  */
282  while (lrq->tail != lrq->head &&
283  lrq->queue[lrq->tail].lsn < lsn)
284  {
285  if (lrq->queue[lrq->tail].io)
286  lrq->inflight--;
287  else
288  lrq->completed--;
289  lrq->tail++;
290  if (lrq->tail == lrq->size)
291  lrq->tail = 0;
292  }
294  lrq_prefetch(lrq);
295 }
XLogRecPtr lsn
#define RecoveryPrefetchEnabled()
static void lrq_prefetch(LsnReadQueue *lrq)

References LsnReadQueue::completed, LsnReadQueue::head, LsnReadQueue::inflight, LsnReadQueue::io, lrq_prefetch(), LsnReadQueue::lsn, LsnReadQueue::queue, RecoveryPrefetchEnabled, LsnReadQueue::size, and LsnReadQueue::tail.

Referenced by XLogPrefetcherReadRecord().

◆ lrq_completed()

static uint32 lrq_completed ( LsnReadQueue lrq)
inlinestatic

Definition at line 243 of file xlogprefetcher.c.

244 {
245  return lrq->completed;
246 }

References LsnReadQueue::completed.

Referenced by XLogPrefetcherComputeStats(), and XLogPrefetcherReadRecord().

◆ lrq_free()

static void lrq_free ( LsnReadQueue lrq)
inlinestatic

Definition at line 231 of file xlogprefetcher.c.

232 {
233  pfree(lrq);
234 }
void pfree(void *pointer)
Definition: mcxt.c:1456

References pfree().

Referenced by XLogPrefetcherFree(), and XLogPrefetcherReadRecord().

◆ lrq_inflight()

static uint32 lrq_inflight ( LsnReadQueue lrq)
inlinestatic

Definition at line 237 of file xlogprefetcher.c.

238 {
239  return lrq->inflight;
240 }

References LsnReadQueue::inflight.

Referenced by XLogPrefetcherComputeStats(), and XLogPrefetcherReadRecord().

◆ lrq_prefetch()

static void lrq_prefetch ( LsnReadQueue lrq)
inlinestatic

Definition at line 249 of file xlogprefetcher.c.

250 {
251  /* Try to start as many IOs as we can within our limits. */
252  while (lrq->inflight < lrq->max_inflight &&
253  lrq->inflight + lrq->completed < lrq->size - 1)
254  {
255  Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
256  switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
257  {
258  case LRQ_NEXT_AGAIN:
259  return;
260  case LRQ_NEXT_IO:
261  lrq->queue[lrq->head].io = true;
262  lrq->inflight++;
263  break;
264  case LRQ_NEXT_NO_IO:
265  lrq->queue[lrq->head].io = false;
266  lrq->completed++;
267  break;
268  }
269  lrq->head++;
270  if (lrq->head == lrq->size)
271  lrq->head = 0;
272  }
273 }

References Assert(), LsnReadQueue::completed, LsnReadQueue::head, LsnReadQueue::inflight, LsnReadQueue::io, LRQ_NEXT_AGAIN, LRQ_NEXT_IO, LRQ_NEXT_NO_IO, LsnReadQueue::lrq_private, LsnReadQueue::lsn, LsnReadQueue::max_inflight, LsnReadQueue::next, LsnReadQueue::queue, LsnReadQueue::size, and LsnReadQueue::tail.

Referenced by lrq_complete_lsn(), and XLogPrefetcherReadRecord().

◆ pg_stat_get_recovery_prefetch()

Datum pg_stat_get_recovery_prefetch ( PG_FUNCTION_ARGS  )

Definition at line 830 of file xlogprefetcher.c.

831 {
832 #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
833  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
836 
837  InitMaterializedSRF(fcinfo, 0);
838 
839  for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
840  nulls[i] = false;
841 
852  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
853 
854  return (Datum) 0;
855 }
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:424
static Datum values[MAXATTR]
Definition: bootstrap.c:156
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1790
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
int i
Definition: isn.c:73
uintptr_t Datum
Definition: postgres.h:64
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
TupleDesc setDesc
Definition: execnodes.h:334
Tuplestorestate * setResult
Definition: execnodes.h:333
pg_atomic_uint64 skip_fpw
pg_atomic_uint64 skip_init
pg_atomic_uint64 reset_time
pg_atomic_uint64 hit
pg_atomic_uint64 prefetch
pg_atomic_uint64 skip_rep
pg_atomic_uint64 skip_new
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static XLogPrefetchStats * SharedStats
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS

References XLogPrefetchStats::block_distance, XLogPrefetchStats::hit, i, InitMaterializedSRF(), Int32GetDatum(), Int64GetDatum(), XLogPrefetchStats::io_depth, pg_atomic_read_u64(), PG_STAT_GET_RECOVERY_PREFETCH_COLS, XLogPrefetchStats::prefetch, XLogPrefetchStats::reset_time, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SharedStats, XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, XLogPrefetchStats::skip_rep, TimestampTzGetDatum(), tuplestore_putvalues(), values, and XLogPrefetchStats::wal_distance.

◆ XLogPrefetcherAddFilter()

static void XLogPrefetcherAddFilter ( XLogPrefetcher prefetcher,
RelFileLocator  rlocator,
BlockNumber  blockno,
XLogRecPtr  lsn 
)
inlinestatic

Definition at line 862 of file xlogprefetcher.c.

864 {
865  XLogPrefetcherFilter *filter;
866  bool found;
867 
868  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
869  if (!found)
870  {
871  /*
872  * Don't allow any prefetching of this block or higher until replayed.
873  */
874  filter->filter_until_replayed = lsn;
875  filter->filter_from_block = blockno;
876  dlist_push_head(&prefetcher->filter_queue, &filter->link);
877  }
878  else
879  {
880  /*
881  * We were already filtering this rlocator. Extend the filter's
882  * lifetime to cover this WAL record, but leave the lower of the block
883  * numbers there because we don't want to have to track individual
884  * blocks.
885  */
886  filter->filter_until_replayed = lsn;
887  dlist_delete(&filter->link);
888  dlist_push_head(&prefetcher->filter_queue, &filter->link);
889  filter->filter_from_block = Min(filter->filter_from_block, blockno);
890  }
891 }
#define Min(x, y)
Definition: c.h:993
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
@ HASH_ENTER
Definition: hsearch.h:114
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void dlist_push_head(dlist_head *head, dlist_node *node)
Definition: ilist.h:347
XLogRecPtr filter_until_replayed
BlockNumber filter_from_block
dlist_head filter_queue

References dlist_delete(), dlist_push_head(), XLogPrefetcherFilter::filter_from_block, XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, XLogPrefetcherFilter::filter_until_replayed, HASH_ENTER, hash_search(), XLogPrefetcherFilter::link, and Min.

Referenced by XLogPrefetcherNextBlock().

◆ XLogPrefetcherAllocate()

XLogPrefetcher* XLogPrefetcherAllocate ( XLogReaderState reader)

Definition at line 366 of file xlogprefetcher.c.

367 {
368  XLogPrefetcher *prefetcher;
369  static HASHCTL hash_table_ctl = {
370  .keysize = sizeof(RelFileLocator),
371  .entrysize = sizeof(XLogPrefetcherFilter)
372  };
373 
374  prefetcher = palloc0(sizeof(XLogPrefetcher));
375 
376  prefetcher->reader = reader;
377  prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
378  &hash_table_ctl,
380  dlist_init(&prefetcher->filter_queue);
381 
384  SharedStats->io_depth = 0;
385 
386  /* First usage will cause streaming_read to be allocated. */
388 
389  return prefetcher;
390 }
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * palloc0(Size size)
Definition: mcxt.c:1257
struct RelFileLocator RelFileLocator
Size keysize
Definition: hsearch.h:75
XLogReaderState * reader
static int XLogPrefetchReconfigureCount

References XLogPrefetchStats::block_distance, dlist_init(), XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, HASH_BLOBS, hash_create(), HASH_ELEM, XLogPrefetchStats::io_depth, HASHCTL::keysize, palloc0(), XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, SharedStats, XLogPrefetchStats::wal_distance, and XLogPrefetchReconfigureCount.

Referenced by InitWalRecovery().

◆ XLogPrefetcherBeginRead()

void XLogPrefetcherBeginRead ( XLogPrefetcher prefetcher,
XLogRecPtr  recPtr 
)

Definition at line 968 of file xlogprefetcher.c.

969 {
970  /* This will forget about any in-flight IO. */
971  prefetcher->reconfigure_count--;
972 
973  /* Book-keeping to avoid readahead on first read. */
974  prefetcher->begin_ptr = recPtr;
975 
976  prefetcher->no_readahead_until = 0;
977 
978  /* This will forget about any queued up records in the decoder. */
979  XLogBeginRead(prefetcher->reader, recPtr);
980 }
XLogRecPtr no_readahead_until
XLogRecPtr begin_ptr
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:248

References XLogPrefetcher::begin_ptr, XLogPrefetcher::no_readahead_until, XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, and XLogBeginRead().

Referenced by FinishWalRecovery(), InitWalRecovery(), PerformWalRecovery(), and ReadCheckpointRecord().

◆ XLogPrefetcherCompleteFilters()

static void XLogPrefetcherCompleteFilters ( XLogPrefetcher prefetcher,
XLogRecPtr  replaying_lsn 
)
inlinestatic

Definition at line 900 of file xlogprefetcher.c.

901 {
902  while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
903  {
905  link,
906  &prefetcher->filter_queue);
907 
908  if (filter->filter_until_replayed >= replaying_lsn)
909  break;
910 
911  dlist_delete(&filter->link);
912  hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
913  }
914 }
#define unlikely(x)
Definition: c.h:300
@ HASH_REMOVE
Definition: hsearch.h:115
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:612
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336

References dlist_delete(), dlist_is_empty(), dlist_tail_element, XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, XLogPrefetcherFilter::filter_until_replayed, HASH_REMOVE, hash_search(), XLogPrefetcherFilter::link, and unlikely.

Referenced by XLogPrefetcherReadRecord().

◆ XLogPrefetcherComputeStats()

void XLogPrefetcherComputeStats ( XLogPrefetcher prefetcher)

Definition at line 416 of file xlogprefetcher.c.

417 {
418  uint32 io_depth;
419  uint32 completed;
420  int64 wal_distance;
421 
422 
423  /* How far ahead of replay are we now? */
424  if (prefetcher->reader->decode_queue_tail)
425  {
426  wal_distance =
427  prefetcher->reader->decode_queue_tail->lsn -
428  prefetcher->reader->decode_queue_head->lsn;
429  }
430  else
431  {
432  wal_distance = 0;
433  }
434 
435  /* How many IOs are currently in flight and completed? */
436  io_depth = lrq_inflight(prefetcher->streaming_read);
437  completed = lrq_completed(prefetcher->streaming_read);
438 
439  /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
440  SharedStats->io_depth = io_depth;
441  SharedStats->block_distance = io_depth + completed;
442  SharedStats->wal_distance = wal_distance;
443 
444  prefetcher->next_stats_shm_lsn =
446 }
XLogRecPtr lsn
Definition: xlogreader.h:164
LsnReadQueue * streaming_read
XLogRecPtr next_stats_shm_lsn
DecodedXLogRecord * decode_queue_head
Definition: xlogreader.h:260
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206
DecodedXLogRecord * decode_queue_tail
Definition: xlogreader.h:261
static uint32 lrq_completed(LsnReadQueue *lrq)
static uint32 lrq_inflight(LsnReadQueue *lrq)
#define XLOGPREFETCHER_STATS_DISTANCE

References XLogPrefetchStats::block_distance, XLogReaderState::decode_queue_head, XLogReaderState::decode_queue_tail, XLogPrefetchStats::io_depth, lrq_completed(), lrq_inflight(), DecodedXLogRecord::lsn, XLogPrefetcher::next_stats_shm_lsn, XLogPrefetcher::reader, XLogReaderState::ReadRecPtr, SharedStats, XLogPrefetcher::streaming_read, XLogPrefetchStats::wal_distance, and XLOGPREFETCHER_STATS_DISTANCE.

Referenced by ShutdownWalRecovery(), WaitForWALToBecomeAvailable(), and XLogPrefetcherReadRecord().

◆ XLogPrefetcherFree()

void XLogPrefetcherFree ( XLogPrefetcher prefetcher)

Definition at line 396 of file xlogprefetcher.c.

397 {
398  lrq_free(prefetcher->streaming_read);
399  hash_destroy(prefetcher->filter_table);
400  pfree(prefetcher);
401 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:863
static void lrq_free(LsnReadQueue *lrq)

References XLogPrefetcher::filter_table, hash_destroy(), lrq_free(), pfree(), and XLogPrefetcher::streaming_read.

Referenced by ShutdownWalRecovery().

◆ XLogPrefetcherGetReader()

XLogReaderState* XLogPrefetcherGetReader ( XLogPrefetcher prefetcher)

Definition at line 407 of file xlogprefetcher.c.

408 {
409  return prefetcher->reader;
410 }

References XLogPrefetcher::reader.

Referenced by ReadRecord().

◆ XLogPrefetcherIsFiltered()

static bool XLogPrefetcherIsFiltered ( XLogPrefetcher prefetcher,
RelFileLocator  rlocator,
BlockNumber  blockno 
)
inlinestatic

Definition at line 920 of file xlogprefetcher.c.

922 {
923  /*
924  * Test for empty queue first, because we expect it to be empty most of
925  * the time and we can avoid the hash table lookup in that case.
926  */
927  if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
928  {
929  XLogPrefetcherFilter *filter;
930 
931  /* See if the block range is filtered. */
932  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
933  if (filter && filter->filter_from_block <= blockno)
934  {
935 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
936  elog(XLOGPREFETCHER_DEBUG_LEVEL,
937  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
938  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
940  filter->filter_from_block);
941 #endif
942  return true;
943  }
944 
945  /* See if the whole database is filtered. */
946  rlocator.relNumber = InvalidRelFileNumber;
947  rlocator.spcOid = InvalidOid;
948  filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
949  if (filter)
950  {
951 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
952  elog(XLOGPREFETCHER_DEBUG_LEVEL,
953  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
954  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
956 #endif
957  return true;
958  }
959  }
960 
961  return false;
962 }
@ HASH_FIND
Definition: hsearch.h:113
#define InvalidOid
Definition: postgres_ext.h:36
#define InvalidRelFileNumber
Definition: relpath.h:26
RelFileNumber relNumber
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References RelFileLocator::dbOid, dlist_is_empty(), elog(), XLogPrefetcherFilter::filter_from_block, XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, XLogPrefetcherFilter::filter_until_replayed, HASH_FIND, hash_search(), InvalidOid, InvalidRelFileNumber, LSN_FORMAT_ARGS, RelFileLocator::relNumber, RelFileLocator::spcOid, and unlikely.

Referenced by XLogPrefetcherNextBlock().

◆ XLogPrefetcherNextBlock()

static LsnReadQueueNextStatus XLogPrefetcherNextBlock ( uintptr_t  pgsr_private,
XLogRecPtr lsn 
)
static

Definition at line 465 of file xlogprefetcher.c.

466 {
467  XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
468  XLogReaderState *reader = prefetcher->reader;
469  XLogRecPtr replaying_lsn = reader->ReadRecPtr;
470 
471  /*
472  * We keep track of the record and block we're up to between calls with
473  * prefetcher->record and prefetcher->next_block_id.
474  */
475  for (;;)
476  {
477  DecodedXLogRecord *record;
478 
479  /* Try to read a new future record, if we don't already have one. */
480  if (prefetcher->record == NULL)
481  {
482  bool nonblocking;
483 
484  /*
485  * If there are already records or an error queued up that could
486  * be replayed, we don't want to block here. Otherwise, it's OK
487  * to block waiting for more data: presumably the caller has
488  * nothing else to do.
489  */
490  nonblocking = XLogReaderHasQueuedRecordOrError(reader);
491 
492  /* Readahead is disabled until we replay past a certain point. */
493  if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
494  return LRQ_NEXT_AGAIN;
495 
496  record = XLogReadAhead(prefetcher->reader, nonblocking);
497  if (record == NULL)
498  {
499  /*
500  * We can't read any more, due to an error or lack of data in
501  * nonblocking mode. Don't try to read ahead again until
502  * we've replayed everything already decoded.
503  */
504  if (nonblocking && prefetcher->reader->decode_queue_tail)
505  prefetcher->no_readahead_until =
506  prefetcher->reader->decode_queue_tail->lsn;
507 
508  return LRQ_NEXT_AGAIN;
509  }
510 
511  /*
512  * If prefetching is disabled, we don't need to analyze the record
513  * or issue any prefetches. We just need to cause one record to
514  * be decoded.
515  */
517  {
518  *lsn = InvalidXLogRecPtr;
519  return LRQ_NEXT_NO_IO;
520  }
521 
522  /* We have a new record to process. */
523  prefetcher->record = record;
524  prefetcher->next_block_id = 0;
525  }
526  else
527  {
528  /* Continue to process from last call, or last loop. */
529  record = prefetcher->record;
530  }
531 
532  /*
533  * Check for operations that require us to filter out block ranges, or
534  * pause readahead completely.
535  */
536  if (replaying_lsn < record->lsn)
537  {
538  uint8 rmid = record->header.xl_rmid;
539  uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
540 
541  if (rmid == RM_XLOG_ID)
542  {
543  if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
544  record_type == XLOG_END_OF_RECOVERY)
545  {
546  /*
547  * These records might change the TLI. Avoid potential
548  * bugs if we were to allow "read TLI" and "replay TLI" to
549  * differ without more analysis.
550  */
551  prefetcher->no_readahead_until = record->lsn;
552 
553 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
554  elog(XLOGPREFETCHER_DEBUG_LEVEL,
555  "suppressing all readahead until %X/%X is replayed due to possible TLI change",
556  LSN_FORMAT_ARGS(record->lsn));
557 #endif
558 
559  /* Fall through so we move past this record. */
560  }
561  }
562  else if (rmid == RM_DBASE_ID)
563  {
564  /*
565  * When databases are created with the file-copy strategy,
566  * there are no WAL records to tell us about the creation of
567  * individual relations.
568  */
569  if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
570  {
573  RelFileLocator rlocator =
575 
576  /*
577  * Don't try to prefetch anything in this database until
578  * it has been created, or we might confuse the blocks of
579  * different generations, if a database OID or
580  * relfilenumber is reused. It's also more efficient than
581  * discovering that relations don't exist on disk yet with
582  * ENOENT errors.
583  */
584  XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
585 
586 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
587  elog(XLOGPREFETCHER_DEBUG_LEVEL,
588  "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
589  rlocator.dbOid,
590  LSN_FORMAT_ARGS(record->lsn));
591 #endif
592  }
593  }
594  else if (rmid == RM_SMGR_ID)
595  {
596  if (record_type == XLOG_SMGR_CREATE)
597  {
598  xl_smgr_create *xlrec = (xl_smgr_create *)
599  record->main_data;
600 
601  if (xlrec->forkNum == MAIN_FORKNUM)
602  {
603  /*
604  * Don't prefetch anything for this whole relation
605  * until it has been created. Otherwise we might
606  * confuse the blocks of different generations, if a
607  * relfilenumber is reused. This also avoids the need
608  * to discover the problem via extra syscalls that
609  * report ENOENT.
610  */
611  XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
612  record->lsn);
613 
614 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
615  elog(XLOGPREFETCHER_DEBUG_LEVEL,
616  "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
617  xlrec->rlocator.spcOid,
618  xlrec->rlocator.dbOid,
619  xlrec->rlocator.relNumber,
620  LSN_FORMAT_ARGS(record->lsn));
621 #endif
622  }
623  }
624  else if (record_type == XLOG_SMGR_TRUNCATE)
625  {
627  record->main_data;
628 
629  /*
630  * Don't consider prefetching anything in the truncated
631  * range until the truncation has been performed.
632  */
633  XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
634  xlrec->blkno,
635  record->lsn);
636 
637 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
638  elog(XLOGPREFETCHER_DEBUG_LEVEL,
639  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
640  xlrec->rlocator.spcOid,
641  xlrec->rlocator.dbOid,
642  xlrec->rlocator.relNumber,
643  xlrec->blkno,
644  LSN_FORMAT_ARGS(record->lsn));
645 #endif
646  }
647  }
648  }
649 
650  /* Scan the block references, starting where we left off last time. */
651  while (prefetcher->next_block_id <= record->max_block_id)
652  {
653  int block_id = prefetcher->next_block_id++;
654  DecodedBkpBlock *block = &record->blocks[block_id];
655  SMgrRelation reln;
656  PrefetchBufferResult result;
657 
658  if (!block->in_use)
659  continue;
660 
662 
663  /*
664  * Record the LSN of this record. When it's replayed,
665  * LsnReadQueue will consider any IOs submitted for earlier LSNs
666  * to be finished.
667  */
668  *lsn = record->lsn;
669 
670  /* We don't try to prefetch anything but the main fork for now. */
671  if (block->forknum != MAIN_FORKNUM)
672  {
673  return LRQ_NEXT_NO_IO;
674  }
675 
676  /*
677  * If there is a full page image attached, we won't be reading the
678  * page, so don't bother trying to prefetch.
679  */
680  if (block->has_image)
681  {
683  return LRQ_NEXT_NO_IO;
684  }
685 
686  /* There is no point in reading a page that will be zeroed. */
687  if (block->flags & BKPBLOCK_WILL_INIT)
688  {
690  return LRQ_NEXT_NO_IO;
691  }
692 
693  /* Should we skip prefetching this block due to a filter? */
694  if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
695  {
697  return LRQ_NEXT_NO_IO;
698  }
699 
700  /* There is no point in repeatedly prefetching the same block. */
701  for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
702  {
703  if (block->blkno == prefetcher->recent_block[i] &&
704  RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
705  {
706  /*
707  * XXX If we also remembered where it was, we could set
708  * recent_buffer so that recovery could skip smgropen()
709  * and a buffer table lookup.
710  */
712  return LRQ_NEXT_NO_IO;
713  }
714  }
715  prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
716  prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
717  prefetcher->recent_idx =
718  (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
719 
720  /*
721  * We could try to have a fast path for repeated references to the
722  * same relation (with some scheme to handle invalidations
723  * safely), but for now we'll call smgropen() every time.
724  */
725  reln = smgropen(block->rlocator, InvalidBackendId);
726 
727  /*
728  * If the relation file doesn't exist on disk, for example because
729  * we're replaying after a crash and the file will be created and
730  * then unlinked by WAL that hasn't been replayed yet, suppress
731  * further prefetching in the relation until this record is
732  * replayed.
733  */
734  if (!smgrexists(reln, MAIN_FORKNUM))
735  {
736 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
737  elog(XLOGPREFETCHER_DEBUG_LEVEL,
738  "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
742  LSN_FORMAT_ARGS(record->lsn));
743 #endif
744  XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
745  record->lsn);
747  return LRQ_NEXT_NO_IO;
748  }
749 
750  /*
751  * If the relation isn't big enough to contain the referenced
752  * block yet, suppress prefetching of this block and higher until
753  * this record is replayed.
754  */
755  if (block->blkno >= smgrnblocks(reln, block->forknum))
756  {
757 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
758  elog(XLOGPREFETCHER_DEBUG_LEVEL,
759  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
763  block->blkno,
764  LSN_FORMAT_ARGS(record->lsn));
765 #endif
766  XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
767  record->lsn);
769  return LRQ_NEXT_NO_IO;
770  }
771 
772  /* Try to initiate prefetching. */
773  result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
774  if (BufferIsValid(result.recent_buffer))
775  {
776  /* Cache hit, nothing to do. */
778  block->prefetch_buffer = result.recent_buffer;
779  return LRQ_NEXT_NO_IO;
780  }
781  else if (result.initiated_io)
782  {
783  /* Cache miss, I/O (presumably) started. */
786  return LRQ_NEXT_IO;
787  }
788  else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
789  {
790  /*
791  * This shouldn't be possible, because we already determined
792  * that the relation exists on disk and is big enough.
793  * Something is wrong with the cache invalidation for
794  * smgrexists(), smgrnblocks(), or the file was unlinked or
795  * truncated beneath our feet?
796  */
797  elog(ERROR,
798  "could not prefetch relation %u/%u/%u block %u",
802  block->blkno);
803  }
804  }
805 
806  /*
807  * Several callsites need to be able to read exactly one record
808  * without any internal readahead. Examples: xlog.c reading
809  * checkpoint records with emode set to PANIC, which might otherwise
810  * cause XLogPageRead() to panic on some future page, and xlog.c
811  * determining where to start writing WAL next, which depends on the
812  * contents of the reader's internal buffer after reading one record.
813  * Therefore, don't even think about prefetching until the first
814  * record after XLogPrefetcherBeginRead() has been consumed.
815  */
816  if (prefetcher->reader->decode_queue_tail &&
817  prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
818  return LRQ_NEXT_AGAIN;
819 
820  /* Advance to the next record. */
821  prefetcher->record = NULL;
822  }
823  pg_unreachable();
824 }
#define InvalidBackendId
Definition: backendid.h:23
#define InvalidBuffer
Definition: buf.h:25
PrefetchBufferResult PrefetchSharedBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum)
Definition: bufmgr.c:511
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:301
#define pg_unreachable()
Definition: c.h:285
unsigned char uint8
Definition: c.h:493
#define XLOG_DBASE_CREATE_FILE_COPY
#define ERROR
Definition: elog.h:39
int io_direct_flags
Definition: fd.c:168
#define IO_DIRECT_DATA
Definition: fd.h:52
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
#define XLOG_CHECKPOINT_SHUTDOWN
Definition: pg_control.h:67
#define XLOG_END_OF_RECOVERY
Definition: pg_control.h:76
#define RelFileLocatorEquals(locator1, locator2)
@ MAIN_FORKNUM
Definition: relpath.h:50
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:609
SMgrRelation smgropen(RelFileLocator rlocator, BackendId backend)
Definition: smgr.c:150
bool smgrexists(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:251
#define XLOG_SMGR_CREATE
Definition: storage_xlog.h:30
#define XLOG_SMGR_TRUNCATE
Definition: storage_xlog.h:31
Buffer prefetch_buffer
Definition: xlogreader.h:130
RelFileLocator rlocator
Definition: xlogreader.h:125
BlockNumber blkno
Definition: xlogreader.h:127
ForkNumber forknum
Definition: xlogreader.h:126
XLogRecord header
Definition: xlogreader.h:166
DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER]
Definition: xlogreader.h:172
Buffer recent_buffer
Definition: bufmgr.h:59
RelFileLocator locator
RelFileLocatorBackend smgr_rlocator
Definition: smgr.h:42
RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
DecodedXLogRecord * record
BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
uint8 xl_info
Definition: xlogrecord.h:46
RmgrId xl_rmid
Definition: xlogrecord.h:47
ForkNumber forkNum
Definition: storage_xlog.h:36
RelFileLocator rlocator
Definition: storage_xlog.h:35
RelFileLocator rlocator
Definition: storage_xlog.h:49
BlockNumber blkno
Definition: storage_xlog.h:48
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)
static void XLogPrefetchIncrement(pg_atomic_uint64 *counter)
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE
static void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)
DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)
Definition: xlogreader.c:971
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
Definition: xlogreader.h:325
#define BKPBLOCK_WILL_INIT
Definition: xlogrecord.h:199
#define XLR_INFO_MASK
Definition: xlogrecord.h:62

References Assert(), XLogPrefetcher::begin_ptr, BKPBLOCK_WILL_INIT, DecodedBkpBlock::blkno, xl_smgr_truncate::blkno, BufferIsValid(), xl_dbase_create_file_copy_rec::db_id, RelFileLocator::dbOid, XLogReaderState::decode_queue_tail, elog(), ERROR, DecodedBkpBlock::flags, DecodedBkpBlock::forknum, xl_smgr_create::forkNum, DecodedBkpBlock::has_image, DecodedXLogRecord::header, XLogPrefetchStats::hit, i, if(), DecodedBkpBlock::in_use, PrefetchBufferResult::initiated_io, InvalidBackendId, InvalidBuffer, InvalidOid, InvalidRelFileNumber, InvalidXLogRecPtr, IO_DIRECT_DATA, io_direct_flags, RelFileLocatorBackend::locator, LRQ_NEXT_AGAIN, LRQ_NEXT_IO, LRQ_NEXT_NO_IO, DecodedXLogRecord::lsn, LSN_FORMAT_ARGS, DecodedXLogRecord::main_data, MAIN_FORKNUM, XLogPrefetcher::next_block_id, XLogPrefetcher::no_readahead_until, XLogPrefetchStats::prefetch, DecodedBkpBlock::prefetch_buffer, PrefetchSharedBuffer(), XLogPrefetcher::reader, XLogReaderState::ReadRecPtr, XLogPrefetcher::recent_block, PrefetchBufferResult::recent_buffer, XLogPrefetcher::recent_idx, XLogPrefetcher::recent_rlocator, XLogPrefetcher::record, RecoveryPrefetchEnabled, RelFileLocatorEquals, RelFileLocator::relNumber, DecodedBkpBlock::rlocator, xl_smgr_create::rlocator, xl_smgr_truncate::rlocator, SharedStats, XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, XLogPrefetchStats::skip_rep, SMgrRelationData::smgr_rlocator, smgrexists(), smgrnblocks(), smgropen(), RelFileLocator::spcOid, XLogRecord::xl_info, XLogRecord::xl_rmid, XLOG_CHECKPOINT_SHUTDOWN, XLOG_DBASE_CREATE_FILE_COPY, XLOG_END_OF_RECOVERY, XLOG_SMGR_CREATE, XLOG_SMGR_TRUNCATE, XLOGPREFETCHER_SEQ_WINDOW_SIZE, XLogPrefetcherAddFilter(), XLogPrefetcherIsFiltered(), XLogPrefetchIncrement(), XLogReadAhead(), XLogReaderHasQueuedRecordOrError(), and XLR_INFO_MASK.

Referenced by XLogPrefetcherReadRecord().

◆ XLogPrefetcherReadRecord()

XLogRecord* XLogPrefetcherReadRecord ( XLogPrefetcher prefetcher,
char **  errmsg 
)

Definition at line 987 of file xlogprefetcher.c.

988 {
989  DecodedXLogRecord *record;
990  XLogRecPtr replayed_up_to;
991 
992  /*
993  * See if it's time to reset the prefetching machinery, because a relevant
994  * GUC was changed.
995  */
997  {
998  uint32 max_distance;
999  uint32 max_inflight;
1000 
1001  if (prefetcher->streaming_read)
1002  lrq_free(prefetcher->streaming_read);
1003 
1005  {
1007  max_inflight = maintenance_io_concurrency;
1008  max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1009  }
1010  else
1011  {
1012  max_inflight = 1;
1013  max_distance = 1;
1014  }
1015 
1016  prefetcher->streaming_read = lrq_alloc(max_distance,
1017  max_inflight,
1018  (uintptr_t) prefetcher,
1020 
1022  }
1023 
1024  /*
1025  * Release last returned record, if there is one, as it's now been
1026  * replayed.
1027  */
1028  replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1029 
1030  /*
1031  * Can we drop any filters yet? If we were waiting for a relation to be
1032  * created or extended, it is now OK to access blocks in the covered
1033  * range.
1034  */
1035  XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1036 
1037  /*
1038  * All IO initiated by earlier WAL is now completed. This might trigger
1039  * further prefetching.
1040  */
1041  lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1042 
1043  /*
1044  * If there's nothing queued yet, then start prefetching to cause at least
1045  * one record to be queued.
1046  */
1047  if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1048  {
1049  Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1050  Assert(lrq_completed(prefetcher->streaming_read) == 0);
1051  lrq_prefetch(prefetcher->streaming_read);
1052  }
1053 
1054  /* Read the next record. */
1055  record = XLogNextRecord(prefetcher->reader, errmsg);
1056  if (!record)
1057  return NULL;
1058 
1059  /*
1060  * The record we just got is the "current" one, for the benefit of the
1061  * XLogRecXXX() macros.
1062  */
1063  Assert(record == prefetcher->reader->record);
1064 
1065  /*
1066  * If maintenance_io_concurrency is set very low, we might have started
1067  * prefetching some but not all of the blocks referenced in the record
1068  * we're about to return. Forget about the rest of the blocks in this
1069  * record by dropping the prefetcher's reference to it.
1070  */
1071  if (record == prefetcher->record)
1072  prefetcher->record = NULL;
1073 
1074  /*
1075  * See if it's time to compute some statistics, because enough WAL has
1076  * been processed.
1077  */
1078  if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1079  XLogPrefetcherComputeStats(prefetcher);
1080 
1081  Assert(record == prefetcher->reader->record);
1082 
1083  return &record->header;
1084 }
int maintenance_io_concurrency
Definition: bufmgr.c:153
int errmsg(const char *fmt,...)
Definition: elog.c:1069
DecodedXLogRecord * record
Definition: xlogreader.h:236
void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:342
XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)
Definition: xlogreader.c:266

References Assert(), errmsg(), DecodedXLogRecord::header, lrq_alloc(), lrq_complete_lsn(), lrq_completed(), lrq_free(), lrq_inflight(), lrq_prefetch(), DecodedXLogRecord::lsn, maintenance_io_concurrency, XLogPrefetcher::next_stats_shm_lsn, XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, XLogPrefetcher::record, XLogReaderState::record, RecoveryPrefetchEnabled, XLogPrefetcher::streaming_read, unlikely, XLogNextRecord(), XLOGPREFETCHER_DISTANCE_MULTIPLIER, XLogPrefetcherCompleteFilters(), XLogPrefetcherComputeStats(), XLogPrefetcherNextBlock(), XLogPrefetchReconfigureCount, XLogReaderHasQueuedRecordOrError(), and XLogReleasePreviousRecord().

Referenced by ReadRecord().

◆ XLogPrefetchIncrement()

static void XLogPrefetchIncrement ( pg_atomic_uint64 counter)
inlinestatic

Definition at line 355 of file xlogprefetcher.c.

356 {
358  pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
359 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:433
bool IsUnderPostmaster
Definition: globals.c:113

References AmStartupProcess, Assert(), IsUnderPostmaster, pg_atomic_read_u64(), and pg_atomic_write_u64().

Referenced by XLogPrefetcherNextBlock().

◆ XLogPrefetchReconfigure()

void XLogPrefetchReconfigure ( void  )

◆ XLogPrefetchResetStats()

◆ XLogPrefetchShmemInit()

void XLogPrefetchShmemInit ( void  )

Definition at line 319 of file xlogprefetcher.c.

320 {
321  bool found;
322 
324  ShmemInitStruct("XLogPrefetchStats",
325  sizeof(XLogPrefetchStats),
326  &found);
327 
328  if (!found)
329  {
337  }
338 }
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:410
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396

References GetCurrentTimestamp(), XLogPrefetchStats::hit, pg_atomic_init_u64(), XLogPrefetchStats::prefetch, XLogPrefetchStats::reset_time, SharedStats, ShmemInitStruct(), XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, and XLogPrefetchStats::skip_rep.

Referenced by CreateSharedMemoryAndSemaphores().

◆ XLogPrefetchShmemSize()

size_t XLogPrefetchShmemSize ( void  )

Definition at line 298 of file xlogprefetcher.c.

299 {
300  return sizeof(XLogPrefetchStats);
301 }
struct XLogPrefetchStats XLogPrefetchStats

Referenced by CalculateShmemSize().

Variable Documentation

◆ recovery_prefetch

int recovery_prefetch = RECOVERY_PREFETCH_TRY

Definition at line 72 of file xlogprefetcher.c.

Referenced by assign_recovery_prefetch().

◆ SharedStats

◆ XLogPrefetchReconfigureCount

int XLogPrefetchReconfigureCount = 0
static