PostgreSQL Source Code (git master)
nodeHash.h File Reference
#include "access/parallel.h"
#include "nodes/execnodes.h"


Functions

HashState * ExecInitHash (Hash *node, EState *estate, int eflags)
 
Node * MultiExecHash (HashState *node)
 
void ExecEndHash (HashState *node)
 
void ExecReScanHash (HashState *node)
 
HashJoinTable ExecHashTableCreate (HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
 
void ExecParallelHashTableAlloc (HashJoinTable hashtable, int batchno)
 
void ExecHashTableDestroy (HashJoinTable hashtable)
 
void ExecHashTableDetach (HashJoinTable hashtable)
 
void ExecHashTableDetachBatch (HashJoinTable hashtable)
 
void ExecParallelHashTableSetCurrentBatch (HashJoinTable hashtable, int batchno)
 
void ExecHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsertCurrentBatch (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
bool ExecHashGetHashValue (HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
 
void ExecHashGetBucketAndBatch (HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
 
bool ExecScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecHashTableReset (HashJoinTable hashtable)
 
void ExecHashTableResetMatchFlags (HashJoinTable hashtable)
 
void ExecChooseHashTableSize (double ntuples, int tupwidth, bool useskew, bool try_combined_work_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
 
int ExecHashGetSkewBucket (HashJoinTable hashtable, uint32 hashvalue)
 
void ExecHashEstimate (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeDSM (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeWorker (HashState *node, ParallelWorkerContext *pwcxt)
 
void ExecHashRetrieveInstrumentation (HashState *node)
 
void ExecShutdownHash (HashState *node)
 
void ExecHashAccumInstrumentation (HashInstrumentation *instrument, HashJoinTable hashtable)
 

Function Documentation

◆ ExecChooseHashTableSize()

void ExecChooseHashTableSize ( double  ntuples,
int  tupwidth,
bool  useskew,
bool  try_combined_work_mem,
int  parallel_workers,
size_t *  space_allowed,
int *  numbuckets,
int *  numbatches,
int *  num_skew_mcvs 
)

Definition at line 667 of file nodeHash.c.

References Assert, ExecChooseHashTableSize(), HJTUPLE_OVERHEAD, Max, MAXALIGN, MaxAllocSize, Min, my_log2(), NTUP_PER_BUCKET, pg_nextpower2_32(), SizeofMinimalTupleHeader, SKEW_BUCKET_OVERHEAD, SKEW_WORK_MEM_PERCENT, and work_mem.

Referenced by ExecChooseHashTableSize(), ExecHashTableCreate(), and initial_cost_hashjoin().

674 {
675  int tupsize;
676  double inner_rel_bytes;
677  long bucket_bytes;
678  long hash_table_bytes;
679  long skew_table_bytes;
680  long max_pointers;
681  long mppow2;
682  int nbatch = 1;
683  int nbuckets;
684  double dbuckets;
685 
686  /* Force a plausible relation size if no info */
687  if (ntuples <= 0.0)
688  ntuples = 1000.0;
689 
690  /*
691  * Estimate tupsize based on footprint of tuple in hashtable... note this
692  * does not allow for any palloc overhead. The manipulations of spaceUsed
693  * don't count palloc overhead either.
694  */
695  tupsize = HJTUPLE_OVERHEAD +
696  MAXALIGN(SizeofMinimalTupleHeader) +
697  MAXALIGN(tupwidth);
698  inner_rel_bytes = ntuples * tupsize;
699 
700  /*
701  * Target in-memory hashtable size is work_mem kilobytes.
702  */
703  hash_table_bytes = work_mem * 1024L;
704 
705  /*
706  * Parallel Hash tries to use the combined work_mem of all workers to
707  * avoid the need to batch. If that won't work, it falls back to work_mem
708  * per worker and tries to process batches in parallel.
709  */
710  if (try_combined_work_mem)
711  hash_table_bytes += hash_table_bytes * parallel_workers;
712 
713  *space_allowed = hash_table_bytes;
714 
715  /*
716  * If skew optimization is possible, estimate the number of skew buckets
717  * that will fit in the memory allowed, and decrement the assumed space
718  * available for the main hash table accordingly.
719  *
720  * We make the optimistic assumption that each skew bucket will contain
721  * one inner-relation tuple. If that turns out to be low, we will recover
722  * at runtime by reducing the number of skew buckets.
723  *
724  * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
725  * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
726  * will round up to the next power of 2 and then multiply by 4 to reduce
727  * collisions.
728  */
729  if (useskew)
730  {
731  skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
732 
733  /*----------
734  * Divisor is:
735  * size of a hash tuple +
736  * worst-case size of skewBucket[] per MCV +
737  * size of skewBucketNums[] entry +
738  * size of skew bucket struct itself
739  *----------
740  */
741  *num_skew_mcvs = skew_table_bytes / (tupsize +
742  (8 * sizeof(HashSkewBucket *)) +
743  sizeof(int) +
744  SKEW_BUCKET_OVERHEAD);
745  if (*num_skew_mcvs > 0)
746  hash_table_bytes -= skew_table_bytes;
747  }
748  else
749  *num_skew_mcvs = 0;
750 
751  /*
752  * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
753  * memory is filled, assuming a single batch; but limit the value so that
754  * the pointer arrays we'll try to allocate do not exceed work_mem nor
755  * MaxAllocSize.
756  *
757  * Note that both nbuckets and nbatch must be powers of 2 to make
758  * ExecHashGetBucketAndBatch fast.
759  */
760  max_pointers = *space_allowed / sizeof(HashJoinTuple);
761  max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
762  /* If max_pointers isn't a power of 2, must round it down to one */
763  mppow2 = 1L << my_log2(max_pointers);
764  if (max_pointers != mppow2)
765  max_pointers = mppow2 / 2;
766 
767  /* Also ensure we avoid integer overflow in nbatch and nbuckets */
768  /* (this step is redundant given the current value of MaxAllocSize) */
769  max_pointers = Min(max_pointers, INT_MAX / 2);
770 
771  dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
772  dbuckets = Min(dbuckets, max_pointers);
773  nbuckets = (int) dbuckets;
774  /* don't let nbuckets be really small, though ... */
775  nbuckets = Max(nbuckets, 1024);
776  /* ... and force it to be a power of 2. */
777  nbuckets = 1 << my_log2(nbuckets);
778 
779  /*
780  * If there's not enough space to store the projected number of tuples and
781  * the required bucket headers, we will need multiple batches.
782  */
783  bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
784  if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
785  {
786  /* We'll need multiple batches */
787  long lbuckets;
788  double dbatch;
789  int minbatch;
790  long bucket_size;
791 
792  /*
793  * If Parallel Hash with combined work_mem would still need multiple
794  * batches, we'll have to fall back to regular work_mem budget.
795  */
796  if (try_combined_work_mem)
797  {
798  ExecChooseHashTableSize(ntuples, tupwidth, useskew,
799  false, parallel_workers,
800  space_allowed,
801  numbuckets,
802  numbatches,
803  num_skew_mcvs);
804  return;
805  }
806 
807  /*
808  * Estimate the number of buckets we'll want to have when work_mem is
809  * entirely full. Each bucket will contain a bucket pointer plus
810  * NTUP_PER_BUCKET tuples, whose projected size already includes
811  * overhead for the hash code, pointer to the next tuple, etc.
812  */
813  bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
814  lbuckets = 1L << my_log2(hash_table_bytes / bucket_size);
815  lbuckets = Min(lbuckets, max_pointers);
816  nbuckets = (int) lbuckets;
817  nbuckets = 1 << my_log2(nbuckets);
818  bucket_bytes = nbuckets * sizeof(HashJoinTuple);
819 
820  /*
821  * Buckets are simple pointers to hashjoin tuples, while tupsize
822  * includes the pointer, hash code, and MinimalTupleData. So buckets
823  * should never really exceed 25% of work_mem (even for
824  * NTUP_PER_BUCKET=1); except maybe for work_mem values that are not
825  * 2^N bytes, where we might get more because of doubling. So let's
826  * look for 50% here.
827  */
828  Assert(bucket_bytes <= hash_table_bytes / 2);
829 
830  /* Calculate required number of batches. */
831  dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
832  dbatch = Min(dbatch, max_pointers);
833  minbatch = (int) dbatch;
834  nbatch = pg_nextpower2_32(Max(2, minbatch));
835  }
836 
837  Assert(nbuckets > 0);
838  Assert(nbatch > 0);
839 
840  *numbuckets = nbuckets;
841  *numbatches = nbatch;
842 }
#define SKEW_BUCKET_OVERHEAD
Definition: hashjoin.h:108
#define Min(x, y)
Definition: c.h:920
struct HashJoinTupleData * HashJoinTuple
Definition: execnodes.h:1937
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:146
int my_log2(long num)
Definition: dynahash.c:1720
#define MaxAllocSize
Definition: memutils.h:40
#define SizeofMinimalTupleHeader
Definition: htup_details.h:649
#define NTUP_PER_BUCKET
Definition: nodeHash.c:664
int work_mem
Definition: globals.c:121
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:79
#define Max(x, y)
Definition: c.h:914
#define Assert(condition)
Definition: c.h:738
#define MAXALIGN(LEN)
Definition: c.h:691
#define SKEW_WORK_MEM_PERCENT
Definition: hashjoin.h:110
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_work_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
Definition: nodeHash.c:667
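
To make the calculation above concrete, here is a simplified, standalone C sketch of the same sizing arithmetic. It omits the skew optimization, the 1024-bucket floor and the parallel combined-work_mem retry, and it takes the per-tuple footprint and the memory budget as plain inputs instead of deriving them from HJTUPLE_OVERHEAD, SizeofMinimalTupleHeader and the work_mem GUC; it is an illustration only, not the real implementation.

#include <math.h>
#include <stdio.h>

/* round up to the next power of 2 (stand-in for my_log2/pg_nextpower2_32) */
static int
next_pow2(int v)
{
    int p = 1;

    while (p < v)
        p <<= 1;
    return p;
}

static void
choose_size(double ntuples, int tupsize, long budget_bytes,
            int *numbuckets, int *numbatches)
{
    double  inner_rel_bytes = ntuples * tupsize;
    long    max_pointers = budget_bytes / sizeof(void *);
    int     nbuckets;
    long    bucket_bytes;
    int     nbatch = 1;

    /* single-batch guess: one tuple per bucket, capped by pointer-array size */
    nbuckets = next_pow2((int) fmin(ceil(ntuples), (double) max_pointers));
    bucket_bytes = (long) nbuckets * sizeof(void *);

    if (inner_rel_bytes + bucket_bytes > budget_bytes)
    {
        /* won't fit: size buckets for a full memory budget, then batch */
        long    bucket_size = tupsize + sizeof(void *);

        nbuckets = next_pow2((int) fmin((double) (budget_bytes / bucket_size),
                                        (double) max_pointers));
        bucket_bytes = (long) nbuckets * sizeof(void *);
        nbatch = next_pow2((int) fmax(2.0,
                                      ceil(inner_rel_bytes /
                                           (budget_bytes - bucket_bytes))));
    }
    *numbuckets = nbuckets;
    *numbatches = nbatch;
}

int
main(void)
{
    int     nbuckets;
    int     nbatch;

    /* e.g. one million entries of ~72 bytes against a 4MB budget */
    choose_size(1000000.0, 72, 4096L * 1024, &nbuckets, &nbatch);
    printf("nbuckets = %d, nbatch = %d\n", nbuckets, nbatch);
    return 0;
}

With those example inputs the sketch arrives at 65536 buckets and 32 batches, mirroring the shape of the real calculation: the bucket array is kept to a fraction of the memory budget, and both nbuckets and nbatch are rounded to powers of two so that ExecHashGetBucketAndBatch stays cheap.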

◆ ExecEndHash()

void ExecEndHash ( HashState node)

Definition at line 406 of file nodeHash.c.

References ExecEndNode(), ExecFreeExprContext(), outerPlan, outerPlanState, and HashState::ps.

Referenced by ExecEndNode().

407 {
408  PlanState  *outerPlan;
409 
410  /*
411  * free exprcontext
412  */
413  ExecFreeExprContext(&node->ps);
414 
415  /*
416  * shut down the subplan
417  */
418  outerPlan = outerPlanState(node);
419  ExecEndNode(outerPlan);
420 }
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:543
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:655
#define outerPlanState(node)
Definition: execnodes.h:1039
PlanState ps
Definition: execnodes.h:2380
#define outerPlan(node)
Definition: plannodes.h:172

◆ ExecHashAccumInstrumentation()

void ExecHashAccumInstrumentation ( HashInstrumentation instrument,
HashJoinTable  hashtable 
)

Definition at line 2687 of file nodeHash.c.

References Max, HashJoinTableData::nbatch, HashInstrumentation::nbatch, HashJoinTableData::nbatch_original, HashInstrumentation::nbatch_original, HashJoinTableData::nbuckets, HashInstrumentation::nbuckets, HashJoinTableData::nbuckets_original, HashInstrumentation::nbuckets_original, HashInstrumentation::space_peak, and HashJoinTableData::spacePeak.

Referenced by ExecReScanHashJoin(), and ExecShutdownHash().

2689 {
2690  instrument->nbuckets = Max(instrument->nbuckets,
2691  hashtable->nbuckets);
2692  instrument->nbuckets_original = Max(instrument->nbuckets_original,
2693  hashtable->nbuckets_original);
2694  instrument->nbatch = Max(instrument->nbatch,
2695  hashtable->nbatch);
2696  instrument->nbatch_original = Max(instrument->nbatch_original,
2697  hashtable->nbatch_original);
2698  instrument->space_peak = Max(instrument->space_peak,
2699  hashtable->spacePeak);
2700 }
#define Max(x, y)
Definition: c.h:914

◆ ExecHashEstimate()

void ExecHashEstimate ( HashState node,
ParallelContext pcxt 
)

Definition at line 2570 of file nodeHash.c.

References add_size(), ParallelContext::estimator, PlanState::instrument, mul_size(), ParallelContext::nworkers, offsetof, HashState::ps, shm_toc_estimate_chunk, and shm_toc_estimate_keys.

Referenced by ExecParallelEstimate().

2571 {
2572  size_t size;
2573 
2574  /* don't need this if not instrumenting or no workers */
2575  if (!node->ps.instrument || pcxt->nworkers == 0)
2576  return;
2577 
2578  size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
2579  size = add_size(size, offsetof(SharedHashInfo, hinstrument));
2580  shm_toc_estimate_chunk(&pcxt->estimator, size);
2581  shm_toc_estimate_keys(&pcxt->estimator, 1);
2582 }
Instrumentation * instrument
Definition: execnodes.h:955
shm_toc_estimator estimator
Definition: parallel.h:42
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
PlanState ps
Definition: execnodes.h:2380
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
#define offsetof(type, field)
Definition: c.h:661
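
The space requested above is one HashInstrumentation slot per planned worker plus the fixed header of SharedHashInfo. The following sketch mirrors that arithmetic with stand-in struct definitions; the field lists and the worker count are placeholders (the real layouts live in execnodes.h), and the overflow-checked mul_size()/add_size() helpers are replaced by plain arithmetic.

#include <stddef.h>
#include <stdio.h>

/* placeholder layouts, not the real execnodes.h definitions */
typedef struct FakeHashInstrumentation
{
    int     nbuckets;
    int     nbuckets_original;
    int     nbatch;
    int     nbatch_original;
    size_t  space_peak;
} FakeHashInstrumentation;

typedef struct FakeSharedHashInfo
{
    int     num_workers;
    FakeHashInstrumentation hinstrument[];  /* one slot per worker */
} FakeSharedHashInfo;

int
main(void)
{
    int     nworkers = 4;       /* hypothetical pcxt->nworkers */
    size_t  size;

    size = nworkers * sizeof(FakeHashInstrumentation);
    size += offsetof(FakeSharedHashInfo, hinstrument);
    printf("shared-memory chunk to request: %zu bytes\n", size);
    return 0;
}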

◆ ExecHashGetBucketAndBatch()

void ExecHashGetBucketAndBatch ( HashJoinTable  hashtable,
uint32  hashvalue,
int *  bucketno,
int *  batchno 
)

Definition at line 1898 of file nodeHash.c.

References HashJoinTableData::log2_nbuckets, HashJoinTableData::nbatch, HashJoinTableData::nbuckets, and pg_rotate_right32().

Referenced by ExecHashIncreaseNumBatches(), ExecHashIncreaseNumBuckets(), ExecHashJoinImpl(), ExecHashRemoveNextSkewBucket(), ExecHashTableInsert(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinPartitionOuter(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), ExecParallelHashTableInsert(), and ExecParallelHashTableInsertCurrentBatch().

1902 {
1903  uint32 nbuckets = (uint32) hashtable->nbuckets;
1904  uint32 nbatch = (uint32) hashtable->nbatch;
1905 
1906  if (nbatch > 1)
1907  {
1908  *bucketno = hashvalue & (nbuckets - 1);
1909  *batchno = pg_rotate_right32(hashvalue,
1910  hashtable->log2_nbuckets) & (nbatch - 1);
1911  }
1912  else
1913  {
1914  *bucketno = hashvalue & (nbuckets - 1);
1915  *batchno = 0;
1916  }
1917 }
static uint32 pg_rotate_right32(uint32 word, int n)
Definition: pg_bitutils.h:221
unsigned int uint32
Definition: c.h:367
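
Because nbuckets and nbatch are both powers of two, the bucket number is simply the low log2_nbuckets bits of the hash value, and the batch number is taken from the bits just above them by rotating the hash value right before masking. The standalone sketch below shows the same bit manipulation with assumed values (nbuckets = 1024, nbatch = 4); the local helper stands in for pg_rotate_right32().

#include <stdint.h>
#include <stdio.h>

static uint32_t
rotate_right32(uint32_t word, int n)
{
    return (word >> n) | (word << (32 - n));
}

int
main(void)
{
    uint32_t    hashvalue = 0xDEADBEEF;     /* hypothetical hash value */
    uint32_t    nbuckets = 1024;            /* must be a power of 2 */
    uint32_t    nbatch = 4;                 /* must be a power of 2 */
    int         log2_nbuckets = 10;

    int         bucketno = hashvalue & (nbuckets - 1);
    int         batchno = rotate_right32(hashvalue, log2_nbuckets) & (nbatch - 1);

    printf("bucketno = %d, batchno = %d\n", bucketno, batchno);
    return 0;
}

Keeping bucketno in the low bits and batchno in the bits directly above them means that when the number of batches later doubles, a tuple keeps its bucket and either stays in its current batch or moves to a higher-numbered one, never backwards.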

◆ ExecHashGetHashValue()

bool ExecHashGetHashValue ( HashJoinTable  hashtable,
ExprContext econtext,
List hashkeys,
bool  outer_tuple,
bool  keep_nulls,
uint32 hashvalue 
)

Definition at line 1790 of file nodeHash.c.

References HashJoinTableData::collations, DatumGetUInt32, ExprContext::ecxt_per_tuple_memory, ExecEvalExpr(), FunctionCall1Coll(), HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, lfirst, MemoryContextSwitchTo(), HashJoinTableData::outer_hashfunctions, and ResetExprContext.

Referenced by ExecHashJoinOuterGetTuple(), ExecParallelHashJoinOuterGetTuple(), ExecParallelHashJoinPartitionOuter(), MultiExecParallelHash(), and MultiExecPrivateHash().

1796 {
1797  uint32 hashkey = 0;
1798  FmgrInfo *hashfunctions;
1799  ListCell *hk;
1800  int i = 0;
1801  MemoryContext oldContext;
1802 
1803  /*
1804  * We reset the eval context each time to reclaim any memory leaked in the
1805  * hashkey expressions.
1806  */
1807  ResetExprContext(econtext);
1808 
1809  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
1810 
1811  if (outer_tuple)
1812  hashfunctions = hashtable->outer_hashfunctions;
1813  else
1814  hashfunctions = hashtable->inner_hashfunctions;
1815 
1816  foreach(hk, hashkeys)
1817  {
1818  ExprState *keyexpr = (ExprState *) lfirst(hk);
1819  Datum keyval;
1820  bool isNull;
1821 
1822  /* rotate hashkey left 1 bit at each step */
1823  hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
1824 
1825  /*
1826  * Get the join attribute value of the tuple
1827  */
1828  keyval = ExecEvalExpr(keyexpr, econtext, &isNull);
1829 
1830  /*
1831  * If the attribute is NULL, and the join operator is strict, then
1832  * this tuple cannot pass the join qual so we can reject it
1833  * immediately (unless we're scanning the outside of an outer join, in
1834  * which case we must not reject it). Otherwise we act like the
1835  * hashcode of NULL is zero (this will support operators that act like
1836  * IS NOT DISTINCT, though not any more-random behavior). We treat
1837  * the hash support function as strict even if the operator is not.
1838  *
1839  * Note: currently, all hashjoinable operators must be strict since
1840  * the hash index AM assumes that. However, it takes so little extra
1841  * code here to allow non-strict that we may as well do it.
1842  */
1843  if (isNull)
1844  {
1845  if (hashtable->hashStrict[i] && !keep_nulls)
1846  {
1847  MemoryContextSwitchTo(oldContext);
1848  return false; /* cannot match */
1849  }
1850  /* else, leave hashkey unmodified, equivalent to hashcode 0 */
1851  }
1852  else
1853  {
1854  /* Compute the hash function */
1855  uint32 hkey;
1856 
1857  hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], hashtable->collations[i], keyval));
1858  hashkey ^= hkey;
1859  }
1860 
1861  i++;
1862  }
1863 
1864  MemoryContextSwitchTo(oldContext);
1865 
1866  *hashvalue = hashkey;
1867  return true;
1868 }
#define DatumGetUInt32(X)
Definition: postgres.h:486
Definition: fmgr.h:56
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:234
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
FmgrInfo * inner_hashfunctions
Definition: hashjoin.h:338
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:290
unsigned int uint32
Definition: c.h:367
FmgrInfo * outer_hashfunctions
Definition: hashjoin.h:337
uintptr_t Datum
Definition: postgres.h:367
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1132
#define lfirst(lc)
Definition: pg_list.h:190
int i
bool * hashStrict
Definition: hashjoin.h:339
#define ResetExprContext(econtext)
Definition: executor.h:501
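
The per-key hashes are combined by rotating the running result left one bit for each key and then XORing in the new key's hash, so that equal values appearing in different key positions do not simply cancel out. A minimal sketch of just that combining step is below; the two per-key values are made-up stand-ins for the results of FunctionCall1Coll().

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    /* hypothetical per-key hash values (would come from the hash functions) */
    uint32_t    key_hashes[] = {0x12345678, 0x9ABCDEF0};
    uint32_t    hashkey = 0;
    int         i;

    for (i = 0; i < 2; i++)
    {
        /* rotate hashkey left 1 bit at each step */
        hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);

        /* a NULL key would leave hashkey unchanged here (hash code 0) */
        hashkey ^= key_hashes[i];
    }
    printf("combined hash = 0x%08X\n", hashkey);
    return 0;
}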

◆ ExecHashGetSkewBucket()

int ExecHashGetSkewBucket ( HashJoinTable  hashtable,
uint32  hashvalue 
)

Definition at line 2365 of file nodeHash.c.

References HashSkewBucket::hashvalue, INVALID_SKEW_BUCKET_NO, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, and HashJoinTableData::skewEnabled.

Referenced by ExecHashJoinImpl(), and MultiExecPrivateHash().

2366 {
2367  int bucket;
2368 
2369  /*
2370  * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
2371  * particular, this happens after the initial batch is done).
2372  */
2373  if (!hashtable->skewEnabled)
2374  return INVALID_SKEW_BUCKET_NO;
2375 
2376  /*
2377  * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
2378  */
2379  bucket = hashvalue & (hashtable->skewBucketLen - 1);
2380 
2381  /*
2382  * While we have not hit a hole in the hashtable and have not hit the
2383  * desired bucket, we have collided with some other hash value, so try the
2384  * next bucket location.
2385  */
2386  while (hashtable->skewBucket[bucket] != NULL &&
2387  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
2388  bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
2389 
2390  /*
2391  * Found the desired bucket?
2392  */
2393  if (hashtable->skewBucket[bucket] != NULL)
2394  return bucket;
2395 
2396  /*
2397  * There must not be any hashtable entry for this hash value.
2398  */
2399  return INVALID_SKEW_BUCKET_NO;
2400 }
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:109
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
uint32 hashvalue
Definition: hashjoin.h:104
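
The skew table is an open-addressed hash table: since skewBucketLen is a power of two, the modulo is a bitwise AND, and on a collision the probe simply advances to the next slot until it finds either the matching hash value or an empty slot. The sketch below reproduces that probe loop over a toy array; the slot struct and its contents are hypothetical, not the real HashSkewBucket layout.

#include <stdint.h>
#include <stdio.h>

#define SKEW_LEN    8           /* must be a power of 2 */
#define NO_MATCH    (-1)        /* stand-in for INVALID_SKEW_BUCKET_NO */

typedef struct SkewSlot
{
    int         used;
    uint32_t    hashvalue;
} SkewSlot;

static int
skew_lookup(const SkewSlot *slots, uint32_t hashvalue)
{
    int         bucket = hashvalue & (SKEW_LEN - 1);

    /* probe until we hit an empty slot or the matching hash value */
    while (slots[bucket].used && slots[bucket].hashvalue != hashvalue)
        bucket = (bucket + 1) & (SKEW_LEN - 1);

    return slots[bucket].used ? bucket : NO_MATCH;
}

int
main(void)
{
    SkewSlot    slots[SKEW_LEN] = {{0}};

    slots[5].used = 1;
    slots[5].hashvalue = 0x25;  /* 0x25 & 7 == 5 */

    printf("bucket for 0x25: %d\n", skew_lookup(slots, 0x25));
    printf("bucket for 0x11: %d\n", skew_lookup(slots, 0x11));
    return 0;
}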

◆ ExecHashInitializeDSM()

void ExecHashInitializeDSM ( HashState node,
ParallelContext pcxt 
)

Definition at line 2589 of file nodeHash.c.

References PlanState::instrument, SharedHashInfo::num_workers, ParallelContext::nworkers, offsetof, PlanState::plan, Plan::plan_node_id, HashState::ps, HashState::shared_info, shm_toc_allocate(), shm_toc_insert(), and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

2590 {
2591  size_t size;
2592 
2593  /* don't need this if not instrumenting or no workers */
2594  if (!node->ps.instrument || pcxt->nworkers == 0)
2595  return;
2596 
2597  size = offsetof(SharedHashInfo, hinstrument) +
2598  pcxt->nworkers * sizeof(HashInstrumentation);
2599  node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
2600 
2601  /* Each per-worker area must start out as zeroes. */
2602  memset(node->shared_info, 0, size);
2603 
2604  node->shared_info->num_workers = pcxt->nworkers;
2605  shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
2606  node->shared_info);
2607 }
Instrumentation * instrument
Definition: execnodes.h:955
struct HashInstrumentation HashInstrumentation
int plan_node_id
Definition: plannodes.h:141
SharedHashInfo * shared_info
Definition: execnodes.h:2390
PlanState ps
Definition: execnodes.h:2380
Plan * plan
Definition: execnodes.h:945
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
#define offsetof(type, field)
Definition: c.h:661
shm_toc * toc
Definition: parallel.h:45

◆ ExecHashInitializeWorker()

void ExecHashInitializeWorker ( HashState node,
ParallelWorkerContext pwcxt 
)

Definition at line 2614 of file nodeHash.c.

References SharedHashInfo::hinstrument, HashState::hinstrument, PlanState::instrument, ParallelWorkerNumber, PlanState::plan, Plan::plan_node_id, HashState::ps, shm_toc_lookup(), and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().

2615 {
2616  SharedHashInfo *shared_info;
2617 
2618  /* don't need this if not instrumenting */
2619  if (!node->ps.instrument)
2620  return;
2621 
2622  /*
2623  * Find our entry in the shared area, and set up a pointer to it so that
2624  * we'll accumulate stats there when shutting down or rebuilding the hash
2625  * table.
2626  */
2627  shared_info = (SharedHashInfo *)
2628  shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
2629  node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
2630 }
Instrumentation * instrument
Definition: execnodes.h:955
int plan_node_id
Definition: plannodes.h:141
int ParallelWorkerNumber
Definition: parallel.c:112
PlanState ps
Definition: execnodes.h:2380
HashInstrumentation * hinstrument
Definition: execnodes.h:2397
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
Definition: execnodes.h:2371
Plan * plan
Definition: execnodes.h:945
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232

◆ ExecHashRetrieveInstrumentation()

void ExecHashRetrieveInstrumentation ( HashState node)

Definition at line 2656 of file nodeHash.c.

References SharedHashInfo::num_workers, offsetof, palloc(), and HashState::shared_info.

Referenced by ExecParallelRetrieveInstrumentation().

2657 {
2658  SharedHashInfo *shared_info = node->shared_info;
2659  size_t size;
2660 
2661  if (shared_info == NULL)
2662  return;
2663 
2664  /* Replace node->shared_info with a copy in backend-local memory. */
2665  size = offsetof(SharedHashInfo, hinstrument) +
2666  shared_info->num_workers * sizeof(HashInstrumentation);
2667  node->shared_info = palloc(size);
2668  memcpy(node->shared_info, shared_info, size);
2669 }
struct HashInstrumentation HashInstrumentation
SharedHashInfo * shared_info
Definition: execnodes.h:2390
void * palloc(Size size)
Definition: mcxt.c:949
#define offsetof(type, field)
Definition: c.h:661

◆ ExecHashTableCreate()

HashJoinTable ExecHashTableCreate ( HashState state,
List hashOperators,
List hashCollations,
bool  keepNulls 
)

Definition at line 430 of file nodeHash.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, HashJoinTableData::area, Assert, BarrierArriveAndWait(), BarrierAttach(), BarrierPhase(), HashJoinTableData::batchCxt, HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, HashJoinTableData::chunks, HashJoinTableData::collations, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, CurrentMemoryContext, elog, ERROR, EState::es_query_dsa, ExecChooseHashTableSize(), ExecHashBuildSkewHash(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashTableAlloc(), fmgr_info(), forboth, get_op_hash_functions(), HashJoinTableData::growEnabled, ParallelHashJoinState::growth, HashJoinTableData::hashCxt, HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, HashJoinTableData::innerBatchFile, HashJoinTableData::keepNulls, lfirst_oid, list_length(), HashJoinTableData::log2_nbuckets, HashJoinTableData::log2_nbuckets_optimal, MemoryContextSwitchTo(), my_log2(), ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, HashJoinTableData::nbatch_original, HashJoinTableData::nbatch_outstart, ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, HashJoinTableData::nbuckets_original, ParallelHashJoinState::nparticipants, HashJoinTableData::nSkewBuckets, OidIsValid, op_strict(), HashJoinTableData::outer_hashfunctions, HashJoinTableData::outerBatchFile, outerPlan, palloc(), palloc0(), Plan::parallel_aware, HashJoinTableData::parallel_state, HashState::parallel_state, HashJoinTableData::partialTuples, PHJ_BUILD_ELECTING, PHJ_GROWTH_OK, Hash::plan, PlanState::plan, Plan::plan_rows, Plan::plan_width, PrepareTempTablespaces(), printf, HashState::ps, Hash::rows_total, SKEW_WORK_MEM_PERCENT, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, HashJoinTableData::skewBucketNums, HashJoinTableData::skewEnabled, Hash::skewTable, HashJoinTableData::skewTuples, ParallelHashJoinState::space_allowed, HashJoinTableData::spaceAllowed, HashJoinTableData::spaceAllowedSkew, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, PlanState::state, HashJoinTableData::totalTuples, HashJoinTableData::unshared, and WAIT_EVENT_HASH_BUILD_ELECT.

Referenced by ExecHashJoinImpl().

431 {
432  Hash *node;
433  HashJoinTable hashtable;
434  Plan *outerNode;
435  size_t space_allowed;
436  int nbuckets;
437  int nbatch;
438  double rows;
439  int num_skew_mcvs;
440  int log2_nbuckets;
441  int nkeys;
442  int i;
443  ListCell *ho;
444  ListCell *hc;
445  MemoryContext oldcxt;
446 
447  /*
448  * Get information about the size of the relation to be hashed (it's the
449  * "outer" subtree of this node, but the inner relation of the hashjoin).
450  * Compute the appropriate size of the hash table.
451  */
452  node = (Hash *) state->ps.plan;
453  outerNode = outerPlan(node);
454 
455  /*
456  * If this is shared hash table with a partial plan, then we can't use
457  * outerNode->plan_rows to estimate its size. We need an estimate of the
458  * total number of rows across all copies of the partial plan.
459  */
460  rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
461 
462  ExecChooseHashTableSize(rows, outerNode->plan_width,
463  OidIsValid(node->skewTable),
464  state->parallel_state != NULL,
465  state->parallel_state != NULL ?
466  state->parallel_state->nparticipants - 1 : 0,
467  &space_allowed,
468  &nbuckets, &nbatch, &num_skew_mcvs);
469 
470  /* nbuckets must be a power of 2 */
471  log2_nbuckets = my_log2(nbuckets);
472  Assert(nbuckets == (1 << log2_nbuckets));
473 
474  /*
475  * Initialize the hash table control block.
476  *
477  * The hashtable control block is just palloc'd from the executor's
478  * per-query memory context. Everything else should be kept inside the
479  * subsidiary hashCxt or batchCxt.
480  */
481  hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
482  hashtable->nbuckets = nbuckets;
483  hashtable->nbuckets_original = nbuckets;
484  hashtable->nbuckets_optimal = nbuckets;
485  hashtable->log2_nbuckets = log2_nbuckets;
486  hashtable->log2_nbuckets_optimal = log2_nbuckets;
487  hashtable->buckets.unshared = NULL;
488  hashtable->keepNulls = keepNulls;
489  hashtable->skewEnabled = false;
490  hashtable->skewBucket = NULL;
491  hashtable->skewBucketLen = 0;
492  hashtable->nSkewBuckets = 0;
493  hashtable->skewBucketNums = NULL;
494  hashtable->nbatch = nbatch;
495  hashtable->curbatch = 0;
496  hashtable->nbatch_original = nbatch;
497  hashtable->nbatch_outstart = nbatch;
498  hashtable->growEnabled = true;
499  hashtable->totalTuples = 0;
500  hashtable->partialTuples = 0;
501  hashtable->skewTuples = 0;
502  hashtable->innerBatchFile = NULL;
503  hashtable->outerBatchFile = NULL;
504  hashtable->spaceUsed = 0;
505  hashtable->spacePeak = 0;
506  hashtable->spaceAllowed = space_allowed;
507  hashtable->spaceUsedSkew = 0;
508  hashtable->spaceAllowedSkew =
509  hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
510  hashtable->chunks = NULL;
511  hashtable->current_chunk = NULL;
512  hashtable->parallel_state = state->parallel_state;
513  hashtable->area = state->ps.state->es_query_dsa;
514  hashtable->batches = NULL;
515 
516 #ifdef HJDEBUG
517  printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
518  hashtable, nbatch, nbuckets);
519 #endif
520 
521  /*
522  * Create temporary memory contexts in which to keep the hashtable working
523  * storage. See notes in executor/hashjoin.h.
524  */
525  hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
526  "HashTableContext",
527  ALLOCSET_DEFAULT_SIZES);
528 
529  hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
530  "HashBatchContext",
531  ALLOCSET_DEFAULT_SIZES);
532 
533  /* Allocate data that will live for the life of the hashjoin */
534 
535  oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
536 
537  /*
538  * Get info about the hash functions to be used for each hash key. Also
539  * remember whether the join operators are strict.
540  */
541  nkeys = list_length(hashOperators);
542  hashtable->outer_hashfunctions =
543  (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
544  hashtable->inner_hashfunctions =
545  (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
546  hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
547  hashtable->collations = (Oid *) palloc(nkeys * sizeof(Oid));
548  i = 0;
549  forboth(ho, hashOperators, hc, hashCollations)
550  {
551  Oid hashop = lfirst_oid(ho);
552  Oid left_hashfn;
553  Oid right_hashfn;
554 
555  if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
556  elog(ERROR, "could not find hash function for hash operator %u",
557  hashop);
558  fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
559  fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
560  hashtable->hashStrict[i] = op_strict(hashop);
561  hashtable->collations[i] = lfirst_oid(hc);
562  i++;
563  }
564 
565  if (nbatch > 1 && hashtable->parallel_state == NULL)
566  {
567  /*
568  * allocate and initialize the file arrays in hashCxt (not needed for
569  * parallel case which uses shared tuplestores instead of raw files)
570  */
571  hashtable->innerBatchFile = (BufFile **)
572  palloc0(nbatch * sizeof(BufFile *));
573  hashtable->outerBatchFile = (BufFile **)
574  palloc0(nbatch * sizeof(BufFile *));
575  /* The files will not be opened until needed... */
576  /* ... but make sure we have temp tablespaces established for them */
577  PrepareTempTablespaces();
578  }
579 
580  MemoryContextSwitchTo(oldcxt);
581 
582  if (hashtable->parallel_state)
583  {
584  ParallelHashJoinState *pstate = hashtable->parallel_state;
585  Barrier *build_barrier;
586 
587  /*
588  * Attach to the build barrier. The corresponding detach operation is
589  * in ExecHashTableDetach. Note that we won't attach to the
590  * batch_barrier for batch 0 yet. We'll attach later and start it out
591  * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
592  * and then loaded while hashing (the standard hybrid hash join
593  * algorithm), and we'll coordinate that using build_barrier.
594  */
595  build_barrier = &pstate->build_barrier;
596  BarrierAttach(build_barrier);
597 
598  /*
599  * So far we have no idea whether there are any other participants,
600  * and if so, what phase they are working on. The only thing we care
601  * about at this point is whether someone has already created the
602  * SharedHashJoinBatch objects and the hash table for batch 0. One
603  * backend will be elected to do that now if necessary.
604  */
605  if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
606  BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
607  {
608  pstate->nbatch = nbatch;
609  pstate->space_allowed = space_allowed;
610  pstate->growth = PHJ_GROWTH_OK;
611 
612  /* Set up the shared state for coordinating batches. */
613  ExecParallelHashJoinSetUpBatches(hashtable, nbatch);
614 
615  /*
616  * Allocate batch 0's hash table up front so we can load it
617  * directly while hashing.
618  */
619  pstate->nbuckets = nbuckets;
620  ExecParallelHashTableAlloc(hashtable, 0);
621  }
622 
623  /*
624  * The next Parallel Hash synchronization point is in
625  * MultiExecParallelHash(), which will progress it all the way to
626  * PHJ_BUILD_DONE. The caller must not return control from this
627  * executor node between now and then.
628  */
629  }
630  else
631  {
632  /*
633  * Prepare context for the first-scan space allocations; allocate the
634  * hashbucket array therein, and set each bucket "empty".
635  */
636  MemoryContextSwitchTo(hashtable->batchCxt);
637 
638  hashtable->buckets.unshared = (HashJoinTuple *)
639  palloc0(nbuckets * sizeof(HashJoinTuple));
640 
641  /*
642  * Set up for skew optimization, if possible and there's a need for
643  * more than one batch. (In a one-batch join, there's no point in
644  * it.)
645  */
646  if (nbatch > 1)
647  ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
648 
649  MemoryContextSwitchTo(oldcxt);
650  }
651 
652  return hashtable;
653 }
int log2_nbuckets_optimal
Definition: hashjoin.h:291
double rows_total
Definition: plannodes.h:935
Oid skewTable
Definition: plannodes.h:931
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2400
double skewTuples
Definition: hashjoin.h:320
Definition: fmgr.h:56
struct dsa_area * es_query_dsa
Definition: execnodes.h:589
double plan_rows
Definition: plannodes.h:129
bool op_strict(Oid opno)
Definition: lsyscache.c:1340
#define AllocSetContextCreate
Definition: memutils.h:170
bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)
Definition: lsyscache.c:508
#define forboth(cell1, list1, cell2, list2)
Definition: pg_list.h:419
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define printf(...)
Definition: port.h:199
FmgrInfo * inner_hashfunctions
Definition: hashjoin.h:338
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3095
EState * state
Definition: execnodes.h:947
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:644
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
Definition: nodeHash.c:2212
double partialTuples
Definition: hashjoin.h:319
dsa_area * area
Definition: hashjoin.h:356
int * skewBucketNums
Definition: hashjoin.h:308
#define ERROR
Definition: elog.h:43
void PrepareTempTablespaces(void)
Definition: tablespace.c:1323
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:126
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
BufFile ** outerBatchFile
Definition: hashjoin.h:330
Size spaceAllowedSkew
Definition: hashjoin.h:346
bool parallel_aware
Definition: plannodes.h:135
PlanState ps
Definition: execnodes.h:2380
#define PHJ_BUILD_ELECTING
Definition: hashjoin.h:257
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
MemoryContext batchCxt
Definition: hashjoin.h:349
struct HashJoinTableData * HashJoinTable
Definition: execnodes.h:1938
int my_log2(long num)
Definition: dynahash.c:1720
#define outerPlan(node)
Definition: plannodes.h:172
FmgrInfo * outer_hashfunctions
Definition: hashjoin.h:337
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
int BarrierAttach(Barrier *barrier)
Definition: barrier.c:214
void * palloc0(Size size)
Definition: mcxt.c:980
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
union HashJoinTableData::@95 buckets
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
Plan * plan
Definition: execnodes.h:945
double totalTuples
Definition: hashjoin.h:318
int plan_width
Definition: plannodes.h:130
#define Assert(condition)
Definition: c.h:738
ParallelHashGrowth growth
Definition: hashjoin.h:241
BufFile ** innerBatchFile
Definition: hashjoin.h:329
static int list_length(const List *l)
Definition: pg_list.h:169
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:243
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
HashMemoryChunk chunks
Definition: hashjoin.h:352
Plan plan
Definition: plannodes.h:924
void * palloc(Size size)
Definition: mcxt.c:949
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
Definition: nodeHash.c:2934
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297
HashMemoryChunk current_chunk
Definition: hashjoin.h:355
#define elog(elevel,...)
Definition: elog.h:214
int i
bool * hashStrict
Definition: hashjoin.h:339
MemoryContext hashCxt
Definition: hashjoin.h:348
#define SKEW_WORK_MEM_PERCENT
Definition: hashjoin.h:110
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_work_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
Definition: nodeHash.c:667
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ ExecHashTableDestroy()

void ExecHashTableDestroy ( HashJoinTable  hashtable)

Definition at line 852 of file nodeHash.c.

References BufFileClose(), HashJoinTableData::hashCxt, i, HashJoinTableData::innerBatchFile, MemoryContextDelete(), HashJoinTableData::nbatch, HashJoinTableData::outerBatchFile, and pfree().

Referenced by ExecEndHashJoin(), and ExecReScanHashJoin().

853 {
854  int i;
855 
856  /*
857  * Make sure all the temp files are closed. We skip batch 0, since it
858  * can't have any temp files (and the arrays might not even exist if
859  * nbatch is only 1). Parallel hash joins don't use these files.
860  */
861  if (hashtable->innerBatchFile != NULL)
862  {
863  for (i = 1; i < hashtable->nbatch; i++)
864  {
865  if (hashtable->innerBatchFile[i])
866  BufFileClose(hashtable->innerBatchFile[i]);
867  if (hashtable->outerBatchFile[i])
868  BufFileClose(hashtable->outerBatchFile[i]);
869  }
870  }
871 
872  /* Release working memory (batchCxt is a child, so it goes away too) */
873  MemoryContextDelete(hashtable->hashCxt);
874 
875  /* And drop the control block */
876  pfree(hashtable);
877 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
void BufFileClose(BufFile *file)
Definition: buffile.c:391
void pfree(void *pointer)
Definition: mcxt.c:1056
BufFile ** outerBatchFile
Definition: hashjoin.h:330
BufFile ** innerBatchFile
Definition: hashjoin.h:329
int i
MemoryContext hashCxt
Definition: hashjoin.h:348

◆ ExecHashTableDetach()

void ExecHashTableDetach ( HashJoinTable  hashtable)

Definition at line 3172 of file nodeHash.c.

References HashJoinTableData::area, BarrierDetach(), ParallelHashJoinState::batches, HashJoinTableData::batches, ParallelHashJoinState::build_barrier, dsa_free(), DsaPointerIsValid, i, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, sts_end_parallel_scan(), and sts_end_write().

Referenced by ExecHashJoinReInitializeDSM(), and ExecShutdownHashJoin().

3173 {
3174  if (hashtable->parallel_state)
3175  {
3176  ParallelHashJoinState *pstate = hashtable->parallel_state;
3177  int i;
3178 
3179  /* Make sure any temporary files are closed. */
3180  if (hashtable->batches)
3181  {
3182  for (i = 0; i < hashtable->nbatch; ++i)
3183  {
3184  sts_end_write(hashtable->batches[i].inner_tuples);
3185  sts_end_write(hashtable->batches[i].outer_tuples);
3186  sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
3187  sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
3188  }
3189  }
3190 
3191  /* If we're last to detach, clean up shared memory. */
3192  if (BarrierDetach(&pstate->build_barrier))
3193  {
3194  if (DsaPointerIsValid(pstate->batches))
3195  {
3196  dsa_free(hashtable->area, pstate->batches);
3197  pstate->batches = InvalidDsaPointer;
3198  }
3199  }
3200 
3201  hashtable->parallel_state = NULL;
3202  }
3203 }
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:209
#define InvalidDsaPointer
Definition: dsa.h:78
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:208
dsa_area * area
Definition: hashjoin.h:356
dsa_pointer batches
Definition: hashjoin.h:236
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
bool BarrierDetach(Barrier *barrier)
Definition: barrier.c:234
#define DsaPointerIsValid(x)
Definition: dsa.h:81
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:820
int i
void sts_end_write(SharedTuplestoreAccessor *accessor)

◆ ExecHashTableDetachBatch()

void ExecHashTableDetachBatch ( HashJoinTable  hashtable)

Definition at line 3115 of file nodeHash.c.

References HashJoinTableData::area, Assert, BarrierArriveAndDetach(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, ParallelHashJoinBatch::chunks, HashJoinTableData::curbatch, dsa_free(), dsa_get_address(), DsaPointerIsValid, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, Max, HashJoinTableData::nbuckets, HashMemoryChunkData::next, next, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BATCH_DONE, HashMemoryChunkData::shared, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinReInitializeDSM(), ExecParallelHashJoinNewBatch(), and ExecShutdownHashJoin().

3116 {
3117  if (hashtable->parallel_state != NULL &&
3118  hashtable->curbatch >= 0)
3119  {
3120  int curbatch = hashtable->curbatch;
3121  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
3122 
3123  /* Make sure any temporary files are closed. */
3124  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
3125  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
3126 
3127  /* Detach from the batch we were last working on. */
3128  if (BarrierArriveAndDetach(&batch->batch_barrier))
3129  {
3130  /*
3131  * Technically we shouldn't access the barrier because we're no
3132  * longer attached, but since there is no way it's moving after
3133  * this point it seems safe to make the following assertion.
3134  */
3135  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
3136 
3137  /* Free shared chunks and buckets. */
3138  while (DsaPointerIsValid(batch->chunks))
3139  {
3140  HashMemoryChunk chunk =
3141  dsa_get_address(hashtable->area, batch->chunks);
3142  dsa_pointer next = chunk->next.shared;
3143 
3144  dsa_free(hashtable->area, batch->chunks);
3145  batch->chunks = next;
3146  }
3147  if (DsaPointerIsValid(batch->buckets))
3148  {
3149  dsa_free(hashtable->area, batch->buckets);
3150  batch->buckets = InvalidDsaPointer;
3151  }
3152  }
3153 
3154  /*
3155  * Track the largest batch we've been attached to. Though each
3156  * backend might see a different subset of batches, explain.c will
3157  * scan the results from all backends to find the largest value.
3158  */
3159  hashtable->spacePeak =
3160  Max(hashtable->spacePeak,
3161  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
3162 
3163  /* Remember that we are not attached to a batch. */
3164  hashtable->curbatch = -1;
3165  }
3166 }
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:209
#define PHJ_BATCH_DONE
Definition: hashjoin.h:268
static int32 next
Definition: blutils.c:218
#define InvalidDsaPointer
Definition: dsa.h:78
dsa_pointer chunks
Definition: hashjoin.h:156
uint64 dsa_pointer
Definition: dsa.h:62
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:208
dsa_area * area
Definition: hashjoin.h:356
dsa_pointer shared
Definition: hashjoin.h:127
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:932
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
bool BarrierArriveAndDetach(Barrier *barrier)
Definition: barrier.c:203
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
#define Max(x, y)
Definition: c.h:914
#define Assert(condition)
Definition: c.h:738
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:243
ParallelHashJoinBatch * shared
Definition: hashjoin.h:197
#define DsaPointerIsValid(x)
Definition: dsa.h:81
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:820
union HashMemoryChunkData::@94 next
dsa_pointer buckets
Definition: hashjoin.h:153

◆ ExecHashTableInsert()

void ExecHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1592 of file nodeHash.c.

References Assert, HashJoinTableData::buckets, HashJoinTableData::curbatch, dense_alloc(), ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecHashIncreaseNumBatches(), ExecHashJoinSaveTuple(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::innerBatchFile, HashJoinTableData::log2_nbuckets_optimal, MaxAllocSize, HashJoinTableData::nbatch, HashJoinTableData::nbuckets_optimal, HashJoinTupleData::next, NTUP_PER_BUCKET, HashJoinTableData::skewTuples, HashJoinTableData::spaceAllowed, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, MinimalTupleData::t_len, HashJoinTableData::totalTuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch(), and MultiExecPrivateHash().

1595 {
1596  bool shouldFree;
1597  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1598  int bucketno;
1599  int batchno;
1600 
1601  ExecHashGetBucketAndBatch(hashtable, hashvalue,
1602  &bucketno, &batchno);
1603 
1604  /*
1605  * decide whether to put the tuple in the hash table or a temp file
1606  */
1607  if (batchno == hashtable->curbatch)
1608  {
1609  /*
1610  * put the tuple in hash table
1611  */
1612  HashJoinTuple hashTuple;
1613  int hashTupleSize;
1614  double ntuples = (hashtable->totalTuples - hashtable->skewTuples);
1615 
1616  /* Create the HashJoinTuple */
1617  hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1618  hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
1619 
1620  hashTuple->hashvalue = hashvalue;
1621  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1622 
1623  /*
1624  * We always reset the tuple-matched flag on insertion. This is okay
1625  * even when reloading a tuple from a batch file, since the tuple
1626  * could not possibly have been matched to an outer tuple before it
1627  * went into the batch file.
1628  */
1629  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1630 
1631  /* Push it onto the front of the bucket's list */
1632  hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1633  hashtable->buckets.unshared[bucketno] = hashTuple;
1634 
1635  /*
1636  * Increase the (optimal) number of buckets if we just exceeded the
1637  * NTUP_PER_BUCKET threshold, but only when there's still a single
1638  * batch.
1639  */
1640  if (hashtable->nbatch == 1 &&
1641  ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
1642  {
1643  /* Guard against integer overflow and alloc size overflow */
1644  if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
1645  hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
1646  {
1647  hashtable->nbuckets_optimal *= 2;
1648  hashtable->log2_nbuckets_optimal += 1;
1649  }
1650  }
1651 
1652  /* Account for space used, and back off if we've used too much */
1653  hashtable->spaceUsed += hashTupleSize;
1654  if (hashtable->spaceUsed > hashtable->spacePeak)
1655  hashtable->spacePeak = hashtable->spaceUsed;
1656  if (hashtable->spaceUsed +
1657  hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
1658  > hashtable->spaceAllowed)
1659  ExecHashIncreaseNumBatches(hashtable);
1660  }
1661  else
1662  {
1663  /*
1664  * put the tuple into a temp file for later batches
1665  */
1666  Assert(batchno > hashtable->curbatch);
1667  ExecHashJoinSaveTuple(tuple,
1668  hashvalue,
1669  &hashtable->innerBatchFile[batchno]);
1670  }
1671 
1672  if (shouldFree)
1673  heap_free_minimal_tuple(tuple);
1674 }
int log2_nbuckets_optimal
Definition: hashjoin.h:291
double skewTuples
Definition: hashjoin.h:320
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1662
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
Definition: nodeHash.c:885
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1898
struct HashJoinTupleData * unshared
Definition: hashjoin.h:72
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1427
struct HashJoinTupleData * HashJoinTuple
Definition: execnodes.h:1937
union HashJoinTupleData::@93 next
#define MaxAllocSize
Definition: memutils.h:40
static void * dense_alloc(HashJoinTable hashtable, Size size)
Definition: nodeHash.c:2706
#define NTUP_PER_BUCKET
Definition: nodeHash.c:664
union HashJoinTableData::@95 buckets
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:79
double totalTuples
Definition: hashjoin.h:318
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
#define Assert(condition)
Definition: c.h:738
BufFile ** innerBatchFile
Definition: hashjoin.h:329
#define HeapTupleHeaderClearMatch(tup)
Definition: htup_details.h:526
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr)
uint32 hashvalue
Definition: hashjoin.h:75
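
Two growth decisions are made on every in-memory insert above: while everything still fits in a single batch, the optimal bucket count doubles as soon as the tuple count exceeds nbuckets_optimal * NTUP_PER_BUCKET, and the number of batches is increased (via ExecHashIncreaseNumBatches) once the tuples plus the bucket array would overrun spaceAllowed. The sketch below restates those two tests with hypothetical numbers; it is not the real accounting code.

#include <stdbool.h>
#include <stdio.h>

#define NTUP_PER_BUCKET 1       /* target load factor, as in nodeHash.c */

int
main(void)
{
    double  ntuples = 70000;                    /* tuples inserted so far */
    int     nbatch = 1;
    long    nbuckets_optimal = 65536;
    long    space_used = 5L * 1024 * 1024;      /* bytes of tuple storage */
    long    space_allowed = 4L * 1024 * 1024;   /* work_mem budget */
    bool    need_more_batches;

    /* double the optimal bucket count once the load factor target is passed */
    if (nbatch == 1 && ntuples > nbuckets_optimal * NTUP_PER_BUCKET)
        nbuckets_optimal *= 2;

    /* batch growth: tuples plus the bucket array would overflow the budget */
    need_more_batches =
        space_used + nbuckets_optimal * (long) sizeof(void *) > space_allowed;

    printf("nbuckets_optimal = %ld, need more batches: %s\n",
           nbuckets_optimal, need_more_batches ? "yes" : "no");
    return 0;
}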

◆ ExecHashTableReset()

void ExecHashTableReset ( HashJoinTable  hashtable)

Definition at line 2137 of file nodeHash.c.

References HashJoinTableData::batchCxt, HashJoinTableData::buckets, HashJoinTableData::chunks, MemoryContextReset(), MemoryContextSwitchTo(), HashJoinTableData::nbuckets, palloc0(), HashJoinTableData::spaceUsed, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch().

2138 {
2139  MemoryContext oldcxt;
2140  int nbuckets = hashtable->nbuckets;
2141 
2142  /*
2143  * Release all the hash buckets and tuples acquired in the prior pass, and
2144  * reinitialize the context for a new pass.
2145  */
2146  MemoryContextReset(hashtable->batchCxt);
2147  oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
2148 
2149  /* Reallocate and reinitialize the hash bucket headers. */
2150  hashtable->buckets.unshared = (HashJoinTuple *)
2151  palloc0(nbuckets * sizeof(HashJoinTuple));
2152 
2153  hashtable->spaceUsed = 0;
2154 
2155  MemoryContextSwitchTo(oldcxt);
2156 
2157  /* Forget the chunks (the memory was freed by the context reset above). */
2158  hashtable->chunks = NULL;
2159 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
MemoryContext batchCxt
Definition: hashjoin.h:349
void * palloc0(Size size)
Definition: mcxt.c:980
union HashJoinTableData::@95 buckets
HashMemoryChunk chunks
Definition: hashjoin.h:352
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297

◆ ExecHashTableResetMatchFlags()

void ExecHashTableResetMatchFlags ( HashJoinTable  hashtable)

Definition at line 2166 of file nodeHash.c.

References HashJoinTableData::buckets, HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, i, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecReScanHashJoin().

2167 {
2168  HashJoinTuple tuple;
2169  int i;
2170 
2171  /* Reset all flags in the main table ... */
2172  for (i = 0; i < hashtable->nbuckets; i++)
2173  {
2174  for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
2175  tuple = tuple->next.unshared)
2176  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2177  }
2178 
2179  /* ... and the same for the skew buckets, if any */
2180  for (i = 0; i < hashtable->nSkewBuckets; i++)
2181  {
2182  int j = hashtable->skewBucketNums[i];
2183  HashSkewBucket *skewBucket = hashtable->skewBucket[j];
2184 
2185  for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
2186  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2187  }
2188 }
int * skewBucketNums
Definition: hashjoin.h:308
struct HashJoinTupleData * unshared
Definition: hashjoin.h:72
HashJoinTuple tuples
Definition: hashjoin.h:105
union HashJoinTupleData::@93 next
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
union HashJoinTableData::@95 buckets
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
#define HeapTupleHeaderClearMatch(tup)
Definition: htup_details.h:526
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297
int i

◆ ExecInitHash()

HashState* ExecInitHash ( Hash node,
EState estate,
int  eflags 
)

Definition at line 353 of file nodeHash.c.

References Assert, EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecAssignExprContext(), ExecHash(), ExecInitExprList(), ExecInitNode(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, Hash::hashkeys, HashState::hashkeys, HashState::hashtable, makeNode, NIL, outerPlan, outerPlanState, Hash::plan, PlanState::plan, HashState::ps, PlanState::ps_ProjInfo, Plan::qual, PlanState::state, and TTSOpsMinimalTuple.

Referenced by ExecInitNode().

354 {
355  HashState *hashstate;
356 
357  /* check for unsupported flags */
358  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
359 
360  /*
361  * create state structure
362  */
363  hashstate = makeNode(HashState);
364  hashstate->ps.plan = (Plan *) node;
365  hashstate->ps.state = estate;
366  hashstate->ps.ExecProcNode = ExecHash;
367  hashstate->hashtable = NULL;
368  hashstate->hashkeys = NIL; /* will be set by parent HashJoin */
369 
370  /*
371  * Miscellaneous initialization
372  *
373  * create expression context for node
374  */
375  ExecAssignExprContext(estate, &hashstate->ps);
376 
377  /*
378  * initialize child nodes
379  */
380  outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
381 
382  /*
383  * initialize our result slot and type. No need to build projection
384  * because this node doesn't do projections.
385  */
386  ExecInitResultTupleSlotTL(&hashstate->ps, &TTSOpsMinimalTuple);
387  hashstate->ps.ps_ProjInfo = NULL;
388 
389  /*
390  * initialize child expressions
391  */
392  Assert(node->plan.qual == NIL);
393  hashstate->hashkeys =
394  ExecInitExprList(node->hashkeys, (PlanState *) hashstate);
395 
396  return hashstate;
397 }
#define NIL
Definition: pg_list.h:65
List * qual
Definition: plannodes.h:143
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:985
HashJoinTable hashtable
Definition: execnodes.h:2381
EState * state
Definition: execnodes.h:947
#define EXEC_FLAG_BACKWARD
Definition: executor.h:58
#define outerPlanState(node)
Definition: execnodes.h:1039
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:318
List * hashkeys
Definition: execnodes.h:2382
PlanState ps
Definition: execnodes.h:2380
#define outerPlan(node)
Definition: plannodes.h:172
static TupleTableSlot * ExecHash(PlanState *pstate)
Definition: nodeHash.c:91
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:951
List * hashkeys
Definition: plannodes.h:930
Plan * plan
Definition: execnodes.h:945
#define makeNode(_type_)
Definition: nodes.h:577
#define Assert(condition)
Definition: c.h:738
#define EXEC_FLAG_MARK
Definition: executor.h:59
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:485
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1769
Plan plan
Definition: plannodes.h:924
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:139
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85

◆ ExecParallelHashTableAlloc()

void ExecParallelHashTableAlloc ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3095 of file nodeHash.c.

References HashJoinTableData::area, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, dsa_allocate, dsa_get_address(), dsa_pointer_atomic_init, i, InvalidDsaPointer, ParallelHashJoinState::nbuckets, HashJoinTableData::parallel_state, and ParallelHashJoinBatchAccessor::shared.

Referenced by ExecHashTableCreate(), and ExecParallelHashJoinNewBatch().

3096 {
3097  ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
3098  dsa_pointer_atomic *buckets;
3099  int nbuckets = hashtable->parallel_state->nbuckets;
3100  int i;
3101 
3102  batch->buckets =
3103  dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets);
3104  buckets = (dsa_pointer_atomic *)
3105  dsa_get_address(hashtable->area, batch->buckets);
3106  for (i = 0; i < nbuckets; ++i)
3107  dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
3108 }
#define InvalidDsaPointer
Definition: dsa.h:78
dsa_area * area
Definition: hashjoin.h:356
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:932
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
ParallelHashJoinBatch * shared
Definition: hashjoin.h:197
#define dsa_pointer_atomic_init
Definition: dsa.h:64
int i
#define dsa_allocate(area, size)
Definition: dsa.h:84
dsa_pointer buckets
Definition: hashjoin.h:153
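
In the parallel case the bucket array for a batch lives in DSA memory, so it is allocated once (by a single participant, coordinated through the build barrier) and then attached to by every backend that will work on that batch. A hedged sketch of that sequence follows; the wrapper name and the "chosen_to_allocate" flag are illustrative, not taken from nodeHash.c:

#include "postgres.h"

#include "executor/nodeHash.h"

/*
 * Illustrative sketch: make batch `batchno` of a parallel hash table ready
 * for loading.  In the real code only one elected participant performs the
 * allocation; every participant then attaches with SetCurrentBatch.
 */
static void
prepare_parallel_batch(HashJoinTable hashtable, int batchno,
                       bool chosen_to_allocate)
{
    if (chosen_to_allocate)
        ExecParallelHashTableAlloc(hashtable, batchno); /* shared bucket array */

    /* Point this backend's buckets/nbuckets fields at the shared arrays. */
    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
}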

◆ ExecParallelHashTableInsert()

void ExecParallelHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1681 of file nodeHash.c.

References Assert, BarrierPhase(), HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), ExecParallelHashTuplePrealloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, ParallelHashJoinBatchAccessor::inner_tuples, MAXALIGN, ParallelHashJoinBatchAccessor::ntuples, HashJoinTableData::parallel_state, PHJ_BUILD_HASHING_INNER, ParallelHashJoinBatchAccessor::preallocated, HashJoinTableData::shared, sts_puttuple(), and MinimalTupleData::t_len.

Referenced by MultiExecParallelHash().

1684 {
1685  bool shouldFree;
1686  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1687  dsa_pointer shared;
1688  int bucketno;
1689  int batchno;
1690 
1691 retry:
1692  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1693 
1694  if (batchno == 0)
1695  {
1696  HashJoinTuple hashTuple;
1697 
1698  /* Try to load it into memory. */
1699  Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
1700  PHJ_BUILD_HASHING_INNER);
1701  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1702  HJTUPLE_OVERHEAD + tuple->t_len,
1703  &shared);
1704  if (hashTuple == NULL)
1705  goto retry;
1706 
1707  /* Store the hash value in the HashJoinTuple header. */
1708  hashTuple->hashvalue = hashvalue;
1709  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1710 
1711  /* Push it onto the front of the bucket's list */
1712  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1713  hashTuple, shared);
1714  }
1715  else
1716  {
1717  size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1718 
1719  Assert(batchno > 0);
1720 
1721  /* Try to preallocate space in the batch if necessary. */
1722  if (hashtable->batches[batchno].preallocated < tuple_size)
1723  {
1724  if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
1725  goto retry;
1726  }
1727 
1728  Assert(hashtable->batches[batchno].preallocated >= tuple_size);
1729  hashtable->batches[batchno].preallocated -= tuple_size;
1730  sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
1731  tuple);
1732  }
1733  ++hashtable->batches[batchno].ntuples;
1734 
1735  if (shouldFree)
1736  heap_free_minimal_tuple(tuple);
1737 }
dsa_pointer_atomic * shared
Definition: hashjoin.h:299
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1662
uint64 dsa_pointer
Definition: dsa.h:62
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:208
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1898
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1427
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
Definition: nodeHash.c:3319
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
union HashJoinTableData::@95 buckets
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:79
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
#define Assert(condition)
Definition: c.h:738
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:243
#define MAXALIGN(LEN)
Definition: c.h:691
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
Definition: nodeHash.c:3239
#define PHJ_BUILD_HASHING_INNER
Definition: hashjoin.h:259
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
Definition: nodeHash.c:2786
uint32 hashvalue
Definition: hashjoin.h:75
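
The typical caller is the build phase of a parallel hash join: each worker pulls tuples from the Hash node's outer plan, computes a hash value from the node's hash keys, and inserts. The loop below is a simplified sketch of that pattern; the barrier coordination, chunk accounting, and instrumentation that the real MultiExecParallelHash() performs are omitted:

#include "postgres.h"

#include "executor/executor.h"
#include "executor/nodeHash.h"

/* Illustrative sketch of a parallel hash build loop. */
static void
build_parallel_hash(HashState *node, HashJoinTable hashtable)
{
    PlanState  *outerNode = outerPlanState(node);
    ExprContext *econtext = node->ps.ps_ExprContext;
    TupleTableSlot *slot;
    uint32      hashvalue;

    for (;;)
    {
        slot = ExecProcNode(outerNode);
        if (TupIsNull(slot))
            break;

        econtext->ecxt_outertuple = slot;
        if (ExecHashGetHashValue(hashtable, econtext, node->hashkeys,
                                 false, /* these are inner tuples of the join */
                                 hashtable->keepNulls, &hashvalue))
            ExecParallelHashTableInsert(hashtable, slot, hashvalue);
    }
}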

◆ ExecParallelHashTableInsertCurrentBatch()

void ExecParallelHashTableInsertCurrentBatch ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1746 of file nodeHash.c.

References Assert, HashJoinTableData::buckets, HashJoinTableData::curbatch, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::shared, and MinimalTupleData::t_len.

Referenced by ExecParallelHashJoinNewBatch().

1749 {
1750  bool shouldFree;
1751  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1752  HashJoinTuple hashTuple;
1753  dsa_pointer shared;
1754  int batchno;
1755  int bucketno;
1756 
1757  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1758  Assert(batchno == hashtable->curbatch);
1759  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1760  HJTUPLE_OVERHEAD + tuple->t_len,
1761  &shared);
1762  hashTuple->hashvalue = hashvalue;
1763  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1764  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1765  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1766  hashTuple, shared);
1767 
1768  if (shouldFree)
1769  heap_free_minimal_tuple(tuple);
1770 }
dsa_pointer_atomic * shared
Definition: hashjoin.h:299
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1662
uint64 dsa_pointer
Definition: dsa.h:62
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1898
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1427
union HashJoinTableData::@95 buckets
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:79
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
#define Assert(condition)
Definition: c.h:738
#define HeapTupleHeaderClearMatch(tup)
Definition: htup_details.h:526
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
Definition: nodeHash.c:3239
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
Definition: nodeHash.c:2786
uint32 hashvalue
Definition: hashjoin.h:75
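
The contrast with ExecParallelHashTableInsert() above: this variant is for reloading tuples that were already routed to the current batch when they were first spilled, so it asserts the batch number and never re-spills. A minimal, illustrative reload loop follows; next_spilled_tuple() is a hypothetical stand-in for the shared-tuplestore scan that ExecParallelHashJoinNewBatch() actually performs:

#include "postgres.h"

#include "executor/nodeHash.h"

/* Hypothetical helper standing in for the shared-tuplestore read loop;
 * not part of nodeHash.c. */
static bool next_spilled_tuple(TupleTableSlot *slot, uint32 *hashvalue);

/*
 * Illustrative sketch: reload tuples into the batch previously selected
 * with ExecParallelHashTableSetCurrentBatch().
 */
static void
reload_current_batch(HashJoinTable hashtable, TupleTableSlot *slot)
{
    uint32      hashvalue;

    while (next_spilled_tuple(slot, &hashvalue))
        ExecParallelHashTableInsertCurrentBatch(hashtable, slot, hashvalue);
}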

◆ ExecParallelHashTableSetCurrentBatch()

void ExecParallelHashTableSetCurrentBatch ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3257 of file nodeHash.c.

References HashJoinTableData::area, Assert, ParallelHashJoinBatchAccessor::at_least_one_chunk, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, HashJoinTableData::buckets, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, HashJoinTableData::current_chunk_shared, dsa_get_address(), InvalidDsaPointer, HashJoinTableData::log2_nbuckets, my_log2(), ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::parallel_state, ParallelHashJoinBatchAccessor::shared, and HashJoinTableData::shared.

Referenced by ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinNewBatch(), and MultiExecParallelHash().

3258 {
3259  Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer);
3260 
3261  hashtable->curbatch = batchno;
3262  hashtable->buckets.shared = (dsa_pointer_atomic *)
3263  dsa_get_address(hashtable->area,
3264  hashtable->batches[batchno].shared->buckets);
3265  hashtable->nbuckets = hashtable->parallel_state->nbuckets;
3266  hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
3267  hashtable->current_chunk = NULL;
3268  hashtable->current_chunk_shared = InvalidDsaPointer;
3269  hashtable->batches[batchno].at_least_one_chunk = false;
3270 }
dsa_pointer current_chunk_shared
Definition: hashjoin.h:359
dsa_pointer_atomic * shared
Definition: hashjoin.h:299
#define InvalidDsaPointer
Definition: dsa.h:78
dsa_area * area
Definition: hashjoin.h:356
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:932
int my_log2(long num)
Definition: dynahash.c:1720
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
union HashJoinTableData::@95 buckets
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
#define Assert(condition)
Definition: c.h:738
ParallelHashJoinBatch * shared
Definition: hashjoin.h:197
HashMemoryChunk current_chunk
Definition: hashjoin.h:355
dsa_pointer buckets
Definition: hashjoin.h:153

◆ ExecParallelScanHashBucket()

bool ExecParallelScanHashBucket ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 1991 of file nodeHash.c.

References ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, and HJTUPLE_MINTUPLE.

Referenced by ExecHashJoinImpl().

1993 {
1994  ExprState *hjclauses = hjstate->hashclauses;
1995  HashJoinTable hashtable = hjstate->hj_HashTable;
1996  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1997  uint32 hashvalue = hjstate->hj_CurHashValue;
1998 
1999  /*
2000  * hj_CurTuple is the address of the tuple last returned from the current
2001  * bucket, or NULL if it's time to start scanning a new bucket.
2002  */
2003  if (hashTuple != NULL)
2004  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2005  else
2006  hashTuple = ExecParallelHashFirstTuple(hashtable,
2007  hjstate->hj_CurBucketNo);
2008 
2009  while (hashTuple != NULL)
2010  {
2011  if (hashTuple->hashvalue == hashvalue)
2012  {
2013  TupleTableSlot *inntuple;
2014 
2015  /* insert hashtable's tuple into exec slot so ExecQual sees it */
2016  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2017  hjstate->hj_HashTupleSlot,
2018  false); /* do not pfree */
2019  econtext->ecxt_innertuple = inntuple;
2020 
2021  if (ExecQualAndReset(hjclauses, econtext))
2022  {
2023  hjstate->hj_CurTuple = hashTuple;
2024  return true;
2025  }
2026  }
2027 
2028  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2029  }
2030 
2031  /*
2032  * no match
2033  */
2034  return false;
2035 }
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1416
uint32 hj_CurHashValue
Definition: execnodes.h:1948
HashJoinTuple hj_CurTuple
Definition: execnodes.h:1951
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:228
unsigned int uint32
Definition: c.h:367
int hj_CurBucketNo
Definition: execnodes.h:1949
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:397
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable table, int bucketno)
Definition: nodeHash.c:3209
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:1953
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table, HashJoinTuple tuple)
Definition: nodeHash.c:3225
HashJoinTable hj_HashTable
Definition: execnodes.h:1947
uint32 hashvalue
Definition: hashjoin.h:75
ExprState * hashclauses
Definition: execnodes.h:1943
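
A probe-side caller uses this in a loop: each call returns the next tuple in the current bucket whose hash value and join quals match the outer tuple held in the expression context, until it returns false. A hedged sketch of that consumption pattern follows; the outer-tuple setup and batching logic of ExecHashJoinImpl() are not shown, and handle_match() is a hypothetical callback:

#include "postgres.h"

#include "executor/executor.h"
#include "executor/nodeHash.h"

/*
 * Illustrative sketch: emit every inner tuple in the current parallel
 * bucket that joins with the outer tuple already loaded into econtext.
 */
static void
probe_parallel_bucket(HashJoinState *hjstate, ExprContext *econtext,
                      void (*handle_match) (ExprContext *econtext))
{
    while (ExecParallelScanHashBucket(hjstate, econtext))
    {
        /* econtext->ecxt_innertuple now holds the matching inner tuple */
        handle_match(econtext);
    }
}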

◆ ExecPrepHashTableForUnmatched()

void ExecPrepHashTableForUnmatched ( HashJoinState hjstate)

Definition at line 2042 of file nodeHash.c.

References HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, and HashJoinState::hj_CurTuple.

Referenced by ExecHashJoinImpl().

2043 {
2044  /*----------
2045  * During this scan we use the HashJoinState fields as follows:
2046  *
2047  * hj_CurBucketNo: next regular bucket to scan
2048  * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
2049  * hj_CurTuple: last tuple returned, or NULL to start next bucket
2050  *----------
2051  */
2052  hjstate->hj_CurBucketNo = 0;
2053  hjstate->hj_CurSkewBucketNo = 0;
2054  hjstate->hj_CurTuple = NULL;
2055 }
int hj_CurSkewBucketNo
Definition: execnodes.h:1950
HashJoinTuple hj_CurTuple
Definition: execnodes.h:1951
int hj_CurBucketNo
Definition: execnodes.h:1949

◆ ExecReScanHash()

void ExecReScanHash ( HashState node)

Definition at line 2192 of file nodeHash.c.

References PlanState::chgParam, ExecReScan(), PlanState::lefttree, and HashState::ps.

Referenced by ExecReScan().

2193 {
2194  /*
2195  * if chgParam of subnode is not null then plan will be re-scanned by
2196  * first ExecProcNode.
2197  */
2198  if (node->ps.lefttree->chgParam == NULL)
2199  ExecReScan(node->ps.lefttree);
2200 }
void ExecReScan(PlanState *node)
Definition: execAmi.c:76
struct PlanState * lefttree
Definition: execnodes.h:967
PlanState ps
Definition: execnodes.h:2380
Bitmapset * chgParam
Definition: execnodes.h:977

◆ ExecScanHashBucket()

bool ExecScanHashBucket ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 1930 of file nodeHash.c.

References HashJoinTableData::buckets, ExprContext::ecxt_innertuple, ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, INVALID_SKEW_BUCKET_NO, HashJoinTupleData::next, HashJoinTableData::skewBucket, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

1932 {
1933  ExprState *hjclauses = hjstate->hashclauses;
1934  HashJoinTable hashtable = hjstate->hj_HashTable;
1935  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1936  uint32 hashvalue = hjstate->hj_CurHashValue;
1937 
1938  /*
1939  * hj_CurTuple is the address of the tuple last returned from the current
1940  * bucket, or NULL if it's time to start scanning a new bucket.
1941  *
1942  * If the tuple hashed to a skew bucket then scan the skew bucket
1943  * otherwise scan the standard hashtable bucket.
1944  */
1945  if (hashTuple != NULL)
1946  hashTuple = hashTuple->next.unshared;
1947  else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
1948  hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
1949  else
1950  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
1951 
1952  while (hashTuple != NULL)
1953  {
1954  if (hashTuple->hashvalue == hashvalue)
1955  {
1956  TupleTableSlot *inntuple;
1957 
1958  /* insert hashtable's tuple into exec slot so ExecQual sees it */
1959  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
1960  hjstate->hj_HashTupleSlot,
1961  false); /* do not pfree */
1962  econtext->ecxt_innertuple = inntuple;
1963 
1964  if (ExecQualAndReset(hjclauses, econtext))
1965  {
1966  hjstate->hj_CurTuple = hashTuple;
1967  return true;
1968  }
1969  }
1970 
1971  hashTuple = hashTuple->next.unshared;
1972  }
1973 
1974  /*
1975  * no match
1976  */
1977  return false;
1978 }
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:109
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1416
uint32 hj_CurHashValue
Definition: execnodes.h:1948
int hj_CurSkewBucketNo
Definition: execnodes.h:1950
struct HashJoinTupleData * unshared
Definition: hashjoin.h:72
HashJoinTuple hj_CurTuple
Definition: execnodes.h:1951
HashJoinTuple tuples
Definition: hashjoin.h:105
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:228
unsigned int uint32
Definition: c.h:367
union HashJoinTupleData::@93 next
int hj_CurBucketNo
Definition: execnodes.h:1949
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:397
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
union HashJoinTableData::@95 buckets
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:1953
HashJoinTable hj_HashTable
Definition: execnodes.h:1947
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297
uint32 hashvalue
Definition: hashjoin.h:75
ExprState * hashclauses
Definition: execnodes.h:1943
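
Before the first call for a new outer tuple, the caller is expected to prime the HashJoinState: store the outer tuple's hash value, derive the bucket (and skew bucket) numbers, and clear hj_CurTuple; ExecScanHashBucket() then walks the chain from there. A hedged sketch of that setup, loosely following what the hash join code does (batch switching and error handling omitted; the wrapper name is illustrative):

#include "postgres.h"

#include "executor/executor.h"
#include "executor/nodeHash.h"

/*
 * Illustrative sketch: position hjstate on the bucket for `hashvalue`
 * so that subsequent ExecScanHashBucket() calls return its matches.
 */
static void
position_on_bucket(HashJoinState *hjstate, uint32 hashvalue)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         batchno;    /* computed but not used in this sketch */

    hjstate->hj_CurHashValue = hashvalue;
    ExecHashGetBucketAndBatch(hashtable, hashvalue,
                              &hjstate->hj_CurBucketNo, &batchno);
    hjstate->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable, hashvalue);
    hjstate->hj_CurTuple = NULL;    /* start at the head of the bucket */
}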

◆ ExecScanHashTableForUnmatched()

bool ExecScanHashTableForUnmatched ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 2066 of file nodeHash.c.

References HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, ResetExprContext, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

2067 {
2068  HashJoinTable hashtable = hjstate->hj_HashTable;
2069  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2070 
2071  for (;;)
2072  {
2073  /*
2074  * hj_CurTuple is the address of the tuple last returned from the
2075  * current bucket, or NULL if it's time to start scanning a new
2076  * bucket.
2077  */
2078  if (hashTuple != NULL)
2079  hashTuple = hashTuple->next.unshared;
2080  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2081  {
2082  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
2083  hjstate->hj_CurBucketNo++;
2084  }
2085  else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
2086  {
2087  int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
2088 
2089  hashTuple = hashtable->skewBucket[j]->tuples;
2090  hjstate->hj_CurSkewBucketNo++;
2091  }
2092  else
2093  break; /* finished all buckets */
2094 
2095  while (hashTuple != NULL)
2096  {
2097  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2098  {
2099  TupleTableSlot *inntuple;
2100 
2101  /* insert hashtable's tuple into exec slot */
2102  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2103  hjstate->hj_HashTupleSlot,
2104  false); /* do not pfree */
2105  econtext->ecxt_innertuple = inntuple;
2106 
2107  /*
2108  * Reset temp memory each time; although this function doesn't
2109  * do any qual eval, the caller will, so let's keep it
2110  * parallel to ExecScanHashBucket.
2111  */
2112  ResetExprContext(econtext);
2113 
2114  hjstate->hj_CurTuple = hashTuple;
2115  return true;
2116  }
2117 
2118  hashTuple = hashTuple->next.unshared;
2119  }
2120 
2121  /* allow this loop to be cancellable */
2122  CHECK_FOR_INTERRUPTS();
2123  }
2124 
2125  /*
2126  * no more unmatched tuples
2127  */
2128  return false;
2129 }
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1416
int * skewBucketNums
Definition: hashjoin.h:308
int hj_CurSkewBucketNo
Definition: execnodes.h:1950
struct HashJoinTupleData * unshared
Definition: hashjoin.h:72
HashJoinTuple hj_CurTuple
Definition: execnodes.h:1951
HashJoinTuple tuples
Definition: hashjoin.h:105
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:228
union HashJoinTupleData::@93 next
int hj_CurBucketNo
Definition: execnodes.h:1949
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
union HashJoinTableData::@95 buckets
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
#define HeapTupleHeaderHasMatch(tup)
Definition: htup_details.h:516
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:1953
HashJoinTable hj_HashTable
Definition: execnodes.h:1947
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:297
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define ResetExprContext(econtext)
Definition: executor.h:501
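
For right and full joins, the hash join first calls ExecPrepHashTableForUnmatched() once, then drains this function in a loop; each successful call leaves one never-matched inner tuple in econtext->ecxt_innertuple, ready to be emitted with nulls on the outer side. A hedged sketch of that drain loop, where emit_null_extended() is a hypothetical stand-in for projecting the null-extended join row:

#include "postgres.h"

#include "executor/executor.h"
#include "executor/nodeHash.h"

/* Hypothetical helper that projects an inner tuple joined to outer NULLs. */
static void emit_null_extended(ExprContext *econtext);

/* Illustrative sketch: emit all unmatched inner tuples after the probe phase. */
static void
drain_unmatched_inner(HashJoinState *hjstate, ExprContext *econtext)
{
    ExecPrepHashTableForUnmatched(hjstate);

    while (ExecScanHashTableForUnmatched(hjstate, econtext))
        emit_null_extended(econtext);
}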

◆ ExecShutdownHash()

void ExecShutdownHash ( HashState node)

Definition at line 2640 of file nodeHash.c.

References ExecHashAccumInstrumentation(), HashState::hashtable, HashState::hinstrument, PlanState::instrument, palloc0(), and HashState::ps.

Referenced by ExecShutdownNode().

2641 {
2642  /* Allocate save space if EXPLAIN'ing and we didn't do so already */
2643  if (node->ps.instrument && !node->hinstrument)
2644  node->hinstrument = (HashInstrumentation *)
2645  palloc0(sizeof(HashInstrumentation));
2646  /* Now accumulate data for the current (final) hash table */
2647  if (node->hinstrument && node->hashtable)
2648  ExecHashAccumInstrumentation(node->hinstrument, node->hashtable);
2649 }
Instrumentation * instrument
Definition: execnodes.h:955
HashJoinTable hashtable
Definition: execnodes.h:2381
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
Definition: nodeHash.c:2687
PlanState ps
Definition: execnodes.h:2380
HashInstrumentation * hinstrument
Definition: execnodes.h:2397
void * palloc0(Size size)
Definition: mcxt.c:980
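
After shutdown, the accumulated statistics live in node->hinstrument, which is what EXPLAIN ANALYZE reads when it reports hash bucket, batch, and memory numbers. A small illustrative fragment; the HashInstrumentation field names used here are an assumption based on the author's reading of execnodes.h, not a verbatim excerpt:

#include "postgres.h"

#include "executor/nodeHash.h"

/* Illustrative: report saved hash statistics once the node has shut down.
 * Field names (nbuckets, nbuckets_original, nbatch, space_peak) are assumed. */
static void
report_hash_stats(HashState *hashstate)
{
    HashInstrumentation *hi = hashstate->hinstrument;

    if (hi != NULL)
        elog(DEBUG1, "hash: %d buckets (%d originally), %d batches, %zu bytes peak",
             hi->nbuckets, hi->nbuckets_original, hi->nbatch, hi->space_peak);
}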

◆ MultiExecHash()

Node* MultiExecHash ( HashState node)

Definition at line 105 of file nodeHash.c.

References HashState::hashtable, InstrStartNode(), InstrStopNode(), PlanState::instrument, MultiExecParallelHash(), MultiExecPrivateHash(), HashState::parallel_state, HashJoinTableData::partialTuples, and HashState::ps.

Referenced by MultiExecProcNode().

106 {
107  /* must provide our own instrumentation support */
108  if (node->ps.instrument)
109  InstrStartNode(node->ps.instrument);
110 
111  if (node->parallel_state != NULL)
112  MultiExecParallelHash(node);
113  else
114  MultiExecPrivateHash(node);
115 
116  /* must provide our own instrumentation support */
117  if (node->ps.instrument)
118  InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);
119 
120  /*
121  * We do not return the hash table directly because it's not a subtype of
122  * Node, and so would violate the MultiExecProcNode API. Instead, our
123  * parent Hashjoin node is expected to know how to fish it out of our node
124  * state. Ugly but not really worth cleaning up, since Hashjoin knows
125  * quite a bit more about Hash besides that.
126  */
127  return NULL;
128 }
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2400
void InstrStopNode(Instrumentation *instr, double nTuples)
Definition: instrument.c:83
Instrumentation * instrument
Definition: execnodes.h:955
HashJoinTable hashtable
Definition: execnodes.h:2381
static void MultiExecPrivateHash(HashState *node)
Definition: nodeHash.c:138
double partialTuples
Definition: hashjoin.h:319
void InstrStartNode(Instrumentation *instr)
Definition: instrument.c:67
PlanState ps
Definition: execnodes.h:2380
static void MultiExecParallelHash(HashState *node)
Definition: nodeHash.c:214
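
Because MultiExecHash() deliberately returns NULL rather than the hash table, the parent hash join retrieves the table from the HashState afterwards. A hedged sketch of that pattern, following the comment above; MultiExecProcNode() is the generic dispatcher that ends up calling MultiExecHash() for a Hash node, and the wrapper assumes the caller has already created the table with ExecHashTableCreate() and stored it in hashNode->hashtable:

#include "postgres.h"

#include "executor/executor.h"
#include "executor/nodeHash.h"

/* Illustrative sketch: run the hash build and fish the table out of the node. */
static HashJoinTable
run_hash_build(HashState *hashNode)
{
    /* Returns NULL for Hash nodes; the result lives in hashNode->hashtable. */
    (void) MultiExecProcNode((PlanState *) hashNode);

    return hashNode->hashtable;
}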