PostgreSQL Source Code  git master
xlogprefetcher.c File Reference
#include "postgres.h"
#include "access/xlog.h"
#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/pg_class.h"
#include "catalog/pg_control.h"
#include "catalog/storage_xlog.h"
#include "commands/dbcommands_xlog.h"
#include "utils/fmgrprotos.h"
#include "utils/timestamp.h"
#include "funcapi.h"
#include "pgstat.h"
#include "miscadmin.h"
#include "port/atomics.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
Include dependency graph for xlogprefetcher.c:

Go to the source code of this file.

Data Structures

struct  LsnReadQueue
 
struct  XLogPrefetcher
 
struct  XLogPrefetcherFilter
 
struct  XLogPrefetchStats
 

Macros

#define XLOGPREFETCHER_STATS_DISTANCE   BLCKSZ
 
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE   4
 
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER   4
 
#define RecoveryPrefetchEnabled()   false
 
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS   10
 

Typedefs

typedef LsnReadQueueNextStatus(* LsnReadQueueNextFun) (uintptr_t lrq_private, XLogRecPtr *lsn)
 
typedef struct LsnReadQueue LsnReadQueue
 
typedef struct XLogPrefetcherFilter XLogPrefetcherFilter
 
typedef struct XLogPrefetchStats XLogPrefetchStats
 

Enumerations

enum  LsnReadQueueNextStatus { LRQ_NEXT_NO_IO , LRQ_NEXT_IO , LRQ_NEXT_AGAIN }
 

Functions

static void XLogPrefetcherAddFilter (XLogPrefetcher *prefetcher, RelFileNode rnode, BlockNumber blockno, XLogRecPtr lsn)
 
static bool XLogPrefetcherIsFiltered (XLogPrefetcher *prefetcher, RelFileNode rnode, BlockNumber blockno)
 
static void XLogPrefetcherCompleteFilters (XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
 
static LsnReadQueueNextStatus XLogPrefetcherNextBlock (uintptr_t pgsr_private, XLogRecPtr *lsn)
 
static LsnReadQueuelrq_alloc (uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
 
static void lrq_free (LsnReadQueue *lrq)
 
static uint32 lrq_inflight (LsnReadQueue *lrq)
 
static uint32 lrq_completed (LsnReadQueue *lrq)
 
static void lrq_prefetch (LsnReadQueue *lrq)
 
static void lrq_complete_lsn (LsnReadQueue *lrq, XLogRecPtr lsn)
 
size_t XLogPrefetchShmemSize (void)
 
void XLogPrefetchResetStats (void)
 
void XLogPrefetchShmemInit (void)
 
void XLogPrefetchReconfigure (void)
 
static void XLogPrefetchIncrement (pg_atomic_uint64 *counter)
 
XLogPrefetcherXLogPrefetcherAllocate (XLogReaderState *reader)
 
void XLogPrefetcherFree (XLogPrefetcher *prefetcher)
 
XLogReaderStateXLogPrefetcherGetReader (XLogPrefetcher *prefetcher)
 
void XLogPrefetcherComputeStats (XLogPrefetcher *prefetcher)
 
Datum pg_stat_get_recovery_prefetch (PG_FUNCTION_ARGS)
 
void XLogPrefetcherBeginRead (XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
 
XLogRecordXLogPrefetcherReadRecord (XLogPrefetcher *prefetcher, char **errmsg)
 
bool check_recovery_prefetch (int *new_value, void **extra, GucSource source)
 
void assign_recovery_prefetch (int new_value, void *extra)
 

Variables

int recovery_prefetch = RECOVERY_PREFETCH_TRY
 
static int XLogPrefetchReconfigureCount = 0
 
static XLogPrefetchStatsSharedStats
 

Macro Definition Documentation

◆ PG_STAT_GET_RECOVERY_PREFETCH_COLS

#define PG_STAT_GET_RECOVERY_PREFETCH_COLS   10

◆ RecoveryPrefetchEnabled

#define RecoveryPrefetchEnabled ( )    false

Definition at line 77 of file xlogprefetcher.c.

◆ XLOGPREFETCHER_DISTANCE_MULTIPLIER

#define XLOGPREFETCHER_DISTANCE_MULTIPLIER   4

Definition at line 66 of file xlogprefetcher.c.

◆ XLOGPREFETCHER_SEQ_WINDOW_SIZE

#define XLOGPREFETCHER_SEQ_WINDOW_SIZE   4

Definition at line 60 of file xlogprefetcher.c.

◆ XLOGPREFETCHER_STATS_DISTANCE

#define XLOGPREFETCHER_STATS_DISTANCE   BLCKSZ

Definition at line 54 of file xlogprefetcher.c.

Typedef Documentation

◆ LsnReadQueue

typedef struct LsnReadQueue LsnReadQueue

◆ LsnReadQueueNextFun

typedef LsnReadQueueNextStatus(* LsnReadQueueNextFun) (uintptr_t lrq_private, XLogRecPtr *lsn)

Definition at line 96 of file xlogprefetcher.c.

◆ XLogPrefetcherFilter

◆ XLogPrefetchStats

Enumeration Type Documentation

◆ LsnReadQueueNextStatus

Enumerator
LRQ_NEXT_NO_IO 
LRQ_NEXT_IO 
LRQ_NEXT_AGAIN 

Definition at line 85 of file xlogprefetcher.c.

86 {
LsnReadQueueNextStatus
@ LRQ_NEXT_NO_IO
@ LRQ_NEXT_IO
@ LRQ_NEXT_AGAIN

Function Documentation

◆ assign_recovery_prefetch()

void assign_recovery_prefetch ( int  new_value,
void *  extra 
)

Definition at line 1081 of file xlogprefetcher.c.

1082 {
1083  /* Reconfigure prefetching, because a setting it depends on changed. */
1084  recovery_prefetch = new_value;
1085  if (AmStartupProcess())
1087 }
#define AmStartupProcess()
Definition: miscadmin.h:444
int recovery_prefetch
void XLogPrefetchReconfigure(void)

References AmStartupProcess, recovery_prefetch, and XLogPrefetchReconfigure().

◆ check_recovery_prefetch()

bool check_recovery_prefetch ( int *  new_value,
void **  extra,
GucSource  source 
)

Definition at line 1067 of file xlogprefetcher.c.

1068 {
1069 #ifndef USE_PREFETCH
1070  if (*new_value == RECOVERY_PREFETCH_ON)
1071  {
1072  GUC_check_errdetail("recovery_prefetch not supported on platforms that lack posix_fadvise().");
1073  return false;
1074  }
1075 #endif
1076 
1077  return true;
1078 }
#define GUC_check_errdetail
Definition: guc.h:431
@ RECOVERY_PREFETCH_ON

References GUC_check_errdetail, and RECOVERY_PREFETCH_ON.

◆ lrq_alloc()

static LsnReadQueue* lrq_alloc ( uint32  max_distance,
uint32  max_inflight,
uintptr_t  lrq_private,
LsnReadQueueNextFun  next 
)
inlinestatic

Definition at line 204 of file xlogprefetcher.c.

208 {
209  LsnReadQueue *lrq;
210  uint32 size;
211 
212  Assert(max_distance >= max_inflight);
213 
214  size = max_distance + 1; /* full ring buffer has a gap */
215  lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
216  lrq->lrq_private = lrq_private;
217  lrq->max_inflight = max_inflight;
218  lrq->size = size;
219  lrq->next = next;
220  lrq->head = 0;
221  lrq->tail = 0;
222  lrq->inflight = 0;
223  lrq->completed = 0;
224 
225  return lrq;
226 }
static int32 next
Definition: blutils.c:219
unsigned int uint32
Definition: c.h:441
#define offsetof(type, field)
Definition: c.h:727
Assert(fmt[strlen(fmt) - 1] !='\n')
void * palloc(Size size)
Definition: mcxt.c:1068
struct LsnReadQueue::@14 queue[FLEXIBLE_ARRAY_MEMBER]
uint32 max_inflight
LsnReadQueueNextFun next
uintptr_t lrq_private

References Assert(), LsnReadQueue::completed, LsnReadQueue::head, LsnReadQueue::inflight, LsnReadQueue::lrq_private, LsnReadQueue::max_inflight, next, LsnReadQueue::next, offsetof, palloc(), LsnReadQueue::queue, LsnReadQueue::size, and LsnReadQueue::tail.

Referenced by XLogPrefetcherReadRecord().

◆ lrq_complete_lsn()

static void lrq_complete_lsn ( LsnReadQueue lrq,
XLogRecPtr  lsn 
)
inlinestatic

Definition at line 274 of file xlogprefetcher.c.

275 {
276  /*
277  * We know that LSNs before 'lsn' have been replayed, so we can now assume
278  * that any IOs that were started before then have finished.
279  */
280  while (lrq->tail != lrq->head &&
281  lrq->queue[lrq->tail].lsn < lsn)
282  {
283  if (lrq->queue[lrq->tail].io)
284  lrq->inflight--;
285  else
286  lrq->completed--;
287  lrq->tail++;
288  if (lrq->tail == lrq->size)
289  lrq->tail = 0;
290  }
292  lrq_prefetch(lrq);
293 }
XLogRecPtr lsn
#define RecoveryPrefetchEnabled()
static void lrq_prefetch(LsnReadQueue *lrq)

References LsnReadQueue::completed, LsnReadQueue::head, LsnReadQueue::inflight, LsnReadQueue::io, lrq_prefetch(), LsnReadQueue::lsn, LsnReadQueue::queue, RecoveryPrefetchEnabled, LsnReadQueue::size, and LsnReadQueue::tail.

Referenced by XLogPrefetcherReadRecord().

◆ lrq_completed()

static uint32 lrq_completed ( LsnReadQueue lrq)
inlinestatic

Definition at line 241 of file xlogprefetcher.c.

242 {
243  return lrq->completed;
244 }

References LsnReadQueue::completed.

Referenced by XLogPrefetcherComputeStats().

◆ lrq_free()

static void lrq_free ( LsnReadQueue lrq)
inlinestatic

Definition at line 229 of file xlogprefetcher.c.

230 {
231  pfree(lrq);
232 }
void pfree(void *pointer)
Definition: mcxt.c:1175

References pfree().

Referenced by XLogPrefetcherFree(), and XLogPrefetcherReadRecord().

◆ lrq_inflight()

static uint32 lrq_inflight ( LsnReadQueue lrq)
inlinestatic

Definition at line 235 of file xlogprefetcher.c.

236 {
237  return lrq->inflight;
238 }

References LsnReadQueue::inflight.

Referenced by XLogPrefetcherComputeStats().

◆ lrq_prefetch()

static void lrq_prefetch ( LsnReadQueue lrq)
inlinestatic

Definition at line 247 of file xlogprefetcher.c.

248 {
249  /* Try to start as many IOs as we can within our limits. */
250  while (lrq->inflight < lrq->max_inflight &&
251  lrq->inflight + lrq->completed < lrq->size - 1)
252  {
253  Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
254  switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
255  {
256  case LRQ_NEXT_AGAIN:
257  return;
258  case LRQ_NEXT_IO:
259  lrq->queue[lrq->head].io = true;
260  lrq->inflight++;
261  break;
262  case LRQ_NEXT_NO_IO:
263  lrq->queue[lrq->head].io = false;
264  lrq->completed++;
265  break;
266  }
267  lrq->head++;
268  if (lrq->head == lrq->size)
269  lrq->head = 0;
270  }
271 }

References Assert(), LsnReadQueue::completed, LsnReadQueue::head, LsnReadQueue::inflight, LsnReadQueue::io, LRQ_NEXT_AGAIN, LRQ_NEXT_IO, LRQ_NEXT_NO_IO, LsnReadQueue::lrq_private, LsnReadQueue::lsn, LsnReadQueue::max_inflight, LsnReadQueue::next, LsnReadQueue::queue, LsnReadQueue::size, and LsnReadQueue::tail.

Referenced by lrq_complete_lsn(), and XLogPrefetcherReadRecord().

◆ pg_stat_get_recovery_prefetch()

Datum pg_stat_get_recovery_prefetch ( PG_FUNCTION_ARGS  )

Definition at line 827 of file xlogprefetcher.c.

828 {
829 #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
830  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
833 
834  SetSingleFuncCall(fcinfo, 0);
835 
836  for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
837  nulls[i] = false;
838 
849  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
850 
851  return (Datum) 0;
852 }
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
static Datum values[MAXATTR]
Definition: bootstrap.c:156
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1683
void SetSingleFuncCall(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
int i
Definition: isn.c:73
uintptr_t Datum
Definition: postgres.h:411
#define Int32GetDatum(X)
Definition: postgres.h:523
TupleDesc setDesc
Definition: execnodes.h:317
Tuplestorestate * setResult
Definition: execnodes.h:316
pg_atomic_uint64 skip_fpw
pg_atomic_uint64 skip_init
pg_atomic_uint64 reset_time
pg_atomic_uint64 hit
pg_atomic_uint64 prefetch
pg_atomic_uint64 skip_rep
pg_atomic_uint64 skip_new
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
static XLogPrefetchStats * SharedStats
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS

References XLogPrefetchStats::block_distance, XLogPrefetchStats::hit, i, Int32GetDatum, Int64GetDatum(), XLogPrefetchStats::io_depth, pg_atomic_read_u64(), PG_STAT_GET_RECOVERY_PREFETCH_COLS, XLogPrefetchStats::prefetch, XLogPrefetchStats::reset_time, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SetSingleFuncCall(), SharedStats, XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, XLogPrefetchStats::skip_rep, TimestampTzGetDatum, tuplestore_putvalues(), values, and XLogPrefetchStats::wal_distance.

◆ XLogPrefetcherAddFilter()

static void XLogPrefetcherAddFilter ( XLogPrefetcher prefetcher,
RelFileNode  rnode,
BlockNumber  blockno,
XLogRecPtr  lsn 
)
inlinestatic

Definition at line 859 of file xlogprefetcher.c.

861 {
862  XLogPrefetcherFilter *filter;
863  bool found;
864 
865  filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found);
866  if (!found)
867  {
868  /*
869  * Don't allow any prefetching of this block or higher until replayed.
870  */
871  filter->filter_until_replayed = lsn;
872  filter->filter_from_block = blockno;
873  dlist_push_head(&prefetcher->filter_queue, &filter->link);
874  }
875  else
876  {
877  /*
878  * We were already filtering this rnode. Extend the filter's lifetime
879  * to cover this WAL record, but leave the lower of the block numbers
880  * there because we don't want to have to track individual blocks.
881  */
882  filter->filter_until_replayed = lsn;
883  dlist_delete(&filter->link);
884  dlist_push_head(&prefetcher->filter_queue, &filter->link);
885  filter->filter_from_block = Min(filter->filter_from_block, blockno);
886  }
887 }
#define Min(x, y)
Definition: c.h:986
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
@ HASH_ENTER
Definition: hsearch.h:114
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void dlist_push_head(dlist_head *head, dlist_node *node)
Definition: ilist.h:300
XLogRecPtr filter_until_replayed
BlockNumber filter_from_block
dlist_head filter_queue

References dlist_delete(), dlist_push_head(), XLogPrefetcherFilter::filter_from_block, XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, XLogPrefetcherFilter::filter_until_replayed, HASH_ENTER, hash_search(), XLogPrefetcherFilter::link, and Min.

Referenced by XLogPrefetcherNextBlock().

◆ XLogPrefetcherAllocate()

XLogPrefetcher* XLogPrefetcherAllocate ( XLogReaderState reader)

Definition at line 364 of file xlogprefetcher.c.

365 {
366  XLogPrefetcher *prefetcher;
367  static HASHCTL hash_table_ctl = {
368  .keysize = sizeof(RelFileNode),
369  .entrysize = sizeof(XLogPrefetcherFilter)
370  };
371 
372  prefetcher = palloc0(sizeof(XLogPrefetcher));
373 
374  prefetcher->reader = reader;
375  prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
376  &hash_table_ctl,
378  dlist_init(&prefetcher->filter_queue);
379 
382  SharedStats->io_depth = 0;
383 
384  /* First usage will cause streaming_read to be allocated. */
386 
387  return prefetcher;
388 }
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void * palloc0(Size size)
Definition: mcxt.c:1099
struct RelFileNode RelFileNode
Size keysize
Definition: hsearch.h:75
XLogReaderState * reader
static int XLogPrefetchReconfigureCount

References XLogPrefetchStats::block_distance, dlist_init(), XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, HASH_BLOBS, hash_create(), HASH_ELEM, XLogPrefetchStats::io_depth, HASHCTL::keysize, palloc0(), XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, SharedStats, XLogPrefetchStats::wal_distance, and XLogPrefetchReconfigureCount.

Referenced by InitWalRecovery().

◆ XLogPrefetcherBeginRead()

void XLogPrefetcherBeginRead ( XLogPrefetcher prefetcher,
XLogRecPtr  recPtr 
)

Definition at line 964 of file xlogprefetcher.c.

965 {
966  /* This will forget about any in-flight IO. */
967  prefetcher->reconfigure_count--;
968 
969  /* Book-keeping to avoid readahead on first read. */
970  prefetcher->begin_ptr = recPtr;
971 
972  prefetcher->no_readahead_until = 0;
973 
974  /* This will forget about any queued up records in the decoder. */
975  XLogBeginRead(prefetcher->reader, recPtr);
976 }
XLogRecPtr no_readahead_until
XLogRecPtr begin_ptr
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:264

References XLogPrefetcher::begin_ptr, XLogPrefetcher::no_readahead_until, XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, and XLogBeginRead().

Referenced by FinishWalRecovery(), InitWalRecovery(), PerformWalRecovery(), and ReadCheckpointRecord().

◆ XLogPrefetcherCompleteFilters()

static void XLogPrefetcherCompleteFilters ( XLogPrefetcher prefetcher,
XLogRecPtr  replaying_lsn 
)
inlinestatic

Definition at line 896 of file xlogprefetcher.c.

897 {
898  while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
899  {
901  link,
902  &prefetcher->filter_queue);
903 
904  if (filter->filter_until_replayed >= replaying_lsn)
905  break;
906 
907  dlist_delete(&filter->link);
908  hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
909  }
910 }
#define unlikely(x)
Definition: c.h:273
@ HASH_REMOVE
Definition: hsearch.h:115
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:515

References dlist_delete(), dlist_is_empty(), dlist_tail_element, XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, XLogPrefetcherFilter::filter_until_replayed, HASH_REMOVE, hash_search(), XLogPrefetcherFilter::link, and unlikely.

Referenced by XLogPrefetcherReadRecord().

◆ XLogPrefetcherComputeStats()

void XLogPrefetcherComputeStats ( XLogPrefetcher prefetcher)

Definition at line 414 of file xlogprefetcher.c.

415 {
416  uint32 io_depth;
417  uint32 completed;
418  int64 wal_distance;
419 
420 
421  /* How far ahead of replay are we now? */
422  if (prefetcher->reader->decode_queue_tail)
423  {
424  wal_distance =
425  prefetcher->reader->decode_queue_tail->lsn -
426  prefetcher->reader->decode_queue_head->lsn;
427  }
428  else
429  {
430  wal_distance = 0;
431  }
432 
433  /* How many IOs are currently in flight and completed? */
434  io_depth = lrq_inflight(prefetcher->streaming_read);
435  completed = lrq_completed(prefetcher->streaming_read);
436 
437  /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
438  SharedStats->io_depth = io_depth;
439  SharedStats->block_distance = io_depth + completed;
440  SharedStats->wal_distance = wal_distance;
441 
442  prefetcher->next_stats_shm_lsn =
444 }
XLogRecPtr lsn
Definition: xlogreader.h:164
LsnReadQueue * streaming_read
XLogRecPtr next_stats_shm_lsn
DecodedXLogRecord * decode_queue_head
Definition: xlogreader.h:260
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206
DecodedXLogRecord * decode_queue_tail
Definition: xlogreader.h:261
static uint32 lrq_completed(LsnReadQueue *lrq)
static uint32 lrq_inflight(LsnReadQueue *lrq)
#define XLOGPREFETCHER_STATS_DISTANCE

References XLogPrefetchStats::block_distance, XLogReaderState::decode_queue_head, XLogReaderState::decode_queue_tail, XLogPrefetchStats::io_depth, lrq_completed(), lrq_inflight(), DecodedXLogRecord::lsn, XLogPrefetcher::next_stats_shm_lsn, XLogPrefetcher::reader, XLogReaderState::ReadRecPtr, SharedStats, XLogPrefetcher::streaming_read, XLogPrefetchStats::wal_distance, and XLOGPREFETCHER_STATS_DISTANCE.

Referenced by ShutdownWalRecovery(), WaitForWALToBecomeAvailable(), and XLogPrefetcherReadRecord().

◆ XLogPrefetcherFree()

void XLogPrefetcherFree ( XLogPrefetcher prefetcher)

Definition at line 394 of file xlogprefetcher.c.

395 {
396  lrq_free(prefetcher->streaming_read);
397  hash_destroy(prefetcher->filter_table);
398  pfree(prefetcher);
399 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:862
static void lrq_free(LsnReadQueue *lrq)

References XLogPrefetcher::filter_table, hash_destroy(), lrq_free(), pfree(), and XLogPrefetcher::streaming_read.

Referenced by ShutdownWalRecovery().

◆ XLogPrefetcherGetReader()

XLogReaderState* XLogPrefetcherGetReader ( XLogPrefetcher prefetcher)

Definition at line 405 of file xlogprefetcher.c.

406 {
407  return prefetcher->reader;
408 }

References XLogPrefetcher::reader.

Referenced by ReadRecord().

◆ XLogPrefetcherIsFiltered()

static bool XLogPrefetcherIsFiltered ( XLogPrefetcher prefetcher,
RelFileNode  rnode,
BlockNumber  blockno 
)
inlinestatic

Definition at line 916 of file xlogprefetcher.c.

918 {
919  /*
920  * Test for empty queue first, because we expect it to be empty most of
921  * the time and we can avoid the hash table lookup in that case.
922  */
923  if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
924  {
925  XLogPrefetcherFilter *filter;
926 
927  /* See if the block range is filtered. */
928  filter = hash_search(prefetcher->filter_table, &rnode, HASH_FIND, NULL);
929  if (filter && filter->filter_from_block <= blockno)
930  {
931 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
932  elog(XLOGPREFETCHER_DEBUG_LEVEL,
933  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
934  rnode.spcNode, rnode.dbNode, rnode.relNode, blockno,
936  filter->filter_from_block);
937 #endif
938  return true;
939  }
940 
941  /* See if the whole database is filtered. */
942  rnode.relNode = InvalidOid;
943  rnode.spcNode = InvalidOid;
944  filter = hash_search(prefetcher->filter_table, &rnode, HASH_FIND, NULL);
945  if (filter)
946  {
947 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
948  elog(XLOGPREFETCHER_DEBUG_LEVEL,
949  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
950  rnode.spcNode, rnode.dbNode, rnode.relNode, blockno,
952 #endif
953  return true;
954  }
955  }
956 
957  return false;
958 }
#define elog(elevel,...)
Definition: elog.h:218
@ HASH_FIND
Definition: hsearch.h:113
#define InvalidOid
Definition: postgres_ext.h:36
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References RelFileNode::dbNode, dlist_is_empty(), elog, XLogPrefetcherFilter::filter_from_block, XLogPrefetcher::filter_queue, XLogPrefetcher::filter_table, XLogPrefetcherFilter::filter_until_replayed, HASH_FIND, hash_search(), InvalidOid, LSN_FORMAT_ARGS, RelFileNode::relNode, RelFileNode::spcNode, and unlikely.

Referenced by XLogPrefetcherNextBlock().

◆ XLogPrefetcherNextBlock()

static LsnReadQueueNextStatus XLogPrefetcherNextBlock ( uintptr_t  pgsr_private,
XLogRecPtr lsn 
)
static

Definition at line 463 of file xlogprefetcher.c.

464 {
465  XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
466  XLogReaderState *reader = prefetcher->reader;
467  XLogRecPtr replaying_lsn = reader->ReadRecPtr;
468 
469  /*
470  * We keep track of the record and block we're up to between calls with
471  * prefetcher->record and prefetcher->next_block_id.
472  */
473  for (;;)
474  {
475  DecodedXLogRecord *record;
476 
477  /* Try to read a new future record, if we don't already have one. */
478  if (prefetcher->record == NULL)
479  {
480  bool nonblocking;
481 
482  /*
483  * If there are already records or an error queued up that could
484  * be replayed, we don't want to block here. Otherwise, it's OK
485  * to block waiting for more data: presumably the caller has
486  * nothing else to do.
487  */
488  nonblocking = XLogReaderHasQueuedRecordOrError(reader);
489 
490  /* Readahead is disabled until we replay past a certain point. */
491  if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
492  return LRQ_NEXT_AGAIN;
493 
494  record = XLogReadAhead(prefetcher->reader, nonblocking);
495  if (record == NULL)
496  {
497  /*
498  * We can't read any more, due to an error or lack of data in
499  * nonblocking mode. Don't try to read ahead again until
500  * we've replayed everything already decoded.
501  */
502  if (nonblocking && prefetcher->reader->decode_queue_tail)
503  prefetcher->no_readahead_until =
504  prefetcher->reader->decode_queue_tail->lsn;
505 
506  return LRQ_NEXT_AGAIN;
507  }
508 
509  /*
510  * If prefetching is disabled, we don't need to analyze the record
511  * or issue any prefetches. We just need to cause one record to
512  * be decoded.
513  */
515  {
516  *lsn = InvalidXLogRecPtr;
517  return LRQ_NEXT_NO_IO;
518  }
519 
520  /* We have a new record to process. */
521  prefetcher->record = record;
522  prefetcher->next_block_id = 0;
523  }
524  else
525  {
526  /* Continue to process from last call, or last loop. */
527  record = prefetcher->record;
528  }
529 
530  /*
531  * Check for operations that require us to filter out block ranges, or
532  * pause readahead completely.
533  */
534  if (replaying_lsn < record->lsn)
535  {
536  uint8 rmid = record->header.xl_rmid;
537  uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
538 
539  if (rmid == RM_XLOG_ID)
540  {
541  if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
542  record_type == XLOG_END_OF_RECOVERY)
543  {
544  /*
545  * These records might change the TLI. Avoid potential
546  * bugs if we were to allow "read TLI" and "replay TLI" to
547  * differ without more analysis.
548  */
549  prefetcher->no_readahead_until = record->lsn;
550 
551 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
552  elog(XLOGPREFETCHER_DEBUG_LEVEL,
553  "suppressing all readahead until %X/%X is replayed due to possible TLI change",
554  LSN_FORMAT_ARGS(record->lsn));
555 #endif
556 
557  /* Fall through so we move past this record. */
558  }
559  }
560  else if (rmid == RM_DBASE_ID)
561  {
562  /*
563  * When databases are created with the file-copy strategy,
564  * there are no WAL records to tell us about the creation of
565  * individual relations.
566  */
567  if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
568  {
571  RelFileNode rnode = {InvalidOid, xlrec->db_id, InvalidOid};
572 
573  /*
574  * Don't try to prefetch anything in this database until
575  * it has been created, or we might confuse the blocks of
576  * different generations, if a database OID or relfilenode
577  * is reused. It's also more efficient than discovering
578  * that relations don't exist on disk yet with ENOENT
579  * errors.
580  */
581  XLogPrefetcherAddFilter(prefetcher, rnode, 0, record->lsn);
582 
583 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
584  elog(XLOGPREFETCHER_DEBUG_LEVEL,
585  "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
586  rnode.dbNode,
587  LSN_FORMAT_ARGS(record->lsn));
588 #endif
589  }
590  }
591  else if (rmid == RM_SMGR_ID)
592  {
593  if (record_type == XLOG_SMGR_CREATE)
594  {
595  xl_smgr_create *xlrec = (xl_smgr_create *)
596  record->main_data;
597 
598  if (xlrec->forkNum == MAIN_FORKNUM)
599  {
600  /*
601  * Don't prefetch anything for this whole relation
602  * until it has been created. Otherwise we might
603  * confuse the blocks of different generations, if a
604  * relfilenode is reused. This also avoids the need
605  * to discover the problem via extra syscalls that
606  * report ENOENT.
607  */
608  XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0,
609  record->lsn);
610 
611 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
612  elog(XLOGPREFETCHER_DEBUG_LEVEL,
613  "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
614  xlrec->rnode.spcNode,
615  xlrec->rnode.dbNode,
616  xlrec->rnode.relNode,
617  LSN_FORMAT_ARGS(record->lsn));
618 #endif
619  }
620  }
621  else if (record_type == XLOG_SMGR_TRUNCATE)
622  {
624  record->main_data;
625 
626  /*
627  * Don't consider prefetching anything in the truncated
628  * range until the truncation has been performed.
629  */
630  XLogPrefetcherAddFilter(prefetcher, xlrec->rnode,
631  xlrec->blkno,
632  record->lsn);
633 
634 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
635  elog(XLOGPREFETCHER_DEBUG_LEVEL,
636  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
637  xlrec->rnode.spcNode,
638  xlrec->rnode.dbNode,
639  xlrec->rnode.relNode,
640  xlrec->blkno,
641  LSN_FORMAT_ARGS(record->lsn));
642 #endif
643  }
644  }
645  }
646 
647  /* Scan the block references, starting where we left off last time. */
648  while (prefetcher->next_block_id <= record->max_block_id)
649  {
650  int block_id = prefetcher->next_block_id++;
651  DecodedBkpBlock *block = &record->blocks[block_id];
652  SMgrRelation reln;
653  PrefetchBufferResult result;
654 
655  if (!block->in_use)
656  continue;
657 
659 
660  /*
661  * Record the LSN of this record. When it's replayed,
662  * LsnReadQueue will consider any IOs submitted for earlier LSNs
663  * to be finished.
664  */
665  *lsn = record->lsn;
666 
667  /* We don't try to prefetch anything but the main fork for now. */
668  if (block->forknum != MAIN_FORKNUM)
669  {
670  return LRQ_NEXT_NO_IO;
671  }
672 
673  /*
674  * If there is a full page image attached, we won't be reading the
675  * page, so don't bother trying to prefetch.
676  */
677  if (block->has_image)
678  {
680  return LRQ_NEXT_NO_IO;
681  }
682 
683  /* There is no point in reading a page that will be zeroed. */
684  if (block->flags & BKPBLOCK_WILL_INIT)
685  {
687  return LRQ_NEXT_NO_IO;
688  }
689 
690  /* Should we skip prefetching this block due to a filter? */
691  if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno))
692  {
694  return LRQ_NEXT_NO_IO;
695  }
696 
697  /* There is no point in repeatedly prefetching the same block. */
698  for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
699  {
700  if (block->blkno == prefetcher->recent_block[i] &&
701  RelFileNodeEquals(block->rnode, prefetcher->recent_rnode[i]))
702  {
703  /*
704  * XXX If we also remembered where it was, we could set
705  * recent_buffer so that recovery could skip smgropen()
706  * and a buffer table lookup.
707  */
709  return LRQ_NEXT_NO_IO;
710  }
711  }
712  prefetcher->recent_rnode[prefetcher->recent_idx] = block->rnode;
713  prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
714  prefetcher->recent_idx =
715  (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
716 
717  /*
718  * We could try to have a fast path for repeated references to the
719  * same relation (with some scheme to handle invalidations
720  * safely), but for now we'll call smgropen() every time.
721  */
722  reln = smgropen(block->rnode, InvalidBackendId);
723 
724  /*
725  * If the relation file doesn't exist on disk, for example because
726  * we're replaying after a crash and the file will be created and
727  * then unlinked by WAL that hasn't been replayed yet, suppress
728  * further prefetching in the relation until this record is
729  * replayed.
730  */
731  if (!smgrexists(reln, MAIN_FORKNUM))
732  {
733 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
734  elog(XLOGPREFETCHER_DEBUG_LEVEL,
735  "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
736  reln->smgr_rnode.node.spcNode,
737  reln->smgr_rnode.node.dbNode,
738  reln->smgr_rnode.node.relNode,
739  LSN_FORMAT_ARGS(record->lsn));
740 #endif
741  XLogPrefetcherAddFilter(prefetcher, block->rnode, 0,
742  record->lsn);
744  return LRQ_NEXT_NO_IO;
745  }
746 
747  /*
748  * If the relation isn't big enough to contain the referenced
749  * block yet, suppress prefetching of this block and higher until
750  * this record is replayed.
751  */
752  if (block->blkno >= smgrnblocks(reln, block->forknum))
753  {
754 #ifdef XLOGPREFETCHER_DEBUG_LEVEL
755  elog(XLOGPREFETCHER_DEBUG_LEVEL,
756  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
757  reln->smgr_rnode.node.spcNode,
758  reln->smgr_rnode.node.dbNode,
759  reln->smgr_rnode.node.relNode,
760  block->blkno,
761  LSN_FORMAT_ARGS(record->lsn));
762 #endif
763  XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno,
764  record->lsn);
766  return LRQ_NEXT_NO_IO;
767  }
768 
769  /* Try to initiate prefetching. */
770  result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
771  if (BufferIsValid(result.recent_buffer))
772  {
773  /* Cache hit, nothing to do. */
775  block->prefetch_buffer = result.recent_buffer;
776  return LRQ_NEXT_NO_IO;
777  }
778  else if (result.initiated_io)
779  {
780  /* Cache miss, I/O (presumably) started. */
783  return LRQ_NEXT_IO;
784  }
785  else
786  {
787  /*
788  * This shouldn't be possible, because we already determined
789  * that the relation exists on disk and is big enough.
790  * Something is wrong with the cache invalidation for
791  * smgrexists(), smgrnblocks(), or the file was unlinked or
792  * truncated beneath our feet?
793  */
794  elog(ERROR,
795  "could not prefetch relation %u/%u/%u block %u",
796  reln->smgr_rnode.node.spcNode,
797  reln->smgr_rnode.node.dbNode,
798  reln->smgr_rnode.node.relNode,
799  block->blkno);
800  }
801  }
802 
803  /*
804  * Several callsites need to be able to read exactly one record
805  * without any internal readahead. Examples: xlog.c reading
806  * checkpoint records with emode set to PANIC, which might otherwise
807  * cause XLogPageRead() to panic on some future page, and xlog.c
808  * determining where to start writing WAL next, which depends on the
809  * contents of the reader's internal buffer after reading one record.
810  * Therefore, don't even think about prefetching until the first
811  * record after XLogPrefetcherBeginRead() has been consumed.
812  */
813  if (prefetcher->reader->decode_queue_tail &&
814  prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
815  return LRQ_NEXT_AGAIN;
816 
817  /* Advance to the next record. */
818  prefetcher->record = NULL;
819  }
820  pg_unreachable();
821 }
#define InvalidBackendId
Definition: backendid.h:23
#define InvalidBuffer
Definition: buf.h:25
PrefetchBufferResult PrefetchSharedBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum)
Definition: bufmgr.c:505
#define BufferIsValid(bufnum)
Definition: bufmgr.h:123
#define pg_unreachable()
Definition: c.h:258
unsigned char uint8
Definition: c.h:439
#define XLOG_DBASE_CREATE_FILE_COPY
#define ERROR
Definition: elog.h:33
#define XLOG_CHECKPOINT_SHUTDOWN
Definition: pg_control.h:67
#define XLOG_END_OF_RECOVERY
Definition: pg_control.h:76
#define RelFileNodeEquals(node1, node2)
Definition: relfilenode.h:88
@ MAIN_FORKNUM
Definition: relpath.h:43
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:579
bool smgrexists(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:247
SMgrRelation smgropen(RelFileNode rnode, BackendId backend)
Definition: smgr.c:146
#define XLOG_SMGR_CREATE
Definition: storage_xlog.h:30
#define XLOG_SMGR_TRUNCATE
Definition: storage_xlog.h:31
RelFileNode rnode
Definition: xlogreader.h:125
Buffer prefetch_buffer
Definition: xlogreader.h:130
BlockNumber blkno
Definition: xlogreader.h:127
ForkNumber forknum
Definition: xlogreader.h:126
XLogRecord header
Definition: xlogreader.h:166
DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER]
Definition: xlogreader.h:172
Buffer recent_buffer
Definition: bufmgr.h:54
RelFileNode node
Definition: relfilenode.h:74
RelFileNodeBackend smgr_rnode
Definition: smgr.h:42
RelFileNode recent_rnode[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
DecodedXLogRecord * record
BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
uint8 xl_info
Definition: xlogrecord.h:46
RmgrId xl_rmid
Definition: xlogrecord.h:47
ForkNumber forkNum
Definition: storage_xlog.h:36
RelFileNode rnode
Definition: storage_xlog.h:35
BlockNumber blkno
Definition: storage_xlog.h:48
RelFileNode rnode
Definition: storage_xlog.h:49
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode, BlockNumber blockno)
static void XLogPrefetchIncrement(pg_atomic_uint64 *counter)
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE
static void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode, BlockNumber blockno, XLogRecPtr lsn)
DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)
Definition: xlogreader.c:938
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
Definition: xlogreader.h:325
#define BKPBLOCK_WILL_INIT
Definition: xlogrecord.h:188
#define XLR_INFO_MASK
Definition: xlogrecord.h:62

References Assert(), XLogPrefetcher::begin_ptr, BKPBLOCK_WILL_INIT, DecodedBkpBlock::blkno, xl_smgr_truncate::blkno, BufferIsValid, xl_dbase_create_file_copy_rec::db_id, RelFileNode::dbNode, XLogReaderState::decode_queue_tail, elog, ERROR, DecodedBkpBlock::flags, DecodedBkpBlock::forknum, xl_smgr_create::forkNum, DecodedBkpBlock::has_image, DecodedXLogRecord::header, XLogPrefetchStats::hit, i, if(), DecodedBkpBlock::in_use, PrefetchBufferResult::initiated_io, InvalidBackendId, InvalidBuffer, InvalidOid, InvalidXLogRecPtr, LRQ_NEXT_AGAIN, LRQ_NEXT_IO, LRQ_NEXT_NO_IO, DecodedXLogRecord::lsn, LSN_FORMAT_ARGS, DecodedXLogRecord::main_data, MAIN_FORKNUM, XLogPrefetcher::next_block_id, XLogPrefetcher::no_readahead_until, RelFileNodeBackend::node, XLogPrefetchStats::prefetch, DecodedBkpBlock::prefetch_buffer, PrefetchSharedBuffer(), XLogPrefetcher::reader, XLogReaderState::ReadRecPtr, XLogPrefetcher::recent_block, PrefetchBufferResult::recent_buffer, XLogPrefetcher::recent_idx, XLogPrefetcher::recent_rnode, XLogPrefetcher::record, RecoveryPrefetchEnabled, RelFileNodeEquals, RelFileNode::relNode, DecodedBkpBlock::rnode, xl_smgr_create::rnode, xl_smgr_truncate::rnode, SharedStats, XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, XLogPrefetchStats::skip_rep, SMgrRelationData::smgr_rnode, smgrexists(), smgrnblocks(), smgropen(), RelFileNode::spcNode, XLogRecord::xl_info, XLogRecord::xl_rmid, XLOG_CHECKPOINT_SHUTDOWN, XLOG_DBASE_CREATE_FILE_COPY, XLOG_END_OF_RECOVERY, XLOG_SMGR_CREATE, XLOG_SMGR_TRUNCATE, XLOGPREFETCHER_SEQ_WINDOW_SIZE, XLogPrefetcherAddFilter(), XLogPrefetcherIsFiltered(), XLogPrefetchIncrement(), XLogReadAhead(), XLogReaderHasQueuedRecordOrError(), and XLR_INFO_MASK.

Referenced by XLogPrefetcherReadRecord().

◆ XLogPrefetcherReadRecord()

XLogRecord* XLogPrefetcherReadRecord ( XLogPrefetcher prefetcher,
char **  errmsg 
)

Definition at line 983 of file xlogprefetcher.c.

984 {
985  DecodedXLogRecord *record;
986 
987  /*
988  * See if it's time to reset the prefetching machinery, because a relevant
989  * GUC was changed.
990  */
992  {
993  uint32 max_distance;
994  uint32 max_inflight;
995 
996  if (prefetcher->streaming_read)
997  lrq_free(prefetcher->streaming_read);
998 
1000  {
1001  max_inflight = Max(maintenance_io_concurrency, 2);
1002  max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1003  }
1004  else
1005  {
1006  max_inflight = 1;
1007  max_distance = 1;
1008  }
1009 
1010  prefetcher->streaming_read = lrq_alloc(max_distance,
1011  max_inflight,
1012  (uintptr_t) prefetcher,
1014 
1016  }
1017 
1018  /*
1019  * Release last returned record, if there is one. We need to do this so
1020  * that we can check for empty decode queue accurately.
1021  */
1022  XLogReleasePreviousRecord(prefetcher->reader);
1023 
1024  /* If there's nothing queued yet, then start prefetching. */
1025  if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1026  lrq_prefetch(prefetcher->streaming_read);
1027 
1028  /* Read the next record. */
1029  record = XLogNextRecord(prefetcher->reader, errmsg);
1030  if (!record)
1031  return NULL;
1032 
1033  /*
1034  * The record we just got is the "current" one, for the benefit of the
1035  * XLogRecXXX() macros.
1036  */
1037  Assert(record == prefetcher->reader->record);
1038 
1039  /*
1040  * Can we drop any prefetch filters yet, given the record we're about to
1041  * return? This assumes that any records with earlier LSNs have been
1042  * replayed, so if we were waiting for a relation to be created or
1043  * extended, it is now OK to access blocks in the covered range.
1044  */
1045  XLogPrefetcherCompleteFilters(prefetcher, record->lsn);
1046 
1047  /*
1048  * See if it's time to compute some statistics, because enough WAL has
1049  * been processed.
1050  */
1051  if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1052  XLogPrefetcherComputeStats(prefetcher);
1053 
1054  /*
1055  * The caller is about to replay this record, so we can now report that
1056  * all IO initiated because of early WAL must be finished. This may
1057  * trigger more readahead.
1058  */
1059  lrq_complete_lsn(prefetcher->streaming_read, record->lsn);
1060 
1061  Assert(record == prefetcher->reader->record);
1062 
1063  return &record->header;
1064 }
int maintenance_io_concurrency
Definition: bufmgr.c:152
#define Max(x, y)
Definition: c.h:980
int errmsg(const char *fmt,...)
Definition: elog.c:904
DecodedXLogRecord * record
Definition: xlogreader.h:236
void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:354
void XLogReleasePreviousRecord(XLogReaderState *state)
Definition: xlogreader.c:282

References Assert(), errmsg(), DecodedXLogRecord::header, lrq_alloc(), lrq_complete_lsn(), lrq_free(), lrq_prefetch(), DecodedXLogRecord::lsn, maintenance_io_concurrency, Max, XLogPrefetcher::next_stats_shm_lsn, XLogPrefetcher::reader, XLogPrefetcher::reconfigure_count, XLogReaderState::record, RecoveryPrefetchEnabled, XLogPrefetcher::streaming_read, unlikely, XLogNextRecord(), XLOGPREFETCHER_DISTANCE_MULTIPLIER, XLogPrefetcherCompleteFilters(), XLogPrefetcherComputeStats(), XLogPrefetcherNextBlock(), XLogPrefetchReconfigureCount, XLogReaderHasQueuedRecordOrError(), and XLogReleasePreviousRecord().

Referenced by ReadRecord().

◆ XLogPrefetchIncrement()

static void XLogPrefetchIncrement ( pg_atomic_uint64 counter)
inlinestatic

Definition at line 353 of file xlogprefetcher.c.

354 {
356  pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
357 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
bool IsUnderPostmaster
Definition: globals.c:113

References AmStartupProcess, Assert(), IsUnderPostmaster, pg_atomic_read_u64(), and pg_atomic_write_u64().

Referenced by XLogPrefetcherNextBlock().

◆ XLogPrefetchReconfigure()

void XLogPrefetchReconfigure ( void  )

◆ XLogPrefetchResetStats()

◆ XLogPrefetchShmemInit()

void XLogPrefetchShmemInit ( void  )

Definition at line 317 of file xlogprefetcher.c.

318 {
319  bool found;
320 
322  ShmemInitStruct("XLogPrefetchStats",
323  sizeof(XLogPrefetchStats),
324  &found);
325 
326  if (!found)
327  {
335  }
336 }
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:415
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396

References GetCurrentTimestamp(), XLogPrefetchStats::hit, pg_atomic_init_u64(), XLogPrefetchStats::prefetch, XLogPrefetchStats::reset_time, SharedStats, ShmemInitStruct(), XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, and XLogPrefetchStats::skip_rep.

Referenced by CreateSharedMemoryAndSemaphores().

◆ XLogPrefetchShmemSize()

size_t XLogPrefetchShmemSize ( void  )

Definition at line 296 of file xlogprefetcher.c.

297 {
298  return sizeof(XLogPrefetchStats);
299 }
struct XLogPrefetchStats XLogPrefetchStats

Referenced by CalculateShmemSize().

Variable Documentation

◆ recovery_prefetch

int recovery_prefetch = RECOVERY_PREFETCH_TRY

Definition at line 72 of file xlogprefetcher.c.

Referenced by assign_recovery_prefetch().

◆ SharedStats

◆ XLogPrefetchReconfigureCount

int XLogPrefetchReconfigureCount = 0
static