PostgreSQL Source Code  git master
ginfast.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * ginfast.c
4  * Fast insert routines for the Postgres inverted index access method.
5  * Pending entries are stored in linear list of pages. Later on
6  * (typically during VACUUM), ginInsertCleanup() will be invoked to
7  * transfer pending entries into the regular index structure. This
8  * wins because bulk insertion is much more efficient than retail.
9  *
10  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  * src/backend/access/gin/ginfast.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "access/gin_private.h"
22 #include "access/ginxlog.h"
23 #include "access/xloginsert.h"
24 #include "access/xlog.h"
25 #include "commands/vacuum.h"
26 #include "catalog/pg_am.h"
27 #include "miscadmin.h"
28 #include "utils/memutils.h"
29 #include "utils/rel.h"
30 #include "utils/acl.h"
31 #include "postmaster/autovacuum.h"
32 #include "storage/indexfsm.h"
33 #include "storage/lmgr.h"
34 #include "utils/builtins.h"
35 
/*
 * GUC parameter.
 *
 * NOTE(review): the definition line was dropped from this listing (the
 * embedded numbering skips a line right after the comment); it is restored
 * here.  The value appears to be a size in kilobytes, judging by the
 * "cleanupSize * 1024L" comparison in ginHeapTupleFastInsert -- confirm
 * against the GUC table.
 */
int         gin_pending_list_limit = 0;

/*
 * Bytes of a block left over once the (aligned) page header and the
 * (aligned) GIN opaque area have been subtracted.
 */
#define GIN_PAGE_FREESIZE \
    ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
41 
42 typedef struct KeyArray
43 {
44  Datum *keys; /* expansible array */
45  GinNullCategory *categories; /* another expansible array */
46  int32 nvalues; /* current number of valid entries */
47  int32 maxvalues; /* allocated size of arrays */
48 } KeyArray;
49 
50 
51 /*
52  * Build a pending-list page from the given array of tuples, and write it out.
53  *
54  * Returns amount of free space left on the page.
55  */
56 static int32
58  IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
59 {
60  Page page = BufferGetPage(buffer);
61  int32 i,
62  freesize,
63  size = 0;
64  OffsetNumber l,
65  off;
66  char *workspace;
67  char *ptr;
68 
69  /* workspace could be a local array; we use palloc for alignment */
70  workspace = palloc(BLCKSZ);
71 
73 
74  GinInitBuffer(buffer, GIN_LIST);
75 
76  off = FirstOffsetNumber;
77  ptr = workspace;
78 
79  for (i = 0; i < ntuples; i++)
80  {
81  int this_size = IndexTupleSize(tuples[i]);
82 
83  memcpy(ptr, tuples[i], this_size);
84  ptr += this_size;
85  size += this_size;
86 
87  l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
88 
89  if (l == InvalidOffsetNumber)
90  elog(ERROR, "failed to add item to index page in \"%s\"",
92 
93  off++;
94  }
95 
96  Assert(size <= BLCKSZ); /* else we overran workspace */
97 
98  GinPageGetOpaque(page)->rightlink = rightlink;
99 
100  /*
101  * tail page may contain only whole row(s) or final part of row placed on
102  * previous pages (a "row" here meaning all the index tuples generated for
103  * one heap tuple)
104  */
105  if (rightlink == InvalidBlockNumber)
106  {
107  GinPageSetFullRow(page);
108  GinPageGetOpaque(page)->maxoff = 1;
109  }
110  else
111  {
112  GinPageGetOpaque(page)->maxoff = 0;
113  }
114 
115  MarkBufferDirty(buffer);
116 
117  if (RelationNeedsWAL(index))
118  {
120  XLogRecPtr recptr;
121 
122  data.rightlink = rightlink;
123  data.ntuples = ntuples;
124 
125  XLogBeginInsert();
126  XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
127 
129  XLogRegisterBufData(0, workspace, size);
130 
131  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
132  PageSetLSN(page, recptr);
133  }
134 
135  /* get free space before releasing buffer */
136  freesize = PageGetExactFreeSpace(page);
137 
138  UnlockReleaseBuffer(buffer);
139 
141 
142  pfree(workspace);
143 
144  return freesize;
145 }
146 
147 static void
149  GinMetaPageData *res)
150 {
151  Buffer curBuffer = InvalidBuffer;
152  Buffer prevBuffer = InvalidBuffer;
153  int i,
154  size = 0,
155  tupsize;
156  int startTuple = 0;
157 
158  Assert(ntuples > 0);
159 
160  /*
161  * Split tuples into pages
162  */
163  for (i = 0; i < ntuples; i++)
164  {
165  if (curBuffer == InvalidBuffer)
166  {
167  curBuffer = GinNewBuffer(index);
168 
169  if (prevBuffer != InvalidBuffer)
170  {
171  res->nPendingPages++;
172  writeListPage(index, prevBuffer,
173  tuples + startTuple,
174  i - startTuple,
175  BufferGetBlockNumber(curBuffer));
176  }
177  else
178  {
179  res->head = BufferGetBlockNumber(curBuffer);
180  }
181 
182  prevBuffer = curBuffer;
183  startTuple = i;
184  size = 0;
185  }
186 
187  tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
188 
189  if (size + tupsize > GinListPageSize)
190  {
191  /* won't fit, force a new page and reprocess */
192  i--;
193  curBuffer = InvalidBuffer;
194  }
195  else
196  {
197  size += tupsize;
198  }
199  }
200 
201  /*
202  * Write last page
203  */
204  res->tail = BufferGetBlockNumber(curBuffer);
205  res->tailFreeSize = writeListPage(index, curBuffer,
206  tuples + startTuple,
207  ntuples - startTuple,
209  res->nPendingPages++;
210  /* that was only one heap tuple */
211  res->nPendingHeapTuples = 1;
212 }
213 
214 /*
215  * Write the index tuples contained in *collector into the index's
216  * pending list.
217  *
218  * Function guarantees that all these tuples will be inserted consecutively,
219  * preserving order
220  */
221 void
223 {
224  Relation index = ginstate->index;
225  Buffer metabuffer;
226  Page metapage;
227  GinMetaPageData *metadata = NULL;
229  Page page = NULL;
230  ginxlogUpdateMeta data;
231  bool separateList = false;
232  bool needCleanup = false;
233  int cleanupSize;
234  bool needWal;
235 
236  if (collector->ntuples == 0)
237  return;
238 
239  needWal = RelationNeedsWAL(index);
240 
241  data.node = index->rd_node;
242  data.ntuples = 0;
244 
245  metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
246  metapage = BufferGetPage(metabuffer);
247 
248  if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
249  {
250  /*
251  * Total size is greater than one page => make sublist
252  */
253  separateList = true;
254  }
255  else
256  {
257  LockBuffer(metabuffer, GIN_EXCLUSIVE);
258  metadata = GinPageGetMeta(metapage);
259 
260  if (metadata->head == InvalidBlockNumber ||
261  collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
262  {
263  /*
264  * Pending list is empty or total size is greater than freespace
265  * on tail page => make sublist
266  *
267  * We unlock metabuffer to keep high concurrency
268  */
269  separateList = true;
270  LockBuffer(metabuffer, GIN_UNLOCK);
271  }
272  }
273 
274  if (separateList)
275  {
276  /*
277  * We should make sublist separately and append it to the tail
278  */
279  GinMetaPageData sublist;
280 
281  memset(&sublist, 0, sizeof(GinMetaPageData));
282  makeSublist(index, collector->tuples, collector->ntuples, &sublist);
283 
284  if (needWal)
285  XLogBeginInsert();
286 
287  /*
288  * metapage was unlocked, see above
289  */
290  LockBuffer(metabuffer, GIN_EXCLUSIVE);
291  metadata = GinPageGetMeta(metapage);
292 
293  if (metadata->head == InvalidBlockNumber)
294  {
295  /*
296  * Main list is empty, so just insert sublist as main list
297  */
299 
300  metadata->head = sublist.head;
301  metadata->tail = sublist.tail;
302  metadata->tailFreeSize = sublist.tailFreeSize;
303 
304  metadata->nPendingPages = sublist.nPendingPages;
305  metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
306  }
307  else
308  {
309  /*
310  * Merge lists
311  */
312  data.prevTail = metadata->tail;
313  data.newRightlink = sublist.head;
314 
315  buffer = ReadBuffer(index, metadata->tail);
316  LockBuffer(buffer, GIN_EXCLUSIVE);
317  page = BufferGetPage(buffer);
318 
319  Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
320 
322 
323  GinPageGetOpaque(page)->rightlink = sublist.head;
324 
325  MarkBufferDirty(buffer);
326 
327  metadata->tail = sublist.tail;
328  metadata->tailFreeSize = sublist.tailFreeSize;
329 
330  metadata->nPendingPages += sublist.nPendingPages;
331  metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
332 
333  if (needWal)
335  }
336  }
337  else
338  {
339  /*
340  * Insert into tail page. Metapage is already locked
341  */
342  OffsetNumber l,
343  off;
344  int i,
345  tupsize;
346  char *ptr;
347  char *collectordata;
348 
349  buffer = ReadBuffer(index, metadata->tail);
350  LockBuffer(buffer, GIN_EXCLUSIVE);
351  page = BufferGetPage(buffer);
352 
353  off = (PageIsEmpty(page)) ? FirstOffsetNumber :
355 
356  collectordata = ptr = (char *) palloc(collector->sumsize);
357 
358  data.ntuples = collector->ntuples;
359 
360  if (needWal)
361  XLogBeginInsert();
362 
364 
365  /*
366  * Increase counter of heap tuples
367  */
368  Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
369  GinPageGetOpaque(page)->maxoff++;
370  metadata->nPendingHeapTuples++;
371 
372  for (i = 0; i < collector->ntuples; i++)
373  {
374  tupsize = IndexTupleSize(collector->tuples[i]);
375  l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
376 
377  if (l == InvalidOffsetNumber)
378  elog(ERROR, "failed to add item to index page in \"%s\"",
379  RelationGetRelationName(index));
380 
381  memcpy(ptr, collector->tuples[i], tupsize);
382  ptr += tupsize;
383 
384  off++;
385  }
386 
387  Assert((ptr - collectordata) <= collector->sumsize);
388  if (needWal)
389  {
391  XLogRegisterBufData(1, collectordata, collector->sumsize);
392  }
393 
394  metadata->tailFreeSize = PageGetExactFreeSpace(page);
395 
396  MarkBufferDirty(buffer);
397  }
398 
399  /*
400  * Set pd_lower just past the end of the metadata. This is essential,
401  * because without doing so, metadata will be lost if xlog.c compresses
402  * the page. (We must do this here because pre-v11 versions of PG did not
403  * set the metapage's pd_lower correctly, so a pg_upgraded index might
404  * contain the wrong value.)
405  */
406  ((PageHeader) metapage)->pd_lower =
407  ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
408 
409  /*
410  * Write metabuffer, make xlog entry
411  */
412  MarkBufferDirty(metabuffer);
413 
414  if (needWal)
415  {
416  XLogRecPtr recptr;
417 
418  memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
419 
421  XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
422 
423  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
424  PageSetLSN(metapage, recptr);
425 
426  if (buffer != InvalidBuffer)
427  {
428  PageSetLSN(page, recptr);
429  }
430  }
431 
432  if (buffer != InvalidBuffer)
433  UnlockReleaseBuffer(buffer);
434 
435  /*
436  * Force pending list cleanup when it becomes too long. And,
437  * ginInsertCleanup could take significant amount of time, so we prefer to
438  * call it when it can do all the work in a single collection cycle. In
439  * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
440  * while pending list is still small enough to fit into
441  * gin_pending_list_limit.
442  *
443  * ginInsertCleanup() should not be called inside our CRIT_SECTION.
444  */
445  cleanupSize = GinGetPendingListCleanupSize(index);
446  if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
447  needCleanup = true;
448 
449  UnlockReleaseBuffer(metabuffer);
450 
452 
453  /*
454  * Since it could contend with concurrent cleanup process we cleanup
455  * pending list not forcibly.
456  */
457  if (needCleanup)
458  ginInsertCleanup(ginstate, false, true, false, NULL);
459 }
460 
461 /*
462  * Create temporary index tuples for a single indexable item (one index column
463  * for the heap tuple specified by ht_ctid), and append them to the array
464  * in *collector. They will subsequently be written out using
465  * ginHeapTupleFastInsert. Note that to guarantee consistent state, all
466  * temp tuples for a given heap tuple must be written in one call to
467  * ginHeapTupleFastInsert.
468  */
469 void
471  GinTupleCollector *collector,
472  OffsetNumber attnum, Datum value, bool isNull,
473  ItemPointer ht_ctid)
474 {
475  Datum *entries;
477  int32 i,
478  nentries;
479 
480  /*
481  * Extract the key values that need to be inserted in the index
482  */
483  entries = ginExtractEntries(ginstate, attnum, value, isNull,
484  &nentries, &categories);
485 
486  /*
487  * Allocate/reallocate memory for storing collected tuples
488  */
489  if (collector->tuples == NULL)
490  {
491  collector->lentuples = nentries * ginstate->origTupdesc->natts;
492  collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
493  }
494 
495  while (collector->ntuples + nentries > collector->lentuples)
496  {
497  collector->lentuples *= 2;
498  collector->tuples = (IndexTuple *) repalloc(collector->tuples,
499  sizeof(IndexTuple) * collector->lentuples);
500  }
501 
502  /*
503  * Build an index tuple for each key value, and add to array. In pending
504  * tuples we just stick the heap TID into t_tid.
505  */
506  for (i = 0; i < nentries; i++)
507  {
508  IndexTuple itup;
509 
510  itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
511  NULL, 0, 0, true);
512  itup->t_tid = *ht_ctid;
513  collector->tuples[collector->ntuples++] = itup;
514  collector->sumsize += IndexTupleSize(itup);
515  }
516 }
517 
518 /*
519  * Deletes pending list pages up to (not including) newHead page.
520  * If newHead == InvalidBlockNumber then function drops the whole list.
521  *
522  * metapage is pinned and exclusive-locked throughout this function.
523  */
524 static void
526  bool fill_fsm, IndexBulkDeleteResult *stats)
527 {
528  Page metapage;
529  GinMetaPageData *metadata;
530  BlockNumber blknoToDelete;
531 
532  metapage = BufferGetPage(metabuffer);
533  metadata = GinPageGetMeta(metapage);
534  blknoToDelete = metadata->head;
535 
536  do
537  {
538  Page page;
539  int i;
540  int64 nDeletedHeapTuples = 0;
542  Buffer buffers[GIN_NDELETE_AT_ONCE];
543  BlockNumber freespace[GIN_NDELETE_AT_ONCE];
544 
545  data.ndeleted = 0;
546  while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
547  {
548  freespace[data.ndeleted] = blknoToDelete;
549  buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
550  LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
551  page = BufferGetPage(buffers[data.ndeleted]);
552 
553  data.ndeleted++;
554 
555  Assert(!GinPageIsDeleted(page));
556 
557  nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
558  blknoToDelete = GinPageGetOpaque(page)->rightlink;
559  }
560 
561  if (stats)
562  stats->pages_deleted += data.ndeleted;
563 
564  /*
565  * This operation touches an unusually large number of pages, so
566  * prepare the XLogInsert machinery for that before entering the
567  * critical section.
568  */
569  if (RelationNeedsWAL(index))
571 
573 
574  metadata->head = blknoToDelete;
575 
576  Assert(metadata->nPendingPages >= data.ndeleted);
577  metadata->nPendingPages -= data.ndeleted;
578  Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
579  metadata->nPendingHeapTuples -= nDeletedHeapTuples;
580 
581  if (blknoToDelete == InvalidBlockNumber)
582  {
583  metadata->tail = InvalidBlockNumber;
584  metadata->tailFreeSize = 0;
585  metadata->nPendingPages = 0;
586  metadata->nPendingHeapTuples = 0;
587  }
588 
589  /*
590  * Set pd_lower just past the end of the metadata. This is essential,
591  * because without doing so, metadata will be lost if xlog.c
592  * compresses the page. (We must do this here because pre-v11
593  * versions of PG did not set the metapage's pd_lower correctly, so a
594  * pg_upgraded index might contain the wrong value.)
595  */
596  ((PageHeader) metapage)->pd_lower =
597  ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
598 
599  MarkBufferDirty(metabuffer);
600 
601  for (i = 0; i < data.ndeleted; i++)
602  {
603  page = BufferGetPage(buffers[i]);
604  GinPageGetOpaque(page)->flags = GIN_DELETED;
605  MarkBufferDirty(buffers[i]);
606  }
607 
608  if (RelationNeedsWAL(index))
609  {
610  XLogRecPtr recptr;
611 
612  XLogBeginInsert();
613  XLogRegisterBuffer(0, metabuffer,
615  for (i = 0; i < data.ndeleted; i++)
616  XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
617 
618  memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
619 
620  XLogRegisterData((char *) &data,
621  sizeof(ginxlogDeleteListPages));
622 
623  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
624  PageSetLSN(metapage, recptr);
625 
626  for (i = 0; i < data.ndeleted; i++)
627  {
628  page = BufferGetPage(buffers[i]);
629  PageSetLSN(page, recptr);
630  }
631  }
632 
633  for (i = 0; i < data.ndeleted; i++)
634  UnlockReleaseBuffer(buffers[i]);
635 
637 
638  for (i = 0; fill_fsm && i < data.ndeleted; i++)
639  RecordFreeIndexPage(index, freespace[i]);
640 
641  } while (blknoToDelete != newHead);
642 }
643 
644 /* Initialize empty KeyArray */
645 static void
647 {
648  keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
649  keys->categories = (GinNullCategory *)
650  palloc(sizeof(GinNullCategory) * maxvalues);
651  keys->nvalues = 0;
652  keys->maxvalues = maxvalues;
653 }
654 
655 /* Add datum to KeyArray, resizing if needed */
656 static void
658 {
659  if (keys->nvalues >= keys->maxvalues)
660  {
661  keys->maxvalues *= 2;
662  keys->keys = (Datum *)
663  repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
664  keys->categories = (GinNullCategory *)
665  repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
666  }
667 
668  keys->keys[keys->nvalues] = datum;
669  keys->categories[keys->nvalues] = category;
670  keys->nvalues++;
671 }
672 
673 /*
674  * Collect data from a pending-list page in preparation for insertion into
675  * the main index.
676  *
677  * Go through all tuples >= startoff on page and collect values in accum
678  *
679  * Note that ka is just workspace --- it does not carry any state across
680  * calls.
681  */
682 static void
684  Page page, OffsetNumber startoff)
685 {
686  ItemPointerData heapptr;
687  OffsetNumber i,
688  maxoff;
689  OffsetNumber attrnum;
690 
691  /* reset *ka to empty */
692  ka->nvalues = 0;
693 
694  maxoff = PageGetMaxOffsetNumber(page);
695  Assert(maxoff >= FirstOffsetNumber);
696  ItemPointerSetInvalid(&heapptr);
697  attrnum = 0;
698 
699  for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
700  {
701  IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
702  OffsetNumber curattnum;
703  Datum curkey;
704  GinNullCategory curcategory;
705 
706  /* Check for change of heap TID or attnum */
707  curattnum = gintuple_get_attrnum(accum->ginstate, itup);
708 
709  if (!ItemPointerIsValid(&heapptr))
710  {
711  heapptr = itup->t_tid;
712  attrnum = curattnum;
713  }
714  else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
715  curattnum == attrnum))
716  {
717  /*
718  * ginInsertBAEntries can insert several datums per call, but only
719  * for one heap tuple and one column. So call it at a boundary,
720  * and reset ka.
721  */
722  ginInsertBAEntries(accum, &heapptr, attrnum,
723  ka->keys, ka->categories, ka->nvalues);
724  ka->nvalues = 0;
725  heapptr = itup->t_tid;
726  attrnum = curattnum;
727  }
728 
729  /* Add key to KeyArray */
730  curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
731  addDatum(ka, curkey, curcategory);
732  }
733 
734  /* Dump out all remaining keys */
735  ginInsertBAEntries(accum, &heapptr, attrnum,
736  ka->keys, ka->categories, ka->nvalues);
737 }
738 
739 /*
740  * Move tuples from pending pages into regular GIN structure.
741  *
742  * On first glance it looks completely not crash-safe. But if we crash
743  * after posting entries to the main index and before removing them from the
744  * pending list, it's okay because when we redo the posting later on, nothing
745  * bad will happen.
746  *
747  * fill_fsm indicates that ginInsertCleanup should add deleted pages
748  * to FSM otherwise caller is responsible to put deleted pages into
749  * FSM.
750  *
751  * If stats isn't null, we count deleted pending pages into the counts.
752  */
753 void
754 ginInsertCleanup(GinState *ginstate, bool full_clean,
755  bool fill_fsm, bool forceCleanup,
756  IndexBulkDeleteResult *stats)
757 {
758  Relation index = ginstate->index;
759  Buffer metabuffer,
760  buffer;
761  Page metapage,
762  page;
763  GinMetaPageData *metadata;
765  oldCtx;
766  BuildAccumulator accum;
767  KeyArray datums;
768  BlockNumber blkno,
769  blknoFinish;
770  bool cleanupFinish = false;
771  bool fsm_vac = false;
772  Size workMemory;
773 
774  /*
775  * We would like to prevent concurrent cleanup process. For that we will
776  * lock metapage in exclusive mode using LockPage() call. Nobody other
777  * will use that lock for metapage, so we keep possibility of concurrent
778  * insertion into pending list
779  */
780 
781  if (forceCleanup)
782  {
783  /*
784  * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
785  * and we would like to wait concurrent cleanup to finish.
786  */
788  workMemory =
791  }
792  else
793  {
794  /*
795  * We are called from regular insert and if we see concurrent cleanup
796  * just exit in hope that concurrent process will clean up pending
797  * list.
798  */
800  return;
801  workMemory = work_mem;
802  }
803 
804  metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
805  LockBuffer(metabuffer, GIN_SHARE);
806  metapage = BufferGetPage(metabuffer);
807  metadata = GinPageGetMeta(metapage);
808 
809  if (metadata->head == InvalidBlockNumber)
810  {
811  /* Nothing to do */
812  UnlockReleaseBuffer(metabuffer);
814  return;
815  }
816 
817  /*
818  * Remember a tail page to prevent infinite cleanup if other backends add
819  * new tuples faster than we can cleanup.
820  */
821  blknoFinish = metadata->tail;
822 
823  /*
824  * Read and lock head of pending list
825  */
826  blkno = metadata->head;
827  buffer = ReadBuffer(index, blkno);
828  LockBuffer(buffer, GIN_SHARE);
829  page = BufferGetPage(buffer);
830 
831  LockBuffer(metabuffer, GIN_UNLOCK);
832 
833  /*
834  * Initialize. All temporary space will be in opCtx
835  */
837  "GIN insert cleanup temporary context",
839 
840  oldCtx = MemoryContextSwitchTo(opCtx);
841 
842  initKeyArray(&datums, 128);
843  ginInitBA(&accum);
844  accum.ginstate = ginstate;
845 
846  /*
847  * At the top of this loop, we have pin and lock on the current page of
848  * the pending list. However, we'll release that before exiting the loop.
849  * Note we also have pin but not lock on the metapage.
850  */
851  for (;;)
852  {
853  Assert(!GinPageIsDeleted(page));
854 
855  /*
856  * Are we walk through the page which as we remember was a tail when
857  * we start our cleanup? But if caller asks us to clean up whole
858  * pending list then ignore old tail, we will work until list becomes
859  * empty.
860  */
861  if (blkno == blknoFinish && full_clean == false)
862  cleanupFinish = true;
863 
864  /*
865  * read page's datums into accum
866  */
867  processPendingPage(&accum, &datums, page, FirstOffsetNumber);
868 
870 
871  /*
872  * Is it time to flush memory to disk? Flush if we are at the end of
873  * the pending list, or if we have a full row and memory is getting
874  * full.
875  */
876  if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
877  (GinPageHasFullRow(page) &&
878  (accum.allocatedMemory >= workMemory * 1024L)))
879  {
881  uint32 nlist;
882  Datum key;
883  GinNullCategory category;
884  OffsetNumber maxoff,
885  attnum;
886 
887  /*
888  * Unlock current page to increase performance. Changes of page
889  * will be checked later by comparing maxoff after completion of
890  * memory flush.
891  */
892  maxoff = PageGetMaxOffsetNumber(page);
893  LockBuffer(buffer, GIN_UNLOCK);
894 
895  /*
896  * Moving collected data into regular structure can take
897  * significant amount of time - so, run it without locking pending
898  * list.
899  */
900  ginBeginBAScan(&accum);
901  while ((list = ginGetBAEntry(&accum,
902  &attnum, &key, &category, &nlist)) != NULL)
903  {
904  ginEntryInsert(ginstate, attnum, key, category,
905  list, nlist, NULL);
907  }
908 
909  /*
910  * Lock the whole list to remove pages
911  */
912  LockBuffer(metabuffer, GIN_EXCLUSIVE);
913  LockBuffer(buffer, GIN_SHARE);
914 
915  Assert(!GinPageIsDeleted(page));
916 
917  /*
918  * While we left the page unlocked, more stuff might have gotten
919  * added to it. If so, process those entries immediately. There
920  * shouldn't be very many, so we don't worry about the fact that
921  * we're doing this with exclusive lock. Insertion algorithm
922  * guarantees that inserted row(s) will not continue on next page.
923  * NOTE: intentionally no vacuum_delay_point in this loop.
924  */
925  if (PageGetMaxOffsetNumber(page) != maxoff)
926  {
927  ginInitBA(&accum);
928  processPendingPage(&accum, &datums, page, maxoff + 1);
929 
930  ginBeginBAScan(&accum);
931  while ((list = ginGetBAEntry(&accum,
932  &attnum, &key, &category, &nlist)) != NULL)
933  ginEntryInsert(ginstate, attnum, key, category,
934  list, nlist, NULL);
935  }
936 
937  /*
938  * Remember next page - it will become the new list head
939  */
940  blkno = GinPageGetOpaque(page)->rightlink;
941  UnlockReleaseBuffer(buffer); /* shiftList will do exclusive
942  * locking */
943 
944  /*
945  * remove read pages from pending list, at this point all content
946  * of read pages is in regular structure
947  */
948  shiftList(index, metabuffer, blkno, fill_fsm, stats);
949 
950  /* At this point, some pending pages have been freed up */
951  fsm_vac = true;
952 
953  Assert(blkno == metadata->head);
954  LockBuffer(metabuffer, GIN_UNLOCK);
955 
956  /*
957  * if we removed the whole pending list or we cleanup tail (which
958  * we remembered on start our cleanup process) then just exit
959  */
960  if (blkno == InvalidBlockNumber || cleanupFinish)
961  break;
962 
963  /*
964  * release memory used so far and reinit state
965  */
966  MemoryContextReset(opCtx);
967  initKeyArray(&datums, datums.maxvalues);
968  ginInitBA(&accum);
969  }
970  else
971  {
972  blkno = GinPageGetOpaque(page)->rightlink;
973  UnlockReleaseBuffer(buffer);
974  }
975 
976  /*
977  * Read next page in pending list
978  */
980  buffer = ReadBuffer(index, blkno);
981  LockBuffer(buffer, GIN_SHARE);
982  page = BufferGetPage(buffer);
983  }
984 
986  ReleaseBuffer(metabuffer);
987 
988  /*
989  * As pending list pages can have a high churn rate, it is desirable to
990  * recycle them immediately to the FreeSpace Map when ordinary backends
991  * clean the list.
992  */
993  if (fsm_vac && fill_fsm)
995 
996  /* Clean up temporary space */
997  MemoryContextSwitchTo(oldCtx);
998  MemoryContextDelete(opCtx);
999 }
1000 
1001 /*
1002  * SQL-callable function to clean the insert pending list
1003  */
1004 Datum
1006 {
1007  Oid indexoid = PG_GETARG_OID(0);
1008  Relation indexRel = index_open(indexoid, AccessShareLock);
1009  IndexBulkDeleteResult stats;
1010  GinState ginstate;
1011 
1012  if (RecoveryInProgress())
1013  ereport(ERROR,
1014  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1015  errmsg("recovery is in progress"),
1016  errhint("GIN pending list cannot be cleaned up during recovery.")));
1017 
1018  /* Must be a GIN index */
1019  if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1020  indexRel->rd_rel->relam != GIN_AM_OID)
1021  ereport(ERROR,
1022  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1023  errmsg("\"%s\" is not a GIN index",
1024  RelationGetRelationName(indexRel))));
1025 
1026  /*
1027  * Reject attempts to read non-local temporary relations; we would be
1028  * likely to get wrong data since we have no visibility into the owning
1029  * session's local buffers.
1030  */
1031  if (RELATION_IS_OTHER_TEMP(indexRel))
1032  ereport(ERROR,
1033  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1034  errmsg("cannot access temporary indexes of other sessions")));
1035 
1036  /* User must own the index (comparable to privileges needed for VACUUM) */
1037  if (!pg_class_ownercheck(indexoid, GetUserId()))
1039  RelationGetRelationName(indexRel));
1040 
1041  memset(&stats, 0, sizeof(stats));
1042  initGinState(&ginstate, indexRel);
1043  ginInsertCleanup(&ginstate, true, true, true, &stats);
1044 
1045  index_close(indexRel, AccessShareLock);
1046 
1047  PG_RETURN_INT64((int64) stats.pages_deleted);
1048 }
#define GinPageHasFullRow(page)
Definition: ginblock.h:118
int autovacuum_work_mem
Definition: autovacuum.c:116
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:60
BlockNumber prevTail
Definition: ginxlog.h:173
#define GIN_UNLOCK
Definition: gin_private.h:43
void XLogRegisterBufData(uint8 block_id, char *data, int len)
Definition: xloginsert.c:361
#define GIN_DELETED
Definition: ginblock.h:41
Buffer GinNewBuffer(Relation index)
Definition: ginutil.c:289
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define PageIsEmpty(page)
Definition: bufpage.h:218
RelFileNode node
Definition: ginxlog.h:171
void ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
Definition: ginfast.c:222
int errhint(const char *fmt,...)
Definition: elog.c:987
Relation index
Definition: gin_private.h:53
BlockNumber rightlink
Definition: ginxlog.h:185
Oid GetUserId(void)
Definition: miscinit.c:284
#define PG_RETURN_INT64(x)
Definition: fmgr.h:327
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1450
#define ExclusiveLock
Definition: lockdefs.h:44
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:213
ItemPointerData t_tid
Definition: itup.h:37
#define END_CRIT_SECTION()
Definition: miscadmin.h:133
static MemoryContext opCtx
Definition: ginxlog.c:22
#define GinListPageSize
Definition: ginblock.h:318
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Pointer Item
Definition: item.h:17
static void addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
Definition: ginfast.c:657
#define AccessShareLock
Definition: lockdefs.h:36
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:32
#define GIN_NDELETE_AT_ONCE
Definition: ginxlog.h:203
#define START_CRIT_SECTION()
Definition: miscadmin.h:131
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:412
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
uint32 BlockNumber
Definition: block.h:31
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3309
#define GinPageGetOpaque(page)
Definition: ginblock.h:109
#define GIN_METAPAGE_BLKNO
Definition: ginblock.h:50
static int32 writeListPage(Relation index, Buffer buffer, IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
Definition: ginfast.c:57
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:7929
#define PageGetMaxOffsetNumber(page)
Definition: bufpage.h:353
int natts
Definition: tupdesc.h:79
static void makeSublist(Relation index, IndexTuple *tuples, int32 ntuples, GinMetaPageData *res)
Definition: ginfast.c:148
signed int int32
Definition: c.h:284
#define GIN_AM_OID
Definition: pg_am.h:79
uint16 OffsetNumber
Definition: off.h:24
#define XLOG_GIN_INSERT_LISTPAGE
Definition: ginxlog.h:181
static void initKeyArray(KeyArray *keys, int32 maxvalues)
Definition: ginfast.c:646
Definition: type.h:89
int64 nPendingHeapTuples
Definition: ginblock.h:73
GinMetaPageData metadata
Definition: ginxlog.h:206
void pfree(void *pointer)
Definition: mcxt.c:949
void ginEntryInsert(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, ItemPointerData *items, uint32 nitem, GinStatsData *buildStats)
Definition: gininsert.c:177
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3332
#define ERROR
Definition: elog.h:43
signed char GinNullCategory
Definition: ginblock.h:197
#define XLOG_GIN_UPDATE_META_PAGE
Definition: ginxlog.h:163
static struct @121 value
BlockNumber head
Definition: ginblock.h:60
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
void LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:400
OffsetNumber gintuple_get_attrnum(GinState *ginstate, IndexTuple tuple)
Definition: ginutil.c:215
#define GIN_LIST
Definition: ginblock.h:43
#define PG_GETARG_OID(n)
Definition: fmgr.h:240
BlockNumber tail
Definition: ginblock.h:61
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3399
#define FirstOffsetNumber
Definition: off.h:27
IndexTupleData * IndexTuple
Definition: itup.h:53
#define REGBUF_STANDARD
Definition: xloginsert.h:34
void initGinState(GinState *state, Relation index)
Definition: ginutil.c:86
#define GinGetPendingListCleanupSize(relation)
Definition: gin_private.h:35
int32 nvalues
Definition: ginfast.c:46
#define RelationGetRelationName(relation)
Definition: rel.h:445
unsigned int uint32
Definition: c.h:296
struct ItemIdData ItemIdData
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
GinState * ginstate
Definition: gin_private.h:407
Datum gintuple_get_key(GinState *ginstate, IndexTuple tuple, GinNullCategory *category)
Definition: ginutil.c:248
BlockNumber pages_deleted
Definition: genam.h:78
#define BufferGetPage(buffer)
Definition: bufmgr.h:160
bool IsAutoVacuumWorkerProcess(void)
Definition: autovacuum.c:3256
#define ereport(elevel, rest)
Definition: elog.h:122
BlockNumber newRightlink
Definition: ginxlog.h:174
#define GIN_SHARE
Definition: gin_private.h:44
#define GinPageSetFullRow(page)
Definition: ginblock.h:119
int32 maxvalues
Definition: ginfast.c:47
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:231
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
Datum * keys
Definition: ginfast.c:44
IndexTuple * tuples
Definition: gin_private.h:429
static void processPendingPage(BuildAccumulator *accum, KeyArray *ka, Page page, OffsetNumber startoff)
Definition: ginfast.c:683
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
#define GinPageIsDeleted(page)
Definition: ginblock.h:123
uintptr_t Datum
Definition: postgres.h:372
#define GIN_EXCLUSIVE
Definition: gin_private.h:45
void ginInsertBAEntries(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber attnum, Datum *entries, GinNullCategory *categories, int32 nentries)
Definition: ginbulk.c:210
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:3546
int work_mem
Definition: globals.c:113
#define InvalidOffsetNumber
Definition: off.h:26
int maintenance_work_mem
Definition: globals.c:114
void ginInsertCleanup(GinState *ginstate, bool full_clean, bool fill_fsm, bool forceCleanup, IndexBulkDeleteResult *stats)
Definition: ginfast.c:754
GinMetaPageData metadata
Definition: ginxlog.h:172
RelFileNode rd_node
Definition: rel.h:85
PageHeaderData * PageHeader
Definition: bufpage.h:162
uint32 tailFreeSize
Definition: ginblock.h:66
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:670
static void shiftList(Relation index, Buffer metabuffer, BlockNumber newHead, bool fill_fsm, IndexBulkDeleteResult *stats)
Definition: ginfast.c:525
IndexTuple GinFormTuple(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, Pointer data, Size dataSize, int nipd, bool errorTooBig)
Definition: ginentrypage.c:45
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:542
void IndexFreeSpaceMapVacuum(Relation rel)
Definition: indexfsm.c:71
bool pg_class_ownercheck(Oid class_oid, Oid roleid)
Definition: aclchk.c:4546
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
Buffer ReadBuffer(Relation reln, BlockNumber blockNum)
Definition: bufmgr.c:594
#define OffsetNumberNext(offsetNumber)
Definition: off.h:53
size_t Size
Definition: c.h:404
#define InvalidBlockNumber
Definition: block.h:33
void XLogEnsureRecordSpace(int max_block_id, int ndatas)
Definition: xloginsert.c:146
void GinInitBuffer(Buffer b, uint32 f)
Definition: ginutil.c:353
ItemPointerData * ginGetBAEntry(BuildAccumulator *accum, OffsetNumber *attnum, Datum *key, GinNullCategory *category, uint32 *n)
Definition: ginbulk.c:268
#define MAXALIGN(LEN)
Definition: c.h:623
#define RelationNeedsWAL(relation)
Definition: rel.h:514
void UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:435
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:962
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:176
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:629
bool ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:419
void ginBeginBAScan(BuildAccumulator *accum)
Definition: ginbulk.c:257
Datum gin_clean_pending_list(PG_FUNCTION_ARGS)
Definition: ginfast.c:1005
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2605
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:150
void * palloc(Size size)
Definition: mcxt.c:848
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
#define RELKIND_INDEX
Definition: pg_class.h:161
void ginHeapTupleFastCollect(GinState *ginstate, GinTupleCollector *collector, OffsetNumber attnum, Datum value, bool isNull, ItemPointer ht_ctid)
Definition: ginfast.c:470
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
TupleDesc origTupdesc
Definition: gin_private.h:67
#define GIN_PAGE_FREESIZE
Definition: ginfast.c:39
GinNullCategory * categories
Definition: ginfast.c:45
#define GinPageGetMeta(p)
Definition: ginblock.h:103
#define elog
Definition: elog.h:219
int gin_pending_list_limit
Definition: ginfast.c:37
void vacuum_delay_point(void)
Definition: vacuum.c:1658
void XLogBeginInsert(void)
Definition: xloginsert.c:120
void RecordFreeIndexPage(Relation rel, BlockNumber freeBlock)
Definition: indexfsm.c:52
#define PageSetLSN(page, lsn)
Definition: bufpage.h:364
int Buffer
Definition: buf.h:23
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:151
Datum * ginExtractEntries(GinState *ginstate, OffsetNumber attnum, Datum value, bool isNull, int32 *nentries, GinNullCategory **categories)
Definition: ginutil.c:486
struct KeyArray KeyArray
#define PageGetItem(page, itemId)
Definition: bufpage.h:336
Pointer Page
Definition: bufpage.h:74
#define IndexTupleSize(itup)
Definition: itup.h:70
void ginInitBA(BuildAccumulator *accum)
Definition: ginbulk.c:109
BlockNumber nPendingPages
Definition: ginblock.h:72
#define XLOG_GIN_DELETE_LISTPAGE
Definition: ginxlog.h:195