vacuumparallel.c
/*-------------------------------------------------------------------------
 *
 * vacuumparallel.c
 *    Support routines for parallel vacuum execution.
 *
 * This file contains routines that are intended to support setting up, using,
 * and tearing down a ParallelVacuumState.
 *
 * In a parallel vacuum, we perform both index bulk deletion and index cleanup
 * with parallel worker processes.  Individual indexes are processed by one
 * vacuum process.  ParallelVacuumState contains shared information as well as
 * the memory space for storing dead items allocated in the DSM segment.  We
 * launch parallel worker processes at the start of parallel index
 * bulk-deletion and index cleanup and once all indexes are processed, the
 * parallel worker processes exit.  Each time we process indexes in parallel,
 * the parallel context is re-initialized so that the same DSM can be used for
 * multiple passes of index bulk-deletion and index cleanup.
 *
 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/commands/vacuumparallel.c
 *
 *-------------------------------------------------------------------------
 */
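
/*
 * Illustrative leader-side lifecycle (a minimal sketch, not code from this
 * file; the variables are hypothetical, only the parallel_vacuum_* entry
 * points are real):
 *
 *     ParallelVacuumState *pvs = parallel_vacuum_init(rel, indrels, nindexes,
 *                                                     nrequested_workers,
 *                                                     max_items, elevel,
 *                                                     bstrategy);
 *
 *     if (pvs != NULL)
 *     {
 *         // one bulk-deletion pass per round of filling the dead-items space
 *         parallel_vacuum_bulkdel_all_indexes(pvs, num_table_tuples,
 *                                             num_index_scans);
 *         ...
 *         // one final cleanup pass, then tear down
 *         parallel_vacuum_cleanup_all_indexes(pvs, num_table_tuples,
 *                                             num_index_scans,
 *                                             estimated_count);
 *         parallel_vacuum_end(pvs, indstats);
 *     }
 */
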
#include "postgres.h"

#include "access/amapi.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "optimizer/paths.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"

/*
 * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
 * we don't need to worry about DSM keys conflicting with plan_node_id we can
 * use small integers.
 */
#define PARALLEL_VACUUM_KEY_SHARED          1
#define PARALLEL_VACUUM_KEY_DEAD_ITEMS      2
#define PARALLEL_VACUUM_KEY_QUERY_TEXT      3
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE    4
#define PARALLEL_VACUUM_KEY_WAL_USAGE       5
#define PARALLEL_VACUUM_KEY_INDEX_STATS     6

/*
 * Shared information among parallel workers, so this is allocated in the DSM
 * segment.
 */
typedef struct PVShared
{
    /*
     * Target table relid and log level (for messages about parallel workers
     * launched during VACUUM VERBOSE).  These fields are not modified during
     * the parallel vacuum.
     */
    Oid         relid;
    int         elevel;

    /*
     * Fields for both index vacuum and cleanup.
     *
     * reltuples is the total number of input heap tuples.  We set either old
     * live tuples in the index vacuum case or the new live tuples in the
     * index cleanup case.
     *
     * estimated_count is true if reltuples is an estimated value.  (Note that
     * reltuples could be -1 in this case, indicating we have no idea.)
     */
    double      reltuples;
    bool        estimated_count;

    /*
     * In single process vacuum we could consume more memory during index
     * vacuuming or cleanup apart from the memory for heap scanning.  In
     * parallel vacuum, since individual vacuum workers can consume memory
     * equal to maintenance_work_mem, the new maintenance_work_mem for each
     * worker is set such that the parallel operation doesn't consume more
     * memory than single process vacuum.
     */
    int         maintenance_work_mem_worker;

    /*
     * Shared vacuum cost balance.  During parallel vacuum,
     * VacuumSharedCostBalance points to this value and it accumulates the
     * balance of each parallel vacuum worker.
     */
    pg_atomic_uint32 cost_balance;

    /*
     * Number of active parallel workers.  This is used for computing the
     * minimum threshold of the vacuum cost balance before a worker sleeps for
     * cost-based delay.
     */
    pg_atomic_uint32 active_nworkers;

    /* Counter for vacuuming and cleanup */
    pg_atomic_uint32 idx;
} PVShared;

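/*
 * Worked example for maintenance_work_mem_worker (hypothetical numbers): with
 * maintenance_work_mem = 256MB, parallel_workers = 4 and nindexes_mwm = 2
 * indexes whose AM sets amusemaintenanceworkmem, parallel_vacuum_init()
 * computes 256MB / Min(4, 2) = 128MB per worker, keeping the parallel
 * operation within the single-process memory budget.
 */
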
/* Status used during parallel index vacuum or cleanup */
typedef enum PVIndVacStatus
{
    PARALLEL_INDVAC_STATUS_INITIAL = 0,
    PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
    PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
    PARALLEL_INDVAC_STATUS_COMPLETED
} PVIndVacStatus;

/*
 * Struct for index vacuum statistics of an index that is used for parallel vacuum.
 * This includes the status of parallel index vacuum as well as index statistics.
 */
typedef struct PVIndStats
{
    /*
     * The following two fields are set by the leader process before executing
     * parallel index vacuum or parallel index cleanup.  These fields are not
     * fixed for the entire VACUUM operation.  They are only fixed for an
     * individual parallel index vacuum and cleanup.
     *
     * parallel_workers_can_process is true if both leader and worker can
     * process the index, otherwise only the leader can process it.
     */
    PVIndVacStatus status;
    bool        parallel_workers_can_process;

    /*
     * Individual worker or leader stores the result of index vacuum or
     * cleanup.
     */
    bool        istat_updated;  /* are the stats updated? */
    IndexBulkDeleteResult istat;
} PVIndStats;

/*
 * Struct for maintaining a parallel vacuum state.  typedef appears in
 * vacuum.h.
 */
struct ParallelVacuumState
{
    /* NULL for worker processes */
    ParallelContext *pcxt;

    /* Target indexes */
    Relation   *indrels;
    int         nindexes;

    /* Shared information among parallel vacuum workers */
    PVShared   *shared;

    /*
     * Shared index statistics among parallel vacuum workers.  The array
     * element is allocated for every index, even those indexes where parallel
     * index vacuuming is unsafe or not worthwhile (e.g.,
     * will_parallel_vacuum[] is false).  During parallel vacuum,
     * IndexBulkDeleteResult of each index is kept in DSM and is copied into
     * local memory at the end of parallel vacuum.
     */
    PVIndStats *indstats;

    /* Shared dead items space among parallel vacuum workers */
    VacDeadItems *dead_items;

    /* Points to buffer usage area in DSM */
    BufferUsage *buffer_usage;

    /* Points to WAL usage area in DSM */
    WalUsage   *wal_usage;

    /*
     * False if the index is a totally unsuitable target for all parallel
     * processing.  For example, the index could be smaller than the
     * min_parallel_index_scan_size cutoff.
     */
    bool       *will_parallel_vacuum;

    /*
     * The number of indexes that support parallel index bulk-deletion and
     * parallel index cleanup respectively.
     */
    int         nindexes_parallel_bulkdel;
    int         nindexes_parallel_cleanup;
    int         nindexes_parallel_condcleanup;

    /* Buffer access strategy used by leader process */
    BufferAccessStrategy bstrategy;

    /*
     * Error reporting state.  The error callback is set only for worker
     * processes during parallel index vacuum.
     */
    char       *relnamespace;
    char       *relname;
    char       *indname;
    PVIndVacStatus status;
};

static int  parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
                                            bool *will_parallel_vacuum);
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
                                                bool vacuum);
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
                                              PVIndStats *indstats);
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
                                                   bool vacuum);
static void parallel_vacuum_error_callback(void *arg);

/*
 * Try to enter parallel mode and create a parallel context.  Then initialize
 * shared memory state.
 *
 * On success, return parallel vacuum state.  Otherwise return NULL.
 */
ParallelVacuumState *
parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
                     int nrequested_workers, int max_items,
                     int elevel, BufferAccessStrategy bstrategy)
{
    ParallelVacuumState *pvs;
    ParallelContext *pcxt;
    PVShared   *shared;
    VacDeadItems *dead_items;
    PVIndStats *indstats;
    BufferUsage *buffer_usage;
    WalUsage   *wal_usage;
    bool       *will_parallel_vacuum;
    Size        est_indstats_len;
    Size        est_shared_len;
    Size        est_dead_items_len;
    int         nindexes_mwm = 0;
    int         parallel_workers = 0;
    int         querylen;

    /*
     * A parallel vacuum must be requested and there must be indexes on the
     * relation
     */
    Assert(nrequested_workers >= 0);
    Assert(nindexes > 0);

    /*
     * Compute the number of parallel vacuum workers to launch
     */
    will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
    parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
                                                       nrequested_workers,
                                                       will_parallel_vacuum);
    if (parallel_workers <= 0)
    {
        /* Can't perform vacuum in parallel -- return NULL */
        pfree(will_parallel_vacuum);
        return NULL;
    }

    pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
    pvs->indrels = indrels;
    pvs->nindexes = nindexes;
    pvs->will_parallel_vacuum = will_parallel_vacuum;
    pvs->bstrategy = bstrategy;

    EnterParallelMode();
    pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
                                 parallel_workers);
    Assert(pcxt->nworkers > 0);
    pvs->pcxt = pcxt;

    /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
    est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
    shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
    est_shared_len = sizeof(PVShared);
    shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
    est_dead_items_len = vac_max_items_to_alloc_size(max_items);
    shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /*
     * Estimate space for BufferUsage and WalUsage --
     * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
     *
     * If there are no extensions loaded that care, we could skip this.  We
     * have no way of knowing whether anyone's looking at pgBufferUsage or
     * pgWalUsage, so do it unconditionally.
     */
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(WalUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
    if (debug_query_string)
    {
        querylen = strlen(debug_query_string);
        shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }
    else
        querylen = 0;           /* keep compiler quiet */

    InitializeParallelDSM(pcxt);

    /* Prepare index vacuum stats */
    indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
    MemSet(indstats, 0, est_indstats_len);
    for (int i = 0; i < nindexes; i++)
    {
        Relation    indrel = indrels[i];
        uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;

        /*
         * Cleanup option should be either disabled, always performing in
         * parallel or conditionally performing in parallel.
         */
        Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
               ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
        Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);

        if (!will_parallel_vacuum[i])
            continue;

        if (indrel->rd_indam->amusemaintenanceworkmem)
            nindexes_mwm++;

        /*
         * Remember the number of indexes that support parallel operation for
         * each phase.
         */
        if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
            pvs->nindexes_parallel_bulkdel++;
        if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
            pvs->nindexes_parallel_cleanup++;
        if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
            pvs->nindexes_parallel_condcleanup++;
    }
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
    pvs->indstats = indstats;

    /* Prepare shared information */
    shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
    MemSet(shared, 0, est_shared_len);
    shared->relid = RelationGetRelid(rel);
    shared->elevel = elevel;
    shared->maintenance_work_mem_worker =
        (nindexes_mwm > 0) ?
        maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
        maintenance_work_mem;

    pg_atomic_init_u32(&(shared->cost_balance), 0);
    pg_atomic_init_u32(&(shared->active_nworkers), 0);
    pg_atomic_init_u32(&(shared->idx), 0);

    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
    pvs->shared = shared;

    /* Prepare the dead_items space */
    dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
                                                   est_dead_items_len);
    dead_items->max_items = max_items;
    dead_items->num_items = 0;
    MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
    pvs->dead_items = dead_items;

    /*
     * Allocate space for each worker's BufferUsage and WalUsage; no need to
     * initialize
     */
    buffer_usage = shm_toc_allocate(pcxt->toc,
                                    mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
    pvs->buffer_usage = buffer_usage;
    wal_usage = shm_toc_allocate(pcxt->toc,
                                 mul_size(sizeof(WalUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
    pvs->wal_usage = wal_usage;

    /* Store query string for workers */
    if (debug_query_string)
    {
        char       *sharedquery;

        sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
        memcpy(sharedquery, debug_query_string, querylen + 1);
        sharedquery[querylen] = '\0';
        shm_toc_insert(pcxt->toc,
                       PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
    }

    /* Success -- return parallel vacuum state */
    return pvs;
}

/*
 * Destroy the parallel context, and end parallel mode.
 *
 * Since writes are not allowed during parallel mode, copy the
 * updated index statistics from DSM into local memory and then later use that
 * to update the index statistics.  One might think that we can exit from
 * parallel mode, update the index statistics and then destroy parallel
 * context, but that won't be safe (see ExitParallelMode).
 */
void
parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
{
    Assert(!IsParallelWorker());

    /* Copy the updated statistics */
    for (int i = 0; i < pvs->nindexes; i++)
    {
        PVIndStats *indstats = &(pvs->indstats[i]);

        if (indstats->istat_updated)
        {
            istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
            memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
        }
        else
            istats[i] = NULL;
    }

    DestroyParallelContext(pvs->pcxt);
    ExitParallelMode();

    pfree(pvs->will_parallel_vacuum);
    pfree(pvs);
}

/* Returns the dead items space */
VacDeadItems *
parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
{
    return pvs->dead_items;
}

/*
 * Do parallel index bulk-deletion with parallel workers.
 */
void
parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
                                    int num_index_scans)
{
    Assert(!IsParallelWorker());

    /*
     * We can only provide an approximate value of num_heap_tuples, at least
     * for now.
     */
    pvs->shared->reltuples = num_table_tuples;
    pvs->shared->estimated_count = true;

    parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
}

/*
 * Do parallel index cleanup with parallel workers.
 */
void
parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
                                    int num_index_scans, bool estimated_count)
{
    Assert(!IsParallelWorker());

    /*
     * We can provide a better estimate of the total number of surviving
     * tuples (we assume indexes are more interested in that than in the
     * number of nominally live tuples).
     */
    pvs->shared->reltuples = num_table_tuples;
    pvs->shared->estimated_count = estimated_count;

    parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
}

/*
 * Compute the number of parallel worker processes to request.  Both index
 * vacuum and index cleanup can be executed with parallel workers.
 * The index is eligible for parallel vacuum iff its size is greater than
 * min_parallel_index_scan_size, as invoking workers for very small indexes
 * can hurt performance.
 *
 * nrequested is the number of parallel workers that the user requested.  If
 * nrequested is 0, we compute the parallel degree based on nindexes, that is
 * the number of indexes that support parallel vacuum.  This function also
 * sets will_parallel_vacuum to remember indexes that participate in parallel
 * vacuum.
 */
static int
parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
                                bool *will_parallel_vacuum)
{
    int         nindexes_parallel = 0;
    int         nindexes_parallel_bulkdel = 0;
    int         nindexes_parallel_cleanup = 0;
    int         parallel_workers;

    /*
     * We don't allow performing parallel operation in standalone backend or
     * when parallelism is disabled.
     */
    if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
        return 0;

    /*
     * Compute the number of indexes that can participate in parallel vacuum.
     */
    for (int i = 0; i < nindexes; i++)
    {
        Relation    indrel = indrels[i];
        uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;

        /* Skip index that is not a suitable target for parallel index vacuum */
        if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
            RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
            continue;

        will_parallel_vacuum[i] = true;

        if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
            nindexes_parallel_bulkdel++;
        if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
            ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
            nindexes_parallel_cleanup++;
    }

    nindexes_parallel = Max(nindexes_parallel_bulkdel,
                            nindexes_parallel_cleanup);

    /* The leader process takes one index */
    nindexes_parallel--;

    /* No index supports parallel vacuum */
    if (nindexes_parallel <= 0)
        return 0;

    /* Compute the parallel degree */
    parallel_workers = (nrequested > 0) ?
        Min(nrequested, nindexes_parallel) : nindexes_parallel;

    /* Cap by max_parallel_maintenance_workers */
    parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);

    return parallel_workers;
}
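
/*
 * Worked example (hypothetical numbers): for a table with 5 large indexes,
 * 4 supporting parallel bulk-deletion and 2 supporting (possibly conditional)
 * parallel cleanup, nindexes_parallel = Max(4, 2) - 1 = 3 once the leader
 * takes one index.  With nrequested = 0 and max_parallel_maintenance_workers
 * = 2, the function returns Min(3, 2) = 2 workers.
 */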

/*
 * Perform index vacuum or index cleanup with parallel workers.  This function
 * must be used by the parallel vacuum leader process.
 */
static void
parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
                                    bool vacuum)
{
    int         nworkers;
    PVIndVacStatus new_status;

    Assert(!IsParallelWorker());

    if (vacuum)
    {
        new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;

        /* Determine the number of parallel workers to launch */
        nworkers = pvs->nindexes_parallel_bulkdel;
    }
    else
    {
        new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;

        /* Determine the number of parallel workers to launch */
        nworkers = pvs->nindexes_parallel_cleanup;

        /* Add conditionally parallel-aware indexes on the first call */
        if (num_index_scans == 0)
            nworkers += pvs->nindexes_parallel_condcleanup;
    }

    /* The leader process will participate */
    nworkers--;

    /*
     * It is possible that parallel context is initialized with fewer workers
     * than the number of indexes that need a separate worker in the current
     * phase, so we need to consider it.  See
     * parallel_vacuum_compute_workers().
     */
    nworkers = Min(nworkers, pvs->pcxt->nworkers);

    /*
     * Set index vacuum status and mark whether parallel vacuum worker can
     * process it.
     */
    for (int i = 0; i < pvs->nindexes; i++)
    {
        PVIndStats *indstats = &(pvs->indstats[i]);

        Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
        indstats->status = new_status;
        indstats->parallel_workers_can_process =
            (pvs->will_parallel_vacuum[i] &&
             parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
                                                    num_index_scans,
                                                    vacuum));
    }

    /* Reset the parallel index processing counter */
    pg_atomic_write_u32(&(pvs->shared->idx), 0);

    /* Setup the shared cost-based vacuum delay and launch workers */
    if (nworkers > 0)
    {
        /* Reinitialize parallel context to relaunch parallel workers */
        if (num_index_scans > 0)
            ReinitializeParallelDSM(pvs->pcxt);

        /*
         * Set up shared cost balance and the number of active workers for
         * vacuum delay.  We need to do this before launching workers as
         * otherwise, they might not see the updated values for these
         * parameters.
         */
        pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
        pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);

        /*
         * The number of workers can vary between bulkdelete and cleanup
         * phase.
         */
        ReinitializeParallelWorkers(pvs->pcxt, nworkers);

        LaunchParallelWorkers(pvs->pcxt);

        if (pvs->pcxt->nworkers_launched > 0)
        {
            /*
             * Reset the local cost values for leader backend as we have
             * already accumulated the remaining balance of heap.
             */
            VacuumCostBalance = 0;
            VacuumCostBalanceLocal = 0;

            /* Enable shared cost balance for leader backend */
            VacuumSharedCostBalance = &(pvs->shared->cost_balance);
            VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
        }

        if (vacuum)
            ereport(pvs->shared->elevel,
                    (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
                                     "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
                                     pvs->pcxt->nworkers_launched),
                            pvs->pcxt->nworkers_launched, nworkers)));
        else
            ereport(pvs->shared->elevel,
                    (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
                                     "launched %d parallel vacuum workers for index cleanup (planned: %d)",
                                     pvs->pcxt->nworkers_launched),
                            pvs->pcxt->nworkers_launched, nworkers)));
    }

    /* Vacuum the indexes that can be processed by only the leader process */
    parallel_vacuum_process_unsafe_indexes(pvs);

    /*
     * Join as a parallel worker.  The leader process alone processes all
     * parallel-safe indexes in the case where no workers are launched.
     */
    parallel_vacuum_process_safe_indexes(pvs);

    /*
     * Next, accumulate buffer and WAL usage.  (This must wait for the workers
     * to finish, or we might get incomplete data.)
     */
    if (nworkers > 0)
    {
        /* Wait for all vacuum workers to finish */
        WaitForParallelWorkersToFinish(pvs->pcxt);

        for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
            InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
    }

    /*
     * Reset all index status back to initial (while checking that we have
     * vacuumed all indexes).
     */
    for (int i = 0; i < pvs->nindexes; i++)
    {
        PVIndStats *indstats = &(pvs->indstats[i]);

        if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
            elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
                 RelationGetRelationName(pvs->indrels[i]));

        indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
    }

    /*
     * Carry the shared balance value to heap scan and disable shared costing
     */
    if (VacuumSharedCostBalance)
    {
        VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
        VacuumSharedCostBalance = NULL;
        VacuumActiveNWorkers = NULL;
    }
}

/*
 * Index vacuum/cleanup routine used by the leader process and parallel
 * vacuum worker processes to vacuum the indexes in parallel.
 */
static void
parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
{
    /*
     * Increment the active worker count if we are able to launch any worker.
     */
    if (VacuumActiveNWorkers)
        pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);

    /* Loop until all indexes are vacuumed */
    for (;;)
    {
        int         idx;
        PVIndStats *indstats;

        /* Get an index number to process */
        idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);

        /* Done for all indexes? */
        if (idx >= pvs->nindexes)
            break;

        indstats = &(pvs->indstats[idx]);

        /*
         * Skip vacuuming an index that is unsafe for workers or is an
         * unsuitable target for parallel index vacuum (such an index is
         * vacuumed in parallel_vacuum_process_unsafe_indexes() by the
         * leader).
         */
        if (!indstats->parallel_workers_can_process)
            continue;

        /* Do vacuum or cleanup of the index */
        parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
    }

    /*
     * We have completed the index vacuum so decrement the active worker
     * count.
     */
    if (VacuumActiveNWorkers)
        pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}
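
/*
 * Hand-out example for the loop above (hypothetical numbers): with a leader
 * plus two workers and nindexes = 5, each participant repeatedly claims the
 * next value of pvs->shared->idx via the atomic fetch-and-add and stops once
 * it draws a value >= 5, so every index is processed exactly once without
 * any locking.
 */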

/*
 * Perform parallel vacuuming of indexes in leader process.
 *
 * Handles index vacuuming (or index cleanup) for indexes that are not
 * parallel safe.  It's possible that this will vary for a given index, based
 * on details like whether we're performing index cleanup right now.
 *
 * Also performs vacuuming of smaller indexes that fell under the size cutoff
 * enforced by parallel_vacuum_compute_workers().
 */
static void
parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
{
    Assert(!IsParallelWorker());

    /*
     * Increment the active worker count if we are able to launch any worker.
     */
    if (VacuumActiveNWorkers)
        pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);

    for (int i = 0; i < pvs->nindexes; i++)
    {
        PVIndStats *indstats = &(pvs->indstats[i]);

        /* Skip indexes that are safe for workers */
        if (indstats->parallel_workers_can_process)
            continue;

        /* Do vacuum or cleanup of the index */
        parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
    }

    /*
     * We have completed the index vacuum so decrement the active worker
     * count.
     */
    if (VacuumActiveNWorkers)
        pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}
/*
 * Vacuum or cleanup an index either by the leader process or by one of the
 * worker processes.  After vacuuming the index this function copies the index
 * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
 * segment.
 */
static void
parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
                                  PVIndStats *indstats)
{
    IndexBulkDeleteResult *istat = NULL;
    IndexBulkDeleteResult *istat_res;
    IndexVacuumInfo ivinfo;

    /*
     * Update the pointer to the corresponding bulk-deletion result if someone
     * has already updated it
     */
    if (indstats->istat_updated)
        istat = &(indstats->istat);

    ivinfo.index = indrel;
    ivinfo.analyze_only = false;
    ivinfo.report_progress = false;
    ivinfo.message_level = DEBUG2;
    ivinfo.estimated_count = pvs->shared->estimated_count;
    ivinfo.num_heap_tuples = pvs->shared->reltuples;
    ivinfo.strategy = pvs->bstrategy;

    /* Update error traceback information */
    pvs->indname = pstrdup(RelationGetRelationName(indrel));
    pvs->status = indstats->status;

    switch (indstats->status)
    {
        case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
            istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
            break;
        case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
            istat_res = vac_cleanup_one_index(&ivinfo, istat);
            break;
        default:
            elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
                 indstats->status,
                 RelationGetRelationName(indrel));
    }

    /*
     * Copy the index bulk-deletion result returned from ambulkdelete and
     * amvacuumcleanup to the DSM segment if it's the first cycle because they
     * allocate locally and it's possible that an index will be vacuumed by a
     * different vacuum process the next cycle.  Copying the result normally
     * happens only the first time an index is vacuumed.  For any additional
     * vacuum pass, we directly point to the result on the DSM segment and
     * pass it to vacuum index APIs so that workers can update it directly.
     *
     * Since all vacuum workers write the bulk-deletion result at different
     * slots we can write them without locking.
     */
    if (!indstats->istat_updated && istat_res != NULL)
    {
        memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
        indstats->istat_updated = true;

        /* Free the locally-allocated bulk-deletion result */
        pfree(istat_res);
    }

    /*
     * Update the status to completed.  No need to lock here since each worker
     * touches different indexes.
     */
    indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;

    /* Reset error traceback information */
    pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
    pfree(pvs->indname);
    pvs->indname = NULL;
}

/*
 * Returns false if the given index can't participate in the next execution of
 * parallel index vacuum or parallel index cleanup.
 */
static bool
parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
                                       bool vacuum)
{
    uint8       vacoptions;

    vacoptions = indrel->rd_indam->amparallelvacuumoptions;

    /* In parallel vacuum case, check if it supports parallel bulk-deletion */
    if (vacuum)
        return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);

    /* Not safe, if the index does not support parallel cleanup */
    if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
        ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
        return false;

    /*
     * Not safe, if the index supports parallel cleanup conditionally, but we
     * have already processed the index (for bulkdelete).  We do this to avoid
     * the need to invoke workers when parallel index cleanup doesn't need to
     * scan the index.  See the comments for option
     * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
     * parallel cleanup conditionally.
     */
    if (num_index_scans > 0 &&
        ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
        return false;

    return true;
}
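
/*
 * Summary of the checks above, by phase:
 *
 * - bulk-deletion: safe iff VACUUM_OPTION_PARALLEL_BULKDEL is set.
 * - first cleanup pass (num_index_scans == 0): safe iff
 *   VACUUM_OPTION_PARALLEL_CLEANUP or VACUUM_OPTION_PARALLEL_COND_CLEANUP
 *   is set.
 * - later cleanup passes (num_index_scans > 0): safe iff
 *   VACUUM_OPTION_PARALLEL_CLEANUP is set; conditional cleanup is skipped
 *   once bulk-deletion has already scanned the index.
 */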

/*
 * Perform work within a launched parallel process.
 *
 * Since parallel vacuum workers perform only index vacuum or index cleanup,
 * we don't need to report progress information.
 */
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
{
    ParallelVacuumState pvs;
    Relation    rel;
    Relation   *indrels;
    PVIndStats *indstats;
    PVShared   *shared;
    VacDeadItems *dead_items;
    BufferUsage *buffer_usage;
    WalUsage   *wal_usage;
    int         nindexes;
    char       *sharedquery;
    ErrorContextCallback errcallback;

    /*
     * A parallel vacuum worker must have only the PROC_IN_VACUUM flag since
     * we don't support parallel vacuum for autovacuum as of now.
     */
    Assert(MyProc->statusFlags == PROC_IN_VACUUM);

    elog(DEBUG1, "starting parallel vacuum worker");

    shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);

    /* Set debug_query_string for individual workers */
    sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
    debug_query_string = sharedquery;
    pgstat_report_activity(STATE_RUNNING, debug_query_string);

    /*
     * Open table.  The lock mode is the same as the leader process.  It's
     * okay because the lock mode does not conflict among the parallel
     * workers.
     */
    rel = table_open(shared->relid, ShareUpdateExclusiveLock);

    /*
     * Open all indexes.  indrels are sorted in order by OID, which should
     * match the leader's order.
     */
    vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
    Assert(nindexes > 0);

    if (shared->maintenance_work_mem_worker > 0)
        maintenance_work_mem = shared->maintenance_work_mem_worker;

    /* Set index statistics */
    indstats = (PVIndStats *) shm_toc_lookup(toc,
                                             PARALLEL_VACUUM_KEY_INDEX_STATS,
                                             false);

    /* Set dead_items space */
    dead_items = (VacDeadItems *) shm_toc_lookup(toc,
                                                 PARALLEL_VACUUM_KEY_DEAD_ITEMS,
                                                 false);

    /* Set cost-based vacuum delay */
    VacuumCostActive = (VacuumCostDelay > 0);
    VacuumCostBalance = 0;
    VacuumPageHit = 0;
    VacuumPageMiss = 0;
    VacuumPageDirty = 0;
    VacuumCostBalanceLocal = 0;
    VacuumSharedCostBalance = &(shared->cost_balance);
    VacuumActiveNWorkers = &(shared->active_nworkers);

    /* Set parallel vacuum state */
    pvs.indrels = indrels;
    pvs.nindexes = nindexes;
    pvs.indstats = indstats;
    pvs.shared = shared;
    pvs.dead_items = dead_items;
    pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
    pvs.relname = pstrdup(RelationGetRelationName(rel));

    /* These fields will be filled during index vacuum or cleanup */
    pvs.indname = NULL;
    pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;

    /* Each parallel VACUUM worker gets its own access strategy */
    pvs.bstrategy = GetAccessStrategy(BAS_VACUUM);

    /* Setup error traceback support for ereport() */
    errcallback.callback = parallel_vacuum_error_callback;
    errcallback.arg = &pvs;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /* Prepare to track buffer usage during parallel execution */
    InstrStartParallelQuery();

    /* Process indexes to perform vacuum/cleanup */
    parallel_vacuum_process_safe_indexes(&pvs);

    /* Report buffer/WAL usage during parallel execution */
    buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
    wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
    InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
                          &wal_usage[ParallelWorkerNumber]);

    /* Pop the error context stack */
    error_context_stack = errcallback.previous;

    vac_close_indexes(nindexes, indrels, RowExclusiveLock);
    table_close(rel, ShareUpdateExclusiveLock);
    FreeAccessStrategy(pvs.bstrategy);
}

/*
 * Error context callback for errors occurring during parallel index vacuum.
 * The error context messages should match the messages set in the lazy vacuum
 * error context.  If you change this function, change vacuum_error_callback()
 * as well.
 */
static void
parallel_vacuum_error_callback(void *arg)
{
    ParallelVacuumState *errinfo = arg;

    switch (errinfo->status)
    {
        case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
            errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
                       errinfo->indname,
                       errinfo->relnamespace,
                       errinfo->relname);
            break;
        case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
            errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
                       errinfo->indname,
                       errinfo->relnamespace,
                       errinfo->relname);
            break;
        case PARALLEL_INDVAC_STATUS_INITIAL:
        case PARALLEL_INDVAC_STATUS_COMPLETED:
        default:
            return;
    }
}