PostgreSQL Source Code  git master
spgdoinsert.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/spgist_private.h"
#include "access/spgxlog.h"
#include "access/xloginsert.h"
#include "common/pg_prng.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"
Include dependency graph for spgdoinsert.c:

Go to the source code of this file.

Data Structures

struct  SPPageDesc
 

Typedefs

typedef struct SPPageDesc SPPageDesc
 

Functions

void spgUpdateNodeLink (SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
 
static SpGistInnerTuple addNode (SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
 
static int cmpOffsetNumbers (const void *a, const void *b)
 
void spgPageIndexMultiDelete (SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
 
static void saveNodeLink (Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
 
static void addLeafTuple (Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
 
static int checkSplitConditions (Relation index, SpGistState *state, SPPageDesc *current, int *nToSplit)
 
static void moveLeafs (Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, bool isNulls)
 
static void setRedirectionTuple (SPPageDesc *current, OffsetNumber position, BlockNumber blkno, OffsetNumber offnum)
 
static bool checkAllTheSame (spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, bool *includeNew)
 
static bool doPickSplit (Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, int level, bool isNulls, bool isNew)
 
static void spgMatchNodeAction (Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN)
 
static void spgAddNodeAction (Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN, Datum nodeLabel)
 
static void spgSplitNodeAction (Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, spgChooseOut *out)
 
bool spgdoinsert (Relation index, SpGistState *state, ItemPointer heapPtr, Datum *datums, bool *isnulls)
 

Typedef Documentation

◆ SPPageDesc

typedef struct SPPageDesc SPPageDesc

Function Documentation

◆ addLeafTuple()

static void addLeafTuple ( Relation  index,
SpGistState *  state,
SpGistLeafTuple  leafTuple,
SPPageDesc *  current,
SPPageDesc *  parent,
bool  isNulls,
bool  isNew 
)
static

Definition at line 204 of file spgdoinsert.c.

206 {
207  spgxlogAddLeaf xlrec;
208 
209  xlrec.newPage = isNew;
210  xlrec.storesNulls = isNulls;
211 
212  /* these will be filled below as needed */
216  xlrec.nodeI = 0;
217 
219 
220  if (current->offnum == InvalidOffsetNumber ||
221  SpGistBlockIsRoot(current->blkno))
222  {
223  /* Tuple is not part of a chain */
225  current->offnum = SpGistPageAddNewItem(state, current->page,
226  (Item) leafTuple, leafTuple->size,
227  NULL, false);
228 
229  xlrec.offnumLeaf = current->offnum;
230 
231  /* Must update parent's downlink if any */
232  if (parent->buffer != InvalidBuffer)
233  {
234  xlrec.offnumParent = parent->offnum;
235  xlrec.nodeI = parent->node;
236 
237  saveNodeLink(index, parent, current->blkno, current->offnum);
238  }
239  }
240  else
241  {
242  /*
243  * Tuple must be inserted into existing chain. We mustn't change the
244  * chain's head address, but we don't need to chase the entire chain
245  * to put the tuple at the end; we can insert it second.
246  *
247  * Also, it's possible that the "chain" consists only of a DEAD tuple,
248  * in which case we should replace the DEAD tuple in-place.
249  */
250  SpGistLeafTuple head;
251  OffsetNumber offnum;
252 
253  head = (SpGistLeafTuple) PageGetItem(current->page,
254  PageGetItemId(current->page, current->offnum));
255  if (head->tupstate == SPGIST_LIVE)
256  {
257  SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
258  offnum = SpGistPageAddNewItem(state, current->page,
259  (Item) leafTuple, leafTuple->size,
260  NULL, false);
261 
262  /*
263  * re-get head of list because it could have been moved on page,
264  * and set new second element
265  */
266  head = (SpGistLeafTuple) PageGetItem(current->page,
267  PageGetItemId(current->page, current->offnum));
268  SGLT_SET_NEXTOFFSET(head, offnum);
269 
270  xlrec.offnumLeaf = offnum;
271  xlrec.offnumHeadLeaf = current->offnum;
272  }
273  else if (head->tupstate == SPGIST_DEAD)
274  {
276  PageIndexTupleDelete(current->page, current->offnum);
277  if (PageAddItem(current->page,
278  (Item) leafTuple, leafTuple->size,
279  current->offnum, false, false) != current->offnum)
280  elog(ERROR, "failed to add item of size %u to SPGiST index page",
281  leafTuple->size);
282 
283  /* WAL replay distinguishes this case by equal offnums */
284  xlrec.offnumLeaf = current->offnum;
285  xlrec.offnumHeadLeaf = current->offnum;
286  }
287  else
288  elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
289  }
290 
291  MarkBufferDirty(current->buffer);
292 
293  if (RelationNeedsWAL(index) && !state->isBuild)
294  {
295  XLogRecPtr recptr;
296  int flags;
297 
298  XLogBeginInsert();
299  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
300  XLogRegisterData((char *) leafTuple, leafTuple->size);
301 
302  flags = REGBUF_STANDARD;
303  if (xlrec.newPage)
304  flags |= REGBUF_WILL_INIT;
305  XLogRegisterBuffer(0, current->buffer, flags);
306  if (xlrec.offnumParent != InvalidOffsetNumber)
308 
309  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
310 
311  PageSetLSN(current->page, recptr);
312 
313  /* update parent only if we actually changed it */
314  if (xlrec.offnumParent != InvalidOffsetNumber)
315  {
316  PageSetLSN(parent->page, recptr);
317  }
318  }
319 
321 }
#define InvalidBuffer
Definition: buf.h:25
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:2111
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:1052
static Item PageGetItem(Page page, ItemId itemId)
Definition: bufpage.h:351
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:240
static void PageSetLSN(Page page, XLogRecPtr lsn)
Definition: bufpage.h:388
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:468
#define ERROR
Definition: elog.h:39
Pointer Item
Definition: item.h:17
#define START_CRIT_SECTION()
Definition: miscadmin.h:148
#define END_CRIT_SECTION()
Definition: miscadmin.h:150
#define InvalidOffsetNumber
Definition: off.h:26
uint16 OffsetNumber
Definition: off.h:24
#define RelationNeedsWAL(relation)
Definition: rel.h:629
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:187
#define SPGIST_LIVE
#define SGLT_GET_NEXTOFFSET(spgLeafTuple)
#define SPGIST_DEAD
#define SGLT_SET_NEXTOFFSET(spgLeafTuple, offsetNumber)
#define SpGistBlockIsRoot(blkno)
struct SpGistLeafTupleData * SpGistLeafTuple
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:1176
#define XLOG_SPGIST_ADD_LEAF
Definition: spgxlog.h:22
Buffer buffer
Definition: spgdoinsert.c:38
OffsetNumber offnum
Definition: spgdoinsert.c:40
BlockNumber blkno
Definition: spgdoinsert.c:37
unsigned int tupstate
Definition: type.h:95
uint16 nodeI
Definition: spgxlog.h:54
bool newPage
Definition: spgxlog.h:48
OffsetNumber offnumLeaf
Definition: spgxlog.h:50
bool storesNulls
Definition: spgxlog.h:49
OffsetNumber offnumHeadLeaf
Definition: spgxlog.h:51
OffsetNumber offnumParent
Definition: spgxlog.h:53
Definition: regguts.h:323
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:351
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:461
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:243
void XLogBeginInsert(void)
Definition: xloginsert.c:150
#define REGBUF_STANDARD
Definition: xloginsert.h:34
#define REGBUF_WILL_INIT
Definition: xloginsert.h:33

References SPPageDesc::blkno, SPPageDesc::buffer, elog(), END_CRIT_SECTION, ERROR, InvalidBuffer, InvalidOffsetNumber, MarkBufferDirty(), spgxlogAddLeaf::newPage, SPPageDesc::node, spgxlogAddLeaf::nodeI, SPPageDesc::offnum, spgxlogAddLeaf::offnumHeadLeaf, spgxlogAddLeaf::offnumLeaf, spgxlogAddLeaf::offnumParent, SPPageDesc::page, PageAddItem, PageGetItem(), PageGetItemId(), PageIndexTupleDelete(), PageSetLSN(), REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, saveNodeLink(), SGLT_GET_NEXTOFFSET, SGLT_SET_NEXTOFFSET, SpGistLeafTupleData::size, SPGIST_DEAD, SPGIST_LIVE, SpGistBlockIsRoot, SpGistPageAddNewItem(), START_CRIT_SECTION, spgxlogAddLeaf::storesNulls, SpGistLeafTupleData::tupstate, XLOG_SPGIST_ADD_LEAF, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

◆ addNode()

static SpGistInnerTuple addNode ( SpGistState *  state,
SpGistInnerTuple  tuple,
Datum  label,
int  offset 
)
static

Definition at line 79 of file spgdoinsert.c.

80 {
81  SpGistNodeTuple node,
82  *nodes;
83  int i;
84 
85  /* if offset is negative, insert at end */
86  if (offset < 0)
87  offset = tuple->nNodes;
88  else if (offset > tuple->nNodes)
89  elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
90 
91  nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
92  SGITITERATE(tuple, i, node)
93  {
94  if (i < offset)
95  nodes[i] = node;
96  else
97  nodes[i + 1] = node;
98  }
99 
100  nodes[offset] = spgFormNodeTuple(state, label, false);
101 
102  return spgFormInnerTuple(state,
103  (tuple->prefixSize > 0),
104  SGITDATUM(tuple, state),
105  tuple->nNodes + 1,
106  nodes);
107 }
int i
Definition: isn.c:73
void * palloc(Size size)
Definition: mcxt.c:1226
static char * label
#define SGITDATUM(x, s)
#define SGITITERATE(x, i, nt)
SpGistInnerTuple spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, int nNodes, SpGistNodeTuple *nodes)
Definition: spgutils.c:974
SpGistNodeTuple spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
Definition: spgutils.c:932
unsigned int prefixSize

References elog(), ERROR, i, label, SpGistInnerTupleData::nNodes, palloc(), SpGistInnerTupleData::prefixSize, SGITDATUM, SGITITERATE, spgFormInnerTuple(), and spgFormNodeTuple().

Referenced by spgAddNodeAction().

◆ checkAllTheSame()

static bool checkAllTheSame ( spgPickSplitIn *  in,
spgPickSplitOut *  out,
bool  tooBig,
bool *  includeNew 
)
static

Definition at line 600 of file spgdoinsert.c.

602 {
603  int theNode;
604  int limit;
605  int i;
606 
607  /* For the moment, assume we can include the new leaf tuple */
608  *includeNew = true;
609 
610  /* If there's only the new leaf tuple, don't select allTheSame mode */
611  if (in->nTuples <= 1)
612  return false;
613 
614  /* If tuple set doesn't fit on one page, ignore the new tuple in test */
615  limit = tooBig ? in->nTuples - 1 : in->nTuples;
616 
617  /* Check to see if more than one node is populated */
618  theNode = out->mapTuplesToNodes[0];
619  for (i = 1; i < limit; i++)
620  {
621  if (out->mapTuplesToNodes[i] != theNode)
622  return false;
623  }
624 
625  /* Nope, so override the picksplit function's decisions */
626 
627  /* If the new tuple is in its own node, it can't be included in split */
628  if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
629  *includeNew = false;
630 
631  out->nNodes = 8; /* arbitrary number of child nodes */
632 
633  /* Random assignment of tuples to nodes (note we include new tuple) */
634  for (i = 0; i < in->nTuples; i++)
635  out->mapTuplesToNodes[i] = i % out->nNodes;
636 
637  /* The opclass may not use node labels, but if it does, duplicate 'em */
638  if (out->nodeLabels)
639  {
640  Datum theLabel = out->nodeLabels[theNode];
641 
642  out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
643  for (i = 0; i < out->nNodes; i++)
644  out->nodeLabels[i] = theLabel;
645  }
646 
647  /* We don't touch the prefix or the leaf tuple datum assignments */
648 
649  return true;
650 }
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
uintptr_t Datum
Definition: postgres.h:64
int * mapTuplesToNodes
Definition: spgist.h:125
Datum * nodeLabels
Definition: spgist.h:123

References i, if(), spgPickSplitOut::mapTuplesToNodes, spgPickSplitOut::nNodes, spgPickSplitOut::nodeLabels, spgPickSplitIn::nTuples, and palloc().

Referenced by doPickSplit().

◆ checkSplitConditions()

static int checkSplitConditions ( Relation  index,
SpGistState *  state,
SPPageDesc *  current,
int *  nToSplit 
)
static

Definition at line 334 of file spgdoinsert.c.

336 {
337  int i,
338  n = 0,
339  totalSize = 0;
340 
341  if (SpGistBlockIsRoot(current->blkno))
342  {
343  /* return impossible values to force split */
344  *nToSplit = BLCKSZ;
345  return BLCKSZ;
346  }
347 
348  i = current->offnum;
349  while (i != InvalidOffsetNumber)
350  {
351  SpGistLeafTuple it;
352 
354  i <= PageGetMaxOffsetNumber(current->page));
355  it = (SpGistLeafTuple) PageGetItem(current->page,
356  PageGetItemId(current->page, i));
357  if (it->tupstate == SPGIST_LIVE)
358  {
359  n++;
360  totalSize += it->size + sizeof(ItemIdData);
361  }
362  else if (it->tupstate == SPGIST_DEAD)
363  {
364  /* We could see a DEAD tuple as first/only chain item */
365  Assert(i == current->offnum);
367  /* Don't count it in result, because it won't go to other page */
368  }
369  else
370  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
371 
372  i = SGLT_GET_NEXTOFFSET(it);
373  }
374 
375  *nToSplit = n;
376 
377  return totalSize;
378 }
static OffsetNumber PageGetMaxOffsetNumber(Page page)
Definition: bufpage.h:369
struct ItemIdData ItemIdData
Assert(fmt[strlen(fmt) - 1] !='\n')
#define FirstOffsetNumber
Definition: off.h:27

References Assert(), SPPageDesc::blkno, elog(), ERROR, FirstOffsetNumber, i, InvalidOffsetNumber, SPPageDesc::offnum, SPPageDesc::page, PageGetItem(), PageGetItemId(), PageGetMaxOffsetNumber(), SGLT_GET_NEXTOFFSET, SpGistLeafTupleData::size, SPGIST_DEAD, SPGIST_LIVE, SpGistBlockIsRoot, and SpGistLeafTupleData::tupstate.

Referenced by spgdoinsert().

◆ cmpOffsetNumbers()

static int cmpOffsetNumbers ( const void *  a,
const void *  b 
)
static

Definition at line 111 of file spgdoinsert.c.

112 {
113  if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
114  return 0;
115  return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
116 }
int b
Definition: isn.c:70
int a
Definition: isn.c:69

References a, and b.

Referenced by spgPageIndexMultiDelete().

◆ doPickSplit()

static bool doPickSplit ( Relation  index,
SpGistState *  state,
SPPageDesc *  current,
SPPageDesc *  parent,
SpGistLeafTuple  newLeafTuple,
int  level,
bool  isNulls,
bool  isNew 
)
static

Definition at line 678 of file spgdoinsert.c.

682 {
683  bool insertedNew = false;
684  spgPickSplitIn in;
685  spgPickSplitOut out;
686  FmgrInfo *procinfo;
687  bool includeNew;
688  int i,
689  max,
690  n;
691  SpGistInnerTuple innerTuple;
692  SpGistNodeTuple node,
693  *nodes;
694  Buffer newInnerBuffer,
695  newLeafBuffer;
696  uint8 *leafPageSelect;
697  int *leafSizes;
698  OffsetNumber *toDelete;
699  OffsetNumber *toInsert;
700  OffsetNumber redirectTuplePos = InvalidOffsetNumber;
701  OffsetNumber startOffsets[2];
702  SpGistLeafTuple *oldLeafs;
703  SpGistLeafTuple *newLeafs;
704  Datum leafDatums[INDEX_MAX_KEYS];
705  bool leafIsnulls[INDEX_MAX_KEYS];
706  int spaceToDelete;
707  int currentFreeSpace;
708  int totalLeafSizes;
709  bool allTheSame;
710  spgxlogPickSplit xlrec;
711  char *leafdata,
712  *leafptr;
713  SPPageDesc saveCurrent;
714  int nToDelete,
715  nToInsert,
716  maxToInclude;
717 
718  in.level = level;
719 
720  /*
721  * Allocate per-leaf-tuple work arrays with max possible size
722  */
723  max = PageGetMaxOffsetNumber(current->page);
724  n = max + 1;
725  in.datums = (Datum *) palloc(sizeof(Datum) * n);
726  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
727  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
728  oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
729  newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
730  leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
731 
732  STORE_STATE(state, xlrec.stateSrc);
733 
734  /*
735  * Form list of leaf tuples which will be distributed as split result;
736  * also, count up the amount of space that will be freed from current.
737  * (Note that in the non-root case, we won't actually delete the old
738  * tuples, only replace them with redirects or placeholders.)
739  */
740  nToInsert = 0;
741  nToDelete = 0;
742  spaceToDelete = 0;
743  if (SpGistBlockIsRoot(current->blkno))
744  {
745  /*
746  * We are splitting the root (which up to now is also a leaf page).
747  * Its tuples are not linked, so scan sequentially to get them all. We
748  * ignore the original value of current->offnum.
749  */
750  for (i = FirstOffsetNumber; i <= max; i++)
751  {
752  SpGistLeafTuple it;
753 
754  it = (SpGistLeafTuple) PageGetItem(current->page,
755  PageGetItemId(current->page, i));
756  if (it->tupstate == SPGIST_LIVE)
757  {
758  in.datums[nToInsert] =
759  isNulls ? (Datum) 0 : SGLTDATUM(it, state);
760  oldLeafs[nToInsert] = it;
761  nToInsert++;
762  toDelete[nToDelete] = i;
763  nToDelete++;
764  /* we will delete the tuple altogether, so count full space */
765  spaceToDelete += it->size + sizeof(ItemIdData);
766  }
767  else /* tuples on root should be live */
768  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
769  }
770  }
771  else
772  {
773  /* Normal case, just collect the leaf tuples in the chain */
774  i = current->offnum;
775  while (i != InvalidOffsetNumber)
776  {
777  SpGistLeafTuple it;
778 
779  Assert(i >= FirstOffsetNumber && i <= max);
780  it = (SpGistLeafTuple) PageGetItem(current->page,
781  PageGetItemId(current->page, i));
782  if (it->tupstate == SPGIST_LIVE)
783  {
784  in.datums[nToInsert] =
785  isNulls ? (Datum) 0 : SGLTDATUM(it, state);
786  oldLeafs[nToInsert] = it;
787  nToInsert++;
788  toDelete[nToDelete] = i;
789  nToDelete++;
790  /* we will not delete the tuple, only replace with dead */
791  Assert(it->size >= SGDTSIZE);
792  spaceToDelete += it->size - SGDTSIZE;
793  }
794  else if (it->tupstate == SPGIST_DEAD)
795  {
796  /* We could see a DEAD tuple as first/only chain item */
797  Assert(i == current->offnum);
799  toDelete[nToDelete] = i;
800  nToDelete++;
801  /* replacing it with redirect will save no space */
802  }
803  else
804  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
805 
806  i = SGLT_GET_NEXTOFFSET(it);
807  }
808  }
809  in.nTuples = nToInsert;
810 
811  /*
812  * We may not actually insert new tuple because another picksplit may be
813  * necessary due to too large value, but we will try to allocate enough
814  * space to include it; and in any case it has to be included in the input
815  * for the picksplit function. So don't increment nToInsert yet.
816  */
817  in.datums[in.nTuples] =
818  isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
819  oldLeafs[in.nTuples] = newLeafTuple;
820  in.nTuples++;
821 
822  memset(&out, 0, sizeof(out));
823 
824  if (!isNulls)
825  {
826  /*
827  * Perform split using user-defined method.
828  */
830  FunctionCall2Coll(procinfo,
831  index->rd_indcollation[0],
832  PointerGetDatum(&in),
833  PointerGetDatum(&out));
834 
835  /*
836  * Form new leaf tuples and count up the total space needed.
837  */
838  totalLeafSizes = 0;
839  for (i = 0; i < in.nTuples; i++)
840  {
841  if (state->leafTupDesc->natts > 1)
842  spgDeformLeafTuple(oldLeafs[i],
843  state->leafTupDesc,
844  leafDatums,
845  leafIsnulls,
846  isNulls);
847 
848  leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
849  leafIsnulls[spgKeyColumn] = false;
850 
851  newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
852  leafDatums,
853  leafIsnulls);
854  totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
855  }
856  }
857  else
858  {
859  /*
860  * Perform dummy split that puts all tuples into one node.
861  * checkAllTheSame will override this and force allTheSame mode.
862  */
863  out.hasPrefix = false;
864  out.nNodes = 1;
865  out.nodeLabels = NULL;
866  out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
867 
868  /*
869  * Form new leaf tuples and count up the total space needed.
870  */
871  totalLeafSizes = 0;
872  for (i = 0; i < in.nTuples; i++)
873  {
874  if (state->leafTupDesc->natts > 1)
875  spgDeformLeafTuple(oldLeafs[i],
876  state->leafTupDesc,
877  leafDatums,
878  leafIsnulls,
879  isNulls);
880 
881  /*
882  * Nulls tree can contain only null key values.
883  */
884  leafDatums[spgKeyColumn] = (Datum) 0;
885  leafIsnulls[spgKeyColumn] = true;
886 
887  newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
888  leafDatums,
889  leafIsnulls);
890  totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
891  }
892  }
893 
894  /*
895  * Check to see if the picksplit function failed to separate the values,
896  * ie, it put them all into the same child node. If so, select allTheSame
897  * mode and create a random split instead. See comments for
898  * checkAllTheSame as to why we need to know if the new leaf tuples could
899  * fit on one page.
900  */
901  allTheSame = checkAllTheSame(&in, &out,
902  totalLeafSizes > SPGIST_PAGE_CAPACITY,
903  &includeNew);
904 
905  /*
906  * If checkAllTheSame decided we must exclude the new tuple, don't
907  * consider it any further.
908  */
909  if (includeNew)
910  maxToInclude = in.nTuples;
911  else
912  {
913  maxToInclude = in.nTuples - 1;
914  totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
915  }
916 
917  /*
918  * Allocate per-node work arrays. Since checkAllTheSame could replace
919  * out.nNodes with a value larger than the number of tuples on the input
920  * page, we can't allocate these arrays before here.
921  */
922  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
923  leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
924 
925  /*
926  * Form nodes of inner tuple and inner tuple itself
927  */
928  for (i = 0; i < out.nNodes; i++)
929  {
930  Datum label = (Datum) 0;
931  bool labelisnull = (out.nodeLabels == NULL);
932 
933  if (!labelisnull)
934  label = out.nodeLabels[i];
935  nodes[i] = spgFormNodeTuple(state, label, labelisnull);
936  }
937  innerTuple = spgFormInnerTuple(state,
938  out.hasPrefix, out.prefixDatum,
939  out.nNodes, nodes);
940  innerTuple->allTheSame = allTheSame;
941 
942  /*
943  * Update nodes[] array to point into the newly formed innerTuple, so that
944  * we can adjust their downlinks below.
945  */
946  SGITITERATE(innerTuple, i, node)
947  {
948  nodes[i] = node;
949  }
950 
951  /*
952  * Re-scan new leaf tuples and count up the space needed under each node.
953  */
954  for (i = 0; i < maxToInclude; i++)
955  {
956  n = out.mapTuplesToNodes[i];
957  if (n < 0 || n >= out.nNodes)
958  elog(ERROR, "inconsistent result of SPGiST picksplit function");
959  leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
960  }
961 
962  /*
963  * To perform the split, we must insert a new inner tuple, which can't go
964  * on a leaf page; and unless we are splitting the root page, we must then
965  * update the parent tuple's downlink to point to the inner tuple. If
966  * there is room, we'll put the new inner tuple on the same page as the
967  * parent tuple, otherwise we need another non-leaf buffer. But if the
968  * parent page is the root, we can't add the new inner tuple there,
969  * because the root page must have only one inner tuple.
970  */
971  xlrec.initInner = false;
972  if (parent->buffer != InvalidBuffer &&
973  !SpGistBlockIsRoot(parent->blkno) &&
974  (SpGistPageGetFreeSpace(parent->page, 1) >=
975  innerTuple->size + sizeof(ItemIdData)))
976  {
977  /* New inner tuple will fit on parent page */
978  newInnerBuffer = parent->buffer;
979  }
980  else if (parent->buffer != InvalidBuffer)
981  {
982  /* Send tuple to page with next triple parity (see README) */
983  newInnerBuffer = SpGistGetBuffer(index,
984  GBUF_INNER_PARITY(parent->blkno + 1) |
985  (isNulls ? GBUF_NULLS : 0),
986  innerTuple->size + sizeof(ItemIdData),
987  &xlrec.initInner);
988  }
989  else
990  {
991  /* Root page split ... inner tuple will go to root page */
992  newInnerBuffer = InvalidBuffer;
993  }
994 
995  /*
996  * The new leaf tuples converted from the existing ones should require the
997  * same or less space, and therefore should all fit onto one page
998  * (although that's not necessarily the current page, since we can't
999  * delete the old tuples but only replace them with placeholders).
1000  * However, the incoming new tuple might not also fit, in which case we
1001  * might need another picksplit cycle to reduce it some more.
1002  *
1003  * If there's not room to put everything back onto the current page, then
1004  * we decide on a per-node basis which tuples go to the new page. (We do
1005  * it like that because leaf tuple chains can't cross pages, so we must
1006  * place all leaf tuples belonging to the same parent node on the same
1007  * page.)
1008  *
1009  * If we are splitting the root page (turning it from a leaf page into an
1010  * inner page), then no leaf tuples can go back to the current page; they
1011  * must all go somewhere else.
1012  */
1013  if (!SpGistBlockIsRoot(current->blkno))
1014  currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1015  else
1016  currentFreeSpace = 0; /* prevent assigning any tuples to current */
1017 
1018  xlrec.initDest = false;
1019 
1020  if (totalLeafSizes <= currentFreeSpace)
1021  {
1022  /* All the leaf tuples will fit on current page */
1023  newLeafBuffer = InvalidBuffer;
1024  /* mark new leaf tuple as included in insertions, if allowed */
1025  if (includeNew)
1026  {
1027  nToInsert++;
1028  insertedNew = true;
1029  }
1030  for (i = 0; i < nToInsert; i++)
1031  leafPageSelect[i] = 0; /* signifies current page */
1032  }
1033  else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1034  {
1035  /*
1036  * We're trying to split up a long value by repeated suffixing, but
1037  * it's not going to fit yet. Don't bother allocating a second leaf
1038  * buffer that we won't be able to use.
1039  */
1040  newLeafBuffer = InvalidBuffer;
1041  Assert(includeNew);
1042  Assert(nToInsert == 0);
1043  }
1044  else
1045  {
1046  /* We will need another leaf page */
1047  uint8 *nodePageSelect;
1048  int curspace;
1049  int newspace;
1050 
1051  newLeafBuffer = SpGistGetBuffer(index,
1052  GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1053  Min(totalLeafSizes,
1055  &xlrec.initDest);
1056 
1057  /*
1058  * Attempt to assign node groups to the two pages. We might fail to
1059  * do so, even if totalLeafSizes is less than the available space,
1060  * because we can't split a group across pages.
1061  */
1062  nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1063 
1064  curspace = currentFreeSpace;
1065  newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1066  for (i = 0; i < out.nNodes; i++)
1067  {
1068  if (leafSizes[i] <= curspace)
1069  {
1070  nodePageSelect[i] = 0; /* signifies current page */
1071  curspace -= leafSizes[i];
1072  }
1073  else
1074  {
1075  nodePageSelect[i] = 1; /* signifies new leaf page */
1076  newspace -= leafSizes[i];
1077  }
1078  }
1079  if (curspace >= 0 && newspace >= 0)
1080  {
1081  /* Successful assignment, so we can include the new leaf tuple */
1082  if (includeNew)
1083  {
1084  nToInsert++;
1085  insertedNew = true;
1086  }
1087  }
1088  else if (includeNew)
1089  {
1090  /* We must exclude the new leaf tuple from the split */
1091  int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1092 
1093  leafSizes[nodeOfNewTuple] -=
1094  newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1095 
1096  /* Repeat the node assignment process --- should succeed now */
1097  curspace = currentFreeSpace;
1098  newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1099  for (i = 0; i < out.nNodes; i++)
1100  {
1101  if (leafSizes[i] <= curspace)
1102  {
1103  nodePageSelect[i] = 0; /* signifies current page */
1104  curspace -= leafSizes[i];
1105  }
1106  else
1107  {
1108  nodePageSelect[i] = 1; /* signifies new leaf page */
1109  newspace -= leafSizes[i];
1110  }
1111  }
1112  if (curspace < 0 || newspace < 0)
1113  elog(ERROR, "failed to divide leaf tuple groups across pages");
1114  }
1115  else
1116  {
1117  /* oops, we already excluded new tuple ... should not get here */
1118  elog(ERROR, "failed to divide leaf tuple groups across pages");
1119  }
1120  /* Expand the per-node assignments to be shown per leaf tuple */
1121  for (i = 0; i < nToInsert; i++)
1122  {
1123  n = out.mapTuplesToNodes[i];
1124  leafPageSelect[i] = nodePageSelect[n];
1125  }
1126  }
1127 
1128  /* Start preparing WAL record */
1129  xlrec.nDelete = 0;
1130  xlrec.initSrc = isNew;
1131  xlrec.storesNulls = isNulls;
1132  xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1133 
1134  leafdata = leafptr = (char *) palloc(totalLeafSizes);
1135 
1136  /* Here we begin making the changes to the target pages */
1138 
1139  /*
1140  * Delete old leaf tuples from current buffer, except when we're splitting
1141  * the root; in that case there's no need because we'll re-init the page
1142  * below. We do this first to make room for reinserting new leaf tuples.
1143  */
1144  if (!SpGistBlockIsRoot(current->blkno))
1145  {
1146  /*
1147  * Init buffer instead of deleting individual tuples, but only if
1148  * there aren't any other live tuples and only during build; otherwise
1149  * we need to set a redirection tuple for concurrent scans.
1150  */
1151  if (state->isBuild &&
1152  nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1153  PageGetMaxOffsetNumber(current->page))
1154  {
1155  SpGistInitBuffer(current->buffer,
1156  SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1157  xlrec.initSrc = true;
1158  }
1159  else if (isNew)
1160  {
1161  /* don't expose the freshly init'd buffer as a backup block */
1162  Assert(nToDelete == 0);
1163  }
1164  else
1165  {
1166  xlrec.nDelete = nToDelete;
1167 
1168  if (!state->isBuild)
1169  {
1170  /*
1171  * Need to create redirect tuple (it will point to new inner
1172  * tuple) but right now the new tuple's location is not known
1173  * yet. So, set the redirection pointer to "impossible" value
1174  * and remember its position to update tuple later.
1175  */
1176  if (nToDelete > 0)
1177  redirectTuplePos = toDelete[0];
1179  toDelete, nToDelete,
1184  }
1185  else
1186  {
1187  /*
1188  * During index build there is not concurrent searches, so we
1189  * don't need to create redirection tuple.
1190  */
1192  toDelete, nToDelete,
1197  }
1198  }
1199  }
1200 
1201  /*
1202  * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1203  * nodes.
1204  */
1205  startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1206  for (i = 0; i < nToInsert; i++)
1207  {
1208  SpGistLeafTuple it = newLeafs[i];
1209  Buffer leafBuffer;
1210  BlockNumber leafBlock;
1211  OffsetNumber newoffset;
1212 
1213  /* Which page is it going to? */
1214  leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1215  leafBlock = BufferGetBlockNumber(leafBuffer);
1216 
1217  /* Link tuple into correct chain for its node */
1218  n = out.mapTuplesToNodes[i];
1219 
1220  if (ItemPointerIsValid(&nodes[n]->t_tid))
1221  {
1222  Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1223  SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1224  }
1225  else
1227 
1228  /* Insert it on page */
1229  newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1230  (Item) it, it->size,
1231  &startOffsets[leafPageSelect[i]],
1232  false);
1233  toInsert[i] = newoffset;
1234 
1235  /* ... and complete the chain linking */
1236  ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1237 
1238  /* Also copy leaf tuple into WAL data */
1239  memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1240  leafptr += newLeafs[i]->size;
1241  }
1242 
1243  /*
1244  * We're done modifying the other leaf buffer (if any), so mark it dirty.
1245  * current->buffer will be marked below, after we're entirely done
1246  * modifying it.
1247  */
1248  if (newLeafBuffer != InvalidBuffer)
1249  {
1250  MarkBufferDirty(newLeafBuffer);
1251  }
1252 
1253  /* Remember current buffer, since we're about to change "current" */
1254  saveCurrent = *current;
1255 
1256  /*
1257  * Store the new innerTuple
1258  */
1259  if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1260  {
1261  /*
1262  * new inner tuple goes to parent page
1263  */
1264  Assert(current->buffer != parent->buffer);
1265 
1266  /* Repoint "current" at the new inner tuple */
1267  current->blkno = parent->blkno;
1268  current->buffer = parent->buffer;
1269  current->page = parent->page;
1270  xlrec.offnumInner = current->offnum =
1271  SpGistPageAddNewItem(state, current->page,
1272  (Item) innerTuple, innerTuple->size,
1273  NULL, false);
1274 
1275  /*
1276  * Update parent node link and mark parent page dirty
1277  */
1278  xlrec.innerIsParent = true;
1279  xlrec.offnumParent = parent->offnum;
1280  xlrec.nodeI = parent->node;
1281  saveNodeLink(index, parent, current->blkno, current->offnum);
1282 
1283  /*
1284  * Update redirection link (in old current buffer)
1285  */
1286  if (redirectTuplePos != InvalidOffsetNumber)
1287  setRedirectionTuple(&saveCurrent, redirectTuplePos,
1288  current->blkno, current->offnum);
1289 
1290  /* Done modifying old current buffer, mark it dirty */
1291  MarkBufferDirty(saveCurrent.buffer);
1292  }
1293  else if (parent->buffer != InvalidBuffer)
1294  {
1295  /*
1296  * new inner tuple will be stored on a new page
1297  */
1298  Assert(newInnerBuffer != InvalidBuffer);
1299 
1300  /* Repoint "current" at the new inner tuple */
1301  current->buffer = newInnerBuffer;
1302  current->blkno = BufferGetBlockNumber(current->buffer);
1303  current->page = BufferGetPage(current->buffer);
1304  xlrec.offnumInner = current->offnum =
1305  SpGistPageAddNewItem(state, current->page,
1306  (Item) innerTuple, innerTuple->size,
1307  NULL, false);
1308 
1309  /* Done modifying new current buffer, mark it dirty */
1310  MarkBufferDirty(current->buffer);
1311 
1312  /*
1313  * Update parent node link and mark parent page dirty
1314  */
1315  xlrec.innerIsParent = (parent->buffer == current->buffer);
1316  xlrec.offnumParent = parent->offnum;
1317  xlrec.nodeI = parent->node;
1318  saveNodeLink(index, parent, current->blkno, current->offnum);
1319 
1320  /*
1321  * Update redirection link (in old current buffer)
1322  */
1323  if (redirectTuplePos != InvalidOffsetNumber)
1324  setRedirectionTuple(&saveCurrent, redirectTuplePos,
1325  current->blkno, current->offnum);
1326 
1327  /* Done modifying old current buffer, mark it dirty */
1328  MarkBufferDirty(saveCurrent.buffer);
1329  }
1330  else
1331  {
1332  /*
1333  * Splitting root page, which was a leaf but now becomes inner page
1334  * (and so "current" continues to point at it)
1335  */
1336  Assert(SpGistBlockIsRoot(current->blkno));
1337  Assert(redirectTuplePos == InvalidOffsetNumber);
1338 
1339  SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1340  xlrec.initInner = true;
1341  xlrec.innerIsParent = false;
1342 
1343  xlrec.offnumInner = current->offnum =
1344  PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1345  InvalidOffsetNumber, false, false);
1346  if (current->offnum != FirstOffsetNumber)
1347  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1348  innerTuple->size);
1349 
1350  /* No parent link to update, nor redirection to do */
1352  xlrec.nodeI = 0;
1353 
1354  /* Done modifying new current buffer, mark it dirty */
1355  MarkBufferDirty(current->buffer);
1356 
1357  /* saveCurrent doesn't represent a different buffer */
1358  saveCurrent.buffer = InvalidBuffer;
1359  }
1360 
1361  if (RelationNeedsWAL(index) && !state->isBuild)
1362  {
1363  XLogRecPtr recptr;
1364  int flags;
1365 
1366  XLogBeginInsert();
1367 
1368  xlrec.nInsert = nToInsert;
1369  XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1370 
1371  XLogRegisterData((char *) toDelete,
1372  sizeof(OffsetNumber) * xlrec.nDelete);
1373  XLogRegisterData((char *) toInsert,
1374  sizeof(OffsetNumber) * xlrec.nInsert);
1375  XLogRegisterData((char *) leafPageSelect,
1376  sizeof(uint8) * xlrec.nInsert);
1377  XLogRegisterData((char *) innerTuple, innerTuple->size);
1378  XLogRegisterData(leafdata, leafptr - leafdata);
1379 
1380  /* Old leaf page */
1381  if (BufferIsValid(saveCurrent.buffer))
1382  {
1383  flags = REGBUF_STANDARD;
1384  if (xlrec.initSrc)
1385  flags |= REGBUF_WILL_INIT;
1386  XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1387  }
1388 
1389  /* New leaf page */
1390  if (BufferIsValid(newLeafBuffer))
1391  {
1392  flags = REGBUF_STANDARD;
1393  if (xlrec.initDest)
1394  flags |= REGBUF_WILL_INIT;
1395  XLogRegisterBuffer(1, newLeafBuffer, flags);
1396  }
1397 
1398  /* Inner page */
1399  flags = REGBUF_STANDARD;
1400  if (xlrec.initInner)
1401  flags |= REGBUF_WILL_INIT;
1402  XLogRegisterBuffer(2, current->buffer, flags);
1403 
1404  /* Parent page, if different from inner page */
1405  if (parent->buffer != InvalidBuffer)
1406  {
1407  if (parent->buffer != current->buffer)
1409  else
1410  Assert(xlrec.innerIsParent);
1411  }
1412 
1413  /* Issue the WAL record */
1414  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1415 
1416  /* Update page LSNs on all affected pages */
1417  if (newLeafBuffer != InvalidBuffer)
1418  {
1419  Page page = BufferGetPage(newLeafBuffer);
1420 
1421  PageSetLSN(page, recptr);
1422  }
1423 
1424  if (saveCurrent.buffer != InvalidBuffer)
1425  {
1426  Page page = BufferGetPage(saveCurrent.buffer);
1427 
1428  PageSetLSN(page, recptr);
1429  }
1430 
1431  PageSetLSN(current->page, recptr);
1432 
1433  if (parent->buffer != InvalidBuffer)
1434  {
1435  PageSetLSN(parent->page, recptr);
1436  }
1437  }
1438 
1439  END_CRIT_SECTION();
1440 
1441  /* Update local free-space cache and unlock buffers */
1442  if (newLeafBuffer != InvalidBuffer)
1443  {
1444  SpGistSetLastUsedPage(index, newLeafBuffer);
1445  UnlockReleaseBuffer(newLeafBuffer);
1446  }
1447  if (saveCurrent.buffer != InvalidBuffer)
1448  {
1449  SpGistSetLastUsedPage(index, saveCurrent.buffer);
1450  UnlockReleaseBuffer(saveCurrent.buffer);
1451  }
1452 
1453  return insertedNew;
1454 }
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
int Buffer
Definition: buf.h:23
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:3290
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4497
static Page BufferGetPage(Buffer buffer)
Definition: bufmgr.h:355
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:303
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:958
Pointer Page
Definition: bufpage.h:78
#define Min(x, y)
Definition: c.h:988
unsigned char uint8
Definition: c.h:488
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1120
FmgrInfo * index_getprocinfo(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:811
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
Definition: itemptr.h:135
static OffsetNumber ItemPointerGetOffsetNumber(const ItemPointerData *pointer)
Definition: itemptr.h:124
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
static bool ItemPointerIsValid(const ItemPointerData *pointer)
Definition: itemptr.h:83
void * palloc0(Size size)
Definition: mcxt.c:1257
#define INDEX_MAX_KEYS
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
static void setRedirectionTuple(SPPageDesc *current, OffsetNumber position, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:569
static bool checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, bool *includeNew)
Definition: spgdoinsert.c:600
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:132
#define SPGIST_PICKSPLIT_PROC
Definition: spgist.h:25
#define SGLTDATUM(x, s)
#define GBUF_NULLS
#define SPGIST_REDIRECT
#define SGDTSIZE
#define SPGIST_PLACEHOLDER
#define GBUF_LEAF
#define SPGIST_METAPAGE_BLKNO
#define SPGIST_NULLS
#define STORE_STATE(s, d)
#define SPGIST_PAGE_CAPACITY
#define SpGistPageGetOpaque(page)
#define GBUF_INNER_PARITY(x)
#define SPGIST_LEAF
#define spgKeyColumn
#define SpGistPageGetFreeSpace(p, n)
void spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor, Datum *datums, bool *isnulls, bool keyColumnIsNull)
Definition: spgutils.c:1088
SpGistLeafTuple spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, Datum *datums, bool *isnulls)
Definition: spgutils.c:843
void SpGistInitBuffer(Buffer b, uint16 f)
Definition: spgutils.c:694
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:541
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:645
#define SizeOfSpgxlogPickSplit
Definition: spgxlog.h:199
#define XLOG_SPGIST_PICKSPLIT
Definition: spgxlog.h:26
Definition: fmgr.h:57
unsigned int allTheSame
Datum * datums
Definition: spgist.h:113
bool hasPrefix
Definition: spgist.h:119
Datum * leafTupleDatums
Definition: spgist.h:126
Datum prefixDatum
Definition: spgist.h:120
uint16 nInsert
Definition: spgxlog.h:170
spgxlogState stateSrc
Definition: spgxlog.h:185
bool innerIsParent
Definition: spgxlog.h:181
uint16 nodeI
Definition: spgxlog.h:183
bool storesNulls
Definition: spgxlog.h:178
OffsetNumber offnumParent
Definition: spgxlog.h:182
OffsetNumber offnumInner
Definition: spgxlog.h:175
uint16 nDelete
Definition: spgxlog.h:169
bool isRootSplit
Definition: spgxlog.h:167

References SpGistInnerTupleData::allTheSame, Assert(), SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage(), BufferIsValid(), checkAllTheSame(), spgPickSplitIn::datums, elog(), END_CRIT_SECTION, ERROR, FirstOffsetNumber, FunctionCall2Coll(), GBUF_INNER_PARITY, GBUF_LEAF, GBUF_NULLS, spgPickSplitOut::hasPrefix, i, index_getprocinfo(), INDEX_MAX_KEYS, spgxlogPickSplit::initDest, spgxlogPickSplit::initInner, spgxlogPickSplit::initSrc, spgxlogPickSplit::innerIsParent, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, spgxlogPickSplit::isRootSplit, ItemPointerGetBlockNumber(), ItemPointerGetOffsetNumber(), ItemPointerIsValid(), ItemPointerSet(), label, spgPickSplitOut::leafTupleDatums, spgPickSplitIn::level, spgPickSplitOut::mapTuplesToNodes, MarkBufferDirty(), Min, spgxlogPickSplit::nDelete, spgxlogPickSplit::nInsert, spgPickSplitOut::nNodes, SPPageDesc::node, spgxlogPickSplit::nodeI, spgPickSplitOut::nodeLabels, spgPickSplitIn::nTuples, SPPageDesc::offnum, spgxlogPickSplit::offnumInner, spgxlogPickSplit::offnumParent, SPPageDesc::page, PageAddItem, PageGetExactFreeSpace(), PageGetItem(), PageGetItemId(), PageGetMaxOffsetNumber(), PageSetLSN(), palloc(), palloc0(), PointerGetDatum(), spgPickSplitOut::prefixDatum, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, saveNodeLink(), setRedirectionTuple(), SGDTSIZE, SGITITERATE, SGLT_GET_NEXTOFFSET, SGLT_SET_NEXTOFFSET, SGLTDATUM, SpGistInnerTupleData::size, SpGistLeafTupleData::size, SizeOfSpgxlogPickSplit, spgDeformLeafTuple(), spgFormInnerTuple(), spgFormLeafTuple(), spgFormNodeTuple(), SPGIST_DEAD, SPGIST_LEAF, SPGIST_LIVE, SPGIST_METAPAGE_BLKNO, SPGIST_NULLS, SPGIST_PAGE_CAPACITY, SPGIST_PICKSPLIT_PROC, SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistBlockIsRoot, SpGistGetBuffer(), SpGistInitBuffer(), SpGistPageAddNewItem(), SpGistPageGetFreeSpace, SpGistPageGetOpaque, SpGistSetLastUsedPage(), spgKeyColumn, spgPageIndexMultiDelete(), START_CRIT_SECTION, spgxlogPickSplit::stateSrc, 
STORE_STATE, spgxlogPickSplit::storesNulls, SpGistLeafTupleData::tupstate, UnlockReleaseBuffer(), XLOG_SPGIST_PICKSPLIT, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

◆ moveLeafs()

static void moveLeafs ( Relation  index,
SpGistState state,
SPPageDesc current,
SPPageDesc parent,
SpGistLeafTuple  newLeafTuple,
bool  isNulls 
)
static

Definition at line 388 of file spgdoinsert.c.

/*
 * moveLeafs: move the chain of leaf tuples that starts at current->offnum
 * off current->page onto a leaf page obtained from SpGistGetBuffer, add
 * newLeafTuple to the moved chain, and repoint the parent's downlink at the
 * new chain head.
 *
 * index        - index relation; passed to SpGistGetBuffer and saveNodeLink,
 *                and tested with RelationNeedsWAL
 * state        - SP-GiST working state; no WAL record is written here when
 *                state->isBuild is set
 * current      - page and offset of the first tuple of the chain to move
 * parent       - location of the inner tuple whose node link is updated;
 *                must be valid and use a different buffer than current
 *                (both asserted below)
 * newLeafTuple - leaf tuple to store on the destination page with the chain
 * isNulls      - adds GBUF_NULLS to the flags used to request the page
 *
 * NOTE(review): several original source lines (398, 425, 440, 461, 510-511,
 * 543-545, 554, 557) are not visible in this listing, so a few statements
 * below appear truncated -- confirm against the original file.
 */
391 {
392  int i,
393  nDelete,
394  nInsert,
395  size;
396  Buffer nbuf;
397  Page npage;
/*
 * NOTE(review): the start of the declaration that the next line continues
 * is not visible here; "r", used further down, is presumably declared on
 * that missing line -- TODO confirm.
 */
399  startOffset = InvalidOffsetNumber;
400  bool replaceDead = false;
401  OffsetNumber *toDelete;
402  OffsetNumber *toInsert;
403  BlockNumber nblkno;
404  spgxlogMoveLeafs xlrec;
405  char *leafdata,
406  *leafptr;
407 
408  /* This doesn't work on root page */
409  Assert(parent->buffer != InvalidBuffer);
410  Assert(parent->buffer != current->buffer);
411 
412  /* Locate the tuples to be moved, and count up the space needed */
/*
 * Both arrays are sized from the page's max offset number; toInsert gets
 * one extra slot because newLeafTuple is inserted in addition to the moved
 * tuples.
 */
413  i = PageGetMaxOffsetNumber(current->page);
414  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
415  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
416 
/* size counts each tuple plus one ItemIdData for it, starting with the new one */
417  size = newLeafTuple->size + sizeof(ItemIdData);
418 
/*
 * Walk the chain via SGLT_GET_NEXTOFFSET.  LIVE tuples are queued for
 * deletion and added to size; a DEAD tuple is queued for deletion only and
 * sets replaceDead; any other tuple state raises ERROR.
 */
419  nDelete = 0;
420  i = current->offnum;
421  while (i != InvalidOffsetNumber)
422  {
423  SpGistLeafTuple it;
424 
/* NOTE(review): the opening of the statement ending on the next line is not visible here */
426  i <= PageGetMaxOffsetNumber(current->page));
427  it = (SpGistLeafTuple) PageGetItem(current->page,
428  PageGetItemId(current->page, i));
429 
430  if (it->tupstate == SPGIST_LIVE)
431  {
432  toDelete[nDelete] = i;
433  size += it->size + sizeof(ItemIdData);
434  nDelete++;
435  }
436  else if (it->tupstate == SPGIST_DEAD)
437  {
438  /* We could see a DEAD tuple as first/only chain item */
439  Assert(i == current->offnum);
441  /* We don't want to move it, so don't count it in size */
442  toDelete[nDelete] = i;
443  nDelete++;
444  replaceDead = true;
445  }
446  else
447  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
448 
449  i = SGLT_GET_NEXTOFFSET(it);
450  }
451 
452  /* Find a leaf page that will hold them */
/* SpGistGetBuffer also fills xlrec.newPage through its last argument */
453  nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
454  size, &xlrec.newPage);
455  npage = BufferGetPage(nbuf);
456  nblkno = BufferGetBlockNumber(nbuf);
457  Assert(nblkno != current->blkno);
458 
/*
 * Buffer for the tuple images that go into the WAL record.  Only tuple
 * bytes are copied into it below, so "size" (which also includes the
 * ItemIdData allowances) is an upper bound on what is needed.
 */
459  leafdata = leafptr = palloc(size);
460 
462 
463  /* copy all the old tuples to new page, unless they're dead */
/*
 * When a DEAD tuple was seen, nothing is copied and only newLeafTuple ends
 * up on the new page.  "r" carries the offset returned for the previously
 * added tuple; its initial value is set on a line not visible here.
 */
464  nInsert = 0;
465  if (!replaceDead)
466  {
467  for (i = 0; i < nDelete; i++)
468  {
469  SpGistLeafTuple it;
470 
471  it = (SpGistLeafTuple) PageGetItem(current->page,
472  PageGetItemId(current->page, toDelete[i]));
473  Assert(it->tupstate == SPGIST_LIVE);
474 
475  /*
476  * Update chain link (notice the chain order gets reversed, but we
477  * don't care). We're modifying the tuple on the source page
478  * here, but it's okay since we're about to delete it.
479  */
480  SGLT_SET_NEXTOFFSET(it, r);
481 
482  r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
483  &startOffset, false);
484 
485  toInsert[nInsert] = r;
486  nInsert++;
487 
488  /* save modified tuple into leafdata as well */
489  memcpy(leafptr, it, it->size);
490  leafptr += it->size;
491  }
492  }
493 
494  /* add the new tuple as well */
/* after this, r is the offset of newLeafTuple, i.e. the head of the moved chain */
495  SGLT_SET_NEXTOFFSET(newLeafTuple, r);
496  r = SpGistPageAddNewItem(state, npage,
497  (Item) newLeafTuple, newLeafTuple->size,
498  &startOffset, false);
499  toInsert[nInsert] = r;
500  nInsert++;
501  memcpy(leafptr, newLeafTuple, newLeafTuple->size);
502  leafptr += newLeafTuple->size;
503 
504  /*
505  * Now delete the old tuples, leaving a redirection pointer behind for the
506  * first one, unless we're doing an index build; in which case there can't
507  * be any concurrent scan so we need not provide a redirect.
508  */
/* NOTE(review): the middle arguments of this call are not visible in this listing */
509  spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
512  nblkno, r);
513 
514  /* Update parent's downlink and mark parent page dirty */
515  saveNodeLink(index, parent, nblkno, r);
516 
517  /* Mark the leaf pages too */
518  MarkBufferDirty(current->buffer);
519  MarkBufferDirty(nbuf);
520 
/* WAL is written only for WAL-logged relations outside of index build */
521  if (RelationNeedsWAL(index) && !state->isBuild)
522  {
523  XLogRecPtr recptr;
524 
525  /* prepare WAL info */
526  STORE_STATE(state, xlrec.stateSrc);
527 
528  xlrec.nMoves = nDelete;
529  xlrec.replaceDead = replaceDead;
530  xlrec.storesNulls = isNulls;
531 
532  xlrec.offnumParent = parent->offnum;
533  xlrec.nodeI = parent->node;
534 
/*
 * Record payload, in order: the fixed header, the deleted offsets, the
 * inserted offsets, then the tuple images accumulated in leafdata.
 */
535  XLogBeginInsert();
536  XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
537  XLogRegisterData((char *) toDelete,
538  sizeof(OffsetNumber) * nDelete);
539  XLogRegisterData((char *) toInsert,
540  sizeof(OffsetNumber) * nInsert);
541  XLogRegisterData((char *) leafdata, leafptr - leafdata);
542 
/* NOTE(review): source lines 543-545 are not visible here; presumably the buffer registrations -- TODO confirm */
546 
547  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
548 
/* stamp the source, destination and parent pages with the record's LSN */
549  PageSetLSN(current->page, recptr);
550  PageSetLSN(npage, recptr);
551  PageSetLSN(parent->page, recptr);
552  }
553 
555 
556  /* Update local free-space cache and release new buffer */
558  UnlockReleaseBuffer(nbuf);
559 }
#define SizeOfSpgxlogMoveLeafs
Definition: spgxlog.h:91
#define XLOG_SPGIST_MOVE_LEAFS
Definition: spgxlog.h:23
bool replaceDead
Definition: spgxlog.h:68
bool storesNulls
Definition: spgxlog.h:69
uint16 nMoves
Definition: spgxlog.h:66
uint16 nodeI
Definition: spgxlog.h:73
OffsetNumber offnumParent
Definition: spgxlog.h:72
spgxlogState stateSrc
Definition: spgxlog.h:75

References Assert(), SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage(), elog(), END_CRIT_SECTION, ERROR, FirstOffsetNumber, GBUF_LEAF, GBUF_NULLS, i, InvalidBuffer, InvalidOffsetNumber, MarkBufferDirty(), spgxlogMoveLeafs::newPage, spgxlogMoveLeafs::nMoves, SPPageDesc::node, spgxlogMoveLeafs::nodeI, SPPageDesc::offnum, spgxlogMoveLeafs::offnumParent, SPPageDesc::page, PageGetItem(), PageGetItemId(), PageGetMaxOffsetNumber(), PageSetLSN(), palloc(), REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, spgxlogMoveLeafs::replaceDead, saveNodeLink(), SGLT_GET_NEXTOFFSET, SGLT_SET_NEXTOFFSET, SpGistLeafTupleData::size, SizeOfSpgxlogMoveLeafs, SPGIST_DEAD, SPGIST_LIVE, SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistSetLastUsedPage(), spgPageIndexMultiDelete(), START_CRIT_SECTION, spgxlogMoveLeafs::stateSrc, STORE_STATE, spgxlogMoveLeafs::storesNulls, SpGistLeafTupleData::tupstate, UnlockReleaseBuffer(), XLOG_SPGIST_MOVE_LEAFS, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

◆ saveNodeLink()

static void saveNodeLink ( Relation  index,
SPPageDesc parent,
BlockNumber  blkno,
OffsetNumber  offnum 
)
static

Definition at line 187 of file spgdoinsert.c.

/*
 * saveNodeLink: store (blkno, offnum) as the link of node parent->node in
 * the inner tuple located at parent->offnum on parent->page, then mark
 * parent->buffer dirty.
 *
 * index  - not referenced in this body
 * parent - identifies the page, tuple offset, node number and buffer to update
 * blkno  - block number to store in the node link
 * offnum - offset number to store in the node link
 */
189 {
190  SpGistInnerTuple innerTuple;
191 
/* fetch the parent's inner tuple directly from the page; it is modified in place */
192  innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
193  PageGetItemId(parent->page, parent->offnum));
194 
195  spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
196 
197  MarkBufferDirty(parent->buffer);
198 }
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition: spgdoinsert.c:51
SpGistInnerTupleData * SpGistInnerTuple

References SPPageDesc::buffer, MarkBufferDirty(), SPPageDesc::node, SPPageDesc::offnum, SPPageDesc::page, PageGetItem(), PageGetItemId(), and spgUpdateNodeLink().

Referenced by addLeafTuple(), doPickSplit(), moveLeafs(), and spgAddNodeAction().

◆ setRedirectionTuple()

static void setRedirectionTuple ( SPPageDesc current,
OffsetNumber  position,
BlockNumber  blkno,
OffsetNumber  offnum 
)
static

◆ spgAddNodeAction()

static void spgAddNodeAction ( Relation  index,
SpGistState state,
SpGistInnerTuple  innerTuple,
SPPageDesc current,
SPPageDesc parent,
int  nodeN,
Datum  nodeLabel 
)
static

Definition at line 1514 of file spgdoinsert.c.

/*
 * spgAddNodeAction: replace innerTuple with an enlarged version that has one
 * additional node, built by addNode() from nodeLabel and nodeN.
 *
 * If current->page has enough free space for the size increase, the tuple
 * is replaced in place at current->offnum.  Otherwise the new tuple is
 * stored on a page obtained from SpGistGetBuffer, "current" is repointed at
 * that location, the parent's downlink is updated through saveNodeLink, and
 * the old slot is refilled with the dead tuple "dt".
 *
 * index      - index relation; passed to SpGistGetBuffer/saveNodeLink and
 *              tested with RelationNeedsWAL
 * state      - SP-GiST working state; state->isBuild suppresses WAL here
 * innerTuple - the existing inner tuple being extended
 * current    - location of innerTuple; updated when the tuple is moved
 * parent     - location of the inner tuple holding the downlink to current;
 *              must have a valid buffer in the move case (asserted)
 * nodeN      - passed to addNode as the position argument
 * nodeLabel  - passed to addNode as the label of the new node
 *
 * Raises ERROR if the tuple would have to be moved off the root page, if
 * the newly obtained buffer is the same block as the old one, or if
 * PageAddItem does not return the expected offset.
 *
 * NOTE(review): several original source lines (1534, 1538, 1547, 1566,
 * 1630, 1651-1652, 1654, 1687) are not visible in this listing -- confirm
 * the affected statements against the original file.
 */
1518 {
1519  SpGistInnerTuple newInnerTuple;
1520  spgxlogAddNode xlrec;
1521 
1522  /* Should not be applied to nulls */
1523  Assert(!SpGistPageStoresNulls(current->page));
1524 
1525  /* Construct new inner tuple with additional node */
1526  newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1527 
1528  /* Prepare WAL record */
1529  STORE_STATE(state, xlrec.stateSrc);
1530  xlrec.offnum = current->offnum;
1531 
1532  /* we don't fill these unless we need to change the parent downlink */
/* -1 here stands for "no parent change"; the move path below sets 0, 1 or 2 */
1533  xlrec.parentBlk = -1;
1535  xlrec.nodeI = 0;
1536 
1537  /* we don't fill these unless tuple has to be moved */
1539  xlrec.newPage = false;
1540 
/* the page only needs room for the growth of the tuple, since the old copy is removed first */
1541  if (PageGetExactFreeSpace(current->page) >=
1542  newInnerTuple->size - innerTuple->size)
1543  {
1544  /*
1545  * We can replace the inner tuple by new version in-place
1546  */
1548 
/* delete, then re-add at the same offset; any other resulting offset is an error */
1549  PageIndexTupleDelete(current->page, current->offnum);
1550  if (PageAddItem(current->page,
1551  (Item) newInnerTuple, newInnerTuple->size,
1552  current->offnum, false, false) != current->offnum)
1553  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1554  newInnerTuple->size);
1555 
1556  MarkBufferDirty(current->buffer);
1557 
1558  if (RelationNeedsWAL(index) && !state->isBuild)
1559  {
1560  XLogRecPtr recptr;
1561 
1562  XLogBeginInsert();
1563  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1564  XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1565 
/* NOTE(review): source line 1566 is not visible here; presumably the buffer registration -- TODO confirm */
1567 
1568  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1569 
1570  PageSetLSN(current->page, recptr);
1571  }
1572 
1573  END_CRIT_SECTION();
1574  }
1575  else
1576  {
1577  /*
1578  * move inner tuple to another page, and update parent
1579  */
1580  SpGistDeadTuple dt;
1581  SPPageDesc saveCurrent;
1582 
1583  /*
1584  * It should not be possible to get here for the root page, since we
1585  * allow only one inner tuple on the root page, and spgFormInnerTuple
1586  * always checks that inner tuples don't exceed the size of a page.
1587  */
1588  if (SpGistBlockIsRoot(current->blkno))
1589  elog(ERROR, "cannot enlarge root tuple any more");
1590  Assert(parent->buffer != InvalidBuffer);
1591 
/* keep the old location; "current" is about to be repointed at the new page */
1592  saveCurrent = *current;
1593 
1594  xlrec.offnumParent = parent->offnum;
1595  xlrec.nodeI = parent->node;
1596 
1597  /*
1598  * obtain new buffer with the same parity as current, since it will be
1599  * a child of same parent tuple
1600  */
1601  current->buffer = SpGistGetBuffer(index,
1602  GBUF_INNER_PARITY(current->blkno),
1603  newInnerTuple->size + sizeof(ItemIdData),
1604  &xlrec.newPage);
1605  current->blkno = BufferGetBlockNumber(current->buffer);
1606  current->page = BufferGetPage(current->buffer);
1607 
1608  /*
1609  * Let's just make real sure new current isn't same as old. Right now
1610  * that's impossible, but if SpGistGetBuffer ever got smart enough to
1611  * delete placeholder tuples before checking space, maybe it wouldn't
1612  * be impossible. The case would appear to work except that WAL
1613  * replay would be subtly wrong, so I think a mere assert isn't enough
1614  * here.
1615  */
1616  if (current->blkno == saveCurrent.blkno)
1617  elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1618 
1619  /*
1620  * New current and parent buffer will both be modified; but note that
1621  * parent buffer could be same as either new or old current.
1622  */
/*
 * parentBlk values line up with the buffer IDs registered for WAL below:
 * 0 = original page, 1 = new page, 2 = a separate parent page.
 */
1623  if (parent->buffer == saveCurrent.buffer)
1624  xlrec.parentBlk = 0;
1625  else if (parent->buffer == current->buffer)
1626  xlrec.parentBlk = 1;
1627  else
1628  xlrec.parentBlk = 2;
1629 
1631 
1632  /* insert new ... */
1633  xlrec.offnumNew = current->offnum =
1634  SpGistPageAddNewItem(state, current->page,
1635  (Item) newInnerTuple, newInnerTuple->size,
1636  NULL, false);
1637 
1638  MarkBufferDirty(current->buffer);
1639 
1640  /* update parent's downlink and mark parent page dirty */
1641  saveNodeLink(index, parent, current->blkno, current->offnum);
1642 
1643  /*
1644  * Replace old tuple with a placeholder or redirection tuple. Unless
1645  * doing an index build, we have to insert a redirection tuple for
1646  * possible concurrent scans. We can't just delete it in any case,
1647  * because that could change the offsets of other tuples on the page,
1648  * breaking downlinks from their parents.
1649  */
/*
 * NOTE(review): the assignments to "dt" in both branches (source lines
 * 1651-1652 and 1654) are not visible in this listing; only the tail of
 * the else-branch call survives.
 */
1650  if (state->isBuild)
1653  else
1655  current->blkno, current->offnum);
1656 
/* swap the old inner tuple for dt at exactly the same offset */
1657  PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1658  if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1659  saveCurrent.offnum,
1660  false, false) != saveCurrent.offnum)
1661  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1662  dt->size);
1663 
/* keep the old page's counters in step with the kind of tuple left behind */
1664  if (state->isBuild)
1665  SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1666  else
1667  SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1668 
1669  MarkBufferDirty(saveCurrent.buffer);
1670 
1671  if (RelationNeedsWAL(index) && !state->isBuild)
1672  {
1673  XLogRecPtr recptr;
1674  int flags;
1675 
1676  XLogBeginInsert();
1677 
1678  /* orig page */
1679  XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1680  /* new page */
1681  flags = REGBUF_STANDARD;
1682  if (xlrec.newPage)
1683  flags |= REGBUF_WILL_INIT;
1684  XLogRegisterBuffer(1, current->buffer, flags);
1685  /* parent page (if different from orig and new) */
/* NOTE(review): the statement controlled by this "if" (source line 1687) is not visible in this listing */
1686  if (xlrec.parentBlk == 2)
1688 
1689  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1690  XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1691 
1692  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1693 
1694  /* we don't bother to check if any of these are redundant */
1695  PageSetLSN(current->page, recptr);
1696  PageSetLSN(parent->page, recptr);
1697  PageSetLSN(saveCurrent.page, recptr);
1698  }
1699 
1700  END_CRIT_SECTION();
1701 
1702  /* Release saveCurrent if it's not same as current or parent */
1703  if (saveCurrent.buffer != current->buffer &&
1704  saveCurrent.buffer != parent->buffer)
1705  {
1706  SpGistSetLastUsedPage(index, saveCurrent.buffer);
1707  UnlockReleaseBuffer(saveCurrent.buffer);
1708  }
1709  }
1710 }
static SpGistInnerTuple addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
Definition: spgdoinsert.c:79
#define SpGistPageStoresNulls(page)
SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgutils.c:1057
#define XLOG_SPGIST_ADD_NODE
Definition: spgxlog.h:24
OffsetNumber offnumNew
Definition: spgxlog.h:111
bool newPage
Definition: spgxlog.h:112
OffsetNumber offnumParent
Definition: spgxlog.h:126
OffsetNumber offnum
Definition: spgxlog.h:105
spgxlogState stateSrc
Definition: spgxlog.h:130
uint16 nodeI
Definition: spgxlog.h:128
int8 parentBlk
Definition: spgxlog.h:125

References addNode(), Assert(), SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage(), elog(), END_CRIT_SECTION, ERROR, GBUF_INNER_PARITY, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, MarkBufferDirty(), spgxlogAddNode::newPage, SPPageDesc::node, spgxlogAddNode::nodeI, SPPageDesc::offnum, spgxlogAddNode::offnum, spgxlogAddNode::offnumNew, spgxlogAddNode::offnumParent, SPPageDesc::page, PageAddItem, PageGetExactFreeSpace(), PageIndexTupleDelete(), PageSetLSN(), spgxlogAddNode::parentBlk, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, saveNodeLink(), SpGistInnerTupleData::size, SpGistDeadTupleData::size, spgFormDeadTuple(), SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistBlockIsRoot, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistPageGetOpaque, SpGistPageStoresNulls, SpGistSetLastUsedPage(), START_CRIT_SECTION, spgxlogAddNode::stateSrc, STORE_STATE, UnlockReleaseBuffer(), XLOG_SPGIST_ADD_NODE, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

◆ spgdoinsert()

bool spgdoinsert ( Relation  index,
SpGistState state,
ItemPointer  heapPtr,
Datum datums,
bool isnulls 
)

Definition at line 1915 of file spgdoinsert.c.

1917 {
1918  bool result = true;
1919  TupleDesc leafDescriptor = state->leafTupDesc;
1920  bool isnull = isnulls[spgKeyColumn];
1921  int level = 0;
1922  Datum leafDatums[INDEX_MAX_KEYS];
1923  int leafSize;
1924  int bestLeafSize;
1925  int numNoProgressCycles = 0;
1926  SPPageDesc current,
1927  parent;
1928  FmgrInfo *procinfo = NULL;
1929 
1930  /*
1931  * Look up FmgrInfo of the user-defined choose function once, to save
1932  * cycles in the loop below.
1933  */
1934  if (!isnull)
1935  procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1936 
1937  /*
1938  * Prepare the leaf datum to insert.
1939  *
1940  * If an optional "compress" method is provided, then call it to form the
1941  * leaf key datum from the input datum. Otherwise, store the input datum
1942  * as is. Since we don't use index_form_tuple in this AM, we have to make
1943  * sure value to be inserted is not toasted; FormIndexDatum doesn't
1944  * guarantee that. But we assume the "compress" method to return an
1945  * untoasted value.
1946  */
1947  if (!isnull)
1948  {
1950  {
1951  FmgrInfo *compressProcinfo = NULL;
1952 
1953  compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1954  leafDatums[spgKeyColumn] =
1955  FunctionCall1Coll(compressProcinfo,
1956  index->rd_indcollation[spgKeyColumn],
1957  datums[spgKeyColumn]);
1958  }
1959  else
1960  {
1961  Assert(state->attLeafType.type == state->attType.type);
1962 
1963  if (state->attType.attlen == -1)
1964  leafDatums[spgKeyColumn] =
1966  else
1967  leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1968  }
1969  }
1970  else
1971  leafDatums[spgKeyColumn] = (Datum) 0;
1972 
1973  /* Likewise, ensure that any INCLUDE values are not toasted */
1974  for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1975  {
1976  if (!isnulls[i])
1977  {
1978  if (TupleDescAttr(leafDescriptor, i)->attlen == -1)
1979  leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1980  else
1981  leafDatums[i] = datums[i];
1982  }
1983  else
1984  leafDatums[i] = (Datum) 0;
1985  }
1986 
1987  /*
1988  * Compute space needed for a leaf tuple containing the given data.
1989  */
1990  leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1991  /* Account for an item pointer, too */
1992  leafSize += sizeof(ItemIdData);
1993 
1994  /*
1995  * If it isn't gonna fit, and the opclass can't reduce the datum size by
1996  * suffixing, bail out now rather than doing a lot of useless work.
1997  */
1998  if (leafSize > SPGIST_PAGE_CAPACITY &&
1999  (isnull || !state->config.longValuesOK))
2000  ereport(ERROR,
2001  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2002  errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2003  leafSize - sizeof(ItemIdData),
2004  SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2006  errhint("Values larger than a buffer page cannot be indexed.")));
2007  bestLeafSize = leafSize;
2008 
2009  /* Initialize "current" to the appropriate root page */
2010  current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
2011  current.buffer = InvalidBuffer;
2012  current.page = NULL;
2013  current.offnum = FirstOffsetNumber;
2014  current.node = -1;
2015 
2016  /* "parent" is invalid for the moment */
2017  parent.blkno = InvalidBlockNumber;
2018  parent.buffer = InvalidBuffer;
2019  parent.page = NULL;
2020  parent.offnum = InvalidOffsetNumber;
2021  parent.node = -1;
2022 
2023  /*
2024  * Before entering the loop, try to clear any pending interrupt condition.
2025  * If a query cancel is pending, we might as well accept it now not later;
2026  * while if a non-canceling condition is pending, servicing it here avoids
2027  * having to restart the insertion and redo all the work so far.
2028  */
2030 
2031  for (;;)
2032  {
2033  bool isNew = false;
2034 
2035  /*
2036  * Bail out if query cancel is pending. We must have this somewhere
2037  * in the loop since a broken opclass could produce an infinite
2038  * picksplit loop. However, because we'll be holding buffer lock(s)
2039  * after the first iteration, ProcessInterrupts() wouldn't be able to
2040  * throw a cancel error here. Hence, if we see that an interrupt is
2041  * pending, break out of the loop and deal with the situation below.
2042  * Set result = false because we must restart the insertion if the
2043  * interrupt isn't a query-cancel-or-die case.
2044  */
2046  {
2047  result = false;
2048  break;
2049  }
2050 
2051  if (current.blkno == InvalidBlockNumber)
2052  {
2053  /*
2054  * Create a leaf page. If leafSize is too large to fit on a page,
2055  * we won't actually use the page yet, but it simplifies the API
2056  * for doPickSplit to always have a leaf page at hand; so just
2057  * quietly limit our request to a page size.
2058  */
2059  current.buffer =
2061  GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2062  Min(leafSize, SPGIST_PAGE_CAPACITY),
2063  &isNew);
2064  current.blkno = BufferGetBlockNumber(current.buffer);
2065  }
2066  else if (parent.buffer == InvalidBuffer)
2067  {
2068  /* we hold no parent-page lock, so no deadlock is possible */
2069  current.buffer = ReadBuffer(index, current.blkno);
2071  }
2072  else if (current.blkno != parent.blkno)
2073  {
2074  /* descend to a new child page */
2075  current.buffer = ReadBuffer(index, current.blkno);
2076 
2077  /*
2078  * Attempt to acquire lock on child page. We must beware of
2079  * deadlock against another insertion process descending from that
2080  * page to our parent page (see README). If we fail to get lock,
2081  * abandon the insertion and tell our caller to start over.
2082  *
2083  * XXX this could be improved, because failing to get lock on a
2084  * buffer is not proof of a deadlock situation; the lock might be
2085  * held by a reader, or even just background writer/checkpointer
2086  * process. Perhaps it'd be worth retrying after sleeping a bit?
2087  */
2088  if (!ConditionalLockBuffer(current.buffer))
2089  {
2090  ReleaseBuffer(current.buffer);
2091  UnlockReleaseBuffer(parent.buffer);
2092  return false;
2093  }
2094  }
2095  else
2096  {
2097  /* inner tuple can be stored on the same page as parent one */
2098  current.buffer = parent.buffer;
2099  }
2100  current.page = BufferGetPage(current.buffer);
2101 
2102  /* should not arrive at a page of the wrong type */
2103  if (isnull ? !SpGistPageStoresNulls(current.page) :
2104  SpGistPageStoresNulls(current.page))
2105  elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2106  current.blkno);
2107 
2108  if (SpGistPageIsLeaf(current.page))
2109  {
2110  SpGistLeafTuple leafTuple;
2111  int nToSplit,
2112  sizeToSplit;
2113 
2114  leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
2115  if (leafTuple->size + sizeof(ItemIdData) <=
2116  SpGistPageGetFreeSpace(current.page, 1))
2117  {
2118  /* it fits on page, so insert it and we're done */
2119  addLeafTuple(index, state, leafTuple,
2120  &current, &parent, isnull, isNew);
2121  break;
2122  }
2123  else if ((sizeToSplit =
2124  checkSplitConditions(index, state, &current,
2125  &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2126  nToSplit < 64 &&
2127  leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2128  {
2129  /*
2130  * the amount of data is pretty small, so just move the whole
2131  * chain to another leaf page rather than splitting it.
2132  */
2133  Assert(!isNew);
2134  moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2135  break; /* we're done */
2136  }
2137  else
2138  {
2139  /* picksplit */
2140  if (doPickSplit(index, state, &current, &parent,
2141  leafTuple, level, isnull, isNew))
2142  break; /* doPickSplit installed new tuples */
2143 
2144  /* leaf tuple will not be inserted yet */
2145  pfree(leafTuple);
2146 
2147  /*
2148  * current now describes new inner tuple, go insert into it
2149  */
2150  Assert(!SpGistPageIsLeaf(current.page));
2151  goto process_inner_tuple;
2152  }
2153  }
2154  else /* non-leaf page */
2155  {
2156  /*
2157  * Apply the opclass choose function to figure out how to insert
2158  * the given datum into the current inner tuple.
2159  */
2160  SpGistInnerTuple innerTuple;
2161  spgChooseIn in;
2162  spgChooseOut out;
2163 
2164  /*
2165  * spgAddNode and spgSplitTuple cases will loop back to here to
2166  * complete the insertion operation. Just in case the choose
2167  * function is broken and produces add or split requests
2168  * repeatedly, check for query cancel (see comments above).
2169  */
2170  process_inner_tuple:
2172  {
2173  result = false;
2174  break;
2175  }
2176 
2177  innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2178  PageGetItemId(current.page, current.offnum));
2179 
2180  in.datum = datums[spgKeyColumn];
2181  in.leafDatum = leafDatums[spgKeyColumn];
2182  in.level = level;
2183  in.allTheSame = innerTuple->allTheSame;
2184  in.hasPrefix = (innerTuple->prefixSize > 0);
2185  in.prefixDatum = SGITDATUM(innerTuple, state);
2186  in.nNodes = innerTuple->nNodes;
2187  in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2188 
2189  memset(&out, 0, sizeof(out));
2190 
2191  if (!isnull)
2192  {
2193  /* use user-defined choose method */
2194  FunctionCall2Coll(procinfo,
2195  index->rd_indcollation[0],
2196  PointerGetDatum(&in),
2197  PointerGetDatum(&out));
2198  }
2199  else
2200  {
2201  /* force "match" action (to insert to random subnode) */
2202  out.resultType = spgMatchNode;
2203  }
2204 
2205  if (innerTuple->allTheSame)
2206  {
2207  /*
2208  * It's not allowed to do an AddNode at an allTheSame tuple.
2209  * Opclass must say "match", in which case we choose a random
2210  * one of the nodes to descend into, or "split".
2211  */
2212  if (out.resultType == spgAddNode)
2213  elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2214  else if (out.resultType == spgMatchNode)
2215  out.result.matchNode.nodeN =
2217  0, innerTuple->nNodes - 1);
2218  }
2219 
2220  switch (out.resultType)
2221  {
2222  case spgMatchNode:
2223  /* Descend to N'th child node */
2224  spgMatchNodeAction(index, state, innerTuple,
2225  &current, &parent,
2226  out.result.matchNode.nodeN);
2227  /* Adjust level as per opclass request */
2228  level += out.result.matchNode.levelAdd;
2229  /* Replace leafDatum and recompute leafSize */
2230  if (!isnull)
2231  {
2232  leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2233  leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2234  leafDatums, isnulls);
2235  leafSize += sizeof(ItemIdData);
2236  }
2237 
2238  /*
2239  * Check new tuple size; fail if it can't fit, unless the
2240  * opclass says it can handle the situation by suffixing.
2241  *
2242  * However, the opclass can only shorten the leaf datum,
2243  * which may not be enough to ever make the tuple fit,
2244  * since INCLUDE columns might alone use more than a page.
2245  * Depending on the opclass' behavior, that could lead to
2246  * an infinite loop --- spgtextproc.c, for example, will
2247  * just repeatedly generate an empty-string leaf datum
2248  * once it runs out of data. Actual bugs in opclasses
2249  * might cause infinite looping, too. To detect such a
2250  * loop, check to see if we are making progress by
2251  * reducing the leafSize in each pass. This is a bit
2252  * tricky though. Because of alignment considerations,
2253  * the total tuple size might not decrease on every pass.
2254  * Also, there are edge cases where the choose method
2255  * might seem to not make progress for a cycle or two.
2256  * Somewhat arbitrarily, we allow up to 10 no-progress
2257  * iterations before failing. (This limit should be more
2258  * than MAXALIGN, to accommodate opclasses that trim one
2259  * byte from the leaf datum per pass.)
2260  */
2261  if (leafSize > SPGIST_PAGE_CAPACITY)
2262  {
2263  bool ok = false;
2264 
2265  if (state->config.longValuesOK && !isnull)
2266  {
2267  if (leafSize < bestLeafSize)
2268  {
2269  ok = true;
2270  bestLeafSize = leafSize;
2271  numNoProgressCycles = 0;
2272  }
2273  else if (++numNoProgressCycles < 10)
2274  ok = true;
2275  }
2276  if (!ok)
2277  ereport(ERROR,
2278  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2279  errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2280  leafSize - sizeof(ItemIdData),
2281  SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2283  errhint("Values larger than a buffer page cannot be indexed.")));
2284  }
2285 
2286  /*
2287  * Loop around and attempt to insert the new leafDatum at
2288  * "current" (which might reference an existing child
2289  * tuple, or might be invalid to force us to find a new
2290  * page for the tuple).
2291  */
2292  break;
2293  case spgAddNode:
2294  /* AddNode is not sensible if nodes don't have labels */
2295  if (in.nodeLabels == NULL)
2296  elog(ERROR, "cannot add a node to an inner tuple without node labels");
2297  /* Add node to inner tuple, per request */
2298  spgAddNodeAction(index, state, innerTuple,
2299  &current, &parent,
2300  out.result.addNode.nodeN,
2301  out.result.addNode.nodeLabel);
2302 
2303  /*
2304  * Retry insertion into the enlarged node. We assume that
2305  * we'll get a MatchNode result this time.
2306  */
2307  goto process_inner_tuple;
2308  break;
2309  case spgSplitTuple:
2310  /* Split inner tuple, per request */
2311  spgSplitNodeAction(index, state, innerTuple,
2312  &current, &out);
2313 
2314  /* Retry insertion into the split node */
2315  goto process_inner_tuple;
2316  break;
2317  default:
2318  elog(ERROR, "unrecognized SPGiST choose result: %d",
2319  (int) out.resultType);
2320  break;
2321  }
2322  }
2323  } /* end loop */
2324 
2325  /*
2326  * Release any buffers we're still holding. Beware of possibility that
2327  * current and parent reference same buffer.
2328  */
2329  if (current.buffer != InvalidBuffer)
2330  {
2332  UnlockReleaseBuffer(current.buffer);
2333  }
2334  if (parent.buffer != InvalidBuffer &&
2335  parent.buffer != current.buffer)
2336  {
2338  UnlockReleaseBuffer(parent.buffer);
2339  }
2340 
2341  /*
2342  * We do not support being called while some outer function is holding a
2343  * buffer lock (or any other reason to postpone query cancels). If that
2344  * were the case, telling the caller to retry would create an infinite
2345  * loop.
2346  */
2348 
2349  /*
2350  * Finally, check for interrupts again. If there was a query cancel,
2351  * ProcessInterrupts() will be able to throw the error here. If it was
2352  * some other kind of interrupt that can just be cleared, return false to
2353  * tell our caller to retry.
2354  */
2356 
2357  return result;
2358 }
bool ConditionalLockBuffer(Buffer buffer)
Definition: bufmgr.c:4741
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4480
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:4715
Buffer ReadBuffer(Relation reln, BlockNumber blockNum)
Definition: bufmgr.c:708
#define BUFFER_LOCK_EXCLUSIVE
Definition: bufmgr.h:159
#define OidIsValid(objectId)
Definition: c.h:759
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ereport(elevel,...)
Definition: elog.h:149
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1100
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
RegProcedure index_getprocid(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:777
void pfree(void *pointer)
Definition: mcxt.c:1456
#define INTERRUPTS_CAN_BE_PROCESSED()
Definition: miscadmin.h:128
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
#define INTERRUPTS_PENDING_CONDITION()
Definition: miscadmin.h:112
uint64 pg_prng_uint64_range(pg_prng_state *state, uint64 rmin, uint64 rmax)
Definition: pg_prng.c:144
pg_prng_state pg_global_prng_state
Definition: pg_prng.c:34
#define RelationGetRelationName(relation)
Definition: rel.h:538
static void spgAddNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN, Datum nodeLabel)
Definition: spgdoinsert.c:1514
static void spgSplitNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, spgChooseOut *out)
Definition: spgdoinsert.c:1716
static void spgMatchNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN)
Definition: spgdoinsert.c:1460
static bool doPickSplit(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, int level, bool isNulls, bool isNew)
Definition: spgdoinsert.c:678
static void moveLeafs(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, bool isNulls)
Definition: spgdoinsert.c:388
static int checkSplitConditions(Relation index, SpGistState *state, SPPageDesc *current, int *nToSplit)
Definition: spgdoinsert.c:334
static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
Definition: spgdoinsert.c:204
#define SPGIST_COMPRESS_PROC
Definition: spgist.h:28
#define SPGIST_CHOOSE_PROC
Definition: spgist.h:24
@ spgMatchNode
Definition: spgist.h:69
@ spgAddNode
Definition: spgist.h:70
@ spgSplitTuple
Definition: spgist.h:71
#define SPGIST_NULL_BLKNO
#define spgFirstIncludeColumn
#define SpGistPageIsLeaf(page)
#define SPGIST_ROOT_BLKNO
Datum * spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
Definition: spgutils.c:1133
Size SpGistGetLeafTupleSize(TupleDesc tupleDescriptor, Datum *datums, bool *isnulls)
Definition: spgutils.c:790
Datum * nodeLabels
Definition: spgist.h:64
bool hasPrefix
Definition: spgist.h:61
Datum prefixDatum
Definition: spgist.h:62
int nNodes
Definition: spgist.h:63
Datum datum
Definition: spgist.h:55
int level
Definition: spgist.h:57
Datum leafDatum
Definition: spgist.h:56
bool allTheSame
Definition: spgist.h:60
spgChooseResultType resultType
Definition: spgist.h:76
struct spgChooseOut::@46::@48 addNode
union spgChooseOut::@46 result
struct spgChooseOut::@46::@47 matchNode
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References addLeafTuple(), spgChooseOut::addNode, spgChooseIn::allTheSame, SpGistInnerTupleData::allTheSame, Assert(), SPPageDesc::blkno, SPPageDesc::buffer, BUFFER_LOCK_EXCLUSIVE, BufferGetBlockNumber(), BufferGetPage(), CHECK_FOR_INTERRUPTS, checkSplitConditions(), ConditionalLockBuffer(), spgChooseIn::datum, doPickSplit(), elog(), ereport, errcode(), errhint(), errmsg(), ERROR, FirstOffsetNumber, FunctionCall1Coll(), FunctionCall2Coll(), GBUF_LEAF, GBUF_NULLS, spgChooseIn::hasPrefix, i, index_getprocid(), index_getprocinfo(), INDEX_MAX_KEYS, INTERRUPTS_CAN_BE_PROCESSED, INTERRUPTS_PENDING_CONDITION, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, spgChooseIn::leafDatum, spgChooseIn::level, LockBuffer(), spgChooseOut::matchNode, Min, moveLeafs(), TupleDescData::natts, spgChooseIn::nNodes, SpGistInnerTupleData::nNodes, SPPageDesc::node, spgChooseIn::nodeLabels, SPPageDesc::offnum, OidIsValid, SPPageDesc::page, PageGetItem(), PageGetItemId(), pfree(), PG_DETOAST_DATUM, pg_global_prng_state, pg_prng_uint64_range(), PointerGetDatum(), spgChooseIn::prefixDatum, SpGistInnerTupleData::prefixSize, ReadBuffer(), RelationGetRelationName, ReleaseBuffer(), spgChooseOut::result, spgChooseOut::resultType, SGITDATUM, SpGistLeafTupleData::size, spgAddNode, spgAddNodeAction(), spgExtractNodeLabels(), spgFirstIncludeColumn, spgFormLeafTuple(), SPGIST_CHOOSE_PROC, SPGIST_COMPRESS_PROC, SPGIST_NULL_BLKNO, SPGIST_PAGE_CAPACITY, SPGIST_ROOT_BLKNO, SpGistGetBuffer(), SpGistGetLeafTupleSize(), SpGistPageGetFreeSpace, SpGistPageIsLeaf, SpGistPageStoresNulls, SpGistSetLastUsedPage(), spgKeyColumn, spgMatchNode, spgMatchNodeAction(), spgSplitNodeAction(), spgSplitTuple, TupleDescAttr, and UnlockReleaseBuffer().

Referenced by spginsert(), and spgistBuildCallback().

◆ spgMatchNodeAction()

static void spgMatchNodeAction ( Relation  index,
SpGistState state,
SpGistInnerTuple  innerTuple,
SPPageDesc current,
SPPageDesc parent,
int  nodeN 
)
static

Definition at line 1460 of file spgdoinsert.c.

1463 {
1464  int i;
1465  SpGistNodeTuple node;
1466 
1467  /* Release previous parent buffer if any */
1468  if (parent->buffer != InvalidBuffer &&
1469  parent->buffer != current->buffer)
1470  {
1472  UnlockReleaseBuffer(parent->buffer);
1473  }
1474 
1475  /* Repoint parent to specified node of current inner tuple */
1476  parent->blkno = current->blkno;
1477  parent->buffer = current->buffer;
1478  parent->page = current->page;
1479  parent->offnum = current->offnum;
1480  parent->node = nodeN;
1481 
1482  /* Locate that node */
1483  SGITITERATE(innerTuple, i, node)
1484  {
1485  if (i == nodeN)
1486  break;
1487  }
1488 
1489  if (i != nodeN)
1490  elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1491  nodeN);
1492 
1493  /* Point current to the downlink location, if any */
1494  if (ItemPointerIsValid(&node->t_tid))
1495  {
1496  current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1497  current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1498  }
1499  else
1500  {
1501  /* Downlink is empty, so we'll need to find a new page */
1502  current->blkno = InvalidBlockNumber;
1503  current->offnum = InvalidOffsetNumber;
1504  }
1505 
1506  current->buffer = InvalidBuffer;
1507  current->page = NULL;
1508 }
ItemPointerData t_tid
Definition: itup.h:37

References SPPageDesc::blkno, SPPageDesc::buffer, elog(), ERROR, i, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, ItemPointerGetBlockNumber(), ItemPointerGetOffsetNumber(), ItemPointerIsValid(), SPPageDesc::node, SPPageDesc::offnum, SPPageDesc::page, SGITITERATE, SpGistSetLastUsedPage(), IndexTupleData::t_tid, and UnlockReleaseBuffer().

Referenced by spgdoinsert().

◆ spgPageIndexMultiDelete()

void spgPageIndexMultiDelete ( SpGistState state,
Page  page,
OffsetNumber itemnos,
int  nitems,
int  firststate,
int  reststate,
BlockNumber  blkno,
OffsetNumber  offnum 
)

Definition at line 132 of file spgdoinsert.c.

136 {
137  OffsetNumber firstItem;
139  SpGistDeadTuple tuple = NULL;
140  int i;
141 
142  if (nitems == 0)
143  return; /* nothing to do */
144 
145  /*
146  * For efficiency we want to use PageIndexMultiDelete, which requires the
147  * targets to be listed in sorted order, so we have to sort the itemnos
148  * array. (This also greatly simplifies the math for reinserting the
149  * replacement tuples.) However, we must not scribble on the caller's
150  * array, so we have to make a copy.
151  */
152  memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
153  if (nitems > 1)
154  qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
155 
156  PageIndexMultiDelete(page, sortednos, nitems);
157 
158  firstItem = itemnos[0];
159 
160  for (i = 0; i < nitems; i++)
161  {
162  OffsetNumber itemno = sortednos[i];
163  int tupstate;
164 
165  tupstate = (itemno == firstItem) ? firststate : reststate;
166  if (tuple == NULL || tuple->tupstate != tupstate)
167  tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
168 
169  if (PageAddItem(page, (Item) tuple, tuple->size,
170  itemno, false, false) != itemno)
171  elog(ERROR, "failed to add item of size %u to SPGiST index page",
172  tuple->size);
173 
174  if (tupstate == SPGIST_REDIRECT)
175  SpGistPageGetOpaque(page)->nRedirection++;
176  else if (tupstate == SPGIST_PLACEHOLDER)
177  SpGistPageGetOpaque(page)->nPlaceholder++;
178  }
179 }
void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
Definition: bufpage.c:1161
#define nitems(x)
Definition: indent.h:31
#define MaxIndexTuplesPerPage
Definition: itup.h:165
#define qsort(a, b, c, d)
Definition: port.h:445
static int cmpOffsetNumbers(const void *a, const void *b)
Definition: spgdoinsert.c:111

References cmpOffsetNumbers(), elog(), ERROR, i, MaxIndexTuplesPerPage, nitems, PageAddItem, PageIndexMultiDelete(), qsort, SpGistDeadTupleData::size, spgFormDeadTuple(), SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistPageGetOpaque, and SpGistDeadTupleData::tupstate.

Referenced by doPickSplit(), moveLeafs(), spgRedoMoveLeafs(), spgRedoPickSplit(), spgRedoVacuumLeaf(), and vacuumLeafPage().

◆ spgSplitNodeAction()

static void spgSplitNodeAction ( Relation  index,
SpGistState state,
SpGistInnerTuple  innerTuple,
SPPageDesc current,
spgChooseOut out 
)
static

Definition at line 1716 of file spgdoinsert.c.

1719 {
1720  SpGistInnerTuple prefixTuple,
1721  postfixTuple;
1722  SpGistNodeTuple node,
1723  *nodes;
1724  BlockNumber postfixBlkno;
1725  OffsetNumber postfixOffset;
1726  int i;
1727  spgxlogSplitTuple xlrec;
1728  Buffer newBuffer = InvalidBuffer;
1729 
1730  /* Should not be applied to nulls */
1731  Assert(!SpGistPageStoresNulls(current->page));
1732 
1733  /* Check opclass gave us sane values */
1734  if (out->result.splitTuple.prefixNNodes <= 0 ||
1735  out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1736  elog(ERROR, "invalid number of prefix nodes: %d",
1737  out->result.splitTuple.prefixNNodes);
1738  if (out->result.splitTuple.childNodeN < 0 ||
1739  out->result.splitTuple.childNodeN >=
1740  out->result.splitTuple.prefixNNodes)
1741  elog(ERROR, "invalid child node number: %d",
1742  out->result.splitTuple.childNodeN);
1743 
1744  /*
1745  * Construct new prefix tuple with requested number of nodes. We'll fill
1746  * in the childNodeN'th node's downlink below.
1747  */
1748  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1749  out->result.splitTuple.prefixNNodes);
1750 
1751  for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1752  {
1753  Datum label = (Datum) 0;
1754  bool labelisnull;
1755 
1756  labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1757  if (!labelisnull)
1758  label = out->result.splitTuple.prefixNodeLabels[i];
1759  nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1760  }
1761 
1762  prefixTuple = spgFormInnerTuple(state,
1763  out->result.splitTuple.prefixHasPrefix,
1764  out->result.splitTuple.prefixPrefixDatum,
1765  out->result.splitTuple.prefixNNodes,
1766  nodes);
1767 
1768  /* it must fit in the space that innerTuple now occupies */
1769  if (prefixTuple->size > innerTuple->size)
1770  elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1771 
1772  /*
1773  * Construct new postfix tuple, containing all nodes of innerTuple with
1774  * same node datums, but with the prefix specified by the picksplit
1775  * function.
1776  */
1777  nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1778  SGITITERATE(innerTuple, i, node)
1779  {
1780  nodes[i] = node;
1781  }
1782 
1783  postfixTuple = spgFormInnerTuple(state,
1784  out->result.splitTuple.postfixHasPrefix,
1785  out->result.splitTuple.postfixPrefixDatum,
1786  innerTuple->nNodes, nodes);
1787 
1788  /* Postfix tuple is allTheSame if original tuple was */
1789  postfixTuple->allTheSame = innerTuple->allTheSame;
1790 
1791  /* prep data for WAL record */
1792  xlrec.newPage = false;
1793 
1794  /*
1795  * If we can't fit both tuples on the current page, get a new page for the
1796  * postfix tuple. In particular, can't split to the root page.
1797  *
1798  * For the space calculation, note that prefixTuple replaces innerTuple
1799  * but postfixTuple will be a new entry.
1800  */
1801  if (SpGistBlockIsRoot(current->blkno) ||
1802  SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1803  prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1804  {
1805  /*
1806  * Choose page with next triple parity, because postfix tuple is a
1807  * child of prefix one
1808  */
1809  newBuffer = SpGistGetBuffer(index,
1810  GBUF_INNER_PARITY(current->blkno + 1),
1811  postfixTuple->size + sizeof(ItemIdData),
1812  &xlrec.newPage);
1813  }
1814 
1816 
1817  /*
1818  * Replace old tuple by prefix tuple
1819  */
1820  PageIndexTupleDelete(current->page, current->offnum);
1821  xlrec.offnumPrefix = PageAddItem(current->page,
1822  (Item) prefixTuple, prefixTuple->size,
1823  current->offnum, false, false);
1824  if (xlrec.offnumPrefix != current->offnum)
1825  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1826  prefixTuple->size);
1827 
1828  /*
1829  * put postfix tuple into appropriate page
1830  */
1831  if (newBuffer == InvalidBuffer)
1832  {
1833  postfixBlkno = current->blkno;
1834  xlrec.offnumPostfix = postfixOffset =
1835  SpGistPageAddNewItem(state, current->page,
1836  (Item) postfixTuple, postfixTuple->size,
1837  NULL, false);
1838  xlrec.postfixBlkSame = true;
1839  }
1840  else
1841  {
1842  postfixBlkno = BufferGetBlockNumber(newBuffer);
1843  xlrec.offnumPostfix = postfixOffset =
1845  (Item) postfixTuple, postfixTuple->size,
1846  NULL, false);
1847  MarkBufferDirty(newBuffer);
1848  xlrec.postfixBlkSame = false;
1849  }
1850 
1851  /*
1852  * And set downlink pointer in the prefix tuple to point to postfix tuple.
1853  * (We can't avoid this step by doing the above two steps in opposite
1854  * order, because there might not be enough space on the page to insert
1855  * the postfix tuple first.) We have to update the local copy of the
1856  * prefixTuple too, because that's what will be written to WAL.
1857  */
1858  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1859  postfixBlkno, postfixOffset);
1860  prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1861  PageGetItemId(current->page, current->offnum));
1862  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1863  postfixBlkno, postfixOffset);
1864 
1865  MarkBufferDirty(current->buffer);
1866 
1867  if (RelationNeedsWAL(index) && !state->isBuild)
1868  {
1869  XLogRecPtr recptr;
1870 
1871  XLogBeginInsert();
1872  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1873  XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1874  XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1875 
1877  if (newBuffer != InvalidBuffer)
1878  {
1879  int flags;
1880 
1881  flags = REGBUF_STANDARD;
1882  if (xlrec.newPage)
1883  flags |= REGBUF_WILL_INIT;
1884  XLogRegisterBuffer(1, newBuffer, flags);
1885  }
1886 
1887  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1888 
1889  PageSetLSN(current->page, recptr);
1890 
1891  if (newBuffer != InvalidBuffer)
1892  {
1893  PageSetLSN(BufferGetPage(newBuffer), recptr);
1894  }
1895  }
1896 
1897  END_CRIT_SECTION();
1898 
1899  /* Update local free-space cache and release buffer */
1900  if (newBuffer != InvalidBuffer)
1901  {
1902  SpGistSetLastUsedPage(index, newBuffer);
1903  UnlockReleaseBuffer(newBuffer);
1904  }
1905 }
#define SGITMAXNNODES
#define XLOG_SPGIST_SPLIT_TUPLE
Definition: spgxlog.h:25
struct spgChooseOut::@46::@49 splitTuple
OffsetNumber offnumPostfix
Definition: spgxlog.h:147
OffsetNumber offnumPrefix
Definition: spgxlog.h:144
bool postfixBlkSame
Definition: spgxlog.h:149

References SpGistInnerTupleData::allTheSame, Assert(), SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage(), elog(), END_CRIT_SECTION, ERROR, GBUF_INNER_PARITY, i, InvalidBuffer, label, MarkBufferDirty(), spgxlogSplitTuple::newPage, SpGistInnerTupleData::nNodes, SPPageDesc::offnum, spgxlogSplitTuple::offnumPostfix, spgxlogSplitTuple::offnumPrefix, SPPageDesc::page, PageAddItem, PageGetItem(), PageGetItemId(), PageIndexTupleDelete(), PageSetLSN(), palloc(), spgxlogSplitTuple::postfixBlkSame, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, spgChooseOut::result, SGITITERATE, SGITMAXNNODES, SpGistInnerTupleData::size, spgFormInnerTuple(), spgFormNodeTuple(), SpGistBlockIsRoot, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistPageGetFreeSpace, SpGistPageStoresNulls, SpGistSetLastUsedPage(), spgUpdateNodeLink(), spgChooseOut::splitTuple, START_CRIT_SECTION, UnlockReleaseBuffer(), XLOG_SPGIST_SPLIT_TUPLE, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

◆ spgUpdateNodeLink()

void spgUpdateNodeLink ( SpGistInnerTuple  tup,
int  nodeN,
BlockNumber  blkno,
OffsetNumber  offset 
)

Definition at line 51 of file spgdoinsert.c.

53 {
54  int i;
55  SpGistNodeTuple node;
56 
57  SGITITERATE(tup, i, node)
58  {
59  if (i == nodeN)
60  {
61  ItemPointerSet(&node->t_tid, blkno, offset);
62  return;
63  }
64  }
65 
66  elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
67  nodeN);
68 }

References elog(), ERROR, i, ItemPointerSet(), SGITITERATE, and IndexTupleData::t_tid.

Referenced by saveNodeLink(), spgRedoAddLeaf(), spgRedoAddNode(), spgRedoMoveLeafs(), spgRedoPickSplit(), and spgSplitNodeAction().