PostgreSQL Source Code  git master
ginfast.c
1 /*-------------------------------------------------------------------------
2  *
3  * ginfast.c
4  * Fast insert routines for the Postgres inverted index access method.
5  * Pending entries are stored in a linear list of pages.  Later on
6  * (typically during VACUUM), ginInsertCleanup() will be invoked to
7  * transfer pending entries into the regular index structure. This
8  * wins because bulk insertion is much more efficient than retail.
9  *
10  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  * src/backend/access/gin/ginfast.c
15  *
16  *-------------------------------------------------------------------------
17  */
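/*
 * Note: the pending-list mechanism is used only while the index's
 * "fastupdate" storage parameter is enabled (the default).  The list's
 * size is capped by the gin_pending_list_limit GUC (or by the per-index
 * storage parameter of the same name), and it can be flushed manually
 * with the SQL-callable gin_clean_pending_list() defined at the end of
 * this file, in addition to the cleanup performed by (auto)vacuum.
 */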
18 
19 #include "postgres.h"
20 
21 #include "access/gin_private.h"
22 #include "access/ginxlog.h"
23 #include "access/xlog.h"
24 #include "access/xloginsert.h"
25 #include "catalog/pg_am.h"
26 #include "commands/vacuum.h"
27 #include "miscadmin.h"
28 #include "port/pg_bitutils.h"
29 #include "postmaster/autovacuum.h"
30 #include "storage/indexfsm.h"
31 #include "storage/lmgr.h"
32 #include "storage/predicate.h"
33 #include "utils/acl.h"
34 #include "utils/builtins.h"
35 #include "utils/memutils.h"
36 #include "utils/rel.h"
37 
38 /* GUC parameter */
39 int gin_pending_list_limit = 0;
40 
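/*
 * Free space available on an empty pending-list page: a whole block less
 * the page header and the GIN page opaque data.
 */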
41 #define GIN_PAGE_FREESIZE \
42  ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
43 
44 typedef struct KeyArray
45 {
46  Datum *keys; /* expansible array */
47  GinNullCategory *categories; /* another expansible array */
48  int32 nvalues; /* current number of valid entries */
49  int32 maxvalues; /* allocated size of arrays */
50 } KeyArray;
51 
52 
53 /*
54  * Build a pending-list page from the given array of tuples, and write it out.
55  *
56  * Returns amount of free space left on the page.
57  */
58 static int32
59 writeListPage(Relation index, Buffer buffer,
60  IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
61 {
62  Page page = BufferGetPage(buffer);
63  int32 i,
64  freesize,
65  size = 0;
66  OffsetNumber l,
67  off;
68  PGAlignedBlock workspace;
69  char *ptr;
70 
71  START_CRIT_SECTION();
72 
73  GinInitBuffer(buffer, GIN_LIST);
74 
75  off = FirstOffsetNumber;
76  ptr = workspace.data;
77 
78  for (i = 0; i < ntuples; i++)
79  {
80  int this_size = IndexTupleSize(tuples[i]);
81 
82  memcpy(ptr, tuples[i], this_size);
83  ptr += this_size;
84  size += this_size;
85 
86  l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
87 
88  if (l == InvalidOffsetNumber)
89  elog(ERROR, "failed to add item to index page in \"%s\"",
90  RelationGetRelationName(index));
91 
92  off++;
93  }
94 
95  Assert(size <= BLCKSZ); /* else we overran workspace */
96 
97  GinPageGetOpaque(page)->rightlink = rightlink;
98 
99  /*
100  * The tail page may contain only whole row(s), or the final part of a row
101  * started on previous pages (a "row" here meaning all the index tuples
102  * generated for one heap tuple).
103  */
104  if (rightlink == InvalidBlockNumber)
105  {
106  GinPageSetFullRow(page);
107  GinPageGetOpaque(page)->maxoff = 1;
108  }
109  else
110  {
111  GinPageGetOpaque(page)->maxoff = 0;
112  }
113 
114  MarkBufferDirty(buffer);
115 
116  if (RelationNeedsWAL(index))
117  {
118  ginxlogInsertListPage data;
119  XLogRecPtr recptr;
120 
121  data.rightlink = rightlink;
122  data.ntuples = ntuples;
123 
124  XLogBeginInsert();
125  XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
126 
127  XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
128  XLogRegisterBufData(0, workspace.data, size);
129 
130  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
131  PageSetLSN(page, recptr);
132  }
133 
134  /* get free space before releasing buffer */
135  freesize = PageGetExactFreeSpace(page);
136 
137  UnlockReleaseBuffer(buffer);
138 
139  END_CRIT_SECTION();
140 
141  return freesize;
142 }
143 
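/*
 * Build a new pending-list sublist from the given tuples (which are all
 * assumed to come from a single heap tuple), writing as many pages as
 * needed and chaining them via rightlinks.  The sublist's head, tail,
 * page count, and tail free space are returned in *res.
 */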
144 static void
145 makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
146  GinMetaPageData *res)
147 {
148  Buffer curBuffer = InvalidBuffer;
149  Buffer prevBuffer = InvalidBuffer;
150  int i,
151  size = 0,
152  tupsize;
153  int startTuple = 0;
154 
155  Assert(ntuples > 0);
156 
157  /*
158  * Split tuples into pages
159  */
160  for (i = 0; i < ntuples; i++)
161  {
162  if (curBuffer == InvalidBuffer)
163  {
164  curBuffer = GinNewBuffer(index);
165 
166  if (prevBuffer != InvalidBuffer)
167  {
168  res->nPendingPages++;
169  writeListPage(index, prevBuffer,
170  tuples + startTuple,
171  i - startTuple,
172  BufferGetBlockNumber(curBuffer));
173  }
174  else
175  {
176  res->head = BufferGetBlockNumber(curBuffer);
177  }
178 
179  prevBuffer = curBuffer;
180  startTuple = i;
181  size = 0;
182  }
183 
184  tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
185 
186  if (size + tupsize > GinListPageSize)
187  {
188  /* won't fit, force a new page and reprocess */
189  i--;
190  curBuffer = InvalidBuffer;
191  }
192  else
193  {
194  size += tupsize;
195  }
196  }
197 
198  /*
199  * Write last page
200  */
201  res->tail = BufferGetBlockNumber(curBuffer);
202  res->tailFreeSize = writeListPage(index, curBuffer,
203  tuples + startTuple,
204  ntuples - startTuple,
205  InvalidBlockNumber);
206  res->nPendingPages++;
207  /* that was only one heap tuple */
208  res->nPendingHeapTuples = 1;
209 }
210 
211 /*
212  * Write the index tuples contained in *collector into the index's
213  * pending list.
214  *
215  * This function guarantees that all these tuples will be inserted
216  * consecutively, preserving their order.
217  */
218 void
219 ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
220 {
221  Relation index = ginstate->index;
222  Buffer metabuffer;
223  Page metapage;
224  GinMetaPageData *metadata = NULL;
225  Buffer buffer = InvalidBuffer;
226  Page page = NULL;
227  ginxlogUpdateMeta data;
228  bool separateList = false;
229  bool needCleanup = false;
230  int cleanupSize;
231  bool needWal;
232 
233  if (collector->ntuples == 0)
234  return;
235 
236  needWal = RelationNeedsWAL(index);
237 
238  data.locator = index->rd_locator;
239  data.ntuples = 0;
240  data.newRightlink = data.prevTail = InvalidBlockNumber;
241 
242  metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
243  metapage = BufferGetPage(metabuffer);
244 
245  /*
246  * An insertion to the pending list could logically belong anywhere in the
247  * tree, so it conflicts with all serializable scans. All scans acquire a
248  * predicate lock on the metabuffer to represent that.
249  */
250  CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);
251 
252  if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
253  {
254  /*
255  * Total size is greater than one page => make sublist
256  */
257  separateList = true;
258  }
259  else
260  {
261  LockBuffer(metabuffer, GIN_EXCLUSIVE);
262  metadata = GinPageGetMeta(metapage);
263 
264  if (metadata->head == InvalidBlockNumber ||
265  collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
266  {
267  /*
268  * Pending list is empty or total size is greater than freespace
269  * on tail page => make sublist
270  *
271  * We unlock metabuffer to keep high concurrency
272  */
273  separateList = true;
274  LockBuffer(metabuffer, GIN_UNLOCK);
275  }
276  }
277 
278  if (separateList)
279  {
280  /*
281  * We should make sublist separately and append it to the tail
282  */
283  GinMetaPageData sublist;
284 
285  memset(&sublist, 0, sizeof(GinMetaPageData));
286  makeSublist(index, collector->tuples, collector->ntuples, &sublist);
287 
288  /*
289  * metapage was unlocked, see above
290  */
291  LockBuffer(metabuffer, GIN_EXCLUSIVE);
292  metadata = GinPageGetMeta(metapage);
293 
294  if (metadata->head == InvalidBlockNumber)
295  {
296  /*
297  * Main list is empty, so just insert sublist as main list
298  */
299  START_CRIT_SECTION();
300 
301  metadata->head = sublist.head;
302  metadata->tail = sublist.tail;
303  metadata->tailFreeSize = sublist.tailFreeSize;
304 
305  metadata->nPendingPages = sublist.nPendingPages;
306  metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
307 
308  if (needWal)
309  XLogBeginInsert();
310  }
311  else
312  {
313  /*
314  * Merge lists
315  */
316  data.prevTail = metadata->tail;
317  data.newRightlink = sublist.head;
318 
319  buffer = ReadBuffer(index, metadata->tail);
320  LockBuffer(buffer, GIN_EXCLUSIVE);
321  page = BufferGetPage(buffer);
322 
323  Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
324 
325  START_CRIT_SECTION();
324 
326 
327  GinPageGetOpaque(page)->rightlink = sublist.head;
328 
329  MarkBufferDirty(buffer);
330 
331  metadata->tail = sublist.tail;
332  metadata->tailFreeSize = sublist.tailFreeSize;
333 
334  metadata->nPendingPages += sublist.nPendingPages;
335  metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
336 
337  if (needWal)
338  {
339  XLogBeginInsert();
340  XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
341  }
342  }
343  }
344  else
345  {
346  /*
347  * Insert into tail page. Metapage is already locked
348  */
349  OffsetNumber l,
350  off;
351  int i,
352  tupsize;
353  char *ptr;
354  char *collectordata;
355 
356  buffer = ReadBuffer(index, metadata->tail);
357  LockBuffer(buffer, GIN_EXCLUSIVE);
358  page = BufferGetPage(buffer);
359 
360  off = (PageIsEmpty(page)) ? FirstOffsetNumber :
361  OffsetNumberNext(PageGetMaxOffsetNumber(page));
362 
363  collectordata = ptr = (char *) palloc(collector->sumsize);
364 
365  data.ntuples = collector->ntuples;
366 
367  START_CRIT_SECTION();
368 
369  if (needWal)
370  XLogBeginInsert();
371 
372  /*
373  * Increase counter of heap tuples
374  */
375  Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
376  GinPageGetOpaque(page)->maxoff++;
377  metadata->nPendingHeapTuples++;
378 
379  for (i = 0; i < collector->ntuples; i++)
380  {
381  tupsize = IndexTupleSize(collector->tuples[i]);
382  l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
383 
384  if (l == InvalidOffsetNumber)
385  elog(ERROR, "failed to add item to index page in \"%s\"",
386  RelationGetRelationName(index));
387 
388  memcpy(ptr, collector->tuples[i], tupsize);
389  ptr += tupsize;
390 
391  off++;
392  }
393 
394  Assert((ptr - collectordata) <= collector->sumsize);
395  if (needWal)
396  {
397  XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
398  XLogRegisterBufData(1, collectordata, collector->sumsize);
399  }
400 
401  metadata->tailFreeSize = PageGetExactFreeSpace(page);
402 
403  MarkBufferDirty(buffer);
404  }
405 
406  /*
407  * Set pd_lower just past the end of the metadata. This is essential,
408  * because without doing so, metadata will be lost if xlog.c compresses
409  * the page. (We must do this here because pre-v11 versions of PG did not
410  * set the metapage's pd_lower correctly, so a pg_upgraded index might
411  * contain the wrong value.)
412  */
413  ((PageHeader) metapage)->pd_lower =
414  ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
415 
416  /*
417  * Write metabuffer, make xlog entry
418  */
419  MarkBufferDirty(metabuffer);
420 
421  if (needWal)
422  {
423  XLogRecPtr recptr;
424 
425  memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
426 
427  XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
428  XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
429 
430  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
431  PageSetLSN(metapage, recptr);
432 
433  if (buffer != InvalidBuffer)
434  {
435  PageSetLSN(page, recptr);
436  }
437  }
438 
439  if (buffer != InvalidBuffer)
440  UnlockReleaseBuffer(buffer);
441 
442  /*
443  * Force pending-list cleanup when it becomes too long.  Also,
444  * ginInsertCleanup() can take a significant amount of time, so we prefer
445  * to call it when it can do all the work in a single collection cycle.
446  * In non-vacuum mode, it shouldn't require maintenance_work_mem, so fire
447  * it while the pending list is still small enough to fit into
448  * gin_pending_list_limit.
449  *
450  * ginInsertCleanup() should not be called inside our CRIT_SECTION.
451  */
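 /*
  * For example, with the default gin_pending_list_limit of 4MB and the
  * default 8kB block size, GIN_PAGE_FREESIZE is a little under 8kB, so
  * cleanup is forced once the pending list exceeds roughly 500 pages.
  */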
452  cleanupSize = GinGetPendingListCleanupSize(index);
453  if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
454  needCleanup = true;
455 
456  UnlockReleaseBuffer(metabuffer);
457 
458  END_CRIT_SECTION();
459 
460  /*
461  * Since this could contend with a concurrent cleanup process, we do not
462  * force the pending-list cleanup here.
463  */
464  if (needCleanup)
465  ginInsertCleanup(ginstate, false, true, false, NULL);
466 }
467 
468 /*
469  * Create temporary index tuples for a single indexable item (one index column
470  * for the heap tuple specified by ht_ctid), and append them to the array
471  * in *collector. They will subsequently be written out using
472  * ginHeapTupleFastInsert. Note that to guarantee consistent state, all
473  * temp tuples for a given heap tuple must be written in one call to
474  * ginHeapTupleFastInsert.
475  */
476 void
477 ginHeapTupleFastCollect(GinState *ginstate,
478  GinTupleCollector *collector,
479  OffsetNumber attnum, Datum value, bool isNull,
480  ItemPointer ht_ctid)
481 {
482  Datum *entries;
483  GinNullCategory *categories;
484  int32 i,
485  nentries;
486 
487  /*
488  * Extract the key values that need to be inserted in the index
489  */
490  entries = ginExtractEntries(ginstate, attnum, value, isNull,
491  &nentries, &categories);
492 
493  /*
494  * Protect against integer overflow in allocation calculations
495  */
496  if (nentries < 0 ||
497  collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
498  elog(ERROR, "too many entries for GIN index");
499 
500  /*
501  * Allocate/reallocate memory for storing collected tuples
502  */
503  if (collector->tuples == NULL)
504  {
505  /*
506  * Determine the number of elements to allocate in the tuples array
507  * initially. Make it a power of 2 to avoid wasting memory when
508  * resizing (since palloc likes powers of 2).
509  */
510  collector->lentuples = pg_nextpower2_32(Max(16, nentries));
511  collector->tuples = palloc_array(IndexTuple, collector->lentuples);
512  }
513  else if (collector->lentuples < collector->ntuples + nentries)
514  {
515  /*
516  * Advance lentuples to the next suitable power of 2. This won't
517  * overflow, though we could get to a value that exceeds
518  * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
519  */
520  collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
521  collector->tuples = repalloc_array(collector->tuples,
522  IndexTuple, collector->lentuples);
523  }
524 
525  /*
526  * Build an index tuple for each key value, and add to array. In pending
527  * tuples we just stick the heap TID into t_tid.
528  */
529  for (i = 0; i < nentries; i++)
530  {
531  IndexTuple itup;
532 
533  itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
534  NULL, 0, 0, true);
535  itup->t_tid = *ht_ctid;
536  collector->tuples[collector->ntuples++] = itup;
537  collector->sumsize += IndexTupleSize(itup);
538  }
539 }
540 
541 /*
542  * Deletes pending list pages up to (not including) newHead page.
543  * If newHead == InvalidBlockNumber then function drops the whole list.
544  *
545  * metapage is pinned and exclusive-locked throughout this function.
546  */
547 static void
548 shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
549  bool fill_fsm, IndexBulkDeleteResult *stats)
550 {
551  Page metapage;
552  GinMetaPageData *metadata;
553  BlockNumber blknoToDelete;
554 
555  metapage = BufferGetPage(metabuffer);
556  metadata = GinPageGetMeta(metapage);
557  blknoToDelete = metadata->head;
558 
559  do
560  {
561  Page page;
562  int i;
563  int64 nDeletedHeapTuples = 0;
564  ginxlogDeleteListPages data;
565  Buffer buffers[GIN_NDELETE_AT_ONCE];
566  BlockNumber freespace[GIN_NDELETE_AT_ONCE];
567 
568  data.ndeleted = 0;
569  while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
570  {
571  freespace[data.ndeleted] = blknoToDelete;
572  buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
573  LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
574  page = BufferGetPage(buffers[data.ndeleted]);
575 
576  data.ndeleted++;
577 
578  Assert(!GinPageIsDeleted(page));
579 
580  nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
581  blknoToDelete = GinPageGetOpaque(page)->rightlink;
582  }
583 
584  if (stats)
585  stats->pages_deleted += data.ndeleted;
586 
587  /*
588  * This operation touches an unusually large number of pages, so
589  * prepare the XLogInsert machinery for that before entering the
590  * critical section.
591  */
592  if (RelationNeedsWAL(index))
593  XLogEnsureRecordSpace(data.ndeleted, 0);
594 
595  START_CRIT_SECTION();
596 
597  metadata->head = blknoToDelete;
598 
599  Assert(metadata->nPendingPages >= data.ndeleted);
600  metadata->nPendingPages -= data.ndeleted;
601  Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
602  metadata->nPendingHeapTuples -= nDeletedHeapTuples;
603 
604  if (blknoToDelete == InvalidBlockNumber)
605  {
606  metadata->tail = InvalidBlockNumber;
607  metadata->tailFreeSize = 0;
608  metadata->nPendingPages = 0;
609  metadata->nPendingHeapTuples = 0;
610  }
611 
612  /*
613  * Set pd_lower just past the end of the metadata. This is essential,
614  * because without doing so, metadata will be lost if xlog.c
615  * compresses the page. (We must do this here because pre-v11
616  * versions of PG did not set the metapage's pd_lower correctly, so a
617  * pg_upgraded index might contain the wrong value.)
618  */
619  ((PageHeader) metapage)->pd_lower =
620  ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
621 
622  MarkBufferDirty(metabuffer);
623 
624  for (i = 0; i < data.ndeleted; i++)
625  {
626  page = BufferGetPage(buffers[i]);
627  GinPageGetOpaque(page)->flags = GIN_DELETED;
628  MarkBufferDirty(buffers[i]);
629  }
630 
631  if (RelationNeedsWAL(index))
632  {
633  XLogRecPtr recptr;
634 
635  XLogBeginInsert();
636  XLogRegisterBuffer(0, metabuffer,
637  REGBUF_WILL_INIT | REGBUF_STANDARD);
638  for (i = 0; i < data.ndeleted; i++)
639  XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
640 
641  memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
642 
643  XLogRegisterData((char *) &data,
644  sizeof(ginxlogDeleteListPages));
645 
646  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
647  PageSetLSN(metapage, recptr);
648 
649  for (i = 0; i < data.ndeleted; i++)
650  {
651  page = BufferGetPage(buffers[i]);
652  PageSetLSN(page, recptr);
653  }
654  }
655 
656  for (i = 0; i < data.ndeleted; i++)
657  UnlockReleaseBuffer(buffers[i]);
658 
659  END_CRIT_SECTION();
660 
661  for (i = 0; fill_fsm && i < data.ndeleted; i++)
662  RecordFreeIndexPage(index, freespace[i]);
663 
664  } while (blknoToDelete != newHead);
665 }
666 
667 /* Initialize empty KeyArray */
668 static void
669 initKeyArray(KeyArray *keys, int32 maxvalues)
670 {
671  keys->keys = palloc_array(Datum, maxvalues);
672  keys->categories = palloc_array(GinNullCategory, maxvalues);
673  keys->nvalues = 0;
674  keys->maxvalues = maxvalues;
675 }
676 
677 /* Add datum to KeyArray, resizing if needed */
678 static void
679 addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
680 {
681  if (keys->nvalues >= keys->maxvalues)
682  {
683  keys->maxvalues *= 2;
684  keys->keys = repalloc_array(keys->keys, Datum, keys->maxvalues);
685  keys->categories = repalloc_array(keys->categories, GinNullCategory, keys->maxvalues);
686  }
687 
688  keys->keys[keys->nvalues] = datum;
689  keys->categories[keys->nvalues] = category;
690  keys->nvalues++;
691 }
692 
693 /*
694  * Collect data from a pending-list page in preparation for insertion into
695  * the main index.
696  *
697  * Go through all tuples >= startoff on page and collect values in accum
698  *
699  * Note that ka is just workspace --- it does not carry any state across
700  * calls.
701  */
702 static void
703 processPendingPage(BuildAccumulator *accum, KeyArray *ka,
704  Page page, OffsetNumber startoff)
705 {
706  ItemPointerData heapptr;
707  OffsetNumber i,
708  maxoff;
709  OffsetNumber attrnum;
710 
711  /* reset *ka to empty */
712  ka->nvalues = 0;
713 
714  maxoff = PageGetMaxOffsetNumber(page);
715  Assert(maxoff >= FirstOffsetNumber);
716  ItemPointerSetInvalid(&heapptr);
717  attrnum = 0;
718 
719  for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
720  {
721  IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
722  OffsetNumber curattnum;
723  Datum curkey;
724  GinNullCategory curcategory;
725 
726  /* Check for change of heap TID or attnum */
727  curattnum = gintuple_get_attrnum(accum->ginstate, itup);
728 
729  if (!ItemPointerIsValid(&heapptr))
730  {
731  heapptr = itup->t_tid;
732  attrnum = curattnum;
733  }
734  else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
735  curattnum == attrnum))
736  {
737  /*
738  * ginInsertBAEntries can insert several datums per call, but only
739  * for one heap tuple and one column. So call it at a boundary,
740  * and reset ka.
741  */
742  ginInsertBAEntries(accum, &heapptr, attrnum,
743  ka->keys, ka->categories, ka->nvalues);
744  ka->nvalues = 0;
745  heapptr = itup->t_tid;
746  attrnum = curattnum;
747  }
748 
749  /* Add key to KeyArray */
750  curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
751  addDatum(ka, curkey, curcategory);
752  }
753 
754  /* Dump out all remaining keys */
755  ginInsertBAEntries(accum, &heapptr, attrnum,
756  ka->keys, ka->categories, ka->nvalues);
757 }
758 
759 /*
760  * Move tuples from pending pages into regular GIN structure.
761  *
762  * At first glance this looks completely non-crash-safe.  But if we crash
763  * after posting entries to the main index and before removing them from the
764  * pending list, it's okay because when we redo the posting later on, nothing
765  * bad will happen.
766  *
767  * fill_fsm indicates that ginInsertCleanup should add deleted pages
768  * to the FSM; otherwise the caller is responsible for putting deleted
769  * pages into the FSM.
770  *
771  * If stats isn't null, we count deleted pending pages into the counts.
772  */
773 void
774 ginInsertCleanup(GinState *ginstate, bool full_clean,
775  bool fill_fsm, bool forceCleanup,
776  IndexBulkDeleteResult *stats)
777 {
778  Relation index = ginstate->index;
779  Buffer metabuffer,
780  buffer;
781  Page metapage,
782  page;
783  GinMetaPageData *metadata;
784  MemoryContext opCtx,
785  oldCtx;
786  BuildAccumulator accum;
787  KeyArray datums;
788  BlockNumber blkno,
789  blknoFinish;
790  bool cleanupFinish = false;
791  bool fsm_vac = false;
792  Size workMemory;
793 
794  /*
795  * We would like to prevent a concurrent cleanup process.  For that we
796  * lock the metapage in exclusive mode using a LockPage() call.  Nobody
797  * else uses that lock on the metapage, so concurrent insertion into the
798  * pending list remains possible.
799  */
800 
801  if (forceCleanup)
802  {
803  /*
804  * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
805  * and we would like to wait for any concurrent cleanup to finish.
806  */
807  LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
808  workMemory =
809  (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
810  autovacuum_work_mem : maintenance_work_mem;
811  }
812  else
813  {
814  /*
815  * We are called from a regular insert; if we see a concurrent cleanup,
816  * just exit in the hope that the concurrent process will clean up the
817  * pending list.
818  */
819  if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
820  return;
821  workMemory = work_mem;
822  }
823 
824  metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
825  LockBuffer(metabuffer, GIN_SHARE);
826  metapage = BufferGetPage(metabuffer);
827  metadata = GinPageGetMeta(metapage);
828 
829  if (metadata->head == InvalidBlockNumber)
830  {
831  /* Nothing to do */
832  UnlockReleaseBuffer(metabuffer);
833  UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
834  return;
835  }
836 
837  /*
838  * Remember a tail page to prevent infinite cleanup if other backends add
839  * new tuples faster than we can clean up.
840  */
841  blknoFinish = metadata->tail;
842 
843  /*
844  * Read and lock head of pending list
845  */
846  blkno = metadata->head;
847  buffer = ReadBuffer(index, blkno);
848  LockBuffer(buffer, GIN_SHARE);
849  page = BufferGetPage(buffer);
850 
851  LockBuffer(metabuffer, GIN_UNLOCK);
852 
853  /*
854  * Initialize. All temporary space will be in opCtx
855  */
856  opCtx = AllocSetContextCreate(CurrentMemoryContext,
857  "GIN insert cleanup temporary context",
858  ALLOCSET_DEFAULT_SIZES);
859 
860  oldCtx = MemoryContextSwitchTo(opCtx);
861 
862  initKeyArray(&datums, 128);
863  ginInitBA(&accum);
864  accum.ginstate = ginstate;
865 
866  /*
867  * At the top of this loop, we have pin and lock on the current page of
868  * the pending list. However, we'll release that before exiting the loop.
869  * Note we also have pin but not lock on the metapage.
870  */
871  for (;;)
872  {
873  Assert(!GinPageIsDeleted(page));
874 
875  /*
876  * Have we reached the page that was the tail when we started our
877  * cleanup?  If the caller asked us to clean up the whole pending
878  * list, ignore the old tail; we will work until the list becomes
879  * empty.
880  */
881  if (blkno == blknoFinish && full_clean == false)
882  cleanupFinish = true;
883 
884  /*
885  * read page's datums into accum
886  */
887  processPendingPage(&accum, &datums, page, FirstOffsetNumber);
888 
889  vacuum_delay_point();
890 
891  /*
892  * Is it time to flush memory to disk? Flush if we are at the end of
893  * the pending list, or if we have a full row and memory is getting
894  * full.
895  */
896  if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
897  (GinPageHasFullRow(page) &&
898  (accum.allocatedMemory >= workMemory * 1024L)))
899  {
900  ItemPointerData *list;
901  uint32 nlist;
902  Datum key;
903  GinNullCategory category;
904  OffsetNumber maxoff,
905  attnum;
906 
907  /*
908  * Unlock the current page to increase concurrency.  Changes to the
909  * page will be detected later by comparing maxoff after the memory
910  * flush completes.
911  */
912  maxoff = PageGetMaxOffsetNumber(page);
913  LockBuffer(buffer, GIN_UNLOCK);
914 
915  /*
916  * Moving the collected data into the regular structure can take a
917  * significant amount of time, so run it without locking the pending
918  * list.
919  */
920  ginBeginBAScan(&accum);
921  while ((list = ginGetBAEntry(&accum,
922  &attnum, &key, &category, &nlist)) != NULL)
923  {
924  ginEntryInsert(ginstate, attnum, key, category,
925  list, nlist, NULL);
926  vacuum_delay_point();
927  }
928 
929  /*
930  * Lock the whole list to remove pages
931  */
932  LockBuffer(metabuffer, GIN_EXCLUSIVE);
933  LockBuffer(buffer, GIN_SHARE);
934 
935  Assert(!GinPageIsDeleted(page));
936 
937  /*
938  * While we left the page unlocked, more stuff might have gotten
939  * added to it. If so, process those entries immediately. There
940  * shouldn't be very many, so we don't worry about the fact that
941  * we're doing this with the exclusive lock held.  The insertion algorithm
942  * guarantees that inserted row(s) will not continue onto the next page.
943  * NOTE: intentionally no vacuum_delay_point in this loop.
944  */
945  if (PageGetMaxOffsetNumber(page) != maxoff)
946  {
947  ginInitBA(&accum);
948  processPendingPage(&accum, &datums, page, maxoff + 1);
949 
950  ginBeginBAScan(&accum);
951  while ((list = ginGetBAEntry(&accum,
952  &attnum, &key, &category, &nlist)) != NULL)
953  ginEntryInsert(ginstate, attnum, key, category,
954  list, nlist, NULL);
955  }
956 
957  /*
958  * Remember next page - it will become the new list head
959  */
960  blkno = GinPageGetOpaque(page)->rightlink;
961  UnlockReleaseBuffer(buffer); /* shiftList will do exclusive
962  * locking */
963 
964  /*
965  * Remove the pages we have read from the pending list; at this point
966  * all of their content is in the regular structure.
967  */
968  shiftList(index, metabuffer, blkno, fill_fsm, stats);
969 
970  /* At this point, some pending pages have been freed up */
971  fsm_vac = true;
972 
973  Assert(blkno == metadata->head);
974  LockBuffer(metabuffer, GIN_UNLOCK);
975 
976  /*
977  * If we removed the whole pending list, or we have cleaned up to the
978  * tail we remembered when starting the cleanup, just exit.
979  */
980  if (blkno == InvalidBlockNumber || cleanupFinish)
981  break;
982 
983  /*
984  * release memory used so far and reinit state
985  */
986  MemoryContextReset(opCtx);
987  initKeyArray(&datums, datums.maxvalues);
988  ginInitBA(&accum);
989  }
990  else
991  {
992  blkno = GinPageGetOpaque(page)->rightlink;
993  UnlockReleaseBuffer(buffer);
994  }
995 
996  /*
997  * Read next page in pending list
998  */
999  vacuum_delay_point();
1000  buffer = ReadBuffer(index, blkno);
1001  LockBuffer(buffer, GIN_SHARE);
1002  page = BufferGetPage(buffer);
1003  }
1004 
1005  UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
1006  ReleaseBuffer(metabuffer);
1007 
1008  /*
1009  * As pending list pages can have a high churn rate, it is desirable to
1010  * recycle them immediately to the FreeSpaceMap when ordinary backends
1011  * clean the list.
1012  */
1013  if (fsm_vac && fill_fsm)
1014  IndexFreeSpaceMapVacuum(index);
1015 
1016  /* Clean up temporary space */
1017  MemoryContextSwitchTo(oldCtx);
1018  MemoryContextDelete(opCtx);
1019 }
1020 
1021 /*
1022  * SQL-callable function to clean the insert pending list
1023  */
1024 Datum
1025 gin_clean_pending_list(PG_FUNCTION_ARGS)
1026 {
1027  Oid indexoid = PG_GETARG_OID(0);
1028  Relation indexRel = index_open(indexoid, RowExclusiveLock);
1029  IndexBulkDeleteResult stats;
1030  GinState ginstate;
1031 
1032  if (RecoveryInProgress())
1033  ereport(ERROR,
1034  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1035  errmsg("recovery is in progress"),
1036  errhint("GIN pending list cannot be cleaned up during recovery.")));
1037 
1038  /* Must be a GIN index */
1039  if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1040  indexRel->rd_rel->relam != GIN_AM_OID)
1041  ereport(ERROR,
1042  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1043  errmsg("\"%s\" is not a GIN index",
1044  RelationGetRelationName(indexRel))));
1045 
1046  /*
1047  * Reject attempts to read non-local temporary relations; we would be
1048  * likely to get wrong data since we have no visibility into the owning
1049  * session's local buffers.
1050  */
1051  if (RELATION_IS_OTHER_TEMP(indexRel))
1052  ereport(ERROR,
1053  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1054  errmsg("cannot access temporary indexes of other sessions")));
1055 
1056  /* User must own the index (comparable to privileges needed for VACUUM) */
1057  if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
1058  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
1059  RelationGetRelationName(indexRel));
1060 
1061  memset(&stats, 0, sizeof(stats));
1062  initGinState(&ginstate, indexRel);
1063  ginInsertCleanup(&ginstate, true, true, true, &stats);
1064 
1065  index_close(indexRel, RowExclusiveLock);
1066 
1067  PG_RETURN_INT64((int64) stats.pages_deleted);
1068 }