hio.c
1 /*-------------------------------------------------------------------------
2  *
3  * hio.c
4  * POSTGRES heap access method input/output code.
5  *
6  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/access/heap/hio.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/heapam.h"
19 #include "access/hio.h"
20 #include "access/htup_details.h"
21 #include "access/visibilitymap.h"
22 #include "storage/bufmgr.h"
23 #include "storage/freespace.h"
24 #include "storage/lmgr.h"
25 #include "storage/smgr.h"
26 
27 
28 /*
29  * RelationPutHeapTuple - place tuple at specified page
30  *
31  * !!! EREPORT(ERROR) IS DISALLOWED HERE !!! Must PANIC on failure!!!
32  *
33  * Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer.
34  */
35 void
36 RelationPutHeapTuple(Relation relation,
37  Buffer buffer,
38  HeapTuple tuple,
39  bool token)
40 {
41  Page pageHeader;
42  OffsetNumber offnum;
43 
44  /*
45  * A tuple that's being inserted speculatively should already have its
46  * token set.
47  */
48  Assert(!token || HeapTupleHeaderIsSpeculative(tuple->t_data));
49 
50  /*
51  * Do not allow tuples with invalid combinations of hint bits to be placed
52  * on a page. This combination is detected as corruption by the
53  * contrib/amcheck logic, so if you disable this assertion, make
54  * corresponding changes there.
55  */
56  Assert(!((tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED) &&
57  (tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI)));
58 
59  /* Add the tuple to the page */
60  pageHeader = BufferGetPage(buffer);
61 
62  offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
63  tuple->t_len, InvalidOffsetNumber, false, true);
64 
65  if (offnum == InvalidOffsetNumber)
66  elog(PANIC, "failed to add tuple to page");
67 
68  /* Update tuple->t_self to the actual position where it was stored */
69  ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);
70 
71  /*
72  * Insert the correct position into CTID of the stored tuple, too (unless
73  * this is a speculative insertion, in which case the token is held in
74  * CTID field instead)
75  */
76  if (!token)
77  {
78  ItemId itemId = PageGetItemId(pageHeader, offnum);
79  HeapTupleHeader item = (HeapTupleHeader) PageGetItem(pageHeader, itemId);
80 
81  item->t_ctid = tuple->t_self;
82  }
83 }
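
For context, a minimal sketch of how a caller such as heap_insert invokes the function above; this is simplified and not part of hio.c. The buffer is assumed to have come back from RelationGetBufferForTuple (pinned and exclusive-locked), "heaptup" is a prepared HeapTuple, WAL logging is elided, and the critical-section macros come from miscadmin.h. Running inside a critical section is what turns any failure here into a PANIC rather than a half-modified page.

    START_CRIT_SECTION();
    RelationPutHeapTuple(relation, buffer, heaptup,
                         (options & HEAP_INSERT_SPECULATIVE) != 0);
    MarkBufferDirty(buffer);
    /* ... XLOG the insertion here ... */
    END_CRIT_SECTION();
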
84 
85 /*
86  * Read in a buffer in the given mode, using the bulk-insert strategy if bistate isn't NULL.
87  */
88 static Buffer
89 ReadBufferBI(Relation relation, BlockNumber targetBlock,
90  ReadBufferMode mode, BulkInsertState bistate)
91 {
92  Buffer buffer;
93 
94  /* If not bulk-insert, exactly like ReadBuffer */
95  if (!bistate)
96  return ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
97  mode, NULL);
98 
99  /* If we have the desired block already pinned, re-pin and return it */
100  if (bistate->current_buf != InvalidBuffer)
101  {
102  if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
103  {
104  /*
105  * Currently the LOCK variants are only used for extending the
106  * relation, which should never reach this branch.
107  */
108  Assert(mode != RBM_ZERO_AND_LOCK &&
109  mode != RBM_ZERO_AND_CLEANUP_LOCK);
110 
111  IncrBufferRefCount(bistate->current_buf);
112  return bistate->current_buf;
113  }
114  /* ... else drop the old buffer */
115  ReleaseBuffer(bistate->current_buf);
116  bistate->current_buf = InvalidBuffer;
117  }
118 
119  /* Perform a read using the buffer strategy */
120  buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
121  mode, bistate->strategy);
122 
123  /* Save the selected block as target for future inserts */
124  IncrBufferRefCount(buffer);
125  bistate->current_buf = buffer;
126 
127  return buffer;
128 }
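
A minimal sketch of the bulk-insert path that makes this helper pay off, assuming a hypothetical bulk-loading caller (bulk_load_example, tuples and ntuples are illustrative names, and GetCurrentCommandId() requires access/xact.h). Reusing one BulkInsertState keeps bistate->current_buf pinned, so consecutive inserts hit the re-pin branch above instead of a fresh buffer lookup:

    static void
    bulk_load_example(Relation rel, HeapTuple *tuples, int ntuples)
    {
        BulkInsertState bistate = GetBulkInsertState();

        for (int i = 0; i < ntuples; i++)
            heap_insert(rel, tuples[i], GetCurrentCommandId(true), 0, bistate);

        /* drops the pin on bistate->current_buf and frees the strategy */
        FreeBulkInsertState(bistate);
    }
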
129 
130 /*
131  * For each heap page which is all-visible, acquire a pin on the appropriate
132  * visibility map page, if we haven't already got one.
133  *
134  * To avoid complexity in the callers, either buffer1 or buffer2 may be
135  * InvalidBuffer if only one buffer is involved. For the same reason, block2
136  * may be smaller than block1.
137  *
138  * Returns whether buffer locks were temporarily released.
139  */
140 static bool
141 GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
142  BlockNumber block1, BlockNumber block2,
143  Buffer *vmbuffer1, Buffer *vmbuffer2)
144 {
145  bool need_to_pin_buffer1;
146  bool need_to_pin_buffer2;
147  bool released_locks = false;
148 
149  /*
150  * Swap buffers around to handle the case of a single block/buffer, and
151  * to handle lock ordering rules that require locking block2 first.
152  */
153  if (!BufferIsValid(buffer1) ||
154  (BufferIsValid(buffer2) && block1 > block2))
155  {
156  Buffer tmpbuf = buffer1;
157  Buffer *tmpvmbuf = vmbuffer1;
158  BlockNumber tmpblock = block1;
159 
160  buffer1 = buffer2;
161  vmbuffer1 = vmbuffer2;
162  block1 = block2;
163 
164  buffer2 = tmpbuf;
165  vmbuffer2 = tmpvmbuf;
166  block2 = tmpblock;
167  }
168 
169  Assert(BufferIsValid(buffer1));
170  Assert(buffer2 == InvalidBuffer || block1 <= block2);
171 
172  while (1)
173  {
174  /* Figure out which pins we need but don't have. */
175  need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
176  && !visibilitymap_pin_ok(block1, *vmbuffer1);
177  need_to_pin_buffer2 = buffer2 != InvalidBuffer
178  && PageIsAllVisible(BufferGetPage(buffer2))
179  && !visibilitymap_pin_ok(block2, *vmbuffer2);
180  if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
181  break;
182 
183  /* We must unlock both buffers before doing any I/O. */
184  released_locks = true;
185  LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
186  if (buffer2 != InvalidBuffer && buffer2 != buffer1)
187  LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);
188 
189  /* Get pins. */
190  if (need_to_pin_buffer1)
191  visibilitymap_pin(relation, block1, vmbuffer1);
192  if (need_to_pin_buffer2)
193  visibilitymap_pin(relation, block2, vmbuffer2);
194 
195  /* Relock buffers. */
196  LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE);
197  if (buffer2 != InvalidBuffer && buffer2 != buffer1)
198  LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE);
199 
200  /*
201  * If there are two buffers involved and we pinned just one of them,
202  * it's possible that the second one became all-visible while we were
203  * busy pinning the first one. If it looks like that's a possible
204  * scenario, we'll need to make a second pass through this loop.
205  */
206  if (buffer2 == InvalidBuffer || buffer1 == buffer2
207  || (need_to_pin_buffer1 && need_to_pin_buffer2))
208  break;
209  }
210 
211  return released_locks;
212 }
213 
214 /*
215  * Extend the relation. By multiple pages, if beneficial.
216  *
217  * If the caller needs multiple pages (num_pages > 1), we always try to extend
218  * by at least that much.
219  *
220  * If there is contention on the extension lock, we don't just extend "for
221  * ourselves", but we try to help others. We can do so by adding empty pages
222  * into the FSM. Typically there is no contention when we can't use the FSM.
223  *
224  * We do have to limit the number of pages to extend by to some value, as the
225  * buffers for all the extended pages need to be pinned temporarily. For now
226  * we define MAX_BUFFERS_TO_EXTEND_BY to be 64 buffers; it's hard to see
227  * benefits with higher numbers. This is partially because copyfrom.c's
228  * MAX_BUFFERED_TUPLES / MAX_BUFFERED_BYTES prevents larger multi_inserts.
229  *
230  * Returns a buffer for a newly extended block. If possible, the buffer is
231  * returned exclusively locked. *did_unlock is set to true if the lock had to
232  * be released, false otherwise.
233  *
234  *
235  * XXX: It would likely be beneficial for some workloads to extend more
236  * aggressively, e.g. using a heuristic based on the relation size.
237  */
238 static Buffer
239 RelationAddBlocks(Relation relation, BulkInsertState bistate,
240  int num_pages, bool use_fsm, bool *did_unlock)
241 {
242 #define MAX_BUFFERS_TO_EXTEND_BY 64
243  Buffer victim_buffers[MAX_BUFFERS_TO_EXTEND_BY];
244  BlockNumber first_block = InvalidBlockNumber;
245  BlockNumber last_block = InvalidBlockNumber;
246  uint32 extend_by_pages;
247  uint32 not_in_fsm_pages;
248  Buffer buffer;
249  Page page;
250 
251  /*
252  * Determine how many pages to try to extend by.
253  */
254  if (bistate == NULL && !use_fsm)
255  {
256  /*
257  * If we have neither bistate, nor can use the FSM, we can't bulk
258  * extend - there'd be no way to find the additional pages.
259  */
260  extend_by_pages = 1;
261  }
262  else
263  {
264  uint32 waitcount;
265 
266  /*
267  * Try to extend at least by the number of pages the caller needs. We
268  * can remember the additional pages (either via FSM or bistate).
269  */
270  extend_by_pages = num_pages;
271 
272  if (!RELATION_IS_LOCAL(relation))
273  waitcount = RelationExtensionLockWaiterCount(relation);
274  else
275  waitcount = 0;
276 
277  /*
278  * Multiply the number of pages to extend by the number of waiters. Do
279  * this even if we're not using the FSM, as it still relieves
280  * contention, by deferring the next time this backend needs to
281  * extend. In that case the extended pages will be found via
282  * bistate->next_free.
283  */
284  extend_by_pages += extend_by_pages * waitcount;
285 
286  /* ---
287  * If we previously extended using the same bistate, it's very likely
288  * we'll extend some more. Try to extend by as many pages as
289  * before. This can be important for performance for several reasons,
290  * including:
291  *
292  * - It prevents mdzeroextend() switching between extending the
293  * relation in different ways, which is inefficient for some
294  * filesystems.
295  *
296  * - Contention is often intermittent. Even if we currently don't see
297  * other waiters (see above), extending by larger amounts can
298  * prevent future contention.
299  * ---
300  */
301  if (bistate)
302  extend_by_pages = Max(extend_by_pages, bistate->already_extended_by);
303 
304  /*
305  * Can't extend by more than MAX_BUFFERS_TO_EXTEND_BY, we need to pin
306  * them all concurrently.
307  */
308  extend_by_pages = Min(extend_by_pages, MAX_BUFFERS_TO_EXTEND_BY);
309  }
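 /*
  * A rough worked example for illustration: with num_pages = 1 and nine
  * other backends waiting on the extension lock, extend_by_pages becomes
  * 1 + 1 * 9 = 10; if this bistate previously extended by 32 pages, the
  * Max() above raises that to 32, and the Min() caps the result at
  * MAX_BUFFERS_TO_EXTEND_BY (64).
  */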
310 
311  /*
312  * How many of the extended pages should be entered into the FSM?
313  *
314  * If we have a bistate, only enter pages that we don't need ourselves
315  * into the FSM. Otherwise every other backend will immediately try to
316  * use the pages this backend needs for itself, causing unnecessary
317  * contention. If we don't have a bistate, we can't avoid the FSM.
318  *
319  * Never enter the returned page into the FSM; we'll use it immediately.
320  */
321  if (num_pages > 1 && bistate == NULL)
322  not_in_fsm_pages = 1;
323  else
324  not_in_fsm_pages = num_pages;
325 
326  /* prepare to put another buffer into the bistate */
327  if (bistate && bistate->current_buf != InvalidBuffer)
328  {
329  ReleaseBuffer(bistate->current_buf);
330  bistate->current_buf = InvalidBuffer;
331  }
332 
333  /*
334  * Extend the relation. We ask for the first returned page to be locked,
335  * so that we are sure that nobody has inserted into the page
336  * concurrently.
337  *
338  * With the current MAX_BUFFERS_TO_EXTEND_BY there's no danger of
339  * [auto]vacuum trying to truncate later pages as REL_TRUNCATE_MINIMUM is
340  * way larger.
341  */
342  first_block = ExtendBufferedRelBy(BMR_REL(relation), MAIN_FORKNUM,
343  bistate ? bistate->strategy : NULL,
344  EB_LOCK_FIRST,
345  extend_by_pages,
346  victim_buffers,
347  &extend_by_pages);
348  buffer = victim_buffers[0]; /* the buffer the function will return */
349  last_block = first_block + (extend_by_pages - 1);
350  Assert(first_block == BufferGetBlockNumber(buffer));
351 
352  /*
353  * Relation is now extended. Initialize the page. We do this here, before
354  * potentially releasing the lock on the page, because it allows us to
355  * double check that the page contents are empty (this should never
356  * happen, but if it does we don't want to risk wiping out valid data).
357  */
358  page = BufferGetPage(buffer);
359  if (!PageIsNew(page))
360  elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
361  first_block,
362  RelationGetRelationName(relation));
363 
364  PageInit(page, BufferGetPageSize(buffer), 0);
365  MarkBufferDirty(buffer);
366 
367  /*
368  * If we decided to put pages into the FSM, release the buffer lock (but
369  * not the pin); we don't want to do IO while holding a buffer lock. This will
370  * necessitate a bit more extensive checking in our caller.
371  */
372  if (use_fsm && not_in_fsm_pages < extend_by_pages)
373  {
374  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
375  *did_unlock = true;
376  }
377  else
378  *did_unlock = false;
379 
380  /*
381  * Relation is now extended. Release pins on all buffers, except for the
382  * first (which we'll return). If we decided to put pages into the FSM,
383  * we can do that as part of the same loop.
384  */
385  for (uint32 i = 1; i < extend_by_pages; i++)
386  {
387  BlockNumber curBlock = first_block + i;
388 
389  Assert(curBlock == BufferGetBlockNumber(victim_buffers[i]));
390  Assert(BlockNumberIsValid(curBlock));
391 
392  ReleaseBuffer(victim_buffers[i]);
393 
394  if (use_fsm && i >= not_in_fsm_pages)
395  {
396  Size freespace = BufferGetPageSize(victim_buffers[i]) -
397  SizeOfPageHeaderData;
398 
399  RecordPageWithFreeSpace(relation, curBlock, freespace);
400  }
401  }
402 
403  if (use_fsm && not_in_fsm_pages < extend_by_pages)
404  {
405  BlockNumber first_fsm_block = first_block + not_in_fsm_pages;
406 
407  FreeSpaceMapVacuumRange(relation, first_fsm_block, last_block);
408  }
409 
410  if (bistate)
411  {
412  /*
413  * Remember the additional pages we extended by, so we later can use
414  * them without looking into the FSM.
415  */
416  if (extend_by_pages > 1)
417  {
418  bistate->next_free = first_block + 1;
419  bistate->last_free = last_block;
420  }
421  else
422  {
423  bistate->next_free = InvalidBlockNumber;
424  bistate->last_free = InvalidBlockNumber;
425  }
426 
427  /* maintain bistate->current_buf */
428  IncrBufferRefCount(buffer);
429  bistate->current_buf = buffer;
430  bistate->already_extended_by += extend_by_pages;
431  }
432 
433  return buffer;
434 #undef MAX_BUFFERS_TO_EXTEND_BY
435 }
436 
437 /*
438  * RelationGetBufferForTuple
439  *
440  * Returns pinned and exclusive-locked buffer of a page in given relation
441  * with free space >= given len.
442  *
443  * If num_pages is > 1, we will try to extend the relation by at least that
444  * many pages when we decide to extend the relation. This is more efficient
445  * for callers that know they will need multiple pages
446  * (e.g. heap_multi_insert()).
447  *
448  * If otherBuffer is not InvalidBuffer, then it references a previously
449  * pinned buffer of another page in the same relation; on return, this
450  * buffer will also be exclusive-locked. (This case is used by heap_update;
451  * the otherBuffer contains the tuple being updated.)
452  *
453  * The reason for passing otherBuffer is that if two backends are doing
454  * concurrent heap_update operations, a deadlock could occur if they try
455  * to lock the same two buffers in opposite orders. To ensure that this
456  * can't happen, we impose the rule that buffers of a relation must be
457  * locked in increasing page number order. This is most conveniently done
458  * by having RelationGetBufferForTuple lock them both, with suitable care
459  * for ordering.
460  *
461  * NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
462  * same buffer we select for insertion of the new tuple (this could only
463  * happen if space is freed in that page after heap_update finds there's not
464  * enough there). In that case, the page will be pinned and locked only once.
465  *
466  * We also handle the possibility that the all-visible flag will need to be
467  * cleared on one or both pages. If so, pin on the associated visibility map
468  * page must be acquired before acquiring buffer lock(s), to avoid possibly
469  * doing I/O while holding buffer locks. The pins are passed back to the
470  * caller using the input-output arguments vmbuffer and vmbuffer_other.
471  * Note that in some cases the caller might have already acquired such pins,
472  * which is indicated by these arguments not being InvalidBuffer on entry.
473  *
474  * We normally use FSM to help us find free space. However,
475  * if HEAP_INSERT_SKIP_FSM is specified, we just append a new empty page to
476  * the end of the relation if the tuple won't fit on the current target page.
477  * This can save some cycles when we know the relation is new and doesn't
478  * contain useful amounts of free space.
479  *
480  * HEAP_INSERT_SKIP_FSM is also useful for non-WAL-logged additions to a
481  * relation, if the caller holds exclusive lock and is careful to invalidate
482  * relation's smgr_targblock before the first insertion --- that ensures that
483  * all insertions will occur into newly added pages and not be intermixed
484  * with tuples from other transactions. That way, a crash can't risk losing
485  * any committed data of other transactions. (See heap_insert's comments
486  * for additional constraints needed for safe usage of this behavior.)
487  *
488  * The caller can also provide a BulkInsertState object to optimize many
489  * insertions into the same relation. This keeps a pin on the current
490  * insertion target page (to save pin/unpin cycles) and also passes a
491  * BULKWRITE buffer selection strategy object to the buffer manager.
492  * Passing NULL for bistate selects the default behavior.
493  *
494  * We don't fill existing pages further than the fillfactor, except for large
495  * tuples in nearly-empty pages. This is OK since this routine is not
496  * consulted when updating a tuple and keeping it on the same page, which is
497  * the scenario fillfactor is meant to reserve space for.
498  *
499  * ereport(ERROR) is allowed here, so this routine *must* be called
500  * before any (unlogged) changes are made in buffer pool.
501  */
502 Buffer
503 RelationGetBufferForTuple(Relation relation, Size len,
504  Buffer otherBuffer, int options,
505  BulkInsertState bistate,
506  Buffer *vmbuffer, Buffer *vmbuffer_other,
507  int num_pages)
508 {
509  bool use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
510  Buffer buffer = InvalidBuffer;
511  Page page;
512  Size nearlyEmptyFreeSpace,
513  pageFreeSpace = 0,
514  saveFreeSpace = 0,
515  targetFreeSpace = 0;
516  BlockNumber targetBlock,
517  otherBlock;
518  bool unlockedTargetBuffer;
519  bool recheckVmPins;
520 
521  len = MAXALIGN(len); /* be conservative */
522 
523  /* if the caller doesn't know by how many pages to extend, extend by 1 */
524  if (num_pages <= 0)
525  num_pages = 1;
526 
527  /* Bulk insert is not supported for updates, only inserts. */
528  Assert(otherBuffer == InvalidBuffer || !bistate);
529 
530  /*
531  * If we're going to fail for an oversized tuple, do it right away
532  */
533  if (len > MaxHeapTupleSize)
534  ereport(ERROR,
535  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
536  errmsg("row is too big: size %zu, maximum size %zu",
537  len, MaxHeapTupleSize)));
538 
539  /* Compute desired extra freespace due to fillfactor option */
540  saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
541  HEAP_DEFAULT_FILLFACTOR);
542 
543  /*
544  * Since pages without tuples can still have line pointers, we consider
545  * pages "empty" when the unavailable space is slight. This threshold is
546  * somewhat arbitrary, but it should prevent most unnecessary relation
547  * extensions while inserting large tuples into low-fillfactor tables.
548  */
549  nearlyEmptyFreeSpace = MaxHeapTupleSize -
550  (MaxHeapTuplesPerPage / 8 * sizeof(ItemIdData));
551  if (len + saveFreeSpace > nearlyEmptyFreeSpace)
552  targetFreeSpace = Max(len, nearlyEmptyFreeSpace);
553  else
554  targetFreeSpace = len + saveFreeSpace;
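 /*
  * Rough numbers for illustration, assuming the default 8 kB block size:
  * for a table created with fillfactor = 70, saveFreeSpace is about 2457
  * bytes and nearlyEmptyFreeSpace works out to roughly 8000 bytes. A
  * 6000-byte tuple therefore targets Max(6000, ~8000), i.e. practically an
  * empty page, while a 200-byte tuple targets 200 + 2457 bytes.
  */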
555 
556  if (otherBuffer != InvalidBuffer)
557  otherBlock = BufferGetBlockNumber(otherBuffer);
558  else
559  otherBlock = InvalidBlockNumber; /* just to keep compiler quiet */
560 
561  /*
562  * We first try to put the tuple on the same page we last inserted a tuple
563  * on, as cached in the BulkInsertState or relcache entry. If that
564  * doesn't work, we ask the Free Space Map to locate a suitable page.
565  * Since the FSM's info might be out of date, we have to be prepared to
566  * loop around and retry multiple times. (To ensure this isn't an infinite
567  * loop, we must update the FSM with the correct amount of free space on
568  * each page that proves not to be suitable.) If the FSM has no record of
569  * a page with enough free space, we give up and extend the relation.
570  *
571  * When use_fsm is false, we either put the tuple onto the existing target
572  * page or extend the relation.
573  */
574  if (bistate && bistate->current_buf != InvalidBuffer)
575  targetBlock = BufferGetBlockNumber(bistate->current_buf);
576  else
577  targetBlock = RelationGetTargetBlock(relation);
578 
579  if (targetBlock == InvalidBlockNumber && use_fsm)
580  {
581  /*
582  * We have no cached target page, so ask the FSM for an initial
583  * target.
584  */
585  targetBlock = GetPageWithFreeSpace(relation, targetFreeSpace);
586  }
587 
588  /*
589  * If the FSM knows nothing of the rel, try the last page before we give
590  * up and extend. This avoids one-tuple-per-page syndrome during
591  * bootstrapping or in a recently-started system.
592  */
593  if (targetBlock == InvalidBlockNumber)
594  {
595  BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
596 
597  if (nblocks > 0)
598  targetBlock = nblocks - 1;
599  }
600 
601 loop:
602  while (targetBlock != InvalidBlockNumber)
603  {
604  /*
605  * Read and exclusive-lock the target block, as well as the other
606  * block if one was given, taking suitable care with lock ordering and
607  * the possibility they are the same block.
608  *
609  * If the page-level all-visible flag is set, caller will need to
610  * clear both that and the corresponding visibility map bit. However,
611  * by the time we return, we'll have x-locked the buffer, and we don't
612  * want to do any I/O while in that state. So we check the bit here
613  * before taking the lock, and pin the page if it appears necessary.
614  * Checking without the lock creates a risk of getting the wrong
615  * answer, so we'll have to recheck after acquiring the lock.
616  */
617  if (otherBuffer == InvalidBuffer)
618  {
619  /* easy case */
620  buffer = ReadBufferBI(relation, targetBlock, RBM_NORMAL, bistate);
621  if (PageIsAllVisible(BufferGetPage(buffer)))
622  visibilitymap_pin(relation, targetBlock, vmbuffer);
623 
624  /*
625  * If the page is empty, pin vmbuffer to set all_frozen bit later.
626  */
627  if ((options & HEAP_INSERT_FROZEN) &&
628  (PageGetMaxOffsetNumber(BufferGetPage(buffer)) == 0))
629  visibilitymap_pin(relation, targetBlock, vmbuffer);
630 
631  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
632  }
633  else if (otherBlock == targetBlock)
634  {
635  /* also easy case */
636  buffer = otherBuffer;
637  if (PageIsAllVisible(BufferGetPage(buffer)))
638  visibilitymap_pin(relation, targetBlock, vmbuffer);
639  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
640  }
641  else if (otherBlock < targetBlock)
642  {
643  /* lock other buffer first */
644  buffer = ReadBuffer(relation, targetBlock);
645  if (PageIsAllVisible(BufferGetPage(buffer)))
646  visibilitymap_pin(relation, targetBlock, vmbuffer);
647  LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
648  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
649  }
650  else
651  {
652  /* lock target buffer first */
653  buffer = ReadBuffer(relation, targetBlock);
654  if (PageIsAllVisible(BufferGetPage(buffer)))
655  visibilitymap_pin(relation, targetBlock, vmbuffer);
656  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
657  LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
658  }
659 
660  /*
661  * We now have the target page (and the other buffer, if any) pinned
662  * and locked. However, since our initial PageIsAllVisible checks
663  * were performed before acquiring the lock, the results might now be
664  * out of date, either for the selected victim buffer, or for the
665  * other buffer passed by the caller. In that case, we'll need to
666  * give up our locks, go get the pin(s) we failed to get earlier, and
667  * re-lock. That's pretty painful, but hopefully shouldn't happen
668  * often.
669  *
670  * Note that there's a small possibility that we didn't pin the page
671  * above but still have the correct page pinned anyway, either because
672  * we've already made a previous pass through this loop, or because
673  * caller passed us the right page anyway.
674  *
675  * Note also that it's possible that by the time we get the pin and
676  * retake the buffer locks, the visibility map bit will have been
677  * cleared by some other backend anyway. In that case, we'll have
678  * done a bit of extra work for no gain, but there's no real harm
679  * done.
680  */
681  GetVisibilityMapPins(relation, buffer, otherBuffer,
682  targetBlock, otherBlock, vmbuffer,
683  vmbuffer_other);
684 
685  /*
686  * Now we can check to see if there's enough free space here. If so,
687  * we're done.
688  */
689  page = BufferGetPage(buffer);
690 
691  /*
692  * If necessary, initialize the page; it'll be used soon. We could avoid
693  * dirtying the buffer here, and rely on the caller to do so whenever
694  * it puts a tuple onto the page, but there seems not much benefit in
695  * doing so.
696  */
697  if (PageIsNew(page))
698  {
699  PageInit(page, BufferGetPageSize(buffer), 0);
700  MarkBufferDirty(buffer);
701  }
702 
703  pageFreeSpace = PageGetHeapFreeSpace(page);
704  if (targetFreeSpace <= pageFreeSpace)
705  {
706  /* use this page as future insert target, too */
707  RelationSetTargetBlock(relation, targetBlock);
708  return buffer;
709  }
710 
711  /*
712  * Not enough space, so we must give up our page locks and pin (if
713  * any) and prepare to look elsewhere. We don't care which order we
714  * unlock the two buffers in, so this can be slightly simpler than the
715  * code above.
716  */
717  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
718  if (otherBuffer == InvalidBuffer)
719  ReleaseBuffer(buffer);
720  else if (otherBlock != targetBlock)
721  {
722  LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
723  ReleaseBuffer(buffer);
724  }
725 
726  /* Is there an ongoing bulk extension? */
727  if (bistate && bistate->next_free != InvalidBlockNumber)
728  {
729  Assert(bistate->next_free <= bistate->last_free);
730 
731  /*
732  * We bulk extended the relation before, and there are still some
733  * unused pages from that extension, so we don't need to look in
734  * the FSM for a new page. But do record the free space from the
735  * last page; somebody might insert narrower tuples later.
736  */
737  if (use_fsm)
738  RecordPageWithFreeSpace(relation, targetBlock, pageFreeSpace);
739 
740  targetBlock = bistate->next_free;
741  if (bistate->next_free >= bistate->last_free)
742  {
743  bistate->next_free = InvalidBlockNumber;
744  bistate->last_free = InvalidBlockNumber;
745  }
746  else
747  bistate->next_free++;
748  }
749  else if (!use_fsm)
750  {
751  /* Without FSM, always fall out of the loop and extend */
752  break;
753  }
754  else
755  {
756  /*
757  * Update FSM as to condition of this page, and ask for another
758  * page to try.
759  */
760  targetBlock = RecordAndGetPageWithFreeSpace(relation,
761  targetBlock,
762  pageFreeSpace,
763  targetFreeSpace);
764  }
765  }
766 
767  /* Have to extend the relation */
768  buffer = RelationAddBlocks(relation, bistate, num_pages, use_fsm,
769  &unlockedTargetBuffer);
770 
771  targetBlock = BufferGetBlockNumber(buffer);
772  page = BufferGetPage(buffer);
773 
774  /*
775  * The page is empty, pin vmbuffer to set all_frozen bit. We don't want to
776  * do IO while the buffer is locked, so we unlock the page first if IO is
777  * needed (necessitating checks below).
778  */
779  if (options & HEAP_INSERT_FROZEN)
780  {
781  Assert(PageGetMaxOffsetNumber(page) == 0);
782 
783  if (!visibilitymap_pin_ok(targetBlock, *vmbuffer))
784  {
785  if (!unlockedTargetBuffer)
786  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
787  unlockedTargetBuffer = true;
788  visibilitymap_pin(relation, targetBlock, vmbuffer);
789  }
790  }
791 
792  /*
793  * Reacquire locks if necessary.
794  *
795  * If the target buffer was unlocked above, or is unlocked while
796  * reacquiring the lock on otherBuffer below, it's unlikely, but possible,
797  * that another backend used space on this page. We check for that below,
798  * and retry if necessary.
799  */
800  recheckVmPins = false;
801  if (unlockedTargetBuffer)
802  {
803  /* released lock on target buffer above */
804  if (otherBuffer != InvalidBuffer)
805  LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
806  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
807  recheckVmPins = true;
808  }
809  else if (otherBuffer != InvalidBuffer)
810  {
811  /*
812  * We did not release the target buffer, and otherBuffer is valid, so we
813  * need to lock the other buffer. It's guaranteed to be of a lower
814  * page number than the new page. To conform with the deadlock
815  * prevention rules, we ought to lock otherBuffer first, but that would
816  * give other backends a chance to put tuples on our page. To reduce
817  * the likelihood of that, attempt to lock the other buffer
818  * conditionally; that's very likely to work.
819  *
820  * Alternatively, we could acquire the lock on otherBuffer before
821  * extending the relation, but that'd require holding the lock while
822  * performing IO, which seems worse than an unlikely retry.
823  */
824  Assert(otherBuffer != buffer);
825  Assert(targetBlock > otherBlock);
826 
827  if (unlikely(!ConditionalLockBuffer(otherBuffer)))
828  {
829  unlockedTargetBuffer = true;
830  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
831  LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
832  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
833  }
834  recheckVmPins = true;
835  }
836 
837  /*
838  * If one of the buffers was unlocked (always the case if otherBuffer is
839  * valid), it's possible, although unlikely, that an all-visible flag
840  * became set. We can use GetVisibilityMapPins to deal with that. It's
841  * possible that GetVisibilityMapPins() might need to temporarily release
842  * buffer locks, in which case we'll need to check if there's still enough
843  * space on the page below.
844  */
845  if (recheckVmPins)
846  {
847  if (GetVisibilityMapPins(relation, otherBuffer, buffer,
848  otherBlock, targetBlock, vmbuffer_other,
849  vmbuffer))
850  unlockedTargetBuffer = true;
851  }
852 
853  /*
854  * If the target buffer was temporarily unlocked since the relation
855  * extension, it's possible, although unlikely, that all the space on the
856  * page was already used. If so, we just retry from the start. If we
857  * didn't unlock, something has gone wrong if there's not enough space -
858  * the test at the top should have prevented reaching this case.
859  */
860  pageFreeSpace = PageGetHeapFreeSpace(page);
861  if (len > pageFreeSpace)
862  {
863  if (unlockedTargetBuffer)
864  {
865  if (otherBuffer != InvalidBuffer)
866  LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
867  UnlockReleaseBuffer(buffer);
868 
869  goto loop;
870  }
871  elog(PANIC, "tuple is too big: size %zu", len);
872  }
873 
874  /*
875  * Remember the new page as our target for future insertions.
876  *
877  * XXX should we enter the new page into the free space map immediately,
878  * or just keep it for this backend's exclusive use in the short run
879  * (until VACUUM sees it)? Seems to depend on whether you expect the
880  * current backend to make more insertions or not, which is probably a
881  * good bet most of the time. So for now, don't add it to FSM yet.
882  */
883  RelationSetTargetBlock(relation, targetBlock);
884 
885  return buffer;
886 }
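
To show how the pieces above fit together, here is a condensed caller-side sketch modeled on heap_insert, not a verbatim excerpt: simple_heap_put is a hypothetical wrapper, error handling and WAL logging are omitted, and the critical-section macros come from miscadmin.h (PageClearAllVisible, visibilitymap_clear and VISIBILITYMAP_VALID_BITS are from bufpage.h and visibilitymap.h). The vmbuffer pin handed back by RelationGetBufferForTuple is what lets the caller clear the all-visible bit without doing I/O while holding the buffer lock.

    static void
    simple_heap_put(Relation relation, HeapTuple heaptup, int options,
                    BulkInsertState bistate)
    {
        Buffer      vmbuffer = InvalidBuffer;
        Buffer      buffer;

        /* find (or create) a page with enough room; returned exclusive-locked */
        buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
                                           InvalidBuffer, options, bistate,
                                           &vmbuffer, NULL, 0);

        START_CRIT_SECTION();

        RelationPutHeapTuple(relation, buffer, heaptup, false);

        /* if the page was all-visible, clear the flag and the VM bit we pinned */
        if (PageIsAllVisible(BufferGetPage(buffer)))
        {
            PageClearAllVisible(BufferGetPage(buffer));
            visibilitymap_clear(relation,
                                BufferGetBlockNumber(buffer),
                                vmbuffer, VISIBILITYMAP_VALID_BITS);
        }

        MarkBufferDirty(buffer);
        /* ... XLOG the insertion here ... */

        END_CRIT_SECTION();

        UnlockReleaseBuffer(buffer);
        if (vmbuffer != InvalidBuffer)
            ReleaseBuffer(vmbuffer);
    }
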