PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
ginfast.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * ginfast.c
4  * Fast insert routines for the Postgres inverted index access method.
5  * Pending entries are stored in a linear list of pages. Later on
6  * (typically during VACUUM), ginInsertCleanup() will be invoked to
7  * transfer pending entries into the regular index structure. This
8  * wins because bulk insertion is much more efficient than retail.
9  *
10  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  * src/backend/access/gin/ginfast.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "access/gin_private.h"
22 #include "access/ginxlog.h"
23 #include "access/xloginsert.h"
24 #include "access/xlog.h"
25 #include "commands/vacuum.h"
26 #include "catalog/pg_am.h"
27 #include "miscadmin.h"
28 #include "utils/memutils.h"
29 #include "utils/rel.h"
30 #include "utils/acl.h"
31 #include "postmaster/autovacuum.h"
32 #include "storage/indexfsm.h"
33 #include "storage/lmgr.h"
34 #include "utils/builtins.h"
35 
36 /* GUC parameter */
38 
/*
 * Bytes left on a block after subtracting the MAXALIGN'd page header size
 * and the MAXALIGN'd size of the GIN opaque data.
 */
#define GIN_PAGE_FREESIZE \
	(BLCKSZ - (MAXALIGN(SizeOfPageHeaderData) + \
			   MAXALIGN(sizeof(GinPageOpaqueData))))
41 
42 typedef struct KeyArray
43 {
44  Datum *keys; /* expansible array */
45  GinNullCategory *categories; /* another expansible array */
46  int32 nvalues; /* current number of valid entries */
47  int32 maxvalues; /* allocated size of arrays */
48 } KeyArray;
49 
50 
51 /*
52  * Build a pending-list page from the given array of tuples, and write it out.
53  *
54  * Returns amount of free space left on the page.
55  */
56 static int32
58  IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
59 {
60  Page page = BufferGetPage(buffer);
61  int32 i,
62  freesize,
63  size = 0;
64  OffsetNumber l,
65  off;
66  char *workspace;
67  char *ptr;
68 
69  /* workspace could be a local array; we use palloc for alignment */
70  workspace = palloc(BLCKSZ);
71 
73 
74  GinInitBuffer(buffer, GIN_LIST);
75 
76  off = FirstOffsetNumber;
77  ptr = workspace;
78 
79  for (i = 0; i < ntuples; i++)
80  {
81  int this_size = IndexTupleSize(tuples[i]);
82 
83  memcpy(ptr, tuples[i], this_size);
84  ptr += this_size;
85  size += this_size;
86 
87  l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
88 
89  if (l == InvalidOffsetNumber)
90  elog(ERROR, "failed to add item to index page in \"%s\"",
92 
93  off++;
94  }
95 
96  Assert(size <= BLCKSZ); /* else we overran workspace */
97 
98  GinPageGetOpaque(page)->rightlink = rightlink;
99 
100  /*
101  * tail page may contain only whole row(s) or final part of row placed on
102  * previous pages (a "row" here meaning all the index tuples generated for
103  * one heap tuple)
104  */
105  if (rightlink == InvalidBlockNumber)
106  {
107  GinPageSetFullRow(page);
108  GinPageGetOpaque(page)->maxoff = 1;
109  }
110  else
111  {
112  GinPageGetOpaque(page)->maxoff = 0;
113  }
114 
115  MarkBufferDirty(buffer);
116 
117  if (RelationNeedsWAL(index))
118  {
120  XLogRecPtr recptr;
121 
122  data.rightlink = rightlink;
123  data.ntuples = ntuples;
124 
125  XLogBeginInsert();
126  XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
127 
129  XLogRegisterBufData(0, workspace, size);
130 
131  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
132  PageSetLSN(page, recptr);
133  }
134 
135  /* get free space before releasing buffer */
136  freesize = PageGetExactFreeSpace(page);
137 
138  UnlockReleaseBuffer(buffer);
139 
141 
142  pfree(workspace);
143 
144  return freesize;
145 }
146 
147 static void
149  GinMetaPageData *res)
150 {
151  Buffer curBuffer = InvalidBuffer;
152  Buffer prevBuffer = InvalidBuffer;
153  int i,
154  size = 0,
155  tupsize;
156  int startTuple = 0;
157 
158  Assert(ntuples > 0);
159 
160  /*
161  * Split tuples into pages
162  */
163  for (i = 0; i < ntuples; i++)
164  {
165  if (curBuffer == InvalidBuffer)
166  {
167  curBuffer = GinNewBuffer(index);
168 
169  if (prevBuffer != InvalidBuffer)
170  {
171  res->nPendingPages++;
172  writeListPage(index, prevBuffer,
173  tuples + startTuple,
174  i - startTuple,
175  BufferGetBlockNumber(curBuffer));
176  }
177  else
178  {
179  res->head = BufferGetBlockNumber(curBuffer);
180  }
181 
182  prevBuffer = curBuffer;
183  startTuple = i;
184  size = 0;
185  }
186 
187  tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
188 
189  if (size + tupsize > GinListPageSize)
190  {
191  /* won't fit, force a new page and reprocess */
192  i--;
193  curBuffer = InvalidBuffer;
194  }
195  else
196  {
197  size += tupsize;
198  }
199  }
200 
201  /*
202  * Write last page
203  */
204  res->tail = BufferGetBlockNumber(curBuffer);
205  res->tailFreeSize = writeListPage(index, curBuffer,
206  tuples + startTuple,
207  ntuples - startTuple,
209  res->nPendingPages++;
210  /* that was only one heap tuple */
211  res->nPendingHeapTuples = 1;
212 }
213 
214 /*
215  * Write the index tuples contained in *collector into the index's
216  * pending list.
217  *
218  * Function guarantees that all these tuples will be inserted consecutively,
219  * preserving order
220  */
221 void
223 {
224  Relation index = ginstate->index;
225  Buffer metabuffer;
226  Page metapage;
227  GinMetaPageData *metadata = NULL;
229  Page page = NULL;
230  ginxlogUpdateMeta data;
231  bool separateList = false;
232  bool needCleanup = false;
233  int cleanupSize;
234  bool needWal;
235 
236  if (collector->ntuples == 0)
237  return;
238 
239  needWal = RelationNeedsWAL(index);
240 
241  data.node = index->rd_node;
242  data.ntuples = 0;
244 
245  metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
246  metapage = BufferGetPage(metabuffer);
247 
248  if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
249  {
250  /*
251  * Total size is greater than one page => make sublist
252  */
253  separateList = true;
254  }
255  else
256  {
257  LockBuffer(metabuffer, GIN_EXCLUSIVE);
258  metadata = GinPageGetMeta(metapage);
259 
260  if (metadata->head == InvalidBlockNumber ||
261  collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
262  {
263  /*
264  * Pending list is empty or total size is greater than freespace
265  * on tail page => make sublist
266  *
267  * We unlock metabuffer to keep high concurrency
268  */
269  separateList = true;
270  LockBuffer(metabuffer, GIN_UNLOCK);
271  }
272  }
273 
274  if (separateList)
275  {
276  /*
277  * We should make sublist separately and append it to the tail
278  */
279  GinMetaPageData sublist;
280 
281  memset(&sublist, 0, sizeof(GinMetaPageData));
282  makeSublist(index, collector->tuples, collector->ntuples, &sublist);
283 
284  if (needWal)
285  XLogBeginInsert();
286 
287  /*
288  * metapage was unlocked, see above
289  */
290  LockBuffer(metabuffer, GIN_EXCLUSIVE);
291  metadata = GinPageGetMeta(metapage);
292 
293  if (metadata->head == InvalidBlockNumber)
294  {
295  /*
296  * Main list is empty, so just insert sublist as main list
297  */
299 
300  metadata->head = sublist.head;
301  metadata->tail = sublist.tail;
302  metadata->tailFreeSize = sublist.tailFreeSize;
303 
304  metadata->nPendingPages = sublist.nPendingPages;
305  metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
306  }
307  else
308  {
309  /*
310  * Merge lists
311  */
312  data.prevTail = metadata->tail;
313  data.newRightlink = sublist.head;
314 
315  buffer = ReadBuffer(index, metadata->tail);
316  LockBuffer(buffer, GIN_EXCLUSIVE);
317  page = BufferGetPage(buffer);
318 
319  Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
320 
322 
323  GinPageGetOpaque(page)->rightlink = sublist.head;
324 
325  MarkBufferDirty(buffer);
326 
327  metadata->tail = sublist.tail;
328  metadata->tailFreeSize = sublist.tailFreeSize;
329 
330  metadata->nPendingPages += sublist.nPendingPages;
331  metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
332 
333  if (needWal)
335  }
336  }
337  else
338  {
339  /*
340  * Insert into tail page. Metapage is already locked
341  */
342  OffsetNumber l,
343  off;
344  int i,
345  tupsize;
346  char *ptr;
347  char *collectordata;
348 
349  buffer = ReadBuffer(index, metadata->tail);
350  LockBuffer(buffer, GIN_EXCLUSIVE);
351  page = BufferGetPage(buffer);
352 
353  off = (PageIsEmpty(page)) ? FirstOffsetNumber :
355 
356  collectordata = ptr = (char *) palloc(collector->sumsize);
357 
358  data.ntuples = collector->ntuples;
359 
360  if (needWal)
361  XLogBeginInsert();
362 
364 
365  /*
366  * Increase counter of heap tuples
367  */
368  Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
369  GinPageGetOpaque(page)->maxoff++;
370  metadata->nPendingHeapTuples++;
371 
372  for (i = 0; i < collector->ntuples; i++)
373  {
374  tupsize = IndexTupleSize(collector->tuples[i]);
375  l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
376 
377  if (l == InvalidOffsetNumber)
378  elog(ERROR, "failed to add item to index page in \"%s\"",
379  RelationGetRelationName(index));
380 
381  memcpy(ptr, collector->tuples[i], tupsize);
382  ptr += tupsize;
383 
384  off++;
385  }
386 
387  Assert((ptr - collectordata) <= collector->sumsize);
388  if (needWal)
389  {
391  XLogRegisterBufData(1, collectordata, collector->sumsize);
392  }
393 
394  metadata->tailFreeSize = PageGetExactFreeSpace(page);
395 
396  MarkBufferDirty(buffer);
397  }
398 
399  /*
400  * Write metabuffer, make xlog entry
401  */
402  MarkBufferDirty(metabuffer);
403 
404  if (needWal)
405  {
406  XLogRecPtr recptr;
407 
408  memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
409 
410  XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
411  XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
412 
413  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
414  PageSetLSN(metapage, recptr);
415 
416  if (buffer != InvalidBuffer)
417  {
418  PageSetLSN(page, recptr);
419  }
420  }
421 
422  if (buffer != InvalidBuffer)
423  UnlockReleaseBuffer(buffer);
424 
425  /*
426  * Force pending list cleanup when it becomes too long. And,
427  * ginInsertCleanup could take significant amount of time, so we prefer to
428  * call it when it can do all the work in a single collection cycle. In
429  * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
430  * while pending list is still small enough to fit into
431  * gin_pending_list_limit.
432  *
433  * ginInsertCleanup() should not be called inside our CRIT_SECTION.
434  */
435  cleanupSize = GinGetPendingListCleanupSize(index);
436  if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
437  needCleanup = true;
438 
439  UnlockReleaseBuffer(metabuffer);
440 
442 
443  if (needCleanup)
444  ginInsertCleanup(ginstate, false, true, NULL);
445 }
446 
447 /*
448  * Create temporary index tuples for a single indexable item (one index column
449  * for the heap tuple specified by ht_ctid), and append them to the array
450  * in *collector. They will subsequently be written out using
451  * ginHeapTupleFastInsert. Note that to guarantee consistent state, all
452  * temp tuples for a given heap tuple must be written in one call to
453  * ginHeapTupleFastInsert.
454  */
455 void
457  GinTupleCollector *collector,
458  OffsetNumber attnum, Datum value, bool isNull,
459  ItemPointer ht_ctid)
460 {
461  Datum *entries;
462  GinNullCategory *categories;
463  int32 i,
464  nentries;
465 
466  /*
467  * Extract the key values that need to be inserted in the index
468  */
469  entries = ginExtractEntries(ginstate, attnum, value, isNull,
470  &nentries, &categories);
471 
472  /*
473  * Allocate/reallocate memory for storing collected tuples
474  */
475  if (collector->tuples == NULL)
476  {
477  collector->lentuples = nentries * ginstate->origTupdesc->natts;
478  collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
479  }
480 
481  while (collector->ntuples + nentries > collector->lentuples)
482  {
483  collector->lentuples *= 2;
484  collector->tuples = (IndexTuple *) repalloc(collector->tuples,
485  sizeof(IndexTuple) * collector->lentuples);
486  }
487 
488  /*
489  * Build an index tuple for each key value, and add to array. In pending
490  * tuples we just stick the heap TID into t_tid.
491  */
492  for (i = 0; i < nentries; i++)
493  {
494  IndexTuple itup;
495 
496  itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
497  NULL, 0, 0, true);
498  itup->t_tid = *ht_ctid;
499  collector->tuples[collector->ntuples++] = itup;
500  collector->sumsize += IndexTupleSize(itup);
501  }
502 }
503 
504 /*
505  * Deletes pending list pages up to (not including) newHead page.
506  * If newHead == InvalidBlockNumber then function drops the whole list.
507  *
508  * metapage is pinned and exclusive-locked throughout this function.
509  */
510 static void
512  bool fill_fsm, IndexBulkDeleteResult *stats)
513 {
514  Page metapage;
515  GinMetaPageData *metadata;
516  BlockNumber blknoToDelete;
517 
518  metapage = BufferGetPage(metabuffer);
519  metadata = GinPageGetMeta(metapage);
520  blknoToDelete = metadata->head;
521 
522  do
523  {
524  Page page;
525  int i;
526  int64 nDeletedHeapTuples = 0;
528  Buffer buffers[GIN_NDELETE_AT_ONCE];
529  BlockNumber freespace[GIN_NDELETE_AT_ONCE];
530 
531  data.ndeleted = 0;
532  while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
533  {
534  freespace[data.ndeleted] = blknoToDelete;
535  buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
536  LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
537  page = BufferGetPage(buffers[data.ndeleted]);
538 
539  data.ndeleted++;
540 
541  Assert(!GinPageIsDeleted(page));
542 
543  nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
544  blknoToDelete = GinPageGetOpaque(page)->rightlink;
545  }
546 
547  if (stats)
548  stats->pages_deleted += data.ndeleted;
549 
550  /*
551  * This operation touches an unusually large number of pages, so
552  * prepare the XLogInsert machinery for that before entering the
553  * critical section.
554  */
555  if (RelationNeedsWAL(index))
557 
559 
560  metadata->head = blknoToDelete;
561 
562  Assert(metadata->nPendingPages >= data.ndeleted);
563  metadata->nPendingPages -= data.ndeleted;
564  Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
565  metadata->nPendingHeapTuples -= nDeletedHeapTuples;
566 
567  if (blknoToDelete == InvalidBlockNumber)
568  {
569  metadata->tail = InvalidBlockNumber;
570  metadata->tailFreeSize = 0;
571  metadata->nPendingPages = 0;
572  metadata->nPendingHeapTuples = 0;
573  }
574 
575  MarkBufferDirty(metabuffer);
576 
577  for (i = 0; i < data.ndeleted; i++)
578  {
579  page = BufferGetPage(buffers[i]);
580  GinPageGetOpaque(page)->flags = GIN_DELETED;
581  MarkBufferDirty(buffers[i]);
582  }
583 
584  if (RelationNeedsWAL(index))
585  {
586  XLogRecPtr recptr;
587 
588  XLogBeginInsert();
589  XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
590  for (i = 0; i < data.ndeleted; i++)
591  XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
592 
593  memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
594 
595  XLogRegisterData((char *) &data,
596  sizeof(ginxlogDeleteListPages));
597 
598  recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
599  PageSetLSN(metapage, recptr);
600 
601  for (i = 0; i < data.ndeleted; i++)
602  {
603  page = BufferGetPage(buffers[i]);
604  PageSetLSN(page, recptr);
605  }
606  }
607 
608  for (i = 0; i < data.ndeleted; i++)
609  UnlockReleaseBuffer(buffers[i]);
610 
612 
613  for (i = 0; fill_fsm && i < data.ndeleted; i++)
614  RecordFreeIndexPage(index, freespace[i]);
615 
616  } while (blknoToDelete != newHead);
617 }
618 
619 /* Initialize empty KeyArray */
620 static void
621 initKeyArray(KeyArray *keys, int32 maxvalues)
622 {
623  keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
624  keys->categories = (GinNullCategory *)
625  palloc(sizeof(GinNullCategory) * maxvalues);
626  keys->nvalues = 0;
627  keys->maxvalues = maxvalues;
628 }
629 
630 /* Add datum to KeyArray, resizing if needed */
631 static void
632 addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
633 {
634  if (keys->nvalues >= keys->maxvalues)
635  {
636  keys->maxvalues *= 2;
637  keys->keys = (Datum *)
638  repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
639  keys->categories = (GinNullCategory *)
640  repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
641  }
642 
643  keys->keys[keys->nvalues] = datum;
644  keys->categories[keys->nvalues] = category;
645  keys->nvalues++;
646 }
647 
648 /*
649  * Collect data from a pending-list page in preparation for insertion into
650  * the main index.
651  *
652  * Go through all tuples >= startoff on page and collect values in accum
653  *
654  * Note that ka is just workspace --- it does not carry any state across
655  * calls.
656  */
657 static void
659  Page page, OffsetNumber startoff)
660 {
661  ItemPointerData heapptr;
662  OffsetNumber i,
663  maxoff;
664  OffsetNumber attrnum;
665 
666  /* reset *ka to empty */
667  ka->nvalues = 0;
668 
669  maxoff = PageGetMaxOffsetNumber(page);
670  Assert(maxoff >= FirstOffsetNumber);
671  ItemPointerSetInvalid(&heapptr);
672  attrnum = 0;
673 
674  for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
675  {
676  IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
677  OffsetNumber curattnum;
678  Datum curkey;
679  GinNullCategory curcategory;
680 
681  /* Check for change of heap TID or attnum */
682  curattnum = gintuple_get_attrnum(accum->ginstate, itup);
683 
684  if (!ItemPointerIsValid(&heapptr))
685  {
686  heapptr = itup->t_tid;
687  attrnum = curattnum;
688  }
689  else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
690  curattnum == attrnum))
691  {
692  /*
693  * ginInsertBAEntries can insert several datums per call, but only
694  * for one heap tuple and one column. So call it at a boundary,
695  * and reset ka.
696  */
697  ginInsertBAEntries(accum, &heapptr, attrnum,
698  ka->keys, ka->categories, ka->nvalues);
699  ka->nvalues = 0;
700  heapptr = itup->t_tid;
701  attrnum = curattnum;
702  }
703 
704  /* Add key to KeyArray */
705  curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
706  addDatum(ka, curkey, curcategory);
707  }
708 
709  /* Dump out all remaining keys */
710  ginInsertBAEntries(accum, &heapptr, attrnum,
711  ka->keys, ka->categories, ka->nvalues);
712 }
713 
714 /*
715  * Move tuples from pending pages into regular GIN structure.
716  *
717  * At first glance this does not look crash-safe. But if we crash
718  * after posting entries to the main index and before removing them from the
719  * pending list, it's okay because when we redo the posting later on, nothing
720  * bad will happen.
721  *
722  * fill_fsm indicates that ginInsertCleanup should add deleted pages
723  * to the FSM; otherwise the caller is responsible for putting deleted
724  * pages into the FSM.
725  *
726  * If stats isn't null, we count deleted pending pages into the counts.
727  */
728 void
729 ginInsertCleanup(GinState *ginstate, bool full_clean,
730  bool fill_fsm, IndexBulkDeleteResult *stats)
731 {
732  Relation index = ginstate->index;
733  Buffer metabuffer,
734  buffer;
735  Page metapage,
736  page;
737  GinMetaPageData *metadata;
739  oldCtx;
740  BuildAccumulator accum;
741  KeyArray datums;
742  BlockNumber blkno,
743  blknoFinish;
744  bool cleanupFinish = false;
745  bool fsm_vac = false;
746  Size workMemory;
747  bool inVacuum = (stats == NULL);
748 
749  /*
750  * We would like to prevent concurrent cleanup process. For that we will
751  * lock metapage in exclusive mode using LockPage() call. Nobody other
752  * will use that lock for metapage, so we keep possibility of concurrent
753  * insertion into pending list
754  */
755 
756  if (inVacuum)
757  {
758  /*
759  * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
760  * and we would like to wait concurrent cleanup to finish.
761  */
763  workMemory =
766  }
767  else
768  {
769  /*
770  * We are called from regular insert and if we see concurrent cleanup
771  * just exit in hope that concurrent process will clean up pending
772  * list.
773  */
775  return;
776  workMemory = work_mem;
777  }
778 
779  metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
780  LockBuffer(metabuffer, GIN_SHARE);
781  metapage = BufferGetPage(metabuffer);
782  metadata = GinPageGetMeta(metapage);
783 
784  if (metadata->head == InvalidBlockNumber)
785  {
786  /* Nothing to do */
787  UnlockReleaseBuffer(metabuffer);
789  return;
790  }
791 
792  /*
793  * Remember a tail page to prevent infinite cleanup if other backends add
794  * new tuples faster than we can cleanup.
795  */
796  blknoFinish = metadata->tail;
797 
798  /*
799  * Read and lock head of pending list
800  */
801  blkno = metadata->head;
802  buffer = ReadBuffer(index, blkno);
803  LockBuffer(buffer, GIN_SHARE);
804  page = BufferGetPage(buffer);
805 
806  LockBuffer(metabuffer, GIN_UNLOCK);
807 
808  /*
809  * Initialize. All temporary space will be in opCtx
810  */
812  "GIN insert cleanup temporary context",
814 
815  oldCtx = MemoryContextSwitchTo(opCtx);
816 
817  initKeyArray(&datums, 128);
818  ginInitBA(&accum);
819  accum.ginstate = ginstate;
820 
821  /*
822  * At the top of this loop, we have pin and lock on the current page of
823  * the pending list. However, we'll release that before exiting the loop.
824  * Note we also have pin but not lock on the metapage.
825  */
826  for (;;)
827  {
828  Assert(!GinPageIsDeleted(page));
829 
830  /*
831  * Are we walk through the page which as we remember was a tail when
832  * we start our cleanup? But if caller asks us to clean up whole
833  * pending list then ignore old tail, we will work until list becomes
834  * empty.
835  */
836  if (blkno == blknoFinish && full_clean == false)
837  cleanupFinish = true;
838 
839  /*
840  * read page's datums into accum
841  */
842  processPendingPage(&accum, &datums, page, FirstOffsetNumber);
843 
845 
846  /*
847  * Is it time to flush memory to disk? Flush if we are at the end of
848  * the pending list, or if we have a full row and memory is getting
849  * full.
850  */
851  if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
852  (GinPageHasFullRow(page) &&
853  (accum.allocatedMemory >= workMemory * 1024L)))
854  {
856  uint32 nlist;
857  Datum key;
858  GinNullCategory category;
859  OffsetNumber maxoff,
860  attnum;
861 
862  /*
863  * Unlock current page to increase performance. Changes of page
864  * will be checked later by comparing maxoff after completion of
865  * memory flush.
866  */
867  maxoff = PageGetMaxOffsetNumber(page);
868  LockBuffer(buffer, GIN_UNLOCK);
869 
870  /*
871  * Moving collected data into regular structure can take
872  * significant amount of time - so, run it without locking pending
873  * list.
874  */
875  ginBeginBAScan(&accum);
876  while ((list = ginGetBAEntry(&accum,
877  &attnum, &key, &category, &nlist)) != NULL)
878  {
879  ginEntryInsert(ginstate, attnum, key, category,
880  list, nlist, NULL);
882  }
883 
884  /*
885  * Lock the whole list to remove pages
886  */
887  LockBuffer(metabuffer, GIN_EXCLUSIVE);
888  LockBuffer(buffer, GIN_SHARE);
889 
890  Assert(!GinPageIsDeleted(page));
891 
892  /*
893  * While we left the page unlocked, more stuff might have gotten
894  * added to it. If so, process those entries immediately. There
895  * shouldn't be very many, so we don't worry about the fact that
896  * we're doing this with exclusive lock. Insertion algorithm
897  * guarantees that inserted row(s) will not continue on next page.
898  * NOTE: intentionally no vacuum_delay_point in this loop.
899  */
900  if (PageGetMaxOffsetNumber(page) != maxoff)
901  {
902  ginInitBA(&accum);
903  processPendingPage(&accum, &datums, page, maxoff + 1);
904 
905  ginBeginBAScan(&accum);
906  while ((list = ginGetBAEntry(&accum,
907  &attnum, &key, &category, &nlist)) != NULL)
908  ginEntryInsert(ginstate, attnum, key, category,
909  list, nlist, NULL);
910  }
911 
912  /*
913  * Remember next page - it will become the new list head
914  */
915  blkno = GinPageGetOpaque(page)->rightlink;
916  UnlockReleaseBuffer(buffer); /* shiftList will do exclusive
917  * locking */
918 
919  /*
920  * remove read pages from pending list, at this point all content
921  * of read pages is in regular structure
922  */
923  shiftList(index, metabuffer, blkno, fill_fsm, stats);
924 
925  /* At this point, some pending pages have been freed up */
926  fsm_vac = true;
927 
928  Assert(blkno == metadata->head);
929  LockBuffer(metabuffer, GIN_UNLOCK);
930 
931  /*
932  * if we removed the whole pending list or we cleanup tail (which
933  * we remembered on start our cleanup process) then just exit
934  */
935  if (blkno == InvalidBlockNumber || cleanupFinish)
936  break;
937 
938  /*
939  * release memory used so far and reinit state
940  */
941  MemoryContextReset(opCtx);
942  initKeyArray(&datums, datums.maxvalues);
943  ginInitBA(&accum);
944  }
945  else
946  {
947  blkno = GinPageGetOpaque(page)->rightlink;
948  UnlockReleaseBuffer(buffer);
949  }
950 
951  /*
952  * Read next page in pending list
953  */
955  buffer = ReadBuffer(index, blkno);
956  LockBuffer(buffer, GIN_SHARE);
957  page = BufferGetPage(buffer);
958  }
959 
961  ReleaseBuffer(metabuffer);
962 
963  /*
964  * As pending list pages can have a high churn rate, it is desirable to
965  * recycle them immediately to the FreeSpace Map when ordinary backends
966  * clean the list.
967  */
968  if (fsm_vac && fill_fsm)
970 
971 
972  /* Clean up temporary space */
973  MemoryContextSwitchTo(oldCtx);
974  MemoryContextDelete(opCtx);
975 }
976 
977 /*
978  * SQL-callable function to clean the insert pending list
979  */
980 Datum
982 {
983  Oid indexoid = PG_GETARG_OID(0);
984  Relation indexRel = index_open(indexoid, AccessShareLock);
985  IndexBulkDeleteResult stats;
986  GinState ginstate;
987 
988  if (RecoveryInProgress())
989  ereport(ERROR,
990  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
991  errmsg("recovery is in progress"),
992  errhint("GIN pending list cannot be cleaned up during recovery.")));
993 
994  /* Must be a GIN index */
995  if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
996  indexRel->rd_rel->relam != GIN_AM_OID)
997  ereport(ERROR,
998  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
999  errmsg("\"%s\" is not a GIN index",
1000  RelationGetRelationName(indexRel))));
1001 
1002  /*
1003  * Reject attempts to read non-local temporary relations; we would be
1004  * likely to get wrong data since we have no visibility into the owning
1005  * session's local buffers.
1006  */
1007  if (RELATION_IS_OTHER_TEMP(indexRel))
1008  ereport(ERROR,
1009  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1010  errmsg("cannot access temporary indexes of other sessions")));
1011 
1012  /* User must own the index (comparable to privileges needed for VACUUM) */
1013  if (!pg_class_ownercheck(indexoid, GetUserId()))
1015  RelationGetRelationName(indexRel));
1016 
1017  memset(&stats, 0, sizeof(stats));
1018  initGinState(&ginstate, indexRel);
1019  ginInsertCleanup(&ginstate, true, true, &stats);
1020 
1021  index_close(indexRel, AccessShareLock);
1022 
1023  PG_RETURN_INT64((int64) stats.pages_deleted);
1024 }
#define GinPageHasFullRow(page)
Definition: ginblock.h:118
int autovacuum_work_mem
Definition: autovacuum.c:112
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:59
BlockNumber prevTail
Definition: ginxlog.h:173
#define GIN_UNLOCK
Definition: gin_private.h:43
void XLogRegisterBufData(uint8 block_id, char *data, int len)
Definition: xloginsert.c:361
#define GIN_DELETED
Definition: ginblock.h:41
Buffer GinNewBuffer(Relation index)
Definition: ginutil.c:287
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define PageIsEmpty(page)
Definition: bufpage.h:219
RelFileNode node
Definition: ginxlog.h:171
void ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
Definition: ginfast.c:222
int errhint(const char *fmt,...)
Definition: elog.c:987
Relation index
Definition: gin_private.h:53
BlockNumber rightlink
Definition: ginxlog.h:185
Oid GetUserId(void)
Definition: miscinit.c:283
#define PG_RETURN_INT64(x)
Definition: fmgr.h:327
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:1450
#define ExclusiveLock
Definition: lockdefs.h:44
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:213
ItemPointerData t_tid
Definition: itup.h:37
#define END_CRIT_SECTION()
Definition: miscadmin.h:132
static MemoryContext opCtx
Definition: ginxlog.c:22
#define GinListPageSize
Definition: ginblock.h:311
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Pointer Item
Definition: item.h:17
static void addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
Definition: ginfast.c:632
#define AccessShareLock
Definition: lockdefs.h:36
#define InvalidBuffer
Definition: buf.h:25
#define REGBUF_WILL_INIT
Definition: xloginsert.h:32
void ginInsertCleanup(GinState *ginstate, bool full_clean, bool fill_fsm, IndexBulkDeleteResult *stats)
Definition: ginfast.c:729
#define GIN_NDELETE_AT_ONCE
Definition: ginxlog.h:203
#define START_CRIT_SECTION()
Definition: miscadmin.h:130
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:413
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
uint32 BlockNumber
Definition: block.h:31
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3309
#define GinPageGetOpaque(page)
Definition: ginblock.h:109
#define GIN_METAPAGE_BLKNO
Definition: ginblock.h:50
static int32 writeListPage(Relation index, Buffer buffer, IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
Definition: ginfast.c:57
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:7860
#define PageGetMaxOffsetNumber(page)
Definition: bufpage.h:354
int natts
Definition: tupdesc.h:73
static void makeSublist(Relation index, IndexTuple *tuples, int32 ntuples, GinMetaPageData *res)
Definition: ginfast.c:148
signed int int32
Definition: c.h:256
#define GIN_AM_OID
Definition: pg_am.h:79
uint16 OffsetNumber
Definition: off.h:24
#define XLOG_GIN_INSERT_LISTPAGE
Definition: ginxlog.h:181
static void initKeyArray(KeyArray *keys, int32 maxvalues)
Definition: ginfast.c:621
Definition: type.h:90
static struct @114 value
int64 nPendingHeapTuples
Definition: ginblock.h:73
GinMetaPageData metadata
Definition: ginxlog.h:206
void pfree(void *pointer)
Definition: mcxt.c:950
void ginEntryInsert(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, ItemPointerData *items, uint32 nitem, GinStatsData *buildStats)
Definition: gininsert.c:177
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3332
#define ERROR
Definition: elog.h:43
signed char GinNullCategory
Definition: ginblock.h:190
#define XLOG_GIN_UPDATE_META_PAGE
Definition: ginxlog.h:163
BlockNumber head
Definition: ginblock.h:60
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
void LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:400
OffsetNumber gintuple_get_attrnum(GinState *ginstate, IndexTuple tuple)
Definition: ginutil.c:213
#define GIN_LIST
Definition: ginblock.h:43
#define PG_GETARG_OID(n)
Definition: fmgr.h:240
BlockNumber tail
Definition: ginblock.h:61
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3382
#define FirstOffsetNumber
Definition: off.h:27
IndexTupleData * IndexTuple
Definition: itup.h:53
#define REGBUF_STANDARD
Definition: xloginsert.h:35
void initGinState(GinState *state, Relation index)
Definition: ginutil.c:86
#define GinGetPendingListCleanupSize(relation)
Definition: gin_private.h:35
int32 nvalues
Definition: ginfast.c:46
#define RelationGetRelationName(relation)
Definition: rel.h:437
unsigned int uint32
Definition: c.h:268
struct ItemIdData ItemIdData
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
GinState * ginstate
Definition: gin_private.h:407
Datum gintuple_get_key(GinState *ginstate, IndexTuple tuple, GinNullCategory *category)
Definition: ginutil.c:246
BlockNumber pages_deleted
Definition: genam.h:78
#define BufferGetPage(buffer)
Definition: bufmgr.h:160
bool IsAutoVacuumWorkerProcess(void)
Definition: autovacuum.c:2988
#define ereport(elevel, rest)
Definition: elog.h:122
BlockNumber newRightlink
Definition: ginxlog.h:174
#define GIN_SHARE
Definition: gin_private.h:44
#define GinPageSetFullRow(page)
Definition: ginblock.h:119
int32 maxvalues
Definition: ginfast.c:47
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:232
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
Datum * keys
Definition: ginfast.c:44
IndexTuple * tuples
Definition: gin_private.h:429
static void processPendingPage(BuildAccumulator *accum, KeyArray *ka, Page page, OffsetNumber startoff)
Definition: ginfast.c:658
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
#define GinPageIsDeleted(page)
Definition: ginblock.h:123
uintptr_t Datum
Definition: postgres.h:372
#define GIN_EXCLUSIVE
Definition: gin_private.h:45
void ginInsertBAEntries(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber attnum, Datum *entries, GinNullCategory *categories, int32 nentries)
Definition: ginbulk.c:209
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:3546
int work_mem
Definition: globals.c:112
#define InvalidOffsetNumber
Definition: off.h:26
int maintenance_work_mem
Definition: globals.c:113
GinMetaPageData metadata
Definition: ginxlog.h:172
RelFileNode rd_node
Definition: rel.h:85
#define NULL
Definition: c.h:229
uint32 tailFreeSize
Definition: ginblock.h:66
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
static void shiftList(Relation index, Buffer metabuffer, BlockNumber newHead, bool fill_fsm, IndexBulkDeleteResult *stats)
Definition: ginfast.c:511
IndexTuple GinFormTuple(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, Pointer data, Size dataSize, int nipd, bool errorTooBig)
Definition: ginentrypage.c:45
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:534
void IndexFreeSpaceMapVacuum(Relation rel)
Definition: indexfsm.c:71
bool pg_class_ownercheck(Oid class_oid, Oid roleid)
Definition: aclchk.c:4529
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:207
Buffer ReadBuffer(Relation reln, BlockNumber blockNum)
Definition: bufmgr.c:594
#define OffsetNumberNext(offsetNumber)
Definition: off.h:53
size_t Size
Definition: c.h:356
#define InvalidBlockNumber
Definition: block.h:33
void XLogEnsureRecordSpace(int max_block_id, int ndatas)
Definition: xloginsert.c:146
void GinInitBuffer(Buffer b, uint32 f)
Definition: ginutil.c:351
ItemPointerData * ginGetBAEntry(BuildAccumulator *accum, OffsetNumber *attnum, Datum *key, GinNullCategory *category, uint32 *n)
Definition: ginbulk.c:267
#define MAXALIGN(LEN)
Definition: c.h:588
#define RelationNeedsWAL(relation)
Definition: rel.h:506
void UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:435
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:963
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:176
Size PageGetExactFreeSpace(Page page)
Definition: bufpage.c:633
bool ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
Definition: lmgr.c:419
void ginBeginBAScan(BuildAccumulator *accum)
Definition: ginbulk.c:256
Datum gin_clean_pending_list(PG_FUNCTION_ARGS)
Definition: ginfast.c:981
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:2605
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:131
tuple list
Definition: sort-test.py:11
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
#define RELKIND_INDEX
Definition: pg_class.h:161
void ginHeapTupleFastCollect(GinState *ginstate, GinTupleCollector *collector, OffsetNumber attnum, Datum value, bool isNull, ItemPointer ht_ctid)
Definition: ginfast.c:456
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
TupleDesc origTupdesc
Definition: gin_private.h:67
#define GIN_PAGE_FREESIZE
Definition: ginfast.c:39
GinNullCategory * categories
Definition: ginfast.c:45
#define GinPageGetMeta(p)
Definition: ginblock.h:103
#define elog
Definition: elog.h:219
int gin_pending_list_limit
Definition: ginfast.c:37
void vacuum_delay_point(void)
Definition: vacuum.c:1560
void XLogBeginInsert(void)
Definition: xloginsert.c:120
void RecordFreeIndexPage(Relation rel, BlockNumber freeBlock)
Definition: indexfsm.c:52
#define PageSetLSN(page, lsn)
Definition: bufpage.h:365
int Buffer
Definition: buf.h:23
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:151
Datum * ginExtractEntries(GinState *ginstate, OffsetNumber attnum, Datum value, bool isNull, int32 *nentries, GinNullCategory **categories)
Definition: ginutil.c:476
struct KeyArray KeyArray
#define PageGetItem(page, itemId)
Definition: bufpage.h:337
Pointer Page
Definition: bufpage.h:74
#define IndexTupleSize(itup)
Definition: itup.h:70
void ginInitBA(BuildAccumulator *accum)
Definition: ginbulk.c:109
BlockNumber nPendingPages
Definition: ginblock.h:72
#define XLOG_GIN_DELETE_LISTPAGE
Definition: ginxlog.h:195