94 elog(
ERROR,
"Hash node does not support ExecProcNode call convention");
436 size_t space_allowed;
465 state->parallel_state != NULL,
466 state->parallel_state != NULL ?
467 state->parallel_state->nparticipants - 1 : 0,
469 &nbuckets, &nbatch, &num_skew_mcvs);
472 log2_nbuckets =
my_log2(nbuckets);
473 Assert(nbuckets == (1 << log2_nbuckets));
495 hashtable->
nbatch = nbatch;
514 hashtable->
area =
state->ps.state->es_query_dsa;
518 printf(
"Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
519 hashtable, nbatch, nbuckets);
550 forboth(ho, hashOperators, hc, hashCollations)
557 elog(
ERROR,
"could not find hash function for hash operator %u",
665 #define NTUP_PER_BUCKET 1
669 bool try_combined_hash_mem,
670 int parallel_workers,
671 size_t *space_allowed,
677 double inner_rel_bytes;
678 size_t hash_table_bytes;
697 inner_rel_bytes = ntuples * tupsize;
709 if (try_combined_hash_mem)
714 newlimit = (double) hash_table_bytes * (
double) (parallel_workers + 1);
715 newlimit =
Min(newlimit, (
double) SIZE_MAX);
716 hash_table_bytes = (size_t) newlimit;
719 *space_allowed = hash_table_bytes;
737 size_t bytes_per_mcv;
750 bytes_per_mcv = tupsize +
754 skew_mcvs = hash_table_bytes / bytes_per_mcv;
763 skew_mcvs =
Min(skew_mcvs, INT_MAX);
765 *num_skew_mcvs = (int) skew_mcvs;
769 hash_table_bytes -= skew_mcvs * bytes_per_mcv;
790 max_pointers =
Min(max_pointers, INT_MAX / 2 + 1);
793 dbuckets =
Min(dbuckets, max_pointers);
794 nbuckets = (int) dbuckets;
796 nbuckets =
Max(nbuckets, 1024);
805 if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
817 if (try_combined_hash_mem)
820 false, parallel_workers,
836 sbuckets =
Min(sbuckets, max_pointers);
837 nbuckets = (int) sbuckets;
849 Assert(bucket_bytes <= hash_table_bytes / 2);
852 dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
853 dbatch =
Min(dbatch, max_pointers);
854 minbatch = (int) dbatch;
861 *numbuckets = nbuckets;
862 *numbatches = nbatch;
908 int oldnbatch = hashtable->
nbatch;
924 nbatch = oldnbatch * 2;
928 printf(
"Hashjoin %p: increasing nbatch to %d because space = %zu\n",
929 hashtable, nbatch, hashtable->
spaceUsed);
952 (nbatch - oldnbatch) *
sizeof(
BufFile *));
954 (nbatch - oldnbatch) *
sizeof(
BufFile *));
959 hashtable->
nbatch = nbatch;
965 ninmemory = nfreed = 0;
988 oldchunks = hashtable->
chunks;
992 while (oldchunks != NULL)
1000 while (idx < oldchunks->used)
1010 &bucketno, &batchno);
1012 if (batchno == curbatch)
1018 memcpy(copyTuple, hashTuple, hashTupleSize);
1027 Assert(batchno > curbatch);
1045 oldchunks = nextchunk;
1049 printf(
"Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
1050 hashtable, nfreed, ninmemory, hashtable->
spaceUsed);
1061 if (nfreed == 0 || nfreed == ninmemory)
1065 printf(
"Hashjoin %p: disabling further increase of nbatch\n",
1117 if (hashtable->
nbatch == 1)
1141 new_nbatch = hashtable->
nbatch * 2;
1166 dtuples = (old_batch0->
ntuples * 2.0) / new_nbatch;
1168 dbuckets =
Min(dbuckets,
1170 new_nbuckets = (int) dbuckets;
1171 new_nbuckets =
Max(new_nbuckets, 1024);
1180 for (
i = 0;
i < new_nbuckets; ++
i)
1236 bool space_exhausted =
false;
1237 bool extreme_skew_detected =
false;
1253 space_exhausted =
true;
1263 extreme_skew_detected =
true;
1268 if (extreme_skew_detected || hashtable->
nbatch >= INT_MAX / 2)
1270 else if (space_exhausted)
1306 while (idx < chunk->used)
1316 &bucketno, &batchno);
1318 Assert(batchno < hashtable->nbatch);
1373 for (
i = 1;
i < old_nbatch; ++
i)
1384 for (
i = 1;
i < old_nbatch; ++
i)
1414 pfree(old_inner_tuples);
1460 printf(
"Hashjoin %p: increasing nbuckets %d => %d\n",
1490 while (idx < chunk->used)
1497 &bucketno, &batchno);
1573 while (idx < chunk->used)
1581 &bucketno, &batchno);
1623 &bucketno, &batchno);
1628 if (batchno == hashtable->
curbatch)
1661 if (hashtable->
nbatch == 1 &&
1725 if (hashTuple == NULL)
1837 foreach(hk, hashkeys)
1887 *hashvalue = hashkey;
1929 *bucketno = hashvalue & (nbuckets - 1);
1935 *bucketno = hashvalue & (nbuckets - 1);
1966 if (hashTuple != NULL)
1973 while (hashTuple != NULL)
2024 if (hashTuple != NULL)
2030 while (hashTuple != NULL)
2099 if (hashTuple != NULL)
2116 while (hashTuple != NULL)
2161 int nbuckets = hashtable->
nbuckets;
2179 hashtable->
chunks = NULL;
2264 if (mcvsToUse > sslot.
nvalues)
2273 for (
i = 0;
i < mcvsToUse;
i++)
2311 mcvsToUse *
sizeof(
int));
2314 + mcvsToUse *
sizeof(
int);
2316 + mcvsToUse *
sizeof(
int);
2331 for (
i = 0;
i < mcvsToUse;
i++)
2346 bucket = hashvalue & (nbuckets - 1);
2347 while (hashtable->
skewBucket[bucket] != NULL &&
2349 bucket = (bucket + 1) & (nbuckets - 1);
2407 while (hashtable->
skewBucket[bucket] != NULL &&
2489 bucket = hashtable->
skewBucket[bucketToRemove];
2501 hashTuple = bucket->
tuples;
2502 while (hashTuple != NULL)
2517 if (batchno == hashtable->
curbatch)
2527 memcpy(copyTuple, hashTuple, tupleSize);
2547 hashTuple = nextHashTuple;
2565 hashtable->
skewBucket[bucketToRemove] = NULL;
2682 if (shared_info == NULL)
2744 newChunk->
used = size;
2751 if (hashtable->
chunks != NULL)
2759 hashtable->
chunks = newChunk;
2769 if ((hashtable->
chunks == NULL) ||
2777 newChunk->
used = size;
2781 hashtable->
chunks = newChunk;
2815 int curbatch = hashtable->
curbatch;
2824 if (chunk != NULL &&
2833 chunk->
used += size;
2894 if (hashtable->
nbatch == 1)
2901 hashtable->
nbuckets < (INT_MAX / 2) &&
2975 hashtable->
nbatch = nbatch;
3001 accessor->
shared = shared;
3060 if (hashtable->
batches != NULL)
3095 accessor->
shared = shared;
3097 accessor->
done =
false;
3127 for (
i = 0;
i < nbuckets; ++
i)
3141 int curbatch = hashtable->
curbatch;
3347 Assert(batchno < hashtable->nbatch);
3409 mem_limit =
Min(mem_limit, (
double) SIZE_MAX);
3411 return (
size_t) mem_limit;
Datum idx(PG_FUNCTION_ARGS)
void PrepareTempTablespaces(void)
bool BarrierArriveAndDetach(Barrier *barrier)
int BarrierAttach(Barrier *barrier)
void BarrierInit(Barrier *barrier, int participants)
int BarrierPhase(Barrier *barrier)
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
bool BarrierDetach(Barrier *barrier)
void BufFileClose(BufFile *file)
#define offsetof(type, field)
#define MemSet(start, val, len)
#define OidIsValid(objectId)
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
void dsa_free(dsa_area *area, dsa_pointer dp)
#define dsa_allocate0(area, size)
#define dsa_pointer_atomic_init
#define dsa_allocate(area, size)
#define dsa_pointer_atomic_write
#define InvalidDsaPointer
#define dsa_pointer_atomic_compare_exchange
#define dsa_pointer_atomic_read
pg_atomic_uint64 dsa_pointer_atomic
#define DsaPointerIsValid(x)
void ExecReScan(PlanState *node)
List * ExecInitExprList(List *nodes, PlanState *parent)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
void ExecAssignExprContext(EState *estate, PlanState *planstate)
void ExecFreeExprContext(PlanState *planstate)
#define outerPlanState(node)
struct HashJoinTableData * HashJoinTable
struct HashJoinTupleData * HashJoinTuple
struct HashInstrumentation HashInstrumentation
#define EXEC_FLAG_BACKWARD
#define ResetExprContext(econtext)
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
static TupleTableSlot * ExecProcNode(PlanState *node)
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
double hash_mem_multiplier
#define PHJ_GROW_BUCKETS_REINSERTING
struct HashMemoryChunkData * HashMemoryChunk
#define PHJ_GROW_BATCHES_FINISHING
#define PHJ_BUILD_HASHING_INNER
#define HASH_CHUNK_DATA(hc)
#define SKEW_MIN_OUTER_FRACTION
#define PHJ_GROW_BATCHES_ALLOCATING
#define PHJ_BUILD_HASHING_OUTER
#define PHJ_GROW_BUCKETS_PHASE(n)
#define PHJ_BATCH_PROBING
#define PHJ_BUILD_ELECTING
#define ParallelHashJoinBatchInner(batch)
#define PHJ_BUILD_ALLOCATING
#define NthParallelHashJoinBatch(base, n)
#define HASH_CHUNK_THRESHOLD
#define HJTUPLE_MINTUPLE(hjtup)
#define SKEW_BUCKET_OVERHEAD
#define PHJ_GROW_BATCHES_DECIDING
#define PHJ_GROW_BATCHES_ELECTING
#define PHJ_GROW_BATCHES_REPARTITIONING
#define HASH_CHUNK_HEADER_SIZE
#define ParallelHashJoinBatchOuter(batch, nparticipants)
#define PHJ_GROW_BUCKETS_ELECTING
#define SKEW_HASH_MEM_PERCENT
#define PHJ_GROW_BATCHES_PHASE(n)
@ PHJ_GROWTH_NEED_MORE_BUCKETS
@ PHJ_GROWTH_NEED_MORE_BATCHES
#define INVALID_SKEW_BUCKET_NO
#define EstimateParallelHashJoinBatch(hashtable)
#define PHJ_GROW_BUCKETS_ALLOCATING
void heap_free_minimal_tuple(MinimalTuple mtup)
#define HeapTupleIsValid(tuple)
#define HeapTupleHeaderHasMatch(tup)
#define SizeofMinimalTupleHeader
#define HeapTupleHeaderClearMatch(tup)
void InstrStartNode(Instrumentation *instr)
void InstrStopNode(Instrumentation *instr, double nTuples)
Assert(fmt[strlen(fmt) - 1] !='\n')
void free_attstatsslot(AttStatsSlot *sslot)
bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)
bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, int reqkind, Oid reqop, int flags)
#define ATTSTATSSLOT_NUMBERS
#define ATTSTATSSLOT_VALUES
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void MemoryContextReset(MemoryContext context)
void pfree(void *pointer)
void * palloc0(Size size)
void * MemoryContextAllocZero(MemoryContext context, Size size)
MemoryContext CurrentMemoryContext
void * repalloc(void *pointer, Size size)
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
void ExecHashTableReset(HashJoinTable hashtable)
bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
static void * dense_alloc(HashJoinTable hashtable, Size size)
static void MultiExecParallelHash(HashState *node)
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
static void MultiExecPrivateHash(HashState *node)
void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
void ExecHashTableDetachBatch(HashJoinTable hashtable)
void ExecHashEstimate(HashState *node, ParallelContext *pcxt)
HashState * ExecInitHash(Hash *node, EState *estate, int eflags)
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
void ExecHashTableDetach(HashJoinTable hashtable)
void ExecHashTableDestroy(HashJoinTable hashtable)
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
size_t get_hash_memory_limit(void)
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
static void ExecParallelHashRepartitionRest(HashJoinTable hashtable)
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table, HashJoinTuple tuple)
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
void ExecEndHash(HashState *node)
void ExecShutdownHash(HashState *node)
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable table, int bucketno)
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable table, dsa_pointer *shared)
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
static void ExecParallelHashMergeCounters(HashJoinTable hashtable)
static TupleTableSlot * ExecHash(PlanState *pstate)
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
void ExecReScanHash(HashState *node)
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
void ExecHashRetrieveInstrumentation(HashState *node)
Node * MultiExecHash(HashState *node)
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define pg_nextpower2_size_t(num)
static uint32 pg_rotate_left32(uint32 word, int n)
static uint32 pg_nextpower2_32(uint32 num)
static uint32 pg_rotate_right32(uint32 word, int n)
#define pg_prevpower2_size_t(num)
static int list_length(const List *l)
#define forboth(cell1, list1, cell2, list2)
#define ObjectIdGetDatum(X)
#define DatumGetUInt32(X)
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
void sts_end_write(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
#define SHARED_TUPLESTORE_SINGLE_PASS
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
MemoryContext ecxt_per_tuple_memory
TupleTableSlot * ecxt_innertuple
TupleTableSlot * ecxt_outertuple
HashJoinTuple hj_CurTuple
HashJoinTable hj_HashTable
TupleTableSlot * hj_HashTupleSlot
struct HashJoinTupleData ** unshared
FmgrInfo * outer_hashfunctions
ParallelHashJoinBatchAccessor * batches
union HashJoinTableData::@101 buckets
ParallelHashJoinState * parallel_state
HashMemoryChunk current_chunk
BufFile ** innerBatchFile
int log2_nbuckets_optimal
dsa_pointer_atomic * shared
BufFile ** outerBatchFile
FmgrInfo * inner_hashfunctions
dsa_pointer current_chunk_shared
HashSkewBucket ** skewBucket
union HashJoinTupleData::@99 next
struct HashJoinTupleData * unshared
struct HashMemoryChunkData * unshared
union HashMemoryChunkData::@100 next
struct ParallelHashJoinState * parallel_state
SharedHashInfo * shared_info
HashInstrumentation * hinstrument
shm_toc_estimator estimator
SharedTuplestoreAccessor * outer_tuples
ParallelHashJoinBatch * shared
SharedTuplestoreAccessor * inner_tuples
Barrier grow_batches_barrier
dsa_pointer chunk_work_queue
Barrier grow_buckets_barrier
ParallelHashGrowth growth
Instrumentation * instrument
ExprContext * ps_ExprContext
ProjectionInfo * ps_ProjInfo
struct PlanState * lefttree
ExecProcNodeMtd ExecProcNode
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache3(int cacheId, Datum key1, Datum key2, Datum key3)
@ WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE
@ WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT
@ WAIT_EVENT_HASH_BUILD_ELECT
@ WAIT_EVENT_HASH_BUILD_HASH_INNER
@ WAIT_EVENT_HASH_GROW_BATCHES_DECIDE
@ WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE
@ WAIT_EVENT_HASH_GROW_BATCHES_FINISH
@ WAIT_EVENT_HASH_GROW_BUCKETS_ELECT
@ WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION
@ WAIT_EVENT_HASH_BUILD_ALLOCATE
@ WAIT_EVENT_HASH_GROW_BATCHES_ELECT