57#define PARALLEL_VACUUM_KEY_SHARED 1
58#define PARALLEL_VACUUM_KEY_QUERY_TEXT 2
59#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3
60#define PARALLEL_VACUUM_KEY_WAL_USAGE 4
61#define PARALLEL_VACUUM_KEY_INDEX_STATS 5
285 bool *will_parallel_vacuum);
306 int nrequested_workers,
int vac_work_mem,
316 bool *will_parallel_vacuum;
317 Size est_indstats_len;
319 int nindexes_mwm = 0;
320 int parallel_workers = 0;
327 Assert(nrequested_workers >= 0);
336 will_parallel_vacuum);
337 if (parallel_workers <= 0)
340 pfree(will_parallel_vacuum);
396 MemSet(indstats, 0, est_indstats_len);
397 for (
int i = 0;
i < nindexes;
i++)
410 if (!will_parallel_vacuum[
i])
432 MemSet(shared, 0, est_shared_len);
438 vac_work_mem /
Min(parallel_workers, nindexes_mwm) :
445 LWTRANCHE_PARALLEL_VACUUM_DSA);
496 sharedquery[querylen] =
'\0';
581 LWTRANCHE_PARALLEL_VACUUM_DSA);
615 int num_index_scans,
bool estimated_count,
681 "parallel autovacuum worker updated cost params: cost_limit=%d, cost_delay=%g, cost_page_miss=%d, cost_page_dirty=%d, cost_page_hit=%d",
743 bool *will_parallel_vacuum)
745 int nindexes_parallel = 0;
746 int nindexes_parallel_bulkdel = 0;
747 int nindexes_parallel_cleanup = 0;
748 int parallel_workers;
765 for (
int i = 0;
i < nindexes;
i++)
775 will_parallel_vacuum[
i] =
true;
778 nindexes_parallel_bulkdel++;
781 nindexes_parallel_cleanup++;
784 nindexes_parallel =
Max(nindexes_parallel_bulkdel,
785 nindexes_parallel_cleanup);
791 if (nindexes_parallel <= 0)
795 parallel_workers = (nrequested > 0) ?
796 Min(nrequested, nindexes_parallel) : nindexes_parallel;
799 parallel_workers =
Min(parallel_workers, max_workers);
801 return parallel_workers;
834 if (num_index_scans == 0)
850 if (wstats != NULL && nworkers > 0)
862 indstats->
status = new_status;
877 if (num_index_scans > 0)
917 (
errmsg(
ngettext(
"launched %d parallel vacuum worker for index vacuuming (planned: %d)",
918 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
923 (
errmsg(
ngettext(
"launched %d parallel vacuum worker for index cleanup (planned: %d)",
924 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
960 elog(
ERROR,
"parallel index vacuum on index \"%s\" is not completed",
1085 istat = &(indstats->
istat);
1087 ivinfo.
index = indrel;
1100 switch (indstats->
status)
1110 elog(
ERROR,
"unexpected parallel vacuum index status %d for index \"%s\"",
1183 if (num_index_scans > 0 &&
1217 elog(
DEBUG1,
"starting parallel vacuum worker");
1303 errcallback.
arg = &pvs;
1351 errcontext(
"while vacuuming index \"%s\" of relation \"%s.%s\"",
1357 errcontext(
"while cleaning up index \"%s\" of relation \"%s.%s\"",
Datum idx(PG_FUNCTION_ARGS)
int min_parallel_index_scan_size
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
static void pg_atomic_write_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
void VacuumUpdateCosts(void)
void InitializeParallelDSM(ParallelContext *pcxt)
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
void LaunchParallelWorkers(ParallelContext *pcxt)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void DestroyParallelContext(ParallelContext *pcxt)
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
void pgstat_progress_parallel_incr_param(int index, int64 incr)
void pgstat_report_query_id(int64 query_id, bool force)
int64 pgstat_get_my_query_id(void)
void pgstat_report_activity(BackendState state, const char *cmd_str)
#define RelationGetNumberOfBlocks(reln)
#define ngettext(s, p, n)
#define Assert(condition)
#define MemSet(start, val, len)
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
dsa_handle dsa_get_handle(dsa_area *area)
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
ErrorContextCallback * error_context_stack
#define ereport(elevel,...)
#define palloc0_array(type, count)
#define palloc0_object(type)
BufferAccessStrategy GetAccessStrategyWithSize(BufferAccessStrategyType btype, int ring_size_kb)
int GetAccessStrategyBufferCount(BufferAccessStrategy strategy)
void FreeAccessStrategy(BufferAccessStrategy strategy)
int max_parallel_maintenance_workers
int autovacuum_max_parallel_workers
#define IsParallelWorker()
void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
void InstrStartParallelQuery(void)
#define ShareUpdateExclusiveLock
char * get_namespace_name(Oid nspid)
char * pstrdup(const char *in)
void pfree(void *pointer)
#define AmAutoVacuumWorkerProcess()
const char * debug_query_string
#define PROGRESS_VACUUM_DELAY_TIME
#define PROGRESS_VACUUM_INDEXES_PROCESSED
#define RelationGetRelid(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size mul_size(Size s1, Size s2)
static void SpinLockRelease(volatile slock_t *lock)
static void SpinLockAcquire(volatile slock_t *lock)
static void SpinLockInit(volatile slock_t *lock)
struct ErrorContextCallback * previous
void(* callback)(void *arg)
bool amusemaintenanceworkmem
uint8 amparallelvacuumoptions
BufferAccessStrategy strategy
IndexBulkDeleteResult istat
bool parallel_workers_can_process
pg_atomic_uint32 generation
pg_atomic_uint32 cost_balance
int maintenance_work_mem_worker
pg_atomic_uint32 active_nworkers
PVSharedCostParams cost_params
dsa_pointer dead_items_handle
dsa_handle dead_items_dsa_handle
VacDeadItemsInfo dead_items_info
shm_toc_estimator estimator
int nindexes_parallel_bulkdel
BufferAccessStrategy bstrategy
int nindexes_parallel_cleanup
int nindexes_parallel_condcleanup
BufferUsage * buffer_usage
bool * will_parallel_vacuum
const struct IndexAmRoutine * rd_indam
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
dsa_area * TidStoreGetDSA(TidStore *ts)
void TidStoreDetach(TidStore *ts)
void TidStoreDestroy(TidStore *ts)
TidStore * TidStoreAttach(dsa_handle area_handle, dsa_pointer handle)
dsa_pointer TidStoreGetHandle(TidStore *ts)
TidStore * TidStoreCreateShared(size_t max_bytes, int tranche_id)
pg_atomic_uint32 * VacuumActiveNWorkers
void vacuum(List *relations, const VacuumParams *params, BufferAccessStrategy bstrategy, MemoryContext vac_context, bool isTopLevel)
bool track_cost_delay_timing
void vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel)
int VacuumCostBalanceLocal
IndexBulkDeleteResult * vac_cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
int64 parallel_vacuum_worker_delay_ns
void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
pg_atomic_uint32 * VacuumSharedCostBalance
IndexBulkDeleteResult * vac_bulkdel_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat, TidStore *dead_items, VacDeadItemsInfo *dead_items_info)
#define VACUUM_OPTION_PARALLEL_CLEANUP
#define VACUUM_OPTION_NO_PARALLEL
#define VACUUM_OPTION_PARALLEL_BULKDEL
#define VACUUM_OPTION_MAX_VALID_VALUE
#define VACUUM_OPTION_PARALLEL_COND_CLEANUP
static uint32 shared_params_generation_local
void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, bool estimated_count, PVWorkerStats *wstats)
static void parallel_vacuum_error_callback(void *arg)
TidStore * parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
void parallel_vacuum_update_shared_delay_params(void)
static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum)
#define PARALLEL_VACUUM_KEY_INDEX_STATS
static PVSharedCostParams * pv_shared_cost_params
#define PARALLEL_VACUUM_KEY_QUERY_TEXT
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE
void parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, PVWorkerStats *wstats)
ParallelVacuumState * parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, int elevel, BufferAccessStrategy bstrategy)
#define PARALLEL_VACUUM_KEY_SHARED
#define PARALLEL_VACUUM_KEY_WAL_USAGE
void parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool vacuum, PVWorkerStats *wstats)
void parallel_vacuum_propagate_shared_delay_params(void)
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)
void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
static void parallel_vacuum_dsm_detach(dsm_segment *seg, Datum arg)
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, bool vacuum)
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
static void parallel_vacuum_set_cost_parameters(PVSharedCostParams *params)
void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
@ PARALLEL_INDVAC_STATUS_NEED_CLEANUP
@ PARALLEL_INDVAC_STATUS_INITIAL
@ PARALLEL_INDVAC_STATUS_NEED_BULKDELETE
@ PARALLEL_INDVAC_STATUS_COMPLETED
void ExitParallelMode(void)
void EnterParallelMode(void)