PostgreSQL Source Code git master
Loading...
Searching...
No Matches
vacuumparallel.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * vacuumparallel.c
4 * Support routines for parallel vacuum execution.
5 *
6 * This file contains routines that are intended to support setting up, using,
7 * and tearing down a ParallelVacuumState.
8 *
9 * In a parallel vacuum, we perform both index bulk deletion and index cleanup
10 * with parallel worker processes. Individual indexes are processed by one
11 * vacuum process. ParallelVacuumState contains shared information as well as
12 * the memory space for storing dead items allocated in the DSA area. We
13 * launch parallel worker processes at the start of parallel index
14 * bulk-deletion and index cleanup and once all indexes are processed, the
15 * parallel worker processes exit. Each time we process indexes in parallel,
16 * the parallel context is re-initialized so that the same DSM can be used for
17 * multiple passes of index bulk-deletion and index cleanup.
18 *
19 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
20 * Portions Copyright (c) 1994, Regents of the University of California
21 *
22 * IDENTIFICATION
23 * src/backend/commands/vacuumparallel.c
24 *
25 *-------------------------------------------------------------------------
26 */
27#include "postgres.h"
28
29#include "access/amapi.h"
30#include "access/table.h"
31#include "access/xact.h"
32#include "commands/progress.h"
33#include "commands/vacuum.h"
34#include "executor/instrument.h"
35#include "optimizer/paths.h"
36#include "pgstat.h"
37#include "storage/bufmgr.h"
38#include "storage/proc.h"
39#include "tcop/tcopprot.h"
40#include "utils/lsyscache.h"
41#include "utils/rel.h"
42
43/*
44 * DSM keys for parallel vacuum. Unlike other parallel execution code, since
45 * we don't need to worry about DSM keys conflicting with plan_node_id we can
46 * use small integers.
47 */
48#define PARALLEL_VACUUM_KEY_SHARED 1
49#define PARALLEL_VACUUM_KEY_QUERY_TEXT 2
50#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3
51#define PARALLEL_VACUUM_KEY_WAL_USAGE 4
52#define PARALLEL_VACUUM_KEY_INDEX_STATS 5
53
54/*
55 * Information shared among parallel workers, so it is allocated in the DSM
56 * segment.
57 */
58typedef struct PVShared
59{
60 /*
61 * Target table relid, log level (for messages about parallel workers
62 * launched during VACUUM VERBOSE) and query ID. These fields are not
63 * modified during the parallel vacuum.
64 */
66 int elevel;
68
69 /*
70 * Fields for both index vacuum and cleanup.
71 *
72 * reltuples is the total number of input heap tuples. We set either old
73 * live tuples in the index vacuum case or the new live tuples in the
74 * index cleanup case.
75 *
76 * estimated_count is true if reltuples is an estimated value. (Note that
77 * reltuples could be -1 in this case, indicating we have no idea.)
78 */
79 double reltuples;
81
82 /*
83 * In single process vacuum we could consume more memory during index
84 * vacuuming or cleanup apart from the memory for heap scanning. In
85 * parallel vacuum, since individual vacuum workers can consume memory
86 * equal to maintenance_work_mem, the new maintenance_work_mem for each
87 * worker is set such that the parallel operation doesn't consume more
88 * memory than single process vacuum.
89 */
91
92 /*
93 * The number of buffers each worker's Buffer Access Strategy ring should
94 * contain.
95 */
97
98 /*
99 * Shared vacuum cost balance. During parallel vacuum,
100 * VacuumSharedCostBalance points to this value and it accumulates the
101 * balance of each parallel vacuum worker.
102 */
104
105 /*
106 * Number of active parallel workers. This is used for computing the
107 * minimum threshold of the vacuum cost balance before a worker sleeps for
108 * cost-based delay.
109 */
111
112 /* Counter for vacuuming and cleanup */
114
115 /* DSA handle where the TidStore lives */
117
118 /* DSA pointer to the shared TidStore */
120
121 /* Statistics of shared dead items */
124
125/* Status used during parallel index vacuum or cleanup */
133
134/*
135 * Struct for index vacuum statistics of an index that is used for parallel vacuum.
136 * This includes the status of parallel index vacuum as well as index statistics.
137 */
138typedef struct PVIndStats
139{
140 /*
141 * The following two fields are set by leader process before executing
142 * parallel index vacuum or parallel index cleanup. These fields are not
143 * fixed for the entire VACUUM operation. They are only fixed for an
144 * individual parallel index vacuum and cleanup.
145 *
146 * parallel_workers_can_process is true if both leader and worker can
147 * process the index, otherwise only leader can process it.
148 */
151
152 /*
153 * Individual worker or leader stores the result of index vacuum or
154 * cleanup.
155 */
156 bool istat_updated; /* are the stats updated? */
159
160/*
161 * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
162 */
164{
165 /* NULL for worker processes */
167
168 /* Parent Heap Relation */
170
171 /* Target indexes */
174
175 /* Shared information among parallel vacuum workers */
177
178 /*
179 * Shared index statistics among parallel vacuum workers. The array
180 * element is allocated for every index, even those indexes where parallel
181 * index vacuuming is unsafe or not worthwhile (e.g.,
182 * will_parallel_vacuum[] is false). During parallel vacuum,
183 * IndexBulkDeleteResult of each index is kept in DSM and is copied into
184 * local memory at the end of parallel vacuum.
185 */
187
188 /* Shared dead items space among parallel vacuum workers */
190
191 /* Points to buffer usage area in DSM */
193
194 /* Points to WAL usage area in DSM */
196
197 /*
198 * False if the index is a totally unsuitable target for all parallel
199 * processing. For example, the index could be <
200 * min_parallel_index_scan_size cutoff.
201 */
203
204 /*
205 * The number of indexes that support parallel index bulk-deletion and
206 * parallel index cleanup respectively.
207 */
211
212 /* Buffer access strategy used by leader process */
214
215 /*
216 * Error reporting state. The error callback is set only for worker
217 * processes during parallel index vacuum.
218 */
220 char *relname;
221 char *indname;
223};
224
225static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
226 bool *will_parallel_vacuum);
227static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
232 PVIndStats *indstats);
233static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
234 bool vacuum);
235static void parallel_vacuum_error_callback(void *arg);
236
237/*
238 * Try to enter parallel mode and create a parallel context. Then initialize
239 * shared memory state.
240 *
241 * On success, return parallel vacuum state. Otherwise return NULL.
242 */
244parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
246 int elevel, BufferAccessStrategy bstrategy)
247{
249 ParallelContext *pcxt;
250 PVShared *shared;
251 TidStore *dead_items;
252 PVIndStats *indstats;
253 BufferUsage *buffer_usage;
254 WalUsage *wal_usage;
255 bool *will_parallel_vacuum;
258 int nindexes_mwm = 0;
259 int parallel_workers = 0;
260 int querylen;
261
262 /*
263 * A parallel vacuum must be requested and there must be indexes on the
264 * relation
265 */
267 Assert(nindexes > 0);
268
269 /*
270 * Compute the number of parallel vacuum workers to launch
271 */
272 will_parallel_vacuum = palloc0_array(bool, nindexes);
273 parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
275 will_parallel_vacuum);
276 if (parallel_workers <= 0)
277 {
278 /* Can't perform vacuum in parallel -- return NULL */
279 pfree(will_parallel_vacuum);
280 return NULL;
281 }
282
284 pvs->indrels = indrels;
285 pvs->nindexes = nindexes;
286 pvs->will_parallel_vacuum = will_parallel_vacuum;
287 pvs->bstrategy = bstrategy;
288 pvs->heaprel = rel;
289
291 pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
292 parallel_workers);
293 Assert(pcxt->nworkers > 0);
294 pvs->pcxt = pcxt;
295
296 /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
297 est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
300
301 /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
302 est_shared_len = sizeof(PVShared);
305
306 /*
307 * Estimate space for BufferUsage and WalUsage --
308 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
309 *
310 * If there are no extensions loaded that care, we could skip this. We
311 * have no way of knowing whether anyone's looking at pgBufferUsage or
312 * pgWalUsage, so do it unconditionally.
313 */
315 mul_size(sizeof(BufferUsage), pcxt->nworkers));
318 mul_size(sizeof(WalUsage), pcxt->nworkers));
320
321 /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
323 {
327 }
328 else
329 querylen = 0; /* keep compiler quiet */
330
332
333 /* Prepare index vacuum stats */
334 indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
335 MemSet(indstats, 0, est_indstats_len);
336 for (int i = 0; i < nindexes; i++)
337 {
338 Relation indrel = indrels[i];
340
341 /*
342 * Cleanup option should be either disabled, always performing in
343 * parallel or conditionally performing in parallel.
344 */
348
349 if (!will_parallel_vacuum[i])
350 continue;
351
352 if (indrel->rd_indam->amusemaintenanceworkmem)
353 nindexes_mwm++;
354
355 /*
356 * Remember the number of indexes that support parallel operation for
357 * each phase.
358 */
365 }
367 pvs->indstats = indstats;
368
369 /* Prepare shared information */
370 shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
371 MemSet(shared, 0, est_shared_len);
372 shared->relid = RelationGetRelid(rel);
373 shared->elevel = elevel;
376 (nindexes_mwm > 0) ?
377 maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
379 shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;
380
381 /* Prepare DSA space for dead items */
384 pvs->dead_items = dead_items;
385 shared->dead_items_handle = TidStoreGetHandle(dead_items);
387
388 /* Use the same buffer size for all workers */
389 shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
390
391 pg_atomic_init_u32(&(shared->cost_balance), 0);
392 pg_atomic_init_u32(&(shared->active_nworkers), 0);
393 pg_atomic_init_u32(&(shared->idx), 0);
394
396 pvs->shared = shared;
397
398 /*
399 * Allocate space for each worker's BufferUsage and WalUsage; no need to
400 * initialize
401 */
402 buffer_usage = shm_toc_allocate(pcxt->toc,
403 mul_size(sizeof(BufferUsage), pcxt->nworkers));
405 pvs->buffer_usage = buffer_usage;
406 wal_usage = shm_toc_allocate(pcxt->toc,
407 mul_size(sizeof(WalUsage), pcxt->nworkers));
409 pvs->wal_usage = wal_usage;
410
411 /* Store query string for workers */
413 {
414 char *sharedquery;
415
416 sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
418 sharedquery[querylen] = '\0';
419 shm_toc_insert(pcxt->toc,
421 }
422
423 /* Success -- return parallel vacuum state */
424 return pvs;
425}
426
427/*
428 * Destroy the parallel context, and end parallel mode.
429 *
430 * Since writes are not allowed during parallel mode, copy the
431 * updated index statistics from DSM into local memory and then later use that
432 * to update the index statistics. One might think that we can exit from
433 * parallel mode, update the index statistics and then destroy parallel
434 * context, but that won't be safe (see ExitParallelMode).
435 */
436void
438{
440
441 /* Copy the updated statistics */
442 for (int i = 0; i < pvs->nindexes; i++)
443 {
444 PVIndStats *indstats = &(pvs->indstats[i]);
445
446 if (indstats->istat_updated)
447 {
449 memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
450 }
451 else
452 istats[i] = NULL;
453 }
454
456
459
461 pfree(pvs);
462}
463
464/*
465 * Returns the dead items space and dead items information.
466 */
467TidStore *
473
474/* Forget all items in dead_items */
475void
477{
478 VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
479
480 /*
481 * Free the current tidstore and return allocated DSA segments to the
482 * operating system. Then we recreate the tidstore with the same max_bytes
483 * limitation we just used.
484 */
486 pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
488
489 /* Update the DSA pointer for dead_items to the new one */
492
493 /* Reset the counter */
494 dead_items_info->num_items = 0;
495}
496
497/*
498 * Do parallel index bulk-deletion with parallel workers.
499 */
500void
502 int num_index_scans, PVWorkerStats *wstats)
503{
505
506 /*
507 * We can only provide an approximate value of num_heap_tuples, at least
508 * for now.
509 */
511 pvs->shared->estimated_count = true;
512
513 parallel_vacuum_process_all_indexes(pvs, num_index_scans, true, wstats);
514}
515
516/*
517 * Do parallel index cleanup with parallel workers.
518 */
519void
521 int num_index_scans, bool estimated_count,
523{
525
526 /*
527 * We can provide a better estimate of total number of surviving tuples
528 * (we assume indexes are more interested in that than in the number of
529 * nominally live tuples).
530 */
532 pvs->shared->estimated_count = estimated_count;
533
534 parallel_vacuum_process_all_indexes(pvs, num_index_scans, false, wstats);
535}
536
537/*
538 * Compute the number of parallel worker processes to request. Both index
539 * vacuum and index cleanup can be executed with parallel workers.
540 * The index is eligible for parallel vacuum iff its size is greater than
541 * min_parallel_index_scan_size as invoking workers for very small indexes
542 * can hurt performance.
543 *
544 * nrequested is the number of parallel workers that user requested. If
545 * nrequested is 0, we compute the parallel degree based on nindexes, that is
546 * the number of indexes that support parallel vacuum. This function also
547 * sets will_parallel_vacuum to remember indexes that participate in parallel
548 * vacuum.
549 */
550static int
552 bool *will_parallel_vacuum)
553{
554 int nindexes_parallel = 0;
555 int nindexes_parallel_bulkdel = 0;
556 int nindexes_parallel_cleanup = 0;
557 int parallel_workers;
558
559 /*
560 * We don't allow performing parallel operation in standalone backend or
561 * when parallelism is disabled.
562 */
564 return 0;
565
566 /*
567 * Compute the number of indexes that can participate in parallel vacuum.
568 */
569 for (int i = 0; i < nindexes; i++)
570 {
571 Relation indrel = indrels[i];
573
574 /* Skip index that is not a suitable target for parallel index vacuum */
577 continue;
578
579 will_parallel_vacuum[i] = true;
580
582 nindexes_parallel_bulkdel++;
585 nindexes_parallel_cleanup++;
586 }
587
588 nindexes_parallel = Max(nindexes_parallel_bulkdel,
589 nindexes_parallel_cleanup);
590
591 /* The leader process takes one index */
593
594 /* No index supports parallel vacuum */
595 if (nindexes_parallel <= 0)
596 return 0;
597
598 /* Compute the parallel degree */
599 parallel_workers = (nrequested > 0) ?
601
602 /* Cap by max_parallel_maintenance_workers */
603 parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
604
605 return parallel_workers;
606}
607
608/*
609 * Perform index vacuum or index cleanup with parallel workers. This function
610 * must be used by the parallel vacuum leader process.
611 *
612 * If wstats is not NULL, the parallel worker statistics are updated.
613 */
614static void
617{
618 int nworkers;
620
622
623 if (vacuum)
624 {
626
627 /* Determine the number of parallel workers to launch */
628 nworkers = pvs->nindexes_parallel_bulkdel;
629 }
630 else
631 {
633
634 /* Determine the number of parallel workers to launch */
635 nworkers = pvs->nindexes_parallel_cleanup;
636
637 /* Add conditionally parallel-aware indexes if this is the first call */
638 if (num_index_scans == 0)
639 nworkers += pvs->nindexes_parallel_condcleanup;
640 }
641
642 /* The leader process will participate */
643 nworkers--;
644
645 /*
646 * It is possible that parallel context is initialized with fewer workers
647 * than the number of indexes that need a separate worker in the current
648 * phase, so we need to consider it. See
649 * parallel_vacuum_compute_workers().
650 */
651 nworkers = Min(nworkers, pvs->pcxt->nworkers);
652
653 /* Update the statistics, if we were asked to */
654 if (wstats != NULL && nworkers > 0)
655 wstats->nplanned += nworkers;
656
657 /*
658 * Set index vacuum status and mark whether parallel vacuum worker can
659 * process it.
660 */
661 for (int i = 0; i < pvs->nindexes; i++)
662 {
663 PVIndStats *indstats = &(pvs->indstats[i]);
664
666 indstats->status = new_status;
668 (pvs->will_parallel_vacuum[i] &&
670 num_index_scans,
671 vacuum));
672 }
673
674 /* Reset the parallel index processing and progress counters */
675 pg_atomic_write_u32(&(pvs->shared->idx), 0);
676
677 /* Setup the shared cost-based vacuum delay and launch workers */
678 if (nworkers > 0)
679 {
680 /* Reinitialize parallel context to relaunch parallel workers */
681 if (num_index_scans > 0)
683
684 /*
685 * Set up shared cost balance and the number of active workers for
686 * vacuum delay. We need to do this before launching workers as
687 * otherwise, they might not see the updated values for these
688 * parameters.
689 */
692
693 /*
694 * The number of workers can vary between bulkdelete and cleanup
695 * phase.
696 */
697 ReinitializeParallelWorkers(pvs->pcxt, nworkers);
698
700
701 if (pvs->pcxt->nworkers_launched > 0)
702 {
703 /*
704 * Reset the local cost values for leader backend as we have
705 * already accumulated the remaining balance of heap.
706 */
709
710 /* Enable shared cost balance for leader backend */
713
714 /* Update the statistics, if we were asked to */
715 if (wstats != NULL)
716 wstats->nlaunched += pvs->pcxt->nworkers_launched;
717 }
718
719 if (vacuum)
720 ereport(pvs->shared->elevel,
721 (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
722 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
723 pvs->pcxt->nworkers_launched),
724 pvs->pcxt->nworkers_launched, nworkers)));
725 else
726 ereport(pvs->shared->elevel,
727 (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
728 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
729 pvs->pcxt->nworkers_launched),
730 pvs->pcxt->nworkers_launched, nworkers)));
731 }
732
733 /* Vacuum the indexes that can be processed by only leader process */
735
736 /*
737 * Join as a parallel worker. The leader alone processes all
738 * parallel-safe indexes in the case where no workers are launched.
739 */
741
742 /*
743 * Next, accumulate buffer and WAL usage. (This must wait for the workers
744 * to finish, or we might get incomplete data.)
745 */
746 if (nworkers > 0)
747 {
748 /* Wait for all vacuum workers to finish */
750
751 for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
753 }
754
755 /*
756 * Reset all index status back to initial (while checking that we have
757 * vacuumed all indexes).
758 */
759 for (int i = 0; i < pvs->nindexes; i++)
760 {
761 PVIndStats *indstats = &(pvs->indstats[i]);
762
764 elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
766
768 }
769
770 /*
771 * Carry the shared balance value to heap scan and disable shared costing
772 */
774 {
778 }
779}
780
781/*
782 * Index vacuum/cleanup routine used by the leader process and parallel
783 * vacuum worker processes to vacuum the indexes in parallel.
784 */
785static void
787{
788 /*
789 * Increment the active worker count if we are able to launch any worker.
790 */
793
794 /* Loop until all indexes are vacuumed */
795 for (;;)
796 {
797 int idx;
798 PVIndStats *indstats;
799
800 /* Get an index number to process */
801 idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
802
803 /* Done for all indexes? */
804 if (idx >= pvs->nindexes)
805 break;
806
807 indstats = &(pvs->indstats[idx]);
808
809 /*
810 * Skip vacuuming an index that is unsafe for workers or is an
811 * unsuitable target for parallel index vacuum (this is vacuumed in
812 * parallel_vacuum_process_unsafe_indexes() by the leader).
813 */
814 if (!indstats->parallel_workers_can_process)
815 continue;
816
817 /* Do vacuum or cleanup of the index */
818 parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
819 }
820
821 /*
822 * We have completed the index vacuum so decrement the active worker
823 * count.
824 */
827}
828
829/*
830 * Perform parallel vacuuming of indexes in leader process.
831 *
832 * Handles index vacuuming (or index cleanup) for indexes that are not
833 * parallel safe. It's possible that this will vary for a given index, based
834 * on details like whether we're performing index cleanup right now.
835 *
836 * Also performs vacuuming of smaller indexes that fell under the size cutoff
837 * enforced by parallel_vacuum_compute_workers().
838 */
839static void
841{
843
844 /*
845 * Increment the active worker count if we are able to launch any worker.
846 */
849
850 for (int i = 0; i < pvs->nindexes; i++)
851 {
852 PVIndStats *indstats = &(pvs->indstats[i]);
853
854 /* Skip indexes that are safe for workers */
855 if (indstats->parallel_workers_can_process)
856 continue;
857
858 /* Do vacuum or cleanup of the index */
859 parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
860 }
861
862 /*
863 * We have completed the index vacuum so decrement the active worker
864 * count.
865 */
868}
869
870/*
871 * Vacuum or clean up an index, either by the leader process or by one of the
872 * worker processes. After vacuuming the index, this function copies the index
873 * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
874 * segment.
875 */
876static void
878 PVIndStats *indstats)
879{
883
884 /*
885 * Update the pointer to the corresponding bulk-deletion result if someone
886 * has already updated it
887 */
888 if (indstats->istat_updated)
889 istat = &(indstats->istat);
890
891 ivinfo.index = indrel;
892 ivinfo.heaprel = pvs->heaprel;
893 ivinfo.analyze_only = false;
894 ivinfo.report_progress = false;
895 ivinfo.message_level = DEBUG2;
896 ivinfo.estimated_count = pvs->shared->estimated_count;
897 ivinfo.num_heap_tuples = pvs->shared->reltuples;
898 ivinfo.strategy = pvs->bstrategy;
899
900 /* Update error traceback information */
902 pvs->status = indstats->status;
903
904 switch (indstats->status)
905 {
908 &pvs->shared->dead_items_info);
909 break;
912 break;
913 default:
914 elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
915 indstats->status,
917 }
918
919 /*
920 * Copy the index bulk-deletion result returned from ambulkdelete and
921 * amvacuumcleanup to the DSM segment if it's the first cycle because they
922 * allocate locally and it's possible that an index will be vacuumed by a
923 * different vacuum process the next cycle. Copying the result normally
924 * happens only the first time an index is vacuumed. For any additional
925 * vacuum pass, we directly point to the result on the DSM segment and
926 * pass it to vacuum index APIs so that workers can update it directly.
927 *
928 * Since all vacuum workers write the bulk-deletion result at different
929 * slots we can write them without locking.
930 */
931 if (!indstats->istat_updated && istat_res != NULL)
932 {
933 memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
934 indstats->istat_updated = true;
935
936 /* Free the locally-allocated bulk-deletion result */
938 }
939
940 /*
941 * Update the status to completed. No need to lock here since each worker
942 * touches different indexes.
943 */
945
946 /* Reset error traceback information */
948 pfree(pvs->indname);
949 pvs->indname = NULL;
950
951 /*
952 * Call the parallel variant of pgstat_progress_incr_param so workers can
953 * report progress of index vacuum to the leader.
954 */
956}
957
958/*
959 * Returns false if the given index can't participate in the next execution of
960 * parallel index vacuum or parallel index cleanup.
961 */
962static bool
964 bool vacuum)
965{
967
968 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
969
970 /* In parallel vacuum case, check if it supports parallel bulk-deletion */
971 if (vacuum)
973
974 /* Not safe, if the index does not support parallel cleanup */
977 return false;
978
979 /*
980 * Not safe, if the index supports parallel cleanup conditionally, but we
981 * have already processed the index (for bulkdelete). We do this to avoid
982 * the need to invoke workers when parallel index cleanup doesn't need to
983 * scan the index. See the comments for option
984 * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
985 * parallel cleanup conditionally.
986 */
987 if (num_index_scans > 0 &&
989 return false;
990
991 return true;
992}
993
994/*
995 * Perform work within a launched parallel process.
996 *
997 * Since parallel vacuum workers perform only index vacuum or index cleanup,
998 * we don't need to report progress information.
999 */
1000void
1002{
1004 Relation rel;
1005 Relation *indrels;
1006 PVIndStats *indstats;
1007 PVShared *shared;
1008 TidStore *dead_items;
1009 BufferUsage *buffer_usage;
1010 WalUsage *wal_usage;
1011 int nindexes;
1012 char *sharedquery;
1013 ErrorContextCallback errcallback;
1014
1015 /*
1016 * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
1017 * don't support parallel vacuum for autovacuum as of now.
1018 */
1020
1021 elog(DEBUG1, "starting parallel vacuum worker");
1022
1023 shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
1024
1025 /* Set debug_query_string for individual workers */
1029
1030 /* Track query ID */
1031 pgstat_report_query_id(shared->queryid, false);
1032
1033 /*
1034 * Open table. The lock mode is the same as the leader process. It's
1035 * okay because the lock mode does not conflict among the parallel
1036 * workers.
1037 */
1039
1040 /*
1041 * Open all indexes. indrels are sorted in order by OID, which should be
1042 * matched to the leader's one.
1043 */
1044 vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
1045 Assert(nindexes > 0);
1046
1047 /*
1048 * Apply the desired value of maintenance_work_mem within this process.
1049 * Really we should use SetConfigOption() to change a GUC, but since we're
1050 * already in parallel mode guc.c would complain about that. Fortunately,
1051 * by the same token guc.c will not let any user-defined code change it.
1052 * So just avert your eyes while we do this:
1053 */
1054 if (shared->maintenance_work_mem_worker > 0)
1056
1057 /* Set index statistics */
1058 indstats = (PVIndStats *) shm_toc_lookup(toc,
1060 false);
1061
1062 /* Find dead_items in shared memory */
1063 dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
1064 shared->dead_items_handle);
1065
1066 /* Set cost-based vacuum delay */
1072
1073 /* Set parallel vacuum state */
1074 pvs.indrels = indrels;
1075 pvs.nindexes = nindexes;
1076 pvs.indstats = indstats;
1077 pvs.shared = shared;
1078 pvs.dead_items = dead_items;
1081 pvs.heaprel = rel;
1082
1083 /* These fields will be filled during index vacuum or cleanup */
1084 pvs.indname = NULL;
1086
1087 /* Each parallel VACUUM worker gets its own access strategy. */
1089 shared->ring_nbuffers * (BLCKSZ / 1024));
1090
1091 /* Setup error traceback support for ereport() */
1093 errcallback.arg = &pvs;
1094 errcallback.previous = error_context_stack;
1095 error_context_stack = &errcallback;
1096
1097 /* Prepare to track buffer usage during parallel execution */
1099
1100 /* Process indexes to perform vacuum/cleanup */
1102
1103 /* Report buffer/WAL usage during parallel execution */
1104 buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
1105 wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
1107 &wal_usage[ParallelWorkerNumber]);
1108
1109 /* Report any remaining cost-based vacuum delay time */
1113
1114 TidStoreDetach(dead_items);
1115
1116 /* Pop the error context stack */
1117 error_context_stack = errcallback.previous;
1118
1119 vac_close_indexes(nindexes, indrels, RowExclusiveLock);
1122}
1123
1124/*
1125 * Error context callback for errors occurring during parallel index vacuum.
1126 * The error context messages should match the messages set in the lazy vacuum
1127 * error context. If you change this function, change vacuum_error_callback()
1128 * as well.
1129 */
1130static void
1132{
1134
1135 switch (errinfo->status)
1136 {
1138 errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1139 errinfo->indname,
1140 errinfo->relnamespace,
1141 errinfo->relname);
1142 break;
1144 errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1145 errinfo->indname,
1146 errinfo->relnamespace,
1147 errinfo->relname);
1148 break;
1151 default:
1152 return;
1153 }
1154}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:262
int min_parallel_index_scan_size
Definition allpaths.c:86
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition atomics.h:439
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition atomics.h:219
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition atomics.h:366
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition atomics.h:424
static void pg_atomic_write_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition atomics.h:274
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition atomics.h:237
void VacuumUpdateCosts(void)
int ParallelWorkerNumber
Definition parallel.c:117
void InitializeParallelDSM(ParallelContext *pcxt)
Definition parallel.c:213
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
Definition parallel.c:805
void LaunchParallelWorkers(ParallelContext *pcxt)
Definition parallel.c:583
void ReinitializeParallelDSM(ParallelContext *pcxt)
Definition parallel.c:511
void DestroyParallelContext(ParallelContext *pcxt)
Definition parallel.c:959
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
Definition parallel.c:175
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
Definition parallel.c:568
void pgstat_progress_parallel_incr_param(int index, int64 incr)
void pgstat_report_query_id(int64 query_id, bool force)
int64 pgstat_get_my_query_id(void)
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
@ BAS_VACUUM
Definition bufmgr.h:40
#define RelationGetNumberOfBlocks(reln)
Definition bufmgr.h:307
#define Min(x, y)
Definition c.h:1093
uint8_t uint8
Definition c.h:616
#define ngettext(s, p, n)
Definition c.h:1272
#define Max(x, y)
Definition c.h:1087
#define Assert(condition)
Definition c.h:945
int64_t int64
Definition c.h:615
#define MemSet(start, val, len)
Definition c.h:1109
size_t Size
Definition c.h:691
dsa_handle dsa_get_handle(dsa_area *area)
Definition dsa.c:498
uint64 dsa_pointer
Definition dsa.h:62
dsm_handle dsa_handle
Definition dsa.h:136
Datum arg
Definition elog.c:1322
ErrorContextCallback * error_context_stack
Definition elog.c:99
#define errcontext
Definition elog.h:198
#define DEBUG2
Definition elog.h:29
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
#define palloc0_array(type, count)
Definition fe_memutils.h:77
#define palloc0_object(type)
Definition fe_memutils.h:75
BufferAccessStrategy GetAccessStrategyWithSize(BufferAccessStrategyType btype, int ring_size_kb)
Definition freelist.c:546
int GetAccessStrategyBufferCount(BufferAccessStrategy strategy)
Definition freelist.c:586
void FreeAccessStrategy(BufferAccessStrategy strategy)
Definition freelist.c:643
int max_parallel_maintenance_workers
Definition globals.c:134
bool IsUnderPostmaster
Definition globals.c:120
int VacuumCostBalance
Definition globals.c:157
int maintenance_work_mem
Definition globals.c:133
#define IsParallelWorker()
Definition parallel.h:62
void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
Definition instrument.c:219
void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
Definition instrument.c:209
void InstrStartParallelQuery(void)
Definition instrument.c:201
int i
Definition isn.c:77
#define ShareUpdateExclusiveLock
Definition lockdefs.h:39
#define RowExclusiveLock
Definition lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3588
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
static char * errmsg
const char * debug_query_string
Definition postgres.c:91
unsigned int Oid
static int fb(int x)
#define PROC_IN_VACUUM
Definition proc.h:59
#define PROGRESS_VACUUM_DELAY_TIME
Definition progress.h:31
#define PROGRESS_VACUUM_INDEXES_PROCESSED
Definition progress.h:30
#define RelationGetRelid(relation)
Definition rel.h:514
#define RelationGetRelationName(relation)
Definition rel.h:548
#define RelationGetNamespace(relation)
Definition rel.h:555
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition shm_toc.c:171
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition shm_toc.c:232
#define shm_toc_estimate_chunk(e, sz)
Definition shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition shm_toc.h:53
Size mul_size(Size s1, Size s2)
Definition shmem.c:500
PGPROC * MyProc
Definition proc.c:68
struct ErrorContextCallback * previous
Definition elog.h:297
void(* callback)(void *arg)
Definition elog.h:298
uint8 amparallelvacuumoptions
Definition amapi.h:284
uint8 statusFlags
Definition proc.h:207
IndexBulkDeleteResult istat
bool parallel_workers_can_process
PVIndVacStatus status
double reltuples
pg_atomic_uint32 cost_balance
int maintenance_work_mem_worker
int64 queryid
pg_atomic_uint32 active_nworkers
dsa_pointer dead_items_handle
dsa_handle dead_items_dsa_handle
int ring_nbuffers
pg_atomic_uint32 idx
bool estimated_count
VacDeadItemsInfo dead_items_info
shm_toc_estimator estimator
Definition parallel.h:43
shm_toc * toc
Definition parallel.h:46
int nworkers_launched
Definition parallel.h:39
BufferAccessStrategy bstrategy
BufferUsage * buffer_usage
ParallelContext * pcxt
PVIndVacStatus status
const struct IndexAmRoutine * rd_indam
Definition rel.h:206
size_t max_bytes
Definition vacuum.h:299
int64 num_items
Definition vacuum.h:300
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
dsa_area * TidStoreGetDSA(TidStore *ts)
Definition tidstore.c:544
void TidStoreDetach(TidStore *ts)
Definition tidstore.c:269
void TidStoreDestroy(TidStore *ts)
Definition tidstore.c:317
TidStore * TidStoreAttach(dsa_handle area_handle, dsa_pointer handle)
Definition tidstore.c:244
dsa_pointer TidStoreGetHandle(TidStore *ts)
Definition tidstore.c:552
TidStore * TidStoreCreateShared(size_t max_bytes, int tranche_id)
Definition tidstore.c:208
pg_atomic_uint32 * VacuumActiveNWorkers
Definition vacuum.c:118
void vacuum(List *relations, const VacuumParams params, BufferAccessStrategy bstrategy, MemoryContext vac_context, bool isTopLevel)
Definition vacuum.c:494
bool track_cost_delay_timing
Definition vacuum.c:83
void vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel)
Definition vacuum.c:2367
int VacuumCostBalanceLocal
Definition vacuum.c:119
IndexBulkDeleteResult * vac_cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
Definition vacuum.c:2659
int64 parallel_vacuum_worker_delay_ns
Definition vacuum.c:96
void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
Definition vacuum.c:2410
pg_atomic_uint32 * VacuumSharedCostBalance
Definition vacuum.c:117
IndexBulkDeleteResult * vac_bulkdel_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat, TidStore *dead_items, VacDeadItemsInfo *dead_items_info)
Definition vacuum.c:2638
#define VACUUM_OPTION_PARALLEL_CLEANUP
Definition vacuum.h:63
#define VACUUM_OPTION_NO_PARALLEL
Definition vacuum.h:42
#define VACUUM_OPTION_PARALLEL_BULKDEL
Definition vacuum.h:48
#define VACUUM_OPTION_MAX_VALID_VALUE
Definition vacuum.h:66
#define VACUUM_OPTION_PARALLEL_COND_CLEANUP
Definition vacuum.h:55
void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, bool estimated_count, PVWorkerStats *wstats)
static void parallel_vacuum_error_callback(void *arg)
TidStore * parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum)
#define PARALLEL_VACUUM_KEY_INDEX_STATS
#define PARALLEL_VACUUM_KEY_QUERY_TEXT
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE
void parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, PVWorkerStats *wstats)
ParallelVacuumState * parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, int elevel, BufferAccessStrategy bstrategy)
#define PARALLEL_VACUUM_KEY_SHARED
#define PARALLEL_VACUUM_KEY_WAL_USAGE
void parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool vacuum, PVWorkerStats *wstats)
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)
void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, bool vacuum)
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
PVIndVacStatus
@ PARALLEL_INDVAC_STATUS_NEED_CLEANUP
@ PARALLEL_INDVAC_STATUS_INITIAL
@ PARALLEL_INDVAC_STATUS_NEED_BULKDELETE
@ PARALLEL_INDVAC_STATUS_COMPLETED
void ExitParallelMode(void)
Definition xact.c:1066
void EnterParallelMode(void)
Definition xact.c:1053