46 #define PARALLEL_VACUUM_KEY_SHARED 1
47 #define PARALLEL_VACUUM_KEY_DEAD_ITEMS 2
48 #define PARALLEL_VACUUM_KEY_QUERY_TEXT 3
49 #define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4
50 #define PARALLEL_VACUUM_KEY_WAL_USAGE 5
51 #define PARALLEL_VACUUM_KEY_INDEX_STATS 6
215 bool *will_parallel_vacuum);
234 int nrequested_workers,
int max_items,
244 bool *will_parallel_vacuum;
245 Size est_indstats_len;
247 Size est_dead_items_len;
248 int nindexes_mwm = 0;
249 int parallel_workers = 0;
256 Assert(nrequested_workers >= 0);
262 will_parallel_vacuum = (
bool *)
palloc0(
sizeof(
bool) * nindexes);
265 will_parallel_vacuum);
266 if (parallel_workers <= 0)
269 pfree(will_parallel_vacuum);
330 MemSet(indstats, 0, est_indstats_len);
331 for (
int i = 0;
i < nindexes;
i++)
344 if (!will_parallel_vacuum[
i])
366 MemSet(shared, 0, est_shared_len);
413 sharedquery[querylen] =
'\0';
488 int num_index_scans,
bool estimated_count)
518 bool *will_parallel_vacuum)
520 int nindexes_parallel = 0;
521 int nindexes_parallel_bulkdel = 0;
522 int nindexes_parallel_cleanup = 0;
523 int parallel_workers;
535 for (
int i = 0;
i < nindexes;
i++)
545 will_parallel_vacuum[
i] =
true;
548 nindexes_parallel_bulkdel++;
551 nindexes_parallel_cleanup++;
554 nindexes_parallel =
Max(nindexes_parallel_bulkdel,
555 nindexes_parallel_cleanup);
561 if (nindexes_parallel <= 0)
565 parallel_workers = (nrequested > 0) ?
566 Min(nrequested, nindexes_parallel) : nindexes_parallel;
571 return parallel_workers;
602 if (num_index_scans == 0)
626 indstats->
status = new_status;
641 if (num_index_scans > 0)
677 (
errmsg(
ngettext(
"launched %d parallel vacuum worker for index vacuuming (planned: %d)",
678 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
683 (
errmsg(
ngettext(
"launched %d parallel vacuum worker for index cleanup (planned: %d)",
684 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
720 elog(
ERROR,
"parallel index vacuum on index \"%s\" is not completed",
845 istat = &(indstats->
istat);
847 ivinfo.
index = indrel;
869 elog(
ERROR,
"unexpected parallel vacuum index status %d for index \"%s\"",
936 if (num_index_scans > 0 &&
970 elog(
DEBUG1,
"starting parallel vacuum worker");
1036 errcallback.
arg = &pvs;
1074 errcontext(
"while vacuuming index \"%s\" of relation \"%s.%s\"",
1080 errcontext(
"while cleaning up index \"%s\" of relation \"%s.%s\"",
Datum idx(PG_FUNCTION_ARGS)
int min_parallel_index_scan_size
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
static void pg_atomic_write_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
void VacuumUpdateCosts(void)
void InitializeParallelDSM(ParallelContext *pcxt)
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
void LaunchParallelWorkers(ParallelContext *pcxt)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void DestroyParallelContext(ParallelContext *pcxt)
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
void pgstat_report_activity(BackendState state, const char *cmd_str)
#define RelationGetNumberOfBlocks(reln)
#define ngettext(s, p, n)
#define MemSet(start, val, len)
elog(ERROR, "%s: %s", p2, msg)
ErrorContextCallback * error_context_stack
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
BufferAccessStrategy GetAccessStrategyWithSize(BufferAccessStrategyType btype, int ring_size_kb)
int GetAccessStrategyBufferCount(BufferAccessStrategy strategy)
void FreeAccessStrategy(BufferAccessStrategy strategy)
int max_parallel_maintenance_workers
#define IsParallelWorker()
void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
void InstrStartParallelQuery(void)
Assert(fmt[strlen(fmt) - 1] !='\n')
#define ShareUpdateExclusiveLock
char * get_namespace_name(Oid nspid)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
const char * debug_query_string
#define RelationGetRelid(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size mul_size(Size s1, Size s2)
struct ErrorContextCallback * previous
void(* callback)(void *arg)
bool amusemaintenanceworkmem
uint8 amparallelvacuumoptions
BufferAccessStrategy strategy
IndexBulkDeleteResult istat
bool parallel_workers_can_process
pg_atomic_uint32 cost_balance
int maintenance_work_mem_worker
pg_atomic_uint32 active_nworkers
shm_toc_estimator estimator
int nindexes_parallel_bulkdel
BufferAccessStrategy bstrategy
int nindexes_parallel_cleanup
int nindexes_parallel_condcleanup
BufferUsage * buffer_usage
VacDeadItems * dead_items
bool * will_parallel_vacuum
struct IndexAmRoutine * rd_indam
ItemPointerData items[FLEXIBLE_ARRAY_MEMBER]
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
IndexBulkDeleteResult * vac_bulkdel_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat, VacDeadItems *dead_items)
pg_atomic_uint32 * VacuumActiveNWorkers
Size vac_max_items_to_alloc_size(int max_items)
void vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel)
void vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, MemoryContext vac_context, bool isTopLevel)
int VacuumCostBalanceLocal
void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
pg_atomic_uint32 * VacuumSharedCostBalance
IndexBulkDeleteResult * vac_cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
#define VACUUM_OPTION_PARALLEL_CLEANUP
#define VACUUM_OPTION_NO_PARALLEL
#define VACUUM_OPTION_PARALLEL_BULKDEL
#define VACUUM_OPTION_MAX_VALID_VALUE
#define VACUUM_OPTION_PARALLEL_COND_CLEANUP
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool vacuum)
static void parallel_vacuum_error_callback(void *arg)
static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum)
#define PARALLEL_VACUUM_KEY_INDEX_STATS
#define PARALLEL_VACUUM_KEY_QUERY_TEXT
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE
VacDeadItems * parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
#define PARALLEL_VACUUM_KEY_SHARED
void parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans)
#define PARALLEL_VACUUM_KEY_WAL_USAGE
void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, bool estimated_count)
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)
void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, bool vacuum)
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
#define PARALLEL_VACUUM_KEY_DEAD_ITEMS
struct PVIndStats PVIndStats
void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
ParallelVacuumState * parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int max_items, int elevel, BufferAccessStrategy bstrategy)
@ PARALLEL_INDVAC_STATUS_NEED_CLEANUP
@ PARALLEL_INDVAC_STATUS_INITIAL
@ PARALLEL_INDVAC_STATUS_NEED_BULKDELETE
@ PARALLEL_INDVAC_STATUS_COMPLETED
void ExitParallelMode(void)
void EnterParallelMode(void)