PostgreSQL Source Code  git master
spgxlog.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * spgxlog.c
 *	  WAL replay logic for SP-GiST
 *
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			 src/backend/access/spgist/spgxlog.c
 *
 *-------------------------------------------------------------------------
 */
15 #include "postgres.h"
16 
17 #include "access/bufmask.h"
18 #include "access/spgist_private.h"
19 #include "access/spgxlog.h"
20 #include "access/transam.h"
21 #include "access/xlog.h"
22 #include "access/xlogutils.h"
23 #include "storage/standby.h"
24 #include "utils/memutils.h"
25 
26 
27 static MemoryContext opCtx; /* working memory for operations */
28 
29 
30 /*
31  * Prepare a dummy SpGistState, with just the minimum info needed for replay.
32  *
33  * At present, all we need is enough info to support spgFormDeadTuple(),
34  * plus the isBuild flag.
35  */
36 static void
38 {
39  memset(state, 0, sizeof(*state));
40 
41  state->myXid = stateSrc.myXid;
42  state->isBuild = stateSrc.isBuild;
43  state->deadTupleStorage = palloc0(SGDTSIZE);
44 }
45 
46 /*
47  * Add a leaf tuple, or replace an existing placeholder tuple. This is used
48  * to replay SpGistPageAddNewItem() operations. If the offset points at an
49  * existing tuple, it had better be a placeholder tuple.
50  */
51 static void
52 addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
53 {
54  if (offset <= PageGetMaxOffsetNumber(page))
55  {
57  PageGetItemId(page, offset));
58 
59  if (dt->tupstate != SPGIST_PLACEHOLDER)
60  elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
61 
62  Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
63  SpGistPageGetOpaque(page)->nPlaceholder--;
64 
65  PageIndexTupleDelete(page, offset);
66  }
67 
68  Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
69 
70  if (PageAddItem(page, tuple, size, offset, false, false) != offset)
71  elog(ERROR, "failed to add item of size %u to SPGiST index page",
72  size);
73 }
74 
75 static void
77 {
78  XLogRecPtr lsn = record->EndRecPtr;
79  char *ptr = XLogRecGetData(record);
80  spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
81  char *leafTuple;
82  SpGistLeafTupleData leafTupleHdr;
83  Buffer buffer;
84  Page page;
86 
87  ptr += sizeof(spgxlogAddLeaf);
88  leafTuple = ptr;
89  /* the leaf tuple is unaligned, so make a copy to access its header */
90  memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
91 
92  /*
93  * In normal operation we would have both current and parent pages locked
94  * simultaneously; but in WAL replay it should be safe to update the leaf
95  * page before updating the parent.
96  */
97  if (xldata->newPage)
98  {
99  buffer = XLogInitBufferForRedo(record, 0);
100  SpGistInitBuffer(buffer,
101  SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
103  }
104  else
105  action = XLogReadBufferForRedo(record, 0, &buffer);
106 
107  if (action == BLK_NEEDS_REDO)
108  {
109  page = BufferGetPage(buffer);
110 
111  /* insert new tuple */
112  if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
113  {
114  /* normal cases, tuple was added by SpGistPageAddNewItem */
115  addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
116  xldata->offnumLeaf);
117 
118  /* update head tuple's chain link if needed */
119  if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
120  {
121  SpGistLeafTuple head;
122 
123  head = (SpGistLeafTuple) PageGetItem(page,
124  PageGetItemId(page, xldata->offnumHeadLeaf));
125  Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
126  SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
127  }
128  }
129  else
130  {
131  /* replacing a DEAD tuple */
132  PageIndexTupleDelete(page, xldata->offnumLeaf);
133  if (PageAddItem(page,
134  (Item) leafTuple, leafTupleHdr.size,
135  xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
136  elog(ERROR, "failed to add item of size %u to SPGiST index page",
137  leafTupleHdr.size);
138  }
139 
140  PageSetLSN(page, lsn);
141  MarkBufferDirty(buffer);
142  }
143  if (BufferIsValid(buffer))
144  UnlockReleaseBuffer(buffer);
145 
146  /* update parent downlink if necessary */
147  if (xldata->offnumParent != InvalidOffsetNumber)
148  {
149  if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
150  {
151  SpGistInnerTuple tuple;
152  BlockNumber blknoLeaf;
153 
154  XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
155 
156  page = BufferGetPage(buffer);
157 
158  tuple = (SpGistInnerTuple) PageGetItem(page,
159  PageGetItemId(page, xldata->offnumParent));
160 
161  spgUpdateNodeLink(tuple, xldata->nodeI,
162  blknoLeaf, xldata->offnumLeaf);
163 
164  PageSetLSN(page, lsn);
165  MarkBufferDirty(buffer);
166  }
167  if (BufferIsValid(buffer))
168  UnlockReleaseBuffer(buffer);
169  }
170 }
171 
172 static void
174 {
175  XLogRecPtr lsn = record->EndRecPtr;
176  char *ptr = XLogRecGetData(record);
177  spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
179  OffsetNumber *toDelete;
180  OffsetNumber *toInsert;
181  int nInsert;
182  Buffer buffer;
183  Page page;
185  BlockNumber blknoDst;
186 
187  XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
188 
189  fillFakeState(&state, xldata->stateSrc);
190 
191  nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
192 
193  ptr += SizeOfSpgxlogMoveLeafs;
194  toDelete = (OffsetNumber *) ptr;
195  ptr += sizeof(OffsetNumber) * xldata->nMoves;
196  toInsert = (OffsetNumber *) ptr;
197  ptr += sizeof(OffsetNumber) * nInsert;
198 
199  /* now ptr points to the list of leaf tuples */
200 
201  /*
202  * In normal operation we would have all three pages (source, dest, and
203  * parent) locked simultaneously; but in WAL replay it should be safe to
204  * update them one at a time, as long as we do it in the right order.
205  */
206 
207  /* Insert tuples on the dest page (do first, so redirect is valid) */
208  if (xldata->newPage)
209  {
210  buffer = XLogInitBufferForRedo(record, 1);
211  SpGistInitBuffer(buffer,
212  SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
214  }
215  else
216  action = XLogReadBufferForRedo(record, 1, &buffer);
217 
218  if (action == BLK_NEEDS_REDO)
219  {
220  int i;
221 
222  page = BufferGetPage(buffer);
223 
224  for (i = 0; i < nInsert; i++)
225  {
226  char *leafTuple;
227  SpGistLeafTupleData leafTupleHdr;
228 
229  /*
230  * the tuples are not aligned, so must copy to access the size
231  * field.
232  */
233  leafTuple = ptr;
234  memcpy(&leafTupleHdr, leafTuple,
235  sizeof(SpGistLeafTupleData));
236 
237  addOrReplaceTuple(page, (Item) leafTuple,
238  leafTupleHdr.size, toInsert[i]);
239  ptr += leafTupleHdr.size;
240  }
241 
242  PageSetLSN(page, lsn);
243  MarkBufferDirty(buffer);
244  }
245  if (BufferIsValid(buffer))
246  UnlockReleaseBuffer(buffer);
247 
248  /* Delete tuples from the source page, inserting a redirection pointer */
249  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
250  {
251  page = BufferGetPage(buffer);
252 
253  spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
256  blknoDst,
257  toInsert[nInsert - 1]);
258 
259  PageSetLSN(page, lsn);
260  MarkBufferDirty(buffer);
261  }
262  if (BufferIsValid(buffer))
263  UnlockReleaseBuffer(buffer);
264 
265  /* And update the parent downlink */
266  if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
267  {
268  SpGistInnerTuple tuple;
269 
270  page = BufferGetPage(buffer);
271 
272  tuple = (SpGistInnerTuple) PageGetItem(page,
273  PageGetItemId(page, xldata->offnumParent));
274 
275  spgUpdateNodeLink(tuple, xldata->nodeI,
276  blknoDst, toInsert[nInsert - 1]);
277 
278  PageSetLSN(page, lsn);
279  MarkBufferDirty(buffer);
280  }
281  if (BufferIsValid(buffer))
282  UnlockReleaseBuffer(buffer);
283 }
284 
285 static void
287 {
288  XLogRecPtr lsn = record->EndRecPtr;
289  char *ptr = XLogRecGetData(record);
290  spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
291  char *innerTuple;
292  SpGistInnerTupleData innerTupleHdr;
294  Buffer buffer;
295  Page page;
297 
298  ptr += sizeof(spgxlogAddNode);
299  innerTuple = ptr;
300  /* the tuple is unaligned, so make a copy to access its header */
301  memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
302 
303  fillFakeState(&state, xldata->stateSrc);
304 
305  if (!XLogRecHasBlockRef(record, 1))
306  {
307  /* update in place */
308  Assert(xldata->parentBlk == -1);
309  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
310  {
311  page = BufferGetPage(buffer);
312 
313  PageIndexTupleDelete(page, xldata->offnum);
314  if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
315  xldata->offnum,
316  false, false) != xldata->offnum)
317  elog(ERROR, "failed to add item of size %u to SPGiST index page",
318  innerTupleHdr.size);
319 
320  PageSetLSN(page, lsn);
321  MarkBufferDirty(buffer);
322  }
323  if (BufferIsValid(buffer))
324  UnlockReleaseBuffer(buffer);
325  }
326  else
327  {
328  BlockNumber blkno;
329  BlockNumber blknoNew;
330 
331  XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
332  XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
333 
334  /*
335  * In normal operation we would have all three pages (source, dest,
336  * and parent) locked simultaneously; but in WAL replay it should be
337  * safe to update them one at a time, as long as we do it in the right
338  * order. We must insert the new tuple before replacing the old tuple
339  * with the redirect tuple.
340  */
341 
342  /* Install new tuple first so redirect is valid */
343  if (xldata->newPage)
344  {
345  /* AddNode is not used for nulls pages */
346  buffer = XLogInitBufferForRedo(record, 1);
347  SpGistInitBuffer(buffer, 0);
349  }
350  else
351  action = XLogReadBufferForRedo(record, 1, &buffer);
352  if (action == BLK_NEEDS_REDO)
353  {
354  page = BufferGetPage(buffer);
355 
356  addOrReplaceTuple(page, (Item) innerTuple,
357  innerTupleHdr.size, xldata->offnumNew);
358 
359  /*
360  * If parent is in this same page, update it now.
361  */
362  if (xldata->parentBlk == 1)
363  {
364  SpGistInnerTuple parentTuple;
365 
366  parentTuple = (SpGistInnerTuple) PageGetItem(page,
367  PageGetItemId(page, xldata->offnumParent));
368 
369  spgUpdateNodeLink(parentTuple, xldata->nodeI,
370  blknoNew, xldata->offnumNew);
371  }
372  PageSetLSN(page, lsn);
373  MarkBufferDirty(buffer);
374  }
375  if (BufferIsValid(buffer))
376  UnlockReleaseBuffer(buffer);
377 
378  /* Delete old tuple, replacing it with redirect or placeholder tuple */
379  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
380  {
381  SpGistDeadTuple dt;
382 
383  page = BufferGetPage(buffer);
384 
385  if (state.isBuild)
389  else
391  blknoNew,
392  xldata->offnumNew);
393 
394  PageIndexTupleDelete(page, xldata->offnum);
395  if (PageAddItem(page, (Item) dt, dt->size,
396  xldata->offnum,
397  false, false) != xldata->offnum)
398  elog(ERROR, "failed to add item of size %u to SPGiST index page",
399  dt->size);
400 
401  if (state.isBuild)
402  SpGistPageGetOpaque(page)->nPlaceholder++;
403  else
404  SpGistPageGetOpaque(page)->nRedirection++;
405 
406  /*
407  * If parent is in this same page, update it now.
408  */
409  if (xldata->parentBlk == 0)
410  {
411  SpGistInnerTuple parentTuple;
412 
413  parentTuple = (SpGistInnerTuple) PageGetItem(page,
414  PageGetItemId(page, xldata->offnumParent));
415 
416  spgUpdateNodeLink(parentTuple, xldata->nodeI,
417  blknoNew, xldata->offnumNew);
418  }
419  PageSetLSN(page, lsn);
420  MarkBufferDirty(buffer);
421  }
422  if (BufferIsValid(buffer))
423  UnlockReleaseBuffer(buffer);
424 
425  /*
426  * Update parent downlink (if we didn't do it as part of the source or
427  * destination page update already).
428  */
429  if (xldata->parentBlk == 2)
430  {
431  if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
432  {
433  SpGistInnerTuple parentTuple;
434 
435  page = BufferGetPage(buffer);
436 
437  parentTuple = (SpGistInnerTuple) PageGetItem(page,
438  PageGetItemId(page, xldata->offnumParent));
439 
440  spgUpdateNodeLink(parentTuple, xldata->nodeI,
441  blknoNew, xldata->offnumNew);
442 
443  PageSetLSN(page, lsn);
444  MarkBufferDirty(buffer);
445  }
446  if (BufferIsValid(buffer))
447  UnlockReleaseBuffer(buffer);
448  }
449  }
450 }
451 
452 static void
454 {
455  XLogRecPtr lsn = record->EndRecPtr;
456  char *ptr = XLogRecGetData(record);
457  spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
458  char *prefixTuple;
459  SpGistInnerTupleData prefixTupleHdr;
460  char *postfixTuple;
461  SpGistInnerTupleData postfixTupleHdr;
462  Buffer buffer;
463  Page page;
465 
466  ptr += sizeof(spgxlogSplitTuple);
467  prefixTuple = ptr;
468  /* the prefix tuple is unaligned, so make a copy to access its header */
469  memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
470  ptr += prefixTupleHdr.size;
471  postfixTuple = ptr;
472  /* postfix tuple is also unaligned */
473  memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
474 
475  /*
476  * In normal operation we would have both pages locked simultaneously; but
477  * in WAL replay it should be safe to update them one at a time, as long
478  * as we do it in the right order.
479  */
480 
481  /* insert postfix tuple first to avoid dangling link */
482  if (!xldata->postfixBlkSame)
483  {
484  if (xldata->newPage)
485  {
486  buffer = XLogInitBufferForRedo(record, 1);
487  /* SplitTuple is not used for nulls pages */
488  SpGistInitBuffer(buffer, 0);
490  }
491  else
492  action = XLogReadBufferForRedo(record, 1, &buffer);
493  if (action == BLK_NEEDS_REDO)
494  {
495  page = BufferGetPage(buffer);
496 
497  addOrReplaceTuple(page, (Item) postfixTuple,
498  postfixTupleHdr.size, xldata->offnumPostfix);
499 
500  PageSetLSN(page, lsn);
501  MarkBufferDirty(buffer);
502  }
503  if (BufferIsValid(buffer))
504  UnlockReleaseBuffer(buffer);
505  }
506 
507  /* now handle the original page */
508  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
509  {
510  page = BufferGetPage(buffer);
511 
512  PageIndexTupleDelete(page, xldata->offnumPrefix);
513  if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
514  xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
515  elog(ERROR, "failed to add item of size %u to SPGiST index page",
516  prefixTupleHdr.size);
517 
518  if (xldata->postfixBlkSame)
519  addOrReplaceTuple(page, (Item) postfixTuple,
520  postfixTupleHdr.size,
521  xldata->offnumPostfix);
522 
523  PageSetLSN(page, lsn);
524  MarkBufferDirty(buffer);
525  }
526  if (BufferIsValid(buffer))
527  UnlockReleaseBuffer(buffer);
528 }
529 
530 static void
532 {
533  XLogRecPtr lsn = record->EndRecPtr;
534  char *ptr = XLogRecGetData(record);
535  spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
536  char *innerTuple;
537  SpGistInnerTupleData innerTupleHdr;
539  OffsetNumber *toDelete;
540  OffsetNumber *toInsert;
541  uint8 *leafPageSelect;
542  Buffer srcBuffer;
543  Buffer destBuffer;
544  Buffer innerBuffer;
545  Page srcPage;
546  Page destPage;
547  Page page;
548  int i;
549  BlockNumber blknoInner;
551 
552  XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
553 
554  fillFakeState(&state, xldata->stateSrc);
555 
556  ptr += SizeOfSpgxlogPickSplit;
557  toDelete = (OffsetNumber *) ptr;
558  ptr += sizeof(OffsetNumber) * xldata->nDelete;
559  toInsert = (OffsetNumber *) ptr;
560  ptr += sizeof(OffsetNumber) * xldata->nInsert;
561  leafPageSelect = (uint8 *) ptr;
562  ptr += sizeof(uint8) * xldata->nInsert;
563 
564  innerTuple = ptr;
565  /* the inner tuple is unaligned, so make a copy to access its header */
566  memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
567  ptr += innerTupleHdr.size;
568 
569  /* now ptr points to the list of leaf tuples */
570 
571  if (xldata->isRootSplit)
572  {
573  /* when splitting root, we touch it only in the guise of new inner */
574  srcBuffer = InvalidBuffer;
575  srcPage = NULL;
576  }
577  else if (xldata->initSrc)
578  {
579  /* just re-init the source page */
580  srcBuffer = XLogInitBufferForRedo(record, 0);
581  srcPage = (Page) BufferGetPage(srcBuffer);
582 
583  SpGistInitBuffer(srcBuffer,
584  SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
585  /* don't update LSN etc till we're done with it */
586  }
587  else
588  {
589  /*
590  * Delete the specified tuples from source page. (In case we're in
591  * Hot Standby, we need to hold lock on the page till we're done
592  * inserting leaf tuples and the new inner tuple, else the added
593  * redirect tuple will be a dangling link.)
594  */
595  srcPage = NULL;
596  if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
597  {
598  srcPage = BufferGetPage(srcBuffer);
599 
600  /*
601  * We have it a bit easier here than in doPickSplit(), because we
602  * know the inner tuple's location already, so we can inject the
603  * correct redirection tuple now.
604  */
605  if (!state.isBuild)
606  spgPageIndexMultiDelete(&state, srcPage,
607  toDelete, xldata->nDelete,
610  blknoInner,
611  xldata->offnumInner);
612  else
613  spgPageIndexMultiDelete(&state, srcPage,
614  toDelete, xldata->nDelete,
619 
620  /* don't update LSN etc till we're done with it */
621  }
622  }
623 
624  /* try to access dest page if any */
625  if (!XLogRecHasBlockRef(record, 1))
626  {
627  destBuffer = InvalidBuffer;
628  destPage = NULL;
629  }
630  else if (xldata->initDest)
631  {
632  /* just re-init the dest page */
633  destBuffer = XLogInitBufferForRedo(record, 1);
634  destPage = (Page) BufferGetPage(destBuffer);
635 
636  SpGistInitBuffer(destBuffer,
637  SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
638  /* don't update LSN etc till we're done with it */
639  }
640  else
641  {
642  /*
643  * We could probably release the page lock immediately in the
644  * full-page-image case, but for safety let's hold it till later.
645  */
646  if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
647  destPage = (Page) BufferGetPage(destBuffer);
648  else
649  destPage = NULL; /* don't do any page updates */
650  }
651 
652  /* restore leaf tuples to src and/or dest page */
653  for (i = 0; i < xldata->nInsert; i++)
654  {
655  char *leafTuple;
656  SpGistLeafTupleData leafTupleHdr;
657 
658  /* the tuples are not aligned, so must copy to access the size field. */
659  leafTuple = ptr;
660  memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
661  ptr += leafTupleHdr.size;
662 
663  page = leafPageSelect[i] ? destPage : srcPage;
664  if (page == NULL)
665  continue; /* no need to touch this page */
666 
667  addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
668  toInsert[i]);
669  }
670 
671  /* Now update src and dest page LSNs if needed */
672  if (srcPage != NULL)
673  {
674  PageSetLSN(srcPage, lsn);
675  MarkBufferDirty(srcBuffer);
676  }
677  if (destPage != NULL)
678  {
679  PageSetLSN(destPage, lsn);
680  MarkBufferDirty(destBuffer);
681  }
682 
683  /* restore new inner tuple */
684  if (xldata->initInner)
685  {
686  innerBuffer = XLogInitBufferForRedo(record, 2);
687  SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
689  }
690  else
691  action = XLogReadBufferForRedo(record, 2, &innerBuffer);
692 
693  if (action == BLK_NEEDS_REDO)
694  {
695  page = BufferGetPage(innerBuffer);
696 
697  addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
698  xldata->offnumInner);
699 
700  /* if inner is also parent, update link while we're here */
701  if (xldata->innerIsParent)
702  {
703  SpGistInnerTuple parent;
704 
705  parent = (SpGistInnerTuple) PageGetItem(page,
706  PageGetItemId(page, xldata->offnumParent));
707  spgUpdateNodeLink(parent, xldata->nodeI,
708  blknoInner, xldata->offnumInner);
709  }
710 
711  PageSetLSN(page, lsn);
712  MarkBufferDirty(innerBuffer);
713  }
714  if (BufferIsValid(innerBuffer))
715  UnlockReleaseBuffer(innerBuffer);
716 
717  /*
718  * Now we can release the leaf-page locks. It's okay to do this before
719  * updating the parent downlink.
720  */
721  if (BufferIsValid(srcBuffer))
722  UnlockReleaseBuffer(srcBuffer);
723  if (BufferIsValid(destBuffer))
724  UnlockReleaseBuffer(destBuffer);
725 
726  /* update parent downlink, unless we did it above */
727  if (XLogRecHasBlockRef(record, 3))
728  {
729  Buffer parentBuffer;
730 
731  if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
732  {
733  SpGistInnerTuple parent;
734 
735  page = BufferGetPage(parentBuffer);
736 
737  parent = (SpGistInnerTuple) PageGetItem(page,
738  PageGetItemId(page, xldata->offnumParent));
739  spgUpdateNodeLink(parent, xldata->nodeI,
740  blknoInner, xldata->offnumInner);
741 
742  PageSetLSN(page, lsn);
743  MarkBufferDirty(parentBuffer);
744  }
745  if (BufferIsValid(parentBuffer))
746  UnlockReleaseBuffer(parentBuffer);
747  }
748  else
749  Assert(xldata->innerIsParent || xldata->isRootSplit);
750 }
751 
752 static void
754 {
755  XLogRecPtr lsn = record->EndRecPtr;
756  char *ptr = XLogRecGetData(record);
757  spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
758  OffsetNumber *toDead;
759  OffsetNumber *toPlaceholder;
760  OffsetNumber *moveSrc;
761  OffsetNumber *moveDest;
762  OffsetNumber *chainSrc;
763  OffsetNumber *chainDest;
765  Buffer buffer;
766  Page page;
767  int i;
768 
769  fillFakeState(&state, xldata->stateSrc);
770 
772  toDead = (OffsetNumber *) ptr;
773  ptr += sizeof(OffsetNumber) * xldata->nDead;
774  toPlaceholder = (OffsetNumber *) ptr;
775  ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
776  moveSrc = (OffsetNumber *) ptr;
777  ptr += sizeof(OffsetNumber) * xldata->nMove;
778  moveDest = (OffsetNumber *) ptr;
779  ptr += sizeof(OffsetNumber) * xldata->nMove;
780  chainSrc = (OffsetNumber *) ptr;
781  ptr += sizeof(OffsetNumber) * xldata->nChain;
782  chainDest = (OffsetNumber *) ptr;
783 
784  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
785  {
786  page = BufferGetPage(buffer);
787 
789  toDead, xldata->nDead,
793 
795  toPlaceholder, xldata->nPlaceholder,
799 
800  /* see comments in vacuumLeafPage() */
801  for (i = 0; i < xldata->nMove; i++)
802  {
803  ItemId idSrc = PageGetItemId(page, moveSrc[i]);
804  ItemId idDest = PageGetItemId(page, moveDest[i]);
805  ItemIdData tmp;
806 
807  tmp = *idSrc;
808  *idSrc = *idDest;
809  *idDest = tmp;
810  }
811 
813  moveSrc, xldata->nMove,
817 
818  for (i = 0; i < xldata->nChain; i++)
819  {
820  SpGistLeafTuple lt;
821 
822  lt = (SpGistLeafTuple) PageGetItem(page,
823  PageGetItemId(page, chainSrc[i]));
824  Assert(lt->tupstate == SPGIST_LIVE);
825  SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
826  }
827 
828  PageSetLSN(page, lsn);
829  MarkBufferDirty(buffer);
830  }
831  if (BufferIsValid(buffer))
832  UnlockReleaseBuffer(buffer);
833 }
834 
835 static void
837 {
838  XLogRecPtr lsn = record->EndRecPtr;
839  char *ptr = XLogRecGetData(record);
840  spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
841  OffsetNumber *toDelete;
842  Buffer buffer;
843  Page page;
844 
845  toDelete = xldata->offsets;
846 
847  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
848  {
849  page = BufferGetPage(buffer);
850 
851  /* The tuple numbers are in order */
852  PageIndexMultiDelete(page, toDelete, xldata->nDelete);
853 
854  PageSetLSN(page, lsn);
855  MarkBufferDirty(buffer);
856  }
857  if (BufferIsValid(buffer))
858  UnlockReleaseBuffer(buffer);
859 }
860 
861 static void
863 {
864  XLogRecPtr lsn = record->EndRecPtr;
865  char *ptr = XLogRecGetData(record);
867  OffsetNumber *itemToPlaceholder;
868  Buffer buffer;
869 
870  itemToPlaceholder = xldata->offsets;
871 
872  /*
873  * If any redirection tuples are being removed, make sure there are no
874  * live Hot Standby transactions that might need to see them.
875  */
876  if (InHotStandby)
877  {
878  RelFileLocator locator;
879 
880  XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
882  xldata->isCatalogRel,
883  locator);
884  }
885 
886  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
887  {
888  Page page = BufferGetPage(buffer);
889  SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
890  int i;
891 
892  /* Convert redirect pointers to plain placeholders */
893  for (i = 0; i < xldata->nToPlaceholder; i++)
894  {
895  SpGistDeadTuple dt;
896 
897  dt = (SpGistDeadTuple) PageGetItem(page,
898  PageGetItemId(page, itemToPlaceholder[i]));
902  }
903 
904  Assert(opaque->nRedirection >= xldata->nToPlaceholder);
905  opaque->nRedirection -= xldata->nToPlaceholder;
906  opaque->nPlaceholder += xldata->nToPlaceholder;
907 
908  /* Remove placeholder tuples at end of page */
909  if (xldata->firstPlaceholder != InvalidOffsetNumber)
910  {
911  int max = PageGetMaxOffsetNumber(page);
912  OffsetNumber *toDelete;
913 
914  toDelete = palloc(sizeof(OffsetNumber) * max);
915 
916  for (i = xldata->firstPlaceholder; i <= max; i++)
917  toDelete[i - xldata->firstPlaceholder] = i;
918 
919  i = max - xldata->firstPlaceholder + 1;
920  Assert(opaque->nPlaceholder >= i);
921  opaque->nPlaceholder -= i;
922 
923  /* The array is sorted, so can use PageIndexMultiDelete */
924  PageIndexMultiDelete(page, toDelete, i);
925 
926  pfree(toDelete);
927  }
928 
929  PageSetLSN(page, lsn);
930  MarkBufferDirty(buffer);
931  }
932  if (BufferIsValid(buffer))
933  UnlockReleaseBuffer(buffer);
934 }
935 
936 void
938 {
939  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
940  MemoryContext oldCxt;
941 
942  oldCxt = MemoryContextSwitchTo(opCtx);
943  switch (info)
944  {
946  spgRedoAddLeaf(record);
947  break;
949  spgRedoMoveLeafs(record);
950  break;
952  spgRedoAddNode(record);
953  break;
955  spgRedoSplitTuple(record);
956  break;
958  spgRedoPickSplit(record);
959  break;
961  spgRedoVacuumLeaf(record);
962  break;
964  spgRedoVacuumRoot(record);
965  break;
967  spgRedoVacuumRedirect(record);
968  break;
969  default:
970  elog(PANIC, "spg_redo: unknown op code %u", info);
971  }
972 
973  MemoryContextSwitchTo(oldCxt);
975 }
976 
977 void
979 {
981  "SP-GiST temporary context",
983 }
984 
985 void
987 {
989  opCtx = NULL;
990 }
991 
992 /*
993  * Mask a SpGist page before performing consistency checks on it.
994  */
995 void
996 spg_mask(char *pagedata, BlockNumber blkno)
997 {
998  Page page = (Page) pagedata;
999  PageHeader pagehdr = (PageHeader) page;
1000 
1002 
1003  mask_page_hint_bits(page);
1004 
1005  /*
1006  * Mask the unused space, but only if the page's pd_lower appears to have
1007  * been set correctly.
1008  */
1009  if (pagehdr->pd_lower >= SizeOfPageHeaderData)
1010  mask_unused_space(page);
1011 }
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
void mask_page_lsn_and_checksum(Page page)
Definition: bufmask.c:31
void mask_unused_space(Page page)
Definition: bufmask.c:71
void mask_page_hint_bits(Page page)
Definition: bufmask.c:46
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4497
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:2111
static Page BufferGetPage(Buffer buffer)
Definition: bufmgr.h:355
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:303
void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
Definition: bufpage.c:1161
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:1052
PageHeaderData * PageHeader
Definition: bufpage.h:170
Pointer Page
Definition: bufpage.h:78
static Item PageGetItem(Page page, ItemId itemId)
Definition: bufpage.h:351
#define SizeOfPageHeaderData
Definition: bufpage.h:213
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:240
static void PageSetLSN(Page page, XLogRecPtr lsn)
Definition: bufpage.h:388
static OffsetNumber PageGetMaxOffsetNumber(Page page)
Definition: bufpage.h:369
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:468
unsigned char uint8
Definition: c.h:488
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
int i
Definition: isn.c:73
Pointer Item
Definition: item.h:17
static void ItemPointerSetInvalid(ItemPointerData *pointer)
Definition: itemptr.h:184
Assert(fmt[strlen(fmt) - 1] !='\n')
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:330
void pfree(void *pointer)
Definition: mcxt.c:1456
void * palloc0(Size size)
Definition: mcxt.c:1257
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403
void * palloc(Size size)
Definition: mcxt.c:1226
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define InvalidOffsetNumber
Definition: off.h:26
uint16 OffsetNumber
Definition: off.h:24
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:132
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition: spgdoinsert.c:51
SpGistDeadTupleData * SpGistDeadTuple
#define SPGIST_REDIRECT
SpGistInnerTupleData * SpGistInnerTuple
#define SPGIST_LIVE
#define SGDTSIZE
#define SGLT_GET_NEXTOFFSET(spgLeafTuple)
#define SPGIST_PLACEHOLDER
#define SPGIST_DEAD
#define SGLT_SET_NEXTOFFSET(spgLeafTuple, offsetNumber)
struct SpGistLeafTupleData * SpGistLeafTuple
#define SPGIST_NULLS
#define SpGistPageGetOpaque(page)
#define SPGIST_LEAF
SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgutils.c:1057
void SpGistInitBuffer(Buffer b, uint16 f)
Definition: spgutils.c:694
static void addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
Definition: spgxlog.c:52
void spg_redo(XLogReaderState *record)
Definition: spgxlog.c:937
static void spgRedoVacuumRoot(XLogReaderState *record)
Definition: spgxlog.c:836
static void spgRedoSplitTuple(XLogReaderState *record)
Definition: spgxlog.c:453
static void spgRedoVacuumRedirect(XLogReaderState *record)
Definition: spgxlog.c:862
void spg_xlog_cleanup(void)
Definition: spgxlog.c:986
void spg_mask(char *pagedata, BlockNumber blkno)
Definition: spgxlog.c:996
static void spgRedoMoveLeafs(XLogReaderState *record)
Definition: spgxlog.c:173
static void fillFakeState(SpGistState *state, spgxlogState stateSrc)
Definition: spgxlog.c:37
void spg_xlog_startup(void)
Definition: spgxlog.c:978
static void spgRedoAddNode(XLogReaderState *record)
Definition: spgxlog.c:286
static MemoryContext opCtx
Definition: spgxlog.c:27
static void spgRedoVacuumLeaf(XLogReaderState *record)
Definition: spgxlog.c:753
static void spgRedoPickSplit(XLogReaderState *record)
Definition: spgxlog.c:531
static void spgRedoAddLeaf(XLogReaderState *record)
Definition: spgxlog.c:76
struct spgxlogSplitTuple spgxlogSplitTuple
#define XLOG_SPGIST_SPLIT_TUPLE
Definition: spgxlog.h:25
#define SizeOfSpgxlogPickSplit
Definition: spgxlog.h:199
#define XLOG_SPGIST_VACUUM_ROOT
Definition: spgxlog.h:28
struct spgxlogAddLeaf spgxlogAddLeaf
#define SizeOfSpgxlogVacuumLeaf
Definition: spgxlog.h:223
#define SizeOfSpgxlogMoveLeafs
Definition: spgxlog.h:91
#define XLOG_SPGIST_VACUUM_LEAF
Definition: spgxlog.h:27
#define XLOG_SPGIST_ADD_NODE
Definition: spgxlog.h:24
#define XLOG_SPGIST_ADD_LEAF
Definition: spgxlog.h:22
struct spgxlogAddNode spgxlogAddNode
#define XLOG_SPGIST_MOVE_LEAFS
Definition: spgxlog.h:23
#define XLOG_SPGIST_PICKSPLIT
Definition: spgxlog.h:26
#define XLOG_SPGIST_VACUUM_REDIRECT
Definition: spgxlog.h:29
void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, bool isCatalogRel, RelFileLocator locator)
Definition: standby.c:468
LocationIndex pd_lower
Definition: bufpage.h:162
unsigned int tupstate
ItemPointerData pointer
unsigned int tupstate
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
uint16 nodeI
Definition: spgxlog.h:54
bool newPage
Definition: spgxlog.h:48
OffsetNumber offnumLeaf
Definition: spgxlog.h:50
bool storesNulls
Definition: spgxlog.h:49
OffsetNumber offnumHeadLeaf
Definition: spgxlog.h:51
OffsetNumber offnumParent
Definition: spgxlog.h:53
OffsetNumber offnumNew
Definition: spgxlog.h:111
bool newPage
Definition: spgxlog.h:112
OffsetNumber offnumParent
Definition: spgxlog.h:126
OffsetNumber offnum
Definition: spgxlog.h:105
spgxlogState stateSrc
Definition: spgxlog.h:130
uint16 nodeI
Definition: spgxlog.h:128
int8 parentBlk
Definition: spgxlog.h:125
bool replaceDead
Definition: spgxlog.h:68
bool storesNulls
Definition: spgxlog.h:69
uint16 nMoves
Definition: spgxlog.h:66
uint16 nodeI
Definition: spgxlog.h:73
OffsetNumber offnumParent
Definition: spgxlog.h:72
spgxlogState stateSrc
Definition: spgxlog.h:75
uint16 nInsert
Definition: spgxlog.h:170
spgxlogState stateSrc
Definition: spgxlog.h:185
bool innerIsParent
Definition: spgxlog.h:181
uint16 nodeI
Definition: spgxlog.h:183
bool storesNulls
Definition: spgxlog.h:178
OffsetNumber offnumParent
Definition: spgxlog.h:182
OffsetNumber offnumInner
Definition: spgxlog.h:175
uint16 nDelete
Definition: spgxlog.h:169
bool isRootSplit
Definition: spgxlog.h:167
OffsetNumber offnumPostfix
Definition: spgxlog.h:147
OffsetNumber offnumPrefix
Definition: spgxlog.h:144
bool postfixBlkSame
Definition: spgxlog.h:149
TransactionId myXid
Definition: spgxlog.h:38
bool isBuild
Definition: spgxlog.h:39
spgxlogState stateSrc
Definition: spgxlog.h:208
uint16 nPlaceholder
Definition: spgxlog.h:204
OffsetNumber firstPlaceholder
Definition: spgxlog.h:241
TransactionId snapshotConflictHorizon
Definition: spgxlog.h:242
OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER]
Definition: spgxlog.h:247
uint16 nDelete
Definition: spgxlog.h:228
OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER]
Definition: spgxlog.h:233
Definition: regguts.h:323
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
Definition: xlogreader.c:1961
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLogRecHasBlockRef(decoder, block_id)
Definition: xlogreader.h:420
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record, uint8 block_id, Buffer *buf)
Definition: xlogutils.c:317
Buffer XLogInitBufferForRedo(XLogReaderState *record, uint8 block_id)
Definition: xlogutils.c:329
#define InHotStandby
Definition: xlogutils.h:57
XLogRedoAction
Definition: xlogutils.h:70
@ BLK_NEEDS_REDO
Definition: xlogutils.h:71