PostgreSQL Source Code  git master
spgxlog.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * spgxlog.c
 *	  WAL replay logic for SP-GiST
 *
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			 src/backend/access/spgist/spgxlog.c
 *
 *-------------------------------------------------------------------------
 */
15 #include "postgres.h"
16 
17 #include "access/bufmask.h"
18 #include "access/spgist_private.h"
19 #include "access/spgxlog.h"
20 #include "access/xlogutils.h"
21 #include "storage/standby.h"
22 #include "utils/memutils.h"
23 
24 
25 static MemoryContext opCtx; /* working memory for operations */
26 
27 
28 /*
29  * Prepare a dummy SpGistState, with just the minimum info needed for replay.
30  *
31  * At present, all we need is enough info to support spgFormDeadTuple(),
32  * plus the isBuild flag.
33  */
34 static void
36 {
37  memset(state, 0, sizeof(*state));
38 
39  state->myXid = stateSrc.myXid;
40  state->isBuild = stateSrc.isBuild;
41  state->deadTupleStorage = palloc0(SGDTSIZE);
42 }
43 
44 /*
45  * Add a leaf tuple, or replace an existing placeholder tuple. This is used
46  * to replay SpGistPageAddNewItem() operations. If the offset points at an
47  * existing tuple, it had better be a placeholder tuple.
48  */
49 static void
50 addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
51 {
52  if (offset <= PageGetMaxOffsetNumber(page))
53  {
55  PageGetItemId(page, offset));
56 
57  if (dt->tupstate != SPGIST_PLACEHOLDER)
58  elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
59 
60  Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
61  SpGistPageGetOpaque(page)->nPlaceholder--;
62 
63  PageIndexTupleDelete(page, offset);
64  }
65 
66  Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
67 
68  if (PageAddItem(page, tuple, size, offset, false, false) != offset)
69  elog(ERROR, "failed to add item of size %u to SPGiST index page",
70  size);
71 }
72 
73 static void
75 {
76  XLogRecPtr lsn = record->EndRecPtr;
77  char *ptr = XLogRecGetData(record);
78  spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
79  char *leafTuple;
80  SpGistLeafTupleData leafTupleHdr;
81  Buffer buffer;
82  Page page;
84 
85  ptr += sizeof(spgxlogAddLeaf);
86  leafTuple = ptr;
87  /* the leaf tuple is unaligned, so make a copy to access its header */
88  memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
89 
90  /*
91  * In normal operation we would have both current and parent pages locked
92  * simultaneously; but in WAL replay it should be safe to update the leaf
93  * page before updating the parent.
94  */
95  if (xldata->newPage)
96  {
97  buffer = XLogInitBufferForRedo(record, 0);
98  SpGistInitBuffer(buffer,
99  SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
101  }
102  else
103  action = XLogReadBufferForRedo(record, 0, &buffer);
104 
105  if (action == BLK_NEEDS_REDO)
106  {
107  page = BufferGetPage(buffer);
108 
109  /* insert new tuple */
110  if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
111  {
112  /* normal cases, tuple was added by SpGistPageAddNewItem */
113  addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
114  xldata->offnumLeaf);
115 
116  /* update head tuple's chain link if needed */
117  if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
118  {
119  SpGistLeafTuple head;
120 
121  head = (SpGistLeafTuple) PageGetItem(page,
122  PageGetItemId(page, xldata->offnumHeadLeaf));
123  Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
124  SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
125  }
126  }
127  else
128  {
129  /* replacing a DEAD tuple */
130  PageIndexTupleDelete(page, xldata->offnumLeaf);
131  if (PageAddItem(page,
132  (Item) leafTuple, leafTupleHdr.size,
133  xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
134  elog(ERROR, "failed to add item of size %u to SPGiST index page",
135  leafTupleHdr.size);
136  }
137 
138  PageSetLSN(page, lsn);
139  MarkBufferDirty(buffer);
140  }
141  if (BufferIsValid(buffer))
142  UnlockReleaseBuffer(buffer);
143 
144  /* update parent downlink if necessary */
145  if (xldata->offnumParent != InvalidOffsetNumber)
146  {
147  if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
148  {
149  SpGistInnerTuple tuple;
150  BlockNumber blknoLeaf;
151 
152  XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
153 
154  page = BufferGetPage(buffer);
155 
156  tuple = (SpGistInnerTuple) PageGetItem(page,
157  PageGetItemId(page, xldata->offnumParent));
158 
159  spgUpdateNodeLink(tuple, xldata->nodeI,
160  blknoLeaf, xldata->offnumLeaf);
161 
162  PageSetLSN(page, lsn);
163  MarkBufferDirty(buffer);
164  }
165  if (BufferIsValid(buffer))
166  UnlockReleaseBuffer(buffer);
167  }
168 }
169 
170 static void
172 {
173  XLogRecPtr lsn = record->EndRecPtr;
174  char *ptr = XLogRecGetData(record);
175  spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
177  OffsetNumber *toDelete;
178  OffsetNumber *toInsert;
179  int nInsert;
180  Buffer buffer;
181  Page page;
183  BlockNumber blknoDst;
184 
185  XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
186 
187  fillFakeState(&state, xldata->stateSrc);
188 
189  nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
190 
191  ptr += SizeOfSpgxlogMoveLeafs;
192  toDelete = (OffsetNumber *) ptr;
193  ptr += sizeof(OffsetNumber) * xldata->nMoves;
194  toInsert = (OffsetNumber *) ptr;
195  ptr += sizeof(OffsetNumber) * nInsert;
196 
197  /* now ptr points to the list of leaf tuples */
198 
199  /*
200  * In normal operation we would have all three pages (source, dest, and
201  * parent) locked simultaneously; but in WAL replay it should be safe to
202  * update them one at a time, as long as we do it in the right order.
203  */
204 
205  /* Insert tuples on the dest page (do first, so redirect is valid) */
206  if (xldata->newPage)
207  {
208  buffer = XLogInitBufferForRedo(record, 1);
209  SpGistInitBuffer(buffer,
210  SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
212  }
213  else
214  action = XLogReadBufferForRedo(record, 1, &buffer);
215 
216  if (action == BLK_NEEDS_REDO)
217  {
218  int i;
219 
220  page = BufferGetPage(buffer);
221 
222  for (i = 0; i < nInsert; i++)
223  {
224  char *leafTuple;
225  SpGistLeafTupleData leafTupleHdr;
226 
227  /*
228  * the tuples are not aligned, so must copy to access the size
229  * field.
230  */
231  leafTuple = ptr;
232  memcpy(&leafTupleHdr, leafTuple,
233  sizeof(SpGistLeafTupleData));
234 
235  addOrReplaceTuple(page, (Item) leafTuple,
236  leafTupleHdr.size, toInsert[i]);
237  ptr += leafTupleHdr.size;
238  }
239 
240  PageSetLSN(page, lsn);
241  MarkBufferDirty(buffer);
242  }
243  if (BufferIsValid(buffer))
244  UnlockReleaseBuffer(buffer);
245 
246  /* Delete tuples from the source page, inserting a redirection pointer */
247  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
248  {
249  page = BufferGetPage(buffer);
250 
251  spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
254  blknoDst,
255  toInsert[nInsert - 1]);
256 
257  PageSetLSN(page, lsn);
258  MarkBufferDirty(buffer);
259  }
260  if (BufferIsValid(buffer))
261  UnlockReleaseBuffer(buffer);
262 
263  /* And update the parent downlink */
264  if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
265  {
266  SpGistInnerTuple tuple;
267 
268  page = BufferGetPage(buffer);
269 
270  tuple = (SpGistInnerTuple) PageGetItem(page,
271  PageGetItemId(page, xldata->offnumParent));
272 
273  spgUpdateNodeLink(tuple, xldata->nodeI,
274  blknoDst, toInsert[nInsert - 1]);
275 
276  PageSetLSN(page, lsn);
277  MarkBufferDirty(buffer);
278  }
279  if (BufferIsValid(buffer))
280  UnlockReleaseBuffer(buffer);
281 }
282 
283 static void
285 {
286  XLogRecPtr lsn = record->EndRecPtr;
287  char *ptr = XLogRecGetData(record);
288  spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
289  char *innerTuple;
290  SpGistInnerTupleData innerTupleHdr;
292  Buffer buffer;
293  Page page;
295 
296  ptr += sizeof(spgxlogAddNode);
297  innerTuple = ptr;
298  /* the tuple is unaligned, so make a copy to access its header */
299  memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
300 
301  fillFakeState(&state, xldata->stateSrc);
302 
303  if (!XLogRecHasBlockRef(record, 1))
304  {
305  /* update in place */
306  Assert(xldata->parentBlk == -1);
307  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
308  {
309  page = BufferGetPage(buffer);
310 
311  PageIndexTupleDelete(page, xldata->offnum);
312  if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
313  xldata->offnum,
314  false, false) != xldata->offnum)
315  elog(ERROR, "failed to add item of size %u to SPGiST index page",
316  innerTupleHdr.size);
317 
318  PageSetLSN(page, lsn);
319  MarkBufferDirty(buffer);
320  }
321  if (BufferIsValid(buffer))
322  UnlockReleaseBuffer(buffer);
323  }
324  else
325  {
326  BlockNumber blkno;
327  BlockNumber blknoNew;
328 
329  XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
330  XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
331 
332  /*
333  * In normal operation we would have all three pages (source, dest,
334  * and parent) locked simultaneously; but in WAL replay it should be
335  * safe to update them one at a time, as long as we do it in the right
336  * order. We must insert the new tuple before replacing the old tuple
337  * with the redirect tuple.
338  */
339 
340  /* Install new tuple first so redirect is valid */
341  if (xldata->newPage)
342  {
343  /* AddNode is not used for nulls pages */
344  buffer = XLogInitBufferForRedo(record, 1);
345  SpGistInitBuffer(buffer, 0);
347  }
348  else
349  action = XLogReadBufferForRedo(record, 1, &buffer);
350  if (action == BLK_NEEDS_REDO)
351  {
352  page = BufferGetPage(buffer);
353 
354  addOrReplaceTuple(page, (Item) innerTuple,
355  innerTupleHdr.size, xldata->offnumNew);
356 
357  /*
358  * If parent is in this same page, update it now.
359  */
360  if (xldata->parentBlk == 1)
361  {
362  SpGistInnerTuple parentTuple;
363 
364  parentTuple = (SpGistInnerTuple) PageGetItem(page,
365  PageGetItemId(page, xldata->offnumParent));
366 
367  spgUpdateNodeLink(parentTuple, xldata->nodeI,
368  blknoNew, xldata->offnumNew);
369  }
370  PageSetLSN(page, lsn);
371  MarkBufferDirty(buffer);
372  }
373  if (BufferIsValid(buffer))
374  UnlockReleaseBuffer(buffer);
375 
376  /* Delete old tuple, replacing it with redirect or placeholder tuple */
377  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
378  {
379  SpGistDeadTuple dt;
380 
381  page = BufferGetPage(buffer);
382 
383  if (state.isBuild)
387  else
389  blknoNew,
390  xldata->offnumNew);
391 
392  PageIndexTupleDelete(page, xldata->offnum);
393  if (PageAddItem(page, (Item) dt, dt->size,
394  xldata->offnum,
395  false, false) != xldata->offnum)
396  elog(ERROR, "failed to add item of size %u to SPGiST index page",
397  dt->size);
398 
399  if (state.isBuild)
400  SpGistPageGetOpaque(page)->nPlaceholder++;
401  else
402  SpGistPageGetOpaque(page)->nRedirection++;
403 
404  /*
405  * If parent is in this same page, update it now.
406  */
407  if (xldata->parentBlk == 0)
408  {
409  SpGistInnerTuple parentTuple;
410 
411  parentTuple = (SpGistInnerTuple) PageGetItem(page,
412  PageGetItemId(page, xldata->offnumParent));
413 
414  spgUpdateNodeLink(parentTuple, xldata->nodeI,
415  blknoNew, xldata->offnumNew);
416  }
417  PageSetLSN(page, lsn);
418  MarkBufferDirty(buffer);
419  }
420  if (BufferIsValid(buffer))
421  UnlockReleaseBuffer(buffer);
422 
423  /*
424  * Update parent downlink (if we didn't do it as part of the source or
425  * destination page update already).
426  */
427  if (xldata->parentBlk == 2)
428  {
429  if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
430  {
431  SpGistInnerTuple parentTuple;
432 
433  page = BufferGetPage(buffer);
434 
435  parentTuple = (SpGistInnerTuple) PageGetItem(page,
436  PageGetItemId(page, xldata->offnumParent));
437 
438  spgUpdateNodeLink(parentTuple, xldata->nodeI,
439  blknoNew, xldata->offnumNew);
440 
441  PageSetLSN(page, lsn);
442  MarkBufferDirty(buffer);
443  }
444  if (BufferIsValid(buffer))
445  UnlockReleaseBuffer(buffer);
446  }
447  }
448 }
449 
450 static void
452 {
453  XLogRecPtr lsn = record->EndRecPtr;
454  char *ptr = XLogRecGetData(record);
455  spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
456  char *prefixTuple;
457  SpGistInnerTupleData prefixTupleHdr;
458  char *postfixTuple;
459  SpGistInnerTupleData postfixTupleHdr;
460  Buffer buffer;
461  Page page;
463 
464  ptr += sizeof(spgxlogSplitTuple);
465  prefixTuple = ptr;
466  /* the prefix tuple is unaligned, so make a copy to access its header */
467  memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
468  ptr += prefixTupleHdr.size;
469  postfixTuple = ptr;
470  /* postfix tuple is also unaligned */
471  memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
472 
473  /*
474  * In normal operation we would have both pages locked simultaneously; but
475  * in WAL replay it should be safe to update them one at a time, as long
476  * as we do it in the right order.
477  */
478 
479  /* insert postfix tuple first to avoid dangling link */
480  if (!xldata->postfixBlkSame)
481  {
482  if (xldata->newPage)
483  {
484  buffer = XLogInitBufferForRedo(record, 1);
485  /* SplitTuple is not used for nulls pages */
486  SpGistInitBuffer(buffer, 0);
488  }
489  else
490  action = XLogReadBufferForRedo(record, 1, &buffer);
491  if (action == BLK_NEEDS_REDO)
492  {
493  page = BufferGetPage(buffer);
494 
495  addOrReplaceTuple(page, (Item) postfixTuple,
496  postfixTupleHdr.size, xldata->offnumPostfix);
497 
498  PageSetLSN(page, lsn);
499  MarkBufferDirty(buffer);
500  }
501  if (BufferIsValid(buffer))
502  UnlockReleaseBuffer(buffer);
503  }
504 
505  /* now handle the original page */
506  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
507  {
508  page = BufferGetPage(buffer);
509 
510  PageIndexTupleDelete(page, xldata->offnumPrefix);
511  if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
512  xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
513  elog(ERROR, "failed to add item of size %u to SPGiST index page",
514  prefixTupleHdr.size);
515 
516  if (xldata->postfixBlkSame)
517  addOrReplaceTuple(page, (Item) postfixTuple,
518  postfixTupleHdr.size,
519  xldata->offnumPostfix);
520 
521  PageSetLSN(page, lsn);
522  MarkBufferDirty(buffer);
523  }
524  if (BufferIsValid(buffer))
525  UnlockReleaseBuffer(buffer);
526 }
527 
528 static void
530 {
531  XLogRecPtr lsn = record->EndRecPtr;
532  char *ptr = XLogRecGetData(record);
533  spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
534  char *innerTuple;
535  SpGistInnerTupleData innerTupleHdr;
537  OffsetNumber *toDelete;
538  OffsetNumber *toInsert;
539  uint8 *leafPageSelect;
540  Buffer srcBuffer;
541  Buffer destBuffer;
542  Buffer innerBuffer;
543  Page srcPage;
544  Page destPage;
545  Page page;
546  int i;
547  BlockNumber blknoInner;
549 
550  XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
551 
552  fillFakeState(&state, xldata->stateSrc);
553 
554  ptr += SizeOfSpgxlogPickSplit;
555  toDelete = (OffsetNumber *) ptr;
556  ptr += sizeof(OffsetNumber) * xldata->nDelete;
557  toInsert = (OffsetNumber *) ptr;
558  ptr += sizeof(OffsetNumber) * xldata->nInsert;
559  leafPageSelect = (uint8 *) ptr;
560  ptr += sizeof(uint8) * xldata->nInsert;
561 
562  innerTuple = ptr;
563  /* the inner tuple is unaligned, so make a copy to access its header */
564  memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
565  ptr += innerTupleHdr.size;
566 
567  /* now ptr points to the list of leaf tuples */
568 
569  if (xldata->isRootSplit)
570  {
571  /* when splitting root, we touch it only in the guise of new inner */
572  srcBuffer = InvalidBuffer;
573  srcPage = NULL;
574  }
575  else if (xldata->initSrc)
576  {
577  /* just re-init the source page */
578  srcBuffer = XLogInitBufferForRedo(record, 0);
579  srcPage = (Page) BufferGetPage(srcBuffer);
580 
581  SpGistInitBuffer(srcBuffer,
582  SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
583  /* don't update LSN etc till we're done with it */
584  }
585  else
586  {
587  /*
588  * Delete the specified tuples from source page. (In case we're in
589  * Hot Standby, we need to hold lock on the page till we're done
590  * inserting leaf tuples and the new inner tuple, else the added
591  * redirect tuple will be a dangling link.)
592  */
593  srcPage = NULL;
594  if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
595  {
596  srcPage = BufferGetPage(srcBuffer);
597 
598  /*
599  * We have it a bit easier here than in doPickSplit(), because we
600  * know the inner tuple's location already, so we can inject the
601  * correct redirection tuple now.
602  */
603  if (!state.isBuild)
604  spgPageIndexMultiDelete(&state, srcPage,
605  toDelete, xldata->nDelete,
608  blknoInner,
609  xldata->offnumInner);
610  else
611  spgPageIndexMultiDelete(&state, srcPage,
612  toDelete, xldata->nDelete,
617 
618  /* don't update LSN etc till we're done with it */
619  }
620  }
621 
622  /* try to access dest page if any */
623  if (!XLogRecHasBlockRef(record, 1))
624  {
625  destBuffer = InvalidBuffer;
626  destPage = NULL;
627  }
628  else if (xldata->initDest)
629  {
630  /* just re-init the dest page */
631  destBuffer = XLogInitBufferForRedo(record, 1);
632  destPage = (Page) BufferGetPage(destBuffer);
633 
634  SpGistInitBuffer(destBuffer,
635  SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
636  /* don't update LSN etc till we're done with it */
637  }
638  else
639  {
640  /*
641  * We could probably release the page lock immediately in the
642  * full-page-image case, but for safety let's hold it till later.
643  */
644  if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
645  destPage = (Page) BufferGetPage(destBuffer);
646  else
647  destPage = NULL; /* don't do any page updates */
648  }
649 
650  /* restore leaf tuples to src and/or dest page */
651  for (i = 0; i < xldata->nInsert; i++)
652  {
653  char *leafTuple;
654  SpGistLeafTupleData leafTupleHdr;
655 
656  /* the tuples are not aligned, so must copy to access the size field. */
657  leafTuple = ptr;
658  memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
659  ptr += leafTupleHdr.size;
660 
661  page = leafPageSelect[i] ? destPage : srcPage;
662  if (page == NULL)
663  continue; /* no need to touch this page */
664 
665  addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
666  toInsert[i]);
667  }
668 
669  /* Now update src and dest page LSNs if needed */
670  if (srcPage != NULL)
671  {
672  PageSetLSN(srcPage, lsn);
673  MarkBufferDirty(srcBuffer);
674  }
675  if (destPage != NULL)
676  {
677  PageSetLSN(destPage, lsn);
678  MarkBufferDirty(destBuffer);
679  }
680 
681  /* restore new inner tuple */
682  if (xldata->initInner)
683  {
684  innerBuffer = XLogInitBufferForRedo(record, 2);
685  SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
687  }
688  else
689  action = XLogReadBufferForRedo(record, 2, &innerBuffer);
690 
691  if (action == BLK_NEEDS_REDO)
692  {
693  page = BufferGetPage(innerBuffer);
694 
695  addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
696  xldata->offnumInner);
697 
698  /* if inner is also parent, update link while we're here */
699  if (xldata->innerIsParent)
700  {
701  SpGistInnerTuple parent;
702 
703  parent = (SpGistInnerTuple) PageGetItem(page,
704  PageGetItemId(page, xldata->offnumParent));
705  spgUpdateNodeLink(parent, xldata->nodeI,
706  blknoInner, xldata->offnumInner);
707  }
708 
709  PageSetLSN(page, lsn);
710  MarkBufferDirty(innerBuffer);
711  }
712  if (BufferIsValid(innerBuffer))
713  UnlockReleaseBuffer(innerBuffer);
714 
715  /*
716  * Now we can release the leaf-page locks. It's okay to do this before
717  * updating the parent downlink.
718  */
719  if (BufferIsValid(srcBuffer))
720  UnlockReleaseBuffer(srcBuffer);
721  if (BufferIsValid(destBuffer))
722  UnlockReleaseBuffer(destBuffer);
723 
724  /* update parent downlink, unless we did it above */
725  if (XLogRecHasBlockRef(record, 3))
726  {
727  Buffer parentBuffer;
728 
729  if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
730  {
731  SpGistInnerTuple parent;
732 
733  page = BufferGetPage(parentBuffer);
734 
735  parent = (SpGistInnerTuple) PageGetItem(page,
736  PageGetItemId(page, xldata->offnumParent));
737  spgUpdateNodeLink(parent, xldata->nodeI,
738  blknoInner, xldata->offnumInner);
739 
740  PageSetLSN(page, lsn);
741  MarkBufferDirty(parentBuffer);
742  }
743  if (BufferIsValid(parentBuffer))
744  UnlockReleaseBuffer(parentBuffer);
745  }
746  else
747  Assert(xldata->innerIsParent || xldata->isRootSplit);
748 }
749 
750 static void
752 {
753  XLogRecPtr lsn = record->EndRecPtr;
754  char *ptr = XLogRecGetData(record);
755  spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
756  OffsetNumber *toDead;
757  OffsetNumber *toPlaceholder;
758  OffsetNumber *moveSrc;
759  OffsetNumber *moveDest;
760  OffsetNumber *chainSrc;
761  OffsetNumber *chainDest;
763  Buffer buffer;
764  Page page;
765  int i;
766 
767  fillFakeState(&state, xldata->stateSrc);
768 
770  toDead = (OffsetNumber *) ptr;
771  ptr += sizeof(OffsetNumber) * xldata->nDead;
772  toPlaceholder = (OffsetNumber *) ptr;
773  ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
774  moveSrc = (OffsetNumber *) ptr;
775  ptr += sizeof(OffsetNumber) * xldata->nMove;
776  moveDest = (OffsetNumber *) ptr;
777  ptr += sizeof(OffsetNumber) * xldata->nMove;
778  chainSrc = (OffsetNumber *) ptr;
779  ptr += sizeof(OffsetNumber) * xldata->nChain;
780  chainDest = (OffsetNumber *) ptr;
781 
782  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
783  {
784  page = BufferGetPage(buffer);
785 
787  toDead, xldata->nDead,
791 
793  toPlaceholder, xldata->nPlaceholder,
797 
798  /* see comments in vacuumLeafPage() */
799  for (i = 0; i < xldata->nMove; i++)
800  {
801  ItemId idSrc = PageGetItemId(page, moveSrc[i]);
802  ItemId idDest = PageGetItemId(page, moveDest[i]);
803  ItemIdData tmp;
804 
805  tmp = *idSrc;
806  *idSrc = *idDest;
807  *idDest = tmp;
808  }
809 
811  moveSrc, xldata->nMove,
815 
816  for (i = 0; i < xldata->nChain; i++)
817  {
818  SpGistLeafTuple lt;
819 
820  lt = (SpGistLeafTuple) PageGetItem(page,
821  PageGetItemId(page, chainSrc[i]));
822  Assert(lt->tupstate == SPGIST_LIVE);
823  SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
824  }
825 
826  PageSetLSN(page, lsn);
827  MarkBufferDirty(buffer);
828  }
829  if (BufferIsValid(buffer))
830  UnlockReleaseBuffer(buffer);
831 }
832 
833 static void
835 {
836  XLogRecPtr lsn = record->EndRecPtr;
837  char *ptr = XLogRecGetData(record);
838  spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
839  OffsetNumber *toDelete;
840  Buffer buffer;
841  Page page;
842 
843  toDelete = xldata->offsets;
844 
845  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
846  {
847  page = BufferGetPage(buffer);
848 
849  /* The tuple numbers are in order */
850  PageIndexMultiDelete(page, toDelete, xldata->nDelete);
851 
852  PageSetLSN(page, lsn);
853  MarkBufferDirty(buffer);
854  }
855  if (BufferIsValid(buffer))
856  UnlockReleaseBuffer(buffer);
857 }
858 
859 static void
861 {
862  XLogRecPtr lsn = record->EndRecPtr;
863  char *ptr = XLogRecGetData(record);
865  OffsetNumber *itemToPlaceholder;
866  Buffer buffer;
867 
868  itemToPlaceholder = xldata->offsets;
869 
870  /*
871  * If any redirection tuples are being removed, make sure there are no
872  * live Hot Standby transactions that might need to see them.
873  */
874  if (InHotStandby)
875  {
876  RelFileLocator locator;
877 
878  XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
880  xldata->isCatalogRel,
881  locator);
882  }
883 
884  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
885  {
886  Page page = BufferGetPage(buffer);
887  SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
888  int i;
889 
890  /* Convert redirect pointers to plain placeholders */
891  for (i = 0; i < xldata->nToPlaceholder; i++)
892  {
893  SpGistDeadTuple dt;
894 
895  dt = (SpGistDeadTuple) PageGetItem(page,
896  PageGetItemId(page, itemToPlaceholder[i]));
900  }
901 
902  Assert(opaque->nRedirection >= xldata->nToPlaceholder);
903  opaque->nRedirection -= xldata->nToPlaceholder;
904  opaque->nPlaceholder += xldata->nToPlaceholder;
905 
906  /* Remove placeholder tuples at end of page */
907  if (xldata->firstPlaceholder != InvalidOffsetNumber)
908  {
909  int max = PageGetMaxOffsetNumber(page);
910  OffsetNumber *toDelete;
911 
912  toDelete = palloc(sizeof(OffsetNumber) * max);
913 
914  for (i = xldata->firstPlaceholder; i <= max; i++)
915  toDelete[i - xldata->firstPlaceholder] = i;
916 
917  i = max - xldata->firstPlaceholder + 1;
918  Assert(opaque->nPlaceholder >= i);
919  opaque->nPlaceholder -= i;
920 
921  /* The array is sorted, so can use PageIndexMultiDelete */
922  PageIndexMultiDelete(page, toDelete, i);
923 
924  pfree(toDelete);
925  }
926 
927  PageSetLSN(page, lsn);
928  MarkBufferDirty(buffer);
929  }
930  if (BufferIsValid(buffer))
931  UnlockReleaseBuffer(buffer);
932 }
933 
934 void
936 {
937  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
938  MemoryContext oldCxt;
939 
940  oldCxt = MemoryContextSwitchTo(opCtx);
941  switch (info)
942  {
944  spgRedoAddLeaf(record);
945  break;
947  spgRedoMoveLeafs(record);
948  break;
950  spgRedoAddNode(record);
951  break;
953  spgRedoSplitTuple(record);
954  break;
956  spgRedoPickSplit(record);
957  break;
959  spgRedoVacuumLeaf(record);
960  break;
962  spgRedoVacuumRoot(record);
963  break;
965  spgRedoVacuumRedirect(record);
966  break;
967  default:
968  elog(PANIC, "spg_redo: unknown op code %u", info);
969  }
970 
971  MemoryContextSwitchTo(oldCxt);
973 }
974 
975 void
977 {
979  "SP-GiST temporary context",
981 }
982 
983 void
985 {
987  opCtx = NULL;
988 }
989 
990 /*
991  * Mask a SpGist page before performing consistency checks on it.
992  */
993 void
994 spg_mask(char *pagedata, BlockNumber blkno)
995 {
996  Page page = (Page) pagedata;
997  PageHeader pagehdr = (PageHeader) page;
998 
1000 
1001  mask_page_hint_bits(page);
1002 
1003  /*
1004  * Mask the unused space, but only if the page's pd_lower appears to have
1005  * been set correctly.
1006  */
1007  if (pagehdr->pd_lower >= SizeOfPageHeaderData)
1008  mask_unused_space(page);
1009 }
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
void mask_page_lsn_and_checksum(Page page)
Definition: bufmask.c:31
void mask_unused_space(Page page)
Definition: bufmask.c:71
void mask_page_hint_bits(Page page)
Definition: bufmask.c:46
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4577
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:2189
static Page BufferGetPage(Buffer buffer)
Definition: bufmgr.h:350
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:301
void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
Definition: bufpage.c:1161
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:1052
PageHeaderData * PageHeader
Definition: bufpage.h:170
Pointer Page
Definition: bufpage.h:78
static Item PageGetItem(Page page, ItemId itemId)
Definition: bufpage.h:351
#define SizeOfPageHeaderData
Definition: bufpage.h:213
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:240
static void PageSetLSN(Page page, XLogRecPtr lsn)
Definition: bufpage.h:388
static OffsetNumber PageGetMaxOffsetNumber(Page page)
Definition: bufpage.h:369
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:468
unsigned char uint8
Definition: c.h:491
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
int i
Definition: isn.c:73
Pointer Item
Definition: item.h:17
static void ItemPointerSetInvalid(ItemPointerData *pointer)
Definition: itemptr.h:184
Assert(fmt[strlen(fmt) - 1] !='\n')
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:371
void pfree(void *pointer)
Definition: mcxt.c:1508
void * palloc0(Size size)
Definition: mcxt.c:1334
MemoryContext CurrentMemoryContext
Definition: mcxt.c:131
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:442
void * palloc(Size size)
Definition: mcxt.c:1304
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define InvalidOffsetNumber
Definition: off.h:26
uint16 OffsetNumber
Definition: off.h:24
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
static pg_noinline void Size size
Definition: slab.c:607
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:131
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition: spgdoinsert.c:52
SpGistDeadTupleData * SpGistDeadTuple
#define SPGIST_REDIRECT
SpGistInnerTupleData * SpGistInnerTuple
#define SPGIST_LIVE
#define SGDTSIZE
#define SGLT_GET_NEXTOFFSET(spgLeafTuple)
#define SPGIST_PLACEHOLDER
#define SPGIST_DEAD
#define SGLT_SET_NEXTOFFSET(spgLeafTuple, offsetNumber)
struct SpGistLeafTupleData * SpGistLeafTuple
#define SPGIST_NULLS
#define SpGistPageGetOpaque(page)
#define SPGIST_LEAF
SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgutils.c:1066
void SpGistInitBuffer(Buffer b, uint16 f)
Definition: spgutils.c:703
static void addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
Definition: spgxlog.c:50
void spg_redo(XLogReaderState *record)
Definition: spgxlog.c:935
static void spgRedoVacuumRoot(XLogReaderState *record)
Definition: spgxlog.c:834
static void spgRedoSplitTuple(XLogReaderState *record)
Definition: spgxlog.c:451
static void spgRedoVacuumRedirect(XLogReaderState *record)
Definition: spgxlog.c:860
void spg_xlog_cleanup(void)
Definition: spgxlog.c:984
void spg_mask(char *pagedata, BlockNumber blkno)
Definition: spgxlog.c:994
static void spgRedoMoveLeafs(XLogReaderState *record)
Definition: spgxlog.c:171
static void fillFakeState(SpGistState *state, spgxlogState stateSrc)
Definition: spgxlog.c:35
void spg_xlog_startup(void)
Definition: spgxlog.c:976
static void spgRedoAddNode(XLogReaderState *record)
Definition: spgxlog.c:284
static MemoryContext opCtx
Definition: spgxlog.c:25
static void spgRedoVacuumLeaf(XLogReaderState *record)
Definition: spgxlog.c:751
static void spgRedoPickSplit(XLogReaderState *record)
Definition: spgxlog.c:529
static void spgRedoAddLeaf(XLogReaderState *record)
Definition: spgxlog.c:74
struct spgxlogSplitTuple spgxlogSplitTuple
#define XLOG_SPGIST_SPLIT_TUPLE
Definition: spgxlog.h:25
#define SizeOfSpgxlogPickSplit
Definition: spgxlog.h:199
#define XLOG_SPGIST_VACUUM_ROOT
Definition: spgxlog.h:28
struct spgxlogAddLeaf spgxlogAddLeaf
#define SizeOfSpgxlogVacuumLeaf
Definition: spgxlog.h:223
#define SizeOfSpgxlogMoveLeafs
Definition: spgxlog.h:91
#define XLOG_SPGIST_VACUUM_LEAF
Definition: spgxlog.h:27
#define XLOG_SPGIST_ADD_NODE
Definition: spgxlog.h:24
#define XLOG_SPGIST_ADD_LEAF
Definition: spgxlog.h:22
struct spgxlogAddNode spgxlogAddNode
#define XLOG_SPGIST_MOVE_LEAFS
Definition: spgxlog.h:23
#define XLOG_SPGIST_PICKSPLIT
Definition: spgxlog.h:26
#define XLOG_SPGIST_VACUUM_REDIRECT
Definition: spgxlog.h:29
void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, bool isCatalogRel, RelFileLocator locator)
Definition: standby.c:467
LocationIndex pd_lower
Definition: bufpage.h:162
unsigned int tupstate
ItemPointerData pointer
unsigned int tupstate
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
uint16 nodeI
Definition: spgxlog.h:54
bool newPage
Definition: spgxlog.h:48
OffsetNumber offnumLeaf
Definition: spgxlog.h:50
bool storesNulls
Definition: spgxlog.h:49
OffsetNumber offnumHeadLeaf
Definition: spgxlog.h:51
OffsetNumber offnumParent
Definition: spgxlog.h:53
OffsetNumber offnumNew
Definition: spgxlog.h:111
bool newPage
Definition: spgxlog.h:112
OffsetNumber offnumParent
Definition: spgxlog.h:126
OffsetNumber offnum
Definition: spgxlog.h:105
spgxlogState stateSrc
Definition: spgxlog.h:130
uint16 nodeI
Definition: spgxlog.h:128
int8 parentBlk
Definition: spgxlog.h:125
bool replaceDead
Definition: spgxlog.h:68
bool storesNulls
Definition: spgxlog.h:69
uint16 nMoves
Definition: spgxlog.h:66
uint16 nodeI
Definition: spgxlog.h:73
OffsetNumber offnumParent
Definition: spgxlog.h:72
spgxlogState stateSrc
Definition: spgxlog.h:75
uint16 nInsert
Definition: spgxlog.h:170
spgxlogState stateSrc
Definition: spgxlog.h:185
bool innerIsParent
Definition: spgxlog.h:181
uint16 nodeI
Definition: spgxlog.h:183
bool storesNulls
Definition: spgxlog.h:178
OffsetNumber offnumParent
Definition: spgxlog.h:182
OffsetNumber offnumInner
Definition: spgxlog.h:175
uint16 nDelete
Definition: spgxlog.h:169
bool isRootSplit
Definition: spgxlog.h:167
OffsetNumber offnumPostfix
Definition: spgxlog.h:147
OffsetNumber offnumPrefix
Definition: spgxlog.h:144
bool postfixBlkSame
Definition: spgxlog.h:149
TransactionId myXid
Definition: spgxlog.h:38
bool isBuild
Definition: spgxlog.h:39
spgxlogState stateSrc
Definition: spgxlog.h:208
uint16 nPlaceholder
Definition: spgxlog.h:204
OffsetNumber firstPlaceholder
Definition: spgxlog.h:241
TransactionId snapshotConflictHorizon
Definition: spgxlog.h:242
OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER]
Definition: spgxlog.h:247
uint16 nDelete
Definition: spgxlog.h:228
OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER]
Definition: spgxlog.h:233
Definition: regguts.h:323
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
Definition: xlogreader.c:1971
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLogRecHasBlockRef(decoder, block_id)
Definition: xlogreader.h:420
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record, uint8 block_id, Buffer *buf)
Definition: xlogutils.c:314
Buffer XLogInitBufferForRedo(XLogReaderState *record, uint8 block_id)
Definition: xlogutils.c:326
#define InHotStandby
Definition: xlogutils.h:57
XLogRedoAction
Definition: xlogutils.h:70
@ BLK_NEEDS_REDO
Definition: xlogutils.h:71