PostgreSQL Source Code  git master
nodeHash.h File Reference
#include "access/parallel.h"
#include "nodes/execnodes.h"
Include dependency graph for nodeHash.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

HashState * ExecInitHash (Hash *node, EState *estate, int eflags)
 
Node * MultiExecHash (HashState *node)
 
void ExecEndHash (HashState *node)
 
void ExecReScanHash (HashState *node)
 
HashJoinTable ExecHashTableCreate (HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
 
void ExecParallelHashTableAlloc (HashJoinTable hashtable, int batchno)
 
void ExecHashTableDestroy (HashJoinTable hashtable)
 
void ExecHashTableDetach (HashJoinTable hashtable)
 
void ExecHashTableDetachBatch (HashJoinTable hashtable)
 
void ExecParallelHashTableSetCurrentBatch (HashJoinTable hashtable, int batchno)
 
void ExecHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsertCurrentBatch (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
bool ExecHashGetHashValue (HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
 
void ExecHashGetBucketAndBatch (HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
 
bool ExecScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecParallelPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecHashTableReset (HashJoinTable hashtable)
 
void ExecHashTableResetMatchFlags (HashJoinTable hashtable)
 
void ExecChooseHashTableSize (double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
 
int ExecHashGetSkewBucket (HashJoinTable hashtable, uint32 hashvalue)
 
void ExecHashEstimate (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeDSM (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeWorker (HashState *node, ParallelWorkerContext *pwcxt)
 
void ExecHashRetrieveInstrumentation (HashState *node)
 
void ExecShutdownHash (HashState *node)
 
void ExecHashAccumInstrumentation (HashInstrumentation *instrument, HashJoinTable hashtable)
 

Function Documentation

◆ ExecChooseHashTableSize()

void ExecChooseHashTableSize ( double  ntuples,
int  tupwidth,
bool  useskew,
bool  try_combined_hash_mem,
int  parallel_workers,
size_t *  space_allowed,
int *  numbuckets,
int *  numbatches,
int *  num_skew_mcvs 
)

Definition at line 675 of file nodeHash.c.

682 {
683  int tupsize;
684  double inner_rel_bytes;
685  size_t hash_table_bytes;
686  size_t bucket_bytes;
687  size_t max_pointers;
688  int nbatch = 1;
689  int nbuckets;
690  double dbuckets;
691 
692  /* Force a plausible relation size if no info */
693  if (ntuples <= 0.0)
694  ntuples = 1000.0;
695 
696  /*
697  * Estimate tupsize based on footprint of tuple in hashtable... note this
698  * does not allow for any palloc overhead. The manipulations of spaceUsed
699  * don't count palloc overhead either.
700  */
701  tupsize = HJTUPLE_OVERHEAD +
702  MAXALIGN(SizeofMinimalTupleHeader) +
703  MAXALIGN(tupwidth);
704  inner_rel_bytes = ntuples * tupsize;
705 
706  /*
707  * Compute in-memory hashtable size limit from GUCs.
708  */
709  hash_table_bytes = get_hash_memory_limit();
710 
711  /*
712  * Parallel Hash tries to use the combined hash_mem of all workers to
713  * avoid the need to batch. If that won't work, it falls back to hash_mem
714  * per worker and tries to process batches in parallel.
715  */
716  if (try_combined_hash_mem)
717  {
718  /* Careful, this could overflow size_t */
719  double newlimit;
720 
721  newlimit = (double) hash_table_bytes * (double) (parallel_workers + 1);
722  newlimit = Min(newlimit, (double) SIZE_MAX);
723  hash_table_bytes = (size_t) newlimit;
724  }
725 
726  *space_allowed = hash_table_bytes;
727 
728  /*
729  * If skew optimization is possible, estimate the number of skew buckets
730  * that will fit in the memory allowed, and decrement the assumed space
731  * available for the main hash table accordingly.
732  *
733  * We make the optimistic assumption that each skew bucket will contain
734  * one inner-relation tuple. If that turns out to be low, we will recover
735  * at runtime by reducing the number of skew buckets.
736  *
737  * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
738  * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
739  * will round up to the next power of 2 and then multiply by 4 to reduce
740  * collisions.
741  */
742  if (useskew)
743  {
744  size_t bytes_per_mcv;
745  size_t skew_mcvs;
746 
747  /*----------
748  * Compute number of MCVs we could hold in hash_table_bytes
749  *
750  * Divisor is:
751  * size of a hash tuple +
752  * worst-case size of skewBucket[] per MCV +
753  * size of skewBucketNums[] entry +
754  * size of skew bucket struct itself
755  *----------
756  */
757  bytes_per_mcv = tupsize +
758  (8 * sizeof(HashSkewBucket *)) +
759  sizeof(int) +
760  SKEW_BUCKET_OVERHEAD;
761  skew_mcvs = hash_table_bytes / bytes_per_mcv;
762 
763  /*
764  * Now scale by SKEW_HASH_MEM_PERCENT (we do it in this order so as
765  * not to worry about size_t overflow in the multiplication)
766  */
767  skew_mcvs = (skew_mcvs * SKEW_HASH_MEM_PERCENT) / 100;
768 
769  /* Now clamp to integer range */
770  skew_mcvs = Min(skew_mcvs, INT_MAX);
771 
772  *num_skew_mcvs = (int) skew_mcvs;
773 
774  /* Reduce hash_table_bytes by the amount needed for the skew table */
775  if (skew_mcvs > 0)
776  hash_table_bytes -= skew_mcvs * bytes_per_mcv;
777  }
778  else
779  *num_skew_mcvs = 0;
780 
781  /*
782  * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
783  * memory is filled, assuming a single batch; but limit the value so that
784  * the pointer arrays we'll try to allocate do not exceed hash_table_bytes
785  * nor MaxAllocSize.
786  *
787  * Note that both nbuckets and nbatch must be powers of 2 to make
788  * ExecHashGetBucketAndBatch fast.
789  */
790  max_pointers = hash_table_bytes / sizeof(HashJoinTuple);
791  max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
792  /* If max_pointers isn't a power of 2, must round it down to one */
793  max_pointers = pg_prevpower2_size_t(max_pointers);
794 
795  /* Also ensure we avoid integer overflow in nbatch and nbuckets */
796  /* (this step is redundant given the current value of MaxAllocSize) */
797  max_pointers = Min(max_pointers, INT_MAX / 2 + 1);
798 
799  dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
800  dbuckets = Min(dbuckets, max_pointers);
801  nbuckets = (int) dbuckets;
802  /* don't let nbuckets be really small, though ... */
803  nbuckets = Max(nbuckets, 1024);
804  /* ... and force it to be a power of 2. */
805  nbuckets = pg_nextpower2_32(nbuckets);
806 
807  /*
808  * If there's not enough space to store the projected number of tuples and
809  * the required bucket headers, we will need multiple batches.
810  */
811  bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
812  if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
813  {
814  /* We'll need multiple batches */
815  size_t sbuckets;
816  double dbatch;
817  int minbatch;
818  size_t bucket_size;
819 
820  /*
821  * If Parallel Hash with combined hash_mem would still need multiple
822  * batches, we'll have to fall back to regular hash_mem budget.
823  */
824  if (try_combined_hash_mem)
825  {
826  ExecChooseHashTableSize(ntuples, tupwidth, useskew,
827  false, parallel_workers,
828  space_allowed,
829  numbuckets,
830  numbatches,
831  num_skew_mcvs);
832  return;
833  }
834 
835  /*
836  * Estimate the number of buckets we'll want to have when hash_mem is
837  * entirely full. Each bucket will contain a bucket pointer plus
838  * NTUP_PER_BUCKET tuples, whose projected size already includes
839  * overhead for the hash code, pointer to the next tuple, etc.
840  */
841  bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
842  if (hash_table_bytes <= bucket_size)
843  sbuckets = 1; /* avoid pg_nextpower2_size_t(0) */
844  else
845  sbuckets = pg_nextpower2_size_t(hash_table_bytes / bucket_size);
846  sbuckets = Min(sbuckets, max_pointers);
847  nbuckets = (int) sbuckets;
848  nbuckets = pg_nextpower2_32(nbuckets);
849  bucket_bytes = nbuckets * sizeof(HashJoinTuple);
850 
851  /*
852  * Buckets are simple pointers to hashjoin tuples, while tupsize
853  * includes the pointer, hash code, and MinimalTupleData. So buckets
854  * should never really exceed 25% of hash_mem (even for
855  * NTUP_PER_BUCKET=1); except maybe for hash_mem values that are not
856  * 2^N bytes, where we might get more because of doubling. So let's
857  * look for 50% here.
858  */
859  Assert(bucket_bytes <= hash_table_bytes / 2);
860 
861  /* Calculate required number of batches. */
862  dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
863  dbatch = Min(dbatch, max_pointers);
864  minbatch = (int) dbatch;
865  nbatch = pg_nextpower2_32(Max(2, minbatch));
866  }
867 
868  Assert(nbuckets > 0);
869  Assert(nbatch > 0);
870 
871  *numbuckets = nbuckets;
872  *numbatches = nbatch;
873 }
#define Min(x, y)
Definition: c.h:1004
#define MAXALIGN(LEN)
Definition: c.h:811
#define Max(x, y)
Definition: c.h:998
#define Assert(condition)
Definition: c.h:858
struct HashJoinTupleData * HashJoinTuple
Definition: execnodes.h:2180
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:90
#define SKEW_BUCKET_OVERHEAD
Definition: hashjoin.h:119
#define SKEW_HASH_MEM_PERCENT
Definition: hashjoin.h:121
#define SizeofMinimalTupleHeader
Definition: htup_details.h:647
#define MaxAllocSize
Definition: memutils.h:40
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
Definition: nodeHash.c:675
#define NTUP_PER_BUCKET
Definition: nodeHash.c:672
size_t get_hash_memory_limit(void)
Definition: nodeHash.c:3595
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:189
#define pg_nextpower2_size_t
Definition: pg_bitutils.h:417
#define pg_prevpower2_size_t
Definition: pg_bitutils.h:418

References Assert, get_hash_memory_limit(), HJTUPLE_OVERHEAD, Max, MAXALIGN, MaxAllocSize, Min, NTUP_PER_BUCKET, pg_nextpower2_32(), pg_nextpower2_size_t, pg_prevpower2_size_t, SizeofMinimalTupleHeader, SKEW_BUCKET_OVERHEAD, and SKEW_HASH_MEM_PERCENT.

Referenced by ExecHashTableCreate(), and initial_cost_hashjoin().

◆ ExecEndHash()

void ExecEndHash ( HashState *  node)

Definition at line 413 of file nodeHash.c.

414 {
415  PlanState *outerPlan;
416 
417  /*
418  * shut down the subplan
419  */
420  outerPlan = outerPlanState(node);
421  ExecEndNode(outerPlan);
422 }
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:557
#define outerPlanState(node)
Definition: execnodes.h:1213
#define outerPlan(node)
Definition: plannodes.h:182

References ExecEndNode(), outerPlan, and outerPlanState.

Referenced by ExecEndNode().

◆ ExecHashAccumInstrumentation()

void ExecHashAccumInstrumentation ( HashInstrumentation *  instrument,
HashJoinTable  hashtable 
)

Definition at line 2850 of file nodeHash.c.

2852 {
2853  instrument->nbuckets = Max(instrument->nbuckets,
2854  hashtable->nbuckets);
2855  instrument->nbuckets_original = Max(instrument->nbuckets_original,
2856  hashtable->nbuckets_original);
2857  instrument->nbatch = Max(instrument->nbatch,
2858  hashtable->nbatch);
2859  instrument->nbatch_original = Max(instrument->nbatch_original,
2860  hashtable->nbatch_original);
2861  instrument->space_peak = Max(instrument->space_peak,
2862  hashtable->spacePeak);
2863 }

References Max, HashJoinTableData::nbatch, HashInstrumentation::nbatch, HashJoinTableData::nbatch_original, HashInstrumentation::nbatch_original, HashJoinTableData::nbuckets, HashInstrumentation::nbuckets, HashJoinTableData::nbuckets_original, HashInstrumentation::nbuckets_original, HashInstrumentation::space_peak, and HashJoinTableData::spacePeak.

Referenced by ExecReScanHashJoin(), and ExecShutdownHash().

◆ ExecHashEstimate()

void ExecHashEstimate ( HashState *  node,
ParallelContext *  pcxt 
)

Definition at line 2734 of file nodeHash.c.

2735 {
2736  size_t size;
2737 
2738  /* don't need this if not instrumenting or no workers */
2739  if (!node->ps.instrument || pcxt->nworkers == 0)
2740  return;
2741 
2742  size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
2743  size = add_size(size, offsetof(SharedHashInfo, hinstrument));
2744  shm_toc_estimate_chunk(&pcxt->estimator, size);
2745  shm_toc_estimate_keys(&pcxt->estimator, 1);
2746 }
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static pg_noinline void Size size
Definition: slab.c:607
PlanState ps
Definition: execnodes.h:2740
shm_toc_estimator estimator
Definition: parallel.h:41
Instrumentation * instrument
Definition: execnodes.h:1127

References add_size(), ParallelContext::estimator, PlanState::instrument, mul_size(), ParallelContext::nworkers, HashState::ps, shm_toc_estimate_chunk, shm_toc_estimate_keys, and size.

Referenced by ExecParallelEstimate().

◆ ExecHashGetBucketAndBatch()

void ExecHashGetBucketAndBatch ( HashJoinTable  hashtable,
uint32  hashvalue,
int *  bucketno,
int *  batchno 
)

Definition at line 1932 of file nodeHash.c.

1936 {
1937  uint32 nbuckets = (uint32) hashtable->nbuckets;
1938  uint32 nbatch = (uint32) hashtable->nbatch;
1939 
1940  if (nbatch > 1)
1941  {
1942  *bucketno = hashvalue & (nbuckets - 1);
1943  *batchno = pg_rotate_right32(hashvalue,
1944  hashtable->log2_nbuckets) & (nbatch - 1);
1945  }
1946  else
1947  {
1948  *bucketno = hashvalue & (nbuckets - 1);
1949  *batchno = 0;
1950  }
1951 }
unsigned int uint32
Definition: c.h:506
static uint32 pg_rotate_right32(uint32 word, int n)
Definition: pg_bitutils.h:398

References HashJoinTableData::log2_nbuckets, HashJoinTableData::nbatch, HashJoinTableData::nbuckets, and pg_rotate_right32().

Referenced by ExecHashIncreaseNumBatches(), ExecHashIncreaseNumBuckets(), ExecHashJoinImpl(), ExecHashRemoveNextSkewBucket(), ExecHashTableInsert(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinPartitionOuter(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), ExecParallelHashTableInsert(), and ExecParallelHashTableInsertCurrentBatch().

◆ ExecHashGetHashValue()

bool ExecHashGetHashValue ( HashJoinTable  hashtable,
ExprContext *  econtext,
List *  hashkeys,
bool  outer_tuple,
bool  keep_nulls,
uint32 *  hashvalue 
)

Definition at line 1824 of file nodeHash.c.

1830 {
1831  uint32 hashkey = 0;
1832  FmgrInfo *hashfunctions;
1833  ListCell *hk;
1834  int i = 0;
1835  MemoryContext oldContext;
1836 
1837  /*
1838  * We reset the eval context each time to reclaim any memory leaked in the
1839  * hashkey expressions.
1840  */
1841  ResetExprContext(econtext);
1842 
1843  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
1844 
1845  if (outer_tuple)
1846  hashfunctions = hashtable->outer_hashfunctions;
1847  else
1848  hashfunctions = hashtable->inner_hashfunctions;
1849 
1850  foreach(hk, hashkeys)
1851  {
1852  ExprState *keyexpr = (ExprState *) lfirst(hk);
1853  Datum keyval;
1854  bool isNull;
1855 
1856  /* combine successive hashkeys by rotating */
1857  hashkey = pg_rotate_left32(hashkey, 1);
1858 
1859  /*
1860  * Get the join attribute value of the tuple
1861  */
1862  keyval = ExecEvalExpr(keyexpr, econtext, &isNull);
1863 
1864  /*
1865  * If the attribute is NULL, and the join operator is strict, then
1866  * this tuple cannot pass the join qual so we can reject it
1867  * immediately (unless we're scanning the outside of an outer join, in
1868  * which case we must not reject it). Otherwise we act like the
1869  * hashcode of NULL is zero (this will support operators that act like
1870  * IS NOT DISTINCT, though not any more-random behavior). We treat
1871  * the hash support function as strict even if the operator is not.
1872  *
1873  * Note: currently, all hashjoinable operators must be strict since
1874  * the hash index AM assumes that. However, it takes so little extra
1875  * code here to allow non-strict that we may as well do it.
1876  */
1877  if (isNull)
1878  {
1879  if (hashtable->hashStrict[i] && !keep_nulls)
1880  {
1881  MemoryContextSwitchTo(oldContext);
1882  return false; /* cannot match */
1883  }
1884  /* else, leave hashkey unmodified, equivalent to hashcode 0 */
1885  }
1886  else
1887  {
1888  /* Compute the hash function */
1889  uint32 hkey;
1890 
1891  hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], hashtable->collations[i], keyval));
1892  hashkey ^= hkey;
1893  }
1894 
1895  i++;
1896  }
1897 
1898  MemoryContextSwitchTo(oldContext);
1899 
1900  *hashvalue = hashkey;
1901  return true;
1902 }
#define ResetExprContext(econtext)
Definition: executor.h:544
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:333
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1129
int i
Definition: isn.c:73
static uint32 pg_rotate_left32(uint32 word, int n)
Definition: pg_bitutils.h:404
#define lfirst(lc)
Definition: pg_list.h:172
static uint32 DatumGetUInt32(Datum X)
Definition: postgres.h:222
uintptr_t Datum
Definition: postgres.h:64
MemoryContextSwitchTo(old_ctx)
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:263
Definition: fmgr.h:57
FmgrInfo * outer_hashfunctions
Definition: hashjoin.h:351
bool * hashStrict
Definition: hashjoin.h:353
FmgrInfo * inner_hashfunctions
Definition: hashjoin.h:352

References HashJoinTableData::collations, DatumGetUInt32(), ExprContext::ecxt_per_tuple_memory, ExecEvalExpr(), FunctionCall1Coll(), HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, lfirst, MemoryContextSwitchTo(), HashJoinTableData::outer_hashfunctions, pg_rotate_left32(), and ResetExprContext.

Referenced by ExecHashJoinOuterGetTuple(), ExecParallelHashJoinOuterGetTuple(), ExecParallelHashJoinPartitionOuter(), MultiExecParallelHash(), and MultiExecPrivateHash().

◆ ExecHashGetSkewBucket()

int ExecHashGetSkewBucket ( HashJoinTable  hashtable,
uint32  hashvalue 
)

Definition at line 2528 of file nodeHash.c.

2529 {
2530  int bucket;
2531 
2532  /*
2533  * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
2534  * particular, this happens after the initial batch is done).
2535  */
2536  if (!hashtable->skewEnabled)
2537  return INVALID_SKEW_BUCKET_NO;
2538 
2539  /*
2540  * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
2541  */
2542  bucket = hashvalue & (hashtable->skewBucketLen - 1);
2543 
2544  /*
2545  * While we have not hit a hole in the hashtable and have not hit the
2546  * desired bucket, we have collided with some other hash value, so try the
2547  * next bucket location.
2548  */
2549  while (hashtable->skewBucket[bucket] != NULL &&
2550  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
2551  bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
2552 
2553  /*
2554  * Found the desired bucket?
2555  */
2556  if (hashtable->skewBucket[bucket] != NULL)
2557  return bucket;
2558 
2559  /*
2560  * There must not be any hashtable entry for this hash value.
2561  */
2562  return INVALID_SKEW_BUCKET_NO;
2563 }
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:120
HashSkewBucket ** skewBucket
Definition: hashjoin.h:319
uint32 hashvalue
Definition: hashjoin.h:115

References HashSkewBucket::hashvalue, INVALID_SKEW_BUCKET_NO, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, and HashJoinTableData::skewEnabled.

Referenced by ExecHashJoinImpl(), and MultiExecPrivateHash().

◆ ExecHashInitializeDSM()

void ExecHashInitializeDSM ( HashState *  node,
ParallelContext *  pcxt 
)

Definition at line 2753 of file nodeHash.c.

2754 {
2755  size_t size;
2756 
2757  /* don't need this if not instrumenting or no workers */
2758  if (!node->ps.instrument || pcxt->nworkers == 0)
2759  return;
2760 
2761  size = offsetof(SharedHashInfo, hinstrument) +
2762  pcxt->nworkers * sizeof(HashInstrumentation);
2763  node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
2764 
2765  /* Each per-worker area must start out as zeroes. */
2766  memset(node->shared_info, 0, size);
2767 
2768  node->shared_info->num_workers = pcxt->nworkers;
2769  shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
2770  node->shared_info);
2771 }
struct HashInstrumentation HashInstrumentation
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
SharedHashInfo * shared_info
Definition: execnodes.h:2750
shm_toc * toc
Definition: parallel.h:44
Plan * plan
Definition: execnodes.h:1117
int plan_node_id
Definition: plannodes.h:151

References PlanState::instrument, SharedHashInfo::num_workers, ParallelContext::nworkers, PlanState::plan, Plan::plan_node_id, HashState::ps, HashState::shared_info, shm_toc_allocate(), shm_toc_insert(), size, and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

◆ ExecHashInitializeWorker()

void ExecHashInitializeWorker ( HashState *  node,
ParallelWorkerContext *  pwcxt 
)

Definition at line 2778 of file nodeHash.c.

2779 {
2780  SharedHashInfo *shared_info;
2781 
2782  /* don't need this if not instrumenting */
2783  if (!node->ps.instrument)
2784  return;
2785 
2786  /*
2787  * Find our entry in the shared area, and set up a pointer to it so that
2788  * we'll accumulate stats there when shutting down or rebuilding the hash
2789  * table.
2790  */
2791  shared_info = (SharedHashInfo *)
2792  shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
2793  node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
2794 }
int ParallelWorkerNumber
Definition: parallel.c:112
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
HashInstrumentation * hinstrument
Definition: execnodes.h:2757
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
Definition: execnodes.h:2731

References SharedHashInfo::hinstrument, HashState::hinstrument, PlanState::instrument, ParallelWorkerNumber, PlanState::plan, Plan::plan_node_id, HashState::ps, shm_toc_lookup(), and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().

◆ ExecHashRetrieveInstrumentation()

void ExecHashRetrieveInstrumentation ( HashState *  node)

Definition at line 2819 of file nodeHash.c.

2820 {
2821  SharedHashInfo *shared_info = node->shared_info;
2822  size_t size;
2823 
2824  if (shared_info == NULL)
2825  return;
2826 
2827  /* Replace node->shared_info with a copy in backend-local memory. */
2828  size = offsetof(SharedHashInfo, hinstrument) +
2829  shared_info->num_workers * sizeof(HashInstrumentation);
2830  node->shared_info = palloc(size);
2831  memcpy(node->shared_info, shared_info, size);
2832 }
void * palloc(Size size)
Definition: mcxt.c:1316

References SharedHashInfo::num_workers, palloc(), HashState::shared_info, and size.

Referenced by ExecParallelRetrieveInstrumentation().

◆ ExecHashTableCreate()

HashJoinTable ExecHashTableCreate ( HashState *  state,
List *  hashOperators,
List *  hashCollations,
bool  keepNulls 
)

Definition at line 432 of file nodeHash.c.

433 {
434  Hash *node;
435  HashJoinTable hashtable;
436  Plan *outerNode;
437  size_t space_allowed;
438  int nbuckets;
439  int nbatch;
440  double rows;
441  int num_skew_mcvs;
442  int log2_nbuckets;
443  int nkeys;
444  int i;
445  ListCell *ho;
446  ListCell *hc;
447  MemoryContext oldcxt;
448 
449  /*
450  * Get information about the size of the relation to be hashed (it's the
451  * "outer" subtree of this node, but the inner relation of the hashjoin).
452  * Compute the appropriate size of the hash table.
453  */
454  node = (Hash *) state->ps.plan;
455  outerNode = outerPlan(node);
456 
457  /*
458  * If this is shared hash table with a partial plan, then we can't use
459  * outerNode->plan_rows to estimate its size. We need an estimate of the
460  * total number of rows across all copies of the partial plan.
461  */
462  rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
463 
464  ExecChooseHashTableSize(rows, outerNode->plan_width,
465  OidIsValid(node->skewTable),
466  state->parallel_state != NULL,
467  state->parallel_state != NULL ?
468  state->parallel_state->nparticipants - 1 : 0,
469  &space_allowed,
470  &nbuckets, &nbatch, &num_skew_mcvs);
471 
472  /* nbuckets must be a power of 2 */
473  log2_nbuckets = my_log2(nbuckets);
474  Assert(nbuckets == (1 << log2_nbuckets));
475 
476  /*
477  * Initialize the hash table control block.
478  *
479  * The hashtable control block is just palloc'd from the executor's
480  * per-query memory context. Everything else should be kept inside the
481  * subsidiary hashCxt, batchCxt or spillCxt.
482  */
483  hashtable = palloc_object(HashJoinTableData);
484  hashtable->nbuckets = nbuckets;
485  hashtable->nbuckets_original = nbuckets;
486  hashtable->nbuckets_optimal = nbuckets;
487  hashtable->log2_nbuckets = log2_nbuckets;
488  hashtable->log2_nbuckets_optimal = log2_nbuckets;
489  hashtable->buckets.unshared = NULL;
490  hashtable->keepNulls = keepNulls;
491  hashtable->skewEnabled = false;
492  hashtable->skewBucket = NULL;
493  hashtable->skewBucketLen = 0;
494  hashtable->nSkewBuckets = 0;
495  hashtable->skewBucketNums = NULL;
496  hashtable->nbatch = nbatch;
497  hashtable->curbatch = 0;
498  hashtable->nbatch_original = nbatch;
499  hashtable->nbatch_outstart = nbatch;
500  hashtable->growEnabled = true;
501  hashtable->totalTuples = 0;
502  hashtable->partialTuples = 0;
503  hashtable->skewTuples = 0;
504  hashtable->innerBatchFile = NULL;
505  hashtable->outerBatchFile = NULL;
506  hashtable->spaceUsed = 0;
507  hashtable->spacePeak = 0;
508  hashtable->spaceAllowed = space_allowed;
509  hashtable->spaceUsedSkew = 0;
510  hashtable->spaceAllowedSkew =
511  hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
512  hashtable->chunks = NULL;
513  hashtable->current_chunk = NULL;
514  hashtable->parallel_state = state->parallel_state;
515  hashtable->area = state->ps.state->es_query_dsa;
516  hashtable->batches = NULL;
517 
518 #ifdef HJDEBUG
519  printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
520  hashtable, nbatch, nbuckets);
521 #endif
522 
523  /*
524  * Create temporary memory contexts in which to keep the hashtable working
525  * storage. See notes in executor/hashjoin.h.
526  */
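527  hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
528  "HashTableContext",
529  ALLOCSET_DEFAULT_SIZES);
530 
531  hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
532  "HashBatchContext",
533  ALLOCSET_DEFAULT_SIZES);
534 
535  hashtable->spillCxt = AllocSetContextCreate(hashtable->hashCxt,
536  "HashSpillContext",
537  ALLOCSET_DEFAULT_SIZES);
538 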
539  /* Allocate data that will live for the life of the hashjoin */
540 
541  oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
542 
543  /*
544  * Get info about the hash functions to be used for each hash key. Also
545  * remember whether the join operators are strict.
546  */
547  nkeys = list_length(hashOperators);
548  hashtable->outer_hashfunctions = palloc_array(FmgrInfo, nkeys);
549  hashtable->inner_hashfunctions = palloc_array(FmgrInfo, nkeys);
550  hashtable->hashStrict = palloc_array(bool, nkeys);
551  hashtable->collations = palloc_array(Oid, nkeys);
552  i = 0;
553  forboth(ho, hashOperators, hc, hashCollations)
554  {
555  Oid hashop = lfirst_oid(ho);
556  Oid left_hashfn;
557  Oid right_hashfn;
558 
559  if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
560  elog(ERROR, "could not find hash function for hash operator %u",
561  hashop);
562  fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
563  fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
564  hashtable->hashStrict[i] = op_strict(hashop);
565  hashtable->collations[i] = lfirst_oid(hc);
566  i++;
567  }
568 
569  if (nbatch > 1 && hashtable->parallel_state == NULL)
570  {
571  MemoryContext oldctx;
572 
573  /*
574  * allocate and initialize the file arrays in hashCxt (not needed for
575  * parallel case which uses shared tuplestores instead of raw files)
576  */
577  oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
578 
579  hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
580  hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
581 
582  MemoryContextSwitchTo(oldctx);
583 
584  /* The files will not be opened until needed... */
585  /* ... but make sure we have temp tablespaces established for them */
586  PrepareTempTablespaces();
587  }
588 
589  MemoryContextSwitchTo(oldcxt);
590 
591  if (hashtable->parallel_state)
592  {
593  ParallelHashJoinState *pstate = hashtable->parallel_state;
594  Barrier *build_barrier;
595 
596  /*
597  * Attach to the build barrier. The corresponding detach operation is
598  * in ExecHashTableDetach. Note that we won't attach to the
599  * batch_barrier for batch 0 yet. We'll attach later and start it out
600  * in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front and
601  * then loaded while hashing (the standard hybrid hash join
602  * algorithm), and we'll coordinate that using build_barrier.
603  */
604  build_barrier = &pstate->build_barrier;
605  BarrierAttach(build_barrier);
606 
607  /*
608  * So far we have no idea whether there are any other participants,
609  * and if so, what phase they are working on. The only thing we care
610  * about at this point is whether someone has already created the
611  * SharedHashJoinBatch objects and the hash table for batch 0. One
612  * backend will be elected to do that now if necessary.
613  */
614  if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECT &&
615  BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
616  {
617  pstate->nbatch = nbatch;
618  pstate->space_allowed = space_allowed;
619  pstate->growth = PHJ_GROWTH_OK;
620 
621  /* Set up the shared state for coordinating batches. */
622  ExecParallelHashJoinSetUpBatches(hashtable, nbatch);
623 
624  /*
625  * Allocate batch 0's hash table up front so we can load it
626  * directly while hashing.
627  */
628  pstate->nbuckets = nbuckets;
629  ExecParallelHashTableAlloc(hashtable, 0);
630  }
631 
632  /*
633  * The next Parallel Hash synchronization point is in
634  * MultiExecParallelHash(), which will progress it all the way to
635  * PHJ_BUILD_RUN. The caller must not return control from this
636  * executor node between now and then.
637  */
638  }
639  else
640  {
641  /*
642  * Prepare context for the first-scan space allocations; allocate the
643  * hashbucket array therein, and set each bucket "empty".
644  */
645  MemoryContextSwitchTo(hashtable->batchCxt);
646 
647  hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets);
648 
649  /*
650  * Set up for skew optimization, if possible and there's a need for
651  * more than one batch. (In a one-batch join, there's no point in
652  * it.)
653  */
654  if (nbatch > 1)
655  ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
656 
657  MemoryContextSwitchTo(oldcxt);
658  }
659 
660  return hashtable;
661 }
void PrepareTempTablespaces(void)
Definition: tablespace.c:1331
int BarrierAttach(Barrier *barrier)
Definition: barrier.c:236
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:265
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
#define OidIsValid(objectId)
Definition: c.h:775
int my_log2(long num)
Definition: dynahash.c:1751
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define palloc_object(type)
Definition: fe_memutils.h:62
#define palloc_array(type, count)
Definition: fe_memutils.h:64
#define palloc0_array(type, count)
Definition: fe_memutils.h:65
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:127
@ PHJ_GROWTH_OK
Definition: hashjoin.h:233
#define PHJ_BUILD_ELECT
Definition: hashjoin.h:269
bool op_strict(Oid opno)
Definition: lsyscache.c:1477
bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)
Definition: lsyscache.c:510
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
Definition: nodeHash.c:2375
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
Definition: nodeHash.c:3097
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3262
static int list_length(const List *l)
Definition: pg_list.h:152
#define forboth(cell1, list1, cell2, list2)
Definition: pg_list.h:518
#define lfirst_oid(lc)
Definition: pg_list.h:174
#define printf(...)
Definition: port.h:244
unsigned int Oid
Definition: postgres_ext.h:31
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:311
HashMemoryChunk chunks
Definition: hashjoin.h:367
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:373
MemoryContext hashCxt
Definition: hashjoin.h:362
double totalTuples
Definition: hashjoin.h:332
double partialTuples
Definition: hashjoin.h:333
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:372
MemoryContext spillCxt
Definition: hashjoin.h:364
HashMemoryChunk current_chunk
Definition: hashjoin.h:370
Size spaceAllowedSkew
Definition: hashjoin.h:360
int * skewBucketNums
Definition: hashjoin.h:322
BufFile ** innerBatchFile
Definition: hashjoin.h:343
int log2_nbuckets_optimal
Definition: hashjoin.h:305
union HashJoinTableData::@104 buckets
dsa_area * area
Definition: hashjoin.h:371
BufFile ** outerBatchFile
Definition: hashjoin.h:344
MemoryContext batchCxt
Definition: hashjoin.h:363
double skewTuples
Definition: hashjoin.h:334
Oid skewTable
Definition: plannodes.h:1206
Cardinality rows_total
Definition: plannodes.h:1210
Plan plan
Definition: plannodes.h:1199
ParallelHashGrowth growth
Definition: hashjoin.h:253
bool parallel_aware
Definition: plannodes.h:140
int plan_width
Definition: plannodes.h:135
Cardinality plan_rows
Definition: plannodes.h:134
Definition: regguts.h:323

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, HashJoinTableData::area, Assert, BarrierArriveAndWait(), BarrierAttach(), BarrierPhase(), HashJoinTableData::batchCxt, HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, HashJoinTableData::chunks, HashJoinTableData::collations, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, CurrentMemoryContext, elog, ERROR, ExecChooseHashTableSize(), ExecHashBuildSkewHash(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashTableAlloc(), fmgr_info(), forboth, get_op_hash_functions(), HashJoinTableData::growEnabled, ParallelHashJoinState::growth, HashJoinTableData::hashCxt, HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, HashJoinTableData::innerBatchFile, HashJoinTableData::keepNulls, lfirst_oid, list_length(), HashJoinTableData::log2_nbuckets, HashJoinTableData::log2_nbuckets_optimal, MemoryContextSwitchTo(), my_log2(), ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, HashJoinTableData::nbatch_original, HashJoinTableData::nbatch_outstart, ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, HashJoinTableData::nbuckets_original, HashJoinTableData::nSkewBuckets, OidIsValid, op_strict(), HashJoinTableData::outer_hashfunctions, HashJoinTableData::outerBatchFile, outerPlan, palloc0_array, palloc_array, palloc_object, Plan::parallel_aware, HashJoinTableData::parallel_state, HashJoinTableData::partialTuples, PHJ_BUILD_ELECT, PHJ_GROWTH_OK, Hash::plan, Plan::plan_rows, Plan::plan_width, PrepareTempTablespaces(), printf, Hash::rows_total, SKEW_HASH_MEM_PERCENT, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, HashJoinTableData::skewBucketNums, HashJoinTableData::skewEnabled, Hash::skewTable, HashJoinTableData::skewTuples, ParallelHashJoinState::space_allowed, HashJoinTableData::spaceAllowed, HashJoinTableData::spaceAllowedSkew, HashJoinTableData::spacePeak, 
HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, HashJoinTableData::spillCxt, HashJoinTableData::totalTuples, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

◆ ExecHashTableDestroy()

void ExecHashTableDestroy ( HashJoinTable  hashtable)

Definition at line 883 of file nodeHash.c.

/*
 * ExecHashTableDestroy listing: closes the inner/outer batch temp files of
 * batches 1..nbatch-1 (only when innerBatchFile was allocated), deletes
 * hashCxt -- which also releases its child batchCxt -- and finally pfrees
 * the HashJoinTable control block.
 */
884 {
885  int i;
886 
887  /*
888  * Make sure all the temp files are closed. We skip batch 0, since it
889  * can't have any temp files (and the arrays might not even exist if
890  * nbatch is only 1). Parallel hash joins don't use these files.
891  */
892  if (hashtable->innerBatchFile != NULL)
893  {
894  for (i = 1; i < hashtable->nbatch; i++)
895  {
896  if (hashtable->innerBatchFile[i])
897  BufFileClose(hashtable->innerBatchFile[i]);
898  if (hashtable->outerBatchFile[i])
899  BufFileClose(hashtable->outerBatchFile[i]);
900  }
901  }
902 
903  /* Release working memory (batchCxt is a child, so it goes away too) */
904  MemoryContextDelete(hashtable->hashCxt);
905 
906  /* And drop the control block */
907  pfree(hashtable);
908 }
void BufFileClose(BufFile *file)
Definition: buffile.c:412
void pfree(void *pointer)
Definition: mcxt.c:1520
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454

References BufFileClose(), HashJoinTableData::hashCxt, i, HashJoinTableData::innerBatchFile, MemoryContextDelete(), HashJoinTableData::nbatch, HashJoinTableData::outerBatchFile, and pfree().

Referenced by ExecEndHashJoin(), and ExecReScanHashJoin().

◆ ExecHashTableDetach()

void ExecHashTableDetach ( HashJoinTable  hashtable)

Definition at line 3374 of file nodeHash.c.

/*
 * ExecHashTableDetach listing: detaches this backend from a parallel hash
 * join's build barrier. While still in PHJ_BUILD_RUN it ends writes and
 * scans on every batch's shared tuplestores; the last backend to detach
 * frees the shared batch array. parallel_state is always cleared.
 */
3375 {
3376  ParallelHashJoinState *pstate = hashtable->parallel_state;
3377 
3378  /*
3379  * If we're involved in a parallel query, we must either have gotten all
3380  * the way to PHJ_BUILD_RUN, or joined too late and be in PHJ_BUILD_FREE.
3381  */
3382  Assert(!pstate ||
3383  BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN);
3384 
3385  if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUN)
3386  {
3387  int i;
3388 
3389  /* Make sure any temporary files are closed. */
3390  if (hashtable->batches)
3391  {
3392  for (i = 0; i < hashtable->nbatch; ++i)
3393  {
3394  sts_end_write(hashtable->batches[i].inner_tuples);
3395  sts_end_write(hashtable->batches[i].outer_tuples);
3396  sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
3397  sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
3398  }
3399  }
3400 
3401  /* If we're last to detach, clean up shared memory. */
3402  if (BarrierArriveAndDetach(&pstate->build_barrier))
3403  {
3404  /*
3405  * Late joining processes will see this state and give up
3406  * immediately.
3407  */
3408  Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_FREE);
3409 
3410  if (DsaPointerIsValid(pstate->batches))
3411  {
3412  dsa_free(hashtable->area, pstate->batches);
3413  pstate->batches = InvalidDsaPointer;
3414  }
3415  }
3416  }
3417  hashtable->parallel_state = NULL;
3418 }
bool BarrierArriveAndDetach(Barrier *barrier)
Definition: barrier.c:203
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:826
#define InvalidDsaPointer
Definition: dsa.h:78
#define DsaPointerIsValid(x)
Definition: dsa.h:106
#define PHJ_BUILD_FREE
Definition: hashjoin.h:274
#define PHJ_BUILD_RUN
Definition: hashjoin.h:273
void sts_end_write(SharedTuplestoreAccessor *accessor)
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:221
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:220
dsa_pointer batches
Definition: hashjoin.h:248

References HashJoinTableData::area, Assert, BarrierArriveAndDetach(), BarrierPhase(), ParallelHashJoinState::batches, HashJoinTableData::batches, ParallelHashJoinState::build_barrier, dsa_free(), DsaPointerIsValid, i, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BUILD_FREE, PHJ_BUILD_RUN, sts_end_parallel_scan(), and sts_end_write().

Referenced by ExecHashJoinReInitializeDSM(), and ExecShutdownHashJoin().

◆ ExecHashTableDetachBatch()

void ExecHashTableDetachBatch ( HashJoinTable  hashtable)

Definition at line 3282 of file nodeHash.c.

/*
 * ExecHashTableDetachBatch listing: if attached to a parallel batch, ends
 * that batch's tuplestore scans, may set skip_unmatched, detaches from the
 * batch barrier (the elected backend frees the batch's shared chunks and
 * buckets), records peak space, and sets curbatch to -1.
 */
3283 {
3284  if (hashtable->parallel_state != NULL &&
3285  hashtable->curbatch >= 0)
3286  {
3287  int curbatch = hashtable->curbatch;
3288  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
3289  bool attached = true;
3290 
3291  /* Make sure any temporary files are closed. */
3292  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
3293  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
3294 
3295  /* After attaching we always get at least to PHJ_BATCH_PROBE. */
3296  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
3297  BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
3298 
3299  /*
3300  * If we're abandoning the PHJ_BATCH_PROBE phase early without having
3301  * reached the end of it, it means the plan doesn't want any more
3302  * tuples, and it is happy to abandon any tuples buffered in this
3303  * process's subplans. For correctness, we can't allow any process to
3304  * execute the PHJ_BATCH_SCAN phase, because we will never have the
3305  * complete set of match bits. Therefore we skip emitting unmatched
3306  * tuples in all backends (if this is a full/right join), as if those
3307  * tuples were all due to be emitted by this process and it has
3308  * abandoned them too.
3309  */
3310  if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
3311  !hashtable->batches[curbatch].outer_eof)
3312  {
3313  /*
3314  * This flag may be written to by multiple backends during
3315  * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
3316  * phase so requires no extra locking.
3317  */
3318  batch->skip_unmatched = true;
3319  }
3320 
3321  /*
3322  * Even if we aren't doing a full/right outer join, we'll step through
3323  * the PHJ_BATCH_SCAN phase just to maintain the invariant that
3324  * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
3325  */
3326  if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
3327  attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
3328  if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
3329  {
3330  /*
3331  * We are no longer attached to the batch barrier, but we're the
3332  * process that was chosen to free resources and it's safe to
3333  * assert the current phase. The ParallelHashJoinBatch can't go
3334  * away underneath us while we are attached to the build barrier,
3335  * making this access safe.
3336  */
3337  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
3338 
3339  /* Free shared chunks and buckets. */
3340  while (DsaPointerIsValid(batch->chunks))
3341  {
3342  HashMemoryChunk chunk = (HashMemoryChunk)
3343  dsa_get_address(hashtable->area, batch->chunks);
3344  dsa_pointer next = chunk->next.shared;
3345 
3346  dsa_free(hashtable->area, batch->chunks);
3347  batch->chunks = next;
3348  }
3349  if (DsaPointerIsValid(batch->buckets))
3350  {
3351  dsa_free(hashtable->area, batch->buckets);
3352  batch->buckets = InvalidDsaPointer;
3353  }
3354  }
3355 
3356  /*
3357  * Track the largest batch we've been attached to. Though each
3358  * backend might see a different subset of batches, explain.c will
3359  * scan the results from all backends to find the largest value.
3360  */
3361  hashtable->spacePeak =
3362  Max(hashtable->spacePeak,
3363  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
3364 
3365  /* Remember that we are not attached to a batch. */
3366  hashtable->curbatch = -1;
3367  }
3368 }
bool BarrierArriveAndDetachExceptLast(Barrier *barrier)
Definition: barrier.c:213
static int32 next
Definition: blutils.c:221
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:942
uint64 dsa_pointer
Definition: dsa.h:62
uint64 chunk
#define PHJ_BATCH_SCAN
Definition: hashjoin.h:281
#define PHJ_BATCH_PROBE
Definition: hashjoin.h:280
#define PHJ_BATCH_FREE
Definition: hashjoin.h:282
ParallelHashJoinBatch * shared
Definition: hashjoin.h:209
dsa_pointer chunks
Definition: hashjoin.h:167
dsa_pointer buckets
Definition: hashjoin.h:164

References HashJoinTableData::area, Assert, BarrierArriveAndDetach(), BarrierArriveAndDetachExceptLast(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, chunk, ParallelHashJoinBatch::chunks, HashJoinTableData::curbatch, dsa_free(), dsa_get_address(), DsaPointerIsValid, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, Max, HashJoinTableData::nbuckets, next, ParallelHashJoinBatchAccessor::outer_eof, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BATCH_FREE, PHJ_BATCH_PROBE, PHJ_BATCH_SCAN, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinBatch::skip_unmatched, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinReInitializeDSM(), ExecParallelHashJoinNewBatch(), ExecParallelPrepHashTableForUnmatched(), and ExecShutdownHashJoin().

◆ ExecHashTableInsert()

void ExecHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot * slot,
uint32  hashvalue 
)

Definition at line 1624 of file nodeHash.c.

/*
 * ExecHashTableInsert listing: puts a tuple into the current batch's
 * in-memory hash table (possibly doubling nbuckets_optimal and increasing
 * the batch count when over budget), or spills it to the inner batch file
 * of a later batch.
 */
1627 {
1628  bool shouldFree;
1629  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1630  int bucketno;
1631  int batchno;
1632 
1633  ExecHashGetBucketAndBatch(hashtable, hashvalue,
1634  &bucketno, &batchno);
1635 
1636  /*
1637  * decide whether to put the tuple in the hash table or a temp file
1638  */
1639  if (batchno == hashtable->curbatch)
1640  {
1641  /*
1642  * put the tuple in hash table
1643  */
1644  HashJoinTuple hashTuple;
1645  int hashTupleSize;
1646  double ntuples = (hashtable->totalTuples - hashtable->skewTuples);
1647 
1648  /* Create the HashJoinTuple */
1649  hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1650  hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
1651 
1652  hashTuple->hashvalue = hashvalue;
1653  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1654 
1655  /*
1656  * We always reset the tuple-matched flag on insertion. This is okay
1657  * even when reloading a tuple from a batch file, since the tuple
1658  * could not possibly have been matched to an outer tuple before it
1659  * went into the batch file.
1660  */
1661  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1662 
1663  /* Push it onto the front of the bucket's list */
1664  hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1665  hashtable->buckets.unshared[bucketno] = hashTuple;
1666 
1667  /*
1668  * Increase the (optimal) number of buckets if we just exceeded the
1669  * NTUP_PER_BUCKET threshold, but only when there's still a single
1670  * batch.
1671  */
1672  if (hashtable->nbatch == 1 &&
1673  ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
1674  {
1675  /* Guard against integer overflow and alloc size overflow */
1676  if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
1677  hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
1678  {
1679  hashtable->nbuckets_optimal *= 2;
1680  hashtable->log2_nbuckets_optimal += 1;
1681  }
1682  }
1683 
1684  /* Account for space used, and back off if we've used too much */
1685  hashtable->spaceUsed += hashTupleSize;
1686  if (hashtable->spaceUsed > hashtable->spacePeak)
1687  hashtable->spacePeak = hashtable->spaceUsed;
1688  if (hashtable->spaceUsed +
1689  hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
1690  > hashtable->spaceAllowed)
1691  ExecHashIncreaseNumBatches(hashtable);
1692  }
1693  else
1694  {
1695  /*
1696  * put the tuple into a temp file for later batches
1697  */
1698  Assert(batchno > hashtable->curbatch);
1699  ExecHashJoinSaveTuple(tuple,
1700  hashvalue,
1701  &hashtable->innerBatchFile[batchno],
1702  hashtable);
1703  }
1704 
1705  if (shouldFree)
1706  heap_free_minimal_tuple(tuple);
1707 }
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1779
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:91
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1523
#define HeapTupleHeaderClearMatch(tup)
Definition: htup_details.h:524
static void * dense_alloc(HashJoinTable hashtable, Size size)
Definition: nodeHash.c:2869
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
Definition: nodeHash.c:916
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1932
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)
union HashJoinTupleData::@102 next
uint32 hashvalue
Definition: hashjoin.h:86
struct HashJoinTupleData * unshared
Definition: hashjoin.h:83

References Assert, HashJoinTableData::buckets, HashJoinTableData::curbatch, dense_alloc(), ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecHashIncreaseNumBatches(), ExecHashJoinSaveTuple(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::innerBatchFile, HashJoinTableData::log2_nbuckets_optimal, MaxAllocSize, HashJoinTableData::nbatch, HashJoinTableData::nbuckets_optimal, HashJoinTupleData::next, NTUP_PER_BUCKET, HashJoinTableData::skewTuples, HashJoinTableData::spaceAllowed, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, MinimalTupleData::t_len, HashJoinTableData::totalTuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch(), and MultiExecPrivateHash().

◆ ExecHashTableReset()

void ExecHashTableReset ( HashJoinTable  hashtable)

Definition at line 2299 of file nodeHash.c.

/*
 * ExecHashTableReset listing: resets batchCxt (freeing the previous pass's
 * buckets and tuples), allocates a fresh zeroed bucket array of nbuckets
 * entries inside batchCxt, zeroes spaceUsed, and clears the chunk list
 * pointer, whose memory went away with the context reset.
 */
2300 {
2301  MemoryContext oldcxt;
2302  int nbuckets = hashtable->nbuckets;
2303 
2304  /*
2305  * Release all the hash buckets and tuples acquired in the prior pass, and
2306  * reinitialize the context for a new pass.
2307  */
2308  MemoryContextReset(hashtable->batchCxt);
2309  oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
2310 
2311  /* Reallocate and reinitialize the hash bucket headers. */
2312  hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets);
2313 
2314  hashtable->spaceUsed = 0;
2315 
2316  MemoryContextSwitchTo(oldcxt);
2317 
2318  /* Forget the chunks (the memory was freed by the context reset above). */
2319  hashtable->chunks = NULL;
2320 }
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383

References HashJoinTableData::batchCxt, HashJoinTableData::buckets, HashJoinTableData::chunks, MemoryContextReset(), MemoryContextSwitchTo(), HashJoinTableData::nbuckets, palloc0_array, HashJoinTableData::spaceUsed, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch().

◆ ExecHashTableResetMatchFlags()

void ExecHashTableResetMatchFlags ( HashJoinTable  hashtable)

Definition at line 2327 of file nodeHash.c.

/*
 * ExecHashTableResetMatchFlags listing: clears the match flag on every
 * tuple chained from the main bucket array and from each in-use skew
 * bucket.
 */
2328 {
2329  HashJoinTuple tuple;
2330  int i;
2331 
2332  /* Reset all flags in the main table ... */
2333  for (i = 0; i < hashtable->nbuckets; i++)
2334  {
2335  for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
2336  tuple = tuple->next.unshared)
2337  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2338  }
2339 
2340  /* ... and the same for the skew buckets, if any */
2341  for (i = 0; i < hashtable->nSkewBuckets; i++)
2342  {
2343  int j = hashtable->skewBucketNums[i];
2344  HashSkewBucket *skewBucket = hashtable->skewBucket[j];
2345 
2346  for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
2347  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2348  }
2349 }
int j
Definition: isn.c:74
HashJoinTuple tuples
Definition: hashjoin.h:116

References HashJoinTableData::buckets, HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, i, j, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecReScanHashJoin().

◆ ExecInitHash()

HashState * ExecInitHash ( Hash * node,
EState * estate,
int  eflags 
)

Definition at line 360 of file nodeHash.c.

/*
 * ExecInitHash listing: builds the HashState for a Hash plan node. It sets
 * up the expression context, the child plan state, a minimal-tuple result
 * slot with no projection, and the compiled hash key expressions.
 */
361 {
362  HashState *hashstate;
363 
364  /* check for unsupported flags */
365  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
366 
367  /*
368  * create state structure
369  */
370  hashstate = makeNode(HashState);
371  hashstate->ps.plan = (Plan *) node;
372  hashstate->ps.state = estate;
373  hashstate->ps.ExecProcNode = ExecHash;
374  hashstate->hashtable = NULL;
375  hashstate->hashkeys = NIL; /* will be set by parent HashJoin */
376 
377  /*
378  * Miscellaneous initialization
379  *
380  * create expression context for node
381  */
382  ExecAssignExprContext(estate, &hashstate->ps);
383 
384  /*
385  * initialize child nodes
386  */
387  outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
388 
389  /*
390  * initialize our result slot and type. No need to build projection
391  * because this node doesn't do projections.
392  */
393  ExecInitResultTupleSlotTL(&hashstate->ps, &TTSOpsMinimalTuple);
394  hashstate->ps.ps_ProjInfo = NULL;
395 
396  /*
397  * initialize child expressions
398  */
399  Assert(node->plan.qual == NIL);
400  hashstate->hashkeys =
401  ExecInitExprList(node->hashkeys, (PlanState *) hashstate);
402 
403  return hashstate;
404 }
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:326
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:142
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1886
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:483
#define EXEC_FLAG_BACKWARD
Definition: executor.h:68
#define EXEC_FLAG_MARK
Definition: executor.h:69
static TupleTableSlot * ExecHash(PlanState *pstate)
Definition: nodeHash.c:91
#define makeNode(_type_)
Definition: nodes.h:155
#define NIL
Definition: pg_list.h:68
HashJoinTable hashtable
Definition: execnodes.h:2741
List * hashkeys
Definition: execnodes.h:2742
List * hashkeys
Definition: plannodes.h:1205
EState * state
Definition: execnodes.h:1119
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1157
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:1123
List * qual
Definition: plannodes.h:153

References Assert, EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecAssignExprContext(), ExecHash(), ExecInitExprList(), ExecInitNode(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, HashState::hashkeys, Hash::hashkeys, HashState::hashtable, makeNode, NIL, outerPlan, outerPlanState, PlanState::plan, Hash::plan, HashState::ps, PlanState::ps_ProjInfo, Plan::qual, PlanState::state, and TTSOpsMinimalTuple.

Referenced by ExecInitNode().

◆ ExecParallelHashTableAlloc()

void ExecParallelHashTableAlloc ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3262 of file nodeHash.c.

/*
 * ExecParallelHashTableAlloc listing: allocates batch batchno's shared
 * bucket array in the DSA area (parallel_state->nbuckets entries) and
 * initializes every bucket head to InvalidDsaPointer.
 */
3263 {
3264  ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
3265  dsa_pointer_atomic *buckets;
3266  int nbuckets = hashtable->parallel_state->nbuckets;
3267  int i;
3268 
3269  batch->buckets =
3270  dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets);
3271  buckets = (dsa_pointer_atomic *)
3272  dsa_get_address(hashtable->area, batch->buckets);
3273  for (i = 0; i < nbuckets; ++i)
3274  dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
3275 }
#define dsa_pointer_atomic_init
Definition: dsa.h:64
#define dsa_allocate(area, size)
Definition: dsa.h:109

References HashJoinTableData::area, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, dsa_allocate, dsa_get_address(), dsa_pointer_atomic_init, i, InvalidDsaPointer, ParallelHashJoinState::nbuckets, HashJoinTableData::parallel_state, and ParallelHashJoinBatchAccessor::shared.

Referenced by ExecHashTableCreate(), and ExecParallelHashJoinNewBatch().

◆ ExecParallelHashTableInsert()

void ExecParallelHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot * slot,
uint32  hashvalue 
)

Definition at line 1714 of file nodeHash.c.

/*
 * ExecParallelHashTableInsert listing: during the parallel build, loads
 * batch-0 tuples into the shared in-memory hash table and writes tuples of
 * other batches to that batch's shared tuplestore. It retries from the top
 * when ExecParallelHashTupleAlloc returns NULL or preallocation fails.
 */
1717 {
1718  bool shouldFree;
1719  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1720  dsa_pointer shared;
1721  int bucketno;
1722  int batchno;
1723 
1724 retry:
1725  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1726 
1727  if (batchno == 0)
1728  {
1729  HashJoinTuple hashTuple;
1730 
1731  /* Try to load it into memory. */
1732  Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
1733  PHJ_BUILD_HASH_INNER);
1734  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1735  HJTUPLE_OVERHEAD + tuple->t_len,
1736  &shared);
1737  if (hashTuple == NULL)
1738  goto retry;
1739 
1740  /* Store the hash value in the HashJoinTuple header. */
1741  hashTuple->hashvalue = hashvalue;
1742  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1743  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1744 
1745  /* Push it onto the front of the bucket's list */
1746  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1747  hashTuple, shared);
1748  }
1749  else
1750  {
1751  size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1752 
1753  Assert(batchno > 0);
1754 
1755  /* Try to preallocate space in the batch if necessary. */
1756  if (hashtable->batches[batchno].preallocated < tuple_size)
1757  {
1758  if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
1759  goto retry;
1760  }
1761 
1762  Assert(hashtable->batches[batchno].preallocated >= tuple_size);
1763  hashtable->batches[batchno].preallocated -= tuple_size;
1764  sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
1765  tuple);
1766  }
1767  ++hashtable->batches[batchno].ntuples;
1768 
1769  if (shouldFree)
1770  heap_free_minimal_tuple(tuple);
1771 }
#define PHJ_BUILD_HASH_INNER
Definition: hashjoin.h:271
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
Definition: nodeHash.c:3534
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
Definition: nodeHash.c:2949
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
Definition: nodeHash.c:3454
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
dsa_pointer_atomic * shared
Definition: hashjoin.h:313

References Assert, BarrierPhase(), HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), ExecParallelHashTuplePrealloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, ParallelHashJoinBatchAccessor::inner_tuples, MAXALIGN, ParallelHashJoinBatchAccessor::ntuples, HashJoinTableData::parallel_state, PHJ_BUILD_HASH_INNER, ParallelHashJoinBatchAccessor::preallocated, HashJoinTableData::shared, sts_puttuple(), and MinimalTupleData::t_len.

Referenced by MultiExecParallelHash().

◆ ExecParallelHashTableInsertCurrentBatch()

void ExecParallelHashTableInsertCurrentBatch ( HashJoinTable  hashtable,
TupleTableSlot * slot,
uint32  hashvalue 
)

Definition at line 1780 of file nodeHash.c.

/*
 * ExecParallelHashTableInsertCurrentBatch listing: copies a tuple into the
 * shared hash table of the current batch (its batch number is asserted to
 * equal curbatch), clearing its match flag, and pushes it onto its bucket.
 */
1783 {
1784  bool shouldFree;
1785  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1786  HashJoinTuple hashTuple;
1787  dsa_pointer shared;
1788  int batchno;
1789  int bucketno;
1790 
1791  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1792  Assert(batchno == hashtable->curbatch);
1793  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1794  HJTUPLE_OVERHEAD + tuple->t_len,
1795  &shared);
1796  hashTuple->hashvalue = hashvalue;
1797  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1798  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1799  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1800  hashTuple, shared);
1801 
1802  if (shouldFree)
1803  heap_free_minimal_tuple(tuple);
1804 }

References Assert, HashJoinTableData::buckets, HashJoinTableData::curbatch, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::shared, and MinimalTupleData::t_len.

Referenced by ExecParallelHashJoinNewBatch().

◆ ExecParallelHashTableSetCurrentBatch()

void ExecParallelHashTableSetCurrentBatch ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3472 of file nodeHash.c.

/*
 * ExecParallelHashTableSetCurrentBatch listing: makes batchno the current
 * batch. It points buckets at that batch's shared bucket array, refreshes
 * nbuckets/log2_nbuckets from the shared state, and resets the
 * chunk-allocation state for the batch.
 */
3473 {
3474  Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer);
3475 
3476  hashtable->curbatch = batchno;
3477  hashtable->buckets.shared = (dsa_pointer_atomic *)
3478  dsa_get_address(hashtable->area,
3479  hashtable->batches[batchno].shared->buckets);
3480  hashtable->nbuckets = hashtable->parallel_state->nbuckets;
3481  hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
3482  hashtable->current_chunk = NULL;
3483  hashtable->current_chunk_shared = InvalidDsaPointer;
3484  hashtable->batches[batchno].at_least_one_chunk = false;
3485 }
dsa_pointer current_chunk_shared
Definition: hashjoin.h:374

References HashJoinTableData::area, Assert, ParallelHashJoinBatchAccessor::at_least_one_chunk, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, HashJoinTableData::buckets, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, HashJoinTableData::current_chunk_shared, dsa_get_address(), InvalidDsaPointer, HashJoinTableData::log2_nbuckets, my_log2(), ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::parallel_state, ParallelHashJoinBatchAccessor::shared, and HashJoinTableData::shared.

Referenced by ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinNewBatch(), and MultiExecParallelHash().

◆ ExecParallelPrepHashTableForUnmatched()

bool ExecParallelPrepHashTableForUnmatched ( HashJoinState * hjstate)

Definition at line 2097 of file nodeHash.c.

/*
 * ExecParallelPrepHashTableForUnmatched listing: holds a wait-free
 * election on the current batch's barrier. Non-elected backends mark the
 * batch done, end its scans, record peak space, and return false. The
 * elected backend returns false if skip_unmatched was set. Otherwise it
 * prepares local state for the unmatched scan and returns true.
 */
2098 {
2099  HashJoinTable hashtable = hjstate->hj_HashTable;
2100  int curbatch = hashtable->curbatch;
2101  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
2102 
2103  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
2104 
2105  /*
2106  * It would not be deadlock-free to wait on the batch barrier, because it
2107  * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
2108  * already emitted tuples. Therefore, we'll hold a wait-free election:
2109  * only one process can continue to the next phase, and all others detach
2110  * from this batch. They can still do any work on other batches, if there
2111  * are any.
2112  */
2113  if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
2114  {
2115  /* This process considers the batch to be done. */
2116  hashtable->batches[hashtable->curbatch].done = true;
2117 
2118  /* Make sure any temporary files are closed. */
2119  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
2120  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
2121 
2122  /*
2123  * Track largest batch we've seen, which would normally happen in
2124  * ExecHashTableDetachBatch().
2125  */
2126  hashtable->spacePeak =
2127  Max(hashtable->spacePeak,
2128  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
2129  hashtable->curbatch = -1;
2130  return false;
2131  }
2132 
2133  /* Now we are alone with this batch. */
2134  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
2135 
2136  /*
2137  * Has another process decided to give up early and command all processes
2138  * to skip the unmatched scan?
2139  */
2140  if (batch->skip_unmatched)
2141  {
2142  hashtable->batches[hashtable->curbatch].done = true;
2143  ExecHashTableDetachBatch(hashtable);
2144  return false;
2145  }
2146 
2147  /* Now prepare the process local state, just as for non-parallel join. */
2148  ExecPrepHashTableForUnmatched(hjstate);
2149 
2150  return true;
2151 }
void ExecHashTableDetachBatch(HashJoinTable hashtable)
Definition: nodeHash.c:3282
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
Definition: nodeHash.c:2076
HashJoinTable hj_HashTable
Definition: execnodes.h:2190

References Assert, BarrierArriveAndDetachExceptLast(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, HashJoinTableData::curbatch, ParallelHashJoinBatchAccessor::done, ExecHashTableDetachBatch(), ExecPrepHashTableForUnmatched(), HashJoinState::hj_HashTable, ParallelHashJoinBatchAccessor::inner_tuples, Max, HashJoinTableData::nbuckets, ParallelHashJoinBatchAccessor::outer_tuples, PHJ_BATCH_PROBE, PHJ_BATCH_SCAN, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinBatch::skip_unmatched, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinImpl().

◆ ExecParallelScanHashBucket()

bool ExecParallelScanHashBucket ( HashJoinState *hjstate,
ExprContext *econtext 
)

Definition at line 2025 of file nodeHash.c.

2027 {
2028  ExprState *hjclauses = hjstate->hashclauses;
2029  HashJoinTable hashtable = hjstate->hj_HashTable;
2030  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2031  uint32 hashvalue = hjstate->hj_CurHashValue;
2032 
2033  /*
2034  * hj_CurTuple is the address of the tuple last returned from the current
2035  * bucket, or NULL if it's time to start scanning a new bucket.
2036  */
2037  if (hashTuple != NULL)
2038  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2039  else
2040  hashTuple = ExecParallelHashFirstTuple(hashtable,
2041  hjstate->hj_CurBucketNo);
2042 
2043  while (hashTuple != NULL)
2044  {
2045  if (hashTuple->hashvalue == hashvalue)
2046  {
2047  TupleTableSlot *inntuple;
2048 
2049  /* insert hashtable's tuple into exec slot so ExecQual sees it */
2050  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2051  hjstate->hj_HashTupleSlot,
2052  false); /* do not pfree */
2053  econtext->ecxt_innertuple = inntuple;
2054 
2055  if (ExecQualAndReset(hjclauses, econtext))
2056  {
2057  hjstate->hj_CurTuple = hashTuple;
2058  return true;
2059  }
2060  }
2061 
2062  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2063  }
2064 
2065  /*
2066  * no match
2067  */
2068  return false;
2069 }
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1533
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:440
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
Definition: nodeHash.c:3424
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
Definition: nodeHash.c:3440
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:257
HashJoinTuple hj_CurTuple
Definition: execnodes.h:2194
ExprState * hashclauses
Definition: execnodes.h:2186
uint32 hj_CurHashValue
Definition: execnodes.h:2191
int hj_CurBucketNo
Definition: execnodes.h:2192
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:2196

References ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, and HJTUPLE_MINTUPLE.

Referenced by ExecHashJoinImpl().

◆ ExecParallelScanHashTableForUnmatched()

bool ExecParallelScanHashTableForUnmatched ( HashJoinState *hjstate,
ExprContext *econtext 
)

Definition at line 2236 of file nodeHash.c.

2238 {
2239  HashJoinTable hashtable = hjstate->hj_HashTable;
2240  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2241 
2242  for (;;)
2243  {
2244  /*
2245  * hj_CurTuple is the address of the tuple last returned from the
2246  * current bucket, or NULL if it's time to start scanning a new
2247  * bucket.
2248  */
2249  if (hashTuple != NULL)
2250  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2251  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2252  hashTuple = ExecParallelHashFirstTuple(hashtable,
2253  hjstate->hj_CurBucketNo++);
2254  else
2255  break; /* finished all buckets */
2256 
2257  while (hashTuple != NULL)
2258  {
2259  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2260  {
2261  TupleTableSlot *inntuple;
2262 
2263  /* insert hashtable's tuple into exec slot */
2264  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2265  hjstate->hj_HashTupleSlot,
2266  false); /* do not pfree */
2267  econtext->ecxt_innertuple = inntuple;
2268 
2269  /*
2270  * Reset temp memory each time; although this function doesn't
2271  * do any qual eval, the caller will, so let's keep it
2272  * parallel to ExecScanHashBucket.
2273  */
2274  ResetExprContext(econtext);
2275 
2276  hjstate->hj_CurTuple = hashTuple;
2277  return true;
2278  }
2279 
2280  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2281  }
2282 
2283  /* allow this loop to be cancellable */
2284  CHECK_FOR_INTERRUPTS();
2285  }
2286 
2287  /*
2288  * no more unmatched tuples
2289  */
2290  return false;
2291 }
#define HeapTupleHeaderHasMatch(tup)
Definition: htup_details.h:514
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122

References CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, HashJoinTableData::nbuckets, and ResetExprContext.

Referenced by ExecHashJoinImpl().

◆ ExecPrepHashTableForUnmatched()

void ExecPrepHashTableForUnmatched ( HashJoinState *hjstate)

Definition at line 2076 of file nodeHash.c.

2077 {
2078  /*----------
2079  * During this scan we use the HashJoinState fields as follows:
2080  *
2081  * hj_CurBucketNo: next regular bucket to scan
2082  * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
2083  * hj_CurTuple: last tuple returned, or NULL to start next bucket
2084  *----------
2085  */
2086  hjstate->hj_CurBucketNo = 0;
2087  hjstate->hj_CurSkewBucketNo = 0;
2088  hjstate->hj_CurTuple = NULL;
2089 }
int hj_CurSkewBucketNo
Definition: execnodes.h:2193

References HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, and HashJoinState::hj_CurTuple.

Referenced by ExecHashJoinImpl(), and ExecParallelPrepHashTableForUnmatched().

◆ ExecReScanHash()

void ExecReScanHash ( HashState *node)

Definition at line 2353 of file nodeHash.c.

2354 {
2355  PlanState *outerPlan = outerPlanState(node);
2356 
2357  /*
2358  * if chgParam of subnode is not null then plan will be re-scanned by
2359  * first ExecProcNode.
2360  */
2361  if (outerPlan->chgParam == NULL)
2362  ExecReScan(outerPlan);
2363 }
void ExecReScan(PlanState *node)
Definition: execAmi.c:76

References ExecReScan(), outerPlan, and outerPlanState.

Referenced by ExecReScan().

◆ ExecScanHashBucket()

bool ExecScanHashBucket ( HashJoinState *hjstate,
ExprContext *econtext 
)

Definition at line 1964 of file nodeHash.c.

1966 {
1967  ExprState *hjclauses = hjstate->hashclauses;
1968  HashJoinTable hashtable = hjstate->hj_HashTable;
1969  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1970  uint32 hashvalue = hjstate->hj_CurHashValue;
1971 
1972  /*
1973  * hj_CurTuple is the address of the tuple last returned from the current
1974  * bucket, or NULL if it's time to start scanning a new bucket.
1975  *
1976  * If the tuple hashed to a skew bucket then scan the skew bucket
1977  * otherwise scan the standard hashtable bucket.
1978  */
1979  if (hashTuple != NULL)
1980  hashTuple = hashTuple->next.unshared;
1981  else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
1982  hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
1983  else
1984  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
1985 
1986  while (hashTuple != NULL)
1987  {
1988  if (hashTuple->hashvalue == hashvalue)
1989  {
1990  TupleTableSlot *inntuple;
1991 
1992  /* insert hashtable's tuple into exec slot so ExecQual sees it */
1993  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
1994  hjstate->hj_HashTupleSlot,
1995  false); /* do not pfree */
1996  econtext->ecxt_innertuple = inntuple;
1997 
1998  if (ExecQualAndReset(hjclauses, econtext))
1999  {
2000  hjstate->hj_CurTuple = hashTuple;
2001  return true;
2002  }
2003  }
2004 
2005  hashTuple = hashTuple->next.unshared;
2006  }
2007 
2008  /*
2009  * no match
2010  */
2011  return false;
2012 }

References HashJoinTableData::buckets, ExprContext::ecxt_innertuple, ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, INVALID_SKEW_BUCKET_NO, HashJoinTupleData::next, HashJoinTableData::skewBucket, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

◆ ExecScanHashTableForUnmatched()

bool ExecScanHashTableForUnmatched ( HashJoinState *hjstate,
ExprContext *econtext 
)

Definition at line 2162 of file nodeHash.c.

2163 {
2164  HashJoinTable hashtable = hjstate->hj_HashTable;
2165  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2166 
2167  for (;;)
2168  {
2169  /*
2170  * hj_CurTuple is the address of the tuple last returned from the
2171  * current bucket, or NULL if it's time to start scanning a new
2172  * bucket.
2173  */
2174  if (hashTuple != NULL)
2175  hashTuple = hashTuple->next.unshared;
2176  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2177  {
2178  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
2179  hjstate->hj_CurBucketNo++;
2180  }
2181  else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
2182  {
2183  int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
2184 
2185  hashTuple = hashtable->skewBucket[j]->tuples;
2186  hjstate->hj_CurSkewBucketNo++;
2187  }
2188  else
2189  break; /* finished all buckets */
2190 
2191  while (hashTuple != NULL)
2192  {
2193  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2194  {
2195  TupleTableSlot *inntuple;
2196 
2197  /* insert hashtable's tuple into exec slot */
2198  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2199  hjstate->hj_HashTupleSlot,
2200  false); /* do not pfree */
2201  econtext->ecxt_innertuple = inntuple;
2202 
2203  /*
2204  * Reset temp memory each time; although this function doesn't
2205  * do any qual eval, the caller will, so let's keep it
2206  * parallel to ExecScanHashBucket.
2207  */
2208  ResetExprContext(econtext);
2209 
2210  hjstate->hj_CurTuple = hashTuple;
2211  return true;
2212  }
2213 
2214  hashTuple = hashTuple->next.unshared;
2215  }
2216 
2217  /* allow this loop to be cancellable */
2218  CHECK_FOR_INTERRUPTS();
2219  }
2220 
2221  /*
2222  * no more unmatched tuples
2223  */
2224  return false;
2225 }

References HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, j, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, ResetExprContext, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().

◆ ExecShutdownHash()

void ExecShutdownHash ( HashState *node)

Definition at line 2804 of file nodeHash.c.

2805 {
2806  /* Allocate save space if EXPLAIN'ing and we didn't do so already */
2807  if (node->ps.instrument && !node->hinstrument)
2808  node->hinstrument = palloc0_object(HashInstrumentation);
2809  /* Now accumulate data for the current (final) hash table */
2810  if (node->hinstrument && node->hashtable)
2811  ExecHashAccumInstrumentation(node->hinstrument, node->hashtable);
2812 }
#define palloc0_object(type)
Definition: fe_memutils.h:63
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
Definition: nodeHash.c:2850

References ExecHashAccumInstrumentation(), HashState::hashtable, HashState::hinstrument, PlanState::instrument, palloc0_object, and HashState::ps.

Referenced by ExecShutdownNode_walker().

◆ MultiExecHash()

Node *MultiExecHash ( HashState *node)

Definition at line 105 of file nodeHash.c.

106 {
107  /* must provide our own instrumentation support */
108  if (node->ps.instrument)
109  InstrStartNode(node->ps.instrument);
110 
111  if (node->parallel_state != NULL)
112  MultiExecParallelHash(node);
113  else
114  MultiExecPrivateHash(node);
115 
116  /* must provide our own instrumentation support */
117  if (node->ps.instrument)
118  InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);
119 
120  /*
121  * We do not return the hash table directly because it's not a subtype of
122  * Node, and so would violate the MultiExecProcNode API. Instead, our
123  * parent Hashjoin node is expected to know how to fish it out of our node
124  * state. Ugly but not really worth cleaning up, since Hashjoin knows
125  * quite a bit more about Hash besides that.
126  */
127  return NULL;
128 }
void InstrStartNode(Instrumentation *instr)
Definition: instrument.c:68
void InstrStopNode(Instrumentation *instr, double nTuples)
Definition: instrument.c:84
static void MultiExecParallelHash(HashState *node)
Definition: nodeHash.c:214
static void MultiExecPrivateHash(HashState *node)
Definition: nodeHash.c:138
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2760

References HashState::hashtable, InstrStartNode(), InstrStopNode(), PlanState::instrument, MultiExecParallelHash(), MultiExecPrivateHash(), HashState::parallel_state, HashJoinTableData::partialTuples, and HashState::ps.

Referenced by MultiExecProcNode().