PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
localbuf.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * localbuf.c
4 * local buffer manager. Fast buffer manager for temporary tables,
5 * which never need to be WAL-logged or checkpointed, etc.
6 *
7 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994-5, Regents of the University of California
9 *
10 *
11 * IDENTIFICATION
12 * src/backend/storage/buffer/localbuf.c
13 *
14 *-------------------------------------------------------------------------
15 */
#include "postgres.h"

#include "access/parallel.h"
#include "executor/instrument.h"
#include "pgstat.h"
#include "storage/aio.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "utils/guc_hooks.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
29
30
31/*#define LBDEBUG*/
32
33/* entry for buffer lookup hashtable */
34typedef struct
35{
36 BufferTag key; /* Tag of a disk page */
37 int id; /* Associated local buffer's index */
39
/* Note: this macro only works on local buffers, not shared ones! */
#define LocalBufHdrGetBlock(bufHdr) \
	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
44int NLocBuffer = 0; /* until buffers are initialized */
45
49
50static int nextFreeLocalBufId = 0;
51
52static HTAB *LocalBufHash = NULL;
53
54/* number of local buffers pinned at least once */
55static int NLocalPinnedBuffers = 0;
56
57
58static void InitLocalBuffers(void);
59static Block GetLocalBufferStorage(void);
60static Buffer GetLocalVictimBuffer(void);
61
62
63/*
64 * PrefetchLocalBuffer -
65 * initiate asynchronous read of a block of a relation
66 *
67 * Do PrefetchBuffer's work for temporary relations.
68 * No-op if prefetching isn't compiled in.
69 */
72 BlockNumber blockNum)
73{
74 PrefetchBufferResult result = {InvalidBuffer, false};
75 BufferTag newTag; /* identity of requested block */
76 LocalBufferLookupEnt *hresult;
77
78 InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
79
80 /* Initialize local buffers if first request in this session */
81 if (LocalBufHash == NULL)
83
84 /* See if the desired buffer already exists */
85 hresult = (LocalBufferLookupEnt *)
86 hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
87
88 if (hresult)
89 {
90 /* Yes, so nothing to do */
91 result.recent_buffer = -hresult->id - 1;
92 }
93 else
94 {
95#ifdef USE_PREFETCH
96 /* Not in buffers, so initiate prefetch */
97 if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
98 smgrprefetch(smgr, forkNum, blockNum, 1))
99 {
100 result.initiated_io = true;
101 }
102#endif /* USE_PREFETCH */
103 }
104
105 return result;
106}
107
108
109/*
110 * LocalBufferAlloc -
111 * Find or create a local buffer for the given page of the given relation.
112 *
113 * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
114 * any locking since this is all local. We support only default access
115 * strategy (hence, usage_count is always advanced).
116 */
119 bool *foundPtr)
120{
121 BufferTag newTag; /* identity of requested block */
122 LocalBufferLookupEnt *hresult;
123 BufferDesc *bufHdr;
124 Buffer victim_buffer;
125 int bufid;
126 bool found;
127
128 InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
129
130 /* Initialize local buffers if first request in this session */
131 if (LocalBufHash == NULL)
133
135
136 /* See if the desired buffer already exists */
137 hresult = (LocalBufferLookupEnt *)
138 hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
139
140 if (hresult)
141 {
142 bufid = hresult->id;
143 bufHdr = GetLocalBufferDescriptor(bufid);
144 Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
145
146 *foundPtr = PinLocalBuffer(bufHdr, true);
147 }
148 else
149 {
150 uint32 buf_state;
151
152 victim_buffer = GetLocalVictimBuffer();
153 bufid = -victim_buffer - 1;
154 bufHdr = GetLocalBufferDescriptor(bufid);
155
156 hresult = (LocalBufferLookupEnt *)
157 hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
158 if (found) /* shouldn't happen */
159 elog(ERROR, "local buffer hash table corrupted");
160 hresult->id = bufid;
161
162 /*
163 * it's all ours now.
164 */
165 bufHdr->tag = newTag;
166
167 buf_state = pg_atomic_read_u32(&bufHdr->state);
168 buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
169 buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
170 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
171
172 *foundPtr = false;
173 }
174
175 return bufHdr;
176}
177
178/*
179 * Like FlushBuffer(), just for local buffers.
180 */
181void
183{
184 instr_time io_start;
185 Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
186
188
189 /*
190 * Try to start an I/O operation. There currently are no reasons for
191 * StartLocalBufferIO to return false, so we raise an error in that case.
192 */
193 if (!StartLocalBufferIO(bufHdr, false, false))
194 elog(ERROR, "failed to start write IO on local buffer");
195
196 /* Find smgr relation for buffer */
197 if (reln == NULL)
198 reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
200
201 PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
202
204
205 /* And write... */
206 smgrwrite(reln,
207 BufTagGetForkNum(&bufHdr->tag),
208 bufHdr->tag.blockNum,
209 localpage,
210 false);
211
212 /* Temporary table I/O does not use Buffer Access Strategies */
214 IOOP_WRITE, io_start, 1, BLCKSZ);
215
216 /* Mark not-dirty */
217 TerminateLocalBufferIO(bufHdr, true, 0, false);
218
220}
221
222static Buffer
224{
225 int victim_bufid;
226 int trycounter;
227 BufferDesc *bufHdr;
228
230
231 /*
232 * Need to get a new buffer. We use a clock sweep algorithm (essentially
233 * the same as what freelist.c does now...)
234 */
235 trycounter = NLocBuffer;
236 for (;;)
237 {
238 victim_bufid = nextFreeLocalBufId;
239
242
243 bufHdr = GetLocalBufferDescriptor(victim_bufid);
244
245 if (LocalRefCount[victim_bufid] == 0)
246 {
247 uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
248
249 if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
250 {
251 buf_state -= BUF_USAGECOUNT_ONE;
252 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
253 trycounter = NLocBuffer;
254 }
255 else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
256 {
257 /*
258 * This can be reached if the backend initiated AIO for this
259 * buffer and then errored out.
260 */
261 }
262 else
263 {
264 /* Found a usable buffer */
265 PinLocalBuffer(bufHdr, false);
266 break;
267 }
268 }
269 else if (--trycounter == 0)
271 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
272 errmsg("no empty local buffer available")));
273 }
274
275 /*
276 * lazy memory allocation: allocate space on first use of a buffer.
277 */
278 if (LocalBufHdrGetBlock(bufHdr) == NULL)
279 {
280 /* Set pointer for use by BufferGetBlock() macro */
282 }
283
284 /*
285 * this buffer is not referenced but it might still be dirty. if that's
286 * the case, write it out before reusing it!
287 */
288 if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY)
289 FlushLocalBuffer(bufHdr, NULL);
290
291 /*
292 * Remove the victim buffer from the hashtable and mark as invalid.
293 */
294 if (pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID)
295 {
296 InvalidateLocalBuffer(bufHdr, false);
297
299 }
300
301 return BufferDescriptorGetBuffer(bufHdr);
302}
303
304/* see GetPinLimit() */
305uint32
307{
308 /* Every backend has its own temporary buffers, and can pin them all. */
309 return num_temp_buffers;
310}
311
312/* see GetAdditionalPinLimit() */
313uint32
315{
318}
319
320/* see LimitAdditionalPins() */
321void
323{
324 uint32 max_pins;
325
326 if (*additional_pins <= 1)
327 return;
328
329 /*
330 * In contrast to LimitAdditionalPins() other backends don't play a role
331 * here. We can allow up to NLocBuffer pins in total, but it might not be
332 * initialized yet so read num_temp_buffers.
333 */
335
336 if (*additional_pins >= max_pins)
337 *additional_pins = max_pins;
338}
339
340/*
341 * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
342 * temporary buffers.
343 */
346 ForkNumber fork,
347 uint32 flags,
348 uint32 extend_by,
349 BlockNumber extend_upto,
350 Buffer *buffers,
351 uint32 *extended_by)
352{
353 BlockNumber first_block;
354 instr_time io_start;
355
356 /* Initialize local buffers if first request in this session */
357 if (LocalBufHash == NULL)
359
360 LimitAdditionalLocalPins(&extend_by);
361
362 for (uint32 i = 0; i < extend_by; i++)
363 {
364 BufferDesc *buf_hdr;
365 Block buf_block;
366
367 buffers[i] = GetLocalVictimBuffer();
368 buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
369 buf_block = LocalBufHdrGetBlock(buf_hdr);
370
371 /* new buffers are zero-filled */
372 MemSet(buf_block, 0, BLCKSZ);
373 }
374
375 first_block = smgrnblocks(bmr.smgr, fork);
376
377 if (extend_upto != InvalidBlockNumber)
378 {
379 /*
380 * In contrast to shared relations, nothing could change the relation
381 * size concurrently. Thus we shouldn't end up finding that we don't
382 * need to do anything.
383 */
384 Assert(first_block <= extend_upto);
385
386 Assert((uint64) first_block + extend_by <= extend_upto);
387 }
388
389 /* Fail if relation is already at maximum possible length */
390 if ((uint64) first_block + extend_by >= MaxBlockNumber)
392 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
393 errmsg("cannot extend relation %s beyond %u blocks",
394 relpath(bmr.smgr->smgr_rlocator, fork).str,
396
397 for (uint32 i = 0; i < extend_by; i++)
398 {
399 int victim_buf_id;
400 BufferDesc *victim_buf_hdr;
401 BufferTag tag;
402 LocalBufferLookupEnt *hresult;
403 bool found;
404
405 victim_buf_id = -buffers[i] - 1;
406 victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
407
408 /* in case we need to pin an existing buffer below */
410
411 InitBufferTag(&tag, &bmr.smgr->smgr_rlocator.locator, fork, first_block + i);
412
413 hresult = (LocalBufferLookupEnt *)
414 hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
415 if (found)
416 {
417 BufferDesc *existing_hdr;
418 uint32 buf_state;
419
421
422 existing_hdr = GetLocalBufferDescriptor(hresult->id);
423 PinLocalBuffer(existing_hdr, false);
424 buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
425
426 /*
427 * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
428 */
429 buf_state = pg_atomic_read_u32(&existing_hdr->state);
430 Assert(buf_state & BM_TAG_VALID);
431 Assert(!(buf_state & BM_DIRTY));
432 buf_state &= ~BM_VALID;
433 pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
434
435 /* no need to loop for local buffers */
436 StartLocalBufferIO(existing_hdr, true, false);
437 }
438 else
439 {
440 uint32 buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
441
442 Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
443
444 victim_buf_hdr->tag = tag;
445
446 buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
447
448 pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
449
450 hresult->id = victim_buf_id;
451
452 StartLocalBufferIO(victim_buf_hdr, true, false);
453 }
454 }
455
457
458 /* actually extend relation */
459 smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);
460
462 io_start, 1, extend_by * BLCKSZ);
463
464 for (uint32 i = 0; i < extend_by; i++)
465 {
466 Buffer buf = buffers[i];
467 BufferDesc *buf_hdr;
468 uint32 buf_state;
469
470 buf_hdr = GetLocalBufferDescriptor(-buf - 1);
471
472 buf_state = pg_atomic_read_u32(&buf_hdr->state);
473 buf_state |= BM_VALID;
474 pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
475 }
476
477 *extended_by = extend_by;
478
480
481 return first_block;
482}
483
484/*
485 * MarkLocalBufferDirty -
486 * mark a local buffer dirty
487 */
488void
490{
491 int bufid;
492 BufferDesc *bufHdr;
493 uint32 buf_state;
494
495 Assert(BufferIsLocal(buffer));
496
497#ifdef LBDEBUG
498 fprintf(stderr, "LB DIRTY %d\n", buffer);
499#endif
500
501 bufid = -buffer - 1;
502
503 Assert(LocalRefCount[bufid] > 0);
504
505 bufHdr = GetLocalBufferDescriptor(bufid);
506
507 buf_state = pg_atomic_read_u32(&bufHdr->state);
508
509 if (!(buf_state & BM_DIRTY))
511
512 buf_state |= BM_DIRTY;
513
514 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
515}
516
517/*
518 * Like StartBufferIO, but for local buffers
519 */
520bool
521StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
522{
523 uint32 buf_state;
524
525 /*
526 * With AIO the buffer could have IO in progress, e.g. when there are two
527 * scans of the same relation. Either wait for the other IO or return
528 * false.
529 */
530 if (pgaio_wref_valid(&bufHdr->io_wref))
531 {
532 PgAioWaitRef iow = bufHdr->io_wref;
533
534 if (nowait)
535 return false;
536
537 pgaio_wref_wait(&iow);
538 }
539
540 /* Once we get here, there is definitely no I/O active on this buffer */
541
542 /* Check if someone else already did the I/O */
543 buf_state = pg_atomic_read_u32(&bufHdr->state);
544 if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
545 {
546 return false;
547 }
548
549 /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
550
551 /* local buffers don't track IO using resowners */
552
553 return true;
554}
555
556/*
557 * Like TerminateBufferIO, but for local buffers
558 */
559void
560TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
561 bool release_aio)
562{
563 /* Only need to adjust flags */
564 uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
565
566 /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
567
568 /* Clear earlier errors, if this IO failed, it'll be marked again */
569 buf_state &= ~BM_IO_ERROR;
570
571 if (clear_dirty)
572 buf_state &= ~BM_DIRTY;
573
574 if (release_aio)
575 {
576 /* release pin held by IO subsystem, see also buffer_stage_common() */
577 Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
578 buf_state -= BUF_REFCOUNT_ONE;
579 pgaio_wref_clear(&bufHdr->io_wref);
580 }
581
582 buf_state |= set_flag_bits;
583 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
584
585 /* local buffers don't track IO using resowners */
586
587 /* local buffers don't use the IO CV, as no other process can see buffer */
588
589 /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
590}
591
592/*
593 * InvalidateLocalBuffer -- mark a local buffer invalid.
594 *
595 * If check_unreferenced is true, error out if the buffer is still
596 * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
597 * as part of changing the identity of a buffer, instead of just dropping the
598 * buffer.
599 *
600 * See also InvalidateBuffer().
601 */
602void
603InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
604{
605 Buffer buffer = BufferDescriptorGetBuffer(bufHdr);
606 int bufid = -buffer - 1;
607 uint32 buf_state;
608 LocalBufferLookupEnt *hresult;
609
610 /*
611 * It's possible that we started IO on this buffer before e.g. aborting
612 * the transaction that created a table. We need to wait for that IO to
613 * complete before removing / reusing the buffer.
614 */
615 if (pgaio_wref_valid(&bufHdr->io_wref))
616 {
617 PgAioWaitRef iow = bufHdr->io_wref;
618
619 pgaio_wref_wait(&iow);
620 Assert(!pgaio_wref_valid(&bufHdr->io_wref));
621 }
622
623 buf_state = pg_atomic_read_u32(&bufHdr->state);
624
625 /*
626 * We need to test not just LocalRefCount[bufid] but also the BufferDesc
627 * itself, as the latter is used to represent a pin by the AIO subsystem.
628 * This can happen if AIO is initiated and then the query errors out.
629 */
630 if (check_unreferenced &&
631 (LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
632 elog(ERROR, "block %u of %s is still referenced (local %u)",
633 bufHdr->tag.blockNum,
636 BufTagGetForkNum(&bufHdr->tag)).str,
637 LocalRefCount[bufid]);
638
639 /* Remove entry from hashtable */
640 hresult = (LocalBufferLookupEnt *)
641 hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
642 if (!hresult) /* shouldn't happen */
643 elog(ERROR, "local buffer hash table corrupted");
644 /* Mark buffer invalid */
645 ClearBufferTag(&bufHdr->tag);
646 buf_state &= ~BUF_FLAG_MASK;
647 buf_state &= ~BUF_USAGECOUNT_MASK;
648 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
649}
650
651/*
652 * DropRelationLocalBuffers
653 * This function removes from the buffer pool all the pages of the
654 * specified relation that have block numbers >= firstDelBlock.
655 * (In particular, with firstDelBlock = 0, all pages are removed.)
656 * Dirty pages are simply dropped, without bothering to write them
657 * out first. Therefore, this is NOT rollback-able, and so should be
658 * used only with extreme caution!
659 *
660 * See DropRelationBuffers in bufmgr.c for more notes.
661 */
662void
664 BlockNumber firstDelBlock)
665{
666 int i;
667
668 for (i = 0; i < NLocBuffer; i++)
669 {
671 uint32 buf_state;
672
673 buf_state = pg_atomic_read_u32(&bufHdr->state);
674
675 if ((buf_state & BM_TAG_VALID) &&
676 BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
677 BufTagGetForkNum(&bufHdr->tag) == forkNum &&
678 bufHdr->tag.blockNum >= firstDelBlock)
679 {
680 InvalidateLocalBuffer(bufHdr, true);
681 }
682 }
683}
684
685/*
686 * DropRelationAllLocalBuffers
687 * This function removes from the buffer pool all pages of all forks
688 * of the specified relation.
689 *
690 * See DropRelationsAllBuffers in bufmgr.c for more notes.
691 */
692void
694{
695 int i;
696
697 for (i = 0; i < NLocBuffer; i++)
698 {
700 uint32 buf_state;
701
702 buf_state = pg_atomic_read_u32(&bufHdr->state);
703
704 if ((buf_state & BM_TAG_VALID) &&
705 BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
706 {
707 InvalidateLocalBuffer(bufHdr, true);
708 }
709 }
710}
711
712/*
713 * InitLocalBuffers -
714 * init the local buffer cache. Since most queries (esp. multi-user ones)
715 * don't involve local buffers, we delay allocating actual memory for the
716 * buffers until we need them; just make the buffer headers here.
717 */
718static void
720{
721 int nbufs = num_temp_buffers;
722 HASHCTL info;
723 int i;
724
725 /*
726 * Parallel workers can't access data in temporary tables, because they
727 * have no visibility into the local buffers of their leader. This is a
728 * convenient, low-cost place to provide a backstop check for that. Note
729 * that we don't wish to prevent a parallel worker from accessing catalog
730 * metadata about a temp table, so checks at higher levels would be
731 * inappropriate.
732 */
733 if (IsParallelWorker())
735 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
736 errmsg("cannot access temporary tables during a parallel operation")));
737
738 /* Allocate and zero buffer headers and auxiliary arrays */
740 LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
741 LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
744 (errcode(ERRCODE_OUT_OF_MEMORY),
745 errmsg("out of memory")));
746
748
749 /* initialize fields that need to start off nonzero */
750 for (i = 0; i < nbufs; i++)
751 {
753
754 /*
755 * negative to indicate local buffer. This is tricky: shared buffers
756 * start with 0. We have to start with -2. (Note that the routine
757 * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
758 * is -1.)
759 */
760 buf->buf_id = -i - 2;
761
762 pgaio_wref_clear(&buf->io_wref);
763
764 /*
765 * Intentionally do not initialize the buffer's atomic variable
766 * (besides zeroing the underlying memory above). That way we get
767 * errors on platforms without atomics, if somebody (re-)introduces
768 * atomic operations for local buffers.
769 */
770 }
771
772 /* Create the lookup hash table */
773 info.keysize = sizeof(BufferTag);
774 info.entrysize = sizeof(LocalBufferLookupEnt);
775
776 LocalBufHash = hash_create("Local Buffer Lookup Table",
777 nbufs,
778 &info,
780
781 if (!LocalBufHash)
782 elog(ERROR, "could not initialize local buffer hash table");
783
784 /* Initialization done, mark buffers allocated */
785 NLocBuffer = nbufs;
786}
787
788/*
789 * XXX: We could have a slightly more efficient version of PinLocalBuffer()
790 * that does not support adjusting the usagecount - but so far it does not
791 * seem worth the trouble.
792 *
793 * Note that ResourceOwnerEnlarge() must have been done already.
794 */
795bool
796PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
797{
798 uint32 buf_state;
799 Buffer buffer = BufferDescriptorGetBuffer(buf_hdr);
800 int bufid = -buffer - 1;
801
802 buf_state = pg_atomic_read_u32(&buf_hdr->state);
803
804 if (LocalRefCount[bufid] == 0)
805 {
807 buf_state += BUF_REFCOUNT_ONE;
808 if (adjust_usagecount &&
810 {
811 buf_state += BUF_USAGECOUNT_ONE;
812 }
813 pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
814
815 /*
816 * See comment in PinBuffer().
817 *
818 * If the buffer isn't allocated yet, it'll be marked as defined in
819 * GetLocalBufferStorage().
820 */
821 if (LocalBufHdrGetBlock(buf_hdr) != NULL)
823 }
824 LocalRefCount[bufid]++;
827
828 return buf_state & BM_VALID;
829}
830
831void
833{
836}
837
838void
840{
841 int buffid = -buffer - 1;
842
843 Assert(BufferIsLocal(buffer));
844 Assert(LocalRefCount[buffid] > 0);
846
847 if (--LocalRefCount[buffid] == 0)
848 {
849 BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
850 uint32 buf_state;
851
853
854 buf_state = pg_atomic_read_u32(&buf_hdr->state);
855 Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
856 buf_state -= BUF_REFCOUNT_ONE;
857 pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
858
859 /* see comment in UnpinBufferNoOwner */
861 }
862}
863
864/*
865 * GUC check_hook for temp_buffers
866 */
867bool
869{
870 /*
871 * Once local buffers have been initialized, it's too late to change this.
872 * However, if this is only a test call, allow it.
873 */
875 {
876 GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
877 return false;
878 }
879 return true;
880}
881
882/*
883 * GetLocalBufferStorage - allocate memory for a local buffer
884 *
885 * The idea of this function is to aggregate our requests for storage
886 * so that the memory manager doesn't see a whole lot of relatively small
887 * requests. Since we'll never give back a local buffer once it's created
888 * within a particular process, no point in burdening memmgr with separately
889 * managed chunks.
890 */
891static Block
893{
894 static char *cur_block = NULL;
895 static int next_buf_in_block = 0;
896 static int num_bufs_in_block = 0;
897 static int total_bufs_allocated = 0;
898 static MemoryContext LocalBufferContext = NULL;
899
900 char *this_buf;
901
902 Assert(total_bufs_allocated < NLocBuffer);
903
904 if (next_buf_in_block >= num_bufs_in_block)
905 {
906 /* Need to make a new request to memmgr */
907 int num_bufs;
908
909 /*
910 * We allocate local buffers in a context of their own, so that the
911 * space eaten for them is easily recognizable in MemoryContextStats
912 * output. Create the context on first use.
913 */
914 if (LocalBufferContext == NULL)
915 LocalBufferContext =
917 "LocalBufferContext",
919
920 /* Start with a 16-buffer request; subsequent ones double each time */
921 num_bufs = Max(num_bufs_in_block * 2, 16);
922 /* But not more than what we need for all remaining local bufs */
923 num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
924 /* And don't overflow MaxAllocSize, either */
925 num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
926
927 /* Buffers should be I/O aligned. */
928 cur_block = (char *)
930 MemoryContextAlloc(LocalBufferContext,
931 num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
932 next_buf_in_block = 0;
933 num_bufs_in_block = num_bufs;
934 }
935
936 /* Allocate next buffer in current memory block */
937 this_buf = cur_block + next_buf_in_block * BLCKSZ;
938 next_buf_in_block++;
939 total_bufs_allocated++;
940
941 /*
942 * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it
943 * here. The block is actually undefined, but we want consistency with
944 * the regular case of not needing to allocate memory. This is
945 * specifically needed when method_io_uring.c fills the block, because
946 * Valgrind doesn't recognize io_uring reads causing undefined memory to
947 * become defined.
948 */
949 VALGRIND_MAKE_MEM_DEFINED(this_buf, BLCKSZ);
950
951 return (Block) this_buf;
952}
953
/*
 * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
 *
 * This is just like CheckForBufferLeaks(), but for local buffers.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
	if (LocalRefCount)
	{
		int			RefCountErrors = 0;
		int			i;

		for (i = 0; i < NLocBuffer; i++)
		{
			if (LocalRefCount[i] != 0)
			{
				Buffer		b = -i - 1;
				char	   *s;

				s = DebugPrintBufferRefcount(b);
				elog(WARNING, "local buffer refcount leak: %s", s);
				pfree(s);

				RefCountErrors++;
			}
		}
		Assert(RefCountErrors == 0);
	}
#endif
}
986
987/*
988 * AtEOXact_LocalBuffers - clean up at end of transaction.
989 *
990 * This is just like AtEOXact_Buffers, but for local buffers.
991 */
992void
994{
996}
997
/*
 * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
 *
 * This is just like AtProcExit_Buffers, but for local buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
	/*
	 * We shouldn't be holding any remaining pins; if we are, and assertions
	 * aren't enabled, we'll fail later in DropRelationBuffers while trying to
	 * drop the temp rels.
	 */
	CheckForLocalBufferLeaks();
}
bool pgaio_wref_valid(PgAioWaitRef *iow)
Definition: aio.c:873
void pgaio_wref_clear(PgAioWaitRef *iow)
Definition: aio.c:866
void pgaio_wref_wait(PgAioWaitRef *iow)
Definition: aio.c:893
static void pg_atomic_unlocked_write_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition: atomics.h:295
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition: atomics.h:239
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
#define MaxBlockNumber
Definition: block.h:35
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
#define BufferIsLocal(buffer)
Definition: buf.h:37
#define BM_MAX_USAGE_COUNT
Definition: buf_internals.h:86
static void InitBufferTag(BufferTag *tag, const RelFileLocator *rlocator, ForkNumber forkNum, BlockNumber blockNum)
#define BM_TAG_VALID
Definition: buf_internals.h:71
#define BUF_USAGECOUNT_MASK
Definition: buf_internals.h:53
static ForkNumber BufTagGetForkNum(const BufferTag *tag)
#define BUF_REFCOUNT_ONE
Definition: buf_internals.h:51
static bool BufferTagsEqual(const BufferTag *tag1, const BufferTag *tag2)
static bool BufTagMatchesRelFileLocator(const BufferTag *tag, const RelFileLocator *rlocator)
#define BUF_FLAG_MASK
Definition: buf_internals.h:56
#define BM_DIRTY
Definition: buf_internals.h:69
#define BM_JUST_DIRTIED
Definition: buf_internals.h:74
#define BUF_STATE_GET_USAGECOUNT(state)
Definition: buf_internals.h:60
static void ClearBufferTag(BufferTag *tag)
static void ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer)
struct buftag BufferTag
static void ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer)
#define BUF_USAGECOUNT_ONE
Definition: buf_internals.h:54
#define BUF_STATE_GET_REFCOUNT(state)
Definition: buf_internals.h:59
static RelFileLocator BufTagGetRelFileLocator(const BufferTag *tag)
#define BM_VALID
Definition: buf_internals.h:70
static BufferDesc * GetLocalBufferDescriptor(uint32 id)
static Buffer BufferDescriptorGetBuffer(const BufferDesc *bdesc)
bool track_io_timing
Definition: bufmgr.c:144
char * DebugPrintBufferRefcount(Buffer buffer)
Definition: bufmgr.c:4104
void * Block
Definition: bufmgr.h:26
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1541
PageData * Page
Definition: bufpage.h:82
#define Min(x, y)
Definition: c.h:975
#define TYPEALIGN(ALIGNVAL, LEN)
Definition: c.h:775
#define Max(x, y)
Definition: c.h:969
int32_t int32
Definition: c.h:498
uint64_t uint64
Definition: c.h:503
uint32_t uint32
Definition: c.h:502
#define MemSet(start, val, len)
Definition: c.h:991
#define fprintf(file, fmt, msg)
Definition: cubescan.l:21
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define FATAL
Definition: elog.h:41
#define WARNING
Definition: elog.h:36
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:149
int io_direct_flags
Definition: fd.c:168
#define IO_DIRECT_DATA
Definition: fd.h:54
#define MaxAllocSize
Definition: fe_memutils.h:22
ProcNumber MyProcNumber
Definition: globals.c:91
#define newval
#define GUC_check_errdetail
Definition: guc.h:481
GucSource
Definition: guc.h:112
@ PGC_S_TEST
Definition: guc.h:125
int num_temp_buffers
Definition: guc_tables.c:552
Assert(PointerIsAligned(start, uint64))
#define calloc(a, b)
Definition: header.h:55
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_REMOVE
Definition: hsearch.h:115
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define IsParallelWorker()
Definition: parallel.h:60
BufferUsage pgBufferUsage
Definition: instrument.c:20
int b
Definition: isn.c:74
int i
Definition: isn.c:77
int32 * LocalRefCount
Definition: localbuf.c:48
void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
Definition: localbuf.c:182
void UnpinLocalBuffer(Buffer buffer)
Definition: localbuf.c:832
bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
Definition: localbuf.c:521
static HTAB * LocalBufHash
Definition: localbuf.c:52
static int NLocalPinnedBuffers
Definition: localbuf.c:55
void AtEOXact_LocalBuffers(bool isCommit)
Definition: localbuf.c:993
#define LocalBufHdrGetBlock(bufHdr)
Definition: localbuf.c:41
static void CheckForLocalBufferLeaks(void)
Definition: localbuf.c:960
uint32 GetAdditionalLocalPinLimit(void)
Definition: localbuf.c:314
void DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber firstDelBlock)
Definition: localbuf.c:663
static Block GetLocalBufferStorage(void)
Definition: localbuf.c:892
static int nextFreeLocalBufId
Definition: localbuf.c:50
bool check_temp_buffers(int *newval, void **extra, GucSource source)
Definition: localbuf.c:868
void AtProcExit_LocalBuffers(void)
Definition: localbuf.c:1004
bool PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
Definition: localbuf.c:796
static void InitLocalBuffers(void)
Definition: localbuf.c:719
void LimitAdditionalLocalPins(uint32 *additional_pins)
Definition: localbuf.c:322
uint32 GetLocalPinLimit(void)
Definition: localbuf.c:306
static Buffer GetLocalVictimBuffer(void)
Definition: localbuf.c:223
void MarkLocalBufferDirty(Buffer buffer)
Definition: localbuf.c:489
void DropRelationAllLocalBuffers(RelFileLocator rlocator)
Definition: localbuf.c:693
void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits, bool release_aio)
Definition: localbuf.c:560
void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
Definition: localbuf.c:603
int NLocBuffer
Definition: localbuf.c:44
PrefetchBufferResult PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum)
Definition: localbuf.c:71
BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr, ForkNumber fork, uint32 flags, uint32 extend_by, BlockNumber extend_upto, Buffer *buffers, uint32 *extended_by)
Definition: localbuf.c:345
Block * LocalBufferBlockPointers
Definition: localbuf.c:47
void UnpinLocalBufferNoOwner(Buffer buffer)
Definition: localbuf.c:839
BufferDesc * LocalBufferDescriptors
Definition: localbuf.c:46
BufferDesc * LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, bool *foundPtr)
Definition: localbuf.c:118
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1256
void pfree(void *pointer)
Definition: mcxt.c:2147
MemoryContext TopMemoryContext
Definition: mcxt.c:165
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
#define VALGRIND_MAKE_MEM_NOACCESS(addr, size)
Definition: memdebug.h:27
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
#define PG_IO_ALIGN_SIZE
static rewind_source * source
Definition: pg_rewind.c:89
static char * buf
Definition: pg_test_fsync.c:72
@ IOOBJECT_TEMP_RELATION
Definition: pgstat.h:275
@ IOCONTEXT_NORMAL
Definition: pgstat.h:286
@ IOOP_EXTEND
Definition: pgstat.h:311
@ IOOP_EVICT
Definition: pgstat.h:304
@ IOOP_WRITE
Definition: pgstat.h:313
instr_time pgstat_prepare_io_time(bool track_io_guc)
Definition: pgstat_io.c:90
void pgstat_count_io_op(IOObject io_object, IOContext io_context, IOOp io_op, uint32 cnt, uint64 bytes)
Definition: pgstat_io.c:68
void pgstat_count_io_op_time(IOObject io_object, IOContext io_context, IOOp io_op, instr_time start_time, uint32 cnt, uint64 bytes)
Definition: pgstat_io.c:121
ForkNumber
Definition: relpath.h:56
#define relpath(rlocator, forknum)
Definition: relpath.h:150
#define relpathbackend(rlocator, backend, forknum)
Definition: relpath.h:141
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
void ResourceOwnerEnlarge(ResourceOwner owner)
Definition: resowner.c:452
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:819
SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend)
Definition: smgr.c:240
void smgrzeroextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nblocks, bool skipFsync)
Definition: smgr.c:649
bool smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nblocks)
Definition: smgr.c:678
static void smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void *buffer, bool skipFsync)
Definition: smgr.h:131
BufferTag tag
pg_atomic_uint32 state
PgAioWaitRef io_wref
struct SMgrRelationData * smgr
Definition: bufmgr.h:104
int64 local_blks_written
Definition: instrument.h:33
int64 local_blks_dirtied
Definition: instrument.h:32
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
Definition: dynahash.c:220
Buffer recent_buffer
Definition: bufmgr.h:61
RelFileLocator locator
RelFileLocatorBackend smgr_rlocator
Definition: smgr.h:38
BlockNumber blockNum