heapam.c
1 /*-------------------------------------------------------------------------
2  *
3  * heapam.c
4  * heap access method code
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/access/heap/heapam.c
12  *
13  *
14  * INTERFACE ROUTINES
15  * relation_open - open any relation by relation OID
16  * relation_openrv - open any relation specified by a RangeVar
17  * relation_close - close any relation
18  * heap_open - open a heap relation by relation OID
19  * heap_openrv - open a heap relation specified by a RangeVar
20  * heap_close - (now just a macro for relation_close)
21  * heap_beginscan - begin relation scan
22  * heap_rescan - restart a relation scan
23  * heap_endscan - end relation scan
24  * heap_getnext - retrieve next tuple in scan
25  * heap_fetch - retrieve tuple with given tid
26  * heap_insert - insert tuple into a relation
27  * heap_multi_insert - insert multiple tuples into a relation
28  * heap_delete - delete a tuple from a relation
29  * heap_update - replace a tuple in a relation with another tuple
30  * heap_sync - sync heap, for when no WAL has been written
31  *
32  * NOTES
33  * This file contains the heap_ routines which implement
34  * the POSTGRES heap access method used for all POSTGRES
35  * relations.
36  *
37  *-------------------------------------------------------------------------
38  */
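As a rough illustration of how the interface routines listed above fit together, a caller typically opens the relation, starts a scan, iterates, and tears everything down again. This is only a sketch: it assumes a valid relation OID in relid and an active snapshot, and omits error handling.

    /* Illustrative only -- not part of heapam.c */
    Relation     rel;
    HeapScanDesc scan;
    HeapTuple    tuple;

    rel = heap_open(relid, AccessShareLock);            /* open by OID, take lock */
    scan = heap_beginscan(rel, GetActiveSnapshot(), 0, NULL);
    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    {
        /* inspect tuple->t_data, tuple->t_len, tuple->t_self here */
    }
    heap_endscan(scan);
    heap_close(rel, AccessShareLock);                   /* or NoLock to keep the lock */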
39 #include "postgres.h"
40 
41 #include "access/bufmask.h"
42 #include "access/heapam.h"
43 #include "access/heapam_xlog.h"
44 #include "access/hio.h"
45 #include "access/multixact.h"
46 #include "access/parallel.h"
47 #include "access/relscan.h"
48 #include "access/sysattr.h"
49 #include "access/transam.h"
50 #include "access/tuptoaster.h"
51 #include "access/valid.h"
52 #include "access/visibilitymap.h"
53 #include "access/xact.h"
54 #include "access/xlog.h"
55 #include "access/xloginsert.h"
56 #include "access/xlogutils.h"
57 #include "catalog/catalog.h"
58 #include "catalog/namespace.h"
59 #include "catalog/index.h"
60 #include "miscadmin.h"
61 #include "pgstat.h"
62 #include "port/atomics.h"
63 #include "storage/bufmgr.h"
64 #include "storage/freespace.h"
65 #include "storage/lmgr.h"
66 #include "storage/predicate.h"
67 #include "storage/procarray.h"
68 #include "storage/smgr.h"
69 #include "storage/spin.h"
70 #include "storage/standby.h"
71 #include "utils/datum.h"
72 #include "utils/inval.h"
73 #include "utils/lsyscache.h"
74 #include "utils/relcache.h"
75 #include "utils/snapmgr.h"
76 #include "utils/syscache.h"
77 #include "utils/tqual.h"
78 #include "utils/memutils.h"
79 #include "nodes/execnodes.h"
80 #include "executor/executor.h"
81 
82 /* GUC variable */
83 bool synchronize_seqscans = true;
84 
85 
86 static HeapScanDesc heap_beginscan_internal(Relation relation,
87  Snapshot snapshot,
88  int nkeys, ScanKey key,
89  ParallelHeapScanDesc parallel_scan,
90  bool allow_strat,
91  bool allow_sync,
92  bool allow_pagemode,
93  bool is_bitmapscan,
94  bool is_samplescan,
95  bool temp_snap);
96 static void heap_parallelscan_startblock_init(HeapScanDesc scan);
97 static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
98 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
99  TransactionId xid, CommandId cid, int options);
100 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
101  Buffer newbuf, HeapTuple oldtup,
102  HeapTuple newtup, HeapTuple old_key_tup,
103  bool all_visible_cleared, bool new_all_visible_cleared);
104 static Bitmapset *HeapDetermineModifiedColumns(Relation relation,
105  Bitmapset *interesting_cols,
106  HeapTuple oldtup, HeapTuple newtup);
107 static bool heap_acquire_tuplock(Relation relation, ItemPointer tid,
108  LockTupleMode mode, LockWaitPolicy wait_policy,
109  bool *have_tuple_lock);
110 static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
111  uint16 old_infomask2, TransactionId add_to_xmax,
112  LockTupleMode mode, bool is_update,
113  TransactionId *result_xmax, uint16 *result_infomask,
114  uint16 *result_infomask2);
115 static HTSU_Result heap_lock_updated_tuple(Relation rel, HeapTuple tuple,
116  ItemPointer ctid, TransactionId xid,
117  LockTupleMode mode);
118 static void GetMultiXactIdHintBits(MultiXactId multi, uint16 *new_infomask,
119  uint16 *new_infomask2);
120 static TransactionId MultiXactIdGetUpdateXid(TransactionId xmax,
121  uint16 t_infomask);
122 static bool DoesMultiXactIdConflict(MultiXactId multi, uint16 infomask,
123  LockTupleMode lockmode);
124 static void MultiXactIdWait(MultiXactId multi, MultiXactStatus status, uint16 infomask,
125  Relation rel, ItemPointer ctid, XLTW_Oper oper,
126  int *remaining);
127 static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status,
128  uint16 infomask, Relation rel, int *remaining);
129 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
130 static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_modified,
131  bool *copy);
132 static bool ProjIndexIsUnchanged(Relation relation, HeapTuple oldtup, HeapTuple newtup);
133 
134 
135 /*
136  * Each tuple lock mode has a corresponding heavyweight lock, and one or two
137  * corresponding MultiXactStatuses (one to merely lock tuples, another one to
138  * update them). This table (and the macros below) helps us determine the
139  * heavyweight lock mode and MultiXactStatus values to use for any particular
140  * tuple lock strength.
141  *
142  * Don't look at lockstatus/updstatus directly! Use get_mxact_status_for_lock
143  * instead.
144  */
145 static const struct
146 {
147  LOCKMODE hwlock;
148  int lockstatus;
149  int updstatus;
150 }
151 
152  tupleLockExtraInfo[MaxLockTupleMode + 1] =
153 {
154  { /* LockTupleKeyShare */
155  AccessShareLock,
156  MultiXactStatusForKeyShare,
157  -1 /* KeyShare does not allow updating tuples */
158  },
159  { /* LockTupleShare */
160  RowShareLock,
161  MultiXactStatusForShare,
162  -1 /* Share does not allow updating tuples */
163  },
164  { /* LockTupleNoKeyExclusive */
165  ExclusiveLock,
166  MultiXactStatusForNoKeyUpdate,
167  MultiXactStatusNoKeyUpdate
168  },
169  { /* LockTupleExclusive */
170  AccessExclusiveLock,
171  MultiXactStatusForUpdate,
172  MultiXactStatusUpdate
173  }
174 };
175 
176 /* Get the LOCKMODE for a given MultiXactStatus */
177 #define LOCKMODE_from_mxstatus(status) \
178  (tupleLockExtraInfo[TUPLOCK_from_mxstatus((status))].hwlock)
179 
180 /*
181  * Acquire heavyweight locks on tuples, using a LockTupleMode strength value.
182  * This is more readable than having every caller translate it to lock.h's
183  * LOCKMODE.
184  */
185 #define LockTupleTuplock(rel, tup, mode) \
186  LockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
187 #define UnlockTupleTuplock(rel, tup, mode) \
188  UnlockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
189 #define ConditionalLockTupleTuplock(rel, tup, mode) \
190  ConditionalLockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
191 
192 /*
193  * This table maps tuple lock strength values for each particular
194  * MultiXactStatus value.
195  */
196 static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
197 {
198  LockTupleKeyShare, /* ForKeyShare */
199  LockTupleShare, /* ForShare */
200  LockTupleNoKeyExclusive, /* ForNoKeyUpdate */
201  LockTupleExclusive, /* ForUpdate */
202  LockTupleNoKeyExclusive, /* NoKeyUpdate */
203  LockTupleExclusive /* Update */
204 };
205 
206 /* Get the LockTupleMode for a given MultiXactStatus */
207 #define TUPLOCK_from_mxstatus(status) \
208  (MultiXactStatusLock[(status)])
209 
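A small worked example of how the two tables compose (illustrative; the values shown follow directly from the table entries above):

    /* Illustrative only: how a FOR SHARE tuple lock resolves through the tables */
    LockTupleMode mode  = TUPLOCK_from_mxstatus(MultiXactStatusForShare);   /* LockTupleShare */
    LOCKMODE      hwlck = LOCKMODE_from_mxstatus(MultiXactStatusForShare);  /* RowShareLock */

LockTupleTuplock() then takes that heavyweight lock on the tuple itself.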
210 /* ----------------------------------------------------------------
211  * heap support routines
212  * ----------------------------------------------------------------
213  */
214 
215 /* ----------------
216  * initscan - scan code common to heap_beginscan and heap_rescan
217  * ----------------
218  */
219 static void
220 initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
221 {
222  bool allow_strat;
223  bool allow_sync;
224 
225  /*
226  * Determine the number of blocks we have to scan.
227  *
228  * It is sufficient to do this once at scan start, since any tuples added
229  * while the scan is in progress will be invisible to my snapshot anyway.
230  * (That is not true when using a non-MVCC snapshot. However, we couldn't
231  * guarantee to return tuples added after scan start anyway, since they
232  * might go into pages we already scanned. To guarantee consistent
233  * results for a non-MVCC snapshot, the caller must hold some higher-level
234  * lock that ensures the interesting tuple(s) won't change.)
235  */
236  if (scan->rs_parallel != NULL)
237  scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
238  else
239  scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
240 
241  /*
242  * If the table is large relative to NBuffers, use a bulk-read access
243  * strategy and enable synchronized scanning (see syncscan.c). Although
244  * the thresholds for these features could be different, we make them the
245  * same so that there are only two behaviors to tune rather than four.
246  * (However, some callers need to be able to disable one or both of these
247  * behaviors, independently of the size of the table; also there is a GUC
248  * variable that can disable synchronized scanning.)
249  *
250  * Note that heap_parallelscan_initialize has a very similar test; if you
251  * change this, consider changing that one, too.
252  */
253  if (!RelationUsesLocalBuffers(scan->rs_rd) &&
254  scan->rs_nblocks > NBuffers / 4)
255  {
256  allow_strat = scan->rs_allow_strat;
257  allow_sync = scan->rs_allow_sync;
258  }
259  else
260  allow_strat = allow_sync = false;
261 
262  if (allow_strat)
263  {
264  /* During a rescan, keep the previous strategy object. */
265  if (scan->rs_strategy == NULL)
266  scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD);
267  }
268  else
269  {
270  if (scan->rs_strategy != NULL)
271  FreeAccessStrategy(scan->rs_strategy);
272  scan->rs_strategy = NULL;
273  }
274 
275  if (scan->rs_parallel != NULL)
276  {
277  /* For parallel scan, believe whatever ParallelHeapScanDesc says. */
278  scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
279  }
280  else if (keep_startblock)
281  {
282  /*
283  * When rescanning, we want to keep the previous startblock setting,
284  * so that rewinding a cursor doesn't generate surprising results.
285  * Reset the active syncscan setting, though.
286  */
287  scan->rs_syncscan = (allow_sync && synchronize_seqscans);
288  }
289  else if (allow_sync && synchronize_seqscans)
290  {
291  scan->rs_syncscan = true;
292  scan->rs_startblock = ss_get_location(scan->rs_rd, scan->rs_nblocks);
293  }
294  else
295  {
296  scan->rs_syncscan = false;
297  scan->rs_startblock = 0;
298  }
299 
300  scan->rs_numblocks = InvalidBlockNumber;
301  scan->rs_inited = false;
302  scan->rs_ctup.t_data = NULL;
303  ItemPointerSetInvalid(&scan->rs_ctup.t_self);
304  scan->rs_cbuf = InvalidBuffer;
305  scan->rs_cblock = InvalidBlockNumber;
306 
307  /* page-at-a-time fields are always invalid when not rs_inited */
308 
309  /*
310  * copy the scan key, if appropriate
311  */
312  if (key != NULL)
313  memcpy(scan->rs_key, key, scan->rs_nkeys * sizeof(ScanKeyData));
314 
315  /*
316  * Currently, we don't have a stats counter for bitmap heap scans (but the
317  * underlying bitmap index scans will be counted) or sample scans (we only
318  * update stats for tuple fetches there)
319  */
320  if (!scan->rs_bitmapscan && !scan->rs_samplescan)
321  pgstat_count_heap_scan(scan->rs_rd);
322 }
323 
324 /*
325  * heap_setscanlimits - restrict range of a heapscan
326  *
327  * startBlk is the page to start at
328  * numBlks is number of pages to scan (InvalidBlockNumber means "all")
329  */
330 void
331 heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks)
332 {
333  Assert(!scan->rs_inited); /* else too late to change */
334  Assert(!scan->rs_syncscan); /* else rs_startblock is significant */
335 
336  /* Check startBlk is valid (but allow case of zero blocks...) */
337  Assert(startBlk == 0 || startBlk < scan->rs_nblocks);
338 
339  scan->rs_startblock = startBlk;
340  scan->rs_numblocks = numBlks;
341 }
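A minimal sketch of limiting a scan to a block subrange (assuming rel and snapshot are already set up). Synchronized scanning must be disabled, since heap_setscanlimits asserts that rs_syncscan is not in use:

    /* Illustrative only */
    HeapScanDesc scan = heap_beginscan_strat(rel, snapshot, 0, NULL,
                                             true,    /* allow_strat */
                                             false);  /* allow_sync: keep startblock at 0 */

    heap_setscanlimits(scan, 0, 10);    /* scan only blocks 0..9 */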
342 
343 /*
344  * heapgetpage - subroutine for heapgettup()
345  *
346  * This routine reads and pins the specified page of the relation.
347  * In page-at-a-time mode it performs additional work, namely determining
348  * which tuples on the page are visible.
349  */
350 void
351 heapgetpage(HeapScanDesc scan, BlockNumber page)
352 {
353  Buffer buffer;
354  Snapshot snapshot;
355  Page dp;
356  int lines;
357  int ntup;
358  OffsetNumber lineoff;
359  ItemId lpp;
360  bool all_visible;
361 
362  Assert(page < scan->rs_nblocks);
363 
364  /* release previous scan buffer, if any */
365  if (BufferIsValid(scan->rs_cbuf))
366  {
367  ReleaseBuffer(scan->rs_cbuf);
368  scan->rs_cbuf = InvalidBuffer;
369  }
370 
371  /*
372  * Be sure to check for interrupts at least once per page. Checks at
373  * higher code levels won't be able to stop a seqscan that encounters many
374  * pages' worth of consecutive dead tuples.
375  */
376  CHECK_FOR_INTERRUPTS();
377 
378  /* read page using selected strategy */
379  scan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, page,
380  RBM_NORMAL, scan->rs_strategy);
381  scan->rs_cblock = page;
382 
383  if (!scan->rs_pageatatime)
384  return;
385 
386  buffer = scan->rs_cbuf;
387  snapshot = scan->rs_snapshot;
388 
389  /*
390  * Prune and repair fragmentation for the whole page, if possible.
391  */
392  heap_page_prune_opt(scan->rs_rd, buffer);
393 
394  /*
395  * We must hold share lock on the buffer content while examining tuple
396  * visibility. Afterwards, however, the tuples we have found to be
397  * visible are guaranteed good as long as we hold the buffer pin.
398  */
399  LockBuffer(buffer, BUFFER_LOCK_SHARE);
400 
401  dp = BufferGetPage(buffer);
402  TestForOldSnapshot(snapshot, scan->rs_rd, dp);
403  lines = PageGetMaxOffsetNumber(dp);
404  ntup = 0;
405 
406  /*
407  * If the all-visible flag indicates that all tuples on the page are
408  * visible to everyone, we can skip the per-tuple visibility tests.
409  *
410  * Note: In hot standby, a tuple that's already visible to all
411  * transactions in the master might still be invisible to a read-only
412  * transaction in the standby. We partly handle this problem by tracking
413  * the minimum xmin of visible tuples as the cut-off XID while marking a
414  * page all-visible on master and WAL log that along with the visibility
415  * map SET operation. In hot standby, we wait for (or abort) all
416  * transactions that might potentially not see one or more tuples on the
417  * page. That's how index-only scans work fine in hot standby. A crucial
418  * difference between index-only scans and heap scans is that the
419  * index-only scan completely relies on the visibility map, whereas a heap
420  * scan looks at the page-level PD_ALL_VISIBLE flag. We are not sure if
421  * the page-level flag can be trusted in the same way, because it might
422  * get propagated somehow without being explicitly WAL-logged, e.g. via a
423  * full page write. Until we can prove that beyond doubt, let's check each
424  * tuple for visibility the hard way.
425  */
426  all_visible = PageIsAllVisible(dp) && !snapshot->takenDuringRecovery;
427 
428  for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
429  lineoff <= lines;
430  lineoff++, lpp++)
431  {
432  if (ItemIdIsNormal(lpp))
433  {
434  HeapTupleData loctup;
435  bool valid;
436 
437  loctup.t_tableOid = RelationGetRelid(scan->rs_rd);
438  loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
439  loctup.t_len = ItemIdGetLength(lpp);
440  ItemPointerSet(&(loctup.t_self), page, lineoff);
441 
442  if (all_visible)
443  valid = true;
444  else
445  valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
446 
447  CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup,
448  buffer, snapshot);
449 
450  if (valid)
451  scan->rs_vistuples[ntup++] = lineoff;
452  }
453  }
454 
455  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
456 
457  Assert(ntup <= MaxHeapTuplesPerPage);
458  scan->rs_ntuples = ntup;
459 }
460 
461 /* ----------------
462  * heapgettup - fetch next heap tuple
463  *
464  * Initialize the scan if not already done; then advance to the next
465  * tuple as indicated by "dir"; return the next tuple in scan->rs_ctup,
466  * or set scan->rs_ctup.t_data = NULL if no more tuples.
467  *
468  * dir == NoMovementScanDirection means "re-fetch the tuple indicated
469  * by scan->rs_ctup".
470  *
471  * Note: the reason nkeys/key are passed separately, even though they are
472  * kept in the scan descriptor, is that the caller may not want us to check
473  * the scankeys.
474  *
475  * Note: when we fall off the end of the scan in either direction, we
476  * reset rs_inited. This means that a further request with the same
477  * scan direction will restart the scan, which is a bit odd, but a
478  * request with the opposite scan direction will start a fresh scan
479  * in the proper direction. The latter is required behavior for cursors,
480  * while the former case is generally undefined behavior in Postgres
481  * so we don't care too much.
482  * ----------------
483  */
484 static void
485 heapgettup(HeapScanDesc scan,
486  ScanDirection dir,
487  int nkeys,
488  ScanKey key)
489 {
490  HeapTuple tuple = &(scan->rs_ctup);
491  Snapshot snapshot = scan->rs_snapshot;
492  bool backward = ScanDirectionIsBackward(dir);
493  BlockNumber page;
494  bool finished;
495  Page dp;
496  int lines;
497  OffsetNumber lineoff;
498  int linesleft;
499  ItemId lpp;
500 
501  /*
502  * calculate next starting lineoff, given scan direction
503  */
504  if (ScanDirectionIsForward(dir))
505  {
506  if (!scan->rs_inited)
507  {
508  /*
509  * return null immediately if relation is empty
510  */
511  if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
512  {
513  Assert(!BufferIsValid(scan->rs_cbuf));
514  tuple->t_data = NULL;
515  return;
516  }
517  if (scan->rs_parallel != NULL)
518  {
519  heap_parallelscan_startblock_init(scan);
520 
521  page = heap_parallelscan_nextpage(scan);
522 
523  /* Other processes might have already finished the scan. */
524  if (page == InvalidBlockNumber)
525  {
526  Assert(!BufferIsValid(scan->rs_cbuf));
527  tuple->t_data = NULL;
528  return;
529  }
530  }
531  else
532  page = scan->rs_startblock; /* first page */
533  heapgetpage(scan, page);
534  lineoff = FirstOffsetNumber; /* first offnum */
535  scan->rs_inited = true;
536  }
537  else
538  {
539  /* continue from previously returned page/tuple */
540  page = scan->rs_cblock; /* current page */
541  lineoff = /* next offnum */
542  OffsetNumberNext(ItemPointerGetOffsetNumber(&(tuple->t_self)));
543  }
544 
545  LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
546 
547  dp = BufferGetPage(scan->rs_cbuf);
548  TestForOldSnapshot(snapshot, scan->rs_rd, dp);
549  lines = PageGetMaxOffsetNumber(dp);
550  /* page and lineoff now reference the physically next tid */
551 
552  linesleft = lines - lineoff + 1;
553  }
554  else if (backward)
555  {
556  /* backward parallel scan not supported */
557  Assert(scan->rs_parallel == NULL);
558 
559  if (!scan->rs_inited)
560  {
561  /*
562  * return null immediately if relation is empty
563  */
564  if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
565  {
566  Assert(!BufferIsValid(scan->rs_cbuf));
567  tuple->t_data = NULL;
568  return;
569  }
570 
571  /*
572  * Disable reporting to syncscan logic in a backwards scan; it's
573  * not very likely anyone else is doing the same thing at the same
574  * time, and much more likely that we'll just bollix things for
575  * forward scanners.
576  */
577  scan->rs_syncscan = false;
578  /* start from last page of the scan */
579  if (scan->rs_startblock > 0)
580  page = scan->rs_startblock - 1;
581  else
582  page = scan->rs_nblocks - 1;
583  heapgetpage(scan, page);
584  }
585  else
586  {
587  /* continue from previously returned page/tuple */
588  page = scan->rs_cblock; /* current page */
589  }
590 
591  LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
592 
593  dp = BufferGetPage(scan->rs_cbuf);
594  TestForOldSnapshot(snapshot, scan->rs_rd, dp);
595  lines = PageGetMaxOffsetNumber(dp);
596 
597  if (!scan->rs_inited)
598  {
599  lineoff = lines; /* final offnum */
600  scan->rs_inited = true;
601  }
602  else
603  {
604  lineoff = /* previous offnum */
605  OffsetNumberPrev(ItemPointerGetOffsetNumber(&(tuple->t_self)));
606  }
607  /* page and lineoff now reference the physically previous tid */
608 
609  linesleft = lineoff;
610  }
611  else
612  {
613  /*
614  * ``no movement'' scan direction: refetch prior tuple
615  */
616  if (!scan->rs_inited)
617  {
618  Assert(!BufferIsValid(scan->rs_cbuf));
619  tuple->t_data = NULL;
620  return;
621  }
622 
623  page = ItemPointerGetBlockNumber(&(tuple->t_self));
624  if (page != scan->rs_cblock)
625  heapgetpage(scan, page);
626 
627  /* Since the tuple was previously fetched, needn't lock page here */
628  dp = BufferGetPage(scan->rs_cbuf);
629  TestForOldSnapshot(snapshot, scan->rs_rd, dp);
630  lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
631  lpp = PageGetItemId(dp, lineoff);
632  Assert(ItemIdIsNormal(lpp));
633 
634  tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
635  tuple->t_len = ItemIdGetLength(lpp);
636 
637  return;
638  }
639 
640  /*
641  * advance the scan until we find a qualifying tuple or run out of stuff
642  * to scan
643  */
644  lpp = PageGetItemId(dp, lineoff);
645  for (;;)
646  {
647  while (linesleft > 0)
648  {
649  if (ItemIdIsNormal(lpp))
650  {
651  bool valid;
652 
653  tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
654  tuple->t_len = ItemIdGetLength(lpp);
655  ItemPointerSet(&(tuple->t_self), page, lineoff);
656 
657  /*
658  * if current tuple qualifies, return it.
659  */
660  valid = HeapTupleSatisfiesVisibility(tuple,
661  snapshot,
662  scan->rs_cbuf);
663 
664  CheckForSerializableConflictOut(valid, scan->rs_rd, tuple,
665  scan->rs_cbuf, snapshot);
666 
667  if (valid && key != NULL)
668  HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
669  nkeys, key, valid);
670 
671  if (valid)
672  {
673  LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
674  return;
675  }
676  }
677 
678  /*
679  * otherwise move to the next item on the page
680  */
681  --linesleft;
682  if (backward)
683  {
684  --lpp; /* move back in this page's ItemId array */
685  --lineoff;
686  }
687  else
688  {
689  ++lpp; /* move forward in this page's ItemId array */
690  ++lineoff;
691  }
692  }
693 
694  /*
695  * if we get here, it means we've exhausted the items on this page and
696  * it's time to move to the next.
697  */
698  LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
699 
700  /*
701  * advance to next/prior page and detect end of scan
702  */
703  if (backward)
704  {
705  finished = (page == scan->rs_startblock) ||
706  (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);
707  if (page == 0)
708  page = scan->rs_nblocks;
709  page--;
710  }
711  else if (scan->rs_parallel != NULL)
712  {
713  page = heap_parallelscan_nextpage(scan);
714  finished = (page == InvalidBlockNumber);
715  }
716  else
717  {
718  page++;
719  if (page >= scan->rs_nblocks)
720  page = 0;
721  finished = (page == scan->rs_startblock) ||
722  (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);
723 
724  /*
725  * Report our new scan position for synchronization purposes. We
726  * don't do that when moving backwards, however. That would just
727  * mess up any other forward-moving scanners.
728  *
729  * Note: we do this before checking for end of scan so that the
730  * final state of the position hint is back at the start of the
731  * rel. That's not strictly necessary, but otherwise when you run
732  * the same query multiple times the starting position would shift
733  * a little bit backwards on every invocation, which is confusing.
734  * We don't guarantee any specific ordering in general, though.
735  */
736  if (scan->rs_syncscan)
737  ss_report_location(scan->rs_rd, page);
738  }
739 
740  /*
741  * return NULL if we've exhausted all the pages
742  */
743  if (finished)
744  {
745  if (BufferIsValid(scan->rs_cbuf))
746  ReleaseBuffer(scan->rs_cbuf);
747  scan->rs_cbuf = InvalidBuffer;
748  scan->rs_cblock = InvalidBlockNumber;
749  tuple->t_data = NULL;
750  scan->rs_inited = false;
751  return;
752  }
753 
754  heapgetpage(scan, page);
755 
756  LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
757 
758  dp = BufferGetPage(scan->rs_cbuf);
759  TestForOldSnapshot(snapshot, scan->rs_rd, dp);
760  lines = PageGetMaxOffsetNumber((Page) dp);
761  linesleft = lines;
762  if (backward)
763  {
764  lineoff = lines;
765  lpp = PageGetItemId(dp, lines);
766  }
767  else
768  {
769  lineoff = FirstOffsetNumber;
770  lpp = PageGetItemId(dp, FirstOffsetNumber);
771  }
772  }
773 }
774 
775 /* ----------------
776  * heapgettup_pagemode - fetch next heap tuple in page-at-a-time mode
777  *
778  * Same API as heapgettup, but used in page-at-a-time mode
779  *
780  * The internal logic is much the same as heapgettup's too, but there are some
781  * differences: we do not take the buffer content lock (that only needs to
782  * happen inside heapgetpage), and we iterate through just the tuples listed
783  * in rs_vistuples[] rather than all tuples on the page. Notice that
784  * lineindex is 0-based, where the corresponding loop variable lineoff in
785  * heapgettup is 1-based.
786  * ----------------
787  */
788 static void
789 heapgettup_pagemode(HeapScanDesc scan,
790  ScanDirection dir,
791  int nkeys,
792  ScanKey key)
793 {
794  HeapTuple tuple = &(scan->rs_ctup);
795  bool backward = ScanDirectionIsBackward(dir);
796  BlockNumber page;
797  bool finished;
798  Page dp;
799  int lines;
800  int lineindex;
801  OffsetNumber lineoff;
802  int linesleft;
803  ItemId lpp;
804 
805  /*
806  * calculate next starting lineindex, given scan direction
807  */
808  if (ScanDirectionIsForward(dir))
809  {
810  if (!scan->rs_inited)
811  {
812  /*
813  * return null immediately if relation is empty
814  */
815  if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
816  {
817  Assert(!BufferIsValid(scan->rs_cbuf));
818  tuple->t_data = NULL;
819  return;
820  }
821  if (scan->rs_parallel != NULL)
822  {
823  heap_parallelscan_startblock_init(scan);
824 
825  page = heap_parallelscan_nextpage(scan);
826 
827  /* Other processes might have already finished the scan. */
828  if (page == InvalidBlockNumber)
829  {
830  Assert(!BufferIsValid(scan->rs_cbuf));
831  tuple->t_data = NULL;
832  return;
833  }
834  }
835  else
836  page = scan->rs_startblock; /* first page */
837  heapgetpage(scan, page);
838  lineindex = 0;
839  scan->rs_inited = true;
840  }
841  else
842  {
843  /* continue from previously returned page/tuple */
844  page = scan->rs_cblock; /* current page */
845  lineindex = scan->rs_cindex + 1;
846  }
847 
848  dp = BufferGetPage(scan->rs_cbuf);
849  TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
850  lines = scan->rs_ntuples;
851  /* page and lineindex now reference the next visible tid */
852 
853  linesleft = lines - lineindex;
854  }
855  else if (backward)
856  {
857  /* backward parallel scan not supported */
858  Assert(scan->rs_parallel == NULL);
859 
860  if (!scan->rs_inited)
861  {
862  /*
863  * return null immediately if relation is empty
864  */
865  if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
866  {
867  Assert(!BufferIsValid(scan->rs_cbuf));
868  tuple->t_data = NULL;
869  return;
870  }
871 
872  /*
873  * Disable reporting to syncscan logic in a backwards scan; it's
874  * not very likely anyone else is doing the same thing at the same
875  * time, and much more likely that we'll just bollix things for
876  * forward scanners.
877  */
878  scan->rs_syncscan = false;
879  /* start from last page of the scan */
880  if (scan->rs_startblock > 0)
881  page = scan->rs_startblock - 1;
882  else
883  page = scan->rs_nblocks - 1;
884  heapgetpage(scan, page);
885  }
886  else
887  {
888  /* continue from previously returned page/tuple */
889  page = scan->rs_cblock; /* current page */
890  }
891 
892  dp = BufferGetPage(scan->rs_cbuf);
893  TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
894  lines = scan->rs_ntuples;
895 
896  if (!scan->rs_inited)
897  {
898  lineindex = lines - 1;
899  scan->rs_inited = true;
900  }
901  else
902  {
903  lineindex = scan->rs_cindex - 1;
904  }
905  /* page and lineindex now reference the previous visible tid */
906 
907  linesleft = lineindex + 1;
908  }
909  else
910  {
911  /*
912  * ``no movement'' scan direction: refetch prior tuple
913  */
914  if (!scan->rs_inited)
915  {
916  Assert(!BufferIsValid(scan->rs_cbuf));
917  tuple->t_data = NULL;
918  return;
919  }
920 
921  page = ItemPointerGetBlockNumber(&(tuple->t_self));
922  if (page != scan->rs_cblock)
923  heapgetpage(scan, page);
924 
925  /* Since the tuple was previously fetched, needn't lock page here */
926  dp = BufferGetPage(scan->rs_cbuf);
927  TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
928  lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
929  lpp = PageGetItemId(dp, lineoff);
930  Assert(ItemIdIsNormal(lpp));
931 
932  tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
933  tuple->t_len = ItemIdGetLength(lpp);
934 
935  /* check that rs_cindex is in sync */
936  Assert(scan->rs_cindex < scan->rs_ntuples);
937  Assert(lineoff == scan->rs_vistuples[scan->rs_cindex]);
938 
939  return;
940  }
941 
942  /*
943  * advance the scan until we find a qualifying tuple or run out of stuff
944  * to scan
945  */
946  for (;;)
947  {
948  while (linesleft > 0)
949  {
950  lineoff = scan->rs_vistuples[lineindex];
951  lpp = PageGetItemId(dp, lineoff);
952  Assert(ItemIdIsNormal(lpp));
953 
954  tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
955  tuple->t_len = ItemIdGetLength(lpp);
956  ItemPointerSet(&(tuple->t_self), page, lineoff);
957 
958  /*
959  * if current tuple qualifies, return it.
960  */
961  if (key != NULL)
962  {
963  bool valid;
964 
965  HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
966  nkeys, key, valid);
967  if (valid)
968  {
969  scan->rs_cindex = lineindex;
970  return;
971  }
972  }
973  else
974  {
975  scan->rs_cindex = lineindex;
976  return;
977  }
978 
979  /*
980  * otherwise move to the next item on the page
981  */
982  --linesleft;
983  if (backward)
984  --lineindex;
985  else
986  ++lineindex;
987  }
988 
989  /*
990  * if we get here, it means we've exhausted the items on this page and
991  * it's time to move to the next.
992  */
993  if (backward)
994  {
995  finished = (page == scan->rs_startblock) ||
996  (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);
997  if (page == 0)
998  page = scan->rs_nblocks;
999  page--;
1000  }
1001  else if (scan->rs_parallel != NULL)
1002  {
1003  page = heap_parallelscan_nextpage(scan);
1004  finished = (page == InvalidBlockNumber);
1005  }
1006  else
1007  {
1008  page++;
1009  if (page >= scan->rs_nblocks)
1010  page = 0;
1011  finished = (page == scan->rs_startblock) ||
1012  (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);
1013 
1014  /*
1015  * Report our new scan position for synchronization purposes. We
1016  * don't do that when moving backwards, however. That would just
1017  * mess up any other forward-moving scanners.
1018  *
1019  * Note: we do this before checking for end of scan so that the
1020  * final state of the position hint is back at the start of the
1021  * rel. That's not strictly necessary, but otherwise when you run
1022  * the same query multiple times the starting position would shift
1023  * a little bit backwards on every invocation, which is confusing.
1024  * We don't guarantee any specific ordering in general, though.
1025  */
1026  if (scan->rs_syncscan)
1027  ss_report_location(scan->rs_rd, page);
1028  }
1029 
1030  /*
1031  * return NULL if we've exhausted all the pages
1032  */
1033  if (finished)
1034  {
1035  if (BufferIsValid(scan->rs_cbuf))
1036  ReleaseBuffer(scan->rs_cbuf);
1037  scan->rs_cbuf = InvalidBuffer;
1038  scan->rs_cblock = InvalidBlockNumber;
1039  tuple->t_data = NULL;
1040  scan->rs_inited = false;
1041  return;
1042  }
1043 
1044  heapgetpage(scan, page);
1045 
1046  dp = BufferGetPage(scan->rs_cbuf);
1047  TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
1048  lines = scan->rs_ntuples;
1049  linesleft = lines;
1050  if (backward)
1051  lineindex = lines - 1;
1052  else
1053  lineindex = 0;
1054  }
1055 }
1056 
1057 
1058 #if defined(DISABLE_COMPLEX_MACRO)
1059 /*
1060  * This is formatted so oddly so that the correspondence to the macro
1061  * definition in access/htup_details.h is maintained.
1062  */
1063 Datum
1064 fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
1065  bool *isnull)
1066 {
1067  return (
1068  (attnum) > 0 ?
1069  (
1070  (*(isnull) = false),
1071  HeapTupleNoNulls(tup) ?
1072  (
1073  TupleDescAttr((tupleDesc), (attnum) - 1)->attcacheoff >= 0 ?
1074  (
1075  fetchatt(TupleDescAttr((tupleDesc), (attnum) - 1),
1076  (char *) (tup)->t_data + (tup)->t_data->t_hoff +
1077  TupleDescAttr((tupleDesc), (attnum) - 1)->attcacheoff)
1078  )
1079  :
1080  nocachegetattr((tup), (attnum), (tupleDesc))
1081  )
1082  :
1083  (
1084  att_isnull((attnum) - 1, (tup)->t_data->t_bits) ?
1085  (
1086  (*(isnull) = true),
1087  (Datum) NULL
1088  )
1089  :
1090  (
1091  nocachegetattr((tup), (attnum), (tupleDesc))
1092  )
1093  )
1094  )
1095  :
1096  (
1097  (Datum) NULL
1098  )
1099  );
1100 }
1101 #endif /* defined(DISABLE_COMPLEX_MACRO) */
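Typical use of fastgetattr (whether expanded from the macro in htup_details.h or the out-of-line function above) looks like this sketch; tuple and rel are assumed to be in scope, and the column is treated as int4 purely for illustration:

    /* Illustrative only */
    bool  isnull;
    Datum value = fastgetattr(tuple, 1, RelationGetDescr(rel), &isnull);

    if (!isnull)
        elog(DEBUG1, "first attribute as int32: %d", DatumGetInt32(value));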
1102 
1103 
1104 /* ----------------------------------------------------------------
1105  * heap access method interface
1106  * ----------------------------------------------------------------
1107  */
1108 
1109 /* ----------------
1110  * relation_open - open any relation by relation OID
1111  *
1112  * If lockmode is not "NoLock", the specified kind of lock is
1113  * obtained on the relation. (Generally, NoLock should only be
1114  * used if the caller knows it has some appropriate lock on the
1115  * relation already.)
1116  *
1117  * An error is raised if the relation does not exist.
1118  *
1119  * NB: a "relation" is anything with a pg_class entry. The caller is
1120  * expected to check whether the relkind is something it can handle.
1121  * ----------------
1122  */
1123 Relation
1124 relation_open(Oid relationId, LOCKMODE lockmode)
1125 {
1126  Relation r;
1127 
1128  Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
1129 
1130  /* Get the lock before trying to open the relcache entry */
1131  if (lockmode != NoLock)
1132  LockRelationOid(relationId, lockmode);
1133 
1134  /* The relcache does all the real work... */
1135  r = RelationIdGetRelation(relationId);
1136 
1137  if (!RelationIsValid(r))
1138  elog(ERROR, "could not open relation with OID %u", relationId);
1139 
1140  /* Make note that we've accessed a temporary relation */
1141  if (RelationUsesLocalBuffers(r))
1142  MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL;
1143 
1144  pgstat_initstats(r);
1145 
1146  return r;
1147 }
1148 
1149 /* ----------------
1150  * try_relation_open - open any relation by relation OID
1151  *
1152  * Same as relation_open, except return NULL instead of failing
1153  * if the relation does not exist.
1154  * ----------------
1155  */
1156 Relation
1157 try_relation_open(Oid relationId, LOCKMODE lockmode)
1158 {
1159  Relation r;
1160 
1161  Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
1162 
1163  /* Get the lock first */
1164  if (lockmode != NoLock)
1165  LockRelationOid(relationId, lockmode);
1166 
1167  /*
1168  * Now that we have the lock, probe to see if the relation really exists
1169  * or not.
1170  */
1171  if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relationId)))
1172  {
1173  /* Release useless lock */
1174  if (lockmode != NoLock)
1175  UnlockRelationOid(relationId, lockmode);
1176 
1177  return NULL;
1178  }
1179 
1180  /* Should be safe to do a relcache load */
1181  r = RelationIdGetRelation(relationId);
1182 
1183  if (!RelationIsValid(r))
1184  elog(ERROR, "could not open relation with OID %u", relationId);
1185 
1186  /* Make note that we've accessed a temporary relation */
1187  if (RelationUsesLocalBuffers(r))
1188  MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL;
1189 
1190  pgstat_initstats(r);
1191 
1192  return r;
1193 }
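A common pattern with try_relation_open is to tolerate a concurrent DROP, as in this sketch (relid assumed to come from a possibly stale catalog lookup):

    /* Illustrative only */
    Relation rel = try_relation_open(relid, AccessShareLock);

    if (rel == NULL)
        return;                 /* relation vanished; nothing to do */

    /* ... work with rel ... */
    relation_close(rel, AccessShareLock);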
1194 
1195 /* ----------------
1196  * relation_openrv - open any relation specified by a RangeVar
1197  *
1198  * Same as relation_open, but the relation is specified by a RangeVar.
1199  * ----------------
1200  */
1201 Relation
1202 relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
1203 {
1204  Oid relOid;
1205 
1206  /*
1207  * Check for shared-cache-inval messages before trying to open the
1208  * relation. This is needed even if we already hold a lock on the
1209  * relation, because GRANT/REVOKE are executed without taking any lock on
1210  * the target relation, and we want to be sure we see current ACL
1211  * information. We can skip this if asked for NoLock, on the assumption
1212  * that such a call is not the first one in the current command, and so we
1213  * should be reasonably up-to-date already. (XXX this all could stand to
1214  * be redesigned, but for the moment we'll keep doing this like it's been
1215  * done historically.)
1216  */
1217  if (lockmode != NoLock)
1218  AcceptInvalidationMessages();
1219 
1220  /* Look up and lock the appropriate relation using namespace search */
1221  relOid = RangeVarGetRelid(relation, lockmode, false);
1222 
1223  /* Let relation_open do the rest */
1224  return relation_open(relOid, NoLock);
1225 }
1226 
1227 /* ----------------
1228  * relation_openrv_extended - open any relation specified by a RangeVar
1229  *
1230  * Same as relation_openrv, but with an additional missing_ok argument
1231  * allowing a NULL return rather than an error if the relation is not
1232  * found. (Note that some other causes, such as permissions problems,
1233  * will still result in an ereport.)
1234  * ----------------
1235  */
1236 Relation
1237 relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode,
1238  bool missing_ok)
1239 {
1240  Oid relOid;
1241 
1242  /*
1243  * Check for shared-cache-inval messages before trying to open the
1244  * relation. See comments in relation_openrv().
1245  */
1246  if (lockmode != NoLock)
1247  AcceptInvalidationMessages();
1248 
1249  /* Look up and lock the appropriate relation using namespace search */
1250  relOid = RangeVarGetRelid(relation, lockmode, missing_ok);
1251 
1252  /* Return NULL on not-found */
1253  if (!OidIsValid(relOid))
1254  return NULL;
1255 
1256  /* Let relation_open do the rest */
1257  return relation_open(relOid, NoLock);
1258 }
1259 
1260 /* ----------------
1261  * relation_close - close any relation
1262  *
1263  * If lockmode is not "NoLock", we then release the specified lock.
1264  *
1265  * Note that it is often sensible to hold a lock beyond relation_close;
1266  * in that case, the lock is released automatically at xact end.
1267  * ----------------
1268  */
1269 void
1270 relation_close(Relation relation, LOCKMODE lockmode)
1271 {
1272  LockRelId relid = relation->rd_lockInfo.lockRelId;
1273 
1274  Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
1275 
1276  /* The relcache does the real work... */
1277  RelationClose(relation);
1278 
1279  if (lockmode != NoLock)
1280  UnlockRelationId(&relid, lockmode);
1281 }
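The note above about holding a lock past relation_close corresponds to this common idiom (a sketch, assuming relid is in scope):

    /* Illustrative only */
    Relation rel = relation_open(relid, RowExclusiveLock);

    /* ... modify the relation ... */

    /* Drop the relcache reference but keep RowExclusiveLock until commit */
    relation_close(rel, NoLock);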
1282 
1283 
1284 /* ----------------
1285  * heap_open - open a heap relation by relation OID
1286  *
1287  * This is essentially relation_open plus check that the relation
1288  * is not an index nor a composite type. (The caller should also
1289  * check that it's not a view or foreign table before assuming it has
1290  * storage.)
1291  * ----------------
1292  */
1293 Relation
1294 heap_open(Oid relationId, LOCKMODE lockmode)
1295 {
1296  Relation r;
1297 
1298  r = relation_open(relationId, lockmode);
1299 
1300  if (r->rd_rel->relkind == RELKIND_INDEX ||
1301  r->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
1302  ereport(ERROR,
1303  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1304  errmsg("\"%s\" is an index",
1305  RelationGetRelationName(r))));
1306  else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
1307  ereport(ERROR,
1308  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1309  errmsg("\"%s\" is a composite type",
1310  RelationGetRelationName(r))));
1311 
1312  return r;
1313 }
1314 
1315 /* ----------------
1316  * heap_openrv - open a heap relation specified
1317  * by a RangeVar node
1318  *
1319  * As above, but relation is specified by a RangeVar.
1320  * ----------------
1321  */
1322 Relation
1323 heap_openrv(const RangeVar *relation, LOCKMODE lockmode)
1324 {
1325  Relation r;
1326 
1327  r = relation_openrv(relation, lockmode);
1328 
1329  if (r->rd_rel->relkind == RELKIND_INDEX ||
1330  r->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
1331  ereport(ERROR,
1332  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1333  errmsg("\"%s\" is an index",
1334  RelationGetRelationName(r))));
1335  else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
1336  ereport(ERROR,
1337  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1338  errmsg("\"%s\" is a composite type",
1339  RelationGetRelationName(r))));
1340 
1341  return r;
1342 }
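heap_openrv is what you reach for when all you have is a (possibly schema-qualified) name; a sketch, assuming the makeRangeVar() helper from nodes/makefuncs.h and a made-up table name:

    /* Illustrative only */
    RangeVar *rv  = makeRangeVar("public", "my_table", -1);
    Relation  rel = heap_openrv(rv, AccessShareLock);

    /* ... */
    heap_close(rel, AccessShareLock);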
1343 
1344 /* ----------------
1345  * heap_openrv_extended - open a heap relation specified
1346  * by a RangeVar node
1347  *
1348  * As above, but optionally return NULL instead of failing for
1349  * relation-not-found.
1350  * ----------------
1351  */
1352 Relation
1353 heap_openrv_extended(const RangeVar *relation, LOCKMODE lockmode,
1354  bool missing_ok)
1355 {
1356  Relation r;
1357 
1358  r = relation_openrv_extended(relation, lockmode, missing_ok);
1359 
1360  if (r)
1361  {
1362  if (r->rd_rel->relkind == RELKIND_INDEX ||
1363  r->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
1364  ereport(ERROR,
1365  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1366  errmsg("\"%s\" is an index",
1367  RelationGetRelationName(r))));
1368  else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
1369  ereport(ERROR,
1370  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1371  errmsg("\"%s\" is a composite type",
1372  RelationGetRelationName(r))));
1373  }
1374 
1375  return r;
1376 }
1377 
1378 
1379 /* ----------------
1380  * heap_beginscan - begin relation scan
1381  *
1382  * heap_beginscan is the "standard" case.
1383  *
1384  * heap_beginscan_catalog differs in setting up its own temporary snapshot.
1385  *
1386  * heap_beginscan_strat offers an extended API that lets the caller control
1387  * whether a nondefault buffer access strategy can be used, and whether
1388  * syncscan can be chosen (possibly resulting in the scan not starting from
1389  * block zero). Both of these default to true with plain heap_beginscan.
1390  *
1391  * heap_beginscan_bm is an alternative entry point for setting up a
1392  * HeapScanDesc for a bitmap heap scan. Although that scan technology is
1393  * really quite unlike a standard seqscan, there is just enough commonality
1394  * to make it worth using the same data structure.
1395  *
1396  * heap_beginscan_sampling is an alternative entry point for setting up a
1397  * HeapScanDesc for a TABLESAMPLE scan. As with bitmap scans, it's worth
1398  * using the same data structure although the behavior is rather different.
1399  * In addition to the options offered by heap_beginscan_strat, this call
1400  * also allows control of whether page-mode visibility checking is used.
1401  * ----------------
1402  */
1403 HeapScanDesc
1404 heap_beginscan(Relation relation, Snapshot snapshot,
1405  int nkeys, ScanKey key)
1406 {
1407  return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
1408  true, true, true, false, false, false);
1409 }
1410 
1411 HeapScanDesc
1412 heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
1413 {
1414  Oid relid = RelationGetRelid(relation);
1415  Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
1416 
1417  return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
1418  true, true, true, false, false, true);
1419 }
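A sketch of a catalog scan with a scan key, in the style used elsewhere in the backend (the relation name literal is made up; ScanKeyInit and the pg_class attribute, strategy, and operator constants come from skey.h, pg_class.h, stratnum.h, and fmgroids.h):

    /* Illustrative only */
    ScanKeyData  key;
    Relation     rel;
    HeapScanDesc scan;
    HeapTuple    tuple;

    ScanKeyInit(&key,
                Anum_pg_class_relname,
                BTEqualStrategyNumber, F_NAMEEQ,
                CStringGetDatum("my_table"));

    rel = heap_open(RelationRelationId, AccessShareLock);
    scan = heap_beginscan_catalog(rel, 1, &key);
    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    {
        /* process the matching pg_class row */
    }
    heap_endscan(scan);
    heap_close(rel, AccessShareLock);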
1420 
1421 HeapScanDesc
1422 heap_beginscan_strat(Relation relation, Snapshot snapshot,
1423  int nkeys, ScanKey key,
1424  bool allow_strat, bool allow_sync)
1425 {
1426  return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
1427  allow_strat, allow_sync, true,
1428  false, false, false);
1429 }
1430 
1431 HeapScanDesc
1432 heap_beginscan_bm(Relation relation, Snapshot snapshot,
1433  int nkeys, ScanKey key)
1434 {
1435  return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
1436  false, false, true, true, false, false);
1437 }
1438 
1439 HeapScanDesc
1440 heap_beginscan_sampling(Relation relation, Snapshot snapshot,
1441  int nkeys, ScanKey key,
1442  bool allow_strat, bool allow_sync, bool allow_pagemode)
1443 {
1444  return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
1445  allow_strat, allow_sync, allow_pagemode,
1446  false, true, false);
1447 }
1448 
1449 static HeapScanDesc
1450 heap_beginscan_internal(Relation relation, Snapshot snapshot,
1451  int nkeys, ScanKey key,
1452  ParallelHeapScanDesc parallel_scan,
1453  bool allow_strat,
1454  bool allow_sync,
1455  bool allow_pagemode,
1456  bool is_bitmapscan,
1457  bool is_samplescan,
1458  bool temp_snap)
1459 {
1460  HeapScanDesc scan;
1461 
1462  /*
1463  * increment relation ref count while scanning relation
1464  *
1465  * This is just to make really sure the relcache entry won't go away while
1466  * the scan has a pointer to it. Caller should be holding the rel open
1467  * anyway, so this is redundant in all normal scenarios...
1468  */
1469  RelationIncrementReferenceCount(relation);
1470 
1471  /*
1472  * allocate and initialize scan descriptor
1473  */
1474  scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData));
1475 
1476  scan->rs_rd = relation;
1477  scan->rs_snapshot = snapshot;
1478  scan->rs_nkeys = nkeys;
1479  scan->rs_bitmapscan = is_bitmapscan;
1480  scan->rs_samplescan = is_samplescan;
1481  scan->rs_strategy = NULL; /* set in initscan */
1482  scan->rs_allow_strat = allow_strat;
1483  scan->rs_allow_sync = allow_sync;
1484  scan->rs_temp_snap = temp_snap;
1485  scan->rs_parallel = parallel_scan;
1486 
1487  /*
1488  * we can use page-at-a-time mode if it's an MVCC-safe snapshot
1489  */
1490  scan->rs_pageatatime = allow_pagemode && IsMVCCSnapshot(snapshot);
1491 
1492  /*
1493  * For a seqscan in a serializable transaction, acquire a predicate lock
1494  * on the entire relation. This is required not only to lock all the
1495  * matching tuples, but also to conflict with new insertions into the
1496  * table. In an indexscan, we take page locks on the index pages covering
1497  * the range specified in the scan qual, but in a heap scan there is
1498  * nothing more fine-grained to lock. A bitmap scan is a different story,
1499  * there we have already scanned the index and locked the index pages
1500  * covering the predicate. But in that case we still have to lock any
1501  * matching heap tuples.
1502  */
1503  if (!is_bitmapscan)
1504  PredicateLockRelation(relation, snapshot);
1505 
1506  /* we only need to set this up once */
1507  scan->rs_ctup.t_tableOid = RelationGetRelid(relation);
1508 
1509  /*
1510  * we do this here instead of in initscan() because heap_rescan also calls
1511  * initscan() and we don't want to allocate memory again
1512  */
1513  if (nkeys > 0)
1514  scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
1515  else
1516  scan->rs_key = NULL;
1517 
1518  initscan(scan, key, false);
1519 
1520  return scan;
1521 }
1522 
1523 /* ----------------
1524  * heap_rescan - restart a relation scan
1525  * ----------------
1526  */
1527 void
1528 heap_rescan(HeapScanDesc scan,
1529  ScanKey key)
1530 {
1531  /*
1532  * unpin scan buffers
1533  */
1534  if (BufferIsValid(scan->rs_cbuf))
1535  ReleaseBuffer(scan->rs_cbuf);
1536 
1537  /*
1538  * reinitialize scan descriptor
1539  */
1540  initscan(scan, key, true);
1541 }
1542 
1543 /* ----------------
1544  * heap_rescan_set_params - restart a relation scan after changing params
1545  *
1546  * This call allows changing the buffer strategy, syncscan, and pagemode
1547  * options before starting a fresh scan. Note that although the actual use
1548  * of syncscan might change (effectively, enabling or disabling reporting),
1549  * the previously selected startblock will be kept.
1550  * ----------------
1551  */
1552 void
1553 heap_rescan_set_params(HeapScanDesc scan, ScanKey key,
1554  bool allow_strat, bool allow_sync, bool allow_pagemode)
1555 {
1556  /* adjust parameters */
1557  scan->rs_allow_strat = allow_strat;
1558  scan->rs_allow_sync = allow_sync;
1559  scan->rs_pageatatime = allow_pagemode && IsMVCCSnapshot(scan->rs_snapshot);
1560  /* ... and rescan */
1561  heap_rescan(scan, key);
1562 }
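For instance (illustrative), a caller that wants the next pass to do tuple-at-a-time visibility checks instead of page-mode checks could rescan as:

    /* Illustrative only */
    heap_rescan_set_params(scan, NULL, true, true, false);   /* allow_pagemode = false */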
1563 
1564 /* ----------------
1565  * heap_endscan - end relation scan
1566  *
1567  * See how to integrate with index scans.
1568  * Check handling of reldesc caching.
1569  * ----------------
1570  */
1571 void
1572 heap_endscan(HeapScanDesc scan)
1573 {
1574  /* Note: no locking manipulations needed */
1575 
1576  /*
1577  * unpin scan buffers
1578  */
1579  if (BufferIsValid(scan->rs_cbuf))
1580  ReleaseBuffer(scan->rs_cbuf);
1581 
1582  /*
1583  * decrement relation reference count and free scan descriptor storage
1584  */
1585  RelationDecrementReferenceCount(scan->rs_rd);
1586 
1587  if (scan->rs_key)
1588  pfree(scan->rs_key);
1589 
1590  if (scan->rs_strategy != NULL)
1591  FreeAccessStrategy(scan->rs_strategy);
1592 
1593  if (scan->rs_temp_snap)
1594  UnregisterSnapshot(scan->rs_snapshot);
1595 
1596  pfree(scan);
1597 }
1598 
1599 /* ----------------
1600  * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
1601  *
1602  * Sadly, this doesn't reduce to a constant, because the size required
1603  * to serialize the snapshot can vary.
1604  * ----------------
1605  */
1606 Size
1607 heap_parallelscan_estimate(Snapshot snapshot)
1608 {
1609  return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data),
1610  EstimateSnapshotSpace(snapshot));
1611 }
1612 
1613 /* ----------------
1614  * heap_parallelscan_initialize - initialize ParallelHeapScanDesc
1615  *
1616  * Must allow as many bytes of shared memory as returned by
1617  * heap_parallelscan_estimate. Call this just once in the leader
1618  * process; then, individual workers attach via heap_beginscan_parallel.
1619  * ----------------
1620  */
1621 void
1622 heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
1623  Snapshot snapshot)
1624 {
1625  target->phs_relid = RelationGetRelid(relation);
1626  target->phs_nblocks = RelationGetNumberOfBlocks(relation);
1627  /* compare phs_syncscan initialization to similar logic in initscan */
1628  target->phs_syncscan = synchronize_seqscans &&
1629  !RelationUsesLocalBuffers(relation) &&
1630  target->phs_nblocks > NBuffers / 4;
1631  SpinLockInit(&target->phs_mutex);
1632  target->phs_startblock = InvalidBlockNumber;
1633  pg_atomic_init_u64(&target->phs_nallocated, 0);
1634  if (IsMVCCSnapshot(snapshot))
1635  {
1636  SerializeSnapshot(snapshot, target->phs_snapshot_data);
1637  target->phs_snapshot_any = false;
1638  }
1639  else
1640  {
1641  Assert(snapshot == SnapshotAny);
1642  target->phs_snapshot_any = true;
1643  }
1644 }
1645 
1646 /* ----------------
1647  * heap_parallelscan_reinitialize - reset a parallel scan
1648  *
1649  * Call this in the leader process. Caller is responsible for
1650  * making sure that all workers have finished the scan beforehand.
1651  * ----------------
1652  */
1653 void
1654 heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan)
1655 {
1656  pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
1657 }
1658 
1659 /* ----------------
1660  * heap_beginscan_parallel - join a parallel scan
1661  *
1662  * Caller must hold a suitable lock on the correct relation.
1663  * ----------------
1664  */
1665 HeapScanDesc
1666 heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
1667 {
1668  Snapshot snapshot;
1669 
1670  Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
1671 
1672  if (!parallel_scan->phs_snapshot_any)
1673  {
1674  /* Snapshot was serialized -- restore it */
1675  snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
1676  RegisterSnapshot(snapshot);
1677  }
1678  else
1679  {
1680  /* SnapshotAny passed by caller (not serialized) */
1681  snapshot = SnapshotAny;
1682  }
1683 
1684  return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
1685  true, true, true, false, false,
1686  !parallel_scan->phs_snapshot_any);
1687 }
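A sketch of the intended division of labor between leader and workers (the shm_toc key value is made up, and pcxt is assumed to be an already-created ParallelContext whose estimator has reserved room for the descriptor):

    /* Illustrative only -- leader side */
    Size sz = heap_parallelscan_estimate(snapshot);
    ParallelHeapScanDesc pscan =
        (ParallelHeapScanDesc) shm_toc_allocate(pcxt->toc, sz);

    heap_parallelscan_initialize(pscan, rel, snapshot);
    shm_toc_insert(pcxt->toc, MY_PSCAN_KEY, pscan);

    /* Illustrative only -- each worker (and the leader, if it participates) */
    pscan = (ParallelHeapScanDesc) shm_toc_lookup(toc, MY_PSCAN_KEY, false);
    scan = heap_beginscan_parallel(rel, pscan);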
1688 
1689 /* ----------------
1690  * heap_parallelscan_startblock_init - find and set the scan's startblock
1691  *
1692  * Determine where the parallel seq scan should start. This function may
1693  * be called many times, once by each parallel worker. We must be careful
1694  * only to set the startblock once.
1695  * ----------------
1696  */
1697 static void
1698 heap_parallelscan_startblock_init(HeapScanDesc scan)
1699 {
1700  BlockNumber sync_startpage = InvalidBlockNumber;
1701  ParallelHeapScanDesc parallel_scan;
1702 
1703  Assert(scan->rs_parallel);
1704  parallel_scan = scan->rs_parallel;
1705 
1706 retry:
1707  /* Grab the spinlock. */
1708  SpinLockAcquire(&parallel_scan->phs_mutex);
1709 
1710  /*
1711  * If the scan's startblock has not yet been initialized, we must do so
1712  * now. If this is not a synchronized scan, we just start at block 0, but
1713  * if it is a synchronized scan, we must get the starting position from
1714  * the synchronized scan machinery. We can't hold the spinlock while
1715  * doing that, though, so release the spinlock, get the information we
1716  * need, and retry. If nobody else has initialized the scan in the
1717  * meantime, we'll fill in the value we fetched on the second time
1718  * through.
1719  */
1720  if (parallel_scan->phs_startblock == InvalidBlockNumber)
1721  {
1722  if (!parallel_scan->phs_syncscan)
1723  parallel_scan->phs_startblock = 0;
1724  else if (sync_startpage != InvalidBlockNumber)
1725  parallel_scan->phs_startblock = sync_startpage;
1726  else
1727  {
1728  SpinLockRelease(&parallel_scan->phs_mutex);
1729  sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
1730  goto retry;
1731  }
1732  }
1733  SpinLockRelease(&parallel_scan->phs_mutex);
1734 }
1735 
1736 /* ----------------
1737  * heap_parallelscan_nextpage - get the next page to scan
1738  *
1739  * Get the next page to scan. Even if there are no pages left to scan,
1740  * another backend could have grabbed a page to scan and not yet finished
1741  * looking at it, so it doesn't follow that the scan is done when the
1742  * first backend gets an InvalidBlockNumber return.
1743  * ----------------
1744  */
1745 static BlockNumber
1746 heap_parallelscan_nextpage(HeapScanDesc scan)
1747 {
1748  BlockNumber page;
1749  ParallelHeapScanDesc parallel_scan;
1750  uint64 nallocated;
1751 
1752  Assert(scan->rs_parallel);
1753  parallel_scan = scan->rs_parallel;
1754 
1755  /*
1756  * phs_nallocated tracks how many pages have been allocated to workers
1757  * already. When phs_nallocated >= rs_nblocks, all blocks have been
1758  * allocated.
1759  *
1760  * Because we use an atomic fetch-and-add to fetch the current value, the
1761  * phs_nallocated counter will exceed rs_nblocks, because workers will
1762  * still increment the value, when they try to allocate the next block but
1763  * all blocks have been allocated already. The counter must be 64 bits
1764  * wide because of that, to avoid wrapping around when rs_nblocks is close
1765  * to 2^32.
1766  *
1767  * The actual page to return is calculated by adding the counter to the
1768  * starting block number, modulo nblocks.
1769  */
1770  nallocated = pg_atomic_fetch_add_u64(&parallel_scan->phs_nallocated, 1);
1771  if (nallocated >= scan->rs_nblocks)
1772  page = InvalidBlockNumber; /* all blocks have been allocated */
1773  else
1774  page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks;
1775 
1776  /*
1777  * Report scan location. Normally, we report the current page number.
1778  * When we reach the end of the scan, though, we report the starting page,
1779  * not the ending page, just so the starting positions for later scans
1780  * doesn't slew backwards. We only report the position at the end of the
1781  * don't slew backwards. We only report the position at the end of the
1782  */
1783  if (scan->rs_syncscan)
1784  {
1785  if (page != InvalidBlockNumber)
1786  ss_report_location(scan->rs_rd, page);
1787  else if (nallocated == scan->rs_nblocks)
1788  ss_report_location(scan->rs_rd, parallel_scan->phs_startblock);
1789  }
1790 
1791  return page;
1792 }
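As a concrete worked example of the mapping above: with rs_nblocks = 1000 and phs_startblock = 700, counter values 0..299 hand out pages 700..999, values 300..999 wrap around to pages 0..699, and any value >= 1000 returns InvalidBlockNumber (with the start block reported for synchronization exactly once, when nallocated == 1000).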
1793 
1794 /* ----------------
1795  * heap_update_snapshot
1796  *
1797  * Update snapshot info in heap scan descriptor.
1798  * ----------------
1799  */
1800 void
1801 heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
1802 {
1803  Assert(IsMVCCSnapshot(snapshot));
1804 
1805  RegisterSnapshot(snapshot);
1806  scan->rs_snapshot = snapshot;
1807  scan->rs_temp_snap = true;
1808 }
1809 
1810 /* ----------------
1811  * heap_getnext - retrieve next tuple in scan
1812  *
1813  * Fix to work with index relations.
1814  * We don't return the buffer anymore, but you can get it from the
1815  * returned HeapTuple.
1816  * ----------------
1817  */
1818 
1819 #ifdef HEAPDEBUGALL
1820 #define HEAPDEBUG_1 \
1821  elog(DEBUG2, "heap_getnext([%s,nkeys=%d],dir=%d) called", \
1822  RelationGetRelationName(scan->rs_rd), scan->rs_nkeys, (int) direction)
1823 #define HEAPDEBUG_2 \
1824  elog(DEBUG2, "heap_getnext returning EOS")
1825 #define HEAPDEBUG_3 \
1826  elog(DEBUG2, "heap_getnext returning tuple")
1827 #else
1828 #define HEAPDEBUG_1
1829 #define HEAPDEBUG_2
1830 #define HEAPDEBUG_3
1831 #endif /* !defined(HEAPDEBUGALL) */
1832 
1833 
1834 HeapTuple
1835 heap_getnext(HeapScanDesc scan, ScanDirection direction)
1836 {
1837  /* Note: no locking manipulations needed */
1838 
1839  HEAPDEBUG_1; /* heap_getnext( info ) */
1840 
1841  if (scan->rs_pageatatime)
1842  heapgettup_pagemode(scan, direction,
1843  scan->rs_nkeys, scan->rs_key);
1844  else
1845  heapgettup(scan, direction, scan->rs_nkeys, scan->rs_key);
1846 
1847  if (scan->rs_ctup.t_data == NULL)
1848  {
1849  HEAPDEBUG_2; /* heap_getnext returning EOS */
1850  return NULL;
1851  }
1852 
1853  /*
1854  * if we get here it means we have a new current scan tuple, so point to
1855  * the proper return buffer and return the tuple.
1856  */
1857  HEAPDEBUG_3; /* heap_getnext returning tuple */
1858 
1859  pgstat_count_heap_getnext(scan->rs_rd);
1860 
1861  return &(scan->rs_ctup);
1862 }
1863 
1864 /*
1865  * heap_fetch - retrieve tuple with given tid
1866  *
1867  * On entry, tuple->t_self is the TID to fetch. We pin the buffer holding
1868  * the tuple, fill in the remaining fields of *tuple, and check the tuple
1869  * against the specified snapshot.
1870  *
1871  * If successful (tuple found and passes snapshot time qual), then *userbuf
1872  * is set to the buffer holding the tuple and true is returned. The caller
1873  * must unpin the buffer when done with the tuple.
1874  *
1875  * If the tuple is not found (ie, item number references a deleted slot),
1876  * then tuple->t_data is set to NULL and false is returned.
1877  *
1878  * If the tuple is found but fails the time qual check, then false is returned
1879  * but tuple->t_data is left pointing to the tuple.
1880  *
1881  * keep_buf determines what is done with the buffer in the false-result cases.
1882  * When the caller specifies keep_buf = true, we retain the pin on the buffer
1883  * and return it in *userbuf (so the caller must eventually unpin it); when
1884  * keep_buf = false, the pin is released and *userbuf is set to InvalidBuffer.
1885  *
1886  * stats_relation is the relation to charge the heap_fetch operation against
1887  * for statistical purposes. (This could be the heap rel itself, an
1888  * associated index, or NULL to not count the fetch at all.)
1889  *
1890  * heap_fetch does not follow HOT chains: only the exact TID requested will
1891  * be fetched.
1892  *
1893  * It is somewhat inconsistent that we ereport() on invalid block number but
1894  * return false on invalid item number. There are a couple of reasons though.
1895  * One is that the caller can relatively easily check the block number for
1896  * validity, but cannot check the item number without reading the page
1897  * itself. Another is that when we are following a t_ctid link, we can be
1898  * reasonably confident that the page number is valid (since VACUUM shouldn't
1899  * truncate off the destination page without having killed the referencing
1900  * tuple first), but the item number might well not be good.
1901  */
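/*
 * Usage sketch (editorial illustration, not part of heapam.c): a minimal
 * caller honoring the pin contract above, with keep_buf = false and no
 * stats relation.  "rel" and "tid" are assumed inputs, and
 * GetActiveSnapshot() is just one possible snapshot source.
 *
 *     HeapTupleData tuple;
 *     Buffer        buf;
 *
 *     tuple.t_self = *tid;
 *     if (heap_fetch(rel, GetActiveSnapshot(), &tuple, &buf, false, NULL))
 *     {
 *         // ... use tuple.t_data while the buffer pin is held ...
 *         ReleaseBuffer(buf);        // caller must unpin on success
 *     }
 */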
1902 bool
1903 heap_fetch(Relation relation,
1904  Snapshot snapshot,
1905  HeapTuple tuple,
1906  Buffer *userbuf,
1907  bool keep_buf,
1908  Relation stats_relation)
1909 {
1910  ItemPointer tid = &(tuple->t_self);
1911  ItemId lp;
1912  Buffer buffer;
1913  Page page;
1914  OffsetNumber offnum;
1915  bool valid;
1916 
1917  /*
1918  * Fetch and pin the appropriate page of the relation.
1919  */
1920  buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
1921 
1922  /*
1923  * Need share lock on buffer to examine tuple commit status.
1924  */
1925  LockBuffer(buffer, BUFFER_LOCK_SHARE);
1926  page = BufferGetPage(buffer);
1927  TestForOldSnapshot(snapshot, relation, page);
1928 
1929  /*
1930  * We'd better check for an out-of-range offnum, in case the item was
1931  * removed by VACUUM since the TID was obtained.
1932  */
1933  offnum = ItemPointerGetOffsetNumber(tid);
1934  if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(page))
1935  {
1936  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1937  if (keep_buf)
1938  *userbuf = buffer;
1939  else
1940  {
1941  ReleaseBuffer(buffer);
1942  *userbuf = InvalidBuffer;
1943  }
1944  tuple->t_data = NULL;
1945  return false;
1946  }
1947 
1948  /*
1949  * get the item line pointer corresponding to the requested tid
1950  */
1951  lp = PageGetItemId(page, offnum);
1952 
1953  /*
1954  * Must check for deleted tuple.
1955  */
1956  if (!ItemIdIsNormal(lp))
1957  {
1958  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1959  if (keep_buf)
1960  *userbuf = buffer;
1961  else
1962  {
1963  ReleaseBuffer(buffer);
1964  *userbuf = InvalidBuffer;
1965  }
1966  tuple->t_data = NULL;
1967  return false;
1968  }
1969 
1970  /*
1971  * fill in *tuple fields
1972  */
1973  tuple->t_data = (HeapTupleHeader) PageGetItem(page, lp);
1974  tuple->t_len = ItemIdGetLength(lp);
1975  tuple->t_tableOid = RelationGetRelid(relation);
1976 
1977  /*
1978  * check time qualification of tuple, then release lock
1979  */
1980  valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
1981 
1982  if (valid)
1983  PredicateLockTuple(relation, tuple, snapshot);
1984 
1985  CheckForSerializableConflictOut(valid, relation, tuple, buffer, snapshot);
1986 
1987  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1988 
1989  if (valid)
1990  {
1991  /*
1992  * All checks passed, so return the tuple as valid. Caller is now
1993  * responsible for releasing the buffer.
1994  */
1995  *userbuf = buffer;
1996 
1997  /* Count the successful fetch against appropriate rel, if any */
1998  if (stats_relation != NULL)
1999  pgstat_count_heap_fetch(stats_relation);
2000 
2001  return true;
2002  }
2003 
2004  /* Tuple failed time qual, but maybe caller wants to see it anyway. */
2005  if (keep_buf)
2006  *userbuf = buffer;
2007  else
2008  {
2009  ReleaseBuffer(buffer);
2010  *userbuf = InvalidBuffer;
2011  }
2012 
2013  return false;
2014 }
2015 
2016 /*
2017  * heap_hot_search_buffer - search HOT chain for tuple satisfying snapshot
2018  *
2019  * On entry, *tid is the TID of a tuple (either a simple tuple, or the root
2020  * of a HOT chain), and buffer is the buffer holding this tuple. We search
2021  * for the first chain member satisfying the given snapshot. If one is
2022  * found, we update *tid to reference that tuple's offset number, and
2023  * return true. If no match, return false without modifying *tid.
2024  *
2025  * heapTuple is a caller-supplied buffer. When a match is found, we return
2026  * the tuple here, in addition to updating *tid. If no match is found, the
2027  * contents of this buffer on return are undefined.
2028  *
2029  * If all_dead is not NULL, we check non-visible tuples to see if they are
2030  * globally dead; *all_dead is set true if all members of the HOT chain
2031  * are vacuumable, false if not.
2032  *
2033  * Unlike heap_fetch, the caller must already have pin and (at least) share
2034  * lock on the buffer; it is still pinned/locked at exit. Also unlike
2035  * heap_fetch, we do not report any pgstats count; caller may do so if wanted.
2036  */
2037 bool
2038 heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
2039  Snapshot snapshot, HeapTuple heapTuple,
2040  bool *all_dead, bool first_call)
2041 {
2042  Page dp = (Page) BufferGetPage(buffer);
2043  TransactionId prev_xmax = InvalidTransactionId;
2044  OffsetNumber offnum;
2045  bool at_chain_start;
2046  bool valid;
2047  bool skip;
2048 
2049  /* If this is not the first call, previous call returned a (live!) tuple */
2050  if (all_dead)
2051  *all_dead = first_call;
2052 
2054 
2056  offnum = ItemPointerGetOffsetNumber(tid);
2057  at_chain_start = first_call;
2058  skip = !first_call;
2059 
2060  heapTuple->t_self = *tid;
2061 
2062  /* Scan through possible multiple members of HOT-chain */
2063  for (;;)
2064  {
2065  ItemId lp;
2066 
2067  /* check for bogus TID */
2068  if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(dp))
2069  break;
2070 
2071  lp = PageGetItemId(dp, offnum);
2072 
2073  /* check for unused, dead, or redirected items */
2074  if (!ItemIdIsNormal(lp))
2075  {
2076  /* We should only see a redirect at start of chain */
2077  if (ItemIdIsRedirected(lp) && at_chain_start)
2078  {
2079  /* Follow the redirect */
2080  offnum = ItemIdGetRedirect(lp);
2081  at_chain_start = false;
2082  continue;
2083  }
2084  /* else must be end of chain */
2085  break;
2086  }
2087 
2088  heapTuple->t_data = (HeapTupleHeader) PageGetItem(dp, lp);
2089  heapTuple->t_len = ItemIdGetLength(lp);
2090  heapTuple->t_tableOid = RelationGetRelid(relation);
2091  ItemPointerSetOffsetNumber(&heapTuple->t_self, offnum);
2092 
2093  /*
2094  * Shouldn't see a HEAP_ONLY tuple at chain start.
2095  */
2096  if (at_chain_start && HeapTupleIsHeapOnly(heapTuple))
2097  break;
2098 
2099  /*
2100  * The xmin should match the previous xmax value, else chain is
2101  * broken.
2102  */
2103  if (TransactionIdIsValid(prev_xmax) &&
2104  !TransactionIdEquals(prev_xmax,
2105  HeapTupleHeaderGetXmin(heapTuple->t_data)))
2106  break;
2107 
2108  /*
2109  * When first_call is true (and thus, skip is initially false) we'll
2110  * return the first tuple we find. But on later passes, heapTuple
2111  * will initially be pointing to the tuple we returned last time.
2112  * Returning it again would be incorrect (and would loop forever), so
2113  * we skip it and return the next match we find.
2114  */
2115  if (!skip)
2116  {
2117  /*
2118  * For the benefit of logical decoding, have t_self point at the
2119  * element of the HOT chain we're currently investigating instead
2120  * of the root tuple of the HOT chain. This is important because
2121  * the *Satisfies routine for historical mvcc snapshots needs the
2122  * correct tid to decide about the visibility in some cases.
2123  */
2124  ItemPointerSet(&(heapTuple->t_self), BufferGetBlockNumber(buffer), offnum);
2125 
2126  /* If it's visible per the snapshot, we must return it */
2127  valid = HeapTupleSatisfiesVisibility(heapTuple, snapshot, buffer);
2128  CheckForSerializableConflictOut(valid, relation, heapTuple,
2129  buffer, snapshot);
2130  /* reset to original, non-redirected, tid */
2131  heapTuple->t_self = *tid;
2132 
2133  if (valid)
2134  {
2135  ItemPointerSetOffsetNumber(tid, offnum);
2136  PredicateLockTuple(relation, heapTuple, snapshot);
2137  if (all_dead)
2138  *all_dead = false;
2139  return true;
2140  }
2141  }
2142  skip = false;
2143 
2144  /*
2145  * If we can't see it, maybe no one else can either. At caller
2146  * request, check whether all chain members are dead to all
2147  * transactions.
2148  *
2149  * Note: if you change the criterion here for what is "dead", fix the
2150  * planner's get_actual_variable_range() function to match.
2151  */
2152  if (all_dead && *all_dead &&
2153  !HeapTupleIsSurelyDead(heapTuple, RecentGlobalXmin))
2154  *all_dead = false;
2155 
2156  /*
2157  * Check to see if HOT chain continues past this tuple; if so fetch
2158  * the next offnum and loop around.
2159  */
2160  if (HeapTupleIsHotUpdated(heapTuple))
2161  {
2164  offnum = ItemPointerGetOffsetNumber(&heapTuple->t_data->t_ctid);
2165  at_chain_start = false;
2166  prev_xmax = HeapTupleHeaderGetUpdateXid(heapTuple->t_data);
2167  }
2168  else
2169  break; /* end of chain */
2170  }
2171 
2172  return false;
2173 }
2174 
2175 /*
2176  * heap_hot_search - search HOT chain for tuple satisfying snapshot
2177  *
2178  * This has the same API as heap_hot_search_buffer, except that the caller
2179  * does not provide the buffer containing the page, rather we access it
2180  * locally.
2181  */
2182 bool
2183 heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot,
2184  bool *all_dead)
2185 {
2186  bool result;
2187  Buffer buffer;
2188  HeapTupleData heapTuple;
2189 
2190  buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
2191  LockBuffer(buffer, BUFFER_LOCK_SHARE);
2192  result = heap_hot_search_buffer(tid, relation, buffer, snapshot,
2193  &heapTuple, all_dead, true);
2194  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2195  ReleaseBuffer(buffer);
2196  return result;
2197 }
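/*
 * Usage sketch (editorial illustration, not part of heapam.c): an
 * index-fetch style caller can use heap_hot_search to ask whether any
 * member of the HOT chain rooted at "tid" is visible, without managing the
 * buffer itself.  The snapshot source is an assumption of the example.
 *
 *     bool all_dead;
 *
 *     if (heap_hot_search(tid, rel, GetActiveSnapshot(), &all_dead))
 *     {
 *         // a chain member is visible; *tid now points at it
 *     }
 *     else if (all_dead)
 *     {
 *         // the whole chain is dead to everyone, e.g. the caller might
 *         // mark its index entry as killed
 *     }
 */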
2198 
2199 /*
2200  * heap_get_latest_tid - get the latest tid of a specified tuple
2201  *
2202  * Actually, this gets the latest version that is visible according to
2203  * the passed snapshot. You can pass SnapshotDirty to get the very latest,
2204  * possibly uncommitted version.
2205  *
2206  * *tid is both an input and an output parameter: it is updated to
2207  * show the latest version of the row. Note that it will not be changed
2208  * if no version of the row passes the snapshot test.
2209  */
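/*
 * Usage sketch (editorial illustration, not part of heapam.c): chasing a
 * row to its newest visible version starting from a known TID.  Whether to
 * pass SnapshotDirty or an MVCC snapshot is the caller's choice, as
 * described above.
 *
 *     ItemPointerData tid = oldtuple->t_self;
 *
 *     heap_get_latest_tid(rel, GetActiveSnapshot(), &tid);
 *     // "tid" is unchanged if no later version passed the snapshot test
 */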
2210 void
2211 heap_get_latest_tid(Relation relation,
2212  Snapshot snapshot,
2213  ItemPointer tid)
2214 {
2215  BlockNumber blk;
2216  ItemPointerData ctid;
2217  TransactionId priorXmax;
2218 
2219  /* this is to avoid Assert failures on bad input */
2220  if (!ItemPointerIsValid(tid))
2221  return;
2222 
2223  /*
2224  * Since this can be called with user-supplied TID, don't trust the input
2225  * too much. (RelationGetNumberOfBlocks is an expensive check, so we
2226  * don't repeat the check when following t_ctid links. Note that it would not do to
2227  * call it just once and save the result, either.)
2228  */
2229  blk = ItemPointerGetBlockNumber(tid);
2230  if (blk >= RelationGetNumberOfBlocks(relation))
2231  elog(ERROR, "block number %u is out of range for relation \"%s\"",
2232  blk, RelationGetRelationName(relation));
2233 
2234  /*
2235  * Loop to chase down t_ctid links. At top of loop, ctid is the tuple we
2236  * need to examine, and *tid is the TID we will return if ctid turns out
2237  * to be bogus.
2238  *
2239  * Note that we will loop until we reach the end of the t_ctid chain.
2240  * Depending on the snapshot passed, there might be at most one visible
2241  * version of the row, but we don't try to optimize for that.
2242  */
2243  ctid = *tid;
2244  priorXmax = InvalidTransactionId; /* cannot check first XMIN */
2245  for (;;)
2246  {
2247  Buffer buffer;
2248  Page page;
2249  OffsetNumber offnum;
2250  ItemId lp;
2251  HeapTupleData tp;
2252  bool valid;
2253 
2254  /*
2255  * Read, pin, and lock the page.
2256  */
2257  buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&ctid));
2258  LockBuffer(buffer, BUFFER_LOCK_SHARE);
2259  page = BufferGetPage(buffer);
2260  TestForOldSnapshot(snapshot, relation, page);
2261 
2262  /*
2263  * Check for bogus item number. This is not treated as an error
2264  * condition because it can happen while following a t_ctid link. We
2265  * just assume that the prior tid is OK and return it unchanged.
2266  */
2267  offnum = ItemPointerGetOffsetNumber(&ctid);
2268  if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(page))
2269  {
2270  UnlockReleaseBuffer(buffer);
2271  break;
2272  }
2273  lp = PageGetItemId(page, offnum);
2274  if (!ItemIdIsNormal(lp))
2275  {
2276  UnlockReleaseBuffer(buffer);
2277  break;
2278  }
2279 
2280  /* OK to access the tuple */
2281  tp.t_self = ctid;
2282  tp.t_data = (HeapTupleHeader) PageGetItem(page, lp);
2283  tp.t_len = ItemIdGetLength(lp);
2284  tp.t_tableOid = RelationGetRelid(relation);
2285 
2286  /*
2287  * After following a t_ctid link, we might arrive at an unrelated
2288  * tuple. Check for XMIN match.
2289  */
2290  if (TransactionIdIsValid(priorXmax) &&
2291  !TransactionIdEquals(HeapTupleHeaderGetXmin(tp.t_data), priorXmax))
2292  {
2293  UnlockReleaseBuffer(buffer);
2294  break;
2295  }
2296 
2297  /*
2298  * Check time qualification of tuple; if visible, set it as the new
2299  * result candidate.
2300  */
2301  valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
2302  CheckForSerializableConflictOut(valid, relation, &tp, buffer, snapshot);
2303  if (valid)
2304  *tid = ctid;
2305 
2306  /*
2307  * If there's a valid t_ctid link, follow it, else we're done.
2308  */
2309  if ((tp.t_data->t_infomask & HEAP_XMAX_INVALID) ||
2310  HeapTupleHeaderIsOnlyLocked(tp.t_data) ||
2311  HeapTupleHeaderIndicatesMovedPartitions(tp.t_data) ||
2312  ItemPointerEquals(&tp.t_self, &tp.t_data->t_ctid))
2313  {
2314  UnlockReleaseBuffer(buffer);
2315  break;
2316  }
2317 
2318  ctid = tp.t_data->t_ctid;
2319  priorXmax = HeapTupleHeaderGetUpdateXid(tp.t_data);
2320  UnlockReleaseBuffer(buffer);
2321  } /* end of loop */
2322 }
2323 
2324 
2325 /*
2326  * UpdateXmaxHintBits - update tuple hint bits after xmax transaction ends
2327  *
2328  * This is called after we have waited for the XMAX transaction to terminate.
2329  * If the transaction aborted, we guarantee the XMAX_INVALID hint bit will
2330  * be set on exit. If the transaction committed, we set the XMAX_COMMITTED
2331  * hint bit if possible --- but beware that that may not yet be possible,
2332  * if the transaction committed asynchronously.
2333  *
2334  * Note that if the transaction was a locker only, we set HEAP_XMAX_INVALID
2335  * even if it commits.
2336  *
2337  * Hence callers should look only at XMAX_INVALID.
2338  *
2339  * Note this is not allowed for tuples whose xmax is a multixact.
2340  */
2341 static void
2342 UpdateXmaxHintBits(HeapTupleHeader tuple, Buffer buffer, TransactionId xid)
2343 {
2345  Assert(!(tuple->t_infomask & HEAP_XMAX_IS_MULTI));
2346 
2347  if (!(tuple->t_infomask & (HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID)))
2348  {
2349  if (!HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask) &&
2350  TransactionIdDidCommit(xid))
2351  HeapTupleSetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED,
2352  xid);
2353  else
2354  HeapTupleSetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
2355  InvalidTransactionId);
2356  }
2357 }
2358 
2359 
2360 /*
2361  * GetBulkInsertState - prepare status object for a bulk insert
2362  */
2363 BulkInsertState
2364 GetBulkInsertState(void)
2365 {
2366  BulkInsertState bistate;
2367 
2368  bistate = (BulkInsertState) palloc(sizeof(BulkInsertStateData));
2369  bistate->strategy = GetAccessStrategy(BAS_BULKWRITE);
2370  bistate->current_buf = InvalidBuffer;
2371  return bistate;
2372 }
2373 
2374 /*
2375  * FreeBulkInsertState - clean up after finishing a bulk insert
2376  */
2377 void
2378 FreeBulkInsertState(BulkInsertState bistate)
2379 {
2380  if (bistate->current_buf != InvalidBuffer)
2381  ReleaseBuffer(bistate->current_buf);
2382  FreeAccessStrategy(bistate->strategy);
2383  pfree(bistate);
2384 }
2385 
2386 /*
2387  * ReleaseBulkInsertStatePin - release a buffer currently held in bistate
2388  */
2389 void
2390 ReleaseBulkInsertStatePin(BulkInsertState bistate)
2391 {
2392  if (bistate->current_buf != InvalidBuffer)
2393  ReleaseBuffer(bistate->current_buf);
2394  bistate->current_buf = InvalidBuffer;
2395 }
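/*
 * Usage sketch (editorial illustration, not part of heapam.c): the intended
 * life cycle of a BulkInsertState, as used by bulk loaders such as COPY.
 * Building the individual tuples is elided.
 *
 *     BulkInsertState bistate = GetBulkInsertState();
 *
 *     for each tuple "tup" to load:
 *         heap_insert(rel, tup, GetCurrentCommandId(true), 0, bistate);
 *
 *     FreeBulkInsertState(bistate);   // also drops any buffer pin it holds
 */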
2396 
2397 
2398 /*
2399  * heap_insert - insert tuple into a heap
2400  *
2401  * The new tuple is stamped with current transaction ID and the specified
2402  * command ID.
2403  *
2404  * If the HEAP_INSERT_SKIP_WAL option is specified, the new tuple is not
2405  * logged in WAL, even for a non-temp relation. Safe usage of this behavior
2406  * requires that we arrange that all new tuples go into new pages not
2407  * containing any tuples from other transactions, and that the relation gets
2408  * fsync'd before commit. (See also heap_sync() comments)
2409  *
2410  * The HEAP_INSERT_SKIP_FSM option is passed directly to
2411  * RelationGetBufferForTuple, which see for more info.
2412  *
2413  * HEAP_INSERT_FROZEN should only be specified for inserts into
2414  * relfilenodes created during the current subtransaction and when
2415  * there are no prior snapshots or pre-existing portals open.
2416  * This causes rows to be frozen, which is an MVCC violation and
2417  * requires explicit options chosen by user.
2418  *
2419  * HEAP_INSERT_SPECULATIVE is used on so-called "speculative insertions",
2420  * which can be backed out afterwards without aborting the whole transaction.
2421  * Other sessions can wait for the speculative insertion to be confirmed,
2422  * turning it into a regular tuple, or aborted, as if it never existed.
2423  * Speculatively inserted tuples behave as "value locks" of short duration,
2424  * used to implement INSERT .. ON CONFLICT.
2425  *
2426  * Note that most of these options will be applied when inserting into the
2427  * heap's TOAST table, too, if the tuple requires any out-of-line data. Only
2428  * HEAP_INSERT_SPECULATIVE is explicitly ignored, as the toast data does not
2429  * partake in speculative insertion.
2430  *
2431  * The BulkInsertState object (if any; bistate can be NULL for default
2432  * behavior) is also just passed through to RelationGetBufferForTuple.
2433  *
2434  * The return value is the OID assigned to the tuple (either here or by the
2435  * caller), or InvalidOid if no OID. The header fields of *tup are updated
2436  * to match the stored tuple; in particular tup->t_self receives the actual
2437  * TID where the tuple was stored. But note that any toasting of fields
2438  * within the tuple data is NOT reflected into *tup.
2439  */
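/*
 * Usage sketch (editorial illustration, not part of heapam.c): an ordinary
 * caller passes no options and no bistate; the flags documented above are
 * OR'ed together only when the special behaviors are wanted.  The use of
 * HEAP_INSERT_SKIP_FSM with a bistate here is a hypothetical choice, not a
 * recommendation from this file.
 *
 *     Oid oid = heap_insert(rel, tup, GetCurrentCommandId(true),
 *                           HEAP_INSERT_SKIP_FSM, bistate);
 *     // tup->t_self now holds the TID actually assigned to the tuple
 */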
2440 Oid
2441 heap_insert(Relation relation, HeapTuple tup, CommandId cid,
2442  int options, BulkInsertState bistate)
2443 {
2444  TransactionId xid = GetCurrentTransactionId();
2445  HeapTuple heaptup;
2446  Buffer buffer;
2447  Buffer vmbuffer = InvalidBuffer;
2448  bool all_visible_cleared = false;
2449 
2450  /*
2451  * Fill in tuple header fields, assign an OID, and toast the tuple if
2452  * necessary.
2453  *
2454  * Note: below this point, heaptup is the data we actually intend to store
2455  * into the relation; tup is the caller's original untoasted data.
2456  */
2457  heaptup = heap_prepare_insert(relation, tup, xid, cid, options);
2458 
2459  /*
2460  * Find buffer to insert this tuple into. If the page is all visible,
2461  * this will also pin the requisite visibility map page.
2462  */
2463  buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
2464  InvalidBuffer, options, bistate,
2465  &vmbuffer, NULL);
2466 
2467  /*
2468  * We're about to do the actual insert -- but check for conflict first, to
2469  * avoid possibly having to roll back work we've just done.
2470  *
2471  * This is safe without a recheck as long as there is no possibility of
2472  * another process scanning the page between this check and the insert
2473  * being visible to the scan (i.e., an exclusive buffer content lock is
2474  * continuously held from this point until the tuple insert is visible).
2475  *
2476  * For a heap insert, we only need to check for table-level SSI locks. Our
2477  * new tuple can't possibly conflict with existing tuple locks, and heap
2478  * page locks are only consolidated versions of tuple locks; they do not
2479  * lock "gaps" as index page locks do. So we don't need to specify a
2480  * buffer when making the call, which makes for a faster check.
2481  */
2482  CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
2483 
2484  /* NO EREPORT(ERROR) from here till changes are logged */
2485  START_CRIT_SECTION();
2486 
2487  RelationPutHeapTuple(relation, buffer, heaptup,
2488  (options & HEAP_INSERT_SPECULATIVE) != 0);
2489 
2490  if (PageIsAllVisible(BufferGetPage(buffer)))
2491  {
2492  all_visible_cleared = true;
2493  PageClearAllVisible(BufferGetPage(buffer));
2494  visibilitymap_clear(relation,
2495  ItemPointerGetBlockNumber(&(heaptup->t_self)),
2496  vmbuffer, VISIBILITYMAP_VALID_BITS);
2497  }
2498 
2499  /*
2500  * XXX Should we set PageSetPrunable on this page ?
2501  *
2502  * The inserting transaction may eventually abort thus making this tuple
2503  * DEAD and hence available for pruning. Though we don't want to optimize
2504  * for aborts, if no other tuple in this page is UPDATEd/DELETEd, the
2505  * aborted tuple will never be pruned until next vacuum is triggered.
2506  *
2507  * If you do add PageSetPrunable here, add it in heap_xlog_insert too.
2508  */
2509 
2510  MarkBufferDirty(buffer);
2511 
2512  /* XLOG stuff */
2513  if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
2514  {
2515  xl_heap_insert xlrec;
2516  xl_heap_header xlhdr;
2517  XLogRecPtr recptr;
2518  Page page = BufferGetPage(buffer);
2519  uint8 info = XLOG_HEAP_INSERT;
2520  int bufflags = 0;
2521 
2522  /*
2523  * If this is a catalog, we need to transmit combocids to properly
2524  * decode, so log that as well.
2525  */
2526  if (RelationIsAccessibleInLogicalDecoding(relation))
2527  log_heap_new_cid(relation, heaptup);
2528 
2529  /*
2530  * If this is the single and first tuple on page, we can reinit the
2531  * page instead of restoring the whole thing. Set flag, and hide
2532  * buffer references from XLogInsert.
2533  */
2534  if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
2535  PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
2536  {
2537  info |= XLOG_HEAP_INIT_PAGE;
2538  bufflags |= REGBUF_WILL_INIT;
2539  }
2540 
2541  xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
2542  xlrec.flags = 0;
2543  if (all_visible_cleared)
2544  xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED;
2545  if (options & HEAP_INSERT_SPECULATIVE)
2546  xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
2548 
2549  /*
2550  * For logical decoding, we need the tuple even if we're doing a full
2551  * page write, so make sure it's included even if we take a full-page
2552  * image. (XXX We could alternatively store a pointer into the FPW).
2553  */
2554  if (RelationIsLogicallyLogged(relation))
2555  {
2556  xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
2557  bufflags |= REGBUF_KEEP_DATA;
2558  }
2559 
2560  XLogBeginInsert();
2561  XLogRegisterData((char *) &xlrec, SizeOfHeapInsert);
2562 
2563  xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
2564  xlhdr.t_infomask = heaptup->t_data->t_infomask;
2565  xlhdr.t_hoff = heaptup->t_data->t_hoff;
2566 
2567  /*
2568  * note we mark xlhdr as belonging to buffer; if XLogInsert decides to
2569  * write the whole page to the xlog, we don't need to store
2570  * xl_heap_header in the xlog.
2571  */
2572  XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
2573  XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
2574  /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
2575  XLogRegisterBufData(0,
2576  (char *) heaptup->t_data + SizeofHeapTupleHeader,
2577  heaptup->t_len - SizeofHeapTupleHeader);
2578 
2579  /* filtering by origin on a row level is much more efficient */
2580  XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
2581 
2582  recptr = XLogInsert(RM_HEAP_ID, info);
2583 
2584  PageSetLSN(page, recptr);
2585  }
2586 
2587  END_CRIT_SECTION();
2588 
2589  UnlockReleaseBuffer(buffer);
2590  if (vmbuffer != InvalidBuffer)
2591  ReleaseBuffer(vmbuffer);
2592 
2593  /*
2594  * If tuple is cachable, mark it for invalidation from the caches in case
2595  * we abort. Note it is OK to do this after releasing the buffer, because
2596  * the heaptup data structure is all in local memory, not in the shared
2597  * buffer.
2598  */
2599  CacheInvalidateHeapTuple(relation, heaptup, NULL);
2600 
2601  /* Note: speculative insertions are counted too, even if aborted later */
2602  pgstat_count_heap_insert(relation, 1);
2603 
2604  /*
2605  * If heaptup is a private copy, release it. Don't forget to copy t_self
2606  * back to the caller's image, too.
2607  */
2608  if (heaptup != tup)
2609  {
2610  tup->t_self = heaptup->t_self;
2611  heap_freetuple(heaptup);
2612  }
2613 
2614  return HeapTupleGetOid(tup);
2615 }
2616 
2617 /*
2618  * Subroutine for heap_insert(). Prepares a tuple for insertion. This sets the
2619  * tuple header fields, assigns an OID, and toasts the tuple if necessary.
2620  * Returns a toasted version of the tuple if it was toasted, or the original
2621  * tuple if not. Note that in any case, the header fields are also set in
2622  * the original tuple.
2623  */
2624 static HeapTuple
2625 heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
2626  CommandId cid, int options)
2627 {
2628  /*
2629  * Parallel operations are required to be strictly read-only in a parallel
2630  * worker. Parallel inserts are not safe even in the leader in the
2631  * general case, because group locking means that heavyweight locks for
2632  * relation extension or GIN page locks will not conflict between members
2633  * of a lock group, but we don't prohibit that case here because there are
2634  * useful special cases that we can safely allow, such as CREATE TABLE AS.
2635  */
2636  if (IsParallelWorker())
2637  ereport(ERROR,
2638  (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2639  errmsg("cannot insert tuples in a parallel worker")));
2640 
2641  if (relation->rd_rel->relhasoids)
2642  {
2643 #ifdef NOT_USED
2644  /* this is redundant with an Assert in HeapTupleSetOid */
2645  Assert(tup->t_data->t_infomask & HEAP_HASOID);
2646 #endif
2647 
2648  /*
2649  * If the object id of this tuple has already been assigned, trust the
2650  * caller. There are a couple of ways this can happen. At initial db
2651  * creation, the backend program sets oids for tuples. When we define
2652  * an index, we set the oid. Finally, in the future, we may allow
2653  * users to set their own object ids in order to support a persistent
2654  * object store (objects need to contain pointers to one another).
2655  */
2656  if (!OidIsValid(HeapTupleGetOid(tup)))
2657  HeapTupleSetOid(tup, GetNewOid(relation));
2658  }
2659  else
2660  {
2661  /* check there is no space for an OID */
2662  Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
2663  }
2664 
2665  tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
2666  tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
2667  tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
2668  HeapTupleHeaderSetXmin(tup->t_data, xid);
2669  if (options & HEAP_INSERT_FROZEN)
2670  HeapTupleHeaderSetXminFrozen(tup->t_data);
2671 
2672  HeapTupleHeaderSetCmin(tup->t_data, cid);
2673  HeapTupleHeaderSetXmax(tup->t_data, 0); /* for cleanliness */
2674  tup->t_tableOid = RelationGetRelid(relation);
2675 
2676  /*
2677  * If the new tuple is too big for storage or contains already toasted
2678  * out-of-line attributes from some other relation, invoke the toaster.
2679  */
2680  if (relation->rd_rel->relkind != RELKIND_RELATION &&
2681  relation->rd_rel->relkind != RELKIND_MATVIEW)
2682  {
2683  /* toast table entries should never be recursively toasted */
2684  Assert(!HeapTupleHasExternal(tup));
2685  return tup;
2686  }
2687  else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
2688  return toast_insert_or_update(relation, tup, NULL, options);
2689  else
2690  return tup;
2691 }
2692 
2693 /*
2694  * heap_multi_insert - insert multiple tuples into a heap
2695  *
2696  * This is like heap_insert(), but inserts multiple tuples in one operation.
2697  * That's faster than calling heap_insert() in a loop, because when multiple
2698  * tuples can be inserted on a single page, we can write just a single WAL
2699  * record covering all of them, and only need to lock/unlock the page once.
2700  *
2701  * Note: this leaks memory into the current memory context. You can create a
2702  * temporary context before calling this, if that's a problem.
2703  */
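/*
 * Usage sketch (editorial illustration, not part of heapam.c): because the
 * function leaks into the current memory context, a caller may wrap each
 * batch in a short-lived context, roughly like this.
 *
 *     MemoryContext batchcxt = AllocSetContextCreate(CurrentMemoryContext,
 *                                                    "multi-insert batch",
 *                                                    ALLOCSET_DEFAULT_SIZES);
 *     MemoryContext oldcxt = MemoryContextSwitchTo(batchcxt);
 *
 *     heap_multi_insert(rel, tuples, ntuples,
 *                       GetCurrentCommandId(true), 0, NULL);
 *
 *     MemoryContextSwitchTo(oldcxt);
 *     MemoryContextReset(batchcxt);
 */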
2704 void
2705 heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
2706  CommandId cid, int options, BulkInsertState bistate)
2707 {
2708  TransactionId xid = GetCurrentTransactionId();
2709  HeapTuple *heaptuples;
2710  int i;
2711  int ndone;
2712  char *scratch = NULL;
2713  Page page;
2714  bool needwal;
2715  Size saveFreeSpace;
2716  bool need_tuple_data = RelationIsLogicallyLogged(relation);
2717  bool need_cids = RelationIsAccessibleInLogicalDecoding(relation);
2718 
2719  needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
2720  saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
2721  HEAP_DEFAULT_FILLFACTOR);
2722 
2723  /* Toast and set header data in all the tuples */
2724  heaptuples = palloc(ntuples * sizeof(HeapTuple));
2725  for (i = 0; i < ntuples; i++)
2726  heaptuples[i] = heap_prepare_insert(relation, tuples[i],
2727  xid, cid, options);
2728 
2729  /*
2730  * Allocate some memory to use for constructing the WAL record. Using
2731  * palloc() within a critical section is not safe, so we allocate this
2732  * beforehand.
2733  */
2734  if (needwal)
2735  scratch = palloc(BLCKSZ);
2736 
2737  /*
2738  * We're about to do the actual inserts -- but check for conflict first,
2739  * to minimize the possibility of having to roll back work we've just
2740  * done.
2741  *
2742  * A check here does not definitively prevent a serialization anomaly;
2743  * that check MUST be done at least past the point of acquiring an
2744  * exclusive buffer content lock on every buffer that will be affected,
2745  * and MAY be done after all inserts are reflected in the buffers and
2746  * those locks are released; otherwise there is a race condition. Since
2747  * multiple buffers can be locked and unlocked in the loop below, and it
2748  * would not be feasible to identify and lock all of those buffers before
2749  * the loop, we must do a final check at the end.
2750  *
2751  * The check here could be omitted with no loss of correctness; it is
2752  * present strictly as an optimization.
2753  *
2754  * For heap inserts, we only need to check for table-level SSI locks. Our
2755  * new tuples can't possibly conflict with existing tuple locks, and heap
2756  * page locks are only consolidated versions of tuple locks; they do not
2757  * lock "gaps" as index page locks do. So we don't need to specify a
2758  * buffer when making the call, which makes for a faster check.
2759  */
2760  CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
2761 
2762  ndone = 0;
2763  while (ndone < ntuples)
2764  {
2765  Buffer buffer;
2766  Buffer vmbuffer = InvalidBuffer;
2767  bool all_visible_cleared = false;
2768  int nthispage;
2769 
2771 
2772  /*
2773  * Find buffer where at least the next tuple will fit. If the page is
2774  * all-visible, this will also pin the requisite visibility map page.
2775  */
2776  buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len,
2777  InvalidBuffer, options, bistate,
2778  &vmbuffer, NULL);
2779  page = BufferGetPage(buffer);
2780 
2781  /* NO EREPORT(ERROR) from here till changes are logged */
2782  START_CRIT_SECTION();
2783 
2784  /*
2785  * RelationGetBufferForTuple has ensured that the first tuple fits.
2786  * Put that on the page, and then as many other tuples as fit.
2787  */
2788  RelationPutHeapTuple(relation, buffer, heaptuples[ndone], false);
2789  for (nthispage = 1; ndone + nthispage < ntuples; nthispage++)
2790  {
2791  HeapTuple heaptup = heaptuples[ndone + nthispage];
2792 
2793  if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len) + saveFreeSpace)
2794  break;
2795 
2796  RelationPutHeapTuple(relation, buffer, heaptup, false);
2797 
2798  /*
2799  * We don't use heap_multi_insert for catalog tuples yet, but
2800  * better be prepared...
2801  */
2802  if (needwal && need_cids)
2803  log_heap_new_cid(relation, heaptup);
2804  }
2805 
2806  if (PageIsAllVisible(page))
2807  {
2808  all_visible_cleared = true;
2809  PageClearAllVisible(page);
2810  visibilitymap_clear(relation,
2811  BufferGetBlockNumber(buffer),
2812  vmbuffer, VISIBILITYMAP_VALID_BITS);
2813  }
2814 
2815  /*
2816  * XXX Should we set PageSetPrunable on this page ? See heap_insert()
2817  */
2818 
2819  MarkBufferDirty(buffer);
2820 
2821  /* XLOG stuff */
2822  if (needwal)
2823  {
2824  XLogRecPtr recptr;
2825  xl_heap_multi_insert *xlrec;
2826  uint8 info = XLOG_HEAP2_MULTI_INSERT;
2827  char *tupledata;
2828  int totaldatalen;
2829  char *scratchptr = scratch;
2830  bool init;
2831  int bufflags = 0;
2832 
2833  /*
2834  * If the page was previously empty, we can reinit the page
2835  * instead of restoring the whole thing.
2836  */
2837  init = (ItemPointerGetOffsetNumber(&(heaptuples[ndone]->t_self)) == FirstOffsetNumber &&
2838  PageGetMaxOffsetNumber(page) == FirstOffsetNumber + nthispage - 1);
2839 
2840  /* allocate xl_heap_multi_insert struct from the scratch area */
2841  xlrec = (xl_heap_multi_insert *) scratchptr;
2842  scratchptr += SizeOfHeapMultiInsert;
2843 
2844  /*
2845  * Allocate the offsets array, unless we're reinitializing the page;
2846  * in that case the tuples are stored in order starting at
2847  * FirstOffsetNumber and we don't need to store the offsets
2848  * explicitly.
2849  */
2850  if (!init)
2851  scratchptr += nthispage * sizeof(OffsetNumber);
2852 
2853  /* the rest of the scratch space is used for tuple data */
2854  tupledata = scratchptr;
2855 
2856  xlrec->flags = all_visible_cleared ? XLH_INSERT_ALL_VISIBLE_CLEARED : 0;
2857  xlrec->ntuples = nthispage;
2858 
2859  /*
2860  * Write out an xl_multi_insert_tuple and the tuple data itself
2861  * for each tuple.
2862  */
2863  for (i = 0; i < nthispage; i++)
2864  {
2865  HeapTuple heaptup = heaptuples[ndone + i];
2866  xl_multi_insert_tuple *tuphdr;
2867  int datalen;
2868 
2869  if (!init)
2870  xlrec->offsets[i] = ItemPointerGetOffsetNumber(&heaptup->t_self);
2871  /* xl_multi_insert_tuple needs two-byte alignment. */
2872  tuphdr = (xl_multi_insert_tuple *) SHORTALIGN(scratchptr);
2873  scratchptr = ((char *) tuphdr) + SizeOfMultiInsertTuple;
2874 
2875  tuphdr->t_infomask2 = heaptup->t_data->t_infomask2;
2876  tuphdr->t_infomask = heaptup->t_data->t_infomask;
2877  tuphdr->t_hoff = heaptup->t_data->t_hoff;
2878 
2879  /* write bitmap [+ padding] [+ oid] + data */
2880  datalen = heaptup->t_len - SizeofHeapTupleHeader;
2881  memcpy(scratchptr,
2882  (char *) heaptup->t_data + SizeofHeapTupleHeader,
2883  datalen);
2884  tuphdr->datalen = datalen;
2885  scratchptr += datalen;
2886  }
2887  totaldatalen = scratchptr - tupledata;
2888  Assert((scratchptr - scratch) < BLCKSZ);
2889 
2890  if (need_tuple_data)
2891  xlrec->flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
2892 
2893  /*
2894  * Signal that this is the last xl_heap_multi_insert record
2895  * emitted by this call to heap_multi_insert(). Needed for logical
2896  * decoding so it knows when to cleanup temporary data.
2897  */
2898  if (ndone + nthispage == ntuples)
2899  xlrec->flags |= XLH_INSERT_LAST_IN_MULTI;
2900 
2901  if (init)
2902  {
2903  info |= XLOG_HEAP_INIT_PAGE;
2904  bufflags |= REGBUF_WILL_INIT;
2905  }
2906 
2907  /*
2908  * If we're doing logical decoding, include the new tuple data
2909  * even if we take a full-page image of the page.
2910  */
2911  if (need_tuple_data)
2912  bufflags |= REGBUF_KEEP_DATA;
2913 
2914  XLogBeginInsert();
2915  XLogRegisterData((char *) xlrec, tupledata - scratch);
2916  XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
2917 
2918  XLogRegisterBufData(0, tupledata, totaldatalen);
2919 
2920  /* filtering by origin on a row level is much more efficient */
2921  XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
2922 
2923  recptr = XLogInsert(RM_HEAP2_ID, info);
2924 
2925  PageSetLSN(page, recptr);
2926  }
2927 
2928  END_CRIT_SECTION();
2929 
2930  UnlockReleaseBuffer(buffer);
2931  if (vmbuffer != InvalidBuffer)
2932  ReleaseBuffer(vmbuffer);
2933 
2934  ndone += nthispage;
2935  }
2936 
2937  /*
2938  * We're done with the actual inserts. Check for conflicts again, to
2939  * ensure that all rw-conflicts in to these inserts are detected. Without
2940  * this final check, a sequential scan of the heap may have locked the
2941  * table after the "before" check, missing one opportunity to detect the
2942  * conflict, and then scanned the table before the new tuples were there,
2943  * missing the other chance to detect the conflict.
2944  *
2945  * For heap inserts, we only need to check for table-level SSI locks. Our
2946  * new tuples can't possibly conflict with existing tuple locks, and heap
2947  * page locks are only consolidated versions of tuple locks; they do not
2948  * lock "gaps" as index page locks do. So we don't need to specify a
2949  * buffer when making the call.
2950  */
2952 
2953  /*
2954  * If tuples are cachable, mark them for invalidation from the caches in
2955  * case we abort. Note it is OK to do this after releasing the buffer,
2956  * because the heaptuples data structure is all in local memory, not in
2957  * the shared buffer.
2958  */
2959  if (IsCatalogRelation(relation))
2960  {
2961  for (i = 0; i < ntuples; i++)
2962  CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
2963  }
2964 
2965  /*
2966  * Copy t_self fields back to the caller's original tuples. This does
2967  * nothing for untoasted tuples (tuples[i] == heaptuples[i]), but it's
2968  * probably faster to always copy than check.
2969  */
2970  for (i = 0; i < ntuples; i++)
2971  tuples[i]->t_self = heaptuples[i]->t_self;
2972 
2973  pgstat_count_heap_insert(relation, ntuples);
2974 }
2975 
2976 /*
2977  * simple_heap_insert - insert a tuple
2978  *
2979  * Currently, this routine differs from heap_insert only in supplying
2980  * a default command ID and not allowing access to the speedup options.
2981  *
2982  * This should be used rather than using heap_insert directly in most places
2983  * where we are modifying system catalogs.
2984  */
2985 Oid
2986 simple_heap_insert(Relation relation, HeapTuple tup)
2987 {
2988  return heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL);
2989 }
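/*
 * Usage sketch (editorial illustration, not part of heapam.c): the classic
 * pattern of forming a tuple from datum/null arrays and inserting it.
 * Filling "values"/"nulls" is elided; note that real catalog code must also
 * maintain the catalog's indexes (e.g. via CatalogTupleInsert).
 *
 *     HeapTuple tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 *
 *     simple_heap_insert(rel, tup);
 *     CommandCounterIncrement();   // make the row visible to later commands
 *     heap_freetuple(tup);
 */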
2990 
2991 /*
2992  * Given infomask/infomask2, compute the bits that must be saved in the
2993  * "infobits" field of xl_heap_delete, xl_heap_update, xl_heap_lock,
2994  * xl_heap_lock_updated WAL records.
2995  *
2996  * See fix_infomask_from_infobits.
2997  */
2998 static uint8
2999 compute_infobits(uint16 infomask, uint16 infomask2)
3000 {
3001  return
3002  ((infomask & HEAP_XMAX_IS_MULTI) != 0 ? XLHL_XMAX_IS_MULTI : 0) |
3003  ((infomask & HEAP_XMAX_LOCK_ONLY) != 0 ? XLHL_XMAX_LOCK_ONLY : 0) |
3004  ((infomask & HEAP_XMAX_EXCL_LOCK) != 0 ? XLHL_XMAX_EXCL_LOCK : 0) |
3005  /* note we ignore HEAP_XMAX_SHR_LOCK here */
3006  ((infomask & HEAP_XMAX_KEYSHR_LOCK) != 0 ? XLHL_XMAX_KEYSHR_LOCK : 0) |
3007  ((infomask2 & HEAP_KEYS_UPDATED) != 0 ?
3008  XLHL_KEYS_UPDATED : 0);
3009 }
3010 
3011 /*
3012  * Given two versions of the same t_infomask for a tuple, compare them and
3013  * return whether the relevant status for a tuple Xmax has changed. This is
3014  * used after a buffer lock has been released and reacquired: we want to ensure
3015  * that the tuple state continues to be the same it was when we previously
3016  * examined it.
3017  *
3018  * Note the Xmax field itself must be compared separately.
3019  */
3020 static inline bool
3021 xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
3022 {
3023  const uint16 interesting =
3024  HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY | HEAP_LOCK_MASK;
3025 
3026  if ((new_infomask & interesting) != (old_infomask & interesting))
3027  return true;
3028 
3029  return false;
3030 }
3031 
3032 /*
3033  * heap_delete - delete a tuple
3034  *
3035  * NB: do not call this directly unless you are prepared to deal with
3036  * concurrent-update conditions. Use simple_heap_delete instead.
3037  *
3038  * relation - table to be modified (caller must hold suitable lock)
3039  * tid - TID of tuple to be deleted
3040  * cid - delete command ID (used for visibility test, and stored into
3041  * cmax if successful)
3042  * crosscheck - if not InvalidSnapshot, also check tuple against this
3043  * wait - true if should wait for any conflicting update to commit/abort
3044  * hufd - output parameter, filled in failure cases (see below)
3045  * changingPart - true iff the tuple is being moved to another partition
3046  * table due to an update of the partition key. Otherwise, false.
3047  *
3048  * Normal, successful return value is HeapTupleMayBeUpdated, which
3049  * actually means we did delete it. Failure return codes are
3050  * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
3051  * (the last only possible if wait == false).
3052  *
3053  * In the failure cases, the routine fills *hufd with the tuple's t_ctid,
3054  * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax
3055  * (the last only for HeapTupleSelfUpdated, since we
3056  * cannot obtain cmax from a combocid generated by another transaction).
3057  * See comments for struct HeapUpdateFailureData for additional info.
3058  */
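/*
 * Usage sketch (editorial illustration, not part of heapam.c): how a caller
 * prepared for concurrency might dispatch on the result codes listed above.
 * The follow-up actions in the comments are assumptions of the example.
 *
 *     HeapUpdateFailureData hufd;
 *     HTSU_Result res;
 *
 *     res = heap_delete(rel, tid, GetCurrentCommandId(true),
 *                       InvalidSnapshot, true, &hufd, false);
 *     switch (res)
 *     {
 *         case HeapTupleMayBeUpdated:
 *             break;               // we deleted it
 *         case HeapTupleSelfUpdated:
 *             ...                  // already deleted by this command
 *         case HeapTupleUpdated:
 *             ...                  // concurrent update: consult hufd.ctid,
 *                                  // hufd.xmax and retry or give up
 *         default:
 *             elog(ERROR, "unexpected heap_delete result: %d", (int) res);
 *     }
 */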
3059 HTSU_Result
3060 heap_delete(Relation relation, ItemPointer tid,
3061  CommandId cid, Snapshot crosscheck, bool wait,
3062  HeapUpdateFailureData *hufd, bool changingPart)
3063 {
3064  HTSU_Result result;
3065  TransactionId xid = GetCurrentTransactionId();
3066  ItemId lp;
3067  HeapTupleData tp;
3068  Page page;
3069  BlockNumber block;
3070  Buffer buffer;
3071  Buffer vmbuffer = InvalidBuffer;
3072  TransactionId new_xmax;
3073  uint16 new_infomask,
3074  new_infomask2;
3075  bool have_tuple_lock = false;
3076  bool iscombo;
3077  bool all_visible_cleared = false;
3078  HeapTuple old_key_tuple = NULL; /* replica identity of the tuple */
3079  bool old_key_copied = false;
3080 
3081  Assert(ItemPointerIsValid(tid));
3082 
3083  /*
3084  * Forbid this during a parallel operation, lest it allocate a combocid.
3085  * Other workers might need that combocid for visibility checks, and we
3086  * have no provision for broadcasting it to them.
3087  */
3088  if (IsInParallelMode())
3089  ereport(ERROR,
3090  (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
3091  errmsg("cannot delete tuples during a parallel operation")));
3092 
3093  block = ItemPointerGetBlockNumber(tid);
3094  buffer = ReadBuffer(relation, block);
3095  page = BufferGetPage(buffer);
3096 
3097  /*
3098  * Before locking the buffer, pin the visibility map page if it appears to
3099  * be necessary. Since we haven't got the lock yet, someone else might be
3100  * in the middle of changing this, so we'll need to recheck after we have
3101  * the lock.
3102  */
3103  if (PageIsAllVisible(page))
3104  visibilitymap_pin(relation, block, &vmbuffer);
3105 
3106  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
3107 
3108  /*
3109  * If we didn't pin the visibility map page and the page has become all
3110  * visible while we were busy locking the buffer, we'll have to unlock and
3111  * re-lock, to avoid holding the buffer lock across an I/O. That's a bit
3112  * unfortunate, but hopefully shouldn't happen often.
3113  */
3114  if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
3115  {
3116  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3117  visibilitymap_pin(relation, block, &vmbuffer);
3118  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
3119  }
3120 
3121  lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
3122  Assert(ItemIdIsNormal(lp));
3123 
3124  tp.t_tableOid = RelationGetRelid(relation);
3125  tp.t_data = (HeapTupleHeader) PageGetItem(page, lp);
3126  tp.t_len = ItemIdGetLength(lp);
3127  tp.t_self = *tid;
3128 
3129 l1:
3130  result = HeapTupleSatisfiesUpdate(&tp, cid, buffer);
3131 
3132  if (result == HeapTupleInvisible)
3133  {
3134  UnlockReleaseBuffer(buffer);
3135  ereport(ERROR,
3136  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3137  errmsg("attempted to delete invisible tuple")));
3138  }
3139  else if (result == HeapTupleBeingUpdated && wait)
3140  {
3141  TransactionId xwait;
3142  uint16 infomask;
3143 
3144  /* must copy state data before unlocking buffer */
3145  xwait = HeapTupleHeaderGetRawXmax(tp.t_data);
3146  infomask = tp.t_data->t_infomask;
3147 
3148  /*
3149  * Sleep until concurrent transaction ends -- except when there's a
3150  * single locker and it's our own transaction. Note we don't care
3151  * which lock mode the locker has, because we need the strongest one.
3152  *
3153  * Before sleeping, we need to acquire tuple lock to establish our
3154  * priority for the tuple (see heap_lock_tuple). LockTuple will
3155  * release us when we are next-in-line for the tuple.
3156  *
3157  * If we are forced to "start over" below, we keep the tuple lock;
3158  * this arranges that we stay at the head of the line while rechecking
3159  * tuple state.
3160  */
3161  if (infomask & HEAP_XMAX_IS_MULTI)
3162  {
3163  /* wait for multixact */
3164  if (DoesMultiXactIdConflict((MultiXactId) xwait, infomask,
3166  {
3167  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3168 
3169  /* acquire tuple lock, if necessary */
3171  LockWaitBlock, &have_tuple_lock);
3172 
3173  /* wait for multixact */
3175  relation, &(tp.t_self), XLTW_Delete,
3176  NULL);
3178 
3179  /*
3180  * If xwait had just locked the tuple then some other xact
3181  * could update this tuple before we get to this point. Check
3182  * for xmax change, and start over if so.
3183  */
3184  if (xmax_infomask_changed(tp.t_data->t_infomask, infomask) ||
3186  xwait))
3187  goto l1;
3188  }
3189 
3190  /*
3191  * You might think the multixact is necessarily done here, but not
3192  * so: it could have surviving members, namely our own xact or
3193  * other subxacts of this backend. It is legal for us to delete
3194  * the tuple in either case, however (the latter case is
3195  * essentially a situation of upgrading our former shared lock to
3196  * exclusive). We don't bother changing the on-disk hint bits
3197  * since we are about to overwrite the xmax altogether.
3198  */
3199  }
3200  else if (!TransactionIdIsCurrentTransactionId(xwait))
3201  {
3202  /*
3203  * Wait for regular transaction to end; but first, acquire tuple
3204  * lock.
3205  */
3206  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3208  LockWaitBlock, &have_tuple_lock);
3209  XactLockTableWait(xwait, relation, &(tp.t_self), XLTW_Delete);
3211 
3212  /*
3213  * xwait is done, but if xwait had just locked the tuple then some
3214  * other xact could update this tuple before we get to this point.
3215  * Check for xmax change, and start over if so.
3216  */
3217  if (xmax_infomask_changed(tp.t_data->t_infomask, infomask) ||
3219  xwait))
3220  goto l1;
3221 
3222  /* Otherwise check if it committed or aborted */
3223  UpdateXmaxHintBits(tp.t_data, buffer, xwait);
3224  }
3225 
3226  /*
3227  * We may overwrite if previous xmax aborted, or if it committed but
3228  * only locked the tuple without updating it.
3229  */
3230  if ((tp.t_data->t_infomask & HEAP_XMAX_INVALID) ||
3231  HEAP_XMAX_IS_LOCKED_ONLY(tp.t_data->t_infomask) ||
3232  HeapTupleHeaderIsOnlyLocked(tp.t_data))
3233  result = HeapTupleMayBeUpdated;
3234  else
3235  result = HeapTupleUpdated;
3236  }
3237 
3238  if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
3239  {
3240  /* Perform additional check for transaction-snapshot mode RI updates */
3241  if (!HeapTupleSatisfiesVisibility(&tp, crosscheck, buffer))
3242  result = HeapTupleUpdated;
3243  }
3244 
3245  if (result != HeapTupleMayBeUpdated)
3246  {
3247  Assert(result == HeapTupleSelfUpdated ||
3248  result == HeapTupleUpdated ||
3249  result == HeapTupleBeingUpdated);
3251  hufd->ctid = tp.t_data->t_ctid;
3252  hufd->xmax = HeapTupleHeaderGetUpdateXid(tp.t_data);
3253  if (result == HeapTupleSelfUpdated)
3254  hufd->cmax = HeapTupleHeaderGetCmax(tp.t_data);
3255  else
3256  hufd->cmax = InvalidCommandId;
3257  UnlockReleaseBuffer(buffer);
3258  if (have_tuple_lock)
3259  UnlockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive);
3260  if (vmbuffer != InvalidBuffer)
3261  ReleaseBuffer(vmbuffer);
3262  return result;
3263  }
3264 
3265  /*
3266  * We're about to do the actual delete -- check for conflict first, to
3267  * avoid possibly having to roll back work we've just done.
3268  *
3269  * This is safe without a recheck as long as there is no possibility of
3270  * another process scanning the page between this check and the delete
3271  * being visible to the scan (i.e., an exclusive buffer content lock is
3272  * continuously held from this point until the tuple delete is visible).
3273  */
3274  CheckForSerializableConflictIn(relation, &tp, buffer);
3275 
3276  /* replace cid with a combo cid if necessary */
3277  HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo);
3278 
3279  /*
3280  * Compute replica identity tuple before entering the critical section so
3281  * we don't PANIC upon a memory allocation failure.
3282  */
3283  old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
3284 
3285  /*
3286  * If this is the first possibly-multixact-able operation in the current
3287  * transaction, set my per-backend OldestMemberMXactId setting. We can be
3288  * certain that the transaction will never become a member of any older
3289  * MultiXactIds than that. (We have to do this even if we end up just
3290  * using our own TransactionId below, since some other backend could
3291  * incorporate our XID into a MultiXact immediately afterwards.)
3292  */
3293  MultiXactIdSetOldestMember();
3294 
3295  compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(tp.t_data),
3296  tp.t_data->t_infomask, tp.t_data->t_infomask2,
3297  xid, LockTupleExclusive, true,
3298  &new_xmax, &new_infomask, &new_infomask2);
3299 
3300  START_CRIT_SECTION();
3301 
3302  /*
3303  * If this transaction commits, the tuple will become DEAD sooner or
3304  * later. Set flag that this page is a candidate for pruning once our xid
3305  * falls below the OldestXmin horizon. If the transaction finally aborts,
3306  * the subsequent page pruning will be a no-op and the hint will be
3307  * cleared.
3308  */
3309  PageSetPrunable(page, xid);
3310 
3311  if (PageIsAllVisible(page))
3312  {
3313  all_visible_cleared = true;
3314  PageClearAllVisible(page);
3315  visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
3316  vmbuffer, VISIBILITYMAP_VALID_BITS);
3317  }
3318 
3319  /* store transaction information of xact deleting the tuple */
3322  tp.t_data->t_infomask |= new_infomask;
3323  tp.t_data->t_infomask2 |= new_infomask2;
3325  HeapTupleHeaderSetXmax(tp.t_data, new_xmax);
3326  HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo);
3327  /* Make sure there is no forward chain link in t_ctid */
3328  tp.t_data->t_ctid = tp.t_self;
3329 
3330  /* Signal that this is actually a move into another partition */
3331  if (changingPart)
3332  HeapTupleHeaderSetMovedPartitions(tp.t_data);
3333 
3334  MarkBufferDirty(buffer);
3335 
3336  /*
3337  * XLOG stuff
3338  *
3339  * NB: heap_abort_speculative() uses the same xlog record and replay
3340  * routines.
3341  */
3342  if (RelationNeedsWAL(relation))
3343  {
3344  xl_heap_delete xlrec;
3345  XLogRecPtr recptr;
3346 
3347  /* For logical decode we need combocids to properly decode the catalog */
3348  if (RelationIsAccessibleInLogicalDecoding(relation))
3349  log_heap_new_cid(relation, &tp);
3350 
3351  xlrec.flags = 0;
3352  if (all_visible_cleared)
3353  xlrec.flags |= XLH_DELETE_ALL_VISIBLE_CLEARED;
3354  if (changingPart)
3355  xlrec.flags |= XLH_DELETE_IS_PARTITION_MOVE;
3356  xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
3357  tp.t_data->t_infomask2);
3358  xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
3359  xlrec.xmax = new_xmax;
3360 
3361  if (old_key_tuple != NULL)
3362  {
3363  if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
3364  xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE;
3365  else
3366  xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
3367  }
3368 
3369  XLogBeginInsert();
3370  XLogRegisterData((char *) &xlrec, SizeOfHeapDelete);
3371 
3372  XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
3373 
3374  /*
3375  * Log replica identity of the deleted tuple if there is one
3376  */
3377  if (old_key_tuple != NULL)
3378  {
3379  xl_heap_header xlhdr;
3380 
3381  xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
3382  xlhdr.t_infomask = old_key_tuple->t_data->t_infomask;
3383  xlhdr.t_hoff = old_key_tuple->t_data->t_hoff;
3384 
3385  XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
3386  XLogRegisterData((char *) old_key_tuple->t_data
3387  + SizeofHeapTupleHeader,
3388  old_key_tuple->t_len
3389  - SizeofHeapTupleHeader);
3390  }
3391 
3392  /* filtering by origin on a row level is much more efficient */
3393  XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
3394 
3395  recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE);
3396 
3397  PageSetLSN(page, recptr);
3398  }
3399 
3400  END_CRIT_SECTION();
3401 
3402  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3403 
3404  if (vmbuffer != InvalidBuffer)
3405  ReleaseBuffer(vmbuffer);
3406 
3407  /*
3408  * If the tuple has toasted out-of-line attributes, we need to delete
3409  * those items too. We have to do this before releasing the buffer
3410  * because we need to look at the contents of the tuple, but it's OK to
3411  * release the content lock on the buffer first.
3412  */
3413  if (relation->rd_rel->relkind != RELKIND_RELATION &&
3414  relation->rd_rel->relkind != RELKIND_MATVIEW)
3415  {
3416  /* toast table entries should never be recursively toasted */
3417  Assert(!HeapTupleHasExternal(&tp));
3418  }
3419  else if (HeapTupleHasExternal(&tp))
3420  toast_delete(relation, &tp, false);
3421 
3422  /*
3423  * Mark tuple for invalidation from system caches at next command
3424  * boundary. We have to do this before releasing the buffer because we
3425  * need to look at the contents of the tuple.
3426  */
3427  CacheInvalidateHeapTuple(relation, &tp, NULL);
3428 
3429  /* Now we can release the buffer */
3430  ReleaseBuffer(buffer);
3431 
3432  /*
3433  * Release the lmgr tuple lock, if we had it.
3434  */
3435  if (have_tuple_lock)
3436  UnlockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive);
3437 
3438  pgstat_count_heap_delete(relation);
3439 
3440  if (old_key_tuple != NULL && old_key_copied)
3441  heap_freetuple(old_key_tuple);
3442 
3443  return HeapTupleMayBeUpdated;
3444 }
3445 
3446 /*
3447  * simple_heap_delete - delete a tuple
3448  *
3449  * This routine may be used to delete a tuple when concurrent updates of
3450  * the target tuple are not expected (for example, because we have a lock
3451  * on the relation associated with the tuple). Any failure is reported
3452  * via ereport().
3453  */
3454 void
3455 simple_heap_delete(Relation relation, ItemPointer tid)
3456 {
3457  HTSU_Result result;
3458  HeapUpdateFailureData hufd;
3459 
3460  result = heap_delete(relation, tid,
3461  GetCurrentCommandId(true), InvalidSnapshot,
3462  true /* wait for commit */ ,
3463  &hufd, false /* changingPart */ );
3464  switch (result)
3465  {
3466  case HeapTupleSelfUpdated:
3467  /* Tuple was already updated in current command? */
3468  elog(ERROR, "tuple already updated by self");
3469  break;
3470 
3471  case HeapTupleMayBeUpdated:
3472  /* done successfully */
3473  break;
3474 
3475  case HeapTupleUpdated:
3476  elog(ERROR, "tuple concurrently updated");
3477  break;
3478 
3479  default:
3480  elog(ERROR, "unrecognized heap_delete status: %u", result);
3481  break;
3482  }
3483 }
3484 
3485 /*
3486  * heap_update - replace a tuple
3487  *
3488  * NB: do not call this directly unless you are prepared to deal with
3489  * concurrent-update conditions. Use simple_heap_update instead.
3490  *
3491  * relation - table to be modified (caller must hold suitable lock)
3492  * otid - TID of old tuple to be replaced
3493  * newtup - newly constructed tuple data to store
3494  * cid - update command ID (used for visibility test, and stored into
3495  * cmax/cmin if successful)
3496  * crosscheck - if not InvalidSnapshot, also check old tuple against this
3497  * wait - true if should wait for any conflicting update to commit/abort
3498  * hufd - output parameter, filled in failure cases (see below)
3499  * lockmode - output parameter, filled with lock mode acquired on tuple
3500  *
3501  * Normal, successful return value is HeapTupleMayBeUpdated, which
3502  * actually means we *did* update it. Failure return codes are
3503  * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
3504  * (the last only possible if wait == false).
3505  *
3506  * On success, the header fields of *newtup are updated to match the new
3507  * stored tuple; in particular, newtup->t_self is set to the TID where the
3508  * new tuple was inserted, and its HEAP_ONLY_TUPLE flag is set iff a HOT
3509  * update was done. However, any TOAST changes in the new tuple's
3510  * data are not reflected into *newtup.
3511  *
3512  * In the failure cases, the routine fills *hufd with the tuple's t_ctid,
3513  * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax
3514  * (the last only for HeapTupleSelfUpdated, since we
3515  * cannot obtain cmax from a combocid generated by another transaction).
3516  * See comments for struct HeapUpdateFailureData for additional info.
3517  */
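/*
 * Usage sketch (editorial illustration, not part of heapam.c): callers that
 * do not want to handle the failure codes themselves normally go through
 * simple_heap_update(); a concurrency-aware caller looks much like the
 * heap_delete sketch above, additionally receiving the lock mode taken:
 *
 *     LockTupleMode lockmode;
 *     HeapUpdateFailureData hufd;
 *     HTSU_Result res;
 *
 *     res = heap_update(rel, otid, newtup, GetCurrentCommandId(true),
 *                       InvalidSnapshot, true, &hufd, &lockmode);
 */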
3518 HTSU_Result
3519 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
3520  CommandId cid, Snapshot crosscheck, bool wait,
3521  HeapUpdateFailureData *hufd, LockTupleMode *lockmode)
3522 {
3523  HTSU_Result result;
3524  TransactionId xid = GetCurrentTransactionId();
3525  Bitmapset *hot_attrs;
3526  Bitmapset *proj_idx_attrs;
3527  Bitmapset *key_attrs;
3528  Bitmapset *id_attrs;
3529  Bitmapset *interesting_attrs;
3530  Bitmapset *modified_attrs;
3531  ItemId lp;
3532  HeapTupleData oldtup;
3533  HeapTuple heaptup;
3534  HeapTuple old_key_tuple = NULL;
3535  bool old_key_copied = false;
3536  Page page;
3537  BlockNumber block;
3538  MultiXactStatus mxact_status;
3539  Buffer buffer,
3540  newbuf,
3541  vmbuffer = InvalidBuffer,
3542  vmbuffer_new = InvalidBuffer;
3543  bool need_toast;
3544  Size newtupsize,
3545  pagefree;
3546  bool have_tuple_lock = false;
3547  bool iscombo;
3548  bool use_hot_update = false;
3549  bool hot_attrs_checked = false;
3550  bool key_intact;
3551  bool all_visible_cleared = false;
3552  bool all_visible_cleared_new = false;
3553  bool checked_lockers;
3554  bool locker_remains;
3555  TransactionId xmax_new_tuple,
3556  xmax_old_tuple;
3557  uint16 infomask_old_tuple,
3558  infomask2_old_tuple,
3559  infomask_new_tuple,
3560  infomask2_new_tuple;
3561 
3562  Assert(ItemPointerIsValid(otid));
3563 
3564  /*
3565  * Forbid this during a parallel operation, lest it allocate a combocid.
3566  * Other workers might need that combocid for visibility checks, and we
3567  * have no provision for broadcasting it to them.
3568  */
3569  if (IsInParallelMode())
3570  ereport(ERROR,
3571  (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
3572  errmsg("cannot update tuples during a parallel operation")));
3573 
3574  /*
3575  * Fetch the list of attributes to be checked for various operations.
3576  *
3577  * For HOT considerations, this is wasted effort if we fail to update or
3578  * have to put the new tuple on a different page. But we must compute the
3579  * list before obtaining buffer lock --- in the worst case, if we are
3580  * doing an update on one of the relevant system catalogs, we could
3581  * deadlock if we try to fetch the list later. In any case, the relcache
3582  * caches the data so this is usually pretty cheap.
3583  *
3584  * We also need columns used by the replica identity and columns that are
3585  * considered the "key" of rows in the table.
3586  *
3587  * Note that we get copies of each bitmap, so we need not worry about
3588  * relcache flush happening midway through.
3589  */
3590  hot_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_HOT);
3591  proj_idx_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_PROJ);
3592  key_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY);
3593  id_attrs = RelationGetIndexAttrBitmap(relation,
3594  INDEX_ATTR_BITMAP_IDENTITY_KEY);
3595  block = ItemPointerGetBlockNumber(otid);
3596  buffer = ReadBuffer(relation, block);
3597  page = BufferGetPage(buffer);
3598 
3599  interesting_attrs = NULL;
3600 
3601  /*
3602  * If the page is already full, there is hardly any chance of doing a HOT
3603  * update on this page. It might be wasteful effort to look for index
3604  * column updates only to later reject HOT updates for lack of space in
3605  * the same page. So we are conservative and only fetch hot_attrs if the
3606  * page is not already full. Since we are already holding a pin on the
3607  * buffer, there is no chance that the buffer can get cleaned up
3608  * concurrently and even if that was possible, in the worst case we lose a
3609  * chance to do a HOT update.
3610  */
3611  if (!PageIsFull(page))
3612  {
3613  interesting_attrs = bms_add_members(interesting_attrs, hot_attrs);
3614  interesting_attrs = bms_add_members(interesting_attrs, proj_idx_attrs);
3615  hot_attrs_checked = true;
3616  }
3617  interesting_attrs = bms_add_members(interesting_attrs, key_attrs);
3618  interesting_attrs = bms_add_members(interesting_attrs, id_attrs);
3619 
3620  /*
3621  * Before locking the buffer, pin the visibility map page if it appears to
3622  * be necessary. Since we haven't got the lock yet, someone else might be
3623  * in the middle of changing this, so we'll need to recheck after we have
3624  * the lock.
3625  */
3626  if (PageIsAllVisible(page))
3627  visibilitymap_pin(relation, block, &vmbuffer);
3628 
3629  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
3630 
3631  lp = PageGetItemId(page, ItemPointerGetOffsetNumber(otid));
3632  Assert(ItemIdIsNormal(lp));
3633 
3634  /*
3635  * Fill in enough data in oldtup for HeapDetermineModifiedColumns to work
3636  * properly.
3637  */
3638  oldtup.t_tableOid = RelationGetRelid(relation);
3639  oldtup.t_data = (HeapTupleHeader) PageGetItem(page, lp);
3640  oldtup.t_len = ItemIdGetLength(lp);
3641  oldtup.t_self = *otid;
3642 
3643  /* the new tuple is ready, except for this: */
3644  newtup->t_tableOid = RelationGetRelid(relation);
3645 
3646  /* Fill in OID for newtup */
3647  if (relation->rd_rel->relhasoids)
3648  {
3649 #ifdef NOT_USED
3650  /* this is redundant with an Assert in HeapTupleSetOid */
3651  Assert(newtup->t_data->t_infomask & HEAP_HASOID);
3652 #endif
3653  HeapTupleSetOid(newtup, HeapTupleGetOid(&oldtup));
3654  }
3655  else
3656  {
3657  /* check there is no space for an OID */
3658  Assert(!(newtup->t_data->t_infomask & HEAP_HASOID));
3659  }
3660 
3661  /* Determine columns modified by the update. */
3662  modified_attrs = HeapDetermineModifiedColumns(relation, interesting_attrs,
3663  &oldtup, newtup);
3664 
3665  /*
3666  * If we're not updating any "key" column, we can grab a weaker lock type.
3667  * This allows for more concurrency when we are running simultaneously
3668  * with foreign key checks.
3669  *
3670  * Note that if a column gets detoasted while executing the update, but
3671  * the value ends up being the same, this test will fail and we will use
3672  * the stronger lock. This is acceptable; the important case to optimize
3673  * is updates that don't manipulate key columns, not those that
3674  * serendipitously arrive at the same key values.
3675  */
3676  if (!bms_overlap(modified_attrs, key_attrs))
3677  {
3678  *lockmode = LockTupleNoKeyExclusive;
3679  mxact_status = MultiXactStatusNoKeyUpdate;
3680  key_intact = true;
3681 
3682  /*
3683  * If this is the first possibly-multixact-able operation in the
3684  * current transaction, set my per-backend OldestMemberMXactId
3685  * setting. We can be certain that the transaction will never become a
3686  * member of any older MultiXactIds than that. (We have to do this
3687  * even if we end up just using our own TransactionId below, since
3688  * some other backend could incorporate our XID into a MultiXact
3689  * immediately afterwards.)
3690  */
3691  MultiXactIdSetOldestMember();
3692  }
3693  else
3694  {
3695  *lockmode = LockTupleExclusive;
3696  mxact_status = MultiXactStatusUpdate;
3697  key_intact = false;
3698  }
3699 
3700  /*
3701  * Note: beyond this point, use oldtup not otid to refer to old tuple.
3702  * otid may very well point at newtup->t_self, which we will overwrite
3703  * with the new tuple's location, so there's great risk of confusion if we
3704  * use otid anymore.
3705  */
3706 
3707 l2:
3708  checked_lockers = false;
3709  locker_remains = false;
3710  result = HeapTupleSatisfiesUpdate(&oldtup, cid, buffer);
3711 
3712  /* see below about the "no wait" case */
3713  Assert(result != HeapTupleBeingUpdated || wait);
3714 
3715  if (result == HeapTupleInvisible)
3716  {
3717  UnlockReleaseBuffer(buffer);
3718  ereport(ERROR,
3719  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3720  errmsg("attempted to update invisible tuple")));
3721  }
3722  else if (result == HeapTupleBeingUpdated && wait)
3723  {
3724  TransactionId xwait;
3725  uint16 infomask;
3726  bool can_continue = false;
3727 
3728  /*
3729  * XXX note that we don't consider the "no wait" case here. This
3730  * isn't a problem currently because no caller uses that case, but it
3731  * should be fixed if such a caller is introduced. It wasn't a
3732  * problem previously because this code would always wait, but now
3733  * that some tuple locks do not conflict with one of the lock modes we
3734  * use, it is possible that this case is interesting to handle
3735  * specially.
3736  *
3737  * This may cause failures with third-party code that calls
3738  * heap_update directly.
3739  */
3740 
3741  /* must copy state data before unlocking buffer */
3742  xwait = HeapTupleHeaderGetRawXmax(oldtup.t_data);
3743  infomask = oldtup.t_data->t_infomask;
3744 
3745  /*
3746  * Now we have to do something about the existing locker. If it's a
3747  * multi, sleep on it; we might be awakened before it is completely
3748  * gone (or even not sleep at all in some cases); we need to preserve
3749  * it as locker, unless it is gone completely.
3750  *
3751  * If it's not a multi, we need to check for sleeping conditions
3752  * before actually going to sleep. If the update doesn't conflict
3753  * with the locks, we just continue without sleeping (but making sure
3754  * it is preserved).
3755  *
3756  * Before sleeping, we need to acquire tuple lock to establish our
3757  * priority for the tuple (see heap_lock_tuple). LockTuple will
3758  * release us when we are next-in-line for the tuple. Note we must
3759  * not acquire the tuple lock until we're sure we're going to sleep;
3760  * otherwise we're open for race conditions with other transactions
3761  * holding the tuple lock which sleep on us.
3762  *
3763  * If we are forced to "start over" below, we keep the tuple lock;
3764  * this arranges that we stay at the head of the line while rechecking
3765  * tuple state.
3766  */
3767  if (infomask & HEAP_XMAX_IS_MULTI)
3768  {
3769  TransactionId update_xact;
3770  int remain;
3771 
3772  if (DoesMultiXactIdConflict((MultiXactId) xwait, infomask,
3773  *lockmode))
3774  {
3775  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3776 
3777  /* acquire tuple lock, if necessary */
3778  heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
3779  LockWaitBlock, &have_tuple_lock);
3780 
3781  /* wait for multixact */
3782  MultiXactIdWait((MultiXactId) xwait, mxact_status, infomask,
3783  relation, &oldtup.t_self, XLTW_Update,
3784  &remain);
3785  checked_lockers = true;
3786  locker_remains = remain != 0;
3787  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
3788 
3789  /*
3790  * If xwait had just locked the tuple then some other xact
3791  * could update this tuple before we get to this point. Check
3792  * for xmax change, and start over if so.
3793  */
3794  if (xmax_infomask_changed(oldtup.t_data->t_infomask,
3795  infomask) ||
3796  !TransactionIdEquals(HeapTupleHeaderGetRawXmax(oldtup.t_data),
3797  xwait))
3798  goto l2;
3799  }
3800 
3801  /*
3802  * Note that the multixact may not be done by now. It could have
3803  * surviving members; our own xact or other subxacts of this
3804  * backend, and also any other concurrent transaction that locked
3805  * the tuple with KeyShare if we only got LockTupleNoKeyExclusive. If
3806  * this is the case, we have to be careful to mark the updated
3807  * tuple with the surviving members in Xmax.
3808  *
3809  * Note that there could have been another update in the
3810  * MultiXact. In that case, we need to check whether it committed
3811  * or aborted. If it aborted we are safe to update it again;
3812  * otherwise there is an update conflict, and we have to return
3813  * HeapTupleUpdated below.
3814  *
3815  * In the LockTupleExclusive case, we still need to preserve the
3816  * surviving members: those would include the tuple locks we had
3817  * before this one, which are important to keep in case this
3818  * subxact aborts.
3819  */
3820  if (!HEAP_XMAX_IS_LOCKED_ONLY(oldtup.t_data->t_infomask))
3821  update_xact = HeapTupleGetUpdateXid(oldtup.t_data);
3822  else
3823  update_xact = InvalidTransactionId;
3824 
3825  /*
3826  * There was no UPDATE in the MultiXact; or it aborted. No
3827  * TransactionIdIsInProgress() call needed here, since we called
3828  * MultiXactIdWait() above.
3829  */
3830  if (!TransactionIdIsValid(update_xact) ||
3831  TransactionIdDidAbort(update_xact))
3832  can_continue = true;
3833  }
3834  else if (TransactionIdIsCurrentTransactionId(xwait))
3835  {
3836  /*
3837  * The only locker is ourselves; we can avoid grabbing the tuple
3838  * lock here, but must preserve our locking information.
3839  */
3840  checked_lockers = true;
3841  locker_remains = true;
3842  can_continue = true;
3843  }
3844  else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask) && key_intact)
3845  {
3846  /*
3847  * If it's just a key-share locker, and we're not changing the key
3848  * columns, we don't need to wait for it to end; but we need to
3849  * preserve it as locker.
3850  */
3851  checked_lockers = true;
3852  locker_remains = true;
3853  can_continue = true;
3854  }
3855  else
3856  {
3857  /*
3858  * Wait for regular transaction to end; but first, acquire tuple
3859  * lock.
3860  */
3861  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3862  heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
3863  LockWaitBlock, &have_tuple_lock);
3864  XactLockTableWait(xwait, relation, &oldtup.t_self,
3865  XLTW_Update);
3866  checked_lockers = true;
3867  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
3868 
3869  /*
3870  * xwait is done, but if xwait had just locked the tuple then some
3871  * other xact could update this tuple before we get to this point.
3872  * Check for xmax change, and start over if so.
3873  */
3874  if (xmax_infomask_changed(oldtup.t_data->t_infomask, infomask) ||
3875  !TransactionIdEquals(xwait,
3876  HeapTupleHeaderGetRawXmax(oldtup.t_data)))
3877  goto l2;
3878 
3879  /* Otherwise check if it committed or aborted */
3880  UpdateXmaxHintBits(oldtup.t_data, buffer, xwait);
3881  if (oldtup.t_data->t_infomask & HEAP_XMAX_INVALID)
3882  can_continue = true;
3883  }
3884 
3885  result = can_continue ? HeapTupleMayBeUpdated : HeapTupleUpdated;
3886  }
3887 
3888  if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
3889  {
3890  /* Perform additional check for transaction-snapshot mode RI updates */
3891  if (!HeapTupleSatisfiesVisibility(&oldtup, crosscheck, buffer))
3892  result = HeapTupleUpdated;
3893  }
3894 
3895  if (result != HeapTupleMayBeUpdated)
3896  {
3897  Assert(result == HeapTupleSelfUpdated ||
3898  result == HeapTupleUpdated ||
3899  result == HeapTupleBeingUpdated);
3900  Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID));
3901  hufd->ctid = oldtup.t_data->t_ctid;
3902  hufd->xmax = HeapTupleHeaderGetUpdateXid(oldtup.t_data);
3903  if (result == HeapTupleSelfUpdated)
3904  hufd->cmax = HeapTupleHeaderGetCmax(oldtup.t_data);
3905  else
3906  hufd->cmax = InvalidCommandId;
3907  UnlockReleaseBuffer(buffer);
3908  if (have_tuple_lock)
3909  UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
3910  if (vmbuffer != InvalidBuffer)
3911  ReleaseBuffer(vmbuffer);
3912  bms_free(hot_attrs);
3913  bms_free(proj_idx_attrs);
3914  bms_free(key_attrs);
3915  bms_free(id_attrs);
3916  bms_free(modified_attrs);
3917  bms_free(interesting_attrs);
3918  return result;
3919  }
3920 
3921  /*
3922  * If we didn't pin the visibility map page and the page has become all
3923  * visible while we were busy locking the buffer, or during some
3924  * subsequent window during which we had it unlocked, we'll have to unlock
3925  * and re-lock, to avoid holding the buffer lock across an I/O. That's a
3926  * bit unfortunate, especially since we'll now have to recheck whether the
3927  * tuple has been locked or updated under us, but hopefully it won't
3928  * happen very often.
3929  */
3930  if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
3931  {
3932  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3933  visibilitymap_pin(relation, block, &vmbuffer);
3934  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
3935  goto l2;
3936  }
3937 
3938  /* Fill in transaction status data */
3939 
3940  /*
3941  * If the tuple we're updating is locked, we need to preserve the locking
3942  * info in the old tuple's Xmax. Prepare a new Xmax value for this.
3943  */
3944  compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup.t_data),
3945  oldtup.t_data->t_infomask,
3946  oldtup.t_data->t_infomask2,
3947  xid, *lockmode, true,
3948  &xmax_old_tuple, &infomask_old_tuple,
3949  &infomask2_old_tuple);
3950 
3951  /*
3952  * And also prepare an Xmax value for the new copy of the tuple. If there
3953  * was no xmax previously, or there was one but all lockers are now gone,
3954  * then use InvalidXid; otherwise, get the xmax from the old tuple. (In
3955  * rare cases that might also be InvalidXid and yet not have the
3956  * HEAP_XMAX_INVALID bit set; that's fine.)
3957  */
3958  if ((oldtup.t_data->t_infomask & HEAP_XMAX_INVALID) ||
3959  HEAP_XMAX_IS_LOCKED_ONLY(oldtup.t_data->t_infomask) ||
3960  (checked_lockers && !locker_remains))
3961  xmax_new_tuple = InvalidTransactionId;
3962  else
3963  xmax_new_tuple = HeapTupleHeaderGetRawXmax(oldtup.t_data);
3964 
3965  if (!TransactionIdIsValid(xmax_new_tuple))
3966  {
3967  infomask_new_tuple = HEAP_XMAX_INVALID;
3968  infomask2_new_tuple = 0;
3969  }
3970  else
3971  {
3972  /*
3973  * If we found a valid Xmax for the new tuple, then the infomask bits
3974  * to use on the new tuple depend on what was there on the old one.
3975  * Note that since we're doing an update, the only possibility is that
3976  * the lockers had FOR KEY SHARE lock.
3977  */
3978  if (oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI)
3979  {
3980  GetMultiXactIdHintBits(xmax_new_tuple, &infomask_new_tuple,
3981  &infomask2_new_tuple);
3982  }
3983  else
3984  {
3985  infomask_new_tuple = HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_LOCK_ONLY;
3986  infomask2_new_tuple = 0;
3987  }
3988  }
3989 
3990  /*
3991  * Prepare the new tuple with the appropriate initial values of Xmin and
3992  * Xmax, as well as initial infomask bits as computed above.
3993  */
3994  newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
3995  newtup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
3996  HeapTupleHeaderSetXmin(newtup->t_data, xid);
3997  HeapTupleHeaderSetCmin(newtup->t_data, cid);
3998  newtup->t_data->t_infomask |= HEAP_UPDATED | infomask_new_tuple;
3999  newtup->t_data->t_infomask2 |= infomask2_new_tuple;
4000  HeapTupleHeaderSetXmax(newtup->t_data, xmax_new_tuple);
4001 
4002  /*
4003  * Replace cid with a combo cid if necessary. Note that we already put
4004  * the plain cid into the new tuple.
4005  */
4006  HeapTupleHeaderAdjustCmax(oldtup.t_data, &cid, &iscombo);
4007 
4008  /*
4009  * If the toaster needs to be activated, OR if the new tuple will not fit
4010  * on the same page as the old, then we need to release the content lock
4011  * (but not the pin!) on the old tuple's buffer while we are off doing
4012  * TOAST and/or table-file-extension work. We must mark the old tuple to
4013  * show that it's locked, else other processes may try to update it
4014  * themselves.
4015  *
4016  * We need to invoke the toaster if there are already any out-of-line
4017  * toasted values present, or if the new tuple is over-threshold.
4018  */
4019  if (relation->rd_rel->relkind != RELKIND_RELATION &&
4020  relation->rd_rel->relkind != RELKIND_MATVIEW)
4021  {
4022  /* toast table entries should never be recursively toasted */
4023  Assert(!HeapTupleHasExternal(&oldtup));
4024  Assert(!HeapTupleHasExternal(newtup));
4025  need_toast = false;
4026  }
4027  else
4028  need_toast = (HeapTupleHasExternal(&oldtup) ||
4029  HeapTupleHasExternal(newtup) ||
4030  newtup->t_len > TOAST_TUPLE_THRESHOLD);
4031 
4032  pagefree = PageGetHeapFreeSpace(page);
4033 
4034  newtupsize = MAXALIGN(newtup->t_len);
4035 
4036  if (need_toast || newtupsize > pagefree)
4037  {
4038  TransactionId xmax_lock_old_tuple;
4039  uint16 infomask_lock_old_tuple,
4040  infomask2_lock_old_tuple;
4041  bool cleared_all_frozen = false;
4042 
4043  /*
4044  * To prevent concurrent sessions from updating the tuple, we have to
4045  * temporarily mark it locked, while we release the page-level lock.
4046  *
4047  * To satisfy the rule that any xid potentially appearing in a buffer
4048  * written out to disk must first be WAL-logged, we have to WAL-log this
4049  * temporary modification. We can reuse xl_heap_lock for this
4050  * purpose. If we crash/error before following through with the
4051  * actual update, xmax will be of an aborted transaction, allowing
4052  * other sessions to proceed.
4053  */
4054 
4055  /*
4056  * Compute xmax / infomask appropriate for locking the tuple. This has
4057  * to be done separately from the combo that's going to be used for
4058  * updating, because the potentially created multixact would otherwise
4059  * be wrong.
4060  */
4061  compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup.t_data),
4062  oldtup.t_data->t_infomask,
4063  oldtup.t_data->t_infomask2,
4064  xid, *lockmode, false,
4065  &xmax_lock_old_tuple, &infomask_lock_old_tuple,
4066  &infomask2_lock_old_tuple);
4067 
4068  Assert(HEAP_XMAX_IS_LOCKED_ONLY(infomask_lock_old_tuple));
4069 
4069 
4070  START_CRIT_SECTION();
4071 
4072  /* Clear obsolete visibility flags ... */
4073  oldtup.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
4074  oldtup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
4075  HeapTupleClearHotUpdated(&oldtup);
4076  /* ... and store info about transaction updating this tuple */
4077  Assert(TransactionIdIsValid(xmax_lock_old_tuple));
4078  HeapTupleHeaderSetXmax(oldtup.t_data, xmax_lock_old_tuple);
4079  oldtup.t_data->t_infomask |= infomask_lock_old_tuple;
4080  oldtup.t_data->t_infomask2 |= infomask2_lock_old_tuple;
4081  HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
4082 
4083  /* temporarily make it look not-updated, but locked */
4084  oldtup.t_data->t_ctid = oldtup.t_self;
4085 
4086  /*
4087  * Clear all-frozen bit on visibility map if needed. We could
4088  * immediately reset ALL_VISIBLE, but given that the WAL logging
4089  * overhead would be unchanged, that doesn't seem necessarily
4090  * worthwhile.
4091  */
4092  if (PageIsAllVisible(BufferGetPage(buffer)) &&
4093  visibilitymap_clear(relation, block, vmbuffer,
4094  VISIBILITYMAP_ALL_FROZEN))
4095  cleared_all_frozen = true;
4096 
4097  MarkBufferDirty(buffer);
4098 
4099  if (RelationNeedsWAL(relation))
4100  {
4101  xl_heap_lock xlrec;
4102  XLogRecPtr recptr;
4103 
4104  XLogBeginInsert();
4105  XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
4106 
4107  xlrec.offnum = ItemPointerGetOffsetNumber(&oldtup.t_self);
4108  xlrec.locking_xid = xmax_lock_old_tuple;
4109  xlrec.infobits_set = compute_infobits(oldtup.t_data->t_infomask,
4110  oldtup.t_data->t_infomask2);
4111  xlrec.flags =
4112  cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0;
4113  XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
4114  recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
4115  PageSetLSN(page, recptr);
4116  }
4117 
4118  END_CRIT_SECTION();
4119 
4120  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
4121 
4122  /*
4123  * Let the toaster do its thing, if needed.
4124  *
4125  * Note: below this point, heaptup is the data we actually intend to
4126  * store into the relation; newtup is the caller's original untoasted
4127  * data.
4128  */
4129  if (need_toast)
4130  {
4131  /* Note we always use WAL and FSM during updates */
4132  heaptup = toast_insert_or_update(relation, newtup, &oldtup, 0);
4133  newtupsize = MAXALIGN(heaptup->t_len);
4134  }
4135  else
4136  heaptup = newtup;
4137 
4138  /*
4139  * Now, do we need a new page for the tuple, or not? This is a bit
4140  * tricky since someone else could have added tuples to the page while
4141  * we weren't looking. We have to recheck the available space after
4142  * reacquiring the buffer lock. But don't bother to do that if the
4143  * former amount of free space is still not enough; it's unlikely
4144  * there's more free now than before.
4145  *
4146  * What's more, if we need to get a new page, we will need to acquire
4147  * buffer locks on both old and new pages. To avoid deadlock against
4148  * some other backend trying to get the same two locks in the other
4149  * order, we must be consistent about the order we get the locks in.
4150  * We use the rule "lock the lower-numbered page of the relation
4151  * first". To implement this, we must do RelationGetBufferForTuple
4152  * while not holding the lock on the old page, and we must rely on it
4153  * to get the locks on both pages in the correct order.
4154  */
4155  if (newtupsize > pagefree)
4156  {
4157  /* Assume there's no chance to put heaptup on same page. */
4158  newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
4159  buffer, 0, NULL,
4160  &vmbuffer_new, &vmbuffer);
4161  }
4162  else
4163  {
4164  /* Re-acquire the lock on the old tuple's page. */
4165  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
4166  /* Re-check using the up-to-date free space */
4167  pagefree = PageGetHeapFreeSpace(page);
4168  if (newtupsize > pagefree)
4169  {
4170  /*
4171  * Rats, it doesn't fit anymore. We must now unlock and
4172  * relock to avoid deadlock. Fortunately, this path should
4173  * seldom be taken.
4174  */
4175  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
4176  newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
4177  buffer, 0, NULL,
4178  &vmbuffer_new, &vmbuffer);
4179  }
4180  else
4181  {
4182  /* OK, it fits here, so we're done. */
4183  newbuf = buffer;
4184  }
4185  }
4186  }
4187  else
4188  {
4189  /* No TOAST work needed, and it'll fit on same page */
4190  newbuf = buffer;
4191  heaptup = newtup;
4192  }
4193 
4194  /*
4195  * We're about to do the actual update -- check for conflict first, to
4196  * avoid possibly having to roll back work we've just done.
4197  *
4198  * This is safe without a recheck as long as there is no possibility of
4199  * another process scanning the pages between this check and the update
4200  * being visible to the scan (i.e., exclusive buffer content lock(s) are
4201  * continuously held from this point until the tuple update is visible).
4202  *
4203  * For the new tuple the only check needed is at the relation level, but
4204  * since both tuples are in the same relation and the check for oldtup
4205  * will include checking the relation level, there is no benefit to a
4206  * separate check for the new tuple.
4207  */
4208  CheckForSerializableConflictIn(relation, &oldtup, buffer);
4209 
4210  /*
4211  * At this point newbuf and buffer are both pinned and locked, and newbuf
4212  * has enough space for the new tuple. If they are the same buffer, only
4213  * one pin is held.
4214  */
4215 
4216  if (newbuf == buffer)
4217  {
4218  /*
4219  * Since the new tuple is going into the same page, we might be able
4220  * to do a HOT update. Check if any of the index columns have been
4221  * changed, or if we have projection functional indexes, check whether
4222  * the old and the new values are the same. If the page was already
4223  * the old and the new values are the same. If the page was already
4224  * full, we may have skipped checking for index columns; in that case
4225  * a HOT update is not possible.
4226  if (hot_attrs_checked
4227  && !bms_overlap(modified_attrs, hot_attrs)
4228  && (!bms_overlap(modified_attrs, proj_idx_attrs)
4229  || ProjIndexIsUnchanged(relation, &oldtup, newtup)))
4230  {
4231  use_hot_update = true;
4232  }
4233  }
4234  else
4235  {
4236  /* Set a hint that the old page could use prune/defrag */
4237  PageSetFull(page);
4238  }
4239 
4240  /*
4241  * Compute replica identity tuple before entering the critical section so
4242  * we don't PANIC upon a memory allocation failure.
4243  * ExtractReplicaIdentity() will return NULL if nothing needs to be
4244  * logged.
4245  */
4246  old_key_tuple = ExtractReplicaIdentity(relation, &oldtup,
4247  bms_overlap(modified_attrs, id_attrs),
4248  &old_key_copied);
4249 
4250  /* NO EREPORT(ERROR) from here till changes are logged */
4251  START_CRIT_SECTION();
4252 
4253  /*
4254  * If this transaction commits, the old tuple will become DEAD sooner or
4255  * later. Set flag that this page is a candidate for pruning once our xid
4256  * falls below the OldestXmin horizon. If the transaction finally aborts,
4257  * the subsequent page pruning will be a no-op and the hint will be
4258  * cleared.
4259  *
4260  * XXX Should we set hint on newbuf as well? If the transaction aborts,
4261  * there would be a prunable tuple in the newbuf; but for now we choose
4262  * not to optimize for aborts. Note that heap_xlog_update must be kept in
4263  * sync if this decision changes.
4264  */
4265  PageSetPrunable(page, xid);
4266 
4267  if (use_hot_update)
4268  {
4269  /* Mark the old tuple as HOT-updated */
4270  HeapTupleSetHotUpdated(&oldtup);
4271  /* And mark the new tuple as heap-only */
4272  HeapTupleSetHeapOnly(heaptup);
4273  /* Mark the caller's copy too, in case different from heaptup */
4274  HeapTupleSetHeapOnly(newtup);
4275  }
4276  else
4277  {
4278  /* Make sure tuples are correctly marked as not-HOT */
4279  HeapTupleClearHotUpdated(&oldtup);
4280  HeapTupleClearHeapOnly(heaptup);
4281  HeapTupleClearHeapOnly(newtup);
4282  }
4283 
4284  RelationPutHeapTuple(relation, newbuf, heaptup, false); /* insert new tuple */
4285 
4286 
4287  /* Clear obsolete visibility flags, possibly set by ourselves above... */
4288  oldtup.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
4289  oldtup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
4290  /* ... and store info about transaction updating this tuple */
4291  Assert(TransactionIdIsValid(xmax_old_tuple));
4292  HeapTupleHeaderSetXmax(oldtup.t_data, xmax_old_tuple);
4293  oldtup.t_data->t_infomask |= infomask_old_tuple;
4294  oldtup.t_data->t_infomask2 |= infomask2_old_tuple;
4295  HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
4296 
4297  /* record address of new tuple in t_ctid of old one */
4298  oldtup.t_data->t_ctid = heaptup->t_self;
4299 
4300  /* clear PD_ALL_VISIBLE flags, reset all visibilitymap bits */
4301  if (PageIsAllVisible(BufferGetPage(buffer)))
4302  {
4303  all_visible_cleared = true;
4304  PageClearAllVisible(BufferGetPage(buffer));
4305  visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
4306  vmbuffer, VISIBILITYMAP_VALID_BITS);
4307  }
4308  if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf)))
4309  {
4310  all_visible_cleared_new = true;
4311  PageClearAllVisible(BufferGetPage(newbuf));
4312  visibilitymap_clear(relation, BufferGetBlockNumber(newbuf),
4313  vmbuffer_new, VISIBILITYMAP_VALID_BITS);
4314  }
4315 
4316  if (newbuf != buffer)
4317  MarkBufferDirty(newbuf);
4318  MarkBufferDirty(buffer);
4319 
4320  /* XLOG stuff */
4321  if (RelationNeedsWAL(relation))
4322  {
4323  XLogRecPtr recptr;
4324 
4325  /*
4326  * For logical decoding we need combocids to properly decode the
4327  * catalog.
4328  */
4329  if (RelationIsAccessibleInLogicalDecoding(relation))
4330  {
4331  log_heap_new_cid(relation, &oldtup);
4332  log_heap_new_cid(relation, heaptup);
4333  }
4334 
4335  recptr = log_heap_update(relation, buffer,
4336  newbuf, &oldtup, heaptup,
4337  old_key_tuple,
4338  all_visible_cleared,
4339  all_visible_cleared_new);
4340  if (newbuf != buffer)
4341  {
4342  PageSetLSN(BufferGetPage(newbuf), recptr);
4343  }
4344  PageSetLSN(BufferGetPage(buffer), recptr);
4345  }
4346 
4347  END_CRIT_SECTION();
4348 
4349  if (newbuf != buffer)
4350  LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);
4351  LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
4352 
4353  /*
4354  * Mark old tuple for invalidation from system caches at next command
4355  * boundary, and mark the new tuple for invalidation in case we abort. We
4356  * have to do this before releasing the buffer because oldtup is in the
4357  * buffer. (heaptup is all in local memory, but it's necessary to process
4358  * both tuple versions in one call to inval.c so we can avoid redundant
4359  * sinval messages.)
4360  */
4361  CacheInvalidateHeapTuple(relation, &oldtup, heaptup);
4362 
4363  /* Now we can release the buffer(s) */
4364  if (newbuf != buffer)
4365  ReleaseBuffer(newbuf);
4366  ReleaseBuffer(buffer);
4367  if (BufferIsValid(vmbuffer_new))
4368  ReleaseBuffer(vmbuffer_new);
4369  if (BufferIsValid(vmbuffer))
4370  ReleaseBuffer(vmbuffer);
4371 
4372  /*
4373  * Release the lmgr tuple lock, if we had it.
4374  */
4375  if (have_tuple_lock)
4376  UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
4377 
4378  pgstat_count_heap_update(relation, use_hot_update);
4379 
4380  /*
4381  * If heaptup is a private copy, release it. Don't forget to copy t_self
4382  * back to the caller's image, too.
4383  */
4384  if (heaptup != newtup)
4385  {
4386  newtup->t_self = heaptup->t_self;
4387  heap_freetuple(heaptup);
4388  }
4389 
4390  if (old_key_tuple != NULL && old_key_copied)
4391  heap_freetuple(old_key_tuple);
4392 
4393  bms_free(hot_attrs);
4394  bms_free(proj_idx_attrs);
4395  bms_free(key_attrs);
4396  bms_free(id_attrs);
4397  bms_free(modified_attrs);
4398  bms_free(interesting_attrs);
4399 
4400  return HeapTupleMayBeUpdated;
4401 }
4402 
4403 /*
4404  * Check if the specified attribute's value is same in both given tuples.
4405  * Subroutine for HeapDetermineModifiedColumns.
4406  */
4407 static bool
4408 heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
4409  HeapTuple tup1, HeapTuple tup2)
4410 {
4411  Datum value1,
4412  value2;
4413  bool isnull1,
4414  isnull2;
4415  Form_pg_attribute att;
4416 
4417  /*
4418  * If it's a whole-tuple reference, say "not equal". It's not really
4419  * worth supporting this case, since it could only succeed after a no-op
4420  * update, which is hardly a case worth optimizing for.
4421  */
4422  if (attrnum == 0)
4423  return false;
4424 
4425  /*
4426  * Likewise, automatically say "not equal" for any system attribute other
4427  * than OID and tableOID; we cannot expect these to be consistent in a HOT
4428  * chain, or even to be set correctly yet in the new tuple.
4429  */
4430  if (attrnum < 0)
4431  {
4432  if (attrnum != ObjectIdAttributeNumber &&
4433  attrnum != TableOidAttributeNumber)
4434  return false;
4435  }
4436 
4437  /*
4438  * Extract the corresponding values. XXX this is pretty inefficient if
4439  * there are many indexed columns. Should HeapDetermineModifiedColumns do
4440  * a single heap_deform_tuple call on each tuple, instead? But that
4441  * doesn't work for system columns ...
4442  */
4443  value1 = heap_getattr(tup1, attrnum, tupdesc, &isnull1);
4444  value2 = heap_getattr(tup2, attrnum, tupdesc, &isnull2);
4445 
4446  /*
4447  * If one value is NULL and other is not, then they are certainly not
4448  * equal
4449  */
4450  if (isnull1 != isnull2)
4451  return false;
4452 
4453  /*
4454  * If both are NULL, they can be considered equal.
4455  */
4456  if (isnull1)
4457  return true;
4458 
4459  /*
4460  * We do simple binary comparison of the two datums. This may be overly
4461  * strict because there can be multiple binary representations for the
4462  * same logical value. But we should be OK as long as there are no false
4463  * positives. Using a type-specific equality operator is messy because
4464  * there could be multiple notions of equality in different operator
4465  * classes; furthermore, we cannot safely invoke user-defined functions
4466  * while holding exclusive buffer lock.
4467  */
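/*
 * Illustrative example (not exhaustive): if the old tuple stores a value
 * compressed or out of line (TOASTed) while the new tuple carries the same
 * value inline, the raw datums differ byte-wise and we will report the
 * column as modified.  That is a harmless false alarm: it can only make the
 * caller take the stronger lock or forgo a HOT update, never break
 * correctness.
 */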
4468  if (attrnum <= 0)
4469  {
4470  /* The only allowed system columns are OIDs, so do this */
4471  return (DatumGetObjectId(value1) == DatumGetObjectId(value2));
4472  }
4473  else
4474  {
4475  Assert(attrnum <= tupdesc->natts);
4476  att = TupleDescAttr(tupdesc, attrnum - 1);
4477  return datumIsEqual(value1, value2, att->attbyval, att->attlen);
4478  }
4479 }
4480 
4481 /*
4482  * Check whether the value is unchanged after update of a projection
4483  * functional index. Compare the new and old values of the indexed
4484  * expression to see if we are able to use a HOT update or not.
4485  */
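/*
 * Example scenario (table and column names are illustrative only): with an
 * expression index on ((payload->>'customer')), an UPDATE that rewrites
 * "payload" but leaves payload->>'customer' with the same value does not
 * change the indexed expression, so the update can still be done as HOT.
 * If the extracted value differs, a regular (non-HOT) update with new index
 * entries is required.
 */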
4486 static bool
4487 ProjIndexIsUnchanged(Relation relation, HeapTuple oldtup, HeapTuple newtup)
4488 {
4489  ListCell *l;
4490  List *indexoidlist = RelationGetIndexList(relation);
4491  EState *estate = CreateExecutorState();
4492  ExprContext *econtext = GetPerTupleExprContext(estate);
4493  TupleTableSlot *slot = MakeSingleTupleTableSlot(RelationGetDescr(relation));
4494  bool equals = true;
4495  Datum old_values[INDEX_MAX_KEYS];
4496  bool old_isnull[INDEX_MAX_KEYS];
4497  Datum new_values[INDEX_MAX_KEYS];
4498  bool new_isnull[INDEX_MAX_KEYS];
4499  int indexno = 0;
4500 
4501  econtext->ecxt_scantuple = slot;
4502 
4503  foreach(l, indexoidlist)
4504  {
4505  if (bms_is_member(indexno, relation->rd_projidx))
4506  {
4507  Oid indexOid = lfirst_oid(l);
4508  Relation indexDesc = index_open(indexOid, AccessShareLock);
4509  IndexInfo *indexInfo = BuildIndexInfo(indexDesc);
4510  int i;
4511 
4512  ResetExprContext(econtext);
4513  ExecStoreTuple(oldtup, slot, InvalidBuffer, false);
4514  FormIndexDatum(indexInfo,
4515  slot,
4516  estate,
4517  old_values,
4518  old_isnull);
4519 
4520  ExecStoreTuple(newtup, slot, InvalidBuffer, false);
4521  FormIndexDatum(indexInfo,
4522  slot,
4523  estate,
4524  new_values,
4525  new_isnull);
4526 
4527  for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
4528  {
4529  if (old_isnull[i] != new_isnull[i])
4530  {
4531  equals = false;
4532  break;
4533  }
4534  else if (!old_isnull[i])
4535  {
4536  Form_pg_attribute att = TupleDescAttr(RelationGetDescr(indexDesc), i);
4537 
4538  if (!datumIsEqual(old_values[i], new_values[i], att->attbyval, att->attlen))
4539  {
4540  equals = false;
4541  break;
4542  }
4543  }
4544  }
4545  index_close(indexDesc, AccessShareLock);
4546 
4547  if (!equals)
4548  {
4549  break;
4550  }
4551  }
4552  indexno += 1;
4553  }
4554  ExecDropSingleTupleTableSlot(slot);
4555  FreeExecutorState(estate);
4556 
4557  return equals;
4558 }
4559 
4560 
4561 /*
4562  * Check which columns are being updated.
4563  *
4564  * Given an updated tuple, determine (and return into the output bitmapset),
4565  * from those listed as interesting, the set of columns that changed.
4566  *
4567  * The input bitmapset is destructively modified; that is OK since this is
4568  * invoked at most once in heap_update.
4569  */
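/*
 * Worked example (attribute numbers are illustrative): if interesting_cols
 * contains attributes 2 and 5 and only attribute 5 differs between oldtup
 * and newtup, the result is a bitmapset containing just attribute 5.  Both
 * input and output use attribute numbers offset by
 * FirstLowInvalidHeapAttributeNumber, as produced by
 * RelationGetIndexAttrBitmap().
 */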
4570 static Bitmapset *
4571 HeapDetermineModifiedColumns(Relation relation, Bitmapset *interesting_cols,
4572  HeapTuple oldtup, HeapTuple newtup)
4573 {
4574  int attnum;
4575  Bitmapset *modified = NULL;
4576 
4577  while ((attnum = bms_first_member(interesting_cols)) >= 0)
4578  {
4579  attnum += FirstLowInvalidHeapAttributeNumber;
4580 
4581  if (!heap_tuple_attr_equals(RelationGetDescr(relation),
4582  attnum, oldtup, newtup))
4583  modified = bms_add_member(modified,
4584  attnum - FirstLowInvalidHeapAttributeNumber);
4585  }
4586 
4587  return modified;
4588 }
4589 
4590 /*
4591  * simple_heap_update - replace a tuple
4592  *
4593  * This routine may be used to update a tuple when concurrent updates of
4594  * the target tuple are not expected (for example, because we have a lock
4595  * on the relation associated with the tuple). Any failure is reported
4596  * via ereport().
4597  */
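/*
 * Usage sketch (identifiers are illustrative): catalog manipulation code
 * normally reaches this routine via CatalogTupleUpdate(), which additionally
 * maintains the catalog's indexes:
 *
 *		tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
 *		((Form_pg_class) GETSTRUCT(tup))->relhasindex = true;
 *		CatalogTupleUpdate(rel, &tup->t_self, tup);
 *		heap_freetuple(tup);
 */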
4598 void
4599 simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
4600 {
4601  HTSU_Result result;
4602  HeapUpdateFailureData hufd;
4603  LockTupleMode lockmode;
4604 
4605  result = heap_update(relation, otid, tup,
4606  GetCurrentCommandId(true), InvalidSnapshot,
4607  true /* wait for commit */ ,
4608  &hufd, &lockmode);
4609  switch (result)
4610  {
4611  case HeapTupleSelfUpdated:
4612  /* Tuple was already updated in current command? */
4613  elog(ERROR, "tuple already updated by self");
4614  break;
4615 
4616  case HeapTupleMayBeUpdated:
4617  /* done successfully */
4618  break;
4619 
4620  case HeapTupleUpdated:
4621  elog(ERROR, "tuple concurrently updated");
4622  break;
4623 
4624  default:
4625  elog(ERROR, "unrecognized heap_update status: %u", result);
4626  break;
4627  }
4628 }
4629 
4630 
4631 /*
4632  * Return the MultiXactStatus corresponding to the given tuple lock mode.
4633  */
4634 static MultiXactStatus
4635 get_mxact_status_for_lock(LockTupleMode mode, bool is_update)
4636 {
4637  int retval;
4638 
4639  if (is_update)
4640  retval = tupleLockExtraInfo[mode].updstatus;
4641  else
4642  retval = tupleLockExtraInfo[mode].lockstatus;
4643 
4644  if (retval == -1)
4645  elog(ERROR, "invalid lock tuple mode %d/%s", mode,
4646  is_update ? "true" : "false");
4647 
4648  return (MultiXactStatus) retval;
4649 }
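/*
 * For reference, the mapping encoded in tupleLockExtraInfo (defined near the
 * top of this file) works out to approximately:
 *
 *		LockTupleKeyShare		-> MultiXactStatusForKeyShare   (lock only)
 *		LockTupleShare			-> MultiXactStatusForShare      (lock only)
 *		LockTupleNoKeyExclusive	-> MultiXactStatusForNoKeyUpdate or
 *								   MultiXactStatusNoKeyUpdate
 *		LockTupleExclusive		-> MultiXactStatusForUpdate or
 *								   MultiXactStatusUpdate
 *
 * The two share modes have no update status (-1 in the table), which is what
 * the error check above catches when is_update is true.
 */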
4650 
4651 /*
4652  * heap_lock_tuple - lock a tuple in shared or exclusive mode
4653  *
4654  * Note that this acquires a buffer pin, which the caller must release.
4655  *
4656  * Input parameters:
4657  * relation: relation containing tuple (caller must hold suitable lock)
4658  * tuple->t_self: TID of tuple to lock (rest of struct need not be valid)
4659  * cid: current command ID (used for visibility test, and stored into
4660  * tuple's cmax if lock is successful)
4661  * mode: indicates if shared or exclusive tuple lock is desired
4662  * wait_policy: what to do if tuple lock is not available
4663  * follow_updates: if true, follow the update chain to also lock descendant
4664  * tuples.
4665  *
4666  * Output parameters:
4667  * *tuple: all fields filled in
4668  * *buffer: set to buffer holding tuple (pinned but not locked at exit)
4669  * *hufd: filled in failure cases (see below)
4670  *
4671  * Function result may be:
4672  * HeapTupleMayBeUpdated: lock was successfully acquired
4673  * HeapTupleInvisible: lock failed because tuple was never visible to us
4674  * HeapTupleSelfUpdated: lock failed because tuple updated by self
4675  * HeapTupleUpdated: lock failed because tuple updated by other xact
4676  * HeapTupleWouldBlock: lock couldn't be acquired and wait_policy is skip
4677  *
4678  * In the failure cases other than HeapTupleInvisible, the routine fills
4679  * *hufd with the tuple's t_ctid, t_xmax (resolving a possible MultiXact,
4680  * if necessary), and t_cmax (the last only for HeapTupleSelfUpdated,
4681  * since we cannot obtain cmax from a combocid generated by another
4682  * transaction).
4683  * See comments for struct HeapUpdateFailureData for additional info.
4684  *
4685  * See README.tuplock for a thorough explanation of this mechanism.
4686  */
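/*
 * Caller sketch (variable names are illustrative): a row-locking caller such
 * as the executor's SELECT ... FOR UPDATE path uses this roughly as follows:
 *
 *		HeapTupleData tuple;
 *		Buffer buf;
 *		HeapUpdateFailureData hufd;
 *		HTSU_Result test;
 *
 *		tuple.t_self = *tid;
 *		test = heap_lock_tuple(rel, &tuple, GetCurrentCommandId(true),
 *							   LockTupleExclusive, LockWaitBlock,
 *							   true, &buf, &hufd);
 *		ReleaseBuffer(buf);
 *		switch (test) ... handle HeapTupleUpdated and friends ...
 *
 * The returned buffer pin must be released by the caller in every case, as
 * noted above.
 */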
4687 HTSU_Result
4688 heap_lock_tuple(Relation relation, HeapTuple tuple,
4689  CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
4690  bool follow_updates,
4691  Buffer *buffer, HeapUpdateFailureData *hufd)
4692 {
4693  HTSU_Result result;
4694  ItemPointer tid = &(tuple->t_self);
4695  ItemId lp;
4696  Page page;
4697  Buffer vmbuffer = InvalidBuffer;
4698  BlockNumber block;
4699  TransactionId xid,
4700  xmax;
4701  uint16 old_infomask,
4702  new_infomask,
4703  new_infomask2;
4704  bool first_time = true;
4705  bool have_tuple_lock = false;
4706  bool cleared_all_frozen = false;
4707 
4708  *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
4709  block = ItemPointerGetBlockNumber(tid);
4710 
4711  /*
4712  * Before locking the buffer, pin the visibility map page if it appears to
4713  * be necessary. Since we haven't got the lock yet, someone else might be
4714  * in the middle of changing this, so we'll need to recheck after we have
4715  * the lock.
4716  */
4717  if (PageIsAllVisible(BufferGetPage(*buffer)))
4718  visibilitymap_pin(relation, block, &vmbuffer);
4719 
4720  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
4721 
4722  page = BufferGetPage(*buffer);
4723  lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
4724  Assert(ItemIdIsNormal(lp));
4725 
4726  tuple->t_data = (HeapTupleHeader) PageGetItem(page, lp);
4727  tuple->t_len = ItemIdGetLength(lp);
4728  tuple->t_tableOid = RelationGetRelid(relation);
4729 
4730 l3:
4731  result = HeapTupleSatisfiesUpdate(tuple, cid, *buffer);
4732 
4733  if (result == HeapTupleInvisible)
4734  {
4735  /*
4736  * This is possible, but only when locking a tuple for ON CONFLICT
4737  * UPDATE. We return this value here rather than throwing an error in
4738  * order to give that case the opportunity to throw a more specific
4739  * error.
4740  */
4741  result = HeapTupleInvisible;
4742  goto out_locked;
4743  }
4744  else if (result == HeapTupleBeingUpdated || result == HeapTupleUpdated)
4745  {
4746  TransactionId xwait;
4747  uint16 infomask;
4748  uint16 infomask2;
4749  bool require_sleep;
4750  ItemPointerData t_ctid;
4751 
4752  /* must copy state data before unlocking buffer */
4753  xwait = HeapTupleHeaderGetRawXmax(tuple->t_data);
4754  infomask = tuple->t_data->t_infomask;
4755  infomask2 = tuple->t_data->t_infomask2;
4756  ItemPointerCopy(&tuple->t_data->t_ctid, &t_ctid);
4757 
4758  LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
4759 
4760  /*
4761  * If any subtransaction of the current top transaction already holds
4762  * a lock as strong as or stronger than what we're requesting, we
4763  * effectively hold the desired lock already. We *must* succeed
4764  * without trying to take the tuple lock, else we will deadlock
4765  * against anyone wanting to acquire a stronger lock.
4766  *
4767  * Note we only do this the first time we loop on the HTSU result;
4768  * there is no point in testing in subsequent passes, because
4769  * evidently our own transaction cannot have acquired a new lock after
4770  * the first time we checked.
4771  */
4772  if (first_time)
4773  {
4774  first_time = false;
4775 
4776  if (infomask & HEAP_XMAX_IS_MULTI)
4777  {
4778  int i;
4779  int nmembers;
4780  MultiXactMember *members;
4781 
4782  /*
4783  * We don't need to allow old multixacts here; if that had
4784  * been the case, HeapTupleSatisfiesUpdate would have returned
4785  * MayBeUpdated and we wouldn't be here.
4786  */
4787  nmembers =
4788  GetMultiXactIdMembers(xwait, &members, false,
4789  HEAP_XMAX_IS_LOCKED_ONLY(infomask));
4790 
4791  for (i = 0; i < nmembers; i++)
4792  {
4793  /* only consider members of our own transaction */
4794  if (!TransactionIdIsCurrentTransactionId(members[i].xid))
4795  continue;
4796 
4797  if (TUPLOCK_from_mxstatus(members[i].status) >= mode)
4798  {
4799  pfree(members);
4800  result = HeapTupleMayBeUpdated;
4801  goto out_unlocked;
4802  }
4803  }
4804 
4805  if (members)
4806  pfree(members);
4807  }
4808  else if (TransactionIdIsCurrentTransactionId(xwait))
4809  {
4810  switch (mode)
4811  {
4812  case LockTupleKeyShare:
4813  Assert(HEAP_XMAX_IS_KEYSHR_LOCKED(infomask) ||
4814  HEAP_XMAX_IS_SHR_LOCKED(infomask) ||
4815  HEAP_XMAX_IS_EXCL_LOCKED(infomask));
4816  result = HeapTupleMayBeUpdated;
4817  goto out_unlocked;
4818  case LockTupleShare:
4819  if (HEAP_XMAX_IS_SHR_LOCKED(infomask) ||
4820  HEAP_XMAX_IS_EXCL_LOCKED(infomask))
4821  {
4822  result = HeapTupleMayBeUpdated;
4823  goto out_unlocked;
4824  }
4825  break;
4826  case LockTupleNoKeyExclusive:
4827  if (HEAP_XMAX_IS_EXCL_LOCKED(infomask))
4828  {
4829  result = HeapTupleMayBeUpdated;
4830  goto out_unlocked;
4831  }
4832  break;
4833  case LockTupleExclusive:
4834  if (HEAP_XMAX_IS_EXCL_LOCKED(infomask) &&
4835  infomask2 & HEAP_KEYS_UPDATED)
4836  {
4837  result = HeapTupleMayBeUpdated;
4838  goto out_unlocked;
4839  }
4840  break;
4841  }
4842  }
4843  }
4844 
4845  /*
4846  * Initially assume that we will have to wait for the locking
4847  * transaction(s) to finish. We check various cases below in which
4848  * this can be turned off.
4849  */
4850  require_sleep = true;
4851  if (mode == LockTupleKeyShare)
4852  {
4853  /*
4854  * If we're requesting KeyShare, and there's no update present, we
4855  * don't need to wait. Even if there is an update, we can still
4856  * continue if the key hasn't been modified.
4857  *
4858  * However, if there are updates, we need to walk the update chain
4859  * to mark future versions of the row as locked, too. That way,
4860  * if somebody deletes that future version, we're protected
4861  * against the key going away. This locking of future versions
4862  * could block momentarily, if a concurrent transaction is
4863  * deleting a key; or it could return a value to the effect that
4864  * the transaction deleting the key has already committed. So we
4865  * do this before re-locking the buffer; otherwise this would be
4866  * prone to deadlocks.
4867  *
4868  * Note that the TID we're locking was grabbed before we unlocked
4869  * the buffer. For it to change while we're not looking, the
4870  * other properties we're testing for below after re-locking the
4871  * buffer would also change, in which case we would restart this
4872  * loop above.
4873  */
4874  if (!(infomask2 & HEAP_KEYS_UPDATED))
4875  {
4876  bool updated;
4877 
4878  updated = !HEAP_XMAX_IS_LOCKED_ONLY(infomask);
4879 
4880  /*
4881  * If there are updates, follow the update chain; bail out if
4882  * that cannot be done.
4883  */
4884  if (follow_updates && updated)
4885  {
4886  HTSU_Result res;
4887 
4888  res = heap_lock_updated_tuple(relation, tuple, &t_ctid,
4889  GetCurrentTransactionId(),
4890  mode);
4891  if (res != HeapTupleMayBeUpdated)
4892  {
4893  result = res;
4894  /* recovery code expects to have buffer lock held */
4895  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
4896  goto failed;
4897  }
4898  }
4899 
4900  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
4901 
4902  /*
4903  * Make sure it's still an appropriate lock, else start over.
4904  * Also, if it wasn't updated before we released the lock, but
4905  * is updated now, we start over too; the reason is that we
4906  * now need to follow the update chain to lock the new
4907  * versions.
4908  */
4909  if (!HeapTupleHeaderIsOnlyLocked(tuple->t_data) &&
4910  ((tuple->t_data->t_infomask2 & HEAP_KEYS_UPDATED) ||
4911  !updated))
4912  goto l3;
4913 
4914  /* Things look okay, so we can skip sleeping */
4915  require_sleep = false;
4916 
4917  /*
4918  * Note we allow Xmax to change here; other updaters/lockers
4919  * could have modified it before we grabbed the buffer lock.
4920  * However, this is not a problem, because with the recheck we
4921  * just did we ensure that they still don't conflict with the
4922  * lock we want.
4923  */
4924  }
4925  }
4926  else if (mode == LockTupleShare)
4927  {
4928  /*
4929  * If we're requesting Share, we can similarly avoid sleeping if
4930  * there's no update and no exclusive lock present.
4931  */
4932  if (HEAP_XMAX_IS_LOCKED_ONLY(infomask) &&
4933  !HEAP_XMAX_IS_EXCL_LOCKED(infomask))
4934  {
4935  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
4936 
4937  /*
4938  * Make sure it's still an appropriate lock, else start over.
4939  * See above about allowing xmax to change.
4940  */
4941  if (!HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask) ||
4942  HEAP_XMAX_IS_EXCL_LOCKED(tuple->t_data->t_infomask))
4943  goto l3;
4944  require_sleep = false;
4945  }
4946  }
4947  else if (mode == LockTupleNoKeyExclusive)
4948  {
4949  /*
4950  * If we're requesting NoKeyExclusive, we might also be able to
4951  * avoid sleeping; just ensure that there is no conflicting lock
4952  * already acquired.
4953  */
4954  if (infomask & HEAP_XMAX_IS_MULTI)
4955  {
4956  if (!DoesMultiXactIdConflict((MultiXactId) xwait, infomask,
4957  mode))
4958  {
4959  /*
4960  * No conflict, but if the xmax changed under us in the
4961  * meantime, start over.
4962  */
4963  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
4964  if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
4965  !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
4966  xwait))
4967  goto l3;
4968 
4969  /* otherwise, we're good */
4970  require_sleep = false;
4971  }
4972  }
4973  else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
4974  {
4975  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
4976 
4977  /* if the xmax changed in the meantime, start over */
4978  if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
4979  !TransactionIdEquals(
4980  HeapTupleHeaderGetRawXmax(tuple->t_data),
4981  xwait))
4982  goto l3;
4983  /* otherwise, we're good */
4984  require_sleep = false;
4985  }
4986  }
4987 
4988  /*
4989  * As a check independent from those above, we can also avoid sleeping
4990  * if the current transaction is the sole locker of the tuple. Note
4991  * that the strength of the lock already held is irrelevant; this is
4992  * not about recording the lock in Xmax (which will be done regardless
4993  * of this optimization, below). Also, note that the cases where we
4994  * hold a lock stronger than we are requesting are already handled
4995  * above by not doing anything.
4996  *
4997  * Note we only deal with the non-multixact case here; MultiXactIdWait
4998  * is well equipped to deal with this situation on its own.
4999  */
5000  if (require_sleep && !(infomask & HEAP_XMAX_IS_MULTI) &&
5001  TransactionIdIsCurrentTransactionId(xwait))
5002  {
5003  /* ... but if the xmax changed in the meantime, start over */
5004  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
5005  if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
5006  !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
5007  xwait))
5008  goto l3;
5010  require_sleep = false;
5011  }
5012 
5013  /*
5014  * Time to sleep on the other transaction/multixact, if necessary.
5015  *
5016  * If the other transaction is an update that's already committed,
5017  * then sleeping cannot possibly do any good: if we're required to
5018  * sleep, get out to raise an error instead.
5019  *
5020  * By here, we either have already acquired the buffer exclusive lock,
5021  * or we must wait for the locking transaction or multixact; so below
5022  * we ensure that we grab buffer lock after the sleep.
5023  */
5024  if (require_sleep && result == HeapTupleUpdated)
5025  {
5026  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
5027  goto failed;
5028  }
5029  else if (require_sleep)
5030  {
5031  /*
5032  * Acquire tuple lock to establish our priority for the tuple, or
5033  * die trying. LockTuple will release us when we are next-in-line
5034  * for the tuple. We must do this even if we are share-locking.
5035  *
5036  * If we are forced to "start over" below, we keep the tuple lock;
5037  * this arranges that we stay at the head of the line while
5038  * rechecking tuple state.
5039  */
5040  if (!heap_acquire_tuplock(relation, tid, mode, wait_policy,
5041  &have_tuple_lock))
5042  {
5043  /*
5044  * This can only happen if wait_policy is Skip and the lock
5045  * couldn't be obtained.
5046  */
5047  result = HeapTupleWouldBlock;
5048  /* recovery code expects to have buffer lock held */
5049  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
5050  goto failed;
5051  }
5052 
5053  if (infomask & HEAP_XMAX_IS_MULTI)
5054  {
5055  MultiXactStatus status = get_mxact_status_for_lock(mode, false);
5056 
5057  /* We only ever lock tuples, never update them */
5058  if (status >= MultiXactStatusNoKeyUpdate)
5059  elog(ERROR, "invalid lock mode in heap_lock_tuple");
5060 
5061  /* wait for multixact to end, or die trying */
5062  switch (wait_policy)
5063  {
5064  case LockWaitBlock:
5065  MultiXactIdWait((MultiXactId) xwait, status, infomask,
5066  relation, &tuple->t_self, XLTW_Lock, NULL);
5067  break;
5068  case LockWaitSkip:
5069  if (!ConditionalMultiXactIdWait((MultiXactId) xwait,
5070  status, infomask, relation,
5071  NULL))
5072  {
5073  result = HeapTupleWouldBlock;
5074  /* recovery code expects to have buffer lock held */
5075  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
5076  goto failed;
5077  }
5078  break;
5079  case LockWaitError:
5080  if (!ConditionalMultiXactIdWait((MultiXactId) xwait,
5081  status, infomask, relation,
5082  NULL))
5083  ereport(ERROR,
5084  (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
5085  errmsg("could not obtain lock on row in relation \"%s\"",
5086  RelationGetRelationName(relation))));
5087 
5088  break;
5089  }
5090 
5091  /*
5092  * Of course, the multixact might not be done here: if we're
5093  * requesting a light lock mode, other transactions with light
5094  * locks could still be alive, as well as locks owned by our
5095  * own xact or other subxacts of this backend. We need to
5096  * preserve the surviving MultiXact members. Note that it
5097  * isn't absolutely necessary in the latter case, but doing so
5098  * is simpler.
5099  */
5100  }
5101  else
5102  {
5103  /* wait for regular transaction to end, or die trying */
5104  switch (wait_policy)
5105  {
5106  case LockWaitBlock:
5107  XactLockTableWait(xwait, relation, &tuple->t_self,
5108  XLTW_Lock);
5109  break;
5110  case LockWaitSkip:
5111  if (!ConditionalXactLockTableWait(xwait))
5112  {
5113  result = HeapTupleWouldBlock;
5114  /* recovery code expects to have buffer lock held */
5115  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
5116  goto failed;
5117  }
5118  break;
5119  case LockWaitError:
5120  if (!ConditionalXactLockTableWait(xwait))
5121  ereport(ERROR,
5122  (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
5123  errmsg("could not obtain lock on row in relation \"%s\"",
5124  RelationGetRelationName(relation))));
5125  break;
5126  }
5127  }
5128 
5129  /* if there are updates, follow the update chain */
5130  if (follow_updates && !HEAP_XMAX_IS_LOCKED_ONLY(infomask))
5131  {
5132  HTSU_Result res;
5133 
5134  res = heap_lock_updated_tuple(relation, tuple, &t_ctid,
5135  GetCurrentTransactionId(),
5136  mode);
5137  if (res != HeapTupleMayBeUpdated)
5138  {
5139  result = res;
5140  /* recovery code expects to have buffer lock held */
5141  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
5142  goto failed;
5143  }
5144  }
5145 
5146  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
5147 
5148  /*
5149  * xwait is done, but if xwait had just locked the tuple then some
5150  * other xact could update this tuple before we get to this point.
5151  * Check for xmax change, and start over if so.
5152  */
5153  if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
5154  !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
5155  xwait))
5156  goto l3;
5157 
5158  if (!(infomask & HEAP_XMAX_IS_MULTI))
5159  {
5160  /*
5161  * Otherwise check if it committed or aborted. Note we cannot
5162  * be here if the tuple was only locked by somebody who didn't
5163  * conflict with us; that would have been handled above. So
5164  * that transaction must necessarily be gone by now. But
5165  * don't check for this in the multixact case, because some
5166  * locker transactions might still be running.
5167  */
5168  UpdateXmaxHintBits(tuple->t_data, *buffer, xwait);
5169  }
5170  }
5171 
5172  /* By here, we're certain that we hold buffer exclusive lock again */
5173 
5174  /*
5175  * We may lock if previous xmax aborted, or if it committed but only
5176  * locked the tuple without updating it; or if we didn't have to wait
5177  * at all for whatever reason.
5178  */
5179  if (!require_sleep ||
5180  (tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
5181  HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask) ||
5182  HeapTupleHeaderIsOnlyLocked(tuple->t_data))
5183  result = HeapTupleMayBeUpdated;
5184  else
5185  result = HeapTupleUpdated;
5186  }
5187 
5188 failed:
5189  if (result != HeapTupleMayBeUpdated)
5190  {
5191  Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated ||
5192  result == HeapTupleWouldBlock);
5193  Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID));
5194  hufd->ctid = tuple->t_data->t_ctid;
5195  hufd->xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
5196  if (result == HeapTupleSelfUpdated)
5197  hufd->cmax = HeapTupleHeaderGetCmax(tuple->t_data);
5198  else
5199  hufd->cmax = InvalidCommandId;
5200  goto out_locked;
5201  }
5202 
5203  /*
5204  * If we didn't pin the visibility map page and the page has become all
5205  * visible while we were busy locking the buffer, or during some
5206  * subsequent window during which we had it unlocked, we'll have to unlock
5207  * and re-lock, to avoid holding the buffer lock across I/O. That's a bit
5208  * unfortunate, especially since we'll now have to recheck whether the
5209  * tuple has been locked or updated under us, but hopefully it won't
5210  * happen very often.
5211  */
5212  if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
5213  {
5214  LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
5215  visibilitymap_pin(relation, block, &vmbuffer);
5216  LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
5217  goto l3;
5218  }
5219 
5220  xmax = HeapTupleHeaderGetRawXmax(tuple->t_data);
5221  old_infomask = tuple->t_data->t_infomask;
5222 
5223  /*
5224  * If this is the first possibly-multixact-able operation in the current
5225  * transaction, set my per-backend OldestMemberMXactId setting. We can be
5226  * certain that the transaction will never become a member of any older
5227  * MultiXactIds than that. (We have to do this even if we end up just
5228  * using our own TransactionId below, since some other backend could
5229  * incorporate our XID into a MultiXact immediately afterwards.)
5230  */
5231  MultiXactIdSetOldestMember();
5232 
5233  /*
5234  * Compute the new xmax and infomask to store into the tuple. Note we do
5235  * not modify the tuple just yet, because that would leave it in the wrong
5236  * state if multixact.c elogs.
5237  */
5238  compute_new_xmax_infomask(xmax, old_infomask, tuple->t_data->t_infomask2,
5239  GetCurrentTransactionId(), mode, false,
5240  &xid, &new_infomask, &new_infomask2);
5241 
5242  START_CRIT_SECTION();
5243 
5244  /*
5245  * Store transaction information of xact locking the tuple.
5246  *
5247  * Note: Cmax is meaningless in this context, so don't set it; this avoids
5248  * possibly generating a useless combo CID. Moreover, if we're locking a
5249  * previously updated tuple, it's important to preserve the Cmax.
5250  *
5251  * Also reset the HOT UPDATE bit, but only if there's no update; otherwise
5252  * we would break the HOT chain.
5253  */
5254  tuple->t_data->t_infomask &= ~HEAP_XMAX_BITS;
5255  tuple->t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
5256  tuple->t_data->t_infomask |= new_infomask;
5257  tuple->t_data->t_infomask2 |= new_infomask2;
5258  if (HEAP_XMAX_IS_LOCKED_ONLY(new_infomask))
5259  HeapTupleHeaderClearHotUpdated(tuple->t_data);
5260  HeapTupleHeaderSetXmax(tuple->t_data, xid);
5261 
5262  /*
5263  * Make sure there is no forward chain link in t_ctid. Note that in the
5264  * cases where the tuple has been updated, we must not overwrite t_ctid,
5265  * because it was set by the updater. Moreover, if the tuple has been
5266  * updated, we need to follow the update chain to lock the new versions of
5267  * the tuple as well.
5268  */
5269  if (HEAP_XMAX_IS_LOCKED_ONLY(new_infomask))
5270  tuple->t_data->t_ctid = *tid;
5271 
5272  /* Clear only the all-frozen bit on visibility map if needed */
5273  if (PageIsAllVisible(page) &&
5274  visibilitymap_clear(relation, block, vmbuffer,
5275  VISIBILITYMAP_ALL_FROZEN))
5276  cleared_all_frozen = true;
5277 
5278 
5279  MarkBufferDirty(*buffer);
5280 
5281  /*
5282  * XLOG stuff. You might think that we don't need an XLOG record because
5283  * there is no state change worth restoring after a crash. You would be
5284  * wrong however: we have just written either a TransactionId or a
5285  * MultiXactId that may never have been seen on disk before, and we need
5286  * to make sure that there are XLOG entries covering those ID numbers.
5287  * Else the same IDs might be re-used after a crash, which would be
5288  * disastrous if this page made it to disk before the crash. Essentially
5289  * we have to enforce the WAL log-before-data rule even in this case.
5290  * (Also, in a PITR log-shipping or 2PC environment, we have to have XLOG
5291  * entries for everything anyway.)
5292  */
5293  if (RelationNeedsWAL(relation))
5294  {
5295  xl_heap_lock xlrec;
5296  XLogRecPtr recptr;
5297 
5298  XLogBeginInsert();
5299  XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD);
5300 
5301  xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
5302  xlrec.locking_xid = xid;
5303  xlrec.infobits_set = compute_infobits(new_infomask,
5304  tuple->t_data->t_infomask2);
5305  xlrec.flags = cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0;
5306  XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
5307 
5308  /* we don't decode row locks at the moment, so no need to log the origin */
5309 
5310  recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
5311 
5312  PageSetLSN(page, recptr);
5313  }
5314 
5315  END_CRIT_SECTION();
5316 
5317  result = HeapTupleMayBeUpdated;
5318 
5319 out_locked:
5320  LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
5321 
5322 out_unlocked:
5323  if (BufferIsValid(vmbuffer))
5324  ReleaseBuffer(vmbuffer);
5325 
5326  /*
5327  * Don't update the visibility map here. Locking a tuple doesn't change
5328  * visibility info.
5329  */
5330 
5331  /*
5332  * Now that we have successfully marked the tuple as locked, we can
5333  * release the lmgr tuple lock, if we had it.
5334  */
5335  if (have_tuple_lock)
5336  UnlockTupleTuplock(relation, tid, mode);
5337 
5338  return result;
5339 }
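/*
 * [Editor's illustration -- not part of heapam.c]  A minimal, hypothetical
 * sketch of the caller-side pattern for consuming heap_lock_tuple's
 * HTSU_Result, loosely modeled on executor callers such as ExecLockRows.
 * It assumes the heap_lock_tuple signature declared earlier in this file;
 * lock_one_row and its messages are invented for exposition only.
 */
static HTSU_Result
lock_one_row(Relation rel, ItemPointer tid)
{
	HeapTupleData tuple;
	HeapUpdateFailureData hufd;
	Buffer		buf;
	HTSU_Result res;

	/* heap_lock_tuple locates the target row via t_self */
	ItemPointerCopy(tid, &tuple.t_self);

	res = heap_lock_tuple(rel, &tuple, GetCurrentCommandId(false),
						  LockTupleExclusive, LockWaitBlock,
						  false,	/* follow_updates */
						  &buf, &hufd);

	switch (res)
	{
		case HeapTupleMayBeUpdated:
			/* lock acquired; tuple now describes the locked version */
			break;
		case HeapTupleSelfUpdated:
			/* updated by a later command in our own xact; hufd.cmax is set */
			break;
		case HeapTupleUpdated:
			/* concurrent update/delete; hufd.ctid/hufd.xmax name the successor */
			break;
		case HeapTupleWouldBlock:
			/* only possible with LockWaitSkip; not reachable with LockWaitBlock */
			break;
		default:
			/* real callers also handle codes such as HeapTupleInvisible */
			elog(ERROR, "unexpected heap_lock_tuple status: %d", (int) res);
	}

	ReleaseBuffer(buf);
	return res;
}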
5340 
5341 /*
5342  * Acquire heavyweight lock on the given tuple, in preparation for acquiring
5343  * its normal, Xmax-based tuple lock.
5344  *
5345  * have_tuple_lock is an input and output parameter: on input, it indicates
5346  * whether the lock has previously been acquired (and this function does
5347  * nothing in that case). If this function returns success, have_tuple_lock
5348  * has been flipped to true.
5349  *
5350  * Returns false if it was unable to obtain the lock; this can only happen if
5351  * wait_policy is Skip.
5352  */
5353 static bool
5354 heap_acquire_tuplock(Relation relation, ItemPointer tid, LockTupleMode mode,
5355  LockWaitPolicy wait_policy, bool *have_tuple_lock)
5356 {
5357  if (*have_tuple_lock)
5358  return true;
5359 
5360  switch (wait_policy)
5361  {
5362  case LockWaitBlock:
5363  LockTupleTuplock(relation, tid, mode);
5364  break;
5365 
5366  case LockWaitSkip:
5367  if (!ConditionalLockTupleTuplock(relation, tid, mode))
5368  return false;
5369  break;
5370 
5371  case LockWaitError:
5372  if (!ConditionalLockTupleTuplock(relation, tid, mode))
5373  ereport(ERROR,
5374  (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
5375  errmsg("could not obtain lock on row in relation \"%s\"",
5376  RelationGetRelationName(relation))));
5377  break;
5378  }
5379  *have_tuple_lock = true;
5380 
5381  return true;
5382 }
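/*
 * [Editor's illustration -- not part of heapam.c]  A condensed, hypothetical
 * sketch of how heap_lock_tuple (above) drives heap_acquire_tuplock with its
 * in/out have_tuple_lock flag: the flag starts false, the first successful
 * call flips it to true, and it later gates the UnlockTupleTuplock cleanup.
 * try_lock_tuple_skip is invented here; the control flow is paraphrased.
 */
static bool
try_lock_tuple_skip(Relation relation, ItemPointer tid, LockTupleMode mode)
{
	bool		have_tuple_lock = false;

	/* With LockWaitSkip, return false instead of sleeping on a taken lock */
	if (!heap_acquire_tuplock(relation, tid, mode, LockWaitSkip,
							  &have_tuple_lock))
		return false;			/* caller would report HeapTupleWouldBlock */

	/* ... buffer-level work, as performed by the callers above ... */

	/* release the heavyweight tuple lock only if we actually took it */
	if (have_tuple_lock)
		UnlockTupleTuplock(relation, tid, mode);
	return true;
}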
5383 
5384 /*
5385  * Given an original set of Xmax and infomask, and a transaction (identified by
5386  * add_to_xmax) acquiring a new lock of some mode, compute the new Xmax and
5387  * corresponding infomasks to use on the tuple.
5388  *
5389  * Note that this might have side effects such as creating a new MultiXactId.
5390  *
5391  * Most callers will have called HeapTupleSatisfiesUpdate before this function;
5392  * that will have set the HEAP_XMAX_INVALID bit if the xmax was a MultiXactId
5393  * but it was not running anymore. There is a race condition: the MultiXactId
5394  * may have finished since then, but that uncommon case is handled either here
5395  * or within MultiXactIdExpand.
5396  *
5397  * There is a similar race condition possible when the old xmax was a regular
5398  * TransactionId. We test TransactionIdIsInProgress again just to narrow the
5399  * window, but it's still possible to end up creating an unnecessary
5400  * MultiXactId. Fortunately this is harmless.
5401  */
5402 static void
5403 compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
5404  uint16 old_infomask2, TransactionId add_to_xmax,
5405  LockTupleMode mode, bool is_update,
5406  TransactionId *result_xmax, uint16 *result_infomask,
5407  uint16 *result_infomask2)
5408 {
5409  TransactionId new_xmax;
5410  uint16 new_infomask,
5411  new_infomask2;
5412 
5413  Assert(TransactionIdIsCurrentTransactionId(add_to_xmax));
5414 
5415 l5:
5416  new_infomask = 0;
5417  new_infomask2 = 0;
5418  if (old_infomask & HEAP_XMAX_INVALID)
5419  {
5420  /*
5421  * No previous locker; we just insert our own TransactionId.
5422  *
5423  * Note that it's critical that this case be the first one checked,
5424  * because there are several blocks below that come back to this one
5425  * to implement certain optimizations; old_infomask might contain
5426  * other dirty bits in those cases, but we don't really care.
5427  */
5428  if (is_update)
5429  {
5430  new_xmax = add_to_xmax;
5431  if (mode == LockTupleExclusive)
5432  new_infomask2 |= HEAP_KEYS_UPDATED;
5433  }
5434  else
5435  {
5436  new_infomask |= HEAP_XMAX_LOCK_ONLY;
5437  switch (mode)
5438  {
5439  case LockTupleKeyShare:
5440  new_xmax = add_to_xmax;
5441  new_infomask |= HEAP_XMAX_KEYSHR_LOCK;
5442  break;
5443  case LockTupleShare:
5444  new_xmax = add_to_xmax;
5445  new_infomask |= HEAP_XMAX_SHR_LOCK;
5446  break;
5447  case LockTupleNoKeyExclusive:
5448  new_xmax = add_to_xmax;
5449  new_infomask |= HEAP_XMAX_EXCL_LOCK;
5450  break;
5451  case LockTupleExclusive:
5452  new_xmax = add_to_xmax;
5453  new_infomask |= HEAP_XMAX_EXCL_LOCK;
5454  new_infomask2 |= HEAP_KEYS_UPDATED;
5455  break;
5456  default:
5457  new_xmax = InvalidTransactionId; /* silence compiler */
5458  elog(ERROR, "invalid lock mode");
5459  }
5460  }
5461  }
5462  else if (old_infomask & HEAP_XMAX_IS_MULTI)
5463  {
5464  MultiXactStatus new_status;
5465 
5466  /*
5467  * Currently we don't allow XMAX_COMMITTED to be set for multis, so
5468  * cross-check.
5469  */
5470  Assert(!(old_infomask & HEAP_XMAX_COMMITTED));
5471 
5472  /*
5473  * A multixact together with LOCK_ONLY set but neither lock bit set
5474  * (i.e. a pg_upgraded share locked tuple) cannot possibly be running
5475  * anymore. This check is critical for databases upgraded by
5476  * pg_upgrade; both MultiXactIdIsRunning and MultiXactIdExpand assume
5477  * that such multis are never passed.
5478  */
5479  if (HEAP_LOCKED_UPGRADED(old_infomask))
5480  {
5481  old_infomask &= ~HEAP_XMAX_IS_MULTI;
5482  old_infomask |= HEAP_XMAX_INVALID;
5483  goto l5;
5484  }
5485 
5486  /*
5487  * If the XMAX is already a MultiXactId, then we need to expand it to
5488  * include add_to_xmax; but if all the members were lockers and are
5489  * all gone, we can do away with the IS_MULTI bit and just set
5490  * add_to_xmax as the only locker/updater. If all lockers are gone
5491  * and we have an updater that aborted, we can also do without a
5492  * multi.
5493  *
5494  * The cost of doing GetMultiXactIdMembers would be paid by
5495  * MultiXactIdExpand if we weren't to do this, so this check is not
5496  * incurring extra work anyhow.
5497  */
5498  if (!MultiXactIdIsRunning(xmax, HEAP_XMAX_IS_LOCKED_ONLY(old_infomask)))
5499  {
5500  if (HEAP_XMAX_IS_LOCKED_ONLY(old_infomask) ||
5501  !TransactionIdDidCommit(MultiXactIdGetUpdateXid(xmax,
5502  old_infomask)))
5503  {
5504  /*
5505  * Reset these bits and restart; otherwise fall through to
5506  * create a new multi below.
5507  */
5508  old_infomask &= ~HEAP_XMAX_IS_MULTI;
5509  old_infomask |= HEAP_XMAX_INVALID;
5510  goto l5;
5511  }
5512  }
5513 
5514  new_status = get_mxact_status_for_lock(mode, is_update);
5515 
5516  new_xmax = MultiXactIdExpand((MultiXactId) xmax, add_to_xmax,
5517  new_status);
5518  GetMultiXactIdHintBits(new_xmax, &new_infomask, &new_infomask2);
5519  }
5520  else if (old_infomask & HEAP_XMAX_COMMITTED)
5521  {
5522  /*
5523  * It's a committed update, so we need to preserve it as the updater of
5524  * the tuple.
5525  */
5526  MultiXactStatus status;
5527  MultiXactStatus new_status;
5528 
5529  if (old_infomask2 & HEAP_KEYS_UPDATED)
5530  status = MultiXactStatusUpdate;
5531  else
5532  status = MultiXactStatusNoKeyUpdate;
5533 
5534  new_status = get_mxact_status_for_lock(mode, is_update);
5535 
5536  /*
5537  * since it's not running, it's obviously impossible for the old
5538  * updater to be identical to the current one, so we need not check
5539  * for that case as we do in the block above.
5540  */
5541  new_xmax = MultiXactIdCreate(xmax, status, add_to_xmax, new_status);
5542  GetMultiXactIdHintBits(new_xmax, &new_infomask, &new_infomask2);
5543  }
5544  else if (TransactionIdIsInProgress(xmax))
5545  {
5546  /*
5547  * If the XMAX is a valid, in-progress TransactionId, then we need to
5548  * create a new MultiXactId that includes both the old locker or
5549  * updater and our own TransactionId.
5550  */
5551  MultiXactStatus new_status;
5552  MultiXactStatus old_status;
5553  LockTupleMode old_mode;
5554 
5555  if (HEAP_XMAX_IS_LOCKED_ONLY(old_infomask))
5556  {
5557  if (HEAP_XMAX_IS_KEYSHR_LOCKED(old_infomask))
5558  old_status = MultiXactStatusForKeyShare;
5559  else if (HEAP_XMAX_IS_SHR_LOCKED(old_infomask))
5560  old_status = MultiXactStatusForShare;
5561  else if (HEAP_XMAX_IS_EXCL_LOCKED(old_infomask))
5562  {
5563  if (old_infomask2 & HEAP_KEYS_UPDATED)
5564  old_status = MultiXactStatusForUpdate;
5565  else
5566  old_status = MultiXactStatusForNoKeyUpdate;
5567  }
5568  else
5569  {
5570  /*
5571  * LOCK_ONLY can be present alone only when a page has been
5572  * upgraded by pg_upgrade. But in that case,
5573  * TransactionIdIsInProgress() should have returned false. We
5574  * assume it's no longer locked in this case.
5575  */
5576  elog(WARNING, "LOCK_ONLY found for Xid in progress %u", xmax);
5577  old_infomask |= HEAP_XMAX_INVALID;
5578  old_infomask &= ~HEAP_XMAX_LOCK_ONLY;
5579  goto l5;
5580  }
5581  }
5582  else
5583  {
5584  /* it's an update, but which kind? */
5585  if (old_infomask2 & HEAP_KEYS_UPDATED)
5586  old_status = MultiXactStatusUpdate;
5587  else
5588  old_status = MultiXactStatusNoKeyUpdate;
5589  }
5590 
5591  old_mode = TUPLOCK_from_mxstatus(old_status);
5592 
5593  /*
5594  * If the lock to be acquired is for the same TransactionId as the
5595  * existing lock, there's an optimization possible: consider only the
5596  * strongest of both locks as the only one present, and restart.
5597  */
5598  if (xmax == add_to_xmax)
5599  {
5600  /*
5601  * Note that it's not possible for the original tuple to be
5602  * updated: we wouldn't be here because the tuple would have been
5603  * invisible and we wouldn't try to update it. As a subtlety,
5604  * this code can also run when traversing an update chain to lock
5605  * future versions of a tuple. But we wouldn't be here either,
5606  * because the add_to_xmax would be different from the original
5607  * updater.
5608  */
5609  Assert(HEAP_XMAX_IS_LOCKED_ONLY(old_infomask));
5610 
5611  /* acquire the strongest of both */
5612  if (mode < old_mode)
5613  mode = old_mode;
5614  /* mustn't touch is_update */
5615 
5616  old_infomask |= HEAP_XMAX_INVALID;
5617  goto l5;
5618  }
5619 
5620  /* otherwise, just fall back to creating a new multixact */
5621  new_status = get_mxact_status_for_lock(mode, is_update);
5622  new_xmax = MultiXactIdCreate(xmax, old_status,
5623  add_to_xmax, new_status);
5624  GetMultiXactIdHintBits(new_xmax, &new_infomask, &new_infomask2);
5625  }
5626  else if (!HEAP_XMAX_IS_LOCKED_ONLY(old_infomask) &&
5627  TransactionIdDidCommit(xmax))
5628  {
5629  /*
5630  * It's a committed update, so we must preserve it as the updater of the
5631  * tuple.
5632  */
5633  MultiXactStatus status;
5634  MultiXactStatus new_status;
5635 
5636  if (old_infomask2 & HEAP_KEYS_UPDATED)
5637  status = MultiXactStatusUpdate;
5638  else
5639  status = MultiXactStatusNoKeyUpdate;
5640 
5641  new_status = get_mxact_status_for_lock(mode, is_update);
5642 
5643  /*
5644  * since it's not running, it's obviously impossible for the old
5645  * updater to be identical to the current one, so we need not check
5646  * for that case as we do in the block above.
5647  */
5648  new_xmax = MultiXactIdCreate(xmax, status, add_to_xmax, new_status);
5649  GetMultiXactIdHintBits(new_xmax, &new_infomask, &new_infomask2);
5650  }
5651  else
5652  {
5653  /*
5654  * Can get here iff the locking/updating transaction was running when
5655  * the infomask was extracted from the tuple, but finished before
5656  * TransactionIdIsInProgress got to run. Deal with it as if there was
5657  * no locker at all in the first place.
5658  */
5659  old_infomask |= HEAP_XMAX_INVALID;
5660  goto l5;
5661  }
5662 
5663  *result_infomask = new_infomask;
5664  *result_infomask2 = new_infomask2;
5665  *result_xmax = new_xmax;
5666 }
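/*
 * [Editor's illustration -- not part of heapam.c]  The xmax/infomask pair
 * produced by compute_new_xmax_infomask is later decoded with the HEAP_XMAX_*
 * macros from htup_details.h.  The hypothetical helper below only shows how
 * those bits classify a tuple's locker/updater state; it is a sketch for
 * exposition, not code used by this file.
 */
static void
describe_xmax(HeapTupleHeader tup)
{
	uint16		infomask = tup->t_infomask;
	TransactionId rawxmax = HeapTupleHeaderGetRawXmax(tup);

	if (infomask & HEAP_XMAX_INVALID)
		elog(DEBUG1, "no live locker or updater");
	else if (infomask & HEAP_XMAX_IS_MULTI)
		elog(DEBUG1, "multixact %u carries the locks and/or update", rawxmax);
	else if (HEAP_XMAX_IS_LOCKED_ONLY(infomask))
		elog(DEBUG1, "xid %u holds only a tuple lock", rawxmax);
	else
		elog(DEBUG1, "xid %u updated or deleted the tuple", rawxmax);
}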
5667 
5668 /*
5669  * Subroutine for heap_lock_updated_tuple_rec.
5670  *
5671  * Given a hypothetical multixact status held by the transaction identified
5672  * with the given xid, does the current transaction need to wait, fail, or can
5673  * it continue if it wanted to acquire a lock of the given mode? "needwait"
5674  * is set to true if waiting is necessary; if it can continue, then
5675  * HeapTupleMayBeUpdated is returned. If the lock is already held by the
5676  * current transaction, return HeapTupleSelfUpdated. In case of a conflict
5677  * with another transaction, a different HeapTupleSatisfiesUpdate return code
5678  * is returned.
5679  *
5680  * The held status is said to be hypothetical because it might correspond to a
5681  * lock held by a single Xid, i.e. not a real MultiXactId; we express it this
5682  * way for simplicity of API.
5683  */
5684 static HTSU_Result
5685 test_lockmode_for_conflict(MultiXactStatus status, TransactionId xid,
5686  LockTupleMode mode, bool *needwait)
5687 {
5688  MultiXactStatus wantedstatus;
5689 
5690  *needwait = false;
5691  wantedstatus = get_mxact_status_for_lock(mode, false);
5692 
5693  /*
5694  * Note: we *must* check TransactionIdIsInProgress before
5695  * TransactionIdDidAbort/Commit; see comment at top of tqual.c for an
5696  * explanation.
5697  */
5698  if (TransactionIdIsCurrentTransactionId(xid))
5699  {
5700  /*
5701  * The tuple has already been locked by our own transaction. This is
5702  * very rare but can happen if multiple transactions are trying to
5703  * lock an ancient version of the same tuple.
5704  */
5705  return HeapTupleSelfUpdated;
5706  }
5707  else if (TransactionIdIsInProgress(xid))
5708  {
5709  /*
5710  * If the locking transaction is running, what we do depends on
5711  * whether the lock modes conflict: if they do, then we must wait for
5712  * it to finish; otherwise we can fall through to lock this tuple
5713  * version without waiting.
5714  */
5715  if (DoLockModesConflict(LOCKMODE_from_mxstatus(status),
5716  LOCKMODE_from_mxstatus(wantedstatus)))
5717  {
5718  *needwait = true;
5719  }
5720 
5721  /*
5722  * If we set needwait above, then this value doesn't matter;
5723  * otherwise, this value signals to caller that it's okay to proceed.
5724  */
5725  return HeapTupleMayBeUpdated;
5726  }
5727  else if (TransactionIdDidAbort(xid))
5728  return HeapTupleMayBeUpdated;
5729  else if (TransactionIdDidCommit(xid))
5730  {
5731  /*
5732  * The other transaction committed. If it was only a locker, then the
5733  * lock is completely gone now and we can return success; but if it
5734  * was an update, then what we do depends on whether the two lock
5735  * modes conflict. If they conflict, then we must report error to
5736  * caller. But if they don't, we can fall through to allow the current
5737  * transaction to lock the tuple.
5738  *
5739  * Note: the reason we worry about ISUPDATE here is because as soon as
5740  * a transaction ends, all its locks are gone and meaningless, and
5741  * thus we can ignore them; whereas its updates persist. In the
5742  * TransactionIdIsInProgress case, above, we don't need to check
5743  * because we know the lock is still "alive" and thus a conflict needs
5744  * always be checked.
5745  */
5746  if (!ISUPDATE_from_mxstatus(status))
5747  return HeapTupleMayBeUpdated;
5748 
5749  if (DoLockModesConflict(LOCKMODE_from_mxstatus(status),
5750  LOCKMODE_from_mxstatus(wantedstatus)))
5751  /* bummer */
5752  return HeapTupleUpdated;
5753 
5754  return HeapTupleMayBeUpdated;
5755  }
5756 
5757  /* Not in progress, not aborted, not committed -- must have crashed */
5758  return HeapTupleMayBeUpdated;
5759 }
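/*
 * [Editor's note -- not part of heapam.c]  Summarizing the cases handled by
 * test_lockmode_for_conflict above:
 *   - xid is our own transaction           -> HeapTupleSelfUpdated
 *   - xid still in progress                -> HeapTupleMayBeUpdated, with
 *     *needwait = true only when the lock modes conflict
 *   - xid aborted or crashed               -> HeapTupleMayBeUpdated
 *   - xid committed, locker only           -> HeapTupleMayBeUpdated
 *   - xid committed, conflicting update    -> HeapTupleUpdated
 */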
5760 
5761 
5762 /*
5763  * Recursive part of heap_lock_updated_tuple
5764  *
5765  * Fetch the tuple pointed to by tid in rel, and mark it as locked by the given
5766  * xid with the given mode; if this tuple is updated, recurse to lock the new
5767  * version as well.
5768  */
5769 static HTSU_Result
5770 heap_lock_updated_tuple_rec(Relation rel, ItemPointer tid, TransactionId xid,
5771  LockTupleMode mode)
5772 {
5773  HTSU_Result result;
5774  ItemPointerData tupid;
5775  HeapTupleData mytup;
5776  Buffer buf;
5777  uint16 new_infomask,
5778  new_infomask2,
5779  old_infomask,
5780  old_infomask2;
5781  TransactionId xmax,
5782  new_xmax;
5783  TransactionId priorXmax = InvalidTransactionId;
5784  bool cleared_all_frozen = false;
5785  bool pinned_desired_page;
5786  Buffer vmbuffer = InvalidBuffer;
5787  BlockNumber block;
5788 
5789  ItemPointerCopy(tid, &tupid);
5790 
5791  for (;;)
5792  {
5793  new_infomask = 0;
5794  new_xmax = InvalidTransactionId;
5795  block = ItemPointerGetBlockNumber(&tupid);
5796  ItemPointerCopy(&tupid, &(mytup.t_self));
5797 
5798  if (!heap_fetch(rel, SnapshotAny, &mytup, &buf, false, NULL))
5799  {
5800  /*
5801  * if we fail to find the updated version of the tuple, it's
5802  * because it was vacuumed/pruned away after its creator
5803  * transaction aborted. So behave as if we got to the end of the
5804  * chain, and there's no further tuple to lock: return success to
5805  * caller.
5806  */
5807  result = HeapTupleMayBeUpdated;
5808  goto out_unlocked;
5809  }
5810 
5811 l4:
5812  CHECK_FOR_INTERRUPTS();
5813 
5814  /*
5815  * Before locking the buffer, pin the visibility map page if it
5816  * appears to be necessary. Since we haven't got the lock yet,
5817  * someone else might be in the middle of changing this, so we'll need
5818  * to recheck after we have the lock.
5819  */
5820  if (PageIsAllVisible(BufferGetPage(buf)))
5821  {
5822  visibilitymap_pin(rel, block, &vmbuffer);
5823  pinned_desired_page = true;
5824  }
5825  else
5826  pinned_desired_page = false;
5827 
5828  LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
5829 
5830  /*
5831  * If we didn't pin the visibility map page and the page has become
5832  * all visible while we were busy locking the buffer, we'll have to
5833  * unlock and re-lock, to avoid holding the buffer lock across I/O.
5834  * That's a bit unfortunate, but hopefully shouldn't happen often.
5835  *
5836  * Note: in some paths through this function, we will reach here
5837  * holding a pin on a vm page that may or may not be the one matching
5838  * this page. If this page isn't all-visible, we won't use the vm
5839  * page, but we hold onto such a pin till the end of the function.
5840  */
5841  if (!pinned_desired_page && PageIsAllVisible(BufferGetPage(buf)))
5842  {
5843  LockBuffer(buf, BUFFER_LOCK_UNLOCK);
5844  visibilitymap_pin(rel, block, &vmbuffer);
5845  LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
5846  }
5847 
5848  /*
5849  * Check the tuple XMIN against prior XMAX, if any. If we reached the
5850  * end of the chain, we're done, so return success.
5851  */
5852  if (TransactionIdIsValid(priorXmax) &&
5853  !TransactionIdEquals(HeapTupleHeaderGetXmin(mytup.t_data),
5854  priorXmax))
5855  {
5856  result = HeapTupleMayBeUpdated;
5857  goto out_locked;
5858  }
5859 
5860  /*
5861  * Also check Xmin: if this tuple was created by an aborted
5862  * (sub)transaction, then we already locked the last live one in the
5863  * chain, thus we're done, so return success.
5864  */
5865  if (TransactionIdDidAbort(HeapTupleHeaderGetXmin(mytup.t_data)))
5866  {
5867  result = HeapTupleMayBeUpdated;
5868  goto out_locked;
5869  }
5870 
5871  old_infomask = mytup.t_data->t_infomask;
5872  old_infomask2 = mytup.t_data->t_infomask2;
5873  xmax = HeapTupleHeaderGetRawXmax(mytup.t_data);
5874 
5875  /*
5876  * If this tuple version has been updated or locked by some concurrent
5877  * transaction(s), what we do depends on whether our lock mode
5878  * conflicts with what those other transactions hold, and also on the
5879  * status of them.
5880  */
5881  if (!(old_infomask & HEAP_XMAX_INVALID))
5882  {
5883  TransactionId rawxmax;
5884  bool needwait;
5885 
5886  rawxmax = HeapTupleHeaderGetRawXmax(mytup.t_data);
5887  if (old_infomask & HEAP_XMAX_IS_MULTI)
5888  {
5889  int nmembers;
5890  int i;
5891  MultiXactMember *members;
5892 
5893  /*
5894  * We don't need a test for pg_upgrade'd tuples: this is only
5895  * applied to tuples after the first in an update chain. Said
5896  * first tuple in the chain may well be locked-in-9.2-and-
5897  * pg_upgraded, but that one was already locked by our caller,
5898  * not us; and any subsequent ones cannot be because our
5899  * caller must necessarily have obtained a snapshot later than
5900  * the pg_upgrade itself.
5901  */
5902  Assert(!HEAP_LOCKED_UPGRADED(mytup.t_data->t_infomask));
5903 
5904  nmembers = GetMultiXactIdMembers(rawxmax, &members, false,
5905  HEAP_XMAX_IS_LOCKED_ONLY(old_infomask));
5906  for (i = 0; i < nmembers; i++)
5907  {
5908  result = test_lockmode_for_conflict(members[i].status,
5909  members[i].xid,
5910  mode, &needwait);
5911 
5912  /*
5913  * If the tuple was already locked by ourselves in a
5914  * previous iteration of this (say heap_lock_tuple was
5915  * forced to restart the locking loop because of a change
5916  * in xmax), then we hold the lock already on this tuple
5917  * version and we don't need to do anything; and this is
5918