94 elog(ERROR, "Hash node does not support ExecProcNode call convention");
443 size_t space_allowed;
472 state->parallel_state != NULL,
473 state->parallel_state != NULL ?
474 state->parallel_state->nparticipants - 1 : 0,
476 &nbuckets, &nbatch, &num_skew_mcvs);
479 log2_nbuckets = my_log2(nbuckets);
480 Assert(nbuckets == (1 << log2_nbuckets));
502 hashtable->nbatch = nbatch;
521 hashtable->area = state->ps.state->es_query_dsa;
525 printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
526 hashtable, nbatch, nbuckets);
555 forboth(ho, hashOperators, hc, hashCollations)
562 elog(ERROR, "could not find hash function for hash operator %u",
667 #define NTUP_PER_BUCKET 1
671 bool try_combined_hash_mem,
672 int parallel_workers,
673 size_t *space_allowed,
679 double inner_rel_bytes;
680 size_t hash_table_bytes;
699 inner_rel_bytes = ntuples * tupsize;
711 if (try_combined_hash_mem)
716 newlimit = (double) hash_table_bytes * (double) (parallel_workers + 1);
717 newlimit = Min(newlimit, (double) SIZE_MAX);
718 hash_table_bytes = (size_t) newlimit;
721 *space_allowed = hash_table_bytes;
739 size_t bytes_per_mcv;
752 bytes_per_mcv = tupsize +
756 skew_mcvs = hash_table_bytes / bytes_per_mcv;
765 skew_mcvs = Min(skew_mcvs, INT_MAX);
767 *num_skew_mcvs = (int) skew_mcvs;
771 hash_table_bytes -= skew_mcvs * bytes_per_mcv;
792 max_pointers = Min(max_pointers, INT_MAX / 2 + 1);
795 dbuckets = Min(dbuckets, max_pointers);
796 nbuckets = (int) dbuckets;
798 nbuckets = Max(nbuckets, 1024);
807 if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
819 if (try_combined_hash_mem)
822 false, parallel_workers,
837 if (hash_table_bytes <= bucket_size)
841 sbuckets = Min(sbuckets, max_pointers);
842 nbuckets = (int) sbuckets;
854 Assert(bucket_bytes <= hash_table_bytes / 2);
857 dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
858 dbatch = Min(dbatch, max_pointers);
859 minbatch = (int) dbatch;
866 *numbuckets = nbuckets;
867 *numbatches = nbatch;
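
The fragments above come from ExecChooseHashTableSize(). As a rough illustration of the arithmetic only — the helper names, the clamping constants, and the omission of the skew-MCV and bucket-overhead accounting are assumptions of this sketch, not the real routine — the bucket and batch counts can be derived from the row estimate and the memory budget like this:

#include <math.h>
#include <stdint.h>
#include <stdio.h>

#define NTUP_PER_BUCKET 1               /* target tuples per bucket, as above */

/* Assumed helper: round up to the next power of two (cf. pg_nextpower2_32). */
static uint32_t next_pow2(uint32_t v)
{
    uint32_t p = 1;

    while (p < v)
        p <<= 1;
    return p;
}

/*
 * Simplified sizing sketch: pick nbuckets from the estimated tuple count,
 * then derive nbatch from how badly the inner relation overflows the budget.
 * The real routine also reserves space for skew MCVs and bucket headers and
 * clamps nbuckets by the pointer-array allocation limit (max_pointers above).
 */
static void choose_hash_size(double ntuples, size_t tupsize,
                             size_t hash_table_bytes,
                             int *numbuckets, int *numbatches)
{
    double  inner_rel_bytes = ntuples * (double) tupsize;
    double  dbuckets = fmin(fmax(1024.0, ntuples / NTUP_PER_BUCKET), 1073741824.0);
    int     nbatch = 1;

    if (inner_rel_bytes > (double) hash_table_bytes)
    {
        /* Doesn't fit: split into batches, rounded up to a power of two. */
        double  dbatch = ceil(inner_rel_bytes / (double) hash_table_bytes);

        nbatch = (int) next_pow2((uint32_t) fmin(dbatch, 1073741824.0));
    }

    *numbuckets = (int) next_pow2((uint32_t) dbuckets);
    *numbatches = nbatch;
}

int main(void)
{
    int nbuckets, nbatch;

    /* Example: 10 million 64-byte tuples against a 64MB budget. */
    choose_hash_size(10e6, 64, (size_t) 64 * 1024 * 1024, &nbuckets, &nbatch);
    printf("nbuckets = %d, nbatch = %d\n", nbuckets, nbatch);
    return 0;
}

Compile with the math library (cc sketch.c -lm); for the example it prints a power-of-two bucket count and nbatch = 16.
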
913 int oldnbatch = hashtable->nbatch;
929 nbatch = oldnbatch * 2;
933 printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
934 hashtable, nbatch, hashtable->spaceUsed);
956 hashtable->nbatch = nbatch;
962 ninmemory = nfreed = 0;
985 oldchunks = hashtable->chunks;
989 while (oldchunks != NULL)
997 while (idx < oldchunks->used)
1007 &bucketno, &batchno);
1009 if (batchno == curbatch)
1015 memcpy(copyTuple, hashTuple, hashTupleSize);
1024 Assert(batchno > curbatch);
1042 oldchunks = nextchunk;
1046 printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
1047 hashtable, nfreed, ninmemory, hashtable->spaceUsed);
1058 if (nfreed == 0 || nfreed == ninmemory)
1062 printf("Hashjoin %p: disabling further increase of nbatch\n",
1113 if (hashtable->nbatch == 1)
1137 new_nbatch = hashtable->nbatch * 2;
1162 dtuples = (old_batch0->ntuples * 2.0) / new_nbatch;
1164 dbuckets = Min(dbuckets,
1166 new_nbuckets = (int) dbuckets;
1167 new_nbuckets = Max(new_nbuckets, 1024);
1176 for (i = 0; i < new_nbuckets; ++i)
1232 bool space_exhausted = false;
1233 bool extreme_skew_detected = false;
1240 for (int i = 0; i < hashtable->nbatch; ++i)
1249 space_exhausted = true;
1259 extreme_skew_detected = true;
1264 if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2)
1266 else if (space_exhausted)
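
Fragments 1232–1266 are the decision step of the parallel batch-growth path: doubling nbatch only helps if tuples would actually spread out, so detected extreme skew (or an already huge nbatch) disables further growth instead of requesting more batches. A tiny sketch of that decision, with a hypothetical enum standing in for ParallelHashGrowth:

#include <limits.h>
#include <stdbool.h>

/* Hypothetical growth decision, mirroring the idea of ParallelHashGrowth. */
typedef enum { GROWTH_OK, GROWTH_NEED_MORE_BATCHES, GROWTH_DISABLED } growth_t;

static growth_t decide_growth(bool space_exhausted, bool extreme_skew_detected,
                              int nbatch)
{
    /* Skewed data (or an absurd nbatch) means doubling again cannot help. */
    if (extreme_skew_detected || nbatch >= INT_MAX / 2)
        return GROWTH_DISABLED;
    else if (space_exhausted)
        return GROWTH_NEED_MORE_BATCHES;
    return GROWTH_OK;
}
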
1302 while (idx < chunk->used)
1312 &bucketno, &batchno);
1314 Assert(batchno < hashtable->nbatch);
1369 for (i = 1; i < old_nbatch; ++i)
1380 for (i = 1; i < old_nbatch; ++i)
1410 pfree(old_inner_tuples);
1456 printf("Hashjoin %p: increasing nbuckets %d => %d\n",
1486 while (idx < chunk->used)
1493 &bucketno, &batchno);
1569 while (idx < chunk->used)
1577 &bucketno, &batchno);
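
Fragments 1456–1577 come from the nbuckets-doubling paths (ExecHashIncreaseNumBuckets and its parallel variant): the tuples stay where they are in their memory chunks, and only the bucket-head array is rebuilt, with each tuple relinked under its recomputed bucket. A stand-alone sketch of that reinsertion, using a flat array of tuple pointers in place of the chunk walk:

#include <stdlib.h>

typedef struct Tuple { unsigned hashvalue; struct Tuple *next; } Tuple;

/*
 * Rebuild the bucket array after nbuckets has doubled (nbuckets must be a
 * power of two).  Tuples are not copied; only head pointers and next links
 * change.  Returns NULL if the new array cannot be allocated.
 */
static Tuple **rebuild_buckets(Tuple **all_tuples, size_t ntuples, unsigned nbuckets)
{
    Tuple **buckets = calloc(nbuckets, sizeof(Tuple *));

    if (buckets == NULL)
        return NULL;

    for (size_t i = 0; i < ntuples; i++)
    {
        Tuple   *tup = all_tuples[i];
        unsigned bucketno = tup->hashvalue & (nbuckets - 1);

        tup->next = buckets[bucketno];  /* push onto the new chain */
        buckets[bucketno] = tup;
    }
    return buckets;
}
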
1619 &bucketno, &batchno);
1624 if (batchno == hashtable->curbatch)
1657 if (hashtable->nbatch == 1 &&
1721 if (hashTuple == NULL)
1833 foreach(hk, hashkeys)
1883 *hashvalue = hashkey;
1925 *bucketno = hashvalue & (nbuckets - 1);
1931 *bucketno = hashvalue & (nbuckets - 1);
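
Lines 1925 and 1931 are the two arms of ExecHashGetBucketAndBatch(): the low bits of the hash value select the bucket, and when there is more than one batch the bits just above them select the batch. The shift-based form below is a simplification (the source rotates via pg_rotate_right32 rather than shifting), but it shows the split, assuming both counts are powers of two:

#include <stdint.h>
#include <stdio.h>

/*
 * Split one hash value into a bucket number (low bits) and a batch number
 * (the bits just above them).  log2_nbuckets must equal log2(nbuckets).
 */
static void get_bucket_and_batch(uint32_t hashvalue,
                                 uint32_t nbuckets, int log2_nbuckets,
                                 uint32_t nbatch,
                                 uint32_t *bucketno, uint32_t *batchno)
{
    *bucketno = hashvalue & (nbuckets - 1);
    if (nbatch > 1)
        *batchno = (hashvalue >> log2_nbuckets) & (nbatch - 1);
    else
        *batchno = 0;
}

int main(void)
{
    uint32_t bucketno, batchno;

    /* 2^20 buckets, 16 batches. */
    get_bucket_and_batch(0xDEADBEEF, 1u << 20, 20, 16, &bucketno, &batchno);
    printf("bucket=%u batch=%u\n", bucketno, batchno);
    return 0;
}
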
1962 if (hashTuple != NULL)
1969 while (hashTuple != NULL)
2020 if (hashTuple != NULL)
2026 while (hashTuple != NULL)
2083 int curbatch = hashtable->curbatch;
2158 if (hashTuple != NULL)
2175 while (hashTuple != NULL)
2233 if (hashTuple != NULL)
2241 while (hashTuple != NULL)
2286 int nbuckets = hashtable->nbuckets;
2303 hashtable->chunks = NULL;
2390 if (mcvsToUse > sslot.nvalues)
2399 for (i = 0; i < mcvsToUse; i++)
2437 mcvsToUse * sizeof(int));
2440 + mcvsToUse * sizeof(int);
2442 + mcvsToUse * sizeof(int);
2457 for (i = 0; i < mcvsToUse; i++)
2472 bucket = hashvalue & (nbuckets - 1);
2473 while (hashtable->skewBucket[bucket] != NULL &&
2475 bucket = (bucket + 1) & (nbuckets - 1);
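
Fragments 2472–2533 are the skew hash table's probe loop: open addressing with linear probing over a power-of-two table, stepping forward with wraparound until a matching hash value or an empty slot is found. A minimal sketch of the same probe; the caller must keep the table from filling up, or the loop cannot terminate:

#include <stddef.h>
#include <stdint.h>

/* Minimal stand-in for a skew bucket entry keyed by its full hash value. */
typedef struct SkewBucket { uint32_t hashvalue; } SkewBucket;

/*
 * Linear probing: start at hashvalue & (nbuckets - 1) and step forward,
 * wrapping around, until we find a slot with the same hash value or an
 * empty slot.  Returns the slot index, or -1 when the probe ends empty.
 */
static long probe_skew_bucket(SkewBucket **table, uint32_t nbuckets,
                              uint32_t hashvalue)
{
    uint32_t bucket = hashvalue & (nbuckets - 1);

    while (table[bucket] != NULL && table[bucket]->hashvalue != hashvalue)
        bucket = (bucket + 1) & (nbuckets - 1);

    return table[bucket] != NULL ? (long) bucket : -1;
}
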
2533 while (hashtable->skewBucket[bucket] != NULL &&
2615 bucket = hashtable->skewBucket[bucketToRemove];
2627 hashTuple = bucket->tuples;
2628 while (hashTuple != NULL)
2643 if (batchno == hashtable->curbatch)
2653 memcpy(copyTuple, hashTuple, tupleSize);
2673 hashTuple = nextHashTuple;
2691 hashtable->skewBucket[bucketToRemove] = NULL;
2807 if (shared_info == NULL)
2869 newChunk->used = size;
2876 if (hashtable->chunks != NULL)
2884 hashtable->chunks = newChunk;
2894 if ((hashtable->chunks == NULL) ||
2902 newChunk->used = size;
2906 hashtable->chunks = newChunk;
2940 int curbatch = hashtable->curbatch;
2949 if (chunk != NULL &&
2958 chunk->used += size;
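
Fragments 2869–2958 show the chunked ("dense") tuple allocator: tuples are bump-allocated out of large chunks kept on a singly linked list, so they can later be walked for repartitioning or freed wholesale. A rough sketch, assuming a fixed chunk payload and ignoring the MAXALIGN rounding and oversized-request handling of the real allocator:

#include <stddef.h>
#include <stdlib.h>

#define CHUNK_PAYLOAD (32 * 1024)       /* assumed chunk payload size */

/* A memory chunk: requests are bump-allocated out of the data[] payload. */
typedef struct Chunk
{
    struct Chunk *next;                 /* chunk list, newest first */
    size_t        used;                 /* bytes handed out so far */
    char          data[CHUNK_PAYLOAD];
} Chunk;

/*
 * Dense allocation sketch: small requests are carved out of the newest chunk;
 * when it cannot fit the request, a fresh chunk is pushed onto the list.
 */
static void *dense_alloc(Chunk **chunks, size_t size)
{
    Chunk  *chunk = *chunks;
    void   *ptr;

    if (size > CHUNK_PAYLOAD)
        return NULL;                    /* oversized requests not handled here */

    if (chunk == NULL || chunk->used + size > CHUNK_PAYLOAD)
    {
        chunk = malloc(sizeof(Chunk));
        if (chunk == NULL)
            return NULL;
        chunk->next = *chunks;          /* push onto the chunk list */
        chunk->used = 0;
        *chunks = chunk;
    }

    ptr = chunk->data + chunk->used;    /* bump-pointer allocation */
    chunk->used += size;
    return ptr;
}
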
3019 if (hashtable->nbatch == 1)
3026 hashtable->nbuckets < (INT_MAX / 2) &&
3100 hashtable->nbatch = nbatch;
3126 accessor->shared = shared;
3185 if (hashtable->batches != NULL)
3217 accessor->shared = shared;
3219 accessor->done = false;
3250 for (i = 0; i < nbuckets; ++i)
3264 int curbatch = hashtable->curbatch;
3266 bool attached = true;
3518 Assert(batchno < hashtable->nbatch);
3580 mem_limit = Min(mem_limit, (double) SIZE_MAX);
3582 return (size_t) mem_limit;
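
Lines 3580–3582 are the tail of get_hash_memory_limit(): the budget is work_mem (in kilobytes) scaled by hash_mem_multiplier, computed in double arithmetic and clamped so it still fits in a size_t. A stand-alone sketch with the two settings replaced by plain variables:

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-ins for the two settings involved; work_mem is in kilobytes. */
static int    work_mem = 4096;          /* 4MB */
static double hash_mem_multiplier = 2.0;

/*
 * Memory budget for the hash table: work_mem scaled by hash_mem_multiplier,
 * computed in double arithmetic and clamped to fit in a size_t.
 */
static size_t hash_memory_limit(void)
{
    double mem_limit = (double) work_mem * hash_mem_multiplier * 1024.0;

    if (mem_limit > (double) SIZE_MAX)
        mem_limit = (double) SIZE_MAX;

    return (size_t) mem_limit;
}

int main(void)
{
    printf("hash memory limit: %zu bytes\n", hash_memory_limit());
    return 0;
}
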
Datum idx(PG_FUNCTION_ARGS)
void PrepareTempTablespaces(void)
bool BarrierArriveAndDetachExceptLast(Barrier *barrier)
int BarrierParticipants(Barrier *barrier)
bool BarrierArriveAndDetach(Barrier *barrier)
int BarrierAttach(Barrier *barrier)
void BarrierInit(Barrier *barrier, int participants)
int BarrierPhase(Barrier *barrier)
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
bool BarrierDetach(Barrier *barrier)
void BufFileClose(BufFile *file)
#define OidIsValid(objectId)
elog(ERROR, "%s: %s", p2, msg)
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
void dsa_free(dsa_area *area, dsa_pointer dp)
#define dsa_allocate0(area, size)
#define dsa_pointer_atomic_init
#define dsa_allocate(area, size)
#define dsa_pointer_atomic_write
#define InvalidDsaPointer
#define dsa_pointer_atomic_compare_exchange
#define dsa_pointer_atomic_read
pg_atomic_uint64 dsa_pointer_atomic
#define DsaPointerIsValid(x)
void ExecReScan(PlanState *node)
List * ExecInitExprList(List *nodes, PlanState *parent)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
void ExecAssignExprContext(EState *estate, PlanState *planstate)
void ExecFreeExprContext(PlanState *planstate)
#define outerPlanState(node)
struct HashJoinTupleData * HashJoinTuple
struct HashInstrumentation HashInstrumentation
#define EXEC_FLAG_BACKWARD
#define ResetExprContext(econtext)
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
static TupleTableSlot * ExecProcNode(PlanState *node)
#define palloc_object(type)
#define repalloc_array(pointer, type, count)
#define palloc_array(type, count)
#define palloc0_array(type, count)
#define palloc0_object(type)
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
double hash_mem_multiplier
#define PHJ_GROW_BATCHES_REPARTITION
struct HashMemoryChunkData * HashMemoryChunk
#define HASH_CHUNK_DATA(hc)
#define PHJ_GROW_BUCKETS_REINSERT
#define SKEW_MIN_OUTER_FRACTION
#define PHJ_GROW_BUCKETS_ELECT
#define PHJ_GROW_BUCKETS_PHASE(n)
#define PHJ_GROW_BATCHES_ELECT
#define ParallelHashJoinBatchInner(batch)
#define PHJ_BUILD_HASH_INNER
#define NthParallelHashJoinBatch(base, n)
#define HASH_CHUNK_THRESHOLD
#define PHJ_BUILD_HASH_OUTER
#define HJTUPLE_MINTUPLE(hjtup)
#define SKEW_BUCKET_OVERHEAD
#define PHJ_GROW_BATCHES_DECIDE
#define PHJ_GROW_BATCHES_REALLOCATE
#define HASH_CHUNK_HEADER_SIZE
#define PHJ_GROW_BATCHES_FINISH
#define ParallelHashJoinBatchOuter(batch, nparticipants)
#define SKEW_HASH_MEM_PERCENT
#define PHJ_BUILD_ALLOCATE
#define PHJ_GROW_BUCKETS_REALLOCATE
#define PHJ_GROW_BATCHES_PHASE(n)
@ PHJ_GROWTH_NEED_MORE_BUCKETS
@ PHJ_GROWTH_NEED_MORE_BATCHES
#define INVALID_SKEW_BUCKET_NO
#define EstimateParallelHashJoinBatch(hashtable)
void heap_free_minimal_tuple(MinimalTuple mtup)
#define HeapTupleIsValid(tuple)
#define HeapTupleHeaderHasMatch(tup)
#define SizeofMinimalTupleHeader
#define HeapTupleHeaderClearMatch(tup)
void InstrStartNode(Instrumentation *instr)
void InstrStopNode(Instrumentation *instr, double nTuples)
if(TABLE==NULL||TABLE_index==NULL)
Assert(fmt[strlen(fmt) - 1] !='\n')
void free_attstatsslot(AttStatsSlot *sslot)
bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)
bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, int reqkind, Oid reqop, int flags)
#define ATTSTATSSLOT_NUMBERS
#define ATTSTATSSLOT_VALUES
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void MemoryContextReset(MemoryContext context)
void pfree(void *pointer)
void * MemoryContextAllocZero(MemoryContext context, Size size)
MemoryContext CurrentMemoryContext
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
void ExecHashTableReset(HashJoinTable hashtable)
bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
static void * dense_alloc(HashJoinTable hashtable, Size size)
static void MultiExecParallelHash(HashState *node)
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
static void MultiExecPrivateHash(HashState *node)
void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
void ExecHashTableDetachBatch(HashJoinTable hashtable)
void ExecHashEstimate(HashState *node, ParallelContext *pcxt)
HashState * ExecInitHash(Hash *node, EState *estate, int eflags)
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
void ExecHashTableDetach(HashJoinTable hashtable)
bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
void ExecHashTableDestroy(HashJoinTable hashtable)
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
size_t get_hash_memory_limit(void)
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
static void ExecParallelHashRepartitionRest(HashJoinTable hashtable)
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
void ExecEndHash(HashState *node)
void ExecShutdownHash(HashState *node)
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
static void ExecParallelHashMergeCounters(HashJoinTable hashtable)
static TupleTableSlot * ExecHash(PlanState *pstate)
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
void ExecReScanHash(HashState *node)
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
void ExecHashRetrieveInstrumentation(HashState *node)
Node * MultiExecHash(HashState *node)
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define repalloc0_array(pointer, type, oldcount, count)
static uint32 pg_rotate_left32(uint32 word, int n)
static uint32 pg_nextpower2_32(uint32 num)
static uint32 pg_rotate_right32(uint32 word, int n)
#define pg_nextpower2_size_t
#define pg_prevpower2_size_t
static int list_length(const List *l)
#define forboth(cell1, list1, cell2, list2)
static uint32 DatumGetUInt32(Datum X)
static Datum Int16GetDatum(int16 X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
void sts_end_write(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
#define SHARED_TUPLESTORE_SINGLE_PASS
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
MemoryContext ecxt_per_tuple_memory
TupleTableSlot * ecxt_innertuple
TupleTableSlot * ecxt_outertuple
HashJoinTuple hj_CurTuple
HashJoinTable hj_HashTable
TupleTableSlot * hj_HashTupleSlot
struct HashJoinTupleData ** unshared
FmgrInfo * outer_hashfunctions
union HashJoinTableData::@97 buckets
ParallelHashJoinBatchAccessor * batches
ParallelHashJoinState * parallel_state
HashMemoryChunk current_chunk
BufFile ** innerBatchFile
int log2_nbuckets_optimal
dsa_pointer_atomic * shared
BufFile ** outerBatchFile
FmgrInfo * inner_hashfunctions
dsa_pointer current_chunk_shared
HashSkewBucket ** skewBucket
union HashJoinTupleData::@95 next
struct HashJoinTupleData * unshared
union HashMemoryChunkData::@96 next
struct HashMemoryChunkData * unshared
struct ParallelHashJoinState * parallel_state
SharedHashInfo * shared_info
HashInstrumentation * hinstrument
shm_toc_estimator estimator
SharedTuplestoreAccessor * outer_tuples
ParallelHashJoinBatch * shared
SharedTuplestoreAccessor * inner_tuples
Barrier grow_batches_barrier
dsa_pointer chunk_work_queue
Barrier grow_buckets_barrier
ParallelHashGrowth growth
Instrumentation * instrument
ExprContext * ps_ExprContext
ProjectionInfo * ps_ProjInfo
ExecProcNodeMtd ExecProcNode
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache3(int cacheId, Datum key1, Datum key2, Datum key3)
@ WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT
@ WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE
@ WAIT_EVENT_HASH_BUILD_ELECT
@ WAIT_EVENT_HASH_BUILD_HASH_INNER
@ WAIT_EVENT_HASH_GROW_BATCHES_DECIDE
@ WAIT_EVENT_HASH_GROW_BATCHES_FINISH
@ WAIT_EVENT_HASH_GROW_BUCKETS_ELECT
@ WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE
@ WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION
@ WAIT_EVENT_HASH_BUILD_ALLOCATE
@ WAIT_EVENT_HASH_GROW_BATCHES_ELECT