vacuumparallel.c

/*-------------------------------------------------------------------------
 *
 * vacuumparallel.c
 *	  Support routines for parallel vacuum execution.
 *
 * This file contains routines that are intended to support setting up, using,
 * and tearing down a ParallelVacuumState.
 *
 * In a parallel vacuum, we perform both index bulk deletion and index cleanup
 * with parallel worker processes.  Individual indexes are processed by one
 * vacuum process.  ParallelVacuumState contains shared information as well as
 * the memory space for storing dead items allocated in the DSM segment.  We
 * launch parallel worker processes at the start of parallel index
 * bulk-deletion and index cleanup and once all indexes are processed, the
 * parallel worker processes exit.  Each time we process indexes in parallel,
 * the parallel context is re-initialized so that the same DSM can be used for
 * multiple passes of index bulk-deletion and index cleanup.
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/commands/vacuumparallel.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/amapi.h"
#include "access/table.h"
#include "access/xact.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "executor/instrument.h"
#include "optimizer/paths.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"

/*
 * DSM keys for parallel vacuum.  Unlike other parallel execution code, we
 * don't need to worry about DSM keys conflicting with plan_node_id, so we can
 * use small integers.
 */
#define PARALLEL_VACUUM_KEY_SHARED			1
#define PARALLEL_VACUUM_KEY_DEAD_ITEMS		2
#define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
#define PARALLEL_VACUUM_KEY_WAL_USAGE		5
#define PARALLEL_VACUUM_KEY_INDEX_STATS		6
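
/*
 * Usage sketch for the keys above (a minimal outline with placeholder
 * variable names; not compiled).  Every chunk in the DSM segment follows the
 * same estimate/allocate/insert pattern in the leader, and is found again by
 * key in the workers.
 */
#if 0
	/* Leader: reserve space for a chunk and its key ... */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(PVShared));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
	InitializeParallelDSM(pcxt);

	/* ... then allocate the chunk and publish it under the key */
	shared = (PVShared *) shm_toc_allocate(pcxt->toc, sizeof(PVShared));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);

	/* Worker: look the same chunk up by key */
	shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
#endif
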
/*
 * Shared information among parallel workers, so it is allocated in the DSM
 * segment.
 */
typedef struct PVShared
{
	/*
	 * Target table relid and log level (for messages about parallel workers
	 * launched during VACUUM VERBOSE).  These fields are not modified during
	 * the parallel vacuum.
	 */
	Oid			relid;
	int			elevel;

	/*
	 * Fields for both index vacuum and cleanup.
	 *
	 * reltuples is the total number of input heap tuples.  We set either old
	 * live tuples in the index vacuum case or the new live tuples in the
	 * index cleanup case.
	 *
	 * estimated_count is true if reltuples is an estimated value.  (Note that
	 * reltuples could be -1 in this case, indicating we have no idea.)
	 */
	double		reltuples;
	bool		estimated_count;

	/*
	 * In single process vacuum, we could consume more memory during index
	 * vacuuming or cleanup apart from the memory for heap scanning.  In
	 * parallel vacuum, since individual vacuum workers can consume memory
	 * equal to maintenance_work_mem, the new maintenance_work_mem for each
	 * worker is set such that the parallel operation doesn't consume more
	 * memory than single process vacuum.
	 */
	int			maintenance_work_mem_worker;

	/*
	 * The number of buffers each worker's Buffer Access Strategy ring should
	 * contain.
	 */
	int			ring_nbuffers;

	/*
	 * Shared vacuum cost balance.  During parallel vacuum,
	 * VacuumSharedCostBalance points to this value and it accumulates the
	 * balance of each parallel vacuum worker.
	 */
	pg_atomic_uint32 cost_balance;

	/*
	 * Number of active parallel workers.  This is used for computing the
	 * minimum threshold of the vacuum cost balance before a worker sleeps for
	 * cost-based delay.
	 */
	pg_atomic_uint32 active_nworkers;

	/* Counter for vacuuming and cleanup */
	pg_atomic_uint32 idx;
} PVShared;

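/*
 * Worked example of the maintenance_work_mem_worker computation done in
 * parallel_vacuum_init() (hypothetical values): with maintenance_work_mem =
 * 262144 (256MB, in KB), parallel_workers = 4, and nindexes_mwm = 2 indexes
 * whose AM uses maintenance_work_mem, each such worker gets
 * 262144 / Min(4, 2) = 131072 (128MB).  At most two of those indexes are
 * vacuumed at once, so the total stays within the single-process budget of
 * 256MB.
 */
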
/* Status used during parallel index vacuum or cleanup */
typedef enum PVIndVacStatus
{
	PARALLEL_INDVAC_STATUS_INITIAL = 0,
	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
	PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
	PARALLEL_INDVAC_STATUS_COMPLETED,
} PVIndVacStatus;

/*
 * Struct for index vacuum statistics of an index that is used for parallel
 * vacuum.  This includes the status of parallel index vacuum as well as
 * index statistics.
 */
typedef struct PVIndStats
{
	/*
	 * The following two fields are set by the leader process before executing
	 * parallel index vacuum or parallel index cleanup.  These fields are not
	 * fixed for the entire VACUUM operation.  They are only fixed for an
	 * individual parallel index vacuum and cleanup.
	 *
	 * parallel_workers_can_process is true if both leader and workers can
	 * process the index, otherwise only the leader can process it.
	 */
	PVIndVacStatus status;
	bool		parallel_workers_can_process;

	/*
	 * Individual worker or leader stores the result of index vacuum or
	 * cleanup.
	 */
	bool		istat_updated;	/* are the stats updated? */
	IndexBulkDeleteResult istat;
} PVIndStats;

/*
 * Struct for maintaining a parallel vacuum state.  typedef appears in
 * vacuum.h.
 */
struct ParallelVacuumState
{
	/* NULL for worker processes */
	ParallelContext *pcxt;

	/* Parent Heap Relation */
	Relation	heaprel;

	/* Target indexes */
	Relation   *indrels;
	int			nindexes;

	/* Shared information among parallel vacuum workers */
	PVShared   *shared;

	/*
	 * Shared index statistics among parallel vacuum workers.  An array
	 * element is allocated for every index, even those indexes where parallel
	 * index vacuuming is unsafe or not worthwhile (e.g.,
	 * will_parallel_vacuum[] is false).  During parallel vacuum,
	 * IndexBulkDeleteResult of each index is kept in DSM and is copied into
	 * local memory at the end of parallel vacuum.
	 */
	PVIndStats *indstats;

	/* Shared dead items space among parallel vacuum workers */
	VacDeadItems *dead_items;

	/* Points to buffer usage area in DSM */
	BufferUsage *buffer_usage;

	/* Points to WAL usage area in DSM */
	WalUsage   *wal_usage;

	/*
	 * False if the index is a totally unsuitable target for all parallel
	 * processing.  For example, the index could be smaller than the
	 * min_parallel_index_scan_size cutoff.
	 */
	bool	   *will_parallel_vacuum;

	/*
	 * The number of indexes that support parallel index bulk-deletion and
	 * parallel index cleanup respectively.
	 */
	int			nindexes_parallel_bulkdel;
	int			nindexes_parallel_cleanup;
	int			nindexes_parallel_condcleanup;

	/* Buffer access strategy used by leader process */
	BufferAccessStrategy bstrategy;

	/*
	 * Error reporting state.  The error callback is set only for worker
	 * processes during parallel index vacuum.
	 */
	char	   *relnamespace;
	char	   *relname;
	char	   *indname;
	PVIndVacStatus status;
};

static int	parallel_vacuum_compute_workers(Relation *indrels, int nindexes,
											int nrequested,
											bool *will_parallel_vacuum);
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs,
												int num_index_scans,
												bool vacuum);
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs,
											  Relation indrel,
											  PVIndStats *indstats);
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel,
												   int num_index_scans,
												   bool vacuum);
static void parallel_vacuum_error_callback(void *arg);

/*
 * Try to enter parallel mode and create a parallel context.  Then initialize
 * shared memory state.
 *
 * On success, return parallel vacuum state.  Otherwise return NULL.
 */
ParallelVacuumState *
parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
					 int nrequested_workers, int max_items,
					 int elevel, BufferAccessStrategy bstrategy)
{
	ParallelVacuumState *pvs;
	ParallelContext *pcxt;
	PVShared   *shared;
	VacDeadItems *dead_items;
	PVIndStats *indstats;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	bool	   *will_parallel_vacuum;
	Size		est_indstats_len;
	Size		est_shared_len;
	Size		est_dead_items_len;
	int			nindexes_mwm = 0;
	int			parallel_workers = 0;
	int			querylen;

	/*
	 * A parallel vacuum must be requested and there must be indexes on the
	 * relation
	 */
	Assert(nrequested_workers >= 0);
	Assert(nindexes > 0);

	/*
	 * Compute the number of parallel vacuum workers to launch
	 */
	will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
	parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
													   nrequested_workers,
													   will_parallel_vacuum);
	if (parallel_workers <= 0)
	{
		/* Can't perform vacuum in parallel -- return NULL */
		pfree(will_parallel_vacuum);
		return NULL;
	}

	pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
	pvs->indrels = indrels;
	pvs->nindexes = nindexes;
	pvs->will_parallel_vacuum = will_parallel_vacuum;
	pvs->bstrategy = bstrategy;
	pvs->heaprel = rel;

	EnterParallelMode();
	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
								 parallel_workers);
	Assert(pcxt->nworkers > 0);
	pvs->pcxt = pcxt;

	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
	est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
	shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
	est_shared_len = sizeof(PVShared);
	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
	est_dead_items_len = vac_max_items_to_alloc_size(max_items);
	shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Estimate space for BufferUsage and WalUsage --
	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
	 *
	 * If there are no extensions loaded that care, we could skip this.  We
	 * have no way of knowing whether anyone's looking at pgBufferUsage or
	 * pgWalUsage, so do it unconditionally.
	 */
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
	if (debug_query_string)
	{
		querylen = strlen(debug_query_string);
		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}
	else
		querylen = 0;			/* keep compiler quiet */

	InitializeParallelDSM(pcxt);

	/* Prepare index vacuum stats */
	indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
	MemSet(indstats, 0, est_indstats_len);
	for (int i = 0; i < nindexes; i++)
	{
		Relation	indrel = indrels[i];
		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;

		/*
		 * Cleanup option should be either disabled, always performed in
		 * parallel, or conditionally performed in parallel.
		 */
		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);

		if (!will_parallel_vacuum[i])
			continue;

		if (indrel->rd_indam->amusemaintenanceworkmem)
			nindexes_mwm++;

		/*
		 * Remember the number of indexes that support parallel operation for
		 * each phase.
		 */
		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
			pvs->nindexes_parallel_bulkdel++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
			pvs->nindexes_parallel_cleanup++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
			pvs->nindexes_parallel_condcleanup++;
	}
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
	pvs->indstats = indstats;

	/* Prepare shared information */
	shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
	MemSet(shared, 0, est_shared_len);
	shared->relid = RelationGetRelid(rel);
	shared->elevel = elevel;
	shared->maintenance_work_mem_worker =
		(nindexes_mwm > 0) ?
		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
		maintenance_work_mem;

	/* Use the same buffer size for all workers */
	shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);

	pg_atomic_init_u32(&(shared->cost_balance), 0);
	pg_atomic_init_u32(&(shared->active_nworkers), 0);
	pg_atomic_init_u32(&(shared->idx), 0);

	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
	pvs->shared = shared;

	/* Prepare the dead_items space */
	dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
												   est_dead_items_len);
	dead_items->max_items = max_items;
	dead_items->num_items = 0;
	MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
	pvs->dead_items = dead_items;

	/*
	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
	 * initialize
	 */
	buffer_usage = shm_toc_allocate(pcxt->toc,
									mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
	pvs->buffer_usage = buffer_usage;
	wal_usage = shm_toc_allocate(pcxt->toc,
								 mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
	pvs->wal_usage = wal_usage;

	/* Store query string for workers */
	if (debug_query_string)
	{
		char	   *sharedquery;

		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
		memcpy(sharedquery, debug_query_string, querylen + 1);
		sharedquery[querylen] = '\0';
		shm_toc_insert(pcxt->toc,
					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
	}

	/* Success -- return parallel vacuum state */
	return pvs;
}

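/*
 * Usage sketch for the leader-side API in this file (a minimal outline,
 * assuming the caller already holds the heap relation, its indexes, and the
 * various counters the way lazy vacuum does; not compiled).
 */
#if 0
	ParallelVacuumState *pvs;

	pvs = parallel_vacuum_init(rel, indrels, nindexes,
							   nrequested_workers, max_items,
							   elevel, bstrategy);
	if (pvs != NULL)
	{
		VacDeadItems *dead_items = parallel_vacuum_get_dead_items(pvs);

		/* ... accumulate dead items into dead_items while scanning the heap ... */
		parallel_vacuum_bulkdel_all_indexes(pvs, num_table_tuples,
											num_index_scans);
		/* ... possibly more heap scan and bulk-deletion passes ... */
		parallel_vacuum_cleanup_all_indexes(pvs, num_table_tuples,
											num_index_scans, estimated_count);
		parallel_vacuum_end(pvs, indstats);
	}
#endif
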
/*
 * Destroy the parallel context, and end parallel mode.
 *
 * Since writes are not allowed during parallel mode, copy the updated index
 * statistics from DSM into local memory and then later use that to update
 * the index statistics.  One might think that we could exit from parallel
 * mode, update the index statistics and then destroy the parallel context,
 * but that won't be safe (see ExitParallelMode).
 */
void
parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
{
	Assert(!IsParallelWorker());

	/* Copy the updated statistics */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		if (indstats->istat_updated)
		{
			istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
			memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
		}
		else
			istats[i] = NULL;
	}

	DestroyParallelContext(pvs->pcxt);
	ExitParallelMode();

	pfree(pvs->will_parallel_vacuum);
	pfree(pvs);
}

/* Returns the dead items space */
VacDeadItems *
parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
{
	return pvs->dead_items;
}

/*
 * Do parallel index bulk-deletion with parallel workers.
 */
void
parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
									int num_index_scans)
{
	Assert(!IsParallelWorker());

	/*
	 * We can only provide an approximate value of num_heap_tuples, at least
	 * for now.
	 */
	pvs->shared->reltuples = num_table_tuples;
	pvs->shared->estimated_count = true;

	parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
}

/*
 * Do parallel index cleanup with parallel workers.
 */
void
parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
									int num_index_scans, bool estimated_count)
{
	Assert(!IsParallelWorker());

	/*
	 * We can provide a better estimate of the total number of surviving
	 * tuples (we assume indexes are more interested in that than in the
	 * number of nominally live tuples).
	 */
	pvs->shared->reltuples = num_table_tuples;
	pvs->shared->estimated_count = estimated_count;

	parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
}

/*
 * Compute the number of parallel worker processes to request.  Both index
 * vacuum and index cleanup can be executed with parallel workers.
 * An index is eligible for parallel vacuum only if its size is greater than
 * min_parallel_index_scan_size, as invoking workers for very small indexes
 * can hurt performance.
 *
 * nrequested is the number of parallel workers that the user requested.  If
 * nrequested is 0, we compute the parallel degree based on nindexes, that is
 * the number of indexes that support parallel vacuum.  This function also
 * sets will_parallel_vacuum to remember indexes that participate in parallel
 * vacuum.
 */
static int
parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
								bool *will_parallel_vacuum)
{
	int			nindexes_parallel = 0;
	int			nindexes_parallel_bulkdel = 0;
	int			nindexes_parallel_cleanup = 0;
	int			parallel_workers;

	/*
	 * We don't allow performing parallel operation in standalone backend or
	 * when parallelism is disabled.
	 */
	if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
		return 0;

	/*
	 * Compute the number of indexes that can participate in parallel vacuum.
	 */
	for (int i = 0; i < nindexes; i++)
	{
		Relation	indrel = indrels[i];
		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;

		/* Skip index that is not a suitable target for parallel index vacuum */
		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
			continue;

		will_parallel_vacuum[i] = true;

		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
			nindexes_parallel_bulkdel++;
		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
			nindexes_parallel_cleanup++;
	}

	nindexes_parallel = Max(nindexes_parallel_bulkdel,
							nindexes_parallel_cleanup);

	/* The leader process takes one index */
	nindexes_parallel--;

	/* No index supports parallel vacuum */
	if (nindexes_parallel <= 0)
		return 0;

	/* Compute the parallel degree */
	parallel_workers = (nrequested > 0) ?
		Min(nrequested, nindexes_parallel) : nindexes_parallel;

	/* Cap by max_parallel_maintenance_workers */
	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);

	return parallel_workers;
}
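
/*
 * Worked example (hypothetical values): for 5 indexes where 4 support
 * parallel bulk-deletion and 3 support (possibly conditional) parallel
 * cleanup, nindexes_parallel = Max(4, 3) - 1 = 3 once the leader takes one
 * index.  With nrequested = 0 and max_parallel_maintenance_workers = 2 (the
 * default), the result is Min(3, 2) = 2 workers.
 */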

/*
 * Perform index vacuum or index cleanup with parallel workers.  This function
 * must be used by the parallel vacuum leader process.
 */
static void
parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
									bool vacuum)
{
	int			nworkers;
	PVIndVacStatus new_status;

	Assert(!IsParallelWorker());

	if (vacuum)
	{
		new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;

		/* Determine the number of parallel workers to launch */
		nworkers = pvs->nindexes_parallel_bulkdel;
	}
	else
	{
		new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;

		/* Determine the number of parallel workers to launch */
		nworkers = pvs->nindexes_parallel_cleanup;

		/* Add conditionally parallel-aware indexes on the first call */
		if (num_index_scans == 0)
			nworkers += pvs->nindexes_parallel_condcleanup;
	}

	/* The leader process will participate */
	nworkers--;

	/*
	 * It is possible that the parallel context was initialized with fewer
	 * workers than the number of indexes that need a separate worker in the
	 * current phase, so we need to consider it.  See
	 * parallel_vacuum_compute_workers().
	 */
	nworkers = Min(nworkers, pvs->pcxt->nworkers);

	/*
	 * Set index vacuum status and mark whether parallel vacuum workers can
	 * process it.
	 */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
		indstats->status = new_status;
		indstats->parallel_workers_can_process =
			(pvs->will_parallel_vacuum[i] &&
			 parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
													num_index_scans,
													vacuum));
	}

	/* Reset the parallel index processing and progress counters */
	pg_atomic_write_u32(&(pvs->shared->idx), 0);

	/* Setup the shared cost-based vacuum delay and launch workers */
	if (nworkers > 0)
	{
		/* Reinitialize parallel context to relaunch parallel workers */
		if (num_index_scans > 0)
			ReinitializeParallelDSM(pvs->pcxt);

		/*
		 * Set up shared cost balance and the number of active workers for
		 * vacuum delay.  We need to do this before launching workers as
		 * otherwise, they might not see the updated values for these
		 * parameters.
		 */
		pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
		pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);

		/*
		 * The number of workers can vary between bulkdelete and cleanup
		 * phase.
		 */
		ReinitializeParallelWorkers(pvs->pcxt, nworkers);

		LaunchParallelWorkers(pvs->pcxt);

		if (pvs->pcxt->nworkers_launched > 0)
		{
			/*
			 * Reset the local cost values for leader backend as we have
			 * already accumulated the remaining balance of heap.
			 */
			VacuumCostBalance = 0;
			VacuumCostBalanceLocal = 0;

			/* Enable shared cost balance for leader backend */
			VacuumSharedCostBalance = &(pvs->shared->cost_balance);
			VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
		}

		if (vacuum)
			ereport(pvs->shared->elevel,
					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
									 pvs->pcxt->nworkers_launched),
							pvs->pcxt->nworkers_launched, nworkers)));
		else
			ereport(pvs->shared->elevel,
					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
									 pvs->pcxt->nworkers_launched),
							pvs->pcxt->nworkers_launched, nworkers)));
	}

	/* Vacuum the indexes that can be processed only by the leader process */
	parallel_vacuum_process_unsafe_indexes(pvs);

	/*
	 * Join as a parallel worker.  The leader alone processes all
	 * parallel-safe indexes in the case where no workers are launched.
	 */
	parallel_vacuum_process_safe_indexes(pvs);

	/*
	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
	 * to finish, or we might get incomplete data.)
	 */
	if (nworkers > 0)
	{
		/* Wait for all vacuum workers to finish */
		WaitForParallelWorkersToFinish(pvs->pcxt);

		for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
			InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
	}

	/*
	 * Reset all index status back to initial (while checking that we have
	 * vacuumed all indexes).
	 */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
			elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
				 RelationGetRelationName(pvs->indrels[i]));

		indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
	}

	/*
	 * Carry the shared balance value to heap scan and disable shared costing
	 */
	if (VacuumSharedCostBalance)
	{
		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
		VacuumSharedCostBalance = NULL;
		VacuumActiveNWorkers = NULL;
	}
}

/*
 * Index vacuum/cleanup routine used by the leader process and parallel
 * vacuum worker processes to vacuum the indexes in parallel.
 */
static void
parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
{
	/*
	 * Increment the active worker count if we are able to launch any worker.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);

	/* Loop until all indexes are vacuumed */
	for (;;)
	{
		int			idx;
		PVIndStats *indstats;

		/* Get an index number to process */
		idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);

		/* Done for all indexes? */
		if (idx >= pvs->nindexes)
			break;

		indstats = &(pvs->indstats[idx]);

		/*
		 * Skip vacuuming an index that is unsafe for workers or is an
		 * unsuitable target for parallel index vacuum (such an index is
		 * vacuumed in parallel_vacuum_process_unsafe_indexes() by the
		 * leader).
		 */
		if (!indstats->parallel_workers_can_process)
			continue;

		/* Do vacuum or cleanup of the index */
		parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
	}

	/*
	 * We have completed the index vacuum so decrement the active worker
	 * count.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}
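
/*
 * The loop above distributes indexes with a single atomic fetch-and-add:
 * each participant repeatedly claims the next array slot until the counter
 * passes the end.  A minimal standalone sketch of the same pattern using C11
 * atomics (illustrative only; PostgreSQL itself uses the pg_atomic_*
 * wrappers shown above):
 */
#if 0
#include <stdatomic.h>
#include <stdio.h>

static atomic_uint next_idx;	/* plays the role of shared->idx */

static void
process_all(int nitems)
{
	for (;;)
	{
		unsigned int idx = atomic_fetch_add(&next_idx, 1);

		if (idx >= (unsigned int) nitems)
			break;				/* every slot has been claimed */
		printf("processing item %u\n", idx);
	}
}
#endif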

/*
 * Perform parallel vacuuming of indexes in leader process.
 *
 * Handles index vacuuming (or index cleanup) for indexes that are not
 * parallel safe.  It's possible that this will vary for a given index, based
 * on details like whether we're performing index cleanup right now.
 *
 * Also performs vacuuming of smaller indexes that fell under the size cutoff
 * enforced by parallel_vacuum_compute_workers().
 */
static void
parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
{
	Assert(!IsParallelWorker());

	/*
	 * Increment the active worker count if we are able to launch any worker.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);

	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		/* Skip indexes that are safe for workers */
		if (indstats->parallel_workers_can_process)
			continue;

		/* Do vacuum or cleanup of the index */
		parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
	}

	/*
	 * We have completed the index vacuum so decrement the active worker
	 * count.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}

/*
 * Vacuum or cleanup an index, either by the leader process or by one of the
 * worker processes.  After vacuuming the index, this function copies the
 * index statistics returned from ambulkdelete and amvacuumcleanup to the DSM
 * segment.
 */
static void
parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
								  PVIndStats *indstats)
{
	IndexBulkDeleteResult *istat = NULL;
	IndexBulkDeleteResult *istat_res;
	IndexVacuumInfo ivinfo;

	/*
	 * Update the pointer to the corresponding bulk-deletion result if someone
	 * has already updated it
	 */
	if (indstats->istat_updated)
		istat = &(indstats->istat);

	ivinfo.index = indrel;
	ivinfo.heaprel = pvs->heaprel;
	ivinfo.analyze_only = false;
	ivinfo.report_progress = false;
	ivinfo.message_level = DEBUG2;
	ivinfo.estimated_count = pvs->shared->estimated_count;
	ivinfo.num_heap_tuples = pvs->shared->reltuples;
	ivinfo.strategy = pvs->bstrategy;

	/* Update error traceback information */
	pvs->indname = pstrdup(RelationGetRelationName(indrel));
	pvs->status = indstats->status;

	switch (indstats->status)
	{
		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
			istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
			break;
		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
			istat_res = vac_cleanup_one_index(&ivinfo, istat);
			break;
		default:
			elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
				 indstats->status,
				 RelationGetRelationName(indrel));
	}

	/*
	 * Copy the index bulk-deletion result returned from ambulkdelete and
	 * amvacuumcleanup to the DSM segment if it's the first cycle, because
	 * they allocate locally and it's possible that an index will be vacuumed
	 * by a different vacuum process the next cycle.  Copying the result
	 * normally happens only the first time an index is vacuumed.  For any
	 * additional vacuum pass, we directly point to the result on the DSM
	 * segment and pass it to vacuum index APIs so that workers can update it
	 * directly.
	 *
	 * Since all vacuum workers write the bulk-deletion result at different
	 * slots we can write them without locking.
	 */
	if (!indstats->istat_updated && istat_res != NULL)
	{
		memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
		indstats->istat_updated = true;

		/* Free the locally-allocated bulk-deletion result */
		pfree(istat_res);
	}

	/*
	 * Update the status to completed.  No need to lock here since each worker
	 * touches different indexes.
	 */
	indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;

	/* Reset error traceback information */
	pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
	pfree(pvs->indname);
	pvs->indname = NULL;

	/*
	 * Call the parallel variant of pgstat_progress_incr_param so workers can
	 * report progress of index vacuum to the leader.
	 */
	pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
}

/*
 * Returns false if the given index can't participate in the next execution of
 * parallel index vacuum or parallel index cleanup.
 */
static bool
parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
									   bool vacuum)
{
	uint8		vacoptions;

	vacoptions = indrel->rd_indam->amparallelvacuumoptions;

	/* In parallel vacuum case, check if it supports parallel bulk-deletion */
	if (vacuum)
		return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);

	/* Not safe, if the index does not support parallel cleanup */
	if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
		return false;

	/*
	 * Not safe, if the index supports parallel cleanup conditionally, but we
	 * have already processed the index (for bulkdelete).  We do this to avoid
	 * the need to invoke workers when parallel index cleanup doesn't need to
	 * scan the index.  See the comments for option
	 * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
	 * parallel cleanup conditionally.
	 */
	if (num_index_scans > 0 &&
		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
		return false;

	return true;
}
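
/*
 * For context: an index AM opts into these phases through its IndexAmRoutine.
 * B-tree, for instance, allows parallel bulk-deletion unconditionally and
 * parallel cleanup only conditionally, roughly as sketched below (compare
 * bthandler() in src/backend/access/nbtree/nbtree.c):
 */
#if 0
	amroutine->amparallelvacuumoptions =
		VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;
#endif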

/*
 * Perform work within a launched parallel process.
 *
 * Since parallel vacuum workers perform only index vacuum or index cleanup,
 * we don't need to report progress information.
 */
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
{
	ParallelVacuumState pvs;
	Relation	rel;
	Relation   *indrels;
	PVIndStats *indstats;
	PVShared   *shared;
	VacDeadItems *dead_items;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	int			nindexes;
	char	   *sharedquery;
	ErrorContextCallback errcallback;

	/*
	 * A parallel vacuum worker must have only the PROC_IN_VACUUM flag, since
	 * we don't support parallel vacuum for autovacuum as of now.
	 */
	Assert(MyProc->statusFlags == PROC_IN_VACUUM);

	elog(DEBUG1, "starting parallel vacuum worker");

	shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);

	/* Set debug_query_string for individual workers */
	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
	debug_query_string = sharedquery;
	pgstat_report_activity(STATE_RUNNING, debug_query_string);

	/*
	 * Open table.  The lock mode is the same as the leader process.  It's
	 * okay because the lock mode does not conflict among the parallel
	 * workers.
	 */
	rel = table_open(shared->relid, ShareUpdateExclusiveLock);

	/*
	 * Open all indexes.  indrels are sorted in order by OID, which should
	 * match the leader's ordering.
	 */
	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
	Assert(nindexes > 0);

	if (shared->maintenance_work_mem_worker > 0)
		maintenance_work_mem = shared->maintenance_work_mem_worker;

	/* Set index statistics */
	indstats = (PVIndStats *) shm_toc_lookup(toc,
											 PARALLEL_VACUUM_KEY_INDEX_STATS,
											 false);

	/* Set dead_items space */
	dead_items = (VacDeadItems *) shm_toc_lookup(toc,
												 PARALLEL_VACUUM_KEY_DEAD_ITEMS,
												 false);

	/* Set cost-based vacuum delay */
	VacuumUpdateCosts();
	VacuumCostBalance = 0;
	VacuumPageHit = 0;
	VacuumPageMiss = 0;
	VacuumPageDirty = 0;
	VacuumCostBalanceLocal = 0;
	VacuumSharedCostBalance = &(shared->cost_balance);
	VacuumActiveNWorkers = &(shared->active_nworkers);

	/* Set parallel vacuum state */
	pvs.indrels = indrels;
	pvs.nindexes = nindexes;
	pvs.indstats = indstats;
	pvs.shared = shared;
	pvs.dead_items = dead_items;
	pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
	pvs.relname = pstrdup(RelationGetRelationName(rel));
	pvs.heaprel = rel;

	/* These fields will be filled during index vacuum or cleanup */
	pvs.indname = NULL;
	pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;

	/* Each parallel VACUUM worker gets its own access strategy. */
	pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
											  shared->ring_nbuffers * (BLCKSZ / 1024));

	/* Setup error traceback support for ereport() */
	errcallback.callback = parallel_vacuum_error_callback;
	errcallback.arg = &pvs;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Prepare to track buffer usage during parallel execution */
	InstrStartParallelQuery();

	/* Process indexes to perform vacuum/cleanup */
	parallel_vacuum_process_safe_indexes(&pvs);

	/* Report buffer/WAL usage during parallel execution */
	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
						  &wal_usage[ParallelWorkerNumber]);

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
	table_close(rel, ShareUpdateExclusiveLock);
	FreeAccessStrategy(pvs.bstrategy);
}

/*
 * Error context callback for errors occurring during parallel index vacuum.
 * The error context messages should match the messages set in the lazy vacuum
 * error context.  If you change this function, change vacuum_error_callback()
 * as well.
 */
static void
parallel_vacuum_error_callback(void *arg)
{
	ParallelVacuumState *errinfo = arg;

	switch (errinfo->status)
	{
		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
			errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
					   errinfo->indname,
					   errinfo->relnamespace,
					   errinfo->relname);
			break;
		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
			errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
					   errinfo->indname,
					   errinfo->relnamespace,
					   errinfo->relname);
			break;
		case PARALLEL_INDVAC_STATUS_INITIAL:
		case PARALLEL_INDVAC_STATUS_COMPLETED:
		default:
			return;
	}
}