PostgreSQL Source Code  git master
nodeHash.h File Reference
#include "access/parallel.h"
#include "nodes/execnodes.h"
Include dependency graph for nodeHash.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

HashState * ExecInitHash (Hash *node, EState *estate, int eflags)
 
Node * MultiExecHash (HashState *node)
 
void ExecEndHash (HashState *node)
 
void ExecReScanHash (HashState *node)
 
HashJoinTable ExecHashTableCreate (HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
 
void ExecParallelHashTableAlloc (HashJoinTable hashtable, int batchno)
 
void ExecHashTableDestroy (HashJoinTable hashtable)
 
void ExecHashTableDetach (HashJoinTable hashtable)
 
void ExecHashTableDetachBatch (HashJoinTable hashtable)
 
void ExecParallelHashTableSetCurrentBatch (HashJoinTable hashtable, int batchno)
 
void ExecHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsertCurrentBatch (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
bool ExecHashGetHashValue (HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
 
void ExecHashGetBucketAndBatch (HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
 
bool ExecScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecParallelPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecHashTableReset (HashJoinTable hashtable)
 
void ExecHashTableResetMatchFlags (HashJoinTable hashtable)
 
void ExecChooseHashTableSize (double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
 
int ExecHashGetSkewBucket (HashJoinTable hashtable, uint32 hashvalue)
 
void ExecHashEstimate (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeDSM (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeWorker (HashState *node, ParallelWorkerContext *pwcxt)
 
void ExecHashRetrieveInstrumentation (HashState *node)
 
void ExecShutdownHash (HashState *node)
 
void ExecHashAccumInstrumentation (HashInstrumentation *instrument, HashJoinTable hashtable)
 

Function Documentation

◆ ExecChooseHashTableSize()

void ExecChooseHashTableSize ( double  ntuples,
int  tupwidth,
bool  useskew,
bool  try_combined_hash_mem,
int  parallel_workers,
size_t *  space_allowed,
int *  numbuckets,
int *  numbatches,
int *  num_skew_mcvs 
)

Definition at line 676 of file nodeHash.c.

683 {
684  int tupsize;
685  double inner_rel_bytes;
686  size_t hash_table_bytes;
687  size_t bucket_bytes;
688  size_t max_pointers;
689  int nbatch = 1;
690  int nbuckets;
691  double dbuckets;
692 
693  /* Force a plausible relation size if no info */
694  if (ntuples <= 0.0)
695  ntuples = 1000.0;
696 
697  /*
698  * Estimate tupsize based on footprint of tuple in hashtable... note this
699  * does not allow for any palloc overhead. The manipulations of spaceUsed
700  * don't count palloc overhead either.
701  */
702  tupsize = HJTUPLE_OVERHEAD +
703  MAXALIGN(SizeofMinimalTupleHeader) +
704  MAXALIGN(tupwidth);
705  inner_rel_bytes = ntuples * tupsize;
706 
707  /*
708  * Compute in-memory hashtable size limit from GUCs.
709  */
710  hash_table_bytes = get_hash_memory_limit();
711 
712  /*
713  * Parallel Hash tries to use the combined hash_mem of all workers to
714  * avoid the need to batch. If that won't work, it falls back to hash_mem
715  * per worker and tries to process batches in parallel.
716  */
717  if (try_combined_hash_mem)
718  {
719  /* Careful, this could overflow size_t */
720  double newlimit;
721 
722  newlimit = (double) hash_table_bytes * (double) (parallel_workers + 1);
723  newlimit = Min(newlimit, (double) SIZE_MAX);
724  hash_table_bytes = (size_t) newlimit;
725  }
726 
727  *space_allowed = hash_table_bytes;
728 
729  /*
730  * If skew optimization is possible, estimate the number of skew buckets
731  * that will fit in the memory allowed, and decrement the assumed space
732  * available for the main hash table accordingly.
733  *
734  * We make the optimistic assumption that each skew bucket will contain
735  * one inner-relation tuple. If that turns out to be low, we will recover
736  * at runtime by reducing the number of skew buckets.
737  *
738  * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
739  * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
740  * will round up to the next power of 2 and then multiply by 4 to reduce
741  * collisions.
742  */
743  if (useskew)
744  {
745  size_t bytes_per_mcv;
746  size_t skew_mcvs;
747 
748  /*----------
749  * Compute number of MCVs we could hold in hash_table_bytes
750  *
751  * Divisor is:
752  * size of a hash tuple +
753  * worst-case size of skewBucket[] per MCV +
754  * size of skewBucketNums[] entry +
755  * size of skew bucket struct itself
756  *----------
757  */
758  bytes_per_mcv = tupsize +
759  (8 * sizeof(HashSkewBucket *)) +
760  sizeof(int) +
761  SKEW_BUCKET_OVERHEAD;
762  skew_mcvs = hash_table_bytes / bytes_per_mcv;
763 
764  /*
765  * Now scale by SKEW_HASH_MEM_PERCENT (we do it in this order so as
766  * not to worry about size_t overflow in the multiplication)
767  */
768  skew_mcvs = (skew_mcvs * SKEW_HASH_MEM_PERCENT) / 100;
769 
770  /* Now clamp to integer range */
771  skew_mcvs = Min(skew_mcvs, INT_MAX);
772 
773  *num_skew_mcvs = (int) skew_mcvs;
774 
775  /* Reduce hash_table_bytes by the amount needed for the skew table */
776  if (skew_mcvs > 0)
777  hash_table_bytes -= skew_mcvs * bytes_per_mcv;
778  }
779  else
780  *num_skew_mcvs = 0;
781 
782  /*
783  * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
784  * memory is filled, assuming a single batch; but limit the value so that
785  * the pointer arrays we'll try to allocate do not exceed hash_table_bytes
786  * nor MaxAllocSize.
787  *
788  * Note that both nbuckets and nbatch must be powers of 2 to make
789  * ExecHashGetBucketAndBatch fast.
790  */
791  max_pointers = hash_table_bytes / sizeof(HashJoinTuple);
792  max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
793  /* If max_pointers isn't a power of 2, must round it down to one */
794  max_pointers = pg_prevpower2_size_t(max_pointers);
795 
796  /* Also ensure we avoid integer overflow in nbatch and nbuckets */
797  /* (this step is redundant given the current value of MaxAllocSize) */
798  max_pointers = Min(max_pointers, INT_MAX / 2 + 1);
799 
800  dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
801  dbuckets = Min(dbuckets, max_pointers);
802  nbuckets = (int) dbuckets;
803  /* don't let nbuckets be really small, though ... */
804  nbuckets = Max(nbuckets, 1024);
805  /* ... and force it to be a power of 2. */
806  nbuckets = pg_nextpower2_32(nbuckets);
807 
808  /*
809  * If there's not enough space to store the projected number of tuples and
810  * the required bucket headers, we will need multiple batches.
811  */
812  bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
813  if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
814  {
815  /* We'll need multiple batches */
816  size_t sbuckets;
817  double dbatch;
818  int minbatch;
819  size_t bucket_size;
820 
821  /*
822  * If Parallel Hash with combined hash_mem would still need multiple
823  * batches, we'll have to fall back to regular hash_mem budget.
824  */
825  if (try_combined_hash_mem)
826  {
827  ExecChooseHashTableSize(ntuples, tupwidth, useskew,
828  false, parallel_workers,
829  space_allowed,
830  numbuckets,
831  numbatches,
832  num_skew_mcvs);
833  return;
834  }
835 
836  /*
837  * Estimate the number of buckets we'll want to have when hash_mem is
838  * entirely full. Each bucket will contain a bucket pointer plus
839  * NTUP_PER_BUCKET tuples, whose projected size already includes
840  * overhead for the hash code, pointer to the next tuple, etc.
841  */
842  bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
843  if (hash_table_bytes <= bucket_size)
844  sbuckets = 1; /* avoid pg_nextpower2_size_t(0) */
845  else
846  sbuckets = pg_nextpower2_size_t(hash_table_bytes / bucket_size);
847  sbuckets = Min(sbuckets, max_pointers);
848  nbuckets = (int) sbuckets;
849  nbuckets = pg_nextpower2_32(nbuckets);
850  bucket_bytes = nbuckets * sizeof(HashJoinTuple);
851 
852  /*
853  * Buckets are simple pointers to hashjoin tuples, while tupsize
854  * includes the pointer, hash code, and MinimalTupleData. So buckets
855  * should never really exceed 25% of hash_mem (even for
856  * NTUP_PER_BUCKET=1); except maybe for hash_mem values that are not
857  * 2^N bytes, where we might get more because of doubling. So let's
858  * look for 50% here.
859  */
860  Assert(bucket_bytes <= hash_table_bytes / 2);
861 
862  /* Calculate required number of batches. */
863  dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
864  dbatch = Min(dbatch, max_pointers);
865  minbatch = (int) dbatch;
866  nbatch = pg_nextpower2_32(Max(2, minbatch));
867  }
868 
869  Assert(nbuckets > 0);
870  Assert(nbatch > 0);
871 
872  *numbuckets = nbuckets;
873  *numbatches = nbatch;
874 }
#define Min(x, y)
Definition: c.h:993
#define MAXALIGN(LEN)
Definition: c.h:800
#define Max(x, y)
Definition: c.h:987
struct HashJoinTupleData * HashJoinTuple
Definition: execnodes.h:2106
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:90
#define SKEW_BUCKET_OVERHEAD
Definition: hashjoin.h:119
#define SKEW_HASH_MEM_PERCENT
Definition: hashjoin.h:121
#define SizeofMinimalTupleHeader
Definition: htup_details.h:647
Assert(fmt[strlen(fmt) - 1] !='\n')
#define MaxAllocSize
Definition: memutils.h:40
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
Definition: nodeHash.c:676
#define NTUP_PER_BUCKET
Definition: nodeHash.c:673
size_t get_hash_memory_limit(void)
Definition: nodeHash.c:3596
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:189
#define pg_nextpower2_size_t
Definition: pg_bitutils.h:339
#define pg_prevpower2_size_t
Definition: pg_bitutils.h:340

References Assert(), get_hash_memory_limit(), HJTUPLE_OVERHEAD, Max, MAXALIGN, MaxAllocSize, Min, NTUP_PER_BUCKET, pg_nextpower2_32(), pg_nextpower2_size_t, pg_prevpower2_size_t, SizeofMinimalTupleHeader, SKEW_BUCKET_OVERHEAD, and SKEW_HASH_MEM_PERCENT.

Referenced by ExecHashTableCreate(), and initial_cost_hashjoin().

◆ ExecEndHash()

void ExecEndHash ( HashState * node)

Definition at line 414 of file nodeHash.c.

415 {
416  PlanState *outerPlan;
417 
418  /*
419  * shut down the subplan
420  */
421  outerPlan = outerPlanState(node);
422  ExecEndNode(outerPlan);
423 }
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:557
#define outerPlanState(node)
Definition: execnodes.h:1139
#define outerPlan(node)
Definition: plannodes.h:182

References ExecEndNode(), outerPlan, and outerPlanState.

Referenced by ExecEndNode().

◆ ExecHashAccumInstrumentation()

void ExecHashAccumInstrumentation ( HashInstrumentation *  instrument,
HashJoinTable  hashtable 
)

Definition at line 2851 of file nodeHash.c.

2853 {
2854  instrument->nbuckets = Max(instrument->nbuckets,
2855  hashtable->nbuckets);
2856  instrument->nbuckets_original = Max(instrument->nbuckets_original,
2857  hashtable->nbuckets_original);
2858  instrument->nbatch = Max(instrument->nbatch,
2859  hashtable->nbatch);
2860  instrument->nbatch_original = Max(instrument->nbatch_original,
2861  hashtable->nbatch_original);
2862  instrument->space_peak = Max(instrument->space_peak,
2863  hashtable->spacePeak);
2864 }

References Max, HashJoinTableData::nbatch, HashInstrumentation::nbatch, HashJoinTableData::nbatch_original, HashInstrumentation::nbatch_original, HashJoinTableData::nbuckets, HashInstrumentation::nbuckets, HashJoinTableData::nbuckets_original, HashInstrumentation::nbuckets_original, HashInstrumentation::space_peak, and HashJoinTableData::spacePeak.

Referenced by ExecReScanHashJoin(), and ExecShutdownHash().

◆ ExecHashEstimate()

void ExecHashEstimate ( HashState *  node,
ParallelContext *  pcxt 
)

Definition at line 2735 of file nodeHash.c.

2736 {
2737  size_t size;
2738 
2739  /* don't need this if not instrumenting or no workers */
2740  if (!node->ps.instrument || pcxt->nworkers == 0)
2741  return;
2742 
2743  size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
2744  size = add_size(size, offsetof(SharedHashInfo, hinstrument));
2745  shm_toc_estimate_chunk(&pcxt->estimator, size);
2746  shm_toc_estimate_keys(&pcxt->estimator, 1);
2747 }
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
Size add_size(Size s1, Size s2)
Definition: shmem.c:494
Size mul_size(Size s1, Size s2)
Definition: shmem.c:511
PlanState ps
Definition: execnodes.h:2667
shm_toc_estimator estimator
Definition: parallel.h:42
Instrumentation * instrument
Definition: execnodes.h:1053

References add_size(), ParallelContext::estimator, PlanState::instrument, mul_size(), ParallelContext::nworkers, HashState::ps, shm_toc_estimate_chunk, and shm_toc_estimate_keys.

Referenced by ExecParallelEstimate().

◆ ExecHashGetBucketAndBatch()

void ExecHashGetBucketAndBatch ( HashJoinTable  hashtable,
uint32  hashvalue,
int *  bucketno,
int *  batchno 
)

Definition at line 1933 of file nodeHash.c.

1937 {
1938  uint32 nbuckets = (uint32) hashtable->nbuckets;
1939  uint32 nbatch = (uint32) hashtable->nbatch;
1940 
1941  if (nbatch > 1)
1942  {
1943  *bucketno = hashvalue & (nbuckets - 1);
1944  *batchno = pg_rotate_right32(hashvalue,
1945  hashtable->log2_nbuckets) & (nbatch - 1);
1946  }
1947  else
1948  {
1949  *bucketno = hashvalue & (nbuckets - 1);
1950  *batchno = 0;
1951  }
1952 }
unsigned int uint32
Definition: c.h:495
static uint32 pg_rotate_right32(uint32 word, int n)
Definition: pg_bitutils.h:320

References HashJoinTableData::log2_nbuckets, HashJoinTableData::nbatch, HashJoinTableData::nbuckets, and pg_rotate_right32().

Referenced by ExecHashIncreaseNumBatches(), ExecHashIncreaseNumBuckets(), ExecHashJoinImpl(), ExecHashRemoveNextSkewBucket(), ExecHashTableInsert(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinPartitionOuter(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), ExecParallelHashTableInsert(), and ExecParallelHashTableInsertCurrentBatch().

◆ ExecHashGetHashValue()

bool ExecHashGetHashValue ( HashJoinTable  hashtable,
ExprContext *  econtext,
List *  hashkeys,
bool  outer_tuple,
bool  keep_nulls,
uint32 *  hashvalue 
)

Definition at line 1825 of file nodeHash.c.

1831 {
1832  uint32 hashkey = 0;
1833  FmgrInfo *hashfunctions;
1834  ListCell *hk;
1835  int i = 0;
1836  MemoryContext oldContext;
1837 
1838  /*
1839  * We reset the eval context each time to reclaim any memory leaked in the
1840  * hashkey expressions.
1841  */
1842  ResetExprContext(econtext);
1843 
1844  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
1845 
1846  if (outer_tuple)
1847  hashfunctions = hashtable->outer_hashfunctions;
1848  else
1849  hashfunctions = hashtable->inner_hashfunctions;
1850 
1851  foreach(hk, hashkeys)
1852  {
1853  ExprState *keyexpr = (ExprState *) lfirst(hk);
1854  Datum keyval;
1855  bool isNull;
1856 
1857  /* combine successive hashkeys by rotating */
1858  hashkey = pg_rotate_left32(hashkey, 1);
1859 
1860  /*
1861  * Get the join attribute value of the tuple
1862  */
1863  keyval = ExecEvalExpr(keyexpr, econtext, &isNull);
1864 
1865  /*
1866  * If the attribute is NULL, and the join operator is strict, then
1867  * this tuple cannot pass the join qual so we can reject it
1868  * immediately (unless we're scanning the outside of an outer join, in
1869  * which case we must not reject it). Otherwise we act like the
1870  * hashcode of NULL is zero (this will support operators that act like
1871  * IS NOT DISTINCT, though not any more-random behavior). We treat
1872  * the hash support function as strict even if the operator is not.
1873  *
1874  * Note: currently, all hashjoinable operators must be strict since
1875  * the hash index AM assumes that. However, it takes so little extra
1876  * code here to allow non-strict that we may as well do it.
1877  */
1878  if (isNull)
1879  {
1880  if (hashtable->hashStrict[i] && !keep_nulls)
1881  {
1882  MemoryContextSwitchTo(oldContext);
1883  return false; /* cannot match */
1884  }
1885  /* else, leave hashkey unmodified, equivalent to hashcode 0 */
1886  }
1887  else
1888  {
1889  /* Compute the hash function */
1890  uint32 hkey;
1891 
1892  hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], hashtable->collations[i], keyval));
1893  hashkey ^= hkey;
1894  }
1895 
1896  i++;
1897  }
1898 
1899  MemoryContextSwitchTo(oldContext);
1900 
1901  *hashvalue = hashkey;
1902  return true;
1903 }
#define ResetExprContext(econtext)
Definition: executor.h:543
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:332
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1129
int i
Definition: isn.c:73
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
static uint32 pg_rotate_left32(uint32 word, int n)
Definition: pg_bitutils.h:326
#define lfirst(lc)
Definition: pg_list.h:172
static uint32 DatumGetUInt32(Datum X)
Definition: postgres.h:222
uintptr_t Datum
Definition: postgres.h:64
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:263
Definition: fmgr.h:57
FmgrInfo * outer_hashfunctions
Definition: hashjoin.h:351
bool * hashStrict
Definition: hashjoin.h:353
FmgrInfo * inner_hashfunctions
Definition: hashjoin.h:352

References HashJoinTableData::collations, DatumGetUInt32(), ExprContext::ecxt_per_tuple_memory, ExecEvalExpr(), FunctionCall1Coll(), HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, lfirst, MemoryContextSwitchTo(), HashJoinTableData::outer_hashfunctions, pg_rotate_left32(), and ResetExprContext.

Referenced by ExecHashJoinOuterGetTuple(), ExecParallelHashJoinOuterGetTuple(), ExecParallelHashJoinPartitionOuter(), MultiExecParallelHash(), and MultiExecPrivateHash().

◆ ExecHashGetSkewBucket()

int ExecHashGetSkewBucket ( HashJoinTable  hashtable,
uint32  hashvalue 
)

Definition at line 2529 of file nodeHash.c.

2530 {
2531  int bucket;
2532 
2533  /*
2534  * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
2535  * particular, this happens after the initial batch is done).
2536  */
2537  if (!hashtable->skewEnabled)
2538  return INVALID_SKEW_BUCKET_NO;
2539 
2540  /*
2541  * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
2542  */
2543  bucket = hashvalue & (hashtable->skewBucketLen - 1);
2544 
2545  /*
2546  * While we have not hit a hole in the hashtable and have not hit the
2547  * desired bucket, we have collided with some other hash value, so try the
2548  * next bucket location.
2549  */
2550  while (hashtable->skewBucket[bucket] != NULL &&
2551  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
2552  bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
2553 
2554  /*
2555  * Found the desired bucket?
2556  */
2557  if (hashtable->skewBucket[bucket] != NULL)
2558  return bucket;
2559 
2560  /*
2561  * There must not be any hashtable entry for this hash value.
2562  */
2563  return INVALID_SKEW_BUCKET_NO;
2564 }
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:120
HashSkewBucket ** skewBucket
Definition: hashjoin.h:319
uint32 hashvalue
Definition: hashjoin.h:115

References HashSkewBucket::hashvalue, INVALID_SKEW_BUCKET_NO, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, and HashJoinTableData::skewEnabled.

Referenced by ExecHashJoinImpl(), and MultiExecPrivateHash().

◆ ExecHashInitializeDSM()

void ExecHashInitializeDSM ( HashState *  node,
ParallelContext *  pcxt 
)

Definition at line 2754 of file nodeHash.c.

2755 {
2756  size_t size;
2757 
2758  /* don't need this if not instrumenting or no workers */
2759  if (!node->ps.instrument || pcxt->nworkers == 0)
2760  return;
2761 
2762  size = offsetof(SharedHashInfo, hinstrument) +
2763  pcxt->nworkers * sizeof(HashInstrumentation);
2764  node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
2765 
2766  /* Each per-worker area must start out as zeroes. */
2767  memset(node->shared_info, 0, size);
2768 
2769  node->shared_info->num_workers = pcxt->nworkers;
2770  shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
2771  node->shared_info);
2772 }
struct HashInstrumentation HashInstrumentation
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
SharedHashInfo * shared_info
Definition: execnodes.h:2677
shm_toc * toc
Definition: parallel.h:45
Plan * plan
Definition: execnodes.h:1043
int plan_node_id
Definition: plannodes.h:151

References PlanState::instrument, SharedHashInfo::num_workers, ParallelContext::nworkers, PlanState::plan, Plan::plan_node_id, HashState::ps, HashState::shared_info, shm_toc_allocate(), shm_toc_insert(), and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

◆ ExecHashInitializeWorker()

void ExecHashInitializeWorker ( HashState *  node,
ParallelWorkerContext *  pwcxt 
)

Definition at line 2779 of file nodeHash.c.

2780 {
2781  SharedHashInfo *shared_info;
2782 
2783  /* don't need this if not instrumenting */
2784  if (!node->ps.instrument)
2785  return;
2786 
2787  /*
2788  * Find our entry in the shared area, and set up a pointer to it so that
2789  * we'll accumulate stats there when shutting down or rebuilding the hash
2790  * table.
2791  */
2792  shared_info = (SharedHashInfo *)
2793  shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
2794  node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
2795 }
int ParallelWorkerNumber
Definition: parallel.c:115
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
HashInstrumentation * hinstrument
Definition: execnodes.h:2684
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
Definition: execnodes.h:2658

References SharedHashInfo::hinstrument, HashState::hinstrument, PlanState::instrument, ParallelWorkerNumber, PlanState::plan, Plan::plan_node_id, HashState::ps, shm_toc_lookup(), and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().

◆ ExecHashRetrieveInstrumentation()

void ExecHashRetrieveInstrumentation ( HashState * node)

Definition at line 2820 of file nodeHash.c.

2821 {
2822  SharedHashInfo *shared_info = node->shared_info;
2823  size_t size;
2824 
2825  if (shared_info == NULL)
2826  return;
2827 
2828  /* Replace node->shared_info with a copy in backend-local memory. */
2829  size = offsetof(SharedHashInfo, hinstrument) +
2830  shared_info->num_workers * sizeof(HashInstrumentation);
2831  node->shared_info = palloc(size);
2832  memcpy(node->shared_info, shared_info, size);
2833 }
void * palloc(Size size)
Definition: mcxt.c:1201

References SharedHashInfo::num_workers, palloc(), and HashState::shared_info.

Referenced by ExecParallelRetrieveInstrumentation().

◆ ExecHashTableCreate()

HashJoinTable ExecHashTableCreate ( HashState *  state,
List *  hashOperators,
List *  hashCollations,
bool  keepNulls 
)

Definition at line 433 of file nodeHash.c.

434 {
435  Hash *node;
436  HashJoinTable hashtable;
437  Plan *outerNode;
438  size_t space_allowed;
439  int nbuckets;
440  int nbatch;
441  double rows;
442  int num_skew_mcvs;
443  int log2_nbuckets;
444  int nkeys;
445  int i;
446  ListCell *ho;
447  ListCell *hc;
448  MemoryContext oldcxt;
449 
450  /*
451  * Get information about the size of the relation to be hashed (it's the
452  * "outer" subtree of this node, but the inner relation of the hashjoin).
453  * Compute the appropriate size of the hash table.
454  */
455  node = (Hash *) state->ps.plan;
456  outerNode = outerPlan(node);
457 
458  /*
459  * If this is shared hash table with a partial plan, then we can't use
460  * outerNode->plan_rows to estimate its size. We need an estimate of the
461  * total number of rows across all copies of the partial plan.
462  */
463  rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
464 
465  ExecChooseHashTableSize(rows, outerNode->plan_width,
466  OidIsValid(node->skewTable),
467  state->parallel_state != NULL,
468  state->parallel_state != NULL ?
469  state->parallel_state->nparticipants - 1 : 0,
470  &space_allowed,
471  &nbuckets, &nbatch, &num_skew_mcvs);
472 
473  /* nbuckets must be a power of 2 */
474  log2_nbuckets = my_log2(nbuckets);
475  Assert(nbuckets == (1 << log2_nbuckets));
476 
477  /*
478  * Initialize the hash table control block.
479  *
480  * The hashtable control block is just palloc'd from the executor's
481  * per-query memory context. Everything else should be kept inside the
482  * subsidiary hashCxt, batchCxt or spillCxt.
483  */
484  hashtable = palloc_object(HashJoinTableData);
485  hashtable->nbuckets = nbuckets;
486  hashtable->nbuckets_original = nbuckets;
487  hashtable->nbuckets_optimal = nbuckets;
488  hashtable->log2_nbuckets = log2_nbuckets;
489  hashtable->log2_nbuckets_optimal = log2_nbuckets;
490  hashtable->buckets.unshared = NULL;
491  hashtable->keepNulls = keepNulls;
492  hashtable->skewEnabled = false;
493  hashtable->skewBucket = NULL;
494  hashtable->skewBucketLen = 0;
495  hashtable->nSkewBuckets = 0;
496  hashtable->skewBucketNums = NULL;
497  hashtable->nbatch = nbatch;
498  hashtable->curbatch = 0;
499  hashtable->nbatch_original = nbatch;
500  hashtable->nbatch_outstart = nbatch;
501  hashtable->growEnabled = true;
502  hashtable->totalTuples = 0;
503  hashtable->partialTuples = 0;
504  hashtable->skewTuples = 0;
505  hashtable->innerBatchFile = NULL;
506  hashtable->outerBatchFile = NULL;
507  hashtable->spaceUsed = 0;
508  hashtable->spacePeak = 0;
509  hashtable->spaceAllowed = space_allowed;
510  hashtable->spaceUsedSkew = 0;
511  hashtable->spaceAllowedSkew =
512  hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
513  hashtable->chunks = NULL;
514  hashtable->current_chunk = NULL;
515  hashtable->parallel_state = state->parallel_state;
516  hashtable->area = state->ps.state->es_query_dsa;
517  hashtable->batches = NULL;
518 
519 #ifdef HJDEBUG
520  printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
521  hashtable, nbatch, nbuckets);
522 #endif
523 
524  /*
525  * Create temporary memory contexts in which to keep the hashtable working
526  * storage. See notes in executor/hashjoin.h.
527  */
528  hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
529  "HashTableContext",
530  ALLOCSET_DEFAULT_SIZES);
531 
532  hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
533  "HashBatchContext",
534  ALLOCSET_DEFAULT_SIZES);
535 
536  hashtable->spillCxt = AllocSetContextCreate(hashtable->hashCxt,
537  "HashSpillContext",
538  ALLOCSET_DEFAULT_SIZES);
539 
540  /* Allocate data that will live for the life of the hashjoin */
541 
542  oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
543 
544  /*
545  * Get info about the hash functions to be used for each hash key. Also
546  * remember whether the join operators are strict.
547  */
548  nkeys = list_length(hashOperators);
549  hashtable->outer_hashfunctions = palloc_array(FmgrInfo, nkeys);
550  hashtable->inner_hashfunctions = palloc_array(FmgrInfo, nkeys);
551  hashtable->hashStrict = palloc_array(bool, nkeys);
552  hashtable->collations = palloc_array(Oid, nkeys);
553  i = 0;
554  forboth(ho, hashOperators, hc, hashCollations)
555  {
556  Oid hashop = lfirst_oid(ho);
557  Oid left_hashfn;
558  Oid right_hashfn;
559 
560  if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
561  elog(ERROR, "could not find hash function for hash operator %u",
562  hashop);
563  fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
564  fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
565  hashtable->hashStrict[i] = op_strict(hashop);
566  hashtable->collations[i] = lfirst_oid(hc);
567  i++;
568  }
569 
570  if (nbatch > 1 && hashtable->parallel_state == NULL)
571  {
572  MemoryContext oldctx;
573 
574  /*
575  * allocate and initialize the file arrays in hashCxt (not needed for
576  * parallel case which uses shared tuplestores instead of raw files)
577  */
578  oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
579 
580  hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
581  hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
582 
583  MemoryContextSwitchTo(oldctx);
584 
585  /* The files will not be opened until needed... */
586  /* ... but make sure we have temp tablespaces established for them */
587  PrepareTempTablespaces();
588  }
589 
590  MemoryContextSwitchTo(oldcxt);
591 
592  if (hashtable->parallel_state)
593  {
594  ParallelHashJoinState *pstate = hashtable->parallel_state;
595  Barrier *build_barrier;
596 
597  /*
598  * Attach to the build barrier. The corresponding detach operation is
599  * in ExecHashTableDetach. Note that we won't attach to the
600  * batch_barrier for batch 0 yet. We'll attach later and start it out
601  * in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front and
602  * then loaded while hashing (the standard hybrid hash join
603  * algorithm), and we'll coordinate that using build_barrier.
604  */
605  build_barrier = &pstate->build_barrier;
606  BarrierAttach(build_barrier);
607 
608  /*
609  * So far we have no idea whether there are any other participants,
610  * and if so, what phase they are working on. The only thing we care
611  * about at this point is whether someone has already created the
612  * SharedHashJoinBatch objects and the hash table for batch 0. One
613  * backend will be elected to do that now if necessary.
614  */
615  if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECT &&
616  BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
617  {
618  pstate->nbatch = nbatch;
619  pstate->space_allowed = space_allowed;
620  pstate->growth = PHJ_GROWTH_OK;
621 
622  /* Set up the shared state for coordinating batches. */
623  ExecParallelHashJoinSetUpBatches(hashtable, nbatch);
624 
625  /*
626  * Allocate batch 0's hash table up front so we can load it
627  * directly while hashing.
628  */
629  pstate->nbuckets = nbuckets;
630  ExecParallelHashTableAlloc(hashtable, 0);
631  }
632 
633  /*
634  * The next Parallel Hash synchronization point is in
635  * MultiExecParallelHash(), which will progress it all the way to
636  * PHJ_BUILD_RUN. The caller must not return control from this
637  * executor node between now and then.
638  */
639  }
640  else
641  {
642  /*
643  * Prepare context for the first-scan space allocations; allocate the
644  * hashbucket array therein, and set each bucket "empty".
645  */
646  MemoryContextSwitchTo(hashtable->batchCxt);
647 
648  hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets);
649 
650  /*
651  * Set up for skew optimization, if possible and there's a need for
652  * more than one batch. (In a one-batch join, there's no point in
653  * it.)
654  */
655  if (nbatch > 1)
656  ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
657 
658  MemoryContextSwitchTo(oldcxt);
659  }
660 
661  return hashtable;
662 }
void PrepareTempTablespaces(void)
Definition: tablespace.c:1337
int BarrierAttach(Barrier *barrier)
Definition: barrier.c:236
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:265
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
#define OidIsValid(objectId)
Definition: c.h:764
int my_log2(long num)
Definition: dynahash.c:1760
#define ERROR
Definition: elog.h:39
#define palloc_object(type)
Definition: fe_memutils.h:62
#define palloc_array(type, count)
Definition: fe_memutils.h:64
#define palloc0_array(type, count)
Definition: fe_memutils.h:65
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:127
@ PHJ_GROWTH_OK
Definition: hashjoin.h:233
#define PHJ_BUILD_ELECT
Definition: hashjoin.h:269
bool op_strict(Oid opno)
Definition: lsyscache.c:1454
bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)
Definition: lsyscache.c:509
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
#define AllocSetContextCreate
Definition: memutils.h:128
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:152
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
Definition: nodeHash.c:2376
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
Definition: nodeHash.c:3098
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3263
static int list_length(const List *l)
Definition: pg_list.h:152
#define forboth(cell1, list1, cell2, list2)
Definition: pg_list.h:518
#define lfirst_oid(lc)
Definition: pg_list.h:174
#define printf(...)
Definition: port.h:244
unsigned int Oid
Definition: postgres_ext.h:31
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:311
HashMemoryChunk chunks
Definition: hashjoin.h:367
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:373
MemoryContext hashCxt
Definition: hashjoin.h:362
double totalTuples
Definition: hashjoin.h:332
double partialTuples
Definition: hashjoin.h:333
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:372
MemoryContext spillCxt
Definition: hashjoin.h:364
HashMemoryChunk current_chunk
Definition: hashjoin.h:370
Size spaceAllowedSkew
Definition: hashjoin.h:360
int * skewBucketNums
Definition: hashjoin.h:322
BufFile ** innerBatchFile
Definition: hashjoin.h:343
int log2_nbuckets_optimal
Definition: hashjoin.h:305
dsa_area * area
Definition: hashjoin.h:371
BufFile ** outerBatchFile
Definition: hashjoin.h:344
MemoryContext batchCxt
Definition: hashjoin.h:363
double skewTuples
Definition: hashjoin.h:334
union HashJoinTableData::@99 buckets
Oid skewTable
Definition: plannodes.h:1204
Cardinality rows_total
Definition: plannodes.h:1208
Plan plan
Definition: plannodes.h:1197
ParallelHashGrowth growth
Definition: hashjoin.h:253
bool parallel_aware
Definition: plannodes.h:140
int plan_width
Definition: plannodes.h:135
Cardinality plan_rows
Definition: plannodes.h:134
Definition: regguts.h:323

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, HashJoinTableData::area, Assert(), BarrierArriveAndWait(), BarrierAttach(), BarrierPhase(), HashJoinTableData::batchCxt, HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, HashJoinTableData::chunks, HashJoinTableData::collations, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, CurrentMemoryContext, elog(), ERROR, ExecChooseHashTableSize(), ExecHashBuildSkewHash(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashTableAlloc(), fmgr_info(), forboth, get_op_hash_functions(), HashJoinTableData::growEnabled, ParallelHashJoinState::growth, HashJoinTableData::hashCxt, HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, HashJoinTableData::innerBatchFile, HashJoinTableData::keepNulls, lfirst_oid, list_length(), HashJoinTableData::log2_nbuckets, HashJoinTableData::log2_nbuckets_optimal, MemoryContextSwitchTo(), my_log2(), ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, HashJoinTableData::nbatch_original, HashJoinTableData::nbatch_outstart, ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, HashJoinTableData::nbuckets_original, HashJoinTableData::nSkewBuckets, OidIsValid, op_strict(), HashJoinTableData::outer_hashfunctions, HashJoinTableData::outerBatchFile, outerPlan, palloc0_array, palloc_array, palloc_object, Plan::parallel_aware, HashJoinTableData::parallel_state, HashJoinTableData::partialTuples, PHJ_BUILD_ELECT, PHJ_GROWTH_OK, Hash::plan, Plan::plan_rows, Plan::plan_width, PrepareTempTablespaces(), printf, Hash::rows_total, SKEW_HASH_MEM_PERCENT, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, HashJoinTableData::skewBucketNums, HashJoinTableData::skewEnabled, Hash::skewTable, HashJoinTableData::skewTuples, ParallelHashJoinState::space_allowed, HashJoinTableData::spaceAllowed, HashJoinTableData::spaceAllowedSkew, HashJoinTableData::spacePeak, 
HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, HashJoinTableData::spillCxt, HashJoinTableData::totalTuples, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

◆ ExecHashTableDestroy()

void ExecHashTableDestroy ( HashJoinTable  hashtable)

Definition at line 884 of file nodeHash.c.

885 {
886  int i;
887 
888  /*
889  * Make sure all the temp files are closed. We skip batch 0, since it
890  * can't have any temp files (and the arrays might not even exist if
891  * nbatch is only 1). Parallel hash joins don't use these files.
892  */
893  if (hashtable->innerBatchFile != NULL)
894  {
895  for (i = 1; i < hashtable->nbatch; i++)
896  {
897  if (hashtable->innerBatchFile[i])
898  BufFileClose(hashtable->innerBatchFile[i]);
899  if (hashtable->outerBatchFile[i])
900  BufFileClose(hashtable->outerBatchFile[i]);
901  }
902  }
903 
904  /* Release working memory (batchCxt is a child, so it goes away too) */
905  MemoryContextDelete(hashtable->hashCxt);
906 
907  /* And drop the control block */
908  pfree(hashtable);
909 }
void BufFileClose(BufFile *file)
Definition: buffile.c:412
void pfree(void *pointer)
Definition: mcxt.c:1431
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403

References BufFileClose(), HashJoinTableData::hashCxt, i, HashJoinTableData::innerBatchFile, MemoryContextDelete(), HashJoinTableData::nbatch, HashJoinTableData::outerBatchFile, and pfree().

Referenced by ExecEndHashJoin(), and ExecReScanHashJoin().

◆ ExecHashTableDetach()

void ExecHashTableDetach ( HashJoinTable  hashtable)

Definition at line 3375 of file nodeHash.c.

3376 {
3377  ParallelHashJoinState *pstate = hashtable->parallel_state;
3378 
3379  /*
3380  * If we're involved in a parallel query, we must either have gotten all
3381  * the way to PHJ_BUILD_RUN, or joined too late and be in PHJ_BUILD_FREE.
3382  */
3383  Assert(!pstate ||
3384  BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN);
3385 
3386  if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUN)
3387  {
3388  int i;
3389 
3390  /* Make sure any temporary files are closed. */
3391  if (hashtable->batches)
3392  {
3393  for (i = 0; i < hashtable->nbatch; ++i)
3394  {
3395  sts_end_write(hashtable->batches[i].inner_tuples);
3396  sts_end_write(hashtable->batches[i].outer_tuples);
3397  sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
3398  sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
3399  }
3400  }
3401 
3402  /* If we're last to detach, clean up shared memory. */
3403  if (BarrierArriveAndDetach(&pstate->build_barrier))
3404  {
3405  /*
3406  * Late joining processes will see this state and give up
3407  * immediately.
3408  */
3409  Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_FREE);
3410 
3411  if (DsaPointerIsValid(pstate->batches))
3412  {
3413  dsa_free(hashtable->area, pstate->batches);
3414  pstate->batches = InvalidDsaPointer;
3415  }
3416  }
3417  }
3418  hashtable->parallel_state = NULL;
3419 }
bool BarrierArriveAndDetach(Barrier *barrier)
Definition: barrier.c:203
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:841
#define InvalidDsaPointer
Definition: dsa.h:78
#define DsaPointerIsValid(x)
Definition: dsa.h:81
#define PHJ_BUILD_FREE
Definition: hashjoin.h:274
#define PHJ_BUILD_RUN
Definition: hashjoin.h:273
void sts_end_write(SharedTuplestoreAccessor *accessor)
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:221
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:220
dsa_pointer batches
Definition: hashjoin.h:248

References HashJoinTableData::area, Assert(), BarrierArriveAndDetach(), BarrierPhase(), ParallelHashJoinState::batches, HashJoinTableData::batches, ParallelHashJoinState::build_barrier, dsa_free(), DsaPointerIsValid, i, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BUILD_FREE, PHJ_BUILD_RUN, sts_end_parallel_scan(), and sts_end_write().

Referenced by ExecHashJoinReInitializeDSM(), and ExecShutdownHashJoin().

◆ ExecHashTableDetachBatch()

void ExecHashTableDetachBatch ( HashJoinTable  hashtable)

Definition at line 3283 of file nodeHash.c.

3284 {
3285  if (hashtable->parallel_state != NULL &&
3286  hashtable->curbatch >= 0)
3287  {
3288  int curbatch = hashtable->curbatch;
3289  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
3290  bool attached = true;
3291 
3292  /* Make sure any temporary files are closed. */
3293  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
3294  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
3295 
3296  /* After attaching we always get at least to PHJ_BATCH_PROBE. */
3297  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
3298  BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
3299 
3300  /*
3301  * If we're abandoning the PHJ_BATCH_PROBE phase early without having
3302  * reached the end of it, it means the plan doesn't want any more
3303  * tuples, and it is happy to abandon any tuples buffered in this
3304  * process's subplans. For correctness, we can't allow any process to
3305  * execute the PHJ_BATCH_SCAN phase, because we will never have the
3306  * complete set of match bits. Therefore we skip emitting unmatched
3307  * tuples in all backends (if this is a full/right join), as if those
3308  * tuples were all due to be emitted by this process and it has
3309  * abandoned them too.
3310  */
3311  if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
3312  !hashtable->batches[curbatch].outer_eof)
3313  {
3314  /*
3315  * This flag may be written to by multiple backends during
3316  * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
3317  * phase so requires no extra locking.
3318  */
3319  batch->skip_unmatched = true;
3320  }
3321 
3322  /*
3323  * Even if we aren't doing a full/right outer join, we'll step through
3324  * the PHJ_BATCH_SCAN phase just to maintain the invariant that
3325  * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
3326  */
3327  if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
3328  attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
3329  if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
3330  {
3331  /*
3332  * We are no longer attached to the batch barrier, but we're the
3333  * process that was chosen to free resources and it's safe to
3334  * assert the current phase. The ParallelHashJoinBatch can't go
3335  * away underneath us while we are attached to the build barrier,
3336  * making this access safe.
3337  */
3338  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
3339 
3340  /* Free shared chunks and buckets. */
3341  while (DsaPointerIsValid(batch->chunks))
3342  {
3343  HashMemoryChunk chunk =
3344  dsa_get_address(hashtable->area, batch->chunks);
3345  dsa_pointer next = chunk->next.shared;
3346 
3347  dsa_free(hashtable->area, batch->chunks);
3348  batch->chunks = next;
3349  }
3350  if (DsaPointerIsValid(batch->buckets))
3351  {
3352  dsa_free(hashtable->area, batch->buckets);
3353  batch->buckets = InvalidDsaPointer;
3354  }
3355  }
3356 
3357  /*
3358  * Track the largest batch we've been attached to. Though each
3359  * backend might see a different subset of batches, explain.c will
3360  * scan the results from all backends to find the largest value.
3361  */
3362  hashtable->spacePeak =
3363  Max(hashtable->spacePeak,
3364  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
3365 
3366  /* Remember that we are not attached to a batch. */
3367  hashtable->curbatch = -1;
3368  }
3369 }
bool BarrierArriveAndDetachExceptLast(Barrier *barrier)
Definition: barrier.c:213
static int32 next
Definition: blutils.c:221
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:957
uint64 dsa_pointer
Definition: dsa.h:62
#define PHJ_BATCH_SCAN
Definition: hashjoin.h:281
#define PHJ_BATCH_PROBE
Definition: hashjoin.h:280
#define PHJ_BATCH_FREE
Definition: hashjoin.h:282
dsa_pointer shared
Definition: hashjoin.h:138
union HashMemoryChunkData::@98 next
ParallelHashJoinBatch * shared
Definition: hashjoin.h:209
dsa_pointer chunks
Definition: hashjoin.h:167
dsa_pointer buckets
Definition: hashjoin.h:164

References HashJoinTableData::area, Assert(), BarrierArriveAndDetach(), BarrierArriveAndDetachExceptLast(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, ParallelHashJoinBatch::chunks, HashJoinTableData::curbatch, dsa_free(), dsa_get_address(), DsaPointerIsValid, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, Max, HashJoinTableData::nbuckets, next, HashMemoryChunkData::next, ParallelHashJoinBatchAccessor::outer_eof, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BATCH_FREE, PHJ_BATCH_PROBE, PHJ_BATCH_SCAN, HashMemoryChunkData::shared, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinBatch::skip_unmatched, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinReInitializeDSM(), ExecParallelHashJoinNewBatch(), ExecParallelPrepHashTableForUnmatched(), and ExecShutdownHashJoin().

◆ ExecHashTableInsert()

void ExecHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1625 of file nodeHash.c.

1628 {
1629  bool shouldFree;
1630  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1631  int bucketno;
1632  int batchno;
1633 
1634  ExecHashGetBucketAndBatch(hashtable, hashvalue,
1635  &bucketno, &batchno);
1636 
1637  /*
1638  * decide whether to put the tuple in the hash table or a temp file
1639  */
1640  if (batchno == hashtable->curbatch)
1641  {
1642  /*
1643  * put the tuple in hash table
1644  */
1645  HashJoinTuple hashTuple;
1646  int hashTupleSize;
1647  double ntuples = (hashtable->totalTuples - hashtable->skewTuples);
1648 
1649  /* Create the HashJoinTuple */
1650  hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1651  hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
1652 
1653  hashTuple->hashvalue = hashvalue;
1654  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1655 
1656  /*
1657  * We always reset the tuple-matched flag on insertion. This is okay
1658  * even when reloading a tuple from a batch file, since the tuple
1659  * could not possibly have been matched to an outer tuple before it
1660  * went into the batch file.
1661  */
1662  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1663 
1664  /* Push it onto the front of the bucket's list */
1665  hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1666  hashtable->buckets.unshared[bucketno] = hashTuple;
1667 
1668  /*
1669  * Increase the (optimal) number of buckets if we just exceeded the
1670  * NTUP_PER_BUCKET threshold, but only when there's still a single
1671  * batch.
1672  */
1673  if (hashtable->nbatch == 1 &&
1674  ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
1675  {
1676  /* Guard against integer overflow and alloc size overflow */
1677  if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
1678  hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
1679  {
1680  hashtable->nbuckets_optimal *= 2;
1681  hashtable->log2_nbuckets_optimal += 1;
1682  }
1683  }
1684 
1685  /* Account for space used, and back off if we've used too much */
1686  hashtable->spaceUsed += hashTupleSize;
1687  if (hashtable->spaceUsed > hashtable->spacePeak)
1688  hashtable->spacePeak = hashtable->spaceUsed;
1689  if (hashtable->spaceUsed +
1690  hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
1691  > hashtable->spaceAllowed)
1692  ExecHashIncreaseNumBatches(hashtable);
1693  }
1694  else
1695  {
1696  /*
1697  * put the tuple into a temp file for later batches
1698  */
1699  Assert(batchno > hashtable->curbatch);
1700  ExecHashJoinSaveTuple(tuple,
1701  hashvalue,
1702  &hashtable->innerBatchFile[batchno],
1703  hashtable);
1704  }
1705 
1706  if (shouldFree)
1707  heap_free_minimal_tuple(tuple);
1708 }
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1691
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:91
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1524
#define HeapTupleHeaderClearMatch(tup)
Definition: htup_details.h:524
static void * dense_alloc(HashJoinTable hashtable, Size size)
Definition: nodeHash.c:2870
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
Definition: nodeHash.c:917
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1933
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)
uint32 hashvalue
Definition: hashjoin.h:86
union HashJoinTupleData::@97 next
struct HashJoinTupleData * unshared
Definition: hashjoin.h:83

References Assert(), HashJoinTableData::buckets, HashJoinTableData::curbatch, dense_alloc(), ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecHashIncreaseNumBatches(), ExecHashJoinSaveTuple(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::innerBatchFile, HashJoinTableData::log2_nbuckets_optimal, MaxAllocSize, HashJoinTableData::nbatch, HashJoinTableData::nbuckets_optimal, HashJoinTupleData::next, NTUP_PER_BUCKET, HashJoinTableData::skewTuples, HashJoinTableData::spaceAllowed, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, MinimalTupleData::t_len, HashJoinTableData::totalTuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch(), and MultiExecPrivateHash().

◆ ExecHashTableReset()

void ExecHashTableReset ( HashJoinTable  hashtable)

Definition at line 2300 of file nodeHash.c.

2301 {
2302  MemoryContext oldcxt;
2303  int nbuckets = hashtable->nbuckets;
2304 
2305  /*
2306  * Release all the hash buckets and tuples acquired in the prior pass, and
2307  * reinitialize the context for a new pass.
2308  */
2309  MemoryContextReset(hashtable->batchCxt);
2310  oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
2311 
2312  /* Reallocate and reinitialize the hash bucket headers. */
2313  hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets);
2314 
2315  hashtable->spaceUsed = 0;
2316 
2317  MemoryContextSwitchTo(oldcxt);
2318 
2319  /* Forget the chunks (the memory was freed by the context reset above). */
2320  hashtable->chunks = NULL;
2321 }
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:330

References HashJoinTableData::batchCxt, HashJoinTableData::buckets, HashJoinTableData::chunks, MemoryContextReset(), MemoryContextSwitchTo(), HashJoinTableData::nbuckets, palloc0_array, HashJoinTableData::spaceUsed, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch().

◆ ExecHashTableResetMatchFlags()

void ExecHashTableResetMatchFlags ( HashJoinTable  hashtable)

Definition at line 2328 of file nodeHash.c.

2329 {
2330  HashJoinTuple tuple;
2331  int i;
2332 
2333  /* Reset all flags in the main table ... */
2334  for (i = 0; i < hashtable->nbuckets; i++)
2335  {
2336  for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
2337  tuple = tuple->next.unshared)
2338  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2339  }
2340 
2341  /* ... and the same for the skew buckets, if any */
2342  for (i = 0; i < hashtable->nSkewBuckets; i++)
2343  {
2344  int j = hashtable->skewBucketNums[i];
2345  HashSkewBucket *skewBucket = hashtable->skewBucket[j];
2346 
2347  for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
2348  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2349  }
2350 }
int j
Definition: isn.c:74
HashJoinTuple tuples
Definition: hashjoin.h:116

References HashJoinTableData::buckets, HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, i, j, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecReScanHashJoin().

◆ ExecInitHash()

HashState* ExecInitHash ( Hash node,
EState estate,
int  eflags 
)

Definition at line 361 of file nodeHash.c.

362 {
363  HashState *hashstate;
364 
365  /* check for unsupported flags */
366  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
367 
368  /*
369  * create state structure
370  */
371  hashstate = makeNode(HashState);
372  hashstate->ps.plan = (Plan *) node;
373  hashstate->ps.state = estate;
374  hashstate->ps.ExecProcNode = ExecHash;
375  hashstate->hashtable = NULL;
376  hashstate->hashkeys = NIL; /* will be set by parent HashJoin */
377 
378  /*
379  * Miscellaneous initialization
380  *
381  * create expression context for node
382  */
383  ExecAssignExprContext(estate, &hashstate->ps);
384 
385  /*
386  * initialize child nodes
387  */
388  outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
389 
390  /*
391  * initialize our result slot and type. No need to build projection
392  * because this node doesn't do projections.
393  */
394  ExecInitResultTupleSlotTL(&hashstate->ps, &TTSOpsMinimalTuple);
395  hashstate->ps.ps_ProjInfo = NULL;
396 
397  /*
398  * initialize child expressions
399  */
400  Assert(node->plan.qual == NIL);
401  hashstate->hashkeys =
402  ExecInitExprList(node->hashkeys, (PlanState *) hashstate);
403 
404  return hashstate;
405 }
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:320
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:142
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1798
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:488
#define EXEC_FLAG_BACKWARD
Definition: executor.h:68
#define EXEC_FLAG_MARK
Definition: executor.h:69
static TupleTableSlot * ExecHash(PlanState *pstate)
Definition: nodeHash.c:92
#define makeNode(_type_)
Definition: nodes.h:155
#define NIL
Definition: pg_list.h:68
HashJoinTable hashtable
Definition: execnodes.h:2668
List * hashkeys
Definition: execnodes.h:2669
List * hashkeys
Definition: plannodes.h:1203
EState * state
Definition: execnodes.h:1045
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1083
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:1049
List * qual
Definition: plannodes.h:153

References Assert(), EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecAssignExprContext(), ExecHash(), ExecInitExprList(), ExecInitNode(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, HashState::hashkeys, Hash::hashkeys, HashState::hashtable, makeNode, NIL, outerPlan, outerPlanState, PlanState::plan, Hash::plan, HashState::ps, PlanState::ps_ProjInfo, Plan::qual, PlanState::state, and TTSOpsMinimalTuple.

Referenced by ExecInitNode().

◆ ExecParallelHashTableAlloc()

void ExecParallelHashTableAlloc ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3263 of file nodeHash.c.

3264 {
3265  ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
3266  dsa_pointer_atomic *buckets;
3267  int nbuckets = hashtable->parallel_state->nbuckets;
3268  int i;
3269 
3270  batch->buckets =
3271  dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets);
3272  buckets = (dsa_pointer_atomic *)
3273  dsa_get_address(hashtable->area, batch->buckets);
3274  for (i = 0; i < nbuckets; ++i)
3275  dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
3276 }
#define dsa_pointer_atomic_init
Definition: dsa.h:64
#define dsa_allocate(area, size)
Definition: dsa.h:84

References HashJoinTableData::area, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, dsa_allocate, dsa_get_address(), dsa_pointer_atomic_init, i, InvalidDsaPointer, ParallelHashJoinState::nbuckets, HashJoinTableData::parallel_state, and ParallelHashJoinBatchAccessor::shared.

Referenced by ExecHashTableCreate(), and ExecParallelHashJoinNewBatch().

◆ ExecParallelHashTableInsert()

void ExecParallelHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1715 of file nodeHash.c.

1718 {
1719  bool shouldFree;
1720  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1721  dsa_pointer shared;
1722  int bucketno;
1723  int batchno;
1724 
1725 retry:
1726  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1727 
1728  if (batchno == 0)
1729  {
1730  HashJoinTuple hashTuple;
1731 
1732  /* Try to load it into memory. */
1733  Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
1734  PHJ_BUILD_HASH_INNER);
1735  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1736  HJTUPLE_OVERHEAD + tuple->t_len,
1737  &shared);
1738  if (hashTuple == NULL)
1739  goto retry;
1740 
1741  /* Store the hash value in the HashJoinTuple header. */
1742  hashTuple->hashvalue = hashvalue;
1743  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1744  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1745 
1746  /* Push it onto the front of the bucket's list */
1747  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1748  hashTuple, shared);
1749  }
1750  else
1751  {
1752  size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1753 
1754  Assert(batchno > 0);
1755 
1756  /* Try to preallocate space in the batch if necessary. */
1757  if (hashtable->batches[batchno].preallocated < tuple_size)
1758  {
1759  if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
1760  goto retry;
1761  }
1762 
1763  Assert(hashtable->batches[batchno].preallocated >= tuple_size);
1764  hashtable->batches[batchno].preallocated -= tuple_size;
1765  sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
1766  tuple);
1767  }
1768  ++hashtable->batches[batchno].ntuples;
1769 
1770  if (shouldFree)
1771  heap_free_minimal_tuple(tuple);
1772 }
#define PHJ_BUILD_HASH_INNER
Definition: hashjoin.h:271
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
Definition: nodeHash.c:3535
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
Definition: nodeHash.c:2950
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
Definition: nodeHash.c:3455
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
dsa_pointer_atomic * shared
Definition: hashjoin.h:313

References Assert(), BarrierPhase(), HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), ExecParallelHashTuplePrealloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, ParallelHashJoinBatchAccessor::inner_tuples, MAXALIGN, ParallelHashJoinBatchAccessor::ntuples, HashJoinTableData::parallel_state, PHJ_BUILD_HASH_INNER, ParallelHashJoinBatchAccessor::preallocated, HashJoinTableData::shared, sts_puttuple(), and MinimalTupleData::t_len.

Referenced by MultiExecParallelHash().

◆ ExecParallelHashTableInsertCurrentBatch()

void ExecParallelHashTableInsertCurrentBatch ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1781 of file nodeHash.c.

1784 {
1785  bool shouldFree;
1786  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1787  HashJoinTuple hashTuple;
1788  dsa_pointer shared;
1789  int batchno;
1790  int bucketno;
1791 
1792  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1793  Assert(batchno == hashtable->curbatch);
1794  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1795  HJTUPLE_OVERHEAD + tuple->t_len,
1796  &shared);
1797  hashTuple->hashvalue = hashvalue;
1798  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1799  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1800  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1801  hashTuple, shared);
1802 
1803  if (shouldFree)
1804  heap_free_minimal_tuple(tuple);
1805 }

References Assert(), HashJoinTableData::buckets, HashJoinTableData::curbatch, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::shared, and MinimalTupleData::t_len.

Referenced by ExecParallelHashJoinNewBatch().

◆ ExecParallelHashTableSetCurrentBatch()

void ExecParallelHashTableSetCurrentBatch ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3473 of file nodeHash.c.

3474 {
3475  Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer);
3476 
3477  hashtable->curbatch = batchno;
3478  hashtable->buckets.shared = (dsa_pointer_atomic *)
3479  dsa_get_address(hashtable->area,
3480  hashtable->batches[batchno].shared->buckets);
3481  hashtable->nbuckets = hashtable->parallel_state->nbuckets;
3482  hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
3483  hashtable->current_chunk = NULL;
3484  hashtable->current_chunk_shared = InvalidDsaPointer;
3485  hashtable->batches[batchno].at_least_one_chunk = false;
3486 }
dsa_pointer current_chunk_shared
Definition: hashjoin.h:374

References HashJoinTableData::area, Assert(), ParallelHashJoinBatchAccessor::at_least_one_chunk, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, HashJoinTableData::buckets, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, HashJoinTableData::current_chunk_shared, dsa_get_address(), InvalidDsaPointer, HashJoinTableData::log2_nbuckets, my_log2(), ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::parallel_state, ParallelHashJoinBatchAccessor::shared, and HashJoinTableData::shared.

Referenced by ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinNewBatch(), and MultiExecParallelHash().

◆ ExecParallelPrepHashTableForUnmatched()

bool ExecParallelPrepHashTableForUnmatched ( HashJoinState hjstate)

Definition at line 2098 of file nodeHash.c.

2099 {
2100  HashJoinTable hashtable = hjstate->hj_HashTable;
2101  int curbatch = hashtable->curbatch;
2102  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
2103 
2104  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
2105 
2106  /*
2107  * It would not be deadlock-free to wait on the batch barrier, because it
2108  * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
2109  * already emitted tuples. Therefore, we'll hold a wait-free election:
2110  * only one process can continue to the next phase, and all others detach
2111  * from this batch. They can still do any work on other batches, if there
2112  * are any.
2113  */
2114  if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
2115  {
2116  /* This process considers the batch to be done. */
2117  hashtable->batches[hashtable->curbatch].done = true;
2118 
2119  /* Make sure any temporary files are closed. */
2120  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
2121  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
2122 
2123  /*
2124  * Track largest batch we've seen, which would normally happen in
2125  * ExecHashTableDetachBatch().
2126  */
2127  hashtable->spacePeak =
2128  Max(hashtable->spacePeak,
2129  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
2130  hashtable->curbatch = -1;
2131  return false;
2132  }
2133 
2134  /* Now we are alone with this batch. */
2135  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
2136 
2137  /*
2138  * Has another process decided to give up early and command all processes
2139  * to skip the unmatched scan?
2140  */
2141  if (batch->skip_unmatched)
2142  {
2143  hashtable->batches[hashtable->curbatch].done = true;
2144  ExecHashTableDetachBatch(hashtable);
2145  return false;
2146  }
2147 
2148  /* Now prepare the process local state, just as for non-parallel join. */
2149  ExecPrepHashTableForUnmatched(hjstate);
2150 
2151  return true;
2152 }
void ExecHashTableDetachBatch(HashJoinTable hashtable)
Definition: nodeHash.c:3283
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
Definition: nodeHash.c:2077
HashJoinTable hj_HashTable
Definition: execnodes.h:2116

References Assert(), BarrierArriveAndDetachExceptLast(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, HashJoinTableData::curbatch, ParallelHashJoinBatchAccessor::done, ExecHashTableDetachBatch(), ExecPrepHashTableForUnmatched(), HashJoinState::hj_HashTable, ParallelHashJoinBatchAccessor::inner_tuples, Max, HashJoinTableData::nbuckets, ParallelHashJoinBatchAccessor::outer_tuples, PHJ_BATCH_PROBE, PHJ_BATCH_SCAN, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinBatch::skip_unmatched, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinImpl().

◆ ExecParallelScanHashBucket()

bool ExecParallelScanHashBucket ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 2026 of file nodeHash.c.

2028 {
2029  ExprState *hjclauses = hjstate->hashclauses;
2030  HashJoinTable hashtable = hjstate->hj_HashTable;
2031  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2032  uint32 hashvalue = hjstate->hj_CurHashValue;
2033 
2034  /*
2035  * hj_CurTuple is the address of the tuple last returned from the current
2036  * bucket, or NULL if it's time to start scanning a new bucket.
2037  */
2038  if (hashTuple != NULL)
2039  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2040  else
2041  hashTuple = ExecParallelHashFirstTuple(hashtable,
2042  hjstate->hj_CurBucketNo);
2043 
2044  while (hashTuple != NULL)
2045  {
2046  if (hashTuple->hashvalue == hashvalue)
2047  {
2048  TupleTableSlot *inntuple;
2049 
2050  /* insert hashtable's tuple into exec slot so ExecQual sees it */
2051  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2052  hjstate->hj_HashTupleSlot,
2053  false); /* do not pfree */
2054  econtext->ecxt_innertuple = inntuple;
2055 
2056  if (ExecQualAndReset(hjclauses, econtext))
2057  {
2058  hjstate->hj_CurTuple = hashTuple;
2059  return true;
2060  }
2061  }
2062 
2063  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2064  }
2065 
2066  /*
2067  * no match
2068  */
2069  return false;
2070 }
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1445
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:439
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
Definition: nodeHash.c:3425
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
Definition: nodeHash.c:3441
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:257
HashJoinTuple hj_CurTuple
Definition: execnodes.h:2120
ExprState * hashclauses
Definition: execnodes.h:2112
uint32 hj_CurHashValue
Definition: execnodes.h:2117
int hj_CurBucketNo
Definition: execnodes.h:2118
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:2122

References ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, and HJTUPLE_MINTUPLE.

Referenced by ExecHashJoinImpl().

◆ ExecParallelScanHashTableForUnmatched()

bool ExecParallelScanHashTableForUnmatched ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 2237 of file nodeHash.c.

2239 {
2240  HashJoinTable hashtable = hjstate->hj_HashTable;
2241  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2242 
2243  for (;;)
2244  {
2245  /*
2246  * hj_CurTuple is the address of the tuple last returned from the
2247  * current bucket, or NULL if it's time to start scanning a new
2248  * bucket.
2249  */
2250  if (hashTuple != NULL)
2251  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2252  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2253  hashTuple = ExecParallelHashFirstTuple(hashtable,
2254  hjstate->hj_CurBucketNo++);
2255  else
2256  break; /* finished all buckets */
2257 
2258  while (hashTuple != NULL)
2259  {
2260  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2261  {
2262  TupleTableSlot *inntuple;
2263 
2264  /* insert hashtable's tuple into exec slot */
2265  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2266  hjstate->hj_HashTupleSlot,
2267  false); /* do not pfree */
2268  econtext->ecxt_innertuple = inntuple;
2269 
2270  /*
2271  * Reset temp memory each time; although this function doesn't
2272  * do any qual eval, the caller will, so let's keep it
2273  * parallel to ExecScanHashBucket.
2274  */
2275  ResetExprContext(econtext);
2276 
2277  hjstate->hj_CurTuple = hashTuple;
2278  return true;
2279  }
2280 
2281  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2282  }
2283 
2284  /* allow this loop to be cancellable */
2285  CHECK_FOR_INTERRUPTS();
2286  }
2287 
2288  /*
2289  * no more unmatched tuples
2290  */
2291  return false;
2292 }
#define HeapTupleHeaderHasMatch(tup)
Definition: htup_details.h:514
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122

References CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, HashJoinTableData::nbuckets, and ResetExprContext.

Referenced by ExecHashJoinImpl().

◆ ExecPrepHashTableForUnmatched()

void ExecPrepHashTableForUnmatched ( HashJoinState hjstate)

Definition at line 2077 of file nodeHash.c.

2078 {
2079  /*----------
2080  * During this scan we use the HashJoinState fields as follows:
2081  *
2082  * hj_CurBucketNo: next regular bucket to scan
2083  * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
2084  * hj_CurTuple: last tuple returned, or NULL to start next bucket
2085  *----------
2086  */
2087  hjstate->hj_CurBucketNo = 0;
2088  hjstate->hj_CurSkewBucketNo = 0;
2089  hjstate->hj_CurTuple = NULL;
2090 }
int hj_CurSkewBucketNo
Definition: execnodes.h:2119

References HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, and HashJoinState::hj_CurTuple.

Referenced by ExecHashJoinImpl(), and ExecParallelPrepHashTableForUnmatched().

◆ ExecReScanHash()

void ExecReScanHash ( HashState node)

Definition at line 2354 of file nodeHash.c.

2355 {
2356  PlanState *outerPlan = outerPlanState(node);
2357 
2358  /*
2359  * if chgParam of subnode is not null then plan will be re-scanned by
2360  * first ExecProcNode.
2361  */
2362  if (outerPlan->chgParam == NULL)
2363  ExecReScan(outerPlan);
2364 }
void ExecReScan(PlanState *node)
Definition: execAmi.c:78

References ExecReScan(), outerPlan, and outerPlanState.

Referenced by ExecReScan().

◆ ExecScanHashBucket()

bool ExecScanHashBucket ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 1965 of file nodeHash.c.

1967 {
1968  ExprState *hjclauses = hjstate->hashclauses;
1969  HashJoinTable hashtable = hjstate->hj_HashTable;
1970  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1971  uint32 hashvalue = hjstate->hj_CurHashValue;
1972 
1973  /*
1974  * hj_CurTuple is the address of the tuple last returned from the current
1975  * bucket, or NULL if it's time to start scanning a new bucket.
1976  *
1977  * If the tuple hashed to a skew bucket then scan the skew bucket
1978  * otherwise scan the standard hashtable bucket.
1979  */
1980  if (hashTuple != NULL)
1981  hashTuple = hashTuple->next.unshared;
1982  else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
1983  hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
1984  else
1985  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
1986 
1987  while (hashTuple != NULL)
1988  {
1989  if (hashTuple->hashvalue == hashvalue)
1990  {
1991  TupleTableSlot *inntuple;
1992 
1993  /* insert hashtable's tuple into exec slot so ExecQual sees it */
1994  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
1995  hjstate->hj_HashTupleSlot,
1996  false); /* do not pfree */
1997  econtext->ecxt_innertuple = inntuple;
1998 
1999  if (ExecQualAndReset(hjclauses, econtext))
2000  {
2001  hjstate->hj_CurTuple = hashTuple;
2002  return true;
2003  }
2004  }
2005 
2006  hashTuple = hashTuple->next.unshared;
2007  }
2008 
2009  /*
2010  * no match
2011  */
2012  return false;
2013 }

References HashJoinTableData::buckets, ExprContext::ecxt_innertuple, ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, INVALID_SKEW_BUCKET_NO, HashJoinTupleData::next, HashJoinTableData::skewBucket, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

◆ ExecScanHashTableForUnmatched()

bool ExecScanHashTableForUnmatched ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 2163 of file nodeHash.c.

2164 {
2165  HashJoinTable hashtable = hjstate->hj_HashTable;
2166  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2167 
2168  for (;;)
2169  {
2170  /*
2171  * hj_CurTuple is the address of the tuple last returned from the
2172  * current bucket, or NULL if it's time to start scanning a new
2173  * bucket.
2174  */
2175  if (hashTuple != NULL)
2176  hashTuple = hashTuple->next.unshared;
2177  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2178  {
2179  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
2180  hjstate->hj_CurBucketNo++;
2181  }
2182  else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
2183  {
2184  int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
2185 
2186  hashTuple = hashtable->skewBucket[j]->tuples;
2187  hjstate->hj_CurSkewBucketNo++;
2188  }
2189  else
2190  break; /* finished all buckets */
2191 
2192  while (hashTuple != NULL)
2193  {
2194  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2195  {
2196  TupleTableSlot *inntuple;
2197 
2198  /* insert hashtable's tuple into exec slot */
2199  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2200  hjstate->hj_HashTupleSlot,
2201  false); /* do not pfree */
2202  econtext->ecxt_innertuple = inntuple;
2203 
2204  /*
2205  * Reset temp memory each time; although this function doesn't
2206  * do any qual eval, the caller will, so let's keep it
2207  * parallel to ExecScanHashBucket.
2208  */
2209  ResetExprContext(econtext);
2210 
2211  hjstate->hj_CurTuple = hashTuple;
2212  return true;
2213  }
2214 
2215  hashTuple = hashTuple->next.unshared;
2216  }
2217 
2218  /* allow this loop to be cancellable */
2219  CHECK_FOR_INTERRUPTS();
2220  }
2221 
2222  /*
2223  * no more unmatched tuples
2224  */
2225  return false;
2226 }

References HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, j, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, ResetExprContext, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

◆ ExecShutdownHash()

void ExecShutdownHash ( HashState node)

Definition at line 2805 of file nodeHash.c.

2806 {
2807  /* Allocate save space if EXPLAIN'ing and we didn't do so already */
2808  if (node->ps.instrument && !node->hinstrument)
2809  node->hinstrument = palloc0_object(HashInstrumentation);
2810  /* Now accumulate data for the current (final) hash table */
2811  if (node->hinstrument && node->hashtable)
2812  ExecHashAccumInstrumentation(node->hinstrument, node->hashtable);
2813 }
#define palloc0_object(type)
Definition: fe_memutils.h:63
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
Definition: nodeHash.c:2851

References ExecHashAccumInstrumentation(), HashState::hashtable, HashState::hinstrument, PlanState::instrument, palloc0_object, and HashState::ps.

Referenced by ExecShutdownNode_walker().

◆ MultiExecHash()

Node* MultiExecHash ( HashState node)

Definition at line 106 of file nodeHash.c.

107 {
108  /* must provide our own instrumentation support */
109  if (node->ps.instrument)
110  InstrStartNode(node->ps.instrument);
111 
112  if (node->parallel_state != NULL)
113  MultiExecParallelHash(node);
114  else
115  MultiExecPrivateHash(node);
116 
117  /* must provide our own instrumentation support */
118  if (node->ps.instrument)
119  InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);
120 
121  /*
122  * We do not return the hash table directly because it's not a subtype of
123  * Node, and so would violate the MultiExecProcNode API. Instead, our
124  * parent Hashjoin node is expected to know how to fish it out of our node
125  * state. Ugly but not really worth cleaning up, since Hashjoin knows
126  * quite a bit more about Hash besides that.
127  */
128  return NULL;
129 }
void InstrStartNode(Instrumentation *instr)
Definition: instrument.c:68
void InstrStopNode(Instrumentation *instr, double nTuples)
Definition: instrument.c:84
static void MultiExecParallelHash(HashState *node)
Definition: nodeHash.c:215
static void MultiExecPrivateHash(HashState *node)
Definition: nodeHash.c:139
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2687

References HashState::hashtable, InstrStartNode(), InstrStopNode(), PlanState::instrument, MultiExecParallelHash(), MultiExecPrivateHash(), HashState::parallel_state, HashJoinTableData::partialTuples, and HashState::ps.

Referenced by MultiExecProcNode().