PostgreSQL Source Code  git master
ginfast.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * ginfast.c
4  * Fast insert routines for the Postgres inverted index access method.
5  * Pending entries are stored in linear list of pages. Later on
6  * (typically during VACUUM), ginInsertCleanup() will be invoked to
7  * transfer pending entries into the regular index structure. This
8  * wins because bulk insertion is much more efficient than retail.
9  *
10  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  * src/backend/access/gin/ginfast.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "access/gin_private.h"
22 #include "access/ginxlog.h"
23 #include "access/xlog.h"
24 #include "access/xloginsert.h"
25 #include "catalog/pg_am.h"
26 #include "commands/vacuum.h"
27 #include "miscadmin.h"
28 #include "port/pg_bitutils.h"
29 #include "postmaster/autovacuum.h"
30 #include "storage/indexfsm.h"
31 #include "storage/lmgr.h"
32 #include "storage/predicate.h"
33 #include "utils/acl.h"
34 #include "utils/fmgrprotos.h"
35 #include "utils/memutils.h"
36 #include "utils/rel.h"
37 
38 /* GUC parameter */
40 
41 #define GIN_PAGE_FREESIZE \
42  ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
43 
44 typedef struct KeyArray
45 {
46  Datum *keys; /* expansible array */
47  GinNullCategory *categories; /* another expansible array */
48  int32 nvalues; /* current number of valid entries */
49  int32 maxvalues; /* allocated size of arrays */
51 
52 
53 /*
54  * Build a pending-list page from the given array of tuples, and write it out.
55  *
56  * Returns amount of free space left on the page.
57  */
58 static int32
60  IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
61 {
62  Page page = BufferGetPage(buffer);
63  int32 i,
64  freesize,
65  size = 0;
66  OffsetNumber l,
67  off;
68  PGAlignedBlock workspace;
69  char *ptr;
70 
72 
73  GinInitBuffer(buffer, GIN_LIST);
74 
75  off = FirstOffsetNumber;
76  ptr = workspace.data;
77 
78  for (i = 0; i < ntuples; i++)
79  {
80  int this_size = IndexTupleSize(tuples[i]);
81 
82  memcpy(ptr, tuples[i], this_size);
83  ptr += this_size;
84  size += this_size;
85 
86  l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
87 
88  if (l == InvalidOffsetNumber)
89  elog(ERROR, "failed to add item to index page in \"%s\"",
91 
92  off++;
93  }
94 
95  Assert(size <= BLCKSZ); /* else we overran workspace */
96 
97  GinPageGetOpaque(page)->rightlink = rightlink;
98 
99  /*
100  * tail page may contain only whole row(s) or final part of row placed on
101  * previous pages (a "row" here meaning all the index tuples generated for
102  * one heap tuple)
103  */
104  if (rightlink == InvalidBlockNumber)
105  {
106  GinPageSetFullRow(page);
107  GinPageGetOpaque(page)->maxoff = 1;
108  }
109  else
110  {
111  GinPageGetOpaque(page)->maxoff = 0;
112  }
113 
114  MarkBufferDirty(buffer);
115 
116  if (RelationNeedsWAL(index))
117  {
119  XLogRecPtr recptr;
120 
121  data.rightlink = rightlink;
122  data.ntuples = ntuples;
123 
124  XLogBeginInsert();
125  XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
126 
128  XLogRegisterBufData(0, workspace.data, size);
129 
130  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
131  PageSetLSN(page, recptr);
132  }
133 
134  /* get free space before releasing buffer */
135  freesize = PageGetExactFreeSpace(page);
136 
137  UnlockReleaseBuffer(buffer);
138 
140 
141  return freesize;
142 }
143 
144 static void
147 {
148  Buffer curBuffer = InvalidBuffer;
149  Buffer prevBuffer = InvalidBuffer;
150  int i,
151  size = 0,
152  tupsize;
153  int startTuple = 0;
154 
155  Assert(ntuples > 0);
156 
157  /*
158  * Split tuples into pages
159  */
160  for (i = 0; i < ntuples; i++)
161  {
162  if (curBuffer == InvalidBuffer)
163  {
164  curBuffer = GinNewBuffer(index);
165 
166  if (prevBuffer != InvalidBuffer)
167  {
168  res->nPendingPages++;
169  writeListPage(index, prevBuffer,
170  tuples + startTuple,
171  i - startTuple,
172  BufferGetBlockNumber(curBuffer));
173  }
174  else
175  {
176  res->head = BufferGetBlockNumber(curBuffer);
177  }
178 
179  prevBuffer = curBuffer;
180  startTuple = i;
181  size = 0;
182  }
183 
184  tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
185 
186  if (size + tupsize > GinListPageSize)
187  {
188  /* won't fit, force a new page and reprocess */
189  i--;
190  curBuffer = InvalidBuffer;
191  }
192  else
193  {
194  size += tupsize;
195  }
196  }
197 
198  /*
199  * Write last page
200  */
201  res->tail = BufferGetBlockNumber(curBuffer);
202  res->tailFreeSize = writeListPage(index, curBuffer,
203  tuples + startTuple,
204  ntuples - startTuple,
206  res->nPendingPages++;
207  /* that was only one heap tuple */
208  res->nPendingHeapTuples = 1;
209 }
210 
211 /*
212  * Write the index tuples contained in *collector into the index's
213  * pending list.
214  *
215  * Function guarantees that all these tuples will be inserted consecutively,
216  * preserving order
217  */
218 void
220 {
221  Relation index = ginstate->index;
222  Buffer metabuffer;
223  Page metapage;
224  GinMetaPageData *metadata = NULL;
225  Buffer buffer = InvalidBuffer;
226  Page page = NULL;
228  bool separateList = false;
229  bool needCleanup = false;
230  int cleanupSize;
231  bool needWal;
232 
233  if (collector->ntuples == 0)
234  return;
235 
236  needWal = RelationNeedsWAL(index);
237 
238  data.locator = index->rd_locator;
239  data.ntuples = 0;
240  data.newRightlink = data.prevTail = InvalidBlockNumber;
241 
242  metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
243  metapage = BufferGetPage(metabuffer);
244 
245  /*
246  * An insertion to the pending list could logically belong anywhere in the
247  * tree, so it conflicts with all serializable scans. All scans acquire a
248  * predicate lock on the metabuffer to represent that. Therefore we'll
249  * check for conflicts in, but not until we have the page locked and are
250  * ready to modify the page.
251  */
252 
253  if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
254  {
255  /*
256  * Total size is greater than one page => make sublist
257  */
258  separateList = true;
259  }
260  else
261  {
262  LockBuffer(metabuffer, GIN_EXCLUSIVE);
263  metadata = GinPageGetMeta(metapage);
264 
265  if (metadata->head == InvalidBlockNumber ||
266  collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
267  {
268  /*
269  * Pending list is empty or total size is greater than freespace
270  * on tail page => make sublist
271  *
272  * We unlock metabuffer to keep high concurrency
273  */
274  separateList = true;
275  LockBuffer(metabuffer, GIN_UNLOCK);
276  }
277  }
278 
279  if (separateList)
280  {
281  /*
282  * We should make sublist separately and append it to the tail
283  */
284  GinMetaPageData sublist;
285 
286  memset(&sublist, 0, sizeof(GinMetaPageData));
287  makeSublist(index, collector->tuples, collector->ntuples, &sublist);
288 
289  /*
290  * metapage was unlocked, see above
291  */
292  LockBuffer(metabuffer, GIN_EXCLUSIVE);
293  metadata = GinPageGetMeta(metapage);
294 
296 
297  if (metadata->head == InvalidBlockNumber)
298  {
299  /*
300  * Main list is empty, so just insert sublist as main list
301  */
303 
304  metadata->head = sublist.head;
305  metadata->tail = sublist.tail;
306  metadata->tailFreeSize = sublist.tailFreeSize;
307 
308  metadata->nPendingPages = sublist.nPendingPages;
309  metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
310 
311  if (needWal)
312  XLogBeginInsert();
313  }
314  else
315  {
316  /*
317  * Merge lists
318  */
319  data.prevTail = metadata->tail;
320  data.newRightlink = sublist.head;
321 
322  buffer = ReadBuffer(index, metadata->tail);
323  LockBuffer(buffer, GIN_EXCLUSIVE);
324  page = BufferGetPage(buffer);
325 
326  Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
327 
329 
330  GinPageGetOpaque(page)->rightlink = sublist.head;
331 
332  MarkBufferDirty(buffer);
333 
334  metadata->tail = sublist.tail;
335  metadata->tailFreeSize = sublist.tailFreeSize;
336 
337  metadata->nPendingPages += sublist.nPendingPages;
338  metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
339 
340  if (needWal)
341  {
342  XLogBeginInsert();
344  }
345  }
346  }
347  else
348  {
349  /*
350  * Insert into tail page. Metapage is already locked
351  */
352  OffsetNumber l,
353  off;
354  int i,
355  tupsize;
356  char *ptr;
357  char *collectordata;
358 
360 
361  buffer = ReadBuffer(index, metadata->tail);
362  LockBuffer(buffer, GIN_EXCLUSIVE);
363  page = BufferGetPage(buffer);
364 
365  off = (PageIsEmpty(page)) ? FirstOffsetNumber :
367 
368  collectordata = ptr = (char *) palloc(collector->sumsize);
369 
370  data.ntuples = collector->ntuples;
371 
373 
374  if (needWal)
375  XLogBeginInsert();
376 
377  /*
378  * Increase counter of heap tuples
379  */
380  Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
381  GinPageGetOpaque(page)->maxoff++;
382  metadata->nPendingHeapTuples++;
383 
384  for (i = 0; i < collector->ntuples; i++)
385  {
386  tupsize = IndexTupleSize(collector->tuples[i]);
387  l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
388 
389  if (l == InvalidOffsetNumber)
390  elog(ERROR, "failed to add item to index page in \"%s\"",
392 
393  memcpy(ptr, collector->tuples[i], tupsize);
394  ptr += tupsize;
395 
396  off++;
397  }
398 
399  Assert((ptr - collectordata) <= collector->sumsize);
400 
401  MarkBufferDirty(buffer);
402 
403  if (needWal)
404  {
406  XLogRegisterBufData(1, collectordata, collector->sumsize);
407  }
408 
409  metadata->tailFreeSize = PageGetExactFreeSpace(page);
410  }
411 
412  /*
413  * Set pd_lower just past the end of the metadata. This is essential,
414  * because without doing so, metadata will be lost if xlog.c compresses
415  * the page. (We must do this here because pre-v11 versions of PG did not
416  * set the metapage's pd_lower correctly, so a pg_upgraded index might
417  * contain the wrong value.)
418  */
419  ((PageHeader) metapage)->pd_lower =
420  ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
421 
422  /*
423  * Write metabuffer, make xlog entry
424  */
425  MarkBufferDirty(metabuffer);
426 
427  if (needWal)
428  {
429  XLogRecPtr recptr;
430 
431  memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
432 
434  XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
435 
436  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
437  PageSetLSN(metapage, recptr);
438 
439  if (buffer != InvalidBuffer)
440  {
441  PageSetLSN(page, recptr);
442  }
443  }
444 
445  if (buffer != InvalidBuffer)
446  UnlockReleaseBuffer(buffer);
447 
448  /*
449  * Force pending list cleanup when it becomes too long. And,
450  * ginInsertCleanup could take significant amount of time, so we prefer to
451  * call it when it can do all the work in a single collection cycle. In
452  * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
453  * while pending list is still small enough to fit into
454  * gin_pending_list_limit.
455  *
456  * ginInsertCleanup() should not be called inside our CRIT_SECTION.
457  */
458  cleanupSize = GinGetPendingListCleanupSize(index);
459  if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
460  needCleanup = true;
461 
462  UnlockReleaseBuffer(metabuffer);
463 
465 
466  /*
467  * Since it could contend with concurrent cleanup process we cleanup
468  * pending list not forcibly.
469  */
470  if (needCleanup)
471  ginInsertCleanup(ginstate, false, true, false, NULL);
472 }
473 
474 /*
475  * Create temporary index tuples for a single indexable item (one index column
476  * for the heap tuple specified by ht_ctid), and append them to the array
477  * in *collector. They will subsequently be written out using
478  * ginHeapTupleFastInsert. Note that to guarantee consistent state, all
479  * temp tuples for a given heap tuple must be written in one call to
480  * ginHeapTupleFastInsert.
481  */
482 void
484  GinTupleCollector *collector,
485  OffsetNumber attnum, Datum value, bool isNull,
486  ItemPointer ht_ctid)
487 {
488  Datum *entries;
489  GinNullCategory *categories;
490  int32 i,
491  nentries;
492 
493  /*
494  * Extract the key values that need to be inserted in the index
495  */
496  entries = ginExtractEntries(ginstate, attnum, value, isNull,
497  &nentries, &categories);
498 
499  /*
500  * Protect against integer overflow in allocation calculations
501  */
502  if (nentries < 0 ||
503  collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
504  elog(ERROR, "too many entries for GIN index");
505 
506  /*
507  * Allocate/reallocate memory for storing collected tuples
508  */
509  if (collector->tuples == NULL)
510  {
511  /*
512  * Determine the number of elements to allocate in the tuples array
513  * initially. Make it a power of 2 to avoid wasting memory when
514  * resizing (since palloc likes powers of 2).
515  */
516  collector->lentuples = pg_nextpower2_32(Max(16, nentries));
517  collector->tuples = palloc_array(IndexTuple, collector->lentuples);
518  }
519  else if (collector->lentuples < collector->ntuples + nentries)
520  {
521  /*
522  * Advance lentuples to the next suitable power of 2. This won't
523  * overflow, though we could get to a value that exceeds
524  * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
525  */
526  collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
527  collector->tuples = repalloc_array(collector->tuples,
528  IndexTuple, collector->lentuples);
529  }
530 
531  /*
532  * Build an index tuple for each key value, and add to array. In pending
533  * tuples we just stick the heap TID into t_tid.
534  */
535  for (i = 0; i < nentries; i++)
536  {
537  IndexTuple itup;
538 
539  itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
540  NULL, 0, 0, true);
541  itup->t_tid = *ht_ctid;
542  collector->tuples[collector->ntuples++] = itup;
543  collector->sumsize += IndexTupleSize(itup);
544  }
545 }
546 
547 /*
548  * Deletes pending list pages up to (not including) newHead page.
549  * If newHead == InvalidBlockNumber then function drops the whole list.
550  *
551  * metapage is pinned and exclusive-locked throughout this function.
552  */
553 static void
555  bool fill_fsm, IndexBulkDeleteResult *stats)
556 {
557  Page metapage;
558  GinMetaPageData *metadata;
559  BlockNumber blknoToDelete;
560 
561  metapage = BufferGetPage(metabuffer);
562  metadata = GinPageGetMeta(metapage);
563  blknoToDelete = metadata->head;
564 
565  do
566  {
567  Page page;
568  int i;
569  int64 nDeletedHeapTuples = 0;
571  Buffer buffers[GIN_NDELETE_AT_ONCE];
572  BlockNumber freespace[GIN_NDELETE_AT_ONCE];
573 
574  data.ndeleted = 0;
575  while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
576  {
577  freespace[data.ndeleted] = blknoToDelete;
578  buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
579  LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
580  page = BufferGetPage(buffers[data.ndeleted]);
581 
582  data.ndeleted++;
583 
584  Assert(!GinPageIsDeleted(page));
585 
586  nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
587  blknoToDelete = GinPageGetOpaque(page)->rightlink;
588  }
589 
590  if (stats)
591  stats->pages_deleted += data.ndeleted;
592 
593  /*
594  * This operation touches an unusually large number of pages, so
595  * prepare the XLogInsert machinery for that before entering the
596  * critical section.
597  */
598  if (RelationNeedsWAL(index))
599  XLogEnsureRecordSpace(data.ndeleted, 0);
600 
602 
603  metadata->head = blknoToDelete;
604 
605  Assert(metadata->nPendingPages >= data.ndeleted);
606  metadata->nPendingPages -= data.ndeleted;
607  Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
608  metadata->nPendingHeapTuples -= nDeletedHeapTuples;
609 
610  if (blknoToDelete == InvalidBlockNumber)
611  {
612  metadata->tail = InvalidBlockNumber;
613  metadata->tailFreeSize = 0;
614  metadata->nPendingPages = 0;
615  metadata->nPendingHeapTuples = 0;
616  }
617 
618  /*
619  * Set pd_lower just past the end of the metadata. This is essential,
620  * because without doing so, metadata will be lost if xlog.c
621  * compresses the page. (We must do this here because pre-v11
622  * versions of PG did not set the metapage's pd_lower correctly, so a
623  * pg_upgraded index might contain the wrong value.)
624  */
625  ((PageHeader) metapage)->pd_lower =
626  ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
627 
628  MarkBufferDirty(metabuffer);
629 
630  for (i = 0; i < data.ndeleted; i++)
631  {
632  page = BufferGetPage(buffers[i]);
633  GinPageGetOpaque(page)->flags = GIN_DELETED;
634  MarkBufferDirty(buffers[i]);
635  }
636 
637  if (RelationNeedsWAL(index))
638  {
639  XLogRecPtr recptr;
640 
641  XLogBeginInsert();
642  XLogRegisterBuffer(0, metabuffer,
644  for (i = 0; i < data.ndeleted; i++)
645  XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
646 
647  memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
648 
649  XLogRegisterData((char *) &data,
650  sizeof(ginxlogDeleteListPages));
651 
652  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
653  PageSetLSN(metapage, recptr);
654 
655  for (i = 0; i < data.ndeleted; i++)
656  {
657  page = BufferGetPage(buffers[i]);
658  PageSetLSN(page, recptr);
659  }
660  }
661 
662  for (i = 0; i < data.ndeleted; i++)
663  UnlockReleaseBuffer(buffers[i]);
664 
666 
667  for (i = 0; fill_fsm && i < data.ndeleted; i++)
668  RecordFreeIndexPage(index, freespace[i]);
669 
670  } while (blknoToDelete != newHead);
671 }
672 
673 /* Initialize empty KeyArray */
674 static void
675 initKeyArray(KeyArray *keys, int32 maxvalues)
676 {
677  keys->keys = palloc_array(Datum, maxvalues);
678  keys->categories = palloc_array(GinNullCategory, maxvalues);
679  keys->nvalues = 0;
680  keys->maxvalues = maxvalues;
681 }
682 
683 /* Add datum to KeyArray, resizing if needed */
684 static void
685 addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
686 {
687  if (keys->nvalues >= keys->maxvalues)
688  {
689  keys->maxvalues *= 2;
690  keys->keys = repalloc_array(keys->keys, Datum, keys->maxvalues);
692  }
693 
694  keys->keys[keys->nvalues] = datum;
695  keys->categories[keys->nvalues] = category;
696  keys->nvalues++;
697 }
698 
699 /*
700  * Collect data from a pending-list page in preparation for insertion into
701  * the main index.
702  *
703  * Go through all tuples >= startoff on page and collect values in accum
704  *
705  * Note that ka is just workspace --- it does not carry any state across
706  * calls.
707  */
708 static void
710  Page page, OffsetNumber startoff)
711 {
712  ItemPointerData heapptr;
713  OffsetNumber i,
714  maxoff;
715  OffsetNumber attrnum;
716 
717  /* reset *ka to empty */
718  ka->nvalues = 0;
719 
720  maxoff = PageGetMaxOffsetNumber(page);
721  Assert(maxoff >= FirstOffsetNumber);
722  ItemPointerSetInvalid(&heapptr);
723  attrnum = 0;
724 
725  for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
726  {
727  IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
728  OffsetNumber curattnum;
729  Datum curkey;
730  GinNullCategory curcategory;
731 
732  /* Check for change of heap TID or attnum */
733  curattnum = gintuple_get_attrnum(accum->ginstate, itup);
734 
735  if (!ItemPointerIsValid(&heapptr))
736  {
737  heapptr = itup->t_tid;
738  attrnum = curattnum;
739  }
740  else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
741  curattnum == attrnum))
742  {
743  /*
744  * ginInsertBAEntries can insert several datums per call, but only
745  * for one heap tuple and one column. So call it at a boundary,
746  * and reset ka.
747  */
748  ginInsertBAEntries(accum, &heapptr, attrnum,
749  ka->keys, ka->categories, ka->nvalues);
750  ka->nvalues = 0;
751  heapptr = itup->t_tid;
752  attrnum = curattnum;
753  }
754 
755  /* Add key to KeyArray */
756  curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
757  addDatum(ka, curkey, curcategory);
758  }
759 
760  /* Dump out all remaining keys */
761  ginInsertBAEntries(accum, &heapptr, attrnum,
762  ka->keys, ka->categories, ka->nvalues);
763 }
764 
765 /*
766  * Move tuples from pending pages into regular GIN structure.
767  *
768  * On first glance it looks completely not crash-safe. But if we crash
769  * after posting entries to the main index and before removing them from the
770  * pending list, it's okay because when we redo the posting later on, nothing
771  * bad will happen.
772  *
773  * fill_fsm indicates that ginInsertCleanup should add deleted pages
774  * to FSM otherwise caller is responsible to put deleted pages into
775  * FSM.
776  *
777  * If stats isn't null, we count deleted pending pages into the counts.
778  */
779 void
780 ginInsertCleanup(GinState *ginstate, bool full_clean,
781  bool fill_fsm, bool forceCleanup,
782  IndexBulkDeleteResult *stats)
783 {
784  Relation index = ginstate->index;
785  Buffer metabuffer,
786  buffer;
787  Page metapage,
788  page;
789  GinMetaPageData *metadata;
791  oldCtx;
792  BuildAccumulator accum;
793  KeyArray datums;
794  BlockNumber blkno,
795  blknoFinish;
796  bool cleanupFinish = false;
797  bool fsm_vac = false;
798  Size workMemory;
799 
800  /*
801  * We would like to prevent concurrent cleanup process. For that we will
802  * lock metapage in exclusive mode using LockPage() call. Nobody other
803  * will use that lock for metapage, so we keep possibility of concurrent
804  * insertion into pending list
805  */
806 
807  if (forceCleanup)
808  {
809  /*
810  * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
811  * and we would like to wait concurrent cleanup to finish.
812  */
814  workMemory =
817  }
818  else
819  {
820  /*
821  * We are called from regular insert and if we see concurrent cleanup
822  * just exit in hope that concurrent process will clean up pending
823  * list.
824  */
826  return;
827  workMemory = work_mem;
828  }
829 
830  metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
831  LockBuffer(metabuffer, GIN_SHARE);
832  metapage = BufferGetPage(metabuffer);
833  metadata = GinPageGetMeta(metapage);
834 
835  if (metadata->head == InvalidBlockNumber)
836  {
837  /* Nothing to do */
838  UnlockReleaseBuffer(metabuffer);
840  return;
841  }
842 
843  /*
844  * Remember a tail page to prevent infinite cleanup if other backends add
845  * new tuples faster than we can cleanup.
846  */
847  blknoFinish = metadata->tail;
848 
849  /*
850  * Read and lock head of pending list
851  */
852  blkno = metadata->head;
853  buffer = ReadBuffer(index, blkno);
854  LockBuffer(buffer, GIN_SHARE);
855  page = BufferGetPage(buffer);
856 
857  LockBuffer(metabuffer, GIN_UNLOCK);
858 
859  /*
860  * Initialize. All temporary space will be in opCtx
861  */
863  "GIN insert cleanup temporary context",
865 
866  oldCtx = MemoryContextSwitchTo(opCtx);
867 
868  initKeyArray(&datums, 128);
869  ginInitBA(&accum);
870  accum.ginstate = ginstate;
871 
872  /*
873  * At the top of this loop, we have pin and lock on the current page of
874  * the pending list. However, we'll release that before exiting the loop.
875  * Note we also have pin but not lock on the metapage.
876  */
877  for (;;)
878  {
879  Assert(!GinPageIsDeleted(page));
880 
881  /*
882  * Are we walk through the page which as we remember was a tail when
883  * we start our cleanup? But if caller asks us to clean up whole
884  * pending list then ignore old tail, we will work until list becomes
885  * empty.
886  */
887  if (blkno == blknoFinish && full_clean == false)
888  cleanupFinish = true;
889 
890  /*
891  * read page's datums into accum
892  */
893  processPendingPage(&accum, &datums, page, FirstOffsetNumber);
894 
896 
897  /*
898  * Is it time to flush memory to disk? Flush if we are at the end of
899  * the pending list, or if we have a full row and memory is getting
900  * full.
901  */
902  if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
903  (GinPageHasFullRow(page) &&
904  (accum.allocatedMemory >= workMemory * 1024L)))
905  {
907  uint32 nlist;
908  Datum key;
909  GinNullCategory category;
910  OffsetNumber maxoff,
911  attnum;
912 
913  /*
914  * Unlock current page to increase performance. Changes of page
915  * will be checked later by comparing maxoff after completion of
916  * memory flush.
917  */
918  maxoff = PageGetMaxOffsetNumber(page);
919  LockBuffer(buffer, GIN_UNLOCK);
920 
921  /*
922  * Moving collected data into regular structure can take
923  * significant amount of time - so, run it without locking pending
924  * list.
925  */
926  ginBeginBAScan(&accum);
927  while ((list = ginGetBAEntry(&accum,
928  &attnum, &key, &category, &nlist)) != NULL)
929  {
930  ginEntryInsert(ginstate, attnum, key, category,
931  list, nlist, NULL);
933  }
934 
935  /*
936  * Lock the whole list to remove pages
937  */
938  LockBuffer(metabuffer, GIN_EXCLUSIVE);
939  LockBuffer(buffer, GIN_SHARE);
940 
941  Assert(!GinPageIsDeleted(page));
942 
943  /*
944  * While we left the page unlocked, more stuff might have gotten
945  * added to it. If so, process those entries immediately. There
946  * shouldn't be very many, so we don't worry about the fact that
947  * we're doing this with exclusive lock. Insertion algorithm
948  * guarantees that inserted row(s) will not continue on next page.
949  * NOTE: intentionally no vacuum_delay_point in this loop.
950  */
951  if (PageGetMaxOffsetNumber(page) != maxoff)
952  {
953  ginInitBA(&accum);
954  processPendingPage(&accum, &datums, page, maxoff + 1);
955 
956  ginBeginBAScan(&accum);
957  while ((list = ginGetBAEntry(&accum,
958  &attnum, &key, &category, &nlist)) != NULL)
959  ginEntryInsert(ginstate, attnum, key, category,
960  list, nlist, NULL);
961  }
962 
963  /*
964  * Remember next page - it will become the new list head
965  */
966  blkno = GinPageGetOpaque(page)->rightlink;
967  UnlockReleaseBuffer(buffer); /* shiftList will do exclusive
968  * locking */
969 
970  /*
971  * remove read pages from pending list, at this point all content
972  * of read pages is in regular structure
973  */
974  shiftList(index, metabuffer, blkno, fill_fsm, stats);
975 
976  /* At this point, some pending pages have been freed up */
977  fsm_vac = true;
978 
979  Assert(blkno == metadata->head);
980  LockBuffer(metabuffer, GIN_UNLOCK);
981 
982  /*
983  * if we removed the whole pending list or we cleanup tail (which
984  * we remembered on start our cleanup process) then just exit
985  */
986  if (blkno == InvalidBlockNumber || cleanupFinish)
987  break;
988 
989  /*
990  * release memory used so far and reinit state
991  */
993  initKeyArray(&datums, datums.maxvalues);
994  ginInitBA(&accum);
995  }
996  else
997  {
998  blkno = GinPageGetOpaque(page)->rightlink;
999  UnlockReleaseBuffer(buffer);
1000  }
1001 
1002  /*
1003  * Read next page in pending list
1004  */
1006  buffer = ReadBuffer(index, blkno);
1007  LockBuffer(buffer, GIN_SHARE);
1008  page = BufferGetPage(buffer);
1009  }
1010 
1012  ReleaseBuffer(metabuffer);
1013 
1014  /*
1015  * As pending list pages can have a high churn rate, it is desirable to
1016  * recycle them immediately to the FreeSpaceMap when ordinary backends
1017  * clean the list.
1018  */
1019  if (fsm_vac && fill_fsm)
1021 
1022  /* Clean up temporary space */
1023  MemoryContextSwitchTo(oldCtx);
1025 }
1026 
1027 /*
1028  * SQL-callable function to clean the insert pending list
1029  */
1030 Datum
1032 {
1033  Oid indexoid = PG_GETARG_OID(0);
1034  Relation indexRel = index_open(indexoid, RowExclusiveLock);
1035  IndexBulkDeleteResult stats;
1036 
1037  if (RecoveryInProgress())
1038  ereport(ERROR,
1039  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1040  errmsg("recovery is in progress"),
1041  errhint("GIN pending list cannot be cleaned up during recovery.")));
1042 
1043  /* Must be a GIN index */
1044  if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1045  indexRel->rd_rel->relam != GIN_AM_OID)
1046  ereport(ERROR,
1047  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1048  errmsg("\"%s\" is not a GIN index",
1049  RelationGetRelationName(indexRel))));
1050 
1051  /*
1052  * Reject attempts to read non-local temporary relations; we would be
1053  * likely to get wrong data since we have no visibility into the owning
1054  * session's local buffers.
1055  */
1056  if (RELATION_IS_OTHER_TEMP(indexRel))
1057  ereport(ERROR,
1058  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1059  errmsg("cannot access temporary indexes of other sessions")));
1060 
1061  /* User must own the index (comparable to privileges needed for VACUUM) */
1062  if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
1064  RelationGetRelationName(indexRel));
1065 
1066  memset(&stats, 0, sizeof(stats));
1067 
1068  /*
1069  * Can't assume anything about the content of an !indisready index. Make
1070  * those a no-op, not an error, so users can just run this function on all
1071  * indexes of the access method. Since an indisready&&!indisvalid index
1072  * is merely awaiting missed aminsert calls, we're capable of processing
1073  * it. Decline to do so, out of an abundance of caution.
1074  */
1075  if (indexRel->rd_index->indisvalid)
1076  {
1077  GinState ginstate;
1078 
1079  initGinState(&ginstate, indexRel);
1080  ginInsertCleanup(&ginstate, true, true, true, &stats);
1081  }
1082  else
1083  ereport(DEBUG1,
1084  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1085  errmsg("index \"%s\" is not valid",
1086  RelationGetRelationName(indexRel))));
1087 
1088  index_close(indexRel, RowExclusiveLock);
1089 
1090  PG_RETURN_INT64((int64) stats.pages_deleted);
1091 }
@ ACLCHECK_NOT_OWNER
Definition: acl.h:185
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2702
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition: aclchk.c:4144
int autovacuum_work_mem
Definition: autovacuum.c:118
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:3713
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4896
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4913
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:2520
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:5131
Buffer ReadBuffer(Relation reln, BlockNumber blockNum)
Definition: bufmgr.c:745
static Page BufferGetPage(Buffer buffer)
Definition: bufmgr.h:404
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:958
PageHeaderData * PageHeader
Definition: bufpage.h:170
static bool PageIsEmpty(Page page)
Definition: bufpage.h:220
Pointer Page
Definition: bufpage.h:78
static Item PageGetItem(Page page, ItemId itemId)
Definition: bufpage.h:351
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:240
static void PageSetLSN(Page page, XLogRecPtr lsn)
Definition: bufpage.h:388
static OffsetNumber PageGetMaxOffsetNumber(Page page)
Definition: bufpage.h:369
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:468
unsigned int uint32
Definition: c.h:506
#define MAXALIGN(LEN)
Definition: c.h:811
signed int int32
Definition: c.h:494
#define Max(x, y)
Definition: c.h:998
#define Assert(condition)
Definition: c.h:858
size_t Size
Definition: c.h:605
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:857
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
#define repalloc_array(pointer, type, count)
Definition: fe_memutils.h:66
#define palloc_array(type, count)
Definition: fe_memutils.h:64
#define PG_GETARG_OID(n)
Definition: fmgr.h:275
#define PG_RETURN_INT64(x)
Definition: fmgr.h:368
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define GinGetPendingListCleanupSize(relation)
Definition: gin_private.h:39
#define GIN_UNLOCK
Definition: gin_private.h:49
#define GIN_EXCLUSIVE
Definition: gin_private.h:51
#define GIN_SHARE
Definition: gin_private.h:50
#define GinListPageSize
Definition: ginblock.h:327
#define GIN_METAPAGE_BLKNO
Definition: ginblock.h:51
#define GinPageHasFullRow(page)
Definition: ginblock.h:119
#define GinPageGetOpaque(page)
Definition: ginblock.h:110
#define GIN_DELETED
Definition: ginblock.h:43
#define GIN_LIST
Definition: ginblock.h:45
signed char GinNullCategory
Definition: ginblock.h:206
#define GinPageGetMeta(p)
Definition: ginblock.h:104
#define GinPageIsDeleted(page)
Definition: ginblock.h:124
#define GinPageSetFullRow(page)
Definition: ginblock.h:120
void ginBeginBAScan(BuildAccumulator *accum)
Definition: ginbulk.c:257
void ginInsertBAEntries(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber attnum, Datum *entries, GinNullCategory *categories, int32 nentries)
Definition: ginbulk.c:210
void ginInitBA(BuildAccumulator *accum)
Definition: ginbulk.c:109
ItemPointerData * ginGetBAEntry(BuildAccumulator *accum, OffsetNumber *attnum, Datum *key, GinNullCategory *category, uint32 *n)
Definition: ginbulk.c:268
IndexTuple GinFormTuple(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, Pointer data, Size dataSize, int nipd, bool errorTooBig)
Definition: ginentrypage.c:44
#define GIN_PAGE_FREESIZE
Definition: ginfast.c:41
Datum gin_clean_pending_list(PG_FUNCTION_ARGS)
Definition: ginfast.c:1031
void ginInsertCleanup(GinState *ginstate, bool full_clean, bool fill_fsm, bool forceCleanup, IndexBulkDeleteResult *stats)
Definition: ginfast.c:780
void ginHeapTupleFastCollect(GinState *ginstate, GinTupleCollector *collector, OffsetNumber attnum, Datum value, bool isNull, ItemPointer ht_ctid)
Definition: ginfast.c:483
static int32 writeListPage(Relation index, Buffer buffer, IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
Definition: ginfast.c:59
int gin_pending_list_limit
Definition: ginfast.c:39
static void processPendingPage(BuildAccumulator *accum, KeyArray *ka, Page page, OffsetNumber startoff)
Definition: ginfast.c:709
static void initKeyArray(KeyArray *keys, int32 maxvalues)
Definition: ginfast.c:675
static void makeSublist(Relation index, IndexTuple *tuples, int32 ntuples, GinMetaPageData *res)
Definition: ginfast.c:145
static void shiftList(Relation index, Buffer metabuffer, BlockNumber newHead, bool fill_fsm, IndexBulkDeleteResult *stats)
Definition: ginfast.c:554
static void addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
Definition: ginfast.c:685
void ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
Definition: ginfast.c:219
struct KeyArray KeyArray
void ginEntryInsert(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, ItemPointerData *items, uint32 nitem, GinStatsData *buildStats)
Definition: gininsert.c:176
Datum * ginExtractEntries(GinState *ginstate, OffsetNumber attnum, Datum value, bool isNull, int32 *nentries, GinNullCategory **categories)
Definition: ginutil.c:483
OffsetNumber gintuple_get_attrnum(GinState *ginstate, IndexTuple tuple)
Definition: ginutil.c:226
Buffer GinNewBuffer(Relation index)
Definition: ginutil.c:300
void GinInitBuffer(Buffer b, uint32 f)
Definition: ginutil.c:350
Datum gintuple_get_key(GinState *ginstate, IndexTuple tuple, GinNullCategory *category)
Definition: ginutil.c:259
void initGinState(GinState *state, Relation index)
Definition: ginutil.c:97
static MemoryContext opCtx
Definition: ginxlog.c:22
#define XLOG_GIN_UPDATE_META_PAGE
Definition: ginxlog.h:162
#define GIN_NDELETE_AT_ONCE
Definition: ginxlog.h:202
#define XLOG_GIN_INSERT_LISTPAGE
Definition: ginxlog.h:180
#define XLOG_GIN_DELETE_LISTPAGE
Definition: ginxlog.h:194
int maintenance_work_mem
Definition: globals.c:130
int work_mem
Definition: globals.c:128
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
void IndexFreeSpaceMapVacuum(Relation rel)
Definition: indexfsm.c:71
void RecordFreeIndexPage(Relation rel, BlockNumber freeBlock)
Definition: indexfsm.c:52
static struct @155 value
int i
Definition: isn.c:73
Pointer Item
Definition: item.h:17
struct ItemIdData ItemIdData
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:35
static void ItemPointerSetInvalid(ItemPointerData *pointer)
Definition: itemptr.h:184
static bool ItemPointerIsValid(const ItemPointerData *pointer)
Definition: itemptr.h:83
IndexTupleData * IndexTuple
Definition: itup.h:53
#define IndexTupleSize(itup)
Definition: itup.h:70
bool ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:532
void LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:513
void UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:548
#define ExclusiveLock
Definition: lockdefs.h:42
#define RowExclusiveLock
Definition: lockdefs.h:38
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
void * palloc(Size size)
Definition: mcxt.c:1316
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define MaxAllocSize
Definition: memutils.h:40
#define AmAutoVacuumWorkerProcess()
Definition: miscadmin.h:375
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
Oid GetUserId(void)
Definition: miscinit.c:514
#define InvalidOffsetNumber
Definition: off.h:26
#define OffsetNumberNext(offsetNumber)
Definition: off.h:52
uint16 OffsetNumber
Definition: off.h:24
#define FirstOffsetNumber
Definition: off.h:27
@ OBJECT_INDEX
Definition: parsenodes.h:2281
int16 attnum
Definition: pg_attribute.h:74
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:189
const void * data
uintptr_t Datum
Definition: postgres.h:64
unsigned int Oid
Definition: postgres_ext.h:31
void CheckForSerializableConflictIn(Relation relation, ItemPointer tid, BlockNumber blkno)
Definition: predicate.c:4321
MemoryContextSwitchTo(old_ctx)
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationNeedsWAL(relation)
Definition: rel.h:628
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:658
static pg_noinline void Size size
Definition: slab.c:607
GinState * ginstate
Definition: gin_private.h:433
BlockNumber tail
Definition: ginblock.h:62
uint32 tailFreeSize
Definition: ginblock.h:67
BlockNumber nPendingPages
Definition: ginblock.h:73
int64 nPendingHeapTuples
Definition: ginblock.h:74
BlockNumber head
Definition: ginblock.h:61
Relation index
Definition: gin_private.h:59
IndexTuple * tuples
Definition: gin_private.h:455
BlockNumber pages_deleted
Definition: genam.h:82
ItemPointerData t_tid
Definition: itup.h:37
Datum * keys
Definition: ginfast.c:46
GinNullCategory * categories
Definition: ginfast.c:47
int32 nvalues
Definition: ginfast.c:48
int32 maxvalues
Definition: ginfast.c:49
Form_pg_index rd_index
Definition: rel.h:192
Form_pg_class rd_rel
Definition: rel.h:111
Definition: type.h:95
char data[BLCKSZ]
Definition: c.h:1119
void vacuum_delay_point(void)
Definition: vacuum.c:2340
bool RecoveryInProgress(void)
Definition: xlog.c:6290
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:364
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogRegisterBufData(uint8 block_id, char *data, uint32 len)
Definition: xloginsert.c:405
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:242
void XLogBeginInsert(void)
Definition: xloginsert.c:149
void XLogEnsureRecordSpace(int max_block_id, int ndatas)
Definition: xloginsert.c:175
#define REGBUF_STANDARD
Definition: xloginsert.h:34
#define REGBUF_WILL_INIT
Definition: xloginsert.h:33