/*-------------------------------------------------------------------------
 *
 * vacuumparallel.c
 *	  Support routines for parallel vacuum execution.
 *
 * This file contains routines that are intended to support setting up, using,
 * and tearing down a ParallelVacuumState.
 *
 * In a parallel vacuum, we perform both index bulk deletion and index cleanup
 * with parallel worker processes.  Individual indexes are processed by one
 * vacuum process.  ParallelVacuumState contains shared information as well as
 * the memory space for storing dead items allocated in the DSA area.  We
 * launch parallel worker processes at the start of parallel index
 * bulk-deletion and index cleanup and once all indexes are processed, the
 * parallel worker processes exit.  Each time we process indexes in parallel,
 * the parallel context is re-initialized so that the same DSM can be used for
 * multiple passes of index bulk-deletion and index cleanup.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/commands/vacuumparallel.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/amapi.h"
#include "access/table.h"
#include "access/xact.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "executor/instrument.h"
#include "optimizer/paths.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"

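/*
 * Illustrative sketch only, not part of this file: the lifecycle described in
 * the header comment above, as a caller such as heap vacuum might drive it.
 * The function name, requested worker count, tuple counts, and the use of
 * maintenance_work_mem as the dead-items budget are hypothetical; the
 * parallel_vacuum_* functions are the ones defined below.
 */
#ifdef NOT_USED
static void
example_parallel_vacuum_lifecycle(Relation rel, Relation *indrels, int nindexes,
								  BufferAccessStrategy bstrategy)
{
	IndexBulkDeleteResult **istats;
	ParallelVacuumState *pvs;

	/* Try to set up a parallel vacuum with two requested workers */
	pvs = parallel_vacuum_init(rel, indrels, nindexes, 2,
							   maintenance_work_mem, INFO, bstrategy);
	if (pvs == NULL)
		return;					/* must fall back to serial index vacuuming */

	/* ... heap scan fills the shared dead-items TidStore here ... */

	/* One bulk-deletion pass over all indexes, then recycle the dead items */
	parallel_vacuum_bulkdel_all_indexes(pvs, /* num_table_tuples */ 100000, 0);
	parallel_vacuum_reset_dead_items(pvs);

	/* Final cleanup pass once heap scanning is done */
	parallel_vacuum_cleanup_all_indexes(pvs, 100000, 1, true);

	istats = (IndexBulkDeleteResult **)
		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
	parallel_vacuum_end(pvs, istats);
}
#endif
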
/*
 * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
 * we don't need to worry about DSM keys conflicting with plan_node_id we can
 * use small integers.
 */
#define PARALLEL_VACUUM_KEY_SHARED			1
#define PARALLEL_VACUUM_KEY_QUERY_TEXT		2
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	3
#define PARALLEL_VACUUM_KEY_WAL_USAGE		4
#define PARALLEL_VACUUM_KEY_INDEX_STATS		5

/*
 * Shared information among parallel vacuum workers, so this is allocated in
 * the DSM segment.
 */
typedef struct PVShared
{
	/*
	 * Target table relid, log level (for messages about parallel workers
	 * launched during VACUUM VERBOSE) and query ID.  These fields are not
	 * modified during the parallel vacuum.
	 */
	Oid			relid;
	int			elevel;
	uint64		queryid;

	/*
	 * Fields for both index vacuum and cleanup.
	 *
	 * reltuples is the total number of input heap tuples.  We set either old
	 * live tuples in the index vacuum case or the new live tuples in the
	 * index cleanup case.
	 *
	 * estimated_count is true if reltuples is an estimated value.  (Note that
	 * reltuples could be -1 in this case, indicating we have no idea.)
	 */
	double		reltuples;
	bool		estimated_count;

	/*
	 * In single process vacuum we could consume more memory during index
	 * vacuuming or cleanup apart from the memory for heap scanning.  In
	 * parallel vacuum, since individual vacuum workers can consume memory
	 * equal to maintenance_work_mem, the new maintenance_work_mem for each
	 * worker is set such that the parallel operation doesn't consume more
	 * memory than single process vacuum.
	 */
	int			maintenance_work_mem_worker;

	/*
	 * The number of buffers each worker's Buffer Access Strategy ring should
	 * contain.
	 */
	int			ring_nbuffers;

	/*
	 * Shared vacuum cost balance.  During parallel vacuum,
	 * VacuumSharedCostBalance points to this value and it accumulates the
	 * balance of each parallel vacuum worker.
	 */
	pg_atomic_uint32 cost_balance;

	/*
	 * Number of active parallel workers.  This is used for computing the
	 * minimum threshold of the vacuum cost balance before a worker sleeps for
	 * cost-based delay.
	 */
	pg_atomic_uint32 active_nworkers;

	/* Counter for vacuuming and cleanup */
	pg_atomic_uint32 idx;

	/* DSA handle where the TidStore lives */
	dsa_handle	dead_items_dsa_handle;

	/* DSA pointer to the shared TidStore */
	dsa_pointer dead_items_handle;

	/* Statistics of shared dead items */
	VacDeadItemsInfo dead_items_info;
} PVShared;

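/*
 * Worked example for maintenance_work_mem_worker (hypothetical numbers): with
 * maintenance_work_mem = 1 GB, parallel_workers = 4, and two indexes whose
 * AMs set amusemaintenanceworkmem, each such worker is given
 * 1 GB / Min(4, 2) = 512 MB, so the parallel operation as a whole stays
 * within the single-process maintenance_work_mem budget.
 */
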
/* Status used during parallel index vacuum or cleanup */
typedef enum PVIndVacStatus
{
	PARALLEL_INDVAC_STATUS_INITIAL = 0,
	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
	PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
	PARALLEL_INDVAC_STATUS_COMPLETED,
} PVIndVacStatus;

/*
 * Struct for index vacuum statistics of an index that is used for parallel
 * vacuum.  This includes the status of parallel index vacuum as well as
 * index statistics.
 */
typedef struct PVIndStats
{
	/*
	 * The following two fields are set by the leader process before executing
	 * parallel index vacuum or parallel index cleanup.  These fields are not
	 * fixed for the entire VACUUM operation.  They are only fixed for an
	 * individual parallel index vacuum and cleanup.
	 *
	 * parallel_workers_can_process is true if both leader and worker can
	 * process the index, otherwise only the leader can process it.
	 */
	PVIndVacStatus status;
	bool		parallel_workers_can_process;

	/*
	 * Individual worker or leader stores the result of index vacuum or
	 * cleanup.
	 */
	bool		istat_updated;	/* are the stats updated? */
	IndexBulkDeleteResult istat;
} PVIndStats;

/*
 * Struct for maintaining a parallel vacuum state.  typedef appears in
 * vacuum.h.
 */
struct ParallelVacuumState
{
	/* NULL for worker processes */
	ParallelContext *pcxt;

	/* Parent Heap Relation */
	Relation	heaprel;

	/* Target indexes */
	Relation   *indrels;
	int			nindexes;

	/* Shared information among parallel vacuum workers */
	PVShared   *shared;

	/*
	 * Shared index statistics among parallel vacuum workers.  The array
	 * element is allocated for every index, even those indexes where parallel
	 * index vacuuming is unsafe or not worthwhile (e.g.,
	 * will_parallel_vacuum[] is false).  During parallel vacuum,
	 * IndexBulkDeleteResult of each index is kept in DSM and is copied into
	 * local memory at the end of parallel vacuum.
	 */
	PVIndStats *indstats;

	/* Shared dead items space among parallel vacuum workers */
	TidStore   *dead_items;

	/* Points to buffer usage area in DSM */
	BufferUsage *buffer_usage;

	/* Points to WAL usage area in DSM */
	WalUsage   *wal_usage;

	/*
	 * False if the index is a totally unsuitable target for all parallel
	 * processing.  For example, the index could fall under the
	 * min_parallel_index_scan_size cutoff.
	 */
	bool	   *will_parallel_vacuum;

	/*
	 * The number of indexes that support parallel index bulk-deletion and
	 * parallel index cleanup respectively.
	 */
	int			nindexes_parallel_bulkdel;
	int			nindexes_parallel_cleanup;
	int			nindexes_parallel_condcleanup;

	/* Buffer access strategy used by leader process */
	BufferAccessStrategy bstrategy;

	/*
	 * Error reporting state.  The error callback is set only for worker
	 * processes during parallel index vacuum.
	 */
	char	   *relnamespace;
	char	   *relname;
	char	   *indname;
	PVIndVacStatus status;
};

static int	parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
											bool *will_parallel_vacuum);
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
												bool vacuum);
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
											  PVIndStats *indstats);
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
												   bool vacuum);
static void parallel_vacuum_error_callback(void *arg);

/*
 * Try to enter parallel mode and create a parallel context.  Then initialize
 * shared memory state.
 *
 * On success, return parallel vacuum state.  Otherwise return NULL.
 */
ParallelVacuumState *
parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
					 int nrequested_workers, int vac_work_mem,
					 int elevel, BufferAccessStrategy bstrategy)
{
	ParallelVacuumState *pvs;
	ParallelContext *pcxt;
	PVShared   *shared;
	TidStore   *dead_items;
	PVIndStats *indstats;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	bool	   *will_parallel_vacuum;
	Size		est_indstats_len;
	Size		est_shared_len;
	int			nindexes_mwm = 0;
	int			parallel_workers = 0;
	int			querylen;

	/*
	 * A parallel vacuum must be requested and there must be indexes on the
	 * relation
	 */
	Assert(nrequested_workers >= 0);
	Assert(nindexes > 0);

	/*
	 * Compute the number of parallel vacuum workers to launch
	 */
	will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
	parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
													   nrequested_workers,
													   will_parallel_vacuum);
	if (parallel_workers <= 0)
	{
		/* Can't perform vacuum in parallel -- return NULL */
		pfree(will_parallel_vacuum);
		return NULL;
	}

	pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
	pvs->indrels = indrels;
	pvs->nindexes = nindexes;
	pvs->will_parallel_vacuum = will_parallel_vacuum;
	pvs->bstrategy = bstrategy;
	pvs->heaprel = rel;

	EnterParallelMode();
	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
								 parallel_workers);
	Assert(pcxt->nworkers > 0);
	pvs->pcxt = pcxt;

	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
	est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
	shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
	est_shared_len = sizeof(PVShared);
	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Estimate space for BufferUsage and WalUsage --
	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
	 *
	 * If there are no extensions loaded that care, we could skip this.  We
	 * have no way of knowing whether anyone's looking at pgBufferUsage or
	 * pgWalUsage, so do it unconditionally.
	 */
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
	if (debug_query_string)
	{
		querylen = strlen(debug_query_string);
		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}
	else
		querylen = 0;			/* keep compiler quiet */

	InitializeParallelDSM(pcxt);

	/* Prepare index vacuum stats */
	indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
	MemSet(indstats, 0, est_indstats_len);
	for (int i = 0; i < nindexes; i++)
	{
		Relation	indrel = indrels[i];
		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;

		/*
		 * Cleanup option should be either disabled, always performed in
		 * parallel, or conditionally performed in parallel.
		 */
		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);

		if (!will_parallel_vacuum[i])
			continue;

		if (indrel->rd_indam->amusemaintenanceworkmem)
			nindexes_mwm++;

		/*
		 * Remember the number of indexes that support parallel operation for
		 * each phase.
		 */
		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
			pvs->nindexes_parallel_bulkdel++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
			pvs->nindexes_parallel_cleanup++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
			pvs->nindexes_parallel_condcleanup++;
	}
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
	pvs->indstats = indstats;

	/* Prepare shared information */
	shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
	MemSet(shared, 0, est_shared_len);
	shared->relid = RelationGetRelid(rel);
	shared->elevel = elevel;
	shared->queryid = pgstat_get_my_query_id();
	shared->maintenance_work_mem_worker =
		(nindexes_mwm > 0) ?
		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
		maintenance_work_mem;
	shared->dead_items_info.max_bytes = vac_work_mem * 1024L;

	/* Prepare DSA space for dead items */
	dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
									  LWTRANCHE_PARALLEL_VACUUM_DSA);
	pvs->dead_items = dead_items;
	shared->dead_items_handle = TidStoreGetHandle(dead_items);
	shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));

	/* Use the same buffer size for all workers */
	shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);

	pg_atomic_init_u32(&(shared->cost_balance), 0);
	pg_atomic_init_u32(&(shared->active_nworkers), 0);
	pg_atomic_init_u32(&(shared->idx), 0);

	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
	pvs->shared = shared;

	/*
	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
	 * initialize
	 */
	buffer_usage = shm_toc_allocate(pcxt->toc,
									mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
	pvs->buffer_usage = buffer_usage;
	wal_usage = shm_toc_allocate(pcxt->toc,
								 mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
	pvs->wal_usage = wal_usage;

	/* Store query string for workers */
	if (debug_query_string)
	{
		char	   *sharedquery;

		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
		memcpy(sharedquery, debug_query_string, querylen + 1);
		sharedquery[querylen] = '\0';
		shm_toc_insert(pcxt->toc,
					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
	}

	/* Success -- return parallel vacuum state */
	return pvs;
}

/*
 * Destroy the parallel context, and end parallel mode.
 *
 * Since writes are not allowed during parallel mode, copy the
 * updated index statistics from DSM into local memory and then later use that
 * to update the index statistics.  One might think that we can exit from
 * parallel mode, update the index statistics and then destroy the parallel
 * context, but that won't be safe (see ExitParallelMode).
 */
void
parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
{
	Assert(!IsParallelWorker());

	/* Copy the updated statistics */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		if (indstats->istat_updated)
		{
			istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
			memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
		}
		else
			istats[i] = NULL;
	}

	TidStoreDestroy(pvs->dead_items);

	DestroyParallelContext(pvs->pcxt);
	ExitParallelMode();

	pfree(pvs->will_parallel_vacuum);
	pfree(pvs);
}

/*
 * Returns the dead items space and dead items information.
 */
TidStore *
parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
{
	*dead_items_info_p = &(pvs->shared->dead_items_info);
	return pvs->dead_items;
}
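
/*
 * Illustrative sketch only, not part of this file: how a heap-scanning
 * caller might record dead TIDs into the shared dead-items space returned
 * above, assuming the tidstore.h locking and insertion API.  The function
 * name and the offsets[]/num_offsets inputs are hypothetical values
 * collected while pruning a heap page.
 */
#ifdef NOT_USED
static void
example_store_dead_items(ParallelVacuumState *pvs, BlockNumber blkno,
						 OffsetNumber *offsets, int num_offsets)
{
	VacDeadItemsInfo *dead_items_info;
	TidStore   *dead_items = parallel_vacuum_get_dead_items(pvs,
															&dead_items_info);

	/* Serialize insertions into the shared TidStore */
	TidStoreLockExclusive(dead_items);
	TidStoreSetBlockOffsets(dead_items, blkno, offsets, num_offsets);
	TidStoreUnlock(dead_items);

	dead_items_info->num_items += num_offsets;
}
#endif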

/* Forget all items in dead_items */
void
parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
{
	VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);

	/*
	 * Free the current tidstore and return allocated DSA segments to the
	 * operating system.  Then we recreate the tidstore with the same
	 * max_bytes limitation we just used.
	 */
	TidStoreDestroy(pvs->dead_items);
	pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
										   LWTRANCHE_PARALLEL_VACUUM_DSA);

	/* Update the DSA pointer for dead_items to the new one */
	pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(pvs->dead_items));
	pvs->shared->dead_items_handle = TidStoreGetHandle(pvs->dead_items);

	/* Reset the counter */
	dead_items_info->num_items = 0;
}

/*
 * Do parallel index bulk-deletion with parallel workers.
 */
void
parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
									int num_index_scans)
{
	Assert(!IsParallelWorker());

	/*
	 * We can only provide an approximate value of num_heap_tuples, at least
	 * for now.
	 */
	pvs->shared->reltuples = num_table_tuples;
	pvs->shared->estimated_count = true;

	parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
}

/*
 * Do parallel index cleanup with parallel workers.
 */
void
parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
									int num_index_scans, bool estimated_count)
{
	Assert(!IsParallelWorker());

	/*
	 * We can provide a better estimate of the total number of surviving
	 * tuples (we assume indexes are more interested in that than in the
	 * number of nominally live tuples).
	 */
	pvs->shared->reltuples = num_table_tuples;
	pvs->shared->estimated_count = estimated_count;

	parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
}

/*
 * Compute the number of parallel worker processes to request.  Both index
 * vacuum and index cleanup can be executed with parallel workers.
 * An index is eligible for parallel vacuum iff its size is greater than
 * min_parallel_index_scan_size, as invoking workers for very small indexes
 * can hurt performance.
 *
 * nrequested is the number of parallel workers that the user requested.  If
 * nrequested is 0, we compute the parallel degree based on nindexes, that is
 * the number of indexes that support parallel vacuum.  This function also
 * sets will_parallel_vacuum to remember indexes that participate in parallel
 * vacuum.
 */
static int
parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
								bool *will_parallel_vacuum)
{
	int			nindexes_parallel = 0;
	int			nindexes_parallel_bulkdel = 0;
	int			nindexes_parallel_cleanup = 0;
	int			parallel_workers;

	/*
	 * We don't allow performing parallel operation in standalone backend or
	 * when parallelism is disabled.
	 */
	if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
		return 0;

	/*
	 * Compute the number of indexes that can participate in parallel vacuum.
	 */
	for (int i = 0; i < nindexes; i++)
	{
		Relation	indrel = indrels[i];
		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;

		/* Skip index that is not a suitable target for parallel index vacuum */
		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
			continue;

		will_parallel_vacuum[i] = true;

		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
			nindexes_parallel_bulkdel++;
		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
			nindexes_parallel_cleanup++;
	}

	nindexes_parallel = Max(nindexes_parallel_bulkdel,
							nindexes_parallel_cleanup);

	/* The leader process takes one index */
	nindexes_parallel--;

	/* No index supports parallel vacuum */
	if (nindexes_parallel <= 0)
		return 0;

	/* Compute the parallel degree */
	parallel_workers = (nrequested > 0) ?
		Min(nrequested, nindexes_parallel) : nindexes_parallel;

	/* Cap by max_parallel_maintenance_workers */
	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);

	return parallel_workers;
}
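
/*
 * Worked example (hypothetical): a table has three btree indexes, all above
 * min_parallel_index_scan_size.  All three support parallel bulk-deletion
 * and conditional cleanup, so nindexes_parallel = 3.  The leader takes one
 * index itself, leaving 2; with nrequested = 0 and
 * max_parallel_maintenance_workers = 2 (the default), 2 workers are
 * requested.
 */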

/*
 * Perform index vacuum or index cleanup with parallel workers.  This function
 * must be used by the parallel vacuum leader process.
 */
static void
parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
									bool vacuum)
{
	int			nworkers;
	PVIndVacStatus new_status;

	Assert(!IsParallelWorker());

	if (vacuum)
	{
		new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;

		/* Determine the number of parallel workers to launch */
		nworkers = pvs->nindexes_parallel_bulkdel;
	}
	else
	{
		new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;

		/* Determine the number of parallel workers to launch */
		nworkers = pvs->nindexes_parallel_cleanup;

		/* Add conditionally parallel-aware indexes on the first call */
		if (num_index_scans == 0)
			nworkers += pvs->nindexes_parallel_condcleanup;
	}

	/* The leader process will participate */
	nworkers--;

	/*
	 * It is possible that the parallel context was initialized with fewer
	 * workers than the number of indexes that need a separate worker in the
	 * current phase, so we need to consider it.  See
	 * parallel_vacuum_compute_workers().
	 */
	nworkers = Min(nworkers, pvs->pcxt->nworkers);

	/*
	 * Set index vacuum status and mark whether parallel vacuum workers can
	 * process it.
	 */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
		indstats->status = new_status;
		indstats->parallel_workers_can_process =
			(pvs->will_parallel_vacuum[i] &&
			 parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
													num_index_scans,
													vacuum));
	}

	/* Reset the parallel index processing and progress counters */
	pg_atomic_write_u32(&(pvs->shared->idx), 0);

	/* Setup the shared cost-based vacuum delay and launch workers */
	if (nworkers > 0)
	{
		/* Reinitialize parallel context to relaunch parallel workers */
		if (num_index_scans > 0)
			ReinitializeParallelDSM(pvs->pcxt);

		/*
		 * Set up shared cost balance and the number of active workers for
		 * vacuum delay.  We need to do this before launching workers as
		 * otherwise, they might not see the updated values for these
		 * parameters.
		 */
		pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
		pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);

		/*
		 * The number of workers can vary between the bulkdelete and cleanup
		 * phases.
		 */
		ReinitializeParallelWorkers(pvs->pcxt, nworkers);

		LaunchParallelWorkers(pvs->pcxt);

		if (pvs->pcxt->nworkers_launched > 0)
		{
			/*
			 * Reset the local cost values for leader backend as we have
			 * already accumulated the remaining balance of heap.
			 */
			VacuumCostBalance = 0;
			VacuumCostBalanceLocal = 0;

			/* Enable shared cost balance for leader backend */
			VacuumSharedCostBalance = &(pvs->shared->cost_balance);
			VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
		}

		if (vacuum)
			ereport(pvs->shared->elevel,
					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
									 pvs->pcxt->nworkers_launched),
							pvs->pcxt->nworkers_launched, nworkers)));
		else
			ereport(pvs->shared->elevel,
					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
									 pvs->pcxt->nworkers_launched),
							pvs->pcxt->nworkers_launched, nworkers)));
	}

	/* Vacuum the indexes that can be processed by only the leader process */
	parallel_vacuum_process_unsafe_indexes(pvs);

	/*
	 * Join as a parallel worker.  The leader process alone processes all
	 * parallel-safe indexes in the case where no workers are launched.
	 */
	parallel_vacuum_process_safe_indexes(pvs);

	/*
	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
	 * to finish, or we might get incomplete data.)
	 */
	if (nworkers > 0)
	{
		/* Wait for all vacuum workers to finish */
		WaitForParallelWorkersToFinish(pvs->pcxt);

		for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
			InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
	}

	/*
	 * Reset all index status back to initial (while checking that we have
	 * vacuumed all indexes).
	 */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
			elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
				 RelationGetRelationName(pvs->indrels[i]));

		indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
	}

	/*
	 * Carry the shared balance value to heap scan and disable shared costing
	 */
	if (VacuumSharedCostBalance)
	{
		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
		VacuumSharedCostBalance = NULL;
		VacuumActiveNWorkers = NULL;
	}
}

/*
 * Index vacuum/cleanup routine used by the leader process and parallel
 * vacuum worker processes to vacuum the indexes in parallel.
 */
static void
parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
{
	/*
	 * Increment the active worker count if we are able to launch any worker.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);

	/* Loop until all indexes are vacuumed */
	for (;;)
	{
		int			idx;
		PVIndStats *indstats;

		/* Get an index number to process */
		idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);

		/* Done for all indexes? */
		if (idx >= pvs->nindexes)
			break;

		indstats = &(pvs->indstats[idx]);

		/*
		 * Skip vacuuming indexes that are unsafe for workers or are
		 * unsuitable targets for parallel index vacuum (these are vacuumed
		 * in parallel_vacuum_process_unsafe_indexes() by the leader).
		 */
		if (!indstats->parallel_workers_can_process)
			continue;

		/* Do vacuum or cleanup of the index */
		parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
	}

	/*
	 * We have completed the index vacuum so decrement the active worker
	 * count.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}

/*
 * Perform parallel vacuuming of indexes in the leader process.
 *
 * Handles index vacuuming (or index cleanup) for indexes that are not
 * parallel safe.  It's possible that this will vary for a given index, based
 * on details like whether we're performing index cleanup right now.
 *
 * Also performs vacuuming of smaller indexes that fell under the size cutoff
 * enforced by parallel_vacuum_compute_workers().
 */
static void
parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
{
	Assert(!IsParallelWorker());

	/*
	 * Increment the active worker count if we are able to launch any worker.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);

	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		/* Skip indexes that are safe for workers */
		if (indstats->parallel_workers_can_process)
			continue;

		/* Do vacuum or cleanup of the index */
		parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
	}

	/*
	 * We have completed the index vacuum so decrement the active worker
	 * count.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}

/*
 * Vacuum or clean up an index, either by the leader process or by one of the
 * worker processes.  After vacuuming the index this function copies the
 * index statistics returned from ambulkdelete and amvacuumcleanup to the DSM
 * segment.
 */
static void
parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
								  PVIndStats *indstats)
{
	IndexBulkDeleteResult *istat = NULL;
	IndexBulkDeleteResult *istat_res;
	IndexVacuumInfo ivinfo;

	/*
	 * Update the pointer to the corresponding bulk-deletion result if someone
	 * has already updated it
	 */
	if (indstats->istat_updated)
		istat = &(indstats->istat);

	ivinfo.index = indrel;
	ivinfo.heaprel = pvs->heaprel;
	ivinfo.analyze_only = false;
	ivinfo.report_progress = false;
	ivinfo.message_level = DEBUG2;
	ivinfo.estimated_count = pvs->shared->estimated_count;
	ivinfo.num_heap_tuples = pvs->shared->reltuples;
	ivinfo.strategy = pvs->bstrategy;

	/* Update error traceback information */
	pvs->indname = pstrdup(RelationGetRelationName(indrel));
	pvs->status = indstats->status;

	switch (indstats->status)
	{
		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
			istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
											  &pvs->shared->dead_items_info);
			break;
		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
			istat_res = vac_cleanup_one_index(&ivinfo, istat);
			break;
		default:
			elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
				 indstats->status,
				 RelationGetRelationName(indrel));
	}

	/*
	 * Copy the index bulk-deletion result returned from ambulkdelete and
	 * amvacuumcleanup to the DSM segment if it's the first cycle because they
	 * allocate locally and it's possible that an index will be vacuumed by a
	 * different vacuum process the next cycle.  Copying the result normally
	 * happens only the first time an index is vacuumed.  For any additional
	 * vacuum pass, we directly point to the result on the DSM segment and
	 * pass it to vacuum index APIs so that workers can update it directly.
	 *
	 * Since all vacuum workers write the bulk-deletion result at different
	 * slots we can write them without locking.
	 */
	if (!indstats->istat_updated && istat_res != NULL)
	{
		memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
		indstats->istat_updated = true;

		/* Free the locally-allocated bulk-deletion result */
		pfree(istat_res);
	}

	/*
	 * Update the status to completed.  No need to lock here since each worker
	 * touches different indexes.
	 */
	indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;

	/* Reset error traceback information */
	pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
	pfree(pvs->indname);
	pvs->indname = NULL;

	/*
	 * Call the parallel variant of pgstat_progress_incr_param so workers can
	 * report progress of index vacuum to the leader.
	 */
	pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
}

/*
 * Returns false if the given index can't participate in the next execution of
 * parallel index vacuum or parallel index cleanup.
 */
static bool
parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
									   bool vacuum)
{
	uint8		vacoptions;

	vacoptions = indrel->rd_indam->amparallelvacuumoptions;

	/* In parallel vacuum case, check if it supports parallel bulk-deletion */
	if (vacuum)
		return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);

	/* Not safe, if the index does not support parallel cleanup */
	if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
		return false;

	/*
	 * Not safe, if the index supports parallel cleanup conditionally, but we
	 * have already processed the index (for bulkdelete).  We do this to avoid
	 * the need to invoke workers when parallel index cleanup doesn't need to
	 * scan the index.  See the comments for option
	 * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
	 * parallel cleanup conditionally.
	 */
	if (num_index_scans > 0 &&
		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
		return false;

	return true;
}
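
/*
 * Illustrative sketch only, not part of this file: an index AM advertises
 * which phases may run in workers via amparallelvacuumoptions in its handler
 * function.  The function name here is hypothetical; the flag combination is
 * the one btree uses, meaning bulk-deletion is always parallel-safe while
 * cleanup is parallel-safe only when no bulk-deletion pass has already
 * scanned the index.
 */
#ifdef NOT_USED
static void
example_set_parallel_vacuum_options(IndexAmRoutine *amroutine)
{
	amroutine->amparallelvacuumoptions =
		VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;
}
#endif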

/*
 * Perform work within a launched parallel process.
 *
 * Since parallel vacuum workers perform only index vacuum or index cleanup,
 * we don't need to report progress information.
 */
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
{
	ParallelVacuumState pvs;
	Relation	rel;
	Relation   *indrels;
	PVIndStats *indstats;
	PVShared   *shared;
	TidStore   *dead_items;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	int			nindexes;
	char	   *sharedquery;
	ErrorContextCallback errcallback;

	/*
	 * A parallel vacuum worker must have only the PROC_IN_VACUUM flag, since
	 * we don't support parallel vacuum for autovacuum as of now.
	 */
	Assert(MyProc->statusFlags == PROC_IN_VACUUM);

	elog(DEBUG1, "starting parallel vacuum worker");

	shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);

	/* Set debug_query_string for individual workers */
	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
	debug_query_string = sharedquery;
	pgstat_report_activity(STATE_RUNNING, debug_query_string);

	/* Track query ID */
	pgstat_report_query_id(shared->queryid, false);

	/*
	 * Open table.  The lock mode is the same as the leader process.  It's
	 * okay because the lock mode does not conflict among the parallel
	 * workers.
	 */
	rel = table_open(shared->relid, ShareUpdateExclusiveLock);

	/*
	 * Open all indexes.  indrels are sorted in order by OID, which should
	 * match the leader's order.
	 */
	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
	Assert(nindexes > 0);

	if (shared->maintenance_work_mem_worker > 0)
		maintenance_work_mem = shared->maintenance_work_mem_worker;

	/* Set index statistics */
	indstats = (PVIndStats *) shm_toc_lookup(toc,
											 PARALLEL_VACUUM_KEY_INDEX_STATS,
											 false);

	/* Find dead_items in shared memory */
	dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
								shared->dead_items_handle);

	/* Set cost-based vacuum delay */
	VacuumUpdateCosts();
	VacuumCostBalance = 0;
	VacuumCostBalanceLocal = 0;
	VacuumSharedCostBalance = &(shared->cost_balance);
	VacuumActiveNWorkers = &(shared->active_nworkers);

	/* Set parallel vacuum state */
	pvs.indrels = indrels;
	pvs.nindexes = nindexes;
	pvs.indstats = indstats;
	pvs.shared = shared;
	pvs.dead_items = dead_items;
	pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
	pvs.relname = pstrdup(RelationGetRelationName(rel));
	pvs.heaprel = rel;

	/* These fields will be filled during index vacuum or cleanup */
	pvs.indname = NULL;
	pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;

	/* Each parallel VACUUM worker gets its own access strategy. */
	pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
											  shared->ring_nbuffers * (BLCKSZ / 1024));

	/* Setup error traceback support for ereport() */
	errcallback.callback = parallel_vacuum_error_callback;
	errcallback.arg = &pvs;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Prepare to track buffer usage during parallel execution */
	InstrStartParallelQuery();

	/* Process indexes to perform vacuum/cleanup */
	parallel_vacuum_process_safe_indexes(&pvs);

	/* Report buffer/WAL usage during parallel execution */
	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
						  &wal_usage[ParallelWorkerNumber]);

	TidStoreDetach(dead_items);

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
	table_close(rel, ShareUpdateExclusiveLock);
	FreeAccessStrategy(pvs.bstrategy);
}

/*
 * Error context callback for errors occurring during parallel index vacuum.
 * The error context messages should match the messages set in the lazy
 * vacuum error context.  If you change this function, change
 * vacuum_error_callback() as well.
 */
static void
parallel_vacuum_error_callback(void *arg)
{
	ParallelVacuumState *errinfo = arg;

	switch (errinfo->status)
	{
		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
			errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
					   errinfo->indname,
					   errinfo->relnamespace,
					   errinfo->relname);
			break;
		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
			errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
					   errinfo->indname,
					   errinfo->relnamespace,
					   errinfo->relname);
			break;
		case PARALLEL_INDVAC_STATUS_INITIAL:
		case PARALLEL_INDVAC_STATUS_COMPLETED:
		default:
			return;
	}
}