PostgreSQL Source Code  git master
vacuumparallel.c File Reference
#include "postgres.h"
#include "access/amapi.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "optimizer/paths.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
Include dependency graph for vacuumparallel.c:

Go to the source code of this file.

Data Structures

struct  PVShared
 
struct  PVIndStats
 
struct  ParallelVacuumState
 

Macros

#define PARALLEL_VACUUM_KEY_SHARED   1
 
#define PARALLEL_VACUUM_KEY_DEAD_ITEMS   2
 
#define PARALLEL_VACUUM_KEY_QUERY_TEXT   3
 
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE   4
 
#define PARALLEL_VACUUM_KEY_WAL_USAGE   5
 
#define PARALLEL_VACUUM_KEY_INDEX_STATS   6
 

Typedefs

typedef struct PVShared PVShared
 
typedef enum PVIndVacStatus PVIndVacStatus
 
typedef struct PVIndStats PVIndStats
 

Enumerations

enum  PVIndVacStatus { PARALLEL_INDVAC_STATUS_INITIAL = 0 , PARALLEL_INDVAC_STATUS_NEED_BULKDELETE , PARALLEL_INDVAC_STATUS_NEED_CLEANUP , PARALLEL_INDVAC_STATUS_COMPLETED }
 

Functions

static int parallel_vacuum_compute_workers (Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum)
 
static void parallel_vacuum_process_all_indexes (ParallelVacuumState *pvs, int num_index_scans, bool vacuum)
 
static void parallel_vacuum_process_safe_indexes (ParallelVacuumState *pvs)
 
static void parallel_vacuum_process_unsafe_indexes (ParallelVacuumState *pvs)
 
static void parallel_vacuum_process_one_index (ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)
 
static bool parallel_vacuum_index_is_parallel_safe (Relation indrel, int num_index_scans, bool vacuum)
 
static void parallel_vacuum_error_callback (void *arg)
 
ParallelVacuumState * parallel_vacuum_init (Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int max_items, int elevel, BufferAccessStrategy bstrategy)
 
void parallel_vacuum_end (ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
 
VacDeadItems * parallel_vacuum_get_dead_items (ParallelVacuumState *pvs)
 
void parallel_vacuum_bulkdel_all_indexes (ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans)
 
void parallel_vacuum_cleanup_all_indexes (ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, bool estimated_count)
 
void parallel_vacuum_main (dsm_segment *seg, shm_toc *toc)
 

Macro Definition Documentation

◆ PARALLEL_VACUUM_KEY_BUFFER_USAGE

#define PARALLEL_VACUUM_KEY_BUFFER_USAGE   4

Definition at line 49 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_DEAD_ITEMS

#define PARALLEL_VACUUM_KEY_DEAD_ITEMS   2

Definition at line 47 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_INDEX_STATS

#define PARALLEL_VACUUM_KEY_INDEX_STATS   6

Definition at line 51 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_QUERY_TEXT

#define PARALLEL_VACUUM_KEY_QUERY_TEXT   3

Definition at line 48 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_SHARED

#define PARALLEL_VACUUM_KEY_SHARED   1

Definition at line 46 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_WAL_USAGE

#define PARALLEL_VACUUM_KEY_WAL_USAGE   5

Definition at line 50 of file vacuumparallel.c.

Typedef Documentation

◆ PVIndStats

typedef struct PVIndStats PVIndStats

◆ PVIndVacStatus

◆ PVShared

typedef struct PVShared PVShared

Enumeration Type Documentation

◆ PVIndVacStatus

Enumerator
PARALLEL_INDVAC_STATUS_INITIAL 
PARALLEL_INDVAC_STATUS_NEED_BULKDELETE 
PARALLEL_INDVAC_STATUS_NEED_CLEANUP 
PARALLEL_INDVAC_STATUS_COMPLETED 

Definition at line 109 of file vacuumparallel.c.

110 {
PVIndVacStatus
@ PARALLEL_INDVAC_STATUS_NEED_CLEANUP
@ PARALLEL_INDVAC_STATUS_INITIAL
@ PARALLEL_INDVAC_STATUS_NEED_BULKDELETE
@ PARALLEL_INDVAC_STATUS_COMPLETED

Function Documentation

◆ parallel_vacuum_bulkdel_all_indexes()

void parallel_vacuum_bulkdel_all_indexes ( ParallelVacuumState *pvs,
long  num_table_tuples,
int  num_index_scans 
)

Definition at line 454 of file vacuumparallel.c.

456 {
458 
459  /*
460  * We can only provide an approximate value of num_heap_tuples, at least
461  * for now.
462  */
463  pvs->shared->reltuples = num_table_tuples;
464  pvs->shared->estimated_count = true;
465 
466  parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
467 }
#define IsParallelWorker()
Definition: parallel.h:61
Assert(fmt[strlen(fmt) - 1] !='\n')
double reltuples
bool estimated_count
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool vacuum)

References Assert(), PVShared::estimated_count, IsParallelWorker, parallel_vacuum_process_all_indexes(), PVShared::reltuples, and ParallelVacuumState::shared.

Referenced by lazy_vacuum_all_indexes().

◆ parallel_vacuum_cleanup_all_indexes()

void parallel_vacuum_cleanup_all_indexes ( ParallelVacuumState *pvs,
long  num_table_tuples,
int  num_index_scans,
bool  estimated_count 
)

Definition at line 473 of file vacuumparallel.c.

475 {
477 
478  /*
479  * We can provide a better estimate of total number of surviving tuples
480  * (we assume indexes are more interested in that than in the number of
481  * nominally live tuples).
482  */
483  pvs->shared->reltuples = num_table_tuples;
484  pvs->shared->estimated_count = estimated_count;
485 
486  parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
487 }

References Assert(), PVShared::estimated_count, IsParallelWorker, parallel_vacuum_process_all_indexes(), PVShared::reltuples, and ParallelVacuumState::shared.

Referenced by lazy_cleanup_all_indexes().

◆ parallel_vacuum_compute_workers()

static int parallel_vacuum_compute_workers ( Relation *indrels,
int  nindexes,
int  nrequested,
bool *will_parallel_vacuum 
)
static

Definition at line 503 of file vacuumparallel.c.

505 {
506  int nindexes_parallel = 0;
507  int nindexes_parallel_bulkdel = 0;
508  int nindexes_parallel_cleanup = 0;
509  int parallel_workers;
510 
511  /*
512  * We don't allow performing parallel operation in standalone backend or
513  * when parallelism is disabled.
514  */
516  return 0;
517 
518  /*
519  * Compute the number of indexes that can participate in parallel vacuum.
520  */
521  for (int i = 0; i < nindexes; i++)
522  {
523  Relation indrel = indrels[i];
524  uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
525 
526  /* Skip index that is not a suitable target for parallel index vacuum */
527  if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
529  continue;
530 
531  will_parallel_vacuum[i] = true;
532 
533  if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
534  nindexes_parallel_bulkdel++;
535  if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
536  ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
537  nindexes_parallel_cleanup++;
538  }
539 
540  nindexes_parallel = Max(nindexes_parallel_bulkdel,
541  nindexes_parallel_cleanup);
542 
543  /* The leader process takes one index */
544  nindexes_parallel--;
545 
546  /* No index supports parallel vacuum */
547  if (nindexes_parallel <= 0)
548  return 0;
549 
550  /* Compute the parallel degree */
551  parallel_workers = (nrequested > 0) ?
552  Min(nrequested, nindexes_parallel) : nindexes_parallel;
553 
554  /* Cap by max_parallel_maintenance_workers */
555  parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
556 
557  return parallel_workers;
558 }
int min_parallel_index_scan_size
Definition: allpaths.c:66
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:216
#define Min(x, y)
Definition: c.h:986
#define Max(x, y)
Definition: c.h:980
unsigned char uint8
Definition: c.h:439
int max_parallel_maintenance_workers
Definition: globals.c:128
bool IsUnderPostmaster
Definition: globals.c:113
int i
Definition: isn.c:73
uint8 amparallelvacuumoptions
Definition: amapi.h:250
struct IndexAmRoutine * rd_indam
Definition: rel.h:202
#define VACUUM_OPTION_PARALLEL_CLEANUP
Definition: vacuum.h:62
#define VACUUM_OPTION_NO_PARALLEL
Definition: vacuum.h:41
#define VACUUM_OPTION_PARALLEL_BULKDEL
Definition: vacuum.h:47
#define VACUUM_OPTION_PARALLEL_COND_CLEANUP
Definition: vacuum.h:54

References IndexAmRoutine::amparallelvacuumoptions, i, IsUnderPostmaster, Max, max_parallel_maintenance_workers, Min, min_parallel_index_scan_size, RelationData::rd_indam, RelationGetNumberOfBlocks, VACUUM_OPTION_NO_PARALLEL, VACUUM_OPTION_PARALLEL_BULKDEL, VACUUM_OPTION_PARALLEL_CLEANUP, and VACUUM_OPTION_PARALLEL_COND_CLEANUP.

Referenced by parallel_vacuum_init().

◆ parallel_vacuum_end()

void parallel_vacuum_end ( ParallelVacuumState *pvs,
IndexBulkDeleteResult **  istats 
)

Definition at line 418 of file vacuumparallel.c.

419 {
421 
422  /* Copy the updated statistics */
423  for (int i = 0; i < pvs->nindexes; i++)
424  {
425  PVIndStats *indstats = &(pvs->indstats[i]);
426 
427  if (indstats->istat_updated)
428  {
429  istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
430  memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
431  }
432  else
433  istats[i] = NULL;
434  }
435 
438 
440  pfree(pvs);
441 }
void DestroyParallelContext(ParallelContext *pcxt)
Definition: parallel.c:916
void pfree(void *pointer)
Definition: mcxt.c:1175
void * palloc0(Size size)
Definition: mcxt.c:1099
bool istat_updated
IndexBulkDeleteResult istat
ParallelContext * pcxt
PVIndStats * indstats
void ExitParallelMode(void)
Definition: xact.c:1045

References Assert(), DestroyParallelContext(), ExitParallelMode(), i, ParallelVacuumState::indstats, IsParallelWorker, PVIndStats::istat, PVIndStats::istat_updated, ParallelVacuumState::nindexes, palloc0(), ParallelVacuumState::pcxt, pfree(), and ParallelVacuumState::will_parallel_vacuum.

Referenced by dead_items_cleanup().

◆ parallel_vacuum_error_callback()

static void parallel_vacuum_error_callback ( void *  arg)
static

Definition at line 1050 of file vacuumparallel.c.

1051 {
1052  ParallelVacuumState *errinfo = arg;
1053 
1054  switch (errinfo->status)
1055  {
1057  errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1058  errinfo->indname,
1059  errinfo->relnamespace,
1060  errinfo->relname);
1061  break;
1063  errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1064  errinfo->indname,
1065  errinfo->relnamespace,
1066  errinfo->relname);
1067  break;
1070  default:
1071  return;
1072  }
1073 }
#define errcontext
Definition: elog.h:190
void * arg
PVIndVacStatus status

References arg, errcontext, ParallelVacuumState::indname, PARALLEL_INDVAC_STATUS_COMPLETED, PARALLEL_INDVAC_STATUS_INITIAL, PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, PARALLEL_INDVAC_STATUS_NEED_CLEANUP, ParallelVacuumState::relname, ParallelVacuumState::relnamespace, and ParallelVacuumState::status.

Referenced by parallel_vacuum_main().

◆ parallel_vacuum_get_dead_items()

VacDeadItems* parallel_vacuum_get_dead_items ( ParallelVacuumState *pvs)

Definition at line 445 of file vacuumparallel.c.

446 {
447  return pvs->dead_items;
448 }
VacDeadItems * dead_items

References ParallelVacuumState::dead_items.

Referenced by dead_items_alloc().

◆ parallel_vacuum_index_is_parallel_safe()

static bool parallel_vacuum_index_is_parallel_safe ( Relation  indrel,
int  num_index_scans,
bool  vacuum 
)
static

Definition at line 897 of file vacuumparallel.c.

899 {
900  uint8 vacoptions;
901 
902  vacoptions = indrel->rd_indam->amparallelvacuumoptions;
903 
904  /* In parallel vacuum case, check if it supports parallel bulk-deletion */
905  if (vacuum)
906  return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
907 
908  /* Not safe, if the index does not support parallel cleanup */
909  if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
910  ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
911  return false;
912 
913  /*
914  * Not safe, if the index supports parallel cleanup conditionally, but we
915  * have already processed the index (for bulkdelete). We do this to avoid
916  * the need to invoke workers when parallel index cleanup doesn't need to
917  * scan the index. See the comments for option
918  * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
919  * parallel cleanup conditionally.
920  */
921  if (num_index_scans > 0 &&
922  ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
923  return false;
924 
925  return true;
926 }
void vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, bool isTopLevel)
Definition: vacuum.c:298

References IndexAmRoutine::amparallelvacuumoptions, RelationData::rd_indam, vacuum(), VACUUM_OPTION_PARALLEL_BULKDEL, VACUUM_OPTION_PARALLEL_CLEANUP, and VACUUM_OPTION_PARALLEL_COND_CLEANUP.

Referenced by parallel_vacuum_process_all_indexes().

◆ parallel_vacuum_init()

ParallelVacuumState* parallel_vacuum_init ( Relation  rel,
Relation *indrels,
int  nindexes,
int  nrequested_workers,
int  max_items,
int  elevel,
BufferAccessStrategy  bstrategy 
)

Definition at line 224 of file vacuumparallel.c.

227 {
228  ParallelVacuumState *pvs;
229  ParallelContext *pcxt;
230  PVShared *shared;
231  VacDeadItems *dead_items;
232  PVIndStats *indstats;
233  BufferUsage *buffer_usage;
234  WalUsage *wal_usage;
235  bool *will_parallel_vacuum;
236  Size est_indstats_len;
237  Size est_shared_len;
238  Size est_dead_items_len;
239  int nindexes_mwm = 0;
240  int parallel_workers = 0;
241  int querylen;
242 
243  /*
244  * A parallel vacuum must be requested and there must be indexes on the
245  * relation
246  */
247  Assert(nrequested_workers >= 0);
248  Assert(nindexes > 0);
249 
250  /*
251  * Compute the number of parallel vacuum workers to launch
252  */
253  will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
254  parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
255  nrequested_workers,
256  will_parallel_vacuum);
257  if (parallel_workers <= 0)
258  {
259  /* Can't perform vacuum in parallel -- return NULL */
260  pfree(will_parallel_vacuum);
261  return NULL;
262  }
263 
265  pvs->indrels = indrels;
266  pvs->nindexes = nindexes;
267  pvs->will_parallel_vacuum = will_parallel_vacuum;
268  pvs->bstrategy = bstrategy;
269 
271  pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
272  parallel_workers);
273  Assert(pcxt->nworkers > 0);
274  pvs->pcxt = pcxt;
275 
276  /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
277  est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
278  shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
279  shm_toc_estimate_keys(&pcxt->estimator, 1);
280 
281  /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
282  est_shared_len = sizeof(PVShared);
283  shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
284  shm_toc_estimate_keys(&pcxt->estimator, 1);
285 
286  /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
287  est_dead_items_len = vac_max_items_to_alloc_size(max_items);
288  shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
289  shm_toc_estimate_keys(&pcxt->estimator, 1);
290 
291  /*
292  * Estimate space for BufferUsage and WalUsage --
293  * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
294  *
295  * If there are no extensions loaded that care, we could skip this. We
296  * have no way of knowing whether anyone's looking at pgBufferUsage or
297  * pgWalUsage, so do it unconditionally.
298  */
300  mul_size(sizeof(BufferUsage), pcxt->nworkers));
301  shm_toc_estimate_keys(&pcxt->estimator, 1);
303  mul_size(sizeof(WalUsage), pcxt->nworkers));
304  shm_toc_estimate_keys(&pcxt->estimator, 1);
305 
306  /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
307  if (debug_query_string)
308  {
309  querylen = strlen(debug_query_string);
310  shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
311  shm_toc_estimate_keys(&pcxt->estimator, 1);
312  }
313  else
314  querylen = 0; /* keep compiler quiet */
315 
316  InitializeParallelDSM(pcxt);
317 
318  /* Prepare index vacuum stats */
319  indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
320  for (int i = 0; i < nindexes; i++)
321  {
322  Relation indrel = indrels[i];
323  uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
324 
325  /*
326  * Cleanup option should be either disabled, always performing in
327  * parallel or conditionally performing in parallel.
328  */
329  Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
330  ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
331  Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
332 
333  if (!will_parallel_vacuum[i])
334  continue;
335 
336  if (indrel->rd_indam->amusemaintenanceworkmem)
337  nindexes_mwm++;
338 
339  /*
340  * Remember the number of indexes that support parallel operation for
341  * each phase.
342  */
343  if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
345  if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
347  if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
349  }
351  pvs->indstats = indstats;
352 
353  /* Prepare shared information */
354  shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
355  MemSet(shared, 0, est_shared_len);
356  shared->relid = RelationGetRelid(rel);
357  shared->elevel = elevel;
359  (nindexes_mwm > 0) ?
360  maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
362 
363  pg_atomic_init_u32(&(shared->cost_balance), 0);
364  pg_atomic_init_u32(&(shared->active_nworkers), 0);
365  pg_atomic_init_u32(&(shared->idx), 0);
366 
368  pvs->shared = shared;
369 
370  /* Prepare the dead_items space */
371  dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
372  est_dead_items_len);
373  dead_items->max_items = max_items;
374  dead_items->num_items = 0;
375  MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
377  pvs->dead_items = dead_items;
378 
379  /*
380  * Allocate space for each worker's BufferUsage and WalUsage; no need to
381  * initialize
382  */
383  buffer_usage = shm_toc_allocate(pcxt->toc,
384  mul_size(sizeof(BufferUsage), pcxt->nworkers));
386  pvs->buffer_usage = buffer_usage;
387  wal_usage = shm_toc_allocate(pcxt->toc,
388  mul_size(sizeof(WalUsage), pcxt->nworkers));
390  pvs->wal_usage = wal_usage;
391 
392  /* Store query string for workers */
393  if (debug_query_string)
394  {
395  char *sharedquery;
396 
397  sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
398  memcpy(sharedquery, debug_query_string, querylen + 1);
399  sharedquery[querylen] = '\0';
400  shm_toc_insert(pcxt->toc,
401  PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
402  }
403 
404  /* Success -- return parallel vacuum state */
405  return pvs;
406 }
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition: atomics.h:223
void InitializeParallelDSM(ParallelContext *pcxt)
Definition: parallel.c:202
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
Definition: parallel.c:164
#define MemSet(start, val, len)
Definition: c.h:1008
size_t Size
Definition: c.h:540
int maintenance_work_mem
Definition: globals.c:127
const char * debug_query_string
Definition: postgres.c:89
#define RelationGetRelid(relation)
Definition: rel.h:489
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
bool amusemaintenanceworkmem
Definition: amapi.h:246
pg_atomic_uint32 cost_balance
int maintenance_work_mem_worker
pg_atomic_uint32 active_nworkers
pg_atomic_uint32 idx
shm_toc_estimator estimator
Definition: parallel.h:42
shm_toc * toc
Definition: parallel.h:45
BufferAccessStrategy bstrategy
BufferUsage * buffer_usage
ItemPointerData items[FLEXIBLE_ARRAY_MEMBER]
Definition: vacuum.h:247
int max_items
Definition: vacuum.h:243
int num_items
Definition: vacuum.h:244
Size vac_max_items_to_alloc_size(int max_items)
Definition: vacuum.c:2372
#define VACUUM_OPTION_MAX_VALID_VALUE
Definition: vacuum.h:65
static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum)
#define PARALLEL_VACUUM_KEY_INDEX_STATS
#define PARALLEL_VACUUM_KEY_QUERY_TEXT
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE
#define PARALLEL_VACUUM_KEY_SHARED
#define PARALLEL_VACUUM_KEY_WAL_USAGE
struct PVShared PVShared
#define PARALLEL_VACUUM_KEY_DEAD_ITEMS
void EnterParallelMode(void)
Definition: xact.c:1032

References PVShared::active_nworkers, IndexAmRoutine::amparallelvacuumoptions, IndexAmRoutine::amusemaintenanceworkmem, Assert(), ParallelVacuumState::bstrategy, ParallelVacuumState::buffer_usage, PVShared::cost_balance, CreateParallelContext(), ParallelVacuumState::dead_items, debug_query_string, PVShared::elevel, EnterParallelMode(), ParallelContext::estimator, i, PVShared::idx, ParallelVacuumState::indrels, ParallelVacuumState::indstats, InitializeParallelDSM(), VacDeadItems::items, maintenance_work_mem, PVShared::maintenance_work_mem_worker, VacDeadItems::max_items, MemSet, Min, mul_size(), ParallelVacuumState::nindexes, ParallelVacuumState::nindexes_parallel_bulkdel, ParallelVacuumState::nindexes_parallel_cleanup, ParallelVacuumState::nindexes_parallel_condcleanup, VacDeadItems::num_items, ParallelContext::nworkers, palloc0(), parallel_vacuum_compute_workers(), PARALLEL_VACUUM_KEY_BUFFER_USAGE, PARALLEL_VACUUM_KEY_DEAD_ITEMS, PARALLEL_VACUUM_KEY_INDEX_STATS, PARALLEL_VACUUM_KEY_QUERY_TEXT, PARALLEL_VACUUM_KEY_SHARED, PARALLEL_VACUUM_KEY_WAL_USAGE, ParallelVacuumState::pcxt, pfree(), pg_atomic_init_u32(), RelationData::rd_indam, RelationGetRelid, PVShared::relid, ParallelVacuumState::shared, shm_toc_allocate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_insert(), ParallelContext::toc, vac_max_items_to_alloc_size(), VACUUM_OPTION_MAX_VALID_VALUE, VACUUM_OPTION_PARALLEL_BULKDEL, VACUUM_OPTION_PARALLEL_CLEANUP, VACUUM_OPTION_PARALLEL_COND_CLEANUP, ParallelVacuumState::wal_usage, and ParallelVacuumState::will_parallel_vacuum.

Referenced by dead_items_alloc().

◆ parallel_vacuum_main()

void parallel_vacuum_main ( dsm_segment *seg,
shm_toc *toc 
)

Definition at line 935 of file vacuumparallel.c.

936 {
938  Relation rel;
939  Relation *indrels;
940  PVIndStats *indstats;
941  PVShared *shared;
942  VacDeadItems *dead_items;
943  BufferUsage *buffer_usage;
944  WalUsage *wal_usage;
945  int nindexes;
946  char *sharedquery;
947  ErrorContextCallback errcallback;
948 
949  /*
950  * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
951  * don't support parallel vacuum for autovacuum as of now.
952  */
954 
955  elog(DEBUG1, "starting parallel vacuum worker");
956 
957  shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
958 
959  /* Set debug_query_string for individual workers */
960  sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
961  debug_query_string = sharedquery;
963 
964  /*
965  * Open table. The lock mode is the same as the leader process. It's
966  * okay because the lock mode does not conflict among the parallel
967  * workers.
968  */
969  rel = table_open(shared->relid, ShareUpdateExclusiveLock);
970 
971  /*
972  * Open all indexes. indrels are sorted in order by OID, which should be
973  * matched to the leader's one.
974  */
975  vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
976  Assert(nindexes > 0);
977 
978  if (shared->maintenance_work_mem_worker > 0)
980 
981  /* Set index statistics */
982  indstats = (PVIndStats *) shm_toc_lookup(toc,
984  false);
985 
986  /* Set dead_items space */
987  dead_items = (VacDeadItems *) shm_toc_lookup(toc,
989  false);
990 
991  /* Set cost-based vacuum delay */
993  VacuumCostBalance = 0;
994  VacuumPageHit = 0;
995  VacuumPageMiss = 0;
996  VacuumPageDirty = 0;
1000 
1001  /* Set parallel vacuum state */
1002  pvs.indrels = indrels;
1003  pvs.nindexes = nindexes;
1004  pvs.indstats = indstats;
1005  pvs.shared = shared;
1006  pvs.dead_items = dead_items;
1009 
1010  /* These fields will be filled during index vacuum or cleanup */
1011  pvs.indname = NULL;
1013 
1014  /* Each parallel VACUUM worker gets its own access strategy */
1016 
1017  /* Setup error traceback support for ereport() */
1019  errcallback.arg = &pvs;
1020  errcallback.previous = error_context_stack;
1021  error_context_stack = &errcallback;
1022 
1023  /* Prepare to track buffer usage during parallel execution */
1025 
1026  /* Process indexes to perform vacuum/cleanup */
1028 
1029  /* Report buffer/WAL usage during parallel execution */
1030  buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
1031  wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
1033  &wal_usage[ParallelWorkerNumber]);
1034 
1035  /* Pop the error context stack */
1036  error_context_stack = errcallback.previous;
1037 
1038  vac_close_indexes(nindexes, indrels, RowExclusiveLock);
1041 }
int ParallelWorkerNumber
Definition: parallel.c:112
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
@ BAS_VACUUM
Definition: bufmgr.h:33
ErrorContextCallback * error_context_stack
Definition: elog.c:93
#define DEBUG1
Definition: elog.h:24
#define elog(elevel,...)
Definition: elog.h:218
BufferAccessStrategy GetAccessStrategy(BufferAccessStrategyType btype)
Definition: freelist.c:541
void FreeAccessStrategy(BufferAccessStrategy strategy)
Definition: freelist.c:596
int64 VacuumPageHit
Definition: globals.c:148
int64 VacuumPageMiss
Definition: globals.c:149
bool VacuumCostActive
Definition: globals.c:153
int64 VacuumPageDirty
Definition: globals.c:150
int VacuumCostBalance
Definition: globals.c:152
double VacuumCostDelay
Definition: globals.c:146
void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
Definition: instrument.c:208
void InstrStartParallelQuery(void)
Definition: instrument.c:200
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3326
char * pstrdup(const char *in)
Definition: mcxt.c:1305
#define PROC_IN_VACUUM
Definition: proc.h:55
#define RelationGetRelationName(relation)
Definition: rel.h:523
#define RelationGetNamespace(relation)
Definition: rel.h:530
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
PGPROC * MyProc
Definition: proc.c:68
struct ErrorContextCallback * previous
Definition: elog.h:232
void(* callback)(void *arg)
Definition: elog.h:233
uint8 statusFlags
Definition: proc.h:228
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
pg_atomic_uint32 * VacuumActiveNWorkers
Definition: vacuum.c:84
void vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel)
Definition: vacuum.c:2143
int VacuumCostBalanceLocal
Definition: vacuum.c:85
void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
Definition: vacuum.c:2186
pg_atomic_uint32 * VacuumSharedCostBalance
Definition: vacuum.c:83
static void parallel_vacuum_error_callback(void *arg)
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)

References PVShared::active_nworkers, ErrorContextCallback::arg, Assert(), BAS_VACUUM, ParallelVacuumState::bstrategy, ErrorContextCallback::callback, PVShared::cost_balance, ParallelVacuumState::dead_items, DEBUG1, debug_query_string, elog, error_context_stack, FreeAccessStrategy(), get_namespace_name(), GetAccessStrategy(), ParallelVacuumState::indname, ParallelVacuumState::indrels, ParallelVacuumState::indstats, InstrEndParallelQuery(), InstrStartParallelQuery(), maintenance_work_mem, PVShared::maintenance_work_mem_worker, MyProc, ParallelVacuumState::nindexes, PARALLEL_INDVAC_STATUS_INITIAL, parallel_vacuum_error_callback(), PARALLEL_VACUUM_KEY_BUFFER_USAGE, PARALLEL_VACUUM_KEY_DEAD_ITEMS, PARALLEL_VACUUM_KEY_INDEX_STATS, PARALLEL_VACUUM_KEY_QUERY_TEXT, PARALLEL_VACUUM_KEY_SHARED, PARALLEL_VACUUM_KEY_WAL_USAGE, parallel_vacuum_process_safe_indexes(), ParallelWorkerNumber, pgstat_report_activity(), ErrorContextCallback::previous, PROC_IN_VACUUM, pstrdup(), RelationGetNamespace, RelationGetRelationName, PVShared::relid, ParallelVacuumState::relname, ParallelVacuumState::relnamespace, RowExclusiveLock, ParallelVacuumState::shared, ShareUpdateExclusiveLock, shm_toc_lookup(), STATE_RUNNING, ParallelVacuumState::status, PGPROC::statusFlags, table_close(), table_open(), vac_close_indexes(), vac_open_indexes(), VacuumActiveNWorkers, VacuumCostActive, VacuumCostBalance, VacuumCostBalanceLocal, VacuumCostDelay, VacuumPageDirty, VacuumPageHit, VacuumPageMiss, and VacuumSharedCostBalance.

◆ parallel_vacuum_process_all_indexes()

static void parallel_vacuum_process_all_indexes ( ParallelVacuumState *pvs,
int  num_index_scans,
bool  vacuum 
)
static

Definition at line 565 of file vacuumparallel.c.

567 {
568  int nworkers;
569  PVIndVacStatus new_status;
570 
572 
573  if (vacuum)
574  {
576 
577  /* Determine the number of parallel workers to launch */
578  nworkers = pvs->nindexes_parallel_bulkdel;
579  }
580  else
581  {
583 
584  /* Determine the number of parallel workers to launch */
585  nworkers = pvs->nindexes_parallel_cleanup;
586 
587  /* Add conditionally parallel-aware indexes if in the first time call */
588  if (num_index_scans == 0)
589  nworkers += pvs->nindexes_parallel_condcleanup;
590  }
591 
592  /* The leader process will participate */
593  nworkers--;
594 
595  /*
596  * It is possible that parallel context is initialized with fewer workers
597  * than the number of indexes that need a separate worker in the current
598  * phase, so we need to consider it. See
599  * parallel_vacuum_compute_workers().
600  */
601  nworkers = Min(nworkers, pvs->pcxt->nworkers);
602 
603  /*
604  * Set index vacuum status and mark whether parallel vacuum worker can
605  * process it.
606  */
607  for (int i = 0; i < pvs->nindexes; i++)
608  {
609  PVIndStats *indstats = &(pvs->indstats[i]);
610 
612  indstats->status = new_status;
613  indstats->parallel_workers_can_process =
614  (pvs->will_parallel_vacuum[i] &
616  num_index_scans,
617  vacuum));
618  }
619 
620  /* Reset the parallel index processing counter */
621  pg_atomic_write_u32(&(pvs->shared->idx), 0);
622 
623  /* Setup the shared cost-based vacuum delay and launch workers */
624  if (nworkers > 0)
625  {
626  /* Reinitialize parallel context to relaunch parallel workers */
627  if (num_index_scans > 0)
629 
630  /*
631  * Set up shared cost balance and the number of active workers for
632  * vacuum delay. We need to do this before launching workers as
633  * otherwise, they might not see the updated values for these
634  * parameters.
635  */
638 
639  /*
640  * The number of workers can vary between bulkdelete and cleanup
641  * phase.
642  */
643  ReinitializeParallelWorkers(pvs->pcxt, nworkers);
644 
646 
647  if (pvs->pcxt->nworkers_launched > 0)
648  {
649  /*
650  * Reset the local cost values for leader backend as we have
651  * already accumulated the remaining balance of heap.
652  */
653  VacuumCostBalance = 0;
655 
656  /* Enable shared cost balance for leader backend */
659  }
660 
661  if (vacuum)
662  ereport(pvs->shared->elevel,
663  (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
664  "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
665  pvs->pcxt->nworkers_launched),
666  pvs->pcxt->nworkers_launched, nworkers)));
667  else
668  ereport(pvs->shared->elevel,
669  (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
670  "launched %d parallel vacuum workers for index cleanup (planned: %d)",
671  pvs->pcxt->nworkers_launched),
672  pvs->pcxt->nworkers_launched, nworkers)));
673  }
674 
675  /* Vacuum the indexes that can be processed by only leader process */
677 
678  /*
679  * Join as a parallel worker. The leader vacuums alone processes all
680  * parallel-safe indexes in the case where no workers are launched.
681  */
683 
684  /*
685  * Next, accumulate buffer and WAL usage. (This must wait for the workers
686  * to finish, or we might get incomplete data.)
687  */
688  if (nworkers > 0)
689  {
690  /* Wait for all vacuum workers to finish */
692 
693  for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
695  }
696 
697  /*
698  * Reset all index status back to initial (while checking that we have
699  * vacuumed all indexes).
700  */
701  for (int i = 0; i < pvs->nindexes; i++)
702  {
703  PVIndStats *indstats = &(pvs->indstats[i]);
704 
705  if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
706  elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
708 
710  }
711 
712  /*
713  * Carry the shared balance value to heap scan and disable shared costing
714  */
716  {
719  VacuumActiveNWorkers = NULL;
720  }
721 }
static void pg_atomic_write_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition: atomics.h:258
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition: atomics.h:241
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
Definition: parallel.c:762
void LaunchParallelWorkers(ParallelContext *pcxt)
Definition: parallel.c:539
void ReinitializeParallelDSM(ParallelContext *pcxt)
Definition: parallel.c:475
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
Definition: parallel.c:525
#define ngettext(s, p, n)
Definition: c.h:1179
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ERROR
Definition: elog.h:33
#define ereport(elevel,...)
Definition: elog.h:143
void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
Definition: instrument.c:218
bool parallel_workers_can_process
PVIndVacStatus status
int nworkers_launched
Definition: parallel.h:38
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, bool vacuum)
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)

References PVShared::active_nworkers, Assert(), ParallelVacuumState::buffer_usage, PVShared::cost_balance, PVShared::elevel, elog, ereport, errmsg(), ERROR, i, PVShared::idx, ParallelVacuumState::indrels, ParallelVacuumState::indstats, InstrAccumParallelQuery(), IsParallelWorker, LaunchParallelWorkers(), Min, ngettext, ParallelVacuumState::nindexes, ParallelVacuumState::nindexes_parallel_bulkdel, ParallelVacuumState::nindexes_parallel_cleanup, ParallelVacuumState::nindexes_parallel_condcleanup, ParallelContext::nworkers, ParallelContext::nworkers_launched, PARALLEL_INDVAC_STATUS_COMPLETED, PARALLEL_INDVAC_STATUS_INITIAL, PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, PARALLEL_INDVAC_STATUS_NEED_CLEANUP, parallel_vacuum_index_is_parallel_safe(), parallel_vacuum_process_safe_indexes(), parallel_vacuum_process_unsafe_indexes(), PVIndStats::parallel_workers_can_process, ParallelVacuumState::pcxt, pg_atomic_read_u32(), pg_atomic_write_u32(), ReinitializeParallelDSM(), ReinitializeParallelWorkers(), RelationGetRelationName, ParallelVacuumState::shared, PVIndStats::status, vacuum(), VacuumActiveNWorkers, VacuumCostBalance, VacuumCostBalanceLocal, VacuumSharedCostBalance, WaitForParallelWorkersToFinish(), ParallelVacuumState::wal_usage, and ParallelVacuumState::will_parallel_vacuum.

Referenced by parallel_vacuum_bulkdel_all_indexes(), and parallel_vacuum_cleanup_all_indexes().

◆ parallel_vacuum_process_one_index()

static void parallel_vacuum_process_one_index ( ParallelVacuumState pvs,
Relation  indrel,
PVIndStats indstats 
)
static

Definition at line 819 of file vacuumparallel.c.

821 {
822  IndexBulkDeleteResult *istat = NULL;
823  IndexBulkDeleteResult *istat_res;
824  IndexVacuumInfo ivinfo;
825 
826  /*
827  * Update the pointer to the corresponding bulk-deletion result if someone
828  * has already updated it
829  */
830  if (indstats->istat_updated)
831  istat = &(indstats->istat);
832 
833  ivinfo.index = indrel;
834  ivinfo.analyze_only = false;
835  ivinfo.report_progress = false;
836  ivinfo.message_level = DEBUG2;
837  ivinfo.estimated_count = pvs->shared->estimated_count;
838  ivinfo.num_heap_tuples = pvs->shared->reltuples;
839  ivinfo.strategy = pvs->bstrategy;
840 
841  /* Update error traceback information */
842  pvs->indname = pstrdup(RelationGetRelationName(indrel));
843  pvs->status = indstats->status;
844 
845  switch (indstats->status)
846  {
848  istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
849  break;
851  istat_res = vac_cleanup_one_index(&ivinfo, istat);
852  break;
853  default:
854  elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
855  indstats->status,
856  RelationGetRelationName(indrel));
857  }
858 
859  /*
860  * Copy the index bulk-deletion result returned from ambulkdelete and
861  * amvacuumcleanup to the DSM segment if it's the first cycle because they
862  * allocate locally and it's possible that an index will be vacuumed by a
863  * different vacuum process the next cycle. Copying the result normally
864  * happens only the first time an index is vacuumed. For any additional
865  * vacuum pass, we directly point to the result on the DSM segment and
866  * pass it to vacuum index APIs so that workers can update it directly.
867  *
868  * Since all vacuum workers write the bulk-deletion result at different
869  * slots we can write them without locking.
870  */
871  if (!indstats->istat_updated && istat_res != NULL)
872  {
873  memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
874  indstats->istat_updated = true;
875 
876  /* Free the locally-allocated bulk-deletion result */
877  pfree(istat_res);
878  }
879 
880  /*
881  * Update the status to completed. No need to lock here since each worker
882  * touches different indexes.
883  */
885 
886  /* Reset error traceback information */
888  pfree(pvs->indname);
889  pvs->indname = NULL;
890 }
#define DEBUG2
Definition: elog.h:23
Relation index
Definition: genam.h:46
double num_heap_tuples
Definition: genam.h:51
bool analyze_only
Definition: genam.h:47
BufferAccessStrategy strategy
Definition: genam.h:52
bool report_progress
Definition: genam.h:48
int message_level
Definition: genam.h:50
bool estimated_count
Definition: genam.h:49
IndexBulkDeleteResult * vac_bulkdel_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat, VacDeadItems *dead_items)
Definition: vacuum.c:2326
IndexBulkDeleteResult * vac_cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
Definition: vacuum.c:2347

References IndexVacuumInfo::analyze_only, ParallelVacuumState::bstrategy, ParallelVacuumState::dead_items, DEBUG2, elog, ERROR, PVShared::estimated_count, IndexVacuumInfo::estimated_count, IndexVacuumInfo::index, ParallelVacuumState::indname, PVIndStats::istat, PVIndStats::istat_updated, IndexVacuumInfo::message_level, IndexVacuumInfo::num_heap_tuples, PARALLEL_INDVAC_STATUS_COMPLETED, PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, PARALLEL_INDVAC_STATUS_NEED_CLEANUP, pfree(), pstrdup(), RelationGetRelationName, PVShared::reltuples, IndexVacuumInfo::report_progress, ParallelVacuumState::shared, PVIndStats::status, ParallelVacuumState::status, IndexVacuumInfo::strategy, vac_bulkdel_one_index(), and vac_cleanup_one_index().

Referenced by parallel_vacuum_process_safe_indexes(), and parallel_vacuum_process_unsafe_indexes().

◆ parallel_vacuum_process_safe_indexes()

static void parallel_vacuum_process_safe_indexes ( ParallelVacuumState pvs)
static

Definition at line 728 of file vacuumparallel.c.

729 {
730  /*
731  * Increment the active worker count if we are able to launch any worker.
732  */
735 
736  /* Loop until all indexes are vacuumed */
737  for (;;)
738  {
739  int idx;
740  PVIndStats *indstats;
741 
742  /* Get an index number to process */
743  idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
744 
745  /* Done for all indexes? */
746  if (idx >= pvs->nindexes)
747  break;
748 
749  indstats = &(pvs->indstats[idx]);
750 
751  /*
752  * Skip vacuuming index that is unsafe for workers or has an
753  * unsuitable target for parallel index vacuum (this is vacuumed in
754  * parallel_vacuum_process_unsafe_indexes() by the leader).
755  */
756  if (!indstats->parallel_workers_can_process)
757  continue;
758 
759  /* Do vacuum or cleanup of the index */
760  parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
761  }
762 
763  /*
764  * We have completed the index vacuum so decrement the active worker
765  * count.
766  */
769 }
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:259
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition: atomics.h:401
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:328
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:386
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)

References idx(), PVShared::idx, ParallelVacuumState::indrels, ParallelVacuumState::indstats, ParallelVacuumState::nindexes, parallel_vacuum_process_one_index(), PVIndStats::parallel_workers_can_process, pg_atomic_add_fetch_u32(), pg_atomic_fetch_add_u32(), pg_atomic_sub_fetch_u32(), ParallelVacuumState::shared, and VacuumActiveNWorkers.

Referenced by parallel_vacuum_main(), and parallel_vacuum_process_all_indexes().

◆ parallel_vacuum_process_unsafe_indexes()

static void parallel_vacuum_process_unsafe_indexes ( ParallelVacuumState pvs)
static

Definition at line 782 of file vacuumparallel.c.

783 {
785 
786  /*
787  * Increment the active worker count if we are able to launch any worker.
788  */
791 
792  for (int i = 0; i < pvs->nindexes; i++)
793  {
794  PVIndStats *indstats = &(pvs->indstats[i]);
795 
796  /* Skip indexes that are safe for workers */
797  if (indstats->parallel_workers_can_process)
798  continue;
799 
800  /* Do vacuum or cleanup of the index */
801  parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
802  }
803 
804  /*
805  * We have completed the index vacuum so decrement the active worker
806  * count.
807  */
810 }

References Assert(), i, ParallelVacuumState::indrels, ParallelVacuumState::indstats, IsParallelWorker, ParallelVacuumState::nindexes, parallel_vacuum_process_one_index(), PVIndStats::parallel_workers_can_process, pg_atomic_add_fetch_u32(), pg_atomic_sub_fetch_u32(), and VacuumActiveNWorkers.

Referenced by parallel_vacuum_process_all_indexes().