PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
spgdoinsert.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * spgdoinsert.c
4 * implementation of insert algorithm
5 *
6 *
7 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/spgist/spgdoinsert.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
#include "postgres.h"

#include "access/genam.h"
#include "access/spgist_private.h"
#include "access/spgxlog.h"
#include "access/xloginsert.h"
#include "common/int.h"
#include "common/pg_prng.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"
27
28
29/*
30 * SPPageDesc tracks all info about a page we are inserting into. In some
31 * situations it actually identifies a tuple, or even a specific node within
32 * an inner tuple. But any of the fields can be invalid. If the buffer
33 * field is valid, it implies we hold pin and exclusive lock on that buffer.
34 * page pointer should be valid exactly when buffer is.
35 */
36typedef struct SPPageDesc
37{
38 BlockNumber blkno; /* block number, or InvalidBlockNumber */
39 Buffer buffer; /* page's buffer number, or InvalidBuffer */
40 Page page; /* pointer to page buffer, or NULL */
41 OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
42 int node; /* node number within inner tuple, or -1 */
44
45
46/*
47 * Set the item pointer in the nodeN'th entry in inner tuple tup. This
48 * is used to update the parent inner tuple's downlink after a move or
49 * split operation.
50 */
51void
53 BlockNumber blkno, OffsetNumber offset)
54{
55 int i;
56 SpGistNodeTuple node;
57
58 SGITITERATE(tup, i, node)
59 {
60 if (i == nodeN)
61 {
62 ItemPointerSet(&node->t_tid, blkno, offset);
63 return;
64 }
65 }
66
67 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
68 nodeN);
69}
70
71/*
72 * Form a new inner tuple containing one more node than the given one, with
73 * the specified label datum, inserted at offset "offset" in the node array.
74 * The new tuple's prefix is the same as the old one's.
75 *
76 * Note that the new node initially has an invalid downlink. We'll find a
77 * page to point it to later.
78 */
81{
82 SpGistNodeTuple node,
83 *nodes;
84 int i;
85
86 /* if offset is negative, insert at end */
87 if (offset < 0)
88 offset = tuple->nNodes;
89 else if (offset > tuple->nNodes)
90 elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
91
92 nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
93 SGITITERATE(tuple, i, node)
94 {
95 if (i < offset)
96 nodes[i] = node;
97 else
98 nodes[i + 1] = node;
99 }
100
101 nodes[offset] = spgFormNodeTuple(state, label, false);
102
104 (tuple->prefixSize > 0),
105 SGITDATUM(tuple, state),
106 tuple->nNodes + 1,
107 nodes);
108}
109
110/* qsort comparator for sorting OffsetNumbers */
111static int
112cmpOffsetNumbers(const void *a, const void *b)
113{
114 return pg_cmp_u16(*(const OffsetNumber *) a, *(const OffsetNumber *) b);
115}
116
117/*
118 * Delete multiple tuples from an index page, preserving tuple offset numbers.
119 *
120 * The first tuple in the given list is replaced with a dead tuple of type
121 * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122 * with dead tuples of type "reststate". If either firststate or reststate
123 * is REDIRECT, blkno/offnum specify where to link to.
124 *
125 * NB: this is used during WAL replay, so beware of trying to make it too
126 * smart. In particular, it shouldn't use "state" except for calling
127 * spgFormDeadTuple(). This is also used in a critical section, so no
128 * pallocs either!
129 */
130void
132 OffsetNumber *itemnos, int nitems,
133 int firststate, int reststate,
134 BlockNumber blkno, OffsetNumber offnum)
135{
136 OffsetNumber firstItem;
138 SpGistDeadTuple tuple = NULL;
139 int i;
140
141 if (nitems == 0)
142 return; /* nothing to do */
143
144 /*
145 * For efficiency we want to use PageIndexMultiDelete, which requires the
146 * targets to be listed in sorted order, so we have to sort the itemnos
147 * array. (This also greatly simplifies the math for reinserting the
148 * replacement tuples.) However, we must not scribble on the caller's
149 * array, so we have to make a copy.
150 */
151 memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152 if (nitems > 1)
153 qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154
155 PageIndexMultiDelete(page, sortednos, nitems);
156
157 firstItem = itemnos[0];
158
159 for (i = 0; i < nitems; i++)
160 {
161 OffsetNumber itemno = sortednos[i];
162 int tupstate;
163
164 tupstate = (itemno == firstItem) ? firststate : reststate;
165 if (tuple == NULL || tuple->tupstate != tupstate)
166 tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167
168 if (PageAddItem(page, (Item) tuple, tuple->size,
169 itemno, false, false) != itemno)
170 elog(ERROR, "failed to add item of size %u to SPGiST index page",
171 tuple->size);
172
173 if (tupstate == SPGIST_REDIRECT)
174 SpGistPageGetOpaque(page)->nRedirection++;
175 else if (tupstate == SPGIST_PLACEHOLDER)
176 SpGistPageGetOpaque(page)->nPlaceholder++;
177 }
178}
179
180/*
181 * Update the parent inner tuple's downlink, and mark the parent buffer
182 * dirty (this must be the last change to the parent page in the current
183 * WAL action).
184 */
185static void
187 BlockNumber blkno, OffsetNumber offnum)
188{
189 SpGistInnerTuple innerTuple;
190
191 innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192 PageGetItemId(parent->page, parent->offnum));
193
194 spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195
196 MarkBufferDirty(parent->buffer);
197}
198
199/*
200 * Add a leaf tuple to a leaf page where there is known to be room for it
201 */
202static void
204 SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
205{
206 spgxlogAddLeaf xlrec;
207
208 xlrec.newPage = isNew;
209 xlrec.storesNulls = isNulls;
210
211 /* these will be filled below as needed */
215 xlrec.nodeI = 0;
216
218
219 if (current->offnum == InvalidOffsetNumber ||
220 SpGistBlockIsRoot(current->blkno))
221 {
222 /* Tuple is not part of a chain */
224 current->offnum = SpGistPageAddNewItem(state, current->page,
225 (Item) leafTuple, leafTuple->size,
226 NULL, false);
227
228 xlrec.offnumLeaf = current->offnum;
229
230 /* Must update parent's downlink if any */
231 if (parent->buffer != InvalidBuffer)
232 {
233 xlrec.offnumParent = parent->offnum;
234 xlrec.nodeI = parent->node;
235
236 saveNodeLink(index, parent, current->blkno, current->offnum);
237 }
238 }
239 else
240 {
241 /*
242 * Tuple must be inserted into existing chain. We mustn't change the
243 * chain's head address, but we don't need to chase the entire chain
244 * to put the tuple at the end; we can insert it second.
245 *
246 * Also, it's possible that the "chain" consists only of a DEAD tuple,
247 * in which case we should replace the DEAD tuple in-place.
248 */
249 SpGistLeafTuple head;
250 OffsetNumber offnum;
251
252 head = (SpGistLeafTuple) PageGetItem(current->page,
253 PageGetItemId(current->page, current->offnum));
254 if (head->tupstate == SPGIST_LIVE)
255 {
257 offnum = SpGistPageAddNewItem(state, current->page,
258 (Item) leafTuple, leafTuple->size,
259 NULL, false);
260
261 /*
262 * re-get head of list because it could have been moved on page,
263 * and set new second element
264 */
265 head = (SpGistLeafTuple) PageGetItem(current->page,
266 PageGetItemId(current->page, current->offnum));
267 SGLT_SET_NEXTOFFSET(head, offnum);
268
269 xlrec.offnumLeaf = offnum;
270 xlrec.offnumHeadLeaf = current->offnum;
271 }
272 else if (head->tupstate == SPGIST_DEAD)
273 {
275 PageIndexTupleDelete(current->page, current->offnum);
276 if (PageAddItem(current->page,
277 (Item) leafTuple, leafTuple->size,
278 current->offnum, false, false) != current->offnum)
279 elog(ERROR, "failed to add item of size %u to SPGiST index page",
280 leafTuple->size);
281
282 /* WAL replay distinguishes this case by equal offnums */
283 xlrec.offnumLeaf = current->offnum;
284 xlrec.offnumHeadLeaf = current->offnum;
285 }
286 else
287 elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288 }
289
290 MarkBufferDirty(current->buffer);
291
292 if (RelationNeedsWAL(index) && !state->isBuild)
293 {
294 XLogRecPtr recptr;
295 int flags;
296
298 XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299 XLogRegisterData((char *) leafTuple, leafTuple->size);
300
301 flags = REGBUF_STANDARD;
302 if (xlrec.newPage)
303 flags |= REGBUF_WILL_INIT;
304 XLogRegisterBuffer(0, current->buffer, flags);
305 if (xlrec.offnumParent != InvalidOffsetNumber)
307
308 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309
310 PageSetLSN(current->page, recptr);
311
312 /* update parent only if we actually changed it */
313 if (xlrec.offnumParent != InvalidOffsetNumber)
314 {
315 PageSetLSN(parent->page, recptr);
316 }
317 }
318
320}
321
322/*
323 * Count the number and total size of leaf tuples in the chain starting at
324 * current->offnum. Return number into *nToSplit and total size as function
325 * result.
326 *
327 * Klugy special case when considering the root page (i.e., root is a leaf
328 * page, but we're about to split for the first time): return fake large
329 * values to force spgdoinsert() to take the doPickSplit rather than
330 * moveLeafs code path. moveLeafs is not prepared to deal with root page.
331 */
332static int
334 SPPageDesc *current, int *nToSplit)
335{
336 int i,
337 n = 0,
338 totalSize = 0;
339
340 if (SpGistBlockIsRoot(current->blkno))
341 {
342 /* return impossible values to force split */
343 *nToSplit = BLCKSZ;
344 return BLCKSZ;
345 }
346
347 i = current->offnum;
348 while (i != InvalidOffsetNumber)
349 {
351
353 i <= PageGetMaxOffsetNumber(current->page));
354 it = (SpGistLeafTuple) PageGetItem(current->page,
355 PageGetItemId(current->page, i));
356 if (it->tupstate == SPGIST_LIVE)
357 {
358 n++;
359 totalSize += it->size + sizeof(ItemIdData);
360 }
361 else if (it->tupstate == SPGIST_DEAD)
362 {
363 /* We could see a DEAD tuple as first/only chain item */
364 Assert(i == current->offnum);
366 /* Don't count it in result, because it won't go to other page */
367 }
368 else
369 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370
372 }
373
374 *nToSplit = n;
375
376 return totalSize;
377}
378
379/*
380 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381 * but the chain has to be moved because there's not enough room to add
382 * newLeafTuple to its page. We use this method when the chain contains
383 * very little data so a split would be inefficient. We are sure we can
384 * fit the chain plus newLeafTuple on one other page.
385 */
386static void
388 SPPageDesc *current, SPPageDesc *parent,
389 SpGistLeafTuple newLeafTuple, bool isNulls)
390{
391 int i,
392 nDelete,
393 nInsert,
394 size;
395 Buffer nbuf;
396 Page npage;
398 startOffset = InvalidOffsetNumber;
399 bool replaceDead = false;
400 OffsetNumber *toDelete;
401 OffsetNumber *toInsert;
402 BlockNumber nblkno;
403 spgxlogMoveLeafs xlrec;
404 char *leafdata,
405 *leafptr;
406
407 /* This doesn't work on root page */
408 Assert(parent->buffer != InvalidBuffer);
409 Assert(parent->buffer != current->buffer);
410
411 /* Locate the tuples to be moved, and count up the space needed */
412 i = PageGetMaxOffsetNumber(current->page);
413 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
414 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
415
416 size = newLeafTuple->size + sizeof(ItemIdData);
417
418 nDelete = 0;
419 i = current->offnum;
420 while (i != InvalidOffsetNumber)
421 {
423
425 i <= PageGetMaxOffsetNumber(current->page));
426 it = (SpGistLeafTuple) PageGetItem(current->page,
427 PageGetItemId(current->page, i));
428
429 if (it->tupstate == SPGIST_LIVE)
430 {
431 toDelete[nDelete] = i;
432 size += it->size + sizeof(ItemIdData);
433 nDelete++;
434 }
435 else if (it->tupstate == SPGIST_DEAD)
436 {
437 /* We could see a DEAD tuple as first/only chain item */
438 Assert(i == current->offnum);
440 /* We don't want to move it, so don't count it in size */
441 toDelete[nDelete] = i;
442 nDelete++;
443 replaceDead = true;
444 }
445 else
446 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
447
449 }
450
451 /* Find a leaf page that will hold them */
452 nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
453 size, &xlrec.newPage);
454 npage = BufferGetPage(nbuf);
455 nblkno = BufferGetBlockNumber(nbuf);
456 Assert(nblkno != current->blkno);
457
458 leafdata = leafptr = palloc(size);
459
461
462 /* copy all the old tuples to new page, unless they're dead */
463 nInsert = 0;
464 if (!replaceDead)
465 {
466 for (i = 0; i < nDelete; i++)
467 {
469
470 it = (SpGistLeafTuple) PageGetItem(current->page,
471 PageGetItemId(current->page, toDelete[i]));
473
474 /*
475 * Update chain link (notice the chain order gets reversed, but we
476 * don't care). We're modifying the tuple on the source page
477 * here, but it's okay since we're about to delete it.
478 */
479 SGLT_SET_NEXTOFFSET(it, r);
480
481 r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
482 &startOffset, false);
483
484 toInsert[nInsert] = r;
485 nInsert++;
486
487 /* save modified tuple into leafdata as well */
488 memcpy(leafptr, it, it->size);
489 leafptr += it->size;
490 }
491 }
492
493 /* add the new tuple as well */
494 SGLT_SET_NEXTOFFSET(newLeafTuple, r);
495 r = SpGistPageAddNewItem(state, npage,
496 (Item) newLeafTuple, newLeafTuple->size,
497 &startOffset, false);
498 toInsert[nInsert] = r;
499 nInsert++;
500 memcpy(leafptr, newLeafTuple, newLeafTuple->size);
501 leafptr += newLeafTuple->size;
502
503 /*
504 * Now delete the old tuples, leaving a redirection pointer behind for the
505 * first one, unless we're doing an index build; in which case there can't
506 * be any concurrent scan so we need not provide a redirect.
507 */
508 spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
511 nblkno, r);
512
513 /* Update parent's downlink and mark parent page dirty */
514 saveNodeLink(index, parent, nblkno, r);
515
516 /* Mark the leaf pages too */
517 MarkBufferDirty(current->buffer);
518 MarkBufferDirty(nbuf);
519
520 if (RelationNeedsWAL(index) && !state->isBuild)
521 {
522 XLogRecPtr recptr;
523
524 /* prepare WAL info */
525 STORE_STATE(state, xlrec.stateSrc);
526
527 xlrec.nMoves = nDelete;
528 xlrec.replaceDead = replaceDead;
529 xlrec.storesNulls = isNulls;
530
531 xlrec.offnumParent = parent->offnum;
532 xlrec.nodeI = parent->node;
533
536 XLogRegisterData((char *) toDelete,
537 sizeof(OffsetNumber) * nDelete);
538 XLogRegisterData((char *) toInsert,
539 sizeof(OffsetNumber) * nInsert);
540 XLogRegisterData((char *) leafdata, leafptr - leafdata);
541
543 XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
545
546 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
547
548 PageSetLSN(current->page, recptr);
549 PageSetLSN(npage, recptr);
550 PageSetLSN(parent->page, recptr);
551 }
552
554
555 /* Update local free-space cache and release new buffer */
558}
559
560/*
561 * Update previously-created redirection tuple with appropriate destination
562 *
563 * We use this when it's not convenient to know the destination first.
564 * The tuple should have been made with the "impossible" destination of
565 * the metapage.
566 */
567static void
569 BlockNumber blkno, OffsetNumber offnum)
570{
572
573 dt = (SpGistDeadTuple) PageGetItem(current->page,
574 PageGetItemId(current->page, position));
577 ItemPointerSet(&dt->pointer, blkno, offnum);
578}
579
580/*
581 * Test to see if the user-defined picksplit function failed to do its job,
582 * ie, it put all the leaf tuples into the same node.
583 * If so, randomly divide the tuples into several nodes (all with the same
584 * label) and return true to select allTheSame mode for this inner tuple.
585 *
586 * (This code is also used to forcibly select allTheSame mode for nulls.)
587 *
588 * If we know that the leaf tuples wouldn't all fit on one page, then we
589 * exclude the last tuple (which is the incoming new tuple that forced a split)
590 * from the check to see if more than one node is used. The reason for this
591 * is that if the existing tuples are put into only one chain, then even if
592 * we move them all to an empty page, there would still not be room for the
593 * new tuple, so we'd get into an infinite loop of picksplit attempts.
594 * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
595 * be split across pages. (Exercise for the reader: figure out why this
596 * fixes the problem even when there is only one old tuple.)
597 */
598static bool
600 bool *includeNew)
601{
602 int theNode;
603 int limit;
604 int i;
605
606 /* For the moment, assume we can include the new leaf tuple */
607 *includeNew = true;
608
609 /* If there's only the new leaf tuple, don't select allTheSame mode */
610 if (in->nTuples <= 1)
611 return false;
612
613 /* If tuple set doesn't fit on one page, ignore the new tuple in test */
614 limit = tooBig ? in->nTuples - 1 : in->nTuples;
615
616 /* Check to see if more than one node is populated */
617 theNode = out->mapTuplesToNodes[0];
618 for (i = 1; i < limit; i++)
619 {
620 if (out->mapTuplesToNodes[i] != theNode)
621 return false;
622 }
623
624 /* Nope, so override the picksplit function's decisions */
625
626 /* If the new tuple is in its own node, it can't be included in split */
627 if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
628 *includeNew = false;
629
630 out->nNodes = 8; /* arbitrary number of child nodes */
631
632 /* Random assignment of tuples to nodes (note we include new tuple) */
633 for (i = 0; i < in->nTuples; i++)
634 out->mapTuplesToNodes[i] = i % out->nNodes;
635
636 /* The opclass may not use node labels, but if it does, duplicate 'em */
637 if (out->nodeLabels)
638 {
639 Datum theLabel = out->nodeLabels[theNode];
640
641 out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
642 for (i = 0; i < out->nNodes; i++)
643 out->nodeLabels[i] = theLabel;
644 }
645
646 /* We don't touch the prefix or the leaf tuple datum assignments */
647
648 return true;
649}
650
651/*
652 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
653 * but the chain has to be split because there's not enough room to add
654 * newLeafTuple to its page.
655 *
656 * This function splits the leaf tuple set according to picksplit's rules,
657 * creating one or more new chains that are spread across the current page
658 * and an additional leaf page (we assume that two leaf pages will be
659 * sufficient). A new inner tuple is created, and the parent downlink
660 * pointer is updated to point to that inner tuple instead of the leaf chain.
661 *
662 * On exit, current contains the address of the new inner tuple.
663 *
664 * Returns true if we successfully inserted newLeafTuple during this function,
665 * false if caller still has to do it (meaning another picksplit operation is
666 * probably needed). Failure could occur if the picksplit result is fairly
667 * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
668 * Because we force the picksplit result to be at least two chains, each
669 * cycle will get rid of at least one leaf tuple from the chain, so the loop
670 * will eventually terminate if lack of balance is the issue. If the tuple
671 * is too big, we assume that repeated picksplit operations will eventually
672 * make it small enough by repeated prefix-stripping. A broken opclass could
673 * make this an infinite loop, though, so spgdoinsert() checks that the
674 * leaf datums get smaller each time.
675 */
676static bool
678 SPPageDesc *current, SPPageDesc *parent,
679 SpGistLeafTuple newLeafTuple,
680 int level, bool isNulls, bool isNew)
681{
682 bool insertedNew = false;
684 spgPickSplitOut out;
685 FmgrInfo *procinfo;
686 bool includeNew;
687 int i,
688 max,
689 n;
690 SpGistInnerTuple innerTuple;
691 SpGistNodeTuple node,
692 *nodes;
693 Buffer newInnerBuffer,
694 newLeafBuffer;
695 uint8 *leafPageSelect;
696 int *leafSizes;
697 OffsetNumber *toDelete;
698 OffsetNumber *toInsert;
699 OffsetNumber redirectTuplePos = InvalidOffsetNumber;
700 OffsetNumber startOffsets[2];
701 SpGistLeafTuple *oldLeafs;
702 SpGistLeafTuple *newLeafs;
703 Datum leafDatums[INDEX_MAX_KEYS];
704 bool leafIsnulls[INDEX_MAX_KEYS];
705 int spaceToDelete;
706 int currentFreeSpace;
707 int totalLeafSizes;
708 bool allTheSame;
709 spgxlogPickSplit xlrec;
710 char *leafdata,
711 *leafptr;
712 SPPageDesc saveCurrent;
713 int nToDelete,
714 nToInsert,
715 maxToInclude;
716
717 in.level = level;
718
719 /*
720 * Allocate per-leaf-tuple work arrays with max possible size
721 */
722 max = PageGetMaxOffsetNumber(current->page);
723 n = max + 1;
724 in.datums = (Datum *) palloc(sizeof(Datum) * n);
725 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
726 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
727 oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
728 newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
729 leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
730
731 STORE_STATE(state, xlrec.stateSrc);
732
733 /*
734 * Form list of leaf tuples which will be distributed as split result;
735 * also, count up the amount of space that will be freed from current.
736 * (Note that in the non-root case, we won't actually delete the old
737 * tuples, only replace them with redirects or placeholders.)
738 */
739 nToInsert = 0;
740 nToDelete = 0;
741 spaceToDelete = 0;
742 if (SpGistBlockIsRoot(current->blkno))
743 {
744 /*
745 * We are splitting the root (which up to now is also a leaf page).
746 * Its tuples are not linked, so scan sequentially to get them all. We
747 * ignore the original value of current->offnum.
748 */
749 for (i = FirstOffsetNumber; i <= max; i++)
750 {
752
753 it = (SpGistLeafTuple) PageGetItem(current->page,
754 PageGetItemId(current->page, i));
755 if (it->tupstate == SPGIST_LIVE)
756 {
757 in.datums[nToInsert] =
758 isNulls ? (Datum) 0 : SGLTDATUM(it, state);
759 oldLeafs[nToInsert] = it;
760 nToInsert++;
761 toDelete[nToDelete] = i;
762 nToDelete++;
763 /* we will delete the tuple altogether, so count full space */
764 spaceToDelete += it->size + sizeof(ItemIdData);
765 }
766 else /* tuples on root should be live */
767 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
768 }
769 }
770 else
771 {
772 /* Normal case, just collect the leaf tuples in the chain */
773 i = current->offnum;
774 while (i != InvalidOffsetNumber)
775 {
777
778 Assert(i >= FirstOffsetNumber && i <= max);
779 it = (SpGistLeafTuple) PageGetItem(current->page,
780 PageGetItemId(current->page, i));
781 if (it->tupstate == SPGIST_LIVE)
782 {
783 in.datums[nToInsert] =
784 isNulls ? (Datum) 0 : SGLTDATUM(it, state);
785 oldLeafs[nToInsert] = it;
786 nToInsert++;
787 toDelete[nToDelete] = i;
788 nToDelete++;
789 /* we will not delete the tuple, only replace with dead */
790 Assert(it->size >= SGDTSIZE);
791 spaceToDelete += it->size - SGDTSIZE;
792 }
793 else if (it->tupstate == SPGIST_DEAD)
794 {
795 /* We could see a DEAD tuple as first/only chain item */
796 Assert(i == current->offnum);
798 toDelete[nToDelete] = i;
799 nToDelete++;
800 /* replacing it with redirect will save no space */
801 }
802 else
803 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
804
806 }
807 }
808 in.nTuples = nToInsert;
809
810 /*
811 * We may not actually insert new tuple because another picksplit may be
812 * necessary due to too large value, but we will try to allocate enough
813 * space to include it; and in any case it has to be included in the input
814 * for the picksplit function. So don't increment nToInsert yet.
815 */
816 in.datums[in.nTuples] =
817 isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
818 oldLeafs[in.nTuples] = newLeafTuple;
819 in.nTuples++;
820
821 memset(&out, 0, sizeof(out));
822
823 if (!isNulls)
824 {
825 /*
826 * Perform split using user-defined method.
827 */
829 FunctionCall2Coll(procinfo,
830 index->rd_indcollation[0],
831 PointerGetDatum(&in),
832 PointerGetDatum(&out));
833
834 /*
835 * Form new leaf tuples and count up the total space needed.
836 */
837 totalLeafSizes = 0;
838 for (i = 0; i < in.nTuples; i++)
839 {
840 if (state->leafTupDesc->natts > 1)
841 spgDeformLeafTuple(oldLeafs[i],
842 state->leafTupDesc,
843 leafDatums,
844 leafIsnulls,
845 isNulls);
846
847 leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
848 leafIsnulls[spgKeyColumn] = false;
849
850 newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
851 leafDatums,
852 leafIsnulls);
853 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
854 }
855 }
856 else
857 {
858 /*
859 * Perform dummy split that puts all tuples into one node.
860 * checkAllTheSame will override this and force allTheSame mode.
861 */
862 out.hasPrefix = false;
863 out.nNodes = 1;
864 out.nodeLabels = NULL;
865 out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
866
867 /*
868 * Form new leaf tuples and count up the total space needed.
869 */
870 totalLeafSizes = 0;
871 for (i = 0; i < in.nTuples; i++)
872 {
873 if (state->leafTupDesc->natts > 1)
874 spgDeformLeafTuple(oldLeafs[i],
875 state->leafTupDesc,
876 leafDatums,
877 leafIsnulls,
878 isNulls);
879
880 /*
881 * Nulls tree can contain only null key values.
882 */
883 leafDatums[spgKeyColumn] = (Datum) 0;
884 leafIsnulls[spgKeyColumn] = true;
885
886 newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
887 leafDatums,
888 leafIsnulls);
889 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
890 }
891 }
892
893 /*
894 * Check to see if the picksplit function failed to separate the values,
895 * ie, it put them all into the same child node. If so, select allTheSame
896 * mode and create a random split instead. See comments for
897 * checkAllTheSame as to why we need to know if the new leaf tuples could
898 * fit on one page.
899 */
900 allTheSame = checkAllTheSame(&in, &out,
901 totalLeafSizes > SPGIST_PAGE_CAPACITY,
902 &includeNew);
903
904 /*
905 * If checkAllTheSame decided we must exclude the new tuple, don't
906 * consider it any further.
907 */
908 if (includeNew)
909 maxToInclude = in.nTuples;
910 else
911 {
912 maxToInclude = in.nTuples - 1;
913 totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
914 }
915
916 /*
917 * Allocate per-node work arrays. Since checkAllTheSame could replace
918 * out.nNodes with a value larger than the number of tuples on the input
919 * page, we can't allocate these arrays before here.
920 */
921 nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
922 leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
923
924 /*
925 * Form nodes of inner tuple and inner tuple itself
926 */
927 for (i = 0; i < out.nNodes; i++)
928 {
929 Datum label = (Datum) 0;
930 bool labelisnull = (out.nodeLabels == NULL);
931
932 if (!labelisnull)
933 label = out.nodeLabels[i];
934 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
935 }
936 innerTuple = spgFormInnerTuple(state,
937 out.hasPrefix, out.prefixDatum,
938 out.nNodes, nodes);
939 innerTuple->allTheSame = allTheSame;
940
941 /*
942 * Update nodes[] array to point into the newly formed innerTuple, so that
943 * we can adjust their downlinks below.
944 */
945 SGITITERATE(innerTuple, i, node)
946 {
947 nodes[i] = node;
948 }
949
950 /*
951 * Re-scan new leaf tuples and count up the space needed under each node.
952 */
953 for (i = 0; i < maxToInclude; i++)
954 {
955 n = out.mapTuplesToNodes[i];
956 if (n < 0 || n >= out.nNodes)
957 elog(ERROR, "inconsistent result of SPGiST picksplit function");
958 leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
959 }
960
961 /*
962 * To perform the split, we must insert a new inner tuple, which can't go
963 * on a leaf page; and unless we are splitting the root page, we must then
964 * update the parent tuple's downlink to point to the inner tuple. If
965 * there is room, we'll put the new inner tuple on the same page as the
966 * parent tuple, otherwise we need another non-leaf buffer. But if the
967 * parent page is the root, we can't add the new inner tuple there,
968 * because the root page must have only one inner tuple.
969 */
970 xlrec.initInner = false;
971 if (parent->buffer != InvalidBuffer &&
972 !SpGistBlockIsRoot(parent->blkno) &&
973 (SpGistPageGetFreeSpace(parent->page, 1) >=
974 innerTuple->size + sizeof(ItemIdData)))
975 {
976 /* New inner tuple will fit on parent page */
977 newInnerBuffer = parent->buffer;
978 }
979 else if (parent->buffer != InvalidBuffer)
980 {
981 /* Send tuple to page with next triple parity (see README) */
982 newInnerBuffer = SpGistGetBuffer(index,
983 GBUF_INNER_PARITY(parent->blkno + 1) |
984 (isNulls ? GBUF_NULLS : 0),
985 innerTuple->size + sizeof(ItemIdData),
986 &xlrec.initInner);
987 }
988 else
989 {
990 /* Root page split ... inner tuple will go to root page */
991 newInnerBuffer = InvalidBuffer;
992 }
993
994 /*
995 * The new leaf tuples converted from the existing ones should require the
996 * same or less space, and therefore should all fit onto one page
997 * (although that's not necessarily the current page, since we can't
998 * delete the old tuples but only replace them with placeholders).
999 * However, the incoming new tuple might not also fit, in which case we
1000 * might need another picksplit cycle to reduce it some more.
1001 *
1002 * If there's not room to put everything back onto the current page, then
1003 * we decide on a per-node basis which tuples go to the new page. (We do
1004 * it like that because leaf tuple chains can't cross pages, so we must
1005 * place all leaf tuples belonging to the same parent node on the same
1006 * page.)
1007 *
1008 * If we are splitting the root page (turning it from a leaf page into an
1009 * inner page), then no leaf tuples can go back to the current page; they
1010 * must all go somewhere else.
1011 */
1012 if (!SpGistBlockIsRoot(current->blkno))
1013 currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1014 else
1015 currentFreeSpace = 0; /* prevent assigning any tuples to current */
1016
1017 xlrec.initDest = false;
1018
1019 if (totalLeafSizes <= currentFreeSpace)
1020 {
1021 /* All the leaf tuples will fit on current page */
1022 newLeafBuffer = InvalidBuffer;
1023 /* mark new leaf tuple as included in insertions, if allowed */
1024 if (includeNew)
1025 {
1026 nToInsert++;
1027 insertedNew = true;
1028 }
1029 for (i = 0; i < nToInsert; i++)
1030 leafPageSelect[i] = 0; /* signifies current page */
1031 }
1032 else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1033 {
1034 /*
1035 * We're trying to split up a long value by repeated suffixing, but
1036 * it's not going to fit yet. Don't bother allocating a second leaf
1037 * buffer that we won't be able to use.
1038 */
1039 newLeafBuffer = InvalidBuffer;
1040 Assert(includeNew);
1041 Assert(nToInsert == 0);
1042 }
1043 else
1044 {
1045 /* We will need another leaf page */
1046 uint8 *nodePageSelect;
1047 int curspace;
1048 int newspace;
1049
1050 newLeafBuffer = SpGistGetBuffer(index,
1051 GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1052 Min(totalLeafSizes,
1054 &xlrec.initDest);
1055
1056 /*
1057 * Attempt to assign node groups to the two pages. We might fail to
1058 * do so, even if totalLeafSizes is less than the available space,
1059 * because we can't split a group across pages.
1060 */
1061 nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1062
1063 curspace = currentFreeSpace;
1064 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1065 for (i = 0; i < out.nNodes; i++)
1066 {
1067 if (leafSizes[i] <= curspace)
1068 {
1069 nodePageSelect[i] = 0; /* signifies current page */
1070 curspace -= leafSizes[i];
1071 }
1072 else
1073 {
1074 nodePageSelect[i] = 1; /* signifies new leaf page */
1075 newspace -= leafSizes[i];
1076 }
1077 }
1078 if (curspace >= 0 && newspace >= 0)
1079 {
1080 /* Successful assignment, so we can include the new leaf tuple */
1081 if (includeNew)
1082 {
1083 nToInsert++;
1084 insertedNew = true;
1085 }
1086 }
1087 else if (includeNew)
1088 {
1089 /* We must exclude the new leaf tuple from the split */
1090 int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1091
1092 leafSizes[nodeOfNewTuple] -=
1093 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1094
1095 /* Repeat the node assignment process --- should succeed now */
1096 curspace = currentFreeSpace;
1097 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1098 for (i = 0; i < out.nNodes; i++)
1099 {
1100 if (leafSizes[i] <= curspace)
1101 {
1102 nodePageSelect[i] = 0; /* signifies current page */
1103 curspace -= leafSizes[i];
1104 }
1105 else
1106 {
1107 nodePageSelect[i] = 1; /* signifies new leaf page */
1108 newspace -= leafSizes[i];
1109 }
1110 }
1111 if (curspace < 0 || newspace < 0)
1112 elog(ERROR, "failed to divide leaf tuple groups across pages");
1113 }
1114 else
1115 {
1116 /* oops, we already excluded new tuple ... should not get here */
1117 elog(ERROR, "failed to divide leaf tuple groups across pages");
1118 }
1119 /* Expand the per-node assignments to be shown per leaf tuple */
1120 for (i = 0; i < nToInsert; i++)
1121 {
1122 n = out.mapTuplesToNodes[i];
1123 leafPageSelect[i] = nodePageSelect[n];
1124 }
1125 }
1126
1127 /* Start preparing WAL record */
1128 xlrec.nDelete = 0;
1129 xlrec.initSrc = isNew;
1130 xlrec.storesNulls = isNulls;
1131 xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1132
1133 leafdata = leafptr = (char *) palloc(totalLeafSizes);
1134
1135 /* Here we begin making the changes to the target pages */
1137
1138 /*
1139 * Delete old leaf tuples from current buffer, except when we're splitting
1140 * the root; in that case there's no need because we'll re-init the page
1141 * below. We do this first to make room for reinserting new leaf tuples.
1142 */
1143 if (!SpGistBlockIsRoot(current->blkno))
1144 {
1145 /*
1146 * Init buffer instead of deleting individual tuples, but only if
1147 * there aren't any other live tuples and only during build; otherwise
1148 * we need to set a redirection tuple for concurrent scans.
1149 */
1150 if (state->isBuild &&
1151 nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1152 PageGetMaxOffsetNumber(current->page))
1153 {
1154 SpGistInitBuffer(current->buffer,
1155 SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1156 xlrec.initSrc = true;
1157 }
1158 else if (isNew)
1159 {
1160 /* don't expose the freshly init'd buffer as a backup block */
1161 Assert(nToDelete == 0);
1162 }
1163 else
1164 {
1165 xlrec.nDelete = nToDelete;
1166
1167 if (!state->isBuild)
1168 {
1169 /*
1170 * Need to create redirect tuple (it will point to new inner
1171 * tuple) but right now the new tuple's location is not known
1172 * yet. So, set the redirection pointer to "impossible" value
1173 * and remember its position to update tuple later.
1174 */
1175 if (nToDelete > 0)
1176 redirectTuplePos = toDelete[0];
1178 toDelete, nToDelete,
1183 }
1184 else
1185 {
1186 /*
1187 * During index build there is not concurrent searches, so we
1188 * don't need to create redirection tuple.
1189 */
1191 toDelete, nToDelete,
1196 }
1197 }
1198 }
1199
1200 /*
1201 * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1202 * nodes.
1203 */
1204 startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1205 for (i = 0; i < nToInsert; i++)
1206 {
1207 SpGistLeafTuple it = newLeafs[i];
1208 Buffer leafBuffer;
1209 BlockNumber leafBlock;
1210 OffsetNumber newoffset;
1211
1212 /* Which page is it going to? */
1213 leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1214 leafBlock = BufferGetBlockNumber(leafBuffer);
1215
1216 /* Link tuple into correct chain for its node */
1217 n = out.mapTuplesToNodes[i];
1218
1219 if (ItemPointerIsValid(&nodes[n]->t_tid))
1220 {
1221 Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1222 SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1223 }
1224 else
1226
1227 /* Insert it on page */
1228 newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1229 (Item) it, it->size,
1230 &startOffsets[leafPageSelect[i]],
1231 false);
1232 toInsert[i] = newoffset;
1233
1234 /* ... and complete the chain linking */
1235 ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1236
1237 /* Also copy leaf tuple into WAL data */
1238 memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1239 leafptr += newLeafs[i]->size;
1240 }
1241
1242 /*
1243 * We're done modifying the other leaf buffer (if any), so mark it dirty.
1244 * current->buffer will be marked below, after we're entirely done
1245 * modifying it.
1246 */
1247 if (newLeafBuffer != InvalidBuffer)
1248 {
1249 MarkBufferDirty(newLeafBuffer);
1250 }
1251
1252 /* Remember current buffer, since we're about to change "current" */
1253 saveCurrent = *current;
1254
1255 /*
1256 * Store the new innerTuple
1257 */
1258 if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1259 {
1260 /*
1261 * new inner tuple goes to parent page
1262 */
1263 Assert(current->buffer != parent->buffer);
1264
1265 /* Repoint "current" at the new inner tuple */
1266 current->blkno = parent->blkno;
1267 current->buffer = parent->buffer;
1268 current->page = parent->page;
1269 xlrec.offnumInner = current->offnum =
1271 (Item) innerTuple, innerTuple->size,
1272 NULL, false);
1273
1274 /*
1275 * Update parent node link and mark parent page dirty
1276 */
1277 xlrec.innerIsParent = true;
1278 xlrec.offnumParent = parent->offnum;
1279 xlrec.nodeI = parent->node;
1280 saveNodeLink(index, parent, current->blkno, current->offnum);
1281
1282 /*
1283 * Update redirection link (in old current buffer)
1284 */
1285 if (redirectTuplePos != InvalidOffsetNumber)
1286 setRedirectionTuple(&saveCurrent, redirectTuplePos,
1287 current->blkno, current->offnum);
1288
1289 /* Done modifying old current buffer, mark it dirty */
1290 MarkBufferDirty(saveCurrent.buffer);
1291 }
1292 else if (parent->buffer != InvalidBuffer)
1293 {
1294 /*
1295 * new inner tuple will be stored on a new page
1296 */
1297 Assert(newInnerBuffer != InvalidBuffer);
1298
1299 /* Repoint "current" at the new inner tuple */
1300 current->buffer = newInnerBuffer;
1301 current->blkno = BufferGetBlockNumber(current->buffer);
1302 current->page = BufferGetPage(current->buffer);
1303 xlrec.offnumInner = current->offnum =
1305 (Item) innerTuple, innerTuple->size,
1306 NULL, false);
1307
1308 /* Done modifying new current buffer, mark it dirty */
1309 MarkBufferDirty(current->buffer);
1310
1311 /*
1312 * Update parent node link and mark parent page dirty
1313 */
1314 xlrec.innerIsParent = (parent->buffer == current->buffer);
1315 xlrec.offnumParent = parent->offnum;
1316 xlrec.nodeI = parent->node;
1317 saveNodeLink(index, parent, current->blkno, current->offnum);
1318
1319 /*
1320 * Update redirection link (in old current buffer)
1321 */
1322 if (redirectTuplePos != InvalidOffsetNumber)
1323 setRedirectionTuple(&saveCurrent, redirectTuplePos,
1324 current->blkno, current->offnum);
1325
1326 /* Done modifying old current buffer, mark it dirty */
1327 MarkBufferDirty(saveCurrent.buffer);
1328 }
1329 else
1330 {
1331 /*
1332 * Splitting root page, which was a leaf but now becomes inner page
1333 * (and so "current" continues to point at it)
1334 */
1335 Assert(SpGistBlockIsRoot(current->blkno));
1336 Assert(redirectTuplePos == InvalidOffsetNumber);
1337
1338 SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1339 xlrec.initInner = true;
1340 xlrec.innerIsParent = false;
1341
1342 xlrec.offnumInner = current->offnum =
1343 PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1344 InvalidOffsetNumber, false, false);
1345 if (current->offnum != FirstOffsetNumber)
1346 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1347 innerTuple->size);
1348
1349 /* No parent link to update, nor redirection to do */
1351 xlrec.nodeI = 0;
1352
1353 /* Done modifying new current buffer, mark it dirty */
1354 MarkBufferDirty(current->buffer);
1355
1356 /* saveCurrent doesn't represent a different buffer */
1357 saveCurrent.buffer = InvalidBuffer;
1358 }
1359
1360 if (RelationNeedsWAL(index) && !state->isBuild)
1361 {
1362 XLogRecPtr recptr;
1363 int flags;
1364
1366
1367 xlrec.nInsert = nToInsert;
1368 XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1369
1370 XLogRegisterData((char *) toDelete,
1371 sizeof(OffsetNumber) * xlrec.nDelete);
1372 XLogRegisterData((char *) toInsert,
1373 sizeof(OffsetNumber) * xlrec.nInsert);
1374 XLogRegisterData((char *) leafPageSelect,
1375 sizeof(uint8) * xlrec.nInsert);
1376 XLogRegisterData((char *) innerTuple, innerTuple->size);
1377 XLogRegisterData(leafdata, leafptr - leafdata);
1378
1379 /* Old leaf page */
1380 if (BufferIsValid(saveCurrent.buffer))
1381 {
1382 flags = REGBUF_STANDARD;
1383 if (xlrec.initSrc)
1384 flags |= REGBUF_WILL_INIT;
1385 XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1386 }
1387
1388 /* New leaf page */
1389 if (BufferIsValid(newLeafBuffer))
1390 {
1391 flags = REGBUF_STANDARD;
1392 if (xlrec.initDest)
1393 flags |= REGBUF_WILL_INIT;
1394 XLogRegisterBuffer(1, newLeafBuffer, flags);
1395 }
1396
1397 /* Inner page */
1398 flags = REGBUF_STANDARD;
1399 if (xlrec.initInner)
1400 flags |= REGBUF_WILL_INIT;
1401 XLogRegisterBuffer(2, current->buffer, flags);
1402
1403 /* Parent page, if different from inner page */
1404 if (parent->buffer != InvalidBuffer)
1405 {
1406 if (parent->buffer != current->buffer)
1408 else
1409 Assert(xlrec.innerIsParent);
1410 }
1411
1412 /* Issue the WAL record */
1413 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1414
1415 /* Update page LSNs on all affected pages */
1416 if (newLeafBuffer != InvalidBuffer)
1417 {
1418 Page page = BufferGetPage(newLeafBuffer);
1419
1420 PageSetLSN(page, recptr);
1421 }
1422
1423 if (saveCurrent.buffer != InvalidBuffer)
1424 {
1425 Page page = BufferGetPage(saveCurrent.buffer);
1426
1427 PageSetLSN(page, recptr);
1428 }
1429
1430 PageSetLSN(current->page, recptr);
1431
1432 if (parent->buffer != InvalidBuffer)
1433 {
1434 PageSetLSN(parent->page, recptr);
1435 }
1436 }
1437
1439
1440 /* Update local free-space cache and unlock buffers */
1441 if (newLeafBuffer != InvalidBuffer)
1442 {
1443 SpGistSetLastUsedPage(index, newLeafBuffer);
1444 UnlockReleaseBuffer(newLeafBuffer);
1445 }
1446 if (saveCurrent.buffer != InvalidBuffer)
1447 {
1448 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1449 UnlockReleaseBuffer(saveCurrent.buffer);
1450 }
1451
1452 return insertedNew;
1453}
1454
1455/*
1456 * spgMatchNode action: descend to N'th child node of current inner tuple
1457 */
1458static void
1460 SpGistInnerTuple innerTuple,
1461 SPPageDesc *current, SPPageDesc *parent, int nodeN)
1462{
1463 int i;
1464 SpGistNodeTuple node;
1465
1466 /* Release previous parent buffer if any */
1467 if (parent->buffer != InvalidBuffer &&
1468 parent->buffer != current->buffer)
1469 {
1471 UnlockReleaseBuffer(parent->buffer);
1472 }
1473
1474 /* Repoint parent to specified node of current inner tuple */
1475 parent->blkno = current->blkno;
1476 parent->buffer = current->buffer;
1477 parent->page = current->page;
1478 parent->offnum = current->offnum;
1479 parent->node = nodeN;
1480
1481 /* Locate that node */
1482 SGITITERATE(innerTuple, i, node)
1483 {
1484 if (i == nodeN)
1485 break;
1486 }
1487
1488 if (i != nodeN)
1489 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1490 nodeN);
1491
1492 /* Point current to the downlink location, if any */
1493 if (ItemPointerIsValid(&node->t_tid))
1494 {
1495 current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1496 current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1497 }
1498 else
1499 {
1500 /* Downlink is empty, so we'll need to find a new page */
1501 current->blkno = InvalidBlockNumber;
1502 current->offnum = InvalidOffsetNumber;
1503 }
1504
1505 current->buffer = InvalidBuffer;
1506 current->page = NULL;
1507}
1508
1509/*
1510 * spgAddNode action: add a node to the inner tuple at current
1511 */
1512static void
1514 SpGistInnerTuple innerTuple,
1515 SPPageDesc *current, SPPageDesc *parent,
1516 int nodeN, Datum nodeLabel)
1517{
1518 SpGistInnerTuple newInnerTuple;
1519 spgxlogAddNode xlrec;
1520
1521 /* Should not be applied to nulls */
1522 Assert(!SpGistPageStoresNulls(current->page));
1523
1524 /* Construct new inner tuple with additional node */
1525 newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1526
1527 /* Prepare WAL record */
1528 STORE_STATE(state, xlrec.stateSrc);
1529 xlrec.offnum = current->offnum;
1530
1531 /* we don't fill these unless we need to change the parent downlink */
1532 xlrec.parentBlk = -1;
1534 xlrec.nodeI = 0;
1535
1536 /* we don't fill these unless tuple has to be moved */
1538 xlrec.newPage = false;
1539
1540 if (PageGetExactFreeSpace(current->page) >=
1541 newInnerTuple->size - innerTuple->size)
1542 {
1543 /*
1544 * We can replace the inner tuple by new version in-place
1545 */
1547
1548 PageIndexTupleDelete(current->page, current->offnum);
1549 if (PageAddItem(current->page,
1550 (Item) newInnerTuple, newInnerTuple->size,
1551 current->offnum, false, false) != current->offnum)
1552 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1553 newInnerTuple->size);
1554
1555 MarkBufferDirty(current->buffer);
1556
1557 if (RelationNeedsWAL(index) && !state->isBuild)
1558 {
1559 XLogRecPtr recptr;
1560
1562 XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1563 XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1564
1566
1567 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1568
1569 PageSetLSN(current->page, recptr);
1570 }
1571
1573 }
1574 else
1575 {
1576 /*
1577 * move inner tuple to another page, and update parent
1578 */
1579 SpGistDeadTuple dt;
1580 SPPageDesc saveCurrent;
1581
1582 /*
1583 * It should not be possible to get here for the root page, since we
1584 * allow only one inner tuple on the root page, and spgFormInnerTuple
1585 * always checks that inner tuples don't exceed the size of a page.
1586 */
1587 if (SpGistBlockIsRoot(current->blkno))
1588 elog(ERROR, "cannot enlarge root tuple any more");
1589 Assert(parent->buffer != InvalidBuffer);
1590
1591 saveCurrent = *current;
1592
1593 xlrec.offnumParent = parent->offnum;
1594 xlrec.nodeI = parent->node;
1595
1596 /*
1597 * obtain new buffer with the same parity as current, since it will be
1598 * a child of same parent tuple
1599 */
1600 current->buffer = SpGistGetBuffer(index,
1601 GBUF_INNER_PARITY(current->blkno),
1602 newInnerTuple->size + sizeof(ItemIdData),
1603 &xlrec.newPage);
1604 current->blkno = BufferGetBlockNumber(current->buffer);
1605 current->page = BufferGetPage(current->buffer);
1606
1607 /*
1608 * Let's just make real sure new current isn't same as old. Right now
1609 * that's impossible, but if SpGistGetBuffer ever got smart enough to
1610 * delete placeholder tuples before checking space, maybe it wouldn't
1611 * be impossible. The case would appear to work except that WAL
1612 * replay would be subtly wrong, so I think a mere assert isn't enough
1613 * here.
1614 */
1615 if (current->blkno == saveCurrent.blkno)
1616 elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1617
1618 /*
1619 * New current and parent buffer will both be modified; but note that
1620 * parent buffer could be same as either new or old current.
1621 */
1622 if (parent->buffer == saveCurrent.buffer)
1623 xlrec.parentBlk = 0;
1624 else if (parent->buffer == current->buffer)
1625 xlrec.parentBlk = 1;
1626 else
1627 xlrec.parentBlk = 2;
1628
1630
1631 /* insert new ... */
1632 xlrec.offnumNew = current->offnum =
1634 (Item) newInnerTuple, newInnerTuple->size,
1635 NULL, false);
1636
1637 MarkBufferDirty(current->buffer);
1638
1639 /* update parent's downlink and mark parent page dirty */
1640 saveNodeLink(index, parent, current->blkno, current->offnum);
1641
1642 /*
1643 * Replace old tuple with a placeholder or redirection tuple. Unless
1644 * doing an index build, we have to insert a redirection tuple for
1645 * possible concurrent scans. We can't just delete it in any case,
1646 * because that could change the offsets of other tuples on the page,
1647 * breaking downlinks from their parents.
1648 */
1649 if (state->isBuild)
1652 else
1654 current->blkno, current->offnum);
1655
1656 PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1657 if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1658 saveCurrent.offnum,
1659 false, false) != saveCurrent.offnum)
1660 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1661 dt->size);
1662
1663 if (state->isBuild)
1664 SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1665 else
1666 SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1667
1668 MarkBufferDirty(saveCurrent.buffer);
1669
1670 if (RelationNeedsWAL(index) && !state->isBuild)
1671 {
1672 XLogRecPtr recptr;
1673 int flags;
1674
1676
1677 /* orig page */
1678 XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1679 /* new page */
1680 flags = REGBUF_STANDARD;
1681 if (xlrec.newPage)
1682 flags |= REGBUF_WILL_INIT;
1683 XLogRegisterBuffer(1, current->buffer, flags);
1684 /* parent page (if different from orig and new) */
1685 if (xlrec.parentBlk == 2)
1687
1688 XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1689 XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1690
1691 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1692
1693 /* we don't bother to check if any of these are redundant */
1694 PageSetLSN(current->page, recptr);
1695 PageSetLSN(parent->page, recptr);
1696 PageSetLSN(saveCurrent.page, recptr);
1697 }
1698
1700
1701 /* Release saveCurrent if it's not same as current or parent */
1702 if (saveCurrent.buffer != current->buffer &&
1703 saveCurrent.buffer != parent->buffer)
1704 {
1705 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1706 UnlockReleaseBuffer(saveCurrent.buffer);
1707 }
1708 }
1709}
1710
1711/*
1712 * spgSplitNode action: split inner tuple at current into prefix and postfix
1713 */
1714static void
1716 SpGistInnerTuple innerTuple,
1717 SPPageDesc *current, spgChooseOut *out)
1718{
1719 SpGistInnerTuple prefixTuple,
1720 postfixTuple;
1721 SpGistNodeTuple node,
1722 *nodes;
1723 BlockNumber postfixBlkno;
1724 OffsetNumber postfixOffset;
1725 int i;
1726 spgxlogSplitTuple xlrec;
1727 Buffer newBuffer = InvalidBuffer;
1728
1729 /* Should not be applied to nulls */
1730 Assert(!SpGistPageStoresNulls(current->page));
1731
1732 /* Check opclass gave us sane values */
1733 if (out->result.splitTuple.prefixNNodes <= 0 ||
1735 elog(ERROR, "invalid number of prefix nodes: %d",
1737 if (out->result.splitTuple.childNodeN < 0 ||
1740 elog(ERROR, "invalid child node number: %d",
1742
1743 /*
1744 * Construct new prefix tuple with requested number of nodes. We'll fill
1745 * in the childNodeN'th node's downlink below.
1746 */
1747 nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1749
1750 for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1751 {
1752 Datum label = (Datum) 0;
1753 bool labelisnull;
1754
1755 labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1756 if (!labelisnull)
1758 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1759 }
1760
1761 prefixTuple = spgFormInnerTuple(state,
1765 nodes);
1766
1767 /* it must fit in the space that innerTuple now occupies */
1768 if (prefixTuple->size > innerTuple->size)
1769 elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1770
1771 /*
1772 * Construct new postfix tuple, containing all nodes of innerTuple with
1773 * same node datums, but with the prefix specified by the picksplit
1774 * function.
1775 */
1776 nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1777 SGITITERATE(innerTuple, i, node)
1778 {
1779 nodes[i] = node;
1780 }
1781
1782 postfixTuple = spgFormInnerTuple(state,
1785 innerTuple->nNodes, nodes);
1786
1787 /* Postfix tuple is allTheSame if original tuple was */
1788 postfixTuple->allTheSame = innerTuple->allTheSame;
1789
1790 /* prep data for WAL record */
1791 xlrec.newPage = false;
1792
1793 /*
1794 * If we can't fit both tuples on the current page, get a new page for the
1795 * postfix tuple. In particular, can't split to the root page.
1796 *
1797 * For the space calculation, note that prefixTuple replaces innerTuple
1798 * but postfixTuple will be a new entry.
1799 */
1800 if (SpGistBlockIsRoot(current->blkno) ||
1801 SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1802 prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1803 {
1804 /*
1805 * Choose page with next triple parity, because postfix tuple is a
1806 * child of prefix one
1807 */
1808 newBuffer = SpGistGetBuffer(index,
1809 GBUF_INNER_PARITY(current->blkno + 1),
1810 postfixTuple->size + sizeof(ItemIdData),
1811 &xlrec.newPage);
1812 }
1813
1815
1816 /*
1817 * Replace old tuple by prefix tuple
1818 */
1819 PageIndexTupleDelete(current->page, current->offnum);
1820 xlrec.offnumPrefix = PageAddItem(current->page,
1821 (Item) prefixTuple, prefixTuple->size,
1822 current->offnum, false, false);
1823 if (xlrec.offnumPrefix != current->offnum)
1824 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1825 prefixTuple->size);
1826
1827 /*
1828 * put postfix tuple into appropriate page
1829 */
1830 if (newBuffer == InvalidBuffer)
1831 {
1832 postfixBlkno = current->blkno;
1833 xlrec.offnumPostfix = postfixOffset =
1835 (Item) postfixTuple, postfixTuple->size,
1836 NULL, false);
1837 xlrec.postfixBlkSame = true;
1838 }
1839 else
1840 {
1841 postfixBlkno = BufferGetBlockNumber(newBuffer);
1842 xlrec.offnumPostfix = postfixOffset =
1844 (Item) postfixTuple, postfixTuple->size,
1845 NULL, false);
1846 MarkBufferDirty(newBuffer);
1847 xlrec.postfixBlkSame = false;
1848 }
1849
1850 /*
1851 * And set downlink pointer in the prefix tuple to point to postfix tuple.
1852 * (We can't avoid this step by doing the above two steps in opposite
1853 * order, because there might not be enough space on the page to insert
1854 * the postfix tuple first.) We have to update the local copy of the
1855 * prefixTuple too, because that's what will be written to WAL.
1856 */
1858 postfixBlkno, postfixOffset);
1859 prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1860 PageGetItemId(current->page, current->offnum));
1862 postfixBlkno, postfixOffset);
1863
1864 MarkBufferDirty(current->buffer);
1865
1866 if (RelationNeedsWAL(index) && !state->isBuild)
1867 {
1868 XLogRecPtr recptr;
1869
1871 XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1872 XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1873 XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1874
1876 if (newBuffer != InvalidBuffer)
1877 {
1878 int flags;
1879
1880 flags = REGBUF_STANDARD;
1881 if (xlrec.newPage)
1882 flags |= REGBUF_WILL_INIT;
1883 XLogRegisterBuffer(1, newBuffer, flags);
1884 }
1885
1886 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1887
1888 PageSetLSN(current->page, recptr);
1889
1890 if (newBuffer != InvalidBuffer)
1891 {
1892 PageSetLSN(BufferGetPage(newBuffer), recptr);
1893 }
1894 }
1895
1897
1898 /* Update local free-space cache and release buffer */
1899 if (newBuffer != InvalidBuffer)
1900 {
1901 SpGistSetLastUsedPage(index, newBuffer);
1902 UnlockReleaseBuffer(newBuffer);
1903 }
1904}
1905
1906/*
1907 * Insert one item into the index.
1908 *
1909 * Returns true on success, false if we failed to complete the insertion
1910 * (typically because of conflict with a concurrent insert). In the latter
1911 * case, caller should re-call spgdoinsert() with the same args.
1912 */
1913bool
1915 ItemPointer heapPtr, Datum *datums, bool *isnulls)
1916{
1917 bool result = true;
1918 TupleDesc leafDescriptor = state->leafTupDesc;
1919 bool isnull = isnulls[spgKeyColumn];
1920 int level = 0;
1921 Datum leafDatums[INDEX_MAX_KEYS];
1922 int leafSize;
1923 int bestLeafSize;
1924 int numNoProgressCycles = 0;
1925 SPPageDesc current,
1926 parent;
1927 FmgrInfo *procinfo = NULL;
1928
1929 /*
1930 * Look up FmgrInfo of the user-defined choose function once, to save
1931 * cycles in the loop below.
1932 */
1933 if (!isnull)
1935
1936 /*
1937 * Prepare the leaf datum to insert.
1938 *
1939 * If an optional "compress" method is provided, then call it to form the
1940 * leaf key datum from the input datum. Otherwise, store the input datum
1941 * as is. Since we don't use index_form_tuple in this AM, we have to make
1942 * sure value to be inserted is not toasted; FormIndexDatum doesn't
1943 * guarantee that. But we assume the "compress" method to return an
1944 * untoasted value.
1945 */
1946 if (!isnull)
1947 {
1949 {
1950 FmgrInfo *compressProcinfo = NULL;
1951
1952 compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1953 leafDatums[spgKeyColumn] =
1954 FunctionCall1Coll(compressProcinfo,
1955 index->rd_indcollation[spgKeyColumn],
1956 datums[spgKeyColumn]);
1957 }
1958 else
1959 {
1960 Assert(state->attLeafType.type == state->attType.type);
1961
1962 if (state->attType.attlen == -1)
1963 leafDatums[spgKeyColumn] =
1965 else
1966 leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1967 }
1968 }
1969 else
1970 leafDatums[spgKeyColumn] = (Datum) 0;
1971
1972 /* Likewise, ensure that any INCLUDE values are not toasted */
1973 for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1974 {
1975 if (!isnulls[i])
1976 {
1977 if (TupleDescCompactAttr(leafDescriptor, i)->attlen == -1)
1978 leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1979 else
1980 leafDatums[i] = datums[i];
1981 }
1982 else
1983 leafDatums[i] = (Datum) 0;
1984 }
1985
1986 /*
1987 * Compute space needed for a leaf tuple containing the given data.
1988 */
1989 leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1990 /* Account for an item pointer, too */
1991 leafSize += sizeof(ItemIdData);
1992
1993 /*
1994 * If it isn't gonna fit, and the opclass can't reduce the datum size by
1995 * suffixing, bail out now rather than doing a lot of useless work.
1996 */
1997 if (leafSize > SPGIST_PAGE_CAPACITY &&
1998 (isnull || !state->config.longValuesOK))
1999 ereport(ERROR,
2000 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2001 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2002 leafSize - sizeof(ItemIdData),
2005 errhint("Values larger than a buffer page cannot be indexed.")));
2006 bestLeafSize = leafSize;
2007
2008 /* Initialize "current" to the appropriate root page */
2009 current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
2010 current.buffer = InvalidBuffer;
2011 current.page = NULL;
2012 current.offnum = FirstOffsetNumber;
2013 current.node = -1;
2014
2015 /* "parent" is invalid for the moment */
2016 parent.blkno = InvalidBlockNumber;
2017 parent.buffer = InvalidBuffer;
2018 parent.page = NULL;
2019 parent.offnum = InvalidOffsetNumber;
2020 parent.node = -1;
2021
2022 /*
2023 * Before entering the loop, try to clear any pending interrupt condition.
2024 * If a query cancel is pending, we might as well accept it now not later;
2025 * while if a non-canceling condition is pending, servicing it here avoids
2026 * having to restart the insertion and redo all the work so far.
2027 */
2029
2030 for (;;)
2031 {
2032 bool isNew = false;
2033
2034 /*
2035 * Bail out if query cancel is pending. We must have this somewhere
2036 * in the loop since a broken opclass could produce an infinite
2037 * picksplit loop. However, because we'll be holding buffer lock(s)
2038 * after the first iteration, ProcessInterrupts() wouldn't be able to
2039 * throw a cancel error here. Hence, if we see that an interrupt is
2040 * pending, break out of the loop and deal with the situation below.
2041 * Set result = false because we must restart the insertion if the
2042 * interrupt isn't a query-cancel-or-die case.
2043 */
2045 {
2046 result = false;
2047 break;
2048 }
2049
2050 if (current.blkno == InvalidBlockNumber)
2051 {
2052 /*
2053 * Create a leaf page. If leafSize is too large to fit on a page,
2054 * we won't actually use the page yet, but it simplifies the API
2055 * for doPickSplit to always have a leaf page at hand; so just
2056 * quietly limit our request to a page size.
2057 */
2058 current.buffer =
2060 GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2061 Min(leafSize, SPGIST_PAGE_CAPACITY),
2062 &isNew);
2063 current.blkno = BufferGetBlockNumber(current.buffer);
2064 }
2065 else if (parent.buffer == InvalidBuffer)
2066 {
2067 /* we hold no parent-page lock, so no deadlock is possible */
2068 current.buffer = ReadBuffer(index, current.blkno);
2070 }
2071 else if (current.blkno != parent.blkno)
2072 {
2073 /* descend to a new child page */
2074 current.buffer = ReadBuffer(index, current.blkno);
2075
2076 /*
2077 * Attempt to acquire lock on child page. We must beware of
2078 * deadlock against another insertion process descending from that
2079 * page to our parent page (see README). If we fail to get lock,
2080 * abandon the insertion and tell our caller to start over.
2081 *
2082 * XXX this could be improved, because failing to get lock on a
2083 * buffer is not proof of a deadlock situation; the lock might be
2084 * held by a reader, or even just background writer/checkpointer
2085 * process. Perhaps it'd be worth retrying after sleeping a bit?
2086 */
2087 if (!ConditionalLockBuffer(current.buffer))
2088 {
2089 ReleaseBuffer(current.buffer);
2091 return false;
2092 }
2093 }
2094 else
2095 {
2096 /* inner tuple can be stored on the same page as parent one */
2097 current.buffer = parent.buffer;
2098 }
2099 current.page = BufferGetPage(current.buffer);
2100
2101 /* should not arrive at a page of the wrong type */
2102 if (isnull ? !SpGistPageStoresNulls(current.page) :
2103 SpGistPageStoresNulls(current.page))
2104 elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2105 current.blkno);
2106
2107 if (SpGistPageIsLeaf(current.page))
2108 {
2109 SpGistLeafTuple leafTuple;
2110 int nToSplit,
2111 sizeToSplit;
2112
2113 leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
2114 if (leafTuple->size + sizeof(ItemIdData) <=
2115 SpGistPageGetFreeSpace(current.page, 1))
2116 {
2117 /* it fits on page, so insert it and we're done */
2118 addLeafTuple(index, state, leafTuple,
2119 &current, &parent, isnull, isNew);
2120 break;
2121 }
2122 else if ((sizeToSplit =
2123 checkSplitConditions(index, state, &current,
2124 &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2125 nToSplit < 64 &&
2126 leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2127 {
2128 /*
2129 * the amount of data is pretty small, so just move the whole
2130 * chain to another leaf page rather than splitting it.
2131 */
2132 Assert(!isNew);
2133 moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2134 break; /* we're done */
2135 }
2136 else
2137 {
2138 /* picksplit */
2139 if (doPickSplit(index, state, &current, &parent,
2140 leafTuple, level, isnull, isNew))
2141 break; /* doPickSplit installed new tuples */
2142
2143 /* leaf tuple will not be inserted yet */
2144 pfree(leafTuple);
2145
2146 /*
2147 * current now describes new inner tuple, go insert into it
2148 */
2149 Assert(!SpGistPageIsLeaf(current.page));
2150 goto process_inner_tuple;
2151 }
2152 }
2153 else /* non-leaf page */
2154 {
2155 /*
2156 * Apply the opclass choose function to figure out how to insert
2157 * the given datum into the current inner tuple.
2158 */
2159 SpGistInnerTuple innerTuple;
2160 spgChooseIn in;
2161 spgChooseOut out;
2162
2163 /*
2164 * spgAddNode and spgSplitTuple cases will loop back to here to
2165 * complete the insertion operation. Just in case the choose
2166 * function is broken and produces add or split requests
2167 * repeatedly, check for query cancel (see comments above).
2168 */
2169 process_inner_tuple:
2171 {
2172 result = false;
2173 break;
2174 }
2175
2176 innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2177 PageGetItemId(current.page, current.offnum));
2178
2179 in.datum = datums[spgKeyColumn];
2180 in.leafDatum = leafDatums[spgKeyColumn];
2181 in.level = level;
2182 in.allTheSame = innerTuple->allTheSame;
2183 in.hasPrefix = (innerTuple->prefixSize > 0);
2184 in.prefixDatum = SGITDATUM(innerTuple, state);
2185 in.nNodes = innerTuple->nNodes;
2186 in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2187
2188 memset(&out, 0, sizeof(out));
2189
2190 if (!isnull)
2191 {
2192 /* use user-defined choose method */
2193 FunctionCall2Coll(procinfo,
2194 index->rd_indcollation[0],
2195 PointerGetDatum(&in),
2196 PointerGetDatum(&out));
2197 }
2198 else
2199 {
2200 /* force "match" action (to insert to random subnode) */
2202 }
2203
2204 if (innerTuple->allTheSame)
2205 {
2206 /*
2207 * It's not allowed to do an AddNode at an allTheSame tuple.
2208 * Opclass must say "match", in which case we choose a random
2209 * one of the nodes to descend into, or "split".
2210 */
2211 if (out.resultType == spgAddNode)
2212 elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2213 else if (out.resultType == spgMatchNode)
2214 out.result.matchNode.nodeN =
2216 0, innerTuple->nNodes - 1);
2217 }
2218
2219 switch (out.resultType)
2220 {
2221 case spgMatchNode:
2222 /* Descend to N'th child node */
2223 spgMatchNodeAction(index, state, innerTuple,
2224 &current, &parent,
2225 out.result.matchNode.nodeN);
2226 /* Adjust level as per opclass request */
2227 level += out.result.matchNode.levelAdd;
2228 /* Replace leafDatum and recompute leafSize */
2229 if (!isnull)
2230 {
2231 leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2232 leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2233 leafDatums, isnulls);
2234 leafSize += sizeof(ItemIdData);
2235 }
2236
2237 /*
2238 * Check new tuple size; fail if it can't fit, unless the
2239 * opclass says it can handle the situation by suffixing.
2240 *
2241 * However, the opclass can only shorten the leaf datum,
2242 * which may not be enough to ever make the tuple fit,
2243 * since INCLUDE columns might alone use more than a page.
2244 * Depending on the opclass' behavior, that could lead to
2245 * an infinite loop --- spgtextproc.c, for example, will
2246 * just repeatedly generate an empty-string leaf datum
2247 * once it runs out of data. Actual bugs in opclasses
2248 * might cause infinite looping, too. To detect such a
2249 * loop, check to see if we are making progress by
2250 * reducing the leafSize in each pass. This is a bit
2251 * tricky though. Because of alignment considerations,
2252 * the total tuple size might not decrease on every pass.
2253 * Also, there are edge cases where the choose method
2254 * might seem to not make progress for a cycle or two.
2255 * Somewhat arbitrarily, we allow up to 10 no-progress
2256 * iterations before failing. (This limit should be more
2257 * than MAXALIGN, to accommodate opclasses that trim one
2258 * byte from the leaf datum per pass.)
2259 */
2260 if (leafSize > SPGIST_PAGE_CAPACITY)
2261 {
2262 bool ok = false;
2263
2264 if (state->config.longValuesOK && !isnull)
2265 {
2266 if (leafSize < bestLeafSize)
2267 {
2268 ok = true;
2269 bestLeafSize = leafSize;
2270 numNoProgressCycles = 0;
2271 }
2272 else if (++numNoProgressCycles < 10)
2273 ok = true;
2274 }
2275 if (!ok)
2276 ereport(ERROR,
2277 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2278 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2279 leafSize - sizeof(ItemIdData),
2282 errhint("Values larger than a buffer page cannot be indexed.")));
2283 }
2284
2285 /*
2286 * Loop around and attempt to insert the new leafDatum at
2287 * "current" (which might reference an existing child
2288 * tuple, or might be invalid to force us to find a new
2289 * page for the tuple).
2290 */
2291 break;
2292 case spgAddNode:
2293 /* AddNode is not sensible if nodes don't have labels */
2294 if (in.nodeLabels == NULL)
2295 elog(ERROR, "cannot add a node to an inner tuple without node labels");
2296 /* Add node to inner tuple, per request */
2297 spgAddNodeAction(index, state, innerTuple,
2298 &current, &parent,
2299 out.result.addNode.nodeN,
2301
2302 /*
2303 * Retry insertion into the enlarged node. We assume that
2304 * we'll get a MatchNode result this time.
2305 */
2306 goto process_inner_tuple;
2307 break;
2308 case spgSplitTuple:
2309 /* Split inner tuple, per request */
2310 spgSplitNodeAction(index, state, innerTuple,
2311 &current, &out);
2312
2313 /* Retry insertion into the split node */
2314 goto process_inner_tuple;
2315 break;
2316 default:
2317 elog(ERROR, "unrecognized SPGiST choose result: %d",
2318 (int) out.resultType);
2319 break;
2320 }
2321 }
2322 } /* end loop */
2323
2324 /*
2325 * Release any buffers we're still holding. Beware of possibility that
2326 * current and parent reference same buffer.
2327 */
2328 if (current.buffer != InvalidBuffer)
2329 {
2331 UnlockReleaseBuffer(current.buffer);
2332 }
2333 if (parent.buffer != InvalidBuffer &&
2334 parent.buffer != current.buffer)
2335 {
2338 }
2339
2340 /*
2341 * We do not support being called while some outer function is holding a
2342 * buffer lock (or any other reason to postpone query cancels). If that
2343 * were the case, telling the caller to retry would create an infinite
2344 * loop.
2345 */
2347
2348 /*
2349 * Finally, check for interrupts again. If there was a query cancel,
2350 * ProcessInterrupts() will be able to throw the error here. If it was
2351 * some other kind of interrupt that can just be cleared, return false to
2352 * tell our caller to retry.
2353 */
2355
2356 return result;
2357}
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:3724
bool ConditionalLockBuffer(Buffer buffer)
Definition: bufmgr.c:5184
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4924
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4941
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:2532
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:5158
Buffer ReadBuffer(Relation reln, BlockNumber blockNum)
Definition: bufmgr.c:746
static Page BufferGetPage(Buffer buffer)
Definition: bufmgr.h:400
#define BUFFER_LOCK_EXCLUSIVE
Definition: bufmgr.h:191
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:351
void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
Definition: bufpage.c:1150
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:1041
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:947
Pointer Page
Definition: bufpage.h:81
static Item PageGetItem(Page page, ItemId itemId)
Definition: bufpage.h:354
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:243
static void PageSetLSN(Page page, XLogRecPtr lsn)
Definition: bufpage.h:391
static OffsetNumber PageGetMaxOffsetNumber(Page page)
Definition: bufpage.h:372
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:471
#define Min(x, y)
Definition: c.h:958
uint8_t uint8
Definition: c.h:483
#define Assert(condition)
Definition: c.h:812
#define OidIsValid(objectId)
Definition: c.h:729
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1149
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1129
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
#define nitems(x)
Definition: indent.h:31
FmgrInfo * index_getprocinfo(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:862
RegProcedure index_getprocid(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:828
static int pg_cmp_u16(uint16 a, uint16 b)
Definition: int.h:640
int b
Definition: isn.c:69
int a
Definition: isn.c:68
int i
Definition: isn.c:72
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76
Pointer Item
Definition: item.h:17
struct ItemIdData ItemIdData
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
Definition: itemptr.h:135
static OffsetNumber ItemPointerGetOffsetNumber(const ItemPointerData *pointer)
Definition: itemptr.h:124
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
static bool ItemPointerIsValid(const ItemPointerData *pointer)
Definition: itemptr.h:83
#define MaxIndexTuplesPerPage
Definition: itup.h:167
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
void * palloc(Size size)
Definition: mcxt.c:1317
#define INTERRUPTS_CAN_BE_PROCESSED()
Definition: miscadmin.h:129
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define INTERRUPTS_PENDING_CONDITION()
Definition: miscadmin.h:113
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
#define InvalidOffsetNumber
Definition: off.h:26
uint16 OffsetNumber
Definition: off.h:24
#define FirstOffsetNumber
Definition: off.h:27
static char * label
#define INDEX_MAX_KEYS
uint64 pg_prng_uint64_range(pg_prng_state *state, uint64 rmin, uint64 rmax)
Definition: pg_prng.c:144
pg_prng_state pg_global_prng_state
Definition: pg_prng.c:34
#define qsort(a, b, c, d)
Definition: port.h:447
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationNeedsWAL(relation)
Definition: rel.h:628
static pg_noinline void Size size
Definition: slab.c:607
static void spgAddNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN, Datum nodeLabel)
Definition: spgdoinsert.c:1513
static void setRedirectionTuple(SPPageDesc *current, OffsetNumber position, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:568
static bool checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, bool *includeNew)
Definition: spgdoinsert.c:599
static void spgSplitNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, spgChooseOut *out)
Definition: spgdoinsert.c:1715
static SpGistInnerTuple addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
Definition: spgdoinsert.c:80
bool spgdoinsert(Relation index, SpGistState *state, ItemPointer heapPtr, Datum *datums, bool *isnulls)
Definition: spgdoinsert.c:1914
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:131
static void spgMatchNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN)
Definition: spgdoinsert.c:1459
static bool doPickSplit(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, int level, bool isNulls, bool isNew)
Definition: spgdoinsert.c:677
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition: spgdoinsert.c:52
static void moveLeafs(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, bool isNulls)
Definition: spgdoinsert.c:387
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:186
struct SPPageDesc SPPageDesc
static int checkSplitConditions(Relation index, SpGistState *state, SPPageDesc *current, int *nToSplit)
Definition: spgdoinsert.c:333
static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
Definition: spgdoinsert.c:203
static int cmpOffsetNumbers(const void *a, const void *b)
Definition: spgdoinsert.c:112
#define SPGIST_COMPRESS_PROC
Definition: spgist.h:28
#define SPGIST_CHOOSE_PROC
Definition: spgist.h:24
@ spgMatchNode
Definition: spgist.h:69
@ spgAddNode
Definition: spgist.h:70
@ spgSplitTuple
Definition: spgist.h:71
#define SPGIST_PICKSPLIT_PROC
Definition: spgist.h:25
#define SGLTDATUM(x, s)
#define SPGIST_NULL_BLKNO
SpGistDeadTupleData * SpGistDeadTuple
#define GBUF_NULLS
#define SPGIST_REDIRECT
SpGistInnerTupleData * SpGistInnerTuple
#define SPGIST_LIVE
#define SGDTSIZE
#define SpGistPageStoresNulls(page)
#define SGITDATUM(x, s)
#define SGLT_GET_NEXTOFFSET(spgLeafTuple)
#define SPGIST_PLACEHOLDER
#define SGITITERATE(x, i, nt)
#define GBUF_LEAF
#define spgFirstIncludeColumn
#define SPGIST_DEAD
#define SpGistPageIsLeaf(page)
#define SPGIST_METAPAGE_BLKNO
#define SGLT_SET_NEXTOFFSET(spgLeafTuple, offsetNumber)
#define SpGistBlockIsRoot(blkno)
struct SpGistLeafTupleData * SpGistLeafTuple
#define SPGIST_NULLS
#define STORE_STATE(s, d)
#define SPGIST_PAGE_CAPACITY
#define SGITMAXNNODES
#define SpGistPageGetOpaque(page)
#define GBUF_INNER_PARITY(x)
#define SPGIST_LEAF
#define spgKeyColumn
#define SPGIST_ROOT_BLKNO
#define SpGistPageGetFreeSpace(p, n)
Datum * spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
Definition: spgutils.c:1155
SpGistInnerTuple spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, int nNodes, SpGistNodeTuple *nodes)
Definition: spgutils.c:997
SpGistLeafTuple spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, const Datum *datums, const bool *isnulls)
Definition: spgutils.c:866
void spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor, Datum *datums, bool *isnulls, bool keyColumnIsNull)
Definition: spgutils.c:1110
SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgutils.c:1080
void SpGistInitBuffer(Buffer b, uint16 f)
Definition: spgutils.c:717
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:564
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:668
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:1198
Size SpGistGetLeafTupleSize(TupleDesc tupleDescriptor, const Datum *datums, const bool *isnulls)
Definition: spgutils.c:813
SpGistNodeTuple spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
Definition: spgutils.c:955
#define XLOG_SPGIST_SPLIT_TUPLE
Definition: spgxlog.h:25
#define SizeOfSpgxlogPickSplit
Definition: spgxlog.h:199
#define SizeOfSpgxlogMoveLeafs
Definition: spgxlog.h:91
#define XLOG_SPGIST_ADD_NODE
Definition: spgxlog.h:24
#define XLOG_SPGIST_ADD_LEAF
Definition: spgxlog.h:22
#define XLOG_SPGIST_MOVE_LEAFS
Definition: spgxlog.h:23
#define XLOG_SPGIST_PICKSPLIT
Definition: spgxlog.h:26
int16 attlen
Definition: tupdesc.h:69
Definition: fmgr.h:57
ItemPointerData t_tid
Definition: itup.h:37
Buffer buffer
Definition: spgdoinsert.c:39
OffsetNumber offnum
Definition: spgdoinsert.c:41
BlockNumber blkno
Definition: spgdoinsert.c:38
unsigned int tupstate
ItemPointerData pointer
unsigned int prefixSize
unsigned int allTheSame
unsigned int tupstate
Definition: type.h:96
Datum * nodeLabels
Definition: spgist.h:64
bool hasPrefix
Definition: spgist.h:61
Datum prefixDatum
Definition: spgist.h:62
int nNodes
Definition: spgist.h:63
Datum datum
Definition: spgist.h:55
int level
Definition: spgist.h:57
Datum leafDatum
Definition: spgist.h:56
bool allTheSame
Definition: spgist.h:60
bool postfixHasPrefix
Definition: spgist.h:101
int childNodeN
Definition: spgist.h:98
spgChooseResultType resultType
Definition: spgist.h:76
int levelAdd
Definition: spgist.h:82
struct spgChooseOut::@51::@54 splitTuple
Datum nodeLabel
Definition: spgist.h:87
Datum * prefixNodeLabels
Definition: spgist.h:96
Datum postfixPrefixDatum
Definition: spgist.h:102
Datum restDatum
Definition: spgist.h:83
int prefixNNodes
Definition: spgist.h:95
int nodeN
Definition: spgist.h:81
Datum prefixPrefixDatum
Definition: spgist.h:94
struct spgChooseOut::@51::@53 addNode
bool prefixHasPrefix
Definition: spgist.h:93
struct spgChooseOut::@51::@52 matchNode
union spgChooseOut::@51 result
Datum * datums
Definition: spgist.h:113
bool hasPrefix
Definition: spgist.h:119
int * mapTuplesToNodes
Definition: spgist.h:125
Datum * nodeLabels
Definition: spgist.h:123
Datum * leafTupleDatums
Definition: spgist.h:126
Datum prefixDatum
Definition: spgist.h:120
uint16 nodeI
Definition: spgxlog.h:54
bool newPage
Definition: spgxlog.h:48
OffsetNumber offnumLeaf
Definition: spgxlog.h:50
bool storesNulls
Definition: spgxlog.h:49
OffsetNumber offnumHeadLeaf
Definition: spgxlog.h:51
OffsetNumber offnumParent
Definition: spgxlog.h:53
OffsetNumber offnumNew
Definition: spgxlog.h:111
bool newPage
Definition: spgxlog.h:112
OffsetNumber offnumParent
Definition: spgxlog.h:126
OffsetNumber offnum
Definition: spgxlog.h:105
spgxlogState stateSrc
Definition: spgxlog.h:130
uint16 nodeI
Definition: spgxlog.h:128
int8 parentBlk
Definition: spgxlog.h:125
bool replaceDead
Definition: spgxlog.h:68
bool storesNulls
Definition: spgxlog.h:69
uint16 nMoves
Definition: spgxlog.h:66
uint16 nodeI
Definition: spgxlog.h:73
OffsetNumber offnumParent
Definition: spgxlog.h:72
spgxlogState stateSrc
Definition: spgxlog.h:75
uint16 nInsert
Definition: spgxlog.h:170
spgxlogState stateSrc
Definition: spgxlog.h:185
bool innerIsParent
Definition: spgxlog.h:181
uint16 nodeI
Definition: spgxlog.h:183
bool storesNulls
Definition: spgxlog.h:178
OffsetNumber offnumParent
Definition: spgxlog.h:182
OffsetNumber offnumInner
Definition: spgxlog.h:175
uint16 nDelete
Definition: spgxlog.h:169
bool isRootSplit
Definition: spgxlog.h:167
OffsetNumber offnumPostfix
Definition: spgxlog.h:147
OffsetNumber offnumPrefix
Definition: spgxlog.h:144
bool postfixBlkSame
Definition: spgxlog.h:149
Definition: regguts.h:323
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:169
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogRegisterData(const char *data, uint32 len)
Definition: xloginsert.c:364
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:242
void XLogBeginInsert(void)
Definition: xloginsert.c:149
#define REGBUF_STANDARD
Definition: xloginsert.h:34
#define REGBUF_WILL_INIT
Definition: xloginsert.h:33