PostgreSQL Source Code git master
Loading...
Searching...
No Matches
vacuumparallel.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * vacuumparallel.c
4 * Support routines for parallel vacuum and autovacuum execution. In the
5 * comments below, the word "vacuum" will refer to both vacuum and
6 * autovacuum.
7 *
8 * This file contains routines that are intended to support setting up, using,
9 * and tearing down a ParallelVacuumState.
10 *
11 * In a parallel vacuum, we perform both index bulk deletion and index cleanup
12 * with parallel worker processes. Individual indexes are processed by one
13 * vacuum process. ParallelVacuumState contains shared information as well as
14 * the memory space for storing dead items allocated in the DSA area. We
15 * launch parallel worker processes at the start of parallel index
16 * bulk-deletion and index cleanup and once all indexes are processed, the
17 * parallel worker processes exit. Each time we process indexes in parallel,
18 * the parallel context is re-initialized so that the same DSM can be used for
19 * multiple passes of index bulk-deletion and index cleanup.
20 *
21 * For parallel autovacuum, we need to propagate cost-based vacuum delay
22 * parameters from the leader to its workers, as the leader's parameters can
23 * change even while processing a table (e.g., due to a config reload).
24 * The PVSharedCostParams struct manages these parameters using a
25 * generation counter. Each parallel worker polls this shared state and
26 * refreshes its local delay parameters whenever a change is detected.
27 *
28 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
29 * Portions Copyright (c) 1994, Regents of the University of California
30 *
31 * IDENTIFICATION
32 * src/backend/commands/vacuumparallel.c
33 *
34 *-------------------------------------------------------------------------
35 */
36#include "postgres.h"
37
38#include "access/amapi.h"
39#include "access/table.h"
40#include "access/xact.h"
41#include "commands/progress.h"
42#include "commands/vacuum.h"
43#include "executor/instrument.h"
44#include "optimizer/paths.h"
45#include "pgstat.h"
46#include "storage/bufmgr.h"
47#include "storage/proc.h"
48#include "tcop/tcopprot.h"
49#include "utils/lsyscache.h"
50#include "utils/rel.h"
51
52/*
53 * DSM keys for parallel vacuum. Unlike other parallel execution code, since
54 * we don't need to worry about DSM keys conflicting with plan_node_id we can
55 * use small integers.
56 */
57#define PARALLEL_VACUUM_KEY_SHARED 1
58#define PARALLEL_VACUUM_KEY_QUERY_TEXT 2
59#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3
60#define PARALLEL_VACUUM_KEY_WAL_USAGE 4
61#define PARALLEL_VACUUM_KEY_INDEX_STATS 5
62
63/*
64 * Struct for cost-based vacuum delay related parameters to share among an
65 * autovacuum worker and its parallel vacuum workers.
66 */
67typedef struct PVSharedCostParams
68{
69 /*
70 * The generation counter is incremented by the leader process each time
71 * it updates the shared cost-based vacuum delay parameters. Parallel
72 * vacuum workers compare it with their local generation,
73 * shared_params_generation_local, to detect whether they need to refresh
74 * their local parameters. The generation starts from 1 so that a freshly
75 * started worker (whose local copy is 0) will always load the initial
76 * parameters on its first check.
77 */
79
80 slock_t mutex; /* protects all fields below */
81
82 /* Parameters to share with parallel workers */
83 double cost_delay;
89
90/*
91 * Shared information among parallel workers. So this is allocated in the DSM
92 * segment.
93 */
94typedef struct PVShared
95{
96 /*
97 * Target table relid, log level (for messages about parallel workers
98 * launched during VACUUM VERBOSE) and query ID. These fields are not
99 * modified during the parallel vacuum.
100 */
104
105 /*
106 * Fields for both index vacuum and cleanup.
107 *
108 * reltuples is the total number of input heap tuples. We set either old
109 * live tuples in the index vacuum case or the new live tuples in the
110 * index cleanup case.
111 *
112 * estimated_count is true if reltuples is an estimated value. (Note that
113 * reltuples could be -1 in this case, indicating we have no idea.)
114 */
115 double reltuples;
117
118 /*
119 * In single process vacuum we could consume more memory during index
120 * vacuuming or cleanup apart from the memory for heap scanning. In
121 * parallel vacuum, since individual vacuum workers can consume memory
122 * equal to maintenance_work_mem, the new maintenance_work_mem for each
123 * worker is set such that the parallel operation doesn't consume more
124 * memory than single process vacuum.
125 */
127
128 /*
129 * The number of buffers each worker's Buffer Access Strategy ring should
130 * contain.
131 */
133
134 /*
135 * Shared vacuum cost balance. During parallel vacuum,
136 * VacuumSharedCostBalance points to this value and it accumulates the
137 * balance of each parallel vacuum worker.
138 */
140
141 /*
142 * Number of active parallel workers. This is used for computing the
143 * minimum threshold of the vacuum cost balance before a worker sleeps for
144 * cost-based delay.
145 */
147
148 /* Counter for vacuuming and cleanup */
150
151 /* DSA handle where the TidStore lives */
153
154 /* DSA pointer to the shared TidStore */
156
157 /* Statistics of shared dead items */
159
160 /*
161 * If 'true' then we are running parallel autovacuum. Otherwise, we are
162 * running parallel maintenance VACUUM.
163 */
165
166 /*
167 * Cost-based vacuum delay parameters shared between the autovacuum leader
168 * and its parallel workers.
169 */
172
173/* Status used during parallel index vacuum or cleanup */
181
182/*
183 * Struct for index vacuum statistics of an index that is used for parallel vacuum.
184 * This includes the status of parallel index vacuum as well as index statistics.
185 */
186typedef struct PVIndStats
187{
188 /*
189 * The following two fields are set by leader process before executing
190 * parallel index vacuum or parallel index cleanup. These fields are not
191 * fixed for the entire VACUUM operation. They are only fixed for an
192 * individual parallel index vacuum and cleanup.
193 *
194 * parallel_workers_can_process is true if both leader and worker can
195 * process the index, otherwise only leader can process it.
196 */
199
200 /*
201 * Individual worker or leader stores the result of index vacuum or
202 * cleanup.
203 */
204 bool istat_updated; /* are the stats updated? */
207
208/*
209 * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
210 */
212{
213 /* NULL for worker processes */
215
216 /* Parent Heap Relation */
218
219 /* Target indexes */
222
223 /* Shared information among parallel vacuum workers */
225
226 /*
227 * Shared index statistics among parallel vacuum workers. The array
228 * element is allocated for every index, even those indexes where parallel
229 * index vacuuming is unsafe or not worthwhile (e.g.,
230 * will_parallel_vacuum[] is false). During parallel vacuum,
231 * IndexBulkDeleteResult of each index is kept in DSM and is copied into
232 * local memory at the end of parallel vacuum.
233 */
235
236 /* Shared dead items space among parallel vacuum workers */
238
239 /* Points to buffer usage area in DSM */
241
242 /* Points to WAL usage area in DSM */
244
245 /*
246 * False if the index is totally unsuitable target for all parallel
247 * processing. For example, the index could be <
248 * min_parallel_index_scan_size cutoff.
249 */
251
252 /*
253 * The number of indexes that support parallel index bulk-deletion and
254 * parallel index cleanup respectively.
255 */
259
260 /* Buffer access strategy used by leader process */
262
263 /*
264 * Error reporting state. The error callback is set only for workers
265 * processes during parallel index vacuum.
266 */
268 char *relname;
269 char *indname;
271};
272
274
275/*
276 * Worker-local copy of the last cost-parameter generation this worker has
277 * applied. Initialized to 0; since the leader initializes the shared
278 * generation counter to 1, the first call to
279 * parallel_vacuum_update_shared_delay_params() will always detect a
280 * mismatch and read the initial parameters from shared memory.
281 */
283
284static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
285 bool *will_parallel_vacuum);
286static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
287 bool vacuum, PVWorkerStats *wstats);
291 PVIndStats *indstats);
292static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
293 bool vacuum);
294static void parallel_vacuum_error_callback(void *arg);
297
298/*
299 * Try to enter parallel mode and create a parallel context. Then initialize
300 * shared memory state.
301 *
302 * On success, return parallel vacuum state. Otherwise return NULL.
303 */
305parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
306 int nrequested_workers, int vac_work_mem,
307 int elevel, BufferAccessStrategy bstrategy)
308{
310 ParallelContext *pcxt;
311 PVShared *shared;
312 TidStore *dead_items;
313 PVIndStats *indstats;
314 BufferUsage *buffer_usage;
315 WalUsage *wal_usage;
316 bool *will_parallel_vacuum;
317 Size est_indstats_len;
318 Size est_shared_len;
319 int nindexes_mwm = 0;
320 int parallel_workers = 0;
321 int querylen;
322
323 /*
324 * A parallel vacuum must be requested and there must be indexes on the
325 * relation
326 */
327 Assert(nrequested_workers >= 0);
328 Assert(nindexes > 0);
329
330 /*
331 * Compute the number of parallel vacuum workers to launch
332 */
333 will_parallel_vacuum = palloc0_array(bool, nindexes);
334 parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
335 nrequested_workers,
336 will_parallel_vacuum);
337 if (parallel_workers <= 0)
338 {
339 /* Can't perform vacuum in parallel -- return NULL */
340 pfree(will_parallel_vacuum);
341 return NULL;
342 }
343
345 pvs->indrels = indrels;
346 pvs->nindexes = nindexes;
347 pvs->will_parallel_vacuum = will_parallel_vacuum;
348 pvs->bstrategy = bstrategy;
349 pvs->heaprel = rel;
350
352 pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
353 parallel_workers);
354 Assert(pcxt->nworkers > 0);
355 pvs->pcxt = pcxt;
356
357 /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
358 est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
359 shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
361
362 /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
363 est_shared_len = sizeof(PVShared);
364 shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
366
367 /*
368 * Estimate space for BufferUsage and WalUsage --
369 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
370 *
371 * If there are no extensions loaded that care, we could skip this. We
372 * have no way of knowing whether anyone's looking at pgBufferUsage or
373 * pgWalUsage, so do it unconditionally.
374 */
376 mul_size(sizeof(BufferUsage), pcxt->nworkers));
379 mul_size(sizeof(WalUsage), pcxt->nworkers));
381
382 /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
384 {
385 querylen = strlen(debug_query_string);
386 shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
388 }
389 else
390 querylen = 0; /* keep compiler quiet */
391
393
394 /* Prepare index vacuum stats */
395 indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
396 MemSet(indstats, 0, est_indstats_len);
397 for (int i = 0; i < nindexes; i++)
398 {
399 Relation indrel = indrels[i];
400 uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
401
402 /*
403 * Cleanup option should be either disabled, always performing in
404 * parallel or conditionally performing in parallel.
405 */
406 Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
407 ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
409
410 if (!will_parallel_vacuum[i])
411 continue;
412
414 nindexes_mwm++;
415
416 /*
417 * Remember the number of indexes that support parallel operation for
418 * each phase.
419 */
420 if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
422 if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
424 if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
426 }
428 pvs->indstats = indstats;
429
430 /* Prepare shared information */
431 shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
432 MemSet(shared, 0, est_shared_len);
433 shared->relid = RelationGetRelid(rel);
434 shared->elevel = elevel;
437 (nindexes_mwm > 0) ?
438 vac_work_mem / Min(parallel_workers, nindexes_mwm) :
439 vac_work_mem;
440
441 shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;
442
443 /* Prepare DSA space for dead items */
445 LWTRANCHE_PARALLEL_VACUUM_DSA);
446 pvs->dead_items = dead_items;
447 shared->dead_items_handle = TidStoreGetHandle(dead_items);
449
450 /* Use the same buffer size for all workers */
451 shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
452
453 pg_atomic_init_u32(&(shared->cost_balance), 0);
454 pg_atomic_init_u32(&(shared->active_nworkers), 0);
455 pg_atomic_init_u32(&(shared->idx), 0);
456
458
459 /*
460 * Initialize shared cost-based vacuum delay parameters if it's for
461 * autovacuum.
462 */
463 if (shared->is_autovacuum)
464 {
468
471 }
472
474 pvs->shared = shared;
475
476 /*
477 * Allocate space for each worker's BufferUsage and WalUsage; no need to
478 * initialize
479 */
480 buffer_usage = shm_toc_allocate(pcxt->toc,
481 mul_size(sizeof(BufferUsage), pcxt->nworkers));
483 pvs->buffer_usage = buffer_usage;
484 wal_usage = shm_toc_allocate(pcxt->toc,
485 mul_size(sizeof(WalUsage), pcxt->nworkers));
487 pvs->wal_usage = wal_usage;
488
489 /* Store query string for workers */
491 {
492 char *sharedquery;
493
494 sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
495 memcpy(sharedquery, debug_query_string, querylen + 1);
496 sharedquery[querylen] = '\0';
497 shm_toc_insert(pcxt->toc,
498 PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
499 }
500
501 /* Success -- return parallel vacuum state */
502 return pvs;
503}
504
505/*
506 * Destroy the parallel context, and end parallel mode.
507 *
508 * Since writes are not allowed during parallel mode, copy the
509 * updated index statistics from DSM into local memory and then later use that
510 * to update the index statistics. One might think that we can exit from
511 * parallel mode, update the index statistics and then destroy parallel
512 * context, but that won't be safe (see ExitParallelMode).
513 */
514void
516{
518
519 /* Copy the updated statistics */
520 for (int i = 0; i < pvs->nindexes; i++)
521 {
522 PVIndStats *indstats = &(pvs->indstats[i]);
523
524 if (indstats->istat_updated)
525 {
527 memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
528 }
529 else
530 istats[i] = NULL;
531 }
532
534
537
540
542 pfree(pvs);
543}
544
545/*
546 * DSM detach callback. This is invoked when an autovacuum worker detaches
547 * from the DSM segment holding PVShared. It ensures to reset the local pointer
548 * to the shared state even if parallel vacuum raises an error and doesn't
549 * call parallel_vacuum_end().
550 */
551static void
557
558/*
559 * Returns the dead items space and dead items information.
560 */
561TidStore *
563{
564 *dead_items_info_p = &(pvs->shared->dead_items_info);
565 return pvs->dead_items;
566}
567
568/* Forget all items in dead_items */
569void
571{
572 VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
573
574 /*
575 * Free the current tidstore and return allocated DSA segments to the
576 * operating system. Then we recreate the tidstore with the same max_bytes
577 * limitation we just used.
578 */
580 pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
581 LWTRANCHE_PARALLEL_VACUUM_DSA);
582
583 /* Update the DSA pointer for dead_items to the new one */
586
587 /* Reset the counter */
588 dead_items_info->num_items = 0;
589}
590
591/*
592 * Do parallel index bulk-deletion with parallel workers.
593 */
594void
596 int num_index_scans, PVWorkerStats *wstats)
597{
599
600 /*
601 * We can only provide an approximate value of num_heap_tuples, at least
602 * for now.
603 */
604 pvs->shared->reltuples = num_table_tuples;
605 pvs->shared->estimated_count = true;
606
607 parallel_vacuum_process_all_indexes(pvs, num_index_scans, true, wstats);
608}
609
610/*
611 * Do parallel index cleanup with parallel workers.
612 */
613void
615 int num_index_scans, bool estimated_count,
616 PVWorkerStats *wstats)
617{
619
620 /*
621 * We can provide a better estimate of total number of surviving tuples
622 * (we assume indexes are more interested in that than in the number of
623 * nominally live tuples).
624 */
625 pvs->shared->reltuples = num_table_tuples;
626 pvs->shared->estimated_count = estimated_count;
627
628 parallel_vacuum_process_all_indexes(pvs, num_index_scans, false, wstats);
629}
630
631/*
632 * Fill in the given structure with cost-based vacuum delay parameter values.
633 */
634static inline void
643
644/*
645 * Updates the cost-based vacuum delay parameters for parallel autovacuum
646 * workers.
647 *
648 * For non-autovacuum parallel workers, this function will have no effect.
649 */
650void
652{
653 uint32 params_generation;
654
656
657 /* Quick return if the worker is not running for the autovacuum */
658 if (pv_shared_cost_params == NULL)
659 return;
660
662 Assert(shared_params_generation_local <= params_generation);
663
664 /* Return if parameters had not changed in the leader */
665 if (params_generation == shared_params_generation_local)
666 return;
667
675
677
678 shared_params_generation_local = params_generation;
679
680 elog(DEBUG2,
681 "parallel autovacuum worker updated cost params: cost_limit=%d, cost_delay=%g, cost_page_miss=%d, cost_page_dirty=%d, cost_page_hit=%d",
687}
688
689/*
690 * Store the cost-based vacuum delay parameters in the shared memory so that
691 * parallel vacuum workers can consume them (see
692 * parallel_vacuum_update_shared_delay_params()).
693 */
694void
696{
698
699 /*
700 * Quick return if the leader process is not sharing the delay parameters.
701 */
702 if (pv_shared_cost_params == NULL)
703 return;
704
705 /*
706 * Check if any delay parameters have changed. We can read them without
707 * locks as only the leader can modify them.
708 */
714 return;
715
716 /* Update the shared delay parameters */
720
721 /*
722 * Increment the generation of the parameters, i.e. let parallel workers
723 * know that they should re-read shared cost params.
724 */
726}
727
728/*
729 * Compute the number of parallel worker processes to request. Both index
730 * vacuum and index cleanup can be executed with parallel workers.
731 * The index is eligible for parallel vacuum iff its size is greater than
732 * min_parallel_index_scan_size as invoking workers for very small indexes
733 * can hurt performance.
734 *
735 * nrequested is the number of parallel workers that user requested. If
736 * nrequested is 0, we compute the parallel degree based on nindexes, that is
737 * the number of indexes that support parallel vacuum. This function also
738 * sets will_parallel_vacuum to remember indexes that participate in parallel
739 * vacuum.
740 */
741static int
742parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
743 bool *will_parallel_vacuum)
744{
745 int nindexes_parallel = 0;
746 int nindexes_parallel_bulkdel = 0;
747 int nindexes_parallel_cleanup = 0;
748 int parallel_workers;
749 int max_workers;
750
751 max_workers = AmAutoVacuumWorkerProcess() ?
754
755 /*
756 * We don't allow performing parallel operation in standalone backend or
757 * when parallelism is disabled.
758 */
759 if (!IsUnderPostmaster || max_workers == 0)
760 return 0;
761
762 /*
763 * Compute the number of indexes that can participate in parallel vacuum.
764 */
765 for (int i = 0; i < nindexes; i++)
766 {
767 Relation indrel = indrels[i];
768 uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
769
770 /* Skip index that is not a suitable target for parallel index vacuum */
771 if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
773 continue;
774
775 will_parallel_vacuum[i] = true;
776
777 if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
778 nindexes_parallel_bulkdel++;
779 if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
780 ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
781 nindexes_parallel_cleanup++;
782 }
783
784 nindexes_parallel = Max(nindexes_parallel_bulkdel,
785 nindexes_parallel_cleanup);
786
787 /* The leader process takes one index */
788 nindexes_parallel--;
789
790 /* No index supports parallel vacuum */
791 if (nindexes_parallel <= 0)
792 return 0;
793
794 /* Compute the parallel degree */
795 parallel_workers = (nrequested > 0) ?
796 Min(nrequested, nindexes_parallel) : nindexes_parallel;
797
798 /* Cap by GUC variable */
799 parallel_workers = Min(parallel_workers, max_workers);
800
801 return parallel_workers;
802}
803
804/*
805 * Perform index vacuum or index cleanup with parallel workers. This function
806 * must be used by the parallel vacuum leader process.
807 *
808 * If wstats is not NULL, the parallel worker statistics are updated.
809 */
810static void
812 bool vacuum, PVWorkerStats *wstats)
813{
814 int nworkers;
815 PVIndVacStatus new_status;
816
818
819 if (vacuum)
820 {
822
823 /* Determine the number of parallel workers to launch */
824 nworkers = pvs->nindexes_parallel_bulkdel;
825 }
826 else
827 {
829
830 /* Determine the number of parallel workers to launch */
831 nworkers = pvs->nindexes_parallel_cleanup;
832
833 /* Add conditionally parallel-aware indexes if in the first time call */
834 if (num_index_scans == 0)
835 nworkers += pvs->nindexes_parallel_condcleanup;
836 }
837
838 /* The leader process will participate */
839 nworkers--;
840
841 /*
842 * It is possible that parallel context is initialized with fewer workers
843 * than the number of indexes that need a separate worker in the current
844 * phase, so we need to consider it. See
845 * parallel_vacuum_compute_workers().
846 */
847 nworkers = Min(nworkers, pvs->pcxt->nworkers);
848
849 /* Update the statistics, if we asked to */
850 if (wstats != NULL && nworkers > 0)
851 wstats->nplanned += nworkers;
852
853 /*
854 * Set index vacuum status and mark whether parallel vacuum worker can
855 * process it.
856 */
857 for (int i = 0; i < pvs->nindexes; i++)
858 {
859 PVIndStats *indstats = &(pvs->indstats[i]);
860
862 indstats->status = new_status;
864 (pvs->will_parallel_vacuum[i] &&
866 num_index_scans,
867 vacuum));
868 }
869
870 /* Reset the parallel index processing and progress counters */
871 pg_atomic_write_u32(&(pvs->shared->idx), 0);
872
873 /* Setup the shared cost-based vacuum delay and launch workers */
874 if (nworkers > 0)
875 {
876 /* Reinitialize parallel context to relaunch parallel workers */
877 if (num_index_scans > 0)
879
880 /*
881 * Set up shared cost balance and the number of active workers for
882 * vacuum delay. We need to do this before launching workers as
883 * otherwise, they might not see the updated values for these
884 * parameters.
885 */
888
889 /*
890 * The number of workers can vary between bulkdelete and cleanup
891 * phase.
892 */
893 ReinitializeParallelWorkers(pvs->pcxt, nworkers);
894
896
897 if (pvs->pcxt->nworkers_launched > 0)
898 {
899 /*
900 * Reset the local cost values for leader backend as we have
901 * already accumulated the remaining balance of heap.
902 */
905
906 /* Enable shared cost balance for leader backend */
909
910 /* Update the statistics, if we asked to */
911 if (wstats != NULL)
912 wstats->nlaunched += pvs->pcxt->nworkers_launched;
913 }
914
915 if (vacuum)
916 ereport(pvs->shared->elevel,
917 (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
918 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
919 pvs->pcxt->nworkers_launched),
920 pvs->pcxt->nworkers_launched, nworkers)));
921 else
922 ereport(pvs->shared->elevel,
923 (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
924 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
925 pvs->pcxt->nworkers_launched),
926 pvs->pcxt->nworkers_launched, nworkers)));
927 }
928
929 /* Vacuum the indexes that can be processed by only leader process */
931
932 /*
933 * Join as a parallel worker. The leader alone processes all
934 * parallel-safe indexes in the case where no workers are launched.
935 */
937
938 /*
939 * Next, accumulate buffer and WAL usage. (This must wait for the workers
940 * to finish, or we might get incomplete data.)
941 */
942 if (nworkers > 0)
943 {
944 /* Wait for all vacuum workers to finish */
946
947 for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
949 }
950
951 /*
952 * Reset all index status back to initial (while checking that we have
953 * vacuumed all indexes).
954 */
955 for (int i = 0; i < pvs->nindexes; i++)
956 {
957 PVIndStats *indstats = &(pvs->indstats[i]);
958
960 elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
962
964 }
965
966 /*
967 * Carry the shared balance value to heap scan and disable shared costing
968 */
970 {
974 }
975}
976
977/*
978 * Index vacuum/cleanup routine used by the leader process and parallel
979 * vacuum worker processes to vacuum the indexes in parallel.
980 */
981static void
983{
984 /*
985 * Increment the active worker count if we are able to launch any worker.
986 */
989
990 /* Loop until all indexes are vacuumed */
991 for (;;)
992 {
993 int idx;
994 PVIndStats *indstats;
995
996 /* Get an index number to process */
997 idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
998
999 /* Done for all indexes? */
1000 if (idx >= pvs->nindexes)
1001 break;
1002
1003 indstats = &(pvs->indstats[idx]);
1004
1005 /*
1006 * Skip vacuuming index that is unsafe for workers or has an
1007 * unsuitable target for parallel index vacuum (this is vacuumed in
1008 * parallel_vacuum_process_unsafe_indexes() by the leader).
1009 */
1010 if (!indstats->parallel_workers_can_process)
1011 continue;
1012
1013 /* Do vacuum or cleanup of the index */
1014 parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
1015 }
1016
1017 /*
1018 * We have completed the index vacuum so decrement the active worker
1019 * count.
1020 */
1023}
1024
1025/*
1026 * Perform parallel vacuuming of indexes in leader process.
1027 *
1028 * Handles index vacuuming (or index cleanup) for indexes that are not
1029 * parallel safe. It's possible that this will vary for a given index, based
1030 * on details like whether we're performing index cleanup right now.
1031 *
1032 * Also performs vacuuming of smaller indexes that fell under the size cutoff
1033 * enforced by parallel_vacuum_compute_workers().
1034 */
1035static void
1037{
1039
1040 /*
1041 * Increment the active worker count if we are able to launch any worker.
1042 */
1045
1046 for (int i = 0; i < pvs->nindexes; i++)
1047 {
1048 PVIndStats *indstats = &(pvs->indstats[i]);
1049
1050 /* Skip indexes that are safe for workers */
1051 if (indstats->parallel_workers_can_process)
1052 continue;
1053
1054 /* Do vacuum or cleanup of the index */
1055 parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
1056 }
1057
1058 /*
1059 * We have completed the index vacuum so decrement the active worker
1060 * count.
1061 */
1064}
1065
1066/*
1067 * Vacuum or cleanup index either by leader process or by one of the worker
1068 * process. After vacuuming the index this function copies the index
1069 * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
1070 * segment.
1071 */
1072static void
1074 PVIndStats *indstats)
1075{
1076 IndexBulkDeleteResult *istat = NULL;
1077 IndexBulkDeleteResult *istat_res;
1078 IndexVacuumInfo ivinfo;
1079
1080 /*
1081 * Update the pointer to the corresponding bulk-deletion result if someone
1082 * has already updated it
1083 */
1084 if (indstats->istat_updated)
1085 istat = &(indstats->istat);
1086
1087 ivinfo.index = indrel;
1088 ivinfo.heaprel = pvs->heaprel;
1089 ivinfo.analyze_only = false;
1090 ivinfo.report_progress = false;
1091 ivinfo.message_level = DEBUG2;
1092 ivinfo.estimated_count = pvs->shared->estimated_count;
1093 ivinfo.num_heap_tuples = pvs->shared->reltuples;
1094 ivinfo.strategy = pvs->bstrategy;
1095
1096 /* Update error traceback information */
1097 pvs->indname = pstrdup(RelationGetRelationName(indrel));
1098 pvs->status = indstats->status;
1099
1100 switch (indstats->status)
1101 {
1103 istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
1104 &pvs->shared->dead_items_info);
1105 break;
1107 istat_res = vac_cleanup_one_index(&ivinfo, istat);
1108 break;
1109 default:
1110 elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
1111 indstats->status,
1112 RelationGetRelationName(indrel));
1113 }
1114
1115 /*
1116 * Copy the index bulk-deletion result returned from ambulkdelete and
1117 * amvacuumcleanup to the DSM segment if it's the first cycle because they
1118 * allocate locally and it's possible that an index will be vacuumed by a
1119 * different vacuum process the next cycle. Copying the result normally
1120 * happens only the first time an index is vacuumed. For any additional
1121 * vacuum pass, we directly point to the result on the DSM segment and
1122 * pass it to vacuum index APIs so that workers can update it directly.
1123 *
1124 * Since all vacuum workers write the bulk-deletion result at different
1125 * slots we can write them without locking.
1126 */
1127 if (!indstats->istat_updated && istat_res != NULL)
1128 {
1129 memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
1130 indstats->istat_updated = true;
1131
1132 /* Free the locally-allocated bulk-deletion result */
1133 pfree(istat_res);
1134 }
1135
1136 /*
1137 * Update the status to completed. No need to lock here since each worker
1138 * touches different indexes.
1139 */
1141
1142 /* Reset error traceback information */
1144 pfree(pvs->indname);
1145 pvs->indname = NULL;
1146
1147 /*
1148 * Call the parallel variant of pgstat_progress_incr_param so workers can
1149 * report progress of index vacuum to the leader.
1150 */
1152}
1153
1154/*
1155 * Returns false, if the given index can't participate in the next execution of
1156 * parallel index vacuum or parallel index cleanup.
1157 */
1158static bool
1160 bool vacuum)
1161{
1162 uint8 vacoptions;
1163
1164 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
1165
1166 /* In parallel vacuum case, check if it supports parallel bulk-deletion */
1167 if (vacuum)
1168 return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
1169
1170 /* Not safe, if the index does not support parallel cleanup */
1171 if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
1172 ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
1173 return false;
1174
1175 /*
1176 * Not safe, if the index supports parallel cleanup conditionally, but we
1177 * have already processed the index (for bulkdelete). We do this to avoid
1178 * the need to invoke workers when parallel index cleanup doesn't need to
1179 * scan the index. See the comments for option
1180 * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
1181 * parallel cleanup conditionally.
1182 */
1183 if (num_index_scans > 0 &&
1184 ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
1185 return false;
1186
1187 return true;
1188}
1189
1190/*
 1191 * Perform work within a launched parallel process.
 1192 *
 1193 * Since parallel vacuum workers perform only index vacuum or index cleanup,
 1194 * we don't need to report progress information.
 1195 */
/*
 * NOTE(review): this listing is a Doxygen rendering from which every
 * hyperlinked statement was dropped during extraction -- visible as gaps in
 * the embedded original line numbering (e.g. 1196 -> 1198).  The code below
 * is kept byte-identical to the scrape; the hedged notes mark each dropped
 * span.  Restore the missing statements from the upstream file before
 * attempting to compile this chunk.
 */
1196void
/* NOTE(review): line 1197 dropped -- the signature is
 * "parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)" per the
 * cross-reference index at the bottom of this page. */
1198{
/* NOTE(review): line 1199 dropped -- presumably "ParallelVacuumState pvs;",
 * since pvs is assigned field-by-field below; confirm against upstream. */
 1200 Relation rel;
 1201 Relation *indrels;
 1202 PVIndStats *indstats;
 1203 PVShared *shared;
 1204 TidStore *dead_items;
 1205 BufferUsage *buffer_usage;
 1206 WalUsage *wal_usage;
 1207 int nindexes;
 1208 char *sharedquery;
 1209 ErrorContextCallback errcallback;
 1210
 1211 /*
 1212 * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
 1213 * don't support parallel vacuum for autovacuum as of now.
 1214 */
/* NOTE(review): line 1215 dropped -- presumably the Assert on
 * MyProc->statusFlags / PROC_IN_VACUUM that the comment above describes;
 * confirm against upstream. */
 1216
 1217 elog(DEBUG1, "starting parallel vacuum worker");
 1218
 1219 shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
 1220
 1221 /* Set debug_query_string for individual workers */
 1222 sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
 1223 debug_query_string = sharedquery;
/* NOTE(review): line 1224 dropped -- presumably
 * pgstat_report_activity(STATE_RUNNING, debug_query_string); both symbols
 * appear in this page's cross-reference index. */
 1225
 1226 /* Track query ID */
 1227 pgstat_report_query_id(shared->queryid, false);
 1228
 1229 /*
 1230 * Open table. The lock mode is the same as the leader process. It's
 1231 * okay because the lock mode does not conflict among the parallel
 1232 * workers.
 1233 */
/* NOTE(review): line 1234 dropped -- the table_open() call described by the
 * comment above (ShareUpdateExclusiveLock appears in the index); confirm. */
 1235
 1236 /*
 1237 * Open all indexes. indrels are sorted in order by OID, which should be
 1238 * matched to the leader's one.
 1239 */
 1240 vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
 1241 Assert(nindexes > 0);
 1242
 1243 /*
 1244 * Apply the desired value of maintenance_work_mem within this process.
 1245 * Really we should use SetConfigOption() to change a GUC, but since we're
 1246 * already in parallel mode guc.c would complain about that. Fortunately,
 1247 * by the same token guc.c will not let any user-defined code change it.
 1248 * So just avert your eyes while we do this:
 1249 */
 1250 if (shared->maintenance_work_mem_worker > 0)
/* NOTE(review): line 1251 dropped -- presumably the assignment of
 * shared->maintenance_work_mem_worker to maintenance_work_mem; confirm. */
 1252
 1253 /* Set index statistics */
 1254 indstats = (PVIndStats *) shm_toc_lookup(toc,
/* NOTE(review): line 1255 dropped -- presumably the toc key
 * PARALLEL_VACUUM_KEY_INDEX_STATS (listed in the index); confirm. */
 1256 false);
 1257
 1258 /* Find dead_items in shared memory */
 1259 dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
 1260 shared->dead_items_handle);
 1261
 1262 /* Set cost-based vacuum delay */
 1263 if (shared->is_autovacuum)
 1264 {
 1265 /*
 1266 * Parallel autovacuum workers initialize cost-based delay parameters
 1267 * from the leader's shared state rather than GUC defaults, because
 1268 * the leader may have applied per-table or autovacuum-specific
 1269 * overrides. pv_shared_cost_params must be set before calling
 1270 * parallel_vacuum_update_shared_delay_params().
 1271 */
/* NOTE(review): lines 1272-1273 dropped -- per the comment above, presumably
 * the assignment of pv_shared_cost_params from shared->cost_params followed
 * by a call to parallel_vacuum_update_shared_delay_params(); confirm. */
 1274 }
 1275 else
/* NOTE(review): line 1276 dropped -- the else-arm statement (possibly
 * VacuumUpdateCosts(), which the index lists); confirm against upstream. */
 1277
/* NOTE(review): lines 1278-1281 dropped -- likely the shared cost-balance
 * setup (VacuumSharedCostBalance / VacuumActiveNWorkers appear in the
 * index); confirm against upstream. */
 1282
 1283 /* Set parallel vacuum state */
 1284 pvs.indrels = indrels;
 1285 pvs.nindexes = nindexes;
 1286 pvs.indstats = indstats;
 1287 pvs.shared = shared;
 1288 pvs.dead_items = dead_items;
/* NOTE(review): lines 1289-1290 dropped -- presumably the pvs.relnamespace /
 * pvs.relname assignments (get_namespace_name and pstrdup appear in the
 * index); confirm. */
 1291 pvs.heaprel = rel;
 1292
 1293 /* These fields will be filled during index vacuum or cleanup */
 1294 pvs.indname = NULL;
/* NOTE(review): line 1295 dropped -- presumably
 * pvs.status = PARALLEL_INDVAC_STATUS_INITIAL; confirm. */
 1296
 1297 /* Each parallel VACUUM worker gets its own access strategy. */
/* NOTE(review): line 1298 dropped -- presumably the start of the
 * GetAccessStrategyWithSize(BAS_VACUUM, ...) call whose second argument
 * continues on the next line; confirm. */
 1299 shared->ring_nbuffers * (BLCKSZ / 1024));
 1300
 1301 /* Setup error traceback support for ereport() */
/* NOTE(review): line 1302 dropped -- presumably
 * errcallback.callback = parallel_vacuum_error_callback; confirm. */
 1303 errcallback.arg = &pvs;
 1304 errcallback.previous = error_context_stack;
 1305 error_context_stack = &errcallback;
 1306
 1307 /* Prepare to track buffer usage during parallel execution */
/* NOTE(review): line 1308 dropped -- presumably InstrStartParallelQuery();
 * the function appears in the index. */
 1309
 1310 /* Process indexes to perform vacuum/cleanup */
/* NOTE(review): line 1311 dropped -- presumably
 * parallel_vacuum_process_safe_indexes(&pvs); confirm against upstream. */
 1312
 1313 /* Report buffer/WAL usage during parallel execution */
 1314 buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
 1315 wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
/* NOTE(review): line 1316 dropped -- presumably the start of
 * InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], ...) whose
 * second argument continues on the next line; confirm. */
 1317 &wal_usage[ParallelWorkerNumber]);
 1318
 1319 /* Report any remaining cost-based vacuum delay time */
/* NOTE(review): lines 1320-1322 dropped -- the delay-time reporting the
 * comment describes (track_cost_delay_timing and PROGRESS_VACUUM_DELAY_TIME
 * appear in the index); confirm against upstream. */
 1323
 1324 TidStoreDetach(dead_items);
 1325
 1326 /* Pop the error context stack */
 1327 error_context_stack = errcallback.previous;
 1328
 1329 vac_close_indexes(nindexes, indrels, RowExclusiveLock);
/* NOTE(review): lines 1330-1331 dropped -- presumably table_close(rel, ...)
 * and FreeAccessStrategy(pvs.bstrategy), mirroring the opens above;
 * confirm against upstream. */
 1332
 1333 if (shared->is_autovacuum)
 1334 pv_shared_cost_params = NULL;
 1335}
1336
1337/*
1338 * Error context callback for errors occurring during parallel index vacuum.
1339 * The error context messages should match the messages set in the lazy vacuum
1340 * error context. If you change this function, change vacuum_error_callback()
1341 * as well.
1342 */
1343static void
1345{
1346 ParallelVacuumState *errinfo = arg;
1347
1348 switch (errinfo->status)
1349 {
1351 errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1352 errinfo->indname,
1353 errinfo->relnamespace,
1354 errinfo->relname);
1355 break;
1357 errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1358 errinfo->indname,
1359 errinfo->relnamespace,
1360 errinfo->relname);
1361 break;
1364 default:
1365 return;
1366 }
1367}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:262
int min_parallel_index_scan_size
Definition allpaths.c:86
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition atomics.h:439
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition atomics.h:219
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition atomics.h:366
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition atomics.h:424
static void pg_atomic_write_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition atomics.h:274
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition atomics.h:237
void VacuumUpdateCosts(void)
int ParallelWorkerNumber
Definition parallel.c:117
void InitializeParallelDSM(ParallelContext *pcxt)
Definition parallel.c:213
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
Definition parallel.c:805
void LaunchParallelWorkers(ParallelContext *pcxt)
Definition parallel.c:583
void ReinitializeParallelDSM(ParallelContext *pcxt)
Definition parallel.c:511
void DestroyParallelContext(ParallelContext *pcxt)
Definition parallel.c:959
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
Definition parallel.c:175
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
Definition parallel.c:568
void pgstat_progress_parallel_incr_param(int index, int64 incr)
void pgstat_report_query_id(int64 query_id, bool force)
int64 pgstat_get_my_query_id(void)
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
@ BAS_VACUUM
Definition bufmgr.h:40
#define RelationGetNumberOfBlocks(reln)
Definition bufmgr.h:309
#define Min(x, y)
Definition c.h:1091
uint8_t uint8
Definition c.h:622
#define ngettext(s, p, n)
Definition c.h:1270
#define Max(x, y)
Definition c.h:1085
#define Assert(condition)
Definition c.h:943
int64_t int64
Definition c.h:621
uint32_t uint32
Definition c.h:624
#define MemSet(start, val, len)
Definition c.h:1107
size_t Size
Definition c.h:689
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
dsa_handle dsa_get_handle(dsa_area *area)
Definition dsa.c:498
uint64 dsa_pointer
Definition dsa.h:62
dsm_handle dsa_handle
Definition dsa.h:136
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition dsm.c:1140
Datum arg
Definition elog.c:1322
ErrorContextCallback * error_context_stack
Definition elog.c:99
#define errcontext
Definition elog.h:200
#define DEBUG2
Definition elog.h:30
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
#define palloc0_array(type, count)
Definition fe_memutils.h:77
#define palloc0_object(type)
Definition fe_memutils.h:75
BufferAccessStrategy GetAccessStrategyWithSize(BufferAccessStrategyType btype, int ring_size_kb)
Definition freelist.c:511
int GetAccessStrategyBufferCount(BufferAccessStrategy strategy)
Definition freelist.c:551
void FreeAccessStrategy(BufferAccessStrategy strategy)
Definition freelist.c:608
int VacuumCostLimit
Definition globals.c:157
int max_parallel_maintenance_workers
Definition globals.c:136
int VacuumCostPageMiss
Definition globals.c:155
bool IsUnderPostmaster
Definition globals.c:122
int autovacuum_max_parallel_workers
Definition globals.c:148
int VacuumCostBalance
Definition globals.c:160
int maintenance_work_mem
Definition globals.c:135
int VacuumCostPageDirty
Definition globals.c:156
int VacuumCostPageHit
Definition globals.c:154
double VacuumCostDelay
Definition globals.c:158
#define IsParallelWorker()
Definition parallel.h:62
void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
Definition instrument.c:297
void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
Definition instrument.c:287
void InstrStartParallelQuery(void)
Definition instrument.c:279
int i
Definition isn.c:77
#define ShareUpdateExclusiveLock
Definition lockdefs.h:39
#define RowExclusiveLock
Definition lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3588
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
#define AmAutoVacuumWorkerProcess()
Definition miscadmin.h:398
static char * errmsg
const char * debug_query_string
Definition postgres.c:94
uint64_t Datum
Definition postgres.h:70
unsigned int Oid
#define PROC_IN_VACUUM
Definition proc.h:62
#define PROGRESS_VACUUM_DELAY_TIME
Definition progress.h:31
#define PROGRESS_VACUUM_INDEXES_PROCESSED
Definition progress.h:30
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationGetRelationName(relation)
Definition rel.h:550
#define RelationGetNamespace(relation)
Definition rel.h:557
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition shm_toc.c:171
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition shm_toc.c:239
#define shm_toc_estimate_chunk(e, sz)
Definition shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition shm_toc.h:53
Size mul_size(Size s1, Size s2)
Definition shmem.c:1063
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
PGPROC * MyProc
Definition proc.c:71
struct ErrorContextCallback * previous
Definition elog.h:299
void(* callback)(void *arg)
Definition elog.h:300
bool amusemaintenanceworkmem
Definition amapi.h:280
uint8 amparallelvacuumoptions
Definition amapi.h:284
Relation index
Definition genam.h:54
double num_heap_tuples
Definition genam.h:60
bool analyze_only
Definition genam.h:56
BufferAccessStrategy strategy
Definition genam.h:61
Relation heaprel
Definition genam.h:55
bool report_progress
Definition genam.h:57
int message_level
Definition genam.h:59
bool estimated_count
Definition genam.h:58
uint8 statusFlags
Definition proc.h:210
IndexBulkDeleteResult istat
bool parallel_workers_can_process
PVIndVacStatus status
pg_atomic_uint32 generation
bool is_autovacuum
double reltuples
pg_atomic_uint32 cost_balance
int maintenance_work_mem_worker
pg_atomic_uint32 active_nworkers
PVSharedCostParams cost_params
dsa_pointer dead_items_handle
dsa_handle dead_items_dsa_handle
pg_atomic_uint32 idx
bool estimated_count
VacDeadItemsInfo dead_items_info
int nlaunched
Definition vacuum.h:311
dsm_segment * seg
Definition parallel.h:44
shm_toc_estimator estimator
Definition parallel.h:43
shm_toc * toc
Definition parallel.h:46
int nworkers_launched
Definition parallel.h:39
BufferAccessStrategy bstrategy
BufferUsage * buffer_usage
ParallelContext * pcxt
PVIndVacStatus status
const struct IndexAmRoutine * rd_indam
Definition rel.h:206
size_t max_bytes
Definition vacuum.h:298
int64 num_items
Definition vacuum.h:299
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
dsa_area * TidStoreGetDSA(TidStore *ts)
Definition tidstore.c:544
void TidStoreDetach(TidStore *ts)
Definition tidstore.c:269
void TidStoreDestroy(TidStore *ts)
Definition tidstore.c:317
TidStore * TidStoreAttach(dsa_handle area_handle, dsa_pointer handle)
Definition tidstore.c:244
dsa_pointer TidStoreGetHandle(TidStore *ts)
Definition tidstore.c:552
TidStore * TidStoreCreateShared(size_t max_bytes, int tranche_id)
Definition tidstore.c:208
pg_atomic_uint32 * VacuumActiveNWorkers
Definition vacuum.c:118
void vacuum(List *relations, const VacuumParams *params, BufferAccessStrategy bstrategy, MemoryContext vac_context, bool isTopLevel)
Definition vacuum.c:494
bool track_cost_delay_timing
Definition vacuum.c:83
double vacuum_cost_delay
Definition vacuum.c:92
void vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel)
Definition vacuum.c:2369
int VacuumCostBalanceLocal
Definition vacuum.c:119
IndexBulkDeleteResult * vac_cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
Definition vacuum.c:2679
int64 parallel_vacuum_worker_delay_ns
Definition vacuum.c:96
void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
Definition vacuum.c:2412
pg_atomic_uint32 * VacuumSharedCostBalance
Definition vacuum.c:117
int vacuum_cost_limit
Definition vacuum.c:93
IndexBulkDeleteResult * vac_bulkdel_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat, TidStore *dead_items, VacDeadItemsInfo *dead_items_info)
Definition vacuum.c:2658
#define VACUUM_OPTION_PARALLEL_CLEANUP
Definition vacuum.h:62
#define VACUUM_OPTION_NO_PARALLEL
Definition vacuum.h:41
#define VACUUM_OPTION_PARALLEL_BULKDEL
Definition vacuum.h:47
#define VACUUM_OPTION_MAX_VALID_VALUE
Definition vacuum.h:65
#define VACUUM_OPTION_PARALLEL_COND_CLEANUP
Definition vacuum.h:54
static uint32 shared_params_generation_local
void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, bool estimated_count, PVWorkerStats *wstats)
static void parallel_vacuum_error_callback(void *arg)
TidStore * parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
void parallel_vacuum_update_shared_delay_params(void)
static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum)
#define PARALLEL_VACUUM_KEY_INDEX_STATS
static PVSharedCostParams * pv_shared_cost_params
#define PARALLEL_VACUUM_KEY_QUERY_TEXT
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE
void parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, PVWorkerStats *wstats)
ParallelVacuumState * parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, int elevel, BufferAccessStrategy bstrategy)
#define PARALLEL_VACUUM_KEY_SHARED
#define PARALLEL_VACUUM_KEY_WAL_USAGE
void parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool vacuum, PVWorkerStats *wstats)
void parallel_vacuum_propagate_shared_delay_params(void)
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)
void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
static void parallel_vacuum_dsm_detach(dsm_segment *seg, Datum arg)
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, bool vacuum)
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
static void parallel_vacuum_set_cost_parameters(PVSharedCostParams *params)
void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
PVIndVacStatus
@ PARALLEL_INDVAC_STATUS_NEED_CLEANUP
@ PARALLEL_INDVAC_STATUS_INITIAL
@ PARALLEL_INDVAC_STATUS_NEED_BULKDELETE
@ PARALLEL_INDVAC_STATUS_COMPLETED
void ExitParallelMode(void)
Definition xact.c:1094
void EnterParallelMode(void)
Definition xact.c:1081