/*
 * BufHdrGetBlock: address of the block that belongs to a buffer header.
 * Computed as BufferBlocks plus buf_id * BLCKSZ, with buf_id widened to Size
 * before the multiplication, and the result cast to Block.
 */
63 #define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
/* BufferGetLSN: PageGetLSN() applied to the block returned by BufHdrGetBlock(). */
64 #define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr)))
/*
 * LocalBufHdrGetBlock: entry of LocalBufferBlockPointers selected by a buffer
 * header.  The index is -(buf_id + 2), so buf_id -2 selects entry 0, -3
 * selects entry 1, and so on.
 * NOTE(review): this presumably relies on local buffer headers carrying
 * buf_id values of -2 or lower -- confirm where buf_id is assigned.
 */
67 #define LocalBufHdrGetBlock(bufHdr) \
68 LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
/*
 * Two distinct single-bit values, so both can be OR'ed into one int mask.
 * NOTE(review): presumably the result bits returned by SyncOneBuffer(), which
 * is declared to return int -- confirm against its return statements.
 */
71 #define BUF_WRITTEN 0x01
72 #define BUF_REUSABLE 0x02
/*
 * NOTE(review): presumably the number of relations above which lookups use
 * bsearch() instead of a linear loop over the array -- confirm against the
 * users of this macro.
 */
74 #define RELS_BSEARCH_THRESHOLD 20
/*
 * BUF_DROP_FULL_SCAN_THRESHOLD: NBuffers / 32 (integer division), as a uint64.
 * NBuffers is read each time the macro is expanded, so the value tracks the
 * current setting rather than being a compile-time constant.
 *
 * The whole replacement list is parenthesized so the cast cannot be re-bound
 * by an adjacent operator at the use site (for example sizeof or a postfix
 * operator).
 * NOTE(review): presumably compared against the uint64 count of blocks to
 * invalidate when dropping buffers -- confirm against the users of this macro.
 */
#define BUF_DROP_FULL_SCAN_THRESHOLD ((uint64) (NBuffers / 32))
/*
 * NOTE(review): presumably the number of slots in the fixed-size private
 * refcount array -- confirm against the array declaration.
 */
91 #define REFCOUNT_ARRAY_ENTRIES 8
362 free->refcount =
res->refcount;
438 #define BufferIsPinned(bufnum) \
440 !BufferIsValid(bufnum) ? \
443 BufferIsLocal(bufnum) ? \
444 (LocalRefCount[-(bufnum) - 1] > 0) \
446 (GetPrivateRefCount(bufnum) > 0) \
475 static int SyncOneBuffer(
int buf_id,
bool skip_recently_used,
611 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
612 errmsg(
"cannot access temporary tables of other sessions")));
638 bool have_private_ref;
648 int b = -recent_buffer - 1;
673 if (have_private_ref)
685 if (have_private_ref)
696 if (!have_private_ref)
768 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
769 errmsg(
"cannot access temporary tables of other sessions")));
777 forkNum, blockNum,
mode, strategy, &hit);
804 RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
805 mode, strategy, &hit);
864 buffers, extended_by);
952 num_pages, extend_to,
953 buffers, &extended_by);
958 for (
int i = 0;
i < extended_by;
i++)
960 if (first_block +
i != extend_to - 1)
979 fork, extend_to - 1,
mode, strategy,
1023 forkNum, strategy, flags);
1029 TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
1060 bufHdr =
BufferAlloc(smgr, relpersistence, forkNum, blockNum,
1061 strategy, &found, io_context);
1082 TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
1119 MemSet((
char *) bufBlock, 0, BLCKSZ);
1124 smgrread(smgr, forkNum, blockNum, bufBlock);
1137 errmsg(
"invalid page in block %u of relation %s; zeroing out page",
1140 MemSet((
char *) bufBlock, 0, BLCKSZ);
1145 errmsg(
"invalid page in block %u of relation %s",
1186 TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
1227 LWLock *newPartitionLock;
1228 int existing_buf_id;
1243 if (existing_buf_id >= 0)
1305 if (existing_buf_id >= 0)
1333 valid =
PinBuffer(existing_buf_hdr, strategy);
1359 return existing_buf_hdr;
1365 victim_buf_state =
LockBufHdr(victim_buf_hdr);
1371 victim_buf_hdr->
tag = newTag;
1380 if (relpersistence == RELPERSISTENCE_PERMANENT || forkNum ==
INIT_FORKNUM)
1398 return victim_buf_hdr;
1423 LWLock *oldPartitionLock;
1476 elog(
ERROR,
"buffer is pinned in InvalidateBuffer");
1665 if (strategy != NULL)
1726 #ifdef USE_ASSERT_CHECKING
1754 int max_proportional_pins;
1756 if (*additional_pins <= 1)
1760 max_proportional_pins =
NBuffers / max_backends;
1770 if (max_proportional_pins < 0)
1771 max_proportional_pins = 1;
1773 if (*additional_pins > max_proportional_pins)
1774 *additional_pins = max_proportional_pins;
1793 TRACE_POSTGRESQL_BUFFER_EXTEND_START(fork,
1802 extend_by, extend_upto,
1803 buffers, &extend_by);
1806 extend_by, extend_upto,
1807 buffers, &extend_by);
1808 *extended_by = extend_by;
1810 TRACE_POSTGRESQL_BUFFER_EXTEND_DONE(fork,
1859 MemSet((
char *) buf_block, 0, BLCKSZ);
1899 uint32 orig_extend_by = extend_by;
1901 if (first_block > extend_upto)
1903 else if ((uint64) first_block + extend_by > extend_upto)
1904 extend_by = extend_upto - first_block;
1906 for (
uint32 i = extend_by;
i < orig_extend_by;
i++)
1922 *extended_by = extend_by;
1930 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1931 errmsg(
"cannot extend relation %s beyond %u blocks",
1941 for (
int i = 0;
i < extend_by;
i++)
1943 Buffer victim_buf = buffers[
i];
1972 if (existing_id >= 0)
1982 valid =
PinBuffer(existing_hdr, strategy);
1998 (
errmsg(
"unexpected data beyond EOF in block %u of relation %s",
2000 errhint(
"This has been seen to occur with buggy kernels; consider updating your system.")));
2030 victim_buf_hdr->
tag = tag;
2070 io_start, extend_by);
2073 for (
int i = 0;
i < extend_by;
i++)
2084 if (first_block +
i + 1 == extend_upto)
2096 *extended_by = extend_by;
2138 buf_state = old_buf_state;
2255 buf_state = old_buf_state;
2260 if (strategy == NULL)
2279 result = (buf_state &
BM_VALID) != 0;
2423 buf_state = old_buf_state;
2448 int wait_backend_pgprocno =
buf->wait_backend_pgprocno;
/*
 * Sort parameters for CkptSortItem elements.
 *   ST_SORT          name given to the sort routine (sort_checkpoint_bufferids)
 *   ST_ELEMENT_TYPE  element type being sorted
 *   ST_COMPARE       forwards both arguments unchanged to
 *                    ckpt_buforder_comparator()
 *   ST_SCOPE         storage class (static)
 * NOTE(review): presumably consumed by a sort template header included right
 * after these defines -- the include is not visible here; confirm.
 */
2461 #define ST_SORT sort_checkpoint_bufferids
2462 #define ST_ELEMENT_TYPE CkptSortItem
2463 #define ST_COMPARE(a, b) ckpt_buforder_comparator(a, b)
2464 #define ST_SCOPE static
2523 for (buf_id = 0; buf_id <
NBuffers; buf_id++)
2533 if ((buf_state & mask) == mask)
2554 if (num_to_scan == 0)
2559 TRACE_POSTGRESQL_BUFFER_SYNC_START(
NBuffers, num_to_scan);
2577 for (
i = 0;
i < num_to_scan;
i++)
2588 if (last_tsid ==
InvalidOid || last_tsid != cur_tsid)
2600 if (per_ts_stat == NULL)
2605 s = &per_ts_stat[num_spaces - 1];
2606 memset(s, 0,
sizeof(*s));
2621 last_tsid = cur_tsid;
2625 s = &per_ts_stat[num_spaces - 1];
2646 for (
i = 0;
i < num_spaces;
i++)
2694 TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
2743 TRACE_POSTGRESQL_BUFFER_SYNC_DONE(
NBuffers, num_written, num_to_scan);
2761 int strategy_buf_id;
2769 static bool saved_info_valid =
false;
2770 static int prev_strategy_buf_id;
2771 static uint32 prev_strategy_passes;
2772 static int next_to_clean;
2773 static uint32 next_passes;
2776 static float smoothed_alloc = 0;
2777 static float smoothed_density = 10.0;
2780 float smoothing_samples = 16;
2781 float scan_whole_pool_milliseconds = 120000.0;
2784 long strategy_delta;
2787 float scans_per_alloc;
2788 int reusable_buffers_est;
2789 int upcoming_alloc_est;
2790 int min_scan_buffers;
2795 int reusable_buffers;
2798 long new_strategy_delta;
2817 saved_info_valid =
false;
2829 if (saved_info_valid)
2831 int32 passes_delta = strategy_passes - prev_strategy_passes;
2833 strategy_delta = strategy_buf_id - prev_strategy_buf_id;
2834 strategy_delta += (long) passes_delta *
NBuffers;
2836 Assert(strategy_delta >= 0);
2838 if ((
int32) (next_passes - strategy_passes) > 0)
2841 bufs_to_lap = strategy_buf_id - next_to_clean;
2843 elog(
DEBUG2,
"bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
2844 next_passes, next_to_clean,
2845 strategy_passes, strategy_buf_id,
2846 strategy_delta, bufs_to_lap);
2849 else if (next_passes == strategy_passes &&
2850 next_to_clean >= strategy_buf_id)
2853 bufs_to_lap =
NBuffers - (next_to_clean - strategy_buf_id);
2855 elog(
DEBUG2,
"bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
2856 next_passes, next_to_clean,
2857 strategy_passes, strategy_buf_id,
2858 strategy_delta, bufs_to_lap);
2868 elog(
DEBUG2,
"bgwriter behind: bgw %u-%u strategy %u-%u delta=%ld",
2869 next_passes, next_to_clean,
2870 strategy_passes, strategy_buf_id,
2873 next_to_clean = strategy_buf_id;
2874 next_passes = strategy_passes;
2885 elog(
DEBUG2,
"bgwriter initializing: strategy %u-%u",
2886 strategy_passes, strategy_buf_id);
2889 next_to_clean = strategy_buf_id;
2890 next_passes = strategy_passes;
2895 prev_strategy_buf_id = strategy_buf_id;
2896 prev_strategy_passes = strategy_passes;
2897 saved_info_valid =
true;
2905 if (strategy_delta > 0 && recent_alloc > 0)
2907 scans_per_alloc = (float) strategy_delta / (
float) recent_alloc;
2908 smoothed_density += (scans_per_alloc - smoothed_density) /
2917 bufs_ahead =
NBuffers - bufs_to_lap;
2918 reusable_buffers_est = (float) bufs_ahead / smoothed_density;
2925 if (smoothed_alloc <= (
float) recent_alloc)
2926 smoothed_alloc = recent_alloc;
2928 smoothed_alloc += ((float) recent_alloc - smoothed_alloc) /
2942 if (upcoming_alloc_est == 0)
2957 if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est))
2960 elog(
DEBUG2,
"bgwriter: alloc_est=%d too small, using min=%d + reusable_est=%d",
2961 upcoming_alloc_est, min_scan_buffers, reusable_buffers_est);
2963 upcoming_alloc_est = min_scan_buffers + reusable_buffers_est;
2976 num_to_scan = bufs_to_lap;
2978 reusable_buffers = reusable_buffers_est;
2981 while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
3009 elog(
DEBUG1,
"bgwriter: recent_alloc=%u smoothed=%.2f delta=%ld ahead=%d density=%.2f reusable_est=%d upcoming_est=%d scanned=%d wrote=%d reusable=%d",
3010 recent_alloc, smoothed_alloc, strategy_delta, bufs_ahead,
3011 smoothed_density, reusable_buffers_est, upcoming_alloc_est,
3012 bufs_to_lap - num_to_scan,
3014 reusable_buffers - reusable_buffers_est);
3025 new_strategy_delta = bufs_to_lap - num_to_scan;
3026 new_recent_alloc = reusable_buffers - reusable_buffers_est;
3027 if (new_strategy_delta > 0 && new_recent_alloc > 0)
3029 scans_per_alloc = (float) new_strategy_delta / (
float) new_recent_alloc;
3030 smoothed_density += (scans_per_alloc - smoothed_density) /
3034 elog(
DEBUG2,
"bgwriter: cleaner density alloc=%u scan=%ld density=%.2f new smoothed=%.2f",
3035 new_recent_alloc, new_strategy_delta,
3036 scans_per_alloc, smoothed_density);
3041 return (bufs_to_lap == 0 && recent_alloc == 0);
3086 else if (skip_recently_used)
3194 #ifdef USE_ASSERT_CHECKING
3195 int RefCountErrors = 0;
3224 Assert(RefCountErrors == 0);
3259 "buffer refcount leak: [%03d] "
3260 "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
3370 errcallback.
arg = (
void *)
buf;
3491 if (RELKIND_HAS_TABLE_AM(relation->
rd_rel->relkind))
3503 return (szbytes + (BLCKSZ - 1)) / BLCKSZ;
3505 else if (RELKIND_HAS_STORAGE(relation->
rd_rel->relkind))
3604 uint64 nBlocksToInvalidate = 0;
3613 for (
j = 0;
j < nforks;
j++)
3642 for (
i = 0;
i < nforks;
i++)
3654 nBlocksToInvalidate += (nForkBlock[
i] - firstDelBlock[
i]);
3664 for (
j = 0;
j < nforks;
j++)
3666 nForkBlock[
j], firstDelBlock[
j]);
3696 for (
j = 0;
j < nforks;
j++)
3726 uint64 nBlocksToInvalidate = 0;
3737 for (
i = 0;
i < nlocators;
i++)
3741 if (smgr_reln[
i]->smgr_rlocator.backend ==
MyBackendId)
3745 rels[n++] = smgr_reln[
i];
3769 for (
i = 0;
i < n && cached;
i++)
3786 nBlocksToInvalidate += block[
i][
j];
3796 for (
i = 0;
i < n;
i++)
3817 for (
i = 0;
i < n;
i++)
3818 locators[
i] = rels[
i]->smgr_rlocator.locator;
3847 for (
j = 0;
j < n;
j++)
3851 rlocator = &locators[
j];
3861 rlocator = bsearch((
const void *) &(locator),
3867 if (rlocator == NULL)
3897 for (curBlock = firstDelBlock; curBlock < nForkBlock; curBlock++)
3901 LWLock *bufPartitionLock;
3990 PrintBufferDescs(
void)
4001 "[%02d] (freeNext=%d, rel=%s, "
4002 "blockNum=%u, flags=0x%x, refcount=%u %d)",
4006 buf->tag.blockNum,
buf->flags,
4014 PrintPinnedBufs(
void)
4027 "[%02d] (freeNext=%d, rel=%s, "
4028 "blockNum=%u, flags=0x%x, refcount=%u %d)",
4032 buf->tag.blockNum,
buf->flags,
4082 errcallback.
arg = (
void *) bufHdr;
4168 for (
i = 0;
i < nrels;
i++)
4173 srels[
i].
srel = smgrs[
i];
4204 for (
j = 0;
j < nrels;
j++)
4208 srelent = &srels[
j];
4218 srelent = bsearch((
const void *) &(rlocator),
4224 if (srelent == NULL)
4291 memset(
buf.data, 0, BLCKSZ);
4300 for (blkno = 0; blkno < nblocks; blkno++)
4319 memcpy(dstPage, srcPage, BLCKSZ);
4352 char relpersistence;
4355 relpersistence = permanent ?
4356 RELPERSISTENCE_PERMANENT : RELPERSISTENCE_UNLOGGED;
4394 rlocator.
locator = src_rlocator;
4397 rlocator.
locator = dst_rlocator;
4579 bool dirtied =
false;
4580 bool delayChkptFlags =
false;
4632 delayChkptFlags =
true;
4664 if (delayChkptFlags)
4767 elog(
ERROR,
"incorrect local pin count: %d",
4773 elog(
ERROR,
"incorrect local pin count: %d",
4800 bool logged_recovery_conflict =
false;
4832 if (logged_recovery_conflict)
4850 elog(
ERROR,
"multiple backends attempting to wait for pincount 1");
4876 if (waitStart != 0 && !logged_recovery_conflict)
4884 waitStart,
now, NULL,
true);
4885 logged_recovery_conflict =
true;
5172 buf_state |= set_flag_bits;
5219 errmsg(
"could not write block %u of %s",
5221 errdetail(
"Multiple failures --- write error might be permanent.")));
5243 errcontext(
"writing block %u of relation %s",
5263 errcontext(
"writing block %u of relation %s",
5391 if (
a->tsId <
b->tsId)
5393 else if (
a->tsId >
b->tsId)
5396 if (
a->relNumber <
b->relNumber)
5398 else if (
a->relNumber >
b->relNumber)
5401 else if (
a->forkNum <
b->forkNum)
5403 else if (
a->forkNum >
b->forkNum)
5406 else if (
a->blockNum <
b->blockNum)
5408 else if (
a->blockNum >
b->blockNum)
5472 pending->
tag = *tag;
/*
 * Sort parameters for PendingWriteback elements.
 *   ST_SORT          name given to the sort routine (sort_pending_writebacks)
 *   ST_ELEMENT_TYPE  element type being sorted
 *   ST_COMPARE       compares two elements by passing the addresses of their
 *                    tag members to buffertag_comparator()
 *   ST_SCOPE         storage class (static)
 *
 * The ST_COMPARE parameters are parenthesized before "->" is applied, so an
 * argument that is an expression (e.g. "base + 1") still selects the tag of
 * the intended element instead of being split by operator precedence.
 * NOTE(review): presumably consumed by a sort template header included right
 * after these defines -- the include is not visible here; confirm.
 */
#define ST_SORT sort_pending_writebacks
#define ST_ELEMENT_TYPE PendingWriteback
#define ST_COMPARE(a, b) buffertag_comparator(&(a)->tag, &(b)->tag)
#define ST_SCOPE static
5539 for (ahead = 0;
i + ahead + 1 < wb_context->
nr_pending; ahead++)
5551 if (
cur->tag.blockNum ==
next->tag.blockNum)
5555 if (
cur->tag.blockNum + 1 !=
next->tag.blockNum)
5592 (
errcode(ERRCODE_SNAPSHOT_TOO_OLD),
5593 errmsg(
"snapshot too old")));
static bool pg_atomic_compare_exchange_u32(volatile pg_atomic_uint32 *ptr, uint32 *expected, uint32 newval)
static uint32 pg_atomic_fetch_or_u32(volatile pg_atomic_uint32 *ptr, uint32 or_)
static void pg_atomic_unlocked_write_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
void binaryheap_build(binaryheap *heap)
void binaryheap_add_unordered(binaryheap *heap, Datum d)
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Datum binaryheap_remove_first(binaryheap *heap)
void binaryheap_free(binaryheap *heap)
void binaryheap_replace_first(binaryheap *heap, Datum d)
Datum binaryheap_first(binaryheap *heap)
#define binaryheap_empty(h)
#define InvalidBlockNumber
static bool BlockNumberIsValid(BlockNumber blockNumber)
#define BufferIsLocal(buffer)
CkptSortItem * CkptBufferIds
WritebackContext BackendWritebackContext
#define BM_MAX_USAGE_COUNT
static void InitBufferTag(BufferTag *tag, const RelFileLocator *rlocator, ForkNumber forkNum, BlockNumber blockNum)
#define BUF_USAGECOUNT_MASK
static ForkNumber BufTagGetForkNum(const BufferTag *tag)
static BufferDesc * GetLocalBufferDescriptor(uint32 id)
static BufferDesc * GetBufferDescriptor(uint32 id)
static void UnlockBufHdr(BufferDesc *desc, uint32 buf_state)
static bool BufferTagsEqual(const BufferTag *tag1, const BufferTag *tag2)
static RelFileNumber BufTagGetRelNumber(const BufferTag *tag)
static bool BufTagMatchesRelFileLocator(const BufferTag *tag, const RelFileLocator *rlocator)
static LWLock * BufMappingPartitionLock(uint32 hashcode)
#define BM_PIN_COUNT_WAITER
#define BUF_STATE_GET_USAGECOUNT(state)
#define BM_IO_IN_PROGRESS
static void ClearBufferTag(BufferTag *tag)
static ConditionVariable * BufferDescriptorGetIOCV(const BufferDesc *bdesc)
#define BUF_USAGECOUNT_ONE
#define BUF_STATE_GET_REFCOUNT(state)
static RelFileLocator BufTagGetRelFileLocator(const BufferTag *tag)
static Buffer BufferDescriptorGetBuffer(const BufferDesc *bdesc)
#define BM_CHECKPOINT_NEEDED
static LWLock * BufferDescriptorGetContentLock(const BufferDesc *bdesc)
void BufTableDelete(BufferTag *tagPtr, uint32 hashcode)
int BufTableLookup(BufferTag *tagPtr, uint32 hashcode)
uint32 BufTableHashCode(BufferTag *tagPtr)
int BufTableInsert(BufferTag *tagPtr, uint32 hashcode, int buf_id)
void CheckBufferIsPinnedOnce(Buffer buffer)
void FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels)
void IncrBufferRefCount(Buffer buffer)
void DropDatabaseBuffers(Oid dbid)
static int ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b)
BlockNumber BufferGetBlockNumber(Buffer buffer)
static PrivateRefCountEntry * NewPrivateRefCountEntry(Buffer buffer)
void DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum, int nforks, BlockNumber *firstDelBlock)
Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation, BlockNumber blockNum)
PrefetchBufferResult PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
static uint32 PrivateRefCountClock
static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context)
static BlockNumber ExtendBufferedRelCommon(ExtendBufferedWhat eb, ForkNumber fork, BufferAccessStrategy strategy, uint32 flags, uint32 extend_by, BlockNumber extend_upto, Buffer *buffers, uint32 *extended_by)
static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
#define BUF_DROP_FULL_SCAN_THRESHOLD
static BlockNumber ExtendBufferedRelShared(ExtendBufferedWhat eb, ForkNumber fork, BufferAccessStrategy strategy, uint32 flags, uint32 extend_by, BlockNumber extend_upto, Buffer *buffers, uint32 *extended_by)
static void PinBuffer_Locked(BufferDesc *buf)
static uint32 WaitBufHdrUnlocked(BufferDesc *buf)
static int buffertag_comparator(const BufferTag *ba, const BufferTag *bb)
#define LocalBufHdrGetBlock(bufHdr)
bool IsBufferCleanupOK(Buffer buffer)
#define BufferGetLSN(bufHdr)
static BufferDesc * BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BlockNumber blockNum, BufferAccessStrategy strategy, bool *foundPtr, IOContext io_context)
void AtEOXact_Buffers(bool isCommit)
static void CheckForBufferLeaks(void)
void CreateAndCopyRelationData(RelFileLocator src_rlocator, RelFileLocator dst_rlocator, bool permanent)
Buffer ExtendBufferedRel(ExtendBufferedWhat eb, ForkNumber forkNum, BufferAccessStrategy strategy, uint32 flags)
void DropRelationsAllBuffers(SMgrRelation *smgr_reln, int nlocators)
De