PostgreSQL Source Code git master
Loading...
Searching...
No Matches
xlogprefetcher.c File Reference
#include "postgres.h"
#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "catalog/storage_xlog.h"
#include "commands/dbcommands_xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "port/atomics.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
#include "storage/subsystems.h"
#include "utils/fmgrprotos.h"
#include "utils/guc_hooks.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"
#include "utils/tuplestore.h"
Include dependency graph for xlogprefetcher.c:

Go to the source code of this file.

Data Structures

struct  LsnReadQueue
 
struct  XLogPrefetcher
 
struct  XLogPrefetcherFilter
 
struct  XLogPrefetchStats
 

Macros

#define XLOGPREFETCHER_STATS_DISTANCE   BLCKSZ
 
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE   4
 
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER   4
 
#define RecoveryPrefetchEnabled()   false
 
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS   10
 

Typedefs

typedef LsnReadQueueNextStatus(* LsnReadQueueNextFun) (uintptr_t lrq_private, XLogRecPtr *lsn)
 
typedef struct LsnReadQueue LsnReadQueue
 
typedef struct XLogPrefetcherFilter XLogPrefetcherFilter
 
typedef struct XLogPrefetchStats XLogPrefetchStats
 

Enumerations

enum  LsnReadQueueNextStatus { LRQ_NEXT_NO_IO , LRQ_NEXT_IO , LRQ_NEXT_AGAIN }
 

Functions

static void XLogPrefetcherAddFilter (XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)
 
static bool XLogPrefetcherIsFiltered (XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)
 
static void XLogPrefetcherCompleteFilters (XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
 
static LsnReadQueueNextStatus XLogPrefetcherNextBlock (uintptr_t pgsr_private, XLogRecPtr *lsn)
 
static void XLogPrefetchShmemRequest (void *arg)
 
static void XLogPrefetchShmemInit (void *arg)
 
static LsnReadQueue * lrq_alloc (uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
 
static void lrq_free (LsnReadQueue *lrq)
 
static uint32 lrq_inflight (LsnReadQueue *lrq)
 
static uint32 lrq_completed (LsnReadQueue *lrq)
 
static void lrq_prefetch (LsnReadQueue *lrq)
 
static void lrq_complete_lsn (LsnReadQueue *lrq, XLogRecPtr lsn)
 
void XLogPrefetchResetStats (void)
 
void XLogPrefetchReconfigure (void)
 
static void XLogPrefetchIncrement (pg_atomic_uint64 *counter)
 
XLogPrefetcher * XLogPrefetcherAllocate (XLogReaderState *reader)
 
void XLogPrefetcherFree (XLogPrefetcher *prefetcher)
 
XLogReaderState * XLogPrefetcherGetReader (XLogPrefetcher *prefetcher)
 
void XLogPrefetcherComputeStats (XLogPrefetcher *prefetcher)
 
Datum pg_stat_get_recovery_prefetch (PG_FUNCTION_ARGS)
 
void XLogPrefetcherBeginRead (XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
 
XLogRecord * XLogPrefetcherReadRecord (XLogPrefetcher *prefetcher, char **errmsg)
 
bool check_recovery_prefetch (int *new_value, void **extra, GucSource source)
 
void assign_recovery_prefetch (int new_value, void *extra)
 

Variables

int recovery_prefetch = RECOVERY_PREFETCH_TRY
 
static int XLogPrefetchReconfigureCount = 0
 
static XLogPrefetchStats * SharedStats
 
const ShmemCallbacks XLogPrefetchShmemCallbacks
 

Macro Definition Documentation

◆ PG_STAT_GET_RECOVERY_PREFETCH_COLS

#define PG_STAT_GET_RECOVERY_PREFETCH_COLS   10

◆ RecoveryPrefetchEnabled

#define RecoveryPrefetchEnabled ( )    false

Definition at line 78 of file xlogprefetcher.c.

◆ XLOGPREFETCHER_DISTANCE_MULTIPLIER

#define XLOGPREFETCHER_DISTANCE_MULTIPLIER   4

Definition at line 65 of file xlogprefetcher.c.

◆ XLOGPREFETCHER_SEQ_WINDOW_SIZE

#define XLOGPREFETCHER_SEQ_WINDOW_SIZE   4

Definition at line 59 of file xlogprefetcher.c.

◆ XLOGPREFETCHER_STATS_DISTANCE

#define XLOGPREFETCHER_STATS_DISTANCE   BLCKSZ

Definition at line 53 of file xlogprefetcher.c.

Typedef Documentation

◆ LsnReadQueue

◆ LsnReadQueueNextFun

typedef LsnReadQueueNextStatus(* LsnReadQueueNextFun) (uintptr_t lrq_private, XLogRecPtr *lsn)

Definition at line 97 of file xlogprefetcher.c.

◆ XLogPrefetcherFilter

◆ XLogPrefetchStats

Enumeration Type Documentation

◆ LsnReadQueueNextStatus

Enumerator
LRQ_NEXT_NO_IO 
LRQ_NEXT_IO 
LRQ_NEXT_AGAIN 

Definition at line 86 of file xlogprefetcher.c.

87{
LsnReadQueueNextStatus
@ LRQ_NEXT_NO_IO
@ LRQ_NEXT_IO
@ LRQ_NEXT_AGAIN

Function Documentation

◆ assign_recovery_prefetch()

void assign_recovery_prefetch ( int  new_value,
void * extra 
)

Definition at line 1100 of file xlogprefetcher.c.

1101{
1102 /* Reconfigure prefetching, because a setting it depends on changed. */
1104 if (AmStartupProcess())
1106}
#define AmStartupProcess()
Definition miscadmin.h:405
static int fb(int x)
int recovery_prefetch
void XLogPrefetchReconfigure(void)

References AmStartupProcess, fb(), recovery_prefetch, and XLogPrefetchReconfigure().

◆ check_recovery_prefetch()

bool check_recovery_prefetch ( int * new_value,
void **  extra,
GucSource  source 
)

Definition at line 1086 of file xlogprefetcher.c.

1087{
1088#ifndef USE_PREFETCH
1090 {
1091 GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
1092 return false;
1093 }
1094#endif
1095
1096 return true;
1097}
#define GUC_check_errdetail
Definition guc.h:507
@ RECOVERY_PREFETCH_ON

References fb(), GUC_check_errdetail, and RECOVERY_PREFETCH_ON.

◆ lrq_alloc()

static LsnReadQueue * lrq_alloc ( uint32  max_distance,
uint32  max_inflight,
uintptr_t  lrq_private,
LsnReadQueueNextFun  next 
)
inlinestatic

Definition at line 213 of file xlogprefetcher.c.

217{
219 uint32 size;
220
221 Assert(max_distance >= max_inflight);
222
223 size = max_distance + 1; /* full ring buffer has a gap */
224 lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
225 lrq->lrq_private = lrq_private;
226 lrq->max_inflight = max_inflight;
227 lrq->size = size;
228 lrq->next = next;
229 lrq->head = 0;
230 lrq->tail = 0;
231 lrq->inflight = 0;
232 lrq->completed = 0;
233
234 return lrq;
235}
static int32 next
Definition blutils.c:225
#define Assert(condition)
Definition c.h:943
uint32_t uint32
Definition c.h:624
void * palloc(Size size)
Definition mcxt.c:1387

References Assert, fb(), next, and palloc().

Referenced by XLogPrefetcherReadRecord().

◆ lrq_complete_lsn()

static void lrq_complete_lsn ( LsnReadQueue * lrq,
XLogRecPtr  lsn 
)
inlinestatic

Definition at line 283 of file xlogprefetcher.c.

284{
285 /*
286 * We know that LSNs before 'lsn' have been replayed, so we can now assume
287 * that any IOs that were started before then have finished.
288 */
289 while (lrq->tail != lrq->head &&
290 lrq->queue[lrq->tail].lsn < lsn)
291 {
292 if (lrq->queue[lrq->tail].io)
293 lrq->inflight--;
294 else
295 lrq->completed--;
296 lrq->tail++;
297 if (lrq->tail == lrq->size)
298 lrq->tail = 0;
299 }
302}
#define RecoveryPrefetchEnabled()
static void lrq_prefetch(LsnReadQueue *lrq)

References fb(), lrq_prefetch(), and RecoveryPrefetchEnabled.

Referenced by XLogPrefetcherReadRecord().

◆ lrq_completed()

static uint32 lrq_completed ( LsnReadQueue * lrq)
inlinestatic

Definition at line 250 of file xlogprefetcher.c.

251{
252 return lrq->completed;
253}

References fb().

Referenced by XLogPrefetcherComputeStats(), and XLogPrefetcherReadRecord().

◆ lrq_free()

static void lrq_free ( LsnReadQueue * lrq)
inlinestatic

Definition at line 238 of file xlogprefetcher.c.

239{
240 pfree(lrq);
241}
void pfree(void *pointer)
Definition mcxt.c:1616

References fb(), and pfree().

Referenced by XLogPrefetcherFree(), and XLogPrefetcherReadRecord().

◆ lrq_inflight()

static uint32 lrq_inflight ( LsnReadQueue * lrq)
inlinestatic

Definition at line 244 of file xlogprefetcher.c.

245{
246 return lrq->inflight;
247}

References fb().

Referenced by XLogPrefetcherComputeStats(), and XLogPrefetcherReadRecord().

◆ lrq_prefetch()

static void lrq_prefetch ( LsnReadQueue * lrq)
inlinestatic

Definition at line 256 of file xlogprefetcher.c.

257{
258 /* Try to start as many IOs as we can within our limits. */
259 while (lrq->inflight < lrq->max_inflight &&
260 lrq->inflight + lrq->completed < lrq->size - 1)
261 {
262 Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
263 switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
264 {
265 case LRQ_NEXT_AGAIN:
266 return;
267 case LRQ_NEXT_IO:
268 lrq->queue[lrq->head].io = true;
269 lrq->inflight++;
270 break;
271 case LRQ_NEXT_NO_IO:
272 lrq->queue[lrq->head].io = false;
273 lrq->completed++;
274 break;
275 }
276 lrq->head++;
277 if (lrq->head == lrq->size)
278 lrq->head = 0;
279 }
280}

References Assert, fb(), LRQ_NEXT_AGAIN, LRQ_NEXT_IO, and LRQ_NEXT_NO_IO.

Referenced by lrq_complete_lsn(), and XLogPrefetcherReadRecord().

◆ pg_stat_get_recovery_prefetch()

Datum pg_stat_get_recovery_prefetch ( PG_FUNCTION_ARGS  )

Definition at line 829 of file xlogprefetcher.c.

830{
831#define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
832 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
835
836 InitMaterializedSRF(fcinfo, 0);
837
838 for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
839 nulls[i] = false;
840
851 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
852
853 return (Datum) 0;
854}
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
static Datum values[MAXATTR]
Definition bootstrap.c:190
void InitMaterializedSRF(FunctionCallInfo fcinfo, uint32 flags)
Definition funcapi.c:76
int i
Definition isn.c:77
static Datum Int64GetDatum(int64 X)
Definition postgres.h:426
uint64_t Datum
Definition postgres.h:70
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
pg_atomic_uint64 skip_fpw
pg_atomic_uint64 skip_init
pg_atomic_uint64 reset_time
pg_atomic_uint64 hit
pg_atomic_uint64 prefetch
pg_atomic_uint64 skip_rep
pg_atomic_uint64 skip_new
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:785
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
static XLogPrefetchStats * SharedStats
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS

References XLogPrefetchStats::block_distance, fb(), XLogPrefetchStats::hit, i, InitMaterializedSRF(), Int32GetDatum(), Int64GetDatum(), XLogPrefetchStats::io_depth, pg_atomic_read_u64(), PG_STAT_GET_RECOVERY_PREFETCH_COLS, XLogPrefetchStats::prefetch, XLogPrefetchStats::reset_time, SharedStats, XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, XLogPrefetchStats::skip_rep, TimestampTzGetDatum(), tuplestore_putvalues(), values, and XLogPrefetchStats::wal_distance.

◆ XLogPrefetcherAddFilter()

static void XLogPrefetcherAddFilter ( XLogPrefetcher * prefetcher,
RelFileLocator  rlocator,
BlockNumber  blockno,
XLogRecPtr  lsn 
)
inlinestatic

Definition at line 861 of file xlogprefetcher.c.

863{
864 XLogPrefetcherFilter *filter;
865 bool found;
866
867 filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
868 if (!found)
869 {
870 /*
871 * Don't allow any prefetching of this block or higher until replayed.
872 */
873 filter->filter_until_replayed = lsn;
874 filter->filter_from_block = blockno;
875 dlist_push_head(&prefetcher->filter_queue, &filter->link);
876 }
877 else
878 {
879 /*
880 * We were already filtering this rlocator. Extend the filter's
881 * lifetime to cover this WAL record, but leave the lower of the block
882 * numbers there because we don't want to have to track individual
883 * blocks.
884 */
885 filter->filter_until_replayed = lsn;
886 dlist_delete(&filter->link);
887 dlist_push_head(&prefetcher->filter_queue, &filter->link);
888 filter->filter_from_block = Min(filter->filter_from_block, blockno);
889 }
890}
#define Min(x, y)
Definition c.h:1091
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:889
@ HASH_ENTER
Definition hsearch.h:109
static void dlist_delete(dlist_node *node)
Definition ilist.h:405
static void dlist_push_head(dlist_head *head, dlist_node *node)
Definition ilist.h:347
XLogRecPtr filter_until_replayed
BlockNumber filter_from_block

References dlist_delete(), dlist_push_head(), fb(), XLogPrefetcherFilter::filter_from_block, XLogPrefetcherFilter::filter_until_replayed, HASH_ENTER, hash_search(), XLogPrefetcherFilter::link, and Min.

Referenced by XLogPrefetcherNextBlock().

◆ XLogPrefetcherAllocate()

XLogPrefetcher * XLogPrefetcherAllocate ( XLogReaderState * reader)

Definition at line 367 of file xlogprefetcher.c.

368{
370 HASHCTL ctl;
371
373 prefetcher->reader = reader;
374
375 ctl.keysize = sizeof(RelFileLocator);
376 ctl.entrysize = sizeof(XLogPrefetcherFilter);
377 prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
379 dlist_init(&prefetcher->filter_queue);
380
384
385 /* First usage will cause streaming_read to be allocated. */
386 prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
387
388 return prefetcher;
389}
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:360
#define palloc0_object(type)
Definition fe_memutils.h:75
#define HASH_ELEM
Definition hsearch.h:90
#define HASH_BLOBS
Definition hsearch.h:92
static void dlist_init(dlist_head *head)
Definition ilist.h:314
tree ctl
Definition radixtree.h:1838
static int XLogPrefetchReconfigureCount

References XLogPrefetchStats::block_distance, ctl, dlist_init(), fb(), HASH_BLOBS, hash_create(), HASH_ELEM, XLogPrefetchStats::io_depth, palloc0_object, SharedStats, XLogPrefetchStats::wal_distance, and XLogPrefetchReconfigureCount.

Referenced by InitWalRecovery().

◆ XLogPrefetcherBeginRead()

void XLogPrefetcherBeginRead ( XLogPrefetcher * prefetcher,
XLogRecPtr  recPtr 
)

Definition at line 967 of file xlogprefetcher.c.

968{
969 /* This will forget about any in-flight IO. */
970 prefetcher->reconfigure_count--;
971
972 /* Book-keeping to avoid readahead on first read. */
973 prefetcher->begin_ptr = recPtr;
974
975 prefetcher->no_readahead_until = InvalidXLogRecPtr;
976
977 /* This will forget about any queued up records in the decoder. */
979}
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition xlogreader.c:233

References fb(), InvalidXLogRecPtr, and XLogBeginRead().

Referenced by FinishWalRecovery(), InitWalRecovery(), PerformWalRecovery(), and ReadCheckpointRecord().

◆ XLogPrefetcherCompleteFilters()

static void XLogPrefetcherCompleteFilters ( XLogPrefetcher * prefetcher,
XLogRecPtr  replaying_lsn 
)
inlinestatic

Definition at line 899 of file xlogprefetcher.c.

900{
901 while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
902 {
904 link,
905 &prefetcher->filter_queue);
906
908 break;
909
910 dlist_delete(&filter->link);
911 hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
912 }
913}
#define unlikely(x)
Definition c.h:438
@ HASH_REMOVE
Definition hsearch.h:110
#define dlist_tail_element(type, membername, lhead)
Definition ilist.h:612
static bool dlist_is_empty(const dlist_head *head)
Definition ilist.h:336

References dlist_delete(), dlist_is_empty(), dlist_tail_element, fb(), XLogPrefetcherFilter::filter_until_replayed, HASH_REMOVE, hash_search(), XLogPrefetcherFilter::link, and unlikely.

Referenced by XLogPrefetcherReadRecord().

◆ XLogPrefetcherComputeStats()

void XLogPrefetcherComputeStats ( XLogPrefetcher * prefetcher)

Definition at line 415 of file xlogprefetcher.c.

416{
417 uint32 io_depth;
418 uint32 completed;
419 int64 wal_distance;
420
421
422 /* How far ahead of replay are we now? */
423 if (prefetcher->reader->decode_queue_tail)
424 {
425 wal_distance =
426 prefetcher->reader->decode_queue_tail->lsn -
427 prefetcher->reader->decode_queue_head->lsn;
428 }
429 else
430 {
431 wal_distance = 0;
432 }
433
434 /* How many IOs are currently in flight and completed? */
435 io_depth = lrq_inflight(prefetcher->streaming_read);
436 completed = lrq_completed(prefetcher->streaming_read);
437
438 /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
439 SharedStats->io_depth = io_depth;
440 SharedStats->block_distance = io_depth + completed;
441 SharedStats->wal_distance = wal_distance;
442
443 prefetcher->next_stats_shm_lsn =
444 prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
445}
int64_t int64
Definition c.h:621
static uint32 lrq_completed(LsnReadQueue *lrq)
static uint32 lrq_inflight(LsnReadQueue *lrq)
#define XLOGPREFETCHER_STATS_DISTANCE

References XLogPrefetchStats::block_distance, XLogReaderState::decode_queue_tail, fb(), XLogPrefetchStats::io_depth, lrq_completed(), lrq_inflight(), DecodedXLogRecord::lsn, SharedStats, XLogPrefetchStats::wal_distance, and XLOGPREFETCHER_STATS_DISTANCE.

Referenced by ShutdownWalRecovery(), WaitForWALToBecomeAvailable(), and XLogPrefetcherReadRecord().

◆ XLogPrefetcherFree()

void XLogPrefetcherFree ( XLogPrefetcher * prefetcher)

Definition at line 395 of file xlogprefetcher.c.

396{
397 lrq_free(prefetcher->streaming_read);
398 hash_destroy(prefetcher->filter_table);
400}
void hash_destroy(HTAB *hashp)
Definition dynahash.c:802
static void lrq_free(LsnReadQueue *lrq)

References fb(), hash_destroy(), lrq_free(), and pfree().

Referenced by ShutdownWalRecovery().

◆ XLogPrefetcherGetReader()

XLogReaderState * XLogPrefetcherGetReader ( XLogPrefetcher * prefetcher)

Definition at line 406 of file xlogprefetcher.c.

407{
408 return prefetcher->reader;
409}

References fb().

Referenced by ReadRecord().

◆ XLogPrefetcherIsFiltered()

static bool XLogPrefetcherIsFiltered ( XLogPrefetcher * prefetcher,
RelFileLocator  rlocator,
BlockNumber  blockno 
)
inlinestatic

Definition at line 919 of file xlogprefetcher.c.

921{
922 /*
923 * Test for empty queue first, because we expect it to be empty most of
924 * the time and we can avoid the hash table lookup in that case.
925 */
926 if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
927 {
928 XLogPrefetcherFilter *filter;
929
930 /* See if the block range is filtered. */
931 filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
932 if (filter && filter->filter_from_block <= blockno)
933 {
934#ifdef XLOGPREFETCHER_DEBUG_LEVEL
936 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (blocks >= %u filtered)",
937 rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
939 filter->filter_from_block);
940#endif
941 return true;
942 }
943
944 /* See if the whole database is filtered. */
946 rlocator.spcOid = InvalidOid;
947 filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
948 if (filter)
949 {
950#ifdef XLOGPREFETCHER_DEBUG_LEVEL
952 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (whole database)",
953 rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
955#endif
956 return true;
957 }
958 }
959
960 return false;
961}
#define elog(elevel,...)
Definition elog.h:228
@ HASH_FIND
Definition hsearch.h:108
#define InvalidOid
#define InvalidRelFileNumber
Definition relpath.h:26
RelFileNumber relNumber
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47

References RelFileLocator::dbOid, dlist_is_empty(), elog, fb(), XLogPrefetcherFilter::filter_from_block, XLogPrefetcherFilter::filter_until_replayed, HASH_FIND, hash_search(), InvalidOid, InvalidRelFileNumber, LSN_FORMAT_ARGS, RelFileLocator::relNumber, RelFileLocator::spcOid, and unlikely.

Referenced by XLogPrefetcherNextBlock().

◆ XLogPrefetcherNextBlock()

static LsnReadQueueNextStatus XLogPrefetcherNextBlock ( uintptr_t  pgsr_private,
XLogRecPtr * lsn 
)
static

Definition at line 464 of file xlogprefetcher.c.

465{
467 XLogReaderState *reader = prefetcher->reader;
469
470 /*
471 * We keep track of the record and block we're up to between calls with
472 * prefetcher->record and prefetcher->next_block_id.
473 */
474 for (;;)
475 {
476 DecodedXLogRecord *record;
477
478 /* Try to read a new future record, if we don't already have one. */
479 if (prefetcher->record == NULL)
480 {
481 bool nonblocking;
482
483 /*
484 * If there are already records or an error queued up that could
485 * be replayed, we don't want to block here. Otherwise, it's OK
486 * to block waiting for more data: presumably the caller has
487 * nothing else to do.
488 */
489 nonblocking = XLogReaderHasQueuedRecordOrError(reader);
490
491 /* Readahead is disabled until we replay past a certain point. */
492 if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
493 return LRQ_NEXT_AGAIN;
494
495 record = XLogReadAhead(prefetcher->reader, nonblocking);
496 if (record == NULL)
497 {
498 /*
499 * We can't read any more, due to an error or lack of data in
500 * nonblocking mode. Don't try to read ahead again until
501 * we've replayed everything already decoded.
502 */
503 if (nonblocking && prefetcher->reader->decode_queue_tail)
504 prefetcher->no_readahead_until =
505 prefetcher->reader->decode_queue_tail->lsn;
506
507 return LRQ_NEXT_AGAIN;
508 }
509
510 /*
511 * If prefetching is disabled, we don't need to analyze the record
512 * or issue any prefetches. We just need to cause one record to
513 * be decoded.
514 */
516 {
517 *lsn = InvalidXLogRecPtr;
518 return LRQ_NEXT_NO_IO;
519 }
520
521 /* We have a new record to process. */
522 prefetcher->record = record;
523 prefetcher->next_block_id = 0;
524 }
525 else
526 {
527 /* Continue to process from last call, or last loop. */
528 record = prefetcher->record;
529 }
530
531 /*
532 * Check for operations that require us to filter out block ranges, or
533 * pause readahead completely.
534 */
536 {
537 uint8 rmid = record->header.xl_rmid;
538 uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
539
540 if (rmid == RM_XLOG_ID)
541 {
542 if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
543 record_type == XLOG_END_OF_RECOVERY)
544 {
545 /*
546 * These records might change the TLI. Avoid potential
547 * bugs if we were to allow "read TLI" and "replay TLI" to
548 * differ without more analysis.
549 */
550 prefetcher->no_readahead_until = record->lsn;
551
552#ifdef XLOGPREFETCHER_DEBUG_LEVEL
554 "suppressing all readahead until %X/%08X is replayed due to possible TLI change",
555 LSN_FORMAT_ARGS(record->lsn));
556#endif
557
558 /* Fall through so we move past this record. */
559 }
560 }
561 else if (rmid == RM_DBASE_ID)
562 {
563 /*
564 * When databases are created with the file-copy strategy,
565 * there are no WAL records to tell us about the creation of
566 * individual relations.
567 */
568 if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
569 {
572 RelFileLocator rlocator =
574
575 /*
576 * Don't try to prefetch anything in this database until
577 * it has been created, or we might confuse the blocks of
578 * different generations, if a database OID or
579 * relfilenumber is reused. It's also more efficient than
580 * discovering that relations don't exist on disk yet with
581 * ENOENT errors.
582 */
583 XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
584
585#ifdef XLOGPREFETCHER_DEBUG_LEVEL
587 "suppressing prefetch in database %u until %X/%08X is replayed due to raw file copy",
588 rlocator.dbOid,
589 LSN_FORMAT_ARGS(record->lsn));
590#endif
591 }
592 }
593 else if (rmid == RM_SMGR_ID)
594 {
595 if (record_type == XLOG_SMGR_CREATE)
596 {
598 record->main_data;
599
600 if (xlrec->forkNum == MAIN_FORKNUM)
601 {
602 /*
603 * Don't prefetch anything for this whole relation
604 * until it has been created. Otherwise we might
605 * confuse the blocks of different generations, if a
606 * relfilenumber is reused. This also avoids the need
607 * to discover the problem via extra syscalls that
608 * report ENOENT.
609 */
611 record->lsn);
612
613#ifdef XLOGPREFETCHER_DEBUG_LEVEL
615 "suppressing prefetch in relation %u/%u/%u until %X/%08X is replayed, which creates the relation",
616 xlrec->rlocator.spcOid,
617 xlrec->rlocator.dbOid,
618 xlrec->rlocator.relNumber,
619 LSN_FORMAT_ARGS(record->lsn));
620#endif
621 }
622 }
623 else if (record_type == XLOG_SMGR_TRUNCATE)
624 {
626 record->main_data;
627
628 /*
629 * Don't consider prefetching anything in the truncated
630 * range until the truncation has been performed.
631 */
633 xlrec->blkno,
634 record->lsn);
635
636#ifdef XLOGPREFETCHER_DEBUG_LEVEL
638 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, which truncates the relation",
639 xlrec->rlocator.spcOid,
640 xlrec->rlocator.dbOid,
641 xlrec->rlocator.relNumber,
642 xlrec->blkno,
643 LSN_FORMAT_ARGS(record->lsn));
644#endif
645 }
646 }
647 }
648
649 /* Scan the block references, starting where we left off last time. */
650 while (prefetcher->next_block_id <= record->max_block_id)
651 {
652 int block_id = prefetcher->next_block_id++;
653 DecodedBkpBlock *block = &record->blocks[block_id];
656
657 if (!block->in_use)
658 continue;
659
661
662 /*
663 * Record the LSN of this record. When it's replayed,
664 * LsnReadQueue will consider any IOs submitted for earlier LSNs
665 * to be finished.
666 */
667 *lsn = record->lsn;
668
669 /* We don't try to prefetch anything but the main fork for now. */
670 if (block->forknum != MAIN_FORKNUM)
671 {
672 return LRQ_NEXT_NO_IO;
673 }
674
675 /*
676 * If there is a full page image attached, we won't be reading the
677 * page, so don't bother trying to prefetch.
678 */
679 if (block->has_image)
680 {
682 return LRQ_NEXT_NO_IO;
683 }
684
685 /* There is no point in reading a page that will be zeroed. */
686 if (block->flags & BKPBLOCK_WILL_INIT)
687 {
689 return LRQ_NEXT_NO_IO;
690 }
691
692 /* Should we skip prefetching this block due to a filter? */
694 {
696 return LRQ_NEXT_NO_IO;
697 }
698
699 /* There is no point in repeatedly prefetching the same block. */
700 for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
701 {
702 if (block->blkno == prefetcher->recent_block[i] &&
703 RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
704 {
705 /*
706 * XXX If we also remembered where it was, we could set
707 * recent_buffer so that recovery could skip smgropen()
708 * and a buffer table lookup.
709 */
711 return LRQ_NEXT_NO_IO;
712 }
713 }
714 prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
715 prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
716 prefetcher->recent_idx =
717 (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
718
719 /*
720 * We could try to have a fast path for repeated references to the
721 * same relation (with some scheme to handle invalidations
722 * safely), but for now we'll call smgropen() every time.
723 */
725
726 /*
727 * If the relation file doesn't exist on disk, for example because
728 * we're replaying after a crash and the file will be created and
729 * then unlinked by WAL that hasn't been replayed yet, suppress
730 * further prefetching in the relation until this record is
731 * replayed.
732 */
734 {
735#ifdef XLOGPREFETCHER_DEBUG_LEVEL
737 "suppressing all prefetch in relation %u/%u/%u until %X/%08X is replayed, because the relation does not exist on disk",
738 reln->smgr_rlocator.locator.spcOid,
739 reln->smgr_rlocator.locator.dbOid,
740 reln->smgr_rlocator.locator.relNumber,
741 LSN_FORMAT_ARGS(record->lsn));
742#endif
744 record->lsn);
746 return LRQ_NEXT_NO_IO;
747 }
748
749 /*
750 * If the relation isn't big enough to contain the referenced
751 * block yet, suppress prefetching of this block and higher until
752 * this record is replayed.
753 */
754 if (block->blkno >= smgrnblocks(reln, block->forknum))
755 {
756#ifdef XLOGPREFETCHER_DEBUG_LEVEL
758 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, because the relation is too small",
759 reln->smgr_rlocator.locator.spcOid,
760 reln->smgr_rlocator.locator.dbOid,
761 reln->smgr_rlocator.locator.relNumber,
762 block->blkno,
763 LSN_FORMAT_ARGS(record->lsn));
764#endif
766 record->lsn);
768 return LRQ_NEXT_NO_IO;
769 }
770
771 /* Try to initiate prefetching. */
772 result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
773 if (BufferIsValid(result.recent_buffer))
774 {
775 /* Cache hit, nothing to do. */
777 block->prefetch_buffer = result.recent_buffer;
778 return LRQ_NEXT_NO_IO;
779 }
780 else if (result.initiated_io)
781 {
782 /* Cache miss, I/O (presumably) started. */
785 return LRQ_NEXT_IO;
786 }
787 else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
788 {
789 /*
790 * This shouldn't be possible, because we already determined
791 * that the relation exists on disk and is big enough.
792 * Something is wrong with the cache invalidation for
793 * smgrexists(), smgrnblocks(), or the file was unlinked or
794 * truncated beneath our feet?
795 */
796 elog(ERROR,
797 "could not prefetch relation %u/%u/%u block %u",
798 reln->smgr_rlocator.locator.spcOid,
799 reln->smgr_rlocator.locator.dbOid,
800 reln->smgr_rlocator.locator.relNumber,
801 block->blkno);
802 }
803 }
804
805 /*
806 * Several callsites need to be able to read exactly one record
807 * without any internal readahead. Examples: xlog.c reading
808 * checkpoint records with emode set to PANIC, which might otherwise
809 * cause XLogPageRead() to panic on some future page, and xlog.c
810 * determining where to start writing WAL next, which depends on the
811 * contents of the reader's internal buffer after reading one record.
812 * Therefore, don't even think about prefetching until the first
813 * record after XLogPrefetcherBeginRead() has been consumed.
814 */
815 if (prefetcher->reader->decode_queue_tail &&
816 prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
817 return LRQ_NEXT_AGAIN;
818
819 /* Advance to the next record. */
820 prefetcher->record = NULL;
821 }
823}
#define InvalidBuffer
Definition buf.h:25
PrefetchBufferResult PrefetchSharedBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum)
Definition bufmgr.c:697
static bool BufferIsValid(Buffer bufnum)
Definition bufmgr.h:419
uint8_t uint8
Definition c.h:622
#define pg_unreachable()
Definition c.h:367
uint32 result
#define XLOG_DBASE_CREATE_FILE_COPY
#define ERROR
Definition elog.h:40
int io_direct_flags
Definition fd.c:172
#define IO_DIRECT_DATA
Definition fd.h:54
#define XLOG_CHECKPOINT_SHUTDOWN
Definition pg_control.h:72
#define XLOG_END_OF_RECOVERY
Definition pg_control.h:81
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
#define RelFileLocatorEquals(locator1, locator2)
@ MAIN_FORKNUM
Definition relpath.h:58
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
Definition smgr.c:819
SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend)
Definition smgr.c:240
bool smgrexists(SMgrRelation reln, ForkNumber forknum)
Definition smgr.c:462
#define XLOG_SMGR_CREATE
#define XLOG_SMGR_TRUNCATE
Buffer prefetch_buffer
Definition xlogreader.h:130
RelFileLocator rlocator
Definition xlogreader.h:125
BlockNumber blkno
Definition xlogreader.h:127
ForkNumber forknum
Definition xlogreader.h:126
XLogRecord header
Definition xlogreader.h:165
DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER]
Definition xlogreader.h:171
XLogRecPtr ReadRecPtr
Definition xlogreader.h:205
uint8 xl_info
Definition xlogrecord.h:46
RmgrId xl_rmid
Definition xlogrecord.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
static bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)
static void XLogPrefetchIncrement(pg_atomic_uint64 *counter)
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE
static void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)
DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)
Definition xlogreader.c:978
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
Definition xlogreader.h:324
#define BKPBLOCK_WILL_INIT
Definition xlogrecord.h:199

References Assert, BKPBLOCK_WILL_INIT, DecodedBkpBlock::blkno, DecodedXLogRecord::blocks, BufferIsValid(), RelFileLocator::dbOid, elog, ERROR, fb(), DecodedBkpBlock::flags, DecodedBkpBlock::forknum, DecodedBkpBlock::has_image, DecodedXLogRecord::header, XLogPrefetchStats::hit, i, DecodedBkpBlock::in_use, INVALID_PROC_NUMBER, InvalidBuffer, InvalidOid, InvalidRelFileNumber, InvalidXLogRecPtr, IO_DIRECT_DATA, io_direct_flags, LRQ_NEXT_AGAIN, LRQ_NEXT_IO, LRQ_NEXT_NO_IO, DecodedXLogRecord::lsn, LSN_FORMAT_ARGS, DecodedXLogRecord::main_data, MAIN_FORKNUM, DecodedXLogRecord::max_block_id, pg_unreachable, XLogPrefetchStats::prefetch, DecodedBkpBlock::prefetch_buffer, PrefetchSharedBuffer(), XLogReaderState::ReadRecPtr, RecoveryPrefetchEnabled, RelFileLocatorEquals, result, DecodedBkpBlock::rlocator, SharedStats, XLogPrefetchStats::skip_fpw, XLogPrefetchStats::skip_init, XLogPrefetchStats::skip_new, XLogPrefetchStats::skip_rep, smgrexists(), smgrnblocks(), smgropen(), XLogRecord::xl_info, XLogRecord::xl_rmid, XLOG_CHECKPOINT_SHUTDOWN, XLOG_DBASE_CREATE_FILE_COPY, XLOG_END_OF_RECOVERY, XLOG_SMGR_CREATE, XLOG_SMGR_TRUNCATE, XLOGPREFETCHER_SEQ_WINDOW_SIZE, XLogPrefetcherAddFilter(), XLogPrefetcherIsFiltered(), XLogPrefetchIncrement(), XLogReadAhead(), and XLogReaderHasQueuedRecordOrError().

Referenced by XLogPrefetcherReadRecord().

◆ XLogPrefetcherReadRecord()

XLogRecord * XLogPrefetcherReadRecord ( XLogPrefetcher * prefetcher,
char **  errmsg 
)

Definition at line 986 of file xlogprefetcher.c.

987{
988 DecodedXLogRecord *record;
989 XLogRecPtr replayed_up_to;
990
991 /*
992 * See if it's time to reset the prefetching machinery, because a relevant
993 * GUC was changed.
994 */
995 if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
996 {
997 uint32 max_distance;
998 uint32 max_inflight;
999
1000 if (prefetcher->streaming_read)
1001 lrq_free(prefetcher->streaming_read);
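1002
1003 if (RecoveryPrefetchEnabled())
1004 {
1005 Assert(maintenance_io_concurrency > 0);
1006 max_inflight = maintenance_io_concurrency;
1007 max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1008 }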
1009 else
1010 {
1011 max_inflight = 1;
1012 max_distance = 1;
1013 }
1014
1015 prefetcher->streaming_read = lrq_alloc(max_distance,
1016 max_inflight,
1017 (uintptr_t) prefetcher,
1018 XLogPrefetcherNextBlock);
1019
1020 prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
1021 }
1022
1023 /*
1024 * Release last returned record, if there is one, as it's now been
1025 * replayed.
1026 */
1027 replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1028
1029 /*
1030 * Can we drop any filters yet? If we were waiting for a relation to be
1031 * created or extended, it is now OK to access blocks in the covered
1032 * range.
1033 */
1034 XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1035
1036 /*
1037 * All IO initiated by earlier WAL is now completed. This might trigger
1038 * further prefetching.
1039 */
1040 lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1041
1042 /*
1043 * If there's nothing queued yet, then start prefetching to cause at least
1044 * one record to be queued.
1045 */
1046 if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1047 {
1048 Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1049 Assert(lrq_completed(prefetcher->streaming_read) == 0);
1050 lrq_prefetch(prefetcher->streaming_read);
1051 }
1052
1053 /* Read the next record. */
1054 record = XLogNextRecord(prefetcher->reader, errmsg);
1055 if (!record)
1056 return NULL;
1057
1058 /*
1059 * The record we just got is the "current" one, for the benefit of the
1060 * XLogRecXXX() macros.
1061 */
1062 Assert(record == prefetcher->reader->record);
1063
1064 /*
1065 * If maintenance_io_concurrency is set very low, we might have started
1066 * prefetching some but not all of the blocks referenced in the record
1067 * we're about to return. Forget about the rest of the blocks in this
1068 * record by dropping the prefetcher's reference to it.
1069 */
1070 if (record == prefetcher->record)
1071 prefetcher->record = NULL;
1072
1073 /*
1074 * See if it's time to compute some statistics, because enough WAL has
1075 * been processed.
1076 */
1077 if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1078 XLogPrefetcherComputeStats(prefetcher);
1079
1080 Assert(record == prefetcher->reader->record);
1081
1082 return &record->header;
1083}
int maintenance_io_concurrency
Definition bufmgr.c:207
static char * errmsg
void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
Definition xlogreader.c:327
XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)
Definition xlogreader.c:251

References Assert, errmsg, fb(), DecodedXLogRecord::header, lrq_alloc(), lrq_complete_lsn(), lrq_completed(), lrq_free(), lrq_inflight(), lrq_prefetch(), DecodedXLogRecord::lsn, maintenance_io_concurrency, RecoveryPrefetchEnabled, unlikely, XLogNextRecord(), XLOGPREFETCHER_DISTANCE_MULTIPLIER, XLogPrefetcherCompleteFilters(), XLogPrefetcherComputeStats(), XLogPrefetcherNextBlock(), XLogPrefetchReconfigureCount, XLogReaderHasQueuedRecordOrError(), and XLogReleasePreviousRecord().

Referenced by ReadRecord().

◆ XLogPrefetchIncrement()

static void XLogPrefetchIncrement ( pg_atomic_uint64 * counter)
inline static

Definition at line 356 of file xlogprefetcher.c.

357{
358 Assert(AmStartupProcess() || !IsUnderPostmaster);
359 pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
360}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
bool IsUnderPostmaster
Definition globals.c:122

References AmStartupProcess, Assert, IsUnderPostmaster, pg_atomic_read_u64(), and pg_atomic_write_u64().

Referenced by XLogPrefetcherNextBlock().

◆ XLogPrefetchReconfigure()

void XLogPrefetchReconfigure ( void  )

◆ XLogPrefetchResetStats()

◆ XLogPrefetchShmemInit()

◆ XLogPrefetchShmemRequest()

static void XLogPrefetchShmemRequest ( void * arg)
static

Definition at line 305 of file xlogprefetcher.c.

306{
307 ShmemRequestStruct(.name = "XLogPrefetchStats",
308 .size = sizeof(XLogPrefetchStats),
309 .ptr = (void **) &SharedStats,
310 );
311}
#define ShmemRequestStruct(...)
Definition shmem.h:176
const char * name

References name, SharedStats, and ShmemRequestStruct.

Variable Documentation

◆ recovery_prefetch

int recovery_prefetch = RECOVERY_PREFETCH_TRY

Definition at line 71 of file xlogprefetcher.c.

Referenced by assign_recovery_prefetch().

◆ SharedStats

◆ XLogPrefetchReconfigureCount

int XLogPrefetchReconfigureCount = 0
static

◆ XLogPrefetchShmemCallbacks

const ShmemCallbacks XLogPrefetchShmemCallbacks
Initial value:
= {
.request_fn = XLogPrefetchShmemRequest,
.init_fn = XLogPrefetchShmemInit,
}
static void XLogPrefetchShmemRequest(void *arg)
static void XLogPrefetchShmemInit(void *arg)

Definition at line 207 of file xlogprefetcher.c.

207 {
208 .request_fn = XLogPrefetchShmemRequest,
209 .init_fn = XLogPrefetchShmemInit,
210};