PostgreSQL Source Code  git master
vacuumparallel.c File Reference
#include "postgres.h"
#include "access/amapi.h"
#include "access/table.h"
#include "access/xact.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "executor/instrument.h"
#include "optimizer/paths.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
Include dependency graph for vacuumparallel.c:

Go to the source code of this file.

Data Structures

struct  PVShared
 
struct  PVIndStats
 
struct  ParallelVacuumState
 

Macros

#define PARALLEL_VACUUM_KEY_SHARED   1
 
#define PARALLEL_VACUUM_KEY_QUERY_TEXT   2
 
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE   3
 
#define PARALLEL_VACUUM_KEY_WAL_USAGE   4
 
#define PARALLEL_VACUUM_KEY_INDEX_STATS   5
 

Typedefs

typedef struct PVShared PVShared
 
typedef enum PVIndVacStatus PVIndVacStatus
 
typedef struct PVIndStats PVIndStats
 

Enumerations

enum  PVIndVacStatus { PARALLEL_INDVAC_STATUS_INITIAL = 0 , PARALLEL_INDVAC_STATUS_NEED_BULKDELETE , PARALLEL_INDVAC_STATUS_NEED_CLEANUP , PARALLEL_INDVAC_STATUS_COMPLETED }
 

Functions

static int parallel_vacuum_compute_workers (Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum)
 
static void parallel_vacuum_process_all_indexes (ParallelVacuumState *pvs, int num_index_scans, bool vacuum)
 
static void parallel_vacuum_process_safe_indexes (ParallelVacuumState *pvs)
 
static void parallel_vacuum_process_unsafe_indexes (ParallelVacuumState *pvs)
 
static void parallel_vacuum_process_one_index (ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)
 
static bool parallel_vacuum_index_is_parallel_safe (Relation indrel, int num_index_scans, bool vacuum)
 
static void parallel_vacuum_error_callback (void *arg)
 
ParallelVacuumState * parallel_vacuum_init (Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, int elevel, BufferAccessStrategy bstrategy)
 
void parallel_vacuum_end (ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
 
TidStore * parallel_vacuum_get_dead_items (ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
 
void parallel_vacuum_reset_dead_items (ParallelVacuumState *pvs)
 
void parallel_vacuum_bulkdel_all_indexes (ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans)
 
void parallel_vacuum_cleanup_all_indexes (ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, bool estimated_count)
 
void parallel_vacuum_main (dsm_segment *seg, shm_toc *toc)
 

Macro Definition Documentation

◆ PARALLEL_VACUUM_KEY_BUFFER_USAGE

#define PARALLEL_VACUUM_KEY_BUFFER_USAGE   3

Definition at line 49 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_INDEX_STATS

#define PARALLEL_VACUUM_KEY_INDEX_STATS   5

Definition at line 51 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_QUERY_TEXT

#define PARALLEL_VACUUM_KEY_QUERY_TEXT   2

Definition at line 48 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_SHARED

#define PARALLEL_VACUUM_KEY_SHARED   1

Definition at line 47 of file vacuumparallel.c.

◆ PARALLEL_VACUUM_KEY_WAL_USAGE

#define PARALLEL_VACUUM_KEY_WAL_USAGE   4

Definition at line 50 of file vacuumparallel.c.

Typedef Documentation

◆ PVIndStats

typedef struct PVIndStats PVIndStats

◆ PVIndVacStatus

typedef enum PVIndVacStatus PVIndVacStatus

◆ PVShared

typedef struct PVShared PVShared

Enumeration Type Documentation

◆ PVIndVacStatus

Enumerator
PARALLEL_INDVAC_STATUS_INITIAL 
PARALLEL_INDVAC_STATUS_NEED_BULKDELETE 
PARALLEL_INDVAC_STATUS_NEED_CLEANUP 
PARALLEL_INDVAC_STATUS_COMPLETED 

Definition at line 125 of file vacuumparallel.c.

typedef enum PVIndVacStatus
{
    PARALLEL_INDVAC_STATUS_INITIAL = 0,
    PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
    PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
    PARALLEL_INDVAC_STATUS_COMPLETED,
} PVIndVacStatus;

Function Documentation

◆ parallel_vacuum_bulkdel_all_indexes()

void parallel_vacuum_bulkdel_all_indexes ( ParallelVacuumState *  pvs,
long  num_table_tuples,
int  num_index_scans 
)

Definition at line 501 of file vacuumparallel.c.

{
    Assert(!IsParallelWorker());

    /*
     * We can only provide an approximate value of num_heap_tuples, at least
     * for now.
     */
    pvs->shared->reltuples = num_table_tuples;
    pvs->shared->estimated_count = true;

    parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
}

References Assert, PVShared::estimated_count, IsParallelWorker, parallel_vacuum_process_all_indexes(), PVShared::reltuples, and ParallelVacuumState::shared.

Referenced by lazy_vacuum_all_indexes().
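A minimal caller-side sketch (hypothetical helper, modeled on how lazy_vacuum_all_indexes() uses this entry point; only the leader may call it, per the Assert above):

#include "postgres.h"

#include "commands/vacuum.h"

/*
 * Hypothetical leader-side helper: take the parallel path when a
 * ParallelVacuumState was set up by parallel_vacuum_init(); otherwise the
 * caller would bulk-delete each index serially with vac_bulkdel_one_index().
 */
static void
bulkdel_indexes(ParallelVacuumState *pvs, long num_table_tuples,
                int num_index_scans)
{
    if (pvs != NULL)
        parallel_vacuum_bulkdel_all_indexes(pvs, num_table_tuples,
                                            num_index_scans);
    /* else: serial per-index bulk deletion in the leader */
}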

◆ parallel_vacuum_cleanup_all_indexes()

void parallel_vacuum_cleanup_all_indexes ( ParallelVacuumState *  pvs,
long  num_table_tuples,
int  num_index_scans,
bool  estimated_count 
)

Definition at line 520 of file vacuumparallel.c.

{
    Assert(!IsParallelWorker());

    /*
     * We can provide a better estimate of total number of surviving tuples
     * (we assume indexes are more interested in that than in the number of
     * nominally live tuples).
     */
    pvs->shared->reltuples = num_table_tuples;
    pvs->shared->estimated_count = estimated_count;

    parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
}

References Assert, PVShared::estimated_count, IsParallelWorker, parallel_vacuum_process_all_indexes(), PVShared::reltuples, and ParallelVacuumState::shared.

Referenced by lazy_cleanup_all_indexes().

◆ parallel_vacuum_compute_workers()

static int parallel_vacuum_compute_workers ( Relation *  indrels,
int  nindexes,
int  nrequested,
bool *  will_parallel_vacuum 
)
static

Definition at line 550 of file vacuumparallel.c.

{
    int         nindexes_parallel = 0;
    int         nindexes_parallel_bulkdel = 0;
    int         nindexes_parallel_cleanup = 0;
    int         parallel_workers;

    /*
     * We don't allow performing parallel operation in standalone backend or
     * when parallelism is disabled.
     */
    if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
        return 0;

    /*
     * Compute the number of indexes that can participate in parallel vacuum.
     */
    for (int i = 0; i < nindexes; i++)
    {
        Relation    indrel = indrels[i];
        uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;

        /* Skip index that is not a suitable target for parallel index vacuum */
        if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
            RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
            continue;

        will_parallel_vacuum[i] = true;

        if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
            nindexes_parallel_bulkdel++;
        if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
            ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
            nindexes_parallel_cleanup++;
    }

    nindexes_parallel = Max(nindexes_parallel_bulkdel,
                            nindexes_parallel_cleanup);

    /* The leader process takes one index */
    nindexes_parallel--;

    /* No index supports parallel vacuum */
    if (nindexes_parallel <= 0)
        return 0;

    /* Compute the parallel degree */
    parallel_workers = (nrequested > 0) ?
        Min(nrequested, nindexes_parallel) : nindexes_parallel;

    /* Cap by max_parallel_maintenance_workers */
    parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);

    return parallel_workers;
}

References IndexAmRoutine::amparallelvacuumoptions, i, IsUnderPostmaster, Max, max_parallel_maintenance_workers, Min, min_parallel_index_scan_size, RelationData::rd_indam, RelationGetNumberOfBlocks, VACUUM_OPTION_NO_PARALLEL, VACUUM_OPTION_PARALLEL_BULKDEL, VACUUM_OPTION_PARALLEL_CLEANUP, and VACUUM_OPTION_PARALLEL_COND_CLEANUP.

Referenced by parallel_vacuum_init().
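As a worked illustration of the arithmetic above (hypothetical numbers, not from the source): the candidate count is the larger of the bulk-delete and cleanup counts, the leader always takes one index itself, and the result is clamped both by the requested degree and by max_parallel_maintenance_workers. A self-contained sketch:

#include <stdio.h>

int
main(void)
{
    /*
     * Assumed inputs: 3 indexes support parallel bulk-delete, 2 support
     * (conditional) parallel cleanup, the user asked for PARALLEL 8, and
     * max_parallel_maintenance_workers is set to 2.
     */
    int nindexes_parallel_bulkdel = 3;
    int nindexes_parallel_cleanup = 2;
    int nrequested = 8;
    int max_parallel_maintenance_workers = 2;
    int nindexes_parallel;
    int parallel_workers;

    /* Max(bulkdel, cleanup) */
    nindexes_parallel = (nindexes_parallel_bulkdel > nindexes_parallel_cleanup) ?
        nindexes_parallel_bulkdel : nindexes_parallel_cleanup;

    /* The leader process takes one index */
    nindexes_parallel--;

    /* Honor the requested degree, then cap by the GUC */
    parallel_workers = (nrequested > 0 && nrequested < nindexes_parallel) ?
        nrequested : nindexes_parallel;
    if (parallel_workers > max_parallel_maintenance_workers)
        parallel_workers = max_parallel_maintenance_workers;

    printf("planned parallel vacuum workers: %d\n", parallel_workers);  /* prints 2 */
    return 0;
}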

◆ parallel_vacuum_end()

void parallel_vacuum_end ( ParallelVacuumState *  pvs,
IndexBulkDeleteResult **  istats 
)

Definition at line 436 of file vacuumparallel.c.

{
    Assert(!IsParallelWorker());

    /* Copy the updated statistics */
    for (int i = 0; i < pvs->nindexes; i++)
    {
        PVIndStats *indstats = &(pvs->indstats[i]);

        if (indstats->istat_updated)
        {
            istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
            memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
        }
        else
            istats[i] = NULL;
    }

    TidStoreDestroy(pvs->dead_items);

    DestroyParallelContext(pvs->pcxt);
    ExitParallelMode();

    pfree(pvs->will_parallel_vacuum);
    pfree(pvs);
}

References Assert, ParallelVacuumState::dead_items, DestroyParallelContext(), ExitParallelMode(), i, ParallelVacuumState::indstats, IsParallelWorker, PVIndStats::istat, PVIndStats::istat_updated, ParallelVacuumState::nindexes, palloc0(), ParallelVacuumState::pcxt, pfree(), TidStoreDestroy(), and ParallelVacuumState::will_parallel_vacuum.

Referenced by dead_items_cleanup().

◆ parallel_vacuum_error_callback()

static void parallel_vacuum_error_callback ( void *  arg)
static

Definition at line 1108 of file vacuumparallel.c.

{
    ParallelVacuumState *errinfo = arg;

    switch (errinfo->status)
    {
        case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
            errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
                       errinfo->indname,
                       errinfo->relnamespace,
                       errinfo->relname);
            break;
        case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
            errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
                       errinfo->indname,
                       errinfo->relnamespace,
                       errinfo->relname);
            break;
        case PARALLEL_INDVAC_STATUS_INITIAL:
        case PARALLEL_INDVAC_STATUS_COMPLETED:
        default:
            return;
    }
}

References arg, errcontext, ParallelVacuumState::indname, PARALLEL_INDVAC_STATUS_COMPLETED, PARALLEL_INDVAC_STATUS_INITIAL, PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, PARALLEL_INDVAC_STATUS_NEED_CLEANUP, ParallelVacuumState::relname, ParallelVacuumState::relnamespace, and ParallelVacuumState::status.

Referenced by parallel_vacuum_main().

◆ parallel_vacuum_get_dead_items()

TidStore * parallel_vacuum_get_dead_items ( ParallelVacuumState *  pvs,
VacDeadItemsInfo **  dead_items_info_p 
)

Definition at line 467 of file vacuumparallel.c.

{
    *dead_items_info_p = &(pvs->shared->dead_items_info);
    return pvs->dead_items;
}

References ParallelVacuumState::dead_items, PVShared::dead_items_info, and ParallelVacuumState::shared.

Referenced by dead_items_alloc().
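The leader uses the returned TidStore to accumulate dead TIDs during the heap scan, so that workers can later reach them through the DSA handles stored in PVShared. A hedged sketch of that use, modeled on vacuumlazy.c's dead_items_add(); the helper name is hypothetical:

#include "postgres.h"

#include "access/tidstore.h"
#include "commands/vacuum.h"

/*
 * Hypothetical leader-side helper: record the dead item offsets found on one
 * heap page in the shared TidStore obtained from
 * parallel_vacuum_get_dead_items().
 */
static void
record_dead_items(ParallelVacuumState *pvs, BlockNumber blkno,
                  OffsetNumber *offsets, int num_offsets)
{
    VacDeadItemsInfo *dead_items_info;
    TidStore   *dead_items = parallel_vacuum_get_dead_items(pvs,
                                                            &dead_items_info);

    TidStoreSetBlockOffsets(dead_items, blkno, offsets, num_offsets);
    dead_items_info->num_items += num_offsets;
}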

◆ parallel_vacuum_index_is_parallel_safe()

static bool parallel_vacuum_index_is_parallel_safe ( Relation  indrel,
int  num_index_scans,
bool  vacuum 
)
static

Definition at line 952 of file vacuumparallel.c.

{
    uint8       vacoptions;

    vacoptions = indrel->rd_indam->amparallelvacuumoptions;

    /* In parallel vacuum case, check if it supports parallel bulk-deletion */
    if (vacuum)
        return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);

    /* Not safe, if the index does not support parallel cleanup */
    if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
        ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
        return false;

    /*
     * Not safe, if the index supports parallel cleanup conditionally, but we
     * have already processed the index (for bulkdelete). We do this to avoid
     * the need to invoke workers when parallel index cleanup doesn't need to
     * scan the index. See the comments for option
     * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
     * parallel cleanup conditionally.
     */
    if (num_index_scans > 0 &&
        ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
        return false;

    return true;
}

References IndexAmRoutine::amparallelvacuumoptions, RelationData::rd_indam, vacuum(), VACUUM_OPTION_PARALLEL_BULKDEL, VACUUM_OPTION_PARALLEL_CLEANUP, and VACUUM_OPTION_PARALLEL_COND_CLEANUP.

Referenced by parallel_vacuum_process_all_indexes().
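Whether an index qualifies here is decided entirely by the amparallelvacuumoptions flags that its access method advertises from its handler function. A sketch of such a handler ("myam" is hypothetical and the other IndexAmRoutine fields are omitted; the flag combination shown mirrors nbtree, which supports parallel bulk-deletion and only conditional parallel cleanup):

#include "postgres.h"

#include "access/amapi.h"
#include "commands/vacuum.h"
#include "fmgr.h"

Datum
myamhandler(PG_FUNCTION_ARGS)
{
    IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

    /* ... the remaining IndexAmRoutine fields must be filled in as well ... */

    /* parallel bulk-delete; cleanup in parallel only when a scan is needed */
    amroutine->amparallelvacuumoptions =
        VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;

    PG_RETURN_POINTER(amroutine);
}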

◆ parallel_vacuum_init()

ParallelVacuumState* parallel_vacuum_init ( Relation  rel,
Relation *  indrels,
int  nindexes,
int  nrequested_workers,
int  vac_work_mem,
int  elevel,
BufferAccessStrategy  bstrategy 
)

Definition at line 243 of file vacuumparallel.c.

{
    ParallelVacuumState *pvs;
    ParallelContext *pcxt;
    PVShared   *shared;
    TidStore   *dead_items;
    PVIndStats *indstats;
    BufferUsage *buffer_usage;
    WalUsage   *wal_usage;
    bool       *will_parallel_vacuum;
    Size        est_indstats_len;
    Size        est_shared_len;
    int         nindexes_mwm = 0;
    int         parallel_workers = 0;
    int         querylen;

    /*
     * A parallel vacuum must be requested and there must be indexes on the
     * relation
     */
    Assert(nrequested_workers >= 0);
    Assert(nindexes > 0);

    /*
     * Compute the number of parallel vacuum workers to launch
     */
    will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
    parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
                                                       nrequested_workers,
                                                       will_parallel_vacuum);
    if (parallel_workers <= 0)
    {
        /* Can't perform vacuum in parallel -- return NULL */
        pfree(will_parallel_vacuum);
        return NULL;
    }

    pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
    pvs->indrels = indrels;
    pvs->nindexes = nindexes;
    pvs->will_parallel_vacuum = will_parallel_vacuum;
    pvs->bstrategy = bstrategy;
    pvs->heaprel = rel;

    EnterParallelMode();
    pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
                                 parallel_workers);
    Assert(pcxt->nworkers > 0);
    pvs->pcxt = pcxt;

    /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
    est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
    shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
    est_shared_len = sizeof(PVShared);
    shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /*
     * Estimate space for BufferUsage and WalUsage --
     * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
     *
     * If there are no extensions loaded that care, we could skip this. We
     * have no way of knowing whether anyone's looking at pgBufferUsage or
     * pgWalUsage, so do it unconditionally.
     */
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(WalUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
    if (debug_query_string)
    {
        querylen = strlen(debug_query_string);
        shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }
    else
        querylen = 0;           /* keep compiler quiet */

    InitializeParallelDSM(pcxt);

    /* Prepare index vacuum stats */
    indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
    MemSet(indstats, 0, est_indstats_len);
    for (int i = 0; i < nindexes; i++)
    {
        Relation    indrel = indrels[i];
        uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;

        /*
         * Cleanup option should be either disabled, always performing in
         * parallel or conditionally performing in parallel.
         */
        Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
               ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
        Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);

        if (!will_parallel_vacuum[i])
            continue;

        if (indrel->rd_indam->amusemaintenanceworkmem)
            nindexes_mwm++;

        /*
         * Remember the number of indexes that support parallel operation for
         * each phase.
         */
        if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
            pvs->nindexes_parallel_bulkdel++;
        if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
            pvs->nindexes_parallel_cleanup++;
        if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
            pvs->nindexes_parallel_condcleanup++;
    }
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
    pvs->indstats = indstats;

    /* Prepare shared information */
    shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
    MemSet(shared, 0, est_shared_len);
    shared->relid = RelationGetRelid(rel);
    shared->elevel = elevel;
    shared->queryid = pgstat_get_my_query_id();
    shared->maintenance_work_mem_worker =
        (nindexes_mwm > 0) ?
        maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
        maintenance_work_mem;
    shared->dead_items_info.max_bytes = vac_work_mem * 1024L;

    /* Prepare DSA space for dead items */
    dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
                                      LWTRANCHE_PARALLEL_VACUUM_DSA);
    pvs->dead_items = dead_items;
    shared->dead_items_handle = TidStoreGetHandle(dead_items);
    shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));

    /* Use the same buffer size for all workers */
    shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);

    pg_atomic_init_u32(&(shared->cost_balance), 0);
    pg_atomic_init_u32(&(shared->active_nworkers), 0);
    pg_atomic_init_u32(&(shared->idx), 0);

    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
    pvs->shared = shared;

    /*
     * Allocate space for each worker's BufferUsage and WalUsage; no need to
     * initialize
     */
    buffer_usage = shm_toc_allocate(pcxt->toc,
                                    mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
    pvs->buffer_usage = buffer_usage;
    wal_usage = shm_toc_allocate(pcxt->toc,
                                 mul_size(sizeof(WalUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
    pvs->wal_usage = wal_usage;

    /* Store query string for workers */
    if (debug_query_string)
    {
        char       *sharedquery;

        sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
        memcpy(sharedquery, debug_query_string, querylen + 1);
        sharedquery[querylen] = '\0';
        shm_toc_insert(pcxt->toc,
                       PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
    }

    /* Success -- return parallel vacuum state */
    return pvs;
}

References PVShared::active_nworkers, IndexAmRoutine::amparallelvacuumoptions, IndexAmRoutine::amusemaintenanceworkmem, Assert, ParallelVacuumState::bstrategy, ParallelVacuumState::buffer_usage, PVShared::cost_balance, CreateParallelContext(), ParallelVacuumState::dead_items, PVShared::dead_items_dsa_handle, PVShared::dead_items_handle, PVShared::dead_items_info, debug_query_string, dsa_get_handle(), PVShared::elevel, EnterParallelMode(), ParallelContext::estimator, GetAccessStrategyBufferCount(), ParallelVacuumState::heaprel, i, PVShared::idx, ParallelVacuumState::indrels, ParallelVacuumState::indstats, InitializeParallelDSM(), LWTRANCHE_PARALLEL_VACUUM_DSA, maintenance_work_mem, PVShared::maintenance_work_mem_worker, VacDeadItemsInfo::max_bytes, MemSet, Min, mul_size(), ParallelVacuumState::nindexes, ParallelVacuumState::nindexes_parallel_bulkdel, ParallelVacuumState::nindexes_parallel_cleanup, ParallelVacuumState::nindexes_parallel_condcleanup, ParallelContext::nworkers, palloc0(), parallel_vacuum_compute_workers(), PARALLEL_VACUUM_KEY_BUFFER_USAGE, PARALLEL_VACUUM_KEY_INDEX_STATS, PARALLEL_VACUUM_KEY_QUERY_TEXT, PARALLEL_VACUUM_KEY_SHARED, PARALLEL_VACUUM_KEY_WAL_USAGE, ParallelVacuumState::pcxt, pfree(), pg_atomic_init_u32(), pgstat_get_my_query_id(), PVShared::queryid, RelationData::rd_indam, RelationGetRelid, PVShared::relid, PVShared::ring_nbuffers, ParallelVacuumState::shared, shm_toc_allocate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_insert(), TidStoreCreateShared(), TidStoreGetDSA(), TidStoreGetHandle(), ParallelContext::toc, VACUUM_OPTION_MAX_VALID_VALUE, VACUUM_OPTION_PARALLEL_BULKDEL, VACUUM_OPTION_PARALLEL_CLEANUP, VACUUM_OPTION_PARALLEL_COND_CLEANUP, ParallelVacuumState::wal_usage, and ParallelVacuumState::will_parallel_vacuum.

Referenced by dead_items_alloc().
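Taken together with the other leader-side entry points, the intended call sequence looks roughly like the following condensed sketch (modeled on how vacuumlazy.c drives parallel index vacuuming; error handling, progress reporting and the heap scan itself are omitted, and all local names are illustrative):

#include "postgres.h"

#include "access/genam.h"
#include "access/tidstore.h"
#include "commands/vacuum.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"

static void
vacuum_indexes_in_parallel(Relation rel, Relation *indrels, int nindexes,
                           int nrequested_workers, int vac_work_mem,
                           int elevel, BufferAccessStrategy bstrategy)
{
    ParallelVacuumState *pvs;
    VacDeadItemsInfo *dead_items_info;
    TidStore   *dead_items;
    IndexBulkDeleteResult **istats;
    long        num_table_tuples = 0;
    int         num_index_scans = 0;

    pvs = parallel_vacuum_init(rel, indrels, nindexes, nrequested_workers,
                               vac_work_mem, elevel, bstrategy);
    if (pvs == NULL)
        return;                 /* fall back to serial index vacuuming */

    dead_items = parallel_vacuum_get_dead_items(pvs, &dead_items_info);
    (void) dead_items;          /* would be filled during the heap scan */

    /* ... scan the heap, collecting dead TIDs and num_table_tuples ... */

    /* one round of index bulk-deletion, then recycle the TID store */
    parallel_vacuum_bulkdel_all_indexes(pvs, num_table_tuples, num_index_scans);
    num_index_scans++;
    parallel_vacuum_reset_dead_items(pvs);

    /* ... further heap/index passes as needed ... */

    /* final index cleanup phase */
    parallel_vacuum_cleanup_all_indexes(pvs, num_table_tuples, num_index_scans,
                                        true);

    /* collect per-index statistics and tear down the parallel context */
    istats = (IndexBulkDeleteResult **)
        palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
    parallel_vacuum_end(pvs, istats);
}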

◆ parallel_vacuum_main()

void parallel_vacuum_main ( dsm_segment *  seg,
shm_toc *  toc 
)

Definition at line 990 of file vacuumparallel.c.

{
    ParallelVacuumState pvs;
    Relation    rel;
    Relation   *indrels;
    PVIndStats *indstats;
    PVShared   *shared;
    TidStore   *dead_items;
    BufferUsage *buffer_usage;
    WalUsage   *wal_usage;
    int         nindexes;
    char       *sharedquery;
    ErrorContextCallback errcallback;

    /*
     * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
     * don't support parallel vacuum for autovacuum as of now.
     */
    Assert(MyProc->statusFlags == PROC_IN_VACUUM);

    elog(DEBUG1, "starting parallel vacuum worker");

    shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);

    /* Set debug_query_string for individual workers */
    sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
    debug_query_string = sharedquery;
    pgstat_report_activity(STATE_RUNNING, debug_query_string);

    /* Track query ID */
    pgstat_report_query_id(shared->queryid, false);

    /*
     * Open table. The lock mode is the same as the leader process. It's
     * okay because the lock mode does not conflict among the parallel
     * workers.
     */
    rel = table_open(shared->relid, ShareUpdateExclusiveLock);

    /*
     * Open all indexes. indrels are sorted in order by OID, which should be
     * matched to the leader's one.
     */
    vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
    Assert(nindexes > 0);

    if (shared->maintenance_work_mem_worker > 0)
        maintenance_work_mem = shared->maintenance_work_mem_worker;

    /* Set index statistics */
    indstats = (PVIndStats *) shm_toc_lookup(toc,
                                             PARALLEL_VACUUM_KEY_INDEX_STATS,
                                             false);

    /* Find dead_items in shared memory */
    dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
                                shared->dead_items_handle);

    /* Set cost-based vacuum delay */
    VacuumUpdateCosts();
    VacuumCostBalance = 0;
    VacuumCostBalanceLocal = 0;
    VacuumSharedCostBalance = &(shared->cost_balance);
    VacuumActiveNWorkers = &(shared->active_nworkers);

    /* Set parallel vacuum state */
    pvs.indrels = indrels;
    pvs.nindexes = nindexes;
    pvs.indstats = indstats;
    pvs.shared = shared;
    pvs.dead_items = dead_items;
    pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
    pvs.relname = pstrdup(RelationGetRelationName(rel));
    pvs.heaprel = rel;

    /* These fields will be filled during index vacuum or cleanup */
    pvs.indname = NULL;
    pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;

    /* Each parallel VACUUM worker gets its own access strategy. */
    pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
                                              shared->ring_nbuffers * (BLCKSZ / 1024));

    /* Setup error traceback support for ereport() */
    errcallback.callback = parallel_vacuum_error_callback;
    errcallback.arg = &pvs;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /* Prepare to track buffer usage during parallel execution */
    InstrStartParallelQuery();

    /* Process indexes to perform vacuum/cleanup */
    parallel_vacuum_process_safe_indexes(&pvs);

    /* Report buffer/WAL usage during parallel execution */
    buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
    wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
    InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
                          &wal_usage[ParallelWorkerNumber]);

    TidStoreDetach(dead_items);

    /* Pop the error context stack */
    error_context_stack = errcallback.previous;

    vac_close_indexes(nindexes, indrels, RowExclusiveLock);
    table_close(rel, ShareUpdateExclusiveLock);
    FreeAccessStrategy(pvs.bstrategy);
}

References PVShared::active_nworkers, ErrorContextCallback::arg, Assert, BAS_VACUUM, ParallelVacuumState::bstrategy, ErrorContextCallback::callback, PVShared::cost_balance, ParallelVacuumState::dead_items, PVShared::dead_items_dsa_handle, PVShared::dead_items_handle, DEBUG1, debug_query_string, elog, error_context_stack, FreeAccessStrategy(), get_namespace_name(), GetAccessStrategyWithSize(), ParallelVacuumState::heaprel, ParallelVacuumState::indname, ParallelVacuumState::indrels, ParallelVacuumState::indstats, InstrEndParallelQuery(), InstrStartParallelQuery(), maintenance_work_mem, PVShared::maintenance_work_mem_worker, MyProc, ParallelVacuumState::nindexes, PARALLEL_INDVAC_STATUS_INITIAL, parallel_vacuum_error_callback(), PARALLEL_VACUUM_KEY_BUFFER_USAGE, PARALLEL_VACUUM_KEY_INDEX_STATS, PARALLEL_VACUUM_KEY_QUERY_TEXT, PARALLEL_VACUUM_KEY_SHARED, PARALLEL_VACUUM_KEY_WAL_USAGE, parallel_vacuum_process_safe_indexes(), ParallelWorkerNumber, pgstat_report_activity(), pgstat_report_query_id(), ErrorContextCallback::previous, PROC_IN_VACUUM, pstrdup(), PVShared::queryid, RelationGetNamespace, RelationGetRelationName, PVShared::relid, ParallelVacuumState::relname, ParallelVacuumState::relnamespace, PVShared::ring_nbuffers, RowExclusiveLock, ParallelVacuumState::shared, ShareUpdateExclusiveLock, shm_toc_lookup(), STATE_RUNNING, ParallelVacuumState::status, PGPROC::statusFlags, table_close(), table_open(), TidStoreAttach(), TidStoreDetach(), vac_close_indexes(), vac_open_indexes(), VacuumActiveNWorkers, VacuumCostBalance, VacuumCostBalanceLocal, VacuumSharedCostBalance, and VacuumUpdateCosts().

◆ parallel_vacuum_process_all_indexes()

static void parallel_vacuum_process_all_indexes ( ParallelVacuumState *  pvs,
int  num_index_scans,
bool  vacuum 
)
static

Definition at line 612 of file vacuumparallel.c.

{
    int         nworkers;
    PVIndVacStatus new_status;

    Assert(!IsParallelWorker());

    if (vacuum)
    {
        new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;

        /* Determine the number of parallel workers to launch */
        nworkers = pvs->nindexes_parallel_bulkdel;
    }
    else
    {
        new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;

        /* Determine the number of parallel workers to launch */
        nworkers = pvs->nindexes_parallel_cleanup;

        /* Add conditionally parallel-aware indexes if in the first time call */
        if (num_index_scans == 0)
            nworkers += pvs->nindexes_parallel_condcleanup;
    }

    /* The leader process will participate */
    nworkers--;

    /*
     * It is possible that parallel context is initialized with fewer workers
     * than the number of indexes that need a separate worker in the current
     * phase, so we need to consider it. See
     * parallel_vacuum_compute_workers().
     */
    nworkers = Min(nworkers, pvs->pcxt->nworkers);

    /*
     * Set index vacuum status and mark whether parallel vacuum worker can
     * process it.
     */
    for (int i = 0; i < pvs->nindexes; i++)
    {
        PVIndStats *indstats = &(pvs->indstats[i]);

        Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
        indstats->status = new_status;
        indstats->parallel_workers_can_process =
            (pvs->will_parallel_vacuum[i] &&
             parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
                                                    num_index_scans,
                                                    vacuum));
    }

    /* Reset the parallel index processing and progress counters */
    pg_atomic_write_u32(&(pvs->shared->idx), 0);

    /* Setup the shared cost-based vacuum delay and launch workers */
    if (nworkers > 0)
    {
        /* Reinitialize parallel context to relaunch parallel workers */
        if (num_index_scans > 0)
            ReinitializeParallelDSM(pvs->pcxt);

        /*
         * Set up shared cost balance and the number of active workers for
         * vacuum delay. We need to do this before launching workers as
         * otherwise, they might not see the updated values for these
         * parameters.
         */
        pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
        pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);

        /*
         * The number of workers can vary between bulkdelete and cleanup
         * phase.
         */
        ReinitializeParallelWorkers(pvs->pcxt, nworkers);

        LaunchParallelWorkers(pvs->pcxt);

        if (pvs->pcxt->nworkers_launched > 0)
        {
            /*
             * Reset the local cost values for leader backend as we have
             * already accumulated the remaining balance of heap.
             */
            VacuumCostBalance = 0;
            VacuumCostBalanceLocal = 0;

            /* Enable shared cost balance for leader backend */
            VacuumSharedCostBalance = &(pvs->shared->cost_balance);
            VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
        }

        if (vacuum)
            ereport(pvs->shared->elevel,
                    (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
                                     "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
                                     pvs->pcxt->nworkers_launched),
                            pvs->pcxt->nworkers_launched, nworkers)));
        else
            ereport(pvs->shared->elevel,
                    (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
                                     "launched %d parallel vacuum workers for index cleanup (planned: %d)",
                                     pvs->pcxt->nworkers_launched),
                            pvs->pcxt->nworkers_launched, nworkers)));
    }

    /* Vacuum the indexes that can be processed by only leader process */
    parallel_vacuum_process_unsafe_indexes(pvs);

    /*
     * Join as parallel workers. The leader process alone processes all
     * parallel-safe indexes in the case where no workers are launched.
     */
    parallel_vacuum_process_safe_indexes(pvs);

    /*
     * Next, accumulate buffer and WAL usage. (This must wait for the workers
     * to finish, or we might get incomplete data.)
     */
    if (nworkers > 0)
    {
        /* Wait for all vacuum workers to finish */
        WaitForParallelWorkersToFinish(pvs->pcxt);

        for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
            InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
    }

    /*
     * Reset all index status back to initial (while checking that we have
     * vacuumed all indexes).
     */
    for (int i = 0; i < pvs->nindexes; i++)
    {
        PVIndStats *indstats = &(pvs->indstats[i]);

        if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
            elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
                 RelationGetRelationName(pvs->indrels[i]));

        indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
    }

    /*
     * Carry the shared balance value to heap scan and disable shared costing
     */
    if (VacuumSharedCostBalance)
    {
        VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
        VacuumSharedCostBalance = NULL;
        VacuumActiveNWorkers = NULL;
    }
}

References PVShared::active_nworkers, Assert, ParallelVacuumState::buffer_usage, PVShared::cost_balance, PVShared::elevel, elog, ereport, errmsg(), ERROR, i, PVShared::idx, ParallelVacuumState::indrels, ParallelVacuumState::indstats, InstrAccumParallelQuery(), IsParallelWorker, LaunchParallelWorkers(), Min, ngettext, ParallelVacuumState::nindexes, ParallelVacuumState::nindexes_parallel_bulkdel, ParallelVacuumState::nindexes_parallel_cleanup, ParallelVacuumState::nindexes_parallel_condcleanup, ParallelContext::nworkers, ParallelContext::nworkers_launched, PARALLEL_INDVAC_STATUS_COMPLETED, PARALLEL_INDVAC_STATUS_INITIAL, PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, PARALLEL_INDVAC_STATUS_NEED_CLEANUP, parallel_vacuum_index_is_parallel_safe(), parallel_vacuum_process_safe_indexes(), parallel_vacuum_process_unsafe_indexes(), PVIndStats::parallel_workers_can_process, ParallelVacuumState::pcxt, pg_atomic_read_u32(), pg_atomic_write_u32(), ReinitializeParallelDSM(), ReinitializeParallelWorkers(), RelationGetRelationName, ParallelVacuumState::shared, PVIndStats::status, vacuum(), VacuumActiveNWorkers, VacuumCostBalance, VacuumCostBalanceLocal, VacuumSharedCostBalance, WaitForParallelWorkersToFinish(), ParallelVacuumState::wal_usage, and ParallelVacuumState::will_parallel_vacuum.

Referenced by parallel_vacuum_bulkdel_all_indexes(), and parallel_vacuum_cleanup_all_indexes().

◆ parallel_vacuum_process_one_index()

static void parallel_vacuum_process_one_index ( ParallelVacuumState *  pvs,
Relation  indrel,
PVIndStats *  indstats 
)
static

Definition at line 866 of file vacuumparallel.c.

{
    IndexBulkDeleteResult *istat = NULL;
    IndexBulkDeleteResult *istat_res;
    IndexVacuumInfo ivinfo;

    /*
     * Update the pointer to the corresponding bulk-deletion result if someone
     * has already updated it
     */
    if (indstats->istat_updated)
        istat = &(indstats->istat);

    ivinfo.index = indrel;
    ivinfo.heaprel = pvs->heaprel;
    ivinfo.analyze_only = false;
    ivinfo.report_progress = false;
    ivinfo.message_level = DEBUG2;
    ivinfo.estimated_count = pvs->shared->estimated_count;
    ivinfo.num_heap_tuples = pvs->shared->reltuples;
    ivinfo.strategy = pvs->bstrategy;

    /* Update error traceback information */
    pvs->indname = pstrdup(RelationGetRelationName(indrel));
    pvs->status = indstats->status;

    switch (indstats->status)
    {
        case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
            istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
                                              &pvs->shared->dead_items_info);
            break;
        case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
            istat_res = vac_cleanup_one_index(&ivinfo, istat);
            break;
        default:
            elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
                 indstats->status,
                 RelationGetRelationName(indrel));
    }

    /*
     * Copy the index bulk-deletion result returned from ambulkdelete and
     * amvacuumcleanup to the DSM segment if it's the first cycle because they
     * allocate locally and it's possible that an index will be vacuumed by a
     * different vacuum process the next cycle. Copying the result normally
     * happens only the first time an index is vacuumed. For any additional
     * vacuum pass, we directly point to the result on the DSM segment and
     * pass it to vacuum index APIs so that workers can update it directly.
     *
     * Since all vacuum workers write the bulk-deletion result at different
     * slots we can write them without locking.
     */
    if (!indstats->istat_updated && istat_res != NULL)
    {
        memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
        indstats->istat_updated = true;

        /* Free the locally-allocated bulk-deletion result */
        pfree(istat_res);
    }

    /*
     * Update the status to completed. No need to lock here since each worker
     * touches different indexes.
     */
    indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;

    /* Reset error traceback information */
    pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
    pfree(pvs->indname);
    pvs->indname = NULL;

    /*
     * Call the parallel variant of pgstat_progress_incr_param so workers can
     * report progress of index vacuum to the leader.
     */
    pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
}

References IndexVacuumInfo::analyze_only, ParallelVacuumState::bstrategy, ParallelVacuumState::dead_items, PVShared::dead_items_info, DEBUG2, elog, ERROR, PVShared::estimated_count, IndexVacuumInfo::estimated_count, ParallelVacuumState::heaprel, IndexVacuumInfo::heaprel, IndexVacuumInfo::index, ParallelVacuumState::indname, PVIndStats::istat, PVIndStats::istat_updated, IndexVacuumInfo::message_level, IndexVacuumInfo::num_heap_tuples, PARALLEL_INDVAC_STATUS_COMPLETED, PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, PARALLEL_INDVAC_STATUS_NEED_CLEANUP, pfree(), pgstat_progress_parallel_incr_param(), PROGRESS_VACUUM_INDEXES_PROCESSED, pstrdup(), RelationGetRelationName, PVShared::reltuples, IndexVacuumInfo::report_progress, ParallelVacuumState::shared, PVIndStats::status, ParallelVacuumState::status, IndexVacuumInfo::strategy, vac_bulkdel_one_index(), and vac_cleanup_one_index().

Referenced by parallel_vacuum_process_safe_indexes(), and parallel_vacuum_process_unsafe_indexes().

◆ parallel_vacuum_process_safe_indexes()

static void parallel_vacuum_process_safe_indexes ( ParallelVacuumState *  pvs)
static

Definition at line 775 of file vacuumparallel.c.

{
    /*
     * Increment the active worker count if we are able to launch any worker.
     */
    if (VacuumActiveNWorkers)
        pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);

    /* Loop until all indexes are vacuumed */
    for (;;)
    {
        int         idx;
        PVIndStats *indstats;

        /* Get an index number to process */
        idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);

        /* Done for all indexes? */
        if (idx >= pvs->nindexes)
            break;

        indstats = &(pvs->indstats[idx]);

        /*
         * Skip vacuuming index that is unsafe for workers or has an
         * unsuitable target for parallel index vacuum (this is vacuumed in
         * parallel_vacuum_process_unsafe_indexes() by the leader).
         */
        if (!indstats->parallel_workers_can_process)
            continue;

        /* Do vacuum or cleanup of the index */
        parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
    }

    /*
     * We have completed the index vacuum so decrement the active worker
     * count.
     */
    if (VacuumActiveNWorkers)
        pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}

References idx(), PVShared::idx, ParallelVacuumState::indrels, ParallelVacuumState::indstats, ParallelVacuumState::nindexes, parallel_vacuum_process_one_index(), PVIndStats::parallel_workers_can_process, pg_atomic_add_fetch_u32(), pg_atomic_fetch_add_u32(), pg_atomic_sub_fetch_u32(), ParallelVacuumState::shared, and VacuumActiveNWorkers.

Referenced by parallel_vacuum_main(), and parallel_vacuum_process_all_indexes().

◆ parallel_vacuum_process_unsafe_indexes()

static void parallel_vacuum_process_unsafe_indexes ( ParallelVacuumState *  pvs)
static

Definition at line 829 of file vacuumparallel.c.

{
    Assert(!IsParallelWorker());

    /*
     * Increment the active worker count if we are able to launch any worker.
     */
    if (VacuumActiveNWorkers)
        pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);

    for (int i = 0; i < pvs->nindexes; i++)
    {
        PVIndStats *indstats = &(pvs->indstats[i]);

        /* Skip indexes that are safe for workers */
        if (indstats->parallel_workers_can_process)
            continue;

        /* Do vacuum or cleanup of the index */
        parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
    }

    /*
     * We have completed the index vacuum so decrement the active worker
     * count.
     */
    if (VacuumActiveNWorkers)
        pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}

References Assert, i, ParallelVacuumState::indrels, ParallelVacuumState::indstats, IsParallelWorker, ParallelVacuumState::nindexes, parallel_vacuum_process_one_index(), PVIndStats::parallel_workers_can_process, pg_atomic_add_fetch_u32(), pg_atomic_sub_fetch_u32(), and VacuumActiveNWorkers.

Referenced by parallel_vacuum_process_all_indexes().

◆ parallel_vacuum_reset_dead_items()

void parallel_vacuum_reset_dead_items ( ParallelVacuumState *  pvs)

Definition at line 475 of file vacuumparallel.c.

{
    TidStore   *dead_items = pvs->dead_items;
    VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);

    /*
     * Free the current tidstore and return allocated DSA segments to the
     * operating system. Then we recreate the tidstore with the same max_bytes
     * limitation we just used.
     */
    TidStoreDestroy(dead_items);
    pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
                                           LWTRANCHE_PARALLEL_VACUUM_DSA);

    /* Update the DSA pointer for dead_items to the new one */
    pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
    pvs->shared->dead_items_handle = TidStoreGetHandle(dead_items);

    /* Reset the counter */
    dead_items_info->num_items = 0;
}

References ParallelVacuumState::dead_items, PVShared::dead_items_dsa_handle, PVShared::dead_items_handle, PVShared::dead_items_info, dsa_get_handle(), LWTRANCHE_PARALLEL_VACUUM_DSA, VacDeadItemsInfo::max_bytes, VacDeadItemsInfo::num_items, ParallelVacuumState::shared, TidStoreCreateShared(), TidStoreDestroy(), TidStoreGetDSA(), and TidStoreGetHandle().

Referenced by dead_items_reset().