93 elog(
ERROR,
"Hash node does not support ExecProcNode call convention");
324 WAIT_EVENT_HASH_BUILD_HASH_INNER))
451 size_t space_allowed;
476 state->parallel_state != NULL,
477 state->parallel_state != NULL ?
478 state->parallel_state->nparticipants - 1 : 0,
480 &nbuckets, &nbatch, &num_skew_mcvs);
483 log2_nbuckets =
my_log2(nbuckets);
484 Assert(nbuckets == (1 << log2_nbuckets));
505 hashtable->
nbatch = nbatch;
524 hashtable->
area =
state->ps.state->es_query_dsa;
528 printf(
"Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
529 hashtable, nbatch, nbuckets);
655#define NTUP_PER_BUCKET 1
659 bool try_combined_hash_mem,
660 int parallel_workers,
661 size_t *space_allowed,
667 double inner_rel_bytes;
668 size_t hash_table_bytes;
687 inner_rel_bytes = ntuples * tupsize;
699 if (try_combined_hash_mem)
704 newlimit = (double) hash_table_bytes * (
double) (parallel_workers + 1);
705 newlimit =
Min(newlimit, (
double) SIZE_MAX);
706 hash_table_bytes = (size_t) newlimit;
709 *space_allowed = hash_table_bytes;
727 size_t bytes_per_mcv;
740 bytes_per_mcv = tupsize +
744 skew_mcvs = hash_table_bytes / bytes_per_mcv;
753 skew_mcvs =
Min(skew_mcvs, INT_MAX);
755 *num_skew_mcvs = (int) skew_mcvs;
759 hash_table_bytes -= skew_mcvs * bytes_per_mcv;
780 max_pointers =
Min(max_pointers, INT_MAX / 2 + 1);
783 dbuckets =
Min(dbuckets, max_pointers);
784 nbuckets = (int) dbuckets;
786 nbuckets =
Max(nbuckets, 1024);
795 if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
807 if (try_combined_hash_mem)
810 false, parallel_workers,
825 if (hash_table_bytes <= bucket_size)
829 sbuckets =
Min(sbuckets, max_pointers);
830 nbuckets = (int) sbuckets;
842 Assert(bucket_bytes <= hash_table_bytes / 2);
845 dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
846 dbatch =
Min(dbatch, max_pointers);
847 minbatch = (int) dbatch;
854 *numbuckets = nbuckets;
855 *numbatches = nbatch;
901 int oldnbatch = hashtable->
nbatch;
916 nbatch = oldnbatch * 2;
920 printf(
"Hashjoin %p: increasing nbatch to %d because space = %zu\n",
921 hashtable, nbatch, hashtable->
spaceUsed);
944 hashtable->
nbatch = nbatch;
950 ninmemory = nfreed = 0;
973 oldchunks = hashtable->
chunks;
977 while (oldchunks != NULL)
985 while (idx < oldchunks->used)
995 &bucketno, &batchno);
997 if (batchno == curbatch)
1003 memcpy(copyTuple, hashTuple, hashTupleSize);
1012 Assert(batchno > curbatch);
1031 oldchunks = nextchunk;
1035 printf(
"Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
1036 hashtable, nfreed, ninmemory, hashtable->
spaceUsed);
1047 if (nfreed == 0 || nfreed == ninmemory)
1051 printf(
"Hashjoin %p: disabling further increase of nbatch\n",
1085 WAIT_EVENT_HASH_GROW_BATCHES_ELECT))
1102 if (hashtable->
nbatch == 1)
1126 new_nbatch = hashtable->
nbatch * 2;
1152 dtuples = (old_batch0->
ntuples * 2.0) / new_nbatch;
1163 dbuckets =
Min(dbuckets, max_buckets);
1164 new_nbuckets = (int) dbuckets;
1165 new_nbuckets =
Max(new_nbuckets, 1024);
1174 for (
i = 0;
i < new_nbuckets; ++
i)
1204 WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE);
1217 WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
1228 WAIT_EVENT_HASH_GROW_BATCHES_DECIDE))
1231 bool space_exhausted =
false;
1232 bool extreme_skew_detected =
false;
1241 for (
int i = 0;
i < hashtable->
nbatch; ++
i)
1250 space_exhausted =
true;
1264 extreme_skew_detected =
true;
1269 if (extreme_skew_detected || hashtable->
nbatch >= INT_MAX / 2)
1271 else if (space_exhausted)
1285 WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
1307 while (idx < chunk->used)
1317 &bucketno, &batchno);
1319 Assert(batchno < hashtable->nbatch);
1374 for (
i = 1;
i < old_nbatch; ++
i)
1385 for (
i = 1;
i < old_nbatch; ++
i)
1415 pfree(old_inner_tuples);
1461 printf(
"Hashjoin %p: increasing nbuckets %d => %d\n",
1491 while (idx < chunk->used)
1498 &bucketno, &batchno);
1534 WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
1563 WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE);
1574 while (idx < chunk->used)
1582 &bucketno, &batchno);
1598 WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT);
1624 &bucketno, &batchno);
1629 if (batchno == hashtable->
curbatch)
1662 if (hashtable->
nbatch == 1 &&
1727 if (hashTuple == NULL)
1835 *bucketno = hashvalue & (nbuckets - 1);
1841 *bucketno = hashvalue & (nbuckets - 1);
1872 if (hashTuple != NULL)
1879 while (hashTuple != NULL)
1930 if (hashTuple != NULL)
1936 while (hashTuple != NULL)
1993 int curbatch = hashtable->
curbatch;
2067 if (hashTuple != NULL)
2084 while (hashTuple != NULL)
2142 if (hashTuple != NULL)
2150 while (hashTuple != NULL)
2195 int nbuckets = hashtable->
nbuckets;
2212 hashtable->
chunks = NULL;
2269 Hash *node,
int mcvsToUse)
2299 if (mcvsToUse > sslot.
nvalues)
2308 for (
i = 0;
i < mcvsToUse;
i++)
2346 mcvsToUse *
sizeof(
int));
2349 + mcvsToUse *
sizeof(
int);
2351 + mcvsToUse *
sizeof(
int);
2365 for (
i = 0;
i < mcvsToUse;
i++)
2380 bucket = hashvalue & (nbuckets - 1);
2381 while (hashtable->
skewBucket[bucket] != NULL &&
2383 bucket = (bucket + 1) & (nbuckets - 1);
2441 while (hashtable->
skewBucket[bucket] != NULL &&
2523 bucket = hashtable->
skewBucket[bucketToRemove];
2535 hashTuple = bucket->
tuples;
2536 while (hashTuple != NULL)
2551 if (batchno == hashtable->
curbatch)
2561 memcpy(copyTuple, hashTuple, tupleSize);
2582 hashTuple = nextHashTuple;
2600 hashtable->
skewBucket[bucketToRemove] = NULL;
2716 if (shared_info == NULL)
2785 if (hashtable->
chunks != NULL)
2793 hashtable->
chunks = newChunk;
2803 if ((hashtable->
chunks == NULL) ||
2815 hashtable->
chunks = newChunk;
2849 int curbatch = hashtable->
curbatch;
2858 if (
chunk != NULL &&
2928 if (hashtable->
nbatch == 1)
2935 hashtable->
nbuckets < (INT_MAX / 2) &&
3012 hashtable->
nbatch = nbatch;
3038 accessor->
shared = shared;
3097 if (hashtable->
batches != NULL)
3132 accessor->
shared = shared;
3134 accessor->
done =
false;
3165 for (
i = 0;
i < nbuckets; ++
i)
3179 int curbatch = hashtable->
curbatch;
3181 bool attached =
true;
3433 Assert(batchno < hashtable->nbatch);
3495 mem_limit =
Min(mem_limit, (
double) SIZE_MAX);
3497 return (
size_t) mem_limit;
Datum idx(PG_FUNCTION_ARGS)
void PrepareTempTablespaces(void)
bool BarrierArriveAndDetachExceptLast(Barrier *barrier)
bool BarrierArriveAndDetach(Barrier *barrier)
int BarrierAttach(Barrier *barrier)
void BarrierInit(Barrier *barrier, int participants)
int BarrierPhase(Barrier *barrier)
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
bool BarrierDetach(Barrier *barrier)
void BufFileClose(BufFile *file)
#define Assert(condition)
#define OidIsValid(objectId)
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
void dsa_free(dsa_area *area, dsa_pointer dp)
#define dsa_allocate0(area, size)
#define dsa_pointer_atomic_init
#define dsa_allocate(area, size)
#define dsa_pointer_atomic_write
#define InvalidDsaPointer
#define dsa_pointer_atomic_compare_exchange
#define dsa_pointer_atomic_read
pg_atomic_uint64 dsa_pointer_atomic
#define DsaPointerIsValid(x)
void ExecReScan(PlanState *node)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
void ExecAssignExprContext(EState *estate, PlanState *planstate)
#define outerPlanState(node)
struct HashJoinTupleData * HashJoinTuple
struct HashInstrumentation HashInstrumentation
#define EXEC_FLAG_BACKWARD
#define ResetExprContext(econtext)
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
static TupleTableSlot * ExecProcNode(PlanState *node)
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
#define palloc_object(type)
#define repalloc_array(pointer, type, count)
#define palloc0_array(type, count)
#define palloc0_object(type)
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
double hash_mem_multiplier
#define PHJ_GROW_BATCHES_REPARTITION
struct HashMemoryChunkData * HashMemoryChunk
#define HASH_CHUNK_DATA(hc)
#define PHJ_GROW_BUCKETS_REINSERT
#define SKEW_MIN_OUTER_FRACTION
#define PHJ_GROW_BUCKETS_ELECT
#define PHJ_GROW_BUCKETS_PHASE(n)
#define PHJ_GROW_BATCHES_ELECT
#define ParallelHashJoinBatchInner(batch)
#define PHJ_BUILD_HASH_INNER
#define NthParallelHashJoinBatch(base, n)
#define HASH_CHUNK_THRESHOLD
#define PHJ_BUILD_HASH_OUTER
#define HJTUPLE_MINTUPLE(hjtup)
#define SKEW_BUCKET_OVERHEAD
#define PHJ_GROW_BATCHES_DECIDE
#define PHJ_GROW_BATCHES_REALLOCATE
#define HASH_CHUNK_HEADER_SIZE
#define PHJ_GROW_BATCHES_FINISH
#define ParallelHashJoinBatchOuter(batch, nparticipants)
#define SKEW_HASH_MEM_PERCENT
#define PHJ_BUILD_ALLOCATE
#define PHJ_GROW_BUCKETS_REALLOCATE
#define PHJ_GROW_BATCHES_PHASE(n)
@ PHJ_GROWTH_NEED_MORE_BUCKETS
@ PHJ_GROWTH_NEED_MORE_BATCHES
#define INVALID_SKEW_BUCKET_NO
#define EstimateParallelHashJoinBatch(hashtable)
void heap_free_minimal_tuple(MinimalTuple mtup)
#define HeapTupleIsValid(tuple)
#define HeapTupleHeaderHasMatch(tup)
#define SizeofMinimalTupleHeader
#define HeapTupleHeaderClearMatch(tup)
void InstrStartNode(Instrumentation *instr)
void InstrStopNode(Instrumentation *instr, double nTuples)
if(TABLE==NULL||TABLE_index==NULL)
void free_attstatsslot(AttStatsSlot *sslot)
bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, int reqkind, Oid reqop, int flags)
#define ATTSTATSSLOT_NUMBERS
#define ATTSTATSSLOT_VALUES
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextReset(MemoryContext context)
void * MemoryContextAllocZero(MemoryContext context, Size size)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
void ExecHashTableReset(HashJoinTable hashtable)
static void ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable, Hash *node, int mcvsToUse)
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
static void * dense_alloc(HashJoinTable hashtable, Size size)
static void MultiExecParallelHash(HashState *node)
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
static void MultiExecPrivateHash(HashState *node)
void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
Node * MultiExecHash(HashState *node)
HashState * ExecInitHash(Hash *node, EState *estate, int eflags)
void ExecHashTableDetachBatch(HashJoinTable hashtable)
void ExecHashEstimate(HashState *node, ParallelContext *pcxt)
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecHashTableDetach(HashJoinTable hashtable)
bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
void ExecHashTableDestroy(HashJoinTable hashtable)
HashJoinTable ExecHashTableCreate(HashState *state)
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
size_t get_hash_memory_limit(void)
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
static void ExecParallelHashRepartitionRest(HashJoinTable hashtable)
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
void ExecEndHash(HashState *node)
void ExecShutdownHash(HashState *node)
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static TupleTableSlot * ExecHash(PlanState *pstate)
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
static void ExecParallelHashMergeCounters(HashJoinTable hashtable)
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
void ExecReScanHash(HashState *node)
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
void ExecHashRetrieveInstrumentation(HashState *node)
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define repalloc0_array(pointer, type, oldcount, count)
static uint32 pg_nextpower2_32(uint32 num)
static uint32 pg_rotate_right32(uint32 word, int n)
#define pg_nextpower2_size_t
static uint32 pg_prevpower2_32(uint32 num)
#define pg_prevpower2_size_t
static uint32 DatumGetUInt32(Datum X)
static Datum Int16GetDatum(int16 X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
void sts_end_write(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
#define SHARED_TUPLESTORE_SINGLE_PASS
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
static pg_noinline void Size size
TupleTableSlot * ecxt_innertuple
TupleTableSlot * ecxt_outertuple
HashJoinTuple hj_CurTuple
HashJoinTable hj_HashTable
TupleTableSlot * hj_HashTupleSlot
struct HashJoinTupleData ** unshared
ParallelHashJoinBatchAccessor * batches
union HashJoinTableData::@107 buckets
ParallelHashJoinState * parallel_state
HashMemoryChunk current_chunk
BufFile ** innerBatchFile
int log2_nbuckets_optimal
dsa_pointer_atomic * shared
BufFile ** outerBatchFile
dsa_pointer current_chunk_shared
HashSkewBucket ** skewBucket
union HashJoinTupleData::@105 next
struct HashJoinTupleData * unshared
union HashMemoryChunkData::@106 next
struct HashMemoryChunkData * unshared
struct ParallelHashJoinState * parallel_state
SharedHashInfo * shared_info
FmgrInfo * skew_hashfunction
HashInstrumentation * hinstrument
shm_toc_estimator estimator
SharedTuplestoreAccessor * outer_tuples
ParallelHashJoinBatch * shared
SharedTuplestoreAccessor * inner_tuples
Barrier grow_batches_barrier
dsa_pointer chunk_work_queue
Barrier grow_buckets_barrier
ParallelHashGrowth growth
Instrumentation * instrument
ExprContext * ps_ExprContext
ProjectionInfo * ps_ProjInfo
ExecProcNodeMtd ExecProcNode
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache3(int cacheId, Datum key1, Datum key2, Datum key3)