PostgreSQL Source Code  git master
vacuumparallel.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * vacuumparallel.c
4  * Support routines for parallel vacuum execution.
5  *
6  * This file contains routines that are intended to support setting up, using,
7  * and tearing down a ParallelVacuumState.
8  *
9  * In a parallel vacuum, we perform both index bulk deletion and index cleanup
10  * with parallel worker processes. Individual indexes are processed by one
11  * vacuum process. ParallelVacuumState contains shared information as well as
12  * the memory space for storing dead items allocated in the DSA area. We
13  * launch parallel worker processes at the start of parallel index
14  * bulk-deletion and index cleanup and once all indexes are processed, the
15  * parallel worker processes exit. Each time we process indexes in parallel,
16  * the parallel context is re-initialized so that the same DSM can be used for
17  * multiple passes of index bulk-deletion and index cleanup.
18  *
19  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
20  * Portions Copyright (c) 1994, Regents of the University of California
21  *
22  * IDENTIFICATION
23  * src/backend/commands/vacuumparallel.c
24  *
25  *-------------------------------------------------------------------------
26  */
27 #include "postgres.h"
28 
29 #include "access/amapi.h"
30 #include "access/table.h"
31 #include "access/xact.h"
32 #include "commands/progress.h"
33 #include "commands/vacuum.h"
34 #include "executor/instrument.h"
35 #include "optimizer/paths.h"
36 #include "pgstat.h"
37 #include "storage/bufmgr.h"
38 #include "tcop/tcopprot.h"
39 #include "utils/lsyscache.h"
40 #include "utils/rel.h"
41 
42 /*
43  * DSM keys for parallel vacuum. Unlike other parallel execution code, we
44  * don't need to worry about DSM keys conflicting with plan_node_id, so we
45  * can use small integers.
46  */
47 #define PARALLEL_VACUUM_KEY_SHARED 1
48 #define PARALLEL_VACUUM_KEY_QUERY_TEXT 2
49 #define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3
50 #define PARALLEL_VACUUM_KEY_WAL_USAGE 4
51 #define PARALLEL_VACUUM_KEY_INDEX_STATS 5
52 
53 /*
54  * Shared information among parallel workers, so this is allocated in the DSM
55  * segment.
56  */
57 typedef struct PVShared
58 {
59  /*
60  * Target table relid and log level (for messages about parallel workers
61  * launched during VACUUM VERBOSE). These fields are not modified during
62  * the parallel vacuum.
63  */
65  int elevel;
66 
67  /*
68  * Fields for both index vacuum and cleanup.
69  *
70  * reltuples is the total number of input heap tuples. We set either old
71  * live tuples in the index vacuum case or the new live tuples in the
72  * index cleanup case.
73  *
74  * estimated_count is true if reltuples is an estimated value. (Note that
75  * reltuples could be -1 in this case, indicating we have no idea.)
76  */
77  double reltuples;
79 
80  /*
81  * In single process vacuum we could consume more memory during index
82  * vacuuming or cleanup apart from the memory for heap scanning. In
83  * parallel vacuum, since individual vacuum workers can consume memory
84  * equal to maintenance_work_mem, the new maintenance_work_mem for each
85  * worker is set such that the parallel operation doesn't consume more
86  * memory than single process vacuum.
87  */
89 
90  /*
91  * The number of buffers each worker's Buffer Access Strategy ring should
92  * contain.
93  */
95 
96  /*
97  * Shared vacuum cost balance. During parallel vacuum,
98  * VacuumSharedCostBalance points to this value and it accumulates the
99  * balance of each parallel vacuum worker.
100  */
102 
103  /*
104  * Number of active parallel workers. This is used for computing the
105  * minimum threshold of the vacuum cost balance before a worker sleeps for
106  * cost-based delay.
107  */
109 
110  /* Counter for vacuuming and cleanup */
112 
113  /* DSA handle where the TidStore lives */
115 
116  /* DSA pointer to the shared TidStore */
118 
119  /* Statistics of shared dead items */
122 
123 /* Status used during parallel index vacuum or cleanup */
124 typedef enum PVIndVacStatus
125 {
131 
132 /*
133  * Struct for index vacuum statistics of an index that is used for parallel vacuum.
134  * This includes the status of parallel index vacuum as well as index statistics.
135  */
136 typedef struct PVIndStats
137 {
138  /*
139  * The following two fields are set by leader process before executing
140  * parallel index vacuum or parallel index cleanup. These fields are not
141  * fixed for the entire VACUUM operation. They are only fixed for an
142  * individual parallel index vacuum and cleanup.
143  *
144  * parallel_workers_can_process is true if both leader and worker can
145  * process the index, otherwise only leader can process it.
146  */
149 
150  /*
151  * Individual worker or leader stores the result of index vacuum or
152  * cleanup.
153  */
154  bool istat_updated; /* are the stats updated? */
157 
158 /*
159  * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
160  */
162 {
163  /* NULL for worker processes */
165 
166  /* Parent Heap Relation */
168 
169  /* Target indexes */
171  int nindexes;
172 
173  /* Shared information among parallel vacuum workers */
175 
176  /*
177  * Shared index statistics among parallel vacuum workers. The array
178  * element is allocated for every index, even those indexes where parallel
179  * index vacuuming is unsafe or not worthwhile (e.g.,
180  * will_parallel_vacuum[] is false). During parallel vacuum,
181  * IndexBulkDeleteResult of each index is kept in DSM and is copied into
182  * local memory at the end of parallel vacuum.
183  */
185 
186  /* Shared dead items space among parallel vacuum workers */
188 
189  /* Points to buffer usage area in DSM */
191 
192  /* Points to WAL usage area in DSM */
194 
195  /*
196  * False if the index is totally unsuitable target for all parallel
197  * processing. For example, the index could be <
198  * min_parallel_index_scan_size cutoff.
199  */
201 
202  /*
203  * The number of indexes that support parallel index bulk-deletion and
204  * parallel index cleanup respectively.
205  */
209 
210  /* Buffer access strategy used by leader process */
212 
213  /*
214  * Error reporting state. The error callback is set only for workers
215  * processes during parallel index vacuum.
216  */
218  char *relname;
219  char *indname;
221 };
222 
223 static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
224  bool *will_parallel_vacuum);
225 static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
226  bool vacuum);
230  PVIndStats *indstats);
231 static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
232  bool vacuum);
233 static void parallel_vacuum_error_callback(void *arg);
234 
235 /*
236  * Try to enter parallel mode and create a parallel context. Then initialize
237  * shared memory state.
238  *
239  * On success, return parallel vacuum state. Otherwise return NULL.
240  */
242 parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
243  int nrequested_workers, int vac_work_mem,
244  int elevel, BufferAccessStrategy bstrategy)
245 {
246  ParallelVacuumState *pvs;
247  ParallelContext *pcxt;
248  PVShared *shared;
249  TidStore *dead_items;
250  PVIndStats *indstats;
251  BufferUsage *buffer_usage;
252  WalUsage *wal_usage;
253  bool *will_parallel_vacuum;
254  Size est_indstats_len;
255  Size est_shared_len;
256  int nindexes_mwm = 0;
257  int parallel_workers = 0;
258  int querylen;
259 
260  /*
261  * A parallel vacuum must be requested and there must be indexes on the
262  * relation
263  */
264  Assert(nrequested_workers >= 0);
265  Assert(nindexes > 0);
266 
267  /*
268  * Compute the number of parallel vacuum workers to launch
269  */
270  will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
271  parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
272  nrequested_workers,
273  will_parallel_vacuum);
274  if (parallel_workers <= 0)
275  {
276  /* Can't perform vacuum in parallel -- return NULL */
277  pfree(will_parallel_vacuum);
278  return NULL;
279  }
280 
282  pvs->indrels = indrels;
283  pvs->nindexes = nindexes;
284  pvs->will_parallel_vacuum = will_parallel_vacuum;
285  pvs->bstrategy = bstrategy;
286  pvs->heaprel = rel;
287 
289  pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
290  parallel_workers);
291  Assert(pcxt->nworkers > 0);
292  pvs->pcxt = pcxt;
293 
294  /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
295  est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
296  shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
297  shm_toc_estimate_keys(&pcxt->estimator, 1);
298 
299  /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
300  est_shared_len = sizeof(PVShared);
301  shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
302  shm_toc_estimate_keys(&pcxt->estimator, 1);
303 
304  /*
305  * Estimate space for BufferUsage and WalUsage --
306  * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
307  *
308  * If there are no extensions loaded that care, we could skip this. We
309  * have no way of knowing whether anyone's looking at pgBufferUsage or
310  * pgWalUsage, so do it unconditionally.
311  */
313  mul_size(sizeof(BufferUsage), pcxt->nworkers));
314  shm_toc_estimate_keys(&pcxt->estimator, 1);
316  mul_size(sizeof(WalUsage), pcxt->nworkers));
317  shm_toc_estimate_keys(&pcxt->estimator, 1);
318 
319  /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
320  if (debug_query_string)
321  {
322  querylen = strlen(debug_query_string);
323  shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
324  shm_toc_estimate_keys(&pcxt->estimator, 1);
325  }
326  else
327  querylen = 0; /* keep compiler quiet */
328 
329  InitializeParallelDSM(pcxt);
330 
331  /* Prepare index vacuum stats */
332  indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
333  MemSet(indstats, 0, est_indstats_len);
334  for (int i = 0; i < nindexes; i++)
335  {
336  Relation indrel = indrels[i];
337  uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
338 
339  /*
340  * Cleanup option should be either disabled, always performing in
341  * parallel or conditionally performing in parallel.
342  */
343  Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
344  ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
345  Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
346 
347  if (!will_parallel_vacuum[i])
348  continue;
349 
350  if (indrel->rd_indam->amusemaintenanceworkmem)
351  nindexes_mwm++;
352 
353  /*
354  * Remember the number of indexes that support parallel operation for
355  * each phase.
356  */
357  if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
359  if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
361  if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
363  }
365  pvs->indstats = indstats;
366 
367  /* Prepare shared information */
368  shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
369  MemSet(shared, 0, est_shared_len);
370  shared->relid = RelationGetRelid(rel);
371  shared->elevel = elevel;
373  (nindexes_mwm > 0) ?
374  maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
376  shared->dead_items_info.max_bytes = vac_work_mem * 1024L;
377 
378  /* Prepare DSA space for dead items */
379  dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
381  pvs->dead_items = dead_items;
382  shared->dead_items_handle = TidStoreGetHandle(dead_items);
383  shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
384 
385  /* Use the same buffer size for all workers */
386  shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
387 
388  pg_atomic_init_u32(&(shared->cost_balance), 0);
389  pg_atomic_init_u32(&(shared->active_nworkers), 0);
390  pg_atomic_init_u32(&(shared->idx), 0);
391 
393  pvs->shared = shared;
394 
395  /*
396  * Allocate space for each worker's BufferUsage and WalUsage; no need to
397  * initialize
398  */
399  buffer_usage = shm_toc_allocate(pcxt->toc,
400  mul_size(sizeof(BufferUsage), pcxt->nworkers));
402  pvs->buffer_usage = buffer_usage;
403  wal_usage = shm_toc_allocate(pcxt->toc,
404  mul_size(sizeof(WalUsage), pcxt->nworkers));
406  pvs->wal_usage = wal_usage;
407 
408  /* Store query string for workers */
409  if (debug_query_string)
410  {
411  char *sharedquery;
412 
413  sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
414  memcpy(sharedquery, debug_query_string, querylen + 1);
415  sharedquery[querylen] = '\0';
416  shm_toc_insert(pcxt->toc,
417  PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
418  }
419 
420  /* Success -- return parallel vacuum state */
421  return pvs;
422 }
423 
424 /*
425  * Destroy the parallel context, and end parallel mode.
426  *
427  * Since writes are not allowed during parallel mode, copy the
428  * updated index statistics from DSM into local memory and then later use that
429  * to update the index statistics. One might think that we can exit from
430  * parallel mode, update the index statistics and then destroy parallel
431  * context, but that won't be safe (see ExitParallelMode).
432  */
433 void
435 {
437 
438  /* Copy the updated statistics */
439  for (int i = 0; i < pvs->nindexes; i++)
440  {
441  PVIndStats *indstats = &(pvs->indstats[i]);
442 
443  if (indstats->istat_updated)
444  {
445  istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
446  memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
447  }
448  else
449  istats[i] = NULL;
450  }
451 
453 
456 
458  pfree(pvs);
459 }
460 
461 /*
462  * Returns the dead items space and dead items information.
463  */
464 TidStore *
466 {
467  *dead_items_info_p = &(pvs->shared->dead_items_info);
468  return pvs->dead_items;
469 }
470 
471 /* Forget all items in dead_items */
472 void
474 {
475  TidStore *dead_items = pvs->dead_items;
476  VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
477 
478  /*
479  * Free the current tidstore and return allocated DSA segments to the
480  * operating system. Then we recreate the tidstore with the same max_bytes
481  * limitation we just used.
482  */
483  TidStoreDestroy(dead_items);
484  pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
486 
487  /* Update the DSA pointer for dead_items to the new one */
489  pvs->shared->dead_items_handle = TidStoreGetHandle(dead_items);
490 
491  /* Reset the counter */
492  dead_items_info->num_items = 0;
493 }
494 
495 /*
496  * Do parallel index bulk-deletion with parallel workers.
497  */
498 void
500  int num_index_scans)
501 {
503 
504  /*
505  * We can only provide an approximate value of num_heap_tuples, at least
506  * for now.
507  */
508  pvs->shared->reltuples = num_table_tuples;
509  pvs->shared->estimated_count = true;
510 
511  parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
512 }
513 
514 /*
515  * Do parallel index cleanup with parallel workers.
516  */
517 void
519  int num_index_scans, bool estimated_count)
520 {
522 
523  /*
524  * We can provide a better estimate of total number of surviving tuples
525  * (we assume indexes are more interested in that than in the number of
526  * nominally live tuples).
527  */
528  pvs->shared->reltuples = num_table_tuples;
529  pvs->shared->estimated_count = estimated_count;
530 
531  parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
532 }
533 
534 /*
535  * Compute the number of parallel worker processes to request. Both index
536  * vacuum and index cleanup can be executed with parallel workers.
537  * The index is eligible for parallel vacuum iff its size is greater than
538  * min_parallel_index_scan_size as invoking workers for very small indexes
539  * can hurt performance.
540  *
541  * nrequested is the number of parallel workers that user requested. If
542  * nrequested is 0, we compute the parallel degree based on nindexes, that is
543  * the number of indexes that support parallel vacuum. This function also
544  * sets will_parallel_vacuum to remember indexes that participate in parallel
545  * vacuum.
546  */
547 static int
548 parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
549  bool *will_parallel_vacuum)
550 {
551  int nindexes_parallel = 0;
552  int nindexes_parallel_bulkdel = 0;
553  int nindexes_parallel_cleanup = 0;
554  int parallel_workers;
555 
556  /*
557  * We don't allow performing parallel operation in standalone backend or
558  * when parallelism is disabled.
559  */
561  return 0;
562 
563  /*
564  * Compute the number of indexes that can participate in parallel vacuum.
565  */
566  for (int i = 0; i < nindexes; i++)
567  {
568  Relation indrel = indrels[i];
569  uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
570 
571  /* Skip index that is not a suitable target for parallel index vacuum */
572  if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
574  continue;
575 
576  will_parallel_vacuum[i] = true;
577 
578  if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
579  nindexes_parallel_bulkdel++;
580  if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
581  ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
582  nindexes_parallel_cleanup++;
583  }
584 
585  nindexes_parallel = Max(nindexes_parallel_bulkdel,
586  nindexes_parallel_cleanup);
587 
588  /* The leader process takes one index */
589  nindexes_parallel--;
590 
591  /* No index supports parallel vacuum */
592  if (nindexes_parallel <= 0)
593  return 0;
594 
595  /* Compute the parallel degree */
596  parallel_workers = (nrequested > 0) ?
597  Min(nrequested, nindexes_parallel) : nindexes_parallel;
598 
599  /* Cap by max_parallel_maintenance_workers */
600  parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
601 
602  return parallel_workers;
603 }
604 
605 /*
606  * Perform index vacuum or index cleanup with parallel workers. This function
607  * must be used by the parallel vacuum leader process.
608  */
609 static void
611  bool vacuum)
612 {
613  int nworkers;
614  PVIndVacStatus new_status;
615 
617 
618  if (vacuum)
619  {
621 
622  /* Determine the number of parallel workers to launch */
623  nworkers = pvs->nindexes_parallel_bulkdel;
624  }
625  else
626  {
628 
629  /* Determine the number of parallel workers to launch */
630  nworkers = pvs->nindexes_parallel_cleanup;
631 
632  /* Add conditionally parallel-aware indexes if in the first time call */
633  if (num_index_scans == 0)
634  nworkers += pvs->nindexes_parallel_condcleanup;
635  }
636 
637  /* The leader process will participate */
638  nworkers--;
639 
640  /*
641  * It is possible that parallel context is initialized with fewer workers
642  * than the number of indexes that need a separate worker in the current
643  * phase, so we need to consider it. See
644  * parallel_vacuum_compute_workers().
645  */
646  nworkers = Min(nworkers, pvs->pcxt->nworkers);
647 
648  /*
649  * Set index vacuum status and mark whether parallel vacuum worker can
650  * process it.
651  */
652  for (int i = 0; i < pvs->nindexes; i++)
653  {
654  PVIndStats *indstats = &(pvs->indstats[i]);
655 
657  indstats->status = new_status;
658  indstats->parallel_workers_can_process =
659  (pvs->will_parallel_vacuum[i] &&
661  num_index_scans,
662  vacuum));
663  }
664 
665  /* Reset the parallel index processing and progress counters */
666  pg_atomic_write_u32(&(pvs->shared->idx), 0);
667 
668  /* Setup the shared cost-based vacuum delay and launch workers */
669  if (nworkers > 0)
670  {
671  /* Reinitialize parallel context to relaunch parallel workers */
672  if (num_index_scans > 0)
674 
675  /*
676  * Set up shared cost balance and the number of active workers for
677  * vacuum delay. We need to do this before launching workers as
678  * otherwise, they might not see the updated values for these
679  * parameters.
680  */
683 
684  /*
685  * The number of workers can vary between bulkdelete and cleanup
686  * phase.
687  */
688  ReinitializeParallelWorkers(pvs->pcxt, nworkers);
689 
691 
692  if (pvs->pcxt->nworkers_launched > 0)
693  {
694  /*
695  * Reset the local cost values for leader backend as we have
696  * already accumulated the remaining balance of heap.
697  */
698  VacuumCostBalance = 0;
700 
701  /* Enable shared cost balance for leader backend */
704  }
705 
706  if (vacuum)
707  ereport(pvs->shared->elevel,
708  (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
709  "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
710  pvs->pcxt->nworkers_launched),
711  pvs->pcxt->nworkers_launched, nworkers)));
712  else
713  ereport(pvs->shared->elevel,
714  (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
715  "launched %d parallel vacuum workers for index cleanup (planned: %d)",
716  pvs->pcxt->nworkers_launched),
717  pvs->pcxt->nworkers_launched, nworkers)));
718  }
719 
720  /* Vacuum the indexes that can be processed by only leader process */
722 
723  /*
724  * Join as a parallel worker. The leader vacuums alone processes all
725  * parallel-safe indexes in the case where no workers are launched.
726  */
728 
729  /*
730  * Next, accumulate buffer and WAL usage. (This must wait for the workers
731  * to finish, or we might get incomplete data.)
732  */
733  if (nworkers > 0)
734  {
735  /* Wait for all vacuum workers to finish */
737 
738  for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
740  }
741 
742  /*
743  * Reset all index status back to initial (while checking that we have
744  * vacuumed all indexes).
745  */
746  for (int i = 0; i < pvs->nindexes; i++)
747  {
748  PVIndStats *indstats = &(pvs->indstats[i]);
749 
750  if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
751  elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
753 
755  }
756 
757  /*
758  * Carry the shared balance value to heap scan and disable shared costing
759  */
761  {
764  VacuumActiveNWorkers = NULL;
765  }
766 }
767 
768 /*
769  * Index vacuum/cleanup routine used by the leader process and parallel
770  * vacuum worker processes to vacuum the indexes in parallel.
771  */
772 static void
774 {
775  /*
776  * Increment the active worker count if we are able to launch any worker.
777  */
780 
781  /* Loop until all indexes are vacuumed */
782  for (;;)
783  {
784  int idx;
785  PVIndStats *indstats;
786 
787  /* Get an index number to process */
788  idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
789 
790  /* Done for all indexes? */
791  if (idx >= pvs->nindexes)
792  break;
793 
794  indstats = &(pvs->indstats[idx]);
795 
796  /*
797  * Skip vacuuming index that is unsafe for workers or has an
798  * unsuitable target for parallel index vacuum (this is vacuumed in
799  * parallel_vacuum_process_unsafe_indexes() by the leader).
800  */
801  if (!indstats->parallel_workers_can_process)
802  continue;
803 
804  /* Do vacuum or cleanup of the index */
805  parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
806  }
807 
808  /*
809  * We have completed the index vacuum so decrement the active worker
810  * count.
811  */
814 }
815 
816 /*
817  * Perform parallel vacuuming of indexes in leader process.
818  *
819  * Handles index vacuuming (or index cleanup) for indexes that are not
820  * parallel safe. It's possible that this will vary for a given index, based
821  * on details like whether we're performing index cleanup right now.
822  *
823  * Also performs vacuuming of smaller indexes that fell under the size cutoff
824  * enforced by parallel_vacuum_compute_workers().
825  */
826 static void
828 {
830 
831  /*
832  * Increment the active worker count if we are able to launch any worker.
833  */
836 
837  for (int i = 0; i < pvs->nindexes; i++)
838  {
839  PVIndStats *indstats = &(pvs->indstats[i]);
840 
841  /* Skip, indexes that are safe for workers */
842  if (indstats->parallel_workers_can_process)
843  continue;
844 
845  /* Do vacuum or cleanup of the index */
846  parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
847  }
848 
849  /*
850  * We have completed the index vacuum so decrement the active worker
851  * count.
852  */
855 }
856 
857 /*
858  * Vacuum or cleanup index either by leader process or by one of the worker
859  * process. After vacuuming the index this function copies the index
860  * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
861  * segment.
862  */
863 static void
865  PVIndStats *indstats)
866 {
867  IndexBulkDeleteResult *istat = NULL;
868  IndexBulkDeleteResult *istat_res;
869  IndexVacuumInfo ivinfo;
870 
871  /*
872  * Update the pointer to the corresponding bulk-deletion result if someone
873  * has already updated it
874  */
875  if (indstats->istat_updated)
876  istat = &(indstats->istat);
877 
878  ivinfo.index = indrel;
879  ivinfo.heaprel = pvs->heaprel;
880  ivinfo.analyze_only = false;
881  ivinfo.report_progress = false;
882  ivinfo.message_level = DEBUG2;
883  ivinfo.estimated_count = pvs->shared->estimated_count;
884  ivinfo.num_heap_tuples = pvs->shared->reltuples;
885  ivinfo.strategy = pvs->bstrategy;
886 
887  /* Update error traceback information */
888  pvs->indname = pstrdup(RelationGetRelationName(indrel));
889  pvs->status = indstats->status;
890 
891  switch (indstats->status)
892  {
894  istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
895  &pvs->shared->dead_items_info);
896  break;
898  istat_res = vac_cleanup_one_index(&ivinfo, istat);
899  break;
900  default:
901  elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
902  indstats->status,
903  RelationGetRelationName(indrel));
904  }
905 
906  /*
907  * Copy the index bulk-deletion result returned from ambulkdelete and
908  * amvacuumcleanup to the DSM segment if it's the first cycle because they
909  * allocate locally and it's possible that an index will be vacuumed by a
910  * different vacuum process the next cycle. Copying the result normally
911  * happens only the first time an index is vacuumed. For any additional
912  * vacuum pass, we directly point to the result on the DSM segment and
913  * pass it to vacuum index APIs so that workers can update it directly.
914  *
915  * Since all vacuum workers write the bulk-deletion result at different
916  * slots we can write them without locking.
917  */
918  if (!indstats->istat_updated && istat_res != NULL)
919  {
920  memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
921  indstats->istat_updated = true;
922 
923  /* Free the locally-allocated bulk-deletion result */
924  pfree(istat_res);
925  }
926 
927  /*
928  * Update the status to completed. No need to lock here since each worker
929  * touches different indexes.
930  */
932 
933  /* Reset error traceback information */
935  pfree(pvs->indname);
936  pvs->indname = NULL;
937 
938  /*
939  * Call the parallel variant of pgstat_progress_incr_param so workers can
940  * report progress of index vacuum to the leader.
941  */
943 }
944 
945 /*
946  * Returns false, if the given index can't participate in the next execution of
947  * parallel index vacuum or parallel index cleanup.
948  */
949 static bool
951  bool vacuum)
952 {
953  uint8 vacoptions;
954 
955  vacoptions = indrel->rd_indam->amparallelvacuumoptions;
956 
957  /* In parallel vacuum case, check if it supports parallel bulk-deletion */
958  if (vacuum)
959  return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
960 
961  /* Not safe, if the index does not support parallel cleanup */
962  if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
963  ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
964  return false;
965 
966  /*
967  * Not safe, if the index supports parallel cleanup conditionally, but we
968  * have already processed the index (for bulkdelete). We do this to avoid
969  * the need to invoke workers when parallel index cleanup doesn't need to
970  * scan the index. See the comments for option
971  * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
972  * parallel cleanup conditionally.
973  */
974  if (num_index_scans > 0 &&
975  ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
976  return false;
977 
978  return true;
979 }
980 
981 /*
982  * Perform work within a launched parallel process.
983  *
984  * Since parallel vacuum workers perform only index vacuum or index cleanup,
985  * we don't need to report progress information.
986  */
987 void
989 {
991  Relation rel;
992  Relation *indrels;
993  PVIndStats *indstats;
994  PVShared *shared;
995  TidStore *dead_items;
996  BufferUsage *buffer_usage;
997  WalUsage *wal_usage;
998  int nindexes;
999  char *sharedquery;
1000  ErrorContextCallback errcallback;
1001 
1002  /*
1003  * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
1004  * don't support parallel vacuum for autovacuum as of now.
1005  */
1007 
1008  elog(DEBUG1, "starting parallel vacuum worker");
1009 
1010  shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
1011 
1012  /* Set debug_query_string for individual workers */
1013  sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
1014  debug_query_string = sharedquery;
1016 
1017  /*
1018  * Open table. The lock mode is the same as the leader process. It's
1019  * okay because the lock mode does not conflict among the parallel
1020  * workers.
1021  */
1022  rel = table_open(shared->relid, ShareUpdateExclusiveLock);
1023 
1024  /*
1025  * Open all indexes. indrels are sorted in order by OID, which should be
1026  * matched to the leader's one.
1027  */
1028  vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
1029  Assert(nindexes > 0);
1030 
1031  if (shared->maintenance_work_mem_worker > 0)
1033 
1034  /* Set index statistics */
1035  indstats = (PVIndStats *) shm_toc_lookup(toc,
1037  false);
1038 
1039  /* Find dead_items in shared memory */
1040  dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
1041  shared->dead_items_handle);
1042 
1043  /* Set cost-based vacuum delay */
1045  VacuumCostBalance = 0;
1049 
1050  /* Set parallel vacuum state */
1051  pvs.indrels = indrels;
1052  pvs.nindexes = nindexes;
1053  pvs.indstats = indstats;
1054  pvs.shared = shared;
1055  pvs.dead_items = dead_items;
1058  pvs.heaprel = rel;
1059 
1060  /* These fields will be filled during index vacuum or cleanup */
1061  pvs.indname = NULL;
1063 
1064  /* Each parallel VACUUM worker gets its own access strategy. */
1066  shared->ring_nbuffers * (BLCKSZ / 1024));
1067 
1068  /* Setup error traceback support for ereport() */
1070  errcallback.arg = &pvs;
1071  errcallback.previous = error_context_stack;
1072  error_context_stack = &errcallback;
1073 
1074  /* Prepare to track buffer usage during parallel execution */
1076 
1077  /* Process indexes to perform vacuum/cleanup */
1079 
1080  /* Report buffer/WAL usage during parallel execution */
1081  buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
1082  wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
1084  &wal_usage[ParallelWorkerNumber]);
1085 
1086  TidStoreDetach(dead_items);
1087 
1088  /* Pop the error context stack */
1089  error_context_stack = errcallback.previous;
1090 
1091  vac_close_indexes(nindexes, indrels, RowExclusiveLock);
1094 }
1095 
1096 /*
1097  * Error context callback for errors occurring during parallel index vacuum.
1098  * The error context messages should match the messages set in the lazy vacuum
1099  * error context. If you change this function, change vacuum_error_callback()
1100  * as well.
1101  */
1102 static void
1104 {
1105  ParallelVacuumState *errinfo = arg;
1106 
1107  switch (errinfo->status)
1108  {
1110  errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1111  errinfo->indname,
1112  errinfo->relnamespace,
1113  errinfo->relname);
1114  break;
1116  errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1117  errinfo->indname,
1118  errinfo->relnamespace,
1119  errinfo->relname);
1120  break;
1123  default:
1124  return;
1125  }
1126 }
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:259
int min_parallel_index_scan_size
Definition: allpaths.c:82
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition: atomics.h:439
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition: atomics.h:221
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:366
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:424
static void pg_atomic_write_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition: atomics.h:276
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition: atomics.h:239
void VacuumUpdateCosts(void)
Definition: autovacuum.c:1634
int ParallelWorkerNumber
Definition: parallel.c:112
void InitializeParallelDSM(ParallelContext *pcxt)
Definition: parallel.c:205
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
Definition: parallel.c:775
void LaunchParallelWorkers(ParallelContext *pcxt)
Definition: parallel.c:552
void ReinitializeParallelDSM(ParallelContext *pcxt)
Definition: parallel.c:488
void DestroyParallelContext(ParallelContext *pcxt)
Definition: parallel.c:929
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
Definition: parallel.c:167
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
Definition: parallel.c:538
void pgstat_progress_parallel_incr_param(int index, int64 incr)
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
@ BAS_VACUUM
Definition: bufmgr.h:39
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:273
#define Min(x, y)
Definition: c.h:1004
#define ngettext(s, p, n)
Definition: c.h:1181
#define Max(x, y)
Definition: c.h:998
#define Assert(condition)
Definition: c.h:858
unsigned char uint8
Definition: c.h:504
#define MemSet(start, val, len)
Definition: c.h:1020
size_t Size
Definition: c.h:605
dsa_handle dsa_get_handle(dsa_area *area)
Definition: dsa.c:498
uint64 dsa_pointer
Definition: dsa.h:62
dsm_handle dsa_handle
Definition: dsa.h:136
ErrorContextCallback * error_context_stack
Definition: elog.c:94
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define errcontext
Definition: elog.h:196
#define DEBUG2
Definition: elog.h:29
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
BufferAccessStrategy GetAccessStrategyWithSize(BufferAccessStrategyType btype, int ring_size_kb)
Definition: freelist.c:584
int GetAccessStrategyBufferCount(BufferAccessStrategy strategy)
Definition: freelist.c:624
void FreeAccessStrategy(BufferAccessStrategy strategy)
Definition: freelist.c:681
int max_parallel_maintenance_workers
Definition: globals.c:133
bool IsUnderPostmaster
Definition: globals.c:119
int VacuumCostBalance
Definition: globals.c:156
int maintenance_work_mem
Definition: globals.c:132
#define IsParallelWorker()
Definition: parallel.h:60
void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
Definition: instrument.c:218
void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
Definition: instrument.c:208
void InstrStartParallelQuery(void)
Definition: instrument.c:200
int i
Definition: isn.c:73
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
@ LWTRANCHE_PARALLEL_VACUUM_DSA
Definition: lwlock.h:217
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
void * arg
const char * debug_query_string
Definition: postgres.c:88
unsigned int Oid
Definition: postgres_ext.h:31
#define PROC_IN_VACUUM
Definition: proc.h:58
#define PROGRESS_VACUUM_INDEXES_PROCESSED
Definition: progress.h:30
#define RelationGetRelid(relation)
Definition: rel.h:505
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetNamespace(relation)
Definition: rel.h:546
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
PGPROC * MyProc
Definition: proc.c:67
struct ErrorContextCallback * previous
Definition: elog.h:296
void(* callback)(void *arg)
Definition: elog.h:297
bool amusemaintenanceworkmem
Definition: amapi.h:259
uint8 amparallelvacuumoptions
Definition: amapi.h:263
Relation index
Definition: genam.h:46
double num_heap_tuples
Definition: genam.h:52
bool analyze_only
Definition: genam.h:48
BufferAccessStrategy strategy
Definition: genam.h:53
Relation heaprel
Definition: genam.h:47
bool report_progress
Definition: genam.h:49
int message_level
Definition: genam.h:51
bool estimated_count
Definition: genam.h:50
uint8 statusFlags
Definition: proc.h:237
bool istat_updated
IndexBulkDeleteResult istat
bool parallel_workers_can_process
PVIndVacStatus status
double reltuples
pg_atomic_uint32 cost_balance
int maintenance_work_mem_worker
pg_atomic_uint32 active_nworkers
dsa_pointer dead_items_handle
dsa_handle dead_items_dsa_handle
int ring_nbuffers
pg_atomic_uint32 idx
bool estimated_count
VacDeadItemsInfo dead_items_info
shm_toc_estimator estimator
Definition: parallel.h:41
shm_toc * toc
Definition: parallel.h:44
int nworkers_launched
Definition: parallel.h:37
BufferAccessStrategy bstrategy
BufferUsage * buffer_usage
ParallelContext * pcxt
PVIndStats * indstats
PVIndVacStatus status
struct IndexAmRoutine * rd_indam
Definition: rel.h:206
size_t max_bytes
Definition: vacuum.h:287
int64 num_items
Definition: vacuum.h:288
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
TidStore * TidStoreAttach(dsa_handle area_handle, dsa_pointer handle)
Definition: tidstore.c:252
void TidStoreDetach(TidStore *ts)
Definition: tidstore.c:277
TidStore * TidStoreCreateShared(size_t max_bytes, int tranche_id)
Definition: tidstore.c:210
void TidStoreDestroy(TidStore *ts)
Definition: tidstore.c:325
dsa_area * TidStoreGetDSA(TidStore *ts)
Definition: tidstore.c:552
dsa_pointer TidStoreGetHandle(TidStore *ts)
Definition: tidstore.c:560
IndexBulkDeleteResult * vac_bulkdel_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat, TidStore *dead_items, VacDeadItemsInfo *dead_items_info)
Definition: vacuum.c:2491
pg_atomic_uint32 * VacuumActiveNWorkers
Definition: vacuum.c:103
void vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel)
Definition: vacuum.c:2273
void vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, MemoryContext vac_context, bool isTopLevel)
Definition: vacuum.c:478
int VacuumCostBalanceLocal
Definition: vacuum.c:104
void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
Definition: vacuum.c:2316
pg_atomic_uint32 * VacuumSharedCostBalance
Definition: vacuum.c:102
IndexBulkDeleteResult * vac_cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
Definition: vacuum.c:2512
#define VACUUM_OPTION_PARALLEL_CLEANUP
Definition: vacuum.h:63
#define VACUUM_OPTION_NO_PARALLEL
Definition: vacuum.h:42
#define VACUUM_OPTION_PARALLEL_BULKDEL
Definition: vacuum.h:48
#define VACUUM_OPTION_MAX_VALID_VALUE
Definition: vacuum.h:66
#define VACUUM_OPTION_PARALLEL_COND_CLEANUP
Definition: vacuum.h:55
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool vacuum)
static void parallel_vacuum_error_callback(void *arg)
static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum)
#define PARALLEL_VACUUM_KEY_INDEX_STATS
#define PARALLEL_VACUUM_KEY_QUERY_TEXT
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE
ParallelVacuumState * parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, int elevel, BufferAccessStrategy bstrategy)
#define PARALLEL_VACUUM_KEY_SHARED
void parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans)
#define PARALLEL_VACUUM_KEY_WAL_USAGE
void parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, bool estimated_count)
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)
void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
struct PVShared PVShared
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, bool vacuum)
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
TidStore * parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
struct PVIndStats PVIndStats
void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
PVIndVacStatus
@ PARALLEL_INDVAC_STATUS_NEED_CLEANUP
@ PARALLEL_INDVAC_STATUS_INITIAL
@ PARALLEL_INDVAC_STATUS_NEED_BULKDELETE
@ PARALLEL_INDVAC_STATUS_COMPLETED
void ExitParallelMode(void)
Definition: xact.c:1063
void EnterParallelMode(void)
Definition: xact.c:1050