94 elog(
ERROR,
"Hash node does not support ExecProcNode call convention");
476 size_t space_allowed;
503 state->parallel_state->nparticipants - 1 : 0,
509 Assert(nbuckets == (1 << log2_nbuckets));
530 hashtable->
nbatch = nbatch;
549 hashtable->
area =
state->ps.state->es_query_dsa;
553 printf(
"Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
554 hashtable, nbatch, nbuckets);
680#define NTUP_PER_BUCKET 1
685 int parallel_workers,
686 size_t *space_allowed,
811 nbuckets =
Max(nbuckets, 1024);
835 false, parallel_workers,
938 if ((*space_allowed) > (
SIZE_MAX / 2))
951 if (nbatch < (*space_allowed) /
BLCKSZ)
961 *space_allowed = (*space_allowed) * 2;
969 *numbuckets = nbuckets;
970 *numbatches = nbatch;
1058 int curbatch = hashtable->
curbatch;
1080 printf(
"Hashjoin %p: increasing nbatch to %d because space = %zu\n",
1081 hashtable, nbatch, hashtable->
spaceUsed);
1104 hashtable->
nbatch = nbatch;
1195 printf(
"Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
1211 printf(
"Hashjoin %p: disabling further increase of nbatch\n",
1262 if (hashtable->
nbatch == 1)
1391 bool space_exhausted =
false;
1401 for (
int i = 0;
i < hashtable->
nbatch; ++
i)
1408 if (
batch->space_exhausted ||
1410 space_exhausted =
true;
1431 else if (space_exhausted)
1534 for (
i = 1;
i < old_nbatch; ++
i)
1545 for (
i = 1;
i < old_nbatch; ++
i)
1594 batch->shared->estimated_size +=
batch->estimated_size;
1596 batch->shared->old_ntuples +=
batch->old_ntuples;
1598 batch->estimated_size = 0;
1600 batch->old_ntuples = 0;
1621 printf(
"Hashjoin %p: increasing nbuckets %d => %d\n",
1822 if (hashtable->
nbatch == 1 &&
1996 *
bucketno = hashvalue & (nbuckets - 1);
2002 *
bucketno = hashvalue & (nbuckets - 1);
2141 hjstate->hj_CurSkewBucketNo = 0;
2154 int curbatch = hashtable->
curbatch;
2194 if (
batch->skip_unmatched)
2240 hjstate->hj_CurSkewBucketNo++;
2356 int nbuckets = hashtable->
nbuckets;
2541 bucket = hashvalue & (nbuckets - 1);
2909 if (shared_info ==
NULL)
3042 int curbatch = hashtable->
curbatch;
3051 if (chunk !=
NULL &&
3060 chunk->
used += size;
3121 if (hashtable->
nbatch == 1)
3205 hashtable->
nbatch = nbatch;
3358 for (
i = 0;
i < nbuckets; ++
i)
3372 int curbatch = hashtable->
curbatch;
3403 batch->skip_unmatched =
true;
3647 batch->at_least_one_chunk &&
3655 batch->shared->space_exhausted =
true;
3662 batch->at_least_one_chunk =
true;
3690 return (
size_t) mem_limit;
Datum idx(PG_FUNCTION_ARGS)
void PrepareTempTablespaces(void)
bool BarrierArriveAndDetachExceptLast(Barrier *barrier)
bool BarrierArriveAndDetach(Barrier *barrier)
int BarrierAttach(Barrier *barrier)
void BarrierInit(Barrier *barrier, int participants)
int BarrierPhase(Barrier *barrier)
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
bool BarrierDetach(Barrier *barrier)
void BufFileClose(BufFile *file)
#define Assert(condition)
#define OidIsValid(objectId)
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
void dsa_free(dsa_area *area, dsa_pointer dp)
#define dsa_allocate0(area, size)
#define dsa_pointer_atomic_init
#define dsa_allocate(area, size)
#define dsa_pointer_atomic_write
#define InvalidDsaPointer
#define dsa_pointer_atomic_compare_exchange
#define dsa_pointer_atomic_read
pg_atomic_uint64 dsa_pointer_atomic
#define DsaPointerIsValid(x)
void ExecReScan(PlanState *node)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
void ExecAssignExprContext(EState *estate, PlanState *planstate)
#define outerPlanState(node)
struct HashJoinTupleData * HashJoinTuple
#define EXEC_FLAG_BACKWARD
#define ResetExprContext(econtext)
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
static TupleTableSlot * ExecProcNode(PlanState *node)
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
#define palloc_object(type)
#define repalloc_array(pointer, type, count)
#define palloc0_array(type, count)
#define palloc0_object(type)
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
double hash_mem_multiplier
#define PHJ_GROW_BATCHES_REPARTITION
struct HashMemoryChunkData * HashMemoryChunk
#define HASH_CHUNK_DATA(hc)
#define PHJ_GROW_BUCKETS_REINSERT
#define SKEW_MIN_OUTER_FRACTION
#define PHJ_GROW_BUCKETS_ELECT
#define PHJ_GROW_BUCKETS_PHASE(n)
#define PHJ_GROW_BATCHES_ELECT
#define ParallelHashJoinBatchInner(batch)
#define PHJ_BUILD_HASH_INNER
#define NthParallelHashJoinBatch(base, n)
#define HASH_CHUNK_THRESHOLD
#define PHJ_BUILD_HASH_OUTER
#define HJTUPLE_MINTUPLE(hjtup)
#define SKEW_BUCKET_OVERHEAD
#define PHJ_GROW_BATCHES_DECIDE
#define PHJ_GROW_BATCHES_REALLOCATE
#define HASH_CHUNK_HEADER_SIZE
#define PHJ_GROW_BATCHES_FINISH
#define ParallelHashJoinBatchOuter(batch, nparticipants)
#define SKEW_HASH_MEM_PERCENT
#define PHJ_BUILD_ALLOCATE
#define PHJ_GROW_BUCKETS_REALLOCATE
#define PHJ_GROW_BATCHES_PHASE(n)
@ PHJ_GROWTH_NEED_MORE_BUCKETS
@ PHJ_GROWTH_NEED_MORE_BATCHES
#define INVALID_SKEW_BUCKET_NO
#define EstimateParallelHashJoinBatch(hashtable)
void heap_free_minimal_tuple(MinimalTuple mtup)
#define HeapTupleIsValid(tuple)
#define SizeofMinimalTupleHeader
static void HeapTupleHeaderClearMatch(MinimalTupleData *tup)
static bool HeapTupleHeaderHasMatch(const MinimalTupleData *tup)
void InstrStartNode(Instrumentation *instr)
void InstrStopNode(Instrumentation *instr, double nTuples)
void free_attstatsslot(AttStatsSlot *sslot)
bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, int reqkind, Oid reqop, int flags)
#define ATTSTATSSLOT_NUMBERS
#define ATTSTATSSLOT_VALUES
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextReset(MemoryContext context)
void * MemoryContextAllocZero(MemoryContext context, Size size)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
void ExecHashTableReset(HashJoinTable hashtable)
static void ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable, Hash *node, int mcvsToUse)
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
static bool ExecHashIncreaseBatchSize(HashJoinTable hashtable)
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
static void * dense_alloc(HashJoinTable hashtable, Size size)
static void MultiExecParallelHash(HashState *node)
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
static void MultiExecPrivateHash(HashState *node)
void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
Node * MultiExecHash(HashState *node)
HashState * ExecInitHash(Hash *node, EState *estate, int eflags)
void ExecHashTableDetachBatch(HashJoinTable hashtable)
void ExecHashEstimate(HashState *node, ParallelContext *pcxt)
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecHashTableDetach(HashJoinTable hashtable)
bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
void ExecHashTableDestroy(HashJoinTable hashtable)
HashJoinTable ExecHashTableCreate(HashState *state)
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
size_t get_hash_memory_limit(void)
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
static void ExecParallelHashRepartitionRest(HashJoinTable hashtable)
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
void ExecEndHash(HashState *node)
void ExecShutdownHash(HashState *node)
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static TupleTableSlot * ExecHash(PlanState *pstate)
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
static void ExecParallelHashMergeCounters(HashJoinTable hashtable)
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
Tuplestorestate * ExecHashBuildNullTupleStore(HashJoinTable hashtable)
static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
void ExecReScanHash(HashState *node)
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
void ExecHashRetrieveInstrumentation(HashState *node)
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define repalloc0_array(pointer, type, oldcount, count)
static uint32 pg_nextpower2_32(uint32 num)
static uint32 pg_rotate_right32(uint32 word, int n)
#define pg_nextpower2_size_t
static uint32 pg_ceil_log2_32(uint32 num)
static uint32 pg_prevpower2_32(uint32 num)
#define pg_prevpower2_size_t
static uint32 DatumGetUInt32(Datum X)
static Datum Int16GetDatum(int16 X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
void sts_end_write(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
#define SHARED_TUPLESTORE_SINGLE_PASS
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
TupleTableSlot * ecxt_innertuple
TupleTableSlot * ecxt_outertuple
struct HashJoinTupleData ** unshared
ParallelHashJoinBatchAccessor * batches
ParallelHashJoinState * parallel_state
union HashJoinTableData::@113 buckets
HashMemoryChunk current_chunk
BufFile ** innerBatchFile
int log2_nbuckets_optimal
dsa_pointer_atomic * shared
BufFile ** outerBatchFile
dsa_pointer current_chunk_shared
HashSkewBucket ** skewBucket
union HashJoinTupleData::@111 next
struct HashJoinTupleData * unshared
struct HashMemoryChunkData * unshared
union HashMemoryChunkData::@112 next
struct ParallelHashJoinState * parallel_state
SharedHashInfo * shared_info
Tuplestorestate * null_tuple_store
HashInstrumentation * hinstrument
shm_toc_estimator estimator
SharedTuplestoreAccessor * outer_tuples
ParallelHashJoinBatch * shared
SharedTuplestoreAccessor * inner_tuples
Barrier grow_batches_barrier
dsa_pointer chunk_work_queue
Barrier grow_buckets_barrier
ParallelHashGrowth growth
Instrumentation * instrument
ExprContext * ps_ExprContext
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache3(SysCacheIdentifier cacheId, Datum key1, Datum key2, Datum key3)
void tuplestore_puttupleslot(Tuplestorestate *state, TupleTableSlot *slot)
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)