PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
spgdoinsert.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/spgist_private.h"
#include "access/spgxlog.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"
Include dependency graph for spgdoinsert.c:

Go to the source code of this file.

Data Structures

struct  SPPageDesc
 

Typedefs

typedef struct SPPageDesc SPPageDesc
 

Functions

void spgUpdateNodeLink (SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
 
static SpGistInnerTuple addNode (SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
 
static int cmpOffsetNumbers (const void *a, const void *b)
 
void spgPageIndexMultiDelete (SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
 
static void saveNodeLink (Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
 
static void addLeafTuple (Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
 
static int checkSplitConditions (Relation index, SpGistState *state, SPPageDesc *current, int *nToSplit)
 
static void moveLeafs (Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, bool isNulls)
 
static void setRedirectionTuple (SPPageDesc *current, OffsetNumber position, BlockNumber blkno, OffsetNumber offnum)
 
static bool checkAllTheSame (spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, bool *includeNew)
 
static bool doPickSplit (Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, int level, bool isNulls, bool isNew)
 
static void spgMatchNodeAction (Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN)
 
static void spgAddNodeAction (Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN, Datum nodeLabel)
 
static void spgSplitNodeAction (Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, spgChooseOut *out)
 
bool spgdoinsert (Relation index, SpGistState *state, ItemPointer heapPtr, Datum datum, bool isnull)
 

Typedef Documentation

Function Documentation

static void addLeafTuple ( Relation  index,
SpGistState *  state,
SpGistLeafTuple  leafTuple,
SPPageDesc *  current,
SPPageDesc *  parent,
bool  isNulls,
bool  isNew 
)
static

Definition at line 203 of file spgdoinsert.c.

References SPPageDesc::blkno, SPPageDesc::buffer, elog, END_CRIT_SECTION, ERROR, InvalidBuffer, InvalidOffsetNumber, MarkBufferDirty(), spgxlogAddLeaf::newPage, SpGistLeafTupleData::nextOffset, SPPageDesc::node, spgxlogAddLeaf::nodeI, NULL, SPPageDesc::offnum, spgxlogAddLeaf::offnumHeadLeaf, spgxlogAddLeaf::offnumLeaf, spgxlogAddLeaf::offnumParent, SPPageDesc::page, PageAddItem, PageGetItem, PageGetItemId, PageIndexTupleDelete(), PageSetLSN, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, saveNodeLink(), SpGistLeafTupleData::size, SPGIST_DEAD, SPGIST_LIVE, SpGistBlockIsRoot, SpGistPageAddNewItem(), START_CRIT_SECTION, spgxlogAddLeaf::storesNulls, SpGistLeafTupleData::tupstate, XLOG_SPGIST_ADD_LEAF, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

205 {
206  spgxlogAddLeaf xlrec;
207 
208  xlrec.newPage = isNew;
209  xlrec.storesNulls = isNulls;
210 
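211  /* these will be filled below as needed */
212  xlrec.offnumLeaf = InvalidOffsetNumber;
213  xlrec.offnumHeadLeaf = InvalidOffsetNumber;
214  xlrec.offnumParent = InvalidOffsetNumber;
215  xlrec.nodeI = 0;
216 
217  START_CRIT_SECTION();
218 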
219  if (current->offnum == InvalidOffsetNumber ||
220  SpGistBlockIsRoot(current->blkno))
221  {
222  /* Tuple is not part of a chain */
223  leafTuple->nextOffset = InvalidOffsetNumber;
224  current->offnum = SpGistPageAddNewItem(state, current->page,
225  (Item) leafTuple, leafTuple->size,
226  NULL, false);
227 
228  xlrec.offnumLeaf = current->offnum;
229 
230  /* Must update parent's downlink if any */
231  if (parent->buffer != InvalidBuffer)
232  {
233  xlrec.offnumParent = parent->offnum;
234  xlrec.nodeI = parent->node;
235 
236  saveNodeLink(index, parent, current->blkno, current->offnum);
237  }
238  }
239  else
240  {
241  /*
242  * Tuple must be inserted into existing chain. We mustn't change the
243  * chain's head address, but we don't need to chase the entire chain
244  * to put the tuple at the end; we can insert it second.
245  *
246  * Also, it's possible that the "chain" consists only of a DEAD tuple,
247  * in which case we should replace the DEAD tuple in-place.
248  */
249  SpGistLeafTuple head;
250  OffsetNumber offnum;
251 
252  head = (SpGistLeafTuple) PageGetItem(current->page,
253  PageGetItemId(current->page, current->offnum));
254  if (head->tupstate == SPGIST_LIVE)
255  {
256  leafTuple->nextOffset = head->nextOffset;
257  offnum = SpGistPageAddNewItem(state, current->page,
258  (Item) leafTuple, leafTuple->size,
259  NULL, false);
260 
261  /*
262  * re-get head of list because it could have been moved on page,
263  * and set new second element
264  */
265  head = (SpGistLeafTuple) PageGetItem(current->page,
266  PageGetItemId(current->page, current->offnum));
267  head->nextOffset = offnum;
268 
269  xlrec.offnumLeaf = offnum;
270  xlrec.offnumHeadLeaf = current->offnum;
271  }
272  else if (head->tupstate == SPGIST_DEAD)
273  {
274  leafTuple->nextOffset = InvalidOffsetNumber;
275  PageIndexTupleDelete(current->page, current->offnum);
276  if (PageAddItem(current->page,
277  (Item) leafTuple, leafTuple->size,
278  current->offnum, false, false) != current->offnum)
279  elog(ERROR, "failed to add item of size %u to SPGiST index page",
280  leafTuple->size);
281 
282  /* WAL replay distinguishes this case by equal offnums */
283  xlrec.offnumLeaf = current->offnum;
284  xlrec.offnumHeadLeaf = current->offnum;
285  }
286  else
287  elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288  }
289 
290  MarkBufferDirty(current->buffer);
291 
292  if (RelationNeedsWAL(index))
293  {
294  XLogRecPtr recptr;
295  int flags;
296 
297  XLogBeginInsert();
298  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299  XLogRegisterData((char *) leafTuple, leafTuple->size);
300 
301  flags = REGBUF_STANDARD;
302  if (xlrec.newPage)
303  flags |= REGBUF_WILL_INIT;
304  XLogRegisterBuffer(0, current->buffer, flags);
305  if (xlrec.offnumParent != InvalidOffsetNumber)
306  XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
307 
308  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309 
310  PageSetLSN(current->page, recptr);
311 
312  /* update parent only if we actually changed it */
313  if (xlrec.offnumParent != InvalidOffsetNumber)
314  {
315  PageSetLSN(parent->page, recptr);
316  }
317  }
318 
319  END_CRIT_SECTION();
320 }
#define SPGIST_DEAD
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:186
OffsetNumber offnumParent
Definition: spgxlog.h:53
OffsetNumber offnumLeaf
Definition: spgxlog.h:50
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:700
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1445
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:213
#define END_CRIT_SECTION()
Definition: miscadmin.h:132
Pointer Item
Definition: item.h:17
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:32
#define START_CRIT_SECTION()
Definition: miscadmin.h:130
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:413
#define XLOG_SPGIST_ADD_LEAF
Definition: spgxlog.h:22
uint16 OffsetNumber
Definition: off.h:24
#define ERROR
Definition: elog.h:43
bool newPage
Definition: spgxlog.h:48
#define REGBUF_STANDARD
Definition: xloginsert.h:35
OffsetNumber offnum
Definition: spgdoinsert.c:39
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:232
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
unsigned int tupstate
OffsetNumber offnumHeadLeaf
Definition: spgxlog.h:51
bool storesNulls
Definition: spgxlog.h:49
#define InvalidOffsetNumber
Definition: off.h:26
uint16 nodeI
Definition: spgxlog.h:54
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define SpGistBlockIsRoot(blkno)
#define SPGIST_LIVE
OffsetNumber nextOffset
#define RelationNeedsWAL(relation)
Definition: rel.h:502
Buffer buffer
Definition: spgdoinsert.c:37
#define elog
Definition: elog.h:219
SpGistLeafTupleData * SpGistLeafTuple
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define PageSetLSN(page, lsn)
Definition: bufpage.h:365
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:827
#define PageGetItem(page, itemId)
Definition: bufpage.h:337
BlockNumber blkno
Definition: spgdoinsert.c:36
static SpGistInnerTuple addNode ( SpGistState *  state,
SpGistInnerTuple  tuple,
Datum  label,
int  offset 
)
static

Definition at line 78 of file spgdoinsert.c.

References elog, ERROR, i, SpGistInnerTupleData::nNodes, palloc(), SpGistInnerTupleData::prefixSize, SGITDATUM, SGITITERATE, spgFormInnerTuple(), and spgFormNodeTuple().

Referenced by spgAddNodeAction().

79 {
80  SpGistNodeTuple node,
81  *nodes;
82  int i;
83 
84  /* if offset is negative, insert at end */
85  if (offset < 0)
86  offset = tuple->nNodes;
87  else if (offset > tuple->nNodes)
88  elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
89 
90  nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
91  SGITITERATE(tuple, i, node)
92  {
93  if (i < offset)
94  nodes[i] = node;
95  else
96  nodes[i + 1] = node;
97  }
98 
99  nodes[offset] = spgFormNodeTuple(state, label, false);
100 
101  return spgFormInnerTuple(state,
102  (tuple->prefixSize > 0),
103  SGITDATUM(tuple, state),
104  tuple->nNodes + 1,
105  nodes);
106 }
#define SGITITERATE(x, i, nt)
#define SGITDATUM(x, s)
#define ERROR
Definition: elog.h:43
unsigned int prefixSize
SpGistInnerTuple spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, int nNodes, SpGistNodeTuple *nodes)
Definition: spgutils.c:671
static char * label
Definition: pg_basebackup.c:83
void * palloc(Size size)
Definition: mcxt.c:891
int i
#define elog
Definition: elog.h:219
SpGistNodeTuple spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
Definition: spgutils.c:629
static bool checkAllTheSame ( spgPickSplitIn *  in,
spgPickSplitOut *  out,
bool  tooBig,
bool *  includeNew 
)
static

Definition at line 598 of file spgdoinsert.c.

References i, spgPickSplitOut::mapTuplesToNodes, spgPickSplitOut::nNodes, spgPickSplitOut::nodeLabels, spgPickSplitIn::nTuples, and palloc().

Referenced by doPickSplit().

600 {
601  int theNode;
602  int limit;
603  int i;
604 
605  /* For the moment, assume we can include the new leaf tuple */
606  *includeNew = true;
607 
608  /* If there's only the new leaf tuple, don't select allTheSame mode */
609  if (in->nTuples <= 1)
610  return false;
611 
612  /* If tuple set doesn't fit on one page, ignore the new tuple in test */
613  limit = tooBig ? in->nTuples - 1 : in->nTuples;
614 
615  /* Check to see if more than one node is populated */
616  theNode = out->mapTuplesToNodes[0];
617  for (i = 1; i < limit; i++)
618  {
619  if (out->mapTuplesToNodes[i] != theNode)
620  return false;
621  }
622 
623  /* Nope, so override the picksplit function's decisions */
624 
625  /* If the new tuple is in its own node, it can't be included in split */
626  if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
627  *includeNew = false;
628 
629  out->nNodes = 8; /* arbitrary number of child nodes */
630 
631  /* Random assignment of tuples to nodes (note we include new tuple) */
632  for (i = 0; i < in->nTuples; i++)
633  out->mapTuplesToNodes[i] = i % out->nNodes;
634 
635  /* The opclass may not use node labels, but if it does, duplicate 'em */
636  if (out->nodeLabels)
637  {
638  Datum theLabel = out->nodeLabels[theNode];
639 
640  out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
641  for (i = 0; i < out->nNodes; i++)
642  out->nodeLabels[i] = theLabel;
643  }
644 
645  /* We don't touch the prefix or the leaf tuple datum assignments */
646 
647  return true;
648 }
Datum * nodeLabels
Definition: spgist.h:124
uintptr_t Datum
Definition: postgres.h:374
void * palloc(Size size)
Definition: mcxt.c:891
int i
int * mapTuplesToNodes
Definition: spgist.h:126
static int checkSplitConditions ( Relation  index,
SpGistState *  state,
SPPageDesc *  current,
int *  nToSplit 
)
static

Definition at line 333 of file spgdoinsert.c.

References Assert, SPPageDesc::blkno, elog, ERROR, FirstOffsetNumber, i, InvalidOffsetNumber, SpGistLeafTupleData::nextOffset, SPPageDesc::offnum, SPPageDesc::page, PageGetItem, PageGetItemId, PageGetMaxOffsetNumber, SpGistLeafTupleData::size, SPGIST_DEAD, SPGIST_LIVE, SpGistBlockIsRoot, and SpGistLeafTupleData::tupstate.

Referenced by spgdoinsert().

335 {
336  int i,
337  n = 0,
338  totalSize = 0;
339 
340  if (SpGistBlockIsRoot(current->blkno))
341  {
342  /* return impossible values to force split */
343  *nToSplit = BLCKSZ;
344  return BLCKSZ;
345  }
346 
347  i = current->offnum;
348  while (i != InvalidOffsetNumber)
349  {
350  SpGistLeafTuple it;
351 
352  Assert(i >= FirstOffsetNumber &&
353  i <= PageGetMaxOffsetNumber(current->page));
354  it = (SpGistLeafTuple) PageGetItem(current->page,
355  PageGetItemId(current->page, i));
356  if (it->tupstate == SPGIST_LIVE)
357  {
358  n++;
359  totalSize += it->size + sizeof(ItemIdData);
360  }
361  else if (it->tupstate == SPGIST_DEAD)
362  {
363  /* We could see a DEAD tuple as first/only chain item */
364  Assert(i == current->offnum);
365  Assert(it->nextOffset == InvalidOffsetNumber);
366  /* Don't count it in result, because it won't go to other page */
367  }
368  else
369  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370 
371  i = it->nextOffset;
372  }
373 
374  *nToSplit = n;
375 
376  return totalSize;
377 }
#define SPGIST_DEAD
#define PageGetMaxOffsetNumber(page)
Definition: bufpage.h:354
#define ERROR
Definition: elog.h:43
#define FirstOffsetNumber
Definition: off.h:27
OffsetNumber offnum
Definition: spgdoinsert.c:39
struct ItemIdData ItemIdData
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:232
unsigned int tupstate
#define InvalidOffsetNumber
Definition: off.h:26
#define Assert(condition)
Definition: c.h:671
#define SpGistBlockIsRoot(blkno)
#define SPGIST_LIVE
OffsetNumber nextOffset
int i
#define elog
Definition: elog.h:219
SpGistLeafTupleData * SpGistLeafTuple
#define PageGetItem(page, itemId)
Definition: bufpage.h:337
BlockNumber blkno
Definition: spgdoinsert.c:36
static int cmpOffsetNumbers ( const void *  a,
const void *  b 
)
static

Definition at line 110 of file spgdoinsert.c.

Referenced by spgPageIndexMultiDelete().

111 {
112  if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
113  return 0;
114  return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
115 }
uint16 OffsetNumber
Definition: off.h:24
static bool doPickSplit ( Relation  index,
SpGistState *  state,
SPPageDesc *  current,
SPPageDesc *  parent,
SpGistLeafTuple  newLeafTuple,
int  level,
bool  isNulls,
bool  isNew 
)
static

Definition at line 675 of file spgdoinsert.c.

References SpGistInnerTupleData::allTheSame, Assert, SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage, BufferIsValid, checkAllTheSame(), spgPickSplitIn::datums, elog, END_CRIT_SECTION, ERROR, FirstOffsetNumber, FunctionCall2Coll(), GBUF_INNER_PARITY, GBUF_LEAF, GBUF_NULLS, spgPickSplitOut::hasPrefix, SpGistLeafTupleData::heapPtr, i, index_getprocinfo(), spgxlogPickSplit::initDest, spgxlogPickSplit::initInner, spgxlogPickSplit::initSrc, spgxlogPickSplit::innerIsParent, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, SpGistState::isBuild, spgxlogPickSplit::isRootSplit, ItemPointerGetBlockNumber, ItemPointerGetOffsetNumber, ItemPointerIsValid, ItemPointerSet, label, spgPickSplitOut::leafTupleDatums, spgPickSplitIn::level, spgPickSplitOut::mapTuplesToNodes, MarkBufferDirty(), Min, spgxlogPickSplit::nDelete, SpGistLeafTupleData::nextOffset, spgxlogPickSplit::nInsert, spgPickSplitOut::nNodes, SPPageDesc::node, spgxlogPickSplit::nodeI, spgPickSplitOut::nodeLabels, spgPickSplitIn::nTuples, NULL, SPPageDesc::offnum, spgxlogPickSplit::offnumInner, spgxlogPickSplit::offnumParent, SPPageDesc::page, PageAddItem, PageGetExactFreeSpace(), PageGetItem, PageGetItemId, PageGetMaxOffsetNumber, PageSetLSN, palloc(), palloc0(), PointerGetDatum, spgPickSplitOut::prefixDatum, RelationData::rd_indcollation, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, saveNodeLink(), setRedirectionTuple(), SGDTSIZE, SGITITERATE, SGLTDATUM, SpGistInnerTupleData::size, SpGistLeafTupleData::size, SizeOfSpgxlogPickSplit, spgFormInnerTuple(), spgFormLeafTuple(), spgFormNodeTuple(), SPGIST_DEAD, SPGIST_LEAF, SPGIST_LIVE, SPGIST_METAPAGE_BLKNO, SPGIST_NULLS, SPGIST_PAGE_CAPACITY, SPGIST_PICKSPLIT_PROC, SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistBlockIsRoot, SpGistGetBuffer(), SpGistInitBuffer(), SpGistPageAddNewItem(), SpGistPageGetFreeSpace, SpGistPageGetOpaque, SpGistSetLastUsedPage(), spgPageIndexMultiDelete(), START_CRIT_SECTION, spgxlogPickSplit::stateSrc, 
STORE_STATE, spgxlogPickSplit::storesNulls, SpGistLeafTupleData::tupstate, UnlockReleaseBuffer(), XLOG_SPGIST_PICKSPLIT, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

679 {
680  bool insertedNew = false;
681  spgPickSplitIn in;
682  spgPickSplitOut out;
683  FmgrInfo *procinfo;
684  bool includeNew;
685  int i,
686  max,
687  n;
688  SpGistInnerTuple innerTuple;
689  SpGistNodeTuple node,
690  *nodes;
691  Buffer newInnerBuffer,
692  newLeafBuffer;
693  ItemPointerData *heapPtrs;
694  uint8 *leafPageSelect;
695  int *leafSizes;
696  OffsetNumber *toDelete;
697  OffsetNumber *toInsert;
698  OffsetNumber redirectTuplePos = InvalidOffsetNumber;
699  OffsetNumber startOffsets[2];
700  SpGistLeafTuple *newLeafs;
701  int spaceToDelete;
702  int currentFreeSpace;
703  int totalLeafSizes;
704  bool allTheSame;
705  spgxlogPickSplit xlrec;
706  char *leafdata,
707  *leafptr;
708  SPPageDesc saveCurrent;
709  int nToDelete,
710  nToInsert,
711  maxToInclude;
712 
713  in.level = level;
714 
715  /*
716  * Allocate per-leaf-tuple work arrays with max possible size
717  */
718  max = PageGetMaxOffsetNumber(current->page);
719  n = max + 1;
720  in.datums = (Datum *) palloc(sizeof(Datum) * n);
721  heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
722  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
723  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
724  newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
725  leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
726 
727  STORE_STATE(state, xlrec.stateSrc);
728 
729  /*
730  * Form list of leaf tuples which will be distributed as split result;
731  * also, count up the amount of space that will be freed from current.
732  * (Note that in the non-root case, we won't actually delete the old
733  * tuples, only replace them with redirects or placeholders.)
734  *
735  * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
736  * page. For a pass-by-value data type we will fetch a word that must
737  * exist even though it may contain garbage (because of the fact that leaf
738  * tuples must have size at least SGDTSIZE). For a pass-by-reference type
739  * we are just computing a pointer that isn't going to get dereferenced.
740  * So it's not worth guarding the calls with isNulls checks.
741  */
742  nToInsert = 0;
743  nToDelete = 0;
744  spaceToDelete = 0;
745  if (SpGistBlockIsRoot(current->blkno))
746  {
747  /*
748  * We are splitting the root (which up to now is also a leaf page).
749  * Its tuples are not linked, so scan sequentially to get them all. We
750  * ignore the original value of current->offnum.
751  */
752  for (i = FirstOffsetNumber; i <= max; i++)
753  {
754  SpGistLeafTuple it;
755 
756  it = (SpGistLeafTuple) PageGetItem(current->page,
757  PageGetItemId(current->page, i));
758  if (it->tupstate == SPGIST_LIVE)
759  {
760  in.datums[nToInsert] = SGLTDATUM(it, state);
761  heapPtrs[nToInsert] = it->heapPtr;
762  nToInsert++;
763  toDelete[nToDelete] = i;
764  nToDelete++;
765  /* we will delete the tuple altogether, so count full space */
766  spaceToDelete += it->size + sizeof(ItemIdData);
767  }
768  else /* tuples on root should be live */
769  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
770  }
771  }
772  else
773  {
774  /* Normal case, just collect the leaf tuples in the chain */
775  i = current->offnum;
776  while (i != InvalidOffsetNumber)
777  {
778  SpGistLeafTuple it;
779 
780  Assert(i >= FirstOffsetNumber && i <= max);
781  it = (SpGistLeafTuple) PageGetItem(current->page,
782  PageGetItemId(current->page, i));
783  if (it->tupstate == SPGIST_LIVE)
784  {
785  in.datums[nToInsert] = SGLTDATUM(it, state);
786  heapPtrs[nToInsert] = it->heapPtr;
787  nToInsert++;
788  toDelete[nToDelete] = i;
789  nToDelete++;
790  /* we will not delete the tuple, only replace with dead */
791  Assert(it->size >= SGDTSIZE);
792  spaceToDelete += it->size - SGDTSIZE;
793  }
794  else if (it->tupstate == SPGIST_DEAD)
795  {
796  /* We could see a DEAD tuple as first/only chain item */
797  Assert(i == current->offnum);
798  Assert(it->nextOffset == InvalidOffsetNumber);
799  toDelete[nToDelete] = i;
800  nToDelete++;
801  /* replacing it with redirect will save no space */
802  }
803  else
804  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
805 
806  i = it->nextOffset;
807  }
808  }
809  in.nTuples = nToInsert;
810 
811  /*
812  * We may not actually insert new tuple because another picksplit may be
813  * necessary due to too large value, but we will try to allocate enough
814  * space to include it; and in any case it has to be included in the input
815  * for the picksplit function. So don't increment nToInsert yet.
816  */
817  in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
818  heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
819  in.nTuples++;
820 
821  memset(&out, 0, sizeof(out));
822 
823  if (!isNulls)
824  {
825  /*
826  * Perform split using user-defined method.
827  */
828  procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
829  FunctionCall2Coll(procinfo,
830  index->rd_indcollation[0],
831  PointerGetDatum(&in),
832  PointerGetDatum(&out));
833 
834  /*
835  * Form new leaf tuples and count up the total space needed.
836  */
837  totalLeafSizes = 0;
838  for (i = 0; i < in.nTuples; i++)
839  {
840  newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
841  out.leafTupleDatums[i],
842  false);
843  totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
844  }
845  }
846  else
847  {
848  /*
849  * Perform dummy split that puts all tuples into one node.
850  * checkAllTheSame will override this and force allTheSame mode.
851  */
852  out.hasPrefix = false;
853  out.nNodes = 1;
854  out.nodeLabels = NULL;
855  out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
856 
857  /*
858  * Form new leaf tuples and count up the total space needed.
859  */
860  totalLeafSizes = 0;
861  for (i = 0; i < in.nTuples; i++)
862  {
863  newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
864  (Datum) 0,
865  true);
866  totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
867  }
868  }
869 
870  /*
871  * Check to see if the picksplit function failed to separate the values,
872  * ie, it put them all into the same child node. If so, select allTheSame
873  * mode and create a random split instead. See comments for
874  * checkAllTheSame as to why we need to know if the new leaf tuples could
875  * fit on one page.
876  */
877  allTheSame = checkAllTheSame(&in, &out,
878  totalLeafSizes > SPGIST_PAGE_CAPACITY,
879  &includeNew);
880 
881  /*
882  * If checkAllTheSame decided we must exclude the new tuple, don't
883  * consider it any further.
884  */
885  if (includeNew)
886  maxToInclude = in.nTuples;
887  else
888  {
889  maxToInclude = in.nTuples - 1;
890  totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
891  }
892 
893  /*
894  * Allocate per-node work arrays. Since checkAllTheSame could replace
895  * out.nNodes with a value larger than the number of tuples on the input
896  * page, we can't allocate these arrays before here.
897  */
898  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
899  leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
900 
901  /*
902  * Form nodes of inner tuple and inner tuple itself
903  */
904  for (i = 0; i < out.nNodes; i++)
905  {
906  Datum label = (Datum) 0;
907  bool labelisnull = (out.nodeLabels == NULL);
908 
909  if (!labelisnull)
910  label = out.nodeLabels[i];
911  nodes[i] = spgFormNodeTuple(state, label, labelisnull);
912  }
913  innerTuple = spgFormInnerTuple(state,
914  out.hasPrefix, out.prefixDatum,
915  out.nNodes, nodes);
916  innerTuple->allTheSame = allTheSame;
917 
918  /*
919  * Update nodes[] array to point into the newly formed innerTuple, so that
920  * we can adjust their downlinks below.
921  */
922  SGITITERATE(innerTuple, i, node)
923  {
924  nodes[i] = node;
925  }
926 
927  /*
928  * Re-scan new leaf tuples and count up the space needed under each node.
929  */
930  for (i = 0; i < maxToInclude; i++)
931  {
932  n = out.mapTuplesToNodes[i];
933  if (n < 0 || n >= out.nNodes)
934  elog(ERROR, "inconsistent result of SPGiST picksplit function");
935  leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
936  }
937 
938  /*
939  * To perform the split, we must insert a new inner tuple, which can't go
940  * on a leaf page; and unless we are splitting the root page, we must then
941  * update the parent tuple's downlink to point to the inner tuple. If
942  * there is room, we'll put the new inner tuple on the same page as the
943  * parent tuple, otherwise we need another non-leaf buffer. But if the
944  * parent page is the root, we can't add the new inner tuple there,
945  * because the root page must have only one inner tuple.
946  */
947  xlrec.initInner = false;
948  if (parent->buffer != InvalidBuffer &&
949  !SpGistBlockIsRoot(parent->blkno) &&
950  (SpGistPageGetFreeSpace(parent->page, 1) >=
951  innerTuple->size + sizeof(ItemIdData)))
952  {
953  /* New inner tuple will fit on parent page */
954  newInnerBuffer = parent->buffer;
955  }
956  else if (parent->buffer != InvalidBuffer)
957  {
958  /* Send tuple to page with next triple parity (see README) */
959  newInnerBuffer = SpGistGetBuffer(index,
960  GBUF_INNER_PARITY(parent->blkno + 1) |
961  (isNulls ? GBUF_NULLS : 0),
962  innerTuple->size + sizeof(ItemIdData),
963  &xlrec.initInner);
964  }
965  else
966  {
967  /* Root page split ... inner tuple will go to root page */
968  newInnerBuffer = InvalidBuffer;
969  }
970 
971  /*
972  * The new leaf tuples converted from the existing ones should require the
973  * same or less space, and therefore should all fit onto one page
974  * (although that's not necessarily the current page, since we can't
975  * delete the old tuples but only replace them with placeholders).
976  * However, the incoming new tuple might not also fit, in which case we
977  * might need another picksplit cycle to reduce it some more.
978  *
979  * If there's not room to put everything back onto the current page, then
980  * we decide on a per-node basis which tuples go to the new page. (We do
981  * it like that because leaf tuple chains can't cross pages, so we must
982  * place all leaf tuples belonging to the same parent node on the same
983  * page.)
984  *
985  * If we are splitting the root page (turning it from a leaf page into an
986  * inner page), then no leaf tuples can go back to the current page; they
987  * must all go somewhere else.
988  */
989  if (!SpGistBlockIsRoot(current->blkno))
990  currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
991  else
992  currentFreeSpace = 0; /* prevent assigning any tuples to current */
993 
994  xlrec.initDest = false;
995 
996  if (totalLeafSizes <= currentFreeSpace)
997  {
998  /* All the leaf tuples will fit on current page */
999  newLeafBuffer = InvalidBuffer;
1000  /* mark new leaf tuple as included in insertions, if allowed */
1001  if (includeNew)
1002  {
1003  nToInsert++;
1004  insertedNew = true;
1005  }
1006  for (i = 0; i < nToInsert; i++)
1007  leafPageSelect[i] = 0; /* signifies current page */
1008  }
1009  else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1010  {
1011  /*
1012  * We're trying to split up a long value by repeated suffixing, but
1013  * it's not going to fit yet. Don't bother allocating a second leaf
1014  * buffer that we won't be able to use.
1015  */
1016  newLeafBuffer = InvalidBuffer;
1017  Assert(includeNew);
1018  Assert(nToInsert == 0);
1019  }
1020  else
1021  {
1022  /* We will need another leaf page */
1023  uint8 *nodePageSelect;
1024  int curspace;
1025  int newspace;
1026 
1027  newLeafBuffer = SpGistGetBuffer(index,
1028  GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1029  Min(totalLeafSizes,
1030  SPGIST_PAGE_CAPACITY),
1031  &xlrec.initDest);
1032 
1033  /*
1034  * Attempt to assign node groups to the two pages. We might fail to
1035  * do so, even if totalLeafSizes is less than the available space,
1036  * because we can't split a group across pages.
1037  */
1038  nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1039 
1040  curspace = currentFreeSpace;
1041  newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1042  for (i = 0; i < out.nNodes; i++)
1043  {
1044  if (leafSizes[i] <= curspace)
1045  {
1046  nodePageSelect[i] = 0; /* signifies current page */
1047  curspace -= leafSizes[i];
1048  }
1049  else
1050  {
1051  nodePageSelect[i] = 1; /* signifies new leaf page */
1052  newspace -= leafSizes[i];
1053  }
1054  }
1055  if (curspace >= 0 && newspace >= 0)
1056  {
1057  /* Successful assignment, so we can include the new leaf tuple */
1058  if (includeNew)
1059  {
1060  nToInsert++;
1061  insertedNew = true;
1062  }
1063  }
1064  else if (includeNew)
1065  {
1066  /* We must exclude the new leaf tuple from the split */
1067  int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1068 
1069  leafSizes[nodeOfNewTuple] -=
1070  newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1071 
1072  /* Repeat the node assignment process --- should succeed now */
1073  curspace = currentFreeSpace;
1074  newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1075  for (i = 0; i < out.nNodes; i++)
1076  {
1077  if (leafSizes[i] <= curspace)
1078  {
1079  nodePageSelect[i] = 0; /* signifies current page */
1080  curspace -= leafSizes[i];
1081  }
1082  else
1083  {
1084  nodePageSelect[i] = 1; /* signifies new leaf page */
1085  newspace -= leafSizes[i];
1086  }
1087  }
1088  if (curspace < 0 || newspace < 0)
1089  elog(ERROR, "failed to divide leaf tuple groups across pages");
1090  }
1091  else
1092  {
1093  /* oops, we already excluded new tuple ... should not get here */
1094  elog(ERROR, "failed to divide leaf tuple groups across pages");
1095  }
1096  /* Expand the per-node assignments to be shown per leaf tuple */
1097  for (i = 0; i < nToInsert; i++)
1098  {
1099  n = out.mapTuplesToNodes[i];
1100  leafPageSelect[i] = nodePageSelect[n];
1101  }
1102  }
1103 
1104  /* Start preparing WAL record */
1105  xlrec.nDelete = 0;
1106  xlrec.initSrc = isNew;
1107  xlrec.storesNulls = isNulls;
1108  xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1109 
1110  leafdata = leafptr = (char *) palloc(totalLeafSizes);
1111 
1112  /* Here we begin making the changes to the target pages */
1113  START_CRIT_SECTION();
1114 
1115  /*
1116  * Delete old leaf tuples from current buffer, except when we're splitting
1117  * the root; in that case there's no need because we'll re-init the page
1118  * below. We do this first to make room for reinserting new leaf tuples.
1119  */
1120  if (!SpGistBlockIsRoot(current->blkno))
1121  {
1122  /*
1123  * Init buffer instead of deleting individual tuples, but only if
1124  * there aren't any other live tuples and only during build; otherwise
1125  * we need to set a redirection tuple for concurrent scans.
1126  */
1127  if (state->isBuild &&
1128  nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1129  PageGetMaxOffsetNumber(current->page))
1130  {
1131  SpGistInitBuffer(current->buffer,
1132  SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1133  xlrec.initSrc = true;
1134  }
1135  else if (isNew)
1136  {
1137  /* don't expose the freshly init'd buffer as a backup block */
1138  Assert(nToDelete == 0);
1139  }
1140  else
1141  {
1142  xlrec.nDelete = nToDelete;
1143 
1144  if (!state->isBuild)
1145  {
1146  /*
1147  * Need to create redirect tuple (it will point to new inner
1148  * tuple) but right now the new tuple's location is not known
1149  * yet. So, set the redirection pointer to "impossible" value
1150  * and remember its position to update tuple later.
1151  */
1152  if (nToDelete > 0)
1153  redirectTuplePos = toDelete[0];
1154  spgPageIndexMultiDelete(state, current->page,
1155  toDelete, nToDelete,
1160  }
1161  else
1162  {
1163  /*
1164  * During index build there is not concurrent searches, so we
1165  * don't need to create redirection tuple.
1166  */
1167  spgPageIndexMultiDelete(state, current->page,
1168  toDelete, nToDelete,
1173  }
1174  }
1175  }
1176 
1177  /*
1178  * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1179  * nodes.
1180  */
1181  startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1182  for (i = 0; i < nToInsert; i++)
1183  {
1184  SpGistLeafTuple it = newLeafs[i];
1185  Buffer leafBuffer;
1186  BlockNumber leafBlock;
1187  OffsetNumber newoffset;
1188 
1189  /* Which page is it going to? */
1190  leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1191  leafBlock = BufferGetBlockNumber(leafBuffer);
1192 
1193  /* Link tuple into correct chain for its node */
1194  n = out.mapTuplesToNodes[i];
1195 
1196  if (ItemPointerIsValid(&nodes[n]->t_tid))
1197  {
1198  Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1199  it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
1200  }
1201  else
1203 
1204  /* Insert it on page */
1205  newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1206  (Item) it, it->size,
1207  &startOffsets[leafPageSelect[i]],
1208  false);
1209  toInsert[i] = newoffset;
1210 
1211  /* ... and complete the chain linking */
1212  ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1213 
1214  /* Also copy leaf tuple into WAL data */
1215  memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1216  leafptr += newLeafs[i]->size;
1217  }
1218 
1219  /*
1220  * We're done modifying the other leaf buffer (if any), so mark it dirty.
1221  * current->buffer will be marked below, after we're entirely done
1222  * modifying it.
1223  */
1224  if (newLeafBuffer != InvalidBuffer)
1225  {
1226  MarkBufferDirty(newLeafBuffer);
1227  }
1228 
1229  /* Remember current buffer, since we're about to change "current" */
1230  saveCurrent = *current;
1231 
1232  /*
1233  * Store the new innerTuple
1234  */
1235  if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1236  {
1237  /*
1238  * new inner tuple goes to parent page
1239  */
1240  Assert(current->buffer != parent->buffer);
1241 
1242  /* Repoint "current" at the new inner tuple */
1243  current->blkno = parent->blkno;
1244  current->buffer = parent->buffer;
1245  current->page = parent->page;
1246  xlrec.offnumInner = current->offnum =
1247  SpGistPageAddNewItem(state, current->page,
1248  (Item) innerTuple, innerTuple->size,
1249  NULL, false);
1250 
1251  /*
1252  * Update parent node link and mark parent page dirty
1253  */
1254  xlrec.innerIsParent = true;
1255  xlrec.offnumParent = parent->offnum;
1256  xlrec.nodeI = parent->node;
1257  saveNodeLink(index, parent, current->blkno, current->offnum);
1258 
1259  /*
1260  * Update redirection link (in old current buffer)
1261  */
1262  if (redirectTuplePos != InvalidOffsetNumber)
1263  setRedirectionTuple(&saveCurrent, redirectTuplePos,
1264  current->blkno, current->offnum);
1265 
1266  /* Done modifying old current buffer, mark it dirty */
1267  MarkBufferDirty(saveCurrent.buffer);
1268  }
1269  else if (parent->buffer != InvalidBuffer)
1270  {
1271  /*
1272  * new inner tuple will be stored on a new page
1273  */
1274  Assert(newInnerBuffer != InvalidBuffer);
1275 
1276  /* Repoint "current" at the new inner tuple */
1277  current->buffer = newInnerBuffer;
1278  current->blkno = BufferGetBlockNumber(current->buffer);
1279  current->page = BufferGetPage(current->buffer);
1280  xlrec.offnumInner = current->offnum =
1281  SpGistPageAddNewItem(state, current->page,
1282  (Item) innerTuple, innerTuple->size,
1283  NULL, false);
1284 
1285  /* Done modifying new current buffer, mark it dirty */
1286  MarkBufferDirty(current->buffer);
1287 
1288  /*
1289  * Update parent node link and mark parent page dirty
1290  */
1291  xlrec.innerIsParent = (parent->buffer == current->buffer);
1292  xlrec.offnumParent = parent->offnum;
1293  xlrec.nodeI = parent->node;
1294  saveNodeLink(index, parent, current->blkno, current->offnum);
1295 
1296  /*
1297  * Update redirection link (in old current buffer)
1298  */
1299  if (redirectTuplePos != InvalidOffsetNumber)
1300  setRedirectionTuple(&saveCurrent, redirectTuplePos,
1301  current->blkno, current->offnum);
1302 
1303  /* Done modifying old current buffer, mark it dirty */
1304  MarkBufferDirty(saveCurrent.buffer);
1305  }
1306  else
1307  {
1308  /*
1309  * Splitting root page, which was a leaf but now becomes inner page
1310  * (and so "current" continues to point at it)
1311  */
1312  Assert(SpGistBlockIsRoot(current->blkno));
1313  Assert(redirectTuplePos == InvalidOffsetNumber);
1314 
1315  SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1316  xlrec.initInner = true;
1317  xlrec.innerIsParent = false;
1318 
1319  xlrec.offnumInner = current->offnum =
1320  PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1321  InvalidOffsetNumber, false, false);
1322  if (current->offnum != FirstOffsetNumber)
1323  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1324  innerTuple->size);
1325 
1326  /* No parent link to update, nor redirection to do */
1328  xlrec.nodeI = 0;
1329 
1330  /* Done modifying new current buffer, mark it dirty */
1331  MarkBufferDirty(current->buffer);
1332 
1333  /* saveCurrent doesn't represent a different buffer */
1334  saveCurrent.buffer = InvalidBuffer;
1335  }
1336 
1337  if (RelationNeedsWAL(index))
1338  {
1339  XLogRecPtr recptr;
1340  int flags;
1341 
1342  XLogBeginInsert();
1343 
1344  xlrec.nInsert = nToInsert;
1345  XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1346 
1347  XLogRegisterData((char *) toDelete,
1348  sizeof(OffsetNumber) * xlrec.nDelete);
1349  XLogRegisterData((char *) toInsert,
1350  sizeof(OffsetNumber) * xlrec.nInsert);
1351  XLogRegisterData((char *) leafPageSelect,
1352  sizeof(uint8) * xlrec.nInsert);
1353  XLogRegisterData((char *) innerTuple, innerTuple->size);
1354  XLogRegisterData(leafdata, leafptr - leafdata);
1355 
1356  /* Old leaf page */
1357  if (BufferIsValid(saveCurrent.buffer))
1358  {
1359  flags = REGBUF_STANDARD;
1360  if (xlrec.initSrc)
1361  flags |= REGBUF_WILL_INIT;
1362  XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1363  }
1364 
1365  /* New leaf page */
1366  if (BufferIsValid(newLeafBuffer))
1367  {
1368  flags = REGBUF_STANDARD;
1369  if (xlrec.initDest)
1370  flags |= REGBUF_WILL_INIT;
1371  XLogRegisterBuffer(1, newLeafBuffer, flags);
1372  }
1373 
1374  /* Inner page */
1375  flags = REGBUF_STANDARD;
1376  if (xlrec.initInner)
1377  flags |= REGBUF_WILL_INIT;
1378  XLogRegisterBuffer(2, current->buffer, flags);
1379 
1380  /* Parent page, if different from inner page */
1381  if (parent->buffer != InvalidBuffer)
1382  {
1383  if (parent->buffer != current->buffer)
1385  else
1386  Assert(xlrec.innerIsParent);
1387  }
1388 
1389  /* Issue the WAL record */
1390  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1391 
1392  /* Update page LSNs on all affected pages */
1393  if (newLeafBuffer != InvalidBuffer)
1394  {
1395  Page page = BufferGetPage(newLeafBuffer);
1396 
1397  PageSetLSN(page, recptr);
1398  }
1399 
1400  if (saveCurrent.buffer != InvalidBuffer)
1401  {
1402  Page page = BufferGetPage(saveCurrent.buffer);
1403 
1404  PageSetLSN(page, recptr);
1405  }
1406 
1407  PageSetLSN(current->page, recptr);
1408 
1409  if (parent->buffer != InvalidBuffer)
1410  {
1411  PageSetLSN(parent->page, recptr);
1412  }
1413  }
1414 
1415  END_CRIT_SECTION();
1416 
1417  /* Update local free-space cache and unlock buffers */
1418  if (newLeafBuffer != InvalidBuffer)
1419  {
1420  SpGistSetLastUsedPage(index, newLeafBuffer);
1421  UnlockReleaseBuffer(newLeafBuffer);
1422  }
1423  if (saveCurrent.buffer != InvalidBuffer)
1424  {
1425  SpGistSetLastUsedPage(index, saveCurrent.buffer);
1426  UnlockReleaseBuffer(saveCurrent.buffer);
1427  }
1428 
1429  return insertedNew;
1430 }
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:59
#define SPGIST_DEAD
Definition: fmgr.h:53
uint16 nDelete
Definition: spgxlog.h:169
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:186
#define SpGistPageGetFreeSpace(p, n)
FmgrInfo * index_getprocinfo(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:855
#define SGDTSIZE
#define SPGIST_REDIRECT
Datum * leafTupleDatums
Definition: spgist.h:127
Datum * datums
Definition: spgist.h:114
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1445
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:213
#define PointerGetDatum(X)
Definition: postgres.h:564
#define SGITITERATE(x, i, nt)
spgxlogState stateSrc
Definition: spgxlog.h:185
#define SizeOfSpgxlogPickSplit
Definition: spgxlog.h:199
#define SPGIST_PLACEHOLDER
#define Min(x, y)
Definition: c.h:802
#define END_CRIT_SECTION()
Definition: miscadmin.h:132
#define SPGIST_NULLS
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:464
unsigned char uint8
Definition: c.h:263
Pointer Item
Definition: item.h:17
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:32
#define START_CRIT_SECTION()
Definition: miscadmin.h:130
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:413
uint32 BlockNumber
Definition: block.h:31
uint16 nInsert
Definition: spgxlog.h:170
bool innerIsParent
Definition: spgxlog.h:181
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1306
#define PageGetMaxOffsetNumber(page)
Definition: bufpage.h:354
uint16 OffsetNumber
Definition: off.h:24
OffsetNumber offnumInner
Definition: spgxlog.h:175
#define XLOG_SPGIST_PICKSPLIT
Definition: spgxlog.h:26
unsigned int allTheSame
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:131
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3315
Oid * rd_indcollation
Definition: rel.h:189
#define ERROR
Definition: elog.h:43
Datum * nodeLabels
Definition: spgist.h:124
#define GBUF_INNER_PARITY(x)
OffsetNumber offnumParent
Definition: spgxlog.h:182
uint16 nodeI
Definition: spgxlog.h:183
#define FirstOffsetNumber
Definition: off.h:27
#define REGBUF_STANDARD
Definition: xloginsert.h:35
SpGistInnerTuple spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, int nNodes, SpGistNodeTuple *nodes)
Definition: spgutils.c:671
#define SPGIST_METAPAGE_BLKNO
OffsetNumber offnum
Definition: spgdoinsert.c:39
static void setRedirectionTuple(SPPageDesc *current, OffsetNumber position, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:567
struct ItemIdData ItemIdData
static bool checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, bool *includeNew)
Definition: spgdoinsert.c:598
void SpGistInitBuffer(Buffer b, uint16 f)
Definition: spgutils.c:514
#define BufferGetPage(buffer)
Definition: bufmgr.h:160
#define GBUF_LEAF
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:232
SpGistLeafTuple spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, Datum datum, bool isnull)
Definition: spgutils.c:592
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
unsigned int tupstate
void * palloc0(Size size)
Definition: mcxt.c:920
uintptr_t Datum
Definition: postgres.h:374
static char * label
Definition: pg_basebackup.c:83
#define SPGIST_PAGE_CAPACITY
#define STORE_STATE(s, d)
#define InvalidOffsetNumber
Definition: off.h:26
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:359
bool hasPrefix
Definition: spgist.h:120
#define SpGistBlockIsRoot(blkno)
#define SPGIST_LIVE
#define InvalidBlockNumber
Definition: block.h:33
#define SGLTDATUM(x, s)
#define BufferIsValid(bufnum)
Definition: bufmgr.h:114
#define GBUF_NULLS
OffsetNumber nextOffset
#define ItemPointerGetOffsetNumber(pointer)
Definition: itemptr.h:76
#define RelationNeedsWAL(relation)
Definition: rel.h:502
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:606
#define SpGistPageGetOpaque(page)
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2588
void * palloc(Size size)
Definition: mcxt.c:891
int i
int * mapTuplesToNodes
Definition: spgist.h:126
bool storesNulls
Definition: spgxlog.h:178
Buffer buffer
Definition: spgdoinsert.c:37
Datum prefixDatum
Definition: spgist.h:121
ItemPointerData heapPtr
#define elog
Definition: elog.h:219
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:66
SpGistLeafTupleData * SpGistLeafTuple
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define SPGIST_LEAF
#define PageSetLSN(page, lsn)
Definition: bufpage.h:365
#define SPGIST_PICKSPLIT_PROC
Definition: spgist.h:30
int Buffer
Definition: buf.h:23
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:827
SpGistNodeTuple spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
Definition: spgutils.c:629
bool isRootSplit
Definition: spgxlog.h:167
#define PageGetItem(page, itemId)
Definition: bufpage.h:337
Pointer Page
Definition: bufpage.h:74
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:86
BlockNumber blkno
Definition: spgdoinsert.c:36
static void moveLeafs ( Relation  index,
SpGistState state,
SPPageDesc current,
SPPageDesc parent,
SpGistLeafTuple  newLeafTuple,
bool  isNulls 
)
static

Definition at line 387 of file spgdoinsert.c.

References Assert, SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage, elog, END_CRIT_SECTION, ERROR, FirstOffsetNumber, GBUF_LEAF, GBUF_NULLS, i, InvalidBuffer, InvalidOffsetNumber, SpGistState::isBuild, MarkBufferDirty(), spgxlogMoveLeafs::newPage, SpGistLeafTupleData::nextOffset, spgxlogMoveLeafs::nMoves, SPPageDesc::node, spgxlogMoveLeafs::nodeI, SPPageDesc::offnum, spgxlogMoveLeafs::offnumParent, SPPageDesc::page, PageGetItem, PageGetItemId, PageGetMaxOffsetNumber, PageSetLSN, palloc(), REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, spgxlogMoveLeafs::replaceDead, saveNodeLink(), SpGistLeafTupleData::size, SizeOfSpgxlogMoveLeafs, SPGIST_DEAD, SPGIST_LIVE, SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistSetLastUsedPage(), spgPageIndexMultiDelete(), START_CRIT_SECTION, spgxlogMoveLeafs::stateSrc, STORE_STATE, spgxlogMoveLeafs::storesNulls, SpGistLeafTupleData::tupstate, UnlockReleaseBuffer(), XLOG_SPGIST_MOVE_LEAFS, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

390 {
391  int i,
392  nDelete,
393  nInsert,
394  size;
395  Buffer nbuf;
396  Page npage;
397  SpGistLeafTuple it;
399  startOffset = InvalidOffsetNumber;
400  bool replaceDead = false;
401  OffsetNumber *toDelete;
402  OffsetNumber *toInsert;
403  BlockNumber nblkno;
404  spgxlogMoveLeafs xlrec;
405  char *leafdata,
406  *leafptr;
407 
408  /* This doesn't work on root page */
409  Assert(parent->buffer != InvalidBuffer);
410  Assert(parent->buffer != current->buffer);
411 
412  /* Locate the tuples to be moved, and count up the space needed */
413  i = PageGetMaxOffsetNumber(current->page);
414  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
415  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
416 
417  size = newLeafTuple->size + sizeof(ItemIdData);
418 
419  nDelete = 0;
420  i = current->offnum;
421  while (i != InvalidOffsetNumber)
422  {
423  SpGistLeafTuple it;
424 
425  Assert(i >= FirstOffsetNumber &&
426  i <= PageGetMaxOffsetNumber(current->page));
427  it = (SpGistLeafTuple) PageGetItem(current->page,
428  PageGetItemId(current->page, i));
429 
430  if (it->tupstate == SPGIST_LIVE)
431  {
432  toDelete[nDelete] = i;
433  size += it->size + sizeof(ItemIdData);
434  nDelete++;
435  }
436  else if (it->tupstate == SPGIST_DEAD)
437  {
438  /* We could see a DEAD tuple as first/only chain item */
439  Assert(i == current->offnum);
441  /* We don't want to move it, so don't count it in size */
442  toDelete[nDelete] = i;
443  nDelete++;
444  replaceDead = true;
445  }
446  else
447  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
448 
449  i = it->nextOffset;
450  }
451 
452  /* Find a leaf page that will hold them */
453  nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
454  size, &xlrec.newPage);
455  npage = BufferGetPage(nbuf);
456  nblkno = BufferGetBlockNumber(nbuf);
457  Assert(nblkno != current->blkno);
458 
459  leafdata = leafptr = palloc(size);
460 
462 
463  /* copy all the old tuples to new page, unless they're dead */
464  nInsert = 0;
465  if (!replaceDead)
466  {
467  for (i = 0; i < nDelete; i++)
468  {
469  it = (SpGistLeafTuple) PageGetItem(current->page,
470  PageGetItemId(current->page, toDelete[i]));
471  Assert(it->tupstate == SPGIST_LIVE);
472 
473  /*
474  * Update chain link (notice the chain order gets reversed, but we
475  * don't care). We're modifying the tuple on the source page
476  * here, but it's okay since we're about to delete it.
477  */
478  it->nextOffset = r;
479 
480  r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
481  &startOffset, false);
482 
483  toInsert[nInsert] = r;
484  nInsert++;
485 
486  /* save modified tuple into leafdata as well */
487  memcpy(leafptr, it, it->size);
488  leafptr += it->size;
489  }
490  }
491 
492  /* add the new tuple as well */
493  newLeafTuple->nextOffset = r;
494  r = SpGistPageAddNewItem(state, npage,
495  (Item) newLeafTuple, newLeafTuple->size,
496  &startOffset, false);
497  toInsert[nInsert] = r;
498  nInsert++;
499  memcpy(leafptr, newLeafTuple, newLeafTuple->size);
500  leafptr += newLeafTuple->size;
501 
502  /*
503  * Now delete the old tuples, leaving a redirection pointer behind for the
504  * first one, unless we're doing an index build; in which case there can't
505  * be any concurrent scan so we need not provide a redirect.
506  */
507  spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
510  nblkno, r);
511 
512  /* Update parent's downlink and mark parent page dirty */
513  saveNodeLink(index, parent, nblkno, r);
514 
515  /* Mark the leaf pages too */
516  MarkBufferDirty(current->buffer);
517  MarkBufferDirty(nbuf);
518 
519  if (RelationNeedsWAL(index))
520  {
521  XLogRecPtr recptr;
522 
523  /* prepare WAL info */
524  STORE_STATE(state, xlrec.stateSrc);
525 
526  xlrec.nMoves = nDelete;
527  xlrec.replaceDead = replaceDead;
528  xlrec.storesNulls = isNulls;
529 
530  xlrec.offnumParent = parent->offnum;
531  xlrec.nodeI = parent->node;
532 
533  XLogBeginInsert();
534  XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
535  XLogRegisterData((char *) toDelete,
536  sizeof(OffsetNumber) * nDelete);
537  XLogRegisterData((char *) toInsert,
538  sizeof(OffsetNumber) * nInsert);
539  XLogRegisterData((char *) leafdata, leafptr - leafdata);
540 
544 
545  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
546 
547  PageSetLSN(current->page, recptr);
548  PageSetLSN(npage, recptr);
549  PageSetLSN(parent->page, recptr);
550  }
551 
553 
554  /* Update local free-space cache and release new buffer */
555  SpGistSetLastUsedPage(index, nbuf);
556  UnlockReleaseBuffer(nbuf);
557 }
#define XLOG_SPGIST_MOVE_LEAFS
Definition: spgxlog.h:23
#define SPGIST_DEAD
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:186
#define SPGIST_REDIRECT
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1445
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:213
uint16 nodeI
Definition: spgxlog.h:73
#define SizeOfSpgxlogMoveLeafs
Definition: spgxlog.h:91
#define SPGIST_PLACEHOLDER
#define END_CRIT_SECTION()
Definition: miscadmin.h:132
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:464
Pointer Item
Definition: item.h:17
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:32
#define START_CRIT_SECTION()
Definition: miscadmin.h:130
uint32 BlockNumber
Definition: block.h:31
#define PageGetMaxOffsetNumber(page)
Definition: bufpage.h:354
uint16 OffsetNumber
Definition: off.h:24
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:131
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3315
#define ERROR
Definition: elog.h:43
spgxlogState stateSrc
Definition: spgxlog.h:75
#define FirstOffsetNumber
Definition: off.h:27
#define REGBUF_STANDARD
Definition: xloginsert.h:35
OffsetNumber offnum
Definition: spgdoinsert.c:39
struct ItemIdData ItemIdData
#define BufferGetPage(buffer)
Definition: bufmgr.h:160
bool replaceDead
Definition: spgxlog.h:68
#define GBUF_LEAF
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:232
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
unsigned int tupstate
#define STORE_STATE(s, d)
#define InvalidOffsetNumber
Definition: off.h:26
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:359
#define SPGIST_LIVE
#define GBUF_NULLS
OffsetNumber nextOffset
#define RelationNeedsWAL(relation)
Definition: rel.h:502
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2588
void * palloc(Size size)
Definition: mcxt.c:891
int i
OffsetNumber offnumParent
Definition: spgxlog.h:72
bool storesNulls
Definition: spgxlog.h:69
Buffer buffer
Definition: spgdoinsert.c:37
uint16 nMoves
Definition: spgxlog.h:66
#define elog
Definition: elog.h:219
SpGistLeafTupleData * SpGistLeafTuple
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define PageSetLSN(page, lsn)
Definition: bufpage.h:365
int Buffer
Definition: buf.h:23
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:827
#define PageGetItem(page, itemId)
Definition: bufpage.h:337
Pointer Page
Definition: bufpage.h:74
BlockNumber blkno
Definition: spgdoinsert.c:36
static void saveNodeLink ( Relation  index,
SPPageDesc parent,
BlockNumber  blkno,
OffsetNumber  offnum 
)
static

Definition at line 186 of file spgdoinsert.c.

References SPPageDesc::buffer, MarkBufferDirty(), SPPageDesc::node, SPPageDesc::offnum, SPPageDesc::page, PageGetItem, PageGetItemId, and spgUpdateNodeLink().

Referenced by addLeafTuple(), doPickSplit(), moveLeafs(), and spgAddNodeAction().

188 {
189  SpGistInnerTuple innerTuple;
190 
191  innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192  PageGetItemId(parent->page, parent->offnum));
193 
194  spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195 
196  MarkBufferDirty(parent->buffer);
197 }
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition: spgdoinsert.c:50
SpGistInnerTupleData * SpGistInnerTuple
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1445
OffsetNumber offnum
Definition: spgdoinsert.c:39
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:232
Buffer buffer
Definition: spgdoinsert.c:37
#define PageGetItem(page, itemId)
Definition: bufpage.h:337
static void setRedirectionTuple ( SPPageDesc current,
OffsetNumber  position,
BlockNumber  blkno,
OffsetNumber  offnum 
)
static

Definition at line 567 of file spgdoinsert.c.

References Assert, ItemPointerGetBlockNumber, ItemPointerSet, SPPageDesc::page, PageGetItem, PageGetItemId, SpGistDeadTupleData::pointer, SPGIST_METAPAGE_BLKNO, SPGIST_REDIRECT, and SpGistDeadTupleData::tupstate.

Referenced by doPickSplit().

569 {
570  SpGistDeadTuple dt;
571 
572  dt = (SpGistDeadTuple) PageGetItem(current->page,
573  PageGetItemId(current->page, position));
576  ItemPointerSet(&dt->pointer, blkno, offnum);
577 }
#define SPGIST_REDIRECT
ItemPointerData pointer
#define SPGIST_METAPAGE_BLKNO
SpGistDeadTupleData * SpGistDeadTuple
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:232
unsigned int tupstate
#define Assert(condition)
Definition: c.h:671
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:66
#define PageGetItem(page, itemId)
Definition: bufpage.h:337
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:86
static void spgAddNodeAction ( Relation  index,
SpGistState state,
SpGistInnerTuple  innerTuple,
SPPageDesc current,
SPPageDesc parent,
int  nodeN,
Datum  nodeLabel 
)
static

Definition at line 1490 of file spgdoinsert.c.

References addNode(), Assert, SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage, elog, END_CRIT_SECTION, ERROR, GBUF_INNER_PARITY, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, SpGistState::isBuild, MarkBufferDirty(), spgxlogAddNode::newPage, SPPageDesc::node, spgxlogAddNode::nodeI, NULL, SPPageDesc::offnum, spgxlogAddNode::offnum, spgxlogAddNode::offnumNew, spgxlogAddNode::offnumParent, SPPageDesc::page, PageAddItem, PageGetExactFreeSpace(), PageIndexTupleDelete(), PageSetLSN, spgxlogAddNode::parentBlk, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, saveNodeLink(), SpGistInnerTupleData::size, SpGistDeadTupleData::size, spgFormDeadTuple(), SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistBlockIsRoot, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistPageGetOpaque, SpGistPageStoresNulls, SpGistSetLastUsedPage(), START_CRIT_SECTION, spgxlogAddNode::stateSrc, STORE_STATE, UnlockReleaseBuffer(), XLOG_SPGIST_ADD_NODE, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

1494 {
1495  SpGistInnerTuple newInnerTuple;
1496  spgxlogAddNode xlrec;
1497 
1498  /* Should not be applied to nulls */
1499  Assert(!SpGistPageStoresNulls(current->page));
1500 
1501  /* Construct new inner tuple with additional node */
1502  newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1503 
1504  /* Prepare WAL record */
1505  STORE_STATE(state, xlrec.stateSrc);
1506  xlrec.offnum = current->offnum;
1507 
1508  /* we don't fill these unless we need to change the parent downlink */
1509  xlrec.parentBlk = -1;
1511  xlrec.nodeI = 0;
1512 
1513  /* we don't fill these unless tuple has to be moved */
1515  xlrec.newPage = false;
1516 
1517  if (PageGetExactFreeSpace(current->page) >=
1518  newInnerTuple->size - innerTuple->size)
1519  {
1520  /*
1521  * We can replace the inner tuple by new version in-place
1522  */
1524 
1525  PageIndexTupleDelete(current->page, current->offnum);
1526  if (PageAddItem(current->page,
1527  (Item) newInnerTuple, newInnerTuple->size,
1528  current->offnum, false, false) != current->offnum)
1529  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1530  newInnerTuple->size);
1531 
1532  MarkBufferDirty(current->buffer);
1533 
1534  if (RelationNeedsWAL(index))
1535  {
1536  XLogRecPtr recptr;
1537 
1538  XLogBeginInsert();
1539  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1540  XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1541 
1543 
1544  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1545 
1546  PageSetLSN(current->page, recptr);
1547  }
1548 
1549  END_CRIT_SECTION();
1550  }
1551  else
1552  {
1553  /*
1554  * move inner tuple to another page, and update parent
1555  */
1556  SpGistDeadTuple dt;
1557  SPPageDesc saveCurrent;
1558 
1559  /*
1560  * It should not be possible to get here for the root page, since we
1561  * allow only one inner tuple on the root page, and spgFormInnerTuple
1562  * always checks that inner tuples don't exceed the size of a page.
1563  */
1564  if (SpGistBlockIsRoot(current->blkno))
1565  elog(ERROR, "cannot enlarge root tuple any more");
1566  Assert(parent->buffer != InvalidBuffer);
1567 
1568  saveCurrent = *current;
1569 
1570  xlrec.offnumParent = parent->offnum;
1571  xlrec.nodeI = parent->node;
1572 
1573  /*
1574  * obtain new buffer with the same parity as current, since it will be
1575  * a child of same parent tuple
1576  */
1577  current->buffer = SpGistGetBuffer(index,
1578  GBUF_INNER_PARITY(current->blkno),
1579  newInnerTuple->size + sizeof(ItemIdData),
1580  &xlrec.newPage);
1581  current->blkno = BufferGetBlockNumber(current->buffer);
1582  current->page = BufferGetPage(current->buffer);
1583 
1584  /*
1585  * Let's just make real sure new current isn't same as old. Right now
1586  * that's impossible, but if SpGistGetBuffer ever got smart enough to
1587  * delete placeholder tuples before checking space, maybe it wouldn't
1588  * be impossible. The case would appear to work except that WAL
1589  * replay would be subtly wrong, so I think a mere assert isn't enough
1590  * here.
1591  */
1592  if (current->blkno == saveCurrent.blkno)
1593  elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1594 
1595  /*
1596  * New current and parent buffer will both be modified; but note that
1597  * parent buffer could be same as either new or old current.
1598  */
1599  if (parent->buffer == saveCurrent.buffer)
1600  xlrec.parentBlk = 0;
1601  else if (parent->buffer == current->buffer)
1602  xlrec.parentBlk = 1;
1603  else
1604  xlrec.parentBlk = 2;
1605 
1607 
1608  /* insert new ... */
1609  xlrec.offnumNew = current->offnum =
1610  SpGistPageAddNewItem(state, current->page,
1611  (Item) newInnerTuple, newInnerTuple->size,
1612  NULL, false);
1613 
1614  MarkBufferDirty(current->buffer);
1615 
1616  /* update parent's downlink and mark parent page dirty */
1617  saveNodeLink(index, parent, current->blkno, current->offnum);
1618 
1619  /*
1620  * Replace old tuple with a placeholder or redirection tuple. Unless
1621  * doing an index build, we have to insert a redirection tuple for
1622  * possible concurrent scans. We can't just delete it in any case,
1623  * because that could change the offsets of other tuples on the page,
1624  * breaking downlinks from their parents.
1625  */
1626  if (state->isBuild)
1629  else
1630  dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1631  current->blkno, current->offnum);
1632 
1633  PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1634  if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1635  saveCurrent.offnum,
1636  false, false) != saveCurrent.offnum)
1637  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1638  dt->size);
1639 
1640  if (state->isBuild)
1641  SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1642  else
1643  SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1644 
1645  MarkBufferDirty(saveCurrent.buffer);
1646 
1647  if (RelationNeedsWAL(index))
1648  {
1649  XLogRecPtr recptr;
1650  int flags;
1651 
1652  XLogBeginInsert();
1653 
1654  /* orig page */
1655  XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1656  /* new page */
1657  flags = REGBUF_STANDARD;
1658  if (xlrec.newPage)
1659  flags |= REGBUF_WILL_INIT;
1660  XLogRegisterBuffer(1, current->buffer, flags);
1661  /* parent page (if different from orig and new) */
1662  if (xlrec.parentBlk == 2)
1664 
1665  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1666  XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1667 
1668  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1669 
1670  /* we don't bother to check if any of these are redundant */
1671  PageSetLSN(current->page, recptr);
1672  PageSetLSN(parent->page, recptr);
1673  PageSetLSN(saveCurrent.page, recptr);
1674  }
1675 
1676  END_CRIT_SECTION();
1677 
1678  /* Release saveCurrent if it's not same as current or parent */
1679  if (saveCurrent.buffer != current->buffer &&
1680  saveCurrent.buffer != parent->buffer)
1681  {
1682  SpGistSetLastUsedPage(index, saveCurrent.buffer);
1683  UnlockReleaseBuffer(saveCurrent.buffer);
1684  }
1685  }
1686 }
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:186
SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgutils.c:754
#define SPGIST_REDIRECT
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:700
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1445
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:213
#define SPGIST_PLACEHOLDER
#define END_CRIT_SECTION()
Definition: miscadmin.h:132
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:464
Pointer Item
Definition: item.h:17
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:32
#define START_CRIT_SECTION()
Definition: miscadmin.h:130
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:413
#define XLOG_SPGIST_ADD_NODE
Definition: spgxlog.h:24
OffsetNumber offnum
Definition: spgxlog.h:105
static SpGistInnerTuple addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
Definition: spgdoinsert.c:78
OffsetNumber offnumNew
Definition: spgxlog.h:111
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3315
#define ERROR
Definition: elog.h:43
#define GBUF_INNER_PARITY(x)
#define REGBUF_STANDARD
Definition: xloginsert.h:35
spgxlogState stateSrc
Definition: spgxlog.h:130
OffsetNumber offnum
Definition: spgdoinsert.c:39
#define BufferGetPage(buffer)
Definition: bufmgr.h:160
int8 parentBlk
Definition: spgxlog.h:125
uint16 nodeI
Definition: spgxlog.h:128
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
bool newPage
Definition: spgxlog.h:112
#define STORE_STATE(s, d)
#define InvalidOffsetNumber
Definition: off.h:26
#define SpGistPageStoresNulls(page)
OffsetNumber offnumParent
Definition: spgxlog.h:126
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:359
#define SpGistBlockIsRoot(blkno)
#define InvalidBlockNumber
Definition: block.h:33
#define RelationNeedsWAL(relation)
Definition: rel.h:502
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:606
#define SpGistPageGetOpaque(page)
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2588
Buffer buffer
Definition: spgdoinsert.c:37
#define elog
Definition: elog.h:219
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define PageSetLSN(page, lsn)
Definition: bufpage.h:365
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:827
BlockNumber blkno
Definition: spgdoinsert.c:36
bool spgdoinsert ( Relation  index,
SpGistState *  state,
ItemPointer  heapPtr,
Datum  datum,
bool  isnull 
)

Definition at line 1891 of file spgdoinsert.c.

References addLeafTuple(), spgChooseOut::addNode, spgChooseIn::allTheSame, SpGistInnerTupleData::allTheSame, Assert, SpGistTypeDesc::attlen, SpGistState::attType, SPPageDesc::blkno, SPPageDesc::buffer, BUFFER_LOCK_EXCLUSIVE, BufferGetBlockNumber(), BufferGetPage, CHECK_FOR_INTERRUPTS, checkSplitConditions(), ConditionalLockBuffer(), SpGistState::config, spgChooseIn::datum, doPickSplit(), elog, ereport, errcode(), errhint(), errmsg(), ERROR, FirstOffsetNumber, FunctionCall2Coll(), GBUF_LEAF, GBUF_NULLS, spgChooseIn::hasPrefix, index_getprocinfo(), InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, spgChooseIn::leafDatum, spgChooseIn::level, LockBuffer(), spgConfigOut::longValuesOK, spgChooseOut::matchNode, Min, moveLeafs(), spgChooseIn::nNodes, SpGistInnerTupleData::nNodes, SPPageDesc::node, spgChooseIn::nodeLabels, NULL, SPPageDesc::offnum, SPPageDesc::page, PageGetItem, PageGetItemId, pfree(), PG_DETOAST_DATUM, PointerGetDatum, spgChooseIn::prefixDatum, SpGistInnerTupleData::prefixSize, random(), RelationData::rd_indcollation, ReadBuffer(), RelationGetRelationName, ReleaseBuffer(), spgChooseOut::result, spgChooseOut::resultType, SGDTSIZE, SGITDATUM, SGLTHDRSZ, SpGistLeafTupleData::size, spgAddNode, spgAddNodeAction(), spgExtractNodeLabels(), spgFormLeafTuple(), SPGIST_CHOOSE_PROC, SPGIST_NULL_BLKNO, SPGIST_PAGE_CAPACITY, SPGIST_ROOT_BLKNO, SpGistGetBuffer(), SpGistGetTypeSize(), SpGistPageGetFreeSpace, SpGistPageIsLeaf, SpGistPageStoresNulls, SpGistSetLastUsedPage(), spgMatchNode, spgMatchNodeAction(), spgSplitNodeAction(), spgSplitTuple, and UnlockReleaseBuffer().

Referenced by spginsert(), and spgistBuildCallback().

1893 {
1894  int level = 0;
1895  Datum leafDatum;
1896  int leafSize;
1897  SPPageDesc current,
1898  parent;
1899  FmgrInfo *procinfo = NULL;
1900 
1901  /*
1902  * Look up FmgrInfo of the user-defined choose function once, to save
1903  * cycles in the loop below.
1904  */
1905  if (!isnull)
1906  procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1907 
1908  /*
1909  * Since we don't use index_form_tuple in this AM, we have to make sure
1910  * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
1911  * that.
1912  */
1913  if (!isnull && state->attType.attlen == -1)
1914  datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
1915 
1916  leafDatum = datum;
1917 
1918  /*
1919  * Compute space needed for a leaf tuple containing the given datum.
1920  *
1921  * If it isn't gonna fit, and the opclass can't reduce the datum size by
1922  * suffixing, bail out now rather than getting into an endless loop.
1923  */
1924  if (!isnull)
1925  leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
1926  SpGistGetTypeSize(&state->attType, leafDatum);
1927  else
1928  leafSize = SGDTSIZE + sizeof(ItemIdData);
1929 
1930  if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
1931  ereport(ERROR,
1932  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1933  errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1934  leafSize - sizeof(ItemIdData),
1935  SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
1936  RelationGetRelationName(index)),
1937  errhint("Values larger than a buffer page cannot be indexed.")));
1938 
1939  /* Initialize "current" to the appropriate root page */
1940  current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
1941  current.buffer = InvalidBuffer;
1942  current.page = NULL;
1943  current.offnum = FirstOffsetNumber;
1944  current.node = -1;
1945 
1946  /* "parent" is invalid for the moment */
1947  parent.blkno = InvalidBlockNumber;
1948  parent.buffer = InvalidBuffer;
1949  parent.page = NULL;
1950  parent.offnum = InvalidOffsetNumber;
1951  parent.node = -1;
1952 
1953  for (;;)
1954  {
1955  bool isNew = false;
1956 
1957  /*
1958  * Bail out if query cancel is pending. We must have this somewhere
1959  * in the loop since a broken opclass could produce an infinite
1960  * picksplit loop.
1961  */
1963 
1964  if (current.blkno == InvalidBlockNumber)
1965  {
1966  /*
1967  * Create a leaf page. If leafSize is too large to fit on a page,
1968  * we won't actually use the page yet, but it simplifies the API
1969  * for doPickSplit to always have a leaf page at hand; so just
1970  * quietly limit our request to a page size.
1971  */
1972  current.buffer =
1973  SpGistGetBuffer(index,
1974  GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
1975  Min(leafSize, SPGIST_PAGE_CAPACITY),
1976  &isNew);
1977  current.blkno = BufferGetBlockNumber(current.buffer);
1978  }
1979  else if (parent.buffer == InvalidBuffer)
1980  {
1981  /* we hold no parent-page lock, so no deadlock is possible */
1982  current.buffer = ReadBuffer(index, current.blkno);
1984  }
1985  else if (current.blkno != parent.blkno)
1986  {
1987  /* descend to a new child page */
1988  current.buffer = ReadBuffer(index, current.blkno);
1989 
1990  /*
1991  * Attempt to acquire lock on child page. We must beware of
1992  * deadlock against another insertion process descending from that
1993  * page to our parent page (see README). If we fail to get lock,
1994  * abandon the insertion and tell our caller to start over.
1995  *
1996  * XXX this could be improved, because failing to get lock on a
1997  * buffer is not proof of a deadlock situation; the lock might be
1998  * held by a reader, or even just background writer/checkpointer
1999  * process. Perhaps it'd be worth retrying after sleeping a bit?
2000  */
2001  if (!ConditionalLockBuffer(current.buffer))
2002  {
2003  ReleaseBuffer(current.buffer);
2004  UnlockReleaseBuffer(parent.buffer);
2005  return false;
2006  }
2007  }
2008  else
2009  {
2010  /* inner tuple can be stored on the same page as parent one */
2011  current.buffer = parent.buffer;
2012  }
2013  current.page = BufferGetPage(current.buffer);
2014 
2015  /* should not arrive at a page of the wrong type */
2016  if (isnull ? !SpGistPageStoresNulls(current.page) :
2017  SpGistPageStoresNulls(current.page))
2018  elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2019  current.blkno);
2020 
2021  if (SpGistPageIsLeaf(current.page))
2022  {
2023  SpGistLeafTuple leafTuple;
2024  int nToSplit,
2025  sizeToSplit;
2026 
2027  leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
2028  if (leafTuple->size + sizeof(ItemIdData) <=
2029  SpGistPageGetFreeSpace(current.page, 1))
2030  {
2031  /* it fits on page, so insert it and we're done */
2032  addLeafTuple(index, state, leafTuple,
2033  &current, &parent, isnull, isNew);
2034  break;
2035  }
2036  else if ((sizeToSplit =
2037  checkSplitConditions(index, state, &current,
2038  &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2039  nToSplit < 64 &&
2040  leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2041  {
2042  /*
2043  * the amount of data is pretty small, so just move the whole
2044  * chain to another leaf page rather than splitting it.
2045  */
2046  Assert(!isNew);
2047  moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2048  break; /* we're done */
2049  }
2050  else
2051  {
2052  /* picksplit */
2053  if (doPickSplit(index, state, &current, &parent,
2054  leafTuple, level, isnull, isNew))
2055  break; /* doPickSplit installed new tuples */
2056 
2057  /* leaf tuple will not be inserted yet */
2058  pfree(leafTuple);
2059 
2060  /*
2061  * current now describes new inner tuple, go insert into it
2062  */
2063  Assert(!SpGistPageIsLeaf(current.page));
2064  goto process_inner_tuple;
2065  }
2066  }
2067  else /* non-leaf page */
2068  {
2069  /*
2070  * Apply the opclass choose function to figure out how to insert
2071  * the given datum into the current inner tuple.
2072  */
2073  SpGistInnerTuple innerTuple;
2074  spgChooseIn in;
2075  spgChooseOut out;
2076 
2077  /*
2078  * spgAddNode and spgSplitTuple cases will loop back to here to
2079  * complete the insertion operation. Just in case the choose
2080  * function is broken and produces add or split requests
2081  * repeatedly, check for query cancel.
2082  */
2083  process_inner_tuple:
2085 
2086  innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2087  PageGetItemId(current.page, current.offnum));
2088 
2089  in.datum = datum;
2090  in.leafDatum = leafDatum;
2091  in.level = level;
2092  in.allTheSame = innerTuple->allTheSame;
2093  in.hasPrefix = (innerTuple->prefixSize > 0);
2094  in.prefixDatum = SGITDATUM(innerTuple, state);
2095  in.nNodes = innerTuple->nNodes;
2096  in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2097 
2098  memset(&out, 0, sizeof(out));
2099 
2100  if (!isnull)
2101  {
2102  /* use user-defined choose method */
2103  FunctionCall2Coll(procinfo,
2104  index->rd_indcollation[0],
2105  PointerGetDatum(&in),
2106  PointerGetDatum(&out));
2107  }
2108  else
2109  {
2110  /* force "match" action (to insert to random subnode) */
2111  out.resultType = spgMatchNode;
2112  }
2113 
2114  if (innerTuple->allTheSame)
2115  {
2116  /*
2117  * It's not allowed to do an AddNode at an allTheSame tuple.
2118  * Opclass must say "match", in which case we choose a random
2119  * one of the nodes to descend into, or "split".
2120  */
2121  if (out.resultType == spgAddNode)
2122  elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2123  else if (out.resultType == spgMatchNode)
2124  out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2125  }
2126 
2127  switch (out.resultType)
2128  {
2129  case spgMatchNode:
2130  /* Descend to N'th child node */
2131  spgMatchNodeAction(index, state, innerTuple,
2132  &current, &parent,
2133  out.result.matchNode.nodeN);
2134  /* Adjust level as per opclass request */
2135  level += out.result.matchNode.levelAdd;
2136  /* Replace leafDatum and recompute leafSize */
2137  if (!isnull)
2138  {
2139  leafDatum = out.result.matchNode.restDatum;
2140  leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
2141  SpGistGetTypeSize(&state->attType, leafDatum);
2142  }
2143 
2144  /*
2145  * Loop around and attempt to insert the new leafDatum at
2146  * "current" (which might reference an existing child
2147  * tuple, or might be invalid to force us to find a new
2148  * page for the tuple).
2149  *
2150  * Note: if the opclass sets longValuesOK, we rely on the
2151  * choose function to eventually shorten the leafDatum
2152  * enough to fit on a page. We could add a test here to
2153  * complain if the datum doesn't get visibly shorter each
2154  * time, but that could get in the way of opclasses that
2155  * "simplify" datums in a way that doesn't necessarily
2156  * lead to physical shortening on every cycle.
2157  */
2158  break;
2159  case spgAddNode:
2160  /* AddNode is not sensible if nodes don't have labels */
2161  if (in.nodeLabels == NULL)
2162  elog(ERROR, "cannot add a node to an inner tuple without node labels");
2163  /* Add node to inner tuple, per request */
2164  spgAddNodeAction(index, state, innerTuple,
2165  &current, &parent,
2166  out.result.addNode.nodeN,
2167  out.result.addNode.nodeLabel);
2168 
2169  /*
2170  * Retry insertion into the enlarged node. We assume that
2171  * we'll get a MatchNode result this time.
2172  */
2173  goto process_inner_tuple;
2174  break;
2175  case spgSplitTuple:
2176  /* Split inner tuple, per request */
2177  spgSplitNodeAction(index, state, innerTuple,
2178  &current, &out);
2179 
2180  /* Retry insertion into the split node */
2181  goto process_inner_tuple;
2182  break;
2183  default:
2184  elog(ERROR, "unrecognized SPGiST choose result: %d",
2185  (int) out.resultType);
2186  break;
2187  }
2188  }
2189  } /* end loop */
2190 
2191  /*
2192  * Release any buffers we're still holding. Beware of possibility that
2193  * current and parent reference same buffer.
2194  */
2195  if (current.buffer != InvalidBuffer)
2196  {
2197  SpGistSetLastUsedPage(index, current.buffer);
2198  UnlockReleaseBuffer(current.buffer);
2199  }
2200  if (parent.buffer != InvalidBuffer &&
2201  parent.buffer != current.buffer)
2202  {
2203  SpGistSetLastUsedPage(index, parent.buffer);
2204  UnlockReleaseBuffer(parent.buffer);
2205  }
2206 
2207  return true;
2208 }
Definition: fmgr.h:53
Datum * spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
Definition: spgutils.c:784
Datum datum
Definition: spgist.h:56
SpGistInnerTupleData * SpGistInnerTuple
#define SpGistPageIsLeaf(page)
bool hasPrefix
Definition: spgist.h:62
int errhint(const char *fmt,...)
Definition: elog.c:987
#define SpGistPageGetFreeSpace(p, n)
struct spgChooseOut::@41::@42 matchNode
FmgrInfo * index_getprocinfo(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:855
#define SGDTSIZE
int level
Definition: spgist.h:58
#define PointerGetDatum(X)
Definition: postgres.h:564
long random(void)
Definition: random.c:22
#define Min(x, y)
Definition: c.h:802
SpGistTypeDesc attType
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:464
#define InvalidBuffer
Definition: buf.h:25
Datum prefixDatum
Definition: spgist.h:63
int errcode(int sqlerrcode)
Definition: elog.c:575
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3292
#define BUFFER_LOCK_EXCLUSIVE
Definition: bufmgr.h:89
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1306
#define SPGIST_ROOT_BLKNO
struct spgChooseOut::@41::@43 addNode
#define SGITDATUM(x, s)
static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
Definition: spgdoinsert.c:203
spgConfigOut config
void pfree(void *pointer)
Definition: mcxt.c:992
unsigned int allTheSame
static bool doPickSplit(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, int level, bool isNulls, bool isNew)
Definition: spgdoinsert.c:675
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3315
Oid * rd_indcollation
Definition: rel.h:189
#define ERROR
Definition: elog.h:43
unsigned int prefixSize
#define SPGIST_NULL_BLKNO
#define FirstOffsetNumber
Definition: off.h:27
OffsetNumber offnum
Definition: spgdoinsert.c:39
static void spgSplitNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, spgChooseOut *out)
Definition: spgdoinsert.c:1692
#define RelationGetRelationName(relation)
Definition: rel.h:433
struct ItemIdData ItemIdData
#define BufferGetPage(buffer)
Definition: bufmgr.h:160
#define ereport(elevel, rest)
Definition: elog.h:122
bool ConditionalLockBuffer(Buffer buffer)
Definition: bufmgr.c:3555
int nNodes
Definition: spgist.h:64
#define SGLTHDRSZ
#define GBUF_LEAF
unsigned int SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum)
Definition: spgutils.c:555
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:232
SpGistLeafTuple spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, Datum datum, bool isnull)
Definition: spgutils.c:592
bool longValuesOK
Definition: spgist.h:48
uintptr_t Datum
Definition: postgres.h:374
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:3529
#define SPGIST_PAGE_CAPACITY
spgChooseResultType resultType
Definition: spgist.h:77
#define InvalidOffsetNumber
Definition: off.h:26
#define SpGistPageStoresNulls(page)
static void spgMatchNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN)
Definition: spgdoinsert.c:1436
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
Datum leafDatum
Definition: spgist.h:57
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:359
Buffer ReadBuffer(Relation reln, BlockNumber blockNum)
Definition: bufmgr.c:594
#define InvalidBlockNumber
Definition: block.h:33
#define GBUF_NULLS
static void spgAddNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN, Datum nodeLabel)
Definition: spgdoinsert.c:1490
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2588
Datum * nodeLabels
Definition: spgist.h:65
int errmsg(const char *fmt,...)
Definition: elog.c:797
union spgChooseOut::@41 result
bool allTheSame
Definition: spgist.h:61
static int checkSplitConditions(Relation index, SpGistState *state, SPPageDesc *current, int *nToSplit)
Definition: spgdoinsert.c:333
Buffer buffer
Definition: spgdoinsert.c:37
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:196
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define elog
Definition: elog.h:219
#define SPGIST_CHOOSE_PROC
Definition: spgist.h:29
static void moveLeafs(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, bool isNulls)
Definition: spgdoinsert.c:387
#define PageGetItem(page, itemId)
Definition: bufpage.h:337
BlockNumber blkno
Definition: spgdoinsert.c:36
static void spgMatchNodeAction ( Relation  index,
SpGistState *  state,
SpGistInnerTuple  innerTuple,
SPPageDesc *  current,
SPPageDesc *  parent,
int  nodeN 
)
static

Definition at line 1436 of file spgdoinsert.c.

References SPPageDesc::blkno, SPPageDesc::buffer, elog, ERROR, i, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, ItemPointerGetBlockNumber, ItemPointerGetOffsetNumber, ItemPointerIsValid, SPPageDesc::node, NULL, SPPageDesc::offnum, SPPageDesc::page, SGITITERATE, SpGistSetLastUsedPage(), IndexTupleData::t_tid, and UnlockReleaseBuffer().

Referenced by spgdoinsert().

1439 {
1440  int i;
1441  SpGistNodeTuple node;
1442 
1443  /* Release previous parent buffer if any */
1444  if (parent->buffer != InvalidBuffer &&
1445  parent->buffer != current->buffer)
1446  {
1447  SpGistSetLastUsedPage(index, parent->buffer);
1448  UnlockReleaseBuffer(parent->buffer);
1449  }
1450 
1451  /* Repoint parent to specified node of current inner tuple */
1452  parent->blkno = current->blkno;
1453  parent->buffer = current->buffer;
1454  parent->page = current->page;
1455  parent->offnum = current->offnum;
1456  parent->node = nodeN;
1457 
1458  /* Locate that node */
1459  SGITITERATE(innerTuple, i, node)
1460  {
1461  if (i == nodeN)
1462  break;
1463  }
1464 
1465  if (i != nodeN)
1466  elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1467  nodeN);
1468 
1469  /* Point current to the downlink location, if any */
1470  if (ItemPointerIsValid(&node->t_tid))
1471  {
1472  current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1473  current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1474  }
1475  else
1476  {
1477  /* Downlink is empty, so we'll need to find a new page */
1478  current->blkno = InvalidBlockNumber;
1479  current->offnum = InvalidOffsetNumber;
1480  }
1481 
1482  current->buffer = InvalidBuffer;
1483  current->page = NULL;
1484 }
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:59
#define SGITITERATE(x, i, nt)
ItemPointerData t_tid
Definition: itup.h:37
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:464
#define InvalidBuffer
Definition: buf.h:25
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3315
#define ERROR
Definition: elog.h:43
OffsetNumber offnum
Definition: spgdoinsert.c:39
#define InvalidOffsetNumber
Definition: off.h:26
#define NULL
Definition: c.h:226
#define InvalidBlockNumber
Definition: block.h:33
#define ItemPointerGetOffsetNumber(pointer)
Definition: itemptr.h:76
int i
Buffer buffer
Definition: spgdoinsert.c:37
#define elog
Definition: elog.h:219
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:66
BlockNumber blkno
Definition: spgdoinsert.c:36
void spgPageIndexMultiDelete ( SpGistState *  state,
Page  page,
OffsetNumber *  itemnos,
int  nitems,
int  firststate,
int  reststate,
BlockNumber  blkno,
OffsetNumber  offnum 
)

Definition at line 131 of file spgdoinsert.c.

References cmpOffsetNumbers(), elog, ERROR, i, MaxIndexTuplesPerPage, NULL, PageAddItem, PageIndexMultiDelete(), qsort, SpGistDeadTupleData::size, spgFormDeadTuple(), SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistPageGetOpaque, and SpGistDeadTupleData::tupstate.

Referenced by doPickSplit(), moveLeafs(), spgRedoMoveLeafs(), spgRedoPickSplit(), spgRedoVacuumLeaf(), and vacuumLeafPage().

135 {
136  OffsetNumber firstItem;
138  SpGistDeadTuple tuple = NULL;
139  int i;
140 
141  if (nitems == 0)
142  return; /* nothing to do */
143 
144  /*
145  * For efficiency we want to use PageIndexMultiDelete, which requires the
146  * targets to be listed in sorted order, so we have to sort the itemnos
147  * array. (This also greatly simplifies the math for reinserting the
148  * replacement tuples.) However, we must not scribble on the caller's
149  * array, so we have to make a copy.
150  */
151  memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152  if (nitems > 1)
153  qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154 
155  PageIndexMultiDelete(page, sortednos, nitems);
156 
157  firstItem = itemnos[0];
158 
159  for (i = 0; i < nitems; i++)
160  {
161  OffsetNumber itemno = sortednos[i];
162  int tupstate;
163 
164  tupstate = (itemno == firstItem) ? firststate : reststate;
165  if (tuple == NULL || tuple->tupstate != tupstate)
166  tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167 
168  if (PageAddItem(page, (Item) tuple, tuple->size,
169  itemno, false, false) != itemno)
170  elog(ERROR, "failed to add item of size %u to SPGiST index page",
171  tuple->size);
172 
173  if (tupstate == SPGIST_REDIRECT)
174  SpGistPageGetOpaque(page)->nRedirection++;
175  else if (tupstate == SPGIST_PLACEHOLDER)
176  SpGistPageGetOpaque(page)->nPlaceholder++;
177  }
178 }
SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgutils.c:754
#define SPGIST_REDIRECT
#define SPGIST_PLACEHOLDER
Pointer Item
Definition: item.h:17
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:413
uint16 OffsetNumber
Definition: off.h:24
#define ERROR
Definition: elog.h:43
unsigned int tupstate
#define NULL
Definition: c.h:226
void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
Definition: bufpage.c:809
#define SpGistPageGetOpaque(page)
#define MaxIndexTuplesPerPage
Definition: itup.h:137
int i
#define elog
Definition: elog.h:219
#define qsort(a, b, c, d)
Definition: port.h:440
static int cmpOffsetNumbers(const void *a, const void *b)
Definition: spgdoinsert.c:110
static void spgSplitNodeAction ( Relation  index,
SpGistState *  state,
SpGistInnerTuple  innerTuple,
SPPageDesc *  current,
spgChooseOut *  out 
)
static

Definition at line 1692 of file spgdoinsert.c.

References SpGistInnerTupleData::allTheSame, Assert, SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage, elog, END_CRIT_SECTION, ERROR, GBUF_INNER_PARITY, i, InvalidBuffer, label, MarkBufferDirty(), spgxlogSplitTuple::newPage, SpGistInnerTupleData::nNodes, NULL, SPPageDesc::offnum, spgxlogSplitTuple::offnumPostfix, spgxlogSplitTuple::offnumPrefix, SPPageDesc::page, PageAddItem, PageGetItem, PageGetItemId, PageIndexTupleDelete(), PageSetLSN, palloc(), spgxlogSplitTuple::postfixBlkSame, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, spgChooseOut::result, SGITITERATE, SGITMAXNNODES, SpGistInnerTupleData::size, spgFormInnerTuple(), spgFormNodeTuple(), SpGistBlockIsRoot, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistPageGetFreeSpace, SpGistPageStoresNulls, SpGistSetLastUsedPage(), spgUpdateNodeLink(), spgChooseOut::splitTuple, START_CRIT_SECTION, UnlockReleaseBuffer(), XLOG_SPGIST_SPLIT_TUPLE, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

1695 {
1696  SpGistInnerTuple prefixTuple,
1697  postfixTuple;
1698  SpGistNodeTuple node,
1699  *nodes;
1700  BlockNumber postfixBlkno;
1701  OffsetNumber postfixOffset;
1702  int i;
1703  spgxlogSplitTuple xlrec;
1704  Buffer newBuffer = InvalidBuffer;
1705 
1706  /* Should not be applied to nulls */
1707  Assert(!SpGistPageStoresNulls(current->page));
1708 
1709  /* Check opclass gave us sane values */
1710  if (out->result.splitTuple.prefixNNodes <= 0 ||
1711  out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1712  elog(ERROR, "invalid number of prefix nodes: %d",
1713  out->result.splitTuple.prefixNNodes);
1714  if (out->result.splitTuple.childNodeN < 0 ||
1715  out->result.splitTuple.childNodeN >=
1716  out->result.splitTuple.prefixNNodes)
1717  elog(ERROR, "invalid child node number: %d",
1718  out->result.splitTuple.childNodeN);
1719 
1720  /*
1721  * Construct new prefix tuple with requested number of nodes. We'll fill
1722  * in the childNodeN'th node's downlink below.
1723  */
1724  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1725  out->result.splitTuple.prefixNNodes);
1726 
1727  for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1728  {
1729  Datum label = (Datum) 0;
1730  bool labelisnull;
1731 
1732  labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1733  if (!labelisnull)
1734  label = out->result.splitTuple.prefixNodeLabels[i];
1735  nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1736  }
1737 
1738  prefixTuple = spgFormInnerTuple(state,
1739  out->result.splitTuple.prefixHasPrefix,
1740  out->result.splitTuple.prefixPrefixDatum,
1741  out->result.splitTuple.prefixNNodes,
1742  nodes);
1743 
1744  /* it must fit in the space that innerTuple now occupies */
1745  if (prefixTuple->size > innerTuple->size)
1746  elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1747 
1748  /*
1749  * Construct new postfix tuple, containing all nodes of innerTuple with
1750  * same node datums, but with the prefix specified by the picksplit
1751  * function.
1752  */
1753  nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1754  SGITITERATE(innerTuple, i, node)
1755  {
1756  nodes[i] = node;
1757  }
1758 
1759  postfixTuple = spgFormInnerTuple(state,
1760  out->result.splitTuple.postfixHasPrefix,
1761  out->result.splitTuple.postfixPrefixDatum,
1762  innerTuple->nNodes, nodes);
1763 
1764  /* Postfix tuple is allTheSame if original tuple was */
1765  postfixTuple->allTheSame = innerTuple->allTheSame;
1766 
1767  /* prep data for WAL record */
1768  xlrec.newPage = false;
1769 
1770  /*
1771  * If we can't fit both tuples on the current page, get a new page for the
1772  * postfix tuple. In particular, can't split to the root page.
1773  *
1774  * For the space calculation, note that prefixTuple replaces innerTuple
1775  * but postfixTuple will be a new entry.
1776  */
1777  if (SpGistBlockIsRoot(current->blkno) ||
1778  SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1779  prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1780  {
1781  /*
1782  * Choose page with next triple parity, because postfix tuple is a
1783  * child of prefix one
1784  */
1785  newBuffer = SpGistGetBuffer(index,
1786  GBUF_INNER_PARITY(current->blkno + 1),
1787  postfixTuple->size + sizeof(ItemIdData),
1788  &xlrec.newPage);
1789  }
1790 
1792 
1793  /*
1794  * Replace old tuple by prefix tuple
1795  */
1796  PageIndexTupleDelete(current->page, current->offnum);
1797  xlrec.offnumPrefix = PageAddItem(current->page,
1798  (Item) prefixTuple, prefixTuple->size,
1799  current->offnum, false, false);
1800  if (xlrec.offnumPrefix != current->offnum)
1801  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1802  prefixTuple->size);
1803 
1804  /*
1805  * put postfix tuple into appropriate page
1806  */
1807  if (newBuffer == InvalidBuffer)
1808  {
1809  postfixBlkno = current->blkno;
1810  xlrec.offnumPostfix = postfixOffset =
1811  SpGistPageAddNewItem(state, current->page,
1812  (Item) postfixTuple, postfixTuple->size,
1813  NULL, false);
1814  xlrec.postfixBlkSame = true;
1815  }
1816  else
1817  {
1818  postfixBlkno = BufferGetBlockNumber(newBuffer);
1819  xlrec.offnumPostfix = postfixOffset =
1820  SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1821  (Item) postfixTuple, postfixTuple->size,
1822  NULL, false);
1823  MarkBufferDirty(newBuffer);
1824  xlrec.postfixBlkSame = false;
1825  }
1826 
1827  /*
1828  * And set downlink pointer in the prefix tuple to point to postfix tuple.
1829  * (We can't avoid this step by doing the above two steps in opposite
1830  * order, because there might not be enough space on the page to insert
1831  * the postfix tuple first.) We have to update the local copy of the
1832  * prefixTuple too, because that's what will be written to WAL.
1833  */
1834  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1835  postfixBlkno, postfixOffset);
1836  prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1837  PageGetItemId(current->page, current->offnum));
1838  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1839  postfixBlkno, postfixOffset);
1840 
1841  MarkBufferDirty(current->buffer);
1842 
1843  if (RelationNeedsWAL(index))
1844  {
1845  XLogRecPtr recptr;
1846 
1847  XLogBeginInsert();
1848  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1849  XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1850  XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1851 
1853  if (newBuffer != InvalidBuffer)
1854  {
1855  int flags;
1856 
1857  flags = REGBUF_STANDARD;
1858  if (xlrec.newPage)
1859  flags |= REGBUF_WILL_INIT;
1860  XLogRegisterBuffer(1, newBuffer, flags);
1861  }
1862 
1863  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1864 
1865  PageSetLSN(current->page, recptr);
1866 
1867  if (newBuffer != InvalidBuffer)
1868  {
1869  PageSetLSN(BufferGetPage(newBuffer), recptr);
1870  }
1871  }
1872 
1873  END_CRIT_SECTION();
1874 
1875  /* Update local free-space cache and release buffer */
1876  if (newBuffer != InvalidBuffer)
1877  {
1878  SpGistSetLastUsedPage(index, newBuffer);
1879  UnlockReleaseBuffer(newBuffer);
1880  }
1881 }
OffsetNumber offnumPostfix
Definition: spgxlog.h:147
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition: spgdoinsert.c:50
SpGistInnerTupleData * SpGistInnerTuple
#define SpGistPageGetFreeSpace(p, n)
#define XLOG_SPGIST_SPLIT_TUPLE
Definition: spgxlog.h:25
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:700
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1445
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:213
#define SGITITERATE(x, i, nt)
#define END_CRIT_SECTION()
Definition: miscadmin.h:132
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:464
Pointer Item
Definition: item.h:17
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:32
#define START_CRIT_SECTION()
Definition: miscadmin.h:130
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:413
uint32 BlockNumber
Definition: block.h:31
uint16 OffsetNumber
Definition: off.h:24
unsigned int allTheSame
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3315
#define ERROR
Definition: elog.h:43
#define GBUF_INNER_PARITY(x)
#define REGBUF_STANDARD
Definition: xloginsert.h:35
SpGistInnerTuple spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, int nNodes, SpGistNodeTuple *nodes)
Definition: spgutils.c:671
OffsetNumber offnum
Definition: spgdoinsert.c:39
struct spgChooseOut::@41::@44 splitTuple
#define BufferGetPage(buffer)
Definition: bufmgr.h:160
#define SGITMAXNNODES
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:232
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
OffsetNumber offnumPrefix
Definition: spgxlog.h:144
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
uintptr_t Datum
Definition: postgres.h:374
static char * label
Definition: pg_basebackup.c:83
bool postfixBlkSame
Definition: spgxlog.h:149
#define SpGistPageStoresNulls(page)
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:359
#define SpGistBlockIsRoot(blkno)
#define RelationNeedsWAL(relation)
Definition: rel.h:502
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2588
void * palloc(Size size)
Definition: mcxt.c:891
int i
union spgChooseOut::@41 result
Buffer buffer
Definition: spgdoinsert.c:37
#define elog
Definition: elog.h:219
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define PageSetLSN(page, lsn)
Definition: bufpage.h:365
int Buffer
Definition: buf.h:23
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:827
SpGistNodeTuple spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
Definition: spgutils.c:629
#define PageGetItem(page, itemId)
Definition: bufpage.h:337
BlockNumber blkno
Definition: spgdoinsert.c:36
void spgUpdateNodeLink ( SpGistInnerTuple  tup,
int  nodeN,
BlockNumber  blkno,
OffsetNumber  offset 
)

Definition at line 50 of file spgdoinsert.c.

References elog, ERROR, i, ItemPointerSet, SGITITERATE, and IndexTupleData::t_tid.

Referenced by saveNodeLink(), spgRedoAddLeaf(), spgRedoAddNode(), spgRedoMoveLeafs(), spgRedoPickSplit(), and spgSplitNodeAction().

/*
 * spgUpdateNodeLink: point node number nodeN of inner tuple tup at the
 * location (blkno, offset), i.e. update that node's downlink.
 *
 * The nodes of tup are visited with the SGITITERATE macro (presumably in
 * storage order, with i counting from the first node -- confirm against the
 * macro definition).  When the node whose index equals nodeN is reached, its
 * t_tid is overwritten with the given block number and offset and the
 * function returns immediately.
 *
 * If the iteration finishes without encountering index nodeN, the request
 * cannot be satisfied and an ERROR is reported through elog.
 */
52 {
53  int i;
54  SpGistNodeTuple node;
55 
56  SGITITERATE(tup, i, node)
57  {
58  if (i == nodeN)
59  {
60  ItemPointerSet(&node->t_tid, blkno, offset); /* store the new target in the matching node */
61  return; /* nothing more to do once the node is updated */
62  }
63  }
64 
65  elog(ERROR, "failed to find requested node %d in SPGiST inner tuple", /* nodeN did not match any visited node */
66  nodeN);
67 }
#define SGITITERATE(x, i, nt)
ItemPointerData t_tid
Definition: itup.h:37
#define ERROR
Definition: elog.h:43
int i
#define elog
Definition: elog.h:219
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:86