94 elog(ERROR, "Hash node does not support ExecProcNode call convention");
315 WAIT_EVENT_HASH_BUILD_HASH_INNER))
443 size_t space_allowed;
472 state->parallel_state != NULL,
473 state->parallel_state != NULL ?
474 state->parallel_state->nparticipants - 1 : 0,
476 &nbuckets, &nbatch, &num_skew_mcvs);
479 log2_nbuckets = my_log2(nbuckets);
480 Assert(nbuckets == (1 << log2_nbuckets));
502 hashtable->nbatch = nbatch;
521 hashtable->area = state->ps.state->es_query_dsa;
525 printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
526 hashtable, nbatch, nbuckets);
559 forboth(ho, hashOperators, hc, hashCollations)
566 elog(ERROR, "could not find hash function for hash operator %u",
678 #define NTUP_PER_BUCKET 1
682 bool try_combined_hash_mem,
683 int parallel_workers,
684 size_t *space_allowed,
690 double inner_rel_bytes;
691 size_t hash_table_bytes;
710 inner_rel_bytes = ntuples * tupsize;
722 if (try_combined_hash_mem)
727 newlimit = (double) hash_table_bytes * (double) (parallel_workers + 1);
728 newlimit = Min(newlimit, (double) SIZE_MAX);
729 hash_table_bytes = (size_t) newlimit;
732 *space_allowed = hash_table_bytes;
750 size_t bytes_per_mcv;
763 bytes_per_mcv = tupsize +
767 skew_mcvs = hash_table_bytes / bytes_per_mcv;
776 skew_mcvs = Min(skew_mcvs, INT_MAX);
778 *num_skew_mcvs = (int) skew_mcvs;
782 hash_table_bytes -= skew_mcvs * bytes_per_mcv;
803 max_pointers = Min(max_pointers, INT_MAX / 2 + 1);
806 dbuckets = Min(dbuckets, max_pointers);
807 nbuckets = (int) dbuckets;
809 nbuckets = Max(nbuckets, 1024);
818 if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
830 if (try_combined_hash_mem)
833 false, parallel_workers,
848 if (hash_table_bytes <= bucket_size)
852 sbuckets = Min(sbuckets, max_pointers);
853 nbuckets = (int) sbuckets;
865 Assert(bucket_bytes <= hash_table_bytes / 2);
868 dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
869 dbatch = Min(dbatch, max_pointers);
870 minbatch = (int) dbatch;
877 *numbuckets = nbuckets;
878 *numbatches = nbatch;
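The fragments from lines 678-878 are the heart of ExecChooseHashTableSize(): size the bucket array at NTUP_PER_BUCKET tuples per bucket rounded up to a power of two and, if the inner relation will not fit in the memory budget, pick enough batches that one batch plus the bucket array does. A minimal standalone sketch of that arithmetic, with simplified overheads and without the skew-table and combined-hash_mem adjustments shown above; choose_hash_size() and its parameters are illustrative names, not the real API:

#include <limits.h>
#include <math.h>
#include <stddef.h>

/* Illustrative stand-in for the sizing logic excerpted above. */
static void
choose_hash_size(double ntuples, size_t tupsize, size_t hash_table_bytes,
                 int *numbuckets, int *numbatches)
{
    double  inner_rel_bytes = ntuples * tupsize;
    int     nbuckets = 1024;            /* same floor as line 809 */
    int     nbatch = 1;
    size_t  bucket_bytes;

    /* One tuple per bucket (NTUP_PER_BUCKET == 1), rounded up to a power
     * of two so bucket selection can be a simple mask. */
    while ((double) nbuckets < ntuples && nbuckets <= INT_MAX / 4)
        nbuckets <<= 1;
    bucket_bytes = (size_t) nbuckets * sizeof(void *);

    /* If everything will not fit at once, choose the batch count so that
     * one batch plus the bucket array fits, rounded up to a power of two
     * (compare the ceil() at line 868). */
    if (inner_rel_bytes + (double) bucket_bytes > (double) hash_table_bytes &&
        hash_table_bytes > bucket_bytes)
    {
        double  dbatch = ceil(inner_rel_bytes /
                              (double) (hash_table_bytes - bucket_bytes));

        while ((double) nbatch < dbatch && nbatch <= INT_MAX / 4)
            nbatch <<= 1;
    }

    *numbuckets = nbuckets;
    *numbatches = nbatch;
}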
924 int oldnbatch = hashtable->nbatch;
939 nbatch = oldnbatch * 2;
943 printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
944 hashtable, nbatch, hashtable->spaceUsed);
967 hashtable->nbatch = nbatch;
973 ninmemory = nfreed = 0;
996 oldchunks = hashtable->chunks;
1000 while (oldchunks != NULL)
1008 while (idx < oldchunks->used)
1018 &bucketno, &batchno);
1020 if (batchno == curbatch)
1026 memcpy(copyTuple, hashTuple, hashTupleSize);
1035 Assert(batchno > curbatch);
1054 oldchunks = nextchunk;
1058 printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
1059 hashtable, nfreed, ninmemory, hashtable->spaceUsed);
1070 if (nfreed == 0 || nfreed == ninmemory)
1074 printf("Hashjoin %p: disabling further increase of nbatch\n",
1108 WAIT_EVENT_HASH_GROW_BATCHES_ELECT))
1125 if (hashtable->nbatch == 1)
1149 new_nbatch = hashtable->nbatch * 2;
1174 dtuples = (old_batch0->ntuples * 2.0) / new_nbatch;
1176 dbuckets = Min(dbuckets,
1178 new_nbuckets = (int) dbuckets;
1179 new_nbuckets = Max(new_nbuckets, 1024);
1188 for (i = 0; i < new_nbuckets; ++i)
1218 WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE);
1231 WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
1242 WAIT_EVENT_HASH_GROW_BATCHES_DECIDE))
1244 bool space_exhausted = false;
1245 bool extreme_skew_detected = false;
1252 for (int i = 0; i < hashtable->nbatch; ++i)
1261 space_exhausted = true;
1271 extreme_skew_detected = true;
1276 if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2)
1278 else if (space_exhausted)
1292 WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
1314 while (idx < chunk->used)
1324 &bucketno, &batchno);
1326 Assert(batchno < hashtable->nbatch);
1381 for (i = 1; i < old_nbatch; ++i)
1392 for (i = 1; i < old_nbatch; ++i)
1422 pfree(old_inner_tuples);
1468 printf("Hashjoin %p: increasing nbuckets %d => %d\n",
1498 while (idx < chunk->used)
1505 &bucketno, &batchno);
1541 WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
1570 WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE);
1581 while (idx < chunk->used)
1589 &bucketno, &batchno);
1605 WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT);
1631 &bucketno, &batchno);
1636 if (batchno == hashtable->curbatch)
1669 if (hashtable->nbatch == 1 &&
1734 if (hashTuple == NULL)
1847 foreach(hk, hashkeys)
1897 *hashvalue = hashkey;
1939 *bucketno = hashvalue & (nbuckets - 1);
1945 *bucketno = hashvalue & (nbuckets - 1);
1976 if (hashTuple != NULL)
1983 while (hashTuple != NULL)
2034 if (hashTuple != NULL)
2040 while (hashTuple != NULL)
2097 int curbatch = hashtable->curbatch;
2171 if (hashTuple != NULL)
2188 while (hashTuple != NULL)
2246 if (hashTuple != NULL)
2254 while (hashTuple != NULL)
2299 int nbuckets = hashtable->nbuckets;
2316 hashtable->chunks = NULL;
2403 if (mcvsToUse > sslot.nvalues)
2412 for (i = 0; i < mcvsToUse; i++)
2450 mcvsToUse * sizeof(int));
2453 + mcvsToUse * sizeof(int);
2455 + mcvsToUse * sizeof(int);
2470 for (i = 0; i < mcvsToUse; i++)
2485 bucket = hashvalue & (nbuckets - 1);
2486 while (hashtable->skewBucket[bucket] != NULL &&
2488 bucket = (bucket + 1) & (nbuckets - 1);
2546 while (hashtable->skewBucket[bucket] != NULL &&
2628 bucket = hashtable->skewBucket[bucketToRemove];
2640 hashTuple = bucket->tuples;
2641 while (hashTuple != NULL)
2656 if (batchno == hashtable->curbatch)
2666 memcpy(copyTuple, hashTuple, tupleSize);
2687 hashTuple = nextHashTuple;
2705 hashtable->skewBucket[bucketToRemove] = NULL;
2821 if (shared_info == NULL)
2883 newChunk->used = size;
2890 if (hashtable->chunks != NULL)
2898 hashtable->chunks = newChunk;
2908 if ((hashtable->chunks == NULL) ||
2916 newChunk->used = size;
2920 hashtable->chunks = newChunk;
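Lines 2883-2920 are dense_alloc(): tuples are bump-allocated out of large chunks threaded onto hashtable->chunks, so ExecHashIncreaseNumBatches() can later walk and recycle whole chunks instead of freeing tuples one by one. A simplified sketch of such a chunk allocator; DemoChunk, demo_dense_alloc() and DEMO_CHUNK_SIZE are illustrative, the real code MAXALIGNs the request first, and its oversized-tuple special case is reduced here to returning NULL:

#include <stddef.h>
#include <stdlib.h>

#define DEMO_CHUNK_SIZE  (32 * 1024)    /* illustrative chunk payload size */

/* Hypothetical chunk header, standing in for HashMemoryChunkData. */
typedef struct DemoChunk
{
    struct DemoChunk *next;             /* chunk list, newest first */
    size_t            maxlen;
    size_t            used;
    char              data[];
} DemoChunk;

/* Bump-allocate size bytes from the newest chunk, starting a fresh chunk
 * when the current one cannot fit the request. */
static void *
demo_dense_alloc(DemoChunk **chunks, size_t size)
{
    DemoChunk  *chunk = *chunks;
    void       *ptr;

    if (size > DEMO_CHUNK_SIZE)
        return NULL;                    /* real code makes a dedicated chunk */

    if (chunk == NULL || chunk->used + size > chunk->maxlen)
    {
        chunk = malloc(offsetof(DemoChunk, data) + DEMO_CHUNK_SIZE);
        if (chunk == NULL)
            return NULL;
        chunk->next = *chunks;          /* push onto the chunk list */
        chunk->maxlen = DEMO_CHUNK_SIZE;
        chunk->used = 0;
        *chunks = chunk;
    }

    ptr = chunk->data + chunk->used;
    chunk->used += size;                /* bump the fill pointer */
    return ptr;
}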
2954 int curbatch = hashtable->curbatch;
2963 if (chunk != NULL &&
2972 chunk->used += size;
3033 if (hashtable->nbatch == 1)
3040 hashtable->nbuckets < (INT_MAX / 2) &&
3117 hashtable->nbatch = nbatch;
3143 accessor->shared = shared;
3202 if (hashtable->batches != NULL)
3237 accessor->shared = shared;
3239 accessor->done = false;
3270 for (i = 0; i < nbuckets; ++i)
3284 int curbatch = hashtable->curbatch;
3286 bool attached = true;
3538 Assert(batchno < hashtable->nbatch);
3600 mem_limit = Min(mem_limit, (double) SIZE_MAX);
3602 return (size_t) mem_limit;
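Lines 3600-3602 close get_hash_memory_limit(): the budget is computed in double precision and clamped so the cast back to size_t cannot overflow. A sketch of the whole calculation, assuming the usual work_mem-in-kilobytes times hash_mem_multiplier formula; demo_hash_memory_limit() and its parameters stand in for the real function and GUC variables:

#include <stddef.h>
#include <stdint.h>

/* Compute the hash table memory budget in bytes, clamped to SIZE_MAX. */
static size_t
demo_hash_memory_limit(int work_mem_kb, double hash_mem_multiplier)
{
    double  mem_limit = (double) work_mem_kb * hash_mem_multiplier * 1024.0;

    if (mem_limit > (double) SIZE_MAX)
        mem_limit = (double) SIZE_MAX;

    return (size_t) mem_limit;
}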
Datum idx(PG_FUNCTION_ARGS)
void PrepareTempTablespaces(void)
bool BarrierArriveAndDetachExceptLast(Barrier *barrier)
bool BarrierArriveAndDetach(Barrier *barrier)
int BarrierAttach(Barrier *barrier)
void BarrierInit(Barrier *barrier, int participants)
int BarrierPhase(Barrier *barrier)
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
bool BarrierDetach(Barrier *barrier)
void BufFileClose(BufFile *file)
#define OidIsValid(objectId)
elog(ERROR, "%s: %s", p2, msg)
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
void dsa_free(dsa_area *area, dsa_pointer dp)
#define dsa_allocate0(area, size)
#define dsa_pointer_atomic_init
#define dsa_allocate(area, size)
#define dsa_pointer_atomic_write
#define InvalidDsaPointer
#define dsa_pointer_atomic_compare_exchange
#define dsa_pointer_atomic_read
pg_atomic_uint64 dsa_pointer_atomic
#define DsaPointerIsValid(x)
void ExecReScan(PlanState *node)
List * ExecInitExprList(List *nodes, PlanState *parent)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
void ExecAssignExprContext(EState *estate, PlanState *planstate)
void ExecFreeExprContext(PlanState *planstate)
#define outerPlanState(node)
struct HashJoinTupleData * HashJoinTuple
struct HashInstrumentation HashInstrumentation
#define EXEC_FLAG_BACKWARD
#define ResetExprContext(econtext)
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
static TupleTableSlot * ExecProcNode(PlanState *node)
#define palloc_object(type)
#define repalloc_array(pointer, type, count)
#define palloc_array(type, count)
#define palloc0_array(type, count)
#define palloc0_object(type)
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
double hash_mem_multiplier
#define PHJ_GROW_BATCHES_REPARTITION
struct HashMemoryChunkData * HashMemoryChunk
#define HASH_CHUNK_DATA(hc)
#define PHJ_GROW_BUCKETS_REINSERT
#define SKEW_MIN_OUTER_FRACTION
#define PHJ_GROW_BUCKETS_ELECT
#define PHJ_GROW_BUCKETS_PHASE(n)
#define PHJ_GROW_BATCHES_ELECT
#define ParallelHashJoinBatchInner(batch)
#define PHJ_BUILD_HASH_INNER
#define NthParallelHashJoinBatch(base, n)
#define HASH_CHUNK_THRESHOLD
#define PHJ_BUILD_HASH_OUTER
#define HJTUPLE_MINTUPLE(hjtup)
#define SKEW_BUCKET_OVERHEAD
#define PHJ_GROW_BATCHES_DECIDE
#define PHJ_GROW_BATCHES_REALLOCATE
#define HASH_CHUNK_HEADER_SIZE
#define PHJ_GROW_BATCHES_FINISH
#define ParallelHashJoinBatchOuter(batch, nparticipants)
#define SKEW_HASH_MEM_PERCENT
#define PHJ_BUILD_ALLOCATE
#define PHJ_GROW_BUCKETS_REALLOCATE
#define PHJ_GROW_BATCHES_PHASE(n)
@ PHJ_GROWTH_NEED_MORE_BUCKETS
@ PHJ_GROWTH_NEED_MORE_BATCHES
#define INVALID_SKEW_BUCKET_NO
#define EstimateParallelHashJoinBatch(hashtable)
void heap_free_minimal_tuple(MinimalTuple mtup)
#define HeapTupleIsValid(tuple)
#define HeapTupleHeaderHasMatch(tup)
#define SizeofMinimalTupleHeader
#define HeapTupleHeaderClearMatch(tup)
void InstrStartNode(Instrumentation *instr)
void InstrStopNode(Instrumentation *instr, double nTuples)
if(TABLE==NULL||TABLE_index==NULL)
Assert(fmt[strlen(fmt) - 1] != '\n')
void free_attstatsslot(AttStatsSlot *sslot)
bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)
bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, int reqkind, Oid reqop, int flags)
#define ATTSTATSSLOT_NUMBERS
#define ATTSTATSSLOT_VALUES
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void MemoryContextReset(MemoryContext context)
void pfree(void *pointer)
void * MemoryContextAllocZero(MemoryContext context, Size size)
MemoryContext CurrentMemoryContext
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
void ExecHashTableReset(HashJoinTable hashtable)
bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
static void * dense_alloc(HashJoinTable hashtable, Size size)
static void MultiExecParallelHash(HashState *node)
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
static void MultiExecPrivateHash(HashState *node)
void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
void ExecHashTableDetachBatch(HashJoinTable hashtable)
void ExecHashEstimate(HashState *node, ParallelContext *pcxt)
HashState * ExecInitHash(Hash *node, EState *estate, int eflags)
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
void ExecHashTableDetach(HashJoinTable hashtable)
bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
void ExecHashTableDestroy(HashJoinTable hashtable)
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
size_t get_hash_memory_limit(void)
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
static void ExecParallelHashRepartitionRest(HashJoinTable hashtable)
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
void ExecEndHash(HashState *node)
void ExecShutdownHash(HashState *node)
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
static void ExecParallelHashMergeCounters(HashJoinTable hashtable)
static TupleTableSlot * ExecHash(PlanState *pstate)
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
void ExecReScanHash(HashState *node)
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
void ExecHashRetrieveInstrumentation(HashState *node)
Node * MultiExecHash(HashState *node)
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define repalloc0_array(pointer, type, oldcount, count)
static uint32 pg_rotate_left32(uint32 word, int n)
static uint32 pg_nextpower2_32(uint32 num)
static uint32 pg_rotate_right32(uint32 word, int n)
#define pg_nextpower2_size_t
#define pg_prevpower2_size_t
static int list_length(const List *l)
#define forboth(cell1, list1, cell2, list2)
static uint32 DatumGetUInt32(Datum X)
static Datum Int16GetDatum(int16 X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
void sts_end_write(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
#define SHARED_TUPLESTORE_SINGLE_PASS
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
MemoryContext ecxt_per_tuple_memory
TupleTableSlot * ecxt_innertuple
TupleTableSlot * ecxt_outertuple
HashJoinTuple hj_CurTuple
HashJoinTable hj_HashTable
TupleTableSlot * hj_HashTupleSlot
struct HashJoinTupleData ** unshared
FmgrInfo * outer_hashfunctions
ParallelHashJoinBatchAccessor * batches
ParallelHashJoinState * parallel_state
HashMemoryChunk current_chunk
BufFile ** innerBatchFile
int log2_nbuckets_optimal
dsa_pointer_atomic * shared
BufFile ** outerBatchFile
FmgrInfo * inner_hashfunctions
dsa_pointer current_chunk_shared
HashSkewBucket ** skewBucket
union HashJoinTableData::@99 buckets
union HashJoinTupleData::@97 next
struct HashJoinTupleData * unshared
struct HashMemoryChunkData * unshared
union HashMemoryChunkData::@98 next
struct ParallelHashJoinState * parallel_state
SharedHashInfo * shared_info
HashInstrumentation * hinstrument
shm_toc_estimator estimator
SharedTuplestoreAccessor * outer_tuples
ParallelHashJoinBatch * shared
SharedTuplestoreAccessor * inner_tuples
Barrier grow_batches_barrier
dsa_pointer chunk_work_queue
Barrier grow_buckets_barrier
ParallelHashGrowth growth
Instrumentation * instrument
ExprContext * ps_ExprContext
ProjectionInfo * ps_ProjInfo
ExecProcNodeMtd ExecProcNode
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache3(int cacheId, Datum key1, Datum key2, Datum key3)