PostgreSQL Source Code (git master)
nodeHash.h File Reference
#include "access/parallel.h"
#include "nodes/execnodes.h"
[Include dependency graph and "included by" graph for nodeHash.h omitted.]


Functions

HashState * ExecInitHash (Hash *node, EState *estate, int eflags)
 
Node * MultiExecHash (HashState *node)
 
void ExecEndHash (HashState *node)
 
void ExecReScanHash (HashState *node)
 
HashJoinTable ExecHashTableCreate (HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
 
void ExecParallelHashTableAlloc (HashJoinTable hashtable, int batchno)
 
void ExecHashTableDestroy (HashJoinTable hashtable)
 
void ExecHashTableDetach (HashJoinTable hashtable)
 
void ExecHashTableDetachBatch (HashJoinTable hashtable)
 
void ExecParallelHashTableSetCurrentBatch (HashJoinTable hashtable, int batchno)
 
void ExecHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsertCurrentBatch (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
bool ExecHashGetHashValue (HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
 
void ExecHashGetBucketAndBatch (HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
 
bool ExecScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecHashTableReset (HashJoinTable hashtable)
 
void ExecHashTableResetMatchFlags (HashJoinTable hashtable)
 
void ExecChooseHashTableSize (double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
 
int ExecHashGetSkewBucket (HashJoinTable hashtable, uint32 hashvalue)
 
void ExecHashEstimate (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeDSM (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeWorker (HashState *node, ParallelWorkerContext *pwcxt)
 
void ExecHashRetrieveInstrumentation (HashState *node)
 
void ExecShutdownHash (HashState *node)
 
void ExecHashAccumInstrumentation (HashInstrumentation *instrument, HashJoinTable hashtable)
 

Function Documentation

◆ ExecChooseHashTableSize()

void ExecChooseHashTableSize ( double  ntuples,
int  tupwidth,
bool  useskew,
bool  try_combined_hash_mem,
int  parallel_workers,
size_t *  space_allowed,
int *  numbuckets,
int *  numbatches,
int *  num_skew_mcvs 
)

Definition at line 668 of file nodeHash.c.

References Assert, ExecChooseHashTableSize(), get_hash_mem(), HJTUPLE_OVERHEAD, Max, MAXALIGN, MaxAllocSize, Min, my_log2(), NTUP_PER_BUCKET, pg_nextpower2_32(), SizeofMinimalTupleHeader, SKEW_BUCKET_OVERHEAD, and SKEW_HASH_MEM_PERCENT.

Referenced by ExecChooseHashTableSize(), ExecHashTableCreate(), and initial_cost_hashjoin().

675 {
676  int tupsize;
677  double inner_rel_bytes;
678  long bucket_bytes;
679  long hash_table_bytes;
680  long skew_table_bytes;
681  long max_pointers;
682  long mppow2;
683  int nbatch = 1;
684  int nbuckets;
685  double dbuckets;
686  int hash_mem = get_hash_mem();
687 
688  /* Force a plausible relation size if no info */
689  if (ntuples <= 0.0)
690  ntuples = 1000.0;
691 
692  /*
693  * Estimate tupsize based on footprint of tuple in hashtable... note this
694  * does not allow for any palloc overhead. The manipulations of spaceUsed
695  * don't count palloc overhead either.
696  */
697  tupsize = HJTUPLE_OVERHEAD +
698  MAXALIGN(SizeofMinimalTupleHeader) +
699  MAXALIGN(tupwidth);
700  inner_rel_bytes = ntuples * tupsize;
701 
702  /*
703  * Target in-memory hashtable size is hash_mem kilobytes.
704  */
705  hash_table_bytes = hash_mem * 1024L;
706 
707  /*
708  * Parallel Hash tries to use the combined hash_mem of all workers to
709  * avoid the need to batch. If that won't work, it falls back to hash_mem
710  * per worker and tries to process batches in parallel.
711  */
712  if (try_combined_hash_mem)
713  hash_table_bytes += hash_table_bytes * parallel_workers;
714 
715  *space_allowed = hash_table_bytes;
716 
717  /*
718  * If skew optimization is possible, estimate the number of skew buckets
719  * that will fit in the memory allowed, and decrement the assumed space
720  * available for the main hash table accordingly.
721  *
722  * We make the optimistic assumption that each skew bucket will contain
723  * one inner-relation tuple. If that turns out to be low, we will recover
724  * at runtime by reducing the number of skew buckets.
725  *
726  * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
727  * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
728  * will round up to the next power of 2 and then multiply by 4 to reduce
729  * collisions.
730  */
731  if (useskew)
732  {
733  skew_table_bytes = hash_table_bytes * SKEW_HASH_MEM_PERCENT / 100;
734 
735  /*----------
736  * Divisor is:
737  * size of a hash tuple +
738  * worst-case size of skewBucket[] per MCV +
739  * size of skewBucketNums[] entry +
740  * size of skew bucket struct itself
741  *----------
742  */
743  *num_skew_mcvs = skew_table_bytes / (tupsize +
744  (8 * sizeof(HashSkewBucket *)) +
745  sizeof(int) +
746  SKEW_BUCKET_OVERHEAD);
747  if (*num_skew_mcvs > 0)
748  hash_table_bytes -= skew_table_bytes;
749  }
750  else
751  *num_skew_mcvs = 0;
752 
753  /*
754  * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
755  * memory is filled, assuming a single batch; but limit the value so that
756  * the pointer arrays we'll try to allocate do not exceed hash_mem nor
757  * MaxAllocSize.
758  *
759  * Note that both nbuckets and nbatch must be powers of 2 to make
760  * ExecHashGetBucketAndBatch fast.
761  */
762  max_pointers = *space_allowed / sizeof(HashJoinTuple);
763  max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
764  /* If max_pointers isn't a power of 2, must round it down to one */
765  mppow2 = 1L << my_log2(max_pointers);
766  if (max_pointers != mppow2)
767  max_pointers = mppow2 / 2;
768 
769  /* Also ensure we avoid integer overflow in nbatch and nbuckets */
770  /* (this step is redundant given the current value of MaxAllocSize) */
771  max_pointers = Min(max_pointers, INT_MAX / 2);
772 
773  dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
774  dbuckets = Min(dbuckets, max_pointers);
775  nbuckets = (int) dbuckets;
776  /* don't let nbuckets be really small, though ... */
777  nbuckets = Max(nbuckets, 1024);
778  /* ... and force it to be a power of 2. */
779  nbuckets = 1 << my_log2(nbuckets);
780 
781  /*
782  * If there's not enough space to store the projected number of tuples and
783  * the required bucket headers, we will need multiple batches.
784  */
785  bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
786  if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
787  {
788  /* We'll need multiple batches */
789  long lbuckets;
790  double dbatch;
791  int minbatch;
792  long bucket_size;
793 
794  /*
795  * If Parallel Hash with combined hash_mem would still need multiple
796  * batches, we'll have to fall back to regular hash_mem budget.
797  */
798  if (try_combined_hash_mem)
799  {
800  ExecChooseHashTableSize(ntuples, tupwidth, useskew,
801  false, parallel_workers,
802  space_allowed,
803  numbuckets,
804  numbatches,
805  num_skew_mcvs);
806  return;
807  }
808 
809  /*
810  * Estimate the number of buckets we'll want to have when hash_mem is
811  * entirely full. Each bucket will contain a bucket pointer plus
812  * NTUP_PER_BUCKET tuples, whose projected size already includes
813  * overhead for the hash code, pointer to the next tuple, etc.
814  */
815  bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
816  lbuckets = 1L << my_log2(hash_table_bytes / bucket_size);
817  lbuckets = Min(lbuckets, max_pointers);
818  nbuckets = (int) lbuckets;
819  nbuckets = 1 << my_log2(nbuckets);
820  bucket_bytes = nbuckets * sizeof(HashJoinTuple);
821 
822  /*
823  * Buckets are simple pointers to hashjoin tuples, while tupsize
824  * includes the pointer, hash code, and MinimalTupleData. So buckets
825  * should never really exceed 25% of hash_mem (even for
826  * NTUP_PER_BUCKET=1); except maybe for hash_mem values that are not
827  * 2^N bytes, where we might get more because of doubling. So let's
828  * look for 50% here.
829  */
830  Assert(bucket_bytes <= hash_table_bytes / 2);
831 
832  /* Calculate required number of batches. */
833  dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
834  dbatch = Min(dbatch, max_pointers);
835  minbatch = (int) dbatch;
836  nbatch = pg_nextpower2_32(Max(2, minbatch));
837  }
838 
839  Assert(nbuckets > 0);
840  Assert(nbatch > 0);
841 
842  *numbuckets = nbuckets;
843  *numbatches = nbatch;
844 }
#define SKEW_HASH_MEM_PERCENT
Definition: hashjoin.h:110
#define SKEW_BUCKET_OVERHEAD
Definition: hashjoin.h:108
#define Min(x, y)
Definition: c.h:928
struct HashJoinTupleData * HashJoinTuple
Definition: execnodes.h:1934
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:146
int my_log2(long num)
Definition: dynahash.c:1730
#define MaxAllocSize
Definition: memutils.h:40
#define SizeofMinimalTupleHeader
Definition: htup_details.h:649
#define NTUP_PER_BUCKET
Definition: nodeHash.c:665
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:79
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
Definition: nodeHash.c:668
#define Max(x, y)
Definition: c.h:922
#define Assert(condition)
Definition: c.h:746
#define MAXALIGN(LEN)
Definition: c.h:699
int get_hash_mem(void)
Definition: nodeHash.c:3389
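
The sizing logic above reduces to a few arithmetic steps: budget hash_mem bytes, pick a power-of-two bucket count targeting NTUP_PER_BUCKET tuples per bucket, and fall back to multiple batches when the inner relation plus the bucket array will not fit. The standalone sketch below reproduces only that core arithmetic outside the server; the 4 MB budget, the 16-byte per-tuple overhead, and the choose_size()/floor_pow2() helpers are assumptions for illustration, and the skew and parallel paths are omitted.

/*
 * Standalone arithmetic sketch of the hash table sizing decision
 * (illustrative only, not PostgreSQL code).  Link with -lm for ceil().
 */
#include <math.h>
#include <stdio.h>

/* Largest power of two <= n (n must be >= 1). */
static long floor_pow2(long n)
{
    long p = 1;

    while (p * 2 <= n)
        p *= 2;
    return p;
}

/* Hypothetical, simplified sizing: fixed 4 MB budget, 16-byte tuple overhead. */
static void choose_size(double ntuples, int tupwidth,
                        int *numbuckets, int *numbatches)
{
    const long hash_table_bytes = 4L * 1024 * 1024;
    const int  ntup_per_bucket = 1;           /* cf. NTUP_PER_BUCKET */
    long   tupsize = 16 + tupwidth;           /* rough header + tuple footprint */
    double inner_rel_bytes = ntuples * tupsize;
    long   max_pointers = floor_pow2(hash_table_bytes / sizeof(void *));
    long   nbuckets;
    long   nbatch = 1;

    /* One bucket per NTUP_PER_BUCKET tuples, clamped and forced to 2^N. */
    nbuckets = (long) ceil(ntuples / ntup_per_bucket);
    if (nbuckets > max_pointers)
        nbuckets = max_pointers;
    if (nbuckets < 1024)
        nbuckets = 1024;
    nbuckets = floor_pow2(nbuckets);

    if (inner_rel_bytes + nbuckets * (long) sizeof(void *) > hash_table_bytes)
    {
        /* Won't fit: size buckets for a full table, then compute batches. */
        long   bucket_size = tupsize * ntup_per_bucket + sizeof(void *);
        double dbatch;

        nbuckets = floor_pow2(hash_table_bytes / bucket_size);
        if (nbuckets > max_pointers)
            nbuckets = max_pointers;

        dbatch = ceil(inner_rel_bytes /
                      (double) (hash_table_bytes - nbuckets * (long) sizeof(void *)));
        nbatch = 2;
        while (nbatch < (long) dbatch)
            nbatch *= 2;                      /* next power of two, at least 2 */
    }

    *numbuckets = (int) nbuckets;
    *numbatches = (int) nbatch;
}

int main(void)
{
    int nbuckets, nbatch;

    choose_size(1000000.0, 64, &nbuckets, &nbatch);
    printf("nbuckets=%d nbatch=%d\n", nbuckets, nbatch);
    return 0;
}

Under this sketch's assumptions, a million 64-byte tuples come out as 32768 buckets split across 32 batches, illustrating how bucket-array space is traded against batch count.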

◆ ExecEndHash()

void ExecEndHash ( HashState *  node)

Definition at line 407 of file nodeHash.c.

References ExecEndNode(), ExecFreeExprContext(), outerPlan, outerPlanState, and HashState::ps.

Referenced by ExecEndNode().

408 {
409  PlanState *outerPlan;
410 
411  /*
412  * free exprcontext
413  */
414  ExecFreeExprContext(&node->ps);
415 
416  /*
417  * shut down the subplan
418  */
419  outerPlan = outerPlanState(node);
420  ExecEndNode(outerPlan);
421 }
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:543
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:649
#define outerPlanState(node)
Definition: execnodes.h:1033
PlanState ps
Definition: execnodes.h:2403
#define outerPlan(node)
Definition: plannodes.h:166

◆ ExecHashAccumInstrumentation()

void ExecHashAccumInstrumentation ( HashInstrumentation *  instrument,
HashJoinTable  hashtable 
)

Definition at line 2691 of file nodeHash.c.

References Max, HashJoinTableData::nbatch, HashInstrumentation::nbatch, HashJoinTableData::nbatch_original, HashInstrumentation::nbatch_original, HashJoinTableData::nbuckets, HashInstrumentation::nbuckets, HashJoinTableData::nbuckets_original, HashInstrumentation::nbuckets_original, HashInstrumentation::space_peak, and HashJoinTableData::spacePeak.

Referenced by ExecReScanHashJoin(), and ExecShutdownHash().

2693 {
2694  instrument->nbuckets = Max(instrument->nbuckets,
2695  hashtable->nbuckets);
2696  instrument->nbuckets_original = Max(instrument->nbuckets_original,
2697  hashtable->nbuckets_original);
2698  instrument->nbatch = Max(instrument->nbatch,
2699  hashtable->nbatch);
2700  instrument->nbatch_original = Max(instrument->nbatch_original,
2701  hashtable->nbatch_original);
2702  instrument->space_peak = Max(instrument->space_peak,
2703  hashtable->spacePeak);
2704 }
#define Max(x, y)
Definition: c.h:922
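
Because the same hash node can be rebuilt on rescan and run by several workers, the accumulator above merges by taking the maximum of each field rather than summing. A minimal standalone sketch of that merge rule, using a simplified hash_stats struct that is only a stand-in for the real HashInstrumentation layout:

#include <stddef.h>
#include <stdio.h>

#define MAXVAL(a, b) ((a) > (b) ? (a) : (b))

/* Simplified stand-in for HashInstrumentation: keep the largest value seen. */
typedef struct hash_stats
{
    int    nbuckets;
    int    nbatch;
    size_t space_peak;
} hash_stats;

static void accum_stats(hash_stats *instr, const hash_stats *table)
{
    /* Element-wise maxima, so the largest rebuild or rescan wins. */
    instr->nbuckets = MAXVAL(instr->nbuckets, table->nbuckets);
    instr->nbatch = MAXVAL(instr->nbatch, table->nbatch);
    instr->space_peak = MAXVAL(instr->space_peak, table->space_peak);
}

int main(void)
{
    hash_stats instr = {0, 0, 0};
    hash_stats first_run = {1024, 1, 500000};
    hash_stats rescan = {2048, 4, 900000};

    accum_stats(&instr, &first_run);
    accum_stats(&instr, &rescan);
    printf("nbuckets=%d nbatch=%d space_peak=%zu\n",
           instr.nbuckets, instr.nbatch, instr.space_peak);
    return 0;
}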

◆ ExecHashEstimate()

void ExecHashEstimate ( HashState *  node,
ParallelContext *  pcxt 
)

Definition at line 2574 of file nodeHash.c.

References add_size(), ParallelContext::estimator, PlanState::instrument, mul_size(), ParallelContext::nworkers, offsetof, HashState::ps, shm_toc_estimate_chunk, and shm_toc_estimate_keys.

Referenced by ExecParallelEstimate().

2575 {
2576  size_t size;
2577 
2578  /* don't need this if not instrumenting or no workers */
2579  if (!node->ps.instrument || pcxt->nworkers == 0)
2580  return;
2581 
2582  size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
2583  size = add_size(size, offsetof(SharedHashInfo, hinstrument));
2584  shm_toc_estimate_chunk(&pcxt->estimator, size);
2585  shm_toc_estimate_keys(&pcxt->estimator, 1);
2586 }
Instrumentation * instrument
Definition: execnodes.h:949
shm_toc_estimator estimator
Definition: parallel.h:42
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
PlanState ps
Definition: execnodes.h:2403
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
#define offsetof(type, field)
Definition: c.h:669
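
The reservation is simply a SharedHashInfo header plus one HashInstrumentation slot per worker, sized with offsetof over a flexible array member. A standalone sketch of that sizing pattern, using hypothetical stand-in structs (shared_hash_info, hash_instr) rather than the execnodes.h definitions:

#include <stddef.h>
#include <stdio.h>

/* Simplified stand-ins for HashInstrumentation / SharedHashInfo. */
typedef struct hash_instr
{
    int    nbuckets;
    int    nbatch;
    size_t space_peak;
} hash_instr;

typedef struct shared_hash_info
{
    int        num_workers;
    hash_instr hinstrument[];   /* flexible array member: one slot per worker */
} shared_hash_info;

int main(void)
{
    int    nworkers = 4;

    /* Same shape as the estimate: fixed header plus nworkers slots. */
    size_t size = offsetof(shared_hash_info, hinstrument)
        + nworkers * sizeof(hash_instr);

    printf("would reserve %zu bytes for %d workers\n", size, nworkers);
    return 0;
}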

◆ ExecHashGetBucketAndBatch()

void ExecHashGetBucketAndBatch ( HashJoinTable  hashtable,
uint32  hashvalue,
int *  bucketno,
int *  batchno 
)

Definition at line 1902 of file nodeHash.c.

References HashJoinTableData::log2_nbuckets, HashJoinTableData::nbatch, HashJoinTableData::nbuckets, and pg_rotate_right32().

Referenced by ExecHashIncreaseNumBatches(), ExecHashIncreaseNumBuckets(), ExecHashJoinImpl(), ExecHashRemoveNextSkewBucket(), ExecHashTableInsert(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinPartitionOuter(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), ExecParallelHashTableInsert(), and ExecParallelHashTableInsertCurrentBatch().

1906 {
1907  uint32 nbuckets = (uint32) hashtable->nbuckets;
1908  uint32 nbatch = (uint32) hashtable->nbatch;
1909 
1910  if (nbatch > 1)
1911  {
1912  *bucketno = hashvalue & (nbuckets - 1);
1913  *batchno = pg_rotate_right32(hashvalue,
1914  hashtable->log2_nbuckets) & (nbatch - 1);
1915  }
1916  else
1917  {
1918  *bucketno = hashvalue & (nbuckets - 1);
1919  *batchno = 0;
1920  }
1921 }
static uint32 pg_rotate_right32(uint32 word, int n)
Definition: pg_bitutils.h:221
unsigned int uint32
Definition: c.h:375
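
Because nbuckets and nbatch are both powers of two, the bucket number is just the low log2(nbuckets) bits of the hash, and the batch number comes from the bits above them, exposed by rotating the hash right before masking; the bucket number therefore never depends on the batch count. A standalone sketch of that bit arithmetic, with a local rotate_right32() standing in for pg_rotate_right32():

#include <stdint.h>
#include <stdio.h>

/* Local stand-in for pg_rotate_right32(); n must be between 1 and 31 here. */
static uint32_t rotate_right32(uint32_t word, int n)
{
    return (word >> n) | (word << (32 - n));
}

/* Same masking scheme as ExecHashGetBucketAndBatch; both counts are 2^N. */
static void get_bucket_and_batch(uint32_t hashvalue,
                                 uint32_t nbuckets, int log2_nbuckets,
                                 uint32_t nbatch,
                                 int *bucketno, int *batchno)
{
    *bucketno = (int) (hashvalue & (nbuckets - 1));
    if (nbatch > 1)
        *batchno = (int) (rotate_right32(hashvalue, log2_nbuckets) & (nbatch - 1));
    else
        *batchno = 0;
}

int main(void)
{
    int bucketno, batchno;

    get_bucket_and_batch(0xdeadbeefU, 1024, 10, 8, &bucketno, &batchno);
    printf("bucket=%d batch=%d\n", bucketno, batchno);
    return 0;
}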

◆ ExecHashGetHashValue()

bool ExecHashGetHashValue ( HashJoinTable  hashtable,
ExprContext *  econtext,
List *  hashkeys,
bool  outer_tuple,
bool  keep_nulls,
uint32 *  hashvalue 
)

Definition at line 1794 of file nodeHash.c.

References HashJoinTableData::collations, DatumGetUInt32, ExprContext::ecxt_per_tuple_memory, ExecEvalExpr(), FunctionCall1Coll(), HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, lfirst, MemoryContextSwitchTo(), HashJoinTableData::outer_hashfunctions, and ResetExprContext.

Referenced by ExecHashJoinOuterGetTuple(), ExecParallelHashJoinOuterGetTuple(), ExecParallelHashJoinPartitionOuter(), MultiExecParallelHash(), and MultiExecPrivateHash().

1800 {
1801  uint32 hashkey = 0;
1802  FmgrInfo *hashfunctions;
1803  ListCell *hk;
1804  int i = 0;
1805  MemoryContext oldContext;
1806 
1807  /*
1808  * We reset the eval context each time to reclaim any memory leaked in the
1809  * hashkey expressions.
1810  */
1811  ResetExprContext(econtext);
1812 
1813  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
1814 
1815  if (outer_tuple)
1816  hashfunctions = hashtable->outer_hashfunctions;
1817  else
1818  hashfunctions = hashtable->inner_hashfunctions;
1819 
1820  foreach(hk, hashkeys)
1821  {
1822  ExprState *keyexpr = (ExprState *) lfirst(hk);
1823  Datum keyval;
1824  bool isNull;
1825 
1826  /* rotate hashkey left 1 bit at each step */
1827  hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
1828 
1829  /*
1830  * Get the join attribute value of the tuple
1831  */
1832  keyval = ExecEvalExpr(keyexpr, econtext, &isNull);
1833 
1834  /*
1835  * If the attribute is NULL, and the join operator is strict, then
1836  * this tuple cannot pass the join qual so we can reject it
1837  * immediately (unless we're scanning the outside of an outer join, in
1838  * which case we must not reject it). Otherwise we act like the
1839  * hashcode of NULL is zero (this will support operators that act like
1840  * IS NOT DISTINCT, though not any more-random behavior). We treat
1841  * the hash support function as strict even if the operator is not.
1842  *
1843  * Note: currently, all hashjoinable operators must be strict since
1844  * the hash index AM assumes that. However, it takes so little extra
1845  * code here to allow non-strict that we may as well do it.
1846  */
1847  if (isNull)
1848  {
1849  if (hashtable->hashStrict[i] && !keep_nulls)
1850  {
1851  MemoryContextSwitchTo(oldContext);
1852  return false; /* cannot match */
1853  }
1854  /* else, leave hashkey unmodified, equivalent to hashcode 0 */
1855  }
1856  else
1857  {
1858  /* Compute the hash function */
1859  uint32 hkey;
1860 
1861  hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], hashtable->collations[i], keyval));
1862  hashkey ^= hkey;
1863  }
1864 
1865  i++;
1866  }
1867 
1868  MemoryContextSwitchTo(oldContext);
1869 
1870  *hashvalue = hashkey;
1871  return true;
1872 }
#define DatumGetUInt32(X)
Definition: postgres.h:486
Definition: fmgr.h:56
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:233
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
FmgrInfo * inner_hashfunctions
Definition: hashjoin.h:338
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:292
unsigned int uint32
Definition: c.h:375
FmgrInfo * outer_hashfunctions
Definition: hashjoin.h:337
uintptr_t Datum
Definition: postgres.h:367
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1132
#define lfirst(lc)
Definition: pg_list.h:169
int i
bool * hashStrict
Definition: hashjoin.h:339
#define ResetExprContext(econtext)
Definition: executor.h:503
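
The multi-column hash key is built incrementally: the running key is rotated left one bit before each column, then XORed with that column's hash, and NULL columns contribute zero. A standalone sketch of just the combination step; the per-column hashes are example constants, not real datum hashes:

#include <stdint.h>
#include <stdio.h>

/*
 * Combine per-column hash codes the way the hashkey loop above does:
 * rotate the running key left by one bit, then XOR in the next column's
 * hash (NULL columns leave the key unchanged).
 */
static uint32_t combine_hashkeys(const uint32_t *colhash, const int *isnull,
                                 int ncols)
{
    uint32_t hashkey = 0;

    for (int i = 0; i < ncols; i++)
    {
        /* rotate hashkey left 1 bit at each step */
        hashkey = (hashkey << 1) | ((hashkey & 0x80000000U) ? 1 : 0);

        if (!isnull[i])
            hashkey ^= colhash[i];
    }
    return hashkey;
}

int main(void)
{
    uint32_t colhash[] = {0x12345678U, 0x9abcdef0U, 0x0badf00dU};
    int      isnull[] = {0, 1, 0};   /* middle key is NULL */

    printf("hashkey = 0x%08x\n", combine_hashkeys(colhash, isnull, 3));
    return 0;
}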

◆ ExecHashGetSkewBucket()

int ExecHashGetSkewBucket ( HashJoinTable  hashtable,
uint32  hashvalue 
)

Definition at line 2369 of file nodeHash.c.

References HashSkewBucket::hashvalue, INVALID_SKEW_BUCKET_NO, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, and HashJoinTableData::skewEnabled.

Referenced by ExecHashJoinImpl(), and MultiExecPrivateHash().

2370 {
2371  int bucket;
2372 
2373  /*
2374  * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
2375  * particular, this happens after the initial batch is done).
2376  */
2377  if (!hashtable->skewEnabled)
2378  return INVALID_SKEW_BUCKET_NO;
2379 
2380  /*
2381  * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
2382  */
2383  bucket = hashvalue & (hashtable->skewBucketLen - 1);
2384 
2385  /*
2386  * While we have not hit a hole in the hashtable and have not hit the
2387  * desired bucket, we have collided with some other hash value, so try the
2388  * next bucket location.
2389  */
2390  while (hashtable->skewBucket[bucket] != NULL &&
2391  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
2392  bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
2393 
2394  /*
2395  * Found the desired bucket?
2396  */
2397  if (hashtable->skewBucket[bucket] != NULL)
2398  return bucket;
2399 
2400  /*
2401  * There must not be any hashtable entry for this hash value.
2402  */
2403  return INVALID_SKEW_BUCKET_NO;
2404 }
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:109
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
uint32 hashvalue
Definition: hashjoin.h:104
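
The skew table is open addressing over a power-of-two array: mask the hash to get a starting slot, then probe linearly until either the matching hash value or an empty slot is found. A standalone sketch of that probe loop, with a hypothetical skew_slot array standing in for hashtable->skewBucket:

#include <stdint.h>
#include <stdio.h>

#define INVALID_SLOT (-1)

/* Hypothetical slot: occupied entries carry the MCV's hash value. */
typedef struct skew_slot
{
    int      used;
    uint32_t hashvalue;
} skew_slot;

/* Linear probing over a power-of-two sized table, as in the lookup above. */
static int find_skew_slot(const skew_slot *slots, int len, uint32_t hashvalue)
{
    int bucket = hashvalue & (len - 1);       /* modulo by AND: len is 2^N */

    while (slots[bucket].used && slots[bucket].hashvalue != hashvalue)
        bucket = (bucket + 1) & (len - 1);    /* collision: try next slot */

    return slots[bucket].used ? bucket : INVALID_SLOT;
}

int main(void)
{
    skew_slot slots[8] = {0};

    slots[5].used = 1;
    slots[5].hashvalue = 0xcafe0005U;         /* 0xcafe0005 maps to slot 5 */

    printf("found at %d\n", find_skew_slot(slots, 8, 0xcafe0005U));
    printf("missing: %d\n", find_skew_slot(slots, 8, 0xcafe0006U));
    return 0;
}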

◆ ExecHashInitializeDSM()

void ExecHashInitializeDSM ( HashState *  node,
ParallelContext *  pcxt 
)

Definition at line 2593 of file nodeHash.c.

References PlanState::instrument, SharedHashInfo::num_workers, ParallelContext::nworkers, offsetof, PlanState::plan, Plan::plan_node_id, HashState::ps, HashState::shared_info, shm_toc_allocate(), shm_toc_insert(), and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

2594 {
2595  size_t size;
2596 
2597  /* don't need this if not instrumenting or no workers */
2598  if (!node->ps.instrument || pcxt->nworkers == 0)
2599  return;
2600 
2601  size = offsetof(SharedHashInfo, hinstrument) +
2602  pcxt->nworkers * sizeof(HashInstrumentation);
2603  node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
2604 
2605  /* Each per-worker area must start out as zeroes. */
2606  memset(node->shared_info, 0, size);
2607 
2608  node->shared_info->num_workers = pcxt->nworkers;
2609  shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
2610  node->shared_info);
2611 }
Instrumentation * instrument
Definition: execnodes.h:949
struct HashInstrumentation HashInstrumentation
int plan_node_id
Definition: plannodes.h:135
SharedHashInfo * shared_info
Definition: execnodes.h:2413
PlanState ps
Definition: execnodes.h:2403
Plan * plan
Definition: execnodes.h:939
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
#define offsetof(type, field)
Definition: c.h:669
shm_toc * toc
Definition: parallel.h:45

◆ ExecHashInitializeWorker()

void ExecHashInitializeWorker ( HashState *  node,
ParallelWorkerContext *  pwcxt 
)

Definition at line 2618 of file nodeHash.c.

References SharedHashInfo::hinstrument, HashState::hinstrument, PlanState::instrument, ParallelWorkerNumber, PlanState::plan, Plan::plan_node_id, HashState::ps, shm_toc_lookup(), and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().

2619 {
2620  SharedHashInfo *shared_info;
2621 
2622  /* don't need this if not instrumenting */
2623  if (!node->ps.instrument)
2624  return;
2625 
2626  /*
2627  * Find our entry in the shared area, and set up a pointer to it so that
2628  * we'll accumulate stats there when shutting down or rebuilding the hash
2629  * table.
2630  */
2631  shared_info = (SharedHashInfo *)
2632  shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
2633  node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
2634 }
Instrumentation * instrument
Definition: execnodes.h:949
int plan_node_id
Definition: plannodes.h:135
int ParallelWorkerNumber
Definition: parallel.c:112
PlanState ps
Definition: execnodes.h:2403
HashInstrumentation * hinstrument
Definition: execnodes.h:2420
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
Definition: execnodes.h:2394
Plan * plan
Definition: execnodes.h:939
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232

◆ ExecHashRetrieveInstrumentation()

void ExecHashRetrieveInstrumentation ( HashState *  node)

Definition at line 2660 of file nodeHash.c.

References SharedHashInfo::num_workers, offsetof, palloc(), and HashState::shared_info.

Referenced by ExecParallelRetrieveInstrumentation().

2661 {
2662  SharedHashInfo *shared_info = node->shared_info;
2663  size_t size;
2664 
2665  if (shared_info == NULL)
2666  return;
2667 
2668  /* Replace node->shared_info with a copy in backend-local memory. */
2669  size = offsetof(SharedHashInfo, hinstrument) +
2670  shared_info->num_workers * sizeof(HashInstrumentation);
2671  node->shared_info = palloc(size);
2672  memcpy(node->shared_info, shared_info, size);
2673 }
struct HashInstrumentation HashInstrumentation
SharedHashInfo * shared_info
Definition: execnodes.h:2413
void * palloc(Size size)
Definition: mcxt.c:950
#define offsetof(type, field)
Definition: c.h:669

◆ ExecHashTableCreate()

HashJoinTable ExecHashTableCreate ( HashState *  state,
List *  hashOperators,
List *  hashCollations,
bool  keepNulls 
)

Definition at line 431 of file nodeHash.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, HashJoinTableData::area, Assert, BarrierArriveAndWait(), BarrierAttach(), BarrierPhase(), HashJoinTableData::batchCxt, HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, HashJoinTableData::chunks, HashJoinTableData::collations, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, CurrentMemoryContext, elog, ERROR, EState::es_query_dsa, ExecChooseHashTableSize(), ExecHashBuildSkewHash(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashTableAlloc(), fmgr_info(), forboth, get_op_hash_functions(), HashJoinTableData::growEnabled, ParallelHashJoinState::growth, HashJoinTableData::hashCxt, HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, HashJoinTableData::innerBatchFile, HashJoinTableData::keepNulls, lfirst_oid, list_length(), HashJoinTableData::log2_nbuckets, HashJoinTableData::log2_nbuckets_optimal, MemoryContextSwitchTo(), my_log2(), ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, HashJoinTableData::nbatch_original, HashJoinTableData::nbatch_outstart, ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, HashJoinTableData::nbuckets_original, ParallelHashJoinState::nparticipants, HashJoinTableData::nSkewBuckets, OidIsValid, op_strict(), HashJoinTableData::outer_hashfunctions, HashJoinTableData::outerBatchFile, outerPlan, palloc(), palloc0(), Plan::parallel_aware, HashJoinTableData::parallel_state, HashState::parallel_state, HashJoinTableData::partialTuples, PHJ_BUILD_ELECTING, PHJ_GROWTH_OK, Hash::plan, PlanState::plan, Plan::plan_rows, Plan::plan_width, PrepareTempTablespaces(), printf, HashState::ps, Hash::rows_total, SKEW_HASH_MEM_PERCENT, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, HashJoinTableData::skewBucketNums, HashJoinTableData::skewEnabled, Hash::skewTable, HashJoinTableData::skewTuples, ParallelHashJoinState::space_allowed, HashJoinTableData::spaceAllowed, HashJoinTableData::spaceAllowedSkew, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, PlanState::state, HashJoinTableData::totalTuples, HashJoinTableData::unshared, and WAIT_EVENT_HASH_BUILD_ELECT.

Referenced by ExecHashJoinImpl().

432 {
433  Hash *node;
434  HashJoinTable hashtable;
435  Plan *outerNode;
436  size_t space_allowed;
437  int nbuckets;
438  int nbatch;
439  double rows;
440  int num_skew_mcvs;
441  int log2_nbuckets;
442  int nkeys;
443  int i;
444  ListCell *ho;
445  ListCell *hc;
446  MemoryContext oldcxt;
447 
448  /*
449  * Get information about the size of the relation to be hashed (it's the
450  * "outer" subtree of this node, but the inner relation of the hashjoin).
451  * Compute the appropriate size of the hash table.
452  */
453  node = (Hash *) state->ps.plan;
454  outerNode = outerPlan(node);
455 
456  /*
457  * If this is shared hash table with a partial plan, then we can't use
458  * outerNode->plan_rows to estimate its size. We need an estimate of the
459  * total number of rows across all copies of the partial plan.
460  */
461  rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
462 
463  ExecChooseHashTableSize(rows, outerNode->plan_width,
464  OidIsValid(node->skewTable),
465  state->parallel_state != NULL,
466  state->parallel_state != NULL ?
467  state->parallel_state->nparticipants - 1 : 0,
468  &space_allowed,
469  &nbuckets, &nbatch, &num_skew_mcvs);
470 
471  /* nbuckets must be a power of 2 */
472  log2_nbuckets = my_log2(nbuckets);
473  Assert(nbuckets == (1 << log2_nbuckets));
474 
475  /*
476  * Initialize the hash table control block.
477  *
478  * The hashtable control block is just palloc'd from the executor's
479  * per-query memory context. Everything else should be kept inside the
480  * subsidiary hashCxt or batchCxt.
481  */
482  hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
483  hashtable->nbuckets = nbuckets;
484  hashtable->nbuckets_original = nbuckets;
485  hashtable->nbuckets_optimal = nbuckets;
486  hashtable->log2_nbuckets = log2_nbuckets;
487  hashtable->log2_nbuckets_optimal = log2_nbuckets;
488  hashtable->buckets.unshared = NULL;
489  hashtable->keepNulls = keepNulls;
490  hashtable->skewEnabled = false;
491  hashtable->skewBucket = NULL;
492  hashtable->skewBucketLen = 0;
493  hashtable->nSkewBuckets = 0;
494  hashtable->skewBucketNums = NULL;
495  hashtable->nbatch = nbatch;
496  hashtable->curbatch = 0;
497  hashtable->nbatch_original = nbatch;
498  hashtable->nbatch_outstart = nbatch;
499  hashtable->growEnabled = true;
500  hashtable->totalTuples = 0;
501  hashtable->partialTuples = 0;
502  hashtable->skewTuples = 0;
503  hashtable->innerBatchFile = NULL;
504  hashtable->outerBatchFile = NULL;
505  hashtable->spaceUsed = 0;
506  hashtable->spacePeak = 0;
507  hashtable->spaceAllowed = space_allowed;
508  hashtable->spaceUsedSkew = 0;
509  hashtable->spaceAllowedSkew =
510  hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
511  hashtable->chunks = NULL;
512  hashtable->current_chunk = NULL;
513  hashtable->parallel_state = state->parallel_state;
514  hashtable->area = state->ps.state->es_query_dsa;
515  hashtable->batches = NULL;
516 
517 #ifdef HJDEBUG
518  printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
519  hashtable, nbatch, nbuckets);
520 #endif
521 
522  /*
523  * Create temporary memory contexts in which to keep the hashtable working
524  * storage. See notes in executor/hashjoin.h.
525  */
526  hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
527  "HashTableContext",
528  ALLOCSET_DEFAULT_SIZES);
529 
530  hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
531  "HashBatchContext",
532  ALLOCSET_DEFAULT_SIZES);
533 
534  /* Allocate data that will live for the life of the hashjoin */
535 
536  oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
537 
538  /*
539  * Get info about the hash functions to be used for each hash key. Also
540  * remember whether the join operators are strict.
541  */
542  nkeys = list_length(hashOperators);
543  hashtable->outer_hashfunctions =
544  (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
545  hashtable->inner_hashfunctions =
546  (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
547  hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
548  hashtable->collations = (Oid *) palloc(nkeys * sizeof(Oid));
549  i = 0;
550  forboth(ho, hashOperators, hc, hashCollations)
551  {
552  Oid hashop = lfirst_oid(ho);
553  Oid left_hashfn;
554  Oid right_hashfn;
555 
556  if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
557  elog(ERROR, "could not find hash function for hash operator %u",
558  hashop);
559  fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
560  fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
561  hashtable->hashStrict[i] = op_strict(hashop);
562  hashtable->collations[i] = lfirst_oid(hc);
563  i++;
564  }
565 
566  if (nbatch > 1 && hashtable->parallel_state == NULL)
567  {
568  /*
569  * allocate and initialize the file arrays in hashCxt (not needed for
570  * parallel case which uses shared tuplestores instead of raw files)
571  */
572  hashtable->innerBatchFile = (BufFile **)
573  palloc0(nbatch * sizeof(BufFile *));
574  hashtable->outerBatchFile = (BufFile **)
575  palloc0(nbatch * sizeof(BufFile *));
576  /* The files will not be opened until needed... */
577  /* ... but make sure we have temp tablespaces established for them */
578  PrepareTempTablespaces();
579  }
580 
581  MemoryContextSwitchTo(oldcxt);
582 
583  if (hashtable->parallel_state)
584  {
585  ParallelHashJoinState *pstate = hashtable->parallel_state;
586  Barrier *build_barrier;
587 
588  /*
589  * Attach to the build barrier. The corresponding detach operation is
590  * in ExecHashTableDetach. Note that we won't attach to the
591  * batch_barrier for batch 0 yet. We'll attach later and start it out
592  * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
593  * and then loaded while hashing (the standard hybrid hash join
594  * algorithm), and we'll coordinate that using build_barrier.
595  */
596  build_barrier = &pstate->build_barrier;
597  BarrierAttach(build_barrier);
598 
599  /*
600  * So far we have no idea whether there are any other participants,
601  * and if so, what phase they are working on. The only thing we care
602  * about at this point is whether someone has already created the
603  * SharedHashJoinBatch objects and the hash table for batch 0. One
604  * backend will be elected to do that now if necessary.
605  */
606  if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
607  BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
608  {
609  pstate->nbatch = nbatch;
610  pstate->space_allowed = space_allowed;
611  pstate->growth = PHJ_GROWTH_OK;
612 
613  /* Set up the shared state for coordinating batches. */
614  ExecParallelHashJoinSetUpBatches(hashtable, nbatch);
615 
616  /*
617  * Allocate batch 0's hash table up front so we can load it
618  * directly while hashing.
619  */
620  pstate->nbuckets = nbuckets;
621  ExecParallelHashTableAlloc(hashtable, 0);
622  }
623 
624  /*
625  * The next Parallel Hash synchronization point is in
626  * MultiExecParallelHash(), which will progress it all the way to
627  * PHJ_BUILD_DONE. The caller must not return control from this
628  * executor node between now and then.
629  */
630  }
631  else
632  {
633  /*
634  * Prepare context for the first-scan space allocations; allocate the
635  * hashbucket array therein, and set each bucket "empty".
636  */
637  MemoryContextSwitchTo(hashtable->batchCxt);
638 
639  hashtable->buckets.unshared = (HashJoinTuple *)
640  palloc0(nbuckets * sizeof(HashJoinTuple));
641 
642  /*
643  * Set up for skew optimization, if possible and there's a need for
644  * more than one batch. (In a one-batch join, there's no point in
645  * it.)
646  */
647  if (nbatch > 1)
648  ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
649 
650  MemoryContextSwitchTo(oldcxt);
651  }
652 
653  return hashtable;
654 }
int log2_nbuckets_optimal
Definition: hashjoin.h:291
double rows_total
Definition: plannodes.h:935
Oid skewTable
Definition: plannodes.h:931
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2423
double skewTuples
Definition: hashjoin.h:320
#define SKEW_HASH_MEM_PERCENT
Definition: hashjoin.h:110
Definition: fmgr.h:56
struct dsa_area * es_query_dsa
Definition: execnodes.h:593
double plan_rows
Definition: plannodes.h:123
bool op_strict(Oid opno)
Definition: lsyscache.c:1389
#define AllocSetContextCreate
Definition: memutils.h:170
bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)
Definition: lsyscache.c:508
#define forboth(cell1, list1, cell2, list2)
Definition: pg_list.h:434
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define printf(...)
Definition: port.h:221
FmgrInfo * inner_hashfunctions
Definition: hashjoin.h:338
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3099
EState * state
Definition: execnodes.h:941
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:652
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
Definition: nodeHash.c:2216
double partialTuples
Definition: hashjoin.h:319
dsa_area * area
Definition: hashjoin.h:356
int * skewBucketNums
Definition: hashjoin.h:308
#define ERROR
Definition: elog.h:43
void PrepareTempTablespaces(void)
Definition: tablespace.c:1326
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:126
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
BufFile ** outerBatchFile
Definition: hashjoin.h:330
Size spaceAllowedSkew
Definition: hashjoin.h:346
bool parallel_aware
Definition: plannodes.h:129
PlanState ps
Definition: execnodes.h:2403
#define PHJ_BUILD_ELECTING
Definition: hashjoin.h:257
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
union HashJoinTableData::@94 buckets
MemoryContext batchCxt
Definition: hashjoin.h:349
struct HashJoinTableData * HashJoinTable
Definition: execnodes.h:1935
int my_log2(long num)
Definition: dynahash.c:1730
#define outerPlan(node)
Definition: plannodes.h:166
FmgrInfo * outer_hashfunctions
Definition: hashjoin.h:337
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
int BarrierAttach(Barrier *barrier)
Definition: barrier.c:214
void * palloc0(Size size)
Definition: mcxt.c:981
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
Plan * plan
Definition: execnodes.h:939
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
Definition: nodeHash.c:668
double totalTuples
Definition: hashjoin.h:318
int plan_width
Definition: plannodes.h:124
#define Assert(condition)
Definition: c.h:746
ParallelHashGrowth growth
Definition: hashjoin.h:241
BufFile ** innerBatchFile
Definition: hashjoin.h:329
static int list_length(const List *l)
Definition: pg_list.h:149
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:243
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
HashMemoryChunk chunks
Definition: hashjoin.h:352
Plan plan
Definition: plannodes.h:924
void * palloc(Size size)
Definition: mcxt.c:950
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
Definition: nodeHash.c:2938
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297
HashMemoryChunk current_chunk
Definition: hashjoin.h:355
#define elog(elevel,...)
Definition: elog.h:214
int i
bool * hashStrict
Definition: hashjoin.h:339
MemoryContext hashCxt
Definition: hashjoin.h:348
#define lfirst_oid(lc)
Definition: pg_list.h:171

◆ ExecHashTableDestroy()

void ExecHashTableDestroy ( HashJoinTable  hashtable)

Definition at line 854 of file nodeHash.c.

References BufFileClose(), HashJoinTableData::hashCxt, i, HashJoinTableData::innerBatchFile, MemoryContextDelete(), HashJoinTableData::nbatch, HashJoinTableData::outerBatchFile, and pfree().

Referenced by ExecEndHashJoin(), and ExecReScanHashJoin().

855 {
856  int i;
857 
858  /*
859  * Make sure all the temp files are closed. We skip batch 0, since it
860  * can't have any temp files (and the arrays might not even exist if
861  * nbatch is only 1). Parallel hash joins don't use these files.
862  */
863  if (hashtable->innerBatchFile != NULL)
864  {
865  for (i = 1; i < hashtable->nbatch; i++)
866  {
867  if (hashtable->innerBatchFile[i])
868  BufFileClose(hashtable->innerBatchFile[i]);
869  if (hashtable->outerBatchFile[i])
870  BufFileClose(hashtable->outerBatchFile[i]);
871  }
872  }
873 
874  /* Release working memory (batchCxt is a child, so it goes away too) */
875  MemoryContextDelete(hashtable->hashCxt);
876 
877  /* And drop the control block */
878  pfree(hashtable);
879 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
void BufFileClose(BufFile *file)
Definition: buffile.c:395
void pfree(void *pointer)
Definition: mcxt.c:1057
BufFile ** outerBatchFile
Definition: hashjoin.h:330
BufFile ** innerBatchFile
Definition: hashjoin.h:329
int i
MemoryContext hashCxt
Definition: hashjoin.h:348

◆ ExecHashTableDetach()

void ExecHashTableDetach ( HashJoinTable  hashtable)

Definition at line 3176 of file nodeHash.c.

References HashJoinTableData::area, BarrierDetach(), ParallelHashJoinState::batches, HashJoinTableData::batches, ParallelHashJoinState::build_barrier, dsa_free(), DsaPointerIsValid, i, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, sts_end_parallel_scan(), and sts_end_write().

Referenced by ExecHashJoinReInitializeDSM(), and ExecShutdownHashJoin().

3177 {
3178  if (hashtable->parallel_state)
3179  {
3180  ParallelHashJoinState *pstate = hashtable->parallel_state;
3181  int i;
3182 
3183  /* Make sure any temporary files are closed. */
3184  if (hashtable->batches)
3185  {
3186  for (i = 0; i < hashtable->nbatch; ++i)
3187  {
3188  sts_end_write(hashtable->batches[i].inner_tuples);
3189  sts_end_write(hashtable->batches[i].outer_tuples);
3190  sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
3191  sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
3192  }
3193  }
3194 
3195  /* If we're last to detach, clean up shared memory. */
3196  if (BarrierDetach(&pstate->build_barrier))
3197  {
3198  if (DsaPointerIsValid(pstate->batches))
3199  {
3200  dsa_free(hashtable->area, pstate->batches);
3201  pstate->batches = InvalidDsaPointer;
3202  }
3203  }
3204 
3205  hashtable->parallel_state = NULL;
3206  }
3207 }
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:209
#define InvalidDsaPointer
Definition: dsa.h:78
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:208
dsa_area * area
Definition: hashjoin.h:356
dsa_pointer batches
Definition: hashjoin.h:236
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
bool BarrierDetach(Barrier *barrier)
Definition: barrier.c:234
#define DsaPointerIsValid(x)
Definition: dsa.h:81
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:820
int i
void sts_end_write(SharedTuplestoreAccessor *accessor)

◆ ExecHashTableDetachBatch()

void ExecHashTableDetachBatch ( HashJoinTable  hashtable)

Definition at line 3119 of file nodeHash.c.

References HashJoinTableData::area, Assert, BarrierArriveAndDetach(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, ParallelHashJoinBatch::chunks, HashJoinTableData::curbatch, dsa_free(), dsa_get_address(), DsaPointerIsValid, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, Max, HashJoinTableData::nbuckets, HashMemoryChunkData::next, next, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BATCH_DONE, HashMemoryChunkData::shared, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinReInitializeDSM(), ExecParallelHashJoinNewBatch(), and ExecShutdownHashJoin().

3120 {
3121  if (hashtable->parallel_state != NULL &&
3122  hashtable->curbatch >= 0)
3123  {
3124  int curbatch = hashtable->curbatch;
3125  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
3126 
3127  /* Make sure any temporary files are closed. */
3128  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
3129  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
3130 
3131  /* Detach from the batch we were last working on. */
3132  if (BarrierArriveAndDetach(&batch->batch_barrier))
3133  {
3134  /*
3135  * Technically we shouldn't access the barrier because we're no
3136  * longer attached, but since there is no way it's moving after
3137  * this point it seems safe to make the following assertion.
3138  */
3139  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
3140 
3141  /* Free shared chunks and buckets. */
3142  while (DsaPointerIsValid(batch->chunks))
3143  {
3144  HashMemoryChunk chunk =
3145  dsa_get_address(hashtable->area, batch->chunks);
3146  dsa_pointer next = chunk->next.shared;
3147 
3148  dsa_free(hashtable->area, batch->chunks);
3149  batch->chunks = next;
3150  }
3151  if (DsaPointerIsValid(batch->buckets))
3152  {
3153  dsa_free(hashtable->area, batch->buckets);
3154  batch->buckets = InvalidDsaPointer;
3155  }
3156  }
3157 
3158  /*
3159  * Track the largest batch we've been attached to. Though each
3160  * backend might see a different subset of batches, explain.c will
3161  * scan the results from all backends to find the largest value.
3162  */
3163  hashtable->spacePeak =
3164  Max(hashtable->spacePeak,
3165  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
3166 
3167  /* Remember that we are not attached to a batch. */
3168  hashtable->curbatch = -1;
3169  }
3170 }
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:209
#define PHJ_BATCH_DONE
Definition: hashjoin.h:268
static int32 next
Definition: blutils.c:219
#define InvalidDsaPointer
Definition: dsa.h:78
dsa_pointer chunks
Definition: hashjoin.h:156
uint64 dsa_pointer
Definition: dsa.h:62
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:208
dsa_area * area
Definition: hashjoin.h:356
dsa_pointer shared
Definition: hashjoin.h:127
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:932
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
union HashMemoryChunkData::@93 next
bool BarrierArriveAndDetach(Barrier *barrier)
Definition: barrier.c:203
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
#define Max(x, y)
Definition: c.h:922
#define Assert(condition)
Definition: c.h:746
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:243
ParallelHashJoinBatch * shared
Definition: hashjoin.h:197
#define DsaPointerIsValid(x)
Definition: dsa.h:81
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:820
dsa_pointer buckets
Definition: hashjoin.h:153

◆ ExecHashTableInsert()

void ExecHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot *  slot,
uint32  hashvalue 
)

Definition at line 1596 of file nodeHash.c.

References Assert, HashJoinTableData::buckets, HashJoinTableData::curbatch, dense_alloc(), ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecHashIncreaseNumBatches(), ExecHashJoinSaveTuple(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::innerBatchFile, HashJoinTableData::log2_nbuckets_optimal, MaxAllocSize, HashJoinTableData::nbatch, HashJoinTableData::nbuckets_optimal, HashJoinTupleData::next, NTUP_PER_BUCKET, HashJoinTableData::skewTuples, HashJoinTableData::spaceAllowed, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, MinimalTupleData::t_len, HashJoinTableData::totalTuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch(), and MultiExecPrivateHash().

1599 {
1600  bool shouldFree;
1601  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1602  int bucketno;
1603  int batchno;
1604 
1605  ExecHashGetBucketAndBatch(hashtable, hashvalue,
1606  &bucketno, &batchno);
1607 
1608  /*
1609  * decide whether to put the tuple in the hash table or a temp file
1610  */
1611  if (batchno == hashtable->curbatch)
1612  {
1613  /*
1614  * put the tuple in hash table
1615  */
1616  HashJoinTuple hashTuple;
1617  int hashTupleSize;
1618  double ntuples = (hashtable->totalTuples - hashtable->skewTuples);
1619 
1620  /* Create the HashJoinTuple */
1621  hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1622  hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
1623 
1624  hashTuple->hashvalue = hashvalue;
1625  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1626 
1627  /*
1628  * We always reset the tuple-matched flag on insertion. This is okay
1629  * even when reloading a tuple from a batch file, since the tuple
1630  * could not possibly have been matched to an outer tuple before it
1631  * went into the batch file.
1632  */
1633  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1634 
1635  /* Push it onto the front of the bucket's list */
1636  hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1637  hashtable->buckets.unshared[bucketno] = hashTuple;
1638 
1639  /*
1640  * Increase the (optimal) number of buckets if we just exceeded the
1641  * NTUP_PER_BUCKET threshold, but only when there's still a single
1642  * batch.
1643  */
1644  if (hashtable->nbatch == 1 &&
1645  ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
1646  {
1647  /* Guard against integer overflow and alloc size overflow */
1648  if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
1649  hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
1650  {
1651  hashtable->nbuckets_optimal *= 2;
1652  hashtable->log2_nbuckets_optimal += 1;
1653  }
1654  }
1655 
1656  /* Account for space used, and back off if we've used too much */
1657  hashtable->spaceUsed += hashTupleSize;
1658  if (hashtable->spaceUsed > hashtable->spacePeak)
1659  hashtable->spacePeak = hashtable->spaceUsed;
1660  if (hashtable->spaceUsed +
1661  hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
1662  > hashtable->spaceAllowed)
1663  ExecHashIncreaseNumBatches(hashtable);
1664  }
1665  else
1666  {
1667  /*
1668  * put the tuple into a temp file for later batches
1669  */
1670  Assert(batchno > hashtable->curbatch);
1671  ExecHashJoinSaveTuple(tuple,
1672  hashvalue,
1673  &hashtable->innerBatchFile[batchno]);
1674  }
1675 
1676  if (shouldFree)
1677  heap_free_minimal_tuple(tuple);
1678 }
int log2_nbuckets_optimal
Definition: hashjoin.h:291
double skewTuples
Definition: hashjoin.h:320
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1662
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
Definition: nodeHash.c:887
union HashJoinTupleData::@92 next
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1902
struct HashJoinTupleData * unshared
Definition: hashjoin.h:72
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1427
struct HashJoinTupleData * HashJoinTuple
Definition: execnodes.h:1934
union HashJoinTableData::@94 buckets
#define MaxAllocSize
Definition: memutils.h:40
static void * dense_alloc(HashJoinTable hashtable, Size size)
Definition: nodeHash.c:2710
#define NTUP_PER_BUCKET
Definition: nodeHash.c:665
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:79
double totalTuples
Definition: hashjoin.h:318
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
#define Assert(condition)
Definition: c.h:746
BufFile ** innerBatchFile
Definition: hashjoin.h:329
#define HeapTupleHeaderClearMatch(tup)
Definition: htup_details.h:526
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr)
uint32 hashvalue
Definition: hashjoin.h:75
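
While only one batch exists, each insertion also checks whether the tuple count has passed nbuckets_optimal * NTUP_PER_BUCKET and, if so, doubles the planned bucket count, guarded against integer and allocation-size overflow. The standalone sketch below simulates only that growth rule; NTUP_PER_BUCKET = 1 and the MAX_ALLOC_SIZE cap are assumed stand-ins:

#include <limits.h>
#include <stdio.h>

#define NTUP_PER_BUCKET 1
#define MAX_ALLOC_SIZE  (0x3fffffffUL)   /* stand-in for MaxAllocSize */

int main(void)
{
    long   nbuckets_optimal = 1024;
    int    log2_nbuckets_optimal = 10;
    double ntuples = 0;

    /* Simulate inserting ten million tuples into a single-batch table. */
    for (long i = 0; i < 10000000; i++)
    {
        ntuples += 1;

        if (ntuples > nbuckets_optimal * NTUP_PER_BUCKET &&
            nbuckets_optimal <= INT_MAX / 2 &&
            (unsigned long) nbuckets_optimal * 2 <= MAX_ALLOC_SIZE / sizeof(void *))
        {
            nbuckets_optimal *= 2;        /* double, keeping a power of two */
            log2_nbuckets_optimal += 1;
        }
    }

    printf("nbuckets_optimal=%ld (2^%d)\n",
           nbuckets_optimal, log2_nbuckets_optimal);
    return 0;
}

Under these assumptions it settles at 16777216 (2^24) buckets after ten million simulated inserts.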

◆ ExecHashTableReset()

void ExecHashTableReset ( HashJoinTable  hashtable)

Definition at line 2141 of file nodeHash.c.

References HashJoinTableData::batchCxt, HashJoinTableData::buckets, HashJoinTableData::chunks, MemoryContextReset(), MemoryContextSwitchTo(), HashJoinTableData::nbuckets, palloc0(), HashJoinTableData::spaceUsed, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch().

2142 {
2143  MemoryContext oldcxt;
2144  int nbuckets = hashtable->nbuckets;
2145 
2146  /*
2147  * Release all the hash buckets and tuples acquired in the prior pass, and
2148  * reinitialize the context for a new pass.
2149  */
2150  MemoryContextReset(hashtable->batchCxt);
2151  oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
2152 
2153  /* Reallocate and reinitialize the hash bucket headers. */
2154  hashtable->buckets.unshared = (HashJoinTuple *)
2155  palloc0(nbuckets * sizeof(HashJoinTuple));
2156 
2157  hashtable->spaceUsed = 0;
2158 
2159  MemoryContextSwitchTo(oldcxt);
2160 
2161  /* Forget the chunks (the memory was freed by the context reset above). */
2162  hashtable->chunks = NULL;
2163 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
union HashJoinTableData::@94 buckets
MemoryContext batchCxt
Definition: hashjoin.h:349
void * palloc0(Size size)
Definition: mcxt.c:981
HashMemoryChunk chunks
Definition: hashjoin.h:352
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297

◆ ExecHashTableResetMatchFlags()

void ExecHashTableResetMatchFlags ( HashJoinTable  hashtable)

Definition at line 2170 of file nodeHash.c.

References HashJoinTableData::buckets, HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, i, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecReScanHashJoin().

2171 {
2172  HashJoinTuple tuple;
2173  int i;
2174 
2175  /* Reset all flags in the main table ... */
2176  for (i = 0; i < hashtable->nbuckets; i++)
2177  {
2178  for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
2179  tuple = tuple->next.unshared)
2180  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2181  }
2182 
2183  /* ... and the same for the skew buckets, if any */
2184  for (i = 0; i < hashtable->nSkewBuckets; i++)
2185  {
2186  int j = hashtable->skewBucketNums[i];
2187  HashSkewBucket *skewBucket = hashtable->skewBucket[j];
2188 
2189  for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
2190  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2191  }
2192 }
union HashJoinTupleData::@92 next
int * skewBucketNums
Definition: hashjoin.h:308
struct HashJoinTupleData * unshared
Definition: hashjoin.h:72
HashJoinTuple tuples
Definition: hashjoin.h:105
union HashJoinTableData::@94 buckets
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
#define HeapTupleHeaderClearMatch(tup)
Definition: htup_details.h:526
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297
int i

◆ ExecInitHash()

HashState* ExecInitHash ( Hash *  node,
EState *  estate,
int  eflags 
)

Definition at line 354 of file nodeHash.c.

References Assert, EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecAssignExprContext(), ExecHash(), ExecInitExprList(), ExecInitNode(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, Hash::hashkeys, HashState::hashkeys, HashState::hashtable, makeNode, NIL, outerPlan, outerPlanState, Hash::plan, PlanState::plan, HashState::ps, PlanState::ps_ProjInfo, Plan::qual, PlanState::state, and TTSOpsMinimalTuple.

Referenced by ExecInitNode().

355 {
356  HashState *hashstate;
357 
358  /* check for unsupported flags */
359  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
360 
361  /*
362  * create state structure
363  */
364  hashstate = makeNode(HashState);
365  hashstate->ps.plan = (Plan *) node;
366  hashstate->ps.state = estate;
367  hashstate->ps.ExecProcNode = ExecHash;
368  hashstate->hashtable = NULL;
369  hashstate->hashkeys = NIL; /* will be set by parent HashJoin */
370 
371  /*
372  * Miscellaneous initialization
373  *
374  * create expression context for node
375  */
376  ExecAssignExprContext(estate, &hashstate->ps);
377 
378  /*
379  * initialize child nodes
380  */
381  outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
382 
383  /*
384  * initialize our result slot and type. No need to build projection
385  * because this node doesn't do projections.
386  */
387  ExecInitResultTupleSlotTL(&hashstate->ps, &TTSOpsMinimalTuple);
388  hashstate->ps.ps_ProjInfo = NULL;
389 
390  /*
391  * initialize child expressions
392  */
393  Assert(node->plan.qual == NIL);
394  hashstate->hashkeys =
395  ExecInitExprList(node->hashkeys, (PlanState *) hashstate);
396 
397  return hashstate;
398 }
#define NIL
Definition: pg_list.h:65
List * qual
Definition: plannodes.h:137
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:979
HashJoinTable hashtable
Definition: execnodes.h:2404
EState * state
Definition: execnodes.h:941
#define EXEC_FLAG_BACKWARD
Definition: executor.h:58
#define outerPlanState(node)
Definition: execnodes.h:1033
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:318
List * hashkeys
Definition: execnodes.h:2405
PlanState ps
Definition: execnodes.h:2403
#define outerPlan(node)
Definition: plannodes.h:166
static TupleTableSlot * ExecHash(PlanState *pstate)
Definition: nodeHash.c:92
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:945
List * hashkeys
Definition: plannodes.h:930
Plan * plan
Definition: execnodes.h:939
#define makeNode(_type_)
Definition: nodes.h:576
#define Assert(condition)
Definition: c.h:746
#define EXEC_FLAG_MARK
Definition: executor.h:59
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:479
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1769
Plan plan
Definition: plannodes.h:924
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:139
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85

◆ ExecParallelHashTableAlloc()

void ExecParallelHashTableAlloc ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3099 of file nodeHash.c.

References HashJoinTableData::area, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, dsa_allocate, dsa_get_address(), dsa_pointer_atomic_init, i, InvalidDsaPointer, ParallelHashJoinState::nbuckets, HashJoinTableData::parallel_state, and ParallelHashJoinBatchAccessor::shared.

Referenced by ExecHashTableCreate(), and ExecParallelHashJoinNewBatch().

3100 {
3101  ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
3102  dsa_pointer_atomic *buckets;
3103  int nbuckets = hashtable->parallel_state->nbuckets;
3104  int i;
3105 
3106  batch->buckets =
3107  dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets);
3108  buckets = (dsa_pointer_atomic *)
3109  dsa_get_address(hashtable->area, batch->buckets);
3110  for (i = 0; i < nbuckets; ++i)
3111  dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
3112 }
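
The bucket array is allocated once per batch in the shared DSA area, and each participant then attaches to it. A minimal sketch of that sequence, assuming the barrier coordination done by the real callers (ExecHashTableCreate() and ExecParallelHashJoinNewBatch()) has already decided that this backend performs the allocation; the helper name is hypothetical:

#include "postgres.h"
#include "executor/nodeHash.h"

/* Hypothetical helper: create a batch's shared buckets and attach to them. */
static void
prepare_parallel_batch(HashJoinTable hashtable, int batchno)
{
    /* Allocate the batch's atomic bucket heads in the shared DSA area. */
    ExecParallelHashTableAlloc(hashtable, batchno);

    /* Point the local HashJoinTable at the new bucket array. */
    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
}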

◆ ExecParallelHashTableInsert()

void ExecParallelHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1685 of file nodeHash.c.

References Assert, BarrierPhase(), HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), ExecParallelHashTuplePrealloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, ParallelHashJoinBatchAccessor::inner_tuples, MAXALIGN, ParallelHashJoinBatchAccessor::ntuples, HashJoinTableData::parallel_state, PHJ_BUILD_HASHING_INNER, ParallelHashJoinBatchAccessor::preallocated, HashJoinTableData::shared, sts_puttuple(), and MinimalTupleData::t_len.

Referenced by MultiExecParallelHash().

1688 {
1689  bool shouldFree;
1690  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1691  dsa_pointer shared;
1692  int bucketno;
1693  int batchno;
1694 
1695 retry:
1696  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1697 
1698  if (batchno == 0)
1699  {
1700  HashJoinTuple hashTuple;
1701 
1702  /* Try to load it into memory. */
1703  Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
1704  PHJ_BUILD_HASHING_INNER);
1705  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1706  HJTUPLE_OVERHEAD + tuple->t_len,
1707  &shared);
1708  if (hashTuple == NULL)
1709  goto retry;
1710 
1711  /* Store the hash value in the HashJoinTuple header. */
1712  hashTuple->hashvalue = hashvalue;
1713  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1714 
1715  /* Push it onto the front of the bucket's list */
1716  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1717  hashTuple, shared);
1718  }
1719  else
1720  {
1721  size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1722 
1723  Assert(batchno > 0);
1724 
1725  /* Try to preallocate space in the batch if necessary. */
1726  if (hashtable->batches[batchno].preallocated < tuple_size)
1727  {
1728  if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
1729  goto retry;
1730  }
1731 
1732  Assert(hashtable->batches[batchno].preallocated >= tuple_size);
1733  hashtable->batches[batchno].preallocated -= tuple_size;
1734  sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
1735  tuple);
1736  }
1737  ++hashtable->batches[batchno].ntuples;
1738 
1739  if (shouldFree)
1740  heap_free_minimal_tuple(tuple);
1741 }
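
During the build phase, MultiExecParallelHash() drives a loop of this shape: pull a tuple from the Hash node's subplan, hash it, and insert it. A minimal sketch under those assumptions (the helper name is hypothetical, and keep_nulls is hard-wired to false here rather than taken from the hash table):

#include "postgres.h"
#include "executor/executor.h"
#include "executor/nodeHash.h"

/* Hypothetical sketch of the build-phase insert loop. */
static void
build_parallel_hash(HashState *node)
{
    PlanState  *subplan = outerPlanState(node);
    ExprContext *econtext = node->ps.ps_ExprContext;
    HashJoinTable hashtable = node->hashtable;
    TupleTableSlot *slot;
    uint32      hashvalue;

    for (;;)
    {
        slot = ExecProcNode(subplan);
        if (TupIsNull(slot))
            break;

        /* Hash keys are evaluated against the subplan's tuple. */
        econtext->ecxt_outertuple = slot;
        if (ExecHashGetHashValue(hashtable, econtext, node->hashkeys,
                                 false, /* inner-relation tuple */
                                 false, /* keep_nulls: assumption */
                                 &hashvalue))
            ExecParallelHashTableInsert(hashtable, slot, hashvalue);
    }
}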

◆ ExecParallelHashTableInsertCurrentBatch()

void ExecParallelHashTableInsertCurrentBatch ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1750 of file nodeHash.c.

References Assert, HashJoinTableData::buckets, HashJoinTableData::curbatch, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::shared, and MinimalTupleData::t_len.

Referenced by ExecParallelHashJoinNewBatch().

1753 {
1754  bool shouldFree;
1755  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1756  HashJoinTuple hashTuple;
1757  dsa_pointer shared;
1758  int batchno;
1759  int bucketno;
1760 
1761  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1762  Assert(batchno == hashtable->curbatch);
1763  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1764  HJTUPLE_OVERHEAD + tuple->t_len,
1765  &shared);
1766  hashTuple->hashvalue = hashvalue;
1767  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1768  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1769  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1770  hashTuple, shared);
1771 
1772  if (shouldFree)
1773  heap_free_minimal_tuple(tuple);
1774 }

◆ ExecParallelHashTableSetCurrentBatch()

void ExecParallelHashTableSetCurrentBatch ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3261 of file nodeHash.c.

References HashJoinTableData::area, Assert, ParallelHashJoinBatchAccessor::at_least_one_chunk, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, HashJoinTableData::buckets, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, HashJoinTableData::current_chunk_shared, dsa_get_address(), InvalidDsaPointer, HashJoinTableData::log2_nbuckets, my_log2(), ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::parallel_state, ParallelHashJoinBatchAccessor::shared, and HashJoinTableData::shared.

Referenced by ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinNewBatch(), and MultiExecParallelHash().

3262 {
3263  Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer);
3264 
3265  hashtable->curbatch = batchno;
3266  hashtable->buckets.shared = (dsa_pointer_atomic *)
3267  dsa_get_address(hashtable->area,
3268  hashtable->batches[batchno].shared->buckets);
3269  hashtable->nbuckets = hashtable->parallel_state->nbuckets;
3270  hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
3271  hashtable->current_chunk = NULL;
3272  hashtable->current_chunk_shared = InvalidDsaPointer;
3273  hashtable->batches[batchno].at_least_one_chunk = false;
3274 }
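
When a later batch is reloaded from its shared tuplestore (as ExecParallelHashJoinNewBatch() does), the batch is made current once and every spooled tuple is reinserted with the *CurrentBatch variant. A minimal sketch of that pattern; fetch_next_spooled_tuple() is a hypothetical stand-in for the shared-tuplestore scan the real code performs:

#include "postgres.h"
#include "executor/nodeHash.h"

/* Hypothetical helper (not part of PostgreSQL): yields one spooled tuple. */
static bool fetch_next_spooled_tuple(HashJoinTable hashtable, int batchno,
                                     TupleTableSlot **slot, uint32 *hashvalue);

/* Hypothetical sketch: rebuild a later batch's in-memory hash table. */
static void
reload_batch(HashJoinTable hashtable, int batchno)
{
    TupleTableSlot *slot;
    uint32      hashvalue;

    /* Attach the local table to the batch's shared bucket array once. */
    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);

    /* Re-insert every tuple that was spooled for this batch earlier. */
    while (fetch_next_spooled_tuple(hashtable, batchno, &slot, &hashvalue))
        ExecParallelHashTableInsertCurrentBatch(hashtable, slot, hashvalue);
}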

◆ ExecParallelScanHashBucket()

bool ExecParallelScanHashBucket ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 1995 of file nodeHash.c.

References ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, and HJTUPLE_MINTUPLE.

Referenced by ExecHashJoinImpl().

1997 {
1998  ExprState *hjclauses = hjstate->hashclauses;
1999  HashJoinTable hashtable = hjstate->hj_HashTable;
2000  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2001  uint32 hashvalue = hjstate->hj_CurHashValue;
2002 
2003  /*
2004  * hj_CurTuple is the address of the tuple last returned from the current
2005  * bucket, or NULL if it's time to start scanning a new bucket.
2006  */
2007  if (hashTuple != NULL)
2008  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2009  else
2010  hashTuple = ExecParallelHashFirstTuple(hashtable,
2011  hjstate->hj_CurBucketNo);
2012 
2013  while (hashTuple != NULL)
2014  {
2015  if (hashTuple->hashvalue == hashvalue)
2016  {
2017  TupleTableSlot *inntuple;
2018 
2019  /* insert hashtable's tuple into exec slot so ExecQual sees it */
2020  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2021  hjstate->hj_HashTupleSlot,
2022  false); /* do not pfree */
2023  econtext->ecxt_innertuple = inntuple;
2024 
2025  if (ExecQualAndReset(hjclauses, econtext))
2026  {
2027  hjstate->hj_CurTuple = hashTuple;
2028  return true;
2029  }
2030  }
2031 
2032  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2033  }
2034 
2035  /*
2036  * no match
2037  */
2038  return false;
2039 }

◆ ExecPrepHashTableForUnmatched()

void ExecPrepHashTableForUnmatched ( HashJoinState hjstate)

Definition at line 2046 of file nodeHash.c.

References HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, and HashJoinState::hj_CurTuple.

Referenced by ExecHashJoinImpl().

2047 {
2048  /*----------
2049  * During this scan we use the HashJoinState fields as follows:
2050  *
2051  * hj_CurBucketNo: next regular bucket to scan
2052  * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
2053  * hj_CurTuple: last tuple returned, or NULL to start next bucket
2054  *----------
2055  */
2056  hjstate->hj_CurBucketNo = 0;
2057  hjstate->hj_CurSkewBucketNo = 0;
2058  hjstate->hj_CurTuple = NULL;
2059 }

◆ ExecReScanHash()

void ExecReScanHash ( HashState node)

Definition at line 2196 of file nodeHash.c.

References PlanState::chgParam, ExecReScan(), PlanState::lefttree, and HashState::ps.

Referenced by ExecReScan().

2197 {
2198  /*
2199  * if chgParam of subnode is not null then plan will be re-scanned by
2200  * first ExecProcNode.
2201  */
2202  if (node->ps.lefttree->chgParam == NULL)
2203  ExecReScan(node->ps.lefttree);
2204 }
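
A parent node restarts its Hash child through the generic ExecReScan(), which dispatches here; the hash table itself is dropped and rebuilt by the parent HashJoin, not by this function. A minimal, hypothetical sketch:

#include "postgres.h"
#include "executor/executor.h"
#include "executor/nodeHash.h"

/* Hypothetical sketch: restart the Hash child via the generic dispatcher. */
static void
restart_hash_child(HashState *node)
{
    /*
     * Ends up in ExecReScanHash(); the subplan is rescanned immediately only
     * if it has no changed parameters, otherwise lazily on its next
     * ExecProcNode() call.
     */
    ExecReScan(&node->ps);
}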

◆ ExecScanHashBucket()

bool ExecScanHashBucket ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 1934 of file nodeHash.c.

References HashJoinTableData::buckets, ExprContext::ecxt_innertuple, ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, INVALID_SKEW_BUCKET_NO, HashJoinTupleData::next, HashJoinTableData::skewBucket, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

1936 {
1937  ExprState *hjclauses = hjstate->hashclauses;
1938  HashJoinTable hashtable = hjstate->hj_HashTable;
1939  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1940  uint32 hashvalue = hjstate->hj_CurHashValue;
1941 
1942  /*
1943  * hj_CurTuple is the address of the tuple last returned from the current
1944  * bucket, or NULL if it's time to start scanning a new bucket.
1945  *
1946  * If the tuple hashed to a skew bucket then scan the skew bucket
1947  * otherwise scan the standard hashtable bucket.
1948  */
1949  if (hashTuple != NULL)
1950  hashTuple = hashTuple->next.unshared;
1951  else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
1952  hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
1953  else
1954  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
1955 
1956  while (hashTuple != NULL)
1957  {
1958  if (hashTuple->hashvalue == hashvalue)
1959  {
1960  TupleTableSlot *inntuple;
1961 
1962  /* insert hashtable's tuple into exec slot so ExecQual sees it */
1963  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
1964  hjstate->hj_HashTupleSlot,
1965  false); /* do not pfree */
1966  econtext->ecxt_innertuple = inntuple;
1967 
1968  if (ExecQualAndReset(hjclauses, econtext))
1969  {
1970  hjstate->hj_CurTuple = hashTuple;
1971  return true;
1972  }
1973  }
1974 
1975  hashTuple = hashTuple->next.unshared;
1976  }
1977 
1978  /*
1979  * no match
1980  */
1981  return false;
1982 }
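
A caller such as ExecHashJoinImpl() invokes this in a loop to visit every match for the current probe tuple. A minimal sketch of that loop, assuming hj_CurHashValue, hj_CurBucketNo, hj_CurSkewBucketNo and hj_CurTuple were set up beforehand (via ExecHashGetBucketAndBatch() and ExecHashGetSkewBucket()); the helper is hypothetical and merely counts matches:

#include "postgres.h"
#include "executor/executor.h"
#include "executor/nodeHash.h"

/* Hypothetical helper: count the matches for the current probe tuple. */
static int
count_matches_for_outer(HashJoinState *hjstate, ExprContext *econtext)
{
    int         nmatches = 0;

    while (ExecScanHashBucket(hjstate, econtext))
    {
        /*
         * hjstate->hj_CurTuple now points at a bucket entry whose hash value
         * and join quals matched; a real join would project and return a
         * joined tuple here.
         */
        nmatches++;
    }
    return nmatches;
}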

◆ ExecScanHashTableForUnmatched()

bool ExecScanHashTableForUnmatched ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 2070 of file nodeHash.c.

References HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, ResetExprContext, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

2071 {
2072  HashJoinTable hashtable = hjstate->hj_HashTable;
2073  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2074 
2075  for (;;)
2076  {
2077  /*
2078  * hj_CurTuple is the address of the tuple last returned from the
2079  * current bucket, or NULL if it's time to start scanning a new
2080  * bucket.
2081  */
2082  if (hashTuple != NULL)
2083  hashTuple = hashTuple->next.unshared;
2084  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2085  {
2086  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
2087  hjstate->hj_CurBucketNo++;
2088  }
2089  else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
2090  {
2091  int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
2092 
2093  hashTuple = hashtable->skewBucket[j]->tuples;
2094  hjstate->hj_CurSkewBucketNo++;
2095  }
2096  else
2097  break; /* finished all buckets */
2098 
2099  while (hashTuple != NULL)
2100  {
2101  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2102  {
2103  TupleTableSlot *inntuple;
2104 
2105  /* insert hashtable's tuple into exec slot */
2106  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2107  hjstate->hj_HashTupleSlot,
2108  false); /* do not pfree */
2109  econtext->ecxt_innertuple = inntuple;
2110 
2111  /*
2112  * Reset temp memory each time; although this function doesn't
2113  * do any qual eval, the caller will, so let's keep it
2114  * parallel to ExecScanHashBucket.
2115  */
2116  ResetExprContext(econtext);
2117 
2118  hjstate->hj_CurTuple = hashTuple;
2119  return true;
2120  }
2121 
2122  hashTuple = hashTuple->next.unshared;
2123  }
2124 
2125  /* allow this loop to be cancellable */
2126  CHECK_FOR_INTERRUPTS();
2127  }
2128 
2129  /*
2130  * no more unmatched tuples
2131  */
2132  return false;
2133 }
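
For RIGHT and FULL joins, ExecHashJoinImpl() resets the cursor once with ExecPrepHashTableForUnmatched() and then drains the never-matched inner tuples with this function. A minimal, hypothetical sketch of that pass:

#include "postgres.h"
#include "executor/executor.h"
#include "executor/nodeHash.h"

/* Hypothetical helper: count inner tuples that never found a join partner. */
static int
count_unmatched_inner(HashJoinState *hjstate, ExprContext *econtext)
{
    int         nunmatched = 0;

    ExecPrepHashTableForUnmatched(hjstate);
    while (ExecScanHashTableForUnmatched(hjstate, econtext))
    {
        /*
         * econtext->ecxt_innertuple holds an unmatched inner tuple; a real
         * RIGHT/FULL join would emit it null-extended on the outer side.
         */
        nunmatched++;
    }
    return nunmatched;
}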

◆ ExecShutdownHash()

void ExecShutdownHash ( HashState node)

Definition at line 2644 of file nodeHash.c.

References ExecHashAccumInstrumentation(), HashState::hashtable, HashState::hinstrument, PlanState::instrument, palloc0(), and HashState::ps.

Referenced by ExecShutdownNode().

2645 {
2646  /* Allocate save space if EXPLAIN'ing and we didn't do so already */
2647  if (node->ps.instrument && !node->hinstrument)
2648  node->hinstrument = (HashInstrumentation *)
2649  palloc0(sizeof(HashInstrumentation));
2650  /* Now accumulate data for the current (final) hash table */
2651  if (node->hinstrument && node->hashtable)
2652  ExecHashAccumInstrumentation(node->hinstrument, node->hashtable);
2653 }
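
Once ExecShutdownHash() has accumulated the final numbers, EXPLAIN-style code reads them back from node->hinstrument, much as explain.c's show_hash_info() does. A minimal sketch under that assumption (the helper name is hypothetical):

#include "postgres.h"
#include "executor/nodeHash.h"

/* Hypothetical helper: log the statistics gathered for EXPLAIN ANALYZE. */
static void
report_hash_stats(HashState *node)
{
    HashInstrumentation *hi = node->hinstrument;

    if (hi == NULL)
        return;                 /* node was not instrumented */

    elog(DEBUG1,
         "hash: %d buckets (originally %d), %d batches (originally %d), "
         "peak memory %zu bytes",
         hi->nbuckets, hi->nbuckets_original,
         hi->nbatch, hi->nbatch_original,
         hi->space_peak);
}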

◆ MultiExecHash()

Node* MultiExecHash ( HashState node)

Definition at line 106 of file nodeHash.c.

References HashState::hashtable, InstrStartNode(), InstrStopNode(), PlanState::instrument, MultiExecParallelHash(), MultiExecPrivateHash(), HashState::parallel_state, HashJoinTableData::partialTuples, and HashState::ps.

Referenced by MultiExecProcNode().

107 {
108  /* must provide our own instrumentation support */
109  if (node->ps.instrument)
110  InstrStartNode(node->ps.instrument);
111 
112  if (node->parallel_state != NULL)
113  MultiExecParallelHash(node);
114  else
115  MultiExecPrivateHash(node);
116 
117  /* must provide our own instrumentation support */
118  if (node->ps.instrument)
119  InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);
120 
121  /*
122  * We do not return the hash table directly because it's not a subtype of
123  * Node, and so would violate the MultiExecProcNode API. Instead, our
124  * parent Hashjoin node is expected to know how to fish it out of our node
125  * state. Ugly but not really worth cleaning up, since Hashjoin knows
126  * quite a bit more about Hash besides that.
127  */
128  return NULL;
129 }
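
A minimal sketch of the calling convention the comment above describes, assuming the parent already holds the child HashState (the helper name is hypothetical): run the node through MultiExecProcNode() and fetch the table out of the node's state.

#include "postgres.h"
#include "executor/executor.h"
#include "executor/nodeHash.h"

/* Hypothetical sketch: build the hash table and fetch it from the child. */
static HashJoinTable
build_and_fetch_hashtable(HashState *hashNode)
{
    /* Dispatches to MultiExecHash(); the return value is always NULL. */
    (void) MultiExecProcNode((PlanState *) hashNode);

    /* The finished table lives in the child's state, not the return value. */
    return hashNode->hashtable;
}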