47 #define PARALLEL_VACUUM_KEY_SHARED 1
48 #define PARALLEL_VACUUM_KEY_QUERY_TEXT 2
49 #define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3
50 #define PARALLEL_VACUUM_KEY_WAL_USAGE 4
51 #define PARALLEL_VACUUM_KEY_INDEX_STATS 5
224 bool *will_parallel_vacuum);
243 int nrequested_workers,
int vac_work_mem,
253 bool *will_parallel_vacuum;
254 Size est_indstats_len;
256 int nindexes_mwm = 0;
257 int parallel_workers = 0;
264 Assert(nrequested_workers >= 0);
270 will_parallel_vacuum = (
bool *)
palloc0(
sizeof(
bool) * nindexes);
273 will_parallel_vacuum);
274 if (parallel_workers <= 0)
277 pfree(will_parallel_vacuum);
333 MemSet(indstats, 0, est_indstats_len);
334 for (
int i = 0;
i < nindexes;
i++)
347 if (!will_parallel_vacuum[
i])
369 MemSet(shared, 0, est_shared_len);
415 sharedquery[querylen] =
'\0';
519 int num_index_scans,
bool estimated_count)
549 bool *will_parallel_vacuum)
551 int nindexes_parallel = 0;
552 int nindexes_parallel_bulkdel = 0;
553 int nindexes_parallel_cleanup = 0;
554 int parallel_workers;
566 for (
int i = 0;
i < nindexes;
i++)
576 will_parallel_vacuum[
i] =
true;
579 nindexes_parallel_bulkdel++;
582 nindexes_parallel_cleanup++;
585 nindexes_parallel =
Max(nindexes_parallel_bulkdel,
586 nindexes_parallel_cleanup);
592 if (nindexes_parallel <= 0)
596 parallel_workers = (nrequested > 0) ?
597 Min(nrequested, nindexes_parallel) : nindexes_parallel;
602 return parallel_workers;
633 if (num_index_scans == 0)
657 indstats->
status = new_status;
672 if (num_index_scans > 0)
708 (
errmsg(
ngettext(
"launched %d parallel vacuum worker for index vacuuming (planned: %d)",
709 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
714 (
errmsg(
ngettext(
"launched %d parallel vacuum worker for index cleanup (planned: %d)",
715 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
751 elog(
ERROR,
"parallel index vacuum on index \"%s\" is not completed",
876 istat = &(indstats->
istat);
878 ivinfo.
index = indrel;
901 elog(
ERROR,
"unexpected parallel vacuum index status %d for index \"%s\"",
974 if (num_index_scans > 0 &&
1008 elog(
DEBUG1,
"starting parallel vacuum worker");
1070 errcallback.
arg = &pvs;
1110 errcontext(
"while vacuuming index \"%s\" of relation \"%s.%s\"",
1116 errcontext(
"while cleaning up index \"%s\" of relation \"%s.%s\"",
Datum idx(PG_FUNCTION_ARGS)
int min_parallel_index_scan_size
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
static void pg_atomic_write_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
void VacuumUpdateCosts(void)
void InitializeParallelDSM(ParallelContext *pcxt)
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
void LaunchParallelWorkers(ParallelContext *pcxt)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void DestroyParallelContext(ParallelContext *pcxt)
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
void pgstat_progress_parallel_incr_param(int index, int64 incr)
void pgstat_report_activity(BackendState state, const char *cmd_str)
#define RelationGetNumberOfBlocks(reln)
#define ngettext(s, p, n)
#define Assert(condition)
#define MemSet(start, val, len)
dsa_handle dsa_get_handle(dsa_area *area)
ErrorContextCallback * error_context_stack
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
BufferAccessStrategy GetAccessStrategyWithSize(BufferAccessStrategyType btype, int ring_size_kb)
int GetAccessStrategyBufferCount(BufferAccessStrategy strategy)
void FreeAccessStrategy(BufferAccessStrategy strategy)
int max_parallel_maintenance_workers
#define IsParallelWorker()
void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
void InstrStartParallelQuery(void)
#define ShareUpdateExclusiveLock
char * get_namespace_name(Oid nspid)
@ LWTRANCHE_PARALLEL_VACUUM_DSA
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
const char * debug_query_string
#define PROGRESS_VACUUM_INDEXES_PROCESSED
#define RelationGetRelid(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size mul_size(Size s1, Size s2)
struct ErrorContextCallback * previous
void(* callback)(void *arg)
bool amusemaintenanceworkmem
uint8 amparallelvacuumoptions
BufferAccessStrategy strategy
IndexBulkDeleteResult istat
bool parallel_workers_can_process
pg_atomic_uint32 cost_balance
int maintenance_work_mem_worker
pg_atomic_uint32 active_nworkers
dsa_pointer dead_items_handle
dsa_handle dead_items_dsa_handle
VacDeadItemsInfo dead_items_info
shm_toc_estimator estimator
int nindexes_parallel_bulkdel
BufferAccessStrategy bstrategy
int nindexes_parallel_cleanup
int nindexes_parallel_condcleanup
BufferUsage * buffer_usage
bool * will_parallel_vacuum
struct IndexAmRoutine * rd_indam
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
TidStore * TidStoreAttach(dsa_handle area_handle, dsa_pointer handle)
void TidStoreDetach(TidStore *ts)
TidStore * TidStoreCreateShared(size_t max_bytes, int tranche_id)
void TidStoreDestroy(TidStore *ts)
dsa_area * TidStoreGetDSA(TidStore *ts)
dsa_pointer TidStoreGetHandle(TidStore *ts)
IndexBulkDeleteResult * vac_bulkdel_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat, TidStore *dead_items, VacDeadItemsInfo *dead_items_info)
pg_atomic_uint32 * VacuumActiveNWorkers
void vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel)
void vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, MemoryContext vac_context, bool isTopLevel)
int VacuumCostBalanceLocal
void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
pg_atomic_uint32 * VacuumSharedCostBalance
IndexBulkDeleteResult * vac_cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
#define VACUUM_OPTION_PARALLEL_CLEANUP
#define VACUUM_OPTION_NO_PARALLEL
#define VACUUM_OPTION_PARALLEL_BULKDEL
#define VACUUM_OPTION_MAX_VALID_VALUE
#define VACUUM_OPTION_PARALLEL_COND_CLEANUP
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool vacuum)
static void parallel_vacuum_error_callback(void *arg)
static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum)
#define PARALLEL_VACUUM_KEY_INDEX_STATS
#define PARALLEL_VACUUM_KEY_QUERY_TEXT
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE
ParallelVacuumState * parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, int elevel, BufferAccessStrategy bstrategy)
#define PARALLEL_VACUUM_KEY_SHARED
void parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans)
#define PARALLEL_VACUUM_KEY_WAL_USAGE
void parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, bool estimated_count)
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)
void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, bool vacuum)
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
TidStore * parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
struct PVIndStats PVIndStats
void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
@ PARALLEL_INDVAC_STATUS_NEED_CLEANUP
@ PARALLEL_INDVAC_STATUS_INITIAL
@ PARALLEL_INDVAC_STATUS_NEED_BULKDELETE
@ PARALLEL_INDVAC_STATUS_COMPLETED
void ExitParallelMode(void)
void EnterParallelMode(void)