92 elog(
ERROR,
"Hash node does not support ExecProcNode call convention");
296 for (i = 0; i < hashtable->
nbatch; ++
i)
434 size_t space_allowed;
467 &nbuckets, &nbatch, &num_skew_mcvs);
470 log2_nbuckets =
my_log2(nbuckets);
471 Assert(nbuckets == (1 << log2_nbuckets));
493 hashtable->
nbatch = nbatch;
516 printf(
"Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
517 hashtable, nbatch, nbuckets);
548 forboth(ho, hashOperators, hc, hashCollations)
555 elog(
ERROR,
"could not find hash function for hash operator %u",
663 #define NTUP_PER_BUCKET 1 667 bool try_combined_work_mem,
668 int parallel_workers,
669 size_t *space_allowed,
675 double inner_rel_bytes;
677 long hash_table_bytes;
678 long skew_table_bytes;
697 inner_rel_bytes = ntuples * tupsize;
702 hash_table_bytes =
work_mem * 1024L;
709 if (try_combined_work_mem)
710 hash_table_bytes += hash_table_bytes * parallel_workers;
712 *space_allowed = hash_table_bytes;
740 *num_skew_mcvs = skew_table_bytes / (tupsize +
744 if (*num_skew_mcvs > 0)
745 hash_table_bytes -= skew_table_bytes;
762 mppow2 = 1L <<
my_log2(max_pointers);
763 if (max_pointers != mppow2)
764 max_pointers = mppow2 / 2;
768 max_pointers =
Min(max_pointers, INT_MAX / 2);
771 dbuckets =
Min(dbuckets, max_pointers);
772 nbuckets = (int) dbuckets;
774 nbuckets =
Max(nbuckets, 1024);
776 nbuckets = 1 <<
my_log2(nbuckets);
783 if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
795 if (try_combined_work_mem)
798 false, parallel_workers,
813 lbuckets = 1L <<
my_log2(hash_table_bytes / bucket_size);
814 lbuckets =
Min(lbuckets, max_pointers);
815 nbuckets = (int) lbuckets;
816 nbuckets = 1 <<
my_log2(nbuckets);
827 Assert(bucket_bytes <= hash_table_bytes / 2);
830 dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
831 dbatch =
Min(dbatch, max_pointers);
832 minbatch = (int) dbatch;
834 while (nbatch < minbatch)
841 *numbuckets = nbuckets;
842 *numbatches = nbatch;
864 for (i = 1; i < hashtable->
nbatch; i++)
888 int oldnbatch = hashtable->
nbatch;
904 nbatch = oldnbatch * 2;
908 printf(
"Hashjoin %p: increasing nbatch to %d because space = %zu\n",
909 hashtable, nbatch, hashtable->
spaceUsed);
932 (nbatch - oldnbatch) *
sizeof(
BufFile *));
934 (nbatch - oldnbatch) *
sizeof(
BufFile *));
939 hashtable->
nbatch = nbatch;
945 ninmemory = nfreed = 0;
968 oldchunks = hashtable->
chunks;
972 while (oldchunks != NULL)
980 while (idx < oldchunks->used)
990 &bucketno, &batchno);
992 if (batchno == curbatch)
998 memcpy(copyTuple, hashTuple, hashTupleSize);
1007 Assert(batchno > curbatch);
1025 oldchunks = nextchunk;
1029 printf(
"Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
1030 hashtable, nfreed, ninmemory, hashtable->
spaceUsed);
1041 if (nfreed == 0 || nfreed == ninmemory)
1045 printf(
"Hashjoin %p: disabling further increase of nbatch\n",
1097 if (hashtable->
nbatch == 1)
1121 new_nbatch = hashtable->
nbatch * 2;
1146 dtuples = (old_batch0->
ntuples * 2.0) / new_nbatch;
1148 dbuckets =
Min(dbuckets,
1150 new_nbuckets = (int) dbuckets;
1151 new_nbuckets =
Max(new_nbuckets, 1024);
1152 new_nbuckets = 1 <<
my_log2(new_nbuckets);
1160 for (i = 0; i < new_nbuckets; ++
i)
1170 for (i = 0; i < hashtable->
nbuckets; ++
i)
1216 bool space_exhausted =
false;
1217 bool extreme_skew_detected =
false;
1224 for (i = 0; i < hashtable->
nbatch; ++
i)
1233 space_exhausted =
true;
1243 extreme_skew_detected =
true;
1248 if (extreme_skew_detected || hashtable->
nbatch >= INT_MAX / 2)
1250 else if (space_exhausted)
1286 while (idx < chunk->used)
1296 &bucketno, &batchno);
1298 Assert(batchno < hashtable->nbatch);
1353 for (i = 1; i < old_nbatch; ++
i)
1364 for (i = 1; i < old_nbatch; ++
i)
1394 pfree(old_inner_tuples);
1408 for (i = 0; i < hashtable->
nbatch; ++
i)
1440 printf(
"Hashjoin %p: increasing nbuckets %d => %d\n",
1470 while (idx < chunk->used)
1477 &bucketno, &batchno);
1553 while (idx < chunk->used)
1561 &bucketno, &batchno);
1603 &bucketno, &batchno);
1608 if (batchno == hashtable->
curbatch)
1641 if (hashtable->
nbatch == 1 &&
1705 if (hashTuple == NULL)
1817 foreach(hk, hashkeys)
1824 hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
1867 *hashvalue = hashkey;
1906 *bucketno = hashvalue & (nbuckets - 1);
1907 *batchno = (hashvalue >> hashtable->
log2_nbuckets) & (nbatch - 1);
1911 *bucketno = hashvalue & (nbuckets - 1);
1942 if (hashTuple != NULL)
1949 while (hashTuple != NULL)
2000 if (hashTuple != NULL)
2006 while (hashTuple != NULL)
2075 if (hashTuple != NULL)
2092 while (hashTuple != NULL)
2137 int nbuckets = hashtable->
nbuckets;
2155 hashtable->
chunks = NULL;
2169 for (i = 0; i < hashtable->
nbuckets; i++)
2240 if (mcvsToUse > sslot.
nvalues)
2249 for (i = 0; i < mcvsToUse; i++)
2271 while (nbuckets <= mcvsToUse)
2289 mcvsToUse *
sizeof(
int));
2292 + mcvsToUse *
sizeof(
int);
2294 + mcvsToUse *
sizeof(
int);
2309 for (i = 0; i < mcvsToUse; i++)
2324 bucket = hashvalue & (nbuckets - 1);
2325 while (hashtable->
skewBucket[bucket] != NULL &&
2327 bucket = (bucket + 1) & (nbuckets - 1);
2385 while (hashtable->
skewBucket[bucket] != NULL &&
2467 bucket = hashtable->
skewBucket[bucketToRemove];
2479 hashTuple = bucket->
tuples;
2480 while (hashTuple != NULL)
2495 if (batchno == hashtable->
curbatch)
2505 memcpy(copyTuple, hashTuple, tupleSize);
2525 hashTuple = nextHashTuple;
2543 hashtable->
skewBucket[bucketToRemove] = NULL;
2646 if (shared_info == NULL)
2692 newChunk->
used = size;
2699 if (hashtable->
chunks != NULL)
2707 hashtable->
chunks = newChunk;
2717 if ((hashtable->
chunks == NULL) ||
2725 newChunk->
used = size;
2729 hashtable->
chunks = newChunk;
2763 int curbatch = hashtable->
curbatch;
2772 if (chunk != NULL &&
2781 chunk->
used += size;
2842 if (hashtable->
nbatch == 1)
2849 hashtable->
nbuckets < (INT_MAX / 2) &&
2923 hashtable->
nbatch = nbatch;
2928 for (i = 0; i < hashtable->
nbatch; ++
i)
2949 accessor->
shared = shared;
2984 for (i = 0; i < hashtable->
nbatch; ++
i)
3008 if (hashtable->
batches != NULL)
3038 for (i = 0; i < hashtable->
nbatch; ++
i)
3043 accessor->
shared = shared;
3045 accessor->
done =
false;
3075 for (i = 0; i < nbuckets; ++
i)
3089 int curbatch = hashtable->
curbatch;
3151 for (i = 0; i < hashtable->
nbatch; ++
i)
3295 Assert(batchno < hashtable->nbatch);
dsa_pointer current_chunk_shared
static void ExecParallelHashRepartitionRest(HashJoinTable hashtable)
int log2_nbuckets_optimal
#define DatumGetUInt32(X)
struct ParallelHashJoinState * parallel_state
void InstrStopNode(Instrumentation *instr, double nTuples)
static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
SharedTuplestoreAccessor * outer_tuples
dsa_pointer_atomic * shared
#define SKEW_BUCKET_OVERHEAD
struct dsa_area * es_query_dsa
#define INVALID_SKEW_BUCKET_NO
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)
dsa_pointer chunk_work_queue
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
#define SKEW_MIN_OUTER_FRACTION
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
#define forboth(cell1, list1, cell2, list2)
#define PHJ_GROW_BATCHES_DECIDING
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
union HashJoinTupleData::@95 next
ProjectionInfo * ps_ProjInfo
Instrumentation * instrument
void ExecHashGetInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
#define InvalidDsaPointer
void BarrierInit(Barrier *barrier, int participants)
#define ATTSTATSSLOT_VALUES
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
void ExecEndNode(PlanState *node)
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
void ExecHashTableDetachBatch(HashJoinTable hashtable)
void ExecHashTableDetach(HashJoinTable hashtable)
#define ParallelHashJoinBatchOuter(batch, nparticipants)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
ExprContext * ps_ExprContext
HashState * ExecInitHash(Hash *node, EState *estate, int eflags)
void ExecHashTableReset(HashJoinTable hashtable)
shm_toc_estimator estimator
MemoryContext ecxt_per_tuple_memory
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
void ExecReScan(PlanState *node)
struct HashInstrumentation HashInstrumentation
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define dsa_pointer_atomic_compare_exchange
#define MemSet(start, val, len)
Node * MultiExecHash(HashState *node)
Datum idx(PG_FUNCTION_ARGS)
void MemoryContextReset(MemoryContext context)
FmgrInfo * inner_hashfunctions
static void MultiExecPrivateHash(HashState *node)
static void ExecParallelHashMergeCounters(HashJoinTable hashtable)
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
void ExecHashRetrieveInstrumentation(HashState *node)
#define shm_toc_estimate_chunk(e, sz)
#define OidIsValid(objectId)
static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
#define PHJ_BUILD_HASHING_OUTER
void ExecFreeExprContext(PlanState *planstate)
void ExecShutdownHash(HashState *node)
void BufFileClose(BufFile *file)
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
struct PlanState * lefttree
HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
SharedTuplestoreAccessor * inner_tuples
#define PHJ_GROW_BUCKETS_REINSERTING
SharedHashInfo * shared_info
void LWLockRelease(LWLock *lock)
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
void ExecReScanHash(HashState *node)
void pfree(void *pointer)
#define PHJ_GROW_BUCKETS_ALLOCATING
#define ATTSTATSSLOT_NUMBERS
Barrier grow_buckets_barrier
#define ObjectIdGetDatum(X)
struct HashJoinTupleData * unshared
void PrepareTempTablespaces(void)
union HashMemoryChunkData::@96 next
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void heap_free_minimal_tuple(MinimalTuple mtup)
#define ParallelHashJoinBatchInner(batch)
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
void InstrStartNode(Instrumentation *instr)
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
void fmgr_info(Oid functionId, FmgrInfo *finfo)
HeapTuple SearchSysCache3(int cacheId, Datum key1, Datum key2, Datum key3)
#define PHJ_GROW_BATCHES_PHASE(n)
#define HASH_CHUNK_THRESHOLD
struct HashJoinTupleData * HashJoinTuple
#define ALLOCSET_DEFAULT_SIZES
#define EXEC_FLAG_BACKWARD
BufFile ** outerBatchFile
#define outerPlanState(node)
#define dsa_allocate0(area, size)
#define PHJ_BUILD_ALLOCATING
HashJoinTuple hj_CurTuple
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
bool BarrierArriveAndDetach(Barrier *barrier)
TupleTableSlot * ecxt_innertuple
List * ExecInitExprList(List *nodes, PlanState *parent)
struct HashMemoryChunkData * unshared
#define PHJ_BUILD_ELECTING
MemoryContext CurrentMemoryContext
#define SHARED_TUPLESTORE_SINGLE_PASS
void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
HashInstrumentation * hinstrument
struct HashJoinTableData * HashJoinTable
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
FmgrInfo * outer_hashfunctions
#define PHJ_GROW_BATCHES_ALLOCATING
static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable table, dsa_pointer *shared)
#define SizeofMinimalTupleHeader
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
HashSkewBucket ** skewBucket
Size mul_size(Size s1, Size s2)
static TupleTableSlot * ExecHash(PlanState *pstate)
int BarrierAttach(Barrier *barrier)
void * palloc0(Size size)
ExecProcNodeMtd ExecProcNode
ParallelHashJoinState * parallel_state
static void * dense_alloc(HashJoinTable hashtable, Size size)
void ReleaseSysCache(HeapTuple tuple)
void ExecHashEstimate(HashState *node, ParallelContext *pcxt)
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
struct HashMemoryChunkData * HashMemoryChunk
Size add_size(Size s1, Size s2)
static TupleTableSlot * ExecProcNode(PlanState *node)
#define PHJ_BATCH_PROBING
#define PHJ_GROW_BATCHES_REPARTITIONING
#define PHJ_GROW_BUCKETS_ELECTING
void * MemoryContextAllocZero(MemoryContext context, Size size)
ParallelHashJoinBatchAccessor * batches
#define HJTUPLE_MINTUPLE(hjtup)
#define HeapTupleHeaderHasMatch(tup)
#define HASH_CHUNK_HEADER_SIZE
TupleTableSlot * ecxt_outertuple
#define HeapTupleIsValid(tuple)
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable table, int bucketno)
bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, int reqkind, Oid reqop, int flags)
#define dsa_pointer_atomic_write
#define Assert(condition)
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
static void MultiExecParallelHash(HashState *node)
ParallelHashGrowth growth
#define PHJ_GROW_BUCKETS_PHASE(n)
#define NthParallelHashJoinBatch(base, n)
bool BarrierDetach(Barrier *barrier)
pg_atomic_uint64 dsa_pointer_atomic
void ExecAssignExprContext(EState *estate, PlanState *planstate)
BufFile ** innerBatchFile
#define shm_toc_estimate_keys(e, cnt)
static int list_length(const List *l)
int BarrierPhase(Barrier *barrier)
#define HeapTupleHeaderClearMatch(tup)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
union HashJoinTableData::@97 buckets
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void * repalloc(void *pointer, Size size)
ParallelHashJoinBatch * shared
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
#define DsaPointerIsValid(x)
TupleTableSlot * hj_HashTupleSlot
#define dsa_pointer_atomic_init
void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
#define EstimateParallelHashJoinBatch(hashtable)
void dsa_free(dsa_area *area, dsa_pointer dp)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table, HashJoinTuple tuple)
HashJoinTable hj_HashTable
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
#define PHJ_BUILD_HASHING_INNER
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
struct HashJoinTupleData ** unshared
HashMemoryChunk current_chunk
void * MemoryContextAlloc(MemoryContext context, Size size)
void ExecEndHash(HashState *node)
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
#define dsa_pointer_atomic_read
#define CHECK_FOR_INTERRUPTS()
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
#define PHJ_GROW_BATCHES_FINISHING
#define SKEW_WORK_MEM_PERCENT
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_work_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
#define HASH_CHUNK_DATA(hc)
const TupleTableSlotOps TTSOpsMinimalTuple
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr)
Barrier grow_batches_barrier
void ExecHashTableDestroy(HashJoinTable hashtable)
#define offsetof(type, field)
#define PHJ_GROW_BATCHES_ELECTING
static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
#define ResetExprContext(econtext)
#define dsa_allocate(area, size)
void sts_end_write(SharedTuplestoreAccessor *accessor)
void free_attstatsslot(AttStatsSlot *sslot)