nodeHash.c File Reference
#include "postgres.h"
#include <math.h>
#include <limits.h>
#include "access/htup_details.h"
#include "access/parallel.h"
#include "catalog/pg_statistic.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "utils/dynahash.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/wait_event.h"

Go to the source code of this file.

Macros

#define NTUP_PER_BUCKET   1
 

Functions

static void ExecHashIncreaseNumBatches (HashJoinTable hashtable)
 
static void ExecHashIncreaseNumBuckets (HashJoinTable hashtable)
 
static void ExecParallelHashIncreaseNumBatches (HashJoinTable hashtable)
 
static void ExecParallelHashIncreaseNumBuckets (HashJoinTable hashtable)
 
static void ExecHashBuildSkewHash (HashState *hashstate, HashJoinTable hashtable, Hash *node, int mcvsToUse)
 
static void ExecHashSkewTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
 
static void ExecHashRemoveNextSkewBucket (HashJoinTable hashtable)
 
static void * dense_alloc (HashJoinTable hashtable, Size size)
 
static HashJoinTuple ExecParallelHashTupleAlloc (HashJoinTable hashtable, size_t size, dsa_pointer *shared)
 
static void MultiExecPrivateHash (HashState *node)
 
static void MultiExecParallelHash (HashState *node)
 
static HashJoinTuple ExecParallelHashFirstTuple (HashJoinTable hashtable, int bucketno)
 
static HashJoinTuple ExecParallelHashNextTuple (HashJoinTable hashtable, HashJoinTuple tuple)
 
static void ExecParallelHashPushTuple (dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
 
static void ExecParallelHashJoinSetUpBatches (HashJoinTable hashtable, int nbatch)
 
static void ExecParallelHashEnsureBatchAccessors (HashJoinTable hashtable)
 
static void ExecParallelHashRepartitionFirst (HashJoinTable hashtable)
 
static void ExecParallelHashRepartitionRest (HashJoinTable hashtable)
 
static HashMemoryChunk ExecParallelHashPopChunkQueue (HashJoinTable hashtable, dsa_pointer *shared)
 
static bool ExecParallelHashTuplePrealloc (HashJoinTable hashtable, int batchno, size_t size)
 
static void ExecParallelHashMergeCounters (HashJoinTable hashtable)
 
static void ExecParallelHashCloseBatchAccessors (HashJoinTable hashtable)
 
static TupleTableSlot * ExecHash (PlanState *pstate)
 
Node * MultiExecHash (HashState *node)
 
HashState * ExecInitHash (Hash *node, EState *estate, int eflags)
 
void ExecEndHash (HashState *node)
 
HashJoinTable ExecHashTableCreate (HashState *state)
 
void ExecChooseHashTableSize (double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
 
void ExecHashTableDestroy (HashJoinTable hashtable)
 
void ExecHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsertCurrentBatch (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecHashGetBucketAndBatch (HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
 
bool ExecScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecParallelPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecHashTableReset (HashJoinTable hashtable)
 
void ExecHashTableResetMatchFlags (HashJoinTable hashtable)
 
void ExecReScanHash (HashState *node)
 
int ExecHashGetSkewBucket (HashJoinTable hashtable, uint32 hashvalue)
 
void ExecHashEstimate (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeDSM (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeWorker (HashState *node, ParallelWorkerContext *pwcxt)
 
void ExecShutdownHash (HashState *node)
 
void ExecHashRetrieveInstrumentation (HashState *node)
 
void ExecHashAccumInstrumentation (HashInstrumentation *instrument, HashJoinTable hashtable)
 
void ExecParallelHashTableAlloc (HashJoinTable hashtable, int batchno)
 
void ExecHashTableDetachBatch (HashJoinTable hashtable)
 
void ExecHashTableDetach (HashJoinTable hashtable)
 
void ExecParallelHashTableSetCurrentBatch (HashJoinTable hashtable, int batchno)
 
size_t get_hash_memory_limit (void)
 

Macro Definition Documentation

◆ NTUP_PER_BUCKET

#define NTUP_PER_BUCKET   1

Definition at line 655 of file nodeHash.c.

Function Documentation

◆ dense_alloc()

static void * dense_alloc ( HashJoinTable  hashtable,
Size  size 
)
static

Definition at line 2761 of file nodeHash.c.

2762 {
2763  HashMemoryChunk newChunk;
2764  char *ptr;
2765 
2766  /* just in case the size is not already aligned properly */
2767  size = MAXALIGN(size);
2768 
2769  /*
2770  * If tuple size is larger than threshold, allocate a separate chunk.
2771  */
2772  if (size > HASH_CHUNK_THRESHOLD)
2773  {
2774  /* allocate new chunk and put it at the beginning of the list */
2775  newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
2776  HASH_CHUNK_HEADER_SIZE + size);
2777  newChunk->maxlen = size;
2778  newChunk->used = size;
2779  newChunk->ntuples = 1;
2780 
2781  /*
2782  * Add this chunk to the list after the first existing chunk, so that
2783  * we don't lose the remaining space in the "current" chunk.
2784  */
2785  if (hashtable->chunks != NULL)
2786  {
2787  newChunk->next = hashtable->chunks->next;
2788  hashtable->chunks->next.unshared = newChunk;
2789  }
2790  else
2791  {
2792  newChunk->next.unshared = hashtable->chunks;
2793  hashtable->chunks = newChunk;
2794  }
2795 
2796  return HASH_CHUNK_DATA(newChunk);
2797  }
2798 
2799  /*
2800  * See if we have enough space for it in the current chunk (if any). If
2801  * not, allocate a fresh chunk.
2802  */
2803  if ((hashtable->chunks == NULL) ||
2804  (hashtable->chunks->maxlen - hashtable->chunks->used) < size)
2805  {
2806  /* allocate new chunk and put it at the beginning of the list */
2807  newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
2808  HASH_CHUNK_HEADER_SIZE + HASH_CHUNK_SIZE);
2809 
2810  newChunk->maxlen = HASH_CHUNK_SIZE;
2811  newChunk->used = size;
2812  newChunk->ntuples = 1;
2813 
2814  newChunk->next.unshared = hashtable->chunks;
2815  hashtable->chunks = newChunk;
2816 
2817  return HASH_CHUNK_DATA(newChunk);
2818  }
2819 
2820  /* There is enough space in the current chunk, let's add the tuple */
2821  ptr = HASH_CHUNK_DATA(hashtable->chunks) + hashtable->chunks->used;
2822  hashtable->chunks->used += size;
2823  hashtable->chunks->ntuples += 1;
2824 
2825  /* return pointer to the start of the tuple memory */
2826  return ptr;
2827 }
#define MAXALIGN(LEN)
Definition: c.h:814
struct HashMemoryChunkData * HashMemoryChunk
Definition: hashjoin.h:148
#define HASH_CHUNK_DATA(hc)
Definition: hashjoin.h:152
#define HASH_CHUNK_THRESHOLD
Definition: hashjoin.h:154
#define HASH_CHUNK_HEADER_SIZE
Definition: hashjoin.h:151
#define HASH_CHUNK_SIZE
Definition: hashjoin.h:150
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
static pg_noinline void Size size
Definition: slab.c:607
HashMemoryChunk chunks
Definition: hashjoin.h:355
MemoryContext batchCxt
Definition: hashjoin.h:351
union HashMemoryChunkData::@106 next
struct HashMemoryChunkData * unshared
Definition: hashjoin.h:137

References HashJoinTableData::batchCxt, HashJoinTableData::chunks, HASH_CHUNK_DATA, HASH_CHUNK_HEADER_SIZE, HASH_CHUNK_SIZE, HASH_CHUNK_THRESHOLD, MAXALIGN, HashMemoryChunkData::maxlen, MemoryContextAlloc(), HashMemoryChunkData::next, HashMemoryChunkData::ntuples, size, HashMemoryChunkData::unshared, and HashMemoryChunkData::used.

Referenced by ExecHashIncreaseNumBatches(), ExecHashRemoveNextSkewBucket(), and ExecHashTableInsert().
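
dense_alloc() is a bump allocator over a linked list of chunks. The following is a minimal standalone sketch of the same scheme, for illustration only; Chunk, CHUNK_SIZE, CHUNK_THRESHOLD and ALIGN8 are simplified stand-ins for HashMemoryChunkData and the HASH_CHUNK_* / MAXALIGN macros, not the real definitions.

#include <stddef.h>
#include <stdlib.h>

#define CHUNK_SIZE      (32 * 1024)
#define CHUNK_THRESHOLD (CHUNK_SIZE / 4)
#define ALIGN8(n)       (((n) + 7) & ~(size_t) 7)

typedef struct Chunk
{
    struct Chunk *next;
    size_t      maxlen;
    size_t      used;
    char        data[];         /* tuples are packed densely in here */
} Chunk;

static void *
chunk_alloc(Chunk **head, size_t size)
{
    Chunk      *c;

    size = ALIGN8(size);

    /* Oversized request: give it a dedicated chunk, linked *behind* the
     * head so the free space remaining in the current chunk is not lost. */
    if (size > CHUNK_THRESHOLD)
    {
        c = malloc(offsetof(Chunk, data) + size);
        c->maxlen = c->used = size;
        if (*head != NULL)
        {
            c->next = (*head)->next;
            (*head)->next = c;
        }
        else
        {
            c->next = NULL;
            *head = c;
        }
        return c->data;
    }

    /* No current chunk, or not enough room left: start a fresh one. */
    if (*head == NULL || (*head)->maxlen - (*head)->used < size)
    {
        c = malloc(offsetof(Chunk, data) + CHUNK_SIZE);
        c->maxlen = CHUNK_SIZE;
        c->used = 0;
        c->next = *head;
        *head = c;
    }

    /* Bump-allocate from the current chunk. */
    c = *head;
    c->used += size;
    return c->data + c->used - size;
}

Compared with a per-tuple palloc, this wastes almost no per-allocation overhead, and it lets ExecHashIncreaseNumBatches() and ExecHashIncreaseNumBuckets() visit every stored tuple by simply scanning the chunk list.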

◆ ExecChooseHashTableSize()

void ExecChooseHashTableSize ( double  ntuples,
int  tupwidth,
bool  useskew,
bool  try_combined_hash_mem,
int  parallel_workers,
size_t *  space_allowed,
int *  numbuckets,
int *  numbatches,
int *  num_skew_mcvs 
)

Definition at line 658 of file nodeHash.c.

665 {
666  int tupsize;
667  double inner_rel_bytes;
668  size_t hash_table_bytes;
669  size_t bucket_bytes;
670  size_t max_pointers;
671  int nbatch = 1;
672  int nbuckets;
673  double dbuckets;
674 
675  /* Force a plausible relation size if no info */
676  if (ntuples <= 0.0)
677  ntuples = 1000.0;
678 
679  /*
680  * Estimate tupsize based on footprint of tuple in hashtable... note this
681  * does not allow for any palloc overhead. The manipulations of spaceUsed
682  * don't count palloc overhead either.
683  */
684  tupsize = HJTUPLE_OVERHEAD +
685  MAXALIGN(SizeofMinimalTupleHeader) +
686  MAXALIGN(tupwidth);
687  inner_rel_bytes = ntuples * tupsize;
688 
689  /*
690  * Compute in-memory hashtable size limit from GUCs.
691  */
692  hash_table_bytes = get_hash_memory_limit();
693 
694  /*
695  * Parallel Hash tries to use the combined hash_mem of all workers to
696  * avoid the need to batch. If that won't work, it falls back to hash_mem
697  * per worker and tries to process batches in parallel.
698  */
699  if (try_combined_hash_mem)
700  {
701  /* Careful, this could overflow size_t */
702  double newlimit;
703 
704  newlimit = (double) hash_table_bytes * (double) (parallel_workers + 1);
705  newlimit = Min(newlimit, (double) SIZE_MAX);
706  hash_table_bytes = (size_t) newlimit;
707  }
708 
709  *space_allowed = hash_table_bytes;
710 
711  /*
712  * If skew optimization is possible, estimate the number of skew buckets
713  * that will fit in the memory allowed, and decrement the assumed space
714  * available for the main hash table accordingly.
715  *
716  * We make the optimistic assumption that each skew bucket will contain
717  * one inner-relation tuple. If that turns out to be low, we will recover
718  * at runtime by reducing the number of skew buckets.
719  *
720  * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
721  * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
722  * will round up to the next power of 2 and then multiply by 4 to reduce
723  * collisions.
724  */
725  if (useskew)
726  {
727  size_t bytes_per_mcv;
728  size_t skew_mcvs;
729 
730  /*----------
731  * Compute number of MCVs we could hold in hash_table_bytes
732  *
733  * Divisor is:
734  * size of a hash tuple +
735  * worst-case size of skewBucket[] per MCV +
736  * size of skewBucketNums[] entry +
737  * size of skew bucket struct itself
738  *----------
739  */
740  bytes_per_mcv = tupsize +
741  (8 * sizeof(HashSkewBucket *)) +
742  sizeof(int) +
743  SKEW_BUCKET_OVERHEAD;
744  skew_mcvs = hash_table_bytes / bytes_per_mcv;
745 
746  /*
747  * Now scale by SKEW_HASH_MEM_PERCENT (we do it in this order so as
748  * not to worry about size_t overflow in the multiplication)
749  */
750  skew_mcvs = (skew_mcvs * SKEW_HASH_MEM_PERCENT) / 100;
751 
752  /* Now clamp to integer range */
753  skew_mcvs = Min(skew_mcvs, INT_MAX);
754 
755  *num_skew_mcvs = (int) skew_mcvs;
756 
757  /* Reduce hash_table_bytes by the amount needed for the skew table */
758  if (skew_mcvs > 0)
759  hash_table_bytes -= skew_mcvs * bytes_per_mcv;
760  }
761  else
762  *num_skew_mcvs = 0;
763 
764  /*
765  * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
766  * memory is filled, assuming a single batch; but limit the value so that
767  * the pointer arrays we'll try to allocate do not exceed hash_table_bytes
768  * nor MaxAllocSize.
769  *
770  * Note that both nbuckets and nbatch must be powers of 2 to make
771  * ExecHashGetBucketAndBatch fast.
772  */
773  max_pointers = hash_table_bytes / sizeof(HashJoinTuple);
774  max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
775  /* If max_pointers isn't a power of 2, must round it down to one */
776  max_pointers = pg_prevpower2_size_t(max_pointers);
777 
778  /* Also ensure we avoid integer overflow in nbatch and nbuckets */
779  /* (this step is redundant given the current value of MaxAllocSize) */
780  max_pointers = Min(max_pointers, INT_MAX / 2 + 1);
781 
782  dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
783  dbuckets = Min(dbuckets, max_pointers);
784  nbuckets = (int) dbuckets;
785  /* don't let nbuckets be really small, though ... */
786  nbuckets = Max(nbuckets, 1024);
787  /* ... and force it to be a power of 2. */
788  nbuckets = pg_nextpower2_32(nbuckets);
789 
790  /*
791  * If there's not enough space to store the projected number of tuples and
792  * the required bucket headers, we will need multiple batches.
793  */
794  bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
795  if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
796  {
797  /* We'll need multiple batches */
798  size_t sbuckets;
799  double dbatch;
800  int minbatch;
801  size_t bucket_size;
802 
803  /*
804  * If Parallel Hash with combined hash_mem would still need multiple
805  * batches, we'll have to fall back to regular hash_mem budget.
806  */
807  if (try_combined_hash_mem)
808  {
809  ExecChooseHashTableSize(ntuples, tupwidth, useskew,
810  false, parallel_workers,
811  space_allowed,
812  numbuckets,
813  numbatches,
814  num_skew_mcvs);
815  return;
816  }
817 
818  /*
819  * Estimate the number of buckets we'll want to have when hash_mem is
820  * entirely full. Each bucket will contain a bucket pointer plus
821  * NTUP_PER_BUCKET tuples, whose projected size already includes
822  * overhead for the hash code, pointer to the next tuple, etc.
823  */
824  bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
825  if (hash_table_bytes <= bucket_size)
826  sbuckets = 1; /* avoid pg_nextpower2_size_t(0) */
827  else
828  sbuckets = pg_nextpower2_size_t(hash_table_bytes / bucket_size);
829  sbuckets = Min(sbuckets, max_pointers);
830  nbuckets = (int) sbuckets;
831  nbuckets = pg_nextpower2_32(nbuckets);
832  bucket_bytes = nbuckets * sizeof(HashJoinTuple);
833 
834  /*
835  * Buckets are simple pointers to hashjoin tuples, while tupsize
836  * includes the pointer, hash code, and MinimalTupleData. So buckets
837  * should never really exceed 25% of hash_mem (even for
838  * NTUP_PER_BUCKET=1); except maybe for hash_mem values that are not
839  * 2^N bytes, where we might get more because of doubling. So let's
840  * look for 50% here.
841  */
842  Assert(bucket_bytes <= hash_table_bytes / 2);
843 
844  /* Calculate required number of batches. */
845  dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
846  dbatch = Min(dbatch, max_pointers);
847  minbatch = (int) dbatch;
848  nbatch = pg_nextpower2_32(Max(2, minbatch));
849  }
850 
851  Assert(nbuckets > 0);
852  Assert(nbatch > 0);
853 
854  *numbuckets = nbuckets;
855  *numbatches = nbatch;
856 }
#define Min(x, y)
Definition: c.h:1007
#define Max(x, y)
Definition: c.h:1001
#define Assert(condition)
Definition: c.h:861
struct HashJoinTupleData * HashJoinTuple
Definition: execnodes.h:2220
#define MaxAllocSize
Definition: fe_memutils.h:22
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:90
#define SKEW_BUCKET_OVERHEAD
Definition: hashjoin.h:119
#define SKEW_HASH_MEM_PERCENT
Definition: hashjoin.h:121
#define SizeofMinimalTupleHeader
Definition: htup_details.h:647
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
Definition: nodeHash.c:658
#define NTUP_PER_BUCKET
Definition: nodeHash.c:655
size_t get_hash_memory_limit(void)
Definition: nodeHash.c:3487
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:189
#define pg_nextpower2_size_t
Definition: pg_bitutils.h:417
#define pg_prevpower2_size_t
Definition: pg_bitutils.h:418

References Assert, get_hash_memory_limit(), HJTUPLE_OVERHEAD, Max, MAXALIGN, MaxAllocSize, Min, NTUP_PER_BUCKET, pg_nextpower2_32(), pg_nextpower2_size_t, pg_prevpower2_size_t, SizeofMinimalTupleHeader, SKEW_BUCKET_OVERHEAD, and SKEW_HASH_MEM_PERCENT.

Referenced by ExecHashTableCreate(), and initial_cost_hashjoin().
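
As a toy illustration of the single-batch sizing rule above, the sketch below recomputes nbuckets for invented inputs; tupsize and hash_mem are made-up stand-ins for the real tupsize / get_hash_memory_limit() values, and the skew, parallel and multi-batch refinements are omitted.

#include <math.h>
#include <stdint.h>
#include <stdio.h>

static uint32_t
next_pow2(uint32_t v)               /* smallest power of 2 >= v */
{
    uint32_t    p = 1;

    while (p < v)
        p <<= 1;
    return p;
}

int
main(void)
{
    double      ntuples = 1e6;
    size_t      tupsize = 64;                   /* assumed tuple footprint */
    size_t      hash_mem = 64 * 1024 * 1024;    /* assumed memory limit */
    size_t      max_pointers = hash_mem / sizeof(void *);

    /* one bucket per NTUP_PER_BUCKET (= 1) tuples, clamped to the pointer
     * array we could afford, forced to at least 1024, and rounded up to a
     * power of 2 */
    double      dbuckets = ceil(ntuples / 1.0);

    if (dbuckets > (double) max_pointers)
        dbuckets = (double) max_pointers;

    uint32_t    nbuckets = next_pow2((uint32_t) fmax(dbuckets, 1024.0));
    size_t      bucket_bytes = sizeof(void *) * nbuckets;

    if (ntuples * (double) tupsize + (double) bucket_bytes > (double) hash_mem)
        printf("tuples + bucket headers exceed hash_mem: multiple batches\n");
    else
        printf("single batch, nbuckets = %u\n", (unsigned) nbuckets);
    return 0;
}

With these numbers the projected tuples (~61 MB) plus 8 MB of bucket headers exceed the 64 MB budget, so the real function would take the multi-batch branch and pick nbatch as a power of 2 large enough that each batch fits.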

◆ ExecEndHash()

void ExecEndHash ( HashState * node)

Definition at line 427 of file nodeHash.c.

428 {
429  PlanState  *outerPlan;
430 
431  /*
432  * shut down the subplan
433  */
434  outerPlan = outerPlanState(node);
435  ExecEndNode(outerPlan);
436 }
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:562
#define outerPlanState(node)
Definition: execnodes.h:1223
#define outerPlan(node)
Definition: plannodes.h:183

References ExecEndNode(), outerPlan, and outerPlanState.

Referenced by ExecEndNode().

◆ ExecHash()

static TupleTableSlot * ExecHash ( PlanState * pstate)
static

Definition at line 91 of file nodeHash.c.

92 {
93  elog(ERROR, "Hash node does not support ExecProcNode call convention");
94  return NULL;
95 }
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225

References elog, and ERROR.

Referenced by ExecInitHash().

◆ ExecHashAccumInstrumentation()

void ExecHashAccumInstrumentation ( HashInstrumentation * instrument,
HashJoinTable  hashtable 
)

Definition at line 2742 of file nodeHash.c.

2744 {
2745  instrument->nbuckets = Max(instrument->nbuckets,
2746  hashtable->nbuckets);
2747  instrument->nbuckets_original = Max(instrument->nbuckets_original,
2748  hashtable->nbuckets_original);
2749  instrument->nbatch = Max(instrument->nbatch,
2750  hashtable->nbatch);
2751  instrument->nbatch_original = Max(instrument->nbatch_original,
2752  hashtable->nbatch_original);
2753  instrument->space_peak = Max(instrument->space_peak,
2754  hashtable->spacePeak);
2755 }

References Max, HashJoinTableData::nbatch, HashInstrumentation::nbatch, HashJoinTableData::nbatch_original, HashInstrumentation::nbatch_original, HashJoinTableData::nbuckets, HashInstrumentation::nbuckets, HashJoinTableData::nbuckets_original, HashInstrumentation::nbuckets_original, HashInstrumentation::space_peak, and HashJoinTableData::spacePeak.

Referenced by ExecReScanHashJoin(), and ExecShutdownHash().

◆ ExecHashBuildSkewHash()

static void ExecHashBuildSkewHash ( HashState * hashstate,
HashJoinTable  hashtable,
Hash * node,
int  mcvsToUse 
)
static

Definition at line 2268 of file nodeHash.c.

2270 {
2271  HeapTupleData *statsTuple;
2272  AttStatsSlot sslot;
2273 
2274  /* Do nothing if planner didn't identify the outer relation's join key */
2275  if (!OidIsValid(node->skewTable))
2276  return;
2277  /* Also, do nothing if we don't have room for at least one skew bucket */
2278  if (mcvsToUse <= 0)
2279  return;
2280 
2281  /*
2282  * Try to find the MCV statistics for the outer relation's join key.
2283  */
2284  statsTuple = SearchSysCache3(STATRELATTINH,
2285  ObjectIdGetDatum(node->skewTable),
2286  Int16GetDatum(node->skewColumn),
2287  BoolGetDatum(node->skewInherit));
2288  if (!HeapTupleIsValid(statsTuple))
2289  return;
2290 
2291  if (get_attstatsslot(&sslot, statsTuple,
2292  STATISTIC_KIND_MCV, InvalidOid,
2293  ATTSTATSSLOT_VALUES | ATTSTATSSLOT_NUMBERS))
2294  {
2295  double frac;
2296  int nbuckets;
2297  int i;
2298 
2299  if (mcvsToUse > sslot.nvalues)
2300  mcvsToUse = sslot.nvalues;
2301 
2302  /*
2303  * Calculate the expected fraction of outer relation that will
2304  * participate in the skew optimization. If this isn't at least
2305  * SKEW_MIN_OUTER_FRACTION, don't use skew optimization.
2306  */
2307  frac = 0;
2308  for (i = 0; i < mcvsToUse; i++)
2309  frac += sslot.numbers[i];
2310  if (frac < SKEW_MIN_OUTER_FRACTION)
2311  {
2312  free_attstatsslot(&sslot);
2313  ReleaseSysCache(statsTuple);
2314  return;
2315  }
2316 
2317  /*
2318  * Okay, set up the skew hashtable.
2319  *
2320  * skewBucket[] is an open addressing hashtable with a power of 2 size
2321  * that is greater than the number of MCV values. (This ensures there
2322  * will be at least one null entry, so searches will always
2323  * terminate.)
2324  *
2325  * Note: this code could fail if mcvsToUse exceeds INT_MAX/8 or
2326  * MaxAllocSize/sizeof(void *)/8, but that is not currently possible
2327  * since we limit pg_statistic entries to much less than that.
2328  */
2329  nbuckets = pg_nextpower2_32(mcvsToUse + 1);
2330  /* use two more bits just to help avoid collisions */
2331  nbuckets <<= 2;
2332 
2333  hashtable->skewEnabled = true;
2334  hashtable->skewBucketLen = nbuckets;
2335 
2336  /*
2337  * We allocate the bucket memory in the hashtable's batch context. It
2338  * is only needed during the first batch, and this ensures it will be
2339  * automatically removed once the first batch is done.
2340  */
2341  hashtable->skewBucket = (HashSkewBucket **)
2342  MemoryContextAllocZero(hashtable->batchCxt,
2343  nbuckets * sizeof(HashSkewBucket *));
2344  hashtable->skewBucketNums = (int *)
2345  MemoryContextAllocZero(hashtable->batchCxt,
2346  mcvsToUse * sizeof(int));
2347 
2348  hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *)
2349  + mcvsToUse * sizeof(int);
2350  hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
2351  + mcvsToUse * sizeof(int);
2352  if (hashtable->spaceUsed > hashtable->spacePeak)
2353  hashtable->spacePeak = hashtable->spaceUsed;
2354 
2355  /*
2356  * Create a skew bucket for each MCV hash value.
2357  *
2358  * Note: it is very important that we create the buckets in order of
2359  * decreasing MCV frequency. If we have to remove some buckets, they
2360  * must be removed in reverse order of creation (see notes in
2361  * ExecHashRemoveNextSkewBucket) and we want the least common MCVs to
2362  * be removed first.
2363  */
2364 
2365  for (i = 0; i < mcvsToUse; i++)
2366  {
2367  uint32 hashvalue;
2368  int bucket;
2369 
2370  hashvalue = DatumGetUInt32(FunctionCall1Coll(hashstate->skew_hashfunction,
2371  hashstate->skew_collation,
2372  sslot.values[i]));
2373 
2374  /*
2375  * While we have not hit a hole in the hashtable and have not hit
2376  * the desired bucket, we have collided with some previous hash
2377  * value, so try the next bucket location. NB: this code must
2378  * match ExecHashGetSkewBucket.
2379  */
2380  bucket = hashvalue & (nbuckets - 1);
2381  while (hashtable->skewBucket[bucket] != NULL &&
2382  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
2383  bucket = (bucket + 1) & (nbuckets - 1);
2384 
2385  /*
2386  * If we found an existing bucket with the same hashvalue, leave
2387  * it alone. It's okay for two MCVs to share a hashvalue.
2388  */
2389  if (hashtable->skewBucket[bucket] != NULL)
2390  continue;
2391 
2392  /* Okay, create a new skew bucket for this hashvalue. */
2393  hashtable->skewBucket[bucket] = (HashSkewBucket *)
2394  MemoryContextAlloc(hashtable->batchCxt,
2395  sizeof(HashSkewBucket));
2396  hashtable->skewBucket[bucket]->hashvalue = hashvalue;
2397  hashtable->skewBucket[bucket]->tuples = NULL;
2398  hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
2399  hashtable->nSkewBuckets++;
2400  hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
2401  hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
2402  if (hashtable->spaceUsed > hashtable->spacePeak)
2403  hashtable->spacePeak = hashtable->spaceUsed;
2404  }
2405 
2406  free_attstatsslot(&sslot);
2407  }
2408 
2409  ReleaseSysCache(statsTuple);
2410 }
unsigned int uint32
Definition: c.h:518
#define OidIsValid(objectId)
Definition: c.h:778
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1129
#define SKEW_MIN_OUTER_FRACTION
Definition: hashjoin.h:122
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
int i
Definition: isn.c:72
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76
void free_attstatsslot(AttStatsSlot *sslot)
Definition: lsyscache.c:3344
bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, int reqkind, Oid reqop, int flags)
Definition: lsyscache.c:3234
#define ATTSTATSSLOT_NUMBERS
Definition: lsyscache.h:43
#define ATTSTATSSLOT_VALUES
Definition: lsyscache.h:42
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
static uint32 DatumGetUInt32(Datum X)
Definition: postgres.h:222
static Datum Int16GetDatum(int16 X)
Definition: postgres.h:172
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
#define InvalidOid
Definition: postgres_ext.h:36
Datum * values
Definition: lsyscache.h:53
float4 * numbers
Definition: lsyscache.h:56
int * skewBucketNums
Definition: hashjoin.h:320
HashSkewBucket ** skewBucket
Definition: hashjoin.h:317
HashJoinTuple tuples
Definition: hashjoin.h:116
uint32 hashvalue
Definition: hashjoin.h:115
Oid skew_collation
Definition: execnodes.h:2785
FmgrInfo * skew_hashfunction
Definition: execnodes.h:2784
AttrNumber skewColumn
Definition: plannodes.h:1208
Oid skewTable
Definition: plannodes.h:1207
bool skewInherit
Definition: plannodes.h:1209
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
HeapTuple SearchSysCache3(int cacheId, Datum key1, Datum key2, Datum key3)
Definition: syscache.c:243

References ATTSTATSSLOT_NUMBERS, ATTSTATSSLOT_VALUES, HashJoinTableData::batchCxt, BoolGetDatum(), DatumGetUInt32(), free_attstatsslot(), FunctionCall1Coll(), get_attstatsslot(), HashSkewBucket::hashvalue, HeapTupleIsValid, i, if(), Int16GetDatum(), InvalidOid, MemoryContextAlloc(), MemoryContextAllocZero(), HashJoinTableData::nSkewBuckets, AttStatsSlot::numbers, AttStatsSlot::nvalues, ObjectIdGetDatum(), OidIsValid, pg_nextpower2_32(), ReleaseSysCache(), SearchSysCache3(), SKEW_BUCKET_OVERHEAD, HashState::skew_collation, HashState::skew_hashfunction, SKEW_MIN_OUTER_FRACTION, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, HashJoinTableData::skewBucketNums, Hash::skewColumn, HashJoinTableData::skewEnabled, Hash::skewInherit, Hash::skewTable, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, HashSkewBucket::tuples, and AttStatsSlot::values.

Referenced by ExecHashTableCreate().

◆ ExecHashEstimate()

void ExecHashEstimate ( HashState * node,
ParallelContext * pcxt 
)

Definition at line 2626 of file nodeHash.c.

2627 {
2628  size_t size;
2629 
2630  /* don't need this if not instrumenting or no workers */
2631  if (!node->ps.instrument || pcxt->nworkers == 0)
2632  return;
2633 
2634  size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
2635  size = add_size(size, offsetof(SharedHashInfo, hinstrument));
2636  shm_toc_estimate_chunk(&pcxt->estimator, size);
2637  shm_toc_estimate_keys(&pcxt->estimator, 1);
2638 }
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
PlanState ps
Definition: execnodes.h:2780
shm_toc_estimator estimator
Definition: parallel.h:41
Instrumentation * instrument
Definition: execnodes.h:1137

References add_size(), ParallelContext::estimator, PlanState::instrument, mul_size(), ParallelContext::nworkers, HashState::ps, shm_toc_estimate_chunk, shm_toc_estimate_keys, and size.

Referenced by ExecParallelEstimate().

◆ ExecHashGetBucketAndBatch()

void ExecHashGetBucketAndBatch ( HashJoinTable  hashtable,
uint32  hashvalue,
int *  bucketno,
int *  batchno 
)

Definition at line 1825 of file nodeHash.c.

1829 {
1830  uint32 nbuckets = (uint32) hashtable->nbuckets;
1831  uint32 nbatch = (uint32) hashtable->nbatch;
1832 
1833  if (nbatch > 1)
1834  {
1835  *bucketno = hashvalue & (nbuckets - 1);
1836  *batchno = pg_rotate_right32(hashvalue,
1837  hashtable->log2_nbuckets) & (nbatch - 1);
1838  }
1839  else
1840  {
1841  *bucketno = hashvalue & (nbuckets - 1);
1842  *batchno = 0;
1843  }
1844 }
static uint32 pg_rotate_right32(uint32 word, int n)
Definition: pg_bitutils.h:398

References HashJoinTableData::log2_nbuckets, HashJoinTableData::nbatch, HashJoinTableData::nbuckets, and pg_rotate_right32().

Referenced by ExecHashIncreaseNumBatches(), ExecHashIncreaseNumBuckets(), ExecHashJoinImpl(), ExecHashRemoveNextSkewBucket(), ExecHashTableInsert(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinPartitionOuter(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), ExecParallelHashTableInsert(), and ExecParallelHashTableInsertCurrentBatch().
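
Because nbuckets and nbatch are both powers of 2, bucket and batch numbers can be read from disjoint bit ranges of the hash value: the low log2(nbuckets) bits select the bucket, and rotating the hash right by log2_nbuckets brings the next log2(nbatch) bits down for the batch. A self-contained illustration (the constants are arbitrary examples):

#include <stdint.h>
#include <stdio.h>

static uint32_t
rotate_right32(uint32_t word, int n)    /* n must be 1..31 in this sketch */
{
    return (word >> n) | (word << (32 - n));
}

int
main(void)
{
    uint32_t    hashvalue = 0xDEADBEEF;
    uint32_t    nbuckets = 1024;        /* power of 2 */
    uint32_t    nbatch = 8;             /* power of 2 */
    int         log2_nbuckets = 10;

    int         bucketno = (int) (hashvalue & (nbuckets - 1));
    int         batchno = (int) (rotate_right32(hashvalue, log2_nbuckets) &
                                 (nbatch - 1));

    printf("bucket %d, batch %d\n", bucketno, batchno);
    return 0;
}

The bucket count can still grow while nbatch == 1; once batching starts, both values are frozen, so every tuple keeps a stable (bucketno, batchno) assignment for the rest of the join.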

◆ ExecHashGetSkewBucket()

int ExecHashGetSkewBucket ( HashJoinTable  hashtable,
uint32  hashvalue 
)

Definition at line 2420 of file nodeHash.c.

2421 {
2422  int bucket;
2423 
2424  /*
2425  * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
2426  * particular, this happens after the initial batch is done).
2427  */
2428  if (!hashtable->skewEnabled)
2429  return INVALID_SKEW_BUCKET_NO;
2430 
2431  /*
2432  * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
2433  */
2434  bucket = hashvalue & (hashtable->skewBucketLen - 1);
2435 
2436  /*
2437  * While we have not hit a hole in the hashtable and have not hit the
2438  * desired bucket, we have collided with some other hash value, so try the
2439  * next bucket location.
2440  */
2441  while (hashtable->skewBucket[bucket] != NULL &&
2442  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
2443  bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
2444 
2445  /*
2446  * Found the desired bucket?
2447  */
2448  if (hashtable->skewBucket[bucket] != NULL)
2449  return bucket;
2450 
2451  /*
2452  * There must not be any hashtable entry for this hash value.
2453  */
2454  return INVALID_SKEW_BUCKET_NO;
2455 }
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:120

References HashSkewBucket::hashvalue, INVALID_SKEW_BUCKET_NO, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, and HashJoinTableData::skewEnabled.

Referenced by ExecHashJoinImpl(), and MultiExecPrivateHash().
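
The probe loop above is plain linear probing in a power-of-2 table. A minimal demonstration of the same rule outside the executor, where 0 stands in for an empty slot and -1 plays the role of INVALID_SKEW_BUCKET_NO (a sketch, not the skew table's actual storage):

#include <stdint.h>
#include <stdio.h>

#define LEN 8                       /* power of 2, never allowed to fill up */

static uint32_t table[LEN];         /* 0 means "empty" in this sketch */

static void
insert(uint32_t hash)
{
    int         b = hash & (LEN - 1);

    while (table[b] != 0 && table[b] != hash)
        b = (b + 1) & (LEN - 1);    /* collision: try the next slot */
    table[b] = hash;
}

static int
lookup(uint32_t hash)
{
    int         b = hash & (LEN - 1);

    while (table[b] != 0 && table[b] != hash)
        b = (b + 1) & (LEN - 1);
    return table[b] != 0 ? b : -1;  /* hole reached: value is absent */
}

int
main(void)
{
    insert(0x11);                   /* lands in slot 1 */
    insert(0x21);                   /* also hashes to slot 1, probes to 2 */
    printf("%d %d %d\n", lookup(0x11), lookup(0x21), lookup(0x31));
    return 0;
}

Termination relies on the table never being full, which ExecHashBuildSkewHash guarantees by sizing skewBucketLen to more than four times the number of MCVs; and lookups stay correct only because buckets are removed in reverse order of creation, as the comments in ExecHashRemoveNextSkewBucket explain.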

◆ ExecHashIncreaseNumBatches()

static void ExecHashIncreaseNumBatches ( HashJoinTable  hashtable)
static

Definition at line 899 of file nodeHash.c.

900 {
901  int oldnbatch = hashtable->nbatch;
902  int curbatch = hashtable->curbatch;
903  int nbatch;
904  long ninmemory;
905  long nfreed;
906  HashMemoryChunk oldchunks;
907 
908  /* do nothing if we've decided to shut off growth */
909  if (!hashtable->growEnabled)
910  return;
911 
912  /* safety check to avoid overflow */
913  if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
914  return;
915 
916  nbatch = oldnbatch * 2;
917  Assert(nbatch > 1);
918 
919 #ifdef HJDEBUG
920  printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
921  hashtable, nbatch, hashtable->spaceUsed);
922 #endif
923 
924  if (hashtable->innerBatchFile == NULL)
925  {
926  MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
927 
928  /* we had no file arrays before */
929  hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
930  hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
931 
932  MemoryContextSwitchTo(oldcxt);
933 
934  /* time to establish the temp tablespaces, too */
935  PrepareTempTablespaces();
936  }
937  else
938  {
939  /* enlarge arrays and zero out added entries */
940  hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch, nbatch);
941  hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch);
942  }
943 
944  hashtable->nbatch = nbatch;
945 
946  /*
947  * Scan through the existing hash table entries and dump out any that are
948  * no longer of the current batch.
949  */
950  ninmemory = nfreed = 0;
951 
952  /* If we know we need to resize nbuckets, we can do it while rebatching. */
953  if (hashtable->nbuckets_optimal != hashtable->nbuckets)
954  {
955  /* we never decrease the number of buckets */
956  Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
957 
958  hashtable->nbuckets = hashtable->nbuckets_optimal;
959  hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
960 
961  hashtable->buckets.unshared =
962  repalloc_array(hashtable->buckets.unshared,
963  HashJoinTuple, hashtable->nbuckets);
964  }
965 
966  /*
967  * We will scan through the chunks directly, so that we can reset the
968  * buckets now and not have to keep track which tuples in the buckets have
969  * already been processed. We will free the old chunks as we go.
970  */
971  memset(hashtable->buckets.unshared, 0,
972  sizeof(HashJoinTuple) * hashtable->nbuckets);
973  oldchunks = hashtable->chunks;
974  hashtable->chunks = NULL;
975 
976  /* so, let's scan through the old chunks, and all tuples in each chunk */
977  while (oldchunks != NULL)
978  {
979  HashMemoryChunk nextchunk = oldchunks->next.unshared;
980 
981  /* position within the buffer (up to oldchunks->used) */
982  size_t idx = 0;
983 
984  /* process all tuples stored in this chunk (and then free it) */
985  while (idx < oldchunks->used)
986  {
987  HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(oldchunks) + idx);
988  MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
989  int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
990  int bucketno;
991  int batchno;
992 
993  ninmemory++;
994  ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
995  &bucketno, &batchno);
996 
997  if (batchno == curbatch)
998  {
999  /* keep tuple in memory - copy it into the new chunk */
1000  HashJoinTuple copyTuple;
1001 
1002  copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
1003  memcpy(copyTuple, hashTuple, hashTupleSize);
1004 
1005  /* and add it back to the appropriate bucket */
1006  copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1007  hashtable->buckets.unshared[bucketno] = copyTuple;
1008  }
1009  else
1010  {
1011  /* dump it out */
1012  Assert(batchno > curbatch);
1013  ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
1014  hashTuple->hashvalue,
1015  &hashtable->innerBatchFile[batchno],
1016  hashtable);
1017 
1018  hashtable->spaceUsed -= hashTupleSize;
1019  nfreed++;
1020  }
1021 
1022  /* next tuple in this chunk */
1023  idx += MAXALIGN(hashTupleSize);
1024 
1025  /* allow this loop to be cancellable */
1026  CHECK_FOR_INTERRUPTS();
1027  }
1028 
1029  /* we're done with this chunk - free it and proceed to the next one */
1030  pfree(oldchunks);
1031  oldchunks = nextchunk;
1032  }
1033 
1034 #ifdef HJDEBUG
1035  printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
1036  hashtable, nfreed, ninmemory, hashtable->spaceUsed);
1037 #endif
1038 
1039  /*
1040  * If we dumped out either all or none of the tuples in the table, disable
1041  * further expansion of nbatch. This situation implies that we have
1042  * enough tuples of identical hashvalues to overflow spaceAllowed.
1043  * Increasing nbatch will not fix it since there's no way to subdivide the
1044  * group any more finely. We have to just gut it out and hope the server
1045  * has enough RAM.
1046  */
1047  if (nfreed == 0 || nfreed == ninmemory)
1048  {
1049  hashtable->growEnabled = false;
1050 #ifdef HJDEBUG
1051  printf("Hashjoin %p: disabling further increase of nbatch\n",
1052  hashtable);
1053 #endif
1054  }
1055 }
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:259
void PrepareTempTablespaces(void)
Definition: tablespace.c:1331
#define repalloc_array(pointer, type, count)
Definition: fe_memutils.h:78
#define palloc0_array(type, count)
Definition: fe_memutils.h:77
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:91
void pfree(void *pointer)
Definition: mcxt.c:1521
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
static void * dense_alloc(HashJoinTable hashtable, Size size)
Definition: nodeHash.c:2761
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1825
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)
#define repalloc0_array(pointer, type, oldcount, count)
Definition: palloc.h:109
#define printf(...)
Definition: port.h:244
MemoryContextSwitchTo(old_ctx)
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:311
union HashJoinTableData::@107 buckets
MemoryContext spillCxt
Definition: hashjoin.h:352
BufFile ** innerBatchFile
Definition: hashjoin.h:341
int log2_nbuckets_optimal
Definition: hashjoin.h:305
BufFile ** outerBatchFile
Definition: hashjoin.h:342
union HashJoinTupleData::@105 next
uint32 hashvalue
Definition: hashjoin.h:86
struct HashJoinTupleData * unshared
Definition: hashjoin.h:83

References Assert, HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, HashJoinTableData::chunks, HashJoinTableData::curbatch, dense_alloc(), ExecHashGetBucketAndBatch(), ExecHashJoinSaveTuple(), HashJoinTableData::growEnabled, HASH_CHUNK_DATA, HashJoinTupleData::hashvalue, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, idx(), HashJoinTableData::innerBatchFile, HashJoinTableData::log2_nbuckets, HashJoinTableData::log2_nbuckets_optimal, MAXALIGN, MaxAllocSize, MemoryContextSwitchTo(), Min, HashJoinTableData::nbatch, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, HashJoinTupleData::next, HashMemoryChunkData::next, HashJoinTableData::outerBatchFile, palloc0_array, pfree(), PrepareTempTablespaces(), printf, repalloc0_array, repalloc_array, HashJoinTableData::spaceUsed, HashJoinTableData::spillCxt, MinimalTupleData::t_len, HashJoinTupleData::unshared, HashMemoryChunkData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashSkewTableInsert(), and ExecHashTableInsert().
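
The Assert(batchno > curbatch) in the dump-out path above is safe because nbatch only ever doubles: the new batch number keeps all of the old batch bits and gains exactly one more, so a tuple either stays in its current batch or moves to batch + oldnbatch, never backwards. A quick self-check of that property under the same bucket/batch bit split (the hash function and constants here are arbitrary):

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

static uint32_t
rotate_right32(uint32_t word, int n)    /* n must be 1..31 in this sketch */
{
    return (word >> n) | (word << (32 - n));
}

int
main(void)
{
    int         log2_nbuckets = 10;
    uint32_t    oldnbatch = 4;
    uint32_t    newnbatch = 8;          /* nbatch always doubles */

    for (int i = 0; i < 100000; i++)
    {
        uint32_t    h = (uint32_t) rand() * 2654435761u;    /* toy hash */
        uint32_t    rot = rotate_right32(h, log2_nbuckets);
        uint32_t    oldb = rot & (oldnbatch - 1);
        uint32_t    newb = rot & (newnbatch - 1);

        /* the new batch is the old one, or the old one plus oldnbatch */
        if (newb != oldb && newb != oldb + oldnbatch)
        {
            printf("violation!\n");
            return 1;
        }
    }
    printf("property holds\n");
    return 0;
}

Tuples already written out to batch files are simply split again by the same rule when their batch is later loaded, which is why the file arrays only need to be enlarged and zero-filled here.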

◆ ExecHashIncreaseNumBuckets()

static void ExecHashIncreaseNumBuckets ( HashJoinTable  hashtable)
static

Definition at line 1452 of file nodeHash.c.

1453 {
1454  HashMemoryChunk chunk;
1455 
1456  /* do nothing if not an increase (it's called increase for a reason) */
1457  if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
1458  return;
1459 
1460 #ifdef HJDEBUG
1461  printf("Hashjoin %p: increasing nbuckets %d => %d\n",
1462  hashtable, hashtable->nbuckets, hashtable->nbuckets_optimal);
1463 #endif
1464 
1465  hashtable->nbuckets = hashtable->nbuckets_optimal;
1466  hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
1467 
1468  Assert(hashtable->nbuckets > 1);
1469  Assert(hashtable->nbuckets <= (INT_MAX / 2));
1470  Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
1471 
1472  /*
1473  * Just reallocate the proper number of buckets - we don't need to walk
1474  * through them - we can walk the dense-allocated chunks (just like in
1475  * ExecHashIncreaseNumBatches, but without all the copying into new
1476  * chunks)
1477  */
1478  hashtable->buckets.unshared =
1479  repalloc_array(hashtable->buckets.unshared,
1480  HashJoinTuple, hashtable->nbuckets);
1481 
1482  memset(hashtable->buckets.unshared, 0,
1483  hashtable->nbuckets * sizeof(HashJoinTuple));
1484 
1485  /* scan through all tuples in all chunks to rebuild the hash table */
1486  for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared)
1487  {
1488  /* process all tuples stored in this chunk */
1489  size_t idx = 0;
1490 
1491  while (idx < chunk->used)
1492  {
1493  HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
1494  int bucketno;
1495  int batchno;
1496 
1497  ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
1498  &bucketno, &batchno);
1499 
1500  /* add the tuple to the proper bucket */
1501  hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1502  hashtable->buckets.unshared[bucketno] = hashTuple;
1503 
1504  /* advance index past the tuple */
1505  idx += MAXALIGN(HJTUPLE_OVERHEAD +
1506  HJTUPLE_MINTUPLE(hashTuple)->t_len);
1507  }
1508 
1509  /* allow this loop to be cancellable */
1510  CHECK_FOR_INTERRUPTS();
1511  }
1512 }
uint64 chunk

References Assert, HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, chunk, HashJoinTableData::chunks, ExecHashGetBucketAndBatch(), HASH_CHUNK_DATA, HashJoinTupleData::hashvalue, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, idx(), HashJoinTableData::log2_nbuckets, HashJoinTableData::log2_nbuckets_optimal, MAXALIGN, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, HashJoinTupleData::next, printf, repalloc_array, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by MultiExecPrivateHash().

◆ ExecHashInitializeDSM()

void ExecHashInitializeDSM ( HashState * node,
ParallelContext * pcxt 
)

Definition at line 2645 of file nodeHash.c.

2646 {
2647  size_t size;
2648 
2649  /* don't need this if not instrumenting or no workers */
2650  if (!node->ps.instrument || pcxt->nworkers == 0)
2651  return;
2652 
2653  size = offsetof(SharedHashInfo, hinstrument) +
2654  pcxt->nworkers * sizeof(HashInstrumentation);
2655  node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
2656 
2657  /* Each per-worker area must start out as zeroes. */
2658  memset(node->shared_info, 0, size);
2659 
2660  node->shared_info->num_workers = pcxt->nworkers;
2661  shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
2662  node->shared_info);
2663 }
struct HashInstrumentation HashInstrumentation
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
SharedHashInfo * shared_info
Definition: execnodes.h:2793
shm_toc * toc
Definition: parallel.h:44
Plan * plan
Definition: execnodes.h:1127
int plan_node_id
Definition: plannodes.h:152

References PlanState::instrument, SharedHashInfo::num_workers, ParallelContext::nworkers, PlanState::plan, Plan::plan_node_id, HashState::ps, HashState::shared_info, shm_toc_allocate(), shm_toc_insert(), size, and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

◆ ExecHashInitializeWorker()

void ExecHashInitializeWorker ( HashState * node,
ParallelWorkerContext * pwcxt 
)

Definition at line 2670 of file nodeHash.c.

2671 {
2672  SharedHashInfo *shared_info;
2673 
2674  /* don't need this if not instrumenting */
2675  if (!node->ps.instrument)
2676  return;
2677 
2678  /*
2679  * Find our entry in the shared area, and set up a pointer to it so that
2680  * we'll accumulate stats there when shutting down or rebuilding the hash
2681  * table.
2682  */
2683  shared_info = (SharedHashInfo *)
2684  shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
2685  node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
2686 }
int ParallelWorkerNumber
Definition: parallel.c:114
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
HashInstrumentation * hinstrument
Definition: execnodes.h:2800
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
Definition: execnodes.h:2771

References SharedHashInfo::hinstrument, HashState::hinstrument, PlanState::instrument, ParallelWorkerNumber, PlanState::plan, Plan::plan_node_id, HashState::ps, shm_toc_lookup(), and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().
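
ExecHashEstimate(), ExecHashInitializeDSM() and ExecHashInitializeWorker() together follow a common parallel-instrumentation pattern: reserve one slot per worker in shared memory, let each worker write only its own slot, and have the leader fold all slots together afterwards (with Max(), as ExecHashAccumInstrumentation() does). A plain-memory sketch of that pattern; MiniInstr and the numbers are invented, and an ordinary array stands in for the DSM area reached through the shm_toc:

#include <stddef.h>
#include <stdio.h>

typedef struct MiniInstr
{
    int         nbuckets;
    int         nbatch;
    size_t      space_peak;
} MiniInstr;

#define MAXV(a, b) ((a) > (b) ? (a) : (b))

static void
accum(MiniInstr *into, const MiniInstr *from)
{
    into->nbuckets = MAXV(into->nbuckets, from->nbuckets);
    into->nbatch = MAXV(into->nbatch, from->nbatch);
    into->space_peak = MAXV(into->space_peak, from->space_peak);
}

int
main(void)
{
    MiniInstr   shared[3] = {{0}};  /* the "DSM" area: one slot per worker */
    MiniInstr   total = {0};

    /* each worker fills in only its own slot */
    shared[0] = (MiniInstr) {1024, 2, 1 << 20};
    shared[1] = (MiniInstr) {2048, 4, 3 << 20};
    shared[2] = (MiniInstr) {1024, 2, 2 << 20};

    /* the leader merges the slots after the workers shut down */
    for (int i = 0; i < 3; i++)
        accum(&total, &shared[i]);

    printf("nbuckets=%d nbatch=%d space_peak=%zu\n",
           total.nbuckets, total.nbatch, total.space_peak);
    return 0;
}

Writing to disjoint slots means no locking is needed; the only synchronization requirement is that the leader reads after the workers are done, which the executor's shutdown sequence provides.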

◆ ExecHashRemoveNextSkewBucket()

static void ExecHashRemoveNextSkewBucket ( HashJoinTable  hashtable)
static

Definition at line 2512 of file nodeHash.c.

2513 {
2514  int bucketToRemove;
2515  HashSkewBucket *bucket;
2516  uint32 hashvalue;
2517  int bucketno;
2518  int batchno;
2519  HashJoinTuple hashTuple;
2520 
2521  /* Locate the bucket to remove */
2522  bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
2523  bucket = hashtable->skewBucket[bucketToRemove];
2524 
2525  /*
2526  * Calculate which bucket and batch the tuples belong to in the main
2527  * hashtable. They all have the same hash value, so it's the same for all
2528  * of them. Also note that it's not possible for nbatch to increase while
2529  * we are processing the tuples.
2530  */
2531  hashvalue = bucket->hashvalue;
2532  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
2533 
2534  /* Process all tuples in the bucket */
2535  hashTuple = bucket->tuples;
2536  while (hashTuple != NULL)
2537  {
2538  HashJoinTuple nextHashTuple = hashTuple->next.unshared;
2539  MinimalTuple tuple;
2540  Size tupleSize;
2541 
2542  /*
2543  * This code must agree with ExecHashTableInsert. We do not use
2544  * ExecHashTableInsert directly as ExecHashTableInsert expects a
2545  * TupleTableSlot while we already have HashJoinTuples.
2546  */
2547  tuple = HJTUPLE_MINTUPLE(hashTuple);
2548  tupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
2549 
2550  /* Decide whether to put the tuple in the hash table or a temp file */
2551  if (batchno == hashtable->curbatch)
2552  {
2553  /* Move the tuple to the main hash table */
2554  HashJoinTuple copyTuple;
2555 
2556  /*
2557  * We must copy the tuple into the dense storage, else it will not
2558  * be found by, eg, ExecHashIncreaseNumBatches.
2559  */
2560  copyTuple = (HashJoinTuple) dense_alloc(hashtable, tupleSize);
2561  memcpy(copyTuple, hashTuple, tupleSize);
2562  pfree(hashTuple);
2563 
2564  copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
2565  hashtable->buckets.unshared[bucketno] = copyTuple;
2566 
2567  /* We have reduced skew space, but overall space doesn't change */
2568  hashtable->spaceUsedSkew -= tupleSize;
2569  }
2570  else
2571  {
2572  /* Put the tuple into a temp file for later batches */
2573  Assert(batchno > hashtable->curbatch);
2574  ExecHashJoinSaveTuple(tuple, hashvalue,
2575  &hashtable->innerBatchFile[batchno],
2576  hashtable);
2577  pfree(hashTuple);
2578  hashtable->spaceUsed -= tupleSize;
2579  hashtable->spaceUsedSkew -= tupleSize;
2580  }
2581 
2582  hashTuple = nextHashTuple;
2583 
2584  /* allow this loop to be cancellable */
2585  CHECK_FOR_INTERRUPTS();
2586  }
2587 
2588  /*
2589  * Free the bucket struct itself and reset the hashtable entry to NULL.
2590  *
2591  * NOTE: this is not nearly as simple as it looks on the surface, because
2592  * of the possibility of collisions in the hashtable. Suppose that hash
2593  * values A and B collide at a particular hashtable entry, and that A was
2594  * entered first so B gets shifted to a different table entry. If we were
2595  * to remove A first then ExecHashGetSkewBucket would mistakenly start
2596  * reporting that B is not in the hashtable, because it would hit the NULL
2597  * before finding B. However, we always remove entries in the reverse
2598  * order of creation, so this failure cannot happen.
2599  */
2600  hashtable->skewBucket[bucketToRemove] = NULL;
2601  hashtable->nSkewBuckets--;
2602  pfree(bucket);
2603  hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD;
2604  hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD;
2605 
2606  /*
2607  * If we have removed all skew buckets then give up on skew optimization.
2608  * Release the arrays since they aren't useful any more.
2609  */
2610  if (hashtable->nSkewBuckets == 0)
2611  {
2612  hashtable->skewEnabled = false;
2613  pfree(hashtable->skewBucket);
2614  pfree(hashtable->skewBucketNums);
2615  hashtable->skewBucket = NULL;
2616  hashtable->skewBucketNums = NULL;
2617  hashtable->spaceUsed -= hashtable->spaceUsedSkew;
2618  hashtable->spaceUsedSkew = 0;
2619  }
2620 }
size_t Size
Definition: c.h:608

References Assert, HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, HashJoinTableData::curbatch, dense_alloc(), ExecHashGetBucketAndBatch(), ExecHashJoinSaveTuple(), HashSkewBucket::hashvalue, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::innerBatchFile, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, pfree(), SKEW_BUCKET_OVERHEAD, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashJoinTableData::skewEnabled, HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, MinimalTupleData::t_len, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashSkewTableInsert().

◆ ExecHashRetrieveInstrumentation()

void ExecHashRetrieveInstrumentation ( HashState * node)

Definition at line 2711 of file nodeHash.c.

2712 {
2713  SharedHashInfo *shared_info = node->shared_info;
2714  size_t size;
2715 
2716  if (shared_info == NULL)
2717  return;
2718 
2719  /* Replace node->shared_info with a copy in backend-local memory. */
2720  size = offsetof(SharedHashInfo, hinstrument) +
2721  shared_info->num_workers * sizeof(HashInstrumentation);
2722  node->shared_info = palloc(size);
2723  memcpy(node->shared_info, shared_info, size);
2724 }
void * palloc(Size size)
Definition: mcxt.c:1317

References SharedHashInfo::num_workers, palloc(), HashState::shared_info, and size.

Referenced by ExecParallelRetrieveInstrumentation().

◆ ExecHashSkewTableInsert()

static void ExecHashSkewTableInsert ( HashJoinTable  hashtable,
TupleTableSlot * slot,
uint32  hashvalue,
int  bucketNumber 
)
static

Definition at line 2466 of file nodeHash.c.

2470 {
2471  bool shouldFree;
2472  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
2473  HashJoinTuple hashTuple;
2474  int hashTupleSize;
2475 
2476  /* Create the HashJoinTuple */
2477  hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
2478  hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
2479  hashTupleSize);
2480  hashTuple->hashvalue = hashvalue;
2481  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
2482  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
2483 
2484  /* Push it onto the front of the skew bucket's list */
2485  hashTuple->next.unshared = hashtable->skewBucket[bucketNumber]->tuples;
2486  hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
2487  Assert(hashTuple != hashTuple->next.unshared);
2488 
2489  /* Account for space used, and back off if we've used too much */
2490  hashtable->spaceUsed += hashTupleSize;
2491  hashtable->spaceUsedSkew += hashTupleSize;
2492  if (hashtable->spaceUsed > hashtable->spacePeak)
2493  hashtable->spacePeak = hashtable->spaceUsed;
2494  while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
2495  ExecHashRemoveNextSkewBucket(hashtable);
2496 
2497  /* Check we are not over the total spaceAllowed, either */
2498  if (hashtable->spaceUsed > hashtable->spaceAllowed)
2499  ExecHashIncreaseNumBatches(hashtable);
2500 
2501  if (shouldFree)
2502  heap_free_minimal_tuple(tuple);
2503 }
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1779
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1523
#define HeapTupleHeaderClearMatch(tup)
Definition: htup_details.h:524
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
Definition: nodeHash.c:2512
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
Definition: nodeHash.c:899
Size spaceAllowedSkew
Definition: hashjoin.h:348

References Assert, HashJoinTableData::batchCxt, ExecFetchSlotMinimalTuple(), ExecHashIncreaseNumBatches(), ExecHashRemoveNextSkewBucket(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, MemoryContextAlloc(), HashJoinTupleData::next, HashJoinTableData::skewBucket, HashJoinTableData::spaceAllowed, HashJoinTableData::spaceAllowedSkew, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, MinimalTupleData::t_len, HashSkewBucket::tuples, and HashJoinTupleData::unshared.

Referenced by MultiExecPrivateHash().

◆ ExecHashTableCreate()

HashJoinTable ExecHashTableCreate ( HashState * state)

Definition at line 446 of file nodeHash.c.

447 {
448  Hash *node;
449  HashJoinTable hashtable;
450  Plan *outerNode;
451  size_t space_allowed;
452  int nbuckets;
453  int nbatch;
454  double rows;
455  int num_skew_mcvs;
456  int log2_nbuckets;
457  MemoryContext oldcxt;
458 
459  /*
460  * Get information about the size of the relation to be hashed (it's the
461  * "outer" subtree of this node, but the inner relation of the hashjoin).
462  * Compute the appropriate size of the hash table.
463  */
464  node = (Hash *) state->ps.plan;
465  outerNode = outerPlan(node);
466 
467  /*
468  * If this is a shared hash table with a partial plan, then we can't use
469  * outerNode->plan_rows to estimate its size. We need an estimate of the
470  * total number of rows across all copies of the partial plan.
471  */
472  rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
473 
474  ExecChooseHashTableSize(rows, outerNode->plan_width,
475  OidIsValid(node->skewTable),
476  state->parallel_state != NULL,
477  state->parallel_state != NULL ?
478  state->parallel_state->nparticipants - 1 : 0,
479  &space_allowed,
480  &nbuckets, &nbatch, &num_skew_mcvs);
481 
482  /* nbuckets must be a power of 2 */
483  log2_nbuckets = my_log2(nbuckets);
484  Assert(nbuckets == (1 << log2_nbuckets));
485 
486  /*
487  * Initialize the hash table control block.
488  *
489  * The hashtable control block is just palloc'd from the executor's
490  * per-query memory context. Everything else should be kept inside the
491  * subsidiary hashCxt, batchCxt or spillCxt.
492  */
493  hashtable = palloc_object(HashJoinTableData);
494  hashtable->nbuckets = nbuckets;
495  hashtable->nbuckets_original = nbuckets;
496  hashtable->nbuckets_optimal = nbuckets;
497  hashtable->log2_nbuckets = log2_nbuckets;
498  hashtable->log2_nbuckets_optimal = log2_nbuckets;
499  hashtable->buckets.unshared = NULL;
500  hashtable->skewEnabled = false;
501  hashtable->skewBucket = NULL;
502  hashtable->skewBucketLen = 0;
503  hashtable->nSkewBuckets = 0;
504  hashtable->skewBucketNums = NULL;
505  hashtable->nbatch = nbatch;
506  hashtable->curbatch = 0;
507  hashtable->nbatch_original = nbatch;
508  hashtable->nbatch_outstart = nbatch;
509  hashtable->growEnabled = true;
510  hashtable->totalTuples = 0;
511  hashtable->partialTuples = 0;
512  hashtable->skewTuples = 0;
513  hashtable->innerBatchFile = NULL;
514  hashtable->outerBatchFile = NULL;
515  hashtable->spaceUsed = 0;
516  hashtable->spacePeak = 0;
517  hashtable->spaceAllowed = space_allowed;
518  hashtable->spaceUsedSkew = 0;
519  hashtable->spaceAllowedSkew =
520  hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
521  hashtable->chunks = NULL;
522  hashtable->current_chunk = NULL;
523  hashtable->parallel_state = state->parallel_state;
524  hashtable->area = state->ps.state->es_query_dsa;
525  hashtable->batches = NULL;
526 
527 #ifdef HJDEBUG
528  printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
529  hashtable, nbatch, nbuckets);
530 #endif
531 
532  /*
533  * Create temporary memory contexts in which to keep the hashtable working
534  * storage. See notes in executor/hashjoin.h.
535  */
536  hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
537  "HashTableContext",
538  ALLOCSET_DEFAULT_SIZES);
539 
540  hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
541  "HashBatchContext",
543 
544  hashtable->spillCxt = AllocSetContextCreate(hashtable->hashCxt,
545  "HashSpillContext",
547 
548  /* Allocate data that will live for the life of the hashjoin */
549 
550  oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
551 
552  if (nbatch > 1 && hashtable->parallel_state == NULL)
553  {
554  MemoryContext oldctx;
555 
556  /*
557  * allocate and initialize the file arrays in hashCxt (not needed for
558  * parallel case which uses shared tuplestores instead of raw files)
559  */
560  oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
561 
562  hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
563  hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
564 
565  MemoryContextSwitchTo(oldctx);
566 
567  /* The files will not be opened until needed... */
568  /* ... but make sure we have temp tablespaces established for them */
569  PrepareTempTablespaces();
570  }
571 
572  MemoryContextSwitchTo(oldcxt);
573 
574  if (hashtable->parallel_state)
575  {
576  ParallelHashJoinState *pstate = hashtable->parallel_state;
577  Barrier *build_barrier;
578 
579  /*
580  * Attach to the build barrier. The corresponding detach operation is
581  * in ExecHashTableDetach. Note that we won't attach to the
582  * batch_barrier for batch 0 yet. We'll attach later and start it out
583  * in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front and
584  * then loaded while hashing (the standard hybrid hash join
585  * algorithm), and we'll coordinate that using build_barrier.
586  */
587  build_barrier = &pstate->build_barrier;
588  BarrierAttach(build_barrier);
589 
590  /*
591  * So far we have no idea whether there are any other participants,
592  * and if so, what phase they are working on. The only thing we care
593  * about at this point is whether someone has already created the
594  * SharedHashJoinBatch objects and the hash table for batch 0. One
595  * backend will be elected to do that now if necessary.
596  */
597  if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECT &&
598  BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
599  {
600  pstate->nbatch = nbatch;
601  pstate->space_allowed = space_allowed;
602  pstate->growth = PHJ_GROWTH_OK;
603 
604  /* Set up the shared state for coordinating batches. */
605  ExecParallelHashJoinSetUpBatches(hashtable, nbatch);
606 
607  /*
608  * Allocate batch 0's hash table up front so we can load it
609  * directly while hashing.
610  */
611  pstate->nbuckets = nbuckets;
612  ExecParallelHashTableAlloc(hashtable, 0);
613  }
614 
615  /*
616  * The next Parallel Hash synchronization point is in
617  * MultiExecParallelHash(), which will progress it all the way to
618  * PHJ_BUILD_RUN. The caller must not return control from this
619  * executor node between now and then.
620  */
621  }
622  else
623  {
624  /*
625  * Prepare context for the first-scan space allocations; allocate the
626  * hashbucket array therein, and set each bucket "empty".
627  */
628  MemoryContextSwitchTo(hashtable->batchCxt);
629 
630  hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets);
631 
632  /*
633  * Set up for skew optimization, if possible and there's a need for
634  * more than one batch. (In a one-batch join, there's no point in
635  * it.)
636  */
637  if (nbatch > 1)
638  ExecHashBuildSkewHash(state, hashtable, node, num_skew_mcvs);
639 
640  MemoryContextSwitchTo(oldcxt);
641  }
642 
643  return hashtable;
644 }
int BarrierAttach(Barrier *barrier)
Definition: barrier.c:236
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:265
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
int my_log2(long num)
Definition: dynahash.c:1794
#define palloc_object(type)
Definition: fe_memutils.h:74
@ PHJ_GROWTH_OK
Definition: hashjoin.h:233
#define PHJ_BUILD_ELECT
Definition: hashjoin.h:269
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
static void ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable, Hash *node, int mcvsToUse)
Definition: nodeHash.c:2268
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
Definition: nodeHash.c:2989
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3154
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:361
MemoryContext hashCxt
Definition: hashjoin.h:350
double totalTuples
Definition: hashjoin.h:330
double partialTuples
Definition: hashjoin.h:331
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:360
HashMemoryChunk current_chunk
Definition: hashjoin.h:358
dsa_area * area
Definition: hashjoin.h:359
double skewTuples
Definition: hashjoin.h:332
Cardinality rows_total
Definition: plannodes.h:1211
Plan plan
Definition: plannodes.h:1200
ParallelHashGrowth growth
Definition: hashjoin.h:253
bool parallel_aware
Definition: plannodes.h:141
int plan_width
Definition: plannodes.h:136
Cardinality plan_rows
Definition: plannodes.h:135

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, HashJoinTableData::area, Assert, BarrierArriveAndWait(), BarrierAttach(), BarrierPhase(), HashJoinTableData::batchCxt, HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, HashJoinTableData::chunks, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, CurrentMemoryContext, ExecChooseHashTableSize(), ExecHashBuildSkewHash(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashTableAlloc(), HashJoinTableData::growEnabled, ParallelHashJoinState::growth, HashJoinTableData::hashCxt, HashJoinTableData::innerBatchFile, HashJoinTableData::log2_nbuckets, HashJoinTableData::log2_nbuckets_optimal, MemoryContextSwitchTo(), my_log2(), ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, HashJoinTableData::nbatch_original, HashJoinTableData::nbatch_outstart, ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, HashJoinTableData::nbuckets_original, HashJoinTableData::nSkewBuckets, OidIsValid, HashJoinTableData::outerBatchFile, outerPlan, palloc0_array, palloc_object, Plan::parallel_aware, HashJoinTableData::parallel_state, HashJoinTableData::partialTuples, PHJ_BUILD_ELECT, PHJ_GROWTH_OK, Hash::plan, Plan::plan_rows, Plan::plan_width, PrepareTempTablespaces(), printf, Hash::rows_total, SKEW_HASH_MEM_PERCENT, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, HashJoinTableData::skewBucketNums, HashJoinTableData::skewEnabled, Hash::skewTable, HashJoinTableData::skewTuples, ParallelHashJoinState::space_allowed, HashJoinTableData::spaceAllowed, HashJoinTableData::spaceAllowedSkew, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, HashJoinTableData::spillCxt, HashJoinTableData::totalTuples, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().
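ExecHashTableCreate() relies on a serial-phase election: every participant attaches to build_barrier, but only one backend is chosen to create the shared batch array and batch 0's hash table. A minimal sketch of the idiom, not part of nodeHash.c (setup_shared_state is a hypothetical stand-in for the elected backend's work):

#include "postgres.h"
#include "storage/barrier.h"

/* Hypothetical stand-in for the one-time setup performed by the winner. */
extern void setup_shared_state(void);

void
attach_and_elect(Barrier *build_barrier, uint32 wait_event_info)
{
	BarrierAttach(build_barrier);

	/*
	 * Only backends that attach while the barrier is still in its initial
	 * phase compete; BarrierArriveAndWait() returns true in exactly one of
	 * the waiting participants, which then does the serial work while the
	 * others wait in the next phase.
	 */
	if (BarrierPhase(build_barrier) == 0 &&
		BarrierArriveAndWait(build_barrier, wait_event_info))
		setup_shared_state();
}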

◆ ExecHashTableDestroy()

void ExecHashTableDestroy ( HashJoinTable  hashtable)

Definition at line 866 of file nodeHash.c.

867 {
868  int i;
869 
870  /*
871  * Make sure all the temp files are closed. We skip batch 0, since it
872  * can't have any temp files (and the arrays might not even exist if
873  * nbatch is only 1). Parallel hash joins don't use these files.
874  */
875  if (hashtable->innerBatchFile != NULL)
876  {
877  for (i = 1; i < hashtable->nbatch; i++)
878  {
879  if (hashtable->innerBatchFile[i])
880  BufFileClose(hashtable->innerBatchFile[i]);
881  if (hashtable->outerBatchFile[i])
882  BufFileClose(hashtable->outerBatchFile[i]);
883  }
884  }
885 
886  /* Release working memory (batchCxt is a child, so it goes away too) */
887  MemoryContextDelete(hashtable->hashCxt);
888 
889  /* And drop the control block */
890  pfree(hashtable);
891 }
void BufFileClose(BufFile *file)
Definition: buffile.c:412
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454

References BufFileClose(), HashJoinTableData::hashCxt, i, HashJoinTableData::innerBatchFile, MemoryContextDelete(), HashJoinTableData::nbatch, HashJoinTableData::outerBatchFile, and pfree().

Referenced by ExecEndHashJoin(), and ExecReScanHashJoin().
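The single MemoryContextDelete() suffices because batchCxt and spillCxt were created as children of hashCxt in ExecHashTableCreate(). A minimal sketch of that parent/child lifetime, with demo-only context names:

#include "postgres.h"
#include "utils/memutils.h"

static void
context_lifetime_demo(void)
{
	MemoryContext parent;
	MemoryContext child;
	MemoryContext oldcxt;

	parent = AllocSetContextCreate(CurrentMemoryContext,
								   "DemoParentContext",
								   ALLOCSET_DEFAULT_SIZES);
	child = AllocSetContextCreate(parent,
								  "DemoChildContext",
								  ALLOCSET_DEFAULT_SIZES);

	oldcxt = MemoryContextSwitchTo(child);
	(void) palloc0(1024);		/* working storage tied to the child */
	MemoryContextSwitchTo(oldcxt);

	/* Deleting the parent recursively frees the child and its memory. */
	MemoryContextDelete(parent);
}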

◆ ExecHashTableDetach()

void ExecHashTableDetach ( HashJoinTable  hashtable)

Definition at line 3266 of file nodeHash.c.

3267 {
3268  ParallelHashJoinState *pstate = hashtable->parallel_state;
3269 
3270  /*
3271  * If we're involved in a parallel query, we must either have gotten all
3272  * the way to PHJ_BUILD_RUN, or joined too late and be in PHJ_BUILD_FREE.
3273  */
3274  Assert(!pstate ||
3275  BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN);
3276 
3277  if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUN)
3278  {
3279  int i;
3280 
3281  /* Make sure any temporary files are closed. */
3282  if (hashtable->batches)
3283  {
3284  for (i = 0; i < hashtable->nbatch; ++i)
3285  {
3286  sts_end_write(hashtable->batches[i].inner_tuples);
3287  sts_end_write(hashtable->batches[i].outer_tuples);
3288  sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
3289  sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
3290  }
3291  }
3292 
3293  /* If we're last to detach, clean up shared memory. */
3294  if (BarrierArriveAndDetach(&pstate->build_barrier))
3295  {
3296  /*
3297  * Late joining processes will see this state and give up
3298  * immediately.
3299  */
3300  Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_FREE);
3301 
3302  if (DsaPointerIsValid(pstate->batches))
3303  {
3304  dsa_free(hashtable->area, pstate->batches);
3305  pstate->batches = InvalidDsaPointer;
3306  }
3307  }
3308  }
3309  hashtable->parallel_state = NULL;
3310 }
bool BarrierArriveAndDetach(Barrier *barrier)
Definition: barrier.c:203
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:826
#define InvalidDsaPointer
Definition: dsa.h:78
#define DsaPointerIsValid(x)
Definition: dsa.h:106
#define PHJ_BUILD_FREE
Definition: hashjoin.h:274
#define PHJ_BUILD_RUN
Definition: hashjoin.h:273
void sts_end_write(SharedTuplestoreAccessor *accessor)
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:221
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:220
dsa_pointer batches
Definition: hashjoin.h:248

References HashJoinTableData::area, Assert, BarrierArriveAndDetach(), BarrierPhase(), ParallelHashJoinState::batches, HashJoinTableData::batches, ParallelHashJoinState::build_barrier, dsa_free(), DsaPointerIsValid, i, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BUILD_FREE, PHJ_BUILD_RUN, sts_end_parallel_scan(), and sts_end_write().

Referenced by ExecHashJoinReInitializeDSM(), and ExecShutdownHashJoin().
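The shared-memory cleanup hinges on BarrierArriveAndDetach() returning true in exactly one backend, the last to leave. A minimal sketch of the last-one-out idiom (free_shared_state is a hypothetical stand-in for the dsa_free() calls above):

#include "postgres.h"
#include "storage/barrier.h"

/* Hypothetical stand-in for releasing DSA-allocated structures. */
extern void free_shared_state(void);

void
detach_and_maybe_free(Barrier *barrier)
{
	/*
	 * BarrierArriveAndDetach() returns true only in the last participant,
	 * so the cleanup runs once, and only after nobody can still be using
	 * the shared state.
	 */
	if (BarrierArriveAndDetach(barrier))
		free_shared_state();
}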

◆ ExecHashTableDetachBatch()

void ExecHashTableDetachBatch ( HashJoinTable  hashtable)

Definition at line 3174 of file nodeHash.c.

3175 {
3176  if (hashtable->parallel_state != NULL &&
3177  hashtable->curbatch >= 0)
3178  {
3179  int curbatch = hashtable->curbatch;
3180  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
3181  bool attached = true;
3182 
3183  /* Make sure any temporary files are closed. */
3184  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
3185  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
3186 
3187  /* After attaching we always get at least to PHJ_BATCH_PROBE. */
3188  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
3189  BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
3190 
3191  /*
3192  * If we're abandoning the PHJ_BATCH_PROBE phase early without having
3193  * reached the end of it, it means the plan doesn't want any more
3194  * tuples, and it is happy to abandon any tuples buffered in this
3195  * process's subplans. For correctness, we can't allow any process to
3196  * execute the PHJ_BATCH_SCAN phase, because we will never have the
3197  * complete set of match bits. Therefore we skip emitting unmatched
3198  * tuples in all backends (if this is a full/right join), as if those
3199  * tuples were all due to be emitted by this process and it has
3200  * abandoned them too.
3201  */
3202  if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
3203  !hashtable->batches[curbatch].outer_eof)
3204  {
3205  /*
3206  * This flag may be written to by multiple backends during
3207  * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
3208  * phase so requires no extra locking.
3209  */
3210  batch->skip_unmatched = true;
3211  }
3212 
3213  /*
3214  * Even if we aren't doing a full/right outer join, we'll step through
3215  * the PHJ_BATCH_SCAN phase just to maintain the invariant that
3216  * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
3217  */
3218  if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
3219  attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
3220  if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
3221  {
3222  /*
3223  * We are no longer attached to the batch barrier, but we're the
3224  * process that was chosen to free resources and it's safe to
3225  * assert the current phase. The ParallelHashJoinBatch can't go
3226  * away underneath us while we are attached to the build barrier,
3227  * making this access safe.
3228  */
3229  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
3230 
3231  /* Free shared chunks and buckets. */
3232  while (DsaPointerIsValid(batch->chunks))
3233  {
3234  HashMemoryChunk chunk =
3235  dsa_get_address(hashtable->area, batch->chunks);
3236  dsa_pointer next = chunk->next.shared;
3237 
3238  dsa_free(hashtable->area, batch->chunks);
3239  batch->chunks = next;
3240  }
3241  if (DsaPointerIsValid(batch->buckets))
3242  {
3243  dsa_free(hashtable->area, batch->buckets);
3244  batch->buckets = InvalidDsaPointer;
3245  }
3246  }
3247 
3248  /*
3249  * Track the largest batch we've been attached to. Though each
3250  * backend might see a different subset of batches, explain.c will
3251  * scan the results from all backends to find the largest value.
3252  */
3253  hashtable->spacePeak =
3254  Max(hashtable->spacePeak,
3255  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
3256 
3257  /* Remember that we are not attached to a batch. */
3258  hashtable->curbatch = -1;
3259  }
3260 }
bool BarrierArriveAndDetachExceptLast(Barrier *barrier)
Definition: barrier.c:213
static int32 next
Definition: blutils.c:219
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:942
uint64 dsa_pointer
Definition: dsa.h:62
#define PHJ_BATCH_SCAN
Definition: hashjoin.h:281
#define PHJ_BATCH_PROBE
Definition: hashjoin.h:280
#define PHJ_BATCH_FREE
Definition: hashjoin.h:282
ParallelHashJoinBatch * shared
Definition: hashjoin.h:209
dsa_pointer chunks
Definition: hashjoin.h:167
dsa_pointer buckets
Definition: hashjoin.h:164

References HashJoinTableData::area, Assert, BarrierArriveAndDetach(), BarrierArriveAndDetachExceptLast(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, chunk, ParallelHashJoinBatch::chunks, HashJoinTableData::curbatch, dsa_free(), dsa_get_address(), DsaPointerIsValid, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, Max, HashJoinTableData::nbuckets, next, ParallelHashJoinBatchAccessor::outer_eof, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BATCH_FREE, PHJ_BATCH_PROBE, PHJ_BATCH_SCAN, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinBatch::skip_unmatched, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinReInitializeDSM(), ExecParallelHashJoinNewBatch(), ExecParallelPrepHashTableForUnmatched(), and ExecShutdownHashJoin().
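The handoff at the end of PHJ_BATCH_PROBE combines two barrier calls: BarrierArriveAndDetachExceptLast() releases every participant but one without waiting, and the survivor advances through PHJ_BATCH_SCAN to PHJ_BATCH_FREE alone. A condensed sketch of that control flow:

#include "postgres.h"
#include "storage/barrier.h"

void
leave_batch(Barrier *batch_barrier)
{
	bool		attached;

	/* All but the last participant detach here, without blocking. */
	attached = BarrierArriveAndDetachExceptLast(batch_barrier);

	/* The survivor advances the phase; the final detach frees resources. */
	if (attached && BarrierArriveAndDetach(batch_barrier))
	{
		/* last out: safe to release the batch's shared memory */
	}
}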

◆ ExecHashTableInsert()

void ExecHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1614 of file nodeHash.c.

1617 {
1618  bool shouldFree;
1619  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1620  int bucketno;
1621  int batchno;
1622 
1623  ExecHashGetBucketAndBatch(hashtable, hashvalue,
1624  &bucketno, &batchno);
1625 
1626  /*
1627  * decide whether to put the tuple in the hash table or a temp file
1628  */
1629  if (batchno == hashtable->curbatch)
1630  {
1631  /*
1632  * put the tuple in hash table
1633  */
1634  HashJoinTuple hashTuple;
1635  int hashTupleSize;
1636  double ntuples = (hashtable->totalTuples - hashtable->skewTuples);
1637 
1638  /* Create the HashJoinTuple */
1639  hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1640  hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
1641 
1642  hashTuple->hashvalue = hashvalue;
1643  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1644 
1645  /*
1646  * We always reset the tuple-matched flag on insertion. This is okay
1647  * even when reloading a tuple from a batch file, since the tuple
1648  * could not possibly have been matched to an outer tuple before it
1649  * went into the batch file.
1650  */
1651  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1652 
1653  /* Push it onto the front of the bucket's list */
1654  hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1655  hashtable->buckets.unshared[bucketno] = hashTuple;
1656 
1657  /*
1658  * Increase the (optimal) number of buckets if we just exceeded the
1659  * NTUP_PER_BUCKET threshold, but only when there's still a single
1660  * batch.
1661  */
1662  if (hashtable->nbatch == 1 &&
1663  ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
1664  {
1665  /* Guard against integer overflow and alloc size overflow */
1666  if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
1667  hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
1668  {
1669  hashtable->nbuckets_optimal *= 2;
1670  hashtable->log2_nbuckets_optimal += 1;
1671  }
1672  }
1673 
1674  /* Account for space used, and back off if we've used too much */
1675  hashtable->spaceUsed += hashTupleSize;
1676  if (hashtable->spaceUsed > hashtable->spacePeak)
1677  hashtable->spacePeak = hashtable->spaceUsed;
1678  if (hashtable->spaceUsed +
1679  hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
1680  > hashtable->spaceAllowed)
1681  ExecHashIncreaseNumBatches(hashtable);
1682  }
1683  else
1684  {
1685  /*
1686  * put the tuple into a temp file for later batches
1687  */
1688  Assert(batchno > hashtable->curbatch);
1689  ExecHashJoinSaveTuple(tuple,
1690  hashvalue,
1691  &hashtable->innerBatchFile[batchno],
1692  hashtable);
1693  }
1694 
1695  if (shouldFree)
1696  heap_free_minimal_tuple(tuple);
1697 }

References Assert, HashJoinTableData::buckets, HashJoinTableData::curbatch, dense_alloc(), ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecHashIncreaseNumBatches(), ExecHashJoinSaveTuple(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::innerBatchFile, HashJoinTableData::log2_nbuckets_optimal, MaxAllocSize, HashJoinTableData::nbatch, HashJoinTableData::nbuckets_optimal, HashJoinTupleData::next, NTUP_PER_BUCKET, HashJoinTableData::skewTuples, HashJoinTableData::spaceAllowed, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, MinimalTupleData::t_len, HashJoinTableData::totalTuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch(), and MultiExecPrivateHash().
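The bucketno/batchno values come from ExecHashGetBucketAndBatch(), which splits the 32-bit hash so the two fields use disjoint bits. A self-contained sketch of that masking scheme (nbuckets and nbatch are always powers of two here):

#include "postgres.h"
#include "port/pg_bitutils.h"

static inline void
get_bucket_and_batch(uint32 hashvalue, int nbuckets, int log2_nbuckets,
					 int nbatch, int *bucketno, int *batchno)
{
	/* Low bits select the bucket... */
	*bucketno = hashvalue & (nbuckets - 1);

	/* ...and the bits above them, rotated down, select the batch. */
	if (nbatch > 1)
		*batchno = pg_rotate_right32(hashvalue, log2_nbuckets) & (nbatch - 1);
	else
		*batchno = 0;
}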

◆ ExecHashTableReset()

void ExecHashTableReset ( HashJoinTable  hashtable)

Definition at line 2192 of file nodeHash.c.

2193 {
2194  MemoryContext oldcxt;
2195  int nbuckets = hashtable->nbuckets;
2196 
2197  /*
2198  * Release all the hash buckets and tuples acquired in the prior pass, and
2199  * reinitialize the context for a new pass.
2200  */
2201  MemoryContextReset(hashtable->batchCxt);
2202  oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
2203 
2204  /* Reallocate and reinitialize the hash bucket headers. */
2205  hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets);
2206 
2207  hashtable->spaceUsed = 0;
2208 
2209  MemoryContextSwitchTo(oldcxt);
2210 
2211  /* Forget the chunks (the memory was freed by the context reset above). */
2212  hashtable->chunks = NULL;
2213 }
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383

References HashJoinTableData::batchCxt, HashJoinTableData::buckets, HashJoinTableData::chunks, MemoryContextReset(), MemoryContextSwitchTo(), HashJoinTableData::nbuckets, palloc0_array, HashJoinTableData::spaceUsed, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch().
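Because every per-batch allocation lives in batchCxt, moving to the next batch is a single context operation rather than tuple-by-tuple frees. A minimal sketch of the pattern:

#include "postgres.h"
#include "utils/memutils.h"

static void
begin_next_batch(MemoryContext batchCxt)
{
	MemoryContext oldcxt;

	/* Reclaims all of the previous batch's buckets and tuples at once. */
	MemoryContextReset(batchCxt);

	oldcxt = MemoryContextSwitchTo(batchCxt);
	/* ... allocate the new bucket array and load the batch here ... */
	MemoryContextSwitchTo(oldcxt);
}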

◆ ExecHashTableResetMatchFlags()

void ExecHashTableResetMatchFlags ( HashJoinTable  hashtable)

Definition at line 2220 of file nodeHash.c.

2221 {
2222  HashJoinTuple tuple;
2223  int i;
2224 
2225  /* Reset all flags in the main table ... */
2226  for (i = 0; i < hashtable->nbuckets; i++)
2227  {
2228  for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
2229  tuple = tuple->next.unshared)
2230  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2231  }
2232 
2233  /* ... and the same for the skew buckets, if any */
2234  for (i = 0; i < hashtable->nSkewBuckets; i++)
2235  {
2236  int j = hashtable->skewBucketNums[i];
2237  HashSkewBucket *skewBucket = hashtable->skewBucket[j];
2238 
2239  for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
2240  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2241 
2242 }
int j
Definition: isn.c:73

References HashJoinTableData::buckets, HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, i, j, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecReScanHashJoin().

◆ ExecInitHash()

HashState* ExecInitHash ( Hash node,
EState estate,
int  eflags 
)

Definition at line 370 of file nodeHash.c.

371 {
372  HashState *hashstate;
373 
374  /* check for unsupported flags */
375  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
376 
377  /*
378  * create state structure
379  */
380  hashstate = makeNode(HashState);
381  hashstate->ps.plan = (Plan *) node;
382  hashstate->ps.state = estate;
383  hashstate->ps.ExecProcNode = ExecHash;
384  /* delay building hashtable until ExecHashTableCreate() in executor run */
385  hashstate->hashtable = NULL;
386 
387  /*
388  * Miscellaneous initialization
389  *
390  * create expression context for node
391  */
392  ExecAssignExprContext(estate, &hashstate->ps);
393 
394  /*
395  * initialize child nodes
396  */
397  outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
398 
399  /*
400  * initialize our result slot and type. No need to build projection
401  * because this node doesn't do projections.
402  */
403  ExecInitResultTupleSlotTL(&hashstate->ps, &TTSOpsMinimalTuple);
404  hashstate->ps.ps_ProjInfo = NULL;
405 
406  Assert(node->plan.qual == NIL);
407 
408  /*
409  * Delay initialization of hash_expr until ExecInitHashJoin(). We cannot
410  * build the ExprState here as we don't yet know the join type we're going
411  * to be hashing values for and we need to know that before calling
412  * ExecBuildHash32Expr as the keep_nulls parameter depends on the join
413  * type.
414  */
415  hashstate->hash_expr = NULL;
416 
417  return hashstate;
418 }
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:142
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1886
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:485
#define EXEC_FLAG_BACKWARD
Definition: executor.h:68
#define EXEC_FLAG_MARK
Definition: executor.h:69
static TupleTableSlot * ExecHash(PlanState *pstate)
Definition: nodeHash.c:91
#define makeNode(_type_)
Definition: nodes.h:155
#define NIL
Definition: pg_list.h:68
HashJoinTable hashtable
Definition: execnodes.h:2781
ExprState * hash_expr
Definition: execnodes.h:2782
EState * state
Definition: execnodes.h:1129
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1167
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:1133
List * qual
Definition: plannodes.h:154

References Assert, EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecAssignExprContext(), ExecHash(), ExecInitNode(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, HashState::hash_expr, HashState::hashtable, makeNode, NIL, outerPlan, outerPlanState, PlanState::plan, Hash::plan, HashState::ps, PlanState::ps_ProjInfo, Plan::qual, PlanState::state, and TTSOpsMinimalTuple.

Referenced by ExecInitNode().

◆ ExecParallelHashCloseBatchAccessors()

static void ExecParallelHashCloseBatchAccessors ( HashJoinTable  hashtable)
static

Definition at line 3069 of file nodeHash.c.

3070 {
3071  int i;
3072 
3073  for (i = 0; i < hashtable->nbatch; ++i)
3074  {
3075  /* Make sure no files are left open. */
3076  sts_end_write(hashtable->batches[i].inner_tuples);
3077  sts_end_write(hashtable->batches[i].outer_tuples);
3078  sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
3079  sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
3080  }
3081  pfree(hashtable->batches);
3082  hashtable->batches = NULL;
3083 }

References HashJoinTableData::batches, i, ParallelHashJoinBatchAccessor::inner_tuples, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, pfree(), sts_end_parallel_scan(), and sts_end_write().

Referenced by ExecParallelHashEnsureBatchAccessors(), and ExecParallelHashIncreaseNumBatches().

◆ ExecParallelHashEnsureBatchAccessors()

static void ExecParallelHashEnsureBatchAccessors ( HashJoinTable  hashtable)
static

Definition at line 3090 of file nodeHash.c.

3091 {
3092  ParallelHashJoinState *pstate = hashtable->parallel_state;
3093  ParallelHashJoinBatch *batches;
3094  MemoryContext oldcxt;
3095  int i;
3096 
3097  if (hashtable->batches != NULL)
3098  {
3099  if (hashtable->nbatch == pstate->nbatch)
3100  return;
3101  ExecParallelHashCloseBatchAccessors(hashtable);
3102  }
3103 
3104  /*
3105  * We should never see a state where the batch-tracking array is freed,
3106  * because we should have given up sooner if we join when the build
3107  * barrier has reached the PHJ_BUILD_FREE phase.
3108  */
3109  Assert(DsaPointerIsValid(pstate->batches));
3110 
3111  /*
3112  * Use hash join spill memory context to allocate accessors, including
3113  * buffers for the temporary files.
3114  */
3115  oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
3116 
3117  /* Allocate this backend's accessor array. */
3118  hashtable->nbatch = pstate->nbatch;
3119  hashtable->batches =
3120  palloc0_array(ParallelHashJoinBatchAccessor, hashtable->nbatch);
3121 
3122  /* Find the base of the pseudo-array of ParallelHashJoinBatch objects. */
3123  batches = (ParallelHashJoinBatch *)
3124  dsa_get_address(hashtable->area, pstate->batches);
3125 
3126  /* Set up the accessor array and attach to the tuplestores. */
3127  for (i = 0; i < hashtable->nbatch; ++i)
3128  {
3129  ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
3130  ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);
3131 
3132  accessor->shared = shared;
3133  accessor->preallocated = 0;
3134  accessor->done = false;
3135  accessor->outer_eof = false;
3136  accessor->inner_tuples =
3137  sts_attach(ParallelHashJoinBatchInner(shared),
3138  ParallelWorkerNumber + 1,
3139  &pstate->fileset);
3140  accessor->outer_tuples =
3141  sts_attach(ParallelHashJoinBatchOuter(shared,
3142  pstate->nparticipants),
3143  ParallelWorkerNumber + 1,
3144  &pstate->fileset);
3145  }
3146 
3147  MemoryContextSwitchTo(oldcxt);
3148 }
#define ParallelHashJoinBatchInner(batch)
Definition: hashjoin.h:182
#define NthParallelHashJoinBatch(base, n)
Definition: hashjoin.h:198
#define ParallelHashJoinBatchOuter(batch, nparticipants)
Definition: hashjoin.h:187
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
Definition: nodeHash.c:3069
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
SharedFileSet fileset
Definition: hashjoin.h:265

References HashJoinTableData::area, Assert, ParallelHashJoinState::batches, HashJoinTableData::batches, ParallelHashJoinBatchAccessor::done, dsa_get_address(), DsaPointerIsValid, ExecParallelHashCloseBatchAccessors(), ParallelHashJoinState::fileset, i, ParallelHashJoinBatchAccessor::inner_tuples, MemoryContextSwitchTo(), ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, ParallelHashJoinState::nparticipants, NthParallelHashJoinBatch, ParallelHashJoinBatchAccessor::outer_eof, ParallelHashJoinBatchAccessor::outer_tuples, palloc0_array, HashJoinTableData::parallel_state, ParallelHashJoinBatchInner, ParallelHashJoinBatchOuter, ParallelWorkerNumber, ParallelHashJoinBatchAccessor::preallocated, ParallelHashJoinBatchAccessor::shared, HashJoinTableData::spillCxt, and sts_attach().

Referenced by ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), and MultiExecParallelHash().

◆ ExecParallelHashFirstTuple()

static HashJoinTuple ExecParallelHashFirstTuple ( HashJoinTable  hashtable,
int  bucketno 
)
inlinestatic

Definition at line 3316 of file nodeHash.c.

3317 {
3318  HashJoinTuple tuple;
3319  dsa_pointer p;
3320 
3321  Assert(hashtable->parallel_state);
3322  p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);
3323  tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p);
3324 
3325  return tuple;
3326 }
#define dsa_pointer_atomic_read
Definition: dsa.h:65
dsa_pointer_atomic * shared
Definition: hashjoin.h:313

References HashJoinTableData::area, Assert, HashJoinTableData::buckets, dsa_get_address(), dsa_pointer_atomic_read, HashJoinTableData::parallel_state, and HashJoinTableData::shared.

Referenced by ExecParallelScanHashBucket(), and ExecParallelScanHashTableForUnmatched().

◆ ExecParallelHashIncreaseNumBatches()

static void ExecParallelHashIncreaseNumBatches ( HashJoinTable  hashtable)
static

Definition at line 1063 of file nodeHash.c.

1064 {
1065  ParallelHashJoinState *pstate = hashtable->parallel_state;
1066 
1067  Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
1068 
1069  /*
1070  * It's unlikely, but we need to be prepared for new participants to show
1071  * up while we're in the middle of this operation so we need to switch on
1072  * barrier phase here.
1073  */
1074  switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
1075  {
1076  case PHJ_GROW_BATCHES_ELECT:
1077 
1078  /*
1079  * Elect one participant to prepare to grow the number of batches.
1080  * This involves reallocating or resetting the buckets of batch 0
1081  * in preparation for all participants to begin repartitioning the
1082  * tuples.
1083  */
1084  if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
1085  WAIT_EVENT_HASH_GROW_BATCHES_ELECT))
1086  {
1087  dsa_pointer_atomic *buckets;
1088  ParallelHashJoinBatch *old_batch0;
1089  int new_nbatch;
1090  int i;
1091 
1092  /* Move the old batch out of the way. */
1093  old_batch0 = hashtable->batches[0].shared;
1094  pstate->old_batches = pstate->batches;
1095  pstate->old_nbatch = hashtable->nbatch;
1096  pstate->batches = InvalidDsaPointer;
1097 
1098  /* Free this backend's old accessors. */
1099  ExecParallelHashCloseBatchAccessors(hashtable);
1100 
1101  /* Figure out how many batches to use. */
1102  if (hashtable->nbatch == 1)
1103  {
1104  /*
1105  * We are going from single-batch to multi-batch. We need
1106  * to switch from one large combined memory budget to the
1107  * regular hash_mem budget.
1108  */
1109  pstate->space_allowed = get_hash_memory_limit();
1110 
1111  /*
1112  * The combined hash_mem of all participants wasn't
1113  * enough. Therefore one batch per participant would be
1114  * approximately equivalent and would probably also be
1115  * insufficient. So try two batches per participant,
1116  * rounded up to a power of two.
1117  */
1118  new_nbatch = pg_nextpower2_32(pstate->nparticipants * 2);
1119  }
1120  else
1121  {
1122  /*
1123  * We were already multi-batched. Try doubling the number
1124  * of batches.
1125  */
1126  new_nbatch = hashtable->nbatch * 2;
1127  }
1128 
1129  /* Allocate new larger generation of batches. */
1130  Assert(hashtable->nbatch == pstate->nbatch);
1131  ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch);
1132  Assert(hashtable->nbatch == pstate->nbatch);
1133 
1134  /* Replace or recycle batch 0's bucket array. */
1135  if (pstate->old_nbatch == 1)
1136  {
1137  double dtuples;
1138  double dbuckets;
1139  int new_nbuckets;
1140  uint32 max_buckets;
1141 
1142  /*
1143  * We probably also need a smaller bucket array. How many
1144  * tuples do we expect per batch, assuming we have only
1145  * half of them so far? Normally we don't need to change
1146  * the bucket array's size, because the size of each batch
1147  * stays the same as we add more batches, but in this
1148  * special case we move from a large batch to many smaller
1149  * batches and it would be wasteful to keep the large
1150  * array.
1151  */
1152  dtuples = (old_batch0->ntuples * 2.0) / new_nbatch;
1153 
1154  /*
1155  * We need to calculate the maximum number of buckets to
1156  * stay within the MaxAllocSize boundary. Round the
1157  * maximum number to the previous power of 2 given that
1158  * later we round the number to the next power of 2.
1159  */
1160  max_buckets = pg_prevpower2_32((uint32)
1161  (MaxAllocSize / sizeof(dsa_pointer_atomic)));
1162  dbuckets = ceil(dtuples / NTUP_PER_BUCKET);
1163  dbuckets = Min(dbuckets, max_buckets);
1164  new_nbuckets = (int) dbuckets;
1165  new_nbuckets = Max(new_nbuckets, 1024);
1166  new_nbuckets = pg_nextpower2_32(new_nbuckets);
1167  dsa_free(hashtable->area, old_batch0->buckets);
1168  hashtable->batches[0].shared->buckets =
1169  dsa_allocate(hashtable->area,
1170  sizeof(dsa_pointer_atomic) * new_nbuckets);
1171  buckets = (dsa_pointer_atomic *)
1172  dsa_get_address(hashtable->area,
1173  hashtable->batches[0].shared->buckets);
1174  for (i = 0; i < new_nbuckets; ++i)
1175  dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
1176  pstate->nbuckets = new_nbuckets;
1177  }
1178  else
1179  {
1180  /* Recycle the existing bucket array. */
1181  hashtable->batches[0].shared->buckets = old_batch0->buckets;
1182  buckets = (dsa_pointer_atomic *)
1183  dsa_get_address(hashtable->area, old_batch0->buckets);
1184  for (i = 0; i < hashtable->nbuckets; ++i)
1185  dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer);
1186  }
1187 
1188  /* Move all chunks to the work queue for parallel processing. */
1189  pstate->chunk_work_queue = old_batch0->chunks;
1190 
1191  /* Disable further growth temporarily while we're growing. */
1192  pstate->growth = PHJ_GROWTH_DISABLED;
1193  }
1194  else
1195  {
1196  /* All other participants just flush their tuples to disk. */
1197  ExecParallelHashCloseBatchAccessors(hashtable);
1198  }
1199  /* Fall through. */
1200 
1201  case PHJ_GROW_BATCHES_REALLOCATE:
1202  /* Wait for the above to be finished. */
1203  BarrierArriveAndWait(&pstate->grow_batches_barrier,
1204  WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE);
1205  /* Fall through. */
1206 
1207  case PHJ_GROW_BATCHES_REPARTITION:
1208  /* Make sure that we have the current dimensions and buckets. */
1209  ExecParallelHashEnsureBatchAccessors(hashtable);
1210  ExecParallelHashTableSetCurrentBatch(hashtable, 0);
1211  /* Then partition, flush counters. */
1212  ExecParallelHashRepartitionFirst(hashtable);
1213  ExecParallelHashRepartitionRest(hashtable);
1214  ExecParallelHashMergeCounters(hashtable);
1215  /* Wait for the above to be finished. */
1216  BarrierArriveAndWait(&pstate->grow_batches_barrier,
1217  WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
1218  /* Fall through. */
1219 
1220  case PHJ_GROW_BATCHES_DECIDE:
1221 
1222  /*
1223  * Elect one participant to clean up and decide whether further
1224  * repartitioning is needed, or should be disabled because it's
1225  * not helping.
1226  */
1227  if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
1228  WAIT_EVENT_HASH_GROW_BATCHES_DECIDE))
1229  {
1230  ParallelHashJoinBatch *old_batches;
1231  bool space_exhausted = false;
1232  bool extreme_skew_detected = false;
1233 
1234  /* Make sure that we have the current dimensions and buckets. */
1235  ExecParallelHashEnsureBatchAccessors(hashtable);
1236  ExecParallelHashTableSetCurrentBatch(hashtable, 0);
1237 
1238  old_batches = dsa_get_address(hashtable->area, pstate->old_batches);
1239 
1240  /* Are any of the new generation of batches exhausted? */
1241  for (int i = 0; i < hashtable->nbatch; ++i)
1242  {
1243  ParallelHashJoinBatch *batch;
1244  ParallelHashJoinBatch *old_batch;
1245  int parent;
1246 
1247  batch = hashtable->batches[i].shared;
1248  if (batch->space_exhausted ||
1249  batch->estimated_size > pstate->space_allowed)
1250  space_exhausted = true;
1251 
1252  parent = i % pstate->old_nbatch;
1253  old_batch = NthParallelHashJoinBatch(old_batches, parent);
1254  if (old_batch->space_exhausted ||
1255  batch->estimated_size > pstate->space_allowed)
1256  {
1257  /*
1258  * Did this batch receive ALL of the tuples from its
1259  * parent batch? That would indicate that further
1260  * repartitioning isn't going to help (the hash values
1261  * are probably all the same).
1262  */
1263  if (batch->ntuples == hashtable->batches[parent].shared->old_ntuples)
1264  extreme_skew_detected = true;
1265  }
1266  }
1267 
1268  /* Don't keep growing if it's not helping or we'd overflow. */
1269  if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2)
1270  pstate->growth = PHJ_GROWTH_DISABLED;
1271  else if (space_exhausted)
1272  pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
1273  else
1274  pstate->growth = PHJ_GROWTH_OK;
1275 
1276  /* Free the old batches in shared memory. */
1277  dsa_free(hashtable->area, pstate->old_batches);
1278  pstate->old_batches = InvalidDsaPointer;
1279  }
1280  /* Fall through. */
1281 
1282  case PHJ_GROW_BATCHES_FINISH:
1283  /* Wait for the above to complete. */
1284  BarrierArriveAndWait(&pstate->grow_batches_barrier,
1285  WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
1286  }
1287 }
#define dsa_pointer_atomic_init
Definition: dsa.h:64
#define dsa_allocate(area, size)
Definition: dsa.h:109
#define dsa_pointer_atomic_write
Definition: dsa.h:66
#define PHJ_GROW_BATCHES_REPARTITION
Definition: hashjoin.h:287
#define PHJ_GROW_BATCHES_ELECT
Definition: hashjoin.h:285
#define PHJ_BUILD_HASH_INNER
Definition: hashjoin.h:271
#define PHJ_GROW_BATCHES_DECIDE
Definition: hashjoin.h:288
#define PHJ_GROW_BATCHES_REALLOCATE
Definition: hashjoin.h:286
#define PHJ_GROW_BATCHES_FINISH
Definition: hashjoin.h:289
#define PHJ_GROW_BATCHES_PHASE(n)
Definition: hashjoin.h:290
@ PHJ_GROWTH_NEED_MORE_BATCHES
Definition: hashjoin.h:237
@ PHJ_GROWTH_DISABLED
Definition: hashjoin.h:239
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3364
static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
Definition: nodeHash.c:3090
static void ExecParallelHashRepartitionRest(HashJoinTable hashtable)
Definition: nodeHash.c:1362
static void ExecParallelHashMergeCounters(HashJoinTable hashtable)
Definition: nodeHash.c:1422
static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
Definition: nodeHash.c:1295
static uint32 pg_prevpower2_32(uint32 num)
Definition: pg_bitutils.h:235
Barrier grow_batches_barrier
Definition: hashjoin.h:261
dsa_pointer old_batches
Definition: hashjoin.h:249
dsa_pointer chunk_work_queue
Definition: hashjoin.h:254

References HashJoinTableData::area, Assert, BarrierArriveAndWait(), BarrierPhase(), ParallelHashJoinState::batches, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, ParallelHashJoinState::build_barrier, ParallelHashJoinState::chunk_work_queue, ParallelHashJoinBatch::chunks, dsa_allocate, dsa_free(), dsa_get_address(), dsa_pointer_atomic_init, dsa_pointer_atomic_write, ParallelHashJoinBatch::estimated_size, ExecParallelHashCloseBatchAccessors(), ExecParallelHashEnsureBatchAccessors(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashMergeCounters(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), ExecParallelHashTableSetCurrentBatch(), get_hash_memory_limit(), ParallelHashJoinState::grow_batches_barrier, ParallelHashJoinState::growth, i, InvalidDsaPointer, Max, MaxAllocSize, Min, ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, ParallelHashJoinState::nparticipants, NthParallelHashJoinBatch, NTUP_PER_BUCKET, ParallelHashJoinBatch::ntuples, ParallelHashJoinState::old_batches, ParallelHashJoinState::old_nbatch, ParallelHashJoinBatch::old_ntuples, HashJoinTableData::parallel_state, pg_nextpower2_32(), pg_prevpower2_32(), PHJ_BUILD_HASH_INNER, PHJ_GROW_BATCHES_DECIDE, PHJ_GROW_BATCHES_ELECT, PHJ_GROW_BATCHES_FINISH, PHJ_GROW_BATCHES_PHASE, PHJ_GROW_BATCHES_REALLOCATE, PHJ_GROW_BATCHES_REPARTITION, PHJ_GROWTH_DISABLED, PHJ_GROWTH_NEED_MORE_BATCHES, PHJ_GROWTH_OK, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinState::space_allowed, and ParallelHashJoinBatch::space_exhausted.

Referenced by ExecParallelHashTupleAlloc(), ExecParallelHashTuplePrealloc(), and MultiExecParallelHash().
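Repartitioning only ever moves tuples forward because batchno is taken from masked hash bits: doubling nbatch exposes one more bit, so a tuple either keeps its batch number or moves to old batch + old nbatch, never to an earlier batch. A small sketch (not from nodeHash.c) of that invariant:

#include "postgres.h"
#include "port/pg_bitutils.h"

static void
check_doubling_invariant(uint32 hashvalue, int log2_nbuckets, int old_nbatch)
{
	uint32		bits = pg_rotate_right32(hashvalue, log2_nbuckets);
	int			oldb = bits & (old_nbatch - 1);
	int			newb = bits & (2 * old_nbatch - 1);

	/* The extra mask bit either leaves the batch alone or adds old_nbatch. */
	Assert(newb == oldb || newb == oldb + old_nbatch);
}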

◆ ExecParallelHashIncreaseNumBuckets()

static void ExecParallelHashIncreaseNumBuckets ( HashJoinTable  hashtable)
static

Definition at line 1515 of file nodeHash.c.

1516 {
1517  ParallelHashJoinState *pstate = hashtable->parallel_state;
1518  int i;
1519  HashMemoryChunk chunk;
1520  dsa_pointer chunk_s;
1521 
1522  Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
1523 
1524  /*
1525  * It's unlikely, but we need to be prepared for new participants to show
1526  * up while we're in the middle of this operation so we need to switch on
1527  * barrier phase here.
1528  */
1529  switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
1530  {
1531  case PHJ_GROW_BUCKETS_ELECT:
1532  /* Elect one participant to prepare to increase nbuckets. */
1533  if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
1534  WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
1535  {
1536  size_t size;
1537  dsa_pointer_atomic *buckets;
1538 
1539  /* Double the size of the bucket array. */
1540  pstate->nbuckets *= 2;
1541  size = pstate->nbuckets * sizeof(dsa_pointer_atomic);
1542  hashtable->batches[0].shared->size += size / 2;
1543  dsa_free(hashtable->area, hashtable->batches[0].shared->buckets);
1544  hashtable->batches[0].shared->buckets =
1545  dsa_allocate(hashtable->area, size);
1546  buckets = (dsa_pointer_atomic *)
1547  dsa_get_address(hashtable->area,
1548  hashtable->batches[0].shared->buckets);
1549  for (i = 0; i < pstate->nbuckets; ++i)
1550  dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
1551 
1552  /* Put the chunk list onto the work queue. */
1553  pstate->chunk_work_queue = hashtable->batches[0].shared->chunks;
1554 
1555  /* Clear the flag. */
1556  pstate->growth = PHJ_GROWTH_OK;
1557  }
1558  /* Fall through. */
1559 
1560  case PHJ_GROW_BUCKETS_REALLOCATE:
1561  /* Wait for the above to complete. */
1562  BarrierArriveAndWait(&pstate->grow_buckets_barrier,
1563  WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE);
1564  /* Fall through. */
1565 
1566  case PHJ_GROW_BUCKETS_REINSERT:
1567  /* Reinsert all tuples into the hash table. */
1568  ExecParallelHashEnsureBatchAccessors(hashtable);
1569  ExecParallelHashTableSetCurrentBatch(hashtable, 0);
1570  while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s)))
1571  {
1572  size_t idx = 0;
1573 
1574  while (idx < chunk->used)
1575  {
1576  HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
1577  dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx;
1578  int bucketno;
1579  int batchno;
1580 
1581  ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
1582  &bucketno, &batchno);
1583  Assert(batchno == 0);
1584 
1585  /* add the tuple to the proper bucket */
1586  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1587  hashTuple, shared);
1588 
1589  /* advance index past the tuple */
1590  idx += MAXALIGN(HJTUPLE_OVERHEAD +
1591  HJTUPLE_MINTUPLE(hashTuple)->t_len);
1592  }
1593 
1594  /* allow this loop to be cancellable */
1595  CHECK_FOR_INTERRUPTS();
1596  }
1597  BarrierArriveAndWait(&pstate->grow_buckets_barrier,
1598  WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT);
1599  }
1600 }
pg_atomic_uint64 dsa_pointer_atomic
Definition: dsa.h:63
#define PHJ_GROW_BUCKETS_REINSERT
Definition: hashjoin.h:295
#define PHJ_GROW_BUCKETS_ELECT
Definition: hashjoin.h:293
#define PHJ_GROW_BUCKETS_PHASE(n)
Definition: hashjoin.h:296
#define PHJ_GROW_BUCKETS_REALLOCATE
Definition: hashjoin.h:294
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
Definition: nodeHash.c:3346
static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
Definition: nodeHash.c:3385
Barrier grow_buckets_barrier
Definition: hashjoin.h:262

References HashJoinTableData::area, Assert, BarrierArriveAndWait(), BarrierPhase(), HashJoinTableData::batches, ParallelHashJoinBatch::buckets, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, CHECK_FOR_INTERRUPTS, chunk, ParallelHashJoinState::chunk_work_queue, ParallelHashJoinBatch::chunks, dsa_allocate, dsa_free(), dsa_get_address(), dsa_pointer_atomic_init, ExecHashGetBucketAndBatch(), ExecParallelHashEnsureBatchAccessors(), ExecParallelHashPopChunkQueue(), ExecParallelHashPushTuple(), ExecParallelHashTableSetCurrentBatch(), ParallelHashJoinState::grow_buckets_barrier, ParallelHashJoinState::growth, HASH_CHUNK_DATA, HASH_CHUNK_HEADER_SIZE, HashJoinTupleData::hashvalue, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, i, idx(), InvalidDsaPointer, MAXALIGN, ParallelHashJoinState::nbuckets, HashJoinTableData::parallel_state, PHJ_BUILD_HASH_INNER, PHJ_GROW_BUCKETS_ELECT, PHJ_GROW_BUCKETS_PHASE, PHJ_GROW_BUCKETS_REALLOCATE, PHJ_GROW_BUCKETS_REINSERT, PHJ_GROWTH_OK, ParallelHashJoinBatchAccessor::shared, HashJoinTableData::shared, size, and ParallelHashJoinBatch::size.

Referenced by ExecParallelHashTupleAlloc(), ExecParallelHashTuplePrealloc(), and MultiExecParallelHash().
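The reinsert loop walks each chunk's densely packed tuples by advancing a byte offset. A sketch of that iteration idiom, using the chunk and tuple macros from executor/hashjoin.h:

#include "postgres.h"
#include "executor/hashjoin.h"

static void
walk_chunk_tuples(HashMemoryChunk chunk)
{
	size_t		idx = 0;

	while (idx < chunk->used)
	{
		HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);

		/* ... examine HJTUPLE_MINTUPLE(hashTuple) here ... */

		/* Advance past the MAXALIGN'd header-plus-tuple. */
		idx += MAXALIGN(HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len);
	}
}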

◆ ExecParallelHashJoinSetUpBatches()

static void ExecParallelHashJoinSetUpBatches ( HashJoinTable  hashtable,
int  nbatch 
)
static

Definition at line 2989 of file nodeHash.c.

2990 {
2991  ParallelHashJoinState *pstate = hashtable->parallel_state;
2992  ParallelHashJoinBatch *batches;
2993  MemoryContext oldcxt;
2994  int i;
2995 
2996  Assert(hashtable->batches == NULL);
2997 
2998  /* Allocate space. */
2999  pstate->batches =
3000  dsa_allocate0(hashtable->area,
3001  EstimateParallelHashJoinBatch(hashtable) * nbatch);
3002  pstate->nbatch = nbatch;
3003  batches = dsa_get_address(hashtable->area, pstate->batches);
3004 
3005  /*
3006  * Use hash join spill memory context to allocate accessors, including
3007  * buffers for the temporary files.
3008  */
3009  oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
3010 
3011  /* Allocate this backend's accessor array. */
3012  hashtable->nbatch = nbatch;
3013  hashtable->batches =
3014  palloc0_array(ParallelHashJoinBatchAccessor, hashtable->nbatch);
3015 
3016  /* Set up the shared state, tuplestores and backend-local accessors. */
3017  for (i = 0; i < hashtable->nbatch; ++i)
3018  {
3019  ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
3020  ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);
3021  char name[MAXPGPATH];
3022 
3023  /*
3024  * All members of shared were zero-initialized. We just need to set
3025  * up the Barrier.
3026  */
3027  BarrierInit(&shared->batch_barrier, 0);
3028  if (i == 0)
3029  {
3030  /* Batch 0 doesn't need to be loaded. */
3031  BarrierAttach(&shared->batch_barrier);
3032  while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBE)
3033  BarrierArriveAndWait(&shared->batch_barrier, 0);
3034  BarrierDetach(&shared->batch_barrier);
3035  }
3036 
3037  /* Initialize accessor state. All members were zero-initialized. */
3038  accessor->shared = shared;
3039 
3040  /* Initialize the shared tuplestores. */
3041  snprintf(name, sizeof(name), "i%dof%d", i, hashtable->nbatch);
3042  accessor->inner_tuples =
3043  sts_initialize(ParallelHashJoinBatchInner(shared),
3044  pstate->nparticipants,
3045  ParallelWorkerNumber + 1,
3046  sizeof(uint32),
3047  SHARED_TUPLESTORE_SINGLE_PASS,
3048  &pstate->fileset,
3049  name);
3050  snprintf(name, sizeof(name), "o%dof%d", i, hashtable->nbatch);
3051  accessor->outer_tuples =
3052  sts_initialize(ParallelHashJoinBatchOuter(shared,
3053  pstate->nparticipants),
3054  pstate->nparticipants,
3055  ParallelWorkerNumber + 1,
3056  sizeof(uint32),
3057  SHARED_TUPLESTORE_SINGLE_PASS,
3058  &pstate->fileset,
3059  name);
3060  }
3061 
3062  MemoryContextSwitchTo(oldcxt);
3063 }
void BarrierInit(Barrier *barrier, int participants)
Definition: barrier.c:100
bool BarrierDetach(Barrier *barrier)
Definition: barrier.c:256
#define dsa_allocate0(area, size)
Definition: dsa.h:113
#define EstimateParallelHashJoinBatch(hashtable)
Definition: hashjoin.h:193
#define MAXPGPATH
#define snprintf
Definition: port.h:238
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
#define SHARED_TUPLESTORE_SINGLE_PASS
const char * name

References HashJoinTableData::area, Assert, BarrierArriveAndWait(), BarrierAttach(), BarrierDetach(), BarrierInit(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, ParallelHashJoinState::batches, HashJoinTableData::batches, dsa_allocate0, dsa_get_address(), EstimateParallelHashJoinBatch, ParallelHashJoinState::fileset, i, ParallelHashJoinBatchAccessor::inner_tuples, MAXPGPATH, MemoryContextSwitchTo(), name, ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, ParallelHashJoinState::nparticipants, NthParallelHashJoinBatch, ParallelHashJoinBatchAccessor::outer_tuples, palloc0_array, HashJoinTableData::parallel_state, ParallelHashJoinBatchInner, ParallelHashJoinBatchOuter, ParallelWorkerNumber, PHJ_BATCH_PROBE, ParallelHashJoinBatchAccessor::shared, SHARED_TUPLESTORE_SINGLE_PASS, snprintf, HashJoinTableData::spillCxt, and sts_initialize().

Referenced by ExecHashTableCreate(), and ExecParallelHashIncreaseNumBatches().

◆ ExecParallelHashMergeCounters()

static void ExecParallelHashMergeCounters ( HashJoinTable  hashtable)
static

Definition at line 1422 of file nodeHash.c.

1423 {
1424  ParallelHashJoinState *pstate = hashtable->parallel_state;
1425  int i;
1426 
1427  LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
1428  pstate->total_tuples = 0;
1429  for (i = 0; i < hashtable->nbatch; ++i)
1430  {
1431  ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i];
1432 
1433  batch->shared->size += batch->size;
1434  batch->shared->estimated_size += batch->estimated_size;
1435  batch->shared->ntuples += batch->ntuples;
1436  batch->shared->old_ntuples += batch->old_ntuples;
1437  batch->size = 0;
1438  batch->estimated_size = 0;
1439  batch->ntuples = 0;
1440  batch->old_ntuples = 0;
1441  pstate->total_tuples += batch->shared->ntuples;
1442  }
1443  LWLockRelease(&pstate->lock);
1444 }
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_EXCLUSIVE
Definition: lwlock.h:114

References HashJoinTableData::batches, ParallelHashJoinBatch::estimated_size, ParallelHashJoinBatchAccessor::estimated_size, i, ParallelHashJoinState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), HashJoinTableData::nbatch, ParallelHashJoinBatch::ntuples, ParallelHashJoinBatchAccessor::ntuples, ParallelHashJoinBatch::old_ntuples, ParallelHashJoinBatchAccessor::old_ntuples, HashJoinTableData::parallel_state, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinBatchAccessor::size, and ParallelHashJoinState::total_tuples.

Referenced by ExecParallelHashIncreaseNumBatches(), and MultiExecParallelHash().
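Each backend accumulates counters privately and folds them into shared state under the lock, zeroing the private copies so a later merge cannot double-count. A reduced sketch of the same pattern:

#include "postgres.h"
#include "storage/lwlock.h"

typedef struct SharedTotals
{
	LWLock		lock;
	int64		total_tuples;
} SharedTotals;

static void
merge_local_count(SharedTotals *shared, int64 *local_tuples)
{
	LWLockAcquire(&shared->lock, LW_EXCLUSIVE);
	shared->total_tuples += *local_tuples;
	*local_tuples = 0;			/* reset so re-merging is idempotent */
	LWLockRelease(&shared->lock);
}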

◆ ExecParallelHashNextTuple()

static HashJoinTuple ExecParallelHashNextTuple ( HashJoinTable  hashtable,
HashJoinTuple  tuple 
)
inlinestatic

Definition at line 3332 of file nodeHash.c.

3333 {
3334  HashJoinTuple next;
3335 
3336  Assert(hashtable->parallel_state);
3337  next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared);
3338 
3339  return next;
3340 }
dsa_pointer shared
Definition: hashjoin.h:84

References HashJoinTableData::area, Assert, dsa_get_address(), next, HashJoinTupleData::next, HashJoinTableData::parallel_state, and HashJoinTupleData::shared.

Referenced by ExecParallelScanHashBucket(), and ExecParallelScanHashTableForUnmatched().
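Combined with ExecParallelHashFirstTuple(), this yields the usual bucket-chain walk, except that every next pointer is a dsa_pointer and each hop is translated through dsa_get_address(). A sketch of a scan (both helpers are static in nodeHash.c, shown here as if callable):

#include "postgres.h"
#include "executor/hashjoin.h"

static void
scan_shared_bucket(HashJoinTable hashtable, int bucketno)
{
	HashJoinTuple tuple = ExecParallelHashFirstTuple(hashtable, bucketno);

	while (tuple != NULL)
	{
		/* ... compare HJTUPLE_MINTUPLE(tuple) against the outer tuple ... */
		tuple = ExecParallelHashNextTuple(hashtable, tuple);
	}
}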

◆ ExecParallelHashPopChunkQueue()

static HashMemoryChunk ExecParallelHashPopChunkQueue ( HashJoinTable  hashtable,
dsa_pointer shared 
)
static

Definition at line 3385 of file nodeHash.c.

3386 {
3387  ParallelHashJoinState *pstate = hashtable->parallel_state;
3388  HashMemoryChunk chunk;
3389 
3390  LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
3391  if (DsaPointerIsValid(pstate->chunk_work_queue))
3392  {
3393  *shared = pstate->chunk_work_queue;
3394  chunk = (HashMemoryChunk)
3395  dsa_get_address(hashtable->area, *shared);
3396  pstate->chunk_work_queue = chunk->next.shared;
3397  }
3398  else
3399  chunk = NULL;
3400  LWLockRelease(&pstate->lock);
3401 
3402  return chunk;
3403 }

References HashJoinTableData::area, chunk, ParallelHashJoinState::chunk_work_queue, dsa_get_address(), DsaPointerIsValid, ParallelHashJoinState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), and HashJoinTableData::parallel_state.

Referenced by ExecParallelHashIncreaseNumBuckets(), and ExecParallelHashRepartitionFirst().

◆ ExecParallelHashPushTuple()

static void ExecParallelHashPushTuple ( dsa_pointer_atomic head,
HashJoinTuple  tuple,
dsa_pointer  tuple_shared 
)
inlinestatic

Definition at line 3346 of file nodeHash.c.

3349 {
3350  for (;;)
3351  {
3352  tuple->next.shared = dsa_pointer_atomic_read(head);
3353  if (dsa_pointer_atomic_compare_exchange(head,
3354  &tuple->next.shared,
3355  tuple_shared))
3356  break;
3357  }
3358 }
#define dsa_pointer_atomic_compare_exchange
Definition: dsa.h:68

References dsa_pointer_atomic_compare_exchange, dsa_pointer_atomic_read, HashJoinTupleData::next, and HashJoinTupleData::shared.

Referenced by ExecParallelHashIncreaseNumBuckets(), ExecParallelHashRepartitionFirst(), ExecParallelHashTableInsert(), and ExecParallelHashTableInsertCurrentBatch().
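As the typedef above shows, dsa_pointer_atomic is pg_atomic_uint64 (in the 64-bit dsa_pointer build), so the push is an ordinary compare-and-swap retry loop. A generic sketch written directly against port/atomics.h:

#include "postgres.h"
#include "port/atomics.h"

static void
atomic_list_push(pg_atomic_uint64 *head, uint64 *node_next_field,
				 uint64 node_pointer)
{
	uint64		old_head = pg_atomic_read_u64(head);

	do
	{
		/* Link the new node to the head we last observed... */
		*node_next_field = old_head;
		/* ...and retry if another pusher changed it in the meantime. */
	} while (!pg_atomic_compare_exchange_u64(head, &old_head, node_pointer));
}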

◆ ExecParallelHashRepartitionFirst()

static void ExecParallelHashRepartitionFirst ( HashJoinTable  hashtable)
static

Definition at line 1295 of file nodeHash.c.

1296 {
1297  dsa_pointer chunk_shared;
1298  HashMemoryChunk chunk;
1299 
1300  Assert(hashtable->nbatch == hashtable->parallel_state->nbatch);
1301 
1302  while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared)))
1303  {
1304  size_t idx = 0;
1305 
1306  /* Repartition all tuples in this chunk. */
1307  while (idx < chunk->used)
1308  {
1309  HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
1310  MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
1311  HashJoinTuple copyTuple;
1312  dsa_pointer shared;
1313  int bucketno;
1314  int batchno;
1315 
1316  ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
1317  &bucketno, &batchno);
1318 
1319  Assert(batchno < hashtable->nbatch);
1320  if (batchno == 0)
1321  {
1322  /* It still belongs in batch 0. Copy to a new chunk. */
1323  copyTuple =
1324  ExecParallelHashTupleAlloc(hashtable,
1325  HJTUPLE_OVERHEAD + tuple->t_len,
1326  &shared);
1327  copyTuple->hashvalue = hashTuple->hashvalue;
1328  memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len);
1329  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1330  copyTuple, shared);
1331  }
1332  else
1333  {
1334  size_t tuple_size =
1335  MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1336 
1337  /* It belongs in a later batch. */
1338  hashtable->batches[batchno].estimated_size += tuple_size;
1339  sts_puttuple(hashtable->batches[batchno].inner_tuples,
1340  &hashTuple->hashvalue, tuple);
1341  }
1342 
1343  /* Count this tuple. */
1344  ++hashtable->batches[0].old_ntuples;
1345  ++hashtable->batches[batchno].ntuples;
1346 
1347  idx += MAXALIGN(HJTUPLE_OVERHEAD +
1348  HJTUPLE_MINTUPLE(hashTuple)->t_len);
1349  }
1350 
1351  /* Free this chunk. */
1352  dsa_free(hashtable->area, chunk_shared);
1353 
1354  CHECK_FOR_INTERRUPTS();
1355  }
1356 }
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
Definition: nodeHash.c:2841
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)

References HashJoinTableData::area, Assert, HashJoinTableData::batches, HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, chunk, dsa_free(), ParallelHashJoinBatchAccessor::estimated_size, ExecHashGetBucketAndBatch(), ExecParallelHashPopChunkQueue(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), HASH_CHUNK_DATA, HashJoinTupleData::hashvalue, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, idx(), ParallelHashJoinBatchAccessor::inner_tuples, MAXALIGN, ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::ntuples, ParallelHashJoinBatchAccessor::old_ntuples, HashJoinTableData::parallel_state, HashJoinTableData::shared, sts_puttuple(), and MinimalTupleData::t_len.

Referenced by ExecParallelHashIncreaseNumBatches().

◆ ExecParallelHashRepartitionRest()

static void ExecParallelHashRepartitionRest ( HashJoinTable  hashtable)
static

Definition at line 1362 of file nodeHash.c.

1363 {
1364  ParallelHashJoinState *pstate = hashtable->parallel_state;
1365  int old_nbatch = pstate->old_nbatch;
1366  SharedTuplestoreAccessor **old_inner_tuples;
1367  ParallelHashJoinBatch *old_batches;
1368  int i;
1369 
1370  /* Get our hands on the previous generation of batches. */
1371  old_batches = (ParallelHashJoinBatch *)
1372  dsa_get_address(hashtable->area, pstate->old_batches);
1373  old_inner_tuples = palloc0_array(SharedTuplestoreAccessor *, old_nbatch);
1374  for (i = 1; i < old_nbatch; ++i)
1375  {
1376  ParallelHashJoinBatch *shared =
1377  NthParallelHashJoinBatch(old_batches, i);
1378 
1379  old_inner_tuples[i] = sts_attach(ParallelHashJoinBatchInner(shared),
1380  ParallelWorkerNumber + 1,
1381  &pstate->fileset);
1382  }
1383 
1384  /* Join in the effort to repartition them. */
1385  for (i = 1; i < old_nbatch; ++i)
1386  {
1387  MinimalTuple tuple;
1388  uint32 hashvalue;
1389 
1390  /* Scan one partition from the previous generation. */
1391  sts_begin_parallel_scan(old_inner_tuples[i]);
1392  while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue)))
1393  {
1394  size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1395  int bucketno;
1396  int batchno;
1397 
1398  /* Decide which partition it goes to in the new generation. */
1399  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
1400  &batchno);
1401 
1402  hashtable->batches[batchno].estimated_size += tuple_size;
1403  ++hashtable->batches[batchno].ntuples;
1404  ++hashtable->batches[i].old_ntuples;
1405 
1406  /* Store the tuple in its new batch. */
1407  sts_puttuple(hashtable->batches[batchno].inner_tuples,
1408  &hashvalue, tuple);
1409 
1410  CHECK_FOR_INTERRUPTS();
1411 
1412  sts_end_parallel_scan(old_inner_tuples[i]);
1413  }
1414 
1415  pfree(old_inner_tuples);
1416 }
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)

References HashJoinTableData::area, HashJoinTableData::batches, CHECK_FOR_INTERRUPTS, dsa_get_address(), ParallelHashJoinBatchAccessor::estimated_size, ExecHashGetBucketAndBatch(), ParallelHashJoinState::fileset, HJTUPLE_OVERHEAD, i, ParallelHashJoinBatchAccessor::inner_tuples, MAXALIGN, NthParallelHashJoinBatch, ParallelHashJoinBatchAccessor::ntuples, ParallelHashJoinState::old_batches, ParallelHashJoinState::old_nbatch, ParallelHashJoinBatchAccessor::old_ntuples, palloc0_array, HashJoinTableData::parallel_state, ParallelHashJoinBatchInner, ParallelWorkerNumber, pfree(), sts_attach(), sts_begin_parallel_scan(), sts_end_parallel_scan(), sts_parallel_scan_next(), sts_puttuple(), and MinimalTupleData::t_len.

Referenced by ExecParallelHashIncreaseNumBatches().

◆ ExecParallelHashTableAlloc()

void ExecParallelHashTableAlloc ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3154 of file nodeHash.c.

3155 {
3156  ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
3157  dsa_pointer_atomic *buckets;
3158  int nbuckets = hashtable->parallel_state->nbuckets;
3159  int i;
3160 
3161  batch->buckets =
3162  dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets);
3163  buckets = (dsa_pointer_atomic *)
3164  dsa_get_address(hashtable->area, batch->buckets);
3165  for (i = 0; i < nbuckets; ++i)
3166  dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
3167 }

References HashJoinTableData::area, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, dsa_allocate, dsa_get_address(), dsa_pointer_atomic_init, i, InvalidDsaPointer, ParallelHashJoinState::nbuckets, HashJoinTableData::parallel_state, and ParallelHashJoinBatchAccessor::shared.

Referenced by ExecHashTableCreate(), and ExecParallelHashJoinNewBatch().

◆ ExecParallelHashTableInsert()

void ExecParallelHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1704 of file nodeHash.c.

1707 {
1708  bool shouldFree;
1709  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1710  dsa_pointer shared;
1711  int bucketno;
1712  int batchno;
1713 
1714 retry:
1715  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1716 
1717  if (batchno == 0)
1718  {
1719  HashJoinTuple hashTuple;
1720 
1721  /* Try to load it into memory. */
1722  Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
1723  PHJ_BUILD_HASH_INNER);
1724  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1725  HJTUPLE_OVERHEAD + tuple->t_len,
1726  &shared);
1727  if (hashTuple == NULL)
1728  goto retry;
1729 
1730  /* Store the hash value in the HashJoinTuple header. */
1731  hashTuple->hashvalue = hashvalue;
1732  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1733  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1734 
1735  /* Push it onto the front of the bucket's list */
1736  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1737  hashTuple, shared);
1738  }
1739  else
1740  {
1741  size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1742 
1743  Assert(batchno > 0);
1744 
1745  /* Try to preallocate space in the batch if necessary. */
1746  if (hashtable->batches[batchno].preallocated < tuple_size)
1747  {
1748  if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
1749  goto retry;
1750  }
1751 
1752  Assert(hashtable->batches[batchno].preallocated >= tuple_size);
1753  hashtable->batches[batchno].preallocated -= tuple_size;
1754  sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
1755  tuple);
1756  }
1757  ++hashtable->batches[batchno].ntuples;
1758 
1759  if (shouldFree)
1760  heap_free_minimal_tuple(tuple);
1761 }
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
Definition: nodeHash.c:3426

References Assert, BarrierPhase(), HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), ExecParallelHashTuplePrealloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, ParallelHashJoinBatchAccessor::inner_tuples, MAXALIGN, ParallelHashJoinBatchAccessor::ntuples, HashJoinTableData::parallel_state, PHJ_BUILD_HASH_INNER, ParallelHashJoinBatchAccessor::preallocated, HashJoinTableData::shared, sts_puttuple(), and MinimalTupleData::t_len.

Referenced by MultiExecParallelHash().
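
The goto retry pattern above is the core of the cooperative build protocol: if the allocator returns NULL (or preallocation fails), this backend first helps finish the pending grow operation and then recomputes bucketno/batchno from scratch, because repartitioning may have moved the tuple to a different batch. A much-simplified single-process model of that control flow (all names here are hypothetical; the real "help" steps are ExecParallelHashIncreaseNumBatches()/ExecParallelHashIncreaseNumBuckets()):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct
{
    bool need_growth;   /* stands in for pstate->growth */
    int  nbatch;        /* assumed power of two */
} Table;

static void
locate(const Table *t, uint32_t hash, int *bucket, int *batch)
{
    *bucket = (int) (hash & 1023);
    *batch = (int) ((hash >> 10) & (uint32_t) (t->nbatch - 1));
}

static void *
try_alloc(Table *t, size_t size)
{
    if (t->need_growth)
        return NULL;            /* another participant asked for help */
    return malloc(size);
}

static void
help_grow(Table *t)
{
    t->nbatch *= 2;             /* "repartition": batch numbers change */
    t->need_growth = false;
}

int
main(void)
{
    Table t = {true, 4};
    int   bucket, batch;
    void *space;

retry:
    locate(&t, 0xDEADBEEF, &bucket, &batch);
    space = try_alloc(&t, 64);
    if (space == NULL)
    {
        help_grow(&t);          /* cooperate, then recompute bucket/batch */
        goto retry;
    }
    printf("stored in bucket %d of batch %d\n", bucket, batch);
    free(space);
    return 0;
}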

◆ ExecParallelHashTableInsertCurrentBatch()

void ExecParallelHashTableInsertCurrentBatch ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1770 of file nodeHash.c.

1773 {
1774  bool shouldFree;
1775  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1776  HashJoinTuple hashTuple;
1777  dsa_pointer shared;
1778  int batchno;
1779  int bucketno;
1780 
1781  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1782  Assert(batchno == hashtable->curbatch);
1783  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1784  HJTUPLE_OVERHEAD + tuple->t_len,
1785  &shared);
1786  hashTuple->hashvalue = hashvalue;
1787  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1788  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1789  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1790  hashTuple, shared);
1791 
1792  if (shouldFree)
1793  heap_free_minimal_tuple(tuple);
1794 }

References Assert, HashJoinTableData::buckets, HashJoinTableData::curbatch, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::shared, and MinimalTupleData::t_len.

Referenced by ExecParallelHashJoinNewBatch().

◆ ExecParallelHashTableSetCurrentBatch()

void ExecParallelHashTableSetCurrentBatch ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3364 of file nodeHash.c.

3365 {
3366  Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer);
3367 
3368  hashtable->curbatch = batchno;
3369  hashtable->buckets.shared = (dsa_pointer_atomic *)
3370  dsa_get_address(hashtable->area,
3371  hashtable->batches[batchno].shared->buckets);
3372  hashtable->nbuckets = hashtable->parallel_state->nbuckets;
3373  hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
3374  hashtable->current_chunk = NULL;
3375  hashtable->current_chunk_shared = InvalidDsaPointer;
3376  hashtable->batches[batchno].at_least_one_chunk = false;
3377 }
dsa_pointer current_chunk_shared
Definition: hashjoin.h:362

References HashJoinTableData::area, Assert, ParallelHashJoinBatchAccessor::at_least_one_chunk, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, HashJoinTableData::buckets, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, HashJoinTableData::current_chunk_shared, dsa_get_address(), InvalidDsaPointer, HashJoinTableData::log2_nbuckets, my_log2(), ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::parallel_state, ParallelHashJoinBatchAccessor::shared, and HashJoinTableData::shared.

Referenced by ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinNewBatch(), and MultiExecParallelHash().

◆ ExecParallelHashTupleAlloc()

static HashJoinTuple ExecParallelHashTupleAlloc ( HashJoinTable  hashtable,
size_t  size,
dsa_pointer shared 
)
static

Definition at line 2841 of file nodeHash.c.

2843 {
2844  ParallelHashJoinState *pstate = hashtable->parallel_state;
2845  dsa_pointer chunk_shared;
2846  HashMemoryChunk chunk;
2847  Size chunk_size;
2848  HashJoinTuple result;
2849  int curbatch = hashtable->curbatch;
2850 
2851  size = MAXALIGN(size);
2852 
2853  /*
2854  * Fast path: if there is enough space in this backend's current chunk,
2855  * then we can allocate without any locking.
2856  */
2857  chunk = hashtable->current_chunk;
2858  if (chunk != NULL &&
2859  size <= HASH_CHUNK_THRESHOLD &&
2860  chunk->maxlen - chunk->used >= size)
2861  {
2862 
2863  chunk_shared = hashtable->current_chunk_shared;
2864  Assert(chunk == dsa_get_address(hashtable->area, chunk_shared));
2865  *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used;
2866  result = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + chunk->used);
2867  chunk->used += size;
2868 
2869  Assert(chunk->used <= chunk->maxlen);
2870  Assert(result == dsa_get_address(hashtable->area, *shared));
2871 
2872  return result;
2873  }
2874 
2875  /* Slow path: try to allocate a new chunk. */
2876  LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
2877 
2878  /*
2879  * Check if we need to help increase the number of buckets or batches.
2880  */
2881  if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES ||
2882  pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
2883  {
2884  ParallelHashGrowth growth = pstate->growth;
2885 
2886  hashtable->current_chunk = NULL;
2887  LWLockRelease(&pstate->lock);
2888 
2889  /* Another participant has commanded us to help grow. */
2890  if (growth == PHJ_GROWTH_NEED_MORE_BATCHES)
2891  ExecParallelHashIncreaseNumBatches(hashtable);
2892  else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
2893  ExecParallelHashIncreaseNumBuckets(hashtable);
2894 
2895  /* The caller must retry. */
2896  return NULL;
2897  }
2898 
2899  /* Oversized tuples get their own chunk. */
2900  if (size > HASH_CHUNK_THRESHOLD)
2901  chunk_size = size + HASH_CHUNK_HEADER_SIZE;
2902  else
2903  chunk_size = HASH_CHUNK_SIZE;
2904 
2905  /* Check if it's time to grow batches or buckets. */
2906  if (pstate->growth != PHJ_GROWTH_DISABLED)
2907  {
2908  Assert(curbatch == 0);
2909  Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
2910 
2911  /*
2912  * Check if our space limit would be exceeded. To avoid choking on
2913  * very large tuples or very low hash_mem setting, we'll always allow
2914  * each backend to allocate at least one chunk.
2915  */
2916  if (hashtable->batches[0].at_least_one_chunk &&
2917  hashtable->batches[0].shared->size +
2918  chunk_size > pstate->space_allowed)
2919  {
2920  pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
2921  hashtable->batches[0].shared->space_exhausted = true;
2922  LWLockRelease(&pstate->lock);
2923 
2924  return NULL;
2925  }
2926 
2927  /* Check if our load factor limit would be exceeded. */
2928  if (hashtable->nbatch == 1)
2929  {
2930  hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples;
2931  hashtable->batches[0].ntuples = 0;
2932  /* Guard against integer overflow and alloc size overflow */
2933  if (hashtable->batches[0].shared->ntuples + 1 >
2934  hashtable->nbuckets * NTUP_PER_BUCKET &&
2935  hashtable->nbuckets < (INT_MAX / 2) &&
2936  hashtable->nbuckets * 2 <=
2937  MaxAllocSize / sizeof(dsa_pointer_atomic))
2938  {
2939  pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS;
2940  LWLockRelease(&pstate->lock);
2941 
2942  return NULL;
2943  }
2944  }
2945  }
2946 
2947  /* We are cleared to allocate a new chunk. */
2948  chunk_shared = dsa_allocate(hashtable->area, chunk_size);
2949  hashtable->batches[curbatch].shared->size += chunk_size;
2950  hashtable->batches[curbatch].at_least_one_chunk = true;
2951 
2952  /* Set up the chunk. */
2953  chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared);
2954  *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE;
2955  chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE;
2956  chunk->used = size;
2957 
2958  /*
2959  * Push it onto the list of chunks, so that it can be found if we need to
2960  * increase the number of buckets or batches (batch 0 only) and later for
2961  * freeing the memory (all batches).
2962  */
2963  chunk->next.shared = hashtable->batches[curbatch].shared->chunks;
2964  hashtable->batches[curbatch].shared->chunks = chunk_shared;
2965 
2966  if (size <= HASH_CHUNK_THRESHOLD)
2967  {
2968  /*
2969  * Make this the current chunk so that we can use the fast path to
2970  * fill the rest of it up in future calls.
2971  */
2972  hashtable->current_chunk = chunk;
2973  hashtable->current_chunk_shared = chunk_shared;
2974  }
2975  LWLockRelease(&pstate->lock);
2976 
2977  Assert(HASH_CHUNK_DATA(chunk) == dsa_get_address(hashtable->area, *shared));
2978  result = (HashJoinTuple) HASH_CHUNK_DATA(chunk);
2979 
2980  return result;
2981 }
ParallelHashGrowth
Definition: hashjoin.h:231
@ PHJ_GROWTH_NEED_MORE_BUCKETS
Definition: hashjoin.h:235
static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
Definition: nodeHash.c:1515
static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
Definition: nodeHash.c:1063

References HashJoinTableData::area, Assert, ParallelHashJoinBatchAccessor::at_least_one_chunk, BarrierPhase(), HashJoinTableData::batches, ParallelHashJoinState::build_barrier, chunk, ParallelHashJoinBatch::chunks, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, HashJoinTableData::current_chunk_shared, dsa_allocate, dsa_get_address(), ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ParallelHashJoinState::growth, HASH_CHUNK_DATA, HASH_CHUNK_HEADER_SIZE, HASH_CHUNK_SIZE, HASH_CHUNK_THRESHOLD, ParallelHashJoinState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXALIGN, MaxAllocSize, HashJoinTableData::nbatch, HashJoinTableData::nbuckets, NTUP_PER_BUCKET, ParallelHashJoinBatch::ntuples, ParallelHashJoinBatchAccessor::ntuples, HashJoinTableData::parallel_state, PHJ_BUILD_HASH_INNER, PHJ_GROWTH_DISABLED, PHJ_GROWTH_NEED_MORE_BATCHES, PHJ_GROWTH_NEED_MORE_BUCKETS, ParallelHashJoinBatchAccessor::shared, size, ParallelHashJoinBatch::size, ParallelHashJoinState::space_allowed, and ParallelHashJoinBatch::space_exhausted.

Referenced by ExecParallelHashRepartitionFirst(), ExecParallelHashTableInsert(), and ExecParallelHashTableInsertCurrentBatch().
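
The function above is a bump allocator with a per-backend fast path: tuples are carved out of a "current chunk" without taking the lock, and the lock is only needed to obtain a fresh chunk (or to notice that a grow operation is pending). A simplified private-memory sketch of the same chunking scheme, without DSA, locking, or the grow checks (CHUNK_SIZE and the oversized-tuple rule are loose stand-ins for HASH_CHUNK_SIZE/HASH_CHUNK_THRESHOLD):

#include <stddef.h>
#include <stdlib.h>

#define CHUNK_SIZE  (32 * 1024)                 /* loose stand-in for HASH_CHUNK_SIZE */
#define ALIGN_UP(x) (((x) + 7) & ~((size_t) 7)) /* stand-in for MAXALIGN */

typedef struct Chunk
{
    struct Chunk *next;     /* chain kept so chunks can be walked and freed */
    size_t        maxlen;
    size_t        used;
    char          data[];   /* tuples are bump-allocated from here */
} Chunk;

typedef struct
{
    Chunk *current;         /* the fast path allocates from this chunk */
    Chunk *all_chunks;
} Allocator;

static void *
chunk_alloc(Allocator *a, size_t size)
{
    Chunk *chunk;

    size = ALIGN_UP(size);

    /* Fast path: room left in the current chunk, no bookkeeping at all. */
    if (a->current != NULL && a->current->maxlen - a->current->used >= size)
    {
        void *result = a->current->data + a->current->used;

        a->current->used += size;
        return result;
    }

    /* Slow path: start a new chunk; oversized requests get their own. */
    chunk = malloc(sizeof(Chunk) + (size > CHUNK_SIZE ? size : CHUNK_SIZE));
    if (chunk == NULL)
        return NULL;
    chunk->maxlen = size > CHUNK_SIZE ? size : CHUNK_SIZE;
    chunk->used = size;
    chunk->next = a->all_chunks;
    a->all_chunks = chunk;
    if (size <= CHUNK_SIZE)
        a->current = chunk;     /* later small allocations reuse it */
    return chunk->data;
}

int
main(void)
{
    Allocator a = {NULL, NULL};
    void     *p = chunk_alloc(&a, 100);     /* slow path: creates a chunk */
    void     *q = chunk_alloc(&a, 100);     /* fast path: same chunk */

    return (p != NULL && q != NULL && p != q) ? 0 : 1;
}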

◆ ExecParallelHashTuplePrealloc()

static bool ExecParallelHashTuplePrealloc ( HashJoinTable  hashtable,
int  batchno,
size_t  size 
)
static

Definition at line 3426 of file nodeHash.c.

3427 {
3428  ParallelHashJoinState *pstate = hashtable->parallel_state;
3429  ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno];
3430  size_t want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE);
3431 
3432  Assert(batchno > 0);
3433  Assert(batchno < hashtable->nbatch);
3434  Assert(size == MAXALIGN(size));
3435 
3436  LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
3437 
3438  /* Has another participant commanded us to help grow? */
3439  if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES ||
3440  pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
3441  {
3442  ParallelHashGrowth growth = pstate->growth;
3443 
3444  LWLockRelease(&pstate->lock);
3445  if (growth == PHJ_GROWTH_NEED_MORE_BATCHES)
3446  ExecParallelHashIncreaseNumBatches(hashtable);
3447  else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
3448  ExecParallelHashIncreaseNumBuckets(hashtable);
3449 
3450  return false;
3451  }
3452 
3453  if (pstate->growth != PHJ_GROWTH_DISABLED &&
3454  batch->at_least_one_chunk &&
3455  (batch->shared->estimated_size + want + HASH_CHUNK_HEADER_SIZE
3456  > pstate->space_allowed))
3457  {
3458  /*
3459  * We have determined that this batch would exceed the space budget if
3460  * loaded into memory. Command all participants to help repartition.
3461  */
3462  batch->shared->space_exhausted = true;
3463  pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
3464  LWLockRelease(&pstate->lock);
3465 
3466  return false;
3467  }
3468 
3469  batch->at_least_one_chunk = true;
3470  batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE;
3471  batch->preallocated = want;
3472  LWLockRelease(&pstate->lock);
3473 
3474  return true;
3475 }

References Assert, ParallelHashJoinBatchAccessor::at_least_one_chunk, HashJoinTableData::batches, ParallelHashJoinBatch::estimated_size, ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ParallelHashJoinState::growth, HASH_CHUNK_HEADER_SIZE, HASH_CHUNK_SIZE, ParallelHashJoinState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), Max, MAXALIGN, HashJoinTableData::parallel_state, PHJ_GROWTH_DISABLED, PHJ_GROWTH_NEED_MORE_BATCHES, PHJ_GROWTH_NEED_MORE_BUCKETS, ParallelHashJoinBatchAccessor::preallocated, ParallelHashJoinBatchAccessor::shared, size, ParallelHashJoinState::space_allowed, and ParallelHashJoinBatch::space_exhausted.

Referenced by ExecParallelHashTableInsert().
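
Tuples bound for a future batch are written to shared tuplestore files rather than into memory, so the space check above charges an estimated, chunk-granular size to the batch before the writes happen; inserts then consume that reservation without retaking the lock. A toy single-threaded model of this reserve-then-consume accounting (hypothetical names; the real code performs it under pstate->lock):

#include <stdbool.h>
#include <stddef.h>

#define CHUNK 32768             /* reservation granularity */

typedef struct
{
    size_t estimated_size;      /* charged up front, whole chunks at a time */
    size_t preallocated;        /* this backend's unused reservation */
    size_t space_allowed;       /* per-batch memory budget */
} Batch;

/* Reserve at least one chunk's worth; refusing here is what triggers
 * repartitioning in the real executor. */
static bool
prealloc(Batch *b, size_t size)
{
    size_t want = size > CHUNK ? size : CHUNK;

    if (b->estimated_size + want > b->space_allowed)
        return false;           /* caller must repartition and retry */
    b->estimated_size += want;
    b->preallocated = want;
    return true;
}

static bool
put_tuple(Batch *b, size_t size)
{
    if (b->preallocated < size && !prealloc(b, size))
        return false;
    b->preallocated -= size;    /* consume the reservation */
    return true;
}

int
main(void)
{
    Batch b = {0, 0, 65536};

    return (put_tuple(&b, 100) && put_tuple(&b, 100)) ? 0 : 1;
}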

◆ ExecParallelPrepHashTableForUnmatched()

bool ExecParallelPrepHashTableForUnmatched ( HashJoinState hjstate)

Definition at line 1990 of file nodeHash.c.

1991 {
1992  HashJoinTable hashtable = hjstate->hj_HashTable;
1993  int curbatch = hashtable->curbatch;
1994  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
1995 
1996  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
1997 
1998  /*
1999  * It would not be deadlock-free to wait on the batch barrier, because it
2000  * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
2001  * already emitted tuples. Therefore, we'll hold a wait-free election:
2002  * only one process can continue to the next phase, and all others detach
2003  * from this batch. They can still do any work on other batches, if there
2004  * are any.
2005  */
2006  if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
2007  {
2008  /* This process considers the batch to be done. */
2009  hashtable->batches[hashtable->curbatch].done = true;
2010 
2011  /* Make sure any temporary files are closed. */
2012  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
2013  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
2014 
2015  /*
2016  * Track largest batch we've seen, which would normally happen in
2017  * ExecHashTableDetachBatch().
2018  */
2019  hashtable->spacePeak =
2020  Max(hashtable->spacePeak,
2021  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
2022  hashtable->curbatch = -1;
2023  return false;
2024  }
2025 
2026  /* Now we are alone with this batch. */
2027  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
2028 
2029  /*
2030  * Has another process decided to give up early and command all processes
2031  * to skip the unmatched scan?
2032  */
2033  if (batch->skip_unmatched)
2034  {
2035  hashtable->batches[hashtable->curbatch].done = true;
2036  ExecHashTableDetachBatch(hashtable);
2037  return false;
2038  }
2039 
2040  /* Now prepare the process local state, just as for non-parallel join. */
2041  ExecPrepHashTableForUnmatched(hjstate);
2042 
2043  return true;
2044 }
void ExecHashTableDetachBatch(HashJoinTable hashtable)
Definition: nodeHash.c:3174
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
Definition: nodeHash.c:1969
HashJoinTable hj_HashTable
Definition: execnodes.h:2228

References Assert, BarrierArriveAndDetachExceptLast(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, HashJoinTableData::curbatch, ParallelHashJoinBatchAccessor::done, ExecHashTableDetachBatch(), ExecPrepHashTableForUnmatched(), HashJoinState::hj_HashTable, ParallelHashJoinBatchAccessor::inner_tuples, Max, HashJoinTableData::nbuckets, ParallelHashJoinBatchAccessor::outer_tuples, PHJ_BATCH_PROBE, PHJ_BATCH_SCAN, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinBatch::skip_unmatched, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinImpl().
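
The "wait-free election" described in the comment can be modeled with a single atomic counter: every participant decrements it on the way out, and only the one that drops the count to zero is elected and keeps ownership of the batch. A sketch of that idea in portable C11 (hypothetical model only; PostgreSQL implements this as BarrierArriveAndDetachExceptLast() on the batch barrier, where the elected process remains attached):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <threads.h>

static atomic_int attached = 4;     /* participants attached to the batch */

/* Every caller "detaches" by decrementing; the caller that sees the count
 * reach zero is elected and owns the batch exclusively. */
static bool
arrive_and_detach_except_last(void)
{
    return atomic_fetch_sub(&attached, 1) == 1;
}

static int
worker(void *arg)
{
    (void) arg;
    if (arrive_and_detach_except_last())
        puts("elected: run the unmatched scan");
    else
        puts("detached: free to work on other batches");
    return 0;
}

int
main(void)
{
    thrd_t t[4];

    for (int i = 0; i < 4; i++)
        thrd_create(&t[i], worker, NULL);
    for (int i = 0; i < 4; i++)
        thrd_join(t[i], NULL);
    return 0;
}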

◆ ExecParallelScanHashBucket()

bool ExecParallelScanHashBucket ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 1918 of file nodeHash.c.

1920 {
1921  ExprState *hjclauses = hjstate->hashclauses;
1922  HashJoinTable hashtable = hjstate->hj_HashTable;
1923  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1924  uint32 hashvalue = hjstate->hj_CurHashValue;
1925 
1926  /*
1927  * hj_CurTuple is the address of the tuple last returned from the current
1928  * bucket, or NULL if it's time to start scanning a new bucket.
1929  */
1930  if (hashTuple != NULL)
1931  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
1932  else
1933  hashTuple = ExecParallelHashFirstTuple(hashtable,
1934  hjstate->hj_CurBucketNo);
1935 
1936  while (hashTuple != NULL)
1937  {
1938  if (hashTuple->hashvalue == hashvalue)
1939  {
1940  TupleTableSlot *inntuple;
1941 
1942  /* insert hashtable's tuple into exec slot so ExecQual sees it */
1943  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
1944  hjstate->hj_HashTupleSlot,
1945  false); /* do not pfree */
1946  econtext->ecxt_innertuple = inntuple;
1947 
1948  if (ExecQualAndReset(hjclauses, econtext))
1949  {
1950  hjstate->hj_CurTuple = hashTuple;
1951  return true;
1952  }
1953  }
1954 
1955  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
1956  }
1957 
1958  /*
1959  * no match
1960  */
1961  return false;
1962 }
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1533
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:451
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
Definition: nodeHash.c:3316
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
Definition: nodeHash.c:3332
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:260
HashJoinTuple hj_CurTuple
Definition: execnodes.h:2232
ExprState * hashclauses
Definition: execnodes.h:2226
uint32 hj_CurHashValue
Definition: execnodes.h:2229
int hj_CurBucketNo
Definition: execnodes.h:2230
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:2234

References ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, and HJTUPLE_MINTUPLE.

Referenced by ExecHashJoinImpl().

◆ ExecParallelScanHashTableForUnmatched()

bool ExecParallelScanHashTableForUnmatched ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 2129 of file nodeHash.c.

2131 {
2132  HashJoinTable hashtable = hjstate->hj_HashTable;
2133  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2134 
2135  for (;;)
2136  {
2137  /*
2138  * hj_CurTuple is the address of the tuple last returned from the
2139  * current bucket, or NULL if it's time to start scanning a new
2140  * bucket.
2141  */
2142  if (hashTuple != NULL)
2143  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2144  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2145  hashTuple = ExecParallelHashFirstTuple(hashtable,
2146  hjstate->hj_CurBucketNo++);
2147  else
2148  break; /* finished all buckets */
2149 
2150  while (hashTuple != NULL)
2151  {
2152  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2153  {
2154  TupleTableSlot *inntuple;
2155 
2156  /* insert hashtable's tuple into exec slot */
2157  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2158  hjstate->hj_HashTupleSlot,
2159  false); /* do not pfree */
2160  econtext->ecxt_innertuple = inntuple;
2161 
2162  /*
2163  * Reset temp memory each time; although this function doesn't
2164  * do any qual eval, the caller will, so let's keep it
2165  * parallel to ExecScanHashBucket.
2166  */
2167  ResetExprContext(econtext);
2168 
2169  hjstate->hj_CurTuple = hashTuple;
2170  return true;
2171  }
2172 
2173  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2174  }
2175 
2176  /* allow this loop to be cancellable */
2177  CHECK_FOR_INTERRUPTS();
2178  }
2179 
2180  /*
2181  * no more unmatched tuples
2182  */
2183  return false;
2184 }
#define ResetExprContext(econtext)
Definition: executor.h:555
#define HeapTupleHeaderHasMatch(tup)
Definition: htup_details.h:514

References CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, HashJoinTableData::nbuckets, and ResetExprContext.

Referenced by ExecHashJoinImpl().

◆ ExecPrepHashTableForUnmatched()

void ExecPrepHashTableForUnmatched ( HashJoinState hjstate)

Definition at line 1969 of file nodeHash.c.

1970 {
1971  /*----------
1972  * During this scan we use the HashJoinState fields as follows:
1973  *
1974  * hj_CurBucketNo: next regular bucket to scan
1975  * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
1976  * hj_CurTuple: last tuple returned, or NULL to start next bucket
1977  *----------
1978  */
1979  hjstate->hj_CurBucketNo = 0;
1980  hjstate->hj_CurSkewBucketNo = 0;
1981  hjstate->hj_CurTuple = NULL;
1982 }
int hj_CurSkewBucketNo
Definition: execnodes.h:2231

References HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, and HashJoinState::hj_CurTuple.

Referenced by ExecHashJoinImpl(), and ExecParallelPrepHashTableForUnmatched().

◆ ExecReScanHash()

void ExecReScanHash ( HashState node)

Definition at line 2246 of file nodeHash.c.

2247 {
2248  PlanState *outerPlan = outerPlanState(node);
2249 
2250  /*
2251  * if chgParam of subnode is not null then plan will be re-scanned by
2252  * first ExecProcNode.
2253  */
2254  if (outerPlan->chgParam == NULL)
2255  ExecReScan(outerPlan);
2256 }
void ExecReScan(PlanState *node)
Definition: execAmi.c:76

References ExecReScan(), outerPlan, and outerPlanState.

Referenced by ExecReScan().

◆ ExecScanHashBucket()

bool ExecScanHashBucket ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 1857 of file nodeHash.c.

1859 {
1860  ExprState *hjclauses = hjstate->hashclauses;
1861  HashJoinTable hashtable = hjstate->hj_HashTable;
1862  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1863  uint32 hashvalue = hjstate->hj_CurHashValue;
1864 
1865  /*
1866  * hj_CurTuple is the address of the tuple last returned from the current
1867  * bucket, or NULL if it's time to start scanning a new bucket.
1868  *
1869  * If the tuple hashed to a skew bucket then scan the skew bucket
1870  * otherwise scan the standard hashtable bucket.
1871  */
1872  if (hashTuple != NULL)
1873  hashTuple = hashTuple->next.unshared;
1874  else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
1875  hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
1876  else
1877  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
1878 
1879  while (hashTuple != NULL)
1880  {
1881  if (hashTuple->hashvalue == hashvalue)
1882  {
1883  TupleTableSlot *inntuple;
1884 
1885  /* insert hashtable's tuple into exec slot so ExecQual sees it */
1886  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
1887  hjstate->hj_HashTupleSlot,
1888  false); /* do not pfree */
1889  econtext->ecxt_innertuple = inntuple;
1890 
1891  if (ExecQualAndReset(hjclauses, econtext))
1892  {
1893  hjstate->hj_CurTuple = hashTuple;
1894  return true;
1895  }
1896  }
1897 
1898  hashTuple = hashTuple->next.unshared;
1899  }
1900 
1901  /*
1902  * no match
1903  */
1904  return false;
1905 }

References HashJoinTableData::buckets, ExprContext::ecxt_innertuple, ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, INVALID_SKEW_BUCKET_NO, HashJoinTupleData::next, HashJoinTableData::skewBucket, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().
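
ExecScanHashBucket() is a resumable cursor: hj_CurTuple records the last tuple returned so the join can re-enter the same bucket after emitting a match, and the cached hash value filters out most non-matching tuples before any qual is evaluated. A standalone sketch of that pattern (illustrative types and names):

#include <stddef.h>
#include <stdint.h>

typedef struct Tuple
{
    struct Tuple *next;
    uint32_t      hashvalue;  /* cached: rejects most mismatches cheaply */
    int           payload;
} Tuple;

typedef struct
{
    Tuple  **buckets;
    uint32_t nbuckets;
} Table;

typedef struct
{
    Tuple   *cur;             /* last tuple returned; NULL => start bucket */
    uint32_t hashvalue;
    int      bucketno;
} ScanState;

/* Return the next candidate whose hash matches; the caller evaluates the
 * actual join quals and calls again to resume from the same position. */
static Tuple *
scan_bucket(const Table *t, ScanState *s)
{
    Tuple *tup = s->cur ? s->cur->next : t->buckets[s->bucketno];

    for (; tup != NULL; tup = tup->next)
    {
        if (tup->hashvalue == s->hashvalue)
        {
            s->cur = tup;
            return tup;
        }
    }
    s->cur = NULL;
    return NULL;
}

int
main(void)
{
    Tuple     t2 = {NULL, 42, 2};
    Tuple     t1 = {&t2, 42, 1};
    Tuple    *bucket0 = &t1;
    Table     t = {&bucket0, 1};
    ScanState s = {NULL, 42, 0};

    while (scan_bucket(&t, &s))
        ;                     /* each hit would be checked against quals */
    return 0;
}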

◆ ExecScanHashTableForUnmatched()

bool ExecScanHashTableForUnmatched ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 2055 of file nodeHash.c.

2056 {
2057  HashJoinTable hashtable = hjstate->hj_HashTable;
2058  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2059 
2060  for (;;)
2061  {
2062  /*
2063  * hj_CurTuple is the address of the tuple last returned from the
2064  * current bucket, or NULL if it's time to start scanning a new
2065  * bucket.
2066  */
2067  if (hashTuple != NULL)
2068  hashTuple = hashTuple->next.unshared;
2069  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2070  {
2071  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
2072  hjstate->hj_CurBucketNo++;
2073  }
2074  else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
2075  {
2076  int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
2077 
2078  hashTuple = hashtable->skewBucket[j]->tuples;
2079  hjstate->hj_CurSkewBucketNo++;
2080  }
2081  else
2082  break; /* finished all buckets */
2083 
2084  while (hashTuple != NULL)
2085  {
2086  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2087  {
2088  TupleTableSlot *inntuple;
2089 
2090  /* insert hashtable's tuple into exec slot */
2091  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2092  hjstate->hj_HashTupleSlot,
2093  false); /* do not pfree */
2094  econtext->ecxt_innertuple = inntuple;
2095 
2096  /*
2097  * Reset temp memory each time; although this function doesn't
2098  * do any qual eval, the caller will, so let's keep it
2099  * parallel to ExecScanHashBucket.
2100  */
2101  ResetExprContext(econtext);
2102 
2103  hjstate->hj_CurTuple = hashTuple;
2104  return true;
2105  }
2106 
2107  hashTuple = hashTuple->next.unshared;
2108  }
2109 
2110  /* allow this loop to be cancellable */
2111  CHECK_FOR_INTERRUPTS();
2112  }
2113 
2114  /*
2115  * no more unmatched tuples
2116  */
2117  return false;
2118 }

References HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, j, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, ResetExprContext, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().
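
The unmatched scan relies on probing having set a "match" flag on every inner tuple that joined (tested above via HeapTupleHeaderHasMatch); a final sweep over all buckets then yields exactly the tuples a right or full outer join still has to emit null-extended. A standalone sketch of that flag-and-sweep idea (illustrative types):

#include <stdbool.h>
#include <stdio.h>

typedef struct Tuple
{
    struct Tuple *next;
    bool          matched;  /* set by the probe phase on every join hit */
    int           payload;
} Tuple;

/* After probing, sweep every bucket and emit the leftovers: these are the
 * inner rows a RIGHT/FULL join must still output, null-extended. */
static void
emit_unmatched(Tuple **buckets, int nbuckets)
{
    for (int i = 0; i < nbuckets; i++)
        for (Tuple *tup = buckets[i]; tup != NULL; tup = tup->next)
            if (!tup->matched)
                printf("unmatched inner tuple %d\n", tup->payload);
}

int
main(void)
{
    Tuple  t2 = {NULL, false, 2};
    Tuple  t1 = {&t2, true, 1};
    Tuple *bucket0 = &t1;

    emit_unmatched(&bucket0, 1);    /* prints only tuple 2 */
    return 0;
}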

◆ ExecShutdownHash()

void ExecShutdownHash ( HashState node)

Definition at line 2696 of file nodeHash.c.

2697 {
2698  /* Allocate save space if EXPLAIN'ing and we didn't do so already */
2699  if (node->ps.instrument && !node->hinstrument)
2700  node->hinstrument = palloc0_object(HashInstrumentation);
2701  /* Now accumulate data for the current (final) hash table */
2702  if (node->hinstrument && node->hashtable)
2703  ExecHashAccumInstrumentation(node->hinstrument, node->hashtable);
2704 }
#define palloc0_object(type)
Definition: fe_memutils.h:75
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
Definition: nodeHash.c:2742

References ExecHashAccumInstrumentation(), HashState::hashtable, HashState::hinstrument, PlanState::instrument, palloc0_object, and HashState::ps.

Referenced by ExecShutdownNode_walker().

◆ get_hash_memory_limit()

size_t get_hash_memory_limit ( void  )

Definition at line 3487 of file nodeHash.c.

3488 {
3489  double mem_limit;
3490 
3491  /* Do initial calculation in double arithmetic */
3492  mem_limit = (double) work_mem * hash_mem_multiplier * 1024.0;
3493 
3494  /* Clamp in case it doesn't fit in size_t */
3495  mem_limit = Min(mem_limit, (double) SIZE_MAX);
3496 
3497  return (size_t) mem_limit;
3498 }
double hash_mem_multiplier
Definition: globals.c:131
int work_mem
Definition: globals.c:130

References hash_mem_multiplier, Min, and work_mem.

Referenced by BuildTupleHashTableExt(), choose_hashed_setop(), consider_groupingsets_paths(), cost_memoize_rescan(), create_unique_path(), ExecChooseHashTableSize(), ExecInitMemoize(), ExecParallelHashIncreaseNumBatches(), final_cost_hashjoin(), hash_agg_set_limits(), hash_choose_num_partitions(), subpath_is_hashable(), and subplan_is_hashable().
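
The arithmetic is done in double precision so that work_mem (an int, in kilobytes) times hash_mem_multiplier cannot overflow before the clamp; for example, work_mem = 4096 and hash_mem_multiplier = 2.0 yield 8388608 bytes. A standalone equivalent (the function name is illustrative):

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* The parameters mirror the work_mem (kB) and hash_mem_multiplier GUCs. */
static size_t
hash_memory_limit(int work_mem_kb, double multiplier)
{
    double mem_limit = (double) work_mem_kb * multiplier * 1024.0;

    if (mem_limit > (double) SIZE_MAX)  /* clamp before the narrowing cast */
        mem_limit = (double) SIZE_MAX;
    return (size_t) mem_limit;
}

int
main(void)
{
    printf("%zu\n", hash_memory_limit(4096, 2.0));  /* prints 8388608 */
    return 0;
}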

◆ MultiExecHash()

Node* MultiExecHash ( HashState node)

Definition at line 105 of file nodeHash.c.

106 {
107  /* must provide our own instrumentation support */
108  if (node->ps.instrument)
109  InstrStartNode(node->ps.instrument);
110 
111  if (node->parallel_state != NULL)
112  MultiExecParallelHash(node);
113  else
114  MultiExecPrivateHash(node);
115 
116  /* must provide our own instrumentation support */
117  if (node->ps.instrument)
118  InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);
119 
120  /*
121  * We do not return the hash table directly because it's not a subtype of
122  * Node, and so would violate the MultiExecProcNode API. Instead, our
123  * parent Hashjoin node is expected to know how to fish it out of our node
124  * state. Ugly but not really worth cleaning up, since Hashjoin knows
125  * quite a bit more about Hash besides that.
126  */
127  return NULL;
128 }
void InstrStartNode(Instrumentation *instr)
Definition: instrument.c:68
void InstrStopNode(Instrumentation *instr, double nTuples)
Definition: instrument.c:84
static void MultiExecParallelHash(HashState *node)
Definition: nodeHash.c:219
static void MultiExecPrivateHash(HashState *node)
Definition: nodeHash.c:138
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2803

References HashState::hashtable, InstrStartNode(), InstrStopNode(), PlanState::instrument, MultiExecParallelHash(), MultiExecPrivateHash(), HashState::parallel_state, HashJoinTableData::partialTuples, and HashState::ps.

Referenced by MultiExecProcNode().

◆ MultiExecParallelHash()

static void MultiExecParallelHash ( HashState node)
static

Definition at line 219 of file nodeHash.c.

220 {
221  ParallelHashJoinState *pstate;
222  PlanState *outerNode;
223  HashJoinTable hashtable;
224  TupleTableSlot *slot;
225  ExprContext *econtext;
226  uint32 hashvalue;
227  Barrier *build_barrier;
228  int i;
229 
230  /*
231  * get state info from node
232  */
233  outerNode = outerPlanState(node);
234  hashtable = node->hashtable;
235 
236  /*
237  * set expression context
238  */
239  econtext = node->ps.ps_ExprContext;
240 
241  /*
242  * Synchronize the parallel hash table build. At this stage we know that
243  * the shared hash table has been or is being set up by
244  * ExecHashTableCreate(), but we don't know if our peers have returned
245  * from there or are here in MultiExecParallelHash(), and if so how far
246  * through they are. To find out, we check the build_barrier phase then
247  * and jump to the right step in the build algorithm.
248  */
249  pstate = hashtable->parallel_state;
250  build_barrier = &pstate->build_barrier;
251  Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATE);
252  switch (BarrierPhase(build_barrier))
253  {
254  case PHJ_BUILD_ALLOCATE:
255 
256  /*
257  * Either I just allocated the initial hash table in
258  * ExecHashTableCreate(), or someone else is doing that. Either
259  * way, wait for everyone to arrive here so we can proceed.
260  */
261  BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATE);
262  /* Fall through. */
263 
264  case PHJ_BUILD_HASH_INNER:
265 
266  /*
267  * It's time to begin hashing, or if we just arrived here then
268  * hashing is already underway, so join in that effort. While
269  * hashing we have to be prepared to help increase the number of
270  * batches or buckets at any time, and if we arrived here when
271  * that was already underway we'll have to help complete that work
272  * immediately so that it's safe to access batches and buckets
273  * below.
274  */
275  if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
276  PHJ_GROW_BATCHES_ELECT)
277  ExecParallelHashIncreaseNumBatches(hashtable);
278  if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
279  PHJ_GROW_BUCKETS_ELECT)
280  ExecParallelHashIncreaseNumBuckets(hashtable);
281  ExecParallelHashEnsureBatchAccessors(hashtable);
282  ExecParallelHashTableSetCurrentBatch(hashtable, 0);
283  for (;;)
284  {
285  bool isnull;
286 
287  slot = ExecProcNode(outerNode);
288  if (TupIsNull(slot))
289  break;
290  econtext->ecxt_outertuple = slot;
291 
292  ResetExprContext(econtext);
293 
294  hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(node->hash_expr,
295  econtext,
296  &isnull));
297 
298  if (!isnull)
299  ExecParallelHashTableInsert(hashtable, slot, hashvalue);
300  hashtable->partialTuples++;
301  }
302 
303  /*
304  * Make sure that any tuples we wrote to disk are visible to
305  * others before anyone tries to load them.
306  */
307  for (i = 0; i < hashtable->nbatch; ++i)
308  sts_end_write(hashtable->batches[i].inner_tuples);
309 
310  /*
311  * Update shared counters. We need an accurate total tuple count
312  * to control the empty table optimization.
313  */
314  ExecParallelHashMergeCounters(hashtable);
315 
316  BarrierDetach(&pstate->grow_buckets_barrier);
317  BarrierDetach(&pstate->grow_batches_barrier);
318 
319  /*
320  * Wait for everyone to finish building and flushing files and
321  * counters.
322  */
323  if (BarrierArriveAndWait(build_barrier,
324  WAIT_EVENT_HASH_BUILD_HASH_INNER))
325  {
326  /*
327  * Elect one backend to disable any further growth. Batches
328  * are now fixed. While building them we made sure they'd fit
329  * in our memory budget when we load them back in later (or we
330  * tried to do that and gave up because we detected extreme
331  * skew).
332  */
333  pstate->growth = PHJ_GROWTH_DISABLED;
334  }
335  }
336 
337  /*
338  * We're not yet attached to a batch. We all agree on the dimensions and
339  * number of inner tuples (for the empty table optimization).
340  */
341  hashtable->curbatch = -1;
342  hashtable->nbuckets = pstate->nbuckets;
343  hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
344  hashtable->totalTuples = pstate->total_tuples;
345 
346  /*
347  * Unless we're completely done and the batch state has been freed, make
348  * sure we have accessors.
349  */
350  if (BarrierPhase(build_barrier) < PHJ_BUILD_FREE)
351  ExecParallelHashEnsureBatchAccessors(hashtable);
352 
353  /*
354  * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
355  * case, which will bring the build phase to PHJ_BUILD_RUN (if it isn't
356  * there already).
357  */
358  Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
359  BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
360  BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
361 }
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:359
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:273
#define PHJ_BUILD_HASH_OUTER
Definition: hashjoin.h:272
#define PHJ_BUILD_ALLOCATE
Definition: hashjoin.h:270
void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
Definition: nodeHash.c:1704
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:262
ExprContext * ps_ExprContext
Definition: execnodes.h:1166
#define TupIsNull(slot)
Definition: tuptable.h:306

References Assert, BarrierArriveAndWait(), BarrierAttach(), BarrierDetach(), BarrierPhase(), HashJoinTableData::batches, ParallelHashJoinState::build_barrier, HashJoinTableData::curbatch, DatumGetUInt32(), ExprContext::ecxt_outertuple, ExecEvalExprSwitchContext(), ExecParallelHashEnsureBatchAccessors(), ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashMergeCounters(), ExecParallelHashTableInsert(), ExecParallelHashTableSetCurrentBatch(), ExecProcNode(), ParallelHashJoinState::grow_batches_barrier, ParallelHashJoinState::grow_buckets_barrier, ParallelHashJoinState::growth, HashState::hash_expr, HashState::hashtable, i, ParallelHashJoinBatchAccessor::inner_tuples, HashJoinTableData::log2_nbuckets, my_log2(), HashJoinTableData::nbatch, ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, outerPlanState, HashJoinTableData::parallel_state, HashJoinTableData::partialTuples, PHJ_BUILD_ALLOCATE, PHJ_BUILD_FREE, PHJ_BUILD_HASH_INNER, PHJ_BUILD_HASH_OUTER, PHJ_BUILD_RUN, PHJ_GROW_BATCHES_ELECT, PHJ_GROW_BATCHES_PHASE, PHJ_GROW_BUCKETS_ELECT, PHJ_GROW_BUCKETS_PHASE, PHJ_GROWTH_DISABLED, HashState::ps, PlanState::ps_ExprContext, ResetExprContext, sts_end_write(), ParallelHashJoinState::total_tuples, HashJoinTableData::totalTuples, and TupIsNull.

Referenced by MultiExecHash().

◆ MultiExecPrivateHash()

static void MultiExecPrivateHash ( HashState node)
static

Definition at line 138 of file nodeHash.c.

139 {
140  PlanState *outerNode;
141  HashJoinTable hashtable;
142  TupleTableSlot *slot;
143  ExprContext *econtext;
144 
145  /*
146  * get state info from node
147  */
148  outerNode = outerPlanState(node);
149  hashtable = node->hashtable;
150 
151  /*
152  * set expression context
153  */
154  econtext = node->ps.ps_ExprContext;
155 
156  /*
157  * Get all tuples from the node below the Hash node and insert into the
158  * hash table (or temp files).
159  */
160  for (;;)
161  {
162  bool isnull;
163  Datum hashdatum;
164 
165  slot = ExecProcNode(outerNode);
166  if (TupIsNull(slot))
167  break;
168  /* We have to compute the hash value */
169  econtext->ecxt_outertuple = slot;
170 
171  ResetExprContext(econtext);
172 
173  hashdatum = ExecEvalExprSwitchContext(node->hash_expr, econtext,
174  &isnull);
175 
176  if (!isnull)
177  {
178  uint32 hashvalue = DatumGetUInt32(hashdatum);
179  int bucketNumber;
180 
181  bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
182  if (bucketNumber != INVALID_SKEW_BUCKET_NO)
183  {
184  /* It's a skew tuple, so put it into that hash table */
185  ExecHashSkewTableInsert(hashtable, slot, hashvalue,
186  bucketNumber);
187  hashtable->skewTuples += 1;
188  }
189  else
190  {
191  /* Not subject to skew optimization, so insert normally */
192  ExecHashTableInsert(hashtable, slot, hashvalue);
193  }
194  hashtable->totalTuples += 1;
195  }
196  }
197 
198  /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
199  if (hashtable->nbuckets != hashtable->nbuckets_optimal)
200  ExecHashIncreaseNumBuckets(hashtable);
201 
202  /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
203  hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
204  if (hashtable->spaceUsed > hashtable->spacePeak)
205  hashtable->spacePeak = hashtable->spaceUsed;
206 
207  hashtable->partialTuples = hashtable->totalTuples;
208 }
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
Definition: nodeHash.c:1452
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
Definition: nodeHash.c:2420
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
Definition: nodeHash.c:2466
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
Definition: nodeHash.c:1614
uintptr_t Datum
Definition: postgres.h:64

References DatumGetUInt32(), ExprContext::ecxt_outertuple, ExecEvalExprSwitchContext(), ExecHashGetSkewBucket(), ExecHashIncreaseNumBuckets(), ExecHashSkewTableInsert(), ExecHashTableInsert(), ExecProcNode(), HashState::hash_expr, HashState::hashtable, INVALID_SKEW_BUCKET_NO, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, outerPlanState, HashJoinTableData::partialTuples, HashState::ps, PlanState::ps_ExprContext, ResetExprContext, HashJoinTableData::skewTuples, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, HashJoinTableData::totalTuples, and TupIsNull.

Referenced by MultiExecHash().