PostgreSQL Source Code  git master
spgdoinsert.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/spgist_private.h"
#include "access/spgxlog.h"
#include "access/xloginsert.h"
#include "common/int.h"
#include "common/pg_prng.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"
Include dependency graph for spgdoinsert.c:

Go to the source code of this file.

Data Structures

struct  SPPageDesc
 

Typedefs

typedef struct SPPageDesc SPPageDesc
 

Functions

void spgUpdateNodeLink (SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
 
static SpGistInnerTuple addNode (SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
 
static int cmpOffsetNumbers (const void *a, const void *b)
 
void spgPageIndexMultiDelete (SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
 
static void saveNodeLink (Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
 
static void addLeafTuple (Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
 
static int checkSplitConditions (Relation index, SpGistState *state, SPPageDesc *current, int *nToSplit)
 
static void moveLeafs (Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, bool isNulls)
 
static void setRedirectionTuple (SPPageDesc *current, OffsetNumber position, BlockNumber blkno, OffsetNumber offnum)
 
static bool checkAllTheSame (spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, bool *includeNew)
 
static bool doPickSplit (Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, int level, bool isNulls, bool isNew)
 
static void spgMatchNodeAction (Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN)
 
static void spgAddNodeAction (Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN, Datum nodeLabel)
 
static void spgSplitNodeAction (Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, spgChooseOut *out)
 
bool spgdoinsert (Relation index, SpGistState *state, ItemPointer heapPtr, Datum *datums, bool *isnulls)
 

Typedef Documentation

◆ SPPageDesc

typedef struct SPPageDesc SPPageDesc

Function Documentation

◆ addLeafTuple()

static void addLeafTuple ( Relation  index,
SpGistState state,
SpGistLeafTuple  leafTuple,
SPPageDesc current,
SPPageDesc parent,
bool  isNulls,
bool  isNew 
)
static

Definition at line 203 of file spgdoinsert.c.

205 {
206  spgxlogAddLeaf xlrec;
207 
208  xlrec.newPage = isNew;
209  xlrec.storesNulls = isNulls;
210 
211  /* these will be filled below as needed */
215  xlrec.nodeI = 0;
216 
218 
219  if (current->offnum == InvalidOffsetNumber ||
220  SpGistBlockIsRoot(current->blkno))
221  {
222  /* Tuple is not part of a chain */
224  current->offnum = SpGistPageAddNewItem(state, current->page,
225  (Item) leafTuple, leafTuple->size,
226  NULL, false);
227 
228  xlrec.offnumLeaf = current->offnum;
229 
230  /* Must update parent's downlink if any */
231  if (parent->buffer != InvalidBuffer)
232  {
233  xlrec.offnumParent = parent->offnum;
234  xlrec.nodeI = parent->node;
235 
236  saveNodeLink(index, parent, current->blkno, current->offnum);
237  }
238  }
239  else
240  {
241  /*
242  * Tuple must be inserted into existing chain. We mustn't change the
243  * chain's head address, but we don't need to chase the entire chain
244  * to put the tuple at the end; we can insert it second.
245  *
246  * Also, it's possible that the "chain" consists only of a DEAD tuple,
247  * in which case we should replace the DEAD tuple in-place.
248  */
249  SpGistLeafTuple head;
250  OffsetNumber offnum;
251 
252  head = (SpGistLeafTuple) PageGetItem(current->page,
253  PageGetItemId(current->page, current->offnum));
254  if (head->tupstate == SPGIST_LIVE)
255  {
256  SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
257  offnum = SpGistPageAddNewItem(state, current->page,
258  (Item) leafTuple, leafTuple->size,
259  NULL, false);
260 
261  /*
262  * re-get head of list because it could have been moved on page,
263  * and set new second element
264  */
265  head = (SpGistLeafTuple) PageGetItem(current->page,
266  PageGetItemId(current->page, current->offnum));
267  SGLT_SET_NEXTOFFSET(head, offnum);
268 
269  xlrec.offnumLeaf = offnum;
270  xlrec.offnumHeadLeaf = current->offnum;
271  }
272  else if (head->tupstate == SPGIST_DEAD)
273  {
275  PageIndexTupleDelete(current->page, current->offnum);
276  if (PageAddItem(current->page,
277  (Item) leafTuple, leafTuple->size,
278  current->offnum, false, false) != current->offnum)
279  elog(ERROR, "failed to add item of size %u to SPGiST index page",
280  leafTuple->size);
281 
282  /* WAL replay distinguishes this case by equal offnums */
283  xlrec.offnumLeaf = current->offnum;
284  xlrec.offnumHeadLeaf = current->offnum;
285  }
286  else
287  elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288  }
289 
290  MarkBufferDirty(current->buffer);
291 
292  if (RelationNeedsWAL(index) && !state->isBuild)
293  {
294  XLogRecPtr recptr;
295  int flags;
296 
297  XLogBeginInsert();
298  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299  XLogRegisterData((char *) leafTuple, leafTuple->size);
300 
301  flags = REGBUF_STANDARD;
302  if (xlrec.newPage)
303  flags |= REGBUF_WILL_INIT;
304  XLogRegisterBuffer(0, current->buffer, flags);
305  if (xlrec.offnumParent != InvalidOffsetNumber)
307 
308  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309 
310  PageSetLSN(current->page, recptr);
311 
312  /* update parent only if we actually changed it */
313  if (xlrec.offnumParent != InvalidOffsetNumber)
314  {
315  PageSetLSN(parent->page, recptr);
316  }
317  }
318 
320 }
#define InvalidBuffer
Definition: buf.h:25
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:2474
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:1052
static Item PageGetItem(Page page, ItemId itemId)
Definition: bufpage.h:351
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:240
static void PageSetLSN(Page page, XLogRecPtr lsn)
Definition: bufpage.h:388
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:468
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
Pointer Item
Definition: item.h:17
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
#define InvalidOffsetNumber
Definition: off.h:26
uint16 OffsetNumber
Definition: off.h:24
#define RelationNeedsWAL(relation)
Definition: rel.h:628
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:186
#define SPGIST_LIVE
#define SGLT_GET_NEXTOFFSET(spgLeafTuple)
#define SPGIST_DEAD
#define SGLT_SET_NEXTOFFSET(spgLeafTuple, offsetNumber)
#define SpGistBlockIsRoot(blkno)
struct SpGistLeafTupleData * SpGistLeafTuple
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:1185
#define XLOG_SPGIST_ADD_LEAF
Definition: spgxlog.h:22
Buffer buffer
Definition: spgdoinsert.c:39
OffsetNumber offnum
Definition: spgdoinsert.c:41
BlockNumber blkno
Definition: spgdoinsert.c:38
unsigned int tupstate
Definition: type.h:95
uint16 nodeI
Definition: spgxlog.h:54
bool newPage
Definition: spgxlog.h:48
OffsetNumber offnumLeaf
Definition: spgxlog.h:50
bool storesNulls
Definition: spgxlog.h:49
OffsetNumber offnumHeadLeaf
Definition: spgxlog.h:51
OffsetNumber offnumParent
Definition: spgxlog.h:53
Definition: regguts.h:323
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:364
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:242
void XLogBeginInsert(void)
Definition: xloginsert.c:149
#define REGBUF_STANDARD
Definition: xloginsert.h:34
#define REGBUF_WILL_INIT
Definition: xloginsert.h:33

References SPPageDesc::blkno, SPPageDesc::buffer, elog, END_CRIT_SECTION, ERROR, InvalidBuffer, InvalidOffsetNumber, MarkBufferDirty(), spgxlogAddLeaf::newPage, SPPageDesc::node, spgxlogAddLeaf::nodeI, SPPageDesc::offnum, spgxlogAddLeaf::offnumHeadLeaf, spgxlogAddLeaf::offnumLeaf, spgxlogAddLeaf::offnumParent, SPPageDesc::page, PageAddItem, PageGetItem(), PageGetItemId(), PageIndexTupleDelete(), PageSetLSN(), REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, saveNodeLink(), SGLT_GET_NEXTOFFSET, SGLT_SET_NEXTOFFSET, SpGistLeafTupleData::size, SPGIST_DEAD, SPGIST_LIVE, SpGistBlockIsRoot, SpGistPageAddNewItem(), START_CRIT_SECTION, spgxlogAddLeaf::storesNulls, SpGistLeafTupleData::tupstate, XLOG_SPGIST_ADD_LEAF, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

◆ addNode()

static SpGistInnerTuple addNode ( SpGistState state,
SpGistInnerTuple  tuple,
Datum  label,
int  offset 
)
static

Definition at line 80 of file spgdoinsert.c.

81 {
82  SpGistNodeTuple node,
83  *nodes;
84  int i;
85 
86  /* if offset is negative, insert at end */
87  if (offset < 0)
88  offset = tuple->nNodes;
89  else if (offset > tuple->nNodes)
90  elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
91 
92  nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
93  SGITITERATE(tuple, i, node)
94  {
95  if (i < offset)
96  nodes[i] = node;
97  else
98  nodes[i + 1] = node;
99  }
100 
101  nodes[offset] = spgFormNodeTuple(state, label, false);
102 
103  return spgFormInnerTuple(state,
104  (tuple->prefixSize > 0),
105  SGITDATUM(tuple, state),
106  tuple->nNodes + 1,
107  nodes);
108 }
int i
Definition: isn.c:73
void * palloc(Size size)
Definition: mcxt.c:1316
static char * label
#define SGITDATUM(x, s)
#define SGITITERATE(x, i, nt)
SpGistInnerTuple spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, int nNodes, SpGistNodeTuple *nodes)
Definition: spgutils.c:983
SpGistNodeTuple spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
Definition: spgutils.c:941
unsigned int prefixSize

References elog, ERROR, i, label, SpGistInnerTupleData::nNodes, palloc(), SpGistInnerTupleData::prefixSize, SGITDATUM, SGITITERATE, spgFormInnerTuple(), and spgFormNodeTuple().

Referenced by spgAddNodeAction().

◆ checkAllTheSame()

static bool checkAllTheSame ( spgPickSplitIn in,
spgPickSplitOut out,
bool  tooBig,
bool includeNew 
)
static

Definition at line 599 of file spgdoinsert.c.

601 {
602  int theNode;
603  int limit;
604  int i;
605 
606  /* For the moment, assume we can include the new leaf tuple */
607  *includeNew = true;
608 
609  /* If there's only the new leaf tuple, don't select allTheSame mode */
610  if (in->nTuples <= 1)
611  return false;
612 
613  /* If tuple set doesn't fit on one page, ignore the new tuple in test */
614  limit = tooBig ? in->nTuples - 1 : in->nTuples;
615 
616  /* Check to see if more than one node is populated */
617  theNode = out->mapTuplesToNodes[0];
618  for (i = 1; i < limit; i++)
619  {
620  if (out->mapTuplesToNodes[i] != theNode)
621  return false;
622  }
623 
624  /* Nope, so override the picksplit function's decisions */
625 
626  /* If the new tuple is in its own node, it can't be included in split */
627  if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
628  *includeNew = false;
629 
630  out->nNodes = 8; /* arbitrary number of child nodes */
631 
632  /* Random assignment of tuples to nodes (note we include new tuple) */
633  for (i = 0; i < in->nTuples; i++)
634  out->mapTuplesToNodes[i] = i % out->nNodes;
635 
636  /* The opclass may not use node labels, but if it does, duplicate 'em */
637  if (out->nodeLabels)
638  {
639  Datum theLabel = out->nodeLabels[theNode];
640 
641  out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
642  for (i = 0; i < out->nNodes; i++)
643  out->nodeLabels[i] = theLabel;
644  }
645 
646  /* We don't touch the prefix or the leaf tuple datum assignments */
647 
648  return true;
649 }
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
uintptr_t Datum
Definition: postgres.h:64
int * mapTuplesToNodes
Definition: spgist.h:125
Datum * nodeLabels
Definition: spgist.h:123

References i, if(), spgPickSplitOut::mapTuplesToNodes, spgPickSplitOut::nNodes, spgPickSplitOut::nodeLabels, spgPickSplitIn::nTuples, and palloc().

Referenced by doPickSplit().

◆ checkSplitConditions()

static int checkSplitConditions ( Relation  index,
SpGistState state,
SPPageDesc current,
int *  nToSplit 
)
static

Definition at line 333 of file spgdoinsert.c.

335 {
336  int i,
337  n = 0,
338  totalSize = 0;
339 
340  if (SpGistBlockIsRoot(current->blkno))
341  {
342  /* return impossible values to force split */
343  *nToSplit = BLCKSZ;
344  return BLCKSZ;
345  }
346 
347  i = current->offnum;
348  while (i != InvalidOffsetNumber)
349  {
350  SpGistLeafTuple it;
351 
353  i <= PageGetMaxOffsetNumber(current->page));
354  it = (SpGistLeafTuple) PageGetItem(current->page,
355  PageGetItemId(current->page, i));
356  if (it->tupstate == SPGIST_LIVE)
357  {
358  n++;
359  totalSize += it->size + sizeof(ItemIdData);
360  }
361  else if (it->tupstate == SPGIST_DEAD)
362  {
363  /* We could see a DEAD tuple as first/only chain item */
364  Assert(i == current->offnum);
366  /* Don't count it in result, because it won't go to other page */
367  }
368  else
369  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370 
371  i = SGLT_GET_NEXTOFFSET(it);
372  }
373 
374  *nToSplit = n;
375 
376  return totalSize;
377 }
static OffsetNumber PageGetMaxOffsetNumber(Page page)
Definition: bufpage.h:369
#define Assert(condition)
Definition: c.h:858
struct ItemIdData ItemIdData
#define FirstOffsetNumber
Definition: off.h:27

References Assert, SPPageDesc::blkno, elog, ERROR, FirstOffsetNumber, i, InvalidOffsetNumber, SPPageDesc::offnum, SPPageDesc::page, PageGetItem(), PageGetItemId(), PageGetMaxOffsetNumber(), SGLT_GET_NEXTOFFSET, SpGistLeafTupleData::size, SPGIST_DEAD, SPGIST_LIVE, SpGistBlockIsRoot, and SpGistLeafTupleData::tupstate.

Referenced by spgdoinsert().

◆ cmpOffsetNumbers()

static int cmpOffsetNumbers ( const void *  a,
const void *  b 
)
static

Definition at line 112 of file spgdoinsert.c.

113 {
114  return pg_cmp_u16(*(const OffsetNumber *) a, *(const OffsetNumber *) b);
115 }
static int pg_cmp_u16(uint16 a, uint16 b)
Definition: int.h:477
int b
Definition: isn.c:70
int a
Definition: isn.c:69

References a, b, and pg_cmp_u16().

Referenced by spgPageIndexMultiDelete().

◆ doPickSplit()

static bool doPickSplit ( Relation  index,
SpGistState state,
SPPageDesc current,
SPPageDesc parent,
SpGistLeafTuple  newLeafTuple,
int  level,
bool  isNulls,
bool  isNew 
)
static

Definition at line 677 of file spgdoinsert.c.

681 {
682  bool insertedNew = false;
683  spgPickSplitIn in;
684  spgPickSplitOut out;
685  FmgrInfo *procinfo;
686  bool includeNew;
687  int i,
688  max,
689  n;
690  SpGistInnerTuple innerTuple;
691  SpGistNodeTuple node,
692  *nodes;
693  Buffer newInnerBuffer,
694  newLeafBuffer;
695  uint8 *leafPageSelect;
696  int *leafSizes;
697  OffsetNumber *toDelete;
698  OffsetNumber *toInsert;
699  OffsetNumber redirectTuplePos = InvalidOffsetNumber;
700  OffsetNumber startOffsets[2];
701  SpGistLeafTuple *oldLeafs;
702  SpGistLeafTuple *newLeafs;
703  Datum leafDatums[INDEX_MAX_KEYS];
704  bool leafIsnulls[INDEX_MAX_KEYS];
705  int spaceToDelete;
706  int currentFreeSpace;
707  int totalLeafSizes;
708  bool allTheSame;
709  spgxlogPickSplit xlrec;
710  char *leafdata,
711  *leafptr;
712  SPPageDesc saveCurrent;
713  int nToDelete,
714  nToInsert,
715  maxToInclude;
716 
717  in.level = level;
718 
719  /*
720  * Allocate per-leaf-tuple work arrays with max possible size
721  */
722  max = PageGetMaxOffsetNumber(current->page);
723  n = max + 1;
724  in.datums = (Datum *) palloc(sizeof(Datum) * n);
725  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
726  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
727  oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
728  newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
729  leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
730 
731  STORE_STATE(state, xlrec.stateSrc);
732 
733  /*
734  * Form list of leaf tuples which will be distributed as split result;
735  * also, count up the amount of space that will be freed from current.
736  * (Note that in the non-root case, we won't actually delete the old
737  * tuples, only replace them with redirects or placeholders.)
738  */
739  nToInsert = 0;
740  nToDelete = 0;
741  spaceToDelete = 0;
742  if (SpGistBlockIsRoot(current->blkno))
743  {
744  /*
745  * We are splitting the root (which up to now is also a leaf page).
746  * Its tuples are not linked, so scan sequentially to get them all. We
747  * ignore the original value of current->offnum.
748  */
749  for (i = FirstOffsetNumber; i <= max; i++)
750  {
751  SpGistLeafTuple it;
752 
753  it = (SpGistLeafTuple) PageGetItem(current->page,
754  PageGetItemId(current->page, i));
755  if (it->tupstate == SPGIST_LIVE)
756  {
757  in.datums[nToInsert] =
758  isNulls ? (Datum) 0 : SGLTDATUM(it, state);
759  oldLeafs[nToInsert] = it;
760  nToInsert++;
761  toDelete[nToDelete] = i;
762  nToDelete++;
763  /* we will delete the tuple altogether, so count full space */
764  spaceToDelete += it->size + sizeof(ItemIdData);
765  }
766  else /* tuples on root should be live */
767  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
768  }
769  }
770  else
771  {
772  /* Normal case, just collect the leaf tuples in the chain */
773  i = current->offnum;
774  while (i != InvalidOffsetNumber)
775  {
776  SpGistLeafTuple it;
777 
778  Assert(i >= FirstOffsetNumber && i <= max);
779  it = (SpGistLeafTuple) PageGetItem(current->page,
780  PageGetItemId(current->page, i));
781  if (it->tupstate == SPGIST_LIVE)
782  {
783  in.datums[nToInsert] =
784  isNulls ? (Datum) 0 : SGLTDATUM(it, state);
785  oldLeafs[nToInsert] = it;
786  nToInsert++;
787  toDelete[nToDelete] = i;
788  nToDelete++;
789  /* we will not delete the tuple, only replace with dead */
790  Assert(it->size >= SGDTSIZE);
791  spaceToDelete += it->size - SGDTSIZE;
792  }
793  else if (it->tupstate == SPGIST_DEAD)
794  {
795  /* We could see a DEAD tuple as first/only chain item */
796  Assert(i == current->offnum);
798  toDelete[nToDelete] = i;
799  nToDelete++;
800  /* replacing it with redirect will save no space */
801  }
802  else
803  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
804 
805  i = SGLT_GET_NEXTOFFSET(it);
806  }
807  }
808  in.nTuples = nToInsert;
809 
810  /*
811  * We may not actually insert new tuple because another picksplit may be
812  * necessary due to too large value, but we will try to allocate enough
813  * space to include it; and in any case it has to be included in the input
814  * for the picksplit function. So don't increment nToInsert yet.
815  */
816  in.datums[in.nTuples] =
817  isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
818  oldLeafs[in.nTuples] = newLeafTuple;
819  in.nTuples++;
820 
821  memset(&out, 0, sizeof(out));
822 
823  if (!isNulls)
824  {
825  /*
826  * Perform split using user-defined method.
827  */
829  FunctionCall2Coll(procinfo,
830  index->rd_indcollation[0],
831  PointerGetDatum(&in),
832  PointerGetDatum(&out));
833 
834  /*
835  * Form new leaf tuples and count up the total space needed.
836  */
837  totalLeafSizes = 0;
838  for (i = 0; i < in.nTuples; i++)
839  {
840  if (state->leafTupDesc->natts > 1)
841  spgDeformLeafTuple(oldLeafs[i],
842  state->leafTupDesc,
843  leafDatums,
844  leafIsnulls,
845  isNulls);
846 
847  leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
848  leafIsnulls[spgKeyColumn] = false;
849 
850  newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
851  leafDatums,
852  leafIsnulls);
853  totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
854  }
855  }
856  else
857  {
858  /*
859  * Perform dummy split that puts all tuples into one node.
860  * checkAllTheSame will override this and force allTheSame mode.
861  */
862  out.hasPrefix = false;
863  out.nNodes = 1;
864  out.nodeLabels = NULL;
865  out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
866 
867  /*
868  * Form new leaf tuples and count up the total space needed.
869  */
870  totalLeafSizes = 0;
871  for (i = 0; i < in.nTuples; i++)
872  {
873  if (state->leafTupDesc->natts > 1)
874  spgDeformLeafTuple(oldLeafs[i],
875  state->leafTupDesc,
876  leafDatums,
877  leafIsnulls,
878  isNulls);
879 
880  /*
881  * Nulls tree can contain only null key values.
882  */
883  leafDatums[spgKeyColumn] = (Datum) 0;
884  leafIsnulls[spgKeyColumn] = true;
885 
886  newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
887  leafDatums,
888  leafIsnulls);
889  totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
890  }
891  }
892 
893  /*
894  * Check to see if the picksplit function failed to separate the values,
895  * ie, it put them all into the same child node. If so, select allTheSame
896  * mode and create a random split instead. See comments for
897  * checkAllTheSame as to why we need to know if the new leaf tuples could
898  * fit on one page.
899  */
900  allTheSame = checkAllTheSame(&in, &out,
901  totalLeafSizes > SPGIST_PAGE_CAPACITY,
902  &includeNew);
903 
904  /*
905  * If checkAllTheSame decided we must exclude the new tuple, don't
906  * consider it any further.
907  */
908  if (includeNew)
909  maxToInclude = in.nTuples;
910  else
911  {
912  maxToInclude = in.nTuples - 1;
913  totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
914  }
915 
916  /*
917  * Allocate per-node work arrays. Since checkAllTheSame could replace
918  * out.nNodes with a value larger than the number of tuples on the input
919  * page, we can't allocate these arrays before here.
920  */
921  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
922  leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
923 
924  /*
925  * Form nodes of inner tuple and inner tuple itself
926  */
927  for (i = 0; i < out.nNodes; i++)
928  {
929  Datum label = (Datum) 0;
930  bool labelisnull = (out.nodeLabels == NULL);
931 
932  if (!labelisnull)
933  label = out.nodeLabels[i];
934  nodes[i] = spgFormNodeTuple(state, label, labelisnull);
935  }
936  innerTuple = spgFormInnerTuple(state,
937  out.hasPrefix, out.prefixDatum,
938  out.nNodes, nodes);
939  innerTuple->allTheSame = allTheSame;
940 
941  /*
942  * Update nodes[] array to point into the newly formed innerTuple, so that
943  * we can adjust their downlinks below.
944  */
945  SGITITERATE(innerTuple, i, node)
946  {
947  nodes[i] = node;
948  }
949 
950  /*
951  * Re-scan new leaf tuples and count up the space needed under each node.
952  */
953  for (i = 0; i < maxToInclude; i++)
954  {
955  n = out.mapTuplesToNodes[i];
956  if (n < 0 || n >= out.nNodes)
957  elog(ERROR, "inconsistent result of SPGiST picksplit function");
958  leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
959  }
960 
961  /*
962  * To perform the split, we must insert a new inner tuple, which can't go
963  * on a leaf page; and unless we are splitting the root page, we must then
964  * update the parent tuple's downlink to point to the inner tuple. If
965  * there is room, we'll put the new inner tuple on the same page as the
966  * parent tuple, otherwise we need another non-leaf buffer. But if the
967  * parent page is the root, we can't add the new inner tuple there,
968  * because the root page must have only one inner tuple.
969  */
970  xlrec.initInner = false;
971  if (parent->buffer != InvalidBuffer &&
972  !SpGistBlockIsRoot(parent->blkno) &&
973  (SpGistPageGetFreeSpace(parent->page, 1) >=
974  innerTuple->size + sizeof(ItemIdData)))
975  {
976  /* New inner tuple will fit on parent page */
977  newInnerBuffer = parent->buffer;
978  }
979  else if (parent->buffer != InvalidBuffer)
980  {
981  /* Send tuple to page with next triple parity (see README) */
982  newInnerBuffer = SpGistGetBuffer(index,
983  GBUF_INNER_PARITY(parent->blkno + 1) |
984  (isNulls ? GBUF_NULLS : 0),
985  innerTuple->size + sizeof(ItemIdData),
986  &xlrec.initInner);
987  }
988  else
989  {
990  /* Root page split ... inner tuple will go to root page */
991  newInnerBuffer = InvalidBuffer;
992  }
993 
994  /*
995  * The new leaf tuples converted from the existing ones should require the
996  * same or less space, and therefore should all fit onto one page
997  * (although that's not necessarily the current page, since we can't
998  * delete the old tuples but only replace them with placeholders).
999  * However, the incoming new tuple might not also fit, in which case we
1000  * might need another picksplit cycle to reduce it some more.
1001  *
1002  * If there's not room to put everything back onto the current page, then
1003  * we decide on a per-node basis which tuples go to the new page. (We do
1004  * it like that because leaf tuple chains can't cross pages, so we must
1005  * place all leaf tuples belonging to the same parent node on the same
1006  * page.)
1007  *
1008  * If we are splitting the root page (turning it from a leaf page into an
1009  * inner page), then no leaf tuples can go back to the current page; they
1010  * must all go somewhere else.
1011  */
1012  if (!SpGistBlockIsRoot(current->blkno))
1013  currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1014  else
1015  currentFreeSpace = 0; /* prevent assigning any tuples to current */
1016 
1017  xlrec.initDest = false;
1018 
1019  if (totalLeafSizes <= currentFreeSpace)
1020  {
1021  /* All the leaf tuples will fit on current page */
1022  newLeafBuffer = InvalidBuffer;
1023  /* mark new leaf tuple as included in insertions, if allowed */
1024  if (includeNew)
1025  {
1026  nToInsert++;
1027  insertedNew = true;
1028  }
1029  for (i = 0; i < nToInsert; i++)
1030  leafPageSelect[i] = 0; /* signifies current page */
1031  }
1032  else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1033  {
1034  /*
1035  * We're trying to split up a long value by repeated suffixing, but
1036  * it's not going to fit yet. Don't bother allocating a second leaf
1037  * buffer that we won't be able to use.
1038  */
1039  newLeafBuffer = InvalidBuffer;
1040  Assert(includeNew);
1041  Assert(nToInsert == 0);
1042  }
1043  else
1044  {
1045  /* We will need another leaf page */
1046  uint8 *nodePageSelect;
1047  int curspace;
1048  int newspace;
1049 
1050  newLeafBuffer = SpGistGetBuffer(index,
1051  GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1052  Min(totalLeafSizes,
1054  &xlrec.initDest);
1055 
1056  /*
1057  * Attempt to assign node groups to the two pages. We might fail to
1058  * do so, even if totalLeafSizes is less than the available space,
1059  * because we can't split a group across pages.
1060  */
1061  nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1062 
1063  curspace = currentFreeSpace;
1064  newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1065  for (i = 0; i < out.nNodes; i++)
1066  {
1067  if (leafSizes[i] <= curspace)
1068  {
1069  nodePageSelect[i] = 0; /* signifies current page */
1070  curspace -= leafSizes[i];
1071  }
1072  else
1073  {
1074  nodePageSelect[i] = 1; /* signifies new leaf page */
1075  newspace -= leafSizes[i];
1076  }
1077  }
1078  if (curspace >= 0 && newspace >= 0)
1079  {
1080  /* Successful assignment, so we can include the new leaf tuple */
1081  if (includeNew)
1082  {
1083  nToInsert++;
1084  insertedNew = true;
1085  }
1086  }
1087  else if (includeNew)
1088  {
1089  /* We must exclude the new leaf tuple from the split */
1090  int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1091 
1092  leafSizes[nodeOfNewTuple] -=
1093  newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1094 
1095  /* Repeat the node assignment process --- should succeed now */
1096  curspace = currentFreeSpace;
1097  newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1098  for (i = 0; i < out.nNodes; i++)
1099  {
1100  if (leafSizes[i] <= curspace)
1101  {
1102  nodePageSelect[i] = 0; /* signifies current page */
1103  curspace -= leafSizes[i];
1104  }
1105  else
1106  {
1107  nodePageSelect[i] = 1; /* signifies new leaf page */
1108  newspace -= leafSizes[i];
1109  }
1110  }
1111  if (curspace < 0 || newspace < 0)
1112  elog(ERROR, "failed to divide leaf tuple groups across pages");
1113  }
1114  else
1115  {
1116  /* oops, we already excluded new tuple ... should not get here */
1117  elog(ERROR, "failed to divide leaf tuple groups across pages");
1118  }
1119  /* Expand the per-node assignments to be shown per leaf tuple */
1120  for (i = 0; i < nToInsert; i++)
1121  {
1122  n = out.mapTuplesToNodes[i];
1123  leafPageSelect[i] = nodePageSelect[n];
1124  }
1125  }
1126 
1127  /* Start preparing WAL record */
1128  xlrec.nDelete = 0;
1129  xlrec.initSrc = isNew;
1130  xlrec.storesNulls = isNulls;
1131  xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1132 
1133  leafdata = leafptr = (char *) palloc(totalLeafSizes);
1134 
1135  /* Here we begin making the changes to the target pages */
1137 
1138  /*
1139  * Delete old leaf tuples from current buffer, except when we're splitting
1140  * the root; in that case there's no need because we'll re-init the page
1141  * below. We do this first to make room for reinserting new leaf tuples.
1142  */
1143  if (!SpGistBlockIsRoot(current->blkno))
1144  {
1145  /*
1146  * Init buffer instead of deleting individual tuples, but only if
1147  * there aren't any other live tuples and only during build; otherwise
1148  * we need to set a redirection tuple for concurrent scans.
1149  */
1150  if (state->isBuild &&
1151  nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1152  PageGetMaxOffsetNumber(current->page))
1153  {
1154  SpGistInitBuffer(current->buffer,
1155  SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1156  xlrec.initSrc = true;
1157  }
1158  else if (isNew)
1159  {
1160  /* don't expose the freshly init'd buffer as a backup block */
1161  Assert(nToDelete == 0);
1162  }
1163  else
1164  {
1165  xlrec.nDelete = nToDelete;
1166 
1167  if (!state->isBuild)
1168  {
1169  /*
1170  * Need to create redirect tuple (it will point to new inner
1171  * tuple) but right now the new tuple's location is not known
1172  * yet. So, set the redirection pointer to "impossible" value
1173  * and remember its position to update tuple later.
1174  */
1175  if (nToDelete > 0)
1176  redirectTuplePos = toDelete[0];
1178  toDelete, nToDelete,
1183  }
1184  else
1185  {
1186  /*
1187  * During index build there is not concurrent searches, so we
1188  * don't need to create redirection tuple.
1189  */
1191  toDelete, nToDelete,
1196  }
1197  }
1198  }
1199 
1200  /*
1201  * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1202  * nodes.
1203  */
1204  startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1205  for (i = 0; i < nToInsert; i++)
1206  {
1207  SpGistLeafTuple it = newLeafs[i];
1208  Buffer leafBuffer;
1209  BlockNumber leafBlock;
1210  OffsetNumber newoffset;
1211 
1212  /* Which page is it going to? */
1213  leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1214  leafBlock = BufferGetBlockNumber(leafBuffer);
1215 
1216  /* Link tuple into correct chain for its node */
1217  n = out.mapTuplesToNodes[i];
1218 
1219  if (ItemPointerIsValid(&nodes[n]->t_tid))
1220  {
1221  Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1222  SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1223  }
1224  else
1226 
1227  /* Insert it on page */
1228  newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1229  (Item) it, it->size,
1230  &startOffsets[leafPageSelect[i]],
1231  false);
1232  toInsert[i] = newoffset;
1233 
1234  /* ... and complete the chain linking */
1235  ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1236 
1237  /* Also copy leaf tuple into WAL data */
1238  memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1239  leafptr += newLeafs[i]->size;
1240  }
1241 
1242  /*
1243  * We're done modifying the other leaf buffer (if any), so mark it dirty.
1244  * current->buffer will be marked below, after we're entirely done
1245  * modifying it.
1246  */
1247  if (newLeafBuffer != InvalidBuffer)
1248  {
1249  MarkBufferDirty(newLeafBuffer);
1250  }
1251 
1252  /* Remember current buffer, since we're about to change "current" */
1253  saveCurrent = *current;
1254 
1255  /*
1256  * Store the new innerTuple
1257  */
1258  if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1259  {
1260  /*
1261  * new inner tuple goes to parent page
1262  */
1263  Assert(current->buffer != parent->buffer);
1264 
1265  /* Repoint "current" at the new inner tuple */
1266  current->blkno = parent->blkno;
1267  current->buffer = parent->buffer;
1268  current->page = parent->page;
1269  xlrec.offnumInner = current->offnum =
1270  SpGistPageAddNewItem(state, current->page,
1271  (Item) innerTuple, innerTuple->size,
1272  NULL, false);
1273 
1274  /*
1275  * Update parent node link and mark parent page dirty
1276  */
1277  xlrec.innerIsParent = true;
1278  xlrec.offnumParent = parent->offnum;
1279  xlrec.nodeI = parent->node;
1280  saveNodeLink(index, parent, current->blkno, current->offnum);
1281 
1282  /*
1283  * Update redirection link (in old current buffer)
1284  */
1285  if (redirectTuplePos != InvalidOffsetNumber)
1286  setRedirectionTuple(&saveCurrent, redirectTuplePos,
1287  current->blkno, current->offnum);
1288 
1289  /* Done modifying old current buffer, mark it dirty */
1290  MarkBufferDirty(saveCurrent.buffer);
1291  }
1292  else if (parent->buffer != InvalidBuffer)
1293  {
1294  /*
1295  * new inner tuple will be stored on a new page
1296  */
1297  Assert(newInnerBuffer != InvalidBuffer);
1298 
1299  /* Repoint "current" at the new inner tuple */
1300  current->buffer = newInnerBuffer;
1301  current->blkno = BufferGetBlockNumber(current->buffer);
1302  current->page = BufferGetPage(current->buffer);
1303  xlrec.offnumInner = current->offnum =
1304  SpGistPageAddNewItem(state, current->page,
1305  (Item) innerTuple, innerTuple->size,
1306  NULL, false);
1307 
1308  /* Done modifying new current buffer, mark it dirty */
1309  MarkBufferDirty(current->buffer);
1310 
1311  /*
1312  * Update parent node link and mark parent page dirty
1313  */
1314  xlrec.innerIsParent = (parent->buffer == current->buffer);
1315  xlrec.offnumParent = parent->offnum;
1316  xlrec.nodeI = parent->node;
1317  saveNodeLink(index, parent, current->blkno, current->offnum);
1318 
1319  /*
1320  * Update redirection link (in old current buffer)
1321  */
1322  if (redirectTuplePos != InvalidOffsetNumber)
1323  setRedirectionTuple(&saveCurrent, redirectTuplePos,
1324  current->blkno, current->offnum);
1325 
1326  /* Done modifying old current buffer, mark it dirty */
1327  MarkBufferDirty(saveCurrent.buffer);
1328  }
1329  else
1330  {
1331  /*
1332  * Splitting root page, which was a leaf but now becomes inner page
1333  * (and so "current" continues to point at it)
1334  */
1335  Assert(SpGistBlockIsRoot(current->blkno));
1336  Assert(redirectTuplePos == InvalidOffsetNumber);
1337 
1338  SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1339  xlrec.initInner = true;
1340  xlrec.innerIsParent = false;
1341 
1342  xlrec.offnumInner = current->offnum =
1343  PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1344  InvalidOffsetNumber, false, false);
1345  if (current->offnum != FirstOffsetNumber)
1346  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1347  innerTuple->size);
1348 
1349  /* No parent link to update, nor redirection to do */
1351  xlrec.nodeI = 0;
1352 
1353  /* Done modifying new current buffer, mark it dirty */
1354  MarkBufferDirty(current->buffer);
1355 
1356  /* saveCurrent doesn't represent a different buffer */
1357  saveCurrent.buffer = InvalidBuffer;
1358  }
1359 
1360  if (RelationNeedsWAL(index) && !state->isBuild)
1361  {
1362  XLogRecPtr recptr;
1363  int flags;
1364 
1365  XLogBeginInsert();
1366 
1367  xlrec.nInsert = nToInsert;
1368  XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1369 
1370  XLogRegisterData((char *) toDelete,
1371  sizeof(OffsetNumber) * xlrec.nDelete);
1372  XLogRegisterData((char *) toInsert,
1373  sizeof(OffsetNumber) * xlrec.nInsert);
1374  XLogRegisterData((char *) leafPageSelect,
1375  sizeof(uint8) * xlrec.nInsert);
1376  XLogRegisterData((char *) innerTuple, innerTuple->size);
1377  XLogRegisterData(leafdata, leafptr - leafdata);
1378 
1379  /* Old leaf page */
1380  if (BufferIsValid(saveCurrent.buffer))
1381  {
1382  flags = REGBUF_STANDARD;
1383  if (xlrec.initSrc)
1384  flags |= REGBUF_WILL_INIT;
1385  XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1386  }
1387 
1388  /* New leaf page */
1389  if (BufferIsValid(newLeafBuffer))
1390  {
1391  flags = REGBUF_STANDARD;
1392  if (xlrec.initDest)
1393  flags |= REGBUF_WILL_INIT;
1394  XLogRegisterBuffer(1, newLeafBuffer, flags);
1395  }
1396 
1397  /* Inner page */
1398  flags = REGBUF_STANDARD;
1399  if (xlrec.initInner)
1400  flags |= REGBUF_WILL_INIT;
1401  XLogRegisterBuffer(2, current->buffer, flags);
1402 
1403  /* Parent page, if different from inner page */
1404  if (parent->buffer != InvalidBuffer)
1405  {
1406  if (parent->buffer != current->buffer)
1408  else
1409  Assert(xlrec.innerIsParent);
1410  }
1411 
1412  /* Issue the WAL record */
1413  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1414 
1415  /* Update page LSNs on all affected pages */
1416  if (newLeafBuffer != InvalidBuffer)
1417  {
1418  Page page = BufferGetPage(newLeafBuffer);
1419 
1420  PageSetLSN(page, recptr);
1421  }
1422 
1423  if (saveCurrent.buffer != InvalidBuffer)
1424  {
1425  Page page = BufferGetPage(saveCurrent.buffer);
1426 
1427  PageSetLSN(page, recptr);
1428  }
1429 
1430  PageSetLSN(current->page, recptr);
1431 
1432  if (parent->buffer != InvalidBuffer)
1433  {
1434  PageSetLSN(parent->page, recptr);
1435  }
1436  }
1437 
1438  END_CRIT_SECTION();
1439 
1440  /* Update local free-space cache and unlock buffers */
1441  if (newLeafBuffer != InvalidBuffer)
1442  {
1443  SpGistSetLastUsedPage(index, newLeafBuffer);
1444  UnlockReleaseBuffer(newLeafBuffer);
1445  }
1446  if (saveCurrent.buffer != InvalidBuffer)
1447  {
1448  SpGistSetLastUsedPage(index, saveCurrent.buffer);
1449  UnlockReleaseBuffer(saveCurrent.buffer);
1450  }
1451 
1452  return insertedNew;
1453 }
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
int Buffer
Definition: buf.h:23
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:3667
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4867
static Page BufferGetPage(Buffer buffer)
Definition: bufmgr.h:408
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:359
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:958
Pointer Page
Definition: bufpage.h:78
#define Min(x, y)
Definition: c.h:1004
unsigned char uint8
Definition: c.h:504
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1149
FmgrInfo * index_getprocinfo(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:861
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
Definition: itemptr.h:135
static OffsetNumber ItemPointerGetOffsetNumber(const ItemPointerData *pointer)
Definition: itemptr.h:124
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
static bool ItemPointerIsValid(const ItemPointerData *pointer)
Definition: itemptr.h:83
void * palloc0(Size size)
Definition: mcxt.c:1346
#define INDEX_MAX_KEYS
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
static pg_noinline void Size size
Definition: slab.c:607
static void setRedirectionTuple(SPPageDesc *current, OffsetNumber position, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:568
static bool checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, bool *includeNew)
Definition: spgdoinsert.c:599
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:131
#define SPGIST_PICKSPLIT_PROC
Definition: spgist.h:25
#define SGLTDATUM(x, s)
#define GBUF_NULLS
#define SPGIST_REDIRECT
#define SGDTSIZE
#define SPGIST_PLACEHOLDER
#define GBUF_LEAF
#define SPGIST_METAPAGE_BLKNO
#define SPGIST_NULLS
#define STORE_STATE(s, d)
#define SPGIST_PAGE_CAPACITY
#define SpGistPageGetOpaque(page)
#define GBUF_INNER_PARITY(x)
#define SPGIST_LEAF
#define spgKeyColumn
#define SpGistPageGetFreeSpace(p, n)
SpGistLeafTuple spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, const Datum *datums, const bool *isnulls)
Definition: spgutils.c:852
void spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor, Datum *datums, bool *isnulls, bool keyColumnIsNull)
Definition: spgutils.c:1097
void SpGistInitBuffer(Buffer b, uint16 f)
Definition: spgutils.c:703
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:550
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:654
#define SizeOfSpgxlogPickSplit
Definition: spgxlog.h:199
#define XLOG_SPGIST_PICKSPLIT
Definition: spgxlog.h:26
Definition: fmgr.h:57
unsigned int allTheSame
Datum * datums
Definition: spgist.h:113
bool hasPrefix
Definition: spgist.h:119
Datum * leafTupleDatums
Definition: spgist.h:126
Datum prefixDatum
Definition: spgist.h:120
uint16 nInsert
Definition: spgxlog.h:170
spgxlogState stateSrc
Definition: spgxlog.h:185
bool innerIsParent
Definition: spgxlog.h:181
uint16 nodeI
Definition: spgxlog.h:183
bool storesNulls
Definition: spgxlog.h:178
OffsetNumber offnumParent
Definition: spgxlog.h:182
OffsetNumber offnumInner
Definition: spgxlog.h:175
uint16 nDelete
Definition: spgxlog.h:169
bool isRootSplit
Definition: spgxlog.h:167

References SpGistInnerTupleData::allTheSame, Assert, SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage(), BufferIsValid(), checkAllTheSame(), spgPickSplitIn::datums, elog, END_CRIT_SECTION, ERROR, FirstOffsetNumber, FunctionCall2Coll(), GBUF_INNER_PARITY, GBUF_LEAF, GBUF_NULLS, spgPickSplitOut::hasPrefix, i, index_getprocinfo(), INDEX_MAX_KEYS, spgxlogPickSplit::initDest, spgxlogPickSplit::initInner, spgxlogPickSplit::initSrc, spgxlogPickSplit::innerIsParent, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, spgxlogPickSplit::isRootSplit, ItemPointerGetBlockNumber(), ItemPointerGetOffsetNumber(), ItemPointerIsValid(), ItemPointerSet(), label, spgPickSplitOut::leafTupleDatums, spgPickSplitIn::level, spgPickSplitOut::mapTuplesToNodes, MarkBufferDirty(), Min, spgxlogPickSplit::nDelete, spgxlogPickSplit::nInsert, spgPickSplitOut::nNodes, SPPageDesc::node, spgxlogPickSplit::nodeI, spgPickSplitOut::nodeLabels, spgPickSplitIn::nTuples, SPPageDesc::offnum, spgxlogPickSplit::offnumInner, spgxlogPickSplit::offnumParent, SPPageDesc::page, PageAddItem, PageGetExactFreeSpace(), PageGetItem(), PageGetItemId(), PageGetMaxOffsetNumber(), PageSetLSN(), palloc(), palloc0(), PointerGetDatum(), spgPickSplitOut::prefixDatum, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, saveNodeLink(), setRedirectionTuple(), SGDTSIZE, SGITITERATE, SGLT_GET_NEXTOFFSET, SGLT_SET_NEXTOFFSET, SGLTDATUM, size, SpGistInnerTupleData::size, SpGistLeafTupleData::size, SizeOfSpgxlogPickSplit, spgDeformLeafTuple(), spgFormInnerTuple(), spgFormLeafTuple(), spgFormNodeTuple(), SPGIST_DEAD, SPGIST_LEAF, SPGIST_LIVE, SPGIST_METAPAGE_BLKNO, SPGIST_NULLS, SPGIST_PAGE_CAPACITY, SPGIST_PICKSPLIT_PROC, SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistBlockIsRoot, SpGistGetBuffer(), SpGistInitBuffer(), SpGistPageAddNewItem(), SpGistPageGetFreeSpace, SpGistPageGetOpaque, SpGistSetLastUsedPage(), spgKeyColumn, spgPageIndexMultiDelete(), START_CRIT_SECTION, spgxlogPickSplit::stateSrc, STORE_STATE, spgxlogPickSplit::storesNulls, SpGistLeafTupleData::tupstate, UnlockReleaseBuffer(), XLOG_SPGIST_PICKSPLIT, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

◆ moveLeafs()

static void moveLeafs ( Relation  index,
SpGistState state,
SPPageDesc current,
SPPageDesc parent,
SpGistLeafTuple  newLeafTuple,
bool  isNulls 
)
static

Definition at line 387 of file spgdoinsert.c.

390 {
391  int i,
392  nDelete,
393  nInsert,
394  size;
395  Buffer nbuf;
396  Page npage;
398  startOffset = InvalidOffsetNumber;
399  bool replaceDead = false;
400  OffsetNumber *toDelete;
401  OffsetNumber *toInsert;
402  BlockNumber nblkno;
403  spgxlogMoveLeafs xlrec;
404  char *leafdata,
405  *leafptr;
406 
407  /* This doesn't work on root page */
408  Assert(parent->buffer != InvalidBuffer);
409  Assert(parent->buffer != current->buffer);
410 
411  /* Locate the tuples to be moved, and count up the space needed */
412  i = PageGetMaxOffsetNumber(current->page);
413  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
414  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
415 
416  size = newLeafTuple->size + sizeof(ItemIdData);
417 
418  nDelete = 0;
419  i = current->offnum;
420  while (i != InvalidOffsetNumber)
421  {
422  SpGistLeafTuple it;
423 
425  i <= PageGetMaxOffsetNumber(current->page));
426  it = (SpGistLeafTuple) PageGetItem(current->page,
427  PageGetItemId(current->page, i));
428 
429  if (it->tupstate == SPGIST_LIVE)
430  {
431  toDelete[nDelete] = i;
432  size += it->size + sizeof(ItemIdData);
433  nDelete++;
434  }
435  else if (it->tupstate == SPGIST_DEAD)
436  {
437  /* We could see a DEAD tuple as first/only chain item */
438  Assert(i == current->offnum);
440  /* We don't want to move it, so don't count it in size */
441  toDelete[nDelete] = i;
442  nDelete++;
443  replaceDead = true;
444  }
445  else
446  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
447 
448  i = SGLT_GET_NEXTOFFSET(it);
449  }
450 
451  /* Find a leaf page that will hold them */
452  nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
453  size, &xlrec.newPage);
454  npage = BufferGetPage(nbuf);
455  nblkno = BufferGetBlockNumber(nbuf);
456  Assert(nblkno != current->blkno);
457 
458  leafdata = leafptr = palloc(size);
459 
461 
462  /* copy all the old tuples to new page, unless they're dead */
463  nInsert = 0;
464  if (!replaceDead)
465  {
466  for (i = 0; i < nDelete; i++)
467  {
468  SpGistLeafTuple it;
469 
470  it = (SpGistLeafTuple) PageGetItem(current->page,
471  PageGetItemId(current->page, toDelete[i]));
472  Assert(it->tupstate == SPGIST_LIVE);
473 
474  /*
475  * Update chain link (notice the chain order gets reversed, but we
476  * don't care). We're modifying the tuple on the source page
477  * here, but it's okay since we're about to delete it.
478  */
479  SGLT_SET_NEXTOFFSET(it, r);
480 
481  r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
482  &startOffset, false);
483 
484  toInsert[nInsert] = r;
485  nInsert++;
486 
487  /* save modified tuple into leafdata as well */
488  memcpy(leafptr, it, it->size);
489  leafptr += it->size;
490  }
491  }
492 
493  /* add the new tuple as well */
494  SGLT_SET_NEXTOFFSET(newLeafTuple, r);
495  r = SpGistPageAddNewItem(state, npage,
496  (Item) newLeafTuple, newLeafTuple->size,
497  &startOffset, false);
498  toInsert[nInsert] = r;
499  nInsert++;
500  memcpy(leafptr, newLeafTuple, newLeafTuple->size);
501  leafptr += newLeafTuple->size;
502 
503  /*
504  * Now delete the old tuples, leaving a redirection pointer behind for the
505  * first one, unless we're doing an index build; in which case there can't
506  * be any concurrent scan so we need not provide a redirect.
507  */
508  spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
511  nblkno, r);
512 
513  /* Update parent's downlink and mark parent page dirty */
514  saveNodeLink(index, parent, nblkno, r);
515 
516  /* Mark the leaf pages too */
517  MarkBufferDirty(current->buffer);
518  MarkBufferDirty(nbuf);
519 
520  if (RelationNeedsWAL(index) && !state->isBuild)
521  {
522  XLogRecPtr recptr;
523 
524  /* prepare WAL info */
525  STORE_STATE(state, xlrec.stateSrc);
526 
527  xlrec.nMoves = nDelete;
528  xlrec.replaceDead = replaceDead;
529  xlrec.storesNulls = isNulls;
530 
531  xlrec.offnumParent = parent->offnum;
532  xlrec.nodeI = parent->node;
533 
534  XLogBeginInsert();
535  XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
536  XLogRegisterData((char *) toDelete,
537  sizeof(OffsetNumber) * nDelete);
538  XLogRegisterData((char *) toInsert,
539  sizeof(OffsetNumber) * nInsert);
540  XLogRegisterData((char *) leafdata, leafptr - leafdata);
541 
545 
546  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
547 
548  PageSetLSN(current->page, recptr);
549  PageSetLSN(npage, recptr);
550  PageSetLSN(parent->page, recptr);
551  }
552 
554 
555  /* Update local free-space cache and release new buffer */
557  UnlockReleaseBuffer(nbuf);
558 }
#define SizeOfSpgxlogMoveLeafs
Definition: spgxlog.h:91
#define XLOG_SPGIST_MOVE_LEAFS
Definition: spgxlog.h:23
bool replaceDead
Definition: spgxlog.h:68
bool storesNulls
Definition: spgxlog.h:69
uint16 nMoves
Definition: spgxlog.h:66
uint16 nodeI
Definition: spgxlog.h:73
OffsetNumber offnumParent
Definition: spgxlog.h:72
spgxlogState stateSrc
Definition: spgxlog.h:75

References Assert, SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage(), elog, END_CRIT_SECTION, ERROR, FirstOffsetNumber, GBUF_LEAF, GBUF_NULLS, i, InvalidBuffer, InvalidOffsetNumber, MarkBufferDirty(), spgxlogMoveLeafs::newPage, spgxlogMoveLeafs::nMoves, SPPageDesc::node, spgxlogMoveLeafs::nodeI, SPPageDesc::offnum, spgxlogMoveLeafs::offnumParent, SPPageDesc::page, PageGetItem(), PageGetItemId(), PageGetMaxOffsetNumber(), PageSetLSN(), palloc(), REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, spgxlogMoveLeafs::replaceDead, saveNodeLink(), SGLT_GET_NEXTOFFSET, SGLT_SET_NEXTOFFSET, size, SpGistLeafTupleData::size, SizeOfSpgxlogMoveLeafs, SPGIST_DEAD, SPGIST_LIVE, SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistSetLastUsedPage(), spgPageIndexMultiDelete(), START_CRIT_SECTION, spgxlogMoveLeafs::stateSrc, STORE_STATE, spgxlogMoveLeafs::storesNulls, SpGistLeafTupleData::tupstate, UnlockReleaseBuffer(), XLOG_SPGIST_MOVE_LEAFS, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

◆ saveNodeLink()

static void saveNodeLink ( Relation  index,
SPPageDesc parent,
BlockNumber  blkno,
OffsetNumber  offnum 
)
static

Definition at line 186 of file spgdoinsert.c.

188 {
189  SpGistInnerTuple innerTuple;
190 
191  innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192  PageGetItemId(parent->page, parent->offnum));
193 
194  spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195 
196  MarkBufferDirty(parent->buffer);
197 }
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition: spgdoinsert.c:52
SpGistInnerTupleData * SpGistInnerTuple

References SPPageDesc::buffer, MarkBufferDirty(), SPPageDesc::node, SPPageDesc::offnum, SPPageDesc::page, PageGetItem(), PageGetItemId(), and spgUpdateNodeLink().

Referenced by addLeafTuple(), doPickSplit(), moveLeafs(), and spgAddNodeAction().

◆ setRedirectionTuple()

static void setRedirectionTuple ( SPPageDesc current,
OffsetNumber  position,
BlockNumber  blkno,
OffsetNumber  offnum 
)
static

◆ spgAddNodeAction()

static void spgAddNodeAction ( Relation  index,
SpGistState state,
SpGistInnerTuple  innerTuple,
SPPageDesc current,
SPPageDesc parent,
int  nodeN,
Datum  nodeLabel 
)
static

Definition at line 1513 of file spgdoinsert.c.

1517 {
1518  SpGistInnerTuple newInnerTuple;
1519  spgxlogAddNode xlrec;
1520 
1521  /* Should not be applied to nulls */
1522  Assert(!SpGistPageStoresNulls(current->page));
1523 
1524  /* Construct new inner tuple with additional node */
1525  newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1526 
1527  /* Prepare WAL record */
1528  STORE_STATE(state, xlrec.stateSrc);
1529  xlrec.offnum = current->offnum;
1530 
1531  /* we don't fill these unless we need to change the parent downlink */
1532  xlrec.parentBlk = -1;
1534  xlrec.nodeI = 0;
1535 
1536  /* we don't fill these unless tuple has to be moved */
1538  xlrec.newPage = false;
1539 
1540  if (PageGetExactFreeSpace(current->page) >=
1541  newInnerTuple->size - innerTuple->size)
1542  {
1543  /*
1544  * We can replace the inner tuple by new version in-place
1545  */
1547 
1548  PageIndexTupleDelete(current->page, current->offnum);
1549  if (PageAddItem(current->page,
1550  (Item) newInnerTuple, newInnerTuple->size,
1551  current->offnum, false, false) != current->offnum)
1552  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1553  newInnerTuple->size);
1554 
1555  MarkBufferDirty(current->buffer);
1556 
1557  if (RelationNeedsWAL(index) && !state->isBuild)
1558  {
1559  XLogRecPtr recptr;
1560 
1561  XLogBeginInsert();
1562  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1563  XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1564 
1566 
1567  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1568 
1569  PageSetLSN(current->page, recptr);
1570  }
1571 
1572  END_CRIT_SECTION();
1573  }
1574  else
1575  {
1576  /*
1577  * move inner tuple to another page, and update parent
1578  */
1579  SpGistDeadTuple dt;
1580  SPPageDesc saveCurrent;
1581 
1582  /*
1583  * It should not be possible to get here for the root page, since we
1584  * allow only one inner tuple on the root page, and spgFormInnerTuple
1585  * always checks that inner tuples don't exceed the size of a page.
1586  */
1587  if (SpGistBlockIsRoot(current->blkno))
1588  elog(ERROR, "cannot enlarge root tuple any more");
1589  Assert(parent->buffer != InvalidBuffer);
1590 
1591  saveCurrent = *current;
1592 
1593  xlrec.offnumParent = parent->offnum;
1594  xlrec.nodeI = parent->node;
1595 
1596  /*
1597  * obtain new buffer with the same parity as current, since it will be
1598  * a child of same parent tuple
1599  */
1600  current->buffer = SpGistGetBuffer(index,
1601  GBUF_INNER_PARITY(current->blkno),
1602  newInnerTuple->size + sizeof(ItemIdData),
1603  &xlrec.newPage);
1604  current->blkno = BufferGetBlockNumber(current->buffer);
1605  current->page = BufferGetPage(current->buffer);
1606 
1607  /*
1608  * Let's just make real sure new current isn't same as old. Right now
1609  * that's impossible, but if SpGistGetBuffer ever got smart enough to
1610  * delete placeholder tuples before checking space, maybe it wouldn't
1611  * be impossible. The case would appear to work except that WAL
1612  * replay would be subtly wrong, so I think a mere assert isn't enough
1613  * here.
1614  */
1615  if (current->blkno == saveCurrent.blkno)
1616  elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1617 
1618  /*
1619  * New current and parent buffer will both be modified; but note that
1620  * parent buffer could be same as either new or old current.
1621  */
1622  if (parent->buffer == saveCurrent.buffer)
1623  xlrec.parentBlk = 0;
1624  else if (parent->buffer == current->buffer)
1625  xlrec.parentBlk = 1;
1626  else
1627  xlrec.parentBlk = 2;
1628 
1630 
1631  /* insert new ... */
1632  xlrec.offnumNew = current->offnum =
1633  SpGistPageAddNewItem(state, current->page,
1634  (Item) newInnerTuple, newInnerTuple->size,
1635  NULL, false);
1636 
1637  MarkBufferDirty(current->buffer);
1638 
1639  /* update parent's downlink and mark parent page dirty */
1640  saveNodeLink(index, parent, current->blkno, current->offnum);
1641 
1642  /*
1643  * Replace old tuple with a placeholder or redirection tuple. Unless
1644  * doing an index build, we have to insert a redirection tuple for
1645  * possible concurrent scans. We can't just delete it in any case,
1646  * because that could change the offsets of other tuples on the page,
1647  * breaking downlinks from their parents.
1648  */
1649  if (state->isBuild)
1652  else
1654  current->blkno, current->offnum);
1655 
1656  PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1657  if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1658  saveCurrent.offnum,
1659  false, false) != saveCurrent.offnum)
1660  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1661  dt->size);
1662 
1663  if (state->isBuild)
1664  SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1665  else
1666  SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1667 
1668  MarkBufferDirty(saveCurrent.buffer);
1669 
1670  if (RelationNeedsWAL(index) && !state->isBuild)
1671  {
1672  XLogRecPtr recptr;
1673  int flags;
1674 
1675  XLogBeginInsert();
1676 
1677  /* orig page */
1678  XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1679  /* new page */
1680  flags = REGBUF_STANDARD;
1681  if (xlrec.newPage)
1682  flags |= REGBUF_WILL_INIT;
1683  XLogRegisterBuffer(1, current->buffer, flags);
1684  /* parent page (if different from orig and new) */
1685  if (xlrec.parentBlk == 2)
1687 
1688  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1689  XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1690 
1691  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1692 
1693  /* we don't bother to check if any of these are redundant */
1694  PageSetLSN(current->page, recptr);
1695  PageSetLSN(parent->page, recptr);
1696  PageSetLSN(saveCurrent.page, recptr);
1697  }
1698 
1699  END_CRIT_SECTION();
1700 
1701  /* Release saveCurrent if it's not same as current or parent */
1702  if (saveCurrent.buffer != current->buffer &&
1703  saveCurrent.buffer != parent->buffer)
1704  {
1705  SpGistSetLastUsedPage(index, saveCurrent.buffer);
1706  UnlockReleaseBuffer(saveCurrent.buffer);
1707  }
1708  }
1709 }
static SpGistInnerTuple addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
Definition: spgdoinsert.c:80
#define SpGistPageStoresNulls(page)
SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgutils.c:1066
#define XLOG_SPGIST_ADD_NODE
Definition: spgxlog.h:24
OffsetNumber offnumNew
Definition: spgxlog.h:111
bool newPage
Definition: spgxlog.h:112
OffsetNumber offnumParent
Definition: spgxlog.h:126
OffsetNumber offnum
Definition: spgxlog.h:105
spgxlogState stateSrc
Definition: spgxlog.h:130
uint16 nodeI
Definition: spgxlog.h:128
int8 parentBlk
Definition: spgxlog.h:125

References addNode(), Assert, SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage(), elog, END_CRIT_SECTION, ERROR, GBUF_INNER_PARITY, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, MarkBufferDirty(), spgxlogAddNode::newPage, SPPageDesc::node, spgxlogAddNode::nodeI, SPPageDesc::offnum, spgxlogAddNode::offnum, spgxlogAddNode::offnumNew, spgxlogAddNode::offnumParent, SPPageDesc::page, PageAddItem, PageGetExactFreeSpace(), PageIndexTupleDelete(), PageSetLSN(), spgxlogAddNode::parentBlk, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, saveNodeLink(), SpGistInnerTupleData::size, SpGistDeadTupleData::size, spgFormDeadTuple(), SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistBlockIsRoot, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistPageGetOpaque, SpGistPageStoresNulls, SpGistSetLastUsedPage(), START_CRIT_SECTION, spgxlogAddNode::stateSrc, STORE_STATE, UnlockReleaseBuffer(), XLOG_SPGIST_ADD_NODE, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

◆ spgdoinsert()

bool spgdoinsert ( Relation  index,
SpGistState state,
ItemPointer  heapPtr,
Datum datums,
bool isnulls 
)

Definition at line 1914 of file spgdoinsert.c.

1916 {
1917  bool result = true;
1918  TupleDesc leafDescriptor = state->leafTupDesc;
1919  bool isnull = isnulls[spgKeyColumn];
1920  int level = 0;
1921  Datum leafDatums[INDEX_MAX_KEYS];
1922  int leafSize;
1923  int bestLeafSize;
1924  int numNoProgressCycles = 0;
1925  SPPageDesc current,
1926  parent;
1927  FmgrInfo *procinfo = NULL;
1928 
1929  /*
1930  * Look up FmgrInfo of the user-defined choose function once, to save
1931  * cycles in the loop below.
1932  */
1933  if (!isnull)
1934  procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1935 
1936  /*
1937  * Prepare the leaf datum to insert.
1938  *
1939  * If an optional "compress" method is provided, then call it to form the
1940  * leaf key datum from the input datum. Otherwise, store the input datum
1941  * as is. Since we don't use index_form_tuple in this AM, we have to make
1942  * sure value to be inserted is not toasted; FormIndexDatum doesn't
1943  * guarantee that. But we assume the "compress" method to return an
1944  * untoasted value.
1945  */
1946  if (!isnull)
1947  {
1949  {
1950  FmgrInfo *compressProcinfo = NULL;
1951 
1952  compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1953  leafDatums[spgKeyColumn] =
1954  FunctionCall1Coll(compressProcinfo,
1955  index->rd_indcollation[spgKeyColumn],
1956  datums[spgKeyColumn]);
1957  }
1958  else
1959  {
1960  Assert(state->attLeafType.type == state->attType.type);
1961 
1962  if (state->attType.attlen == -1)
1963  leafDatums[spgKeyColumn] =
1965  else
1966  leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1967  }
1968  }
1969  else
1970  leafDatums[spgKeyColumn] = (Datum) 0;
1971 
1972  /* Likewise, ensure that any INCLUDE values are not toasted */
1973  for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1974  {
1975  if (!isnulls[i])
1976  {
1977  if (TupleDescAttr(leafDescriptor, i)->attlen == -1)
1978  leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1979  else
1980  leafDatums[i] = datums[i];
1981  }
1982  else
1983  leafDatums[i] = (Datum) 0;
1984  }
1985 
1986  /*
1987  * Compute space needed for a leaf tuple containing the given data.
1988  */
1989  leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1990  /* Account for an item pointer, too */
1991  leafSize += sizeof(ItemIdData);
1992 
1993  /*
1994  * If it isn't gonna fit, and the opclass can't reduce the datum size by
1995  * suffixing, bail out now rather than doing a lot of useless work.
1996  */
1997  if (leafSize > SPGIST_PAGE_CAPACITY &&
1998  (isnull || !state->config.longValuesOK))
1999  ereport(ERROR,
2000  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2001  errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2002  leafSize - sizeof(ItemIdData),
2003  SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2005  errhint("Values larger than a buffer page cannot be indexed.")));
2006  bestLeafSize = leafSize;
2007 
2008  /* Initialize "current" to the appropriate root page */
2009  current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
2010  current.buffer = InvalidBuffer;
2011  current.page = NULL;
2012  current.offnum = FirstOffsetNumber;
2013  current.node = -1;
2014 
2015  /* "parent" is invalid for the moment */
2016  parent.blkno = InvalidBlockNumber;
2017  parent.buffer = InvalidBuffer;
2018  parent.page = NULL;
2019  parent.offnum = InvalidOffsetNumber;
2020  parent.node = -1;
2021 
2022  /*
2023  * Before entering the loop, try to clear any pending interrupt condition.
2024  * If a query cancel is pending, we might as well accept it now not later;
2025  * while if a non-canceling condition is pending, servicing it here avoids
2026  * having to restart the insertion and redo all the work so far.
2027  */
2029 
2030  for (;;)
2031  {
2032  bool isNew = false;
2033 
2034  /*
2035  * Bail out if query cancel is pending. We must have this somewhere
2036  * in the loop since a broken opclass could produce an infinite
2037  * picksplit loop. However, because we'll be holding buffer lock(s)
2038  * after the first iteration, ProcessInterrupts() wouldn't be able to
2039  * throw a cancel error here. Hence, if we see that an interrupt is
2040  * pending, break out of the loop and deal with the situation below.
2041  * Set result = false because we must restart the insertion if the
2042  * interrupt isn't a query-cancel-or-die case.
2043  */
2045  {
2046  result = false;
2047  break;
2048  }
2049 
2050  if (current.blkno == InvalidBlockNumber)
2051  {
2052  /*
2053  * Create a leaf page. If leafSize is too large to fit on a page,
2054  * we won't actually use the page yet, but it simplifies the API
2055  * for doPickSplit to always have a leaf page at hand; so just
2056  * quietly limit our request to a page size.
2057  */
2058  current.buffer =
2060  GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2061  Min(leafSize, SPGIST_PAGE_CAPACITY),
2062  &isNew);
2063  current.blkno = BufferGetBlockNumber(current.buffer);
2064  }
2065  else if (parent.buffer == InvalidBuffer)
2066  {
2067  /* we hold no parent-page lock, so no deadlock is possible */
2068  current.buffer = ReadBuffer(index, current.blkno);
2070  }
2071  else if (current.blkno != parent.blkno)
2072  {
2073  /* descend to a new child page */
2074  current.buffer = ReadBuffer(index, current.blkno);
2075 
2076  /*
2077  * Attempt to acquire lock on child page. We must beware of
2078  * deadlock against another insertion process descending from that
2079  * page to our parent page (see README). If we fail to get lock,
2080  * abandon the insertion and tell our caller to start over.
2081  *
2082  * XXX this could be improved, because failing to get lock on a
2083  * buffer is not proof of a deadlock situation; the lock might be
2084  * held by a reader, or even just background writer/checkpointer
2085  * process. Perhaps it'd be worth retrying after sleeping a bit?
2086  */
2087  if (!ConditionalLockBuffer(current.buffer))
2088  {
2089  ReleaseBuffer(current.buffer);
2090  UnlockReleaseBuffer(parent.buffer);
2091  return false;
2092  }
2093  }
2094  else
2095  {
2096  /* inner tuple can be stored on the same page as parent one */
2097  current.buffer = parent.buffer;
2098  }
2099  current.page = BufferGetPage(current.buffer);
2100 
2101  /* should not arrive at a page of the wrong type */
2102  if (isnull ? !SpGistPageStoresNulls(current.page) :
2103  SpGistPageStoresNulls(current.page))
2104  elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2105  current.blkno);
2106 
2107  if (SpGistPageIsLeaf(current.page))
2108  {
2109  SpGistLeafTuple leafTuple;
2110  int nToSplit,
2111  sizeToSplit;
2112 
2113  leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
2114  if (leafTuple->size + sizeof(ItemIdData) <=
2115  SpGistPageGetFreeSpace(current.page, 1))
2116  {
2117  /* it fits on page, so insert it and we're done */
2118  addLeafTuple(index, state, leafTuple,
2119  &current, &parent, isnull, isNew);
2120  break;
2121  }
2122  else if ((sizeToSplit =
2123  checkSplitConditions(index, state, &current,
2124  &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2125  nToSplit < 64 &&
2126  leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2127  {
2128  /*
2129  * the amount of data is pretty small, so just move the whole
2130  * chain to another leaf page rather than splitting it.
2131  */
2132  Assert(!isNew);
2133  moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2134  break; /* we're done */
2135  }
2136  else
2137  {
2138  /* picksplit */
2139  if (doPickSplit(index, state, &current, &parent,
2140  leafTuple, level, isnull, isNew))
2141  break; /* doPickSplit installed new tuples */
2142 
2143  /* leaf tuple will not be inserted yet */
2144  pfree(leafTuple);
2145 
2146  /*
2147  * current now describes new inner tuple, go insert into it
2148  */
2149  Assert(!SpGistPageIsLeaf(current.page));
2150  goto process_inner_tuple;
2151  }
2152  }
2153  else /* non-leaf page */
2154  {
2155  /*
2156  * Apply the opclass choose function to figure out how to insert
2157  * the given datum into the current inner tuple.
2158  */
2159  SpGistInnerTuple innerTuple;
2160  spgChooseIn in;
2161  spgChooseOut out;
2162 
2163  /*
2164  * spgAddNode and spgSplitTuple cases will loop back to here to
2165  * complete the insertion operation. Just in case the choose
2166  * function is broken and produces add or split requests
2167  * repeatedly, check for query cancel (see comments above).
2168  */
2169  process_inner_tuple:
2171  {
2172  result = false;
2173  break;
2174  }
2175 
2176  innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2177  PageGetItemId(current.page, current.offnum));
2178 
2179  in.datum = datums[spgKeyColumn];
2180  in.leafDatum = leafDatums[spgKeyColumn];
2181  in.level = level;
2182  in.allTheSame = innerTuple->allTheSame;
2183  in.hasPrefix = (innerTuple->prefixSize > 0);
2184  in.prefixDatum = SGITDATUM(innerTuple, state);
2185  in.nNodes = innerTuple->nNodes;
2186  in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2187 
2188  memset(&out, 0, sizeof(out));
2189 
2190  if (!isnull)
2191  {
2192  /* use user-defined choose method */
2193  FunctionCall2Coll(procinfo,
2194  index->rd_indcollation[0],
2195  PointerGetDatum(&in),
2196  PointerGetDatum(&out));
2197  }
2198  else
2199  {
2200  /* force "match" action (to insert to random subnode) */
2201  out.resultType = spgMatchNode;
2202  }
2203 
2204  if (innerTuple->allTheSame)
2205  {
2206  /*
2207  * It's not allowed to do an AddNode at an allTheSame tuple.
2208  * Opclass must say "match", in which case we choose a random
2209  * one of the nodes to descend into, or "split".
2210  */
2211  if (out.resultType == spgAddNode)
2212  elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2213  else if (out.resultType == spgMatchNode)
2214  out.result.matchNode.nodeN =
2216  0, innerTuple->nNodes - 1);
2217  }
2218 
2219  switch (out.resultType)
2220  {
2221  case spgMatchNode:
2222  /* Descend to N'th child node */
2223  spgMatchNodeAction(index, state, innerTuple,
2224  &current, &parent,
2225  out.result.matchNode.nodeN);
2226  /* Adjust level as per opclass request */
2227  level += out.result.matchNode.levelAdd;
2228  /* Replace leafDatum and recompute leafSize */
2229  if (!isnull)
2230  {
2231  leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2232  leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2233  leafDatums, isnulls);
2234  leafSize += sizeof(ItemIdData);
2235  }
2236 
2237  /*
2238  * Check new tuple size; fail if it can't fit, unless the
2239  * opclass says it can handle the situation by suffixing.
2240  *
2241  * However, the opclass can only shorten the leaf datum,
2242  * which may not be enough to ever make the tuple fit,
2243  * since INCLUDE columns might alone use more than a page.
2244  * Depending on the opclass' behavior, that could lead to
2245  * an infinite loop --- spgtextproc.c, for example, will
2246  * just repeatedly generate an empty-string leaf datum
2247  * once it runs out of data. Actual bugs in opclasses
2248  * might cause infinite looping, too. To detect such a
2249  * loop, check to see if we are making progress by
2250  * reducing the leafSize in each pass. This is a bit
2251  * tricky though. Because of alignment considerations,
2252  * the total tuple size might not decrease on every pass.
2253  * Also, there are edge cases where the choose method
2254  * might seem to not make progress for a cycle or two.
2255  * Somewhat arbitrarily, we allow up to 10 no-progress
2256  * iterations before failing. (This limit should be more
2257  * than MAXALIGN, to accommodate opclasses that trim one
2258  * byte from the leaf datum per pass.)
2259  */
2260  if (leafSize > SPGIST_PAGE_CAPACITY)
2261  {
2262  bool ok = false;
2263 
2264  if (state->config.longValuesOK && !isnull)
2265  {
2266  if (leafSize < bestLeafSize)
2267  {
2268  ok = true;
2269  bestLeafSize = leafSize;
2270  numNoProgressCycles = 0;
2271  }
2272  else if (++numNoProgressCycles < 10)
2273  ok = true;
2274  }
2275  if (!ok)
2276  ereport(ERROR,
2277  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2278  errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2279  leafSize - sizeof(ItemIdData),
2280  SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2282  errhint("Values larger than a buffer page cannot be indexed.")));
2283  }
2284 
2285  /*
2286  * Loop around and attempt to insert the new leafDatum at
2287  * "current" (which might reference an existing child
2288  * tuple, or might be invalid to force us to find a new
2289  * page for the tuple).
2290  */
2291  break;
2292  case spgAddNode:
2293  /* AddNode is not sensible if nodes don't have labels */
2294  if (in.nodeLabels == NULL)
2295  elog(ERROR, "cannot add a node to an inner tuple without node labels");
2296  /* Add node to inner tuple, per request */
2297  spgAddNodeAction(index, state, innerTuple,
2298  &current, &parent,
2299  out.result.addNode.nodeN,
2300  out.result.addNode.nodeLabel);
2301 
2302  /*
2303  * Retry insertion into the enlarged node. We assume that
2304  * we'll get a MatchNode result this time.
2305  */
2306  goto process_inner_tuple;
2307  break;
2308  case spgSplitTuple:
2309  /* Split inner tuple, per request */
2310  spgSplitNodeAction(index, state, innerTuple,
2311  &current, &out);
2312 
2313  /* Retry insertion into the split node */
2314  goto process_inner_tuple;
2315  break;
2316  default:
2317  elog(ERROR, "unrecognized SPGiST choose result: %d",
2318  (int) out.resultType);
2319  break;
2320  }
2321  }
2322  } /* end loop */
2323 
2324  /*
2325  * Release any buffers we're still holding. Beware of possibility that
2326  * current and parent reference same buffer.
2327  */
2328  if (current.buffer != InvalidBuffer)
2329  {
2331  UnlockReleaseBuffer(current.buffer);
2332  }
2333  if (parent.buffer != InvalidBuffer &&
2334  parent.buffer != current.buffer)
2335  {
2337  UnlockReleaseBuffer(parent.buffer);
2338  }
2339 
2340  /*
2341  * We do not support being called while some outer function is holding a
2342  * buffer lock (or any other reason to postpone query cancels). If that
2343  * were the case, telling the caller to retry would create an infinite
2344  * loop.
2345  */
2347 
2348  /*
2349  * Finally, check for interrupts again. If there was a query cancel,
2350  * ProcessInterrupts() will be able to throw the error here. If it was
2351  * some other kind of interrupt that can just be cleared, return false to
2352  * tell our caller to retry.
2353  */
2355 
2356  return result;
2357 }
bool ConditionalLockBuffer(Buffer buffer)
Definition: bufmgr.c:5111
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4850
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:5085
Buffer ReadBuffer(Relation reln, BlockNumber blockNum)
Definition: bufmgr.c:745
#define BUFFER_LOCK_EXCLUSIVE
Definition: bufmgr.h:199
#define OidIsValid(objectId)
Definition: c.h:775
int errhint(const char *fmt,...)
Definition: elog.c:1319
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define ereport(elevel,...)
Definition: elog.h:149
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1129
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
RegProcedure index_getprocid(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:827
void pfree(void *pointer)
Definition: mcxt.c:1520
#define INTERRUPTS_CAN_BE_PROCESSED()
Definition: miscadmin.h:129
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define INTERRUPTS_PENDING_CONDITION()
Definition: miscadmin.h:113
uint64 pg_prng_uint64_range(pg_prng_state *state, uint64 rmin, uint64 rmax)
Definition: pg_prng.c:144
pg_prng_state pg_global_prng_state
Definition: pg_prng.c:34
#define RelationGetRelationName(relation)
Definition: rel.h:539
static void spgAddNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN, Datum nodeLabel)
Definition: spgdoinsert.c:1513
static void spgSplitNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, spgChooseOut *out)
Definition: spgdoinsert.c:1715
static void spgMatchNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN)
Definition: spgdoinsert.c:1459
static bool doPickSplit(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, int level, bool isNulls, bool isNew)
Definition: spgdoinsert.c:677
static void moveLeafs(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, bool isNulls)
Definition: spgdoinsert.c:387
static int checkSplitConditions(Relation index, SpGistState *state, SPPageDesc *current, int *nToSplit)
Definition: spgdoinsert.c:333
static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
Definition: spgdoinsert.c:203
#define SPGIST_COMPRESS_PROC
Definition: spgist.h:28
#define SPGIST_CHOOSE_PROC
Definition: spgist.h:24
@ spgMatchNode
Definition: spgist.h:69
@ spgAddNode
Definition: spgist.h:70
@ spgSplitTuple
Definition: spgist.h:71
#define SPGIST_NULL_BLKNO
#define spgFirstIncludeColumn
#define SpGistPageIsLeaf(page)
#define SPGIST_ROOT_BLKNO
Datum * spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
Definition: spgutils.c:1142
Size SpGistGetLeafTupleSize(TupleDesc tupleDescriptor, const Datum *datums, const bool *isnulls)
Definition: spgutils.c:799
Datum * nodeLabels
Definition: spgist.h:64
bool hasPrefix
Definition: spgist.h:61
Datum prefixDatum
Definition: spgist.h:62
int nNodes
Definition: spgist.h:63
Datum datum
Definition: spgist.h:55
int level
Definition: spgist.h:57
Datum leafDatum
Definition: spgist.h:56
bool allTheSame
Definition: spgist.h:60
spgChooseResultType resultType
Definition: spgist.h:76
union spgChooseOut::@50 result
struct spgChooseOut::@50::@52 addNode
struct spgChooseOut::@50::@51 matchNode
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References addLeafTuple(), spgChooseOut::addNode, spgChooseIn::allTheSame, SpGistInnerTupleData::allTheSame, Assert, SPPageDesc::blkno, SPPageDesc::buffer, BUFFER_LOCK_EXCLUSIVE, BufferGetBlockNumber(), BufferGetPage(), CHECK_FOR_INTERRUPTS, checkSplitConditions(), ConditionalLockBuffer(), spgChooseIn::datum, doPickSplit(), elog, ereport, errcode(), errhint(), errmsg(), ERROR, FirstOffsetNumber, FunctionCall1Coll(), FunctionCall2Coll(), GBUF_LEAF, GBUF_NULLS, spgChooseIn::hasPrefix, i, index_getprocid(), index_getprocinfo(), INDEX_MAX_KEYS, INTERRUPTS_CAN_BE_PROCESSED, INTERRUPTS_PENDING_CONDITION, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, spgChooseIn::leafDatum, spgChooseIn::level, LockBuffer(), spgChooseOut::matchNode, Min, moveLeafs(), TupleDescData::natts, spgChooseIn::nNodes, SpGistInnerTupleData::nNodes, SPPageDesc::node, spgChooseIn::nodeLabels, SPPageDesc::offnum, OidIsValid, SPPageDesc::page, PageGetItem(), PageGetItemId(), pfree(), PG_DETOAST_DATUM, pg_global_prng_state, pg_prng_uint64_range(), PointerGetDatum(), spgChooseIn::prefixDatum, SpGistInnerTupleData::prefixSize, ReadBuffer(), RelationGetRelationName, ReleaseBuffer(), spgChooseOut::result, spgChooseOut::resultType, SGITDATUM, SpGistLeafTupleData::size, spgAddNode, spgAddNodeAction(), spgExtractNodeLabels(), spgFirstIncludeColumn, spgFormLeafTuple(), SPGIST_CHOOSE_PROC, SPGIST_COMPRESS_PROC, SPGIST_NULL_BLKNO, SPGIST_PAGE_CAPACITY, SPGIST_ROOT_BLKNO, SpGistGetBuffer(), SpGistGetLeafTupleSize(), SpGistPageGetFreeSpace, SpGistPageIsLeaf, SpGistPageStoresNulls, SpGistSetLastUsedPage(), spgKeyColumn, spgMatchNode, spgMatchNodeAction(), spgSplitNodeAction(), spgSplitTuple, TupleDescAttr, and UnlockReleaseBuffer().

Referenced by spginsert(), and spgistBuildCallback().

◆ spgMatchNodeAction()

static void spgMatchNodeAction ( Relation  index,
SpGistState state,
SpGistInnerTuple  innerTuple,
SPPageDesc current,
SPPageDesc parent,
int  nodeN 
)
static

Definition at line 1459 of file spgdoinsert.c.

1462 {
1463  int i;
1464  SpGistNodeTuple node;
1465 
1466  /* Release previous parent buffer if any */
1467  if (parent->buffer != InvalidBuffer &&
1468  parent->buffer != current->buffer)
1469  {
1471  UnlockReleaseBuffer(parent->buffer);
1472  }
1473 
1474  /* Repoint parent to specified node of current inner tuple */
1475  parent->blkno = current->blkno;
1476  parent->buffer = current->buffer;
1477  parent->page = current->page;
1478  parent->offnum = current->offnum;
1479  parent->node = nodeN;
1480 
1481  /* Locate that node */
1482  SGITITERATE(innerTuple, i, node)
1483  {
1484  if (i == nodeN)
1485  break;
1486  }
1487 
1488  if (i != nodeN)
1489  elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1490  nodeN);
1491 
1492  /* Point current to the downlink location, if any */
1493  if (ItemPointerIsValid(&node->t_tid))
1494  {
1495  current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1496  current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1497  }
1498  else
1499  {
1500  /* Downlink is empty, so we'll need to find a new page */
1501  current->blkno = InvalidBlockNumber;
1502  current->offnum = InvalidOffsetNumber;
1503  }
1504 
1505  current->buffer = InvalidBuffer;
1506  current->page = NULL;
1507 }
ItemPointerData t_tid
Definition: itup.h:37

References SPPageDesc::blkno, SPPageDesc::buffer, elog, ERROR, i, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, ItemPointerGetBlockNumber(), ItemPointerGetOffsetNumber(), ItemPointerIsValid(), SPPageDesc::node, SPPageDesc::offnum, SPPageDesc::page, SGITITERATE, SpGistSetLastUsedPage(), IndexTupleData::t_tid, and UnlockReleaseBuffer().

Referenced by spgdoinsert().

◆ spgPageIndexMultiDelete()

void spgPageIndexMultiDelete ( SpGistState state,
Page  page,
OffsetNumber itemnos,
int  nitems,
int  firststate,
int  reststate,
BlockNumber  blkno,
OffsetNumber  offnum 
)

Definition at line 131 of file spgdoinsert.c.

135 {
136  OffsetNumber firstItem;
138  SpGistDeadTuple tuple = NULL;
139  int i;
140 
141  if (nitems == 0)
142  return; /* nothing to do */
143 
144  /*
145  * For efficiency we want to use PageIndexMultiDelete, which requires the
146  * targets to be listed in sorted order, so we have to sort the itemnos
147  * array. (This also greatly simplifies the math for reinserting the
148  * replacement tuples.) However, we must not scribble on the caller's
149  * array, so we have to make a copy.
150  */
151  memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152  if (nitems > 1)
153  qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154 
155  PageIndexMultiDelete(page, sortednos, nitems);
156 
157  firstItem = itemnos[0];
158 
159  for (i = 0; i < nitems; i++)
160  {
161  OffsetNumber itemno = sortednos[i];
162  int tupstate;
163 
164  tupstate = (itemno == firstItem) ? firststate : reststate;
165  if (tuple == NULL || tuple->tupstate != tupstate)
166  tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167 
168  if (PageAddItem(page, (Item) tuple, tuple->size,
169  itemno, false, false) != itemno)
170  elog(ERROR, "failed to add item of size %u to SPGiST index page",
171  tuple->size);
172 
173  if (tupstate == SPGIST_REDIRECT)
174  SpGistPageGetOpaque(page)->nRedirection++;
175  else if (tupstate == SPGIST_PLACEHOLDER)
176  SpGistPageGetOpaque(page)->nPlaceholder++;
177  }
178 }
void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
Definition: bufpage.c:1161
#define nitems(x)
Definition: indent.h:31
#define MaxIndexTuplesPerPage
Definition: itup.h:165
#define qsort(a, b, c, d)
Definition: port.h:449
static int cmpOffsetNumbers(const void *a, const void *b)
Definition: spgdoinsert.c:112

References cmpOffsetNumbers(), elog, ERROR, i, MaxIndexTuplesPerPage, nitems, PageAddItem, PageIndexMultiDelete(), qsort, SpGistDeadTupleData::size, spgFormDeadTuple(), SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistPageGetOpaque, and SpGistDeadTupleData::tupstate.

Referenced by doPickSplit(), moveLeafs(), spgRedoMoveLeafs(), spgRedoPickSplit(), spgRedoVacuumLeaf(), and vacuumLeafPage().

◆ spgSplitNodeAction()

static void spgSplitNodeAction ( Relation  index,
SpGistState state,
SpGistInnerTuple  innerTuple,
SPPageDesc current,
spgChooseOut out 
)
static

Definition at line 1715 of file spgdoinsert.c.

1718 {
1719  SpGistInnerTuple prefixTuple,
1720  postfixTuple;
1721  SpGistNodeTuple node,
1722  *nodes;
1723  BlockNumber postfixBlkno;
1724  OffsetNumber postfixOffset;
1725  int i;
1726  spgxlogSplitTuple xlrec;
1727  Buffer newBuffer = InvalidBuffer;
1728 
1729  /* Should not be applied to nulls */
1730  Assert(!SpGistPageStoresNulls(current->page));
1731 
1732  /* Check opclass gave us sane values */
1733  if (out->result.splitTuple.prefixNNodes <= 0 ||
1734  out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1735  elog(ERROR, "invalid number of prefix nodes: %d",
1736  out->result.splitTuple.prefixNNodes);
1737  if (out->result.splitTuple.childNodeN < 0 ||
1738  out->result.splitTuple.childNodeN >=
1739  out->result.splitTuple.prefixNNodes)
1740  elog(ERROR, "invalid child node number: %d",
1741  out->result.splitTuple.childNodeN);
1742 
1743  /*
1744  * Construct new prefix tuple with requested number of nodes. We'll fill
1745  * in the childNodeN'th node's downlink below.
1746  */
1747  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1748  out->result.splitTuple.prefixNNodes);
1749 
1750  for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1751  {
1752  Datum label = (Datum) 0;
1753  bool labelisnull;
1754 
1755  labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1756  if (!labelisnull)
1757  label = out->result.splitTuple.prefixNodeLabels[i];
1758  nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1759  }
1760 
1761  prefixTuple = spgFormInnerTuple(state,
1762  out->result.splitTuple.prefixHasPrefix,
1763  out->result.splitTuple.prefixPrefixDatum,
1764  out->result.splitTuple.prefixNNodes,
1765  nodes);
1766 
1767  /* it must fit in the space that innerTuple now occupies */
1768  if (prefixTuple->size > innerTuple->size)
1769  elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1770 
1771  /*
1772  * Construct new postfix tuple, containing all nodes of innerTuple with
1773  * same node datums, but with the prefix specified by the picksplit
1774  * function.
1775  */
1776  nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1777  SGITITERATE(innerTuple, i, node)
1778  {
1779  nodes[i] = node;
1780  }
1781 
1782  postfixTuple = spgFormInnerTuple(state,
1783  out->result.splitTuple.postfixHasPrefix,
1784  out->result.splitTuple.postfixPrefixDatum,
1785  innerTuple->nNodes, nodes);
1786 
1787  /* Postfix tuple is allTheSame if original tuple was */
1788  postfixTuple->allTheSame = innerTuple->allTheSame;
1789 
1790  /* prep data for WAL record */
1791  xlrec.newPage = false;
1792 
1793  /*
1794  * If we can't fit both tuples on the current page, get a new page for the
1795  * postfix tuple. In particular, can't split to the root page.
1796  *
1797  * For the space calculation, note that prefixTuple replaces innerTuple
1798  * but postfixTuple will be a new entry.
1799  */
1800  if (SpGistBlockIsRoot(current->blkno) ||
1801  SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1802  prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1803  {
1804  /*
1805  * Choose page with next triple parity, because postfix tuple is a
1806  * child of prefix one
1807  */
1808  newBuffer = SpGistGetBuffer(index,
1809  GBUF_INNER_PARITY(current->blkno + 1),
1810  postfixTuple->size + sizeof(ItemIdData),
1811  &xlrec.newPage);
1812  }
1813 
1815 
1816  /*
1817  * Replace old tuple by prefix tuple
1818  */
1819  PageIndexTupleDelete(current->page, current->offnum);
1820  xlrec.offnumPrefix = PageAddItem(current->page,
1821  (Item) prefixTuple, prefixTuple->size,
1822  current->offnum, false, false);
1823  if (xlrec.offnumPrefix != current->offnum)
1824  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1825  prefixTuple->size);
1826 
1827  /*
1828  * put postfix tuple into appropriate page
1829  */
1830  if (newBuffer == InvalidBuffer)
1831  {
1832  postfixBlkno = current->blkno;
1833  xlrec.offnumPostfix = postfixOffset =
1834  SpGistPageAddNewItem(state, current->page,
1835  (Item) postfixTuple, postfixTuple->size,
1836  NULL, false);
1837  xlrec.postfixBlkSame = true;
1838  }
1839  else
1840  {
1841  postfixBlkno = BufferGetBlockNumber(newBuffer);
1842  xlrec.offnumPostfix = postfixOffset =
1844  (Item) postfixTuple, postfixTuple->size,
1845  NULL, false);
1846  MarkBufferDirty(newBuffer);
1847  xlrec.postfixBlkSame = false;
1848  }
1849 
1850  /*
1851  * And set downlink pointer in the prefix tuple to point to postfix tuple.
1852  * (We can't avoid this step by doing the above two steps in opposite
1853  * order, because there might not be enough space on the page to insert
1854  * the postfix tuple first.) We have to update the local copy of the
1855  * prefixTuple too, because that's what will be written to WAL.
1856  */
1857  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1858  postfixBlkno, postfixOffset);
1859  prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1860  PageGetItemId(current->page, current->offnum));
1861  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1862  postfixBlkno, postfixOffset);
1863 
1864  MarkBufferDirty(current->buffer);
1865 
1866  if (RelationNeedsWAL(index) && !state->isBuild)
1867  {
1868  XLogRecPtr recptr;
1869 
1870  XLogBeginInsert();
1871  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1872  XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1873  XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1874 
1876  if (newBuffer != InvalidBuffer)
1877  {
1878  int flags;
1879 
1880  flags = REGBUF_STANDARD;
1881  if (xlrec.newPage)
1882  flags |= REGBUF_WILL_INIT;
1883  XLogRegisterBuffer(1, newBuffer, flags);
1884  }
1885 
1886  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1887 
1888  PageSetLSN(current->page, recptr);
1889 
1890  if (newBuffer != InvalidBuffer)
1891  {
1892  PageSetLSN(BufferGetPage(newBuffer), recptr);
1893  }
1894  }
1895 
1896  END_CRIT_SECTION();
1897 
1898  /* Update local free-space cache and release buffer */
1899  if (newBuffer != InvalidBuffer)
1900  {
1901  SpGistSetLastUsedPage(index, newBuffer);
1902  UnlockReleaseBuffer(newBuffer);
1903  }
1904 }
#define SGITMAXNNODES
#define XLOG_SPGIST_SPLIT_TUPLE
Definition: spgxlog.h:25
struct spgChooseOut::@50::@53 splitTuple
OffsetNumber offnumPostfix
Definition: spgxlog.h:147
OffsetNumber offnumPrefix
Definition: spgxlog.h:144
bool postfixBlkSame
Definition: spgxlog.h:149

References SpGistInnerTupleData::allTheSame, Assert, SPPageDesc::blkno, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage(), elog, END_CRIT_SECTION, ERROR, GBUF_INNER_PARITY, i, InvalidBuffer, label, MarkBufferDirty(), spgxlogSplitTuple::newPage, SpGistInnerTupleData::nNodes, SPPageDesc::offnum, spgxlogSplitTuple::offnumPostfix, spgxlogSplitTuple::offnumPrefix, SPPageDesc::page, PageAddItem, PageGetItem(), PageGetItemId(), PageIndexTupleDelete(), PageSetLSN(), palloc(), spgxlogSplitTuple::postfixBlkSame, REGBUF_STANDARD, REGBUF_WILL_INIT, RelationNeedsWAL, spgChooseOut::result, SGITITERATE, SGITMAXNNODES, SpGistInnerTupleData::size, spgFormInnerTuple(), spgFormNodeTuple(), SpGistBlockIsRoot, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistPageGetFreeSpace, SpGistPageStoresNulls, SpGistSetLastUsedPage(), spgUpdateNodeLink(), spgChooseOut::splitTuple, START_CRIT_SECTION, UnlockReleaseBuffer(), XLOG_SPGIST_SPLIT_TUPLE, XLogBeginInsert(), XLogInsert(), XLogRegisterBuffer(), and XLogRegisterData().

Referenced by spgdoinsert().

◆ spgUpdateNodeLink()

void spgUpdateNodeLink ( SpGistInnerTuple  tup,
int  nodeN,
BlockNumber  blkno,
OffsetNumber  offset 
)

Definition at line 52 of file spgdoinsert.c.

54 {
55  int i;
56  SpGistNodeTuple node;
57 
58  SGITITERATE(tup, i, node)
59  {
60  if (i == nodeN)
61  {
62  ItemPointerSet(&node->t_tid, blkno, offset);
63  return;
64  }
65  }
66 
67  elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
68  nodeN);
69 }

References elog, ERROR, i, ItemPointerSet(), SGITITERATE, and IndexTupleData::t_tid.

Referenced by saveNodeLink(), spgRedoAddLeaf(), spgRedoAddNode(), spgRedoMoveLeafs(), spgRedoPickSplit(), and spgSplitNodeAction().