PostgreSQL Source Code  git master
spgdoinsert.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * spgdoinsert.c
4  * implementation of insert algorithm
5  *
6  *
7  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  * src/backend/access/spgist/spgdoinsert.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/genam.h"
19 #include "access/spgist_private.h"
20 #include "access/spgxlog.h"
21 #include "access/xloginsert.h"
22 #include "miscadmin.h"
23 #include "storage/bufmgr.h"
24 #include "utils/rel.h"
25 
26 
27 /*
28  * SPPageDesc tracks all info about a page we are inserting into. In some
29  * situations it actually identifies a tuple, or even a specific node within
30  * an inner tuple. But any of the fields can be invalid. If the buffer
31  * field is valid, it implies we hold pin and exclusive lock on that buffer.
32  * page pointer should be valid exactly when buffer is.
33  */
34 typedef struct SPPageDesc
35 {
36  BlockNumber blkno; /* block number, or InvalidBlockNumber */
37  Buffer buffer; /* page's buffer number, or InvalidBuffer */
38  Page page; /* pointer to page buffer, or NULL */
39  OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
40  int node; /* node number within inner tuple, or -1 */
41 } SPPageDesc;
42 
43 
44 /*
45  * Set the item pointer in the nodeN'th entry in inner tuple tup. This
46  * is used to update the parent inner tuple's downlink after a move or
47  * split operation.
48  */
49 void
52 {
53  int i;
55 
56  SGITITERATE(tup, i, node)
57  {
58  if (i == nodeN)
59  {
60  ItemPointerSet(&node->t_tid, blkno, offset);
61  return;
62  }
63  }
64 
65  elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
66  nodeN);
67 }
68 
69 /*
70  * Form a new inner tuple containing one more node than the given one, with
71  * the specified label datum, inserted at offset "offset" in the node array.
72  * The new tuple's prefix is the same as the old one's.
73  *
74  * Note that the new node initially has an invalid downlink. We'll find a
75  * page to point it to later.
76  */
77 static SpGistInnerTuple
79 {
81  *nodes;
82  int i;
83 
84  /* if offset is negative, insert at end */
85  if (offset < 0)
86  offset = tuple->nNodes;
87  else if (offset > tuple->nNodes)
88  elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
89 
90  nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
91  SGITITERATE(tuple, i, node)
92  {
93  if (i < offset)
94  nodes[i] = node;
95  else
96  nodes[i + 1] = node;
97  }
98 
99  nodes[offset] = spgFormNodeTuple(state, label, false);
100 
101  return spgFormInnerTuple(state,
102  (tuple->prefixSize > 0),
103  SGITDATUM(tuple, state),
104  tuple->nNodes + 1,
105  nodes);
106 }
107 
108 /* qsort comparator for sorting OffsetNumbers */
109 static int
110 cmpOffsetNumbers(const void *a, const void *b)
111 {
112  if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
113  return 0;
114  return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
115 }
116 
117 /*
118  * Delete multiple tuples from an index page, preserving tuple offset numbers.
119  *
120  * The first tuple in the given list is replaced with a dead tuple of type
121  * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122  * with dead tuples of type "reststate". If either firststate or reststate
123  * is REDIRECT, blkno/offnum specify where to link to.
124  *
125  * NB: this is used during WAL replay, so beware of trying to make it too
126  * smart. In particular, it shouldn't use "state" except for calling
127  * spgFormDeadTuple(). This is also used in a critical section, so no
128  * pallocs either!
129  */
130 void
132  OffsetNumber *itemnos, int nitems,
133  int firststate, int reststate,
135 {
136  OffsetNumber firstItem;
138  SpGistDeadTuple tuple = NULL;
139  int i;
140 
141  if (nitems == 0)
142  return; /* nothing to do */
143 
144  /*
145  * For efficiency we want to use PageIndexMultiDelete, which requires the
146  * targets to be listed in sorted order, so we have to sort the itemnos
147  * array. (This also greatly simplifies the math for reinserting the
148  * replacement tuples.) However, we must not scribble on the caller's
149  * array, so we have to make a copy.
150  */
151  memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152  if (nitems > 1)
153  qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154 
155  PageIndexMultiDelete(page, sortednos, nitems);
156 
157  firstItem = itemnos[0];
158 
159  for (i = 0; i < nitems; i++)
160  {
161  OffsetNumber itemno = sortednos[i];
162  int tupstate;
163 
164  tupstate = (itemno == firstItem) ? firststate : reststate;
165  if (tuple == NULL || tuple->tupstate != tupstate)
166  tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167 
168  if (PageAddItem(page, (Item) tuple, tuple->size,
169  itemno, false, false) != itemno)
170  elog(ERROR, "failed to add item of size %u to SPGiST index page",
171  tuple->size);
172 
173  if (tupstate == SPGIST_REDIRECT)
174  SpGistPageGetOpaque(page)->nRedirection++;
175  else if (tupstate == SPGIST_PLACEHOLDER)
176  SpGistPageGetOpaque(page)->nPlaceholder++;
177  }
178 }
179 
180 /*
181  * Update the parent inner tuple's downlink, and mark the parent buffer
182  * dirty (this must be the last change to the parent page in the current
183  * WAL action).
184  */
185 static void
188 {
189  SpGistInnerTuple innerTuple;
190 
191  innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192  PageGetItemId(parent->page, parent->offnum));
193 
194  spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195 
196  MarkBufferDirty(parent->buffer);
197 }
198 
199 /*
200  * Add a leaf tuple to a leaf page where there is known to be room for it
201  */
202 static void
204  SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
205 {
206  spgxlogAddLeaf xlrec;
207 
208  xlrec.newPage = isNew;
209  xlrec.storesNulls = isNulls;
210 
211  /* these will be filled below as needed */
215  xlrec.nodeI = 0;
216 
218 
219  if (current->offnum == InvalidOffsetNumber ||
220  SpGistBlockIsRoot(current->blkno))
221  {
222  /* Tuple is not part of a chain */
224  current->offnum = SpGistPageAddNewItem(state, current->page,
225  (Item) leafTuple, leafTuple->size,
226  NULL, false);
227 
228  xlrec.offnumLeaf = current->offnum;
229 
230  /* Must update parent's downlink if any */
231  if (parent->buffer != InvalidBuffer)
232  {
233  xlrec.offnumParent = parent->offnum;
234  xlrec.nodeI = parent->node;
235 
236  saveNodeLink(index, parent, current->blkno, current->offnum);
237  }
238  }
239  else
240  {
241  /*
242  * Tuple must be inserted into existing chain. We mustn't change the
243  * chain's head address, but we don't need to chase the entire chain
244  * to put the tuple at the end; we can insert it second.
245  *
246  * Also, it's possible that the "chain" consists only of a DEAD tuple,
247  * in which case we should replace the DEAD tuple in-place.
248  */
249  SpGistLeafTuple head;
251 
252  head = (SpGistLeafTuple) PageGetItem(current->page,
253  PageGetItemId(current->page, current->offnum));
254  if (head->tupstate == SPGIST_LIVE)
255  {
256  SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
257  offnum = SpGistPageAddNewItem(state, current->page,
258  (Item) leafTuple, leafTuple->size,
259  NULL, false);
260 
261  /*
262  * re-get head of list because it could have been moved on page,
263  * and set new second element
264  */
265  head = (SpGistLeafTuple) PageGetItem(current->page,
266  PageGetItemId(current->page, current->offnum));
267  SGLT_SET_NEXTOFFSET(head, offnum);
268 
269  xlrec.offnumLeaf = offnum;
270  xlrec.offnumHeadLeaf = current->offnum;
271  }
272  else if (head->tupstate == SPGIST_DEAD)
273  {
275  PageIndexTupleDelete(current->page, current->offnum);
276  if (PageAddItem(current->page,
277  (Item) leafTuple, leafTuple->size,
278  current->offnum, false, false) != current->offnum)
279  elog(ERROR, "failed to add item of size %u to SPGiST index page",
280  leafTuple->size);
281 
282  /* WAL replay distinguishes this case by equal offnums */
283  xlrec.offnumLeaf = current->offnum;
284  xlrec.offnumHeadLeaf = current->offnum;
285  }
286  else
287  elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288  }
289 
290  MarkBufferDirty(current->buffer);
291 
292  if (RelationNeedsWAL(index) && !state->isBuild)
293  {
294  XLogRecPtr recptr;
295  int flags;
296 
297  XLogBeginInsert();
298  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299  XLogRegisterData((char *) leafTuple, leafTuple->size);
300 
301  flags = REGBUF_STANDARD;
302  if (xlrec.newPage)
303  flags |= REGBUF_WILL_INIT;
304  XLogRegisterBuffer(0, current->buffer, flags);
305  if (xlrec.offnumParent != InvalidOffsetNumber)
307 
308  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309 
310  PageSetLSN(current->page, recptr);
311 
312  /* update parent only if we actually changed it */
313  if (xlrec.offnumParent != InvalidOffsetNumber)
314  {
315  PageSetLSN(parent->page, recptr);
316  }
317  }
318 
320 }
321 
322 /*
323  * Count the number and total size of leaf tuples in the chain starting at
324  * current->offnum. Return number into *nToSplit and total size as function
325  * result.
326  *
327  * Klugy special case when considering the root page (i.e., root is a leaf
328  * page, but we're about to split for the first time): return fake large
329  * values to force spgdoinsert() to take the doPickSplit rather than
330  * moveLeafs code path. moveLeafs is not prepared to deal with root page.
331  */
332 static int
334  SPPageDesc *current, int *nToSplit)
335 {
336  int i,
337  n = 0,
338  totalSize = 0;
339 
340  if (SpGistBlockIsRoot(current->blkno))
341  {
342  /* return impossible values to force split */
343  *nToSplit = BLCKSZ;
344  return BLCKSZ;
345  }
346 
347  i = current->offnum;
348  while (i != InvalidOffsetNumber)
349  {
350  SpGistLeafTuple it;
351 
352  Assert(i >= FirstOffsetNumber &&
353  i <= PageGetMaxOffsetNumber(current->page));
354  it = (SpGistLeafTuple) PageGetItem(current->page,
355  PageGetItemId(current->page, i));
356  if (it->tupstate == SPGIST_LIVE)
357  {
358  n++;
359  totalSize += it->size + sizeof(ItemIdData);
360  }
361  else if (it->tupstate == SPGIST_DEAD)
362  {
363  /* We could see a DEAD tuple as first/only chain item */
364  Assert(i == current->offnum);
366  /* Don't count it in result, because it won't go to other page */
367  }
368  else
369  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370 
371  i = SGLT_GET_NEXTOFFSET(it);
372  }
373 
374  *nToSplit = n;
375 
376  return totalSize;
377 }
378 
379 /*
380  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381  * but the chain has to be moved because there's not enough room to add
382  * newLeafTuple to its page. We use this method when the chain contains
383  * very little data so a split would be inefficient. We are sure we can
384  * fit the chain plus newLeafTuple on one other page.
385  */
386 static void
388  SPPageDesc *current, SPPageDesc *parent,
389  SpGistLeafTuple newLeafTuple, bool isNulls)
390 {
391  int i,
392  nDelete,
393  nInsert,
394  size;
395  Buffer nbuf;
396  Page npage;
397  SpGistLeafTuple it;
399  startOffset = InvalidOffsetNumber;
400  bool replaceDead = false;
401  OffsetNumber *toDelete;
402  OffsetNumber *toInsert;
403  BlockNumber nblkno;
404  spgxlogMoveLeafs xlrec;
405  char *leafdata,
406  *leafptr;
407 
408  /* This doesn't work on root page */
409  Assert(parent->buffer != InvalidBuffer);
410  Assert(parent->buffer != current->buffer);
411 
412  /* Locate the tuples to be moved, and count up the space needed */
413  i = PageGetMaxOffsetNumber(current->page);
414  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
415  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
416 
417  size = newLeafTuple->size + sizeof(ItemIdData);
418 
419  nDelete = 0;
420  i = current->offnum;
421  while (i != InvalidOffsetNumber)
422  {
423  SpGistLeafTuple it;
424 
425  Assert(i >= FirstOffsetNumber &&
426  i <= PageGetMaxOffsetNumber(current->page));
427  it = (SpGistLeafTuple) PageGetItem(current->page,
428  PageGetItemId(current->page, i));
429 
430  if (it->tupstate == SPGIST_LIVE)
431  {
432  toDelete[nDelete] = i;
433  size += it->size + sizeof(ItemIdData);
434  nDelete++;
435  }
436  else if (it->tupstate == SPGIST_DEAD)
437  {
438  /* We could see a DEAD tuple as first/only chain item */
439  Assert(i == current->offnum);
441  /* We don't want to move it, so don't count it in size */
442  toDelete[nDelete] = i;
443  nDelete++;
444  replaceDead = true;
445  }
446  else
447  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
448 
449  i = SGLT_GET_NEXTOFFSET(it);
450  }
451 
452  /* Find a leaf page that will hold them */
453  nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
454  size, &xlrec.newPage);
455  npage = BufferGetPage(nbuf);
456  nblkno = BufferGetBlockNumber(nbuf);
457  Assert(nblkno != current->blkno);
458 
459  leafdata = leafptr = palloc(size);
460 
462 
463  /* copy all the old tuples to new page, unless they're dead */
464  nInsert = 0;
465  if (!replaceDead)
466  {
467  for (i = 0; i < nDelete; i++)
468  {
469  it = (SpGistLeafTuple) PageGetItem(current->page,
470  PageGetItemId(current->page, toDelete[i]));
471  Assert(it->tupstate == SPGIST_LIVE);
472 
473  /*
474  * Update chain link (notice the chain order gets reversed, but we
475  * don't care). We're modifying the tuple on the source page
476  * here, but it's okay since we're about to delete it.
477  */
478  SGLT_SET_NEXTOFFSET(it, r);
479 
480  r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
481  &startOffset, false);
482 
483  toInsert[nInsert] = r;
484  nInsert++;
485 
486  /* save modified tuple into leafdata as well */
487  memcpy(leafptr, it, it->size);
488  leafptr += it->size;
489  }
490  }
491 
492  /* add the new tuple as well */
493  SGLT_SET_NEXTOFFSET(newLeafTuple, r);
494  r = SpGistPageAddNewItem(state, npage,
495  (Item) newLeafTuple, newLeafTuple->size,
496  &startOffset, false);
497  toInsert[nInsert] = r;
498  nInsert++;
499  memcpy(leafptr, newLeafTuple, newLeafTuple->size);
500  leafptr += newLeafTuple->size;
501 
502  /*
503  * Now delete the old tuples, leaving a redirection pointer behind for the
504  * first one, unless we're doing an index build; in which case there can't
505  * be any concurrent scan so we need not provide a redirect.
506  */
507  spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
510  nblkno, r);
511 
512  /* Update parent's downlink and mark parent page dirty */
513  saveNodeLink(index, parent, nblkno, r);
514 
515  /* Mark the leaf pages too */
516  MarkBufferDirty(current->buffer);
517  MarkBufferDirty(nbuf);
518 
519  if (RelationNeedsWAL(index) && !state->isBuild)
520  {
521  XLogRecPtr recptr;
522 
523  /* prepare WAL info */
524  STORE_STATE(state, xlrec.stateSrc);
525 
526  xlrec.nMoves = nDelete;
527  xlrec.replaceDead = replaceDead;
528  xlrec.storesNulls = isNulls;
529 
530  xlrec.offnumParent = parent->offnum;
531  xlrec.nodeI = parent->node;
532 
533  XLogBeginInsert();
534  XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
535  XLogRegisterData((char *) toDelete,
536  sizeof(OffsetNumber) * nDelete);
537  XLogRegisterData((char *) toInsert,
538  sizeof(OffsetNumber) * nInsert);
539  XLogRegisterData((char *) leafdata, leafptr - leafdata);
540 
542  XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
544 
545  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
546 
547  PageSetLSN(current->page, recptr);
548  PageSetLSN(npage, recptr);
549  PageSetLSN(parent->page, recptr);
550  }
551 
553 
554  /* Update local free-space cache and release new buffer */
555  SpGistSetLastUsedPage(index, nbuf);
556  UnlockReleaseBuffer(nbuf);
557 }
558 
559 /*
560  * Update previously-created redirection tuple with appropriate destination
561  *
562  * We use this when it's not convenient to know the destination first.
563  * The tuple should have been made with the "impossible" destination of
564  * the metapage.
565  */
566 static void
569 {
570  SpGistDeadTuple dt;
571 
572  dt = (SpGistDeadTuple) PageGetItem(current->page,
573  PageGetItemId(current->page, position));
576  ItemPointerSet(&dt->pointer, blkno, offnum);
577 }
578 
579 /*
580  * Test to see if the user-defined picksplit function failed to do its job,
581  * ie, it put all the leaf tuples into the same node.
582  * If so, randomly divide the tuples into several nodes (all with the same
583  * label) and return true to select allTheSame mode for this inner tuple.
584  *
585  * (This code is also used to forcibly select allTheSame mode for nulls.)
586  *
587  * If we know that the leaf tuples wouldn't all fit on one page, then we
588  * exclude the last tuple (which is the incoming new tuple that forced a split)
589  * from the check to see if more than one node is used. The reason for this
590  * is that if the existing tuples are put into only one chain, then even if
591  * we move them all to an empty page, there would still not be room for the
592  * new tuple, so we'd get into an infinite loop of picksplit attempts.
593  * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
594  * be split across pages. (Exercise for the reader: figure out why this
595  * fixes the problem even when there is only one old tuple.)
596  */
597 static bool
599  bool *includeNew)
600 {
601  int theNode;
602  int limit;
603  int i;
604 
605  /* For the moment, assume we can include the new leaf tuple */
606  *includeNew = true;
607 
608  /* If there's only the new leaf tuple, don't select allTheSame mode */
609  if (in->nTuples <= 1)
610  return false;
611 
612  /* If tuple set doesn't fit on one page, ignore the new tuple in test */
613  limit = tooBig ? in->nTuples - 1 : in->nTuples;
614 
615  /* Check to see if more than one node is populated */
616  theNode = out->mapTuplesToNodes[0];
617  for (i = 1; i < limit; i++)
618  {
619  if (out->mapTuplesToNodes[i] != theNode)
620  return false;
621  }
622 
623  /* Nope, so override the picksplit function's decisions */
624 
625  /* If the new tuple is in its own node, it can't be included in split */
626  if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
627  *includeNew = false;
628 
629  out->nNodes = 8; /* arbitrary number of child nodes */
630 
631  /* Random assignment of tuples to nodes (note we include new tuple) */
632  for (i = 0; i < in->nTuples; i++)
633  out->mapTuplesToNodes[i] = i % out->nNodes;
634 
635  /* The opclass may not use node labels, but if it does, duplicate 'em */
636  if (out->nodeLabels)
637  {
638  Datum theLabel = out->nodeLabels[theNode];
639 
640  out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
641  for (i = 0; i < out->nNodes; i++)
642  out->nodeLabels[i] = theLabel;
643  }
644 
645  /* We don't touch the prefix or the leaf tuple datum assignments */
646 
647  return true;
648 }
649 
650 /*
651  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
652  * but the chain has to be split because there's not enough room to add
653  * newLeafTuple to its page.
654  *
655  * This function splits the leaf tuple set according to picksplit's rules,
656  * creating one or more new chains that are spread across the current page
657  * and an additional leaf page (we assume that two leaf pages will be
658  * sufficient). A new inner tuple is created, and the parent downlink
659  * pointer is updated to point to that inner tuple instead of the leaf chain.
660  *
661  * On exit, current contains the address of the new inner tuple.
662  *
663  * Returns true if we successfully inserted newLeafTuple during this function,
664  * false if caller still has to do it (meaning another picksplit operation is
665  * probably needed). Failure could occur if the picksplit result is fairly
666  * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
667  * Because we force the picksplit result to be at least two chains, each
668  * cycle will get rid of at least one leaf tuple from the chain, so the loop
669  * will eventually terminate if lack of balance is the issue. If the tuple
670  * is too big, we assume that repeated picksplit operations will eventually
671  * make it small enough by repeated prefix-stripping. A broken opclass could
672  * make this an infinite loop, though, so spgdoinsert() checks that the
673  * leaf datums get smaller each time.
674  */
675 static bool
677  SPPageDesc *current, SPPageDesc *parent,
678  SpGistLeafTuple newLeafTuple,
679  int level, bool isNulls, bool isNew)
680 {
681  bool insertedNew = false;
682  spgPickSplitIn in;
683  spgPickSplitOut out;
684  FmgrInfo *procinfo;
685  bool includeNew;
686  int i,
687  max,
688  n;
689  SpGistInnerTuple innerTuple;
691  *nodes;
692  Buffer newInnerBuffer,
693  newLeafBuffer;
694  uint8 *leafPageSelect;
695  int *leafSizes;
696  OffsetNumber *toDelete;
697  OffsetNumber *toInsert;
698  OffsetNumber redirectTuplePos = InvalidOffsetNumber;
699  OffsetNumber startOffsets[2];
700  SpGistLeafTuple *oldLeafs;
701  SpGistLeafTuple *newLeafs;
702  Datum leafDatums[INDEX_MAX_KEYS];
703  bool leafIsnulls[INDEX_MAX_KEYS];
704  int spaceToDelete;
705  int currentFreeSpace;
706  int totalLeafSizes;
707  bool allTheSame;
708  spgxlogPickSplit xlrec;
709  char *leafdata,
710  *leafptr;
711  SPPageDesc saveCurrent;
712  int nToDelete,
713  nToInsert,
714  maxToInclude;
715 
716  in.level = level;
717 
718  /*
719  * Allocate per-leaf-tuple work arrays with max possible size
720  */
721  max = PageGetMaxOffsetNumber(current->page);
722  n = max + 1;
723  in.datums = (Datum *) palloc(sizeof(Datum) * n);
724  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
725  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
726  oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
727  newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
728  leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
729 
730  STORE_STATE(state, xlrec.stateSrc);
731 
732  /*
733  * Form list of leaf tuples which will be distributed as split result;
734  * also, count up the amount of space that will be freed from current.
735  * (Note that in the non-root case, we won't actually delete the old
736  * tuples, only replace them with redirects or placeholders.)
737  */
738  nToInsert = 0;
739  nToDelete = 0;
740  spaceToDelete = 0;
741  if (SpGistBlockIsRoot(current->blkno))
742  {
743  /*
744  * We are splitting the root (which up to now is also a leaf page).
745  * Its tuples are not linked, so scan sequentially to get them all. We
746  * ignore the original value of current->offnum.
747  */
748  for (i = FirstOffsetNumber; i <= max; i++)
749  {
750  SpGistLeafTuple it;
751 
752  it = (SpGistLeafTuple) PageGetItem(current->page,
753  PageGetItemId(current->page, i));
754  if (it->tupstate == SPGIST_LIVE)
755  {
756  in.datums[nToInsert] =
757  isNulls ? (Datum) 0 : SGLTDATUM(it, state);
758  oldLeafs[nToInsert] = it;
759  nToInsert++;
760  toDelete[nToDelete] = i;
761  nToDelete++;
762  /* we will delete the tuple altogether, so count full space */
763  spaceToDelete += it->size + sizeof(ItemIdData);
764  }
765  else /* tuples on root should be live */
766  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
767  }
768  }
769  else
770  {
771  /* Normal case, just collect the leaf tuples in the chain */
772  i = current->offnum;
773  while (i != InvalidOffsetNumber)
774  {
775  SpGistLeafTuple it;
776 
777  Assert(i >= FirstOffsetNumber && i <= max);
778  it = (SpGistLeafTuple) PageGetItem(current->page,
779  PageGetItemId(current->page, i));
780  if (it->tupstate == SPGIST_LIVE)
781  {
782  in.datums[nToInsert] =
783  isNulls ? (Datum) 0 : SGLTDATUM(it, state);
784  oldLeafs[nToInsert] = it;
785  nToInsert++;
786  toDelete[nToDelete] = i;
787  nToDelete++;
788  /* we will not delete the tuple, only replace with dead */
789  Assert(it->size >= SGDTSIZE);
790  spaceToDelete += it->size - SGDTSIZE;
791  }
792  else if (it->tupstate == SPGIST_DEAD)
793  {
794  /* We could see a DEAD tuple as first/only chain item */
795  Assert(i == current->offnum);
797  toDelete[nToDelete] = i;
798  nToDelete++;
799  /* replacing it with redirect will save no space */
800  }
801  else
802  elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
803 
804  i = SGLT_GET_NEXTOFFSET(it);
805  }
806  }
807  in.nTuples = nToInsert;
808 
809  /*
810  * We may not actually insert new tuple because another picksplit may be
811  * necessary due to too large value, but we will try to allocate enough
812  * space to include it; and in any case it has to be included in the input
813  * for the picksplit function. So don't increment nToInsert yet.
814  */
815  in.datums[in.nTuples] =
816  isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
817  oldLeafs[in.nTuples] = newLeafTuple;
818  in.nTuples++;
819 
820  memset(&out, 0, sizeof(out));
821 
822  if (!isNulls)
823  {
824  /*
825  * Perform split using user-defined method.
826  */
827  procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
828  FunctionCall2Coll(procinfo,
829  index->rd_indcollation[0],
830  PointerGetDatum(&in),
831  PointerGetDatum(&out));
832 
833  /*
834  * Form new leaf tuples and count up the total space needed.
835  */
836  totalLeafSizes = 0;
837  for (i = 0; i < in.nTuples; i++)
838  {
839  if (state->leafTupDesc->natts > 1)
840  spgDeformLeafTuple(oldLeafs[i],
841  state->leafTupDesc,
842  leafDatums,
843  leafIsnulls,
844  isNulls);
845 
846  leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
847  leafIsnulls[spgKeyColumn] = false;
848 
849  newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
850  leafDatums,
851  leafIsnulls);
852  totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
853  }
854  }
855  else
856  {
857  /*
858  * Perform dummy split that puts all tuples into one node.
859  * checkAllTheSame will override this and force allTheSame mode.
860  */
861  out.hasPrefix = false;
862  out.nNodes = 1;
863  out.nodeLabels = NULL;
864  out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
865 
866  /*
867  * Form new leaf tuples and count up the total space needed.
868  */
869  totalLeafSizes = 0;
870  for (i = 0; i < in.nTuples; i++)
871  {
872  if (state->leafTupDesc->natts > 1)
873  spgDeformLeafTuple(oldLeafs[i],
874  state->leafTupDesc,
875  leafDatums,
876  leafIsnulls,
877  isNulls);
878 
879  /*
880  * Nulls tree can contain only null key values.
881  */
882  leafDatums[spgKeyColumn] = (Datum) 0;
883  leafIsnulls[spgKeyColumn] = true;
884 
885  newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
886  leafDatums,
887  leafIsnulls);
888  totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
889  }
890  }
891 
892  /*
893  * Check to see if the picksplit function failed to separate the values,
894  * ie, it put them all into the same child node. If so, select allTheSame
895  * mode and create a random split instead. See comments for
896  * checkAllTheSame as to why we need to know if the new leaf tuples could
897  * fit on one page.
898  */
899  allTheSame = checkAllTheSame(&in, &out,
900  totalLeafSizes > SPGIST_PAGE_CAPACITY,
901  &includeNew);
902 
903  /*
904  * If checkAllTheSame decided we must exclude the new tuple, don't
905  * consider it any further.
906  */
907  if (includeNew)
908  maxToInclude = in.nTuples;
909  else
910  {
911  maxToInclude = in.nTuples - 1;
912  totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
913  }
914 
915  /*
916  * Allocate per-node work arrays. Since checkAllTheSame could replace
917  * out.nNodes with a value larger than the number of tuples on the input
918  * page, we can't allocate these arrays before here.
919  */
920  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
921  leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
922 
923  /*
924  * Form nodes of inner tuple and inner tuple itself
925  */
926  for (i = 0; i < out.nNodes; i++)
927  {
928  Datum label = (Datum) 0;
929  bool labelisnull = (out.nodeLabels == NULL);
930 
931  if (!labelisnull)
932  label = out.nodeLabels[i];
933  nodes[i] = spgFormNodeTuple(state, label, labelisnull);
934  }
935  innerTuple = spgFormInnerTuple(state,
936  out.hasPrefix, out.prefixDatum,
937  out.nNodes, nodes);
938  innerTuple->allTheSame = allTheSame;
939 
940  /*
941  * Update nodes[] array to point into the newly formed innerTuple, so that
942  * we can adjust their downlinks below.
943  */
944  SGITITERATE(innerTuple, i, node)
945  {
946  nodes[i] = node;
947  }
948 
949  /*
950  * Re-scan new leaf tuples and count up the space needed under each node.
951  */
952  for (i = 0; i < maxToInclude; i++)
953  {
954  n = out.mapTuplesToNodes[i];
955  if (n < 0 || n >= out.nNodes)
956  elog(ERROR, "inconsistent result of SPGiST picksplit function");
957  leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
958  }
959 
960  /*
961  * To perform the split, we must insert a new inner tuple, which can't go
962  * on a leaf page; and unless we are splitting the root page, we must then
963  * update the parent tuple's downlink to point to the inner tuple. If
964  * there is room, we'll put the new inner tuple on the same page as the
965  * parent tuple, otherwise we need another non-leaf buffer. But if the
966  * parent page is the root, we can't add the new inner tuple there,
967  * because the root page must have only one inner tuple.
968  */
969  xlrec.initInner = false;
970  if (parent->buffer != InvalidBuffer &&
971  !SpGistBlockIsRoot(parent->blkno) &&
972  (SpGistPageGetFreeSpace(parent->page, 1) >=
973  innerTuple->size + sizeof(ItemIdData)))
974  {
975  /* New inner tuple will fit on parent page */
976  newInnerBuffer = parent->buffer;
977  }
978  else if (parent->buffer != InvalidBuffer)
979  {
980  /* Send tuple to page with next triple parity (see README) */
981  newInnerBuffer = SpGistGetBuffer(index,
982  GBUF_INNER_PARITY(parent->blkno + 1) |
983  (isNulls ? GBUF_NULLS : 0),
984  innerTuple->size + sizeof(ItemIdData),
985  &xlrec.initInner);
986  }
987  else
988  {
989  /* Root page split ... inner tuple will go to root page */
990  newInnerBuffer = InvalidBuffer;
991  }
992 
993  /*
994  * The new leaf tuples converted from the existing ones should require the
995  * same or less space, and therefore should all fit onto one page
996  * (although that's not necessarily the current page, since we can't
997  * delete the old tuples but only replace them with placeholders).
998  * However, the incoming new tuple might not also fit, in which case we
999  * might need another picksplit cycle to reduce it some more.
1000  *
1001  * If there's not room to put everything back onto the current page, then
1002  * we decide on a per-node basis which tuples go to the new page. (We do
1003  * it like that because leaf tuple chains can't cross pages, so we must
1004  * place all leaf tuples belonging to the same parent node on the same
1005  * page.)
1006  *
1007  * If we are splitting the root page (turning it from a leaf page into an
1008  * inner page), then no leaf tuples can go back to the current page; they
1009  * must all go somewhere else.
1010  */
1011  if (!SpGistBlockIsRoot(current->blkno))
1012  currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1013  else
1014  currentFreeSpace = 0; /* prevent assigning any tuples to current */
1015 
1016  xlrec.initDest = false;
1017 
1018  if (totalLeafSizes <= currentFreeSpace)
1019  {
1020  /* All the leaf tuples will fit on current page */
1021  newLeafBuffer = InvalidBuffer;
1022  /* mark new leaf tuple as included in insertions, if allowed */
1023  if (includeNew)
1024  {
1025  nToInsert++;
1026  insertedNew = true;
1027  }
1028  for (i = 0; i < nToInsert; i++)
1029  leafPageSelect[i] = 0; /* signifies current page */
1030  }
1031  else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1032  {
1033  /*
1034  * We're trying to split up a long value by repeated suffixing, but
1035  * it's not going to fit yet. Don't bother allocating a second leaf
1036  * buffer that we won't be able to use.
1037  */
1038  newLeafBuffer = InvalidBuffer;
1039  Assert(includeNew);
1040  Assert(nToInsert == 0);
1041  }
1042  else
1043  {
1044  /* We will need another leaf page */
1045  uint8 *nodePageSelect;
1046  int curspace;
1047  int newspace;
1048 
1049  newLeafBuffer = SpGistGetBuffer(index,
1050  GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1051  Min(totalLeafSizes,
1053  &xlrec.initDest);
1054 
1055  /*
1056  * Attempt to assign node groups to the two pages. We might fail to
1057  * do so, even if totalLeafSizes is less than the available space,
1058  * because we can't split a group across pages.
1059  */
1060  nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1061 
1062  curspace = currentFreeSpace;
1063  newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1064  for (i = 0; i < out.nNodes; i++)
1065  {
1066  if (leafSizes[i] <= curspace)
1067  {
1068  nodePageSelect[i] = 0; /* signifies current page */
1069  curspace -= leafSizes[i];
1070  }
1071  else
1072  {
1073  nodePageSelect[i] = 1; /* signifies new leaf page */
1074  newspace -= leafSizes[i];
1075  }
1076  }
1077  if (curspace >= 0 && newspace >= 0)
1078  {
1079  /* Successful assignment, so we can include the new leaf tuple */
1080  if (includeNew)
1081  {
1082  nToInsert++;
1083  insertedNew = true;
1084  }
1085  }
1086  else if (includeNew)
1087  {
1088  /* We must exclude the new leaf tuple from the split */
1089  int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1090 
1091  leafSizes[nodeOfNewTuple] -=
1092  newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1093 
1094  /* Repeat the node assignment process --- should succeed now */
1095  curspace = currentFreeSpace;
1096  newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1097  for (i = 0; i < out.nNodes; i++)
1098  {
1099  if (leafSizes[i] <= curspace)
1100  {
1101  nodePageSelect[i] = 0; /* signifies current page */
1102  curspace -= leafSizes[i];
1103  }
1104  else
1105  {
1106  nodePageSelect[i] = 1; /* signifies new leaf page */
1107  newspace -= leafSizes[i];
1108  }
1109  }
1110  if (curspace < 0 || newspace < 0)
1111  elog(ERROR, "failed to divide leaf tuple groups across pages");
1112  }
1113  else
1114  {
1115  /* oops, we already excluded new tuple ... should not get here */
1116  elog(ERROR, "failed to divide leaf tuple groups across pages");
1117  }
1118  /* Expand the per-node assignments to be shown per leaf tuple */
1119  for (i = 0; i < nToInsert; i++)
1120  {
1121  n = out.mapTuplesToNodes[i];
1122  leafPageSelect[i] = nodePageSelect[n];
1123  }
1124  }
1125 
1126  /* Start preparing WAL record */
1127  xlrec.nDelete = 0;
1128  xlrec.initSrc = isNew;
1129  xlrec.storesNulls = isNulls;
1130  xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1131 
1132  leafdata = leafptr = (char *) palloc(totalLeafSizes);
1133 
1134  /* Here we begin making the changes to the target pages */
1136 
1137  /*
1138  * Delete old leaf tuples from current buffer, except when we're splitting
1139  * the root; in that case there's no need because we'll re-init the page
1140  * below. We do this first to make room for reinserting new leaf tuples.
1141  */
1142  if (!SpGistBlockIsRoot(current->blkno))
1143  {
1144  /*
1145  * Init buffer instead of deleting individual tuples, but only if
1146  * there aren't any other live tuples and only during build; otherwise
1147  * we need to set a redirection tuple for concurrent scans.
1148  */
1149  if (state->isBuild &&
1150  nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1151  PageGetMaxOffsetNumber(current->page))
1152  {
1153  SpGistInitBuffer(current->buffer,
1154  SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1155  xlrec.initSrc = true;
1156  }
1157  else if (isNew)
1158  {
1159  /* don't expose the freshly init'd buffer as a backup block */
1160  Assert(nToDelete == 0);
1161  }
1162  else
1163  {
1164  xlrec.nDelete = nToDelete;
1165 
1166  if (!state->isBuild)
1167  {
1168  /*
1169  * Need to create redirect tuple (it will point to new inner
1170  * tuple) but right now the new tuple's location is not known
1171  * yet. So, set the redirection pointer to "impossible" value
1172  * and remember its position to update tuple later.
1173  */
1174  if (nToDelete > 0)
1175  redirectTuplePos = toDelete[0];
1176  spgPageIndexMultiDelete(state, current->page,
1177  toDelete, nToDelete,
1182  }
1183  else
1184  {
1185  /*
1186  * During index build there is not concurrent searches, so we
1187  * don't need to create redirection tuple.
1188  */
1189  spgPageIndexMultiDelete(state, current->page,
1190  toDelete, nToDelete,
1195  }
1196  }
1197  }
1198 
1199  /*
1200  * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1201  * nodes.
1202  */
1203  startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1204  for (i = 0; i < nToInsert; i++)
1205  {
1206  SpGistLeafTuple it = newLeafs[i];
1207  Buffer leafBuffer;
1208  BlockNumber leafBlock;
1209  OffsetNumber newoffset;
1210 
1211  /* Which page is it going to? */
1212  leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1213  leafBlock = BufferGetBlockNumber(leafBuffer);
1214 
1215  /* Link tuple into correct chain for its node */
1216  n = out.mapTuplesToNodes[i];
1217 
1218  if (ItemPointerIsValid(&nodes[n]->t_tid))
1219  {
1220  Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1221  SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1222  }
1223  else
1225 
1226  /* Insert it on page */
1227  newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1228  (Item) it, it->size,
1229  &startOffsets[leafPageSelect[i]],
1230  false);
1231  toInsert[i] = newoffset;
1232 
1233  /* ... and complete the chain linking */
1234  ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1235 
1236  /* Also copy leaf tuple into WAL data */
1237  memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1238  leafptr += newLeafs[i]->size;
1239  }
1240 
1241  /*
1242  * We're done modifying the other leaf buffer (if any), so mark it dirty.
1243  * current->buffer will be marked below, after we're entirely done
1244  * modifying it.
1245  */
1246  if (newLeafBuffer != InvalidBuffer)
1247  {
1248  MarkBufferDirty(newLeafBuffer);
1249  }
1250 
1251  /* Remember current buffer, since we're about to change "current" */
1252  saveCurrent = *current;
1253 
1254  /*
1255  * Store the new innerTuple
1256  */
1257  if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1258  {
1259  /*
1260  * new inner tuple goes to parent page
1261  */
1262  Assert(current->buffer != parent->buffer);
1263 
1264  /* Repoint "current" at the new inner tuple */
1265  current->blkno = parent->blkno;
1266  current->buffer = parent->buffer;
1267  current->page = parent->page;
1268  xlrec.offnumInner = current->offnum =
1269  SpGistPageAddNewItem(state, current->page,
1270  (Item) innerTuple, innerTuple->size,
1271  NULL, false);
1272 
1273  /*
1274  * Update parent node link and mark parent page dirty
1275  */
1276  xlrec.innerIsParent = true;
1277  xlrec.offnumParent = parent->offnum;
1278  xlrec.nodeI = parent->node;
1279  saveNodeLink(index, parent, current->blkno, current->offnum);
1280 
1281  /*
1282  * Update redirection link (in old current buffer)
1283  */
1284  if (redirectTuplePos != InvalidOffsetNumber)
1285  setRedirectionTuple(&saveCurrent, redirectTuplePos,
1286  current->blkno, current->offnum);
1287 
1288  /* Done modifying old current buffer, mark it dirty */
1289  MarkBufferDirty(saveCurrent.buffer);
1290  }
1291  else if (parent->buffer != InvalidBuffer)
1292  {
1293  /*
1294  * new inner tuple will be stored on a new page
1295  */
1296  Assert(newInnerBuffer != InvalidBuffer);
1297 
1298  /* Repoint "current" at the new inner tuple */
1299  current->buffer = newInnerBuffer;
1300  current->blkno = BufferGetBlockNumber(current->buffer);
1301  current->page = BufferGetPage(current->buffer);
1302  xlrec.offnumInner = current->offnum =
1303  SpGistPageAddNewItem(state, current->page,
1304  (Item) innerTuple, innerTuple->size,
1305  NULL, false);
1306 
1307  /* Done modifying new current buffer, mark it dirty */
1308  MarkBufferDirty(current->buffer);
1309 
1310  /*
1311  * Update parent node link and mark parent page dirty
1312  */
1313  xlrec.innerIsParent = (parent->buffer == current->buffer);
1314  xlrec.offnumParent = parent->offnum;
1315  xlrec.nodeI = parent->node;
1316  saveNodeLink(index, parent, current->blkno, current->offnum);
1317 
1318  /*
1319  * Update redirection link (in old current buffer)
1320  */
1321  if (redirectTuplePos != InvalidOffsetNumber)
1322  setRedirectionTuple(&saveCurrent, redirectTuplePos,
1323  current->blkno, current->offnum);
1324 
1325  /* Done modifying old current buffer, mark it dirty */
1326  MarkBufferDirty(saveCurrent.buffer);
1327  }
1328  else
1329  {
1330  /*
1331  * Splitting root page, which was a leaf but now becomes inner page
1332  * (and so "current" continues to point at it)
1333  */
1334  Assert(SpGistBlockIsRoot(current->blkno));
1335  Assert(redirectTuplePos == InvalidOffsetNumber);
1336 
1337  SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1338  xlrec.initInner = true;
1339  xlrec.innerIsParent = false;
1340 
1341  xlrec.offnumInner = current->offnum =
1342  PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1343  InvalidOffsetNumber, false, false);
1344  if (current->offnum != FirstOffsetNumber)
1345  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1346  innerTuple->size);
1347 
1348  /* No parent link to update, nor redirection to do */
1350  xlrec.nodeI = 0;
1351 
1352  /* Done modifying new current buffer, mark it dirty */
1353  MarkBufferDirty(current->buffer);
1354 
1355  /* saveCurrent doesn't represent a different buffer */
1356  saveCurrent.buffer = InvalidBuffer;
1357  }
1358 
1359  if (RelationNeedsWAL(index) && !state->isBuild)
1360  {
1361  XLogRecPtr recptr;
1362  int flags;
1363 
1364  XLogBeginInsert();
1365 
1366  xlrec.nInsert = nToInsert;
1367  XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1368 
1369  XLogRegisterData((char *) toDelete,
1370  sizeof(OffsetNumber) * xlrec.nDelete);
1371  XLogRegisterData((char *) toInsert,
1372  sizeof(OffsetNumber) * xlrec.nInsert);
1373  XLogRegisterData((char *) leafPageSelect,
1374  sizeof(uint8) * xlrec.nInsert);
1375  XLogRegisterData((char *) innerTuple, innerTuple->size);
1376  XLogRegisterData(leafdata, leafptr - leafdata);
1377 
1378  /* Old leaf page */
1379  if (BufferIsValid(saveCurrent.buffer))
1380  {
1381  flags = REGBUF_STANDARD;
1382  if (xlrec.initSrc)
1383  flags |= REGBUF_WILL_INIT;
1384  XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1385  }
1386 
1387  /* New leaf page */
1388  if (BufferIsValid(newLeafBuffer))
1389  {
1390  flags = REGBUF_STANDARD;
1391  if (xlrec.initDest)
1392  flags |= REGBUF_WILL_INIT;
1393  XLogRegisterBuffer(1, newLeafBuffer, flags);
1394  }
1395 
1396  /* Inner page */
1397  flags = REGBUF_STANDARD;
1398  if (xlrec.initInner)
1399  flags |= REGBUF_WILL_INIT;
1400  XLogRegisterBuffer(2, current->buffer, flags);
1401 
1402  /* Parent page, if different from inner page */
1403  if (parent->buffer != InvalidBuffer)
1404  {
1405  if (parent->buffer != current->buffer)
1407  else
1408  Assert(xlrec.innerIsParent);
1409  }
1410 
1411  /* Issue the WAL record */
1412  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1413 
1414  /* Update page LSNs on all affected pages */
1415  if (newLeafBuffer != InvalidBuffer)
1416  {
1417  Page page = BufferGetPage(newLeafBuffer);
1418 
1419  PageSetLSN(page, recptr);
1420  }
1421 
1422  if (saveCurrent.buffer != InvalidBuffer)
1423  {
1424  Page page = BufferGetPage(saveCurrent.buffer);
1425 
1426  PageSetLSN(page, recptr);
1427  }
1428 
1429  PageSetLSN(current->page, recptr);
1430 
1431  if (parent->buffer != InvalidBuffer)
1432  {
1433  PageSetLSN(parent->page, recptr);
1434  }
1435  }
1436 
1437  END_CRIT_SECTION();
1438 
1439  /* Update local free-space cache and unlock buffers */
1440  if (newLeafBuffer != InvalidBuffer)
1441  {
1442  SpGistSetLastUsedPage(index, newLeafBuffer);
1443  UnlockReleaseBuffer(newLeafBuffer);
1444  }
1445  if (saveCurrent.buffer != InvalidBuffer)
1446  {
1447  SpGistSetLastUsedPage(index, saveCurrent.buffer);
1448  UnlockReleaseBuffer(saveCurrent.buffer);
1449  }
1450 
1451  return insertedNew;
1452 }
1453 
1454 /*
1455  * spgMatchNode action: descend to N'th child node of current inner tuple
1456  */
1457 static void
1459  SpGistInnerTuple innerTuple,
1460  SPPageDesc *current, SPPageDesc *parent, int nodeN)
1461 {
1462  int i;
1464 
1465  /* Release previous parent buffer if any */
1466  if (parent->buffer != InvalidBuffer &&
1467  parent->buffer != current->buffer)
1468  {
1469  SpGistSetLastUsedPage(index, parent->buffer);
1470  UnlockReleaseBuffer(parent->buffer);
1471  }
1472 
1473  /* Repoint parent to specified node of current inner tuple */
1474  parent->blkno = current->blkno;
1475  parent->buffer = current->buffer;
1476  parent->page = current->page;
1477  parent->offnum = current->offnum;
1478  parent->node = nodeN;
1479 
1480  /* Locate that node */
1481  SGITITERATE(innerTuple, i, node)
1482  {
1483  if (i == nodeN)
1484  break;
1485  }
1486 
1487  if (i != nodeN)
1488  elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1489  nodeN);
1490 
1491  /* Point current to the downlink location, if any */
1492  if (ItemPointerIsValid(&node->t_tid))
1493  {
1494  current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1495  current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1496  }
1497  else
1498  {
1499  /* Downlink is empty, so we'll need to find a new page */
1500  current->blkno = InvalidBlockNumber;
1501  current->offnum = InvalidOffsetNumber;
1502  }
1503 
1504  current->buffer = InvalidBuffer;
1505  current->page = NULL;
1506 }
1507 
1508 /*
1509  * spgAddNode action: add a node to the inner tuple at current
1510  */
1511 static void
1513  SpGistInnerTuple innerTuple,
1514  SPPageDesc *current, SPPageDesc *parent,
1515  int nodeN, Datum nodeLabel)
1516 {
1517  SpGistInnerTuple newInnerTuple;
1518  spgxlogAddNode xlrec;
1519 
1520  /* Should not be applied to nulls */
1521  Assert(!SpGistPageStoresNulls(current->page));
1522 
1523  /* Construct new inner tuple with additional node */
1524  newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1525 
1526  /* Prepare WAL record */
1527  STORE_STATE(state, xlrec.stateSrc);
1528  xlrec.offnum = current->offnum;
1529 
1530  /* we don't fill these unless we need to change the parent downlink */
1531  xlrec.parentBlk = -1;
1533  xlrec.nodeI = 0;
1534 
1535  /* we don't fill these unless tuple has to be moved */
1537  xlrec.newPage = false;
1538 
1539  if (PageGetExactFreeSpace(current->page) >=
1540  newInnerTuple->size - innerTuple->size)
1541  {
1542  /*
1543  * We can replace the inner tuple by new version in-place
1544  */
1546 
1547  PageIndexTupleDelete(current->page, current->offnum);
1548  if (PageAddItem(current->page,
1549  (Item) newInnerTuple, newInnerTuple->size,
1550  current->offnum, false, false) != current->offnum)
1551  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1552  newInnerTuple->size);
1553 
1554  MarkBufferDirty(current->buffer);
1555 
1556  if (RelationNeedsWAL(index) && !state->isBuild)
1557  {
1558  XLogRecPtr recptr;
1559 
1560  XLogBeginInsert();
1561  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1562  XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1563 
1565 
1566  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1567 
1568  PageSetLSN(current->page, recptr);
1569  }
1570 
1571  END_CRIT_SECTION();
1572  }
1573  else
1574  {
1575  /*
1576  * move inner tuple to another page, and update parent
1577  */
1578  SpGistDeadTuple dt;
1579  SPPageDesc saveCurrent;
1580 
1581  /*
1582  * It should not be possible to get here for the root page, since we
1583  * allow only one inner tuple on the root page, and spgFormInnerTuple
1584  * always checks that inner tuples don't exceed the size of a page.
1585  */
1586  if (SpGistBlockIsRoot(current->blkno))
1587  elog(ERROR, "cannot enlarge root tuple any more");
1588  Assert(parent->buffer != InvalidBuffer);
1589 
1590  saveCurrent = *current;
1591 
1592  xlrec.offnumParent = parent->offnum;
1593  xlrec.nodeI = parent->node;
1594 
1595  /*
1596  * obtain new buffer with the same parity as current, since it will be
1597  * a child of same parent tuple
1598  */
1599  current->buffer = SpGistGetBuffer(index,
1600  GBUF_INNER_PARITY(current->blkno),
1601  newInnerTuple->size + sizeof(ItemIdData),
1602  &xlrec.newPage);
1603  current->blkno = BufferGetBlockNumber(current->buffer);
1604  current->page = BufferGetPage(current->buffer);
1605 
1606  /*
1607  * Let's just make real sure new current isn't same as old. Right now
1608  * that's impossible, but if SpGistGetBuffer ever got smart enough to
1609  * delete placeholder tuples before checking space, maybe it wouldn't
1610  * be impossible. The case would appear to work except that WAL
1611  * replay would be subtly wrong, so I think a mere assert isn't enough
1612  * here.
1613  */
1614  if (current->blkno == saveCurrent.blkno)
1615  elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1616 
1617  /*
1618  * New current and parent buffer will both be modified; but note that
1619  * parent buffer could be same as either new or old current.
1620  */
1621  if (parent->buffer == saveCurrent.buffer)
1622  xlrec.parentBlk = 0;
1623  else if (parent->buffer == current->buffer)
1624  xlrec.parentBlk = 1;
1625  else
1626  xlrec.parentBlk = 2;
1627 
1629 
1630  /* insert new ... */
1631  xlrec.offnumNew = current->offnum =
1632  SpGistPageAddNewItem(state, current->page,
1633  (Item) newInnerTuple, newInnerTuple->size,
1634  NULL, false);
1635 
1636  MarkBufferDirty(current->buffer);
1637 
1638  /* update parent's downlink and mark parent page dirty */
1639  saveNodeLink(index, parent, current->blkno, current->offnum);
1640 
1641  /*
1642  * Replace old tuple with a placeholder or redirection tuple. Unless
1643  * doing an index build, we have to insert a redirection tuple for
1644  * possible concurrent scans. We can't just delete it in any case,
1645  * because that could change the offsets of other tuples on the page,
1646  * breaking downlinks from their parents.
1647  */
1648  if (state->isBuild)
1651  else
1652  dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1653  current->blkno, current->offnum);
1654 
1655  PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1656  if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1657  saveCurrent.offnum,
1658  false, false) != saveCurrent.offnum)
1659  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1660  dt->size);
1661 
1662  if (state->isBuild)
1663  SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1664  else
1665  SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1666 
1667  MarkBufferDirty(saveCurrent.buffer);
1668 
1669  if (RelationNeedsWAL(index) && !state->isBuild)
1670  {
1671  XLogRecPtr recptr;
1672  int flags;
1673 
1674  XLogBeginInsert();
1675 
1676  /* orig page */
1677  XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1678  /* new page */
1679  flags = REGBUF_STANDARD;
1680  if (xlrec.newPage)
1681  flags |= REGBUF_WILL_INIT;
1682  XLogRegisterBuffer(1, current->buffer, flags);
1683  /* parent page (if different from orig and new) */
1684  if (xlrec.parentBlk == 2)
1686 
1687  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1688  XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1689 
1690  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1691 
1692  /* we don't bother to check if any of these are redundant */
1693  PageSetLSN(current->page, recptr);
1694  PageSetLSN(parent->page, recptr);
1695  PageSetLSN(saveCurrent.page, recptr);
1696  }
1697 
1698  END_CRIT_SECTION();
1699 
1700  /* Release saveCurrent if it's not same as current or parent */
1701  if (saveCurrent.buffer != current->buffer &&
1702  saveCurrent.buffer != parent->buffer)
1703  {
1704  SpGistSetLastUsedPage(index, saveCurrent.buffer);
1705  UnlockReleaseBuffer(saveCurrent.buffer);
1706  }
1707  }
1708 }
1709 
1710 /*
1711  * spgSplitNode action: split inner tuple at current into prefix and postfix
1712  */
1713 static void
1715  SpGistInnerTuple innerTuple,
1716  SPPageDesc *current, spgChooseOut *out)
1717 {
1718  SpGistInnerTuple prefixTuple,
1719  postfixTuple;
1721  *nodes;
1722  BlockNumber postfixBlkno;
1723  OffsetNumber postfixOffset;
1724  int i;
1725  spgxlogSplitTuple xlrec;
1726  Buffer newBuffer = InvalidBuffer;
1727 
1728  /* Should not be applied to nulls */
1729  Assert(!SpGistPageStoresNulls(current->page));
1730 
1731  /* Check opclass gave us sane values */
1732  if (out->result.splitTuple.prefixNNodes <= 0 ||
1733  out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1734  elog(ERROR, "invalid number of prefix nodes: %d",
1735  out->result.splitTuple.prefixNNodes);
1736  if (out->result.splitTuple.childNodeN < 0 ||
1737  out->result.splitTuple.childNodeN >=
1738  out->result.splitTuple.prefixNNodes)
1739  elog(ERROR, "invalid child node number: %d",
1740  out->result.splitTuple.childNodeN);
1741 
1742  /*
1743  * Construct new prefix tuple with requested number of nodes. We'll fill
1744  * in the childNodeN'th node's downlink below.
1745  */
1746  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1747  out->result.splitTuple.prefixNNodes);
1748 
1749  for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1750  {
1751  Datum label = (Datum) 0;
1752  bool labelisnull;
1753 
1754  labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1755  if (!labelisnull)
1756  label = out->result.splitTuple.prefixNodeLabels[i];
1757  nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1758  }
1759 
1760  prefixTuple = spgFormInnerTuple(state,
1761  out->result.splitTuple.prefixHasPrefix,
1762  out->result.splitTuple.prefixPrefixDatum,
1763  out->result.splitTuple.prefixNNodes,
1764  nodes);
1765 
1766  /* it must fit in the space that innerTuple now occupies */
1767  if (prefixTuple->size > innerTuple->size)
1768  elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1769 
1770  /*
1771  * Construct new postfix tuple, containing all nodes of innerTuple with
1772  * same node datums, but with the prefix specified by the picksplit
1773  * function.
1774  */
1775  nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1776  SGITITERATE(innerTuple, i, node)
1777  {
1778  nodes[i] = node;
1779  }
1780 
1781  postfixTuple = spgFormInnerTuple(state,
1782  out->result.splitTuple.postfixHasPrefix,
1783  out->result.splitTuple.postfixPrefixDatum,
1784  innerTuple->nNodes, nodes);
1785 
1786  /* Postfix tuple is allTheSame if original tuple was */
1787  postfixTuple->allTheSame = innerTuple->allTheSame;
1788 
1789  /* prep data for WAL record */
1790  xlrec.newPage = false;
1791 
1792  /*
1793  * If we can't fit both tuples on the current page, get a new page for the
1794  * postfix tuple. In particular, can't split to the root page.
1795  *
1796  * For the space calculation, note that prefixTuple replaces innerTuple
1797  * but postfixTuple will be a new entry.
1798  */
1799  if (SpGistBlockIsRoot(current->blkno) ||
1800  SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1801  prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1802  {
1803  /*
1804  * Choose page with next triple parity, because postfix tuple is a
1805  * child of prefix one
1806  */
1807  newBuffer = SpGistGetBuffer(index,
1808  GBUF_INNER_PARITY(current->blkno + 1),
1809  postfixTuple->size + sizeof(ItemIdData),
1810  &xlrec.newPage);
1811  }
1812 
1814 
1815  /*
1816  * Replace old tuple by prefix tuple
1817  */
1818  PageIndexTupleDelete(current->page, current->offnum);
1819  xlrec.offnumPrefix = PageAddItem(current->page,
1820  (Item) prefixTuple, prefixTuple->size,
1821  current->offnum, false, false);
1822  if (xlrec.offnumPrefix != current->offnum)
1823  elog(ERROR, "failed to add item of size %u to SPGiST index page",
1824  prefixTuple->size);
1825 
1826  /*
1827  * put postfix tuple into appropriate page
1828  */
1829  if (newBuffer == InvalidBuffer)
1830  {
1831  postfixBlkno = current->blkno;
1832  xlrec.offnumPostfix = postfixOffset =
1833  SpGistPageAddNewItem(state, current->page,
1834  (Item) postfixTuple, postfixTuple->size,
1835  NULL, false);
1836  xlrec.postfixBlkSame = true;
1837  }
1838  else
1839  {
1840  postfixBlkno = BufferGetBlockNumber(newBuffer);
1841  xlrec.offnumPostfix = postfixOffset =
1842  SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1843  (Item) postfixTuple, postfixTuple->size,
1844  NULL, false);
1845  MarkBufferDirty(newBuffer);
1846  xlrec.postfixBlkSame = false;
1847  }
1848 
1849  /*
1850  * And set downlink pointer in the prefix tuple to point to postfix tuple.
1851  * (We can't avoid this step by doing the above two steps in opposite
1852  * order, because there might not be enough space on the page to insert
1853  * the postfix tuple first.) We have to update the local copy of the
1854  * prefixTuple too, because that's what will be written to WAL.
1855  */
1856  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1857  postfixBlkno, postfixOffset);
1858  prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1859  PageGetItemId(current->page, current->offnum));
1860  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1861  postfixBlkno, postfixOffset);
1862 
1863  MarkBufferDirty(current->buffer);
1864 
1865  if (RelationNeedsWAL(index) && !state->isBuild)
1866  {
1867  XLogRecPtr recptr;
1868 
1869  XLogBeginInsert();
1870  XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1871  XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1872  XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1873 
1875  if (newBuffer != InvalidBuffer)
1876  {
1877  int flags;
1878 
1879  flags = REGBUF_STANDARD;
1880  if (xlrec.newPage)
1881  flags |= REGBUF_WILL_INIT;
1882  XLogRegisterBuffer(1, newBuffer, flags);
1883  }
1884 
1885  recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1886 
1887  PageSetLSN(current->page, recptr);
1888 
1889  if (newBuffer != InvalidBuffer)
1890  {
1891  PageSetLSN(BufferGetPage(newBuffer), recptr);
1892  }
1893  }
1894 
1895  END_CRIT_SECTION();
1896 
1897  /* Update local free-space cache and release buffer */
1898  if (newBuffer != InvalidBuffer)
1899  {
1900  SpGistSetLastUsedPage(index, newBuffer);
1901  UnlockReleaseBuffer(newBuffer);
1902  }
1903 }
1904 
1905 /*
1906  * Insert one item into the index.
1907  *
1908  * Returns true on success, false if we failed to complete the insertion
1909  * (typically because of conflict with a concurrent insert). In the latter
1910  * case, caller should re-call spgdoinsert() with the same args.
1911  */
1912 bool
1914  ItemPointer heapPtr, Datum *datums, bool *isnulls)
1915 {
1916  bool result = true;
1917  TupleDesc leafDescriptor = state->leafTupDesc;
1918  bool isnull = isnulls[spgKeyColumn];
1919  int level = 0;
1920  Datum leafDatums[INDEX_MAX_KEYS];
1921  int leafSize;
1922  int bestLeafSize;
1923  int numNoProgressCycles = 0;
1924  SPPageDesc current,
1925  parent;
1926  FmgrInfo *procinfo = NULL;
1927 
1928  /*
1929  * Look up FmgrInfo of the user-defined choose function once, to save
1930  * cycles in the loop below.
1931  */
1932  if (!isnull)
1933  procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1934 
1935  /*
1936  * Prepare the leaf datum to insert.
1937  *
1938  * If an optional "compress" method is provided, then call it to form the
1939  * leaf key datum from the input datum. Otherwise, store the input datum
1940  * as is. Since we don't use index_form_tuple in this AM, we have to make
1941  * sure value to be inserted is not toasted; FormIndexDatum doesn't
1942  * guarantee that. But we assume the "compress" method to return an
1943  * untoasted value.
1944  */
1945  if (!isnull)
1946  {
1948  {
1949  FmgrInfo *compressProcinfo = NULL;
1950 
1951  compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1952  leafDatums[spgKeyColumn] =
1953  FunctionCall1Coll(compressProcinfo,
1954  index->rd_indcollation[spgKeyColumn],
1955  datums[spgKeyColumn]);
1956  }
1957  else
1958  {
1959  Assert(state->attLeafType.type == state->attType.type);
1960 
1961  if (state->attType.attlen == -1)
1962  leafDatums[spgKeyColumn] =
1964  else
1965  leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1966  }
1967  }
1968  else
1969  leafDatums[spgKeyColumn] = (Datum) 0;
1970 
1971  /* Likewise, ensure that any INCLUDE values are not toasted */
1972  for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1973  {
1974  if (!isnulls[i])
1975  {
1976  if (TupleDescAttr(leafDescriptor, i)->attlen == -1)
1977  leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1978  else
1979  leafDatums[i] = datums[i];
1980  }
1981  else
1982  leafDatums[i] = (Datum) 0;
1983  }
1984 
1985  /*
1986  * Compute space needed for a leaf tuple containing the given data.
1987  */
1988  leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1989  /* Account for an item pointer, too */
1990  leafSize += sizeof(ItemIdData);
1991 
1992  /*
1993  * If it isn't gonna fit, and the opclass can't reduce the datum size by
1994  * suffixing, bail out now rather than doing a lot of useless work.
1995  */
1996  if (leafSize > SPGIST_PAGE_CAPACITY &&
1997  (isnull || !state->config.longValuesOK))
1998  ereport(ERROR,
1999  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2000  errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2001  leafSize - sizeof(ItemIdData),
2002  SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2003  RelationGetRelationName(index)),
2004  errhint("Values larger than a buffer page cannot be indexed.")));
2005  bestLeafSize = leafSize;
2006 
2007  /* Initialize "current" to the appropriate root page */
2008  current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
2009  current.buffer = InvalidBuffer;
2010  current.page = NULL;
2011  current.offnum = FirstOffsetNumber;
2012  current.node = -1;
2013 
2014  /* "parent" is invalid for the moment */
2015  parent.blkno = InvalidBlockNumber;
2016  parent.buffer = InvalidBuffer;
2017  parent.page = NULL;
2018  parent.offnum = InvalidOffsetNumber;
2019  parent.node = -1;
2020 
2021  /*
2022  * Before entering the loop, try to clear any pending interrupt condition.
2023  * If a query cancel is pending, we might as well accept it now not later;
2024  * while if a non-canceling condition is pending, servicing it here avoids
2025  * having to restart the insertion and redo all the work so far.
2026  */
2028 
2029  for (;;)
2030  {
2031  bool isNew = false;
2032 
2033  /*
2034  * Bail out if query cancel is pending. We must have this somewhere
2035  * in the loop since a broken opclass could produce an infinite
2036  * picksplit loop. However, because we'll be holding buffer lock(s)
2037  * after the first iteration, ProcessInterrupts() wouldn't be able to
2038  * throw a cancel error here. Hence, if we see that an interrupt is
2039  * pending, break out of the loop and deal with the situation below.
2040  * Set result = false because we must restart the insertion if the
2041  * interrupt isn't a query-cancel-or-die case.
2042  */
2044  {
2045  result = false;
2046  break;
2047  }
2048 
2049  if (current.blkno == InvalidBlockNumber)
2050  {
2051  /*
2052  * Create a leaf page. If leafSize is too large to fit on a page,
2053  * we won't actually use the page yet, but it simplifies the API
2054  * for doPickSplit to always have a leaf page at hand; so just
2055  * quietly limit our request to a page size.
2056  */
2057  current.buffer =
2058  SpGistGetBuffer(index,
2059  GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2060  Min(leafSize, SPGIST_PAGE_CAPACITY),
2061  &isNew);
2062  current.blkno = BufferGetBlockNumber(current.buffer);
2063  }
2064  else if (parent.buffer == InvalidBuffer)
2065  {
2066  /* we hold no parent-page lock, so no deadlock is possible */
2067  current.buffer = ReadBuffer(index, current.blkno);
2069  }
2070  else if (current.blkno != parent.blkno)
2071  {
2072  /* descend to a new child page */
2073  current.buffer = ReadBuffer(index, current.blkno);
2074 
2075  /*
2076  * Attempt to acquire lock on child page. We must beware of
2077  * deadlock against another insertion process descending from that
2078  * page to our parent page (see README). If we fail to get lock,
2079  * abandon the insertion and tell our caller to start over.
2080  *
2081  * XXX this could be improved, because failing to get lock on a
2082  * buffer is not proof of a deadlock situation; the lock might be
2083  * held by a reader, or even just background writer/checkpointer
2084  * process. Perhaps it'd be worth retrying after sleeping a bit?
2085  */
2086  if (!ConditionalLockBuffer(current.buffer))
2087  {
2088  ReleaseBuffer(current.buffer);
2089  UnlockReleaseBuffer(parent.buffer);
2090  return false;
2091  }
2092  }
2093  else
2094  {
2095  /* inner tuple can be stored on the same page as parent one */
2096  current.buffer = parent.buffer;
2097  }
2098  current.page = BufferGetPage(current.buffer);
2099 
2100  /* should not arrive at a page of the wrong type */
2101  if (isnull ? !SpGistPageStoresNulls(current.page) :
2102  SpGistPageStoresNulls(current.page))
2103  elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2104  current.blkno);
2105 
2106  if (SpGistPageIsLeaf(current.page))
2107  {
2108  SpGistLeafTuple leafTuple;
2109  int nToSplit,
2110  sizeToSplit;
2111 
2112  leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
2113  if (leafTuple->size + sizeof(ItemIdData) <=
2114  SpGistPageGetFreeSpace(current.page, 1))
2115  {
2116  /* it fits on page, so insert it and we're done */
2117  addLeafTuple(index, state, leafTuple,
2118  &current, &parent, isnull, isNew);
2119  break;
2120  }
2121  else if ((sizeToSplit =
2122  checkSplitConditions(index, state, &current,
2123  &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2124  nToSplit < 64 &&
2125  leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2126  {
2127  /*
2128  * the amount of data is pretty small, so just move the whole
2129  * chain to another leaf page rather than splitting it.
2130  */
2131  Assert(!isNew);
2132  moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2133  break; /* we're done */
2134  }
2135  else
2136  {
2137  /* picksplit */
2138  if (doPickSplit(index, state, &current, &parent,
2139  leafTuple, level, isnull, isNew))
2140  break; /* doPickSplit installed new tuples */
2141 
2142  /* leaf tuple will not be inserted yet */
2143  pfree(leafTuple);
2144 
2145  /*
2146  * current now describes new inner tuple, go insert into it
2147  */
2148  Assert(!SpGistPageIsLeaf(current.page));
2149  goto process_inner_tuple;
2150  }
2151  }
2152  else /* non-leaf page */
2153  {
2154  /*
2155  * Apply the opclass choose function to figure out how to insert
2156  * the given datum into the current inner tuple.
2157  */
2158  SpGistInnerTuple innerTuple;
2159  spgChooseIn in;
2160  spgChooseOut out;
2161 
2162  /*
2163  * spgAddNode and spgSplitTuple cases will loop back to here to
2164  * complete the insertion operation. Just in case the choose
2165  * function is broken and produces add or split requests
2166  * repeatedly, check for query cancel (see comments above).
2167  */
2168  process_inner_tuple:
2170  {
2171  result = false;
2172  break;
2173  }
2174 
2175  innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2176  PageGetItemId(current.page, current.offnum));
2177 
2178  in.datum = datums[spgKeyColumn];
2179  in.leafDatum = leafDatums[spgKeyColumn];
2180  in.level = level;
2181  in.allTheSame = innerTuple->allTheSame;
2182  in.hasPrefix = (innerTuple->prefixSize > 0);
2183  in.prefixDatum = SGITDATUM(innerTuple, state);
2184  in.nNodes = innerTuple->nNodes;
2185  in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2186 
2187  memset(&out, 0, sizeof(out));
2188 
2189  if (!isnull)
2190  {
2191  /* use user-defined choose method */
2192  FunctionCall2Coll(procinfo,
2193  index->rd_indcollation[0],
2194  PointerGetDatum(&in),
2195  PointerGetDatum(&out));
2196  }
2197  else
2198  {
2199  /* force "match" action (to insert to random subnode) */
2200  out.resultType = spgMatchNode;
2201  }
2202 
2203  if (innerTuple->allTheSame)
2204  {
2205  /*
2206  * It's not allowed to do an AddNode at an allTheSame tuple.
2207  * Opclass must say "match", in which case we choose a random
2208  * one of the nodes to descend into, or "split".
2209  */
2210  if (out.resultType == spgAddNode)
2211  elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2212  else if (out.resultType == spgMatchNode)
2213  out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2214  }
2215 
2216  switch (out.resultType)
2217  {
2218  case spgMatchNode:
2219  /* Descend to N'th child node */
2220  spgMatchNodeAction(index, state, innerTuple,
2221  &current, &parent,
2222  out.result.matchNode.nodeN);
2223  /* Adjust level as per opclass request */
2224  level += out.result.matchNode.levelAdd;
2225  /* Replace leafDatum and recompute leafSize */
2226  if (!isnull)
2227  {
2228  leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2229  leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2230  leafDatums, isnulls);
2231  leafSize += sizeof(ItemIdData);
2232  }
2233 
2234  /*
2235  * Check new tuple size; fail if it can't fit, unless the
2236  * opclass says it can handle the situation by suffixing.
2237  *
2238  * However, the opclass can only shorten the leaf datum,
2239  * which may not be enough to ever make the tuple fit,
2240  * since INCLUDE columns might alone use more than a page.
2241  * Depending on the opclass' behavior, that could lead to
2242  * an infinite loop --- spgtextproc.c, for example, will
2243  * just repeatedly generate an empty-string leaf datum
2244  * once it runs out of data. Actual bugs in opclasses
2245  * might cause infinite looping, too. To detect such a
2246  * loop, check to see if we are making progress by
2247  * reducing the leafSize in each pass. This is a bit
2248  * tricky though. Because of alignment considerations,
2249  * the total tuple size might not decrease on every pass.
2250  * Also, there are edge cases where the choose method
2251  * might seem to not make progress for a cycle or two.
2252  * Somewhat arbitrarily, we allow up to 10 no-progress
2253  * iterations before failing. (This limit should be more
2254  * than MAXALIGN, to accommodate opclasses that trim one
2255  * byte from the leaf datum per pass.)
2256  */
2257  if (leafSize > SPGIST_PAGE_CAPACITY)
2258  {
2259  bool ok = false;
2260 
2261  if (state->config.longValuesOK && !isnull)
2262  {
2263  if (leafSize < bestLeafSize)
2264  {
2265  ok = true;
2266  bestLeafSize = leafSize;
2267  numNoProgressCycles = 0;
2268  }
2269  else if (++numNoProgressCycles < 10)
2270  ok = true;
2271  }
2272  if (!ok)
2273  ereport(ERROR,
2274  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2275  errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2276  leafSize - sizeof(ItemIdData),
2277  SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2278  RelationGetRelationName(index)),
2279  errhint("Values larger than a buffer page cannot be indexed.")));
2280  }
2281 
2282  /*
2283  * Loop around and attempt to insert the new leafDatum at
2284  * "current" (which might reference an existing child
2285  * tuple, or might be invalid to force us to find a new
2286  * page for the tuple).
2287  */
2288  break;
2289  case spgAddNode:
2290  /* AddNode is not sensible if nodes don't have labels */
2291  if (in.nodeLabels == NULL)
2292  elog(ERROR, "cannot add a node to an inner tuple without node labels");
2293  /* Add node to inner tuple, per request */
2294  spgAddNodeAction(index, state, innerTuple,
2295  &current, &parent,
2296  out.result.addNode.nodeN,
2297  out.result.addNode.nodeLabel);
2298 
2299  /*
2300  * Retry insertion into the enlarged node. We assume that
2301  * we'll get a MatchNode result this time.
2302  */
2303  goto process_inner_tuple;
2304  break;
2305  case spgSplitTuple:
2306  /* Split inner tuple, per request */
2307  spgSplitNodeAction(index, state, innerTuple,
2308  &current, &out);
2309 
2310  /* Retry insertion into the split node */
2311  goto process_inner_tuple;
2312  break;
2313  default:
2314  elog(ERROR, "unrecognized SPGiST choose result: %d",
2315  (int) out.resultType);
2316  break;
2317  }
2318  }
2319  } /* end loop */
2320 
2321  /*
2322  * Release any buffers we're still holding. Beware of possibility that
2323  * current and parent reference same buffer.
2324  */
2325  if (current.buffer != InvalidBuffer)
2326  {
2327  SpGistSetLastUsedPage(index, current.buffer);
2328  UnlockReleaseBuffer(current.buffer);
2329  }
2330  if (parent.buffer != InvalidBuffer &&
2331  parent.buffer != current.buffer)
2332  {
2333  SpGistSetLastUsedPage(index, parent.buffer);
2334  UnlockReleaseBuffer(parent.buffer);
2335  }
2336 
2337  /*
2338  * We do not support being called while some outer function is holding a
2339  * buffer lock (or any other reason to postpone query cancels). If that
2340  * were the case, telling the caller to retry would create an infinite
2341  * loop.
2342  */
2344 
2345  /*
2346  * Finally, check for interrupts again. If there was a query cancel,
2347  * ProcessInterrupts() will be able to throw the error here. If it was
2348  * some other kind of interrupt that can just be cleared, return false to
2349  * tell our caller to retry.
2350  */
2352 
2353  return result;
2354 }
#define XLOG_SPGIST_MOVE_LEAFS
Definition: spgxlog.h:23
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:82
#define SPGIST_DEAD
OffsetNumber offnumPostfix
Definition: spgxlog.h:147
Definition: fmgr.h:56
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition: spgdoinsert.c:50
Datum * spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
Definition: spgutils.c:1130
uint16 nDelete
Definition: spgxlog.h:169
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:186
OffsetNumber offnumParent
Definition: spgxlog.h:53
Datum datum
Definition: spgist.h:55
SpGistInnerTupleData * SpGistInnerTuple
#define SpGistPageIsLeaf(page)
SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgutils.c:1054
bool hasPrefix
Definition: spgist.h:61
int errhint(const char *fmt,...)
Definition: elog.c:1156
#define SpGistPageGetFreeSpace(p, n)
FmgrInfo * index_getprocinfo(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:803
#define XLOG_SPGIST_SPLIT_TUPLE
Definition: spgxlog.h:25
#define SGDTSIZE
#define SGLT_SET_NEXTOFFSET(spgLeafTuple, offsetNumber)
int level
Definition: spgist.h:57
#define SPGIST_REDIRECT
Datum * leafTupleDatums
Definition: spgist.h:126
Datum * datums
Definition: spgist.h:113
OffsetNumber offnumLeaf
Definition: spgxlog.h:50
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:1045
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1565
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:232
#define PointerGetDatum(X)
Definition: postgres.h:600
uint16 nodeI
Definition: spgxlog.h:73
long random(void)
Definition: random.c:22
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define SGITITERATE(x, i, nt)
spgxlogState stateSrc
Definition: spgxlog.h:185
#define SizeOfSpgxlogMoveLeafs
Definition: spgxlog.h:91
#define SizeOfSpgxlogPickSplit
Definition: spgxlog.h:199
SpGistTypeDesc attLeafType
ItemPointerData t_tid
Definition: itup.h:37
#define SPGIST_PLACEHOLDER
#define Min(x, y)
Definition: c.h:986
SpGistTypeDesc attType
#define END_CRIT_SECTION()
Definition: miscadmin.h:149
#define SPGIST_NULLS
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:641
unsigned char uint8
Definition: c.h:439
Pointer Item
Definition: item.h:17
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:33
Datum prefixDatum
Definition: spgist.h:62
#define START_CRIT_SECTION()
Definition: miscadmin.h:147
int errcode(int sqlerrcode)
Definition: elog.c:698
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
#define XLOG_SPGIST_ADD_NODE
Definition: spgxlog.h:24
uint32 BlockNumber
Definition: block.h:31
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3768
#define XLOG_SPGIST_ADD_LEAF
Definition: spgxlog.h:22
uint16 nInsert
Definition: spgxlog.h:170
bool innerIsParent
Definition: spgxlog.h:181
struct spgChooseOut::@45::@47 addNode
#define BUFFER_LOCK_EXCLUSIVE
Definition: bufmgr.h:98
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1148
#define SPGIST_ROOT_BLKNO
#define OidIsValid(objectId)
Definition: c.h:710
#define PageGetMaxOffsetNumber(page)
Definition: bufpage.h:357
union spgChooseOut::@45 result
OffsetNumber offnum
Definition: spgxlog.h:105
uint16 OffsetNumber
Definition: off.h:24
#define SGITDATUM(x, s)
OffsetNumber offnumInner
Definition: spgxlog.h:175
Definition: type.h:89
#define XLOG_SPGIST_PICKSPLIT
Definition: spgxlog.h:26
#define INTERRUPTS_CAN_BE_PROCESSED()
Definition: miscadmin.h:127
static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
Definition: spgdoinsert.c:203
spgConfigOut config
static SpGistInnerTuple addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
Definition: spgdoinsert.c:78
struct spgChooseOut::@45::@46 matchNode
OffsetNumber offnumNew
Definition: spgxlog.h:111
struct SPPageDesc SPPageDesc
void pfree(void *pointer)
Definition: mcxt.c:1169
unsigned int allTheSame
static bool doPickSplit(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, int level, bool isNulls, bool isNew)
Definition: spgdoinsert.c:676
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:131
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3791
Oid * rd_indcollation
Definition: rel.h:212
ItemPointerData pointer
#define ERROR
Definition: elog.h:46
#define INTERRUPTS_PENDING_CONDITION()
Definition: miscadmin.h:111
Datum * nodeLabels
Definition: spgist.h:123
#define GBUF_INNER_PARITY(x)
unsigned int prefixSize
bool newPage
Definition: spgxlog.h:48
spgxlogState stateSrc
Definition: spgxlog.h:75
struct SpGistLeafTupleData * SpGistLeafTuple
OffsetNumber offnumParent
Definition: spgxlog.h:182
uint16 nodeI
Definition: spgxlog.h:183
#define SPGIST_NULL_BLKNO
#define FirstOffsetNumber
Definition: off.h:27
#define REGBUF_STANDARD
Definition: xloginsert.h:35
SpGistInnerTuple spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, int nNodes, SpGistNodeTuple *nodes)
Definition: spgutils.c:971
spgxlogState stateSrc
Definition: spgxlog.h:130
#define SPGIST_METAPAGE_BLKNO
OffsetNumber offnum
Definition: spgdoinsert.c:39
SpGistDeadTupleData * SpGistDeadTuple
static void spgSplitNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, spgChooseOut *out)
Definition: spgdoinsert.c:1714
#define RelationGetRelationName(relation)
Definition: rel.h:511
static void setRedirectionTuple(SPPageDesc *current, OffsetNumber position, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:567
struct ItemIdData ItemIdData
static bool checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, bool *includeNew)
Definition: spgdoinsert.c:598
Size SpGistGetLeafTupleSize(TupleDesc tupleDescriptor, Datum *datums, bool *isnulls)
Definition: spgutils.c:787
void SpGistInitBuffer(Buffer b, uint16 f)
Definition: spgutils.c:690
#define BufferGetPage(buffer)
Definition: bufmgr.h:169
bool replaceDead
Definition: spgxlog.h:68
#define SGITMAXNNODES
bool ConditionalLockBuffer(Buffer buffer)
Definition: bufmgr.c:4033
int8 parentBlk
Definition: spgxlog.h:125
int nNodes
Definition: spgist.h:63
uint16 nodeI
Definition: spgxlog.h:128
#define GBUF_LEAF
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
bool spgdoinsert(Relation index, SpGistState *state, ItemPointer heapPtr, Datum *datums, bool *isnulls)
Definition: spgdoinsert.c:1913
#define spgKeyColumn
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:340
OffsetNumber offnumPrefix
Definition: spgxlog.h:144
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:434
#define spgFirstIncludeColumn
unsigned int tupstate
void * palloc0(Size size)
Definition: mcxt.c:1093
unsigned int tupstate
bool newPage
Definition: spgxlog.h:112
bool longValuesOK
Definition: spgist.h:47
uintptr_t Datum
Definition: postgres.h:411
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1128
OffsetNumber offnumHeadLeaf
Definition: spgxlog.h:51
static char * label
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:4007
#define SPGIST_PAGE_CAPACITY
bool storesNulls
Definition: spgxlog.h:49
#define STORE_STATE(s, d)
spgChooseResultType resultType
Definition: spgist.h:76
#define InvalidOffsetNumber
Definition: off.h:26
uint16 nodeI
Definition: spgxlog.h:54
#define ereport(elevel,...)
Definition: elog.h:157
bool postfixBlkSame
Definition: spgxlog.h:149
#define SpGistPageStoresNulls(page)
OffsetNumber offnumParent
Definition: spgxlog.h:126
static void spgMatchNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN)
Definition: spgdoinsert.c:1458
TupleDesc leafTupDesc
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
Datum leafDatum
Definition: spgist.h:56
Definition: regguts.h:317
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:537
void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
Definition: bufpage.c:1154
#define SPGIST_COMPRESS_PROC
Definition: spgist.h:28
bool hasPrefix
Definition: spgist.h:119
Buffer ReadBuffer(Relation reln, BlockNumber blockNum)
Definition: bufmgr.c:694
#define SpGistBlockIsRoot(blkno)
#define INDEX_MAX_KEYS
#define SPGIST_LIVE
#define InvalidBlockNumber
Definition: block.h:33
#define SGLTDATUM(x, s)
#define BufferIsValid(bufnum)
Definition: bufmgr.h:123
#define GBUF_NULLS
#define ItemPointerGetOffsetNumber(pointer)
Definition: itemptr.h:117
#define RelationNeedsWAL(relation)
Definition: rel.h:601
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:951
#define SpGistPageGetOpaque(page)
static void spgAddNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN, Datum nodeLabel)
Definition: spgdoinsert.c:1512
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2748
Datum * nodeLabels
Definition: spgist.h:64
#define MaxIndexTuplesPerPage
Definition: itup.h:145
void * palloc(Size size)
Definition: mcxt.c:1062
int errmsg(const char *fmt,...)
Definition: elog.c:909
SpGistLeafTuple spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, Datum *datums, bool *isnulls)
Definition: spgutils.c:840
#define SGLT_GET_NEXTOFFSET(spgLeafTuple)
#define elog(elevel,...)
Definition: elog.h:232
int i
OffsetNumber offnumParent
Definition: spgxlog.h:72
int * mapTuplesToNodes
Definition: spgist.h:125
void spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor, Datum *datums, bool *isnulls, bool keyColumnIsNull)
Definition: spgutils.c:1085
bool allTheSame
Definition: spgist.h:60
bool storesNulls
Definition: spgxlog.h:69
static int checkSplitConditions(Relation index, SpGistState *state, SPPageDesc *current, int *nToSplit)
Definition: spgdoinsert.c:333
bool storesNulls
Definition: spgxlog.h:178
Buffer buffer
Definition: spgdoinsert.c:37
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
Datum prefixDatum
Definition: spgist.h:120
uint16 nMoves
Definition: spgxlog.h:66
struct spgChooseOut::@45::@48 splitTuple
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:98
#define SPGIST_CHOOSE_PROC
Definition: spgist.h:24
#define qsort(a, b, c, d)
Definition: port.h:505
static void moveLeafs(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, bool isNulls)
Definition: spgdoinsert.c:387
void XLogBeginInsert(void)
Definition: xloginsert.c:135
#define SPGIST_LEAF
#define PageSetLSN(page, lsn)
Definition: bufpage.h:368
#define SPGIST_PICKSPLIT_PROC
Definition: spgist.h:25
int Buffer
Definition: buf.h:23
static int cmpOffsetNumbers(const void *a, const void *b)
Definition: spgdoinsert.c:110
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:1173
SpGistNodeTuple spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
Definition: spgutils.c:929
bool isRootSplit
Definition: spgxlog.h:167
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
Pointer Page
Definition: bufpage.h:78
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:127
BlockNumber blkno
Definition: spgdoinsert.c:36
RegProcedure index_getprocid(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:769