PostgreSQL Source Code git master
Loading...
Searching...
No Matches
vacuumparallel.c File Reference
#include "postgres.h"
#include "access/amapi.h"
#include "access/table.h"
#include "access/xact.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "executor/instrument.h"
#include "optimizer/paths.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
Include dependency graph for vacuumparallel.c:

Go to the source code of this file.

Data Structures

struct  PVShared
 
struct  PVIndStats
 
struct  ParallelVacuumState
 

Macros

#define PARALLEL_VACUUM_KEY_SHARED   1
 
#define PARALLEL_VACUUM_KEY_QUERY_TEXT   2
 
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE   3
 
#define PARALLEL_VACUUM_KEY_WAL_USAGE   4
 
#define PARALLEL_VACUUM_KEY_INDEX_STATS   5
 

Typedefs

typedef struct PVShared PVShared
 
typedef enum PVIndVacStatus PVIndVacStatus
 
typedef struct PVIndStats PVIndStats
 

Enumerations

enum  PVIndVacStatus { PARALLEL_INDVAC_STATUS_INITIAL = 0 , PARALLEL_INDVAC_STATUS_NEED_BULKDELETE , PARALLEL_INDVAC_STATUS_NEED_CLEANUP , PARALLEL_INDVAC_STATUS_COMPLETED }
 

Functions

static int parallel_vacuum_compute_workers (Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum)
 
static void parallel_vacuum_process_all_indexes (ParallelVacuumState *pvs, int num_index_scans, bool vacuum)
 
static void parallel_vacuum_process_safe_indexes (ParallelVacuumState *pvs)
 
static void parallel_vacuum_process_unsafe_indexes (ParallelVacuumState *pvs)
 
static void parallel_vacuum_process_one_index (ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)
 
static bool parallel_vacuum_index_is_parallel_safe (Relation indrel, int num_index_scans, bool vacuum)
 
static void parallel_vacuum_error_callback (void *arg)
 
ParallelVacuumState * parallel_vacuum_init (Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, int elevel, BufferAccessStrategy bstrategy)
 
void parallel_vacuum_end (ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
 
TidStore * parallel_vacuum_get_dead_items (ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
 
void parallel_vacuum_reset_dead_items (ParallelVacuumState *pvs)
 
void parallel_vacuum_bulkdel_all_indexes (ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans)
 
void parallel_vacuum_cleanup_all_indexes (ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, bool estimated_count)
 
void parallel_vacuum_main (dsm_segment *seg, shm_toc *toc)
 

Macro Definition Documentation

◆ PARALLEL_VACUUM_KEY_BUFFER_USAGE

#define PARALLEL_VACUUM_KEY_BUFFER_USAGE   3

Definition at line 50 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_INDEX_STATS

#define PARALLEL_VACUUM_KEY_INDEX_STATS   5

Definition at line 52 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_QUERY_TEXT

#define PARALLEL_VACUUM_KEY_QUERY_TEXT   2

Definition at line 49 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_SHARED

#define PARALLEL_VACUUM_KEY_SHARED   1

Definition at line 48 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_WAL_USAGE

#define PARALLEL_VACUUM_KEY_WAL_USAGE   4

Definition at line 51 of file vacuumparallel.c.

Typedef Documentation

◆ PVIndStats

◆ PVIndVacStatus

◆ PVShared

Enumeration Type Documentation

◆ PVIndVacStatus

Enumerator
PARALLEL_INDVAC_STATUS_INITIAL 
PARALLEL_INDVAC_STATUS_NEED_BULKDELETE 
PARALLEL_INDVAC_STATUS_NEED_CLEANUP 
PARALLEL_INDVAC_STATUS_COMPLETED 

Definition at line 126 of file vacuumparallel.c.

Function Documentation

◆ parallel_vacuum_bulkdel_all_indexes()

void parallel_vacuum_bulkdel_all_indexes ( ParallelVacuumState *  pvs,
long  num_table_tuples,
int  num_index_scans 
)

Definition at line 501 of file vacuumparallel.c.

503{
505
506 /*
507 * We can only provide an approximate value of num_heap_tuples, at least
508 * for now.
509 */
511 pvs->shared->estimated_count = true;
512
513 parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
514}
#define Assert(condition)
Definition c.h:927
#define IsParallelWorker()
Definition parallel.h:62
static int fb(int x)
double reltuples
bool estimated_count
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool vacuum)

References Assert, PVShared::estimated_count, fb(), IsParallelWorker, parallel_vacuum_process_all_indexes(), PVShared::reltuples, and ParallelVacuumState::shared.

Referenced by lazy_vacuum_all_indexes().

◆ parallel_vacuum_cleanup_all_indexes()

void parallel_vacuum_cleanup_all_indexes ( ParallelVacuumState *  pvs,
long  num_table_tuples,
int  num_index_scans,
bool  estimated_count 
)

Definition at line 520 of file vacuumparallel.c.

522{
524
525 /*
526 * We can provide a better estimate of total number of surviving tuples
527 * (we assume indexes are more interested in that than in the number of
528 * nominally live tuples).
529 */
531 pvs->shared->estimated_count = estimated_count;
532
533 parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
534}

References Assert, PVShared::estimated_count, fb(), IsParallelWorker, parallel_vacuum_process_all_indexes(), PVShared::reltuples, and ParallelVacuumState::shared.

Referenced by lazy_cleanup_all_indexes().

◆ parallel_vacuum_compute_workers()

static int parallel_vacuum_compute_workers ( Relation *  indrels,
int  nindexes,
int  nrequested,
bool *  will_parallel_vacuum 
)
static

Definition at line 550 of file vacuumparallel.c.

552{
553 int nindexes_parallel = 0;
554 int nindexes_parallel_bulkdel = 0;
555 int nindexes_parallel_cleanup = 0;
556 int parallel_workers;
557
558 /*
559 * We don't allow performing parallel operation in standalone backend or
560 * when parallelism is disabled.
561 */
563 return 0;
564
565 /*
566 * Compute the number of indexes that can participate in parallel vacuum.
567 */
568 for (int i = 0; i < nindexes; i++)
569 {
570 Relation indrel = indrels[i];
572
573 /* Skip index that is not a suitable target for parallel index vacuum */
576 continue;
577
578 will_parallel_vacuum[i] = true;
579
581 nindexes_parallel_bulkdel++;
584 nindexes_parallel_cleanup++;
585 }
586
587 nindexes_parallel = Max(nindexes_parallel_bulkdel,
588 nindexes_parallel_cleanup);
589
590 /* The leader process takes one index */
592
593 /* No index supports parallel vacuum */
594 if (nindexes_parallel <= 0)
595 return 0;
596
597 /* Compute the parallel degree */
598 parallel_workers = (nrequested > 0) ?
600
601 /* Cap by max_parallel_maintenance_workers */
602 parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
603
604 return parallel_workers;
605}
int min_parallel_index_scan_size
Definition allpaths.c:86
#define RelationGetNumberOfBlocks(reln)
Definition bufmgr.h:307
#define Min(x, y)
Definition c.h:1075
uint8_t uint8
Definition c.h:598
#define Max(x, y)
Definition c.h:1069
int max_parallel_maintenance_workers
Definition globals.c:134
bool IsUnderPostmaster
Definition globals.c:120
int i
Definition isn.c:77
uint8 amparallelvacuumoptions
Definition amapi.h:284
const struct IndexAmRoutine * rd_indam
Definition rel.h:206
#define VACUUM_OPTION_PARALLEL_CLEANUP
Definition vacuum.h:63
#define VACUUM_OPTION_NO_PARALLEL
Definition vacuum.h:42
#define VACUUM_OPTION_PARALLEL_BULKDEL
Definition vacuum.h:48
#define VACUUM_OPTION_PARALLEL_COND_CLEANUP
Definition vacuum.h:55

References IndexAmRoutine::amparallelvacuumoptions, fb(), i, IsUnderPostmaster, Max, max_parallel_maintenance_workers, Min, min_parallel_index_scan_size, RelationData::rd_indam, RelationGetNumberOfBlocks, VACUUM_OPTION_NO_PARALLEL, VACUUM_OPTION_PARALLEL_BULKDEL, VACUUM_OPTION_PARALLEL_CLEANUP, and VACUUM_OPTION_PARALLEL_COND_CLEANUP.

Referenced by parallel_vacuum_init().

◆ parallel_vacuum_end()

void parallel_vacuum_end ( ParallelVacuumState *  pvs,
IndexBulkDeleteResult **  istats 
)

Definition at line 437 of file vacuumparallel.c.

438{
440
441 /* Copy the updated statistics */
442 for (int i = 0; i < pvs->nindexes; i++)
443 {
444 PVIndStats *indstats = &(pvs->indstats[i]);
445
446 if (indstats->istat_updated)
447 {
449 memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
450 }
451 else
452 istats[i] = NULL;
453 }
454
456
459
461 pfree(pvs);
462}
void DestroyParallelContext(ParallelContext *pcxt)
Definition parallel.c:959
#define palloc0_object(type)
Definition fe_memutils.h:75
void pfree(void *pointer)
Definition mcxt.c:1616
IndexBulkDeleteResult istat
ParallelContext * pcxt
void TidStoreDestroy(TidStore *ts)
Definition tidstore.c:317
void ExitParallelMode(void)
Definition xact.c:1066

References Assert, ParallelVacuumState::dead_items, DestroyParallelContext(), ExitParallelMode(), fb(), i, ParallelVacuumState::indstats, IsParallelWorker, PVIndStats::istat, PVIndStats::istat_updated, ParallelVacuumState::nindexes, palloc0_object, ParallelVacuumState::pcxt, pfree(), TidStoreDestroy(), and ParallelVacuumState::will_parallel_vacuum.

Referenced by dead_items_cleanup().

◆ parallel_vacuum_error_callback()

static void parallel_vacuum_error_callback ( void *  arg)
static

Definition at line 1120 of file vacuumparallel.c.

1121{
1123
1124 switch (errinfo->status)
1125 {
1127 errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1128 errinfo->indname,
1129 errinfo->relnamespace,
1130 errinfo->relname);
1131 break;
1133 errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1134 errinfo->indname,
1135 errinfo->relnamespace,
1136 errinfo->relname);
1137 break;
1140 default:
1141 return;
1142 }
1143}
Datum arg
Definition elog.c:1322
#define errcontext
Definition elog.h:198

References arg, errcontext, fb(), PARALLEL_INDVAC_STATUS_COMPLETED, PARALLEL_INDVAC_STATUS_INITIAL, PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, and PARALLEL_INDVAC_STATUS_NEED_CLEANUP.

Referenced by parallel_vacuum_main().

◆ parallel_vacuum_get_dead_items()

TidStore * parallel_vacuum_get_dead_items ( ParallelVacuumState *  pvs,
VacDeadItemsInfo **  dead_items_info_p 
)

Definition at line 468 of file vacuumparallel.c.

469{
471 return pvs->dead_items;
472}
VacDeadItemsInfo dead_items_info

References ParallelVacuumState::dead_items, PVShared::dead_items_info, fb(), and ParallelVacuumState::shared.

Referenced by dead_items_alloc(), and dead_items_reset().

◆ parallel_vacuum_index_is_parallel_safe()

static bool parallel_vacuum_index_is_parallel_safe ( Relation  indrel,
int  num_index_scans,
bool  vacuum 
)
static

Definition at line 952 of file vacuumparallel.c.

954{
956
957 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
958
959 /* In parallel vacuum case, check if it supports parallel bulk-deletion */
960 if (vacuum)
962
963 /* Not safe, if the index does not support parallel cleanup */
966 return false;
967
968 /*
969 * Not safe, if the index supports parallel cleanup conditionally, but we
970 * have already processed the index (for bulkdelete). We do this to avoid
971 * the need to invoke workers when parallel index cleanup doesn't need to
972 * scan the index. See the comments for option
973 * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
974 * parallel cleanup conditionally.
975 */
976 if (num_index_scans > 0 &&
978 return false;
979
980 return true;
981}
void vacuum(List *relations, const VacuumParams params, BufferAccessStrategy bstrategy, MemoryContext vac_context, bool isTopLevel)
Definition vacuum.c:494

References fb(), vacuum(), VACUUM_OPTION_PARALLEL_BULKDEL, VACUUM_OPTION_PARALLEL_CLEANUP, and VACUUM_OPTION_PARALLEL_COND_CLEANUP.

Referenced by parallel_vacuum_process_all_indexes().

◆ parallel_vacuum_init()

ParallelVacuumState * parallel_vacuum_init ( Relation  rel,
Relation *  indrels,
int  nindexes,
int  nrequested_workers,
int  vac_work_mem,
int  elevel,
BufferAccessStrategy  bstrategy 
)

Definition at line 244 of file vacuumparallel.c.

247{
249 ParallelContext *pcxt;
250 PVShared *shared;
251 TidStore *dead_items;
252 PVIndStats *indstats;
253 BufferUsage *buffer_usage;
254 WalUsage *wal_usage;
255 bool *will_parallel_vacuum;
258 int nindexes_mwm = 0;
259 int parallel_workers = 0;
260 int querylen;
261
262 /*
263 * A parallel vacuum must be requested and there must be indexes on the
264 * relation
265 */
267 Assert(nindexes > 0);
268
269 /*
270 * Compute the number of parallel vacuum workers to launch
271 */
272 will_parallel_vacuum = palloc0_array(bool, nindexes);
273 parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
275 will_parallel_vacuum);
276 if (parallel_workers <= 0)
277 {
278 /* Can't perform vacuum in parallel -- return NULL */
279 pfree(will_parallel_vacuum);
280 return NULL;
281 }
282
284 pvs->indrels = indrels;
285 pvs->nindexes = nindexes;
286 pvs->will_parallel_vacuum = will_parallel_vacuum;
287 pvs->bstrategy = bstrategy;
288 pvs->heaprel = rel;
289
291 pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
292 parallel_workers);
293 Assert(pcxt->nworkers > 0);
294 pvs->pcxt = pcxt;
295
296 /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
297 est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
300
301 /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
302 est_shared_len = sizeof(PVShared);
305
306 /*
307 * Estimate space for BufferUsage and WalUsage --
308 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
309 *
310 * If there are no extensions loaded that care, we could skip this. We
311 * have no way of knowing whether anyone's looking at pgBufferUsage or
312 * pgWalUsage, so do it unconditionally.
313 */
315 mul_size(sizeof(BufferUsage), pcxt->nworkers));
318 mul_size(sizeof(WalUsage), pcxt->nworkers));
320
321 /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
323 {
327 }
328 else
329 querylen = 0; /* keep compiler quiet */
330
332
333 /* Prepare index vacuum stats */
334 indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
335 MemSet(indstats, 0, est_indstats_len);
336 for (int i = 0; i < nindexes; i++)
337 {
338 Relation indrel = indrels[i];
340
341 /*
342 * Cleanup option should be either disabled, always performing in
343 * parallel or conditionally performing in parallel.
344 */
348
349 if (!will_parallel_vacuum[i])
350 continue;
351
352 if (indrel->rd_indam->amusemaintenanceworkmem)
353 nindexes_mwm++;
354
355 /*
356 * Remember the number of indexes that support parallel operation for
357 * each phase.
358 */
365 }
367 pvs->indstats = indstats;
368
369 /* Prepare shared information */
370 shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
371 MemSet(shared, 0, est_shared_len);
372 shared->relid = RelationGetRelid(rel);
373 shared->elevel = elevel;
376 (nindexes_mwm > 0) ?
377 maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
379 shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;
380
381 /* Prepare DSA space for dead items */
384 pvs->dead_items = dead_items;
385 shared->dead_items_handle = TidStoreGetHandle(dead_items);
387
388 /* Use the same buffer size for all workers */
389 shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
390
391 pg_atomic_init_u32(&(shared->cost_balance), 0);
392 pg_atomic_init_u32(&(shared->active_nworkers), 0);
393 pg_atomic_init_u32(&(shared->idx), 0);
394
396 pvs->shared = shared;
397
398 /*
399 * Allocate space for each worker's BufferUsage and WalUsage; no need to
400 * initialize
401 */
402 buffer_usage = shm_toc_allocate(pcxt->toc,
403 mul_size(sizeof(BufferUsage), pcxt->nworkers));
405 pvs->buffer_usage = buffer_usage;
406 wal_usage = shm_toc_allocate(pcxt->toc,
407 mul_size(sizeof(WalUsage), pcxt->nworkers));
409 pvs->wal_usage = wal_usage;
410
411 /* Store query string for workers */
413 {
414 char *sharedquery;
415
416 sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
418 sharedquery[querylen] = '\0';
419 shm_toc_insert(pcxt->toc,
421 }
422
423 /* Success -- return parallel vacuum state */
424 return pvs;
425}
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition atomics.h:219
void InitializeParallelDSM(ParallelContext *pcxt)
Definition parallel.c:213
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
Definition parallel.c:175
int64 pgstat_get_my_query_id(void)
#define MemSet(start, val, len)
Definition c.h:1091
size_t Size
Definition c.h:673
dsa_handle dsa_get_handle(dsa_area *area)
Definition dsa.c:498
#define palloc0_array(type, count)
Definition fe_memutils.h:77
int GetAccessStrategyBufferCount(BufferAccessStrategy strategy)
Definition freelist.c:586
int maintenance_work_mem
Definition globals.c:133
const char * debug_query_string
Definition postgres.c:90
#define RelationGetRelid(relation)
Definition rel.h:514
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition shm_toc.c:171
#define shm_toc_estimate_chunk(e, sz)
Definition shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition shm_toc.h:53
Size mul_size(Size s1, Size s2)
Definition shmem.c:497
pg_atomic_uint32 cost_balance
int maintenance_work_mem_worker
int64 queryid
pg_atomic_uint32 active_nworkers
dsa_pointer dead_items_handle
dsa_handle dead_items_dsa_handle
int ring_nbuffers
pg_atomic_uint32 idx
shm_toc_estimator estimator
Definition parallel.h:43
shm_toc * toc
Definition parallel.h:46
BufferAccessStrategy bstrategy
BufferUsage * buffer_usage
size_t max_bytes
Definition vacuum.h:299
dsa_area * TidStoreGetDSA(TidStore *ts)
Definition tidstore.c:544
dsa_pointer TidStoreGetHandle(TidStore *ts)
Definition tidstore.c:552
TidStore * TidStoreCreateShared(size_t max_bytes, int tranche_id)
Definition tidstore.c:208
#define VACUUM_OPTION_MAX_VALID_VALUE
Definition vacuum.h:66
static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum)
#define PARALLEL_VACUUM_KEY_INDEX_STATS
#define PARALLEL_VACUUM_KEY_QUERY_TEXT
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE
#define PARALLEL_VACUUM_KEY_SHARED
#define PARALLEL_VACUUM_KEY_WAL_USAGE
void EnterParallelMode(void)
Definition xact.c:1053

References PVShared::active_nworkers, IndexAmRoutine::amparallelvacuumoptions, Assert, ParallelVacuumState::bstrategy, ParallelVacuumState::buffer_usage, PVShared::cost_balance, CreateParallelContext(), ParallelVacuumState::dead_items, PVShared::dead_items_dsa_handle, PVShared::dead_items_handle, PVShared::dead_items_info, debug_query_string, dsa_get_handle(), PVShared::elevel, EnterParallelMode(), ParallelContext::estimator, fb(), GetAccessStrategyBufferCount(), ParallelVacuumState::heaprel, i, PVShared::idx, ParallelVacuumState::indrels, ParallelVacuumState::indstats, InitializeParallelDSM(), maintenance_work_mem, PVShared::maintenance_work_mem_worker, VacDeadItemsInfo::max_bytes, MemSet, Min, mul_size(), ParallelVacuumState::nindexes, ParallelVacuumState::nindexes_parallel_bulkdel, ParallelVacuumState::nindexes_parallel_cleanup, ParallelVacuumState::nindexes_parallel_condcleanup, ParallelContext::nworkers, palloc0_array, palloc0_object, parallel_vacuum_compute_workers(), PARALLEL_VACUUM_KEY_BUFFER_USAGE, PARALLEL_VACUUM_KEY_INDEX_STATS, PARALLEL_VACUUM_KEY_QUERY_TEXT, PARALLEL_VACUUM_KEY_SHARED, PARALLEL_VACUUM_KEY_WAL_USAGE, ParallelVacuumState::pcxt, pfree(), pg_atomic_init_u32(), pgstat_get_my_query_id(), PVShared::queryid, RelationData::rd_indam, RelationGetRelid, PVShared::relid, PVShared::ring_nbuffers, ParallelVacuumState::shared, shm_toc_allocate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_insert(), TidStoreCreateShared(), TidStoreGetDSA(), TidStoreGetHandle(), ParallelContext::toc, VACUUM_OPTION_MAX_VALID_VALUE, VACUUM_OPTION_PARALLEL_BULKDEL, VACUUM_OPTION_PARALLEL_CLEANUP, VACUUM_OPTION_PARALLEL_COND_CLEANUP, ParallelVacuumState::wal_usage, and ParallelVacuumState::will_parallel_vacuum.

Referenced by dead_items_alloc().

◆ parallel_vacuum_main()

void parallel_vacuum_main ( dsm_segment *  seg,
shm_toc *  toc 
)

Definition at line 990 of file vacuumparallel.c.

991{
993 Relation rel;
994 Relation *indrels;
995 PVIndStats *indstats;
996 PVShared *shared;
997 TidStore *dead_items;
998 BufferUsage *buffer_usage;
999 WalUsage *wal_usage;
1000 int nindexes;
1001 char *sharedquery;
1002 ErrorContextCallback errcallback;
1003
1004 /*
1005 * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
1006 * don't support parallel vacuum for autovacuum as of now.
1007 */
1009
1010 elog(DEBUG1, "starting parallel vacuum worker");
1011
1012 shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
1013
1014 /* Set debug_query_string for individual workers */
1018
1019 /* Track query ID */
1020 pgstat_report_query_id(shared->queryid, false);
1021
1022 /*
1023 * Open table. The lock mode is the same as the leader process. It's
1024 * okay because the lock mode does not conflict among the parallel
1025 * workers.
1026 */
1028
1029 /*
1030 * Open all indexes. indrels are sorted in order by OID, which should be
1031 * matched to the leader's one.
1032 */
1033 vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
1034 Assert(nindexes > 0);
1035
1036 /*
1037 * Apply the desired value of maintenance_work_mem within this process.
1038 * Really we should use SetConfigOption() to change a GUC, but since we're
1039 * already in parallel mode guc.c would complain about that. Fortunately,
1040 * by the same token guc.c will not let any user-defined code change it.
1041 * So just avert your eyes while we do this:
1042 */
1043 if (shared->maintenance_work_mem_worker > 0)
1045
1046 /* Set index statistics */
1047 indstats = (PVIndStats *) shm_toc_lookup(toc,
1049 false);
1050
1051 /* Find dead_items in shared memory */
1052 dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
1053 shared->dead_items_handle);
1054
1055 /* Set cost-based vacuum delay */
1061
1062 /* Set parallel vacuum state */
1063 pvs.indrels = indrels;
1064 pvs.nindexes = nindexes;
1065 pvs.indstats = indstats;
1066 pvs.shared = shared;
1067 pvs.dead_items = dead_items;
1070 pvs.heaprel = rel;
1071
1072 /* These fields will be filled during index vacuum or cleanup */
1073 pvs.indname = NULL;
1075
1076 /* Each parallel VACUUM worker gets its own access strategy. */
1078 shared->ring_nbuffers * (BLCKSZ / 1024));
1079
1080 /* Setup error traceback support for ereport() */
1082 errcallback.arg = &pvs;
1083 errcallback.previous = error_context_stack;
1084 error_context_stack = &errcallback;
1085
1086 /* Prepare to track buffer usage during parallel execution */
1088
1089 /* Process indexes to perform vacuum/cleanup */
1091
1092 /* Report buffer/WAL usage during parallel execution */
1093 buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
1094 wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
1096 &wal_usage[ParallelWorkerNumber]);
1097
1098 /* Report any remaining cost-based vacuum delay time */
1102
1103 TidStoreDetach(dead_items);
1104
1105 /* Pop the error context stack */
1106 error_context_stack = errcallback.previous;
1107
1108 vac_close_indexes(nindexes, indrels, RowExclusiveLock);
1111}
void VacuumUpdateCosts(void)
int ParallelWorkerNumber
Definition parallel.c:117
void pgstat_progress_parallel_incr_param(int index, int64 incr)
void pgstat_report_query_id(int64 query_id, bool force)
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
@ BAS_VACUUM
Definition bufmgr.h:40
ErrorContextCallback * error_context_stack
Definition elog.c:99
#define DEBUG1
Definition elog.h:30
#define elog(elevel,...)
Definition elog.h:226
BufferAccessStrategy GetAccessStrategyWithSize(BufferAccessStrategyType btype, int ring_size_kb)
Definition freelist.c:546
void FreeAccessStrategy(BufferAccessStrategy strategy)
Definition freelist.c:643
int VacuumCostBalance
Definition globals.c:157
void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
Definition instrument.c:209
void InstrStartParallelQuery(void)
Definition instrument.c:201
#define ShareUpdateExclusiveLock
Definition lockdefs.h:39
#define RowExclusiveLock
Definition lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3586
char * pstrdup(const char *in)
Definition mcxt.c:1781
#define PROC_IN_VACUUM
Definition proc.h:59
#define PROGRESS_VACUUM_DELAY_TIME
Definition progress.h:31
#define RelationGetRelationName(relation)
Definition rel.h:548
#define RelationGetNamespace(relation)
Definition rel.h:555
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition shm_toc.c:232
PGPROC * MyProc
Definition proc.c:68
struct ErrorContextCallback * previous
Definition elog.h:297
void(* callback)(void *arg)
Definition elog.h:298
uint8 statusFlags
Definition proc.h:202
PVIndVacStatus status
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void TidStoreDetach(TidStore *ts)
Definition tidstore.c:269
TidStore * TidStoreAttach(dsa_handle area_handle, dsa_pointer handle)
Definition tidstore.c:244
pg_atomic_uint32 * VacuumActiveNWorkers
Definition vacuum.c:118
bool track_cost_delay_timing
Definition vacuum.c:83
void vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel)
Definition vacuum.c:2367
int VacuumCostBalanceLocal
Definition vacuum.c:119
int64 parallel_vacuum_worker_delay_ns
Definition vacuum.c:96
void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
Definition vacuum.c:2410
pg_atomic_uint32 * VacuumSharedCostBalance
Definition vacuum.c:117
static void parallel_vacuum_error_callback(void *arg)
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)

References PVShared::active_nworkers, ErrorContextCallback::arg, Assert, BAS_VACUUM, ParallelVacuumState::bstrategy, ErrorContextCallback::callback, PVShared::cost_balance, ParallelVacuumState::dead_items, PVShared::dead_items_dsa_handle, PVShared::dead_items_handle, DEBUG1, debug_query_string, elog, error_context_stack, fb(), FreeAccessStrategy(), get_namespace_name(), GetAccessStrategyWithSize(), ParallelVacuumState::heaprel, ParallelVacuumState::indname, ParallelVacuumState::indrels, ParallelVacuumState::indstats, InstrEndParallelQuery(), InstrStartParallelQuery(), maintenance_work_mem, PVShared::maintenance_work_mem_worker, MyProc, ParallelVacuumState::nindexes, PARALLEL_INDVAC_STATUS_INITIAL, parallel_vacuum_error_callback(), PARALLEL_VACUUM_KEY_BUFFER_USAGE, PARALLEL_VACUUM_KEY_INDEX_STATS, PARALLEL_VACUUM_KEY_QUERY_TEXT, PARALLEL_VACUUM_KEY_SHARED, PARALLEL_VACUUM_KEY_WAL_USAGE, parallel_vacuum_process_safe_indexes(), parallel_vacuum_worker_delay_ns, ParallelWorkerNumber, pgstat_progress_parallel_incr_param(), pgstat_report_activity(), pgstat_report_query_id(), ErrorContextCallback::previous, PROC_IN_VACUUM, PROGRESS_VACUUM_DELAY_TIME, pstrdup(), PVShared::queryid, RelationGetNamespace, RelationGetRelationName, PVShared::relid, ParallelVacuumState::relname, ParallelVacuumState::relnamespace, PVShared::ring_nbuffers, RowExclusiveLock, ParallelVacuumState::shared, ShareUpdateExclusiveLock, shm_toc_lookup(), STATE_RUNNING, ParallelVacuumState::status, PGPROC::statusFlags, table_close(), table_open(), TidStoreAttach(), TidStoreDetach(), track_cost_delay_timing, vac_close_indexes(), vac_open_indexes(), VacuumActiveNWorkers, VacuumCostBalance, VacuumCostBalanceLocal, VacuumSharedCostBalance, and VacuumUpdateCosts().

◆ parallel_vacuum_process_all_indexes()

static void parallel_vacuum_process_all_indexes ( ParallelVacuumState *  pvs,
int  num_index_scans,
bool  vacuum 
)
static

Definition at line 612 of file vacuumparallel.c.

614{
615 int nworkers;
617
619
620 if (vacuum)
621 {
623
624 /* Determine the number of parallel workers to launch */
625 nworkers = pvs->nindexes_parallel_bulkdel;
626 }
627 else
628 {
630
631 /* Determine the number of parallel workers to launch */
632 nworkers = pvs->nindexes_parallel_cleanup;
633
634 /* Add conditionally parallel-aware indexes if this is the first call */
635 if (num_index_scans == 0)
636 nworkers += pvs->nindexes_parallel_condcleanup;
637 }
638
639 /* The leader process will participate */
640 nworkers--;
641
642 /*
643 * It is possible that parallel context is initialized with fewer workers
644 * than the number of indexes that need a separate worker in the current
645 * phase, so we need to consider it. See
646 * parallel_vacuum_compute_workers().
647 */
648 nworkers = Min(nworkers, pvs->pcxt->nworkers);
649
650 /*
651 * Set index vacuum status and mark whether parallel vacuum worker can
652 * process it.
653 */
654 for (int i = 0; i < pvs->nindexes; i++)
655 {
656 PVIndStats *indstats = &(pvs->indstats[i]);
657
659 indstats->status = new_status;
661 (pvs->will_parallel_vacuum[i] &&
663 num_index_scans,
664 vacuum));
665 }
666
667 /* Reset the parallel index processing and progress counters */
668 pg_atomic_write_u32(&(pvs->shared->idx), 0);
669
670 /* Setup the shared cost-based vacuum delay and launch workers */
671 if (nworkers > 0)
672 {
673 /* Reinitialize parallel context to relaunch parallel workers */
674 if (num_index_scans > 0)
676
677 /*
678 * Set up shared cost balance and the number of active workers for
679 * vacuum delay. We need to do this before launching workers as
680 * otherwise, they might not see the updated values for these
681 * parameters.
682 */
685
686 /*
687 * The number of workers can vary between bulkdelete and cleanup
688 * phase.
689 */
690 ReinitializeParallelWorkers(pvs->pcxt, nworkers);
691
693
694 if (pvs->pcxt->nworkers_launched > 0)
695 {
696 /*
697 * Reset the local cost values for leader backend as we have
698 * already accumulated the remaining balance of heap.
699 */
702
703 /* Enable shared cost balance for leader backend */
706 }
707
708 if (vacuum)
709 ereport(pvs->shared->elevel,
710 (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
711 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
712 pvs->pcxt->nworkers_launched),
713 pvs->pcxt->nworkers_launched, nworkers)));
714 else
715 ereport(pvs->shared->elevel,
716 (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
717 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
718 pvs->pcxt->nworkers_launched),
719 pvs->pcxt->nworkers_launched, nworkers)));
720 }
721
722 /* Vacuum the indexes that can be processed by only leader process */
724
725 /*
726 * Join as a parallel worker. The leader alone processes all
727 * parallel-safe indexes in the case where no workers are launched.
728 */
730
731 /*
732 * Next, accumulate buffer and WAL usage. (This must wait for the workers
733 * to finish, or we might get incomplete data.)
734 */
735 if (nworkers > 0)
736 {
737 /* Wait for all vacuum workers to finish */
739
740 for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
742 }
743
744 /*
745 * Reset all index status back to initial (while checking that we have
746 * vacuumed all indexes).
747 */
748 for (int i = 0; i < pvs->nindexes; i++)
749 {
750 PVIndStats *indstats = &(pvs->indstats[i]);
751
753 elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
755
757 }
758
759 /*
760 * Carry the shared balance value to heap scan and disable shared costing
761 */
763 {
767 }
768}
static void pg_atomic_write_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition atomics.h:274
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition atomics.h:237
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
Definition parallel.c:805
void LaunchParallelWorkers(ParallelContext *pcxt)
Definition parallel.c:583
void ReinitializeParallelDSM(ParallelContext *pcxt)
Definition parallel.c:511
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
Definition parallel.c:568
#define ngettext(s, p, n)
Definition c.h:1254
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
Definition instrument.c:219
static char * errmsg
bool parallel_workers_can_process
PVIndVacStatus status
int nworkers_launched
Definition parallel.h:39
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, bool vacuum)
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)

References PVShared::active_nworkers, Assert, ParallelVacuumState::buffer_usage, PVShared::cost_balance, PVShared::elevel, elog, ereport, errmsg, ERROR, fb(), i, PVShared::idx, ParallelVacuumState::indrels, ParallelVacuumState::indstats, InstrAccumParallelQuery(), IsParallelWorker, LaunchParallelWorkers(), Min, ngettext, ParallelVacuumState::nindexes, ParallelVacuumState::nindexes_parallel_bulkdel, ParallelVacuumState::nindexes_parallel_cleanup, ParallelVacuumState::nindexes_parallel_condcleanup, ParallelContext::nworkers, ParallelContext::nworkers_launched, PARALLEL_INDVAC_STATUS_COMPLETED, PARALLEL_INDVAC_STATUS_INITIAL, PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, PARALLEL_INDVAC_STATUS_NEED_CLEANUP, parallel_vacuum_index_is_parallel_safe(), parallel_vacuum_process_safe_indexes(), parallel_vacuum_process_unsafe_indexes(), PVIndStats::parallel_workers_can_process, ParallelVacuumState::pcxt, pg_atomic_read_u32(), pg_atomic_write_u32(), ReinitializeParallelDSM(), ReinitializeParallelWorkers(), RelationGetRelationName, ParallelVacuumState::shared, PVIndStats::status, vacuum(), VacuumActiveNWorkers, VacuumCostBalance, VacuumCostBalanceLocal, VacuumSharedCostBalance, WaitForParallelWorkersToFinish(), ParallelVacuumState::wal_usage, and ParallelVacuumState::will_parallel_vacuum.

Referenced by parallel_vacuum_bulkdel_all_indexes(), and parallel_vacuum_cleanup_all_indexes().

◆ parallel_vacuum_process_one_index()

static void parallel_vacuum_process_one_index ( ParallelVacuumState pvs,
Relation  indrel,
PVIndStats indstats 
)
static

Definition at line 866 of file vacuumparallel.c.

868{
872
873 /*
874 * Update the pointer to the corresponding bulk-deletion result if someone
875 * has already updated it
876 */
877 if (indstats->istat_updated)
878 istat = &(indstats->istat);
879
880 ivinfo.index = indrel;
881 ivinfo.heaprel = pvs->heaprel;
882 ivinfo.analyze_only = false;
883 ivinfo.report_progress = false;
884 ivinfo.message_level = DEBUG2;
885 ivinfo.estimated_count = pvs->shared->estimated_count;
886 ivinfo.num_heap_tuples = pvs->shared->reltuples;
887 ivinfo.strategy = pvs->bstrategy;
888
889 /* Update error traceback information */
891 pvs->status = indstats->status;
892
893 switch (indstats->status)
894 {
897 &pvs->shared->dead_items_info);
898 break;
901 break;
902 default:
903 elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
904 indstats->status,
906 }
907
908 /*
909 * Copy the index bulk-deletion result returned from ambulkdelete and
910 * amvacuumcleanup to the DSM segment if it's the first cycle, because they
911 * allocate it locally and it's possible that the index will be vacuumed by a
912 * different vacuum process in the next cycle. Copying the result normally
913 * happens only the first time an index is vacuumed. For any additional
914 * vacuum pass, we directly point to the result on the DSM segment and
915 * pass it to vacuum index APIs so that workers can update it directly.
916 *
917 * Since all vacuum workers write the bulk-deletion result at different
918 * slots we can write them without locking.
919 */
920 if (!indstats->istat_updated && istat_res != NULL)
921 {
922 memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
923 indstats->istat_updated = true;
924
925 /* Free the locally-allocated bulk-deletion result */
927 }
928
929 /*
930 * Update the status to completed. No need to lock here since each worker
931 * touches different indexes.
932 */
934
935 /* Reset error traceback information */
937 pfree(pvs->indname);
938 pvs->indname = NULL;
939
940 /*
941 * Call the parallel variant of pgstat_progress_incr_param so workers can
942 * report progress of index vacuum to the leader.
943 */
945}
#define DEBUG2
Definition elog.h:29
#define PROGRESS_VACUUM_INDEXES_PROCESSED
Definition progress.h:30
IndexBulkDeleteResult * vac_cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
Definition vacuum.c:2659
IndexBulkDeleteResult * vac_bulkdel_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat, TidStore *dead_items, VacDeadItemsInfo *dead_items_info)
Definition vacuum.c:2638

References ParallelVacuumState::bstrategy, ParallelVacuumState::dead_items, PVShared::dead_items_info, DEBUG2, elog, ERROR, PVShared::estimated_count, fb(), ParallelVacuumState::heaprel, ParallelVacuumState::indname, PVIndStats::istat, PVIndStats::istat_updated, PARALLEL_INDVAC_STATUS_COMPLETED, PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, PARALLEL_INDVAC_STATUS_NEED_CLEANUP, pfree(), pgstat_progress_parallel_incr_param(), PROGRESS_VACUUM_INDEXES_PROCESSED, pstrdup(), RelationGetRelationName, PVShared::reltuples, ParallelVacuumState::shared, PVIndStats::status, ParallelVacuumState::status, vac_bulkdel_one_index(), and vac_cleanup_one_index().

Referenced by parallel_vacuum_process_safe_indexes(), and parallel_vacuum_process_unsafe_indexes().

◆ parallel_vacuum_process_safe_indexes()

static void parallel_vacuum_process_safe_indexes ( ParallelVacuumState pvs)
static

Definition at line 775 of file vacuumparallel.c.

776{
777 /*
778 * Increment the active worker count if we are able to launch any worker.
779 */
782
783 /* Loop until all indexes are vacuumed */
784 for (;;)
785 {
786 int idx;
787 PVIndStats *indstats;
788
789 /* Get an index number to process */
790 idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
791
792 /* Done for all indexes? */
793 if (idx >= pvs->nindexes)
794 break;
795
796 indstats = &(pvs->indstats[idx]);
797
798 /*
799 * Skip an index that is unsafe for workers or is an unsuitable
800 * target for parallel index vacuum (such an index is vacuumed by the
801 * leader in parallel_vacuum_process_unsafe_indexes()).
802 */
803 if (!indstats->parallel_workers_can_process)
804 continue;
805
806 /* Do vacuum or cleanup of the index */
807 parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
808 }
809
810 /*
811 * We have completed the index vacuum so decrement the active worker
812 * count.
813 */
816}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:262
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition atomics.h:439
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition atomics.h:366
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition atomics.h:424
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)

References idx(), PVShared::idx, ParallelVacuumState::indrels, ParallelVacuumState::indstats, ParallelVacuumState::nindexes, parallel_vacuum_process_one_index(), PVIndStats::parallel_workers_can_process, pg_atomic_add_fetch_u32(), pg_atomic_fetch_add_u32(), pg_atomic_sub_fetch_u32(), ParallelVacuumState::shared, and VacuumActiveNWorkers.

Referenced by parallel_vacuum_main(), and parallel_vacuum_process_all_indexes().

◆ parallel_vacuum_process_unsafe_indexes()

static void parallel_vacuum_process_unsafe_indexes ( ParallelVacuumState pvs)
static

Definition at line 829 of file vacuumparallel.c.

830{
832
833 /*
834 * Increment the active worker count if we are able to launch any worker.
835 */
838
839 for (int i = 0; i < pvs->nindexes; i++)
840 {
841 PVIndStats *indstats = &(pvs->indstats[i]);
842
843 /* Skip indexes that are safe for workers */
844 if (indstats->parallel_workers_can_process)
845 continue;
846
847 /* Do vacuum or cleanup of the index */
848 parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
849 }
850
851 /*
852 * We have completed the index vacuum so decrement the active worker
853 * count.
854 */
857}

References Assert, i, ParallelVacuumState::indrels, ParallelVacuumState::indstats, IsParallelWorker, ParallelVacuumState::nindexes, parallel_vacuum_process_one_index(), PVIndStats::parallel_workers_can_process, pg_atomic_add_fetch_u32(), pg_atomic_sub_fetch_u32(), and VacuumActiveNWorkers.

Referenced by parallel_vacuum_process_all_indexes().

◆ parallel_vacuum_reset_dead_items()

void parallel_vacuum_reset_dead_items ( ParallelVacuumState pvs)

Definition at line 476 of file vacuumparallel.c.

477{
478 VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
479
480 /*
481 * Free the current tidstore and return allocated DSA segments to the
482 * operating system. Then we recreate the tidstore with the same max_bytes
483 * limitation we just used.
484 */
486 pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
488
489 /* Update the DSA pointer for dead_items to the new one */
492
493 /* Reset the counter */
494 dead_items_info->num_items = 0;
495}
int64 num_items
Definition vacuum.h:300

References ParallelVacuumState::dead_items, PVShared::dead_items_dsa_handle, PVShared::dead_items_handle, PVShared::dead_items_info, dsa_get_handle(), fb(), VacDeadItemsInfo::max_bytes, VacDeadItemsInfo::num_items, ParallelVacuumState::shared, TidStoreCreateShared(), TidStoreDestroy(), TidStoreGetDSA(), and TidStoreGetHandle().

Referenced by dead_items_reset().