PostgreSQL Source Code  git master
nodeHash.h File Reference
#include "access/parallel.h"
#include "nodes/execnodes.h"
Include dependency graph for nodeHash.h:
This graph shows which files directly or indirectly include this file:


Functions

HashState * ExecInitHash (Hash *node, EState *estate, int eflags)
 
Node * MultiExecHash (HashState *node)
 
void ExecEndHash (HashState *node)
 
void ExecReScanHash (HashState *node)
 
HashJoinTable ExecHashTableCreate (HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
 
void ExecParallelHashTableAlloc (HashJoinTable hashtable, int batchno)
 
void ExecHashTableDestroy (HashJoinTable hashtable)
 
void ExecHashTableDetach (HashJoinTable hashtable)
 
void ExecHashTableDetachBatch (HashJoinTable hashtable)
 
void ExecParallelHashTableSetCurrentBatch (HashJoinTable hashtable, int batchno)
 
void ExecHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsertCurrentBatch (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
bool ExecHashGetHashValue (HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
 
void ExecHashGetBucketAndBatch (HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
 
bool ExecScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecHashTableReset (HashJoinTable hashtable)
 
void ExecHashTableResetMatchFlags (HashJoinTable hashtable)
 
void ExecChooseHashTableSize (double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
 
int ExecHashGetSkewBucket (HashJoinTable hashtable, uint32 hashvalue)
 
void ExecHashEstimate (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeDSM (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeWorker (HashState *node, ParallelWorkerContext *pwcxt)
 
void ExecHashRetrieveInstrumentation (HashState *node)
 
void ExecShutdownHash (HashState *node)
 
void ExecHashAccumInstrumentation (HashInstrumentation *instrument, HashJoinTable hashtable)
 

Function Documentation

◆ ExecChooseHashTableSize()

void ExecChooseHashTableSize ( double  ntuples,
int  tupwidth,
bool  useskew,
bool  try_combined_hash_mem,
int  parallel_workers,
size_t *  space_allowed,
int *  numbuckets,
int *  numbatches,
int *  num_skew_mcvs 
)

Definition at line 668 of file nodeHash.c.

References Assert, ExecChooseHashTableSize(), get_hash_memory_limit(), HJTUPLE_OVERHEAD, Max, MAXALIGN, MaxAllocSize, Min, NTUP_PER_BUCKET, pg_nextpower2_32(), pg_nextpower2_size_t, pg_prevpower2_size_t, SizeofMinimalTupleHeader, SKEW_BUCKET_OVERHEAD, and SKEW_HASH_MEM_PERCENT.

Referenced by ExecChooseHashTableSize(), ExecHashTableCreate(), and initial_cost_hashjoin().

675 {
676  int tupsize;
677  double inner_rel_bytes;
678  size_t hash_table_bytes;
679  size_t bucket_bytes;
680  size_t max_pointers;
681  int nbatch = 1;
682  int nbuckets;
683  double dbuckets;
684 
685  /* Force a plausible relation size if no info */
686  if (ntuples <= 0.0)
687  ntuples = 1000.0;
688 
689  /*
690  * Estimate tupsize based on footprint of tuple in hashtable... note this
691  * does not allow for any palloc overhead. The manipulations of spaceUsed
692  * don't count palloc overhead either.
693  */
694  tupsize = HJTUPLE_OVERHEAD +
695  MAXALIGN(SizeofMinimalTupleHeader) +
696  MAXALIGN(tupwidth);
697  inner_rel_bytes = ntuples * tupsize;
698 
699  /*
700  * Compute in-memory hashtable size limit from GUCs.
701  */
702  hash_table_bytes = get_hash_memory_limit();
703 
704  /*
705  * Parallel Hash tries to use the combined hash_mem of all workers to
706  * avoid the need to batch. If that won't work, it falls back to hash_mem
707  * per worker and tries to process batches in parallel.
708  */
709  if (try_combined_hash_mem)
710  {
711  /* Careful, this could overflow size_t */
712  double newlimit;
713 
714  newlimit = (double) hash_table_bytes * (double) (parallel_workers + 1);
715  newlimit = Min(newlimit, (double) SIZE_MAX);
716  hash_table_bytes = (size_t) newlimit;
717  }
718 
719  *space_allowed = hash_table_bytes;
720 
721  /*
722  * If skew optimization is possible, estimate the number of skew buckets
723  * that will fit in the memory allowed, and decrement the assumed space
724  * available for the main hash table accordingly.
725  *
726  * We make the optimistic assumption that each skew bucket will contain
727  * one inner-relation tuple. If that turns out to be low, we will recover
728  * at runtime by reducing the number of skew buckets.
729  *
730  * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
731  * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
732  * will round up to the next power of 2 and then multiply by 4 to reduce
733  * collisions.
734  */
735  if (useskew)
736  {
737  size_t bytes_per_mcv;
738  size_t skew_mcvs;
739 
740  /*----------
741  * Compute number of MCVs we could hold in hash_table_bytes
742  *
743  * Divisor is:
744  * size of a hash tuple +
745  * worst-case size of skewBucket[] per MCV +
746  * size of skewBucketNums[] entry +
747  * size of skew bucket struct itself
748  *----------
749  */
750  bytes_per_mcv = tupsize +
751  (8 * sizeof(HashSkewBucket *)) +
752  sizeof(int) +
753  SKEW_BUCKET_OVERHEAD;
754  skew_mcvs = hash_table_bytes / bytes_per_mcv;
755 
756  /*
757  * Now scale by SKEW_HASH_MEM_PERCENT (we do it in this order so as
758  * not to worry about size_t overflow in the multiplication)
759  */
760  skew_mcvs = (skew_mcvs * SKEW_HASH_MEM_PERCENT) / 100;
761 
762  /* Now clamp to integer range */
763  skew_mcvs = Min(skew_mcvs, INT_MAX);
764 
765  *num_skew_mcvs = (int) skew_mcvs;
766 
767  /* Reduce hash_table_bytes by the amount needed for the skew table */
768  if (skew_mcvs > 0)
769  hash_table_bytes -= skew_mcvs * bytes_per_mcv;
770  }
771  else
772  *num_skew_mcvs = 0;
773 
774  /*
775  * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
776  * memory is filled, assuming a single batch; but limit the value so that
777  * the pointer arrays we'll try to allocate do not exceed hash_table_bytes
778  * nor MaxAllocSize.
779  *
780  * Note that both nbuckets and nbatch must be powers of 2 to make
781  * ExecHashGetBucketAndBatch fast.
782  */
783  max_pointers = hash_table_bytes / sizeof(HashJoinTuple);
784  max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
785  /* If max_pointers isn't a power of 2, must round it down to one */
786  max_pointers = pg_prevpower2_size_t(max_pointers);
787 
788  /* Also ensure we avoid integer overflow in nbatch and nbuckets */
789  /* (this step is redundant given the current value of MaxAllocSize) */
790  max_pointers = Min(max_pointers, INT_MAX / 2 + 1);
791 
792  dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
793  dbuckets = Min(dbuckets, max_pointers);
794  nbuckets = (int) dbuckets;
795  /* don't let nbuckets be really small, though ... */
796  nbuckets = Max(nbuckets, 1024);
797  /* ... and force it to be a power of 2. */
798  nbuckets = pg_nextpower2_32(nbuckets);
799 
800  /*
801  * If there's not enough space to store the projected number of tuples and
802  * the required bucket headers, we will need multiple batches.
803  */
804  bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
805  if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
806  {
807  /* We'll need multiple batches */
808  size_t sbuckets;
809  double dbatch;
810  int minbatch;
811  size_t bucket_size;
812 
813  /*
814  * If Parallel Hash with combined hash_mem would still need multiple
815  * batches, we'll have to fall back to regular hash_mem budget.
816  */
817  if (try_combined_hash_mem)
818  {
819  ExecChooseHashTableSize(ntuples, tupwidth, useskew,
820  false, parallel_workers,
821  space_allowed,
822  numbuckets,
823  numbatches,
824  num_skew_mcvs);
825  return;
826  }
827 
828  /*
829  * Estimate the number of buckets we'll want to have when hash_mem is
830  * entirely full. Each bucket will contain a bucket pointer plus
831  * NTUP_PER_BUCKET tuples, whose projected size already includes
832  * overhead for the hash code, pointer to the next tuple, etc.
833  */
834  bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
835  sbuckets = pg_nextpower2_size_t(hash_table_bytes / bucket_size);
836  sbuckets = Min(sbuckets, max_pointers);
837  nbuckets = (int) sbuckets;
838  nbuckets = pg_nextpower2_32(nbuckets);
839  bucket_bytes = nbuckets * sizeof(HashJoinTuple);
840 
841  /*
842  * Buckets are simple pointers to hashjoin tuples, while tupsize
843  * includes the pointer, hash code, and MinimalTupleData. So buckets
844  * should never really exceed 25% of hash_mem (even for
845  * NTUP_PER_BUCKET=1); except maybe for hash_mem values that are not
846  * 2^N bytes, where we might get more because of doubling. So let's
847  * look for 50% here.
848  */
849  Assert(bucket_bytes <= hash_table_bytes / 2);
850 
851  /* Calculate required number of batches. */
852  dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
853  dbatch = Min(dbatch, max_pointers);
854  minbatch = (int) dbatch;
855  nbatch = pg_nextpower2_32(Max(2, minbatch));
856  }
857 
858  Assert(nbuckets > 0);
859  Assert(nbatch > 0);
860 
861  *numbuckets = nbuckets;
862  *numbatches = nbatch;
863 }
#define SKEW_HASH_MEM_PERCENT
Definition: hashjoin.h:110
#define SKEW_BUCKET_OVERHEAD
Definition: hashjoin.h:108
#define pg_nextpower2_size_t(num)
Definition: pg_bitutils.h:191
#define Min(x, y)
Definition: c.h:986
#define pg_prevpower2_size_t(num)
Definition: pg_bitutils.h:227
struct HashJoinTupleData * HashJoinTuple
Definition: execnodes.h:2001
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:146
#define MaxAllocSize
Definition: memutils.h:40
#define SizeofMinimalTupleHeader
Definition: htup_details.h:648
#define NTUP_PER_BUCKET
Definition: nodeHash.c:665
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:79
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
Definition: nodeHash.c:668
#define Max(x, y)
Definition: c.h:980
#define Assert(condition)
Definition: c.h:804
size_t get_hash_memory_limit(void)
Definition: nodeHash.c:3401
#define MAXALIGN(LEN)
Definition: c.h:757
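
A rough, standalone sketch of the sizing arithmetic above (illustrative only, not PostgreSQL code: next_pow2() stands in for pg_nextpower2_32(), the 64 MB figure stands in for get_hash_memory_limit(), and the multi-batch branch is simplified by ignoring the skew table and the bucket-header recomputation):

/* Illustrative re-implementation of the sizing math; not PostgreSQL code. */
#include <math.h>
#include <stdint.h>
#include <stdio.h>

#define NTUP_PER_BUCKET 1                 /* same constant as nodeHash.c */

static uint32_t next_pow2(uint32_t n)     /* stand-in for pg_nextpower2_32() */
{
    uint32_t p = 1;

    while (p < n)
        p <<= 1;
    return p;
}

int main(void)
{
    double   ntuples = 1e6;               /* planner row estimate */
    int      tupwidth = 56;               /* bytes per stored tuple, incl. overhead */
    size_t   hash_table_bytes = 64UL * 1024 * 1024;   /* assumed hash_mem limit */

    double   inner_rel_bytes = ntuples * tupwidth;
    uint32_t nbuckets = next_pow2((uint32_t) ceil(ntuples / NTUP_PER_BUCKET));
    size_t   bucket_bytes = sizeof(void *) * nbuckets;  /* one pointer per bucket */
    uint32_t nbatch = 1;

    if (inner_rel_bytes + bucket_bytes > (double) hash_table_bytes)
    {
        /* multiple batches: spread the inner relation over hash_mem-sized chunks */
        double dbatch = ceil(inner_rel_bytes / (double) hash_table_bytes);

        nbatch = next_pow2((uint32_t) (dbatch < 2 ? 2 : dbatch));
    }

    printf("nbuckets = %u, nbatch = %u\n", (unsigned) nbuckets, (unsigned) nbatch);
    return 0;
}

On a 64-bit build (compile with -lm) the example inputs stay within the limit and yield a single batch; raising ntuples or shrinking the limit pushes nbatch to the next power of 2 above inner_rel_bytes / hash_table_bytes.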

◆ ExecEndHash()

void ExecEndHash ( HashState *  node)

Definition at line 407 of file nodeHash.c.

References ExecEndNode(), ExecFreeExprContext(), outerPlan, outerPlanState, and HashState::ps.

Referenced by ExecEndNode().

408 {
409  PlanState *outerPlan;
410 
411  /*
412  * free exprcontext
413  */
414  ExecFreeExprContext(&node->ps);
415 
416  /*
417  * shut down the subplan
418  */
419  outerPlan = outerPlanState(node);
420  ExecEndNode(outerPlan);
421 }
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:556
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:650
#define outerPlanState(node)
Definition: execnodes.h:1062
PlanState ps
Definition: execnodes.h:2536
#define outerPlan(node)
Definition: plannodes.h:171

◆ ExecHashAccumInstrumentation()

void ExecHashAccumInstrumentation ( HashInstrumentation *  instrument,
HashJoinTable  hashtable 
)

Definition at line 2708 of file nodeHash.c.

References Max, HashJoinTableData::nbatch, HashInstrumentation::nbatch, HashJoinTableData::nbatch_original, HashInstrumentation::nbatch_original, HashJoinTableData::nbuckets, HashInstrumentation::nbuckets, HashJoinTableData::nbuckets_original, HashInstrumentation::nbuckets_original, HashInstrumentation::space_peak, and HashJoinTableData::spacePeak.

Referenced by ExecReScanHashJoin(), and ExecShutdownHash().

2710 {
2711  instrument->nbuckets = Max(instrument->nbuckets,
2712  hashtable->nbuckets);
2713  instrument->nbuckets_original = Max(instrument->nbuckets_original,
2714  hashtable->nbuckets_original);
2715  instrument->nbatch = Max(instrument->nbatch,
2716  hashtable->nbatch);
2717  instrument->nbatch_original = Max(instrument->nbatch_original,
2718  hashtable->nbatch_original);
2719  instrument->space_peak = Max(instrument->space_peak,
2720  hashtable->spacePeak);
2721 }
#define Max(x, y)
Definition: c.h:980

◆ ExecHashEstimate()

void ExecHashEstimate ( HashState *  node,
ParallelContext *  pcxt 
)

Definition at line 2591 of file nodeHash.c.

References add_size(), ParallelContext::estimator, PlanState::instrument, mul_size(), ParallelContext::nworkers, offsetof, HashState::ps, shm_toc_estimate_chunk, and shm_toc_estimate_keys.

Referenced by ExecParallelEstimate().

2592 {
2593  size_t size;
2594 
2595  /* don't need this if not instrumenting or no workers */
2596  if (!node->ps.instrument || pcxt->nworkers == 0)
2597  return;
2598 
2599  size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
2600  size = add_size(size, offsetof(SharedHashInfo, hinstrument));
2601  shm_toc_estimate_chunk(&pcxt->estimator, size);
2602  shm_toc_estimate_keys(&pcxt->estimator, 1);
2603 }
Instrumentation * instrument
Definition: execnodes.h:976
shm_toc_estimator estimator
Definition: parallel.h:42
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
PlanState ps
Definition: execnodes.h:2536
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
#define offsetof(type, field)
Definition: c.h:727

◆ ExecHashGetBucketAndBatch()

void ExecHashGetBucketAndBatch ( HashJoinTable  hashtable,
uint32  hashvalue,
int *  bucketno,
int *  batchno 
)

Definition at line 1919 of file nodeHash.c.

References HashJoinTableData::log2_nbuckets, HashJoinTableData::nbatch, HashJoinTableData::nbuckets, and pg_rotate_right32().

Referenced by ExecHashIncreaseNumBatches(), ExecHashIncreaseNumBuckets(), ExecHashJoinImpl(), ExecHashRemoveNextSkewBucket(), ExecHashTableInsert(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinPartitionOuter(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), ExecParallelHashTableInsert(), and ExecParallelHashTableInsertCurrentBatch().

1923 {
1924  uint32 nbuckets = (uint32) hashtable->nbuckets;
1925  uint32 nbatch = (uint32) hashtable->nbatch;
1926 
1927  if (nbatch > 1)
1928  {
1929  *bucketno = hashvalue & (nbuckets - 1);
1930  *batchno = pg_rotate_right32(hashvalue,
1931  hashtable->log2_nbuckets) & (nbatch - 1);
1932  }
1933  else
1934  {
1935  *bucketno = hashvalue & (nbuckets - 1);
1936  *batchno = 0;
1937  }
1938 }
static uint32 pg_rotate_right32(uint32 word, int n)
Definition: pg_bitutils.h:297
unsigned int uint32
Definition: c.h:441
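
Because both nbuckets and nbatch are powers of 2, the bucket comes from the low bits of the hash and the batch from the bits just above them, obtained by rotating the hash right by log2_nbuckets. A minimal standalone illustration (rotate_right32() is a stand-in for pg_rotate_right32(); all sizes are made up):

/* Standalone illustration of the bucket/batch extraction; not PostgreSQL code. */
#include <stdint.h>
#include <stdio.h>

static uint32_t rotate_right32(uint32_t w, int n)  /* stand-in for pg_rotate_right32() */
{
    return (w >> n) | (w << (32 - n));
}

int main(void)
{
    uint32_t hashvalue = 0xDEADBEEF;
    uint32_t nbuckets = 1024;           /* must be a power of 2 */
    uint32_t nbatch = 8;                /* must be a power of 2 */
    int      log2_nbuckets = 10;

    /* low bits select the bucket, the next bits (after rotation) the batch */
    uint32_t bucketno = hashvalue & (nbuckets - 1);
    uint32_t batchno = rotate_right32(hashvalue, log2_nbuckets) & (nbatch - 1);

    printf("bucket %u, batch %u\n", (unsigned) bucketno, (unsigned) batchno);
    return 0;
}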

◆ ExecHashGetHashValue()

bool ExecHashGetHashValue ( HashJoinTable  hashtable,
ExprContext *  econtext,
List *  hashkeys,
bool  outer_tuple,
bool  keep_nulls,
uint32 *  hashvalue 
)

Definition at line 1811 of file nodeHash.c.

References HashJoinTableData::collations, DatumGetUInt32, ExprContext::ecxt_per_tuple_memory, ExecEvalExpr(), FunctionCall1Coll(), HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, lfirst, MemoryContextSwitchTo(), HashJoinTableData::outer_hashfunctions, and ResetExprContext.

Referenced by ExecHashJoinOuterGetTuple(), ExecParallelHashJoinOuterGetTuple(), ExecParallelHashJoinPartitionOuter(), MultiExecParallelHash(), and MultiExecPrivateHash().

1817 {
1818  uint32 hashkey = 0;
1819  FmgrInfo *hashfunctions;
1820  ListCell *hk;
1821  int i = 0;
1822  MemoryContext oldContext;
1823 
1824  /*
1825  * We reset the eval context each time to reclaim any memory leaked in the
1826  * hashkey expressions.
1827  */
1828  ResetExprContext(econtext);
1829 
1830  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
1831 
1832  if (outer_tuple)
1833  hashfunctions = hashtable->outer_hashfunctions;
1834  else
1835  hashfunctions = hashtable->inner_hashfunctions;
1836 
1837  foreach(hk, hashkeys)
1838  {
1839  ExprState *keyexpr = (ExprState *) lfirst(hk);
1840  Datum keyval;
1841  bool isNull;
1842 
1843  /* rotate hashkey left 1 bit at each step */
1844  hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
1845 
1846  /*
1847  * Get the join attribute value of the tuple
1848  */
1849  keyval = ExecEvalExpr(keyexpr, econtext, &isNull);
1850 
1851  /*
1852  * If the attribute is NULL, and the join operator is strict, then
1853  * this tuple cannot pass the join qual so we can reject it
1854  * immediately (unless we're scanning the outside of an outer join, in
1855  * which case we must not reject it). Otherwise we act like the
1856  * hashcode of NULL is zero (this will support operators that act like
1857  * IS NOT DISTINCT, though not any more-random behavior). We treat
1858  * the hash support function as strict even if the operator is not.
1859  *
1860  * Note: currently, all hashjoinable operators must be strict since
1861  * the hash index AM assumes that. However, it takes so little extra
1862  * code here to allow non-strict that we may as well do it.
1863  */
1864  if (isNull)
1865  {
1866  if (hashtable->hashStrict[i] && !keep_nulls)
1867  {
1868  MemoryContextSwitchTo(oldContext);
1869  return false; /* cannot match */
1870  }
1871  /* else, leave hashkey unmodified, equivalent to hashcode 0 */
1872  }
1873  else
1874  {
1875  /* Compute the hash function */
1876  uint32 hkey;
1877 
1878  hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], hashtable->collations[i], keyval));
1879  hashkey ^= hkey;
1880  }
1881 
1882  i++;
1883  }
1884 
1885  MemoryContextSwitchTo(oldContext);
1886 
1887  *hashvalue = hashkey;
1888  return true;
1889 }
#define DatumGetUInt32(X)
Definition: postgres.h:530
Definition: fmgr.h:56
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:234
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
FmgrInfo * inner_hashfunctions
Definition: hashjoin.h:338
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:316
unsigned int uint32
Definition: c.h:441
FmgrInfo * outer_hashfunctions
Definition: hashjoin.h:337
uintptr_t Datum
Definition: postgres.h:411
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1128
#define lfirst(lc)
Definition: pg_list.h:169
int i
bool * hashStrict
Definition: hashjoin.h:339
#define ResetExprContext(econtext)
Definition: executor.h:527
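
The per-column hash values are combined exactly as in the loop above: rotate the running key left one bit, then XOR in the next column's hash. A minimal standalone illustration with made-up per-column hash values:

/* Standalone sketch of the multi-column hash combination; not PostgreSQL code. */
#include <stdint.h>
#include <stdio.h>

int main(void)
{
    /* pretend per-column hash values, as a datatype hash function would return */
    uint32_t column_hashes[] = { 0x12345678, 0x9ABCDEF0, 0x0F0F0F0F };
    uint32_t hashkey = 0;

    for (int i = 0; i < 3; i++)
    {
        /* rotate the running key left one bit, then fold in the next column */
        hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
        hashkey ^= column_hashes[i];
    }

    printf("combined hash = 0x%08X\n", (unsigned) hashkey);
    return 0;
}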

◆ ExecHashGetSkewBucket()

int ExecHashGetSkewBucket ( HashJoinTable  hashtable,
uint32  hashvalue 
)

Definition at line 2386 of file nodeHash.c.

References HashSkewBucket::hashvalue, INVALID_SKEW_BUCKET_NO, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, and HashJoinTableData::skewEnabled.

Referenced by ExecHashJoinImpl(), and MultiExecPrivateHash().

2387 {
2388  int bucket;
2389 
2390  /*
2391  * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
2392  * particular, this happens after the initial batch is done).
2393  */
2394  if (!hashtable->skewEnabled)
2395  return INVALID_SKEW_BUCKET_NO;
2396 
2397  /*
2398  * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
2399  */
2400  bucket = hashvalue & (hashtable->skewBucketLen - 1);
2401 
2402  /*
2403  * While we have not hit a hole in the hashtable and have not hit the
2404  * desired bucket, we have collided with some other hash value, so try the
2405  * next bucket location.
2406  */
2407  while (hashtable->skewBucket[bucket] != NULL &&
2408  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
2409  bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
2410 
2411  /*
2412  * Found the desired bucket?
2413  */
2414  if (hashtable->skewBucket[bucket] != NULL)
2415  return bucket;
2416 
2417  /*
2418  * There must not be any hashtable entry for this hash value.
2419  */
2420  return INVALID_SKEW_BUCKET_NO;
2421 }
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:109
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
uint32 hashvalue
Definition: hashjoin.h:104
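
The skew table is an open-addressing hash table: mask the hash to find the home slot, then probe forward until either the matching hash value or an empty slot is found. A standalone sketch of that lookup with made-up slot contents (it assumes the table is never completely full, which the real code guarantees):

/* Standalone sketch of the open-addressing lookup used for skew buckets. */
#include <stdint.h>
#include <stdio.h>

#define SKEW_LEN 8                  /* power of 2, like skewBucketLen */
#define INVALID (-1)                /* stands in for INVALID_SKEW_BUCKET_NO */

typedef struct { int used; uint32_t hashvalue; } Slot;

static int lookup(const Slot *slots, uint32_t hashvalue)
{
    int bucket = hashvalue & (SKEW_LEN - 1);        /* modulo by masking */

    /* advance past collisions until we find the value or an empty slot */
    while (slots[bucket].used && slots[bucket].hashvalue != hashvalue)
        bucket = (bucket + 1) & (SKEW_LEN - 1);

    return slots[bucket].used ? bucket : INVALID;
}

int main(void)
{
    Slot slots[SKEW_LEN] = {0};

    slots[3] = (Slot) { 1, 0x0000002B };            /* 0x2B & 7 == 3 */

    printf("found at %d, missing -> %d\n",
           lookup(slots, 0x0000002B), lookup(slots, 0x00000017));
    return 0;
}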

◆ ExecHashInitializeDSM()

void ExecHashInitializeDSM ( HashState *  node,
ParallelContext *  pcxt 
)

Definition at line 2610 of file nodeHash.c.

References PlanState::instrument, SharedHashInfo::num_workers, ParallelContext::nworkers, offsetof, PlanState::plan, Plan::plan_node_id, HashState::ps, HashState::shared_info, shm_toc_allocate(), shm_toc_insert(), and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

2611 {
2612  size_t size;
2613 
2614  /* don't need this if not instrumenting or no workers */
2615  if (!node->ps.instrument || pcxt->nworkers == 0)
2616  return;
2617 
2618  size = offsetof(SharedHashInfo, hinstrument) +
2619  pcxt->nworkers * sizeof(HashInstrumentation);
2620  node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
2621 
2622  /* Each per-worker area must start out as zeroes. */
2623  memset(node->shared_info, 0, size);
2624 
2625  node->shared_info->num_workers = pcxt->nworkers;
2626  shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
2627  node->shared_info);
2628 }
Instrumentation * instrument
Definition: execnodes.h:976
struct HashInstrumentation HashInstrumentation
int plan_node_id
Definition: plannodes.h:140
SharedHashInfo * shared_info
Definition: execnodes.h:2546
PlanState ps
Definition: execnodes.h:2536
Plan * plan
Definition: execnodes.h:966
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
#define offsetof(type, field)
Definition: c.h:727
shm_toc * toc
Definition: parallel.h:45

◆ ExecHashInitializeWorker()

void ExecHashInitializeWorker ( HashState *  node,
ParallelWorkerContext *  pwcxt 
)

Definition at line 2635 of file nodeHash.c.

References SharedHashInfo::hinstrument, HashState::hinstrument, PlanState::instrument, ParallelWorkerNumber, PlanState::plan, Plan::plan_node_id, HashState::ps, shm_toc_lookup(), and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().

2636 {
2637  SharedHashInfo *shared_info;
2638 
2639  /* don't need this if not instrumenting */
2640  if (!node->ps.instrument)
2641  return;
2642 
2643  /*
2644  * Find our entry in the shared area, and set up a pointer to it so that
2645  * we'll accumulate stats there when shutting down or rebuilding the hash
2646  * table.
2647  */
2648  shared_info = (SharedHashInfo *)
2649  shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
2650  node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
2651 }
Instrumentation * instrument
Definition: execnodes.h:976
int plan_node_id
Definition: plannodes.h:140
int ParallelWorkerNumber
Definition: parallel.c:112
PlanState ps
Definition: execnodes.h:2536
HashInstrumentation * hinstrument
Definition: execnodes.h:2553
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
Definition: execnodes.h:2527
Plan * plan
Definition: execnodes.h:966
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232

◆ ExecHashRetrieveInstrumentation()

void ExecHashRetrieveInstrumentation ( HashState *  node)

Definition at line 2677 of file nodeHash.c.

References SharedHashInfo::num_workers, offsetof, palloc(), and HashState::shared_info.

Referenced by ExecParallelRetrieveInstrumentation().

2678 {
2679  SharedHashInfo *shared_info = node->shared_info;
2680  size_t size;
2681 
2682  if (shared_info == NULL)
2683  return;
2684 
2685  /* Replace node->shared_info with a copy in backend-local memory. */
2686  size = offsetof(SharedHashInfo, hinstrument) +
2687  shared_info->num_workers * sizeof(HashInstrumentation);
2688  node->shared_info = palloc(size);
2689  memcpy(node->shared_info, shared_info, size);
2690 }
struct HashInstrumentation HashInstrumentation
SharedHashInfo * shared_info
Definition: execnodes.h:2546
void * palloc(Size size)
Definition: mcxt.c:1062
#define offsetof(type, field)
Definition: c.h:727

◆ ExecHashTableCreate()

HashJoinTable ExecHashTableCreate ( HashState *  state,
List *  hashOperators,
List *  hashCollations,
bool  keepNulls 
)

Definition at line 431 of file nodeHash.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, HashJoinTableData::area, Assert, BarrierArriveAndWait(), BarrierAttach(), BarrierPhase(), HashJoinTableData::batchCxt, HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, HashJoinTableData::chunks, HashJoinTableData::collations, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, CurrentMemoryContext, elog, ERROR, EState::es_query_dsa, ExecChooseHashTableSize(), ExecHashBuildSkewHash(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashTableAlloc(), fmgr_info(), forboth, get_op_hash_functions(), HashJoinTableData::growEnabled, ParallelHashJoinState::growth, HashJoinTableData::hashCxt, HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, HashJoinTableData::innerBatchFile, HashJoinTableData::keepNulls, lfirst_oid, list_length(), HashJoinTableData::log2_nbuckets, HashJoinTableData::log2_nbuckets_optimal, MemoryContextSwitchTo(), my_log2(), ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, HashJoinTableData::nbatch_original, HashJoinTableData::nbatch_outstart, ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, HashJoinTableData::nbuckets_original, ParallelHashJoinState::nparticipants, HashJoinTableData::nSkewBuckets, OidIsValid, op_strict(), HashJoinTableData::outer_hashfunctions, HashJoinTableData::outerBatchFile, outerPlan, palloc(), palloc0(), Plan::parallel_aware, HashJoinTableData::parallel_state, HashState::parallel_state, HashJoinTableData::partialTuples, PHJ_BUILD_ELECTING, PHJ_GROWTH_OK, PlanState::plan, Hash::plan, Plan::plan_rows, Plan::plan_width, PrepareTempTablespaces(), printf, HashState::ps, Hash::rows_total, SKEW_HASH_MEM_PERCENT, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, HashJoinTableData::skewBucketNums, HashJoinTableData::skewEnabled, Hash::skewTable, HashJoinTableData::skewTuples, ParallelHashJoinState::space_allowed, HashJoinTableData::spaceAllowed, HashJoinTableData::spaceAllowedSkew, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, PlanState::state, HashJoinTableData::totalTuples, HashJoinTableData::unshared, and WAIT_EVENT_HASH_BUILD_ELECT.

Referenced by ExecHashJoinImpl().

432 {
433  Hash *node;
434  HashJoinTable hashtable;
435  Plan *outerNode;
436  size_t space_allowed;
437  int nbuckets;
438  int nbatch;
439  double rows;
440  int num_skew_mcvs;
441  int log2_nbuckets;
442  int nkeys;
443  int i;
444  ListCell *ho;
445  ListCell *hc;
446  MemoryContext oldcxt;
447 
448  /*
449  * Get information about the size of the relation to be hashed (it's the
450  * "outer" subtree of this node, but the inner relation of the hashjoin).
451  * Compute the appropriate size of the hash table.
452  */
453  node = (Hash *) state->ps.plan;
454  outerNode = outerPlan(node);
455 
456  /*
457  * If this is shared hash table with a partial plan, then we can't use
458  * outerNode->plan_rows to estimate its size. We need an estimate of the
459  * total number of rows across all copies of the partial plan.
460  */
461  rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
462 
463  ExecChooseHashTableSize(rows, outerNode->plan_width,
464  OidIsValid(node->skewTable),
465  state->parallel_state != NULL,
466  state->parallel_state != NULL ?
467  state->parallel_state->nparticipants - 1 : 0,
468  &space_allowed,
469  &nbuckets, &nbatch, &num_skew_mcvs);
470 
471  /* nbuckets must be a power of 2 */
472  log2_nbuckets = my_log2(nbuckets);
473  Assert(nbuckets == (1 << log2_nbuckets));
474 
475  /*
476  * Initialize the hash table control block.
477  *
478  * The hashtable control block is just palloc'd from the executor's
479  * per-query memory context. Everything else should be kept inside the
480  * subsidiary hashCxt or batchCxt.
481  */
482  hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
483  hashtable->nbuckets = nbuckets;
484  hashtable->nbuckets_original = nbuckets;
485  hashtable->nbuckets_optimal = nbuckets;
486  hashtable->log2_nbuckets = log2_nbuckets;
487  hashtable->log2_nbuckets_optimal = log2_nbuckets;
488  hashtable->buckets.unshared = NULL;
489  hashtable->keepNulls = keepNulls;
490  hashtable->skewEnabled = false;
491  hashtable->skewBucket = NULL;
492  hashtable->skewBucketLen = 0;
493  hashtable->nSkewBuckets = 0;
494  hashtable->skewBucketNums = NULL;
495  hashtable->nbatch = nbatch;
496  hashtable->curbatch = 0;
497  hashtable->nbatch_original = nbatch;
498  hashtable->nbatch_outstart = nbatch;
499  hashtable->growEnabled = true;
500  hashtable->totalTuples = 0;
501  hashtable->partialTuples = 0;
502  hashtable->skewTuples = 0;
503  hashtable->innerBatchFile = NULL;
504  hashtable->outerBatchFile = NULL;
505  hashtable->spaceUsed = 0;
506  hashtable->spacePeak = 0;
507  hashtable->spaceAllowed = space_allowed;
508  hashtable->spaceUsedSkew = 0;
509  hashtable->spaceAllowedSkew =
510  hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
511  hashtable->chunks = NULL;
512  hashtable->current_chunk = NULL;
513  hashtable->parallel_state = state->parallel_state;
514  hashtable->area = state->ps.state->es_query_dsa;
515  hashtable->batches = NULL;
516 
517 #ifdef HJDEBUG
518  printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
519  hashtable, nbatch, nbuckets);
520 #endif
521 
522  /*
523  * Create temporary memory contexts in which to keep the hashtable working
524  * storage. See notes in executor/hashjoin.h.
525  */
526  hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
527  "HashTableContext",
528  ALLOCSET_DEFAULT_SIZES);
529 
530  hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
531  "HashBatchContext",
532  ALLOCSET_DEFAULT_SIZES);
533 
534  /* Allocate data that will live for the life of the hashjoin */
535 
536  oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
537 
538  /*
539  * Get info about the hash functions to be used for each hash key. Also
540  * remember whether the join operators are strict.
541  */
542  nkeys = list_length(hashOperators);
543  hashtable->outer_hashfunctions =
544  (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
545  hashtable->inner_hashfunctions =
546  (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
547  hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
548  hashtable->collations = (Oid *) palloc(nkeys * sizeof(Oid));
549  i = 0;
550  forboth(ho, hashOperators, hc, hashCollations)
551  {
552  Oid hashop = lfirst_oid(ho);
553  Oid left_hashfn;
554  Oid right_hashfn;
555 
556  if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
557  elog(ERROR, "could not find hash function for hash operator %u",
558  hashop);
559  fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
560  fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
561  hashtable->hashStrict[i] = op_strict(hashop);
562  hashtable->collations[i] = lfirst_oid(hc);
563  i++;
564  }
565 
566  if (nbatch > 1 && hashtable->parallel_state == NULL)
567  {
568  /*
569  * allocate and initialize the file arrays in hashCxt (not needed for
570  * parallel case which uses shared tuplestores instead of raw files)
571  */
572  hashtable->innerBatchFile = (BufFile **)
573  palloc0(nbatch * sizeof(BufFile *));
574  hashtable->outerBatchFile = (BufFile **)
575  palloc0(nbatch * sizeof(BufFile *));
576  /* The files will not be opened until needed... */
577  /* ... but make sure we have temp tablespaces established for them */
578  PrepareTempTablespaces();
579  }
580 
581  MemoryContextSwitchTo(oldcxt);
582 
583  if (hashtable->parallel_state)
584  {
585  ParallelHashJoinState *pstate = hashtable->parallel_state;
586  Barrier *build_barrier;
587 
588  /*
589  * Attach to the build barrier. The corresponding detach operation is
590  * in ExecHashTableDetach. Note that we won't attach to the
591  * batch_barrier for batch 0 yet. We'll attach later and start it out
592  * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
593  * and then loaded while hashing (the standard hybrid hash join
594  * algorithm), and we'll coordinate that using build_barrier.
595  */
596  build_barrier = &pstate->build_barrier;
597  BarrierAttach(build_barrier);
598 
599  /*
600  * So far we have no idea whether there are any other participants,
601  * and if so, what phase they are working on. The only thing we care
602  * about at this point is whether someone has already created the
603  * SharedHashJoinBatch objects and the hash table for batch 0. One
604  * backend will be elected to do that now if necessary.
605  */
606  if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
607  BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
608  {
609  pstate->nbatch = nbatch;
610  pstate->space_allowed = space_allowed;
611  pstate->growth = PHJ_GROWTH_OK;
612 
613  /* Set up the shared state for coordinating batches. */
614  ExecParallelHashJoinSetUpBatches(hashtable, nbatch);
615 
616  /*
617  * Allocate batch 0's hash table up front so we can load it
618  * directly while hashing.
619  */
620  pstate->nbuckets = nbuckets;
621  ExecParallelHashTableAlloc(hashtable, 0);
622  }
623 
624  /*
625  * The next Parallel Hash synchronization point is in
626  * MultiExecParallelHash(), which will progress it all the way to
627  * PHJ_BUILD_DONE. The caller must not return control from this
628  * executor node between now and then.
629  */
630  }
631  else
632  {
633  /*
634  * Prepare context for the first-scan space allocations; allocate the
635  * hashbucket array therein, and set each bucket "empty".
636  */
637  MemoryContextSwitchTo(hashtable->batchCxt);
638 
639  hashtable->buckets.unshared = (HashJoinTuple *)
640  palloc0(nbuckets * sizeof(HashJoinTuple));
641 
642  /*
643  * Set up for skew optimization, if possible and there's a need for
644  * more than one batch. (In a one-batch join, there's no point in
645  * it.)
646  */
647  if (nbatch > 1)
648  ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
649 
650  MemoryContextSwitchTo(oldcxt);
651  }
652 
653  return hashtable;
654 }
int log2_nbuckets_optimal
Definition: hashjoin.h:291
Oid skewTable
Definition: plannodes.h:975
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2556
double skewTuples
Definition: hashjoin.h:320
#define SKEW_HASH_MEM_PERCENT
Definition: hashjoin.h:110
Definition: fmgr.h:56
struct dsa_area * es_query_dsa
Definition: execnodes.h:634
bool op_strict(Oid opno)
Definition: lsyscache.c:1448
#define AllocSetContextCreate
Definition: memutils.h:173
bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)
Definition: lsyscache.c:508
#define forboth(cell1, list1, cell2, list2)
Definition: pg_list.h:446
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
union HashJoinTableData::@93 buckets
#define printf(...)
Definition: port.h:222
FmgrInfo * inner_hashfunctions
Definition: hashjoin.h:338
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3116
EState * state
Definition: execnodes.h:968
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:710
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
Definition: nodeHash.c:2233
double partialTuples
Definition: hashjoin.h:319
dsa_area * area
Definition: hashjoin.h:356
int * skewBucketNums
Definition: hashjoin.h:308
#define ERROR
Definition: elog.h:46
void PrepareTempTablespaces(void)
Definition: tablespace.c:1332
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:126
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
BufFile ** outerBatchFile
Definition: hashjoin.h:330
Size spaceAllowedSkew
Definition: hashjoin.h:346
bool parallel_aware
Definition: plannodes.h:129
PlanState ps
Definition: execnodes.h:2536
#define PHJ_BUILD_ELECTING
Definition: hashjoin.h:257
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
MemoryContext batchCxt
Definition: hashjoin.h:349
struct HashJoinTableData * HashJoinTable
Definition: execnodes.h:2002
int my_log2(long num)
Definition: dynahash.c:1765
#define outerPlan(node)
Definition: plannodes.h:171
FmgrInfo * outer_hashfunctions
Definition: hashjoin.h:337
Cardinality rows_total
Definition: plannodes.h:979
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
int BarrierAttach(Barrier *barrier)
Definition: barrier.c:236
void * palloc0(Size size)
Definition: mcxt.c:1093
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
Plan * plan
Definition: execnodes.h:966
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
Definition: nodeHash.c:668
double totalTuples
Definition: hashjoin.h:318
int plan_width
Definition: plannodes.h:124
#define Assert(condition)
Definition: c.h:804
ParallelHashGrowth growth
Definition: hashjoin.h:241
BufFile ** innerBatchFile
Definition: hashjoin.h:329
static int list_length(const List *l)
Definition: pg_list.h:149
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:265
Cardinality plan_rows
Definition: plannodes.h:123
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
HashMemoryChunk chunks
Definition: hashjoin.h:352
Plan plan
Definition: plannodes.h:968
void * palloc(Size size)
Definition: mcxt.c:1062
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
Definition: nodeHash.c:2955
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297
HashMemoryChunk current_chunk
Definition: hashjoin.h:355
#define elog(elevel,...)
Definition: elog.h:232
int i
bool * hashStrict
Definition: hashjoin.h:339
MemoryContext hashCxt
Definition: hashjoin.h:348
#define lfirst_oid(lc)
Definition: pg_list.h:171
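
The skew-space budget set up at lines 509-510 above is a simple percentage split of spaceAllowed. A minimal arithmetic sketch (assuming SKEW_HASH_MEM_PERCENT is 2, as defined in hashjoin.h, and a 64 MB spaceAllowed):

/* Illustrative arithmetic only; not PostgreSQL code. */
#include <stddef.h>
#include <stdio.h>

#define SKEW_HASH_MEM_PERCENT 2     /* assumed value from hashjoin.h */

int main(void)
{
    size_t spaceAllowed = 64UL * 1024 * 1024;   /* e.g. a 64 MB hash_mem budget */
    size_t spaceAllowedSkew = spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;

    printf("total budget: %zu bytes, skew table budget: %zu bytes\n",
           spaceAllowed, spaceAllowedSkew);
    return 0;
}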

◆ ExecHashTableDestroy()

void ExecHashTableDestroy ( HashJoinTable  hashtable)

Definition at line 873 of file nodeHash.c.

References BufFileClose(), HashJoinTableData::hashCxt, i, HashJoinTableData::innerBatchFile, MemoryContextDelete(), HashJoinTableData::nbatch, HashJoinTableData::outerBatchFile, and pfree().

Referenced by ExecEndHashJoin(), and ExecReScanHashJoin().

874 {
875  int i;
876 
877  /*
878  * Make sure all the temp files are closed. We skip batch 0, since it
879  * can't have any temp files (and the arrays might not even exist if
880  * nbatch is only 1). Parallel hash joins don't use these files.
881  */
882  if (hashtable->innerBatchFile != NULL)
883  {
884  for (i = 1; i < hashtable->nbatch; i++)
885  {
886  if (hashtable->innerBatchFile[i])
887  BufFileClose(hashtable->innerBatchFile[i]);
888  if (hashtable->outerBatchFile[i])
889  BufFileClose(hashtable->outerBatchFile[i]);
890  }
891  }
892 
893  /* Release working memory (batchCxt is a child, so it goes away too) */
894  MemoryContextDelete(hashtable->hashCxt);
895 
896  /* And drop the control block */
897  pfree(hashtable);
898 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
void BufFileClose(BufFile *file)
Definition: buffile.c:407
void pfree(void *pointer)
Definition: mcxt.c:1169
BufFile ** outerBatchFile
Definition: hashjoin.h:330
BufFile ** innerBatchFile
Definition: hashjoin.h:329
int i
MemoryContext hashCxt
Definition: hashjoin.h:348

◆ ExecHashTableDetach()

void ExecHashTableDetach ( HashJoinTable  hashtable)

Definition at line 3193 of file nodeHash.c.

References HashJoinTableData::area, BarrierDetach(), ParallelHashJoinState::batches, HashJoinTableData::batches, ParallelHashJoinState::build_barrier, dsa_free(), DsaPointerIsValid, i, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, sts_end_parallel_scan(), and sts_end_write().

Referenced by ExecHashJoinReInitializeDSM(), and ExecShutdownHashJoin().

3194 {
3195  if (hashtable->parallel_state)
3196  {
3197  ParallelHashJoinState *pstate = hashtable->parallel_state;
3198  int i;
3199 
3200  /* Make sure any temporary files are closed. */
3201  if (hashtable->batches)
3202  {
3203  for (i = 0; i < hashtable->nbatch; ++i)
3204  {
3205  sts_end_write(hashtable->batches[i].inner_tuples);
3206  sts_end_write(hashtable->batches[i].outer_tuples);
3207  sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
3208  sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
3209  }
3210  }
3211 
3212  /* If we're last to detach, clean up shared memory. */
3213  if (BarrierDetach(&pstate->build_barrier))
3214  {
3215  if (DsaPointerIsValid(pstate->batches))
3216  {
3217  dsa_free(hashtable->area, pstate->batches);
3218  pstate->batches = InvalidDsaPointer;
3219  }
3220  }
3221 
3222  hashtable->parallel_state = NULL;
3223  }
3224 }
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:209
#define InvalidDsaPointer
Definition: dsa.h:78
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:208
dsa_area * area
Definition: hashjoin.h:356
dsa_pointer batches
Definition: hashjoin.h:236
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
bool BarrierDetach(Barrier *barrier)
Definition: barrier.c:256
#define DsaPointerIsValid(x)
Definition: dsa.h:81
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:820
int i
void sts_end_write(SharedTuplestoreAccessor *accessor)

◆ ExecHashTableDetachBatch()

void ExecHashTableDetachBatch ( HashJoinTable  hashtable)

Definition at line 3136 of file nodeHash.c.

References HashJoinTableData::area, Assert, BarrierArriveAndDetach(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, ParallelHashJoinBatch::chunks, HashJoinTableData::curbatch, dsa_free(), dsa_get_address(), DsaPointerIsValid, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, Max, HashJoinTableData::nbuckets, HashMemoryChunkData::next, next, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BATCH_DONE, HashMemoryChunkData::shared, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinReInitializeDSM(), ExecParallelHashJoinNewBatch(), and ExecShutdownHashJoin().

3137 {
3138  if (hashtable->parallel_state != NULL &&
3139  hashtable->curbatch >= 0)
3140  {
3141  int curbatch = hashtable->curbatch;
3142  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
3143 
3144  /* Make sure any temporary files are closed. */
3145  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
3146  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
3147 
3148  /* Detach from the batch we were last working on. */
3149  if (BarrierArriveAndDetach(&batch->batch_barrier))
3150  {
3151  /*
3152  * Technically we shouldn't access the barrier because we're no
3153  * longer attached, but since there is no way it's moving after
3154  * this point it seems safe to make the following assertion.
3155  */
3157 
3158  /* Free shared chunks and buckets. */
3159  while (DsaPointerIsValid(batch->chunks))
3160  {
3161  HashMemoryChunk chunk =
3162  dsa_get_address(hashtable->area, batch->chunks);
3163  dsa_pointer next = chunk->next.shared;
3164 
3165  dsa_free(hashtable->area, batch->chunks);
3166  batch->chunks = next;
3167  }
3168  if (DsaPointerIsValid(batch->buckets))
3169  {
3170  dsa_free(hashtable->area, batch->buckets);
3171  batch->buckets = InvalidDsaPointer;
3172  }
3173  }
3174 
3175  /*
3176  * Track the largest batch we've been attached to. Though each
3177  * backend might see a different subset of batches, explain.c will
3178  * scan the results from all backends to find the largest value.
3179  */
3180  hashtable->spacePeak =
3181  Max(hashtable->spacePeak,
3182  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
3183 
3184  /* Remember that we are not attached to a batch. */
3185  hashtable->curbatch = -1;
3186  }
3187 }
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:209
#define PHJ_BATCH_DONE
Definition: hashjoin.h:268
static int32 next
Definition: blutils.c:219
#define InvalidDsaPointer
Definition: dsa.h:78
dsa_pointer chunks
Definition: hashjoin.h:156
union HashMemoryChunkData::@92 next
uint64 dsa_pointer
Definition: dsa.h:62
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:208
dsa_area * area
Definition: hashjoin.h:356
dsa_pointer shared
Definition: hashjoin.h:127
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:932
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
bool BarrierArriveAndDetach(Barrier *barrier)
Definition: barrier.c:203
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
#define Max(x, y)
Definition: c.h:980
#define Assert(condition)
Definition: c.h:804
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:265
ParallelHashJoinBatch * shared
Definition: hashjoin.h:197
#define DsaPointerIsValid(x)
Definition: dsa.h:81
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:820
dsa_pointer buckets
Definition: hashjoin.h:153

◆ ExecHashTableInsert()

void ExecHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot *  slot,
uint32  hashvalue 
)

Definition at line 1613 of file nodeHash.c.

References Assert, HashJoinTableData::buckets, HashJoinTableData::curbatch, dense_alloc(), ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecHashIncreaseNumBatches(), ExecHashJoinSaveTuple(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::innerBatchFile, HashJoinTableData::log2_nbuckets_optimal, MaxAllocSize, HashJoinTableData::nbatch, HashJoinTableData::nbuckets_optimal, HashJoinTupleData::next, NTUP_PER_BUCKET, HashJoinTableData::skewTuples, HashJoinTableData::spaceAllowed, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, MinimalTupleData::t_len, HashJoinTableData::totalTuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch(), and MultiExecPrivateHash().

1616 {
1617  bool shouldFree;
1618  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1619  int bucketno;
1620  int batchno;
1621 
1622  ExecHashGetBucketAndBatch(hashtable, hashvalue,
1623  &bucketno, &batchno);
1624 
1625  /*
1626  * decide whether to put the tuple in the hash table or a temp file
1627  */
1628  if (batchno == hashtable->curbatch)
1629  {
1630  /*
1631  * put the tuple in hash table
1632  */
1633  HashJoinTuple hashTuple;
1634  int hashTupleSize;
1635  double ntuples = (hashtable->totalTuples - hashtable->skewTuples);
1636 
1637  /* Create the HashJoinTuple */
1638  hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1639  hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
1640 
1641  hashTuple->hashvalue = hashvalue;
1642  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1643 
1644  /*
1645  * We always reset the tuple-matched flag on insertion. This is okay
1646  * even when reloading a tuple from a batch file, since the tuple
1647  * could not possibly have been matched to an outer tuple before it
1648  * went into the batch file.
1649  */
1650  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1651 
1652  /* Push it onto the front of the bucket's list */
1653  hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1654  hashtable->buckets.unshared[bucketno] = hashTuple;
1655 
1656  /*
1657  * Increase the (optimal) number of buckets if we just exceeded the
1658  * NTUP_PER_BUCKET threshold, but only when there's still a single
1659  * batch.
1660  */
1661  if (hashtable->nbatch == 1 &&
1662  ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
1663  {
1664  /* Guard against integer overflow and alloc size overflow */
1665  if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
1666  hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
1667  {
1668  hashtable->nbuckets_optimal *= 2;
1669  hashtable->log2_nbuckets_optimal += 1;
1670  }
1671  }
1672 
1673  /* Account for space used, and back off if we've used too much */
1674  hashtable->spaceUsed += hashTupleSize;
1675  if (hashtable->spaceUsed > hashtable->spacePeak)
1676  hashtable->spacePeak = hashtable->spaceUsed;
1677  if (hashtable->spaceUsed +
1678  hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
1679  > hashtable->spaceAllowed)
1680  ExecHashIncreaseNumBatches(hashtable);
1681  }
1682  else
1683  {
1684  /*
1685  * put the tuple into a temp file for later batches
1686  */
1687  Assert(batchno > hashtable->curbatch);
1688  ExecHashJoinSaveTuple(tuple,
1689  hashvalue,
1690  &hashtable->innerBatchFile[batchno]);
1691  }
1692 
1693  if (shouldFree)
1694  heap_free_minimal_tuple(tuple);
1695 }
int log2_nbuckets_optimal
Definition: hashjoin.h:291
double skewTuples
Definition: hashjoin.h:320
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1692
union HashJoinTableData::@93 buckets
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
Definition: nodeHash.c:906
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1919
union HashJoinTupleData::@91 next
struct HashJoinTupleData * unshared
Definition: hashjoin.h:72
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1427
struct HashJoinTupleData * HashJoinTuple
Definition: execnodes.h:2001
#define MaxAllocSize
Definition: memutils.h:40
static void * dense_alloc(HashJoinTable hashtable, Size size)
Definition: nodeHash.c:2727
#define NTUP_PER_BUCKET
Definition: nodeHash.c:665
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:79
double totalTuples
Definition: hashjoin.h:318
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
#define Assert(condition)
Definition: c.h:804
BufFile ** innerBatchFile
Definition: hashjoin.h:329
#define HeapTupleHeaderClearMatch(tup)
Definition: htup_details.h:525
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr)
uint32 hashvalue
Definition: hashjoin.h:75
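
The bucket-growth rule in the single-batch path above just doubles nbuckets_optimal whenever the average bucket load would exceed NTUP_PER_BUCKET. A standalone sketch of that rule (illustrative only; the starting count and tuple stream are made up):

/* Standalone sketch of the "optimal bucket count" doubling rule; not PostgreSQL code. */
#include <limits.h>
#include <stdio.h>

#define NTUP_PER_BUCKET 1

int main(void)
{
    long   nbuckets_optimal = 1024;     /* starting bucket count */
    int    log2_nbuckets_optimal = 10;
    double ntuples;

    for (ntuples = 1; ntuples <= 5000; ntuples++)
    {
        /* same trigger as ExecHashTableInsert: average load exceeds NTUP_PER_BUCKET */
        if (ntuples > nbuckets_optimal * NTUP_PER_BUCKET &&
            nbuckets_optimal <= INT_MAX / 2)
        {
            nbuckets_optimal *= 2;
            log2_nbuckets_optimal += 1;
            printf("after %.0f tuples: nbuckets_optimal = %ld (2^%d)\n",
                   ntuples, nbuckets_optimal, log2_nbuckets_optimal);
        }
    }
    return 0;
}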

◆ ExecHashTableReset()

void ExecHashTableReset ( HashJoinTable  hashtable)

Definition at line 2158 of file nodeHash.c.

References HashJoinTableData::batchCxt, HashJoinTableData::buckets, HashJoinTableData::chunks, MemoryContextReset(), MemoryContextSwitchTo(), HashJoinTableData::nbuckets, palloc0(), HashJoinTableData::spaceUsed, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch().

2159 {
2160  MemoryContext oldcxt;
2161  int nbuckets = hashtable->nbuckets;
2162 
2163  /*
2164  * Release all the hash buckets and tuples acquired in the prior pass, and
2165  * reinitialize the context for a new pass.
2166  */
2167  MemoryContextReset(hashtable->batchCxt);
2168  oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
2169 
2170  /* Reallocate and reinitialize the hash bucket headers. */
2171  hashtable->buckets.unshared = (HashJoinTuple *)
2172  palloc0(nbuckets * sizeof(HashJoinTuple));
2173 
2174  hashtable->spaceUsed = 0;
2175 
2176  MemoryContextSwitchTo(oldcxt);
2177 
2178  /* Forget the chunks (the memory was freed by the context reset above). */
2179  hashtable->chunks = NULL;
2180 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
union HashJoinTableData::@93 buckets
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
MemoryContext batchCxt
Definition: hashjoin.h:349
void * palloc0(Size size)
Definition: mcxt.c:1093
HashMemoryChunk chunks
Definition: hashjoin.h:352
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297

◆ ExecHashTableResetMatchFlags()

void ExecHashTableResetMatchFlags ( HashJoinTable  hashtable)

Definition at line 2187 of file nodeHash.c.

References HashJoinTableData::buckets, HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, i, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecReScanHashJoin().

2188 {
2189  HashJoinTuple tuple;
2190  int i;
2191 
2192  /* Reset all flags in the main table ... */
2193  for (i = 0; i < hashtable->nbuckets; i++)
2194  {
2195  for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
2196  tuple = tuple->next.unshared)
2197  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2198  }
2199 
2200  /* ... and the same for the skew buckets, if any */
2201  for (i = 0; i < hashtable->nSkewBuckets; i++)
2202  {
2203  int j = hashtable->skewBucketNums[i];
2204  HashSkewBucket *skewBucket = hashtable->skewBucket[j];
2205 
2206  for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
2207  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2208  }
2209 }
union HashJoinTableData::@93 buckets
int * skewBucketNums
Definition: hashjoin.h:308
union HashJoinTupleData::@91 next
struct HashJoinTupleData * unshared
Definition: hashjoin.h:72
HashJoinTuple tuples
Definition: hashjoin.h:105
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
#define HeapTupleHeaderClearMatch(tup)
Definition: htup_details.h:525
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297
int i

◆ ExecInitHash()

HashState * ExecInitHash ( Hash *  node,
EState *  estate,
int  eflags 
)

Definition at line 354 of file nodeHash.c.

References Assert, EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecAssignExprContext(), ExecHash(), ExecInitExprList(), ExecInitNode(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, Hash::hashkeys, HashState::hashkeys, HashState::hashtable, makeNode, NIL, outerPlan, outerPlanState, PlanState::plan, Hash::plan, HashState::ps, PlanState::ps_ProjInfo, Plan::qual, PlanState::state, and TTSOpsMinimalTuple.

Referenced by ExecInitNode().

355 {
356  HashState *hashstate;
357 
358  /* check for unsupported flags */
359  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
360 
361  /*
362  * create state structure
363  */
364  hashstate = makeNode(HashState);
365  hashstate->ps.plan = (Plan *) node;
366  hashstate->ps.state = estate;
367  hashstate->ps.ExecProcNode = ExecHash;
368  hashstate->hashtable = NULL;
369  hashstate->hashkeys = NIL; /* will be set by parent HashJoin */
370 
371  /*
372  * Miscellaneous initialization
373  *
374  * create expression context for node
375  */
376  ExecAssignExprContext(estate, &hashstate->ps);
377 
378  /*
379  * initialize child nodes
380  */
381  outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
382 
383  /*
384  * initialize our result slot and type. No need to build projection
385  * because this node doesn't do projections.
386  */
388  hashstate->ps.ps_ProjInfo = NULL;
389 
390  /*
391  * initialize child expressions
392  */
393  Assert(node->plan.qual == NIL);
394  hashstate->hashkeys =
395  ExecInitExprList(node->hashkeys, (PlanState *) hashstate);
396 
397  return hashstate;
398 }
#define NIL
Definition: pg_list.h:65
List * qual
Definition: plannodes.h:142
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1006
HashJoinTable hashtable
Definition: execnodes.h:2537
EState * state
Definition: execnodes.h:968
#define EXEC_FLAG_BACKWARD
Definition: executor.h:58
#define outerPlanState(node)
Definition: execnodes.h:1062
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:318
List * hashkeys
Definition: execnodes.h:2538
PlanState ps
Definition: execnodes.h:2536
#define outerPlan(node)
Definition: plannodes.h:171
static TupleTableSlot * ExecHash(PlanState *pstate)
Definition: nodeHash.c:92
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:972
List * hashkeys
Definition: plannodes.h:974
Plan * plan
Definition: execnodes.h:966
#define makeNode(_type_)
Definition: nodes.h:584
#define Assert(condition)
Definition: c.h:804
#define EXEC_FLAG_MARK
Definition: executor.h:59
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:480
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1799
Plan plan
Definition: plannodes.h:968
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:141
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85

◆ ExecParallelHashTableAlloc()

void ExecParallelHashTableAlloc ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3116 of file nodeHash.c.

References HashJoinTableData::area, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, dsa_allocate, dsa_get_address(), dsa_pointer_atomic_init, i, InvalidDsaPointer, ParallelHashJoinState::nbuckets, HashJoinTableData::parallel_state, and ParallelHashJoinBatchAccessor::shared.

Referenced by ExecHashTableCreate(), and ExecParallelHashJoinNewBatch().

3117 {
3118  ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
3119  dsa_pointer_atomic *buckets;
3120  int nbuckets = hashtable->parallel_state->nbuckets;
3121  int i;
3122 
3123  batch->buckets =
3124  dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets);
3125  buckets = (dsa_pointer_atomic *)
3126  dsa_get_address(hashtable->area, batch->buckets);
3127  for (i = 0; i < nbuckets; ++i)
3128  dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
3129 }

◆ ExecParallelHashTableInsert()

void ExecParallelHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot *  slot,
uint32  hashvalue 
)

Definition at line 1702 of file nodeHash.c.

References Assert, BarrierPhase(), HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), ExecParallelHashTuplePrealloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, ParallelHashJoinBatchAccessor::inner_tuples, MAXALIGN, ParallelHashJoinBatchAccessor::ntuples, HashJoinTableData::parallel_state, PHJ_BUILD_HASHING_INNER, ParallelHashJoinBatchAccessor::preallocated, HashJoinTableData::shared, sts_puttuple(), and MinimalTupleData::t_len.

Referenced by MultiExecParallelHash().

1705 {
1706  bool shouldFree;
1707  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1708  dsa_pointer shared;
1709  int bucketno;
1710  int batchno;
1711 
1712 retry:
1713  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1714 
1715  if (batchno == 0)
1716  {
1717  HashJoinTuple hashTuple;
1718 
1719  /* Try to load it into memory. */
1720  Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
1721         PHJ_BUILD_HASHING_INNER);
1722  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1723  HJTUPLE_OVERHEAD + tuple->t_len,
1724  &shared);
1725  if (hashTuple == NULL)
1726  goto retry;
1727 
1728  /* Store the hash value in the HashJoinTuple header. */
1729  hashTuple->hashvalue = hashvalue;
1730  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1731 
1732  /* Push it onto the front of the bucket's list */
1733  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1734  hashTuple, shared);
1735  }
1736  else
1737  {
1738  size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1739 
1740  Assert(batchno > 0);
1741 
1742  /* Try to preallocate space in the batch if necessary. */
1743  if (hashtable->batches[batchno].preallocated < tuple_size)
1744  {
1745  if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
1746  goto retry;
1747  }
1748 
1749  Assert(hashtable->batches[batchno].preallocated >= tuple_size);
1750  hashtable->batches[batchno].preallocated -= tuple_size;
1751  sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
1752  tuple);
1753  }
1754  ++hashtable->batches[batchno].ntuples;
1755 
1756  if (shouldFree)
1757  heap_free_minimal_tuple(tuple);
1758 }
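During the parallel build phase this function is driven by MultiExecParallelHash(): each participant pulls tuples from the Hash node's outer (child) plan, computes a hash value, and inserts. A simplified, illustrative sketch of that loop; barrier coordination and the partialTuples bookkeeping are omitted, and "outerNode", "econtext", "hashkeys" and "hashtable" stand for the locals used in nodeHash.c:

for (;;)
{
    TupleTableSlot *slot = ExecProcNode(outerNode);
    uint32          hashvalue;

    if (TupIsNull(slot))
        break;

    /* the expression context must see the tuple being hashed */
    econtext->ecxt_outertuple = slot;
    if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
                             false,             /* building the inner side */
                             hashtable->keepNulls, &hashvalue))
        ExecParallelHashTableInsert(hashtable, slot, hashvalue);
}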

◆ ExecParallelHashTableInsertCurrentBatch()

void ExecParallelHashTableInsertCurrentBatch ( HashJoinTable  hashtable,
TupleTableSlot *  slot,
uint32  hashvalue 
)

Definition at line 1767 of file nodeHash.c.

References Assert, HashJoinTableData::buckets, HashJoinTableData::curbatch, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::shared, and MinimalTupleData::t_len.

Referenced by ExecParallelHashJoinNewBatch().

1770 {
1771  bool shouldFree;
1772  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1773  HashJoinTuple hashTuple;
1774  dsa_pointer shared;
1775  int batchno;
1776  int bucketno;
1777 
1778  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1779  Assert(batchno == hashtable->curbatch);
1780  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1781  HJTUPLE_OVERHEAD + tuple->t_len,
1782  &shared);
1783  hashTuple->hashvalue = hashvalue;
1784  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1785  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1786  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1787  hashTuple, shared);
1788 
1789  if (shouldFree)
1790  heap_free_minimal_tuple(tuple);
1791 }

◆ ExecParallelHashTableSetCurrentBatch()

void ExecParallelHashTableSetCurrentBatch ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3278 of file nodeHash.c.

References HashJoinTableData::area, Assert, ParallelHashJoinBatchAccessor::at_least_one_chunk, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, HashJoinTableData::buckets, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, HashJoinTableData::current_chunk_shared, dsa_get_address(), InvalidDsaPointer, HashJoinTableData::log2_nbuckets, my_log2(), ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::parallel_state, ParallelHashJoinBatchAccessor::shared, and HashJoinTableData::shared.

Referenced by ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinNewBatch(), and MultiExecParallelHash().

3279 {
3280  Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer);
3281 
3282  hashtable->curbatch = batchno;
3283  hashtable->buckets.shared = (dsa_pointer_atomic *)
3284  dsa_get_address(hashtable->area,
3285  hashtable->batches[batchno].shared->buckets);
3286  hashtable->nbuckets = hashtable->parallel_state->nbuckets;
3287  hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
3288  hashtable->current_chunk = NULL;
3289  hashtable->current_chunk_shared = InvalidDsaPointer;
3290  hashtable->batches[batchno].at_least_one_chunk = false;
3291 }
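ExecParallelHashJoinNewBatch() (in nodeHashjoin.c) strings the parallel batch routines together when a participant moves on to a new batch: one elected worker allocates the batch's shared bucket array with ExecParallelHashTableAlloc(), every participant then attaches to it with this function, and the batch's inner tuples are reloaded from the shared tuplestore with ExecParallelHashTableInsertCurrentBatch(). A rough, illustrative sketch; barrier phases and the election of the allocating worker are omitted, and "hjstate" stands for the caller's HashJoinState:

MinimalTuple tuple;
uint32      hashvalue;
SharedTuplestoreAccessor *inner_tuples = hashtable->batches[batchno].inner_tuples;

ExecParallelHashTableAlloc(hashtable, batchno);     /* elected worker only */
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);

sts_begin_parallel_scan(inner_tuples);
while ((tuple = sts_parallel_scan_next(inner_tuples, &hashvalue)) != NULL)
{
    ExecForceStoreMinimalTuple(tuple, hjstate->hj_HashTupleSlot, false);
    ExecParallelHashTableInsertCurrentBatch(hashtable,
                                            hjstate->hj_HashTupleSlot,
                                            hashvalue);
}
sts_end_parallel_scan(inner_tuples);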

◆ ExecParallelScanHashBucket()

bool ExecParallelScanHashBucket ( HashJoinState *  hjstate,
ExprContext *  econtext 
)

Definition at line 2012 of file nodeHash.c.

References ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, and HJTUPLE_MINTUPLE.

Referenced by ExecHashJoinImpl().

2014 {
2015  ExprState *hjclauses = hjstate->hashclauses;
2016  HashJoinTable hashtable = hjstate->hj_HashTable;
2017  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2018  uint32 hashvalue = hjstate->hj_CurHashValue;
2019 
2020  /*
2021  * hj_CurTuple is the address of the tuple last returned from the current
2022  * bucket, or NULL if it's time to start scanning a new bucket.
2023  */
2024  if (hashTuple != NULL)
2025  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2026  else
2027  hashTuple = ExecParallelHashFirstTuple(hashtable,
2028  hjstate->hj_CurBucketNo);
2029 
2030  while (hashTuple != NULL)
2031  {
2032  if (hashTuple->hashvalue == hashvalue)
2033  {
2034  TupleTableSlot *inntuple;
2035 
2036  /* insert hashtable's tuple into exec slot so ExecQual sees it */
2037  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2038  hjstate->hj_HashTupleSlot,
2039  false); /* do not pfree */
2040  econtext->ecxt_innertuple = inntuple;
2041 
2042  if (ExecQualAndReset(hjclauses, econtext))
2043  {
2044  hjstate->hj_CurTuple = hashTuple;
2045  return true;
2046  }
2047  }
2048 
2049  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2050  }
2051 
2052  /*
2053  * no match
2054  */
2055  return false;
2056 }

◆ ExecPrepHashTableForUnmatched()

void ExecPrepHashTableForUnmatched ( HashJoinState *  hjstate)

Definition at line 2063 of file nodeHash.c.

References HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, and HashJoinState::hj_CurTuple.

Referenced by ExecHashJoinImpl().

2064 {
2065  /*----------
2066  * During this scan we use the HashJoinState fields as follows:
2067  *
2068  * hj_CurBucketNo: next regular bucket to scan
2069  * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
2070  * hj_CurTuple: last tuple returned, or NULL to start next bucket
2071  *----------
2072  */
2073  hjstate->hj_CurBucketNo = 0;
2074  hjstate->hj_CurSkewBucketNo = 0;
2075  hjstate->hj_CurTuple = NULL;
2076 }

◆ ExecReScanHash()

void ExecReScanHash ( HashState *  node)

Definition at line 2213 of file nodeHash.c.

References PlanState::chgParam, ExecReScan(), PlanState::lefttree, and HashState::ps.

Referenced by ExecReScan().

2214 {
2215  /*
2216  * if chgParam of subnode is not null then plan will be re-scanned by
2217  * first ExecProcNode.
2218  */
2219  if (node->ps.lefttree->chgParam == NULL)
2220  ExecReScan(node->ps.lefttree);
2221 }

◆ ExecScanHashBucket()

bool ExecScanHashBucket ( HashJoinState *  hjstate,
ExprContext *  econtext 
)

Definition at line 1951 of file nodeHash.c.

References HashJoinTableData::buckets, ExprContext::ecxt_innertuple, ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, INVALID_SKEW_BUCKET_NO, HashJoinTupleData::next, HashJoinTableData::skewBucket, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

1953 {
1954  ExprState *hjclauses = hjstate->hashclauses;
1955  HashJoinTable hashtable = hjstate->hj_HashTable;
1956  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1957  uint32 hashvalue = hjstate->hj_CurHashValue;
1958 
1959  /*
1960  * hj_CurTuple is the address of the tuple last returned from the current
1961  * bucket, or NULL if it's time to start scanning a new bucket.
1962  *
1963  * If the tuple hashed to a skew bucket then scan the skew bucket
1964  * otherwise scan the standard hashtable bucket.
1965  */
1966  if (hashTuple != NULL)
1967  hashTuple = hashTuple->next.unshared;
1968  else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
1969  hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
1970  else
1971  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
1972 
1973  while (hashTuple != NULL)
1974  {
1975  if (hashTuple->hashvalue == hashvalue)
1976  {
1977  TupleTableSlot *inntuple;
1978 
1979  /* insert hashtable's tuple into exec slot so ExecQual sees it */
1980  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
1981  hjstate->hj_HashTupleSlot,
1982  false); /* do not pfree */
1983  econtext->ecxt_innertuple = inntuple;
1984 
1985  if (ExecQualAndReset(hjclauses, econtext))
1986  {
1987  hjstate->hj_CurTuple = hashTuple;
1988  return true;
1989  }
1990  }
1991 
1992  hashTuple = hashTuple->next.unshared;
1993  }
1994 
1995  /*
1996  * no match
1997  */
1998  return false;
1999 }
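The caller is responsible for priming the HashJoinState before scanning: ExecHashJoinImpl() computes the outer tuple's hash value, derives the bucket (and skew bucket) numbers, clears hj_CurTuple, and then calls this function once per candidate match. A condensed, illustrative sketch of that probe pattern; "hjstate", "econtext" and "hashvalue" are the caller's locals, and the parallel-aware ExecParallelScanHashBucket() is used the same way:

int     batchno;

ExecHashGetBucketAndBatch(hashtable, hashvalue,
                          &hjstate->hj_CurBucketNo, &batchno);
hjstate->hj_CurHashValue = hashvalue;
hjstate->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable, hashvalue);
hjstate->hj_CurTuple = NULL;        /* start at the head of the bucket */

while (ExecScanHashBucket(hjstate, econtext))
{
    /*
     * econtext->ecxt_innertuple now holds an inner tuple whose hash clauses
     * match the current outer tuple; the join's remaining quals are
     * evaluated and a joined row is emitted by the caller.
     */
}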

◆ ExecScanHashTableForUnmatched()

bool ExecScanHashTableForUnmatched ( HashJoinState *  hjstate,
ExprContext *  econtext 
)

Definition at line 2087 of file nodeHash.c.

References HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, ResetExprContext, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

2088 {
2089  HashJoinTable hashtable = hjstate->hj_HashTable;
2090  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2091 
2092  for (;;)
2093  {
2094  /*
2095  * hj_CurTuple is the address of the tuple last returned from the
2096  * current bucket, or NULL if it's time to start scanning a new
2097  * bucket.
2098  */
2099  if (hashTuple != NULL)
2100  hashTuple = hashTuple->next.unshared;
2101  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2102  {
2103  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
2104  hjstate->hj_CurBucketNo++;
2105  }
2106  else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
2107  {
2108  int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
2109 
2110  hashTuple = hashtable->skewBucket[j]->tuples;
2111  hjstate->hj_CurSkewBucketNo++;
2112  }
2113  else
2114  break; /* finished all buckets */
2115 
2116  while (hashTuple != NULL)
2117  {
2118  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2119  {
2120  TupleTableSlot *inntuple;
2121 
2122  /* insert hashtable's tuple into exec slot */
2123  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2124  hjstate->hj_HashTupleSlot,
2125  false); /* do not pfree */
2126  econtext->ecxt_innertuple = inntuple;
2127 
2128  /*
2129  * Reset temp memory each time; although this function doesn't
2130  * do any qual eval, the caller will, so let's keep it
2131  * parallel to ExecScanHashBucket.
2132  */
2133  ResetExprContext(econtext);
2134 
2135  hjstate->hj_CurTuple = hashTuple;
2136  return true;
2137  }
2138 
2139  hashTuple = hashTuple->next.unshared;
2140  }
2141 
2142  /* allow this loop to be cancellable */
2143  CHECK_FOR_INTERRUPTS();
2144  }
2145 
2146  /*
2147  * no more unmatched tuples
2148  */
2149  return false;
2150 }
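For right and full hash joins, ExecHashJoinImpl() first calls ExecPrepHashTableForUnmatched() and then loops over this function to emit every inner tuple that never found a match, null-extending the outer side. A simplified, illustrative sketch; "hjstate", "econtext" and "otherqual" mirror the locals in nodeHashjoin.c:

ExecPrepHashTableForUnmatched(hjstate);

while (ExecScanHashTableForUnmatched(hjstate, econtext))
{
    /* pair the unmatched inner tuple with a null-extended outer tuple */
    econtext->ecxt_outertuple = hjstate->hj_NullOuterTupleSlot;

    if (otherqual == NULL || ExecQual(otherqual, econtext))
        return ExecProject(hjstate->js.ps.ps_ProjInfo);
}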

◆ ExecShutdownHash()

void ExecShutdownHash ( HashState *  node)

Definition at line 2661 of file nodeHash.c.

References ExecHashAccumInstrumentation(), HashState::hashtable, HashState::hinstrument, PlanState::instrument, palloc0(), and HashState::ps.

Referenced by ExecShutdownNode().

2662 {
2663  /* Allocate save space if EXPLAIN'ing and we didn't do so already */
2664  if (node->ps.instrument && !node->hinstrument)
2665  node->hinstrument = (HashInstrumentation *)
2666  palloc0(sizeof(HashInstrumentation));
2667  /* Now accumulate data for the current (final) hash table */
2668  if (node->hinstrument && node->hashtable)
2669  ExecHashAccumInstrumentation(node->hinstrument, node->hashtable);
2670 }
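The data saved here is what EXPLAIN ANALYZE later reports for the Hash node (see show_hash_info() in explain.c). A hypothetical reader of the accumulated instrumentation might look roughly like this; the field names are those of HashInstrumentation in execnodes.h, and "node" is the HashState after shutdown:

HashInstrumentation *hi = node->hinstrument;

if (hi != NULL)
    elog(DEBUG1,
         "hash: %d buckets (originally %d), %d batches (originally %d), "
         "peak space %zu bytes",
         hi->nbuckets, hi->nbuckets_original,
         hi->nbatch, hi->nbatch_original,
         (size_t) hi->space_peak);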

◆ MultiExecHash()

Node* MultiExecHash ( HashState *  node)

Definition at line 106 of file nodeHash.c.

References HashState::hashtable, InstrStartNode(), InstrStopNode(), PlanState::instrument, MultiExecParallelHash(), MultiExecPrivateHash(), HashState::parallel_state, HashJoinTableData::partialTuples, and HashState::ps.

Referenced by MultiExecProcNode().

107 {
108  /* must provide our own instrumentation support */
109  if (node->ps.instrument)
110  InstrStartNode(node->ps.instrument);
111 
112  if (node->parallel_state != NULL)
113  MultiExecParallelHash(node);
114  else
115  MultiExecPrivateHash(node);
116 
117  /* must provide our own instrumentation support */
118  if (node->ps.instrument)
119  InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);
120 
121  /*
122  * We do not return the hash table directly because it's not a subtype of
123  * Node, and so would violate the MultiExecProcNode API. Instead, our
124  * parent Hashjoin node is expected to know how to fish it out of our node
125  * state. Ugly but not really worth cleaning up, since Hashjoin knows
126  * quite a bit more about Hash besides that.
127  */
128  return NULL;
129 }
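As the comment says, the parent HashJoin "fishes" the table out of the Hash node's state rather than taking it from the return value. A condensed, illustrative sketch of that hand-off as done in ExecHashJoinImpl() (nodeHashjoin.c); "hjstate" is the parent's HashJoinState, and the real code derives the keepNulls argument from the join type:

HashState      *hashNode = (HashState *) innerPlanState(hjstate);
HashJoinTable   hashtable;

/* the parent builds the (empty) table and hangs it on the Hash node */
hashtable = ExecHashTableCreate(hashNode,
                                hjstate->hj_HashOperators,
                                hjstate->hj_Collations,
                                false);     /* keepNulls; see HJ_FILL_INNER */
hashNode->hashtable = hashtable;
hjstate->hj_HashTable = hashtable;

/* run the Hash node to completion; the return value is always NULL */
(void) MultiExecProcNode((PlanState *) hashNode);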