124 #define HJ_BUILD_HASHTABLE 1 125 #define HJ_NEED_NEW_OUTER 2 126 #define HJ_SCAN_BUCKET 3 127 #define HJ_FILL_OUTER_TUPLE 4 128 #define HJ_FILL_INNER_TUPLES 5 129 #define HJ_NEED_NEW_BATCH 6 132 #define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL) 134 #define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL) 216 Assert(hashtable == NULL);
327 if (hashtable->
nbatch > 1)
390 if (batchno != hashtable->
curbatch &&
401 Assert(parallel_state == NULL);
454 if (joinqual == NULL ||
ExecQual(joinqual, econtext))
493 if (otherqual == NULL ||
ExecQual(otherqual, econtext))
520 if (otherqual == NULL ||
ExecQual(otherqual, econtext))
547 if (otherqual == NULL ||
ExecQual(otherqual, econtext))
572 elog(
ERROR,
"unrecognized hashjoin state: %d",
707 elog(
ERROR,
"unrecognized join type: %d",
730 hjstate->js.joinqual =
732 hjstate->hashclauses =
738 hjstate->hj_HashTable = NULL;
739 hjstate->hj_FirstOuterTupleSlot = NULL;
741 hjstate->hj_CurHashValue = 0;
742 hjstate->hj_CurBucketNo = 0;
744 hjstate->hj_CurTuple = NULL;
752 hjstate->hj_MatchedOuter =
false;
753 hjstate->hj_OuterNotEmpty =
false;
855 else if (curbatch < hashtable->nbatch)
895 if (curbatch == 0 && hashtable->
nbatch == 1)
918 else if (curbatch < hashtable->nbatch)
956 nbatch = hashtable->
nbatch;
1002 while (curbatch < nbatch &&
1029 if (curbatch >= nbatch)
1041 if (innerFile != NULL)
1046 errmsg(
"could not rewind hash-join temporary file")));
1076 errmsg(
"could not rewind hash-join temporary file")));
1098 if (hashtable->
batches == NULL)
1117 batchno = start_batchno =
1196 elog(
ERROR,
"unexpected batch phase %d",
1200 batchno = (batchno + 1) % hashtable->
nbatch;
1201 }
while (batchno != start_batchno);
1263 nread =
BufFileRead(file, (
void *) header,
sizeof(header));
1269 if (nread !=
sizeof(header))
1272 errmsg(
"could not read from hash-join temporary file: read only %zu of %zu bytes",
1273 nread,
sizeof(header))));
1274 *hashvalue = header[0];
1276 tuple->
t_len = header[1];
1278 (
void *) ((
char *) tuple +
sizeof(
uint32)),
1279 header[1] -
sizeof(
uint32));
1280 if (nread != header[1] -
sizeof(
uint32))
1283 errmsg(
"could not read from hash-join temporary file: read only %zu of %zu bytes",
1284 nread, header[1] -
sizeof(
uint32))));
1423 &hashvalue, mintup);
1432 for (i = 0; i < hashtable->
nbatch; ++
i)
1454 if (pcxt->
seg == NULL)
struct ParallelHashJoinState * parallel_state
SharedTuplestoreAccessor * outer_tuples
#define HJ_NEED_NEW_BATCH
#define INVALID_SKEW_BUCKET_NO
dsa_pointer chunk_work_queue
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
TupleTableSlot * hj_NullInnerTupleSlot
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
ProjectionInfo * ps_ProjInfo
Instrumentation * instrument
#define InvalidDsaPointer
const TupleTableSlotOps * ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)
void BarrierInit(Barrier *barrier, int participants)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
static TupleTableSlot * ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)
#define castNode(_type_, nodeptr)
void ExecEndNode(PlanState *node)
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
void ExecHashTableDetachBatch(HashJoinTable hashtable)
void ExecHashTableDetach(HashJoinTable hashtable)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
#define PHJ_BATCH_ALLOCATING
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
ExprContext * ps_ExprContext
void ExecHashTableReset(HashJoinTable hashtable)
shm_toc_estimator estimator
void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
void ExecReScan(PlanState *node)
const TupleTableSlotOps TTSOpsVirtual
void ExecShutdownHashJoin(HashJoinState *node)
static TupleTableSlot * ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
TupleTableSlot * hj_OuterTupleSlot
struct PlanState * righttree
static bool ExecQual(ExprState *state, ExprContext *econtext)
#define shm_toc_estimate_chunk(e, sz)
TupleTableSlot * hj_FirstOuterTupleSlot
#define PHJ_BUILD_HASHING_OUTER
void ExecFreeExprContext(PlanState *planstate)
void BufFileClose(BufFile *file)
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
ExprState * ExecInitQual(List *qual, PlanState *parent)
#define PHJ_BATCH_LOADING
#define PHJ_BATCH_ELECTING
struct PlanState * lefttree
void ExecEndHashJoin(HashJoinState *node)
HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
SharedTuplestoreAccessor * inner_tuples
#define HJ_FILL_INNER(hjstate)
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
TupleTableSlot * ps_ResultTupleSlot
Barrier grow_buckets_barrier
TupleTableSlot * hj_NullOuterTupleSlot
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
static void ExecParallelHashJoinPartitionOuter(HashJoinState *node)
void ExecForceStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
BufFile * BufFileCreateTemp(bool interXact)
void heap_free_minimal_tuple(MinimalTuple mtup)
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
#define EXEC_FLAG_BACKWARD
BufFile ** outerBatchFile
#define outerPlanState(node)
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
HashJoinTuple hj_CurTuple
MinimalTupleData * MinimalTuple
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
int errcode_for_file_access(void)
TupleTableSlot * ecxt_innertuple
List * ExecInitExprList(List *nodes, PlanState *parent)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
#define InstrCountFiltered1(node, delta)
HashInstrumentation * hinstrument
void LWLockInitialize(LWLock *lock, int tranche_id)
#define HJ_FILL_INNER_TUPLES
void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
#define HJ_FILL_OUTER(hjstate)
HashSkewBucket ** skewBucket
HashJoinState * ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
int BarrierAttach(Barrier *barrier)
void * palloc0(Size size)
ExecProcNodeMtd ExecProcNode
ParallelHashJoinState * parallel_state
static TupleTableSlot * ExecProcNode(PlanState *node)
#define PHJ_BATCH_PROBING
void ExecHashJoinInitializeWorker(HashJoinState *state, ParallelWorkerContext *pwcxt)
void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function)
ParallelHashJoinBatchAccessor * batches
#define ereport(elevel,...)
#define HJTUPLE_MINTUPLE(hjtup)
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
TupleTableSlot * ecxt_outertuple
void SharedFileSetDeleteAll(SharedFileSet *fileset)
#define Assert(condition)
TupleTableSlot * ExecInitNullTupleSlot(EState *estate, TupleDesc tupType, const TupleTableSlotOps *tts_ops)
ParallelHashGrowth growth
#define InstrCountFiltered2(node, delta)
bool BarrierDetach(Barrier *barrier)
static pg_attribute_always_inline TupleTableSlot * ExecHashJoinImpl(PlanState *pstate, bool parallel)
void ExecAssignExprContext(EState *estate, PlanState *planstate)
BufFile ** innerBatchFile
#define shm_toc_estimate_keys(e, cnt)
int BarrierPhase(Barrier *barrier)
static TupleTableSlot * ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
static TupleTableSlot * ExecParallelHashJoin(PlanState *pstate)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
static void header(const char *fmt,...) pg_attribute_printf(1
TupleDesc ExecGetResultType(PlanState *planstate)
ParallelHashJoinBatch * shared
#define HJ_NEED_NEW_OUTER
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
TupleTableSlot * hj_HashTupleSlot
pg_atomic_uint32 distributor
#define HJ_BUILD_HASHTABLE
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
HashJoinTable hj_HashTable
int errmsg(const char *fmt,...)
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
Node * MultiExecProcNode(PlanState *node)
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
#define HeapTupleHeaderSetMatch(tup)
static TupleTableSlot * ExecHashJoin(PlanState *pstate)
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
#define pg_attribute_always_inline
bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
#define CHECK_FOR_INTERRUPTS()
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
#define HJ_FILL_OUTER_TUPLE
void BufFileWrite(BufFile *file, void *ptr, size_t size)
#define innerPlanState(node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
static bool ExecHashJoinNewBatch(HashJoinState *hjstate)
Barrier grow_batches_barrier
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr)
void ExecHashTableDestroy(HashJoinTable hashtable)
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
#define ResetExprContext(econtext)
void sts_end_write(SharedTuplestoreAccessor *accessor)
void ExecReScanHashJoin(HashJoinState *node)