47#define PARALLEL_VACUUM_KEY_SHARED 1
48#define PARALLEL_VACUUM_KEY_QUERY_TEXT 2
49#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3
50#define PARALLEL_VACUUM_KEY_WAL_USAGE 4
51#define PARALLEL_VACUUM_KEY_INDEX_STATS 5
225 bool *will_parallel_vacuum);
244 int nrequested_workers,
int vac_work_mem,
254 bool *will_parallel_vacuum;
255 Size est_indstats_len;
257 int nindexes_mwm = 0;
258 int parallel_workers = 0;
265 Assert(nrequested_workers >= 0);
271 will_parallel_vacuum = (
bool *)
palloc0(
sizeof(
bool) * nindexes);
274 will_parallel_vacuum);
275 if (parallel_workers <= 0)
278 pfree(will_parallel_vacuum);
334 MemSet(indstats, 0, est_indstats_len);
335 for (
int i = 0;
i < nindexes;
i++)
348 if (!will_parallel_vacuum[
i])
370 MemSet(shared, 0, est_shared_len);
417 sharedquery[querylen] =
'\0';
520 int num_index_scans,
bool estimated_count)
550 bool *will_parallel_vacuum)
552 int nindexes_parallel = 0;
553 int nindexes_parallel_bulkdel = 0;
554 int nindexes_parallel_cleanup = 0;
555 int parallel_workers;
567 for (
int i = 0;
i < nindexes;
i++)
577 will_parallel_vacuum[
i] =
true;
580 nindexes_parallel_bulkdel++;
583 nindexes_parallel_cleanup++;
586 nindexes_parallel =
Max(nindexes_parallel_bulkdel,
587 nindexes_parallel_cleanup);
593 if (nindexes_parallel <= 0)
597 parallel_workers = (nrequested > 0) ?
598 Min(nrequested, nindexes_parallel) : nindexes_parallel;
603 return parallel_workers;
634 if (num_index_scans == 0)
658 indstats->
status = new_status;
673 if (num_index_scans > 0)
709 (
errmsg(
ngettext(
"launched %d parallel vacuum worker for index vacuuming (planned: %d)",
710 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
715 (
errmsg(
ngettext(
"launched %d parallel vacuum worker for index cleanup (planned: %d)",
716 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
752 elog(
ERROR,
"parallel index vacuum on index \"%s\" is not completed",
877 istat = &(indstats->
istat);
879 ivinfo.
index = indrel;
902 elog(
ERROR,
"unexpected parallel vacuum index status %d for index \"%s\"",
975 if (num_index_scans > 0 &&
1009 elog(
DEBUG1,
"starting parallel vacuum worker");
1074 errcallback.
arg = &pvs;
1114 errcontext(
"while vacuuming index \"%s\" of relation \"%s.%s\"",
1120 errcontext(
"while cleaning up index \"%s\" of relation \"%s.%s\"",
Datum idx(PG_FUNCTION_ARGS)
int min_parallel_index_scan_size
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
static void pg_atomic_write_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
void VacuumUpdateCosts(void)
void InitializeParallelDSM(ParallelContext *pcxt)
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
void LaunchParallelWorkers(ParallelContext *pcxt)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void DestroyParallelContext(ParallelContext *pcxt)
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
void pgstat_progress_parallel_incr_param(int index, int64 incr)
uint64 pgstat_get_my_query_id(void)
void pgstat_report_query_id(uint64 query_id, bool force)
void pgstat_report_activity(BackendState state, const char *cmd_str)
#define RelationGetNumberOfBlocks(reln)
#define ngettext(s, p, n)
#define Assert(condition)
#define MemSet(start, val, len)
dsa_handle dsa_get_handle(dsa_area *area)
ErrorContextCallback * error_context_stack
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
BufferAccessStrategy GetAccessStrategyWithSize(BufferAccessStrategyType btype, int ring_size_kb)
int GetAccessStrategyBufferCount(BufferAccessStrategy strategy)
void FreeAccessStrategy(BufferAccessStrategy strategy)
int max_parallel_maintenance_workers
#define IsParallelWorker()
void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
void InstrStartParallelQuery(void)
#define ShareUpdateExclusiveLock
char * get_namespace_name(Oid nspid)
@ LWTRANCHE_PARALLEL_VACUUM_DSA
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
const char * debug_query_string
#define PROGRESS_VACUUM_INDEXES_PROCESSED
#define RelationGetRelid(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size mul_size(Size s1, Size s2)
struct ErrorContextCallback * previous
void(* callback)(void *arg)
bool amusemaintenanceworkmem
uint8 amparallelvacuumoptions
BufferAccessStrategy strategy
IndexBulkDeleteResult istat
bool parallel_workers_can_process
pg_atomic_uint32 cost_balance
int maintenance_work_mem_worker
pg_atomic_uint32 active_nworkers
dsa_pointer dead_items_handle
dsa_handle dead_items_dsa_handle
VacDeadItemsInfo dead_items_info
shm_toc_estimator estimator
int nindexes_parallel_bulkdel
BufferAccessStrategy bstrategy
int nindexes_parallel_cleanup
int nindexes_parallel_condcleanup
BufferUsage * buffer_usage
bool * will_parallel_vacuum
struct IndexAmRoutine * rd_indam
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
dsa_area * TidStoreGetDSA(TidStore *ts)
void TidStoreDetach(TidStore *ts)
void TidStoreDestroy(TidStore *ts)
TidStore * TidStoreAttach(dsa_handle area_handle, dsa_pointer handle)
dsa_pointer TidStoreGetHandle(TidStore *ts)
TidStore * TidStoreCreateShared(size_t max_bytes, int tranche_id)
pg_atomic_uint32 * VacuumActiveNWorkers
void vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel)
void vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, MemoryContext vac_context, bool isTopLevel)
int VacuumCostBalanceLocal
IndexBulkDeleteResult * vac_cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
pg_atomic_uint32 * VacuumSharedCostBalance
IndexBulkDeleteResult * vac_bulkdel_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat, TidStore *dead_items, VacDeadItemsInfo *dead_items_info)
#define VACUUM_OPTION_PARALLEL_CLEANUP
#define VACUUM_OPTION_NO_PARALLEL
#define VACUUM_OPTION_PARALLEL_BULKDEL
#define VACUUM_OPTION_MAX_VALID_VALUE
#define VACUUM_OPTION_PARALLEL_COND_CLEANUP
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool vacuum)
static void parallel_vacuum_error_callback(void *arg)
TidStore * parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum)
#define PARALLEL_VACUUM_KEY_INDEX_STATS
#define PARALLEL_VACUUM_KEY_QUERY_TEXT
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE
ParallelVacuumState * parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, int elevel, BufferAccessStrategy bstrategy)
#define PARALLEL_VACUUM_KEY_SHARED
void parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans)
#define PARALLEL_VACUUM_KEY_WAL_USAGE
void parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, bool estimated_count)
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)
void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, bool vacuum)
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
struct PVIndStats PVIndStats
void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
@ PARALLEL_INDVAC_STATUS_NEED_CLEANUP
@ PARALLEL_INDVAC_STATUS_INITIAL
@ PARALLEL_INDVAC_STATUS_NEED_BULKDELETE
@ PARALLEL_INDVAC_STATUS_COMPLETED
void ExitParallelMode(void)
void EnterParallelMode(void)