PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
ginfast.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * ginfast.c
4 * Fast insert routines for the Postgres inverted index access method.
5 * Pending entries are stored in linear list of pages. Later on
6 * (typically during VACUUM), ginInsertCleanup() will be invoked to
7 * transfer pending entries into the regular index structure. This
8 * wins because bulk insertion is much more efficient than retail.
9 *
10 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
11 * Portions Copyright (c) 1994, Regents of the University of California
12 *
13 * IDENTIFICATION
14 * src/backend/access/gin/ginfast.c
15 *
16 *-------------------------------------------------------------------------
17 */
18
#include "postgres.h"

#include "access/gin_private.h"
#include "access/ginxlog.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "catalog/pg_am.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "postmaster/autovacuum.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/acl.h"
#include "utils/fmgrprotos.h"
#include "utils/memutils.h"
#include "utils/rel.h"
37
38/* GUC parameter */
40
41#define GIN_PAGE_FREESIZE \
42 ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
43
44typedef struct KeyArray
45{
46 Datum *keys; /* expansible array */
47 GinNullCategory *categories; /* another expansible array */
48 int32 nvalues; /* current number of valid entries */
49 int32 maxvalues; /* allocated size of arrays */
51
52
53/*
54 * Build a pending-list page from the given array of tuples, and write it out.
55 *
56 * Returns amount of free space left on the page.
57 */
58static int32
60 IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
61{
62 Page page = BufferGetPage(buffer);
63 int32 i,
64 freesize,
65 size = 0;
67 off;
68 PGAlignedBlock workspace;
69 char *ptr;
70
72
73 GinInitBuffer(buffer, GIN_LIST);
74
76 ptr = workspace.data;
77
78 for (i = 0; i < ntuples; i++)
79 {
80 int this_size = IndexTupleSize(tuples[i]);
81
82 memcpy(ptr, tuples[i], this_size);
83 ptr += this_size;
84 size += this_size;
85
86 l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
87
88 if (l == InvalidOffsetNumber)
89 elog(ERROR, "failed to add item to index page in \"%s\"",
91
92 off++;
93 }
94
95 Assert(size <= BLCKSZ); /* else we overran workspace */
96
97 GinPageGetOpaque(page)->rightlink = rightlink;
98
99 /*
100 * tail page may contain only whole row(s) or final part of row placed on
101 * previous pages (a "row" here meaning all the index tuples generated for
102 * one heap tuple)
103 */
104 if (rightlink == InvalidBlockNumber)
105 {
106 GinPageSetFullRow(page);
107 GinPageGetOpaque(page)->maxoff = 1;
108 }
109 else
110 {
111 GinPageGetOpaque(page)->maxoff = 0;
112 }
113
114 MarkBufferDirty(buffer);
115
117 {
119 XLogRecPtr recptr;
120
121 data.rightlink = rightlink;
122 data.ntuples = ntuples;
123
125 XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
126
128 XLogRegisterBufData(0, workspace.data, size);
129
130 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
131 PageSetLSN(page, recptr);
132 }
133
134 /* get free space before releasing buffer */
135 freesize = PageGetExactFreeSpace(page);
136
137 UnlockReleaseBuffer(buffer);
138
140
141 return freesize;
142}
143
144static void
147{
148 Buffer curBuffer = InvalidBuffer;
149 Buffer prevBuffer = InvalidBuffer;
150 int i,
151 size = 0,
152 tupsize;
153 int startTuple = 0;
154
155 Assert(ntuples > 0);
156
157 /*
158 * Split tuples into pages
159 */
160 for (i = 0; i < ntuples; i++)
161 {
162 if (curBuffer == InvalidBuffer)
163 {
164 curBuffer = GinNewBuffer(index);
165
166 if (prevBuffer != InvalidBuffer)
167 {
168 res->nPendingPages++;
169 writeListPage(index, prevBuffer,
170 tuples + startTuple,
171 i - startTuple,
172 BufferGetBlockNumber(curBuffer));
173 }
174 else
175 {
176 res->head = BufferGetBlockNumber(curBuffer);
177 }
178
179 prevBuffer = curBuffer;
180 startTuple = i;
181 size = 0;
182 }
183
184 tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
185
186 if (size + tupsize > GinListPageSize)
187 {
188 /* won't fit, force a new page and reprocess */
189 i--;
190 curBuffer = InvalidBuffer;
191 }
192 else
193 {
194 size += tupsize;
195 }
196 }
197
198 /*
199 * Write last page
200 */
201 res->tail = BufferGetBlockNumber(curBuffer);
202 res->tailFreeSize = writeListPage(index, curBuffer,
203 tuples + startTuple,
204 ntuples - startTuple,
206 res->nPendingPages++;
207 /* that was only one heap tuple */
208 res->nPendingHeapTuples = 1;
209}
210
211/*
212 * Write the index tuples contained in *collector into the index's
213 * pending list.
214 *
215 * Function guarantees that all these tuples will be inserted consecutively,
216 * preserving order
217 */
218void
220{
221 Relation index = ginstate->index;
222 Buffer metabuffer;
223 Page metapage;
224 GinMetaPageData *metadata = NULL;
225 Buffer buffer = InvalidBuffer;
226 Page page = NULL;
228 bool separateList = false;
229 bool needCleanup = false;
230 int cleanupSize;
231 bool needWal;
232
233 if (collector->ntuples == 0)
234 return;
235
236 needWal = RelationNeedsWAL(index);
237
238 data.locator = index->rd_locator;
239 data.ntuples = 0;
240 data.newRightlink = data.prevTail = InvalidBlockNumber;
241
242 metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
243 metapage = BufferGetPage(metabuffer);
244
245 /*
246 * An insertion to the pending list could logically belong anywhere in the
247 * tree, so it conflicts with all serializable scans. All scans acquire a
248 * predicate lock on the metabuffer to represent that. Therefore we'll
249 * check for conflicts in, but not until we have the page locked and are
250 * ready to modify the page.
251 */
252
253 if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
254 {
255 /*
256 * Total size is greater than one page => make sublist
257 */
258 separateList = true;
259 }
260 else
261 {
262 LockBuffer(metabuffer, GIN_EXCLUSIVE);
263 metadata = GinPageGetMeta(metapage);
264
265 if (metadata->head == InvalidBlockNumber ||
266 collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
267 {
268 /*
269 * Pending list is empty or total size is greater than freespace
270 * on tail page => make sublist
271 *
272 * We unlock metabuffer to keep high concurrency
273 */
274 separateList = true;
275 LockBuffer(metabuffer, GIN_UNLOCK);
276 }
277 }
278
279 if (separateList)
280 {
281 /*
282 * We should make sublist separately and append it to the tail
283 */
284 GinMetaPageData sublist;
285
286 memset(&sublist, 0, sizeof(GinMetaPageData));
287 makeSublist(index, collector->tuples, collector->ntuples, &sublist);
288
289 /*
290 * metapage was unlocked, see above
291 */
292 LockBuffer(metabuffer, GIN_EXCLUSIVE);
293 metadata = GinPageGetMeta(metapage);
294
296
297 if (metadata->head == InvalidBlockNumber)
298 {
299 /*
300 * Main list is empty, so just insert sublist as main list
301 */
303
304 metadata->head = sublist.head;
305 metadata->tail = sublist.tail;
306 metadata->tailFreeSize = sublist.tailFreeSize;
307
308 metadata->nPendingPages = sublist.nPendingPages;
309 metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
310
311 if (needWal)
313 }
314 else
315 {
316 /*
317 * Merge lists
318 */
319 data.prevTail = metadata->tail;
320 data.newRightlink = sublist.head;
321
322 buffer = ReadBuffer(index, metadata->tail);
323 LockBuffer(buffer, GIN_EXCLUSIVE);
324 page = BufferGetPage(buffer);
325
326 Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
327
329
330 GinPageGetOpaque(page)->rightlink = sublist.head;
331
332 MarkBufferDirty(buffer);
333
334 metadata->tail = sublist.tail;
335 metadata->tailFreeSize = sublist.tailFreeSize;
336
337 metadata->nPendingPages += sublist.nPendingPages;
338 metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
339
340 if (needWal)
341 {
344 }
345 }
346 }
347 else
348 {
349 /*
350 * Insert into tail page. Metapage is already locked
351 */
352 OffsetNumber l,
353 off;
354 int i,
355 tupsize;
356 char *ptr;
357 char *collectordata;
358
360
361 buffer = ReadBuffer(index, metadata->tail);
362 LockBuffer(buffer, GIN_EXCLUSIVE);
363 page = BufferGetPage(buffer);
364
365 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
367
368 collectordata = ptr = (char *) palloc(collector->sumsize);
369
370 data.ntuples = collector->ntuples;
371
373
374 if (needWal)
376
377 /*
378 * Increase counter of heap tuples
379 */
380 Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
381 GinPageGetOpaque(page)->maxoff++;
382 metadata->nPendingHeapTuples++;
383
384 for (i = 0; i < collector->ntuples; i++)
385 {
386 tupsize = IndexTupleSize(collector->tuples[i]);
387 l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
388
389 if (l == InvalidOffsetNumber)
390 elog(ERROR, "failed to add item to index page in \"%s\"",
392
393 memcpy(ptr, collector->tuples[i], tupsize);
394 ptr += tupsize;
395
396 off++;
397 }
398
399 Assert((ptr - collectordata) <= collector->sumsize);
400
401 MarkBufferDirty(buffer);
402
403 if (needWal)
404 {
406 XLogRegisterBufData(1, collectordata, collector->sumsize);
407 }
408
409 metadata->tailFreeSize = PageGetExactFreeSpace(page);
410 }
411
412 /*
413 * Set pd_lower just past the end of the metadata. This is essential,
414 * because without doing so, metadata will be lost if xlog.c compresses
415 * the page. (We must do this here because pre-v11 versions of PG did not
416 * set the metapage's pd_lower correctly, so a pg_upgraded index might
417 * contain the wrong value.)
418 */
419 ((PageHeader) metapage)->pd_lower =
420 ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
421
422 /*
423 * Write metabuffer, make xlog entry
424 */
425 MarkBufferDirty(metabuffer);
426
427 if (needWal)
428 {
429 XLogRecPtr recptr;
430
431 memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
432
434 XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
435
436 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
437 PageSetLSN(metapage, recptr);
438
439 if (buffer != InvalidBuffer)
440 {
441 PageSetLSN(page, recptr);
442 }
443 }
444
445 if (buffer != InvalidBuffer)
446 UnlockReleaseBuffer(buffer);
447
448 /*
449 * Force pending list cleanup when it becomes too long. And,
450 * ginInsertCleanup could take significant amount of time, so we prefer to
451 * call it when it can do all the work in a single collection cycle. In
452 * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
453 * while pending list is still small enough to fit into
454 * gin_pending_list_limit.
455 *
456 * ginInsertCleanup() should not be called inside our CRIT_SECTION.
457 */
458 cleanupSize = GinGetPendingListCleanupSize(index);
459 if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
460 needCleanup = true;
461
462 UnlockReleaseBuffer(metabuffer);
463
465
466 /*
467 * Since it could contend with concurrent cleanup process we cleanup
468 * pending list not forcibly.
469 */
470 if (needCleanup)
471 ginInsertCleanup(ginstate, false, true, false, NULL);
472}
473
474/*
475 * Create temporary index tuples for a single indexable item (one index column
476 * for the heap tuple specified by ht_ctid), and append them to the array
477 * in *collector. They will subsequently be written out using
478 * ginHeapTupleFastInsert. Note that to guarantee consistent state, all
479 * temp tuples for a given heap tuple must be written in one call to
480 * ginHeapTupleFastInsert.
481 */
482void
484 GinTupleCollector *collector,
485 OffsetNumber attnum, Datum value, bool isNull,
486 ItemPointer ht_ctid)
487{
488 Datum *entries;
489 GinNullCategory *categories;
490 int32 i,
491 nentries;
492
493 /*
494 * Extract the key values that need to be inserted in the index
495 */
496 entries = ginExtractEntries(ginstate, attnum, value, isNull,
497 &nentries, &categories);
498
499 /*
500 * Protect against integer overflow in allocation calculations
501 */
502 if (nentries < 0 ||
503 collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
504 elog(ERROR, "too many entries for GIN index");
505
506 /*
507 * Allocate/reallocate memory for storing collected tuples
508 */
509 if (collector->tuples == NULL)
510 {
511 /*
512 * Determine the number of elements to allocate in the tuples array
513 * initially. Make it a power of 2 to avoid wasting memory when
514 * resizing (since palloc likes powers of 2).
515 */
516 collector->lentuples = pg_nextpower2_32(Max(16, nentries));
517 collector->tuples = palloc_array(IndexTuple, collector->lentuples);
518 }
519 else if (collector->lentuples < collector->ntuples + nentries)
520 {
521 /*
522 * Advance lentuples to the next suitable power of 2. This won't
523 * overflow, though we could get to a value that exceeds
524 * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
525 */
526 collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
527 collector->tuples = repalloc_array(collector->tuples,
528 IndexTuple, collector->lentuples);
529 }
530
531 /*
532 * Build an index tuple for each key value, and add to array. In pending
533 * tuples we just stick the heap TID into t_tid.
534 */
535 for (i = 0; i < nentries; i++)
536 {
537 IndexTuple itup;
538
539 itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
540 NULL, 0, 0, true);
541 itup->t_tid = *ht_ctid;
542 collector->tuples[collector->ntuples++] = itup;
543 collector->sumsize += IndexTupleSize(itup);
544 }
545}
546
547/*
548 * Deletes pending list pages up to (not including) newHead page.
549 * If newHead == InvalidBlockNumber then function drops the whole list.
550 *
551 * metapage is pinned and exclusive-locked throughout this function.
552 */
553static void
555 bool fill_fsm, IndexBulkDeleteResult *stats)
556{
557 Page metapage;
558 GinMetaPageData *metadata;
559 BlockNumber blknoToDelete;
560
561 metapage = BufferGetPage(metabuffer);
562 metadata = GinPageGetMeta(metapage);
563 blknoToDelete = metadata->head;
564
565 do
566 {
567 Page page;
568 int i;
569 int64 nDeletedHeapTuples = 0;
573
574 data.ndeleted = 0;
575 while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
576 {
577 freespace[data.ndeleted] = blknoToDelete;
578 buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
579 LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
580 page = BufferGetPage(buffers[data.ndeleted]);
581
582 data.ndeleted++;
583
584 Assert(!GinPageIsDeleted(page));
585
586 nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
587 blknoToDelete = GinPageGetOpaque(page)->rightlink;
588 }
589
590 if (stats)
591 stats->pages_deleted += data.ndeleted;
592
593 /*
594 * This operation touches an unusually large number of pages, so
595 * prepare the XLogInsert machinery for that before entering the
596 * critical section.
597 */
599 XLogEnsureRecordSpace(data.ndeleted, 0);
600
602
603 metadata->head = blknoToDelete;
604
605 Assert(metadata->nPendingPages >= data.ndeleted);
606 metadata->nPendingPages -= data.ndeleted;
607 Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
608 metadata->nPendingHeapTuples -= nDeletedHeapTuples;
609
610 if (blknoToDelete == InvalidBlockNumber)
611 {
612 metadata->tail = InvalidBlockNumber;
613 metadata->tailFreeSize = 0;
614 metadata->nPendingPages = 0;
615 metadata->nPendingHeapTuples = 0;
616 }
617
618 /*
619 * Set pd_lower just past the end of the metadata. This is essential,
620 * because without doing so, metadata will be lost if xlog.c
621 * compresses the page. (We must do this here because pre-v11
622 * versions of PG did not set the metapage's pd_lower correctly, so a
623 * pg_upgraded index might contain the wrong value.)
624 */
625 ((PageHeader) metapage)->pd_lower =
626 ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
627
628 MarkBufferDirty(metabuffer);
629
630 for (i = 0; i < data.ndeleted; i++)
631 {
632 page = BufferGetPage(buffers[i]);
633 GinPageGetOpaque(page)->flags = GIN_DELETED;
634 MarkBufferDirty(buffers[i]);
635 }
636
638 {
639 XLogRecPtr recptr;
640
642 XLogRegisterBuffer(0, metabuffer,
644 for (i = 0; i < data.ndeleted; i++)
645 XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
646
647 memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
648
649 XLogRegisterData((char *) &data,
650 sizeof(ginxlogDeleteListPages));
651
652 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
653 PageSetLSN(metapage, recptr);
654
655 for (i = 0; i < data.ndeleted; i++)
656 {
657 page = BufferGetPage(buffers[i]);
658 PageSetLSN(page, recptr);
659 }
660 }
661
662 for (i = 0; i < data.ndeleted; i++)
663 UnlockReleaseBuffer(buffers[i]);
664
666
667 for (i = 0; fill_fsm && i < data.ndeleted; i++)
668 RecordFreeIndexPage(index, freespace[i]);
669
670 } while (blknoToDelete != newHead);
671}
672
673/* Initialize empty KeyArray */
674static void
675initKeyArray(KeyArray *keys, int32 maxvalues)
676{
677 keys->keys = palloc_array(Datum, maxvalues);
678 keys->categories = palloc_array(GinNullCategory, maxvalues);
679 keys->nvalues = 0;
680 keys->maxvalues = maxvalues;
681}
682
683/* Add datum to KeyArray, resizing if needed */
684static void
685addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
686{
687 if (keys->nvalues >= keys->maxvalues)
688 {
689 keys->maxvalues *= 2;
690 keys->keys = repalloc_array(keys->keys, Datum, keys->maxvalues);
692 }
693
694 keys->keys[keys->nvalues] = datum;
695 keys->categories[keys->nvalues] = category;
696 keys->nvalues++;
697}
698
699/*
700 * Collect data from a pending-list page in preparation for insertion into
701 * the main index.
702 *
703 * Go through all tuples >= startoff on page and collect values in accum
704 *
705 * Note that ka is just workspace --- it does not carry any state across
706 * calls.
707 */
708static void
710 Page page, OffsetNumber startoff)
711{
712 ItemPointerData heapptr;
714 maxoff;
715 OffsetNumber attrnum;
716
717 /* reset *ka to empty */
718 ka->nvalues = 0;
719
720 maxoff = PageGetMaxOffsetNumber(page);
721 Assert(maxoff >= FirstOffsetNumber);
722 ItemPointerSetInvalid(&heapptr);
723 attrnum = 0;
724
725 for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
726 {
727 IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
728 OffsetNumber curattnum;
729 Datum curkey;
730 GinNullCategory curcategory;
731
732 /* Check for change of heap TID or attnum */
733 curattnum = gintuple_get_attrnum(accum->ginstate, itup);
734
735 if (!ItemPointerIsValid(&heapptr))
736 {
737 heapptr = itup->t_tid;
738 attrnum = curattnum;
739 }
740 else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
741 curattnum == attrnum))
742 {
743 /*
744 * ginInsertBAEntries can insert several datums per call, but only
745 * for one heap tuple and one column. So call it at a boundary,
746 * and reset ka.
747 */
748 ginInsertBAEntries(accum, &heapptr, attrnum,
749 ka->keys, ka->categories, ka->nvalues);
750 ka->nvalues = 0;
751 heapptr = itup->t_tid;
752 attrnum = curattnum;
753 }
754
755 /* Add key to KeyArray */
756 curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
757 addDatum(ka, curkey, curcategory);
758 }
759
760 /* Dump out all remaining keys */
761 ginInsertBAEntries(accum, &heapptr, attrnum,
762 ka->keys, ka->categories, ka->nvalues);
763}
764
765/*
766 * Move tuples from pending pages into regular GIN structure.
767 *
768 * On first glance it looks completely not crash-safe. But if we crash
769 * after posting entries to the main index and before removing them from the
770 * pending list, it's okay because when we redo the posting later on, nothing
771 * bad will happen.
772 *
773 * fill_fsm indicates that ginInsertCleanup should add deleted pages
774 * to FSM otherwise caller is responsible to put deleted pages into
775 * FSM.
776 *
777 * If stats isn't null, we count deleted pending pages into the counts.
778 */
779void
780ginInsertCleanup(GinState *ginstate, bool full_clean,
781 bool fill_fsm, bool forceCleanup,
783{
784 Relation index = ginstate->index;
785 Buffer metabuffer,
786 buffer;
787 Page metapage,
788 page;
789 GinMetaPageData *metadata;
791 oldCtx;
792 BuildAccumulator accum;
793 KeyArray datums;
794 BlockNumber blkno,
795 blknoFinish;
796 bool cleanupFinish = false;
797 bool fsm_vac = false;
798 Size workMemory;
799
800 /*
801 * We would like to prevent concurrent cleanup process. For that we will
802 * lock metapage in exclusive mode using LockPage() call. Nobody other
803 * will use that lock for metapage, so we keep possibility of concurrent
804 * insertion into pending list
805 */
806
807 if (forceCleanup)
808 {
809 /*
810 * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
811 * and we would like to wait concurrent cleanup to finish.
812 */
814 workMemory =
817 }
818 else
819 {
820 /*
821 * We are called from regular insert and if we see concurrent cleanup
822 * just exit in hope that concurrent process will clean up pending
823 * list.
824 */
826 return;
827 workMemory = work_mem;
828 }
829
830 metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
831 LockBuffer(metabuffer, GIN_SHARE);
832 metapage = BufferGetPage(metabuffer);
833 metadata = GinPageGetMeta(metapage);
834
835 if (metadata->head == InvalidBlockNumber)
836 {
837 /* Nothing to do */
838 UnlockReleaseBuffer(metabuffer);
840 return;
841 }
842
843 /*
844 * Remember a tail page to prevent infinite cleanup if other backends add
845 * new tuples faster than we can cleanup.
846 */
847 blknoFinish = metadata->tail;
848
849 /*
850 * Read and lock head of pending list
851 */
852 blkno = metadata->head;
853 buffer = ReadBuffer(index, blkno);
854 LockBuffer(buffer, GIN_SHARE);
855 page = BufferGetPage(buffer);
856
857 LockBuffer(metabuffer, GIN_UNLOCK);
858
859 /*
860 * Initialize. All temporary space will be in opCtx
861 */
863 "GIN insert cleanup temporary context",
865
867
868 initKeyArray(&datums, 128);
869 ginInitBA(&accum);
870 accum.ginstate = ginstate;
871
872 /*
873 * At the top of this loop, we have pin and lock on the current page of
874 * the pending list. However, we'll release that before exiting the loop.
875 * Note we also have pin but not lock on the metapage.
876 */
877 for (;;)
878 {
879 Assert(!GinPageIsDeleted(page));
880
881 /*
882 * Are we walk through the page which as we remember was a tail when
883 * we start our cleanup? But if caller asks us to clean up whole
884 * pending list then ignore old tail, we will work until list becomes
885 * empty.
886 */
887 if (blkno == blknoFinish && full_clean == false)
888 cleanupFinish = true;
889
890 /*
891 * read page's datums into accum
892 */
893 processPendingPage(&accum, &datums, page, FirstOffsetNumber);
894
896
897 /*
898 * Is it time to flush memory to disk? Flush if we are at the end of
899 * the pending list, or if we have a full row and memory is getting
900 * full.
901 */
902 if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
903 (GinPageHasFullRow(page) &&
904 (accum.allocatedMemory >= workMemory * 1024L)))
905 {
907 uint32 nlist;
908 Datum key;
909 GinNullCategory category;
910 OffsetNumber maxoff,
911 attnum;
912
913 /*
914 * Unlock current page to increase performance. Changes of page
915 * will be checked later by comparing maxoff after completion of
916 * memory flush.
917 */
918 maxoff = PageGetMaxOffsetNumber(page);
919 LockBuffer(buffer, GIN_UNLOCK);
920
921 /*
922 * Moving collected data into regular structure can take
923 * significant amount of time - so, run it without locking pending
924 * list.
925 */
926 ginBeginBAScan(&accum);
927 while ((list = ginGetBAEntry(&accum,
928 &attnum, &key, &category, &nlist)) != NULL)
929 {
930 ginEntryInsert(ginstate, attnum, key, category,
931 list, nlist, NULL);
933 }
934
935 /*
936 * Lock the whole list to remove pages
937 */
938 LockBuffer(metabuffer, GIN_EXCLUSIVE);
939 LockBuffer(buffer, GIN_SHARE);
940
941 Assert(!GinPageIsDeleted(page));
942
943 /*
944 * While we left the page unlocked, more stuff might have gotten
945 * added to it. If so, process those entries immediately. There
946 * shouldn't be very many, so we don't worry about the fact that
947 * we're doing this with exclusive lock. Insertion algorithm
948 * guarantees that inserted row(s) will not continue on next page.
949 * NOTE: intentionally no vacuum_delay_point in this loop.
950 */
951 if (PageGetMaxOffsetNumber(page) != maxoff)
952 {
953 ginInitBA(&accum);
954 processPendingPage(&accum, &datums, page, maxoff + 1);
955
956 ginBeginBAScan(&accum);
957 while ((list = ginGetBAEntry(&accum,
958 &attnum, &key, &category, &nlist)) != NULL)
959 ginEntryInsert(ginstate, attnum, key, category,
960 list, nlist, NULL);
961 }
962
963 /*
964 * Remember next page - it will become the new list head
965 */
966 blkno = GinPageGetOpaque(page)->rightlink;
967 UnlockReleaseBuffer(buffer); /* shiftList will do exclusive
968 * locking */
969
970 /*
971 * remove read pages from pending list, at this point all content
972 * of read pages is in regular structure
973 */
974 shiftList(index, metabuffer, blkno, fill_fsm, stats);
975
976 /* At this point, some pending pages have been freed up */
977 fsm_vac = true;
978
979 Assert(blkno == metadata->head);
980 LockBuffer(metabuffer, GIN_UNLOCK);
981
982 /*
983 * if we removed the whole pending list or we cleanup tail (which
984 * we remembered on start our cleanup process) then just exit
985 */
986 if (blkno == InvalidBlockNumber || cleanupFinish)
987 break;
988
989 /*
990 * release memory used so far and reinit state
991 */
993 initKeyArray(&datums, datums.maxvalues);
994 ginInitBA(&accum);
995 }
996 else
997 {
998 blkno = GinPageGetOpaque(page)->rightlink;
999 UnlockReleaseBuffer(buffer);
1000 }
1001
1002 /*
1003 * Read next page in pending list
1004 */
1006 buffer = ReadBuffer(index, blkno);
1007 LockBuffer(buffer, GIN_SHARE);
1008 page = BufferGetPage(buffer);
1009 }
1010
1012 ReleaseBuffer(metabuffer);
1013
1014 /*
1015 * As pending list pages can have a high churn rate, it is desirable to
1016 * recycle them immediately to the FreeSpaceMap when ordinary backends
1017 * clean the list.
1018 */
1019 if (fsm_vac && fill_fsm)
1021
1022 /* Clean up temporary space */
1023 MemoryContextSwitchTo(oldCtx);
1025}
1026
1027/*
1028 * SQL-callable function to clean the insert pending list
1029 */
1030Datum
1032{
1033 Oid indexoid = PG_GETARG_OID(0);
1034 Relation indexRel = index_open(indexoid, RowExclusiveLock);
1036
1037 if (RecoveryInProgress())
1038 ereport(ERROR,
1039 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1040 errmsg("recovery is in progress"),
1041 errhint("GIN pending list cannot be cleaned up during recovery.")));
1042
1043 /* Must be a GIN index */
1044 if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1045 indexRel->rd_rel->relam != GIN_AM_OID)
1046 ereport(ERROR,
1047 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1048 errmsg("\"%s\" is not a GIN index",
1049 RelationGetRelationName(indexRel))));
1050
1051 /*
1052 * Reject attempts to read non-local temporary relations; we would be
1053 * likely to get wrong data since we have no visibility into the owning
1054 * session's local buffers.
1055 */
1056 if (RELATION_IS_OTHER_TEMP(indexRel))
1057 ereport(ERROR,
1058 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1059 errmsg("cannot access temporary indexes of other sessions")));
1060
1061 /* User must own the index (comparable to privileges needed for VACUUM) */
1062 if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
1064 RelationGetRelationName(indexRel));
1065
1066 memset(&stats, 0, sizeof(stats));
1067
1068 /*
1069 * Can't assume anything about the content of an !indisready index. Make
1070 * those a no-op, not an error, so users can just run this function on all
1071 * indexes of the access method. Since an indisready&&!indisvalid index
1072 * is merely awaiting missed aminsert calls, we're capable of processing
1073 * it. Decline to do so, out of an abundance of caution.
1074 */
1075 if (indexRel->rd_index->indisvalid)
1076 {
1077 GinState ginstate;
1078
1079 initGinState(&ginstate, indexRel);
1080 ginInsertCleanup(&ginstate, true, true, true, &stats);
1081 }
1082 else
1084 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1085 errmsg("index \"%s\" is not valid",
1086 RelationGetRelationName(indexRel))));
1087
1088 index_close(indexRel, RowExclusiveLock);
1089
1091}
@ ACLCHECK_NOT_OWNER
Definition: acl.h:185
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2622
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition: aclchk.c:4064
int autovacuum_work_mem
Definition: autovacuum.c:119
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:3724
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4924
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4941
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:2532
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:5158
Buffer ReadBuffer(Relation reln, BlockNumber blockNum)
Definition: bufmgr.c:746
static Page BufferGetPage(Buffer buffer)
Definition: bufmgr.h:400
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:947
PageHeaderData * PageHeader
Definition: bufpage.h:173
static bool PageIsEmpty(Page page)
Definition: bufpage.h:223
Pointer Page
Definition: bufpage.h:81
static Item PageGetItem(Page page, ItemId itemId)
Definition: bufpage.h:354
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:243
static void PageSetLSN(Page page, XLogRecPtr lsn)
Definition: bufpage.h:391
static OffsetNumber PageGetMaxOffsetNumber(Page page)
Definition: bufpage.h:372
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:471
#define MAXALIGN(LEN)
Definition: c.h:765
#define Max(x, y)
Definition: c.h:952
#define Assert(condition)
Definition: c.h:812
int64_t int64
Definition: c.h:482
int32_t int32
Definition: c.h:481
uint32_t uint32
Definition: c.h:485
size_t Size
Definition: c.h:559
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
#define MaxAllocSize
Definition: fe_memutils.h:22
#define repalloc_array(pointer, type, count)
Definition: fe_memutils.h:78
#define palloc_array(type, count)
Definition: fe_memutils.h:76
#define PG_GETARG_OID(n)
Definition: fmgr.h:275
#define PG_RETURN_INT64(x)
Definition: fmgr.h:368
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define GinGetPendingListCleanupSize(relation)
Definition: gin_private.h:39
#define GIN_UNLOCK
Definition: gin_private.h:49
#define GIN_EXCLUSIVE
Definition: gin_private.h:51
#define GIN_SHARE
Definition: gin_private.h:50
#define GinListPageSize
Definition: ginblock.h:327
#define GIN_METAPAGE_BLKNO
Definition: ginblock.h:51
#define GinPageHasFullRow(page)
Definition: ginblock.h:119
#define GinPageGetOpaque(page)
Definition: ginblock.h:110
#define GIN_DELETED
Definition: ginblock.h:43
#define GIN_LIST
Definition: ginblock.h:45
signed char GinNullCategory
Definition: ginblock.h:206
#define GinPageGetMeta(p)
Definition: ginblock.h:104
#define GinPageIsDeleted(page)
Definition: ginblock.h:124
#define GinPageSetFullRow(page)
Definition: ginblock.h:120
void ginBeginBAScan(BuildAccumulator *accum)
Definition: ginbulk.c:257
ItemPointerData * ginGetBAEntry(BuildAccumulator *accum, OffsetNumber *attnum, Datum *key, GinNullCategory *category, uint32 *n)
Definition: ginbulk.c:268
void ginInsertBAEntries(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber attnum, Datum *entries, GinNullCategory *categories, int32 nentries)
Definition: ginbulk.c:210
void ginInitBA(BuildAccumulator *accum)
Definition: ginbulk.c:109
IndexTuple GinFormTuple(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, Pointer data, Size dataSize, int nipd, bool errorTooBig)
Definition: ginentrypage.c:44
#define GIN_PAGE_FREESIZE
Definition: ginfast.c:41
Datum gin_clean_pending_list(PG_FUNCTION_ARGS)
Definition: ginfast.c:1031
void ginInsertCleanup(GinState *ginstate, bool full_clean, bool fill_fsm, bool forceCleanup, IndexBulkDeleteResult *stats)
Definition: ginfast.c:780
void ginHeapTupleFastCollect(GinState *ginstate, GinTupleCollector *collector, OffsetNumber attnum, Datum value, bool isNull, ItemPointer ht_ctid)
Definition: ginfast.c:483
static int32 writeListPage(Relation index, Buffer buffer, IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
Definition: ginfast.c:59
int gin_pending_list_limit
Definition: ginfast.c:39
static void processPendingPage(BuildAccumulator *accum, KeyArray *ka, Page page, OffsetNumber startoff)
Definition: ginfast.c:709
static void initKeyArray(KeyArray *keys, int32 maxvalues)
Definition: ginfast.c:675
static void makeSublist(Relation index, IndexTuple *tuples, int32 ntuples, GinMetaPageData *res)
Definition: ginfast.c:145
static void shiftList(Relation index, Buffer metabuffer, BlockNumber newHead, bool fill_fsm, IndexBulkDeleteResult *stats)
Definition: ginfast.c:554
static void addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
Definition: ginfast.c:685
void ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
Definition: ginfast.c:219
struct KeyArray KeyArray
void ginEntryInsert(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, ItemPointerData *items, uint32 nitem, GinStatsData *buildStats)
Definition: gininsert.c:176
OffsetNumber gintuple_get_attrnum(GinState *ginstate, IndexTuple tuple)
Definition: ginutil.c:227
Buffer GinNewBuffer(Relation index)
Definition: ginutil.c:301
void GinInitBuffer(Buffer b, uint32 f)
Definition: ginutil.c:351
Datum * ginExtractEntries(GinState *ginstate, OffsetNumber attnum, Datum value, bool isNull, int32 *nentries, GinNullCategory **categories)
Definition: ginutil.c:484
Datum gintuple_get_key(GinState *ginstate, IndexTuple tuple, GinNullCategory *category)
Definition: ginutil.c:260
void initGinState(GinState *state, Relation index)
Definition: ginutil.c:98
static MemoryContext opCtx
Definition: ginxlog.c:22
#define XLOG_GIN_UPDATE_META_PAGE
Definition: ginxlog.h:162
#define GIN_NDELETE_AT_ONCE
Definition: ginxlog.h:202
#define XLOG_GIN_INSERT_LISTPAGE
Definition: ginxlog.h:180
#define XLOG_GIN_DELETE_LISTPAGE
Definition: ginxlog.h:194
int maintenance_work_mem
Definition: globals.c:132
int work_mem
Definition: globals.c:130
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
void IndexFreeSpaceMapVacuum(Relation rel)
Definition: indexfsm.c:71
void RecordFreeIndexPage(Relation rel, BlockNumber freeBlock)
Definition: indexfsm.c:52
static struct @161 value
int i
Definition: isn.c:72
Pointer Item
Definition: item.h:17
struct ItemIdData ItemIdData
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:35
static void ItemPointerSetInvalid(ItemPointerData *pointer)
Definition: itemptr.h:184
static bool ItemPointerIsValid(const ItemPointerData *pointer)
Definition: itemptr.h:83
IndexTupleData * IndexTuple
Definition: itup.h:53
#define IndexTupleSize(itup)
Definition: itup.h:70
bool ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:521
void LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:502
void UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:537
#define ExclusiveLock
Definition: lockdefs.h:42
#define RowExclusiveLock
Definition: lockdefs.h:38
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
void * palloc(Size size)
Definition: mcxt.c:1317
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define AmAutoVacuumWorkerProcess()
Definition: miscadmin.h:381
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
Oid GetUserId(void)
Definition: miscinit.c:517
#define InvalidOffsetNumber
Definition: off.h:26
#define OffsetNumberNext(offsetNumber)
Definition: off.h:52
uint16 OffsetNumber
Definition: off.h:24
#define FirstOffsetNumber
Definition: off.h:27
@ OBJECT_INDEX
Definition: parsenodes.h:2288
int16 attnum
Definition: pg_attribute.h:74
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:189
const void * data
uintptr_t Datum
Definition: postgres.h:64
unsigned int Oid
Definition: postgres_ext.h:31
void CheckForSerializableConflictIn(Relation relation, ItemPointer tid, BlockNumber blkno)
Definition: predicate.c:4326
MemoryContextSwitchTo(old_ctx)
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationNeedsWAL(relation)
Definition: rel.h:628
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:658
static pg_noinline void Size size
Definition: slab.c:607
GinState * ginstate
Definition: gin_private.h:433
BlockNumber tail
Definition: ginblock.h:62
uint32 tailFreeSize
Definition: ginblock.h:67
BlockNumber nPendingPages
Definition: ginblock.h:73
int64 nPendingHeapTuples
Definition: ginblock.h:74
BlockNumber head
Definition: ginblock.h:61
Relation index
Definition: gin_private.h:59
IndexTuple * tuples
Definition: gin_private.h:455
BlockNumber pages_deleted
Definition: genam.h:84
ItemPointerData t_tid
Definition: itup.h:37
Datum * keys
Definition: ginfast.c:46
GinNullCategory * categories
Definition: ginfast.c:47
int32 nvalues
Definition: ginfast.c:48
int32 maxvalues
Definition: ginfast.c:49
Form_pg_index rd_index
Definition: rel.h:192
Form_pg_class rd_rel
Definition: rel.h:111
Definition: type.h:96
char data[BLCKSZ]
Definition: c.h:1073
void vacuum_delay_point(void)
Definition: vacuum.c:2362
bool RecoveryInProgress(void)
Definition: xlog.c:6334
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void XLogRegisterBufData(uint8 block_id, const char *data, uint32 len)
Definition: xloginsert.c:405
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogRegisterData(const char *data, uint32 len)
Definition: xloginsert.c:364
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:242
void XLogBeginInsert(void)
Definition: xloginsert.c:149
void XLogEnsureRecordSpace(int max_block_id, int ndatas)
Definition: xloginsert.c:175
#define REGBUF_STANDARD
Definition: xloginsert.h:34
#define REGBUF_WILL_INIT
Definition: xloginsert.h:33