PostgreSQL Source Code  git master
nodeHash.h File Reference
#include "access/parallel.h"
#include "nodes/execnodes.h"
Include dependency graph for nodeHash.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

HashState * ExecInitHash (Hash *node, EState *estate, int eflags)
 
Node * MultiExecHash (HashState *node)
 
void ExecEndHash (HashState *node)
 
void ExecReScanHash (HashState *node)
 
HashJoinTable ExecHashTableCreate (HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
 
void ExecParallelHashTableAlloc (HashJoinTable hashtable, int batchno)
 
void ExecHashTableDestroy (HashJoinTable hashtable)
 
void ExecHashTableDetach (HashJoinTable hashtable)
 
void ExecHashTableDetachBatch (HashJoinTable hashtable)
 
void ExecParallelHashTableSetCurrentBatch (HashJoinTable hashtable, int batchno)
 
void ExecHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsertCurrentBatch (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
bool ExecHashGetHashValue (HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
 
void ExecHashGetBucketAndBatch (HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
 
bool ExecScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecParallelPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecHashTableReset (HashJoinTable hashtable)
 
void ExecHashTableResetMatchFlags (HashJoinTable hashtable)
 
void ExecChooseHashTableSize (double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
 
int ExecHashGetSkewBucket (HashJoinTable hashtable, uint32 hashvalue)
 
void ExecHashEstimate (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeDSM (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeWorker (HashState *node, ParallelWorkerContext *pwcxt)
 
void ExecHashRetrieveInstrumentation (HashState *node)
 
void ExecShutdownHash (HashState *node)
 
void ExecHashAccumInstrumentation (HashInstrumentation *instrument, HashJoinTable hashtable)
 

Function Documentation

◆ ExecChooseHashTableSize()

void ExecChooseHashTableSize ( double  ntuples,
int  tupwidth,
bool  useskew,
bool  try_combined_hash_mem,
int  parallel_workers,
size_t *  space_allowed,
int *  numbuckets,
int *  numbatches,
int *  num_skew_mcvs 
)

Definition at line 681 of file nodeHash.c.

688 {
689  int tupsize;
690  double inner_rel_bytes;
691  size_t hash_table_bytes;
692  size_t bucket_bytes;
693  size_t max_pointers;
694  int nbatch = 1;
695  int nbuckets;
696  double dbuckets;
697 
698  /* Force a plausible relation size if no info */
699  if (ntuples <= 0.0)
700  ntuples = 1000.0;
701 
702  /*
703  * Estimate tupsize based on footprint of tuple in hashtable... note this
704  * does not allow for any palloc overhead. The manipulations of spaceUsed
705  * don't count palloc overhead either.
706  */
707  tupsize = HJTUPLE_OVERHEAD +
708  MAXALIGN(SizeofMinimalTupleHeader) +
709  MAXALIGN(tupwidth);
710  inner_rel_bytes = ntuples * tupsize;
711 
712  /*
713  * Compute in-memory hashtable size limit from GUCs.
714  */
715  hash_table_bytes = get_hash_memory_limit();
716 
717  /*
718  * Parallel Hash tries to use the combined hash_mem of all workers to
719  * avoid the need to batch. If that won't work, it falls back to hash_mem
720  * per worker and tries to process batches in parallel.
721  */
722  if (try_combined_hash_mem)
723  {
724  /* Careful, this could overflow size_t */
725  double newlimit;
726 
727  newlimit = (double) hash_table_bytes * (double) (parallel_workers + 1);
728  newlimit = Min(newlimit, (double) SIZE_MAX);
729  hash_table_bytes = (size_t) newlimit;
730  }
731 
732  *space_allowed = hash_table_bytes;
733 
734  /*
735  * If skew optimization is possible, estimate the number of skew buckets
736  * that will fit in the memory allowed, and decrement the assumed space
737  * available for the main hash table accordingly.
738  *
739  * We make the optimistic assumption that each skew bucket will contain
740  * one inner-relation tuple. If that turns out to be low, we will recover
741  * at runtime by reducing the number of skew buckets.
742  *
743  * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
744  * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
745  * will round up to the next power of 2 and then multiply by 4 to reduce
746  * collisions.
747  */
748  if (useskew)
749  {
750  size_t bytes_per_mcv;
751  size_t skew_mcvs;
752 
753  /*----------
754  * Compute number of MCVs we could hold in hash_table_bytes
755  *
756  * Divisor is:
757  * size of a hash tuple +
758  * worst-case size of skewBucket[] per MCV +
759  * size of skewBucketNums[] entry +
760  * size of skew bucket struct itself
761  *----------
762  */
763  bytes_per_mcv = tupsize +
764  (8 * sizeof(HashSkewBucket *)) +
765  sizeof(int) +
766  SKEW_BUCKET_OVERHEAD;
767  skew_mcvs = hash_table_bytes / bytes_per_mcv;
768 
769  /*
770  * Now scale by SKEW_HASH_MEM_PERCENT (we do it in this order so as
771  * not to worry about size_t overflow in the multiplication)
772  */
773  skew_mcvs = (skew_mcvs * SKEW_HASH_MEM_PERCENT) / 100;
774 
775  /* Now clamp to integer range */
776  skew_mcvs = Min(skew_mcvs, INT_MAX);
777 
778  *num_skew_mcvs = (int) skew_mcvs;
779 
780  /* Reduce hash_table_bytes by the amount needed for the skew table */
781  if (skew_mcvs > 0)
782  hash_table_bytes -= skew_mcvs * bytes_per_mcv;
783  }
784  else
785  *num_skew_mcvs = 0;
786 
787  /*
788  * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
789  * memory is filled, assuming a single batch; but limit the value so that
790  * the pointer arrays we'll try to allocate do not exceed hash_table_bytes
791  * nor MaxAllocSize.
792  *
793  * Note that both nbuckets and nbatch must be powers of 2 to make
794  * ExecHashGetBucketAndBatch fast.
795  */
796  max_pointers = hash_table_bytes / sizeof(HashJoinTuple);
797  max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
798  /* If max_pointers isn't a power of 2, must round it down to one */
799  max_pointers = pg_prevpower2_size_t(max_pointers);
800 
801  /* Also ensure we avoid integer overflow in nbatch and nbuckets */
802  /* (this step is redundant given the current value of MaxAllocSize) */
803  max_pointers = Min(max_pointers, INT_MAX / 2 + 1);
804 
805  dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
806  dbuckets = Min(dbuckets, max_pointers);
807  nbuckets = (int) dbuckets;
808  /* don't let nbuckets be really small, though ... */
809  nbuckets = Max(nbuckets, 1024);
810  /* ... and force it to be a power of 2. */
811  nbuckets = pg_nextpower2_32(nbuckets);
812 
813  /*
814  * If there's not enough space to store the projected number of tuples and
815  * the required bucket headers, we will need multiple batches.
816  */
817  bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
818  if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
819  {
820  /* We'll need multiple batches */
821  size_t sbuckets;
822  double dbatch;
823  int minbatch;
824  size_t bucket_size;
825 
826  /*
827  * If Parallel Hash with combined hash_mem would still need multiple
828  * batches, we'll have to fall back to regular hash_mem budget.
829  */
830  if (try_combined_hash_mem)
831  {
832  ExecChooseHashTableSize(ntuples, tupwidth, useskew,
833  false, parallel_workers,
834  space_allowed,
835  numbuckets,
836  numbatches,
837  num_skew_mcvs);
838  return;
839  }
840 
841  /*
842  * Estimate the number of buckets we'll want to have when hash_mem is
843  * entirely full. Each bucket will contain a bucket pointer plus
844  * NTUP_PER_BUCKET tuples, whose projected size already includes
845  * overhead for the hash code, pointer to the next tuple, etc.
846  */
847  bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
848  if (hash_table_bytes <= bucket_size)
849  sbuckets = 1; /* avoid pg_nextpower2_size_t(0) */
850  else
851  sbuckets = pg_nextpower2_size_t(hash_table_bytes / bucket_size);
852  sbuckets = Min(sbuckets, max_pointers);
853  nbuckets = (int) sbuckets;
854  nbuckets = pg_nextpower2_32(nbuckets);
855  bucket_bytes = nbuckets * sizeof(HashJoinTuple);
856 
857  /*
858  * Buckets are simple pointers to hashjoin tuples, while tupsize
859  * includes the pointer, hash code, and MinimalTupleData. So buckets
860  * should never really exceed 25% of hash_mem (even for
861  * NTUP_PER_BUCKET=1); except maybe for hash_mem values that are not
862  * 2^N bytes, where we might get more because of doubling. So let's
863  * look for 50% here.
864  */
865  Assert(bucket_bytes <= hash_table_bytes / 2);
866 
867  /* Calculate required number of batches. */
868  dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
869  dbatch = Min(dbatch, max_pointers);
870  minbatch = (int) dbatch;
871  nbatch = pg_nextpower2_32(Max(2, minbatch));
872  }
873 
874  Assert(nbuckets > 0);
875  Assert(nbatch > 0);
876 
877  *numbuckets = nbuckets;
878  *numbatches = nbatch;
879 }
#define Min(x, y)
Definition: c.h:993
#define MAXALIGN(LEN)
Definition: c.h:800
#define Max(x, y)
Definition: c.h:987
struct HashJoinTupleData * HashJoinTuple
Definition: execnodes.h:2100
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:90
#define SKEW_BUCKET_OVERHEAD
Definition: hashjoin.h:119
#define SKEW_HASH_MEM_PERCENT
Definition: hashjoin.h:121
#define SizeofMinimalTupleHeader
Definition: htup_details.h:647
Assert(fmt[strlen(fmt) - 1] !='\n')
#define MaxAllocSize
Definition: memutils.h:40
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
Definition: nodeHash.c:681
#define NTUP_PER_BUCKET
Definition: nodeHash.c:678
size_t get_hash_memory_limit(void)
Definition: nodeHash.c:3592
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:189
#define pg_nextpower2_size_t
Definition: pg_bitutils.h:339
#define pg_prevpower2_size_t
Definition: pg_bitutils.h:340

References Assert(), get_hash_memory_limit(), HJTUPLE_OVERHEAD, Max, MAXALIGN, MaxAllocSize, Min, NTUP_PER_BUCKET, pg_nextpower2_32(), pg_nextpower2_size_t, pg_prevpower2_size_t, SizeofMinimalTupleHeader, SKEW_BUCKET_OVERHEAD, and SKEW_HASH_MEM_PERCENT.

Referenced by ExecHashTableCreate(), and initial_cost_hashjoin().

◆ ExecEndHash()

void ExecEndHash ( HashState *  node)

Definition at line 414 of file nodeHash.c.

415 {
416  PlanState *outerPlan;
417 
418  /*
419  * free exprcontext
420  */
421  ExecFreeExprContext(&node->ps);
422 
423  /*
424  * shut down the subplan
425  */
426  outerPlan = outerPlanState(node);
427  ExecEndNode(outerPlan);
428 }
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:557
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:658
#define outerPlanState(node)
Definition: execnodes.h:1133
#define outerPlan(node)
Definition: plannodes.h:183
PlanState ps
Definition: execnodes.h:2661

References ExecEndNode(), ExecFreeExprContext(), outerPlan, outerPlanState, and HashState::ps.

Referenced by ExecEndNode().

◆ ExecHashAccumInstrumentation()

void ExecHashAccumInstrumentation ( HashInstrumentation *  instrument,
HashJoinTable  hashtable 
)

Definition at line 2847 of file nodeHash.c.

2849 {
2850  instrument->nbuckets = Max(instrument->nbuckets,
2851  hashtable->nbuckets);
2852  instrument->nbuckets_original = Max(instrument->nbuckets_original,
2853  hashtable->nbuckets_original);
2854  instrument->nbatch = Max(instrument->nbatch,
2855  hashtable->nbatch);
2856  instrument->nbatch_original = Max(instrument->nbatch_original,
2857  hashtable->nbatch_original);
2858  instrument->space_peak = Max(instrument->space_peak,
2859  hashtable->spacePeak);
2860 }

References Max, HashJoinTableData::nbatch, HashInstrumentation::nbatch, HashJoinTableData::nbatch_original, HashInstrumentation::nbatch_original, HashJoinTableData::nbuckets, HashInstrumentation::nbuckets, HashJoinTableData::nbuckets_original, HashInstrumentation::nbuckets_original, HashInstrumentation::space_peak, and HashJoinTableData::spacePeak.

Referenced by ExecReScanHashJoin(), and ExecShutdownHash().

◆ ExecHashEstimate()

void ExecHashEstimate ( HashState *  node,
ParallelContext *  pcxt 
)

Definition at line 2731 of file nodeHash.c.

2732 {
2733  size_t size;
2734 
2735  /* don't need this if not instrumenting or no workers */
2736  if (!node->ps.instrument || pcxt->nworkers == 0)
2737  return;
2738 
2739  size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
2740  size = add_size(size, offsetof(SharedHashInfo, hinstrument));
2741  shm_toc_estimate_chunk(&pcxt->estimator, size);
2742  shm_toc_estimate_keys(&pcxt->estimator, 1);
2743 }
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
shm_toc_estimator estimator
Definition: parallel.h:42
Instrumentation * instrument
Definition: execnodes.h:1047

References add_size(), ParallelContext::estimator, PlanState::instrument, mul_size(), ParallelContext::nworkers, HashState::ps, shm_toc_estimate_chunk, and shm_toc_estimate_keys.

Referenced by ExecParallelEstimate().

◆ ExecHashGetBucketAndBatch()

void ExecHashGetBucketAndBatch ( HashJoinTable  hashtable,
uint32  hashvalue,
int *  bucketno,
int *  batchno 
)

Definition at line 1929 of file nodeHash.c.

1933 {
1934  uint32 nbuckets = (uint32) hashtable->nbuckets;
1935  uint32 nbatch = (uint32) hashtable->nbatch;
1936 
1937  if (nbatch > 1)
1938  {
1939  *bucketno = hashvalue & (nbuckets - 1);
1940  *batchno = pg_rotate_right32(hashvalue,
1941  hashtable->log2_nbuckets) & (nbatch - 1);
1942  }
1943  else
1944  {
1945  *bucketno = hashvalue & (nbuckets - 1);
1946  *batchno = 0;
1947  }
1948 }
unsigned int uint32
Definition: c.h:495
static uint32 pg_rotate_right32(uint32 word, int n)
Definition: pg_bitutils.h:320

References HashJoinTableData::log2_nbuckets, HashJoinTableData::nbatch, HashJoinTableData::nbuckets, and pg_rotate_right32().

Referenced by ExecHashIncreaseNumBatches(), ExecHashIncreaseNumBuckets(), ExecHashJoinImpl(), ExecHashRemoveNextSkewBucket(), ExecHashTableInsert(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinPartitionOuter(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), ExecParallelHashTableInsert(), and ExecParallelHashTableInsertCurrentBatch().

◆ ExecHashGetHashValue()

bool ExecHashGetHashValue ( HashJoinTable  hashtable,
ExprContext *  econtext,
List *  hashkeys,
bool  outer_tuple,
bool  keep_nulls,
uint32 *  hashvalue 
)

Definition at line 1821 of file nodeHash.c.

1827 {
1828  uint32 hashkey = 0;
1829  FmgrInfo *hashfunctions;
1830  ListCell *hk;
1831  int i = 0;
1832  MemoryContext oldContext;
1833 
1834  /*
1835  * We reset the eval context each time to reclaim any memory leaked in the
1836  * hashkey expressions.
1837  */
1838  ResetExprContext(econtext);
1839 
1840  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
1841 
1842  if (outer_tuple)
1843  hashfunctions = hashtable->outer_hashfunctions;
1844  else
1845  hashfunctions = hashtable->inner_hashfunctions;
1846 
1847  foreach(hk, hashkeys)
1848  {
1849  ExprState *keyexpr = (ExprState *) lfirst(hk);
1850  Datum keyval;
1851  bool isNull;
1852 
1853  /* combine successive hashkeys by rotating */
1854  hashkey = pg_rotate_left32(hashkey, 1);
1855 
1856  /*
1857  * Get the join attribute value of the tuple
1858  */
1859  keyval = ExecEvalExpr(keyexpr, econtext, &isNull);
1860 
1861  /*
1862  * If the attribute is NULL, and the join operator is strict, then
1863  * this tuple cannot pass the join qual so we can reject it
1864  * immediately (unless we're scanning the outside of an outer join, in
1865  * which case we must not reject it). Otherwise we act like the
1866  * hashcode of NULL is zero (this will support operators that act like
1867  * IS NOT DISTINCT, though not any more-random behavior). We treat
1868  * the hash support function as strict even if the operator is not.
1869  *
1870  * Note: currently, all hashjoinable operators must be strict since
1871  * the hash index AM assumes that. However, it takes so little extra
1872  * code here to allow non-strict that we may as well do it.
1873  */
1874  if (isNull)
1875  {
1876  if (hashtable->hashStrict[i] && !keep_nulls)
1877  {
1878  MemoryContextSwitchTo(oldContext);
1879  return false; /* cannot match */
1880  }
1881  /* else, leave hashkey unmodified, equivalent to hashcode 0 */
1882  }
1883  else
1884  {
1885  /* Compute the hash function */
1886  uint32 hkey;
1887 
1888  hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], hashtable->collations[i], keyval));
1889  hashkey ^= hkey;
1890  }
1891 
1892  i++;
1893  }
1894 
1895  MemoryContextSwitchTo(oldContext);
1896 
1897  *hashvalue = hashkey;
1898  return true;
1899 }
#define ResetExprContext(econtext)
Definition: executor.h:543
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:332
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1112
int i
Definition: isn.c:73
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
static uint32 pg_rotate_left32(uint32 word, int n)
Definition: pg_bitutils.h:326
#define lfirst(lc)
Definition: pg_list.h:172
static uint32 DatumGetUInt32(Datum X)
Definition: postgres.h:222
uintptr_t Datum
Definition: postgres.h:64
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:257
FmgrInfo
Definition: fmgr.h:57
FmgrInfo * outer_hashfunctions
Definition: hashjoin.h:351
bool * hashStrict
Definition: hashjoin.h:353
FmgrInfo * inner_hashfunctions
Definition: hashjoin.h:352

References HashJoinTableData::collations, DatumGetUInt32(), ExprContext::ecxt_per_tuple_memory, ExecEvalExpr(), FunctionCall1Coll(), HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, lfirst, MemoryContextSwitchTo(), HashJoinTableData::outer_hashfunctions, pg_rotate_left32(), and ResetExprContext.

Referenced by ExecHashJoinOuterGetTuple(), ExecParallelHashJoinOuterGetTuple(), ExecParallelHashJoinPartitionOuter(), MultiExecParallelHash(), and MultiExecPrivateHash().

◆ ExecHashGetSkewBucket()

int ExecHashGetSkewBucket ( HashJoinTable  hashtable,
uint32  hashvalue 
)

Definition at line 2525 of file nodeHash.c.

2526 {
2527  int bucket;
2528 
2529  /*
2530  * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
2531  * particular, this happens after the initial batch is done).
2532  */
2533  if (!hashtable->skewEnabled)
2534  return INVALID_SKEW_BUCKET_NO;
2535 
2536  /*
2537  * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
2538  */
2539  bucket = hashvalue & (hashtable->skewBucketLen - 1);
2540 
2541  /*
2542  * While we have not hit a hole in the hashtable and have not hit the
2543  * desired bucket, we have collided with some other hash value, so try the
2544  * next bucket location.
2545  */
2546  while (hashtable->skewBucket[bucket] != NULL &&
2547  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
2548  bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
2549 
2550  /*
2551  * Found the desired bucket?
2552  */
2553  if (hashtable->skewBucket[bucket] != NULL)
2554  return bucket;
2555 
2556  /*
2557  * There must not be any hashtable entry for this hash value.
2558  */
2559  return INVALID_SKEW_BUCKET_NO;
2560 }
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:120
HashSkewBucket ** skewBucket
Definition: hashjoin.h:319
uint32 hashvalue
Definition: hashjoin.h:115

References HashSkewBucket::hashvalue, INVALID_SKEW_BUCKET_NO, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, and HashJoinTableData::skewEnabled.

Referenced by ExecHashJoinImpl(), and MultiExecPrivateHash().

◆ ExecHashInitializeDSM()

void ExecHashInitializeDSM ( HashState *  node,
ParallelContext *  pcxt 
)

Definition at line 2750 of file nodeHash.c.

2751 {
2752  size_t size;
2753 
2754  /* don't need this if not instrumenting or no workers */
2755  if (!node->ps.instrument || pcxt->nworkers == 0)
2756  return;
2757 
2758  size = offsetof(SharedHashInfo, hinstrument) +
2759  pcxt->nworkers * sizeof(HashInstrumentation);
2760  node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
2761 
2762  /* Each per-worker area must start out as zeroes. */
2763  memset(node->shared_info, 0, size);
2764 
2765  node->shared_info->num_workers = pcxt->nworkers;
2766  shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
2767  node->shared_info);
2768 }
struct HashInstrumentation HashInstrumentation
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
SharedHashInfo * shared_info
Definition: execnodes.h:2671
shm_toc * toc
Definition: parallel.h:45
Plan * plan
Definition: execnodes.h:1037
int plan_node_id
Definition: plannodes.h:152

References PlanState::instrument, SharedHashInfo::num_workers, ParallelContext::nworkers, PlanState::plan, Plan::plan_node_id, HashState::ps, HashState::shared_info, shm_toc_allocate(), shm_toc_insert(), and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

◆ ExecHashInitializeWorker()

void ExecHashInitializeWorker ( HashState *  node,
ParallelWorkerContext *  pwcxt 
)

Definition at line 2775 of file nodeHash.c.

2776 {
2777  SharedHashInfo *shared_info;
2778 
2779  /* don't need this if not instrumenting */
2780  if (!node->ps.instrument)
2781  return;
2782 
2783  /*
2784  * Find our entry in the shared area, and set up a pointer to it so that
2785  * we'll accumulate stats there when shutting down or rebuilding the hash
2786  * table.
2787  */
2788  shared_info = (SharedHashInfo *)
2789  shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
2790  node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
2791 }
int ParallelWorkerNumber
Definition: parallel.c:114
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
HashInstrumentation * hinstrument
Definition: execnodes.h:2678
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
Definition: execnodes.h:2652

References SharedHashInfo::hinstrument, HashState::hinstrument, PlanState::instrument, ParallelWorkerNumber, PlanState::plan, Plan::plan_node_id, HashState::ps, shm_toc_lookup(), and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().

◆ ExecHashRetrieveInstrumentation()

void ExecHashRetrieveInstrumentation ( HashState *  node)

Definition at line 2816 of file nodeHash.c.

2817 {
2818  SharedHashInfo *shared_info = node->shared_info;
2819  size_t size;
2820 
2821  if (shared_info == NULL)
2822  return;
2823 
2824  /* Replace node->shared_info with a copy in backend-local memory. */
2825  size = offsetof(SharedHashInfo, hinstrument) +
2826  shared_info->num_workers * sizeof(HashInstrumentation);
2827  node->shared_info = palloc(size);
2828  memcpy(node->shared_info, shared_info, size);
2829 }
void * palloc(Size size)
Definition: mcxt.c:1226

References SharedHashInfo::num_workers, palloc(), and HashState::shared_info.

Referenced by ExecParallelRetrieveInstrumentation().

◆ ExecHashTableCreate()

HashJoinTable ExecHashTableCreate ( HashState *  state,
List *  hashOperators,
List *  hashCollations,
bool  keepNulls 
)

Definition at line 438 of file nodeHash.c.

439 {
440  Hash *node;
441  HashJoinTable hashtable;
442  Plan *outerNode;
443  size_t space_allowed;
444  int nbuckets;
445  int nbatch;
446  double rows;
447  int num_skew_mcvs;
448  int log2_nbuckets;
449  int nkeys;
450  int i;
451  ListCell *ho;
452  ListCell *hc;
453  MemoryContext oldcxt;
454 
455  /*
456  * Get information about the size of the relation to be hashed (it's the
457  * "outer" subtree of this node, but the inner relation of the hashjoin).
458  * Compute the appropriate size of the hash table.
459  */
460  node = (Hash *) state->ps.plan;
461  outerNode = outerPlan(node);
462 
463  /*
464  * If this is shared hash table with a partial plan, then we can't use
465  * outerNode->plan_rows to estimate its size. We need an estimate of the
466  * total number of rows across all copies of the partial plan.
467  */
468  rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
469 
470  ExecChooseHashTableSize(rows, outerNode->plan_width,
471  OidIsValid(node->skewTable),
472  state->parallel_state != NULL,
473  state->parallel_state != NULL ?
474  state->parallel_state->nparticipants - 1 : 0,
475  &space_allowed,
476  &nbuckets, &nbatch, &num_skew_mcvs);
477 
478  /* nbuckets must be a power of 2 */
479  log2_nbuckets = my_log2(nbuckets);
480  Assert(nbuckets == (1 << log2_nbuckets));
481 
482  /*
483  * Initialize the hash table control block.
484  *
485  * The hashtable control block is just palloc'd from the executor's
486  * per-query memory context. Everything else should be kept inside the
487  * subsidiary hashCxt, batchCxt or spillCxt.
488  */
489  hashtable = palloc_object(HashJoinTableData);
490  hashtable->nbuckets = nbuckets;
491  hashtable->nbuckets_original = nbuckets;
492  hashtable->nbuckets_optimal = nbuckets;
493  hashtable->log2_nbuckets = log2_nbuckets;
494  hashtable->log2_nbuckets_optimal = log2_nbuckets;
495  hashtable->buckets.unshared = NULL;
496  hashtable->keepNulls = keepNulls;
497  hashtable->skewEnabled = false;
498  hashtable->skewBucket = NULL;
499  hashtable->skewBucketLen = 0;
500  hashtable->nSkewBuckets = 0;
501  hashtable->skewBucketNums = NULL;
502  hashtable->nbatch = nbatch;
503  hashtable->curbatch = 0;
504  hashtable->nbatch_original = nbatch;
505  hashtable->nbatch_outstart = nbatch;
506  hashtable->growEnabled = true;
507  hashtable->totalTuples = 0;
508  hashtable->partialTuples = 0;
509  hashtable->skewTuples = 0;
510  hashtable->innerBatchFile = NULL;
511  hashtable->outerBatchFile = NULL;
512  hashtable->spaceUsed = 0;
513  hashtable->spacePeak = 0;
514  hashtable->spaceAllowed = space_allowed;
515  hashtable->spaceUsedSkew = 0;
516  hashtable->spaceAllowedSkew =
517  hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
518  hashtable->chunks = NULL;
519  hashtable->current_chunk = NULL;
520  hashtable->parallel_state = state->parallel_state;
521  hashtable->area = state->ps.state->es_query_dsa;
522  hashtable->batches = NULL;
523 
524 #ifdef HJDEBUG
525  printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
526  hashtable, nbatch, nbuckets);
527 #endif
528 
529  /*
530  * Create temporary memory contexts in which to keep the hashtable working
531  * storage. See notes in executor/hashjoin.h.
532  */
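533  hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
534  "HashTableContext",
535  ALLOCSET_DEFAULT_SIZES);
536 
537  hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
538  "HashBatchContext",
539  ALLOCSET_DEFAULT_SIZES);
540 
541  hashtable->spillCxt = AllocSetContextCreate(hashtable->hashCxt,
542  "HashSpillContext",
543  ALLOCSET_DEFAULT_SIZES);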
544 
545  /* Allocate data that will live for the life of the hashjoin */
546 
547  oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
548 
549  /*
550  * Get info about the hash functions to be used for each hash key. Also
551  * remember whether the join operators are strict.
552  */
553  nkeys = list_length(hashOperators);
554  hashtable->outer_hashfunctions = palloc_array(FmgrInfo, nkeys);
555  hashtable->inner_hashfunctions = palloc_array(FmgrInfo, nkeys);
556  hashtable->hashStrict = palloc_array(bool, nkeys);
557  hashtable->collations = palloc_array(Oid, nkeys);
558  i = 0;
559  forboth(ho, hashOperators, hc, hashCollations)
560  {
561  Oid hashop = lfirst_oid(ho);
562  Oid left_hashfn;
563  Oid right_hashfn;
564 
565  if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
566  elog(ERROR, "could not find hash function for hash operator %u",
567  hashop);
568  fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
569  fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
570  hashtable->hashStrict[i] = op_strict(hashop);
571  hashtable->collations[i] = lfirst_oid(hc);
572  i++;
573  }
574 
575  if (nbatch > 1 && hashtable->parallel_state == NULL)
576  {
577  MemoryContext oldctx;
578 
579  /*
580  * allocate and initialize the file arrays in hashCxt (not needed for
581  * parallel case which uses shared tuplestores instead of raw files)
582  */
583  oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
584 
585  hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
586  hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
587 
588  MemoryContextSwitchTo(oldctx);
589 
590  /* The files will not be opened until needed... */
591  /* ... but make sure we have temp tablespaces established for them */
592  PrepareTempTablespaces();
593  }
594 
595  MemoryContextSwitchTo(oldcxt);
596 
597  if (hashtable->parallel_state)
598  {
599  ParallelHashJoinState *pstate = hashtable->parallel_state;
600  Barrier *build_barrier;
601 
602  /*
603  * Attach to the build barrier. The corresponding detach operation is
604  * in ExecHashTableDetach. Note that we won't attach to the
605  * batch_barrier for batch 0 yet. We'll attach later and start it out
606  * in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front and
607  * then loaded while hashing (the standard hybrid hash join
608  * algorithm), and we'll coordinate that using build_barrier.
609  */
610  build_barrier = &pstate->build_barrier;
611  BarrierAttach(build_barrier);
612 
613  /*
614  * So far we have no idea whether there are any other participants,
615  * and if so, what phase they are working on. The only thing we care
616  * about at this point is whether someone has already created the
617  * SharedHashJoinBatch objects and the hash table for batch 0. One
618  * backend will be elected to do that now if necessary.
619  */
620  if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECT &&
621  BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
622  {
623  pstate->nbatch = nbatch;
624  pstate->space_allowed = space_allowed;
625  pstate->growth = PHJ_GROWTH_OK;
626 
627  /* Set up the shared state for coordinating batches. */
628  ExecParallelHashJoinSetUpBatches(hashtable, nbatch);
629 
630  /*
631  * Allocate batch 0's hash table up front so we can load it
632  * directly while hashing.
633  */
634  pstate->nbuckets = nbuckets;
635  ExecParallelHashTableAlloc(hashtable, 0);
636  }
637 
638  /*
639  * The next Parallel Hash synchronization point is in
640  * MultiExecParallelHash(), which will progress it all the way to
641  * PHJ_BUILD_RUN. The caller must not return control from this
642  * executor node between now and then.
643  */
644  }
645  else
646  {
647  /*
648  * Prepare context for the first-scan space allocations; allocate the
649  * hashbucket array therein, and set each bucket "empty".
650  */
651  MemoryContextSwitchTo(hashtable->batchCxt);
652 
653  hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets);
654 
655  /*
656  * Set up for skew optimization, if possible and there's a need for
657  * more than one batch. (In a one-batch join, there's no point in
658  * it.)
659  */
660  if (nbatch > 1)
661  ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
662 
663  MemoryContextSwitchTo(oldcxt);
664  }
665 
666  return hashtable;
667 }
void PrepareTempTablespaces(void)
Definition: tablespace.c:1337
int BarrierAttach(Barrier *barrier)
Definition: barrier.c:236
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:265
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
#define OidIsValid(objectId)
Definition: c.h:764
int my_log2(long num)
Definition: dynahash.c:1760
#define ERROR
Definition: elog.h:39
#define palloc_object(type)
Definition: fe_memutils.h:62
#define palloc_array(type, count)
Definition: fe_memutils.h:64
#define palloc0_array(type, count)
Definition: fe_memutils.h:65
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:127
@ PHJ_GROWTH_OK
Definition: hashjoin.h:233
#define PHJ_BUILD_ELECT
Definition: hashjoin.h:269
bool op_strict(Oid opno)
Definition: lsyscache.c:1481
bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)
Definition: lsyscache.c:509
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
Definition: nodeHash.c:2372
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
Definition: nodeHash.c:3094
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3259
static int list_length(const List *l)
Definition: pg_list.h:152
#define forboth(cell1, list1, cell2, list2)
Definition: pg_list.h:467
#define lfirst_oid(lc)
Definition: pg_list.h:174
#define printf(...)
Definition: port.h:244
unsigned int Oid
Definition: postgres_ext.h:31
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:311
HashMemoryChunk chunks
Definition: hashjoin.h:367
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:373
MemoryContext hashCxt
Definition: hashjoin.h:362
double totalTuples
Definition: hashjoin.h:332
double partialTuples
Definition: hashjoin.h:333
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:372
MemoryContext spillCxt
Definition: hashjoin.h:364
HashMemoryChunk current_chunk
Definition: hashjoin.h:370
Size spaceAllowedSkew
Definition: hashjoin.h:360
int * skewBucketNums
Definition: hashjoin.h:322
BufFile ** innerBatchFile
Definition: hashjoin.h:343
int log2_nbuckets_optimal
Definition: hashjoin.h:305
dsa_area * area
Definition: hashjoin.h:371
BufFile ** outerBatchFile
Definition: hashjoin.h:344
MemoryContext batchCxt
Definition: hashjoin.h:363
double skewTuples
Definition: hashjoin.h:334
union HashJoinTableData::@99 buckets
Oid skewTable
Definition: plannodes.h:1204
Cardinality rows_total
Definition: plannodes.h:1208
Plan plan
Definition: plannodes.h:1197
ParallelHashGrowth growth
Definition: hashjoin.h:253
bool parallel_aware
Definition: plannodes.h:141
int plan_width
Definition: plannodes.h:136
Cardinality plan_rows
Definition: plannodes.h:135
Definition: regguts.h:323

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, HashJoinTableData::area, Assert(), BarrierArriveAndWait(), BarrierAttach(), BarrierPhase(), HashJoinTableData::batchCxt, HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, HashJoinTableData::chunks, HashJoinTableData::collations, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, CurrentMemoryContext, elog(), ERROR, ExecChooseHashTableSize(), ExecHashBuildSkewHash(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashTableAlloc(), fmgr_info(), forboth, get_op_hash_functions(), HashJoinTableData::growEnabled, ParallelHashJoinState::growth, HashJoinTableData::hashCxt, HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, HashJoinTableData::innerBatchFile, HashJoinTableData::keepNulls, lfirst_oid, list_length(), HashJoinTableData::log2_nbuckets, HashJoinTableData::log2_nbuckets_optimal, MemoryContextSwitchTo(), my_log2(), ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, HashJoinTableData::nbatch_original, HashJoinTableData::nbatch_outstart, ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, HashJoinTableData::nbuckets_original, HashJoinTableData::nSkewBuckets, OidIsValid, op_strict(), HashJoinTableData::outer_hashfunctions, HashJoinTableData::outerBatchFile, outerPlan, palloc0_array, palloc_array, palloc_object, Plan::parallel_aware, HashJoinTableData::parallel_state, HashJoinTableData::partialTuples, PHJ_BUILD_ELECT, PHJ_GROWTH_OK, Hash::plan, Plan::plan_rows, Plan::plan_width, PrepareTempTablespaces(), printf, Hash::rows_total, SKEW_HASH_MEM_PERCENT, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, HashJoinTableData::skewBucketNums, HashJoinTableData::skewEnabled, Hash::skewTable, HashJoinTableData::skewTuples, ParallelHashJoinState::space_allowed, HashJoinTableData::spaceAllowed, HashJoinTableData::spaceAllowedSkew, HashJoinTableData::spacePeak, 
HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, HashJoinTableData::spillCxt, HashJoinTableData::totalTuples, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

◆ ExecHashTableDestroy()

void ExecHashTableDestroy ( HashJoinTable  hashtable)

Definition at line 889 of file nodeHash.c.

/*
 * ExecHashTableDestroy: tear down a hash join table.
 *
 * Closes any still-open inner/outer batch temp files for batches
 * 1 .. nbatch-1, deletes the hashCxt memory context, and finally pfree's
 * the hashtable control block itself, so the caller must not touch
 * 'hashtable' afterwards.
 */
890 {
891  int i;
892 
893  /*
894  * Make sure all the temp files are closed. We skip batch 0, since it
895  * can't have any temp files (and the arrays might not even exist if
896  * nbatch is only 1). Parallel hash joins don't use these files.
897  */
/*
 * NOTE(review): only innerBatchFile is NULL-checked, yet outerBatchFile is
 * indexed in the loop too; presumably both arrays are always allocated
 * together -- confirm against the code that allocates them.
 */
898  if (hashtable->innerBatchFile != NULL)
899  {
900  for (i = 1; i < hashtable->nbatch; i++)
901  {
902  if (hashtable->innerBatchFile[i])
903  BufFileClose(hashtable->innerBatchFile[i]);
904  if (hashtable->outerBatchFile[i])
905  BufFileClose(hashtable->outerBatchFile[i]);
906  }
907  }
908 
909  /* Release working memory (batchCxt is a child, so it goes away too) */
910  MemoryContextDelete(hashtable->hashCxt);
911 
912  /* And drop the control block */
913  pfree(hashtable);
914 }
void BufFileClose(BufFile *file)
Definition: buffile.c:412
void pfree(void *pointer)
Definition: mcxt.c:1456
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403

References BufFileClose(), HashJoinTableData::hashCxt, i, HashJoinTableData::innerBatchFile, MemoryContextDelete(), HashJoinTableData::nbatch, HashJoinTableData::outerBatchFile, and pfree().

Referenced by ExecEndHashJoin(), and ExecReScanHashJoin().

◆ ExecHashTableDetach()

void ExecHashTableDetach ( HashJoinTable  hashtable)

Definition at line 3371 of file nodeHash.c.

/*
 * ExecHashTableDetach: detach this process from the shared build barrier of
 * a parallel hash join.
 *
 * If there is no parallel state, or the build barrier is not in
 * PHJ_BUILD_RUN, the only effect is clearing hashtable->parallel_state.
 * Otherwise the write and parallel-scan state of every batch's inner and
 * outer shared tuplestores is ended, and the process that turns out to be
 * the last one detaching frees the shared batch array.
 */
3372 {
3373  ParallelHashJoinState *pstate = hashtable->parallel_state;
3374 
3375  /*
3376  * If we're involved in a parallel query, we must either have gotten all
3377  * the way to PHJ_BUILD_RUN, or joined too late and be in PHJ_BUILD_FREE.
3378  */
3379  Assert(!pstate ||
3380  BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN);
3381 
3382  if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUN)
3383  {
3384  int i;
3385 
3386  /* Make sure any temporary files are closed. */
3387  if (hashtable->batches)
3388  {
3389  for (i = 0; i < hashtable->nbatch; ++i)
3390  {
3391  sts_end_write(hashtable->batches[i].inner_tuples);
3392  sts_end_write(hashtable->batches[i].outer_tuples);
3393  sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
3394  sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
3395  }
3396  }
3397 
3398  /* If we're last to detach, clean up shared memory. */
3399  if (BarrierArriveAndDetach(&pstate->build_barrier))
3400  {
3401  /*
3402  * Late joining processes will see this state and give up
3403  * immediately.
3404  */
3405  Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_FREE);
3406 
3407  if (DsaPointerIsValid(pstate->batches))
3408  {
3409  dsa_free(hashtable->area, pstate->batches);
3410  pstate->batches = InvalidDsaPointer;
3411  }
3412  }
3413  }
3414  hashtable->parallel_state = NULL;
3415 }
bool BarrierArriveAndDetach(Barrier *barrier)
Definition: barrier.c:203
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:833
#define InvalidDsaPointer
Definition: dsa.h:78
#define DsaPointerIsValid(x)
Definition: dsa.h:81
#define PHJ_BUILD_FREE
Definition: hashjoin.h:274
#define PHJ_BUILD_RUN
Definition: hashjoin.h:273
void sts_end_write(SharedTuplestoreAccessor *accessor)
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:221
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:220
dsa_pointer batches
Definition: hashjoin.h:248

References HashJoinTableData::area, Assert(), BarrierArriveAndDetach(), BarrierPhase(), ParallelHashJoinState::batches, HashJoinTableData::batches, ParallelHashJoinState::build_barrier, dsa_free(), DsaPointerIsValid, i, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BUILD_FREE, PHJ_BUILD_RUN, sts_end_parallel_scan(), and sts_end_write().

Referenced by ExecHashJoinReInitializeDSM(), and ExecShutdownHashJoin().

◆ ExecHashTableDetachBatch()

void ExecHashTableDetachBatch ( HashJoinTable  hashtable)

Definition at line 3279 of file nodeHash.c.

/*
 * ExecHashTableDetachBatch: detach from the batch this process is currently
 * attached to, if any.
 *
 * No-op unless this is a parallel hash join (parallel_state != NULL) and
 * curbatch >= 0. Otherwise: ends the parallel scans of the batch's inner and
 * outer tuplestores, sets skip_unmatched when the probe phase is being
 * abandoned before outer_eof, detaches from the batch barrier (freeing the
 * batch's shared chunks and bucket array if this process is the one elected
 * to do so), folds the batch size into spacePeak, and sets curbatch to -1.
 */
3280 {
3281  if (hashtable->parallel_state != NULL &&
3282  hashtable->curbatch >= 0)
3283  {
3284  int curbatch = hashtable->curbatch;
3285  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
3286  bool attached = true;
3287 
3288  /* Make sure any temporary files are closed. */
3289  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
3290  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
3291 
3292  /* After attaching we always get at least to PHJ_BATCH_PROBE. */
3293  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
3294  BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
3295 
3296  /*
3297  * If we're abandoning the PHJ_BATCH_PROBE phase early without having
3298  * reached the end of it, it means the plan doesn't want any more
3299  * tuples, and it is happy to abandon any tuples buffered in this
3300  * process's subplans. For correctness, we can't allow any process to
3301  * execute the PHJ_BATCH_SCAN phase, because we will never have the
3302  * complete set of match bits. Therefore we skip emitting unmatched
3303  * tuples in all backends (if this is a full/right join), as if those
3304  * tuples were all due to be emitted by this process and it has
3305  * abandoned them too.
3306  */
3307  if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
3308  !hashtable->batches[curbatch].outer_eof)
3309  {
3310  /*
3311  * This flag may be written to by multiple backends during
3312  * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
3313  * phase so requires no extra locking.
3314  */
3315  batch->skip_unmatched = true;
3316  }
3317 
3318  /*
3319  * Even if we aren't doing a full/right outer join, we'll step through
3320  * the PHJ_BATCH_SCAN phase just to maintain the invariant that
3321  * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
3322  */
3323  if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
3324  attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
3325  if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
3326  {
3327  /*
3328  * We are no longer attached to the batch barrier, but we're the
3329  * process that was chosen to free resources and it's safe to
3330  * assert the current phase. The ParallelHashJoinBatch can't go
3331  * away underneath us while we are attached to the build barrier,
3332  * making this access safe.
3333  */
3334  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
3335 
3336  /* Free shared chunks and buckets. */
3337  while (DsaPointerIsValid(batch->chunks))
3338  {
3339  HashMemoryChunk chunk =
3340  dsa_get_address(hashtable->area, batch->chunks);
3341  dsa_pointer next = chunk->next.shared;
3342 
3343  dsa_free(hashtable->area, batch->chunks);
3344  batch->chunks = next;
3345  }
3346  if (DsaPointerIsValid(batch->buckets))
3347  {
3348  dsa_free(hashtable->area, batch->buckets);
3349  batch->buckets = InvalidDsaPointer;
3350  }
3351  }
3352 
3353  /*
3354  * Track the largest batch we've been attached to. Though each
3355  * backend might see a different subset of batches, explain.c will
3356  * scan the results from all backends to find the largest value.
3357  */
3358  hashtable->spacePeak =
3359  Max(hashtable->spacePeak,
3360  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
3361 
3362  /* Remember that we are not attached to a batch. */
3363  hashtable->curbatch = -1;
3364  }
3365 }
bool BarrierArriveAndDetachExceptLast(Barrier *barrier)
Definition: barrier.c:213
static int32 next
Definition: blutils.c:219
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:949
uint64 dsa_pointer
Definition: dsa.h:62
#define PHJ_BATCH_SCAN
Definition: hashjoin.h:281
#define PHJ_BATCH_PROBE
Definition: hashjoin.h:280
#define PHJ_BATCH_FREE
Definition: hashjoin.h:282
dsa_pointer shared
Definition: hashjoin.h:138
union HashMemoryChunkData::@98 next
ParallelHashJoinBatch * shared
Definition: hashjoin.h:209
dsa_pointer chunks
Definition: hashjoin.h:167
dsa_pointer buckets
Definition: hashjoin.h:164

References HashJoinTableData::area, Assert(), BarrierArriveAndDetach(), BarrierArriveAndDetachExceptLast(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, ParallelHashJoinBatch::chunks, HashJoinTableData::curbatch, dsa_free(), dsa_get_address(), DsaPointerIsValid, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, Max, HashJoinTableData::nbuckets, next, HashMemoryChunkData::next, ParallelHashJoinBatchAccessor::outer_eof, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BATCH_FREE, PHJ_BATCH_PROBE, PHJ_BATCH_SCAN, HashMemoryChunkData::shared, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinBatch::skip_unmatched, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinReInitializeDSM(), ExecParallelHashJoinNewBatch(), ExecParallelPrepHashTableForUnmatched(), and ExecShutdownHashJoin().

◆ ExecHashTableInsert()

void ExecHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot * slot,
uint32  hashvalue 
)

Definition at line 1621 of file nodeHash.c.

/*
 * ExecHashTableInsert: insert the tuple held in 'slot' into the hash table,
 * or save it for a later batch.
 *
 * 'hashvalue' is the tuple's already-computed hash value; it selects the
 * bucket and batch via ExecHashGetBucketAndBatch(). A tuple belonging to the
 * current batch is copied into dense_alloc'd memory and pushed onto the
 * front of its bucket chain, after which the space accounting may call
 * ExecHashIncreaseNumBatches(). A tuple of any other batch is written to
 * innerBatchFile[batchno]. The minimal tuple fetched from the slot is freed
 * before returning if ExecFetchSlotMinimalTuple() set shouldFree.
 */
1624 {
1625  bool shouldFree;
1626  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1627  int bucketno;
1628  int batchno;
1629 
1630  ExecHashGetBucketAndBatch(hashtable, hashvalue,
1631  &bucketno, &batchno);
1632 
1633  /*
1634  * decide whether to put the tuple in the hash table or a temp file
1635  */
1636  if (batchno == hashtable->curbatch)
1637  {
1638  /*
1639  * put the tuple in hash table
1640  */
1641  HashJoinTuple hashTuple;
1642  int hashTupleSize;
1643  double ntuples = (hashtable->totalTuples - hashtable->skewTuples);
1644 
1645  /* Create the HashJoinTuple */
1646  hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1647  hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
1648 
1649  hashTuple->hashvalue = hashvalue;
1650  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1651 
1652  /*
1653  * We always reset the tuple-matched flag on insertion. This is okay
1654  * even when reloading a tuple from a batch file, since the tuple
1655  * could not possibly have been matched to an outer tuple before it
1656  * went into the batch file.
1657  */
1658  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1659 
1660  /* Push it onto the front of the bucket's list */
1661  hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1662  hashtable->buckets.unshared[bucketno] = hashTuple;
1663 
1664  /*
1665  * Increase the (optimal) number of buckets if we just exceeded the
1666  * NTUP_PER_BUCKET threshold, but only when there's still a single
1667  * batch.
1668  */
1669  if (hashtable->nbatch == 1 &&
1670  ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
1671  {
1672  /* Guard against integer overflow and alloc size overflow */
1673  if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
1674  hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
1675  {
1676  hashtable->nbuckets_optimal *= 2;
1677  hashtable->log2_nbuckets_optimal += 1;
1678  }
1679  }
1680 
1681  /* Account for space used, and back off if we've used too much */
1682  hashtable->spaceUsed += hashTupleSize;
1683  if (hashtable->spaceUsed > hashtable->spacePeak)
1684  hashtable->spacePeak = hashtable->spaceUsed;
1685  if (hashtable->spaceUsed +
1686  hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
1687  > hashtable->spaceAllowed)
1688  ExecHashIncreaseNumBatches(hashtable);
1689  }
1690  else
1691  {
1692  /*
1693  * put the tuple into a temp file for later batches
1694  */
1695  Assert(batchno > hashtable->curbatch);
1696  ExecHashJoinSaveTuple(tuple,
1697  hashvalue,
1698  &hashtable->innerBatchFile[batchno],
1699  hashtable);
1700  }
1701 
1702  if (shouldFree)
1703  heap_free_minimal_tuple(tuple);
1704 }
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1693
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:91
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1515
#define HeapTupleHeaderClearMatch(tup)
Definition: htup_details.h:524
static void * dense_alloc(HashJoinTable hashtable, Size size)
Definition: nodeHash.c:2866
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
Definition: nodeHash.c:922
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1929
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)
uint32 hashvalue
Definition: hashjoin.h:86
union HashJoinTupleData::@97 next
struct HashJoinTupleData * unshared
Definition: hashjoin.h:83

References Assert(), HashJoinTableData::buckets, HashJoinTableData::curbatch, dense_alloc(), ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecHashIncreaseNumBatches(), ExecHashJoinSaveTuple(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::innerBatchFile, HashJoinTableData::log2_nbuckets_optimal, MaxAllocSize, HashJoinTableData::nbatch, HashJoinTableData::nbuckets_optimal, HashJoinTupleData::next, NTUP_PER_BUCKET, HashJoinTableData::skewTuples, HashJoinTableData::spaceAllowed, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, MinimalTupleData::t_len, HashJoinTableData::totalTuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch(), and MultiExecPrivateHash().

◆ ExecHashTableReset()

void ExecHashTableReset ( HashJoinTable  hashtable)

Definition at line 2296 of file nodeHash.c.

/*
 * ExecHashTableReset: empty the in-memory hash table so it can be reloaded.
 *
 * Resets batchCxt (discarding the old buckets and tuples), allocates a new
 * zero-filled bucket array of the same nbuckets inside batchCxt, sets
 * spaceUsed back to 0 and clears the chunk list pointer. The caller's
 * memory context is restored before returning.
 */
2297 {
2298  MemoryContext oldcxt;
2299  int nbuckets = hashtable->nbuckets;
2300 
2301  /*
2302  * Release all the hash buckets and tuples acquired in the prior pass, and
2303  * reinitialize the context for a new pass.
2304  */
2305  MemoryContextReset(hashtable->batchCxt);
2306  oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
2307 
2308  /* Reallocate and reinitialize the hash bucket headers. */
2309  hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets);
2310 
2311  hashtable->spaceUsed = 0;
2312 
2313  MemoryContextSwitchTo(oldcxt);
2314 
2315  /* Forget the chunks (the memory was freed by the context reset above). */
2316  hashtable->chunks = NULL;
2317 }
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:330

References HashJoinTableData::batchCxt, HashJoinTableData::buckets, HashJoinTableData::chunks, MemoryContextReset(), MemoryContextSwitchTo(), HashJoinTableData::nbuckets, palloc0_array, HashJoinTableData::spaceUsed, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch().

◆ ExecHashTableResetMatchFlags()

void ExecHashTableResetMatchFlags ( HashJoinTable  hashtable)

Definition at line 2324 of file nodeHash.c.

/*
 * ExecHashTableResetMatchFlags: clear the match flag of every tuple
 * currently stored in the hash table.
 *
 * Walks each chain of the main (unshared) bucket array and then the tuple
 * chain of each skew bucket listed in skewBucketNums.
 */
2325 {
2326  HashJoinTuple tuple;
2327  int i;
2328 
2329  /* Reset all flags in the main table ... */
2330  for (i = 0; i < hashtable->nbuckets; i++)
2331  {
2332  for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
2333  tuple = tuple->next.unshared)
2334  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2335  }
2336 
2337  /* ... and the same for the skew buckets, if any */
2338  for (i = 0; i < hashtable->nSkewBuckets; i++)
2339  {
2340  int j = hashtable->skewBucketNums[i];
2341  HashSkewBucket *skewBucket = hashtable->skewBucket[j];
2342 
2343  for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
2344  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2345  }
2346 }
int j
Definition: isn.c:74
HashJoinTuple tuples
Definition: hashjoin.h:116

References HashJoinTableData::buckets, HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, i, j, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecReScanHashJoin().

◆ ExecInitHash()

HashState* ExecInitHash ( Hash * node,
EState * estate,
int  eflags 
)

Definition at line 361 of file nodeHash.c.

/*
 * ExecInitHash: build and return the HashState executor node for the Hash
 * plan node 'node'.
 *
 * Asserts that eflags contains neither EXEC_FLAG_BACKWARD nor
 * EXEC_FLAG_MARK and that the plan node has no qual. The hash table itself
 * is not created here (hashtable is left NULL); only the child node, the
 * expression context, the result slot and the hash key expressions are
 * initialized.
 */
362 {
363  HashState *hashstate;
364 
365  /* check for unsupported flags */
366  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
367 
368  /*
369  * create state structure
370  */
371  hashstate = makeNode(HashState);
372  hashstate->ps.plan = (Plan *) node;
373  hashstate->ps.state = estate;
374  hashstate->ps.ExecProcNode = ExecHash;
375  hashstate->hashtable = NULL;
376  hashstate->hashkeys = NIL; /* will be set by parent HashJoin */
377 
378  /*
379  * Miscellaneous initialization
380  *
381  * create expression context for node
382  */
383  ExecAssignExprContext(estate, &hashstate->ps);
384 
385  /*
386  * initialize child nodes
387  */
388  outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
389 
390  /*
391  * initialize our result slot and type. No need to build projection
392  * because this node doesn't do projections.
393  */
394  ExecInitResultTupleSlotTL(&hashstate->ps, &TTSOpsMinimalTuple);
395  hashstate->ps.ps_ProjInfo = NULL;
396 
397  /*
398  * initialize child expressions
399  */
400  Assert(node->plan.qual == NIL);
401  hashstate->hashkeys =
402  ExecInitExprList(node->hashkeys, (PlanState *) hashstate);
403 
404  return hashstate;
405 }
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:323
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:142
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1800
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:488
#define EXEC_FLAG_BACKWARD
Definition: executor.h:68
#define EXEC_FLAG_MARK
Definition: executor.h:69
static TupleTableSlot * ExecHash(PlanState *pstate)
Definition: nodeHash.c:92
#define makeNode(_type_)
Definition: nodes.h:176
#define NIL
Definition: pg_list.h:68
HashJoinTable hashtable
Definition: execnodes.h:2662
List * hashkeys
Definition: execnodes.h:2663
List * hashkeys
Definition: plannodes.h:1203
EState * state
Definition: execnodes.h:1039
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1077
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:1043
List * qual
Definition: plannodes.h:154

References Assert(), EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecAssignExprContext(), ExecHash(), ExecInitExprList(), ExecInitNode(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, HashState::hashkeys, Hash::hashkeys, HashState::hashtable, makeNode, NIL, outerPlan, outerPlanState, PlanState::plan, Hash::plan, HashState::ps, PlanState::ps_ProjInfo, Plan::qual, PlanState::state, and TTSOpsMinimalTuple.

Referenced by ExecInitNode().

◆ ExecParallelHashTableAlloc()

void ExecParallelHashTableAlloc ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3259 of file nodeHash.c.

/*
 * ExecParallelHashTableAlloc: allocate the shared bucket array for batch
 * 'batchno'.
 *
 * The array holds parallel_state->nbuckets dsa_pointer_atomic entries, is
 * allocated from hashtable->area, recorded in the batch's shared state, and
 * has every entry initialized to InvalidDsaPointer (an empty bucket).
 */
3260 {
3261  ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
3262  dsa_pointer_atomic *buckets;
3263  int nbuckets = hashtable->parallel_state->nbuckets;
3264  int i;
3265 
3266  batch->buckets =
3267  dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets);
3268  buckets = (dsa_pointer_atomic *)
3269  dsa_get_address(hashtable->area, batch->buckets);
3270  for (i = 0; i < nbuckets; ++i)
3271  dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
3272 }
#define dsa_pointer_atomic_init
Definition: dsa.h:64
#define dsa_allocate(area, size)
Definition: dsa.h:84

References HashJoinTableData::area, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, dsa_allocate, dsa_get_address(), dsa_pointer_atomic_init, i, InvalidDsaPointer, ParallelHashJoinState::nbuckets, HashJoinTableData::parallel_state, and ParallelHashJoinBatchAccessor::shared.

Referenced by ExecHashTableCreate(), and ExecParallelHashJoinNewBatch().

◆ ExecParallelHashTableInsert()

void ExecParallelHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot * slot,
uint32  hashvalue 
)

Definition at line 1711 of file nodeHash.c.

/*
 * ExecParallelHashTableInsert: insert the tuple held in 'slot' while the
 * shared hash table is being built.
 *
 * A tuple that maps to batch 0 is copied into shared memory and pushed onto
 * its bucket; a tuple of any other batch is written to that batch's inner
 * shared tuplestore, consuming space preallocated for the batch. When the
 * allocation or preallocation helper reports failure, the bucket and batch
 * numbers are recomputed and the insertion is retried (presumably because
 * the batch/bucket layout may have changed in the meantime -- confirm in
 * the helpers). The per-batch ntuples counter is incremented on success.
 */
1714 {
1715  bool shouldFree;
1716  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1717  dsa_pointer shared;
1718  int bucketno;
1719  int batchno;
1720 
1721 retry:
1722  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1723 
1724  if (batchno == 0)
1725  {
1726  HashJoinTuple hashTuple;
1727 
1728  /* Try to load it into memory. */
1729  Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
1730  PHJ_BUILD_HASH_INNER);
1731  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1732  HJTUPLE_OVERHEAD + tuple->t_len,
1733  &shared);
1734  if (hashTuple == NULL)
1735  goto retry;
1736 
1737  /* Store the hash value in the HashJoinTuple header. */
1738  hashTuple->hashvalue = hashvalue;
1739  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1740  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1741 
1742  /* Push it onto the front of the bucket's list */
1743  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1744  hashTuple, shared);
1745  }
1746  else
1747  {
1748  size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1749 
1750  Assert(batchno > 0);
1751 
1752  /* Try to preallocate space in the batch if necessary. */
1753  if (hashtable->batches[batchno].preallocated < tuple_size)
1754  {
1755  if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
1756  goto retry;
1757  }
1758 
1759  Assert(hashtable->batches[batchno].preallocated >= tuple_size);
1760  hashtable->batches[batchno].preallocated -= tuple_size;
1761  sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
1762  tuple);
1763  }
1764  ++hashtable->batches[batchno].ntuples;
1765 
1766  if (shouldFree)
1767  heap_free_minimal_tuple(tuple);
1768 }
#define PHJ_BUILD_HASH_INNER
Definition: hashjoin.h:271
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
Definition: nodeHash.c:3531
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
Definition: nodeHash.c:2946
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
Definition: nodeHash.c:3451
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
dsa_pointer_atomic * shared
Definition: hashjoin.h:313

References Assert(), BarrierPhase(), HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), ExecParallelHashTuplePrealloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, ParallelHashJoinBatchAccessor::inner_tuples, MAXALIGN, ParallelHashJoinBatchAccessor::ntuples, HashJoinTableData::parallel_state, PHJ_BUILD_HASH_INNER, ParallelHashJoinBatchAccessor::preallocated, HashJoinTableData::shared, sts_puttuple(), and MinimalTupleData::t_len.

Referenced by MultiExecParallelHash().

◆ ExecParallelHashTableInsertCurrentBatch()

void ExecParallelHashTableInsertCurrentBatch ( HashJoinTable  hashtable,
TupleTableSlot * slot,
uint32  hashvalue 
)

Definition at line 1777 of file nodeHash.c.

/*
 * ExecParallelHashTableInsertCurrentBatch: insert the tuple held in 'slot'
 * into the shared in-memory hash table of the current batch.
 *
 * The tuple must hash to hashtable->curbatch (asserted). It is copied into
 * shared memory, its match flag is cleared, and it is pushed onto the front
 * of its bucket. The result of ExecParallelHashTupleAlloc() is used without
 * a NULL check here.
 */
1780 {
1781  bool shouldFree;
1782  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1783  HashJoinTuple hashTuple;
1784  dsa_pointer shared;
1785  int batchno;
1786  int bucketno;
1787 
1788  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1789  Assert(batchno == hashtable->curbatch);
1790  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1791  HJTUPLE_OVERHEAD + tuple->t_len,
1792  &shared);
1793  hashTuple->hashvalue = hashvalue;
1794  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1795  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1796  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1797  hashTuple, shared);
1798 
1799  if (shouldFree)
1800  heap_free_minimal_tuple(tuple);
1801 }

References Assert(), HashJoinTableData::buckets, HashJoinTableData::curbatch, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::shared, and MinimalTupleData::t_len.

Referenced by ExecParallelHashJoinNewBatch().

◆ ExecParallelHashTableSetCurrentBatch()

void ExecParallelHashTableSetCurrentBatch ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3469 of file nodeHash.c.

/*
 * ExecParallelHashTableSetCurrentBatch: make 'batchno' the batch this
 * process works on.
 *
 * The batch's shared bucket array must already exist (asserted). Points
 * buckets.shared at that array, refreshes nbuckets and log2_nbuckets from
 * the shared parallel state, and resets the local chunk pointers and the
 * batch accessor's at_least_one_chunk flag.
 */
3470 {
3471  Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer);
3472 
3473  hashtable->curbatch = batchno;
3474  hashtable->buckets.shared = (dsa_pointer_atomic *)
3475  dsa_get_address(hashtable->area,
3476  hashtable->batches[batchno].shared->buckets);
3477  hashtable->nbuckets = hashtable->parallel_state->nbuckets;
3478  hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
3479  hashtable->current_chunk = NULL;
3480  hashtable->current_chunk_shared = InvalidDsaPointer;
3481  hashtable->batches[batchno].at_least_one_chunk = false;
3482 }
dsa_pointer current_chunk_shared
Definition: hashjoin.h:374

References HashJoinTableData::area, Assert(), ParallelHashJoinBatchAccessor::at_least_one_chunk, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, HashJoinTableData::buckets, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, HashJoinTableData::current_chunk_shared, dsa_get_address(), InvalidDsaPointer, HashJoinTableData::log2_nbuckets, my_log2(), ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::parallel_state, ParallelHashJoinBatchAccessor::shared, and HashJoinTableData::shared.

Referenced by ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinNewBatch(), and MultiExecParallelHash().

◆ ExecParallelPrepHashTableForUnmatched()

bool ExecParallelPrepHashTableForUnmatched ( HashJoinState * hjstate)

Definition at line 2094 of file nodeHash.c.

/*
 * ExecParallelPrepHashTableForUnmatched: decide whether this process will
 * scan the current batch for unmatched inner tuples.
 *
 * Returns true if this process won the wait-free election and its local
 * state has been prepared via ExecPrepHashTableForUnmatched(). Returns false
 * if it lost the election (it is then detached from the batch and curbatch
 * is set to -1) or if another process set skip_unmatched (the batch is then
 * detached through ExecHashTableDetachBatch()). In both false cases the
 * batch is marked done for this process.
 */
2095 {
2096  HashJoinTable hashtable = hjstate->hj_HashTable;
2097  int curbatch = hashtable->curbatch;
2098  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
2099 
2100  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
2101 
2102  /*
2103  * It would not be deadlock-free to wait on the batch barrier, because it
2104  * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
2105  * already emitted tuples. Therefore, we'll hold a wait-free election:
2106  * only one process can continue to the next phase, and all others detach
2107  * from this batch. They can still do any work on other batches, if there
2108  * are any.
2109  */
2110  if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
2111  {
2112  /* This process considers the batch to be done. */
2113  hashtable->batches[hashtable->curbatch].done = true;
2114 
2115  /* Make sure any temporary files are closed. */
2116  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
2117  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
2118 
2119  /*
2120  * Track largest batch we've seen, which would normally happen in
2121  * ExecHashTableDetachBatch().
2122  */
2123  hashtable->spacePeak =
2124  Max(hashtable->spacePeak,
2125  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
2126  hashtable->curbatch = -1;
2127  return false;
2128  }
2129 
2130  /* Now we are alone with this batch. */
2131  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
2132 
2133  /*
2134  * Has another process decided to give up early and command all processes
2135  * to skip the unmatched scan?
2136  */
2137  if (batch->skip_unmatched)
2138  {
2139  hashtable->batches[hashtable->curbatch].done = true;
2140  ExecHashTableDetachBatch(hashtable);
2141  return false;
2142  }
2143 
2144  /* Now prepare the process local state, just as for non-parallel join. */
2145  ExecPrepHashTableForUnmatched(hjstate);
2146 
2147  return true;
2148 }
void ExecHashTableDetachBatch(HashJoinTable hashtable)
Definition: nodeHash.c:3279
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
Definition: nodeHash.c:2073
HashJoinTable hj_HashTable
Definition: execnodes.h:2110

References Assert(), BarrierArriveAndDetachExceptLast(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, HashJoinTableData::curbatch, ParallelHashJoinBatchAccessor::done, ExecHashTableDetachBatch(), ExecPrepHashTableForUnmatched(), HashJoinState::hj_HashTable, ParallelHashJoinBatchAccessor::inner_tuples, Max, HashJoinTableData::nbuckets, ParallelHashJoinBatchAccessor::outer_tuples, PHJ_BATCH_PROBE, PHJ_BATCH_SCAN, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinBatch::skip_unmatched, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinImpl().

◆ ExecParallelScanHashBucket()

bool ExecParallelScanHashBucket ( HashJoinState *hjstate,
ExprContext *econtext 
)

Definition at line 2022 of file nodeHash.c.

2024 {
2025  ExprState *hjclauses = hjstate->hashclauses;
2026  HashJoinTable hashtable = hjstate->hj_HashTable;
2027  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2028  uint32 hashvalue = hjstate->hj_CurHashValue;
2029 
2030  /*
2031  * hj_CurTuple is the address of the tuple last returned from the current
2032  * bucket, or NULL if it's time to start scanning a new bucket.
2033  */
2034  if (hashTuple != NULL)
2035  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2036  else
2037  hashTuple = ExecParallelHashFirstTuple(hashtable,
2038  hjstate->hj_CurBucketNo);
2039 
2040  while (hashTuple != NULL)
2041  {
2042  if (hashTuple->hashvalue == hashvalue)
2043  {
2044  TupleTableSlot *inntuple;
2045 
2046  /* insert hashtable's tuple into exec slot so ExecQual sees it */
2047  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2048  hjstate->hj_HashTupleSlot,
2049  false); /* do not pfree */
2050  econtext->ecxt_innertuple = inntuple;
2051 
2052  if (ExecQualAndReset(hjclauses, econtext))
2053  {
2054  hjstate->hj_CurTuple = hashTuple;
2055  return true;
2056  }
2057  }
2058 
2059  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2060  }
2061 
2062  /*
2063  * no match
2064  */
2065  return false;
2066 }
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1447
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:439
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
Definition: nodeHash.c:3421
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
Definition: nodeHash.c:3437
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:251
HashJoinTuple hj_CurTuple
Definition: execnodes.h:2114
ExprState * hashclauses
Definition: execnodes.h:2106
uint32 hj_CurHashValue
Definition: execnodes.h:2111
int hj_CurBucketNo
Definition: execnodes.h:2112
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:2116

References ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, and HJTUPLE_MINTUPLE.

Referenced by ExecHashJoinImpl().

◆ ExecParallelScanHashTableForUnmatched()

bool ExecParallelScanHashTableForUnmatched ( HashJoinState *hjstate,
ExprContext *econtext 
)

Definition at line 2233 of file nodeHash.c.

2235 {
2236  HashJoinTable hashtable = hjstate->hj_HashTable;
2237  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2238 
2239  for (;;)
2240  {
2241  /*
2242  * hj_CurTuple is the address of the tuple last returned from the
2243  * current bucket, or NULL if it's time to start scanning a new
2244  * bucket.
2245  */
2246  if (hashTuple != NULL)
2247  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2248  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2249  hashTuple = ExecParallelHashFirstTuple(hashtable,
2250  hjstate->hj_CurBucketNo++);
2251  else
2252  break; /* finished all buckets */
2253 
2254  while (hashTuple != NULL)
2255  {
2256  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2257  {
2258  TupleTableSlot *inntuple;
2259 
2260  /* insert hashtable's tuple into exec slot */
2261  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2262  hjstate->hj_HashTupleSlot,
2263  false); /* do not pfree */
2264  econtext->ecxt_innertuple = inntuple;
2265 
2266  /*
2267  * Reset temp memory each time; although this function doesn't
2268  * do any qual eval, the caller will, so let's keep it
2269  * parallel to ExecScanHashBucket.
2270  */
2271  ResetExprContext(econtext);
2272 
2273  hjstate->hj_CurTuple = hashTuple;
2274  return true;
2275  }
2276 
2277  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2278  }
2279 
2280  /* allow this loop to be cancellable */
2281  CHECK_FOR_INTERRUPTS();
2282  }
2283 
2284  /*
2285  * no more unmatched tuples
2286  */
2287  return false;
2288 }
#define HeapTupleHeaderHasMatch(tup)
Definition: htup_details.h:514
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121

References CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, HashJoinTableData::nbuckets, and ResetExprContext.

Referenced by ExecHashJoinImpl().

◆ ExecPrepHashTableForUnmatched()

void ExecPrepHashTableForUnmatched ( HashJoinState *hjstate)

Definition at line 2073 of file nodeHash.c.

2074 {
2075  /*----------
2076  * During this scan we use the HashJoinState fields as follows:
2077  *
2078  * hj_CurBucketNo: next regular bucket to scan
2079  * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
2080  * hj_CurTuple: last tuple returned, or NULL to start next bucket
2081  *----------
2082  */
2083  hjstate->hj_CurBucketNo = 0;
2084  hjstate->hj_CurSkewBucketNo = 0;
2085  hjstate->hj_CurTuple = NULL;
2086 }
int hj_CurSkewBucketNo
Definition: execnodes.h:2113

References HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, and HashJoinState::hj_CurTuple.

Referenced by ExecHashJoinImpl(), and ExecParallelPrepHashTableForUnmatched().

◆ ExecReScanHash()

void ExecReScanHash ( HashState *node)

Definition at line 2350 of file nodeHash.c.

2351 {
2352  PlanState *outerPlan = outerPlanState(node);
2353 
2354  /*
2355  * if chgParam of subnode is not null then plan will be re-scanned by
2356  * first ExecProcNode.
2357  */
2358  if (outerPlan->chgParam == NULL)
2359  ExecReScan(outerPlan);
2360 }
void ExecReScan(PlanState *node)
Definition: execAmi.c:78

References ExecReScan(), outerPlan, and outerPlanState.

Referenced by ExecReScan().

◆ ExecScanHashBucket()

bool ExecScanHashBucket ( HashJoinState *hjstate,
ExprContext *econtext 
)

Definition at line 1961 of file nodeHash.c.

1963 {
1964  ExprState *hjclauses = hjstate->hashclauses;
1965  HashJoinTable hashtable = hjstate->hj_HashTable;
1966  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1967  uint32 hashvalue = hjstate->hj_CurHashValue;
1968 
1969  /*
1970  * hj_CurTuple is the address of the tuple last returned from the current
1971  * bucket, or NULL if it's time to start scanning a new bucket.
1972  *
1973  * If the tuple hashed to a skew bucket then scan the skew bucket
1974  * otherwise scan the standard hashtable bucket.
1975  */
1976  if (hashTuple != NULL)
1977  hashTuple = hashTuple->next.unshared;
1978  else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
1979  hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
1980  else
1981  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
1982 
1983  while (hashTuple != NULL)
1984  {
1985  if (hashTuple->hashvalue == hashvalue)
1986  {
1987  TupleTableSlot *inntuple;
1988 
1989  /* insert hashtable's tuple into exec slot so ExecQual sees it */
1990  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
1991  hjstate->hj_HashTupleSlot,
1992  false); /* do not pfree */
1993  econtext->ecxt_innertuple = inntuple;
1994 
1995  if (ExecQualAndReset(hjclauses, econtext))
1996  {
1997  hjstate->hj_CurTuple = hashTuple;
1998  return true;
1999  }
2000  }
2001 
2002  hashTuple = hashTuple->next.unshared;
2003  }
2004 
2005  /*
2006  * no match
2007  */
2008  return false;
2009 }

References HashJoinTableData::buckets, ExprContext::ecxt_innertuple, ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, INVALID_SKEW_BUCKET_NO, HashJoinTupleData::next, HashJoinTableData::skewBucket, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

◆ ExecScanHashTableForUnmatched()

bool ExecScanHashTableForUnmatched ( HashJoinState *hjstate,
ExprContext *econtext 
)

Definition at line 2159 of file nodeHash.c.

2160 {
2161  HashJoinTable hashtable = hjstate->hj_HashTable;
2162  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2163 
2164  for (;;)
2165  {
2166  /*
2167  * hj_CurTuple is the address of the tuple last returned from the
2168  * current bucket, or NULL if it's time to start scanning a new
2169  * bucket.
2170  */
2171  if (hashTuple != NULL)
2172  hashTuple = hashTuple->next.unshared;
2173  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2174  {
2175  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
2176  hjstate->hj_CurBucketNo++;
2177  }
2178  else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
2179  {
2180  int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
2181 
2182  hashTuple = hashtable->skewBucket[j]->tuples;
2183  hjstate->hj_CurSkewBucketNo++;
2184  }
2185  else
2186  break; /* finished all buckets */
2187 
2188  while (hashTuple != NULL)
2189  {
2190  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2191  {
2192  TupleTableSlot *inntuple;
2193 
2194  /* insert hashtable's tuple into exec slot */
2195  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2196  hjstate->hj_HashTupleSlot,
2197  false); /* do not pfree */
2198  econtext->ecxt_innertuple = inntuple;
2199 
2200  /*
2201  * Reset temp memory each time; although this function doesn't
2202  * do any qual eval, the caller will, so let's keep it
2203  * parallel to ExecScanHashBucket.
2204  */
2205  ResetExprContext(econtext);
2206 
2207  hjstate->hj_CurTuple = hashTuple;
2208  return true;
2209  }
2210 
2211  hashTuple = hashTuple->next.unshared;
2212  }
2213 
2214  /* allow this loop to be cancellable */
2215  CHECK_FOR_INTERRUPTS();
2216  }
2217 
2218  /*
2219  * no more unmatched tuples
2220  */
2221  return false;
2222 }

References HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, j, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, ResetExprContext, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

◆ ExecShutdownHash()

void ExecShutdownHash ( HashState *node)

Definition at line 2801 of file nodeHash.c.

2802 {
2803  /* Allocate save space if EXPLAIN'ing and we didn't do so already */
2804  if (node->ps.instrument && !node->hinstrument)
2805  node->hinstrument = palloc0_object(HashInstrumentation);
2806  /* Now accumulate data for the current (final) hash table */
2807  if (node->hinstrument && node->hashtable)
2808  ExecHashAccumInstrumentation(node->hinstrument, node->hashtable);
2809 }
#define palloc0_object(type)
Definition: fe_memutils.h:63
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
Definition: nodeHash.c:2847

References ExecHashAccumInstrumentation(), HashState::hashtable, HashState::hinstrument, PlanState::instrument, palloc0_object, and HashState::ps.

Referenced by ExecShutdownNode_walker().

◆ MultiExecHash()

Node* MultiExecHash ( HashState *node)

Definition at line 106 of file nodeHash.c.

107 {
108  /* must provide our own instrumentation support */
109  if (node->ps.instrument)
110  InstrStartNode(node->ps.instrument);
111 
112  if (node->parallel_state != NULL)
113  MultiExecParallelHash(node);
114  else
115  MultiExecPrivateHash(node);
116 
117  /* must provide our own instrumentation support */
118  if (node->ps.instrument)
119  InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);
120 
121  /*
122  * We do not return the hash table directly because it's not a subtype of
123  * Node, and so would violate the MultiExecProcNode API. Instead, our
124  * parent Hashjoin node is expected to know how to fish it out of our node
125  * state. Ugly but not really worth cleaning up, since Hashjoin knows
126  * quite a bit more about Hash besides that.
127  */
128  return NULL;
129 }
void InstrStartNode(Instrumentation *instr)
Definition: instrument.c:68
void InstrStopNode(Instrumentation *instr, double nTuples)
Definition: instrument.c:84
static void MultiExecParallelHash(HashState *node)
Definition: nodeHash.c:215
static void MultiExecPrivateHash(HashState *node)
Definition: nodeHash.c:139
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2681

References HashState::hashtable, InstrStartNode(), InstrStopNode(), PlanState::instrument, MultiExecParallelHash(), MultiExecPrivateHash(), HashState::parallel_state, HashJoinTableData::partialTuples, and HashState::ps.

Referenced by MultiExecProcNode().