PostgreSQL Source Code  git master
spgdoinsert.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/spgist_private.h"
#include "access/spgxlog.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"
Include dependency graph for spgdoinsert.c:

Go to the source code of this file.

Data Structures

struct  SPPageDesc
 

Typedefs

typedef struct SPPageDesc SPPageDesc
 

Functions

void spgUpdateNodeLink (SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
 
static SpGistInnerTuple addNode (SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
 
static int cmpOffsetNumbers (const void *a, const void *b)
 
void spgPageIndexMultiDelete (SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
 
static void saveNodeLink (Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
 
static void addLeafTuple (Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
 
static int checkSplitConditions (Relation index, SpGistState *state, SPPageDesc *current, int *nToSplit)
 
static void moveLeafs (Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, bool isNulls)
 
static void setRedirectionTuple (SPPageDesc *current, OffsetNumber position, BlockNumber blkno, OffsetNumber offnum)
 
static bool checkAllTheSame (spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, bool *includeNew)
 
static bool doPickSplit (Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, int level, bool isNulls, bool isNew)
 
static void spgMatchNodeAction (Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN)
 
static void spgAddNodeAction (Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN, Datum nodeLabel)
 
static void spgSplitNodeAction (Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, spgChooseOut *out)
 
bool spgdoinsert (Relation index, SpGistState *state, ItemPointer heapPtr, Datum *datums, bool *isnulls)
 

Typedef Documentation

◆ SPPageDesc

typedef struct SPPageDesc SPPageDesc

Function Documentation

◆ addLeafTuple()

static void addLeafTuple ( Relation  index,
SpGistState state,
SpGistLeafTuple  leafTuple,
SPPageDesc current,
SPPageDesc parent,
bool  isNulls,
bool  isNew 
)
static

Definition at line 203 of file spgdoinsert.c.

References SPPageDesc::blkno, SPPageDesc::buffer, elog, END_CRIT_SECTION, ERROR, InvalidBuffer, InvalidOffsetNumber, SpGistState::isBuild, MarkBufferDirty(), spgxlogAddLeaf::newPage, SPPageDesc::node, spgxlogAddLeaf::nodeI, SPPageDesc::offnum, spgxlogAddLeaf::offnumHeadLeaf, spgxlogAddLeaf::offnumLeaf, spgxlogAddLeaf::offnumParent, SPPageDesc::page, PageAddItem, PageGetItem, PageGetItemId, PageIndexTupleDelete(), PageSetLSN, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, saveNodeLink(), SGLT_GET_NEXTOFFSET, SGLT_SET_NEXTOFFSET, SpGistLeafTupleData::size, SPGIST_DEAD, SPGIST_LIVE, SpGistBlockIsRoot, SpGistPageAddNewItem(), START_CRIT_SECTION, spgxlogAddLeaf::storesNulls, SpGistLeafTupleData::tupstate, XLOG_SPGIST_ADD_LEAF, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

205 {
206  spgxlogAddLeaf xlrec;
207 
208  xlrec.newPage = isNew;
209  xlrec.storesNulls = isNulls;
210 
211  /* these will be filled below as needed */
215  xlrec.nodeI = 0;
216 
218 
219  if (current->offnum == InvalidOffsetNumber ||
220  SpGistBlockIsRoot(current->blkno))
221  {
222  /* Tuple is not part of a chain */
224  current->offnum = SpGistPageAddNewItem(state, current->page,
225  (Item) leafTuple, leafTuple->size,
226  NULL, false);
227 
228  xlrec.offnumLeaf = current->offnum;
229 
230  /* Must update parent's downlink if any */
231  if (parent->buffer != InvalidBuffer)
232  {
233  xlrec.offnumParent = parent->offnum;
234  xlrec.nodeI = parent->node;
235 
236  saveNodeLink(index, parent, current->blkno, current->offnum);
237  }
238  }
239  else
240  {
241  /*
242  * Tuple must be inserted into existing chain. We mustn't change the
243  * chain's head address, but we don't need to chase the entire chain
244  * to put the tuple at the end; we can insert it second.
245  *
246  * Also, it's possible that the "chain" consists only of a DEAD tuple,
247  * in which case we should replace the DEAD tuple in-place.
248  */
249  SpGistLeafTuple head;
250  OffsetNumber offnum;
251 
252  head = (SpGistLeafTuple) PageGetItem(current->page,
253  PageGetItemId(current->page, current->offnum));
254  if (head->tupstate == SPGIST_LIVE)
255  {
256  SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
257  offnum = SpGistPageAddNewItem(state, current->page,
258  (Item) leafTuple, leafTuple->size,
259  NULL, false);
260 
261  /*
262  * re-get head of list because it could have been moved on page,
263  * and set new second element
264  */
265  head = (SpGistLeafTuple) PageGetItem(current->page,
266  PageGetItemId(current->page, current->offnum));
267  SGLT_SET_NEXTOFFSET(head, offnum);
268 
269  xlrec.offnumLeaf = offnum;
270  xlrec.offnumHeadLeaf = current->offnum;
271  }
272  else if (head->tupstate == SPGIST_DEAD)
273  {
275  PageIndexTupleDelete(current->page, current->offnum);
276  if (PageAddItem(current->page,
277  (Item) leafTuple, leafTuple->size,
278  current->offnum, false, false) != current->offnum)
279  elog(ERROR, "failed to add item of size %u to SPGiST index page",
280  leafTuple->size);
281 
282  /* WAL replay distinguishes this case by equal offnums */
283  xlrec.offnumLeaf = current->offnum;
284  xlrec.offnumHeadLeaf = current->offnum;
285  }
286  else
287  elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288  }
289 
290  MarkBufferDirty(current->buffer);
291 
292  if (RelationNeedsWAL(index) && !state->isBuild)
293  {
294  XLogRecPtr recptr;
295  int flags;
296 
297  XLogBeginInsert();
298  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299  XLogRegisterData((char *) leafTuple, leafTuple->size);
300 
301  flags = REGBUF_STANDARD;
302  if (xlrec.newPage)
303  flags |= REGBUF_WILL_INIT;
304  XLogRegisterBuffer(0, current->buffer, flags);
305  if (xlrec.offnumParent != InvalidOffsetNumber)
307 
308  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309 
310  PageSetLSN(current->page, recptr);
311 
312  /* update parent only if we actually changed it */
313  if (xlrec.offnumParent != InvalidOffsetNumber)
314  {
315  PageSetLSN(parent->page, recptr);
316  }
317  }
318 
320 }
#define SPGIST_DEAD
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:186
OffsetNumber offnumParent
Definition: spgxlog.h:53
#define SGLT_SET_NEXTOFFSET(spgLeafTuple, offsetNumber)
OffsetNumber offnumLeaf
Definition: spgxlog.h:50
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:1045
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1562
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:220
#define END_CRIT_SECTION()
Definition: miscadmin.h:137
Pointer Item
Definition: item.h:17
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:33
#define START_CRIT_SECTION()
Definition: miscadmin.h:135
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
#define XLOG_SPGIST_ADD_LEAF
Definition: spgxlog.h:22
uint16 OffsetNumber
Definition: off.h:24
#define ERROR
Definition: elog.h:46
bool newPage
Definition: spgxlog.h:48
struct SpGistLeafTupleData * SpGistLeafTuple
#define REGBUF_STANDARD
Definition: xloginsert.h:35
OffsetNumber offnum
Definition: spgdoinsert.c:39
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
unsigned int tupstate
OffsetNumber offnumHeadLeaf
Definition: spgxlog.h:51
bool storesNulls
Definition: spgxlog.h:49
#define InvalidOffsetNumber
Definition: off.h:26
uint16 nodeI
Definition: spgxlog.h:54
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define SpGistBlockIsRoot(blkno)
#define SPGIST_LIVE
#define RelationNeedsWAL(relation)
Definition: rel.h:570
#define SGLT_GET_NEXTOFFSET(spgLeafTuple)
#define elog(elevel,...)
Definition: elog.h:232
Buffer buffer
Definition: spgdoinsert.c:37
void XLogBeginInsert(void)
Definition: xloginsert.c:123
#define PageSetLSN(page, lsn)
Definition: bufpage.h:368
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:1173
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
BlockNumber blkno
Definition: spgdoinsert.c:36

◆ addNode()

static SpGistInnerTuple addNode ( SpGistState state,
SpGistInnerTuple  tuple,
Datum  label,
int  offset 
)
static

Definition at line 78 of file spgdoinsert.c.

References elog, ERROR, i, SpGistInnerTupleData::nNodes, SPPageDesc::node, palloc(), SpGistInnerTupleData::prefixSize, SGITDATUM, SGITITERATE, spgFormInnerTuple(), and spgFormNodeTuple().

Referenced by spgAddNodeAction().

79 {
80  SpGistNodeTuple node,
81  *nodes;
82  int i;
83 
84  /* if offset is negative, insert at end */
85  if (offset < 0)
86  offset = tuple->nNodes;
87  else if (offset > tuple->nNodes)
88  elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
89 
90  nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
91  SGITITERATE(tuple, i, node)
92  {
93  if (i < offset)
94  nodes[i] = node;
95  else
96  nodes[i + 1] = node;
97  }
98 
99  nodes[offset] = spgFormNodeTuple(state, label, false);
100 
101  return spgFormInnerTuple(state,
102  (tuple->prefixSize > 0),
103  SGITDATUM(tuple, state),
104  tuple->nNodes + 1,
105  nodes);
106 }
#define SGITITERATE(x, i, nt)
#define SGITDATUM(x, s)
#define ERROR
Definition: elog.h:46
unsigned int prefixSize
SpGistInnerTuple spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, int nNodes, SpGistNodeTuple *nodes)
Definition: spgutils.c:971
static char * label
void * palloc(Size size)
Definition: mcxt.c:1062
#define elog(elevel,...)
Definition: elog.h:232
int i
SpGistNodeTuple spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
Definition: spgutils.c:929

◆ checkAllTheSame()

static bool checkAllTheSame ( spgPickSplitIn in,
spgPickSplitOut out,
bool  tooBig,
bool includeNew 
)
static

Definition at line 598 of file spgdoinsert.c.

References i, spgPickSplitOut::mapTuplesToNodes, spgPickSplitOut::nNodes, spgPickSplitOut::nodeLabels, spgPickSplitIn::nTuples, and palloc().

Referenced by doPickSplit().

600 {
601  int theNode;
602  int limit;
603  int i;
604 
605  /* For the moment, assume we can include the new leaf tuple */
606  *includeNew = true;
607 
608  /* If there's only the new leaf tuple, don't select allTheSame mode */
609  if (in->nTuples <= 1)
610  return false;
611 
612  /* If tuple set doesn't fit on one page, ignore the new tuple in test */
613  limit = tooBig ? in->nTuples - 1 : in->nTuples;
614 
615  /* Check to see if more than one node is populated */
616  theNode = out->mapTuplesToNodes[0];
617  for (i = 1; i < limit; i++)
618  {
619  if (out->mapTuplesToNodes[i] != theNode)
620  return false;
621  }
622 
623  /* Nope, so override the picksplit function's decisions */
624 
625  /* If the new tuple is in its own node, it can't be included in split */
626  if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
627  *includeNew = false;
628 
629  out->nNodes = 8; /* arbitrary number of child nodes */
630 
631  /* Random assignment of tuples to nodes (note we include new tuple) */
632  for (i = 0; i < in->nTuples; i++)
633  out->mapTuplesToNodes[i] = i % out->nNodes;
634 
635  /* The opclass may not use node labels, but if it does, duplicate 'em */
636  if (out->nodeLabels)
637  {
638  Datum theLabel = out->nodeLabels[theNode];
639 
640  out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
641  for (i = 0; i < out->nNodes; i++)
642  out->nodeLabels[i] = theLabel;
643  }
644 
645  /* We don't touch the prefix or the leaf tuple datum assignments */
646 
647  return true;
648 }
Datum * nodeLabels
Definition: spgist.h:123
uintptr_t Datum
Definition: postgres.h:411
void * palloc(Size size)
Definition: mcxt.c:1062
int i
int * mapTuplesToNodes
Definition: spgist.h:125

◆ checkSplitConditions()

static int checkSplitConditions ( Relation  index,
SpGistState state,
SPPageDesc current,
int *  nToSplit 
)
static

Definition at line 333 of file spgdoinsert.c.

References Assert, SPPageDesc::blkno, elog, ERROR, FirstOffsetNumber, i, InvalidOffsetNumber, SPPageDesc::offnum, SPPageDesc::page, PageGetItem, PageGetItemId, PageGetMaxOffsetNumber, SGLT_GET_NEXTOFFSET, SpGistLeafTupleData::size, SPGIST_DEAD, SPGIST_LIVE, SpGistBlockIsRoot, and SpGistLeafTupleData::tupstate.

Referenced by spgdoinsert().

335 {
336  int i,
337  n = 0,
338  totalSize = 0;
339 
340  if (SpGistBlockIsRoot(current->blkno))
341  {
342  /* return impossible values to force split */
343  *nToSplit = BLCKSZ;
344  return BLCKSZ;
345  }
346 
347  i = current->offnum;
348  while (i != InvalidOffsetNumber)
349  {
350  SpGistLeafTuple it;
351 
352  Assert(i >= FirstOffsetNumber &&
353  i <= PageGetMaxOffsetNumber(current->page));
354  it = (SpGistLeafTuple) PageGetItem(current->page,
355  PageGetItemId(current->page, i));
356  if (it->tupstate == SPGIST_LIVE)
357  {
358  n++;
359  totalSize += it->size + sizeof(ItemIdData);
360  }
361  else if (it->tupstate == SPGIST_DEAD)
362  {
363  /* We could see a DEAD tuple as first/only chain item */
364  Assert(i == current->offnum);
366  /* Don't count it in result, because it won't go to other page */
367  }
368  else
369  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370 
371  i = SGLT_GET_NEXTOFFSET(it);
372  }
373 
374  *nToSplit = n;
375 
376  return totalSize;
377 }
#define SPGIST_DEAD
#define PageGetMaxOffsetNumber(page)
Definition: bufpage.h:357
#define ERROR
Definition: elog.h:46
struct SpGistLeafTupleData * SpGistLeafTuple
#define FirstOffsetNumber
Definition: off.h:27
OffsetNumber offnum
Definition: spgdoinsert.c:39
struct ItemIdData ItemIdData
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
unsigned int tupstate
#define InvalidOffsetNumber
Definition: off.h:26
#define Assert(condition)
Definition: c.h:804
#define SpGistBlockIsRoot(blkno)
#define SPGIST_LIVE
#define SGLT_GET_NEXTOFFSET(spgLeafTuple)
#define elog(elevel,...)
Definition: elog.h:232
int i
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
BlockNumber blkno
Definition: spgdoinsert.c:36

◆ cmpOffsetNumbers()

static int cmpOffsetNumbers ( const void *  a,
const void *  b 
)
static

Definition at line 110 of file spgdoinsert.c.

Referenced by spgPageIndexMultiDelete().

111 {
112  if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
113  return 0;
114  return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
115 }
uint16 OffsetNumber
Definition: off.h:24

◆ doPickSplit()

static bool doPickSplit ( Relation  index,
SpGistState state,
SPPageDesc current,
SPPageDesc parent,
SpGistLeafTuple  newLeafTuple,
int  level,
bool  isNulls,
bool  isNew 
)
static

Definition at line 675 of file spgdoinsert.c.

References SpGistInnerTupleData::allTheSame, Assert, SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage, BufferIsValid, checkAllTheSame(), spgPickSplitIn::datums, elog, END_CRIT_SECTION, ERROR, FirstOffsetNumber, FunctionCall2Coll(), GBUF_INNER_PARITY, GBUF_LEAF, GBUF_NULLS, spgPickSplitOut::hasPrefix, i, index_getprocinfo(), INDEX_MAX_KEYS, spgxlogPickSplit::initDest, spgxlogPickSplit::initInner, spgxlogPickSplit::initSrc, spgxlogPickSplit::innerIsParent, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, SpGistState::isBuild, spgxlogPickSplit::isRootSplit, ItemPointerGetBlockNumber, ItemPointerGetOffsetNumber, ItemPointerIsValid, ItemPointerSet, label, SpGistState::leafTupDesc, spgPickSplitOut::leafTupleDatums, spgPickSplitIn::level, spgPickSplitOut::mapTuplesToNodes, MarkBufferDirty(), Min, TupleDescData::natts, spgxlogPickSplit::nDelete, spgxlogPickSplit::nInsert, spgPickSplitOut::nNodes, SPPageDesc::node, spgxlogPickSplit::nodeI, spgPickSplitOut::nodeLabels, spgPickSplitIn::nTuples, SPPageDesc::offnum, spgxlogPickSplit::offnumInner, spgxlogPickSplit::offnumParent, SPPageDesc::page, PageAddItem, PageGetExactFreeSpace(), PageGetItem, PageGetItemId, PageGetMaxOffsetNumber, PageSetLSN, palloc(), palloc0(), PointerGetDatum, spgPickSplitOut::prefixDatum, RelationData::rd_indcollation, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, saveNodeLink(), setRedirectionTuple(), SGDTSIZE, SGITITERATE, SGLT_GET_NEXTOFFSET, SGLT_SET_NEXTOFFSET, SGLTDATUM, SpGistInnerTupleData::size, SpGistLeafTupleData::size, SizeOfSpgxlogPickSplit, spgDeformLeafTuple(), spgFormInnerTuple(), spgFormLeafTuple(), spgFormNodeTuple(), SPGIST_DEAD, SPGIST_LEAF, SPGIST_LIVE, SPGIST_METAPAGE_BLKNO, SPGIST_NULLS, SPGIST_PAGE_CAPACITY, SPGIST_PICKSPLIT_PROC, SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistBlockIsRoot, SpGistGetBuffer(), SpGistInitBuffer(), SpGistPageAddNewItem(), SpGistPageGetFreeSpace, SpGistPageGetOpaque, SpGistSetLastUsedPage(), spgKeyColumn, spgPageIndexMultiDelete(), START_CRIT_SECTION, spgxlogPickSplit::stateSrc, STORE_STATE, spgxlogPickSplit::storesNulls, SpGistLeafTupleData::tupstate, UnlockReleaseBuffer(), XLOG_SPGIST_PICKSPLIT, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

679 {
680  bool insertedNew = false;
681  spgPickSplitIn in;
682  spgPickSplitOut out;
683  FmgrInfo *procinfo;
684  bool includeNew;
685  int i,
686  max,
687  n;
688  SpGistInnerTuple innerTuple;
689  SpGistNodeTuple node,
690  *nodes;
691  Buffer newInnerBuffer,
692  newLeafBuffer;
693  uint8 *leafPageSelect;
694  int *leafSizes;
695  OffsetNumber *toDelete;
696  OffsetNumber *toInsert;
697  OffsetNumber redirectTuplePos = InvalidOffsetNumber;
698  OffsetNumber startOffsets[2];
699  SpGistLeafTuple *oldLeafs;
700  SpGistLeafTuple *newLeafs;
701  Datum leafDatums[INDEX_MAX_KEYS];
702  bool leafIsnulls[INDEX_MAX_KEYS];
703  int spaceToDelete;
704  int currentFreeSpace;
705  int totalLeafSizes;
706  bool allTheSame;
707  spgxlogPickSplit xlrec;
708  char *leafdata,
709  *leafptr;
710  SPPageDesc saveCurrent;
711  int nToDelete,
712  nToInsert,
713  maxToInclude;
714 
715  in.level = level;
716 
717  /*
718  * Allocate per-leaf-tuple work arrays with max possible size
719  */
720  max = PageGetMaxOffsetNumber(current->page);
721  n = max + 1;
722  in.datums = (Datum *) palloc(sizeof(Datum) * n);
723  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
724  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
725  oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
726  newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
727  leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
728 
729  STORE_STATE(state, xlrec.stateSrc);
730 
731  /*
732  * Form list of leaf tuples which will be distributed as split result;
733  * also, count up the amount of space that will be freed from current.
734  * (Note that in the non-root case, we won't actually delete the old
735  * tuples, only replace them with redirects or placeholders.)
736  */
737  nToInsert = 0;
738  nToDelete = 0;
739  spaceToDelete = 0;
740  if (SpGistBlockIsRoot(current->blkno))
741  {
742  /*
743  * We are splitting the root (which up to now is also a leaf page).
744  * Its tuples are not linked, so scan sequentially to get them all. We
745  * ignore the original value of current->offnum.
746  */
747  for (i = FirstOffsetNumber; i <= max; i++)
748  {
749  SpGistLeafTuple it;
750 
751  it = (SpGistLeafTuple) PageGetItem(current->page,
752  PageGetItemId(current->page, i));
753  if (it->tupstate == SPGIST_LIVE)
754  {
755  in.datums[nToInsert] =
756  isNulls ? (Datum) 0 : SGLTDATUM(it, state);
757  oldLeafs[nToInsert] = it;
758  nToInsert++;
759  toDelete[nToDelete] = i;
760  nToDelete++;
761  /* we will delete the tuple altogether, so count full space */
762  spaceToDelete += it->size + sizeof(ItemIdData);
763  }
764  else /* tuples on root should be live */
765  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
766  }
767  }
768  else
769  {
770  /* Normal case, just collect the leaf tuples in the chain */
771  i = current->offnum;
772  while (i != InvalidOffsetNumber)
773  {
774  SpGistLeafTuple it;
775 
776  Assert(i >= FirstOffsetNumber && i <= max);
777  it = (SpGistLeafTuple) PageGetItem(current->page,
778  PageGetItemId(current->page, i));
779  if (it->tupstate == SPGIST_LIVE)
780  {
781  in.datums[nToInsert] =
782  isNulls ? (Datum) 0 : SGLTDATUM(it, state);
783  oldLeafs[nToInsert] = it;
784  nToInsert++;
785  toDelete[nToDelete] = i;
786  nToDelete++;
787  /* we will not delete the tuple, only replace with dead */
788  Assert(it->size >= SGDTSIZE);
789  spaceToDelete += it->size - SGDTSIZE;
790  }
791  else if (it->tupstate == SPGIST_DEAD)
792  {
793  /* We could see a DEAD tuple as first/only chain item */
794  Assert(i == current->offnum);
796  toDelete[nToDelete] = i;
797  nToDelete++;
798  /* replacing it with redirect will save no space */
799  }
800  else
801  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
802 
803  i = SGLT_GET_NEXTOFFSET(it);
804  }
805  }
806  in.nTuples = nToInsert;
807 
808  /*
809  * We may not actually insert new tuple because another picksplit may be
810  * necessary due to too large value, but we will try to allocate enough
811  * space to include it; and in any case it has to be included in the input
812  * for the picksplit function. So don't increment nToInsert yet.
813  */
814  in.datums[in.nTuples] =
815  isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
816  oldLeafs[in.nTuples] = newLeafTuple;
817  in.nTuples++;
818 
819  memset(&out, 0, sizeof(out));
820 
821  if (!isNulls)
822  {
823  /*
824  * Perform split using user-defined method.
825  */
826  procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
827  FunctionCall2Coll(procinfo,
828  index->rd_indcollation[0],
829  PointerGetDatum(&in),
830  PointerGetDatum(&out));
831 
832  /*
833  * Form new leaf tuples and count up the total space needed.
834  */
835  totalLeafSizes = 0;
836  for (i = 0; i < in.nTuples; i++)
837  {
838  if (state->leafTupDesc->natts > 1)
839  spgDeformLeafTuple(oldLeafs[i],
840  state->leafTupDesc,
841  leafDatums,
842  leafIsnulls,
843  isNulls);
844 
845  leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
846  leafIsnulls[spgKeyColumn] = false;
847 
848  newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
849  leafDatums,
850  leafIsnulls);
851  totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
852  }
853  }
854  else
855  {
856  /*
857  * Perform dummy split that puts all tuples into one node.
858  * checkAllTheSame will override this and force allTheSame mode.
859  */
860  out.hasPrefix = false;
861  out.nNodes = 1;
862  out.nodeLabels = NULL;
863  out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
864 
865  /*
866  * Form new leaf tuples and count up the total space needed.
867  */
868  totalLeafSizes = 0;
869  for (i = 0; i < in.nTuples; i++)
870  {
871  if (state->leafTupDesc->natts > 1)
872  spgDeformLeafTuple(oldLeafs[i],
873  state->leafTupDesc,
874  leafDatums,
875  leafIsnulls,
876  isNulls);
877 
878  /*
879  * Nulls tree can contain only null key values.
880  */
881  leafDatums[spgKeyColumn] = (Datum) 0;
882  leafIsnulls[spgKeyColumn] = true;
883 
884  newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
885  leafDatums,
886  leafIsnulls);
887  totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
888  }
889  }
890 
891  /*
892  * Check to see if the picksplit function failed to separate the values,
893  * ie, it put them all into the same child node. If so, select allTheSame
894  * mode and create a random split instead. See comments for
895  * checkAllTheSame as to why we need to know if the new leaf tuples could
896  * fit on one page.
897  */
898  allTheSame = checkAllTheSame(&in, &out,
899  totalLeafSizes > SPGIST_PAGE_CAPACITY,
900  &includeNew);
901 
902  /*
903  * If checkAllTheSame decided we must exclude the new tuple, don't
904  * consider it any further.
905  */
906  if (includeNew)
907  maxToInclude = in.nTuples;
908  else
909  {
910  maxToInclude = in.nTuples - 1;
911  totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
912  }
913 
914  /*
915  * Allocate per-node work arrays. Since checkAllTheSame could replace
916  * out.nNodes with a value larger than the number of tuples on the input
917  * page, we can't allocate these arrays before here.
918  */
919  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
920  leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
921 
922  /*
923  * Form nodes of inner tuple and inner tuple itself
924  */
925  for (i = 0; i < out.nNodes; i++)
926  {
927  Datum label = (Datum) 0;
928  bool labelisnull = (out.nodeLabels == NULL);
929 
930  if (!labelisnull)
931  label = out.nodeLabels[i];
932  nodes[i] = spgFormNodeTuple(state, label, labelisnull);
933  }
934  innerTuple = spgFormInnerTuple(state,
935  out.hasPrefix, out.prefixDatum,
936  out.nNodes, nodes);
937  innerTuple->allTheSame = allTheSame;
938 
939  /*
940  * Update nodes[] array to point into the newly formed innerTuple, so that
941  * we can adjust their downlinks below.
942  */
943  SGITITERATE(innerTuple, i, node)
944  {
945  nodes[i] = node;
946  }
947 
948  /*
949  * Re-scan new leaf tuples and count up the space needed under each node.
950  */
951  for (i = 0; i < maxToInclude; i++)
952  {
953  n = out.mapTuplesToNodes[i];
954  if (n < 0 || n >= out.nNodes)
955  elog(ERROR, "inconsistent result of SPGiST picksplit function");
956  leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
957  }
958 
959  /*
960  * To perform the split, we must insert a new inner tuple, which can't go
961  * on a leaf page; and unless we are splitting the root page, we must then
962  * update the parent tuple's downlink to point to the inner tuple. If
963  * there is room, we'll put the new inner tuple on the same page as the
964  * parent tuple, otherwise we need another non-leaf buffer. But if the
965  * parent page is the root, we can't add the new inner tuple there,
966  * because the root page must have only one inner tuple.
967  */
968  xlrec.initInner = false;
969  if (parent->buffer != InvalidBuffer &&
970  !SpGistBlockIsRoot(parent->blkno) &&
971  (SpGistPageGetFreeSpace(parent->page, 1) >=
972  innerTuple->size + sizeof(ItemIdData)))
973  {
974  /* New inner tuple will fit on parent page */
975  newInnerBuffer = parent->buffer;
976  }
977  else if (parent->buffer != InvalidBuffer)
978  {
979  /* Send tuple to page with next triple parity (see README) */
980  newInnerBuffer = SpGistGetBuffer(index,
981  GBUF_INNER_PARITY(parent->blkno + 1) |
982  (isNulls ? GBUF_NULLS : 0),
983  innerTuple->size + sizeof(ItemIdData),
984  &xlrec.initInner);
985  }
986  else
987  {
988  /* Root page split ... inner tuple will go to root page */
989  newInnerBuffer = InvalidBuffer;
990  }
991 
992  /*
993  * The new leaf tuples converted from the existing ones should require the
994  * same or less space, and therefore should all fit onto one page
995  * (although that's not necessarily the current page, since we can't
996  * delete the old tuples but only replace them with placeholders).
997  * However, the incoming new tuple might not also fit, in which case we
998  * might need another picksplit cycle to reduce it some more.
999  *
1000  * If there's not room to put everything back onto the current page, then
1001  * we decide on a per-node basis which tuples go to the new page. (We do
1002  * it like that because leaf tuple chains can't cross pages, so we must
1003  * place all leaf tuples belonging to the same parent node on the same
1004  * page.)
1005  *
1006  * If we are splitting the root page (turning it from a leaf page into an
1007  * inner page), then no leaf tuples can go back to the current page; they
1008  * must all go somewhere else.
1009  */
1010  if (!SpGistBlockIsRoot(current->blkno))
1011  currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1012  else
1013  currentFreeSpace = 0; /* prevent assigning any tuples to current */
1014 
1015  xlrec.initDest = false;
1016 
1017  if (totalLeafSizes <= currentFreeSpace)
1018  {
1019  /* All the leaf tuples will fit on current page */
1020  newLeafBuffer = InvalidBuffer;
1021  /* mark new leaf tuple as included in insertions, if allowed */
1022  if (includeNew)
1023  {
1024  nToInsert++;
1025  insertedNew = true;
1026  }
1027  for (i = 0; i < nToInsert; i++)
1028  leafPageSelect[i] = 0; /* signifies current page */
1029  }
1030  else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1031  {
1032  /*
1033  * We're trying to split up a long value by repeated suffixing, but
1034  * it's not going to fit yet. Don't bother allocating a second leaf
1035  * buffer that we won't be able to use.
1036  */
1037  newLeafBuffer = InvalidBuffer;
1038  Assert(includeNew);
1039  Assert(nToInsert == 0);
1040  }
1041  else
1042  {
1043  /* We will need another leaf page */
1044  uint8 *nodePageSelect;
1045  int curspace;
1046  int newspace;
1047 
1048  newLeafBuffer = SpGistGetBuffer(index,
1049  GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1050  Min(totalLeafSizes,
1052  &xlrec.initDest);
1053 
1054  /*
1055  * Attempt to assign node groups to the two pages. We might fail to
1056  * do so, even if totalLeafSizes is less than the available space,
1057  * because we can't split a group across pages.
1058  */
1059  nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1060 
1061  curspace = currentFreeSpace;
1062  newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1063  for (i = 0; i < out.nNodes; i++)
1064  {
1065  if (leafSizes[i] <= curspace)
1066  {
1067  nodePageSelect[i] = 0; /* signifies current page */
1068  curspace -= leafSizes[i];
1069  }
1070  else
1071  {
1072  nodePageSelect[i] = 1; /* signifies new leaf page */
1073  newspace -= leafSizes[i];
1074  }
1075  }
1076  if (curspace >= 0 && newspace >= 0)
1077  {
1078  /* Successful assignment, so we can include the new leaf tuple */
1079  if (includeNew)
1080  {
1081  nToInsert++;
1082  insertedNew = true;
1083  }
1084  }
1085  else if (includeNew)
1086  {
1087  /* We must exclude the new leaf tuple from the split */
1088  int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1089 
1090  leafSizes[nodeOfNewTuple] -=
1091  newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1092 
1093  /* Repeat the node assignment process --- should succeed now */
1094  curspace = currentFreeSpace;
1095  newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1096  for (i = 0; i < out.nNodes; i++)
1097  {
1098  if (leafSizes[i] <= curspace)
1099  {
1100  nodePageSelect[i] = 0; /* signifies current page */
1101  curspace -= leafSizes[i];
1102  }
1103  else
1104  {
1105  nodePageSelect[i] = 1; /* signifies new leaf page */
1106  newspace -= leafSizes[i];
1107  }
1108  }
1109  if (curspace < 0 || newspace < 0)
1110  elog(ERROR, "failed to divide leaf tuple groups across pages");
1111  }
1112  else
1113  {
1114  /* oops, we already excluded new tuple ... should not get here */
1115  elog(ERROR, "failed to divide leaf tuple groups across pages");
1116  }
1117  /* Expand the per-node assignments to be shown per leaf tuple */
1118  for (i = 0; i < nToInsert; i++)
1119  {
1120  n = out.mapTuplesToNodes[i];
1121  leafPageSelect[i] = nodePageSelect[n];
1122  }
1123  }
1124 
1125  /* Start preparing WAL record */
1126  xlrec.nDelete = 0;
1127  xlrec.initSrc = isNew;
1128  xlrec.storesNulls = isNulls;
1129  xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1130 
1131  leafdata = leafptr = (char *) palloc(totalLeafSizes);
1132 
1133  /* Here we begin making the changes to the target pages */
1135 
1136  /*
1137  * Delete old leaf tuples from current buffer, except when we're splitting
1138  * the root; in that case there's no need because we'll re-init the page
1139  * below. We do this first to make room for reinserting new leaf tuples.
1140  */
1141  if (!SpGistBlockIsRoot(current->blkno))
1142  {
1143  /*
1144  * Init buffer instead of deleting individual tuples, but only if
1145  * there aren't any other live tuples and only during build; otherwise
1146  * we need to set a redirection tuple for concurrent scans.
1147  */
1148  if (state->isBuild &&
1149  nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1150  PageGetMaxOffsetNumber(current->page))
1151  {
1152  SpGistInitBuffer(current->buffer,
1153  SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1154  xlrec.initSrc = true;
1155  }
1156  else if (isNew)
1157  {
1158  /* don't expose the freshly init'd buffer as a backup block */
1159  Assert(nToDelete == 0);
1160  }
1161  else
1162  {
1163  xlrec.nDelete = nToDelete;
1164 
1165  if (!state->isBuild)
1166  {
1167  /*
1168  * Need to create redirect tuple (it will point to new inner
1169  * tuple) but right now the new tuple's location is not known
1170  * yet. So, set the redirection pointer to "impossible" value
1171  * and remember its position to update tuple later.
1172  */
1173  if (nToDelete > 0)
1174  redirectTuplePos = toDelete[0];
1175  spgPageIndexMultiDelete(state, current->page,
1176  toDelete, nToDelete,
1181  }
1182  else
1183  {
1184  /*
1185  * During index build there is not concurrent searches, so we
1186  * don't need to create redirection tuple.
1187  */
1188  spgPageIndexMultiDelete(state, current->page,
1189  toDelete, nToDelete,
1194  }
1195  }
1196  }
1197 
1198  /*
1199  * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1200  * nodes.
1201  */
1202  startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1203  for (i = 0; i < nToInsert; i++)
1204  {
1205  SpGistLeafTuple it = newLeafs[i];
1206  Buffer leafBuffer;
1207  BlockNumber leafBlock;
1208  OffsetNumber newoffset;
1209 
1210  /* Which page is it going to? */
1211  leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1212  leafBlock = BufferGetBlockNumber(leafBuffer);
1213 
1214  /* Link tuple into correct chain for its node */
1215  n = out.mapTuplesToNodes[i];
1216 
1217  if (ItemPointerIsValid(&nodes[n]->t_tid))
1218  {
1219  Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1220  SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1221  }
1222  else
1224 
1225  /* Insert it on page */
1226  newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1227  (Item) it, it->size,
1228  &startOffsets[leafPageSelect[i]],
1229  false);
1230  toInsert[i] = newoffset;
1231 
1232  /* ... and complete the chain linking */
1233  ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1234 
1235  /* Also copy leaf tuple into WAL data */
1236  memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1237  leafptr += newLeafs[i]->size;
1238  }
1239 
1240  /*
1241  * We're done modifying the other leaf buffer (if any), so mark it dirty.
1242  * current->buffer will be marked below, after we're entirely done
1243  * modifying it.
1244  */
1245  if (newLeafBuffer != InvalidBuffer)
1246  {
1247  MarkBufferDirty(newLeafBuffer);
1248  }
1249 
1250  /* Remember current buffer, since we're about to change "current" */
1251  saveCurrent = *current;
1252 
1253  /*
1254  * Store the new innerTuple
1255  */
1256  if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1257  {
1258  /*
1259  * new inner tuple goes to parent page
1260  */
1261  Assert(current->buffer != parent->buffer);
1262 
1263  /* Repoint "current" at the new inner tuple */
1264  current->blkno = parent->blkno;
1265  current->buffer = parent->buffer;
1266  current->page = parent->page;
1267  xlrec.offnumInner = current->offnum =
1268  SpGistPageAddNewItem(state, current->page,
1269  (Item) innerTuple, innerTuple->size,
1270  NULL, false);
1271 
1272  /*
1273  * Update parent node link and mark parent page dirty
1274  */
1275  xlrec.innerIsParent = true;
1276  xlrec.offnumParent = parent->offnum;
1277  xlrec.nodeI = parent->node;
1278  saveNodeLink(index, parent, current->blkno, current->offnum);
1279 
1280  /*
1281  * Update redirection link (in old current buffer)
1282  */
1283  if (redirectTuplePos != InvalidOffsetNumber)
1284  setRedirectionTuple(&saveCurrent, redirectTuplePos,
1285  current->blkno, current->offnum);
1286 
1287  /* Done modifying old current buffer, mark it dirty */
1288  MarkBufferDirty(saveCurrent.buffer);
1289  }
1290  else if (parent->buffer != InvalidBuffer)
1291  {
1292  /*
1293  * new inner tuple will be stored on a new page
1294  */
1295  Assert(newInnerBuffer != InvalidBuffer);
1296 
1297  /* Repoint "current" at the new inner tuple */
1298  current->buffer = newInnerBuffer;
1299  current->blkno = BufferGetBlockNumber(current->buffer);
1300  current->page = BufferGetPage(current->buffer);
1301  xlrec.offnumInner = current->offnum =
1302  SpGistPageAddNewItem(state, current->page,
1303  (Item) innerTuple, innerTuple->size,
1304  NULL, false);
1305 
1306  /* Done modifying new current buffer, mark it dirty */
1307  MarkBufferDirty(current->buffer);
1308 
1309  /*
1310  * Update parent node link and mark parent page dirty
1311  */
1312  xlrec.innerIsParent = (parent->buffer == current->buffer);
1313  xlrec.offnumParent = parent->offnum;
1314  xlrec.nodeI = parent->node;
1315  saveNodeLink(index, parent, current->blkno, current->offnum);
1316 
1317  /*
1318  * Update redirection link (in old current buffer)
1319  */
1320  if (redirectTuplePos != InvalidOffsetNumber)
1321  setRedirectionTuple(&saveCurrent, redirectTuplePos,
1322  current->blkno, current->offnum);
1323 
1324  /* Done modifying old current buffer, mark it dirty */
1325  MarkBufferDirty(saveCurrent.buffer);
1326  }
1327  else
1328  {
1329  /*
1330  * Splitting root page, which was a leaf but now becomes inner page
1331  * (and so "current" continues to point at it)
1332  */
1333  Assert(SpGistBlockIsRoot(current->blkno));
1334  Assert(redirectTuplePos == InvalidOffsetNumber);
1335 
1336  SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1337  xlrec.initInner = true;
1338  xlrec.innerIsParent = false;
1339 
1340  xlrec.offnumInner = current->offnum =
1341  PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1342  InvalidOffsetNumber, false, false);
1343  if (current->offnum != FirstOffsetNumber)
1344  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1345  innerTuple->size);
1346 
1347  /* No parent link to update, nor redirection to do */
1349  xlrec.nodeI = 0;
1350 
1351  /* Done modifying new current buffer, mark it dirty */
1352  MarkBufferDirty(current->buffer);
1353 
1354  /* saveCurrent doesn't represent a different buffer */
1355  saveCurrent.buffer = InvalidBuffer;
1356  }
1357 
1358  if (RelationNeedsWAL(index) && !state->isBuild)
1359  {
1360  XLogRecPtr recptr;
1361  int flags;
1362 
1363  XLogBeginInsert();
1364 
1365  xlrec.nInsert = nToInsert;
1366  XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1367 
1368  XLogRegisterData((char *) toDelete,
1369  sizeof(OffsetNumber) * xlrec.nDelete);
1370  XLogRegisterData((char *) toInsert,
1371  sizeof(OffsetNumber) * xlrec.nInsert);
1372  XLogRegisterData((char *) leafPageSelect,
1373  sizeof(uint8) * xlrec.nInsert);
1374  XLogRegisterData((char *) innerTuple, innerTuple->size);
1375  XLogRegisterData(leafdata, leafptr - leafdata);
1376 
1377  /* Old leaf page */
1378  if (BufferIsValid(saveCurrent.buffer))
1379  {
1380  flags = REGBUF_STANDARD;
1381  if (xlrec.initSrc)
1382  flags |= REGBUF_WILL_INIT;
1383  XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1384  }
1385 
1386  /* New leaf page */
1387  if (BufferIsValid(newLeafBuffer))
1388  {
1389  flags = REGBUF_STANDARD;
1390  if (xlrec.initDest)
1391  flags |= REGBUF_WILL_INIT;
1392  XLogRegisterBuffer(1, newLeafBuffer, flags);
1393  }
1394 
1395  /* Inner page */
1396  flags = REGBUF_STANDARD;
1397  if (xlrec.initInner)
1398  flags |= REGBUF_WILL_INIT;
1399  XLogRegisterBuffer(2, current->buffer, flags);
1400 
1401  /* Parent page, if different from inner page */
1402  if (parent->buffer != InvalidBuffer)
1403  {
1404  if (parent->buffer != current->buffer)
1406  else
1407  Assert(xlrec.innerIsParent);
1408  }
1409 
1410  /* Issue the WAL record */
1411  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1412 
1413  /* Update page LSNs on all affected pages */
1414  if (newLeafBuffer != InvalidBuffer)
1415  {
1416  Page page = BufferGetPage(newLeafBuffer);
1417 
1418  PageSetLSN(page, recptr);
1419  }
1420 
1421  if (saveCurrent.buffer != InvalidBuffer)
1422  {
1423  Page page = BufferGetPage(saveCurrent.buffer);
1424 
1425  PageSetLSN(page, recptr);
1426  }
1427 
1428  PageSetLSN(current->page, recptr);
1429 
1430  if (parent->buffer != InvalidBuffer)
1431  {
1432  PageSetLSN(parent->page, recptr);
1433  }
1434  }
1435 
1436  END_CRIT_SECTION();
1437 
1438  /* Update local free-space cache and unlock buffers */
1439  if (newLeafBuffer != InvalidBuffer)
1440  {
1441  SpGistSetLastUsedPage(index, newLeafBuffer);
1442  UnlockReleaseBuffer(newLeafBuffer);
1443  }
1444  if (saveCurrent.buffer != InvalidBuffer)
1445  {
1446  SpGistSetLastUsedPage(index, saveCurrent.buffer);
1447  UnlockReleaseBuffer(saveCurrent.buffer);
1448  }
1449 
1450  return insertedNew;
1451 }
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:82
#define SPGIST_DEAD
Definition: fmgr.h:56
uint16 nDelete
Definition: spgxlog.h:169
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:186
#define SpGistPageGetFreeSpace(p, n)
FmgrInfo * index_getprocinfo(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:803
#define SGDTSIZE
#define SGLT_SET_NEXTOFFSET(spgLeafTuple, offsetNumber)
#define SPGIST_REDIRECT
Datum * leafTupleDatums
Definition: spgist.h:126
Datum * datums
Definition: spgist.h:113
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1562
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:220
#define PointerGetDatum(X)
Definition: postgres.h:600
#define SGITITERATE(x, i, nt)
spgxlogState stateSrc
Definition: spgxlog.h:185
#define SizeOfSpgxlogPickSplit
Definition: spgxlog.h:199
#define SPGIST_PLACEHOLDER
#define Min(x, y)
Definition: c.h:986
#define END_CRIT_SECTION()
Definition: miscadmin.h:137
#define SPGIST_NULLS
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:641
unsigned char uint8
Definition: c.h:439
Pointer Item
Definition: item.h:17
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:33
#define START_CRIT_SECTION()
Definition: miscadmin.h:135
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
uint32 BlockNumber
Definition: block.h:31
uint16 nInsert
Definition: spgxlog.h:170
bool innerIsParent
Definition: spgxlog.h:181
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1148
#define PageGetMaxOffsetNumber(page)
Definition: bufpage.h:357
uint16 OffsetNumber
Definition: off.h:24
OffsetNumber offnumInner
Definition: spgxlog.h:175
#define XLOG_SPGIST_PICKSPLIT
Definition: spgxlog.h:26
unsigned int allTheSame
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:131
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3807
Oid * rd_indcollation
Definition: rel.h:200
#define ERROR
Definition: elog.h:46
Datum * nodeLabels
Definition: spgist.h:123
#define GBUF_INNER_PARITY(x)
struct SpGistLeafTupleData * SpGistLeafTuple
OffsetNumber offnumParent
Definition: spgxlog.h:182
uint16 nodeI
Definition: spgxlog.h:183
#define FirstOffsetNumber
Definition: off.h:27
#define REGBUF_STANDARD
Definition: xloginsert.h:35
SpGistInnerTuple spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, int nNodes, SpGistNodeTuple *nodes)
Definition: spgutils.c:971
#define SPGIST_METAPAGE_BLKNO
OffsetNumber offnum
Definition: spgdoinsert.c:39
static void setRedirectionTuple(SPPageDesc *current, OffsetNumber position, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:567
struct ItemIdData ItemIdData
static bool checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, bool *includeNew)
Definition: spgdoinsert.c:598
void SpGistInitBuffer(Buffer b, uint16 f)
Definition: spgutils.c:690
#define BufferGetPage(buffer)
Definition: bufmgr.h:169
#define GBUF_LEAF
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
#define spgKeyColumn
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
unsigned int tupstate
void * palloc0(Size size)
Definition: mcxt.c:1093
uintptr_t Datum
Definition: postgres.h:411
static char * label
#define SPGIST_PAGE_CAPACITY
#define STORE_STATE(s, d)
#define InvalidOffsetNumber
Definition: off.h:26
TupleDesc leafTupDesc
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:537
bool hasPrefix
Definition: spgist.h:119
#define SpGistBlockIsRoot(blkno)
#define INDEX_MAX_KEYS
#define SPGIST_LIVE
#define InvalidBlockNumber
Definition: block.h:33
#define SGLTDATUM(x, s)
#define BufferIsValid(bufnum)
Definition: bufmgr.h:123
#define GBUF_NULLS
#define ItemPointerGetOffsetNumber(pointer)
Definition: itemptr.h:117
#define RelationNeedsWAL(relation)
Definition: rel.h:570
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:951
#define SpGistPageGetOpaque(page)
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2758
void * palloc(Size size)
Definition: mcxt.c:1062
SpGistLeafTuple spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, Datum *datums, bool *isnulls)
Definition: spgutils.c:840
#define SGLT_GET_NEXTOFFSET(spgLeafTuple)
#define elog(elevel,...)
Definition: elog.h:232
int i
int * mapTuplesToNodes
Definition: spgist.h:125
void spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor, Datum *datums, bool *isnulls, bool keyColumnIsNull)
Definition: spgutils.c:1085
bool storesNulls
Definition: spgxlog.h:178
Buffer buffer
Definition: spgdoinsert.c:37
Datum prefixDatum
Definition: spgist.h:120
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:98
void XLogBeginInsert(void)
Definition: xloginsert.c:123
#define SPGIST_LEAF
#define PageSetLSN(page, lsn)
Definition: bufpage.h:368
#define SPGIST_PICKSPLIT_PROC
Definition: spgist.h:25
int Buffer
Definition: buf.h:23
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:1173
SpGistNodeTuple spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
Definition: spgutils.c:929
bool isRootSplit
Definition: spgxlog.h:167
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
Pointer Page
Definition: bufpage.h:78
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:127
BlockNumber blkno
Definition: spgdoinsert.c:36

◆ moveLeafs()

static void moveLeafs ( Relation  index,
SpGistState state,
SPPageDesc current,
SPPageDesc parent,
SpGistLeafTuple  newLeafTuple,
bool  isNulls 
)
static

Definition at line 387 of file spgdoinsert.c.

References Assert, SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage, elog, END_CRIT_SECTION, ERROR, FirstOffsetNumber, GBUF_LEAF, GBUF_NULLS, i, InvalidBuffer, InvalidOffsetNumber, SpGistState::isBuild, MarkBufferDirty(), spgxlogMoveLeafs::newPage, spgxlogMoveLeafs::nMoves, SPPageDesc::node, spgxlogMoveLeafs::nodeI, SPPageDesc::offnum, spgxlogMoveLeafs::offnumParent, SPPageDesc::page, PageGetItem, PageGetItemId, PageGetMaxOffsetNumber, PageSetLSN, palloc(), REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, spgxlogMoveLeafs::replaceDead, saveNodeLink(), SGLT_GET_NEXTOFFSET, SGLT_SET_NEXTOFFSET, SpGistLeafTupleData::size, SizeOfSpgxlogMoveLeafs, SPGIST_DEAD, SPGIST_LIVE, SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistSetLastUsedPage(), spgPageIndexMultiDelete(), START_CRIT_SECTION, spgxlogMoveLeafs::stateSrc, STORE_STATE, spgxlogMoveLeafs::storesNulls, SpGistLeafTupleData::tupstate, UnlockReleaseBuffer(), XLOG_SPGIST_MOVE_LEAFS, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

390 {
391  int i,
392  nDelete,
393  nInsert,
394  size;
395  Buffer nbuf;
396  Page npage;
397  SpGistLeafTuple it;
399  startOffset = InvalidOffsetNumber;
400  bool replaceDead = false;
401  OffsetNumber *toDelete;
402  OffsetNumber *toInsert;
403  BlockNumber nblkno;
404  spgxlogMoveLeafs xlrec;
405  char *leafdata,
406  *leafptr;
407 
408  /* This doesn't work on root page */
409  Assert(parent->buffer != InvalidBuffer);
410  Assert(parent->buffer != current->buffer);
411 
412  /* Locate the tuples to be moved, and count up the space needed */
413  i = PageGetMaxOffsetNumber(current->page);
414  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
415  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
416 
417  size = newLeafTuple->size + sizeof(ItemIdData);
418 
419  nDelete = 0;
420  i = current->offnum;
421  while (i != InvalidOffsetNumber)
422  {
423  SpGistLeafTuple it;
424 
425  Assert(i >= FirstOffsetNumber &&
426  i <= PageGetMaxOffsetNumber(current->page));
427  it = (SpGistLeafTuple) PageGetItem(current->page,
428  PageGetItemId(current->page, i));
429 
430  if (it->tupstate == SPGIST_LIVE)
431  {
432  toDelete[nDelete] = i;
433  size += it->size + sizeof(ItemIdData);
434  nDelete++;
435  }
436  else if (it->tupstate == SPGIST_DEAD)
437  {
438  /* We could see a DEAD tuple as first/only chain item */
439  Assert(i == current->offnum);
441  /* We don't want to move it, so don't count it in size */
442  toDelete[nDelete] = i;
443  nDelete++;
444  replaceDead = true;
445  }
446  else
447  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
448 
449  i = SGLT_GET_NEXTOFFSET(it);
450  }
451 
452  /* Find a leaf page that will hold them */
453  nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
454  size, &xlrec.newPage);
455  npage = BufferGetPage(nbuf);
456  nblkno = BufferGetBlockNumber(nbuf);
457  Assert(nblkno != current->blkno);
458 
459  leafdata = leafptr = palloc(size);
460 
462 
463  /* copy all the old tuples to new page, unless they're dead */
464  nInsert = 0;
465  if (!replaceDead)
466  {
467  for (i = 0; i < nDelete; i++)
468  {
469  it = (SpGistLeafTuple) PageGetItem(current->page,
470  PageGetItemId(current->page, toDelete[i]));
471  Assert(it->tupstate == SPGIST_LIVE);
472 
473  /*
474  * Update chain link (notice the chain order gets reversed, but we
475  * don't care). We're modifying the tuple on the source page
476  * here, but it's okay since we're about to delete it.
477  */
478  SGLT_SET_NEXTOFFSET(it, r);
479 
480  r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
481  &startOffset, false);
482 
483  toInsert[nInsert] = r;
484  nInsert++;
485 
486  /* save modified tuple into leafdata as well */
487  memcpy(leafptr, it, it->size);
488  leafptr += it->size;
489  }
490  }
491 
492  /* add the new tuple as well */
493  SGLT_SET_NEXTOFFSET(newLeafTuple, r);
494  r = SpGistPageAddNewItem(state, npage,
495  (Item) newLeafTuple, newLeafTuple->size,
496  &startOffset, false);
497  toInsert[nInsert] = r;
498  nInsert++;
499  memcpy(leafptr, newLeafTuple, newLeafTuple->size);
500  leafptr += newLeafTuple->size;
501 
502  /*
503  * Now delete the old tuples, leaving a redirection pointer behind for the
504  * first one, unless we're doing an index build; in which case there can't
505  * be any concurrent scan so we need not provide a redirect.
506  */
507  spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
510  nblkno, r);
511 
512  /* Update parent's downlink and mark parent page dirty */
513  saveNodeLink(index, parent, nblkno, r);
514 
515  /* Mark the leaf pages too */
516  MarkBufferDirty(current->buffer);
517  MarkBufferDirty(nbuf);
518 
519  if (RelationNeedsWAL(index) && !state->isBuild)
520  {
521  XLogRecPtr recptr;
522 
523  /* prepare WAL info */
524  STORE_STATE(state, xlrec.stateSrc);
525 
526  xlrec.nMoves = nDelete;
527  xlrec.replaceDead = replaceDead;
528  xlrec.storesNulls = isNulls;
529 
530  xlrec.offnumParent = parent->offnum;
531  xlrec.nodeI = parent->node;
532 
533  XLogBeginInsert();
534  XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
535  XLogRegisterData((char *) toDelete,
536  sizeof(OffsetNumber) * nDelete);
537  XLogRegisterData((char *) toInsert,
538  sizeof(OffsetNumber) * nInsert);
539  XLogRegisterData((char *) leafdata, leafptr - leafdata);
540 
544 
545  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
546 
547  PageSetLSN(current->page, recptr);
548  PageSetLSN(npage, recptr);
549  PageSetLSN(parent->page, recptr);
550  }
551 
553 
554  /* Update local free-space cache and release new buffer */
555  SpGistSetLastUsedPage(index, nbuf);
556  UnlockReleaseBuffer(nbuf);
557 }
#define XLOG_SPGIST_MOVE_LEAFS
Definition: spgxlog.h:23
#define SPGIST_DEAD
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:186
#define SGLT_SET_NEXTOFFSET(spgLeafTuple, offsetNumber)
#define SPGIST_REDIRECT
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1562
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:220
uint16 nodeI
Definition: spgxlog.h:73
#define SizeOfSpgxlogMoveLeafs
Definition: spgxlog.h:91
#define SPGIST_PLACEHOLDER
#define END_CRIT_SECTION()
Definition: miscadmin.h:137
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:641
Pointer Item
Definition: item.h:17
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:33
#define START_CRIT_SECTION()
Definition: miscadmin.h:135
uint32 BlockNumber
Definition: block.h:31
#define PageGetMaxOffsetNumber(page)
Definition: bufpage.h:357
uint16 OffsetNumber
Definition: off.h:24
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:131
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3807
#define ERROR
Definition: elog.h:46
spgxlogState stateSrc
Definition: spgxlog.h:75
struct SpGistLeafTupleData * SpGistLeafTuple
#define FirstOffsetNumber
Definition: off.h:27
#define REGBUF_STANDARD
Definition: xloginsert.h:35
OffsetNumber offnum
Definition: spgdoinsert.c:39
struct ItemIdData ItemIdData
#define BufferGetPage(buffer)
Definition: bufmgr.h:169
bool replaceDead
Definition: spgxlog.h:68
#define GBUF_LEAF
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
unsigned int tupstate
#define STORE_STATE(s, d)
#define InvalidOffsetNumber
Definition: off.h:26
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:537
#define SPGIST_LIVE
#define GBUF_NULLS
#define RelationNeedsWAL(relation)
Definition: rel.h:570
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2758
void * palloc(Size size)
Definition: mcxt.c:1062
#define SGLT_GET_NEXTOFFSET(spgLeafTuple)
#define elog(elevel,...)
Definition: elog.h:232
int i
OffsetNumber offnumParent
Definition: spgxlog.h:72
bool storesNulls
Definition: spgxlog.h:69
Buffer buffer
Definition: spgdoinsert.c:37
uint16 nMoves
Definition: spgxlog.h:66
void XLogBeginInsert(void)
Definition: xloginsert.c:123
#define PageSetLSN(page, lsn)
Definition: bufpage.h:368
int Buffer
Definition: buf.h:23
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:1173
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
Pointer Page
Definition: bufpage.h:78
BlockNumber blkno
Definition: spgdoinsert.c:36

◆ saveNodeLink()

static void saveNodeLink ( Relation  index,
SPPageDesc parent,
BlockNumber  blkno,
OffsetNumber  offnum 
)
static

Definition at line 186 of file spgdoinsert.c.

References SPPageDesc::buffer, MarkBufferDirty(), SPPageDesc::node, SPPageDesc::offnum, SPPageDesc::page, PageGetItem, PageGetItemId, and spgUpdateNodeLink().

Referenced by addLeafTuple(), doPickSplit(), moveLeafs(), and spgAddNodeAction().

188 {
189  SpGistInnerTuple innerTuple;
190 
191  innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192  PageGetItemId(parent->page, parent->offnum));
193 
194  spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195 
196  MarkBufferDirty(parent->buffer);
197 }
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition: spgdoinsert.c:50
SpGistInnerTupleData * SpGistInnerTuple
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1562
OffsetNumber offnum
Definition: spgdoinsert.c:39
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
Buffer buffer
Definition: spgdoinsert.c:37
#define PageGetItem(page, itemId)
Definition: bufpage.h:340

◆ setRedirectionTuple()

static void setRedirectionTuple ( SPPageDesc current,
OffsetNumber  position,
BlockNumber  blkno,
OffsetNumber  offnum 
)
static

Definition at line 567 of file spgdoinsert.c.

References Assert, ItemPointerGetBlockNumber, ItemPointerSet, SPPageDesc::page, PageGetItem, PageGetItemId, SpGistDeadTupleData::pointer, SPGIST_METAPAGE_BLKNO, SPGIST_REDIRECT, and SpGistDeadTupleData::tupstate.

Referenced by doPickSplit().

569 {
570  SpGistDeadTuple dt;
571 
572  dt = (SpGistDeadTuple) PageGetItem(current->page,
573  PageGetItemId(current->page, position));
576  ItemPointerSet(&dt->pointer, blkno, offnum);
577 }
#define SPGIST_REDIRECT
ItemPointerData pointer
#define SPGIST_METAPAGE_BLKNO
SpGistDeadTupleData * SpGistDeadTuple
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
unsigned int tupstate
#define Assert(condition)
Definition: c.h:804
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:98
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:127

◆ spgAddNodeAction()

static void spgAddNodeAction ( Relation  index,
SpGistState state,
SpGistInnerTuple  innerTuple,
SPPageDesc current,
SPPageDesc parent,
int  nodeN,
Datum  nodeLabel 
)
static

Definition at line 1511 of file spgdoinsert.c.

References addNode(), Assert, SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage, elog, END_CRIT_SECTION, ERROR, GBUF_INNER_PARITY, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, SpGistState::isBuild, MarkBufferDirty(), spgxlogAddNode::newPage, SPPageDesc::node, spgxlogAddNode::nodeI, SPPageDesc::offnum, spgxlogAddNode::offnum, spgxlogAddNode::offnumNew, spgxlogAddNode::offnumParent, SPPageDesc::page, PageAddItem, PageGetExactFreeSpace(), PageIndexTupleDelete(), PageSetLSN, spgxlogAddNode::parentBlk, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, saveNodeLink(), SpGistInnerTupleData::size, SpGistDeadTupleData::size, spgFormDeadTuple(), SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistBlockIsRoot, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistPageGetOpaque, SpGistPageStoresNulls, SpGistSetLastUsedPage(), START_CRIT_SECTION, spgxlogAddNode::stateSrc, STORE_STATE, UnlockReleaseBuffer(), XLOG_SPGIST_ADD_NODE, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

1515 {
1516  SpGistInnerTuple newInnerTuple;
1517  spgxlogAddNode xlrec;
1518 
1519  /* Should not be applied to nulls */
1520  Assert(!SpGistPageStoresNulls(current->page));
1521 
1522  /* Construct new inner tuple with additional node */
1523  newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1524 
1525  /* Prepare WAL record */
1526  STORE_STATE(state, xlrec.stateSrc);
1527  xlrec.offnum = current->offnum;
1528 
1529  /* we don't fill these unless we need to change the parent downlink */
1530  xlrec.parentBlk = -1;
1532  xlrec.nodeI = 0;
1533 
1534  /* we don't fill these unless tuple has to be moved */
1536  xlrec.newPage = false;
1537 
1538  if (PageGetExactFreeSpace(current->page) >=
1539  newInnerTuple->size - innerTuple->size)
1540  {
1541  /*
1542  * We can replace the inner tuple by new version in-place
1543  */
1545 
1546  PageIndexTupleDelete(current->page, current->offnum);
1547  if (PageAddItem(current->page,
1548  (Item) newInnerTuple, newInnerTuple->size,
1549  current->offnum, false, false) != current->offnum)
1550  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1551  newInnerTuple->size);
1552 
1553  MarkBufferDirty(current->buffer);
1554 
1555  if (RelationNeedsWAL(index) && !state->isBuild)
1556  {
1557  XLogRecPtr recptr;
1558 
1559  XLogBeginInsert();
1560  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1561  XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1562 
1564 
1565  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1566 
1567  PageSetLSN(current->page, recptr);
1568  }
1569 
1570  END_CRIT_SECTION();
1571  }
1572  else
1573  {
1574  /*
1575  * move inner tuple to another page, and update parent
1576  */
1577  SpGistDeadTuple dt;
1578  SPPageDesc saveCurrent;
1579 
1580  /*
1581  * It should not be possible to get here for the root page, since we
1582  * allow only one inner tuple on the root page, and spgFormInnerTuple
1583  * always checks that inner tuples don't exceed the size of a page.
1584  */
1585  if (SpGistBlockIsRoot(current->blkno))
1586  elog(ERROR, "cannot enlarge root tuple any more");
1587  Assert(parent->buffer != InvalidBuffer);
1588 
1589  saveCurrent = *current;
1590 
1591  xlrec.offnumParent = parent->offnum;
1592  xlrec.nodeI = parent->node;
1593 
1594  /*
1595  * obtain new buffer with the same parity as current, since it will be
1596  * a child of same parent tuple
1597  */
1598  current->buffer = SpGistGetBuffer(index,
1599  GBUF_INNER_PARITY(current->blkno),
1600  newInnerTuple->size + sizeof(ItemIdData),
1601  &xlrec.newPage);
1602  current->blkno = BufferGetBlockNumber(current->buffer);
1603  current->page = BufferGetPage(current->buffer);
1604 
1605  /*
1606  * Let's just make real sure new current isn't same as old. Right now
1607  * that's impossible, but if SpGistGetBuffer ever got smart enough to
1608  * delete placeholder tuples before checking space, maybe it wouldn't
1609  * be impossible. The case would appear to work except that WAL
1610  * replay would be subtly wrong, so I think a mere assert isn't enough
1611  * here.
1612  */
1613  if (current->blkno == saveCurrent.blkno)
1614  elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1615 
1616  /*
1617  * New current and parent buffer will both be modified; but note that
1618  * parent buffer could be same as either new or old current.
1619  */
1620  if (parent->buffer == saveCurrent.buffer)
1621  xlrec.parentBlk = 0;
1622  else if (parent->buffer == current->buffer)
1623  xlrec.parentBlk = 1;
1624  else
1625  xlrec.parentBlk = 2;
1626 
1628 
1629  /* insert new ... */
1630  xlrec.offnumNew = current->offnum =
1631  SpGistPageAddNewItem(state, current->page,
1632  (Item) newInnerTuple, newInnerTuple->size,
1633  NULL, false);
1634 
1635  MarkBufferDirty(current->buffer);
1636 
1637  /* update parent's downlink and mark parent page dirty */
1638  saveNodeLink(index, parent, current->blkno, current->offnum);
1639 
1640  /*
1641  * Replace old tuple with a placeholder or redirection tuple. Unless
1642  * doing an index build, we have to insert a redirection tuple for
1643  * possible concurrent scans. We can't just delete it in any case,
1644  * because that could change the offsets of other tuples on the page,
1645  * breaking downlinks from their parents.
1646  */
1647  if (state->isBuild)
1650  else
1651  dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1652  current->blkno, current->offnum);
1653 
1654  PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1655  if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1656  saveCurrent.offnum,
1657  false, false) != saveCurrent.offnum)
1658  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1659  dt->size);
1660 
1661  if (state->isBuild)
1662  SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1663  else
1664  SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1665 
1666  MarkBufferDirty(saveCurrent.buffer);
1667 
1668  if (RelationNeedsWAL(index) && !state->isBuild)
1669  {
1670  XLogRecPtr recptr;
1671  int flags;
1672 
1673  XLogBeginInsert();
1674 
1675  /* orig page */
1676  XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1677  /* new page */
1678  flags = REGBUF_STANDARD;
1679  if (xlrec.newPage)
1680  flags |= REGBUF_WILL_INIT;
1681  XLogRegisterBuffer(1, current->buffer, flags);
1682  /* parent page (if different from orig and new) */
1683  if (xlrec.parentBlk == 2)
1685 
1686  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1687  XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1688 
1689  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1690 
1691  /* we don't bother to check if any of these are redundant */
1692  PageSetLSN(current->page, recptr);
1693  PageSetLSN(parent->page, recptr);
1694  PageSetLSN(saveCurrent.page, recptr);
1695  }
1696 
1697  END_CRIT_SECTION();
1698 
1699  /* Release saveCurrent if it's not same as current or parent */
1700  if (saveCurrent.buffer != current->buffer &&
1701  saveCurrent.buffer != parent->buffer)
1702  {
1703  SpGistSetLastUsedPage(index, saveCurrent.buffer);
1704  UnlockReleaseBuffer(saveCurrent.buffer);
1705  }
1706  }
1707 }
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:186
SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgutils.c:1054
#define SPGIST_REDIRECT
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:1045
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1562
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:220
#define SPGIST_PLACEHOLDER
#define END_CRIT_SECTION()
Definition: miscadmin.h:137
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:641
Pointer Item
Definition: item.h:17
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:33
#define START_CRIT_SECTION()
Definition: miscadmin.h:135
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
#define XLOG_SPGIST_ADD_NODE
Definition: spgxlog.h:24
OffsetNumber offnum
Definition: spgxlog.h:105
static SpGistInnerTuple addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
Definition: spgdoinsert.c:78
OffsetNumber offnumNew
Definition: spgxlog.h:111
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3807
#define ERROR
Definition: elog.h:46
#define GBUF_INNER_PARITY(x)
#define REGBUF_STANDARD
Definition: xloginsert.h:35
spgxlogState stateSrc
Definition: spgxlog.h:130
OffsetNumber offnum
Definition: spgdoinsert.c:39
#define BufferGetPage(buffer)
Definition: bufmgr.h:169
int8 parentBlk
Definition: spgxlog.h:125
uint16 nodeI
Definition: spgxlog.h:128
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
bool newPage
Definition: spgxlog.h:112
#define STORE_STATE(s, d)
#define InvalidOffsetNumber
Definition: off.h:26
#define SpGistPageStoresNulls(page)
OffsetNumber offnumParent
Definition: spgxlog.h:126
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:537
#define SpGistBlockIsRoot(blkno)
#define InvalidBlockNumber
Definition: block.h:33
#define RelationNeedsWAL(relation)
Definition: rel.h:570
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:951
#define SpGistPageGetOpaque(page)
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2758
#define elog(elevel,...)
Definition: elog.h:232
Buffer buffer
Definition: spgdoinsert.c:37
void XLogBeginInsert(void)
Definition: xloginsert.c:123
#define PageSetLSN(page, lsn)
Definition: bufpage.h:368
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:1173
BlockNumber blkno
Definition: spgdoinsert.c:36

◆ spgdoinsert()

bool spgdoinsert ( Relation  index,
SpGistState state,
ItemPointer  heapPtr,
Datum datums,
bool isnulls 
)

Definition at line 1912 of file spgdoinsert.c.

References addLeafTuple(), spgChooseOut::addNode, spgChooseIn::allTheSame, SpGistInnerTupleData::allTheSame, Assert, SpGistState::attLeafType, SpGistTypeDesc::attlen, SpGistState::attType, SPPageDesc::blkno, SPPageDesc::buffer, BUFFER_LOCK_EXCLUSIVE, BufferGetBlockNumber(), BufferGetPage, CHECK_FOR_INTERRUPTS, checkSplitConditions(), ConditionalLockBuffer(), SpGistState::config, spgChooseIn::datum, doPickSplit(), elog, ereport, errcode(), errhint(), errmsg(), ERROR, FirstOffsetNumber, FunctionCall1Coll(), FunctionCall2Coll(), GBUF_LEAF, GBUF_NULLS, spgChooseIn::hasPrefix, i, index_getprocid(), index_getprocinfo(), INDEX_MAX_KEYS, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, spgChooseIn::leafDatum, SpGistState::leafTupDesc, spgChooseIn::level, LockBuffer(), spgConfigOut::longValuesOK, spgChooseOut::matchNode, Min, moveLeafs(), TupleDescData::natts, spgChooseIn::nNodes, SpGistInnerTupleData::nNodes, SPPageDesc::node, spgChooseIn::nodeLabels, SPPageDesc::offnum, OidIsValid, SPPageDesc::page, PageGetItem, PageGetItemId, pfree(), PG_DETOAST_DATUM, PointerGetDatum, spgChooseIn::prefixDatum, SpGistInnerTupleData::prefixSize, random(), RelationData::rd_indcollation, ReadBuffer(), RelationGetRelationName, ReleaseBuffer(), spgChooseOut::result, spgChooseOut::resultType, SGITDATUM, SpGistLeafTupleData::size, spgAddNode, spgAddNodeAction(), spgExtractNodeLabels(), spgFirstIncludeColumn, spgFormLeafTuple(), SPGIST_CHOOSE_PROC, SPGIST_COMPRESS_PROC, SPGIST_NULL_BLKNO, SPGIST_PAGE_CAPACITY, SPGIST_ROOT_BLKNO, SpGistGetBuffer(), SpGistGetLeafTupleSize(), SpGistPageGetFreeSpace, SpGistPageIsLeaf, SpGistPageStoresNulls, SpGistSetLastUsedPage(), spgKeyColumn, spgMatchNode, spgMatchNodeAction(), spgSplitNodeAction(), spgSplitTuple, TupleDescAttr, SpGistTypeDesc::type, and UnlockReleaseBuffer().

Referenced by spginsert(), and spgistBuildCallback().

1914 {
1915  TupleDesc leafDescriptor = state->leafTupDesc;
1916  bool isnull = isnulls[spgKeyColumn];
1917  int level = 0;
1918  Datum leafDatums[INDEX_MAX_KEYS];
1919  int leafSize;
1920  SPPageDesc current,
1921  parent;
1922  FmgrInfo *procinfo = NULL;
1923 
1924  /*
1925  * Look up FmgrInfo of the user-defined choose function once, to save
1926  * cycles in the loop below.
1927  */
1928  if (!isnull)
1929  procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1930 
1931  /*
1932  * Prepare the leaf datum to insert.
1933  *
1934  * If an optional "compress" method is provided, then call it to form the
1935  * leaf key datum from the input datum. Otherwise, store the input datum
1936  * as is. Since we don't use index_form_tuple in this AM, we have to make
1937  * sure value to be inserted is not toasted; FormIndexDatum doesn't
1938  * guarantee that. But we assume the "compress" method to return an
1939  * untoasted value.
1940  */
1941  if (!isnull)
1942  {
1944  {
1945  FmgrInfo *compressProcinfo = NULL;
1946 
1947  compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1948  leafDatums[spgKeyColumn] =
1949  FunctionCall1Coll(compressProcinfo,
1950  index->rd_indcollation[spgKeyColumn],
1951  datums[spgKeyColumn]);
1952  }
1953  else
1954  {
1955  Assert(state->attLeafType.type == state->attType.type);
1956 
1957  if (state->attType.attlen == -1)
1958  leafDatums[spgKeyColumn] =
1960  else
1961  leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1962  }
1963  }
1964  else
1965  leafDatums[spgKeyColumn] = (Datum) 0;
1966 
1967  /* Likewise, ensure that any INCLUDE values are not toasted */
1968  for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1969  {
1970  if (!isnulls[i])
1971  {
1972  if (TupleDescAttr(leafDescriptor, i)->attlen == -1)
1973  leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1974  else
1975  leafDatums[i] = datums[i];
1976  }
1977  else
1978  leafDatums[i] = (Datum) 0;
1979  }
1980 
1981  /*
1982  * Compute space needed for a leaf tuple containing the given data.
1983  */
1984  leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1985  /* Account for an item pointer, too */
1986  leafSize += sizeof(ItemIdData);
1987 
1988  /*
1989  * If it isn't gonna fit, and the opclass can't reduce the datum size by
1990  * suffixing, bail out now rather than getting into an endless loop.
1991  */
1992  if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
1993  ereport(ERROR,
1994  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1995  errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1996  leafSize - sizeof(ItemIdData),
1997  SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
1998  RelationGetRelationName(index)),
1999  errhint("Values larger than a buffer page cannot be indexed.")));
2000 
2001  /* Initialize "current" to the appropriate root page */
2002  current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
2003  current.buffer = InvalidBuffer;
2004  current.page = NULL;
2005  current.offnum = FirstOffsetNumber;
2006  current.node = -1;
2007 
2008  /* "parent" is invalid for the moment */
2009  parent.blkno = InvalidBlockNumber;
2010  parent.buffer = InvalidBuffer;
2011  parent.page = NULL;
2012  parent.offnum = InvalidOffsetNumber;
2013  parent.node = -1;
2014 
2015  for (;;)
2016  {
2017  bool isNew = false;
2018 
2019  /*
2020  * Bail out if query cancel is pending. We must have this somewhere
2021  * in the loop since a broken opclass could produce an infinite
2022  * picksplit loop.
2023  */
2025 
2026  if (current.blkno == InvalidBlockNumber)
2027  {
2028  /*
2029  * Create a leaf page. If leafSize is too large to fit on a page,
2030  * we won't actually use the page yet, but it simplifies the API
2031  * for doPickSplit to always have a leaf page at hand; so just
2032  * quietly limit our request to a page size.
2033  */
2034  current.buffer =
2035  SpGistGetBuffer(index,
2036  GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2037  Min(leafSize, SPGIST_PAGE_CAPACITY),
2038  &isNew);
2039  current.blkno = BufferGetBlockNumber(current.buffer);
2040  }
2041  else if (parent.buffer == InvalidBuffer)
2042  {
2043  /* we hold no parent-page lock, so no deadlock is possible */
2044  current.buffer = ReadBuffer(index, current.blkno);
2046  }
2047  else if (current.blkno != parent.blkno)
2048  {
2049  /* descend to a new child page */
2050  current.buffer = ReadBuffer(index, current.blkno);
2051 
2052  /*
2053  * Attempt to acquire lock on child page. We must beware of
2054  * deadlock against another insertion process descending from that
2055  * page to our parent page (see README). If we fail to get lock,
2056  * abandon the insertion and tell our caller to start over.
2057  *
2058  * XXX this could be improved, because failing to get lock on a
2059  * buffer is not proof of a deadlock situation; the lock might be
2060  * held by a reader, or even just background writer/checkpointer
2061  * process. Perhaps it'd be worth retrying after sleeping a bit?
2062  */
2063  if (!ConditionalLockBuffer(current.buffer))
2064  {
2065  ReleaseBuffer(current.buffer);
2066  UnlockReleaseBuffer(parent.buffer);
2067  return false;
2068  }
2069  }
2070  else
2071  {
2072  /* inner tuple can be stored on the same page as parent one */
2073  current.buffer = parent.buffer;
2074  }
2075  current.page = BufferGetPage(current.buffer);
2076 
2077  /* should not arrive at a page of the wrong type */
2078  if (isnull ? !SpGistPageStoresNulls(current.page) :
2079  SpGistPageStoresNulls(current.page))
2080  elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2081  current.blkno);
2082 
2083  if (SpGistPageIsLeaf(current.page))
2084  {
2085  SpGistLeafTuple leafTuple;
2086  int nToSplit,
2087  sizeToSplit;
2088 
2089  leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
2090  if (leafTuple->size + sizeof(ItemIdData) <=
2091  SpGistPageGetFreeSpace(current.page, 1))
2092  {
2093  /* it fits on page, so insert it and we're done */
2094  addLeafTuple(index, state, leafTuple,
2095  &current, &parent, isnull, isNew);
2096  break;
2097  }
2098  else if ((sizeToSplit =
2099  checkSplitConditions(index, state, &current,
2100  &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2101  nToSplit < 64 &&
2102  leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2103  {
2104  /*
2105  * the amount of data is pretty small, so just move the whole
2106  * chain to another leaf page rather than splitting it.
2107  */
2108  Assert(!isNew);
2109  moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2110  break; /* we're done */
2111  }
2112  else
2113  {
2114  /* picksplit */
2115  if (doPickSplit(index, state, &current, &parent,
2116  leafTuple, level, isnull, isNew))
2117  break; /* doPickSplit installed new tuples */
2118 
2119  /* leaf tuple will not be inserted yet */
2120  pfree(leafTuple);
2121 
2122  /*
2123  * current now describes new inner tuple, go insert into it
2124  */
2125  Assert(!SpGistPageIsLeaf(current.page));
2126  goto process_inner_tuple;
2127  }
2128  }
2129  else /* non-leaf page */
2130  {
2131  /*
2132  * Apply the opclass choose function to figure out how to insert
2133  * the given datum into the current inner tuple.
2134  */
2135  SpGistInnerTuple innerTuple;
2136  spgChooseIn in;
2137  spgChooseOut out;
2138 
2139  /*
2140  * spgAddNode and spgSplitTuple cases will loop back to here to
2141  * complete the insertion operation. Just in case the choose
2142  * function is broken and produces add or split requests
2143  * repeatedly, check for query cancel.
2144  */
2145  process_inner_tuple:
2147 
2148  innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2149  PageGetItemId(current.page, current.offnum));
2150 
2151  in.datum = datums[spgKeyColumn];
2152  in.leafDatum = leafDatums[spgKeyColumn];
2153  in.level = level;
2154  in.allTheSame = innerTuple->allTheSame;
2155  in.hasPrefix = (innerTuple->prefixSize > 0);
2156  in.prefixDatum = SGITDATUM(innerTuple, state);
2157  in.nNodes = innerTuple->nNodes;
2158  in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2159 
2160  memset(&out, 0, sizeof(out));
2161 
2162  if (!isnull)
2163  {
2164  /* use user-defined choose method */
2165  FunctionCall2Coll(procinfo,
2166  index->rd_indcollation[0],
2167  PointerGetDatum(&in),
2168  PointerGetDatum(&out));
2169  }
2170  else
2171  {
2172  /* force "match" action (to insert to random subnode) */
2173  out.resultType = spgMatchNode;
2174  }
2175 
2176  if (innerTuple->allTheSame)
2177  {
2178  /*
2179  * It's not allowed to do an AddNode at an allTheSame tuple.
2180  * Opclass must say "match", in which case we choose a random
2181  * one of the nodes to descend into, or "split".
2182  */
2183  if (out.resultType == spgAddNode)
2184  elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2185  else if (out.resultType == spgMatchNode)
2186  out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2187  }
2188 
2189  switch (out.resultType)
2190  {
2191  case spgMatchNode:
2192  /* Descend to N'th child node */
2193  spgMatchNodeAction(index, state, innerTuple,
2194  &current, &parent,
2195  out.result.matchNode.nodeN);
2196  /* Adjust level as per opclass request */
2197  level += out.result.matchNode.levelAdd;
2198  /* Replace leafDatum and recompute leafSize */
2199  if (!isnull)
2200  {
2201  leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2202  leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2203  leafDatums, isnulls);
2204  leafSize += sizeof(ItemIdData);
2205  }
2206 
2207  /*
2208  * Loop around and attempt to insert the new leafDatum at
2209  * "current" (which might reference an existing child
2210  * tuple, or might be invalid to force us to find a new
2211  * page for the tuple).
2212  *
2213  * Note: if the opclass sets longValuesOK, we rely on the
2214  * choose function to eventually shorten the leafDatum
2215  * enough to fit on a page. We could add a test here to
2216  * complain if the datum doesn't get visibly shorter each
2217  * time, but that could get in the way of opclasses that
2218  * "simplify" datums in a way that doesn't necessarily
2219  * lead to physical shortening on every cycle.
2220  */
2221  break;
2222  case spgAddNode:
2223  /* AddNode is not sensible if nodes don't have labels */
2224  if (in.nodeLabels == NULL)
2225  elog(ERROR, "cannot add a node to an inner tuple without node labels");
2226  /* Add node to inner tuple, per request */
2227  spgAddNodeAction(index, state, innerTuple,
2228  &current, &parent,
2229  out.result.addNode.nodeN,
2230  out.result.addNode.nodeLabel);
2231 
2232  /*
2233  * Retry insertion into the enlarged node. We assume that
2234  * we'll get a MatchNode result this time.
2235  */
2236  goto process_inner_tuple;
2237  break;
2238  case spgSplitTuple:
2239  /* Split inner tuple, per request */
2240  spgSplitNodeAction(index, state, innerTuple,
2241  &current, &out);
2242 
2243  /* Retry insertion into the split node */
2244  goto process_inner_tuple;
2245  break;
2246  default:
2247  elog(ERROR, "unrecognized SPGiST choose result: %d",
2248  (int) out.resultType);
2249  break;
2250  }
2251  }
2252  } /* end loop */
2253 
2254  /*
2255  * Release any buffers we're still holding. Beware of possibility that
2256  * current and parent reference same buffer.
2257  */
2258  if (current.buffer != InvalidBuffer)
2259  {
2260  SpGistSetLastUsedPage(index, current.buffer);
2261  UnlockReleaseBuffer(current.buffer);
2262  }
2263  if (parent.buffer != InvalidBuffer &&
2264  parent.buffer != current.buffer)
2265  {
2266  SpGistSetLastUsedPage(index, parent.buffer);
2267  UnlockReleaseBuffer(parent.buffer);
2268  }
2269 
2270  return true;
2271 }
Definition: fmgr.h:56
Datum * spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
Definition: spgutils.c:1130
Datum datum
Definition: spgist.h:55
SpGistInnerTupleData * SpGistInnerTuple
#define SpGistPageIsLeaf(page)
bool hasPrefix
Definition: spgist.h:61
int errhint(const char *fmt,...)
Definition: elog.c:1156
#define SpGistPageGetFreeSpace(p, n)
FmgrInfo * index_getprocinfo(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:803
int level
Definition: spgist.h:57
#define PointerGetDatum(X)
Definition: postgres.h:600
long random(void)
Definition: random.c:22
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
SpGistTypeDesc attLeafType
#define Min(x, y)
Definition: c.h:986
SpGistTypeDesc attType
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:641
#define InvalidBuffer
Definition: buf.h:25
Datum prefixDatum
Definition: spgist.h:62
int errcode(int sqlerrcode)
Definition: elog.c:698
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3784
struct spgChooseOut::@45::@47 addNode
#define BUFFER_LOCK_EXCLUSIVE
Definition: bufmgr.h:98
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1148
#define SPGIST_ROOT_BLKNO
#define OidIsValid(objectId)
Definition: c.h:710
union spgChooseOut::@45 result
#define SGITDATUM(x, s)
static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
Definition: spgdoinsert.c:203
spgConfigOut config
struct spgChooseOut::@45::@46 matchNode
void pfree(void *pointer)
Definition: mcxt.c:1169
unsigned int allTheSame
static bool doPickSplit(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, int level, bool isNulls, bool isNew)
Definition: spgdoinsert.c:675
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3807
Oid * rd_indcollation
Definition: rel.h:200
#define ERROR
Definition: elog.h:46
unsigned int prefixSize
#define SPGIST_NULL_BLKNO
#define FirstOffsetNumber
Definition: off.h:27
OffsetNumber offnum
Definition: spgdoinsert.c:39
static void spgSplitNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, spgChooseOut *out)
Definition: spgdoinsert.c:1713
#define RelationGetRelationName(relation)
Definition: rel.h:491
struct ItemIdData ItemIdData
Size SpGistGetLeafTupleSize(TupleDesc tupleDescriptor, Datum *datums, bool *isnulls)
Definition: spgutils.c:787
#define BufferGetPage(buffer)
Definition: bufmgr.h:169
bool ConditionalLockBuffer(Buffer buffer)
Definition: bufmgr.c:4049
int nNodes
Definition: spgist.h:63
#define GBUF_LEAF
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
#define spgKeyColumn
#define spgFirstIncludeColumn
bool longValuesOK
Definition: spgist.h:47
uintptr_t Datum
Definition: postgres.h:411
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1128
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:4023
#define SPGIST_PAGE_CAPACITY
spgChooseResultType resultType
Definition: spgist.h:76
#define InvalidOffsetNumber
Definition: off.h:26
#define ereport(elevel,...)
Definition: elog.h:157
#define SpGistPageStoresNulls(page)
static void spgMatchNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN)
Definition: spgdoinsert.c:1457
TupleDesc leafTupDesc
#define Assert(condition)
Definition: c.h:804
Datum leafDatum
Definition: spgist.h:56
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:537
#define SPGIST_COMPRESS_PROC
Definition: spgist.h:28
Buffer ReadBuffer(Relation reln, BlockNumber blockNum)
Definition: bufmgr.c:697
#define INDEX_MAX_KEYS
#define InvalidBlockNumber
Definition: block.h:33
#define GBUF_NULLS
static void spgAddNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN, Datum nodeLabel)
Definition: spgdoinsert.c:1511
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2758
Datum * nodeLabels
Definition: spgist.h:64
int errmsg(const char *fmt,...)
Definition: elog.c:909
SpGistLeafTuple spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, Datum *datums, bool *isnulls)
Definition: spgutils.c:840
#define elog(elevel,...)
Definition: elog.h:232
int i
bool allTheSame
Definition: spgist.h:60
static int checkSplitConditions(Relation index, SpGistState *state, SPPageDesc *current, int *nToSplit)
Definition: spgdoinsert.c:333
Buffer buffer
Definition: spgdoinsert.c:37
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102
#define SPGIST_CHOOSE_PROC
Definition: spgist.h:24
static void moveLeafs(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, bool isNulls)
Definition: spgdoinsert.c:387
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
BlockNumber blkno
Definition: spgdoinsert.c:36
RegProcedure index_getprocid(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:769

◆ spgMatchNodeAction()

static void spgMatchNodeAction ( Relation  index,
SpGistState state,
SpGistInnerTuple  innerTuple,
SPPageDesc current,
SPPageDesc parent,
int  nodeN 
)
static

Definition at line 1457 of file spgdoinsert.c.

References SPPageDesc::blkno, SPPageDesc::buffer, elog, ERROR, i, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, ItemPointerGetBlockNumber, ItemPointerGetOffsetNumber, ItemPointerIsValid, SPPageDesc::node, SPPageDesc::offnum, SPPageDesc::page, SGITITERATE, SpGistSetLastUsedPage(), IndexTupleData::t_tid, and UnlockReleaseBuffer().

Referenced by spgdoinsert().

1460 {
1461  int i;
1462  SpGistNodeTuple node;
1463 
1464  /* Release previous parent buffer if any */
1465  if (parent->buffer != InvalidBuffer &&
1466  parent->buffer != current->buffer)
1467  {
1468  SpGistSetLastUsedPage(index, parent->buffer);
1469  UnlockReleaseBuffer(parent->buffer);
1470  }
1471 
1472  /* Repoint parent to specified node of current inner tuple */
1473  parent->blkno = current->blkno;
1474  parent->buffer = current->buffer;
1475  parent->page = current->page;
1476  parent->offnum = current->offnum;
1477  parent->node = nodeN;
1478 
1479  /* Locate that node */
1480  SGITITERATE(innerTuple, i, node)
1481  {
1482  if (i == nodeN)
1483  break;
1484  }
1485 
1486  if (i != nodeN)
1487  elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1488  nodeN);
1489 
1490  /* Point current to the downlink location, if any */
1491  if (ItemPointerIsValid(&node->t_tid))
1492  {
1493  current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1494  current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1495  }
1496  else
1497  {
1498  /* Downlink is empty, so we'll need to find a new page */
1499  current->blkno = InvalidBlockNumber;
1500  current->offnum = InvalidOffsetNumber;
1501  }
1502 
1503  current->buffer = InvalidBuffer;
1504  current->page = NULL;
1505 }
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:82
#define SGITITERATE(x, i, nt)
ItemPointerData t_tid
Definition: itup.h:37
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:641
#define InvalidBuffer
Definition: buf.h:25
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3807
#define ERROR
Definition: elog.h:46
OffsetNumber offnum
Definition: spgdoinsert.c:39
#define InvalidOffsetNumber
Definition: off.h:26
#define InvalidBlockNumber
Definition: block.h:33
#define ItemPointerGetOffsetNumber(pointer)
Definition: itemptr.h:117
#define elog(elevel,...)
Definition: elog.h:232
int i
Buffer buffer
Definition: spgdoinsert.c:37
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:98
BlockNumber blkno
Definition: spgdoinsert.c:36

◆ spgPageIndexMultiDelete()

void spgPageIndexMultiDelete ( SpGistState state,
Page  page,
OffsetNumber itemnos,
int  nitems,
int  firststate,
int  reststate,
BlockNumber  blkno,
OffsetNumber  offnum 
)

Definition at line 131 of file spgdoinsert.c.

References cmpOffsetNumbers(), elog, ERROR, i, MaxIndexTuplesPerPage, PageAddItem, PageIndexMultiDelete(), qsort, SpGistDeadTupleData::size, spgFormDeadTuple(), SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistPageGetOpaque, and SpGistDeadTupleData::tupstate.

Referenced by doPickSplit(), moveLeafs(), spgRedoMoveLeafs(), spgRedoPickSplit(), spgRedoVacuumLeaf(), and vacuumLeafPage().

135 {
136  OffsetNumber firstItem;
138  SpGistDeadTuple tuple = NULL;
139  int i;
140 
141  if (nitems == 0)
142  return; /* nothing to do */
143 
144  /*
145  * For efficiency we want to use PageIndexMultiDelete, which requires the
146  * targets to be listed in sorted order, so we have to sort the itemnos
147  * array. (This also greatly simplifies the math for reinserting the
148  * replacement tuples.) However, we must not scribble on the caller's
149  * array, so we have to make a copy.
150  */
151  memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152  if (nitems > 1)
153  qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154 
155  PageIndexMultiDelete(page, sortednos, nitems);
156 
157  firstItem = itemnos[0];
158 
159  for (i = 0; i < nitems; i++)
160  {
161  OffsetNumber itemno = sortednos[i];
162  int tupstate;
163 
164  tupstate = (itemno == firstItem) ? firststate : reststate;
165  if (tuple == NULL || tuple->tupstate != tupstate)
166  tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167 
168  if (PageAddItem(page, (Item) tuple, tuple->size,
169  itemno, false, false) != itemno)
170  elog(ERROR, "failed to add item of size %u to SPGiST index page",
171  tuple->size);
172 
173  if (tupstate == SPGIST_REDIRECT)
174  SpGistPageGetOpaque(page)->nRedirection++;
175  else if (tupstate == SPGIST_PLACEHOLDER)
176  SpGistPageGetOpaque(page)->nPlaceholder++;
177  }
178 }
SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgutils.c:1054
#define SPGIST_REDIRECT
#define SPGIST_PLACEHOLDER
Pointer Item
Definition: item.h:17
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
uint16 OffsetNumber
Definition: off.h:24
#define ERROR
Definition: elog.h:46
unsigned int tupstate
void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
Definition: bufpage.c:1154
#define SpGistPageGetOpaque(page)
#define MaxIndexTuplesPerPage
Definition: itup.h:145
#define elog(elevel,...)
Definition: elog.h:232
int i
#define qsort(a, b, c, d)
Definition: port.h:504
static int cmpOffsetNumbers(const void *a, const void *b)
Definition: spgdoinsert.c:110

◆ spgSplitNodeAction()

static void spgSplitNodeAction ( Relation  index,
SpGistState state,
SpGistInnerTuple  innerTuple,
SPPageDesc current,
spgChooseOut out 
)
static

Definition at line 1713 of file spgdoinsert.c.

References SpGistInnerTupleData::allTheSame, Assert, SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage, elog, END_CRIT_SECTION, ERROR, GBUF_INNER_PARITY, i, InvalidBuffer, SpGistState::isBuild, label, MarkBufferDirty(), spgxlogSplitTuple::newPage, SpGistInnerTupleData::nNodes, SPPageDesc::node, SPPageDesc::offnum, spgxlogSplitTuple::offnumPostfix, spgxlogSplitTuple::offnumPrefix, SPPageDesc::page, PageAddItem, PageGetItem, PageGetItemId, PageIndexTupleDelete(), PageSetLSN, palloc(), spgxlogSplitTuple::postfixBlkSame, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, spgChooseOut::result, SGITITERATE, SGITMAXNNODES, SpGistInnerTupleData::size, spgFormInnerTuple(), spgFormNodeTuple(), SpGistBlockIsRoot, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistPageGetFreeSpace, SpGistPageStoresNulls, SpGistSetLastUsedPage(), spgUpdateNodeLink(), spgChooseOut::splitTuple, START_CRIT_SECTION, UnlockReleaseBuffer(), XLOG_SPGIST_SPLIT_TUPLE, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

1716 {
1717  SpGistInnerTuple prefixTuple,
1718  postfixTuple;
1719  SpGistNodeTuple node,
1720  *nodes;
1721  BlockNumber postfixBlkno;
1722  OffsetNumber postfixOffset;
1723  int i;
1724  spgxlogSplitTuple xlrec;
1725  Buffer newBuffer = InvalidBuffer;
1726 
1727  /* Should not be applied to nulls */
1728  Assert(!SpGistPageStoresNulls(current->page));
1729 
1730  /* Check opclass gave us sane values */
1731  if (out->result.splitTuple.prefixNNodes <= 0 ||
1732  out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1733  elog(ERROR, "invalid number of prefix nodes: %d",
1734  out->result.splitTuple.prefixNNodes);
1735  if (out->result.splitTuple.childNodeN < 0 ||
1736  out->result.splitTuple.childNodeN >=
1737  out->result.splitTuple.prefixNNodes)
1738  elog(ERROR, "invalid child node number: %d",
1739  out->result.splitTuple.childNodeN);
1740 
1741  /*
1742  * Construct new prefix tuple with requested number of nodes. We'll fill
1743  * in the childNodeN'th node's downlink below.
1744  */
1745  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1746  out->result.splitTuple.prefixNNodes);
1747 
1748  for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1749  {
1750  Datum label = (Datum) 0;
1751  bool labelisnull;
1752 
1753  labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1754  if (!labelisnull)
1755  label = out->result.splitTuple.prefixNodeLabels[i];
1756  nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1757  }
1758 
1759  prefixTuple = spgFormInnerTuple(state,
1760  out->result.splitTuple.prefixHasPrefix,
1761  out->result.splitTuple.prefixPrefixDatum,
1762  out->result.splitTuple.prefixNNodes,
1763  nodes);
1764 
1765  /* it must fit in the space that innerTuple now occupies */
1766  if (prefixTuple->size > innerTuple->size)
1767  elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1768 
1769  /*
1770  * Construct new postfix tuple, containing all nodes of innerTuple with
1771  * same node datums, but with the prefix specified by the picksplit
1772  * function.
1773  */
1774  nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1775  SGITITERATE(innerTuple, i, node)
1776  {
1777  nodes[i] = node;
1778  }
1779 
1780  postfixTuple = spgFormInnerTuple(state,
1781  out->result.splitTuple.postfixHasPrefix,
1782  out->result.splitTuple.postfixPrefixDatum,
1783  innerTuple->nNodes, nodes);
1784 
1785  /* Postfix tuple is allTheSame if original tuple was */
1786  postfixTuple->allTheSame = innerTuple->allTheSame;
1787 
1788  /* prep data for WAL record */
1789  xlrec.newPage = false;
1790 
1791  /*
1792  * If we can't fit both tuples on the current page, get a new page for the
1793  * postfix tuple. In particular, can't split to the root page.
1794  *
1795  * For the space calculation, note that prefixTuple replaces innerTuple
1796  * but postfixTuple will be a new entry.
1797  */
1798  if (SpGistBlockIsRoot(current->blkno) ||
1799  SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1800  prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1801  {
1802  /*
1803  * Choose page with next triple parity, because postfix tuple is a
1804  * child of prefix one
1805  */
1806  newBuffer = SpGistGetBuffer(index,
1807  GBUF_INNER_PARITY(current->blkno + 1),
1808  postfixTuple->size + sizeof(ItemIdData),
1809  &xlrec.newPage);
1810  }
1811 
1813 
1814  /*
1815  * Replace old tuple by prefix tuple
1816  */
1817  PageIndexTupleDelete(current->page, current->offnum);
1818  xlrec.offnumPrefix = PageAddItem(current->page,
1819  (Item) prefixTuple, prefixTuple->size,
1820  current->offnum, false, false);
1821  if (xlrec.offnumPrefix != current->offnum)
1822  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1823  prefixTuple->size);
1824 
1825  /*
1826  * put postfix tuple into appropriate page
1827  */
1828  if (newBuffer == InvalidBuffer)
1829  {
1830  postfixBlkno = current->blkno;
1831  xlrec.offnumPostfix = postfixOffset =
1832  SpGistPageAddNewItem(state, current->page,
1833  (Item) postfixTuple, postfixTuple->size,
1834  NULL, false);
1835  xlrec.postfixBlkSame = true;
1836  }
1837  else
1838  {
1839  postfixBlkno = BufferGetBlockNumber(newBuffer);
1840  xlrec.offnumPostfix = postfixOffset =
1841  SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1842  (Item) postfixTuple, postfixTuple->size,
1843  NULL, false);
1844  MarkBufferDirty(newBuffer);
1845  xlrec.postfixBlkSame = false;
1846  }
1847 
1848  /*
1849  * And set downlink pointer in the prefix tuple to point to postfix tuple.
1850  * (We can't avoid this step by doing the above two steps in opposite
1851  * order, because there might not be enough space on the page to insert
1852  * the postfix tuple first.) We have to update the local copy of the
1853  * prefixTuple too, because that's what will be written to WAL.
1854  */
1855  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1856  postfixBlkno, postfixOffset);
1857  prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1858  PageGetItemId(current->page, current->offnum));
1859  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1860  postfixBlkno, postfixOffset);
1861 
1862  MarkBufferDirty(current->buffer);
1863 
1864  if (RelationNeedsWAL(index) && !state->isBuild)
1865  {
1866  XLogRecPtr recptr;
1867 
1868  XLogBeginInsert();
1869  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1870  XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1871  XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1872 
1874  if (newBuffer != InvalidBuffer)
1875  {
1876  int flags;
1877 
1878  flags = REGBUF_STANDARD;
1879  if (xlrec.newPage)
1880  flags |= REGBUF_WILL_INIT;
1881  XLogRegisterBuffer(1, newBuffer, flags);
1882  }
1883 
1884  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1885 
1886  PageSetLSN(current->page, recptr);
1887 
1888  if (newBuffer != InvalidBuffer)
1889  {
1890  PageSetLSN(BufferGetPage(newBuffer), recptr);
1891  }
1892  }
1893 
1894  END_CRIT_SECTION();
1895 
1896  /* Update local free-space cache and release buffer */
1897  if (newBuffer != InvalidBuffer)
1898  {
1899  SpGistSetLastUsedPage(index, newBuffer);
1900  UnlockReleaseBuffer(newBuffer);
1901  }
1902 }
OffsetNumber offnumPostfix
Definition: spgxlog.h:147
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition: spgdoinsert.c:50
SpGistInnerTupleData * SpGistInnerTuple
#define SpGistPageGetFreeSpace(p, n)
#define XLOG_SPGIST_SPLIT_TUPLE
Definition: spgxlog.h:25
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:1045
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1562
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:220
#define SGITITERATE(x, i, nt)
#define END_CRIT_SECTION()
Definition: miscadmin.h:137
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:641
Pointer Item
Definition: item.h:17
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:33
#define START_CRIT_SECTION()
Definition: miscadmin.h:135
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
uint32 BlockNumber
Definition: block.h:31
union spgChooseOut::@45 result
uint16 OffsetNumber
Definition: off.h:24
unsigned int allTheSame
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3807
#define ERROR
Definition: elog.h:46
#define GBUF_INNER_PARITY(x)
#define REGBUF_STANDARD
Definition: xloginsert.h:35
SpGistInnerTuple spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, int nNodes, SpGistNodeTuple *nodes)
Definition: spgutils.c:971
OffsetNumber offnum
Definition: spgdoinsert.c:39
#define BufferGetPage(buffer)
Definition: bufmgr.h:169
#define SGITMAXNNODES
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
OffsetNumber offnumPrefix
Definition: spgxlog.h:144
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
uintptr_t Datum
Definition: postgres.h:411
static char * label
bool postfixBlkSame
Definition: spgxlog.h:149
#define SpGistPageStoresNulls(page)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:537
#define SpGistBlockIsRoot(blkno)
#define RelationNeedsWAL(relation)
Definition: rel.h:570
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2758
void * palloc(Size size)
Definition: mcxt.c:1062
#define elog(elevel,...)
Definition: elog.h:232
int i
Buffer buffer
Definition: spgdoinsert.c:37
struct spgChooseOut::@45::@48 splitTuple
void XLogBeginInsert(void)
Definition: xloginsert.c:123
#define PageSetLSN(page, lsn)
Definition: bufpage.h:368
int Buffer
Definition: buf.h:23
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:1173
SpGistNodeTuple spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
Definition: spgutils.c:929
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
BlockNumber blkno
Definition: spgdoinsert.c:36

◆ spgUpdateNodeLink()

void spgUpdateNodeLink ( SpGistInnerTuple  tup,
int  nodeN,
BlockNumber  blkno,
OffsetNumber  offset 
)

Definition at line 50 of file spgdoinsert.c.

References elog, ERROR, i, ItemPointerSet, SPPageDesc::node, SGITITERATE, and IndexTupleData::t_tid.

Referenced by saveNodeLink(), spgRedoAddLeaf(), spgRedoAddNode(), spgRedoMoveLeafs(), spgRedoPickSplit(), and spgSplitNodeAction().

52 {
53  int i;
54  SpGistNodeTuple node;
55 
56  SGITITERATE(tup, i, node)
57  {
58  if (i == nodeN)
59  {
60  ItemPointerSet(&node->t_tid, blkno, offset);
61  return;
62  }
63  }
64 
65  elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
66  nodeN);
67 }
#define SGITITERATE(x, i, nt)
ItemPointerData t_tid
Definition: itup.h:37
#define ERROR
Definition: elog.h:46
#define elog(elevel,...)
Definition: elog.h:232
int i
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:127