93 elog(
ERROR,
"Hash node does not support ExecProcNode call convention");
314 WAIT_EVENT_HASH_BUILD_HASH_INNER))
437 size_t space_allowed;
466 state->parallel_state != NULL,
467 state->parallel_state != NULL ?
468 state->parallel_state->nparticipants - 1 : 0,
470 &nbuckets, &nbatch, &num_skew_mcvs);
473 log2_nbuckets =
my_log2(nbuckets);
474 Assert(nbuckets == (1 << log2_nbuckets));
496 hashtable->
nbatch = nbatch;
515 hashtable->
area =
state->ps.state->es_query_dsa;
519 printf(
"Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
520 hashtable, nbatch, nbuckets);
553 forboth(ho, hashOperators, hc, hashCollations)
560 elog(
ERROR,
"could not find hash function for hash operator %u",
672 #define NTUP_PER_BUCKET 1
676 bool try_combined_hash_mem,
677 int parallel_workers,
678 size_t *space_allowed,
684 double inner_rel_bytes;
685 size_t hash_table_bytes;
704 inner_rel_bytes = ntuples * tupsize;
716 if (try_combined_hash_mem)
721 newlimit = (double) hash_table_bytes * (
double) (parallel_workers + 1);
722 newlimit =
Min(newlimit, (
double) SIZE_MAX);
723 hash_table_bytes = (size_t) newlimit;
726 *space_allowed = hash_table_bytes;
744 size_t bytes_per_mcv;
757 bytes_per_mcv = tupsize +
761 skew_mcvs = hash_table_bytes / bytes_per_mcv;
770 skew_mcvs =
Min(skew_mcvs, INT_MAX);
772 *num_skew_mcvs = (int) skew_mcvs;
776 hash_table_bytes -= skew_mcvs * bytes_per_mcv;
797 max_pointers =
Min(max_pointers, INT_MAX / 2 + 1);
800 dbuckets =
Min(dbuckets, max_pointers);
801 nbuckets = (int) dbuckets;
803 nbuckets =
Max(nbuckets, 1024);
812 if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
824 if (try_combined_hash_mem)
827 false, parallel_workers,
842 if (hash_table_bytes <= bucket_size)
846 sbuckets =
Min(sbuckets, max_pointers);
847 nbuckets = (int) sbuckets;
859 Assert(bucket_bytes <= hash_table_bytes / 2);
862 dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
863 dbatch =
Min(dbatch, max_pointers);
864 minbatch = (int) dbatch;
871 *numbuckets = nbuckets;
872 *numbatches = nbatch;
918 int oldnbatch = hashtable->
nbatch;
933 nbatch = oldnbatch * 2;
937 printf(
"Hashjoin %p: increasing nbatch to %d because space = %zu\n",
938 hashtable, nbatch, hashtable->
spaceUsed);
961 hashtable->
nbatch = nbatch;
967 ninmemory = nfreed = 0;
990 oldchunks = hashtable->
chunks;
994 while (oldchunks != NULL)
1002 while (idx < oldchunks->used)
1012 &bucketno, &batchno);
1014 if (batchno == curbatch)
1020 memcpy(copyTuple, hashTuple, hashTupleSize);
1029 Assert(batchno > curbatch);
1048 oldchunks = nextchunk;
1052 printf(
"Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
1053 hashtable, nfreed, ninmemory, hashtable->
spaceUsed);
1064 if (nfreed == 0 || nfreed == ninmemory)
1068 printf(
"Hashjoin %p: disabling further increase of nbatch\n",
1102 WAIT_EVENT_HASH_GROW_BATCHES_ELECT))
1119 if (hashtable->
nbatch == 1)
1143 new_nbatch = hashtable->
nbatch * 2;
1169 dtuples = (old_batch0->
ntuples * 2.0) / new_nbatch;
1180 dbuckets =
Min(dbuckets, max_buckets);
1181 new_nbuckets = (int) dbuckets;
1182 new_nbuckets =
Max(new_nbuckets, 1024);
1191 for (
i = 0;
i < new_nbuckets; ++
i)
1221 WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE);
1234 WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
1245 WAIT_EVENT_HASH_GROW_BATCHES_DECIDE))
1247 bool space_exhausted =
false;
1248 bool extreme_skew_detected =
false;
1255 for (
int i = 0;
i < hashtable->
nbatch; ++
i)
1264 space_exhausted =
true;
1274 extreme_skew_detected =
true;
1279 if (extreme_skew_detected || hashtable->
nbatch >= INT_MAX / 2)
1281 else if (space_exhausted)
1295 WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
1317 while (idx < chunk->used)
1327 &bucketno, &batchno);
1329 Assert(batchno < hashtable->nbatch);
1384 for (
i = 1;
i < old_nbatch; ++
i)
1395 for (
i = 1;
i < old_nbatch; ++
i)
1425 pfree(old_inner_tuples);
1471 printf(
"Hashjoin %p: increasing nbuckets %d => %d\n",
1501 while (idx < chunk->used)
1508 &bucketno, &batchno);
1544 WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
1573 WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE);
1584 while (idx < chunk->used)
1592 &bucketno, &batchno);
1608 WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT);
1634 &bucketno, &batchno);
1639 if (batchno == hashtable->
curbatch)
1672 if (hashtable->
nbatch == 1 &&
1737 if (hashTuple == NULL)
1850 foreach(hk, hashkeys)
1900 *hashvalue = hashkey;
1942 *bucketno = hashvalue & (nbuckets - 1);
1948 *bucketno = hashvalue & (nbuckets - 1);
1979 if (hashTuple != NULL)
1986 while (hashTuple != NULL)
2037 if (hashTuple != NULL)
2043 while (hashTuple != NULL)
2100 int curbatch = hashtable->
curbatch;
2174 if (hashTuple != NULL)
2191 while (hashTuple != NULL)
2249 if (hashTuple != NULL)
2257 while (hashTuple != NULL)
2302 int nbuckets = hashtable->
nbuckets;
2319 hashtable->
chunks = NULL;
2406 if (mcvsToUse > sslot.
nvalues)
2415 for (
i = 0;
i < mcvsToUse;
i++)
2453 mcvsToUse *
sizeof(
int));
2456 + mcvsToUse *
sizeof(
int);
2458 + mcvsToUse *
sizeof(
int);
2473 for (
i = 0;
i < mcvsToUse;
i++)
2488 bucket = hashvalue & (nbuckets - 1);
2489 while (hashtable->
skewBucket[bucket] != NULL &&
2491 bucket = (bucket + 1) & (nbuckets - 1);
2549 while (hashtable->
skewBucket[bucket] != NULL &&
2631 bucket = hashtable->
skewBucket[bucketToRemove];
2643 hashTuple = bucket->
tuples;
2644 while (hashTuple != NULL)
2659 if (batchno == hashtable->
curbatch)
2669 memcpy(copyTuple, hashTuple, tupleSize);
2690 hashTuple = nextHashTuple;
2708 hashtable->
skewBucket[bucketToRemove] = NULL;
2824 if (shared_info == NULL)
2893 if (hashtable->
chunks != NULL)
2901 hashtable->
chunks = newChunk;
2911 if ((hashtable->
chunks == NULL) ||
2923 hashtable->
chunks = newChunk;
2957 int curbatch = hashtable->
curbatch;
2966 if (
chunk != NULL &&
3036 if (hashtable->
nbatch == 1)
3043 hashtable->
nbuckets < (INT_MAX / 2) &&
3120 hashtable->
nbatch = nbatch;
3146 accessor->
shared = shared;
3205 if (hashtable->
batches != NULL)
3240 accessor->
shared = shared;
3242 accessor->
done =
false;
3273 for (
i = 0;
i < nbuckets; ++
i)
3287 int curbatch = hashtable->
curbatch;
3289 bool attached =
true;
3541 Assert(batchno < hashtable->nbatch);
3603 mem_limit =
Min(mem_limit, (
double) SIZE_MAX);
3605 return (
size_t) mem_limit;
Datum idx(PG_FUNCTION_ARGS)
void PrepareTempTablespaces(void)
bool BarrierArriveAndDetachExceptLast(Barrier *barrier)
bool BarrierArriveAndDetach(Barrier *barrier)
int BarrierAttach(Barrier *barrier)
void BarrierInit(Barrier *barrier, int participants)
int BarrierPhase(Barrier *barrier)
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
bool BarrierDetach(Barrier *barrier)
void BufFileClose(BufFile *file)
#define Assert(condition)
#define OidIsValid(objectId)
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
void dsa_free(dsa_area *area, dsa_pointer dp)
#define dsa_allocate0(area, size)
#define dsa_pointer_atomic_init
#define dsa_allocate(area, size)
#define dsa_pointer_atomic_write
#define InvalidDsaPointer
#define dsa_pointer_atomic_compare_exchange
#define dsa_pointer_atomic_read
pg_atomic_uint64 dsa_pointer_atomic
#define DsaPointerIsValid(x)
void ExecReScan(PlanState *node)
List * ExecInitExprList(List *nodes, PlanState *parent)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
void ExecAssignExprContext(EState *estate, PlanState *planstate)
#define outerPlanState(node)
struct HashJoinTupleData * HashJoinTuple
struct HashInstrumentation HashInstrumentation
#define EXEC_FLAG_BACKWARD
#define ResetExprContext(econtext)
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
static TupleTableSlot * ExecProcNode(PlanState *node)
#define palloc_object(type)
#define repalloc_array(pointer, type, count)
#define palloc_array(type, count)
#define palloc0_array(type, count)
#define palloc0_object(type)
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
double hash_mem_multiplier
#define PHJ_GROW_BATCHES_REPARTITION
struct HashMemoryChunkData * HashMemoryChunk
#define HASH_CHUNK_DATA(hc)
#define PHJ_GROW_BUCKETS_REINSERT
#define SKEW_MIN_OUTER_FRACTION
#define PHJ_GROW_BUCKETS_ELECT
#define PHJ_GROW_BUCKETS_PHASE(n)
#define PHJ_GROW_BATCHES_ELECT
#define ParallelHashJoinBatchInner(batch)
#define PHJ_BUILD_HASH_INNER
#define NthParallelHashJoinBatch(base, n)
#define HASH_CHUNK_THRESHOLD
#define PHJ_BUILD_HASH_OUTER
#define HJTUPLE_MINTUPLE(hjtup)
#define SKEW_BUCKET_OVERHEAD
#define PHJ_GROW_BATCHES_DECIDE
#define PHJ_GROW_BATCHES_REALLOCATE
#define HASH_CHUNK_HEADER_SIZE
#define PHJ_GROW_BATCHES_FINISH
#define ParallelHashJoinBatchOuter(batch, nparticipants)
#define SKEW_HASH_MEM_PERCENT
#define PHJ_BUILD_ALLOCATE
#define PHJ_GROW_BUCKETS_REALLOCATE
#define PHJ_GROW_BATCHES_PHASE(n)
@ PHJ_GROWTH_NEED_MORE_BUCKETS
@ PHJ_GROWTH_NEED_MORE_BATCHES
#define INVALID_SKEW_BUCKET_NO
#define EstimateParallelHashJoinBatch(hashtable)
void heap_free_minimal_tuple(MinimalTuple mtup)
#define HeapTupleIsValid(tuple)
#define HeapTupleHeaderHasMatch(tup)
#define SizeofMinimalTupleHeader
#define HeapTupleHeaderClearMatch(tup)
void InstrStartNode(Instrumentation *instr)
void InstrStopNode(Instrumentation *instr, double nTuples)
if(TABLE==NULL||TABLE_index==NULL)
void free_attstatsslot(AttStatsSlot *sslot)
bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)
bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, int reqkind, Oid reqop, int flags)
#define ATTSTATSSLOT_NUMBERS
#define ATTSTATSSLOT_VALUES
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void MemoryContextReset(MemoryContext context)
void pfree(void *pointer)
void * MemoryContextAllocZero(MemoryContext context, Size size)
MemoryContext CurrentMemoryContext
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
void ExecHashTableReset(HashJoinTable hashtable)
bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
static void * dense_alloc(HashJoinTable hashtable, Size size)
static void MultiExecParallelHash(HashState *node)
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
static void MultiExecPrivateHash(HashState *node)
void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
void ExecHashTableDetachBatch(HashJoinTable hashtable)
void ExecHashEstimate(HashState *node, ParallelContext *pcxt)
HashState * ExecInitHash(Hash *node, EState *estate, int eflags)
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
void ExecHashTableDetach(HashJoinTable hashtable)
bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
void ExecHashTableDestroy(HashJoinTable hashtable)
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
size_t get_hash_memory_limit(void)
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
static void ExecParallelHashRepartitionRest(HashJoinTable hashtable)
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
void ExecEndHash(HashState *node)
void ExecShutdownHash(HashState *node)
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
static void ExecParallelHashMergeCounters(HashJoinTable hashtable)
static TupleTableSlot * ExecHash(PlanState *pstate)
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
void ExecReScanHash(HashState *node)
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
void ExecHashRetrieveInstrumentation(HashState *node)
Node * MultiExecHash(HashState *node)
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)
#define repalloc0_array(pointer, type, oldcount, count)
static uint32 pg_rotate_left32(uint32 word, int n)
static uint32 pg_nextpower2_32(uint32 num)
static uint32 pg_rotate_right32(uint32 word, int n)
#define pg_nextpower2_size_t
static uint32 pg_prevpower2_32(uint32 num)
#define pg_prevpower2_size_t
static int list_length(const List *l)
#define forboth(cell1, list1, cell2, list2)
static uint32 DatumGetUInt32(Datum X)
static Datum Int16GetDatum(int16 X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
MemoryContextSwitchTo(old_ctx)
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
void sts_end_write(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
#define SHARED_TUPLESTORE_SINGLE_PASS
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
static pg_noinline void Size size
MemoryContext ecxt_per_tuple_memory
TupleTableSlot * ecxt_innertuple
TupleTableSlot * ecxt_outertuple
HashJoinTuple hj_CurTuple
HashJoinTable hj_HashTable
TupleTableSlot * hj_HashTupleSlot
struct HashJoinTupleData ** unshared
FmgrInfo * outer_hashfunctions
ParallelHashJoinBatchAccessor * batches
ParallelHashJoinState * parallel_state
HashMemoryChunk current_chunk
BufFile ** innerBatchFile
int log2_nbuckets_optimal
dsa_pointer_atomic * shared
union HashJoinTableData::@104 buckets
BufFile ** outerBatchFile
FmgrInfo * inner_hashfunctions
dsa_pointer current_chunk_shared
HashSkewBucket ** skewBucket
union HashJoinTupleData::@102 next
struct HashJoinTupleData * unshared
struct HashMemoryChunkData * unshared
union HashMemoryChunkData::@103 next
struct ParallelHashJoinState * parallel_state
SharedHashInfo * shared_info
HashInstrumentation * hinstrument
shm_toc_estimator estimator
SharedTuplestoreAccessor * outer_tuples
ParallelHashJoinBatch * shared
SharedTuplestoreAccessor * inner_tuples
Barrier grow_batches_barrier
dsa_pointer chunk_work_queue
Barrier grow_buckets_barrier
ParallelHashGrowth growth
Instrumentation * instrument
ExprContext * ps_ExprContext
ProjectionInfo * ps_ProjInfo
ExecProcNodeMtd ExecProcNode
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache3(int cacheId, Datum key1, Datum key2, Datum key3)