92 elog(
ERROR,
"Hash node does not support ExecProcNode call convention");
450 size_t space_allowed;
477 state->parallel_state->nparticipants - 1 : 0,
483 Assert(nbuckets == (1 << log2_nbuckets));
504 hashtable->
nbatch = nbatch;
523 hashtable->
area =
state->ps.state->es_query_dsa;
527 printf(
"Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
528 hashtable, nbatch, nbuckets);
654#define NTUP_PER_BUCKET 1
659 int parallel_workers,
660 size_t *space_allowed,
785 nbuckets =
Max(nbuckets, 1024);
809 false, parallel_workers,
912 if ((*space_allowed) > (
SIZE_MAX / 2))
925 if (nbatch < (*space_allowed) /
BLCKSZ)
935 *space_allowed = (*space_allowed) * 2;
943 *numbuckets = nbuckets;
944 *numbatches = nbatch;
1032 int curbatch = hashtable->
curbatch;
1054 printf(
"Hashjoin %p: increasing nbatch to %d because space = %zu\n",
1055 hashtable, nbatch, hashtable->
spaceUsed);
1078 hashtable->
nbatch = nbatch;
1169 printf(
"Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
1185 printf(
"Hashjoin %p: disabling further increase of nbatch\n",
1236 if (hashtable->
nbatch == 1)
1365 bool space_exhausted =
false;
1375 for (
int i = 0;
i < hashtable->
nbatch; ++
i)
1382 if (
batch->space_exhausted ||
1384 space_exhausted =
true;
1405 else if (space_exhausted)
1508 for (
i = 1;
i < old_nbatch; ++
i)
1519 for (
i = 1;
i < old_nbatch; ++
i)
1568 batch->shared->estimated_size +=
batch->estimated_size;
1570 batch->shared->old_ntuples +=
batch->old_ntuples;
1572 batch->estimated_size = 0;
1574 batch->old_ntuples = 0;
1595 printf(
"Hashjoin %p: increasing nbuckets %d => %d\n",
1796 if (hashtable->
nbatch == 1 &&
1969 *
bucketno = hashvalue & (nbuckets - 1);
1975 *
bucketno = hashvalue & (nbuckets - 1);
2114 hjstate->hj_CurSkewBucketNo = 0;
2127 int curbatch = hashtable->
curbatch;
2167 if (
batch->skip_unmatched)
2213 hjstate->hj_CurSkewBucketNo++;
2329 int nbuckets = hashtable->
nbuckets;
2514 bucket = hashvalue & (nbuckets - 1);
2850 if (shared_info ==
NULL)
2983 int curbatch = hashtable->
curbatch;
3001 chunk->used += size;
3062 if (hashtable->
nbatch == 1)
3146 hashtable->
nbatch = nbatch;
3299 for (
i = 0;
i < nbuckets; ++
i)
3313 int curbatch = hashtable->
curbatch;
3344 batch->skip_unmatched =
true;
3588 batch->at_least_one_chunk &&
3596 batch->shared->space_exhausted =
true;
3603 batch->at_least_one_chunk =
true;
3631 return (
size_t) mem_limit;
Datum idx(PG_FUNCTION_ARGS)
void PrepareTempTablespaces(void)
bool BarrierArriveAndDetachExceptLast(Barrier *barrier)
bool BarrierArriveAndDetach(Barrier *barrier)
int BarrierAttach(Barrier *barrier)
void BarrierInit(Barrier *barrier, int participants)
int BarrierPhase(Barrier *barrier)
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
bool BarrierDetach(Barrier *barrier)
void BufFileClose(BufFile *file)
#define Assert(condition)
#define OidIsValid(objectId)
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
void dsa_free(dsa_area *area, dsa_pointer dp)
#define dsa_allocate0(area, size)
#define dsa_pointer_atomic_init
#define dsa_allocate(area, size)
#define dsa_pointer_atomic_write
#define InvalidDsaPointer
#define dsa_pointer_atomic_compare_exchange
#define dsa_pointer_atomic_read
pg_atomic_uint64 dsa_pointer_atomic
#define DsaPointerIsValid(x)
void ExecReScan(PlanState *node)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
void ExecAssignExprContext(EState *estate, PlanState *planstate)
#define outerPlanState(node)
struct HashJoinTupleData * HashJoinTuple
#define EXEC_FLAG_BACKWARD
#define ResetExprContext(econtext)
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
static TupleTableSlot * ExecProcNode(PlanState *node)
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
#define palloc_object(type)
#define repalloc_array(pointer, type, count)
#define palloc0_array(type, count)
#define palloc0_object(type)
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
double hash_mem_multiplier
#define PHJ_GROW_BATCHES_REPARTITION
struct HashMemoryChunkData * HashMemoryChunk
#define HASH_CHUNK_DATA(hc)
#define PHJ_GROW_BUCKETS_REINSERT
#define SKEW_MIN_OUTER_FRACTION
#define PHJ_GROW_BUCKETS_ELECT
#define PHJ_GROW_BUCKETS_PHASE(n)
#define PHJ_GROW_BATCHES_ELECT
#define ParallelHashJoinBatchInner(batch)
#define PHJ_BUILD_HASH_INNER
#define NthParallelHashJoinBatch(base, n)
#define HASH_CHUNK_THRESHOLD
#define PHJ_BUILD_HASH_OUTER
#define HJTUPLE_MINTUPLE(hjtup)
#define SKEW_BUCKET_OVERHEAD
#define PHJ_GROW_BATCHES_DECIDE
#define PHJ_GROW_BATCHES_REALLOCATE
#define HASH_CHUNK_HEADER_SIZE
#define PHJ_GROW_BATCHES_FINISH
#define ParallelHashJoinBatchOuter(batch, nparticipants)
#define SKEW_HASH_MEM_PERCENT
#define PHJ_BUILD_ALLOCATE
#define PHJ_GROW_BUCKETS_REALLOCATE
#define PHJ_GROW_BATCHES_PHASE(n)
@ PHJ_GROWTH_NEED_MORE_BUCKETS
@ PHJ_GROWTH_NEED_MORE_BATCHES
#define INVALID_SKEW_BUCKET_NO
#define EstimateParallelHashJoinBatch(hashtable)
void heap_free_minimal_tuple(MinimalTuple mtup)
#define HeapTupleIsValid(tuple)
#define SizeofMinimalTupleHeader
static void HeapTupleHeaderClearMatch(MinimalTupleData *tup)
static bool HeapTupleHeaderHasMatch(const MinimalTupleData *tup)
void InstrStartNode(Instrumentation *instr)
void InstrStopNode(Instrumentation *instr, double nTuples)
void free_attstatsslot(AttStatsSlot *sslot)
bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, int reqkind, Oid reqop, int flags)
#define ATTSTATSSLOT_NUMBERS
#define ATTSTATSSLOT_VALUES
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextReset(MemoryContext context)
void * MemoryContextAllocZero(MemoryContext context, Size size)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
void ExecHashTableReset(HashJoinTable hashtable)
static void ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable, Hash *node, int mcvsToUse)
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
static bool ExecHashIncreaseBatchSize(HashJoinTable hashtable)
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
static void * dense_alloc(HashJoinTable hashtable, Size size)
static void MultiExecParallelHash(HashState *node)
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
static void MultiExecPrivateHash(HashState *node)
void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
Node * MultiExecHash(HashState *node)
HashState * ExecInitHash(Hash *node, EState *estate, int eflags)
void ExecHashTableDetachBatch(HashJoinTable hashtable)
void ExecHashEstimate(HashState *node, ParallelContext *pcxt)
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecHashTableDetach(HashJoinTable hashtable)
bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
void ExecHashTableDestroy(HashJoinTable hashtable)
HashJoinTable ExecHashTableCreate(HashState *state)
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
size_t get_hash_memory_limit(void)
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
static void ExecParallelHashRepartitionRest(HashJoinTable hashtable)
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
void ExecEndHash(HashState *node)
void ExecShutdownHash(HashState *node)
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static TupleTableSlot * ExecHash(PlanState *pstate)
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
static void ExecParallelHashMergeCounters(HashJoinTable hashtable)
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
void ExecReScanHash(HashState *node)
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
void ExecHashRetrieveInstrumentation(HashState *node)
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define repalloc0_array(pointer, type, oldcount, count)
static uint32 pg_nextpower2_32(uint32 num)
static uint32 pg_rotate_right32(uint32 word, int n)
#define pg_nextpower2_size_t
static uint32 pg_ceil_log2_32(uint32 num)
static uint32 pg_prevpower2_32(uint32 num)
#define pg_prevpower2_size_t
static uint32 DatumGetUInt32(Datum X)
static Datum Int16GetDatum(int16 X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
void sts_end_write(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
#define SHARED_TUPLESTORE_SINGLE_PASS
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
TupleTableSlot * ecxt_innertuple
TupleTableSlot * ecxt_outertuple
struct HashJoinTupleData ** unshared
union HashJoinTableData::@111 buckets
ParallelHashJoinBatchAccessor * batches
ParallelHashJoinState * parallel_state
HashMemoryChunk current_chunk
BufFile ** innerBatchFile
int log2_nbuckets_optimal
dsa_pointer_atomic * shared
BufFile ** outerBatchFile
dsa_pointer current_chunk_shared
HashSkewBucket ** skewBucket
union HashJoinTupleData::@109 next
struct HashJoinTupleData * unshared
struct HashMemoryChunkData * unshared
union HashMemoryChunkData::@110 next
struct ParallelHashJoinState * parallel_state
SharedHashInfo * shared_info
HashInstrumentation * hinstrument
shm_toc_estimator estimator
SharedTuplestoreAccessor * outer_tuples
ParallelHashJoinBatch * shared
SharedTuplestoreAccessor * inner_tuples
Barrier grow_batches_barrier
dsa_pointer chunk_work_queue
Barrier grow_buckets_barrier
ParallelHashGrowth growth
Instrumentation * instrument
ExprContext * ps_ExprContext
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache3(int cacheId, Datum key1, Datum key2, Datum key3)