vacuumparallel.c
/*-------------------------------------------------------------------------
 *
 * vacuumparallel.c
 *	  Support routines for parallel vacuum execution.
 *
 * This file contains routines that are intended to support setting up, using,
 * and tearing down a ParallelVacuumState.
 *
 * In a parallel vacuum, we perform both index bulk deletion and index cleanup
 * with parallel worker processes.  Individual indexes are processed by one
 * vacuum process.  ParallelVacuumState contains shared information as well as
 * the memory space for storing dead items allocated in the DSM segment.  We
 * launch parallel worker processes at the start of parallel index
 * bulk-deletion and index cleanup and once all indexes are processed, the
 * parallel worker processes exit.  Each time we process indexes in parallel,
 * the parallel context is re-initialized so that the same DSM can be used for
 * multiple passes of index bulk-deletion and index cleanup.
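 *
 * A leader-side sketch of the intended call sequence (added for exposition;
 * the actual caller is lazy vacuum):
 *
 *	pvs = parallel_vacuum_init(rel, indrels, nindexes, nrequested_workers,
 *							   max_items, elevel, bstrategy);
 *	if (pvs != NULL)
 *	{
 *		... fill parallel_vacuum_get_dead_items(pvs) while scanning heap ...
 *		parallel_vacuum_bulkdel_all_indexes(pvs, num_table_tuples, 0);
 *		... more heap-scan and bulk-deletion passes, as needed ...
 *		parallel_vacuum_cleanup_all_indexes(pvs, num_table_tuples,
 *											num_index_scans, estimated_count);
 *		parallel_vacuum_end(pvs, indstats);
 *	}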
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/commands/vacuumparallel.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/amapi.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "optimizer/paths.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"

/*
 * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
 * we don't need to worry about DSM keys conflicting with plan_node_id we can
 * use small integers.
 */
#define PARALLEL_VACUUM_KEY_SHARED			1
#define PARALLEL_VACUUM_KEY_DEAD_ITEMS		2
#define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
#define PARALLEL_VACUUM_KEY_WAL_USAGE		5
#define PARALLEL_VACUUM_KEY_INDEX_STATS		6
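
/*
 * Each key above identifies one entry in the shared memory table of contents:
 * the leader allocates the chunk and inserts it under the key in
 * parallel_vacuum_init(), and each worker looks the same key up again in
 * parallel_vacuum_main() (a note added for exposition).
 */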

/*
 * Shared information among parallel workers.  So this is allocated in the
 * DSM segment.
 */
typedef struct PVShared
{
	/*
	 * Target table relid and log level (for messages about parallel workers
	 * launched during VACUUM VERBOSE).  These fields are not modified during
	 * the parallel vacuum.
	 */
	Oid			relid;
	int			elevel;

	/*
	 * Fields for both index vacuum and cleanup.
	 *
	 * reltuples is the total number of input heap tuples.  We set either old
	 * live tuples in the index vacuum case or the new live tuples in the
	 * index cleanup case.
	 *
	 * estimated_count is true if reltuples is an estimated value.  (Note that
	 * reltuples could be -1 in this case, indicating we have no idea.)
	 */
	double		reltuples;
	bool		estimated_count;

	/*
	 * In single process vacuum we could consume more memory during index
	 * vacuuming or cleanup apart from the memory for heap scanning.  In
	 * parallel vacuum, since individual vacuum workers can consume memory
	 * equal to maintenance_work_mem, the new maintenance_work_mem for each
	 * worker is set such that the parallel operation doesn't consume more
	 * memory than single process vacuum.
	 */
	int			maintenance_work_mem_worker;

	/*
	 * The number of buffers each worker's Buffer Access Strategy ring should
	 * contain.
	 */
	int			ring_nbuffers;

	/*
	 * Shared vacuum cost balance.  During parallel vacuum,
	 * VacuumSharedCostBalance points to this value and it accumulates the
	 * balance of each parallel vacuum worker.
	 */
	pg_atomic_uint32 cost_balance;

	/*
	 * Number of active parallel workers.  This is used for computing the
	 * minimum threshold of the vacuum cost balance before a worker sleeps for
	 * cost-based delay.
	 */
	pg_atomic_uint32 active_nworkers;

	/* Counter for vacuuming and cleanup */
	pg_atomic_uint32 idx;
} PVShared;

/* Status used during parallel index vacuum or cleanup */
typedef enum PVIndVacStatus
{
	PARALLEL_INDVAC_STATUS_INITIAL = 0,
	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
	PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
	PARALLEL_INDVAC_STATUS_COMPLETED,
} PVIndVacStatus;

/*
 * Struct for index vacuum statistics of an index that is used for parallel
 * vacuum.  This includes the status of parallel index vacuum as well as
 * index statistics.
 */
typedef struct PVIndStats
{
	/*
	 * The following two fields are set by leader process before executing
	 * parallel index vacuum or parallel index cleanup.  These fields are not
	 * fixed for the entire VACUUM operation.  They are only fixed for an
	 * individual parallel index vacuum and cleanup.
	 *
	 * parallel_workers_can_process is true if both leader and worker can
	 * process the index, otherwise only leader can process it.
	 */
	PVIndVacStatus status;
	bool		parallel_workers_can_process;

	/*
	 * Individual worker or leader stores the result of index vacuum or
	 * cleanup.
	 */
	bool		istat_updated;	/* are the stats updated? */
	IndexBulkDeleteResult istat;
} PVIndStats;

/*
 * Struct for maintaining a parallel vacuum state.  typedef appears in
 * vacuum.h.
 */
struct ParallelVacuumState
{
	/* NULL for worker processes */
	ParallelContext *pcxt;

	/* Parent Heap Relation */
	Relation	heaprel;

	/* Target indexes */
	Relation   *indrels;
	int			nindexes;

	/* Shared information among parallel vacuum workers */
	PVShared   *shared;

	/*
	 * Shared index statistics among parallel vacuum workers.  The array
	 * element is allocated for every index, even those indexes where parallel
	 * index vacuuming is unsafe or not worthwhile (e.g.,
	 * will_parallel_vacuum[] is false).  During parallel vacuum,
	 * IndexBulkDeleteResult of each index is kept in DSM and is copied into
	 * local memory at the end of parallel vacuum.
	 */
	PVIndStats *indstats;

	/* Shared dead items space among parallel vacuum workers */
	VacDeadItems *dead_items;

	/* Points to buffer usage area in DSM */
	BufferUsage *buffer_usage;

	/* Points to WAL usage area in DSM */
	WalUsage   *wal_usage;

	/*
	 * False if the index is totally unsuitable target for all parallel
	 * processing.  For example, the index could be <
	 * min_parallel_index_scan_size cutoff.
	 */
	bool	   *will_parallel_vacuum;

	/*
	 * The number of indexes that support parallel index bulk-deletion and
	 * parallel index cleanup respectively.
	 */
	int			nindexes_parallel_bulkdel;
	int			nindexes_parallel_cleanup;
	int			nindexes_parallel_condcleanup;

	/* Buffer access strategy used by leader process */
	BufferAccessStrategy bstrategy;

	/*
	 * Error reporting state.  The error callback is set only for workers
	 * processes during parallel index vacuum.
	 */
	char	   *relnamespace;
	char	   *relname;
	char	   *indname;
	PVIndVacStatus status;
};

static int	parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
											bool *will_parallel_vacuum);
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
												bool vacuum);
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
											  PVIndStats *indstats);
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
												   bool vacuum);
static void parallel_vacuum_error_callback(void *arg);

/*
 * Try to enter parallel mode and create a parallel context.  Then initialize
 * shared memory state.
 *
 * On success, return parallel vacuum state.  Otherwise return NULL.
 */
ParallelVacuumState *
parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
					 int nrequested_workers, int max_items,
					 int elevel, BufferAccessStrategy bstrategy)
{
	ParallelVacuumState *pvs;
	ParallelContext *pcxt;
	PVShared   *shared;
	VacDeadItems *dead_items;
	PVIndStats *indstats;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	bool	   *will_parallel_vacuum;
	Size		est_indstats_len;
	Size		est_shared_len;
	Size		est_dead_items_len;
	int			nindexes_mwm = 0;
	int			parallel_workers = 0;
	int			querylen;

	/*
	 * A parallel vacuum must be requested and there must be indexes on the
	 * relation
	 */
	Assert(nrequested_workers >= 0);
	Assert(nindexes > 0);

	/*
	 * Compute the number of parallel vacuum workers to launch
	 */
	will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
	parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
													   nrequested_workers,
													   will_parallel_vacuum);
	if (parallel_workers <= 0)
	{
		/* Can't perform vacuum in parallel -- return NULL */
		pfree(will_parallel_vacuum);
		return NULL;
	}

	pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
	pvs->indrels = indrels;
	pvs->nindexes = nindexes;
	pvs->will_parallel_vacuum = will_parallel_vacuum;
	pvs->bstrategy = bstrategy;
	pvs->heaprel = rel;

	EnterParallelMode();
	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
								 parallel_workers);
	Assert(pcxt->nworkers > 0);
	pvs->pcxt = pcxt;

	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
	est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
	shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
	est_shared_len = sizeof(PVShared);
	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
	est_dead_items_len = vac_max_items_to_alloc_size(max_items);
	shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Estimate space for BufferUsage and WalUsage --
	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
	 *
	 * If there are no extensions loaded that care, we could skip this.  We
	 * have no way of knowing whether anyone's looking at pgBufferUsage or
	 * pgWalUsage, so do it unconditionally.
	 */
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
	if (debug_query_string)
	{
		querylen = strlen(debug_query_string);
		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}
	else
		querylen = 0;			/* keep compiler quiet */

	InitializeParallelDSM(pcxt);

	/* Prepare index vacuum stats */
	indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
	MemSet(indstats, 0, est_indstats_len);
	for (int i = 0; i < nindexes; i++)
	{
		Relation	indrel = indrels[i];
		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;

		/*
		 * Cleanup option should be either disabled, always performing in
		 * parallel or conditionally performing in parallel.
		 */
		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);

		if (!will_parallel_vacuum[i])
			continue;

		if (indrel->rd_indam->amusemaintenanceworkmem)
			nindexes_mwm++;

		/*
		 * Remember the number of indexes that support parallel operation for
		 * each phase.
		 */
		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
			pvs->nindexes_parallel_bulkdel++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
			pvs->nindexes_parallel_cleanup++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
			pvs->nindexes_parallel_condcleanup++;
	}
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
	pvs->indstats = indstats;

	/* Prepare shared information */
	shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
	MemSet(shared, 0, est_shared_len);
	shared->relid = RelationGetRelid(rel);
	shared->elevel = elevel;
	shared->maintenance_work_mem_worker =
		(nindexes_mwm > 0) ?
		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
		maintenance_work_mem;

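	/*
	 * For example (a note added for exposition): with maintenance_work_mem
	 * of 256MB, 4 planned workers, and 2 indexes that use
	 * maintenance_work_mem, each such index gets 256MB / Min(4, 2) = 128MB
	 * to work with.
	 */
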
	/* Use the same buffer size for all workers */
	shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);

	pg_atomic_init_u32(&(shared->cost_balance), 0);
	pg_atomic_init_u32(&(shared->active_nworkers), 0);
	pg_atomic_init_u32(&(shared->idx), 0);

	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
	pvs->shared = shared;

	/* Prepare the dead_items space */
	dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
												   est_dead_items_len);
	dead_items->max_items = max_items;
	dead_items->num_items = 0;
	MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
	pvs->dead_items = dead_items;

	/*
	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
	 * initialize
	 */
	buffer_usage = shm_toc_allocate(pcxt->toc,
									mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
	pvs->buffer_usage = buffer_usage;
	wal_usage = shm_toc_allocate(pcxt->toc,
								 mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
	pvs->wal_usage = wal_usage;

	/* Store query string for workers */
	if (debug_query_string)
	{
		char	   *sharedquery;

		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
		memcpy(sharedquery, debug_query_string, querylen + 1);
		sharedquery[querylen] = '\0';
		shm_toc_insert(pcxt->toc,
					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
	}

	/* Success -- return parallel vacuum state */
	return pvs;
}

/*
 * Destroy the parallel context, and end parallel mode.
 *
 * Since writes are not allowed during parallel mode, copy the
 * updated index statistics from DSM into local memory and then later use that
 * to update the index statistics.  One might think that we can exit from
 * parallel mode, update the index statistics and then destroy parallel
 * context, but that won't be safe (see ExitParallelMode).
 */
void
parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
{
	Assert(!IsParallelWorker());

	/* Copy the updated statistics */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		if (indstats->istat_updated)
		{
			istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
			memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
		}
		else
			istats[i] = NULL;
	}

	DestroyParallelContext(pvs->pcxt);
	ExitParallelMode();

	pfree(pvs->will_parallel_vacuum);
	pfree(pvs);
}

/* Returns the dead items space */
VacDeadItems *
parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
{
	return pvs->dead_items;
}

/*
 * Do parallel index bulk-deletion with parallel workers.
 */
void
parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
									int num_index_scans)
{
	Assert(!IsParallelWorker());

	/*
	 * We can only provide an approximate value of num_heap_tuples, at least
	 * for now.
	 */
	pvs->shared->reltuples = num_table_tuples;
	pvs->shared->estimated_count = true;

	parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
}

/*
 * Do parallel index cleanup with parallel workers.
 */
void
parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
									int num_index_scans, bool estimated_count)
{
	Assert(!IsParallelWorker());

	/*
	 * We can provide a better estimate of total number of surviving tuples
	 * (we assume indexes are more interested in that than in the number of
	 * nominally live tuples).
	 */
	pvs->shared->reltuples = num_table_tuples;
	pvs->shared->estimated_count = estimated_count;

	parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
}

/*
 * Compute the number of parallel worker processes to request.  Both index
 * vacuum and index cleanup can be executed with parallel workers.
 * An index is eligible for parallel vacuum iff its size is greater than
 * min_parallel_index_scan_size, as invoking workers for very small indexes
 * can hurt performance.
 *
 * nrequested is the number of parallel workers that user requested.  If
 * nrequested is 0, we compute the parallel degree based on nindexes, that is
 * the number of indexes that support parallel vacuum.  This function also
 * sets will_parallel_vacuum to remember indexes that participate in parallel
 * vacuum.
 */
static int
parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
								bool *will_parallel_vacuum)
{
	int			nindexes_parallel = 0;
	int			nindexes_parallel_bulkdel = 0;
	int			nindexes_parallel_cleanup = 0;
	int			parallel_workers;

	/*
	 * We don't allow performing parallel operation in standalone backend or
	 * when parallelism is disabled.
	 */
	if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
		return 0;

	/*
	 * Compute the number of indexes that can participate in parallel vacuum.
	 */
	for (int i = 0; i < nindexes; i++)
	{
		Relation	indrel = indrels[i];
		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;

		/* Skip index that is not a suitable target for parallel index vacuum */
		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
			continue;

		will_parallel_vacuum[i] = true;

		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
			nindexes_parallel_bulkdel++;
		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
			nindexes_parallel_cleanup++;
	}

	nindexes_parallel = Max(nindexes_parallel_bulkdel,
							nindexes_parallel_cleanup);
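
	/*
	 * Example (a note added for exposition): with 3 indexes supporting
	 * parallel bulk-deletion and 2 supporting parallel cleanup, this is
	 * Max(3, 2) = 3; after the leader takes one index below, up to 2 indexes
	 * need dedicated workers.
	 */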

	/* The leader process takes one index */
	nindexes_parallel--;

	/* No index supports parallel vacuum */
	if (nindexes_parallel <= 0)
		return 0;

	/* Compute the parallel degree */
	parallel_workers = (nrequested > 0) ?
		Min(nrequested, nindexes_parallel) : nindexes_parallel;

	/* Cap by max_parallel_maintenance_workers */
	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);

	return parallel_workers;
}

/*
 * Perform index vacuum or index cleanup with parallel workers.  This function
 * must be used by the parallel vacuum leader process.
 */
static void
parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
									bool vacuum)
{
	int			nworkers;
	PVIndVacStatus new_status;

	Assert(!IsParallelWorker());

	if (vacuum)
	{
		new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;

		/* Determine the number of parallel workers to launch */
		nworkers = pvs->nindexes_parallel_bulkdel;
	}
	else
	{
		new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;

		/* Determine the number of parallel workers to launch */
		nworkers = pvs->nindexes_parallel_cleanup;

		/* Add conditionally parallel-aware indexes on the first call */
		if (num_index_scans == 0)
			nworkers += pvs->nindexes_parallel_condcleanup;
	}

	/* The leader process will participate */
	nworkers--;

	/*
	 * It is possible that parallel context is initialized with fewer workers
	 * than the number of indexes that need a separate worker in the current
	 * phase, so we need to consider it.  See
	 * parallel_vacuum_compute_workers().
	 */
	nworkers = Min(nworkers, pvs->pcxt->nworkers);
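
	/*
	 * For instance (a note added for exposition): if this phase has 4
	 * indexes that could each use a worker but the parallel context was
	 * created with only 2 workers, nworkers is capped at 2; the remaining
	 * indexes are picked up via the shared idx counter by whichever process
	 * becomes free first.
	 */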

	/*
	 * Set index vacuum status and mark whether parallel vacuum worker can
	 * process it.
	 */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
		indstats->status = new_status;
		indstats->parallel_workers_can_process =
			(pvs->will_parallel_vacuum[i] &&
			 parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
													num_index_scans,
													vacuum));
	}

	/* Reset the parallel index processing counter */
	pg_atomic_write_u32(&(pvs->shared->idx), 0);

	/* Setup the shared cost-based vacuum delay and launch workers */
	if (nworkers > 0)
	{
		/* Reinitialize parallel context to relaunch parallel workers */
		if (num_index_scans > 0)
			ReinitializeParallelDSM(pvs->pcxt);

		/*
		 * Set up shared cost balance and the number of active workers for
		 * vacuum delay.  We need to do this before launching workers as
		 * otherwise, they might not see the updated values for these
		 * parameters.
		 */
		pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
		pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);

		/*
		 * The number of workers can vary between bulkdelete and cleanup
		 * phase.
		 */
		ReinitializeParallelWorkers(pvs->pcxt, nworkers);

		LaunchParallelWorkers(pvs->pcxt);

		if (pvs->pcxt->nworkers_launched > 0)
		{
			/*
			 * Reset the local cost values for leader backend as we have
			 * already accumulated the remaining balance of heap.
			 */
			VacuumCostBalance = 0;
			VacuumCostBalanceLocal = 0;

			/* Enable shared cost balance for leader backend */
			VacuumSharedCostBalance = &(pvs->shared->cost_balance);
			VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
		}

		if (vacuum)
			ereport(pvs->shared->elevel,
					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
									 pvs->pcxt->nworkers_launched),
							pvs->pcxt->nworkers_launched, nworkers)));
		else
			ereport(pvs->shared->elevel,
					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
									 pvs->pcxt->nworkers_launched),
							pvs->pcxt->nworkers_launched, nworkers)));
	}

	/* Vacuum the indexes that can be processed by only leader process */
	parallel_vacuum_process_unsafe_indexes(pvs);

	/*
	 * Join as a parallel worker.  The leader process alone processes all
	 * parallel-safe indexes in the case where no workers are launched.
	 */
	parallel_vacuum_process_safe_indexes(pvs);

	/*
	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
	 * to finish, or we might get incomplete data.)
	 */
	if (nworkers > 0)
	{
		/* Wait for all vacuum workers to finish */
		WaitForParallelWorkersToFinish(pvs->pcxt);

		for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
			InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
	}

	/*
	 * Reset all index status back to initial (while checking that we have
	 * vacuumed all indexes).
	 */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
			elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
				 RelationGetRelationName(pvs->indrels[i]));

		indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
	}

	/*
	 * Carry the shared balance value to heap scan and disable shared costing
	 */
	if (VacuumSharedCostBalance)
	{
		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
		VacuumSharedCostBalance = NULL;
		VacuumActiveNWorkers = NULL;
	}
}

/*
 * Index vacuum/cleanup routine used by the leader process and parallel
 * vacuum worker processes to vacuum the indexes in parallel.
 */
static void
parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
{
	/*
	 * Increment the active worker count if we are able to launch any worker.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);

	/* Loop until all indexes are vacuumed */
	for (;;)
	{
		int			idx;
		PVIndStats *indstats;

		/* Get an index number to process */
		idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);

		/* Done for all indexes? */
		if (idx >= pvs->nindexes)
			break;

		indstats = &(pvs->indstats[idx]);

		/*
		 * Skip vacuuming index that is unsafe for workers or has an
		 * unsuitable target for parallel index vacuum (this is vacuumed in
		 * parallel_vacuum_process_unsafe_indexes() by the leader).
		 */
		if (!indstats->parallel_workers_can_process)
			continue;

		/* Do vacuum or cleanup of the index */
		parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
	}

	/*
	 * We have completed the index vacuum so decrement the active worker
	 * count.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}

/*
 * Perform parallel vacuuming of indexes in leader process.
 *
 * Handles index vacuuming (or index cleanup) for indexes that are not
 * parallel safe.  It's possible that this will vary for a given index, based
 * on details like whether we're performing index cleanup right now.
 *
 * Also performs vacuuming of smaller indexes that fell under the size cutoff
 * enforced by parallel_vacuum_compute_workers().
 */
static void
parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
{
	Assert(!IsParallelWorker());

	/*
	 * Increment the active worker count if we are able to launch any worker.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);

	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		/* Skip indexes that are safe for workers */
		if (indstats->parallel_workers_can_process)
			continue;

		/* Do vacuum or cleanup of the index */
		parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
	}

	/*
	 * We have completed the index vacuum so decrement the active worker
	 * count.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}

/*
 * Vacuum or cleanup index either by leader process or by one of the worker
 * process.  After vacuuming the index this function copies the index
 * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
 * segment.
 */
static void
parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
								  PVIndStats *indstats)
{
	IndexBulkDeleteResult *istat = NULL;
	IndexBulkDeleteResult *istat_res;
	IndexVacuumInfo ivinfo;

	/*
	 * Update the pointer to the corresponding bulk-deletion result if someone
	 * has already updated it
	 */
	if (indstats->istat_updated)
		istat = &(indstats->istat);

	ivinfo.index = indrel;
	ivinfo.heaprel = pvs->heaprel;
	ivinfo.analyze_only = false;
	ivinfo.report_progress = false;
	ivinfo.message_level = DEBUG2;
	ivinfo.estimated_count = pvs->shared->estimated_count;
	ivinfo.num_heap_tuples = pvs->shared->reltuples;
	ivinfo.strategy = pvs->bstrategy;

	/* Update error traceback information */
	pvs->indname = pstrdup(RelationGetRelationName(indrel));
	pvs->status = indstats->status;

	switch (indstats->status)
	{
		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
			istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
			break;
		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
			istat_res = vac_cleanup_one_index(&ivinfo, istat);
			break;
		default:
			elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
				 indstats->status,
				 RelationGetRelationName(indrel));
	}

	/*
	 * Copy the index bulk-deletion result returned from ambulkdelete and
	 * amvacuumcleanup to the DSM segment if it's the first cycle because they
	 * allocate locally and it's possible that an index will be vacuumed by a
	 * different vacuum process the next cycle.  Copying the result normally
	 * happens only the first time an index is vacuumed.  For any additional
	 * vacuum pass, we directly point to the result on the DSM segment and
	 * pass it to vacuum index APIs so that workers can update it directly.
	 *
	 * Since all vacuum workers write the bulk-deletion result at different
	 * slots we can write them without locking.
	 */
	if (!indstats->istat_updated && istat_res != NULL)
	{
		memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
		indstats->istat_updated = true;

		/* Free the locally-allocated bulk-deletion result */
		pfree(istat_res);
	}

	/*
	 * Update the status to completed.  No need to lock here since each worker
	 * touches different indexes.
	 */
	indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;

	/* Reset error traceback information */
	pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
	pfree(pvs->indname);
	pvs->indname = NULL;
}

/*
 * Returns false if the given index can't participate in the next execution of
 * parallel index vacuum or parallel index cleanup.
 */
static bool
parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
									   bool vacuum)
{
	uint8		vacoptions;

	vacoptions = indrel->rd_indam->amparallelvacuumoptions;

	/* In parallel vacuum case, check if it supports parallel bulk-deletion */
	if (vacuum)
		return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);

	/* Not safe, if the index does not support parallel cleanup */
	if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
		return false;

	/*
	 * Not safe, if the index supports parallel cleanup conditionally, but we
	 * have already processed the index (for bulkdelete).  We do this to avoid
	 * the need to invoke workers when parallel index cleanup doesn't need to
	 * scan the index.  See the comments for option
	 * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
	 * parallel cleanup conditionally.
	 */
	if (num_index_scans > 0 &&
		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
		return false;

	return true;
}

/*
 * Perform work within a launched parallel process.
 *
 * Since parallel vacuum workers perform only index vacuum or index cleanup,
 * we don't need to report progress information.
 */
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
{
	ParallelVacuumState pvs;
	Relation	rel;
	Relation   *indrels;
	PVIndStats *indstats;
	PVShared   *shared;
	VacDeadItems *dead_items;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	int			nindexes;
	char	   *sharedquery;
	ErrorContextCallback errcallback;

	/*
	 * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
	 * don't support parallel vacuum for autovacuum as of now.
	 */
	Assert(MyProc->statusFlags == PROC_IN_VACUUM);

	elog(DEBUG1, "starting parallel vacuum worker");

	shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);

	/* Set debug_query_string for individual workers */
	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
	debug_query_string = sharedquery;
	pgstat_report_activity(STATE_RUNNING, debug_query_string);

	/*
	 * Open table.  The lock mode is the same as the leader process.  It's
	 * okay because the lock mode does not conflict among the parallel
	 * workers.
	 */
	rel = table_open(shared->relid, ShareUpdateExclusiveLock);

	/*
	 * Open all indexes.  indrels are sorted in order by OID, which should be
	 * matched to the leader's one.
	 */
	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
	Assert(nindexes > 0);

	if (shared->maintenance_work_mem_worker > 0)
		maintenance_work_mem = shared->maintenance_work_mem_worker;

	/* Set index statistics */
	indstats = (PVIndStats *) shm_toc_lookup(toc,
											 PARALLEL_VACUUM_KEY_INDEX_STATS,
											 false);

	/* Set dead_items space */
	dead_items = (VacDeadItems *) shm_toc_lookup(toc,
												 PARALLEL_VACUUM_KEY_DEAD_ITEMS,
												 false);

	/* Set cost-based vacuum delay */
	VacuumUpdateCosts();
	VacuumCostBalance = 0;
	VacuumPageHit = 0;
	VacuumPageMiss = 0;
	VacuumPageDirty = 0;
	VacuumCostBalanceLocal = 0;
	VacuumSharedCostBalance = &(shared->cost_balance);
	VacuumActiveNWorkers = &(shared->active_nworkers);

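	/*
	 * From this point on, cost-based delay in this worker accrues to the
	 * shared balance that the leader also sees, rather than to a purely
	 * local counter (a note added for exposition).
	 */
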
	/* Set parallel vacuum state */
	pvs.indrels = indrels;
	pvs.nindexes = nindexes;
	pvs.indstats = indstats;
	pvs.shared = shared;
	pvs.dead_items = dead_items;
	pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
	pvs.relname = pstrdup(RelationGetRelationName(rel));
	pvs.heaprel = rel;

	/* These fields will be filled during index vacuum or cleanup */
	pvs.indname = NULL;
	pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;

	/* Each parallel VACUUM worker gets its own access strategy. */
	pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
											  shared->ring_nbuffers * (BLCKSZ / 1024));

	/* Setup error traceback support for ereport() */
	errcallback.callback = parallel_vacuum_error_callback;
	errcallback.arg = &pvs;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Prepare to track buffer usage during parallel execution */
	InstrStartParallelQuery();

	/* Process indexes to perform vacuum/cleanup */
	parallel_vacuum_process_safe_indexes(&pvs);

	/* Report buffer/WAL usage during parallel execution */
	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
						  &wal_usage[ParallelWorkerNumber]);

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
	table_close(rel, ShareUpdateExclusiveLock);
	FreeAccessStrategy(pvs.bstrategy);
}

/*
 * Error context callback for errors occurring during parallel index vacuum.
 * The error context messages should match the messages set in the lazy vacuum
 * error context.  If you change this function, change vacuum_error_callback()
 * as well.
 */
static void
parallel_vacuum_error_callback(void *arg)
{
	ParallelVacuumState *errinfo = arg;

	switch (errinfo->status)
	{
		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
			errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
					   errinfo->indname,
					   errinfo->relnamespace,
					   errinfo->relname);
			break;
		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
			errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
					   errinfo->indname,
					   errinfo->relnamespace,
					   errinfo->relname);
			break;
		case PARALLEL_INDVAC_STATUS_INITIAL:
		case PARALLEL_INDVAC_STATUS_COMPLETED:
		default:
			return;
	}
}
Definition: xact.c:1036