PostgreSQL Source Code  git master
ginfast.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * ginfast.c
4  * Fast insert routines for the Postgres inverted index access method.
5  * Pending entries are stored in linear list of pages. Later on
6  * (typically during VACUUM), ginInsertCleanup() will be invoked to
7  * transfer pending entries into the regular index structure. This
8  * wins because bulk insertion is much more efficient than retail.
9  *
10  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  * src/backend/access/gin/ginfast.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "access/gin_private.h"
22 #include "access/ginxlog.h"
23 #include "access/xlog.h"
24 #include "access/xloginsert.h"
25 #include "catalog/pg_am.h"
26 #include "commands/vacuum.h"
27 #include "miscadmin.h"
28 #include "port/pg_bitutils.h"
29 #include "postmaster/autovacuum.h"
30 #include "storage/indexfsm.h"
31 #include "storage/lmgr.h"
32 #include "storage/predicate.h"
33 #include "utils/acl.h"
34 #include "utils/builtins.h"
35 #include "utils/memutils.h"
36 #include "utils/rel.h"
37 
38 /* GUC parameter */
40 
41 #define GIN_PAGE_FREESIZE \
42  ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
43 
44 typedef struct KeyArray
45 {
46  Datum *keys; /* expansible array */
47  GinNullCategory *categories; /* another expansible array */
48  int32 nvalues; /* current number of valid entries */
49  int32 maxvalues; /* allocated size of arrays */
50 } KeyArray;
51 
52 
53 /*
54  * Build a pending-list page from the given array of tuples, and write it out.
55  *
56  * Returns amount of free space left on the page.
57  */
58 static int32
60  IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
61 {
62  Page page = BufferGetPage(buffer);
63  int32 i,
64  freesize,
65  size = 0;
66  OffsetNumber l,
67  off;
68  PGAlignedBlock workspace;
69  char *ptr;
70 
72 
73  GinInitBuffer(buffer, GIN_LIST);
74 
75  off = FirstOffsetNumber;
76  ptr = workspace.data;
77 
78  for (i = 0; i < ntuples; i++)
79  {
80  int this_size = IndexTupleSize(tuples[i]);
81 
82  memcpy(ptr, tuples[i], this_size);
83  ptr += this_size;
84  size += this_size;
85 
86  l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
87 
88  if (l == InvalidOffsetNumber)
89  elog(ERROR, "failed to add item to index page in \"%s\"",
91 
92  off++;
93  }
94 
95  Assert(size <= BLCKSZ); /* else we overran workspace */
96 
97  GinPageGetOpaque(page)->rightlink = rightlink;
98 
99  /*
100  * tail page may contain only whole row(s) or final part of row placed on
101  * previous pages (a "row" here meaning all the index tuples generated for
102  * one heap tuple)
103  */
104  if (rightlink == InvalidBlockNumber)
105  {
106  GinPageSetFullRow(page);
107  GinPageGetOpaque(page)->maxoff = 1;
108  }
109  else
110  {
111  GinPageGetOpaque(page)->maxoff = 0;
112  }
113 
114  MarkBufferDirty(buffer);
115 
116  if (RelationNeedsWAL(index))
117  {
119  XLogRecPtr recptr;
120 
121  data.rightlink = rightlink;
122  data.ntuples = ntuples;
123 
124  XLogBeginInsert();
125  XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
126 
128  XLogRegisterBufData(0, workspace.data, size);
129 
130  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
131  PageSetLSN(page, recptr);
132  }
133 
134  /* get free space before releasing buffer */
135  freesize = PageGetExactFreeSpace(page);
136 
137  UnlockReleaseBuffer(buffer);
138 
140 
141  return freesize;
142 }
143 
144 static void
146  GinMetaPageData *res)
147 {
148  Buffer curBuffer = InvalidBuffer;
149  Buffer prevBuffer = InvalidBuffer;
150  int i,
151  size = 0,
152  tupsize;
153  int startTuple = 0;
154 
155  Assert(ntuples > 0);
156 
157  /*
158  * Split tuples into pages
159  */
160  for (i = 0; i < ntuples; i++)
161  {
162  if (curBuffer == InvalidBuffer)
163  {
164  curBuffer = GinNewBuffer(index);
165 
166  if (prevBuffer != InvalidBuffer)
167  {
168  res->nPendingPages++;
169  writeListPage(index, prevBuffer,
170  tuples + startTuple,
171  i - startTuple,
172  BufferGetBlockNumber(curBuffer));
173  }
174  else
175  {
176  res->head = BufferGetBlockNumber(curBuffer);
177  }
178 
179  prevBuffer = curBuffer;
180  startTuple = i;
181  size = 0;
182  }
183 
184  tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
185 
186  if (size + tupsize > GinListPageSize)
187  {
188  /* won't fit, force a new page and reprocess */
189  i--;
190  curBuffer = InvalidBuffer;
191  }
192  else
193  {
194  size += tupsize;
195  }
196  }
197 
198  /*
199  * Write last page
200  */
201  res->tail = BufferGetBlockNumber(curBuffer);
202  res->tailFreeSize = writeListPage(index, curBuffer,
203  tuples + startTuple,
204  ntuples - startTuple,
206  res->nPendingPages++;
207  /* that was only one heap tuple */
208  res->nPendingHeapTuples = 1;
209 }
210 
211 /*
212  * Write the index tuples contained in *collector into the index's
213  * pending list.
214  *
215  * Function guarantees that all these tuples will be inserted consecutively,
216  * preserving order
217  */
218 void
220 {
221  Relation index = ginstate->index;
222  Buffer metabuffer;
223  Page metapage;
224  GinMetaPageData *metadata = NULL;
225  Buffer buffer = InvalidBuffer;
226  Page page = NULL;
227  ginxlogUpdateMeta data;
228  bool separateList = false;
229  bool needCleanup = false;
230  int cleanupSize;
231  bool needWal;
232 
233  if (collector->ntuples == 0)
234  return;
235 
236  needWal = RelationNeedsWAL(index);
237 
238  data.node = index->rd_node;
239  data.ntuples = 0;
241 
242  metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
243  metapage = BufferGetPage(metabuffer);
244 
245  /*
246  * An insertion to the pending list could logically belong anywhere in the
247  * tree, so it conflicts with all serializable scans. All scans acquire a
248  * predicate lock on the metabuffer to represent that.
249  */
251 
252  if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
253  {
254  /*
255  * Total size is greater than one page => make sublist
256  */
257  separateList = true;
258  }
259  else
260  {
261  LockBuffer(metabuffer, GIN_EXCLUSIVE);
262  metadata = GinPageGetMeta(metapage);
263 
264  if (metadata->head == InvalidBlockNumber ||
265  collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
266  {
267  /*
268  * Pending list is empty or total size is greater than freespace
269  * on tail page => make sublist
270  *
271  * We unlock metabuffer to keep high concurrency
272  */
273  separateList = true;
274  LockBuffer(metabuffer, GIN_UNLOCK);
275  }
276  }
277 
278  if (separateList)
279  {
280  /*
281  * We should make sublist separately and append it to the tail
282  */
283  GinMetaPageData sublist;
284 
285  memset(&sublist, 0, sizeof(GinMetaPageData));
286  makeSublist(index, collector->tuples, collector->ntuples, &sublist);
287 
288  if (needWal)
289  XLogBeginInsert();
290 
291  /*
292  * metapage was unlocked, see above
293  */
294  LockBuffer(metabuffer, GIN_EXCLUSIVE);
295  metadata = GinPageGetMeta(metapage);
296 
297  if (metadata->head == InvalidBlockNumber)
298  {
299  /*
300  * Main list is empty, so just insert sublist as main list
301  */
303 
304  metadata->head = sublist.head;
305  metadata->tail = sublist.tail;
306  metadata->tailFreeSize = sublist.tailFreeSize;
307 
308  metadata->nPendingPages = sublist.nPendingPages;
309  metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
310  }
311  else
312  {
313  /*
314  * Merge lists
315  */
316  data.prevTail = metadata->tail;
317  data.newRightlink = sublist.head;
318 
319  buffer = ReadBuffer(index, metadata->tail);
320  LockBuffer(buffer, GIN_EXCLUSIVE);
321  page = BufferGetPage(buffer);
322 
323  Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
324 
326 
327  GinPageGetOpaque(page)->rightlink = sublist.head;
328 
329  MarkBufferDirty(buffer);
330 
331  metadata->tail = sublist.tail;
332  metadata->tailFreeSize = sublist.tailFreeSize;
333 
334  metadata->nPendingPages += sublist.nPendingPages;
335  metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
336 
337  if (needWal)
339  }
340  }
341  else
342  {
343  /*
344  * Insert into tail page. Metapage is already locked
345  */
346  OffsetNumber l,
347  off;
348  int i,
349  tupsize;
350  char *ptr;
351  char *collectordata;
352 
353  buffer = ReadBuffer(index, metadata->tail);
354  LockBuffer(buffer, GIN_EXCLUSIVE);
355  page = BufferGetPage(buffer);
356 
357  off = (PageIsEmpty(page)) ? FirstOffsetNumber :
359 
360  collectordata = ptr = (char *) palloc(collector->sumsize);
361 
362  data.ntuples = collector->ntuples;
363 
364  if (needWal)
365  XLogBeginInsert();
366 
368 
369  /*
370  * Increase counter of heap tuples
371  */
372  Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
373  GinPageGetOpaque(page)->maxoff++;
374  metadata->nPendingHeapTuples++;
375 
376  for (i = 0; i < collector->ntuples; i++)
377  {
378  tupsize = IndexTupleSize(collector->tuples[i]);
379  l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
380 
381  if (l == InvalidOffsetNumber)
382  elog(ERROR, "failed to add item to index page in \"%s\"",
383  RelationGetRelationName(index));
384 
385  memcpy(ptr, collector->tuples[i], tupsize);
386  ptr += tupsize;
387 
388  off++;
389  }
390 
391  Assert((ptr - collectordata) <= collector->sumsize);
392  if (needWal)
393  {
395  XLogRegisterBufData(1, collectordata, collector->sumsize);
396  }
397 
398  metadata->tailFreeSize = PageGetExactFreeSpace(page);
399 
400  MarkBufferDirty(buffer);
401  }
402 
403  /*
404  * Set pd_lower just past the end of the metadata. This is essential,
405  * because without doing so, metadata will be lost if xlog.c compresses
406  * the page. (We must do this here because pre-v11 versions of PG did not
407  * set the metapage's pd_lower correctly, so a pg_upgraded index might
408  * contain the wrong value.)
409  */
410  ((PageHeader) metapage)->pd_lower =
411  ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
412 
413  /*
414  * Write metabuffer, make xlog entry
415  */
416  MarkBufferDirty(metabuffer);
417 
418  if (needWal)
419  {
420  XLogRecPtr recptr;
421 
422  memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
423 
425  XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
426 
427  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
428  PageSetLSN(metapage, recptr);
429 
430  if (buffer != InvalidBuffer)
431  {
432  PageSetLSN(page, recptr);
433  }
434  }
435 
436  if (buffer != InvalidBuffer)
437  UnlockReleaseBuffer(buffer);
438 
439  /*
440  * Force pending list cleanup when it becomes too long. And,
441  * ginInsertCleanup could take significant amount of time, so we prefer to
442  * call it when it can do all the work in a single collection cycle. In
443  * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
444  * while pending list is still small enough to fit into
445  * gin_pending_list_limit.
446  *
447  * ginInsertCleanup() should not be called inside our CRIT_SECTION.
448  */
449  cleanupSize = GinGetPendingListCleanupSize(index);
450  if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
451  needCleanup = true;
452 
453  UnlockReleaseBuffer(metabuffer);
454 
456 
457  /*
458  * Since it could contend with concurrent cleanup process we cleanup
459  * pending list not forcibly.
460  */
461  if (needCleanup)
462  ginInsertCleanup(ginstate, false, true, false, NULL);
463 }
464 
465 /*
466  * Create temporary index tuples for a single indexable item (one index column
467  * for the heap tuple specified by ht_ctid), and append them to the array
468  * in *collector. They will subsequently be written out using
469  * ginHeapTupleFastInsert. Note that to guarantee consistent state, all
470  * temp tuples for a given heap tuple must be written in one call to
471  * ginHeapTupleFastInsert.
472  */
473 void
475  GinTupleCollector *collector,
476  OffsetNumber attnum, Datum value, bool isNull,
477  ItemPointer ht_ctid)
478 {
479  Datum *entries;
481  int32 i,
482  nentries;
483 
484  /*
485  * Extract the key values that need to be inserted in the index
486  */
487  entries = ginExtractEntries(ginstate, attnum, value, isNull,
488  &nentries, &categories);
489 
490  /*
491  * Protect against integer overflow in allocation calculations
492  */
493  if (nentries < 0 ||
494  collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
495  elog(ERROR, "too many entries for GIN index");
496 
497  /*
498  * Allocate/reallocate memory for storing collected tuples
499  */
500  if (collector->tuples == NULL)
501  {
502  /*
503  * Determine the number of elements to allocate in the tuples array
504  * initially. Make it a power of 2 to avoid wasting memory when
505  * resizing (since palloc likes powers of 2).
506  */
507  collector->lentuples = pg_nextpower2_32(Max(16, nentries));
508  collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
509  }
510  else if (collector->lentuples < collector->ntuples + nentries)
511  {
512  /*
513  * Advance lentuples to the next suitable power of 2. This won't
514  * overflow, though we could get to a value that exceeds
515  * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
516  */
517  collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
518  collector->tuples = (IndexTuple *) repalloc(collector->tuples,
519  sizeof(IndexTuple) * collector->lentuples);
520  }
521 
522  /*
523  * Build an index tuple for each key value, and add to array. In pending
524  * tuples we just stick the heap TID into t_tid.
525  */
526  for (i = 0; i < nentries; i++)
527  {
528  IndexTuple itup;
529 
530  itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
531  NULL, 0, 0, true);
532  itup->t_tid = *ht_ctid;
533  collector->tuples[collector->ntuples++] = itup;
534  collector->sumsize += IndexTupleSize(itup);
535  }
536 }
537 
538 /*
539  * Deletes pending list pages up to (not including) newHead page.
540  * If newHead == InvalidBlockNumber then function drops the whole list.
541  *
542  * metapage is pinned and exclusive-locked throughout this function.
543  */
544 static void
546  bool fill_fsm, IndexBulkDeleteResult *stats)
547 {
548  Page metapage;
549  GinMetaPageData *metadata;
550  BlockNumber blknoToDelete;
551 
552  metapage = BufferGetPage(metabuffer);
553  metadata = GinPageGetMeta(metapage);
554  blknoToDelete = metadata->head;
555 
556  do
557  {
558  Page page;
559  int i;
560  int64 nDeletedHeapTuples = 0;
562  Buffer buffers[GIN_NDELETE_AT_ONCE];
563  BlockNumber freespace[GIN_NDELETE_AT_ONCE];
564 
565  data.ndeleted = 0;
566  while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
567  {
568  freespace[data.ndeleted] = blknoToDelete;
569  buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
570  LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
571  page = BufferGetPage(buffers[data.ndeleted]);
572 
573  data.ndeleted++;
574 
575  Assert(!GinPageIsDeleted(page));
576 
577  nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
578  blknoToDelete = GinPageGetOpaque(page)->rightlink;
579  }
580 
581  if (stats)
582  stats->pages_deleted += data.ndeleted;
583 
584  /*
585  * This operation touches an unusually large number of pages, so
586  * prepare the XLogInsert machinery for that before entering the
587  * critical section.
588  */
589  if (RelationNeedsWAL(index))
591 
593 
594  metadata->head = blknoToDelete;
595 
596  Assert(metadata->nPendingPages >= data.ndeleted);
597  metadata->nPendingPages -= data.ndeleted;
598  Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
599  metadata->nPendingHeapTuples -= nDeletedHeapTuples;
600 
601  if (blknoToDelete == InvalidBlockNumber)
602  {
603  metadata->tail = InvalidBlockNumber;
604  metadata->tailFreeSize = 0;
605  metadata->nPendingPages = 0;
606  metadata->nPendingHeapTuples = 0;
607  }
608 
609  /*
610  * Set pd_lower just past the end of the metadata. This is essential,
611  * because without doing so, metadata will be lost if xlog.c
612  * compresses the page. (We must do this here because pre-v11
613  * versions of PG did not set the metapage's pd_lower correctly, so a
614  * pg_upgraded index might contain the wrong value.)
615  */
616  ((PageHeader) metapage)->pd_lower =
617  ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
618 
619  MarkBufferDirty(metabuffer);
620 
621  for (i = 0; i < data.ndeleted; i++)
622  {
623  page = BufferGetPage(buffers[i]);
624  GinPageGetOpaque(page)->flags = GIN_DELETED;
625  MarkBufferDirty(buffers[i]);
626  }
627 
628  if (RelationNeedsWAL(index))
629  {
630  XLogRecPtr recptr;
631 
632  XLogBeginInsert();
633  XLogRegisterBuffer(0, metabuffer,
635  for (i = 0; i < data.ndeleted; i++)
636  XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
637 
638  memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
639 
640  XLogRegisterData((char *) &data,
641  sizeof(ginxlogDeleteListPages));
642 
643  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
644  PageSetLSN(metapage, recptr);
645 
646  for (i = 0; i < data.ndeleted; i++)
647  {
648  page = BufferGetPage(buffers[i]);
649  PageSetLSN(page, recptr);
650  }
651  }
652 
653  for (i = 0; i < data.ndeleted; i++)
654  UnlockReleaseBuffer(buffers[i]);
655 
657 
658  for (i = 0; fill_fsm && i < data.ndeleted; i++)
659  RecordFreeIndexPage(index, freespace[i]);
660 
661  } while (blknoToDelete != newHead);
662 }
663 
664 /* Initialize empty KeyArray */
665 static void
667 {
668  keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
669  keys->categories = (GinNullCategory *)
670  palloc(sizeof(GinNullCategory) * maxvalues);
671  keys->nvalues = 0;
672  keys->maxvalues = maxvalues;
673 }
674 
675 /* Add datum to KeyArray, resizing if needed */
676 static void
678 {
679  if (keys->nvalues >= keys->maxvalues)
680  {
681  keys->maxvalues *= 2;
682  keys->keys = (Datum *)
683  repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
684  keys->categories = (GinNullCategory *)
685  repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
686  }
687 
688  keys->keys[keys->nvalues] = datum;
689  keys->categories[keys->nvalues] = category;
690  keys->nvalues++;
691 }
692 
693 /*
694  * Collect data from a pending-list page in preparation for insertion into
695  * the main index.
696  *
697  * Go through all tuples >= startoff on page and collect values in accum
698  *
699  * Note that ka is just workspace --- it does not carry any state across
700  * calls.
701  */
702 static void
704  Page page, OffsetNumber startoff)
705 {
706  ItemPointerData heapptr;
707  OffsetNumber i,
708  maxoff;
709  OffsetNumber attrnum;
710 
711  /* reset *ka to empty */
712  ka->nvalues = 0;
713 
714  maxoff = PageGetMaxOffsetNumber(page);
715  Assert(maxoff >= FirstOffsetNumber);
716  ItemPointerSetInvalid(&heapptr);
717  attrnum = 0;
718 
719  for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
720  {
721  IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
722  OffsetNumber curattnum;
723  Datum curkey;
724  GinNullCategory curcategory;
725 
726  /* Check for change of heap TID or attnum */
727  curattnum = gintuple_get_attrnum(accum->ginstate, itup);
728 
729  if (!ItemPointerIsValid(&heapptr))
730  {
731  heapptr = itup->t_tid;
732  attrnum = curattnum;
733  }
734  else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
735  curattnum == attrnum))
736  {
737  /*
738  * ginInsertBAEntries can insert several datums per call, but only
739  * for one heap tuple and one column. So call it at a boundary,
740  * and reset ka.
741  */
742  ginInsertBAEntries(accum, &heapptr, attrnum,
743  ka->keys, ka->categories, ka->nvalues);
744  ka->nvalues = 0;
745  heapptr = itup->t_tid;
746  attrnum = curattnum;
747  }
748 
749  /* Add key to KeyArray */
750  curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
751  addDatum(ka, curkey, curcategory);
752  }
753 
754  /* Dump out all remaining keys */
755  ginInsertBAEntries(accum, &heapptr, attrnum,
756  ka->keys, ka->categories, ka->nvalues);
757 }
758 
759 /*
760  * Move tuples from pending pages into regular GIN structure.
761  *
762  * On first glance it looks completely not crash-safe. But if we crash
763  * after posting entries to the main index and before removing them from the
764  * pending list, it's okay because when we redo the posting later on, nothing
765  * bad will happen.
766  *
767  * fill_fsm indicates that ginInsertCleanup should add deleted pages
768  * to FSM otherwise caller is responsible to put deleted pages into
769  * FSM.
770  *
771  * If stats isn't null, we count deleted pending pages into the counts.
772  */
773 void
774 ginInsertCleanup(GinState *ginstate, bool full_clean,
775  bool fill_fsm, bool forceCleanup,
776  IndexBulkDeleteResult *stats)
777 {
778  Relation index = ginstate->index;
779  Buffer metabuffer,
780  buffer;
781  Page metapage,
782  page;
783  GinMetaPageData *metadata;
785  oldCtx;
786  BuildAccumulator accum;
787  KeyArray datums;
788  BlockNumber blkno,
789  blknoFinish;
790  bool cleanupFinish = false;
791  bool fsm_vac = false;
792  Size workMemory;
793 
794  /*
795  * We would like to prevent concurrent cleanup process. For that we will
796  * lock metapage in exclusive mode using LockPage() call. Nobody other
797  * will use that lock for metapage, so we keep possibility of concurrent
798  * insertion into pending list
799  */
800 
801  if (forceCleanup)
802  {
803  /*
804  * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
805  * and we would like to wait concurrent cleanup to finish.
806  */
808  workMemory =
811  }
812  else
813  {
814  /*
815  * We are called from regular insert and if we see concurrent cleanup
816  * just exit in hope that concurrent process will clean up pending
817  * list.
818  */
820  return;
821  workMemory = work_mem;
822  }
823 
824  metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
825  LockBuffer(metabuffer, GIN_SHARE);
826  metapage = BufferGetPage(metabuffer);
827  metadata = GinPageGetMeta(metapage);
828 
829  if (metadata->head == InvalidBlockNumber)
830  {
831  /* Nothing to do */
832  UnlockReleaseBuffer(metabuffer);
834  return;
835  }
836 
837  /*
838  * Remember a tail page to prevent infinite cleanup if other backends add
839  * new tuples faster than we can cleanup.
840  */
841  blknoFinish = metadata->tail;
842 
843  /*
844  * Read and lock head of pending list
845  */
846  blkno = metadata->head;
847  buffer = ReadBuffer(index, blkno);
848  LockBuffer(buffer, GIN_SHARE);
849  page = BufferGetPage(buffer);
850 
851  LockBuffer(metabuffer, GIN_UNLOCK);
852 
853  /*
854  * Initialize. All temporary space will be in opCtx
855  */
857  "GIN insert cleanup temporary context",
859 
860  oldCtx = MemoryContextSwitchTo(opCtx);
861 
862  initKeyArray(&datums, 128);
863  ginInitBA(&accum);
864  accum.ginstate = ginstate;
865 
866  /*
867  * At the top of this loop, we have pin and lock on the current page of
868  * the pending list. However, we'll release that before exiting the loop.
869  * Note we also have pin but not lock on the metapage.
870  */
871  for (;;)
872  {
873  Assert(!GinPageIsDeleted(page));
874 
875  /*
876  * Are we walk through the page which as we remember was a tail when
877  * we start our cleanup? But if caller asks us to clean up whole
878  * pending list then ignore old tail, we will work until list becomes
879  * empty.
880  */
881  if (blkno == blknoFinish && full_clean == false)
882  cleanupFinish = true;
883 
884  /*
885  * read page's datums into accum
886  */
887  processPendingPage(&accum, &datums, page, FirstOffsetNumber);
888 
890 
891  /*
892  * Is it time to flush memory to disk? Flush if we are at the end of
893  * the pending list, or if we have a full row and memory is getting
894  * full.
895  */
896  if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
897  (GinPageHasFullRow(page) &&
898  (accum.allocatedMemory >= workMemory * 1024L)))
899  {
901  uint32 nlist;
902  Datum key;
903  GinNullCategory category;
904  OffsetNumber maxoff,
905  attnum;
906 
907  /*
908  * Unlock current page to increase performance. Changes of page
909  * will be checked later by comparing maxoff after completion of
910  * memory flush.
911  */
912  maxoff = PageGetMaxOffsetNumber(page);
913  LockBuffer(buffer, GIN_UNLOCK);
914 
915  /*
916  * Moving collected data into regular structure can take
917  * significant amount of time - so, run it without locking pending
918  * list.
919  */
920  ginBeginBAScan(&accum);
921  while ((list = ginGetBAEntry(&accum,
922  &attnum, &key, &category, &nlist)) != NULL)
923  {
924  ginEntryInsert(ginstate, attnum, key, category,
925  list, nlist, NULL);
927  }
928 
929  /*
930  * Lock the whole list to remove pages
931  */
932  LockBuffer(metabuffer, GIN_EXCLUSIVE);
933  LockBuffer(buffer, GIN_SHARE);
934 
935  Assert(!GinPageIsDeleted(page));
936 
937  /*
938  * While we left the page unlocked, more stuff might have gotten
939  * added to it. If so, process those entries immediately. There
940  * shouldn't be very many, so we don't worry about the fact that
941  * we're doing this with exclusive lock. Insertion algorithm
942  * guarantees that inserted row(s) will not continue on next page.
943  * NOTE: intentionally no vacuum_delay_point in this loop.
944  */
945  if (PageGetMaxOffsetNumber(page) != maxoff)
946  {
947  ginInitBA(&accum);
948  processPendingPage(&accum, &datums, page, maxoff + 1);
949 
950  ginBeginBAScan(&accum);
951  while ((list = ginGetBAEntry(&accum,
952  &attnum, &key, &category, &nlist)) != NULL)
953  ginEntryInsert(ginstate, attnum, key, category,
954  list, nlist, NULL);
955  }
956 
957  /*
958  * Remember next page - it will become the new list head
959  */
960  blkno = GinPageGetOpaque(page)->rightlink;
961  UnlockReleaseBuffer(buffer); /* shiftList will do exclusive
962  * locking */
963 
964  /*
965  * remove read pages from pending list, at this point all content
966  * of read pages is in regular structure
967  */
968  shiftList(index, metabuffer, blkno, fill_fsm, stats);
969 
970  /* At this point, some pending pages have been freed up */
971  fsm_vac = true;
972 
973  Assert(blkno == metadata->head);
974  LockBuffer(metabuffer, GIN_UNLOCK);
975 
976  /*
977  * if we removed the whole pending list or we cleanup tail (which
978  * we remembered on start our cleanup process) then just exit
979  */
980  if (blkno == InvalidBlockNumber || cleanupFinish)
981  break;
982 
983  /*
984  * release memory used so far and reinit state
985  */
986  MemoryContextReset(opCtx);
987  initKeyArray(&datums, datums.maxvalues);
988  ginInitBA(&accum);
989  }
990  else
991  {
992  blkno = GinPageGetOpaque(page)->rightlink;
993  UnlockReleaseBuffer(buffer);
994  }
995 
996  /*
997  * Read next page in pending list
998  */
1000  buffer = ReadBuffer(index, blkno);
1001  LockBuffer(buffer, GIN_SHARE);
1002  page = BufferGetPage(buffer);
1003  }
1004 
1006  ReleaseBuffer(metabuffer);
1007 
1008  /*
1009  * As pending list pages can have a high churn rate, it is desirable to
1010  * recycle them immediately to the FreeSpaceMap when ordinary backends
1011  * clean the list.
1012  */
1013  if (fsm_vac && fill_fsm)
1014  IndexFreeSpaceMapVacuum(index);
1015 
1016  /* Clean up temporary space */
1017  MemoryContextSwitchTo(oldCtx);
1018  MemoryContextDelete(opCtx);
1019 }
1020 
1021 /*
1022  * SQL-callable function to clean the insert pending list
1023  */
1024 Datum
1026 {
1027  Oid indexoid = PG_GETARG_OID(0);
1028  Relation indexRel = index_open(indexoid, RowExclusiveLock);
1029  IndexBulkDeleteResult stats;
1030  GinState ginstate;
1031 
1032  if (RecoveryInProgress())
1033  ereport(ERROR,
1034  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1035  errmsg("recovery is in progress"),
1036  errhint("GIN pending list cannot be cleaned up during recovery.")));
1037 
1038  /* Must be a GIN index */
1039  if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1040  indexRel->rd_rel->relam != GIN_AM_OID)
1041  ereport(ERROR,
1042  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1043  errmsg("\"%s\" is not a GIN index",
1044  RelationGetRelationName(indexRel))));
1045 
1046  /*
1047  * Reject attempts to read non-local temporary relations; we would be
1048  * likely to get wrong data since we have no visibility into the owning
1049  * session's local buffers.
1050  */
1051  if (RELATION_IS_OTHER_TEMP(indexRel))
1052  ereport(ERROR,
1053  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1054  errmsg("cannot access temporary indexes of other sessions")));
1055 
1056  /* User must own the index (comparable to privileges needed for VACUUM) */
1057  if (!pg_class_ownercheck(indexoid, GetUserId()))
1059  RelationGetRelationName(indexRel));
1060 
1061  memset(&stats, 0, sizeof(stats));
1062  initGinState(&ginstate, indexRel);
1063  ginInsertCleanup(&ginstate, true, true, true, &stats);
1064 
1065  index_close(indexRel, RowExclusiveLock);
1066 
1067  PG_RETURN_INT64((int64) stats.pages_deleted);
1068 }
#define GinPageHasFullRow(page)
Definition: ginblock.h:120
int autovacuum_work_mem
Definition: autovacuum.c:116
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:82
BlockNumber prevTail
Definition: ginxlog.h:172
#define GIN_UNLOCK
Definition: gin_private.h:48
void XLogRegisterBufData(uint8 block_id, char *data, int len)
Definition: xloginsert.c:368
#define GIN_DELETED
Definition: ginblock.h:43
Buffer GinNewBuffer(Relation index)
Definition: ginutil.c:298
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
#define AllocSetContextCreate
Definition: memutils.h:170
#define PageIsEmpty(page)
Definition: bufpage.h:222
RelFileNode node
Definition: ginxlog.h:170
void ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
Definition: ginfast.c:219
int errhint(const char *fmt,...)
Definition: elog.c:1149
Relation index
Definition: gin_private.h:58
BlockNumber rightlink
Definition: ginxlog.h:184
Oid GetUserId(void)
Definition: miscinit.c:476
#define PG_RETURN_INT64(x)
Definition: fmgr.h:368
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1471
#define ExclusiveLock
Definition: lockdefs.h:44
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:220
ItemPointerData t_tid
Definition: itup.h:37
#define END_CRIT_SECTION()
Definition: miscadmin.h:134
static MemoryContext opCtx
Definition: ginxlog.c:22
#define GinListPageSize
Definition: ginblock.h:328
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Pointer Item
Definition: item.h:17
static void addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
Definition: ginfast.c:677
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:33
#define GIN_NDELETE_AT_ONCE
Definition: ginxlog.h:202
#define START_CRIT_SECTION()
Definition: miscadmin.h:132
int errcode(int sqlerrcode)
Definition: elog.c:691
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
uint32 BlockNumber
Definition: block.h:31
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3513
#define GinPageGetOpaque(page)
Definition: ginblock.h:111
#define GIN_METAPAGE_BLKNO
Definition: ginblock.h:52
static int32 writeListPage(Relation index, Buffer buffer, IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
Definition: ginfast.c:59
Form_pg_class rd_rel
Definition: rel.h:110
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:8070
#define PageGetMaxOffsetNumber(page)
Definition: bufpage.h:357
static void makeSublist(Relation index, IndexTuple *tuples, int32 ntuples, GinMetaPageData *res)
Definition: ginfast.c:145
signed int int32
Definition: c.h:417
uint16 OffsetNumber
Definition: off.h:24
#define XLOG_GIN_INSERT_LISTPAGE
Definition: ginxlog.h:180
static void initKeyArray(KeyArray *keys, int32 maxvalues)
Definition: ginfast.c:666
Definition: type.h:89
int64 nPendingHeapTuples
Definition: ginblock.h:75
GinMetaPageData metadata
Definition: ginxlog.h:205
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
char data[BLCKSZ]
Definition: c.h:1137
void ginEntryInsert(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, ItemPointerData *items, uint32 nitem, GinStatsData *buildStats)
Definition: gininsert.c:179
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3536
#define ERROR
Definition: elog.h:43
signed char GinNullCategory
Definition: ginblock.h:207
#define XLOG_GIN_UPDATE_META_PAGE
Definition: ginxlog.h:162
BlockNumber head
Definition: ginblock.h:62
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
void LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:485
OffsetNumber gintuple_get_attrnum(GinState *ginstate, IndexTuple tuple)
Definition: ginutil.c:224
#define GIN_LIST
Definition: ginblock.h:45
#define PG_GETARG_OID(n)
Definition: fmgr.h:275
BlockNumber tail
Definition: ginblock.h:63
#define FirstOffsetNumber
Definition: off.h:27
#define RowExclusiveLock
Definition: lockdefs.h:38
IndexTupleData * IndexTuple
Definition: itup.h:53
#define REGBUF_STANDARD
Definition: xloginsert.h:35
void initGinState(GinState *state, Relation index)
Definition: ginutil.c:95
#define GinGetPendingListCleanupSize(relation)
Definition: gin_private.h:38
int32 nvalues
Definition: ginfast.c:48
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:146
#define RelationGetRelationName(relation)
Definition: rel.h:491
unsigned int uint32
Definition: c.h:429
struct ItemIdData ItemIdData
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
GinState * ginstate
Definition: gin_private.h:430
Datum gintuple_get_key(GinState *ginstate, IndexTuple tuple, GinNullCategory *category)
Definition: ginutil.c:257
BlockNumber pages_deleted
Definition: genam.h:79
#define BufferGetPage(buffer)
Definition: bufmgr.h:169
bool IsAutoVacuumWorkerProcess(void)
Definition: autovacuum.c:3325
BlockNumber newRightlink
Definition: ginxlog.h:173
#define GIN_SHARE
Definition: gin_private.h:49
#define GinPageSetFullRow(page)
Definition: ginblock.h:121
#define MaxAllocSize
Definition: memutils.h:40
int32 maxvalues
Definition: ginfast.c:49
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
Datum * keys
Definition: ginfast.c:46
IndexTuple * tuples
Definition: gin_private.h:452
static void processPendingPage(BuildAccumulator *accum, KeyArray *ka, Page page, OffsetNumber startoff)
Definition: ginfast.c:703
#define GinPageIsDeleted(page)
Definition: ginblock.h:125
uintptr_t Datum
Definition: postgres.h:367
#define GIN_EXCLUSIVE
Definition: gin_private.h:50
void ginInsertBAEntries(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber attnum, Datum *entries, GinNullCategory *categories, int32 nentries)
Definition: ginbulk.c:210
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:3752
int work_mem
Definition: globals.c:121
void CheckForSerializableConflictIn(Relation relation, ItemPointer tid, BlockNumber blkno)
Definition: predicate.c:4375
#define InvalidOffsetNumber
Definition: off.h:26
int16 attnum
Definition: pg_attribute.h:79
static struct @143 value
#define ereport(elevel,...)
Definition: elog.h:155
int maintenance_work_mem
Definition: globals.c:123
void ginInsertCleanup(GinState *ginstate, bool full_clean, bool fill_fsm, bool forceCleanup, IndexBulkDeleteResult *stats)
Definition: ginfast.c:774
GinMetaPageData metadata
Definition: ginxlog.h:171
RelFileNode rd_node
Definition: rel.h:55
#define Max(x, y)
Definition: c.h:976
PageHeaderData * PageHeader
Definition: bufpage.h:166
uint32 tailFreeSize
Definition: ginblock.h:68
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:800
static void shiftList(Relation index, Buffer metabuffer, BlockNumber newHead, bool fill_fsm, IndexBulkDeleteResult *stats)
Definition: ginfast.c:545
IndexTuple GinFormTuple(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, Pointer data, Size dataSize, int nipd, bool errorTooBig)
Definition: ginentrypage.c:45
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:594
void IndexFreeSpaceMapVacuum(Relation rel)
Definition: indexfsm.c:71
bool pg_class_ownercheck(Oid class_oid, Oid roleid)
Definition: aclchk.c:4687
Buffer ReadBuffer(Relation reln, BlockNumber blockNum)
Definition: bufmgr.c:607
#define OffsetNumberNext(offsetNumber)
Definition: off.h:52
size_t Size
Definition: c.h:528
#define InvalidBlockNumber
Definition: block.h:33
void XLogEnsureRecordSpace(int max_block_id, int ndatas)
Definition: xloginsert.c:149
void GinInitBuffer(Buffer b, uint32 f)
Definition: ginutil.c:357
ItemPointerData * ginGetBAEntry(BuildAccumulator *accum, OffsetNumber *attnum, Datum *key, GinNullCategory *category, uint32 *n)
Definition: ginbulk.c:268
#define MAXALIGN(LEN)
Definition: c.h:753
#define RelationNeedsWAL(relation)
Definition: rel.h:563
void UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:520
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1070
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:158
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:841
bool ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:504
void ginBeginBAScan(BuildAccumulator *accum)
Definition: ginbulk.c:257
Datum gin_clean_pending_list(PG_FUNCTION_ARGS)
Definition: ginfast.c:1025
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2663
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:902
#define elog(elevel,...)
Definition: elog.h:228
int i
void ginHeapTupleFastCollect(GinState *ginstate, GinTupleCollector *collector, OffsetNumber attnum, Datum value, bool isNull, ItemPointer ht_ctid)
Definition: ginfast.c:474
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define GIN_PAGE_FREESIZE
Definition: ginfast.c:41
GinNullCategory * categories
Definition: ginfast.c:47
#define GinPageGetMeta(p)
Definition: ginblock.h:105
int gin_pending_list_limit
Definition: ginfast.c:39
void vacuum_delay_point(void)
Definition: vacuum.c:2031
void XLogBeginInsert(void)
Definition: xloginsert.c:123
void RecordFreeIndexPage(Relation rel, BlockNumber freeBlock)
Definition: indexfsm.c:52
#define PageSetLSN(page, lsn)
Definition: bufpage.h:368
int Buffer
Definition: buf.h:23
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:132
Datum * ginExtractEntries(GinState *ginstate, OffsetNumber attnum, Datum value, bool isNull, int32 *nentries, GinNullCategory **categories)
Definition: ginutil.c:490
struct KeyArray KeyArray
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
Pointer Page
Definition: bufpage.h:78
#define IndexTupleSize(itup)
Definition: itup.h:71
void ginInitBA(BuildAccumulator *accum)
Definition: ginbulk.c:109
BlockNumber nPendingPages
Definition: ginblock.h:74
#define XLOG_GIN_DELETE_LISTPAGE
Definition: ginxlog.h:194