PostgreSQL Source Code  git master
nodeHash.c File Reference
#include "postgres.h"
#include <math.h>
#include <limits.h>
#include "access/htup_details.h"
#include "access/parallel.h"
#include "catalog/pg_statistic.h"
#include "commands/tablespace.h"
#include "executor/execdebug.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/atomics.h"
#include "port/pg_bitutils.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
Include dependency graph for nodeHash.c:

Go to the source code of this file.

Macros

#define NTUP_PER_BUCKET   1
 

Functions

static void ExecHashIncreaseNumBatches (HashJoinTable hashtable)
 
static void ExecHashIncreaseNumBuckets (HashJoinTable hashtable)
 
static void ExecParallelHashIncreaseNumBatches (HashJoinTable hashtable)
 
static void ExecParallelHashIncreaseNumBuckets (HashJoinTable hashtable)
 
static void ExecHashBuildSkewHash (HashJoinTable hashtable, Hash *node, int mcvsToUse)
 
static void ExecHashSkewTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
 
static void ExecHashRemoveNextSkewBucket (HashJoinTable hashtable)
 
static void * dense_alloc (HashJoinTable hashtable, Size size)
 
static HashJoinTuple ExecParallelHashTupleAlloc (HashJoinTable hashtable, size_t size, dsa_pointer *shared)
 
static void MultiExecPrivateHash (HashState *node)
 
static void MultiExecParallelHash (HashState *node)
 
static HashJoinTuple ExecParallelHashFirstTuple (HashJoinTable hashtable, int bucketno)
 
static HashJoinTuple ExecParallelHashNextTuple (HashJoinTable hashtable, HashJoinTuple tuple)
 
static void ExecParallelHashPushTuple (dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
 
static void ExecParallelHashJoinSetUpBatches (HashJoinTable hashtable, int nbatch)
 
static void ExecParallelHashEnsureBatchAccessors (HashJoinTable hashtable)
 
static void ExecParallelHashRepartitionFirst (HashJoinTable hashtable)
 
static void ExecParallelHashRepartitionRest (HashJoinTable hashtable)
 
static HashMemoryChunk ExecParallelHashPopChunkQueue (HashJoinTable hashtable, dsa_pointer *shared)
 
static bool ExecParallelHashTuplePrealloc (HashJoinTable hashtable, int batchno, size_t size)
 
static void ExecParallelHashMergeCounters (HashJoinTable hashtable)
 
static void ExecParallelHashCloseBatchAccessors (HashJoinTable hashtable)
 
static TupleTableSlot * ExecHash (PlanState *pstate)
 
Node * MultiExecHash (HashState *node)
 
HashState * ExecInitHash (Hash *node, EState *estate, int eflags)
 
void ExecEndHash (HashState *node)
 
HashJoinTable ExecHashTableCreate (HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
 
void ExecChooseHashTableSize (double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
 
void ExecHashTableDestroy (HashJoinTable hashtable)
 
void ExecHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsert (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
void ExecParallelHashTableInsertCurrentBatch (HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
 
bool ExecHashGetHashValue (HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
 
void ExecHashGetBucketAndBatch (HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
 
bool ExecScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashBucket (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecParallelPrepHashTableForUnmatched (HashJoinState *hjstate)
 
bool ExecScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
bool ExecParallelScanHashTableForUnmatched (HashJoinState *hjstate, ExprContext *econtext)
 
void ExecHashTableReset (HashJoinTable hashtable)
 
void ExecHashTableResetMatchFlags (HashJoinTable hashtable)
 
void ExecReScanHash (HashState *node)
 
int ExecHashGetSkewBucket (HashJoinTable hashtable, uint32 hashvalue)
 
void ExecHashEstimate (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeDSM (HashState *node, ParallelContext *pcxt)
 
void ExecHashInitializeWorker (HashState *node, ParallelWorkerContext *pwcxt)
 
void ExecShutdownHash (HashState *node)
 
void ExecHashRetrieveInstrumentation (HashState *node)
 
void ExecHashAccumInstrumentation (HashInstrumentation *instrument, HashJoinTable hashtable)
 
void ExecParallelHashTableAlloc (HashJoinTable hashtable, int batchno)
 
void ExecHashTableDetachBatch (HashJoinTable hashtable)
 
void ExecHashTableDetach (HashJoinTable hashtable)
 
void ExecParallelHashTableSetCurrentBatch (HashJoinTable hashtable, int batchno)
 
size_t get_hash_memory_limit (void)
 

Macro Definition Documentation

◆ NTUP_PER_BUCKET

#define NTUP_PER_BUCKET   1

Definition at line 673 of file nodeHash.c.

Function Documentation

◆ dense_alloc()

static void * dense_alloc ( HashJoinTable  hashtable,
Size  size 
)
static

Definition at line 2861 of file nodeHash.c.

2862 {
2863  HashMemoryChunk newChunk;
2864  char *ptr;
2865 
2866  /* just in case the size is not already aligned properly */
2867  size = MAXALIGN(size);
2868 
2869  /*
2870  * If tuple size is larger than threshold, allocate a separate chunk.
2871  */
2872  if (size > HASH_CHUNK_THRESHOLD)
2873  {
2874  /* allocate new chunk and put it at the beginning of the list */
2875  newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
2876  HASH_CHUNK_HEADER_SIZE + size);
2877  newChunk->maxlen = size;
2878  newChunk->used = size;
2879  newChunk->ntuples = 1;
2880 
2881  /*
2882  * Add this chunk to the list after the first existing chunk, so that
2883  * we don't lose the remaining space in the "current" chunk.
2884  */
2885  if (hashtable->chunks != NULL)
2886  {
2887  newChunk->next = hashtable->chunks->next;
2888  hashtable->chunks->next.unshared = newChunk;
2889  }
2890  else
2891  {
2892  newChunk->next.unshared = hashtable->chunks;
2893  hashtable->chunks = newChunk;
2894  }
2895 
2896  return HASH_CHUNK_DATA(newChunk);
2897  }
2898 
2899  /*
2900  * See if we have enough space for it in the current chunk (if any). If
2901  * not, allocate a fresh chunk.
2902  */
2903  if ((hashtable->chunks == NULL) ||
2904  (hashtable->chunks->maxlen - hashtable->chunks->used) < size)
2905  {
2906  /* allocate new chunk and put it at the beginning of the list */
2907  newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
2908  HASH_CHUNK_HEADER_SIZE + HASH_CHUNK_SIZE);
2909 
2910  newChunk->maxlen = HASH_CHUNK_SIZE;
2911  newChunk->used = size;
2912  newChunk->ntuples = 1;
2913 
2914  newChunk->next.unshared = hashtable->chunks;
2915  hashtable->chunks = newChunk;
2916 
2917  return HASH_CHUNK_DATA(newChunk);
2918  }
2919 
2920  /* There is enough space in the current chunk, let's add the tuple */
2921  ptr = HASH_CHUNK_DATA(hashtable->chunks) + hashtable->chunks->used;
2922  hashtable->chunks->used += size;
2923  hashtable->chunks->ntuples += 1;
2924 
2925  /* return pointer to the start of the tuple memory */
2926  return ptr;
2927 }
#define MAXALIGN(LEN)
Definition: c.h:800
struct HashMemoryChunkData * HashMemoryChunk
Definition: hashjoin.h:148
#define HASH_CHUNK_DATA(hc)
Definition: hashjoin.h:152
#define HASH_CHUNK_THRESHOLD
Definition: hashjoin.h:154
#define HASH_CHUNK_HEADER_SIZE
Definition: hashjoin.h:151
#define HASH_CHUNK_SIZE
Definition: hashjoin.h:150
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1021
HashMemoryChunk chunks
Definition: hashjoin.h:367
MemoryContext batchCxt
Definition: hashjoin.h:363
struct HashMemoryChunkData * unshared
Definition: hashjoin.h:137
union HashMemoryChunkData::@98 next

References HashJoinTableData::batchCxt, HashJoinTableData::chunks, HASH_CHUNK_DATA, HASH_CHUNK_HEADER_SIZE, HASH_CHUNK_SIZE, HASH_CHUNK_THRESHOLD, MAXALIGN, HashMemoryChunkData::maxlen, MemoryContextAlloc(), HashMemoryChunkData::next, HashMemoryChunkData::ntuples, HashMemoryChunkData::unshared, and HashMemoryChunkData::used.

Referenced by ExecHashIncreaseNumBatches(), ExecHashRemoveNextSkewBucket(), and ExecHashTableInsert().
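
The chunked allocation strategy above is easier to see in isolation. Below is a minimal, self-contained sketch of the same pattern (illustrative only, not PostgreSQL code: the Chunk struct, arena_alloc, CHUNK_SIZE, and THRESHOLD names are ours, and plain malloc stands in for MemoryContextAlloc). Small requests are bump-allocated out of the current chunk; oversized requests get a dedicated chunk linked behind the current one, so the current chunk's remaining free space is not wasted.

#include <stdlib.h>

#define CHUNK_SIZE (32 * 1024)      /* payload of a normal chunk */
#define THRESHOLD  (CHUNK_SIZE / 4) /* larger requests get their own chunk */

typedef struct Chunk
{
    struct Chunk *next;
    size_t      maxlen;             /* payload capacity */
    size_t      used;               /* bytes handed out so far */
    char        data[];             /* payload follows the header */
} Chunk;

/* Bump-allocate size bytes from the chunk list headed by *chunks.
 * Error handling is omitted for brevity. */
static void *
arena_alloc(Chunk **chunks, size_t size)
{
    Chunk      *c;

    size = (size + 7) & ~(size_t) 7;    /* 8-byte alignment, like MAXALIGN */

    if (size > THRESHOLD)
    {
        /* dedicated chunk, linked *after* the current one so the free
         * space remaining in the current chunk stays usable */
        c = malloc(sizeof(Chunk) + size);
        c->maxlen = c->used = size;
        if (*chunks != NULL)
        {
            c->next = (*chunks)->next;
            (*chunks)->next = c;
        }
        else
        {
            c->next = NULL;
            *chunks = c;
        }
        return c->data;
    }

    if (*chunks == NULL || (*chunks)->maxlen - (*chunks)->used < size)
    {
        /* no current chunk, or it is too full: start a fresh one */
        c = malloc(sizeof(Chunk) + CHUNK_SIZE);
        c->maxlen = CHUNK_SIZE;
        c->used = 0;
        c->next = *chunks;
        *chunks = c;
    }

    c = *chunks;
    c->used += size;
    return c->data + c->used - size;
}

int
main(void)
{
    Chunk      *chunks = NULL;
    void       *a = arena_alloc(&chunks, 100);      /* from a shared chunk */
    void       *b = arena_alloc(&chunks, 100);      /* same chunk, bumped */
    void       *c = arena_alloc(&chunks, 20000);    /* dedicated chunk */

    return (a != b && c != NULL) ? 0 : 1;
}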

◆ ExecChooseHashTableSize()

void ExecChooseHashTableSize ( double  ntuples,
int  tupwidth,
bool  useskew,
bool  try_combined_hash_mem,
int  parallel_workers,
size_t *  space_allowed,
int *  numbuckets,
int *  numbatches,
int *  num_skew_mcvs 
)

Definition at line 676 of file nodeHash.c.

683 {
684  int tupsize;
685  double inner_rel_bytes;
686  size_t hash_table_bytes;
687  size_t bucket_bytes;
688  size_t max_pointers;
689  int nbatch = 1;
690  int nbuckets;
691  double dbuckets;
692 
693  /* Force a plausible relation size if no info */
694  if (ntuples <= 0.0)
695  ntuples = 1000.0;
696 
697  /*
698  * Estimate tupsize based on footprint of tuple in hashtable... note this
699  * does not allow for any palloc overhead. The manipulations of spaceUsed
700  * don't count palloc overhead either.
701  */
702  tupsize = HJTUPLE_OVERHEAD +
703  MAXALIGN(SizeofMinimalTupleHeader) +
704  MAXALIGN(tupwidth);
705  inner_rel_bytes = ntuples * tupsize;
706 
707  /*
708  * Compute in-memory hashtable size limit from GUCs.
709  */
710  hash_table_bytes = get_hash_memory_limit();
711 
712  /*
713  * Parallel Hash tries to use the combined hash_mem of all workers to
714  * avoid the need to batch. If that won't work, it falls back to hash_mem
715  * per worker and tries to process batches in parallel.
716  */
717  if (try_combined_hash_mem)
718  {
719  /* Careful, this could overflow size_t */
720  double newlimit;
721 
722  newlimit = (double) hash_table_bytes * (double) (parallel_workers + 1);
723  newlimit = Min(newlimit, (double) SIZE_MAX);
724  hash_table_bytes = (size_t) newlimit;
725  }
726 
727  *space_allowed = hash_table_bytes;
728 
729  /*
730  * If skew optimization is possible, estimate the number of skew buckets
731  * that will fit in the memory allowed, and decrement the assumed space
732  * available for the main hash table accordingly.
733  *
734  * We make the optimistic assumption that each skew bucket will contain
735  * one inner-relation tuple. If that turns out to be low, we will recover
736  * at runtime by reducing the number of skew buckets.
737  *
738  * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
739  * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
740  * will round up to the next power of 2 and then multiply by 4 to reduce
741  * collisions.
742  */
743  if (useskew)
744  {
745  size_t bytes_per_mcv;
746  size_t skew_mcvs;
747 
748  /*----------
749  * Compute number of MCVs we could hold in hash_table_bytes
750  *
751  * Divisor is:
752  * size of a hash tuple +
753  * worst-case size of skewBucket[] per MCV +
754  * size of skewBucketNums[] entry +
755  * size of skew bucket struct itself
756  *----------
757  */
758  bytes_per_mcv = tupsize +
759  (8 * sizeof(HashSkewBucket *)) +
760  sizeof(int) +
761  SKEW_BUCKET_OVERHEAD;
762  skew_mcvs = hash_table_bytes / bytes_per_mcv;
763 
764  /*
765  * Now scale by SKEW_HASH_MEM_PERCENT (we do it in this order so as
766  * not to worry about size_t overflow in the multiplication)
767  */
768  skew_mcvs = (skew_mcvs * SKEW_HASH_MEM_PERCENT) / 100;
769 
770  /* Now clamp to integer range */
771  skew_mcvs = Min(skew_mcvs, INT_MAX);
772 
773  *num_skew_mcvs = (int) skew_mcvs;
774 
775  /* Reduce hash_table_bytes by the amount needed for the skew table */
776  if (skew_mcvs > 0)
777  hash_table_bytes -= skew_mcvs * bytes_per_mcv;
778  }
779  else
780  *num_skew_mcvs = 0;
781 
782  /*
783  * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
784  * memory is filled, assuming a single batch; but limit the value so that
785  * the pointer arrays we'll try to allocate do not exceed hash_table_bytes
786  * nor MaxAllocSize.
787  *
788  * Note that both nbuckets and nbatch must be powers of 2 to make
789  * ExecHashGetBucketAndBatch fast.
790  */
791  max_pointers = hash_table_bytes / sizeof(HashJoinTuple);
792  max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
793  /* If max_pointers isn't a power of 2, must round it down to one */
794  max_pointers = pg_prevpower2_size_t(max_pointers);
795 
796  /* Also ensure we avoid integer overflow in nbatch and nbuckets */
797  /* (this step is redundant given the current value of MaxAllocSize) */
798  max_pointers = Min(max_pointers, INT_MAX / 2 + 1);
799 
800  dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
801  dbuckets = Min(dbuckets, max_pointers);
802  nbuckets = (int) dbuckets;
803  /* don't let nbuckets be really small, though ... */
804  nbuckets = Max(nbuckets, 1024);
805  /* ... and force it to be a power of 2. */
806  nbuckets = pg_nextpower2_32(nbuckets);
807 
808  /*
809  * If there's not enough space to store the projected number of tuples and
810  * the required bucket headers, we will need multiple batches.
811  */
812  bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
813  if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
814  {
815  /* We'll need multiple batches */
816  size_t sbuckets;
817  double dbatch;
818  int minbatch;
819  size_t bucket_size;
820 
821  /*
822  * If Parallel Hash with combined hash_mem would still need multiple
823  * batches, we'll have to fall back to regular hash_mem budget.
824  */
825  if (try_combined_hash_mem)
826  {
827  ExecChooseHashTableSize(ntuples, tupwidth, useskew,
828  false, parallel_workers,
829  space_allowed,
830  numbuckets,
831  numbatches,
832  num_skew_mcvs);
833  return;
834  }
835 
836  /*
837  * Estimate the number of buckets we'll want to have when hash_mem is
838  * entirely full. Each bucket will contain a bucket pointer plus
839  * NTUP_PER_BUCKET tuples, whose projected size already includes
840  * overhead for the hash code, pointer to the next tuple, etc.
841  */
842  bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
843  if (hash_table_bytes <= bucket_size)
844  sbuckets = 1; /* avoid pg_nextpower2_size_t(0) */
845  else
846  sbuckets = pg_nextpower2_size_t(hash_table_bytes / bucket_size);
847  sbuckets = Min(sbuckets, max_pointers);
848  nbuckets = (int) sbuckets;
849  nbuckets = pg_nextpower2_32(nbuckets);
850  bucket_bytes = nbuckets * sizeof(HashJoinTuple);
851 
852  /*
853  * Buckets are simple pointers to hashjoin tuples, while tupsize
854  * includes the pointer, hash code, and MinimalTupleData. So buckets
855  * should never really exceed 25% of hash_mem (even for
856  * NTUP_PER_BUCKET=1); except maybe for hash_mem values that are not
857  * 2^N bytes, where we might get more because of doubling. So let's
858  * look for 50% here.
859  */
860  Assert(bucket_bytes <= hash_table_bytes / 2);
861 
862  /* Calculate required number of batches. */
863  dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
864  dbatch = Min(dbatch, max_pointers);
865  minbatch = (int) dbatch;
866  nbatch = pg_nextpower2_32(Max(2, minbatch));
867  }
868 
869  Assert(nbuckets > 0);
870  Assert(nbatch > 0);
871 
872  *numbuckets = nbuckets;
873  *numbatches = nbatch;
874 }
#define Min(x, y)
Definition: c.h:993
#define Max(x, y)
Definition: c.h:987
struct HashJoinTupleData * HashJoinTuple
Definition: execnodes.h:2100
#define HJTUPLE_OVERHEAD
Definition: hashjoin.h:90
#define SKEW_BUCKET_OVERHEAD
Definition: hashjoin.h:119
#define SKEW_HASH_MEM_PERCENT
Definition: hashjoin.h:121
#define SizeofMinimalTupleHeader
Definition: htup_details.h:647
Assert(fmt[strlen(fmt) - 1] !='\n')
#define MaxAllocSize
Definition: memutils.h:40
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
Definition: nodeHash.c:676
#define NTUP_PER_BUCKET
Definition: nodeHash.c:673
size_t get_hash_memory_limit(void)
Definition: nodeHash.c:3587
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:189
#define pg_nextpower2_size_t
Definition: pg_bitutils.h:339
#define pg_prevpower2_size_t
Definition: pg_bitutils.h:340

References Assert(), get_hash_memory_limit(), HJTUPLE_OVERHEAD, Max, MAXALIGN, MaxAllocSize, Min, NTUP_PER_BUCKET, pg_nextpower2_32(), pg_nextpower2_size_t, pg_prevpower2_size_t, SizeofMinimalTupleHeader, SKEW_BUCKET_OVERHEAD, and SKEW_HASH_MEM_PERCENT.

Referenced by ExecHashTableCreate(), and initial_cost_hashjoin().
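
To make the sizing arithmetic concrete, here is a back-of-envelope sketch (ours, not PostgreSQL code: next_pow2 stands in for pg_nextpower2_32, and the skew, max_pointers, and minimum-bucket clamps are omitted) that runs the core logic for a hypothetical one million 56-byte tuples against a 4 MB hash_mem budget.

#include <math.h>
#include <stdio.h>
#include <stdint.h>

static uint32_t
next_pow2(uint32_t v)               /* smallest power of 2 >= v */
{
    uint32_t    p = 1;

    while (p < v)
        p <<= 1;
    return p;
}

int
main(void)
{
    double      ntuples = 1000000.0;
    int         tupsize = 56;               /* tuple footprint in the table */
    size_t      hash_mem = 4 * 1024 * 1024; /* 4 MB budget */
    double      inner_rel_bytes = ntuples * tupsize;

    /* single-batch attempt: one bucket per tuple (NTUP_PER_BUCKET = 1) */
    uint32_t    nbuckets = next_pow2((uint32_t) ceil(ntuples));
    size_t      bucket_bytes = nbuckets * sizeof(void *);
    uint32_t    nbatch = 1;

    if (inner_rel_bytes + bucket_bytes > hash_mem)
    {
        /* refit buckets to a completely full hash_mem, then batch */
        size_t      bucket_size = tupsize + sizeof(void *);

        nbuckets = next_pow2((uint32_t) (hash_mem / bucket_size));
        bucket_bytes = nbuckets * sizeof(void *);
        nbatch = next_pow2((uint32_t)
                           ceil(inner_rel_bytes /
                                (hash_mem - bucket_bytes)));
    }

    /* prints "nbuckets = 65536, nbatch = 16" on a 64-bit machine */
    printf("nbuckets = %u, nbatch = %u\n",
           (unsigned) nbuckets, (unsigned) nbatch);
    return 0;
}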

◆ ExecEndHash()

void ExecEndHash ( HashState *node)

Definition at line 414 of file nodeHash.c.

415 {
416  PlanState *outerPlan;
417 
418  /*
419  * shut down the subplan
420  */
421  outerPlan = outerPlanState(node);
422  ExecEndNode(outerPlan);
423 }
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:557
#define outerPlanState(node)
Definition: execnodes.h:1133
#define outerPlan(node)
Definition: plannodes.h:183

References ExecEndNode(), outerPlan, and outerPlanState.

Referenced by ExecEndNode().

◆ ExecHash()

static TupleTableSlot* ExecHash ( PlanState *pstate)
static

Definition at line 92 of file nodeHash.c.

93 {
94  elog(ERROR, "Hash node does not support ExecProcNode call convention");
95  return NULL;
96 }
#define ERROR
Definition: elog.h:39

References elog(), and ERROR.

Referenced by ExecInitHash().

◆ ExecHashAccumInstrumentation()

void ExecHashAccumInstrumentation ( HashInstrumentation *instrument,
HashJoinTable  hashtable 
)

Definition at line 2842 of file nodeHash.c.

2844 {
2845  instrument->nbuckets = Max(instrument->nbuckets,
2846  hashtable->nbuckets);
2847  instrument->nbuckets_original = Max(instrument->nbuckets_original,
2848  hashtable->nbuckets_original);
2849  instrument->nbatch = Max(instrument->nbatch,
2850  hashtable->nbatch);
2851  instrument->nbatch_original = Max(instrument->nbatch_original,
2852  hashtable->nbatch_original);
2853  instrument->space_peak = Max(instrument->space_peak,
2854  hashtable->spacePeak);
2855 }

References Max, HashJoinTableData::nbatch, HashInstrumentation::nbatch, HashJoinTableData::nbatch_original, HashInstrumentation::nbatch_original, HashJoinTableData::nbuckets, HashInstrumentation::nbuckets, HashJoinTableData::nbuckets_original, HashInstrumentation::nbuckets_original, HashInstrumentation::space_peak, and HashJoinTableData::spacePeak.

Referenced by ExecReScanHashJoin(), and ExecShutdownHash().

◆ ExecHashBuildSkewHash()

static void ExecHashBuildSkewHash ( HashJoinTable  hashtable,
Hash *node,
int  mcvsToUse 
)
static

Definition at line 2367 of file nodeHash.c.

2368 {
2369  HeapTupleData *statsTuple;
2370  AttStatsSlot sslot;
2371 
2372  /* Do nothing if planner didn't identify the outer relation's join key */
2373  if (!OidIsValid(node->skewTable))
2374  return;
2375  /* Also, do nothing if we don't have room for at least one skew bucket */
2376  if (mcvsToUse <= 0)
2377  return;
2378 
2379  /*
2380  * Try to find the MCV statistics for the outer relation's join key.
2381  */
2382  statsTuple = SearchSysCache3(STATRELATTINH,
2383  ObjectIdGetDatum(node->skewTable),
2384  Int16GetDatum(node->skewColumn),
2385  BoolGetDatum(node->skewInherit));
2386  if (!HeapTupleIsValid(statsTuple))
2387  return;
2388 
2389  if (get_attstatsslot(&sslot, statsTuple,
2390  STATISTIC_KIND_MCV, InvalidOid,
2391  ATTSTATSSLOT_VALUES | ATTSTATSSLOT_NUMBERS))
2392  {
2393  double frac;
2394  int nbuckets;
2395  FmgrInfo *hashfunctions;
2396  int i;
2397 
2398  if (mcvsToUse > sslot.nvalues)
2399  mcvsToUse = sslot.nvalues;
2400 
2401  /*
2402  * Calculate the expected fraction of outer relation that will
2403  * participate in the skew optimization. If this isn't at least
2404  * SKEW_MIN_OUTER_FRACTION, don't use skew optimization.
2405  */
2406  frac = 0;
2407  for (i = 0; i < mcvsToUse; i++)
2408  frac += sslot.numbers[i];
2409  if (frac < SKEW_MIN_OUTER_FRACTION)
2410  {
2411  free_attstatsslot(&sslot);
2412  ReleaseSysCache(statsTuple);
2413  return;
2414  }
2415 
2416  /*
2417  * Okay, set up the skew hashtable.
2418  *
2419  * skewBucket[] is an open addressing hashtable with a power of 2 size
2420  * that is greater than the number of MCV values. (This ensures there
2421  * will be at least one null entry, so searches will always
2422  * terminate.)
2423  *
2424  * Note: this code could fail if mcvsToUse exceeds INT_MAX/8 or
2425  * MaxAllocSize/sizeof(void *)/8, but that is not currently possible
2426  * since we limit pg_statistic entries to much less than that.
2427  */
2428  nbuckets = pg_nextpower2_32(mcvsToUse + 1);
2429  /* use two more bits just to help avoid collisions */
2430  nbuckets <<= 2;
2431 
2432  hashtable->skewEnabled = true;
2433  hashtable->skewBucketLen = nbuckets;
2434 
2435  /*
2436  * We allocate the bucket memory in the hashtable's batch context. It
2437  * is only needed during the first batch, and this ensures it will be
2438  * automatically removed once the first batch is done.
2439  */
2440  hashtable->skewBucket = (HashSkewBucket **)
2441  MemoryContextAllocZero(hashtable->batchCxt,
2442  nbuckets * sizeof(HashSkewBucket *));
2443  hashtable->skewBucketNums = (int *)
2444  MemoryContextAllocZero(hashtable->batchCxt,
2445  mcvsToUse * sizeof(int));
2446 
2447  hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *)
2448  + mcvsToUse * sizeof(int);
2449  hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
2450  + mcvsToUse * sizeof(int);
2451  if (hashtable->spaceUsed > hashtable->spacePeak)
2452  hashtable->spacePeak = hashtable->spaceUsed;
2453 
2454  /*
2455  * Create a skew bucket for each MCV hash value.
2456  *
2457  * Note: it is very important that we create the buckets in order of
2458  * decreasing MCV frequency. If we have to remove some buckets, they
2459  * must be removed in reverse order of creation (see notes in
2460  * ExecHashRemoveNextSkewBucket) and we want the least common MCVs to
2461  * be removed first.
2462  */
2463  hashfunctions = hashtable->outer_hashfunctions;
2464 
2465  for (i = 0; i < mcvsToUse; i++)
2466  {
2467  uint32 hashvalue;
2468  int bucket;
2469 
2470  hashvalue = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[0],
2471  hashtable->collations[0],
2472  sslot.values[i]));
2473 
2474  /*
2475  * While we have not hit a hole in the hashtable and have not hit
2476  * the desired bucket, we have collided with some previous hash
2477  * value, so try the next bucket location. NB: this code must
2478  * match ExecHashGetSkewBucket.
2479  */
2480  bucket = hashvalue & (nbuckets - 1);
2481  while (hashtable->skewBucket[bucket] != NULL &&
2482  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
2483  bucket = (bucket + 1) & (nbuckets - 1);
2484 
2485  /*
2486  * If we found an existing bucket with the same hashvalue, leave
2487  * it alone. It's okay for two MCVs to share a hashvalue.
2488  */
2489  if (hashtable->skewBucket[bucket] != NULL)
2490  continue;
2491 
2492  /* Okay, create a new skew bucket for this hashvalue. */
2493  hashtable->skewBucket[bucket] = (HashSkewBucket *)
2494  MemoryContextAlloc(hashtable->batchCxt,
2495  sizeof(HashSkewBucket));
2496  hashtable->skewBucket[bucket]->hashvalue = hashvalue;
2497  hashtable->skewBucket[bucket]->tuples = NULL;
2498  hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
2499  hashtable->nSkewBuckets++;
2500  hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
2501  hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
2502  if (hashtable->spaceUsed > hashtable->spacePeak)
2503  hashtable->spacePeak = hashtable->spaceUsed;
2504  }
2505 
2506  free_attstatsslot(&sslot);
2507  }
2508 
2509  ReleaseSysCache(statsTuple);
2510 }
unsigned int uint32
Definition: c.h:495
#define OidIsValid(objectId)
Definition: c.h:764
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1112
#define SKEW_MIN_OUTER_FRACTION
Definition: hashjoin.h:122
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
void free_attstatsslot(AttStatsSlot *sslot)
Definition: lsyscache.c:3326
bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, int reqkind, Oid reqop, int flags)
Definition: lsyscache.c:3216
#define ATTSTATSSLOT_NUMBERS
Definition: lsyscache.h:43
#define ATTSTATSSLOT_VALUES
Definition: lsyscache.h:42
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1064
static uint32 DatumGetUInt32(Datum X)
Definition: postgres.h:222
static Datum Int16GetDatum(int16 X)
Definition: postgres.h:172
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
#define InvalidOid
Definition: postgres_ext.h:36
Datum * values
Definition: lsyscache.h:53
float4 * numbers
Definition: lsyscache.h:56
Definition: fmgr.h:57
FmgrInfo * outer_hashfunctions
Definition: hashjoin.h:351
int * skewBucketNums
Definition: hashjoin.h:322
HashSkewBucket ** skewBucket
Definition: hashjoin.h:319
HashJoinTuple tuples
Definition: hashjoin.h:116
uint32 hashvalue
Definition: hashjoin.h:115
AttrNumber skewColumn
Definition: plannodes.h:1205
Oid skewTable
Definition: plannodes.h:1204
bool skewInherit
Definition: plannodes.h:1206
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:868
HeapTuple SearchSysCache3(int cacheId, Datum key1, Datum key2, Datum key3)
Definition: syscache.c:842
@ STATRELATTINH
Definition: syscache.h:97

References ATTSTATSSLOT_NUMBERS, ATTSTATSSLOT_VALUES, HashJoinTableData::batchCxt, BoolGetDatum(), HashJoinTableData::collations, DatumGetUInt32(), free_attstatsslot(), FunctionCall1Coll(), get_attstatsslot(), HashSkewBucket::hashvalue, HeapTupleIsValid, i, if(), Int16GetDatum(), InvalidOid, MemoryContextAlloc(), MemoryContextAllocZero(), HashJoinTableData::nSkewBuckets, AttStatsSlot::numbers, AttStatsSlot::nvalues, ObjectIdGetDatum(), OidIsValid, HashJoinTableData::outer_hashfunctions, pg_nextpower2_32(), ReleaseSysCache(), SearchSysCache3(), SKEW_BUCKET_OVERHEAD, SKEW_MIN_OUTER_FRACTION, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, HashJoinTableData::skewBucketNums, Hash::skewColumn, HashJoinTableData::skewEnabled, Hash::skewInherit, Hash::skewTable, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, STATRELATTINH, HashSkewBucket::tuples, and AttStatsSlot::values.

Referenced by ExecHashTableCreate().

◆ ExecHashEstimate()

void ExecHashEstimate ( HashState *node,
ParallelContext *pcxt 
)

Definition at line 2726 of file nodeHash.c.

2727 {
2728  size_t size;
2729 
2730  /* don't need this if not instrumenting or no workers */
2731  if (!node->ps.instrument || pcxt->nworkers == 0)
2732  return;
2733 
2734  size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
2735  size = add_size(size, offsetof(SharedHashInfo, hinstrument));
2736  shm_toc_estimate_chunk(&pcxt->estimator, size);
2737  shm_toc_estimate_keys(&pcxt->estimator, 1);
2738 }
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
PlanState ps
Definition: execnodes.h:2661
shm_toc_estimator estimator
Definition: parallel.h:42
Instrumentation * instrument
Definition: execnodes.h:1047

References add_size(), ParallelContext::estimator, PlanState::instrument, mul_size(), ParallelContext::nworkers, HashState::ps, shm_toc_estimate_chunk, and shm_toc_estimate_keys.

Referenced by ExecParallelEstimate().

◆ ExecHashGetBucketAndBatch()

void ExecHashGetBucketAndBatch ( HashJoinTable  hashtable,
uint32  hashvalue,
int *  bucketno,
int *  batchno 
)

Definition at line 1924 of file nodeHash.c.

1928 {
1929  uint32 nbuckets = (uint32) hashtable->nbuckets;
1930  uint32 nbatch = (uint32) hashtable->nbatch;
1931 
1932  if (nbatch > 1)
1933  {
1934  *bucketno = hashvalue & (nbuckets - 1);
1935  *batchno = pg_rotate_right32(hashvalue,
1936  hashtable->log2_nbuckets) & (nbatch - 1);
1937  }
1938  else
1939  {
1940  *bucketno = hashvalue & (nbuckets - 1);
1941  *batchno = 0;
1942  }
1943 }
static uint32 pg_rotate_right32(uint32 word, int n)
Definition: pg_bitutils.h:320

References HashJoinTableData::log2_nbuckets, HashJoinTableData::nbatch, HashJoinTableData::nbuckets, and pg_rotate_right32().

Referenced by ExecHashIncreaseNumBatches(), ExecHashIncreaseNumBuckets(), ExecHashJoinImpl(), ExecHashRemoveNextSkewBucket(), ExecHashTableInsert(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinPartitionOuter(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), ExecParallelHashTableInsert(), and ExecParallelHashTableInsertCurrentBatch().
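
The bit layout is the essential point: bucketno is taken from the low log2_nbuckets bits of the hash, and batchno from the bits just above them via the right rotation, so the two selections never overlap as long as both counts are powers of 2. A toy demonstration (ours, not PostgreSQL code):

#include <stdio.h>
#include <stdint.h>

/* rotate right; n must stay in 1..31 to avoid undefined shifts */
static uint32_t
rotate_right32(uint32_t word, int n)
{
    return (word >> n) | (word << (32 - n));
}

int
main(void)
{
    uint32_t    hash = 0xDEADBEEF;
    uint32_t    nbuckets = 1024;    /* 2^10 */
    int         log2_nbuckets = 10;
    uint32_t    nbatch = 16;        /* 2^4 */

    /* low 10 bits select the bucket ... */
    int         bucketno = hash & (nbuckets - 1);
    /* ... and bits 10..13 select the batch */
    int         batchno = rotate_right32(hash, log2_nbuckets) & (nbatch - 1);

    printf("bucket %d, batch %d\n", bucketno, batchno);
    return 0;
}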

◆ ExecHashGetHashValue()

bool ExecHashGetHashValue ( HashJoinTable  hashtable,
ExprContext *econtext,
List *hashkeys,
bool  outer_tuple,
bool  keep_nulls,
uint32 *hashvalue 
)

Definition at line 1816 of file nodeHash.c.

1822 {
1823  uint32 hashkey = 0;
1824  FmgrInfo *hashfunctions;
1825  ListCell *hk;
1826  int i = 0;
1827  MemoryContext oldContext;
1828 
1829  /*
1830  * We reset the eval context each time to reclaim any memory leaked in the
1831  * hashkey expressions.
1832  */
1833  ResetExprContext(econtext);
1834 
1835  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
1836 
1837  if (outer_tuple)
1838  hashfunctions = hashtable->outer_hashfunctions;
1839  else
1840  hashfunctions = hashtable->inner_hashfunctions;
1841 
1842  foreach(hk, hashkeys)
1843  {
1844  ExprState *keyexpr = (ExprState *) lfirst(hk);
1845  Datum keyval;
1846  bool isNull;
1847 
1848  /* combine successive hashkeys by rotating */
1849  hashkey = pg_rotate_left32(hashkey, 1);
1850 
1851  /*
1852  * Get the join attribute value of the tuple
1853  */
1854  keyval = ExecEvalExpr(keyexpr, econtext, &isNull);
1855 
1856  /*
1857  * If the attribute is NULL, and the join operator is strict, then
1858  * this tuple cannot pass the join qual so we can reject it
1859  * immediately (unless we're scanning the outside of an outer join, in
1860  * which case we must not reject it). Otherwise we act like the
1861  * hashcode of NULL is zero (this will support operators that act like
1862  * IS NOT DISTINCT, though not any more-random behavior). We treat
1863  * the hash support function as strict even if the operator is not.
1864  *
1865  * Note: currently, all hashjoinable operators must be strict since
1866  * the hash index AM assumes that. However, it takes so little extra
1867  * code here to allow non-strict that we may as well do it.
1868  */
1869  if (isNull)
1870  {
1871  if (hashtable->hashStrict[i] && !keep_nulls)
1872  {
1873  MemoryContextSwitchTo(oldContext);
1874  return false; /* cannot match */
1875  }
1876  /* else, leave hashkey unmodified, equivalent to hashcode 0 */
1877  }
1878  else
1879  {
1880  /* Compute the hash function */
1881  uint32 hkey;
1882 
1883  hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], hashtable->collations[i], keyval));
1884  hashkey ^= hkey;
1885  }
1886 
1887  i++;
1888  }
1889 
1890  MemoryContextSwitchTo(oldContext);
1891 
1892  *hashvalue = hashkey;
1893  return true;
1894 }
#define ResetExprContext(econtext)
Definition: executor.h:543
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:332
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
static uint32 pg_rotate_left32(uint32 word, int n)
Definition: pg_bitutils.h:326
#define lfirst(lc)
Definition: pg_list.h:172
uintptr_t Datum
Definition: postgres.h:64
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:257
bool * hashStrict
Definition: hashjoin.h:353
FmgrInfo * inner_hashfunctions
Definition: hashjoin.h:352

References HashJoinTableData::collations, DatumGetUInt32(), ExprContext::ecxt_per_tuple_memory, ExecEvalExpr(), FunctionCall1Coll(), HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, lfirst, MemoryContextSwitchTo(), HashJoinTableData::outer_hashfunctions, pg_rotate_left32(), and ResetExprContext.

Referenced by ExecHashJoinOuterGetTuple(), ExecParallelHashJoinOuterGetTuple(), ExecParallelHashJoinPartitionOuter(), MultiExecParallelHash(), and MultiExecPrivateHash().
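
The combining step deserves a standalone illustration: the running hash is rotated left one bit per key and the key's own hash is XORed in, so column order matters, while a kept NULL simply contributes zero. A minimal sketch (ours; the per-column hashes are made-up stand-ins for the DatumGetUInt32(FunctionCall1Coll(...)) results above):

#include <stdio.h>
#include <stdint.h>

/* rotate left; n must stay in 1..31 to avoid undefined shifts */
static uint32_t
rotate_left32(uint32_t word, int n)
{
    return (word << n) | (word >> (32 - n));
}

int
main(void)
{
    /* stand-in per-column hashes; the 0 models a kept NULL key */
    uint32_t    key_hashes[] = {0x12345678, 0x9ABCDEF0, 0};
    uint32_t    hashkey = 0;

    for (int i = 0; i < 3; i++)
    {
        /* combine successive hashkeys by rotating */
        hashkey = rotate_left32(hashkey, 1);
        hashkey ^= key_hashes[i];   /* a no-op for the NULL's zero */
    }

    printf("combined hash = 0x%08X\n", (unsigned) hashkey);
    return 0;
}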

◆ ExecHashGetSkewBucket()

int ExecHashGetSkewBucket ( HashJoinTable  hashtable,
uint32  hashvalue 
)

Definition at line 2520 of file nodeHash.c.

2521 {
2522  int bucket;
2523 
2524  /*
2525  * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
2526  * particular, this happens after the initial batch is done).
2527  */
2528  if (!hashtable->skewEnabled)
2529  return INVALID_SKEW_BUCKET_NO;
2530 
2531  /*
2532  * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
2533  */
2534  bucket = hashvalue & (hashtable->skewBucketLen - 1);
2535 
2536  /*
2537  * While we have not hit a hole in the hashtable and have not hit the
2538  * desired bucket, we have collided with some other hash value, so try the
2539  * next bucket location.
2540  */
2541  while (hashtable->skewBucket[bucket] != NULL &&
2542  hashtable->skewBucket[bucket]->hashvalue != hashvalue)
2543  bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
2544 
2545  /*
2546  * Found the desired bucket?
2547  */
2548  if (hashtable->skewBucket[bucket] != NULL)
2549  return bucket;
2550 
2551  /*
2552  * There must not be any hashtable entry for this hash value.
2553  */
2554  return INVALID_SKEW_BUCKET_NO;
2555 }
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:120

References HashSkewBucket::hashvalue, INVALID_SKEW_BUCKET_NO, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, and HashJoinTableData::skewEnabled.

Referenced by ExecHashJoinImpl(), and MultiExecPrivateHash().
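
This lookup and the insertion loop in ExecHashBuildSkewHash share one open-addressing probe: AND with (skewBucketLen - 1) as the modulo, then walk forward until the wanted hash value or an empty slot appears; because the table is strictly larger than the number of entries, an empty slot always exists and the loop terminates. A self-contained sketch (ours) with three colliding values:

#include <stdio.h>
#include <stdint.h>

#define LEN 8                       /* power of 2, > number of entries */

int
main(void)
{
    /* 0 marks an empty slot; nonzero entries are stored hash values */
    uint32_t    table[LEN] = {0};
    uint32_t    values[] = {5, 13, 21};     /* all map to bucket 5 mod 8 */
    uint32_t    target = 21;
    int         b;

    /* insert with linear probing, as in ExecHashBuildSkewHash */
    for (int i = 0; i < 3; i++)
    {
        b = values[i] & (LEN - 1);
        while (table[b] != 0 && table[b] != values[i])
            b = (b + 1) & (LEN - 1);
        table[b] = values[i];       /* lands in buckets 5, 6, 7 */
    }

    /* probe for 21, as in ExecHashGetSkewBucket: visits 5, 6, then 7 */
    b = target & (LEN - 1);
    while (table[b] != 0 && table[b] != target)
        b = (b + 1) & (LEN - 1);

    printf("found %u in bucket %d\n", (unsigned) target, b);
    return 0;
}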

◆ ExecHashIncreaseNumBatches()

static void ExecHashIncreaseNumBatches ( HashJoinTable  hashtable)
static

Definition at line 917 of file nodeHash.c.

918 {
919  int oldnbatch = hashtable->nbatch;
920  int curbatch = hashtable->curbatch;
921  int nbatch;
922  long ninmemory;
923  long nfreed;
924  HashMemoryChunk oldchunks;
925 
926  /* do nothing if we've decided to shut off growth */
927  if (!hashtable->growEnabled)
928  return;
929 
930  /* safety check to avoid overflow */
931  if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
932  return;
933 
934  nbatch = oldnbatch * 2;
935  Assert(nbatch > 1);
936 
937 #ifdef HJDEBUG
938  printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
939  hashtable, nbatch, hashtable->spaceUsed);
940 #endif
941 
942  if (hashtable->innerBatchFile == NULL)
943  {
944  MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
945 
946  /* we had no file arrays before */
947  hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
948  hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
949 
950  MemoryContextSwitchTo(oldcxt);
951 
952  /* time to establish the temp tablespaces, too */
953  PrepareTempTablespaces();
954  }
955  else
956  {
957  /* enlarge arrays and zero out added entries */
958  hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch, nbatch);
959  hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch);
960  }
961 
962  hashtable->nbatch = nbatch;
963 
964  /*
965  * Scan through the existing hash table entries and dump out any that are
966  * no longer of the current batch.
967  */
968  ninmemory = nfreed = 0;
969 
970  /* If we know we need to resize nbuckets, we can do it while rebatching. */
971  if (hashtable->nbuckets_optimal != hashtable->nbuckets)
972  {
973  /* we never decrease the number of buckets */
974  Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
975 
976  hashtable->nbuckets = hashtable->nbuckets_optimal;
977  hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
978 
979  hashtable->buckets.unshared =
980  repalloc_array(hashtable->buckets.unshared,
981  HashJoinTuple, hashtable->nbuckets);
982  }
983 
984  /*
985  * We will scan through the chunks directly, so that we can reset the
986  * buckets now and not have to keep track which tuples in the buckets have
987  * already been processed. We will free the old chunks as we go.
988  */
989  memset(hashtable->buckets.unshared, 0,
990  sizeof(HashJoinTuple) * hashtable->nbuckets);
991  oldchunks = hashtable->chunks;
992  hashtable->chunks = NULL;
993 
994  /* so, let's scan through the old chunks, and all tuples in each chunk */
995  while (oldchunks != NULL)
996  {
997  HashMemoryChunk nextchunk = oldchunks->next.unshared;
998 
999  /* position within the buffer (up to oldchunks->used) */
1000  size_t idx = 0;
1001 
1002  /* process all tuples stored in this chunk (and then free it) */
1003  while (idx < oldchunks->used)
1004  {
1005  HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(oldchunks) + idx);
1006  MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
1007  int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
1008  int bucketno;
1009  int batchno;
1010 
1011  ninmemory++;
1012  ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
1013  &bucketno, &batchno);
1014 
1015  if (batchno == curbatch)
1016  {
1017  /* keep tuple in memory - copy it into the new chunk */
1018  HashJoinTuple copyTuple;
1019 
1020  copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
1021  memcpy(copyTuple, hashTuple, hashTupleSize);
1022 
1023  /* and add it back to the appropriate bucket */
1024  copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1025  hashtable->buckets.unshared[bucketno] = copyTuple;
1026  }
1027  else
1028  {
1029  /* dump it out */
1030  Assert(batchno > curbatch);
1031  ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
1032  hashTuple->hashvalue,
1033  &hashtable->innerBatchFile[batchno],
1034  hashtable);
1035 
1036  hashtable->spaceUsed -= hashTupleSize;
1037  nfreed++;
1038  }
1039 
1040  /* next tuple in this chunk */
1041  idx += MAXALIGN(hashTupleSize);
1042 
1043  /* allow this loop to be cancellable */
1044  CHECK_FOR_INTERRUPTS();
1045  }
1046 
1047  /* we're done with this chunk - free it and proceed to the next one */
1048  pfree(oldchunks);
1049  oldchunks = nextchunk;
1050  }
1051 
1052 #ifdef HJDEBUG
1053  printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
1054  hashtable, nfreed, ninmemory, hashtable->spaceUsed);
1055 #endif
1056 
1057  /*
1058  * If we dumped out either all or none of the tuples in the table, disable
1059  * further expansion of nbatch. This situation implies that we have
1060  * enough tuples of identical hashvalues to overflow spaceAllowed.
1061  * Increasing nbatch will not fix it since there's no way to subdivide the
1062  * group any more finely. We have to just gut it out and hope the server
1063  * has enough RAM.
1064  */
1065  if (nfreed == 0 || nfreed == ninmemory)
1066  {
1067  hashtable->growEnabled = false;
1068 #ifdef HJDEBUG
1069  printf("Hashjoin %p: disabling further increase of nbatch\n",
1070  hashtable);
1071 #endif
1072  }
1073 }
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:259
void PrepareTempTablespaces(void)
Definition: tablespace.c:1337
#define repalloc_array(pointer, type, count)
Definition: fe_memutils.h:66
#define palloc0_array(type, count)
Definition: fe_memutils.h:65
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:91
void pfree(void *pointer)
Definition: mcxt.c:1456
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
static void * dense_alloc(HashJoinTable hashtable, Size size)
Definition: nodeHash.c:2861
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1924
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)
#define repalloc0_array(pointer, type, oldcount, count)
Definition: palloc.h:110
#define printf(...)
Definition: port.h:244
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:311
MemoryContext spillCxt
Definition: hashjoin.h:364
BufFile ** innerBatchFile
Definition: hashjoin.h:343
int log2_nbuckets_optimal
Definition: hashjoin.h:305
BufFile ** outerBatchFile
Definition: hashjoin.h:344
union HashJoinTableData::@99 buckets
uint32 hashvalue
Definition: hashjoin.h:86
union HashJoinTupleData::@97 next
struct HashJoinTupleData * unshared
Definition: hashjoin.h:83

References Assert(), HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, HashJoinTableData::chunks, HashJoinTableData::curbatch, dense_alloc(), ExecHashGetBucketAndBatch(), ExecHashJoinSaveTuple(), HashJoinTableData::growEnabled, HASH_CHUNK_DATA, HashJoinTupleData::hashvalue, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, idx(), HashJoinTableData::innerBatchFile, HashJoinTableData::log2_nbuckets, HashJoinTableData::log2_nbuckets_optimal, MAXALIGN, MaxAllocSize, MemoryContextSwitchTo(), Min, HashJoinTableData::nbatch, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, HashJoinTupleData::next, HashMemoryChunkData::next, HashJoinTableData::outerBatchFile, palloc0_array, pfree(), PrepareTempTablespaces(), printf, repalloc0_array, repalloc_array, HashJoinTableData::spaceUsed, HashJoinTableData::spillCxt, MinimalTupleData::t_len, HashJoinTupleData::unshared, HashMemoryChunkData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashSkewTableInsert(), and ExecHashTableInsert().
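
The Assert(batchno > curbatch) in the repartitioning loop rests on a property of the batch numbering: because batchno is the low bits of the rotated hash, doubling nbatch moves each tuple either to its old batch b or to b + oldnbatch, never to an earlier batch. A small check of that invariant (ours, not PostgreSQL code):

#include <assert.h>
#include <stdio.h>
#include <stdint.h>

static uint32_t
rotate_right32(uint32_t word, int n)
{
    return (word >> n) | (word << (32 - n));
}

int
main(void)
{
    int         log2_nbuckets = 10;
    uint32_t    oldnbatch = 8;
    uint32_t    newnbatch = 16;

    for (uint32_t hash = 0; hash < 100000; hash++)
    {
        uint32_t    rot = rotate_right32(hash, log2_nbuckets);
        uint32_t    oldb = rot & (oldnbatch - 1);
        uint32_t    newb = rot & (newnbatch - 1);

        /* a tuple stays put or moves to a strictly later batch */
        assert(newb == oldb || newb == oldb + oldnbatch);
    }
    printf("invariant holds for 100000 hash values\n");
    return 0;
}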

◆ ExecHashIncreaseNumBuckets()

static void ExecHashIncreaseNumBuckets ( HashJoinTable  hashtable)
static

Definition at line 1454 of file nodeHash.c.

1455 {
1456  HashMemoryChunk chunk;
1457 
1458  /* do nothing if not an increase (it's called increase for a reason) */
1459  if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
1460  return;
1461 
1462 #ifdef HJDEBUG
1463  printf("Hashjoin %p: increasing nbuckets %d => %d\n",
1464  hashtable, hashtable->nbuckets, hashtable->nbuckets_optimal);
1465 #endif
1466 
1467  hashtable->nbuckets = hashtable->nbuckets_optimal;
1468  hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
1469 
1470  Assert(hashtable->nbuckets > 1);
1471  Assert(hashtable->nbuckets <= (INT_MAX / 2));
1472  Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
1473 
1474  /*
1475  * Just reallocate the proper number of buckets - we don't need to walk
1476  * through them - we can walk the dense-allocated chunks (just like in
1477  * ExecHashIncreaseNumBatches, but without all the copying into new
1478  * chunks)
1479  */
1480  hashtable->buckets.unshared =
1481  repalloc_array(hashtable->buckets.unshared,
1482  HashJoinTuple, hashtable->nbuckets);
1483 
1484  memset(hashtable->buckets.unshared, 0,
1485  hashtable->nbuckets * sizeof(HashJoinTuple));
1486 
1487  /* scan through all tuples in all chunks to rebuild the hash table */
1488  for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared)
1489  {
1490  /* process all tuples stored in this chunk */
1491  size_t idx = 0;
1492 
1493  while (idx < chunk->used)
1494  {
1495  HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
1496  int bucketno;
1497  int batchno;
1498 
1499  ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
1500  &bucketno, &batchno);
1501 
1502  /* add the tuple to the proper bucket */
1503  hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1504  hashtable->buckets.unshared[bucketno] = hashTuple;
1505 
1506  /* advance index past the tuple */
1507  idx += MAXALIGN(HJTUPLE_OVERHEAD +
1508  HJTUPLE_MINTUPLE(hashTuple)->t_len);
1509  }
1510 
1511  /* allow this loop to be cancellable */
1512  CHECK_FOR_INTERRUPTS();
1513  }
1514 }

References Assert(), HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, HashJoinTableData::chunks, ExecHashGetBucketAndBatch(), HASH_CHUNK_DATA, HashJoinTupleData::hashvalue, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, idx(), HashJoinTableData::log2_nbuckets, HashJoinTableData::log2_nbuckets_optimal, MAXALIGN, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, HashJoinTupleData::next, HashMemoryChunkData::next, printf, repalloc_array, HashJoinTupleData::unshared, HashMemoryChunkData::unshared, and HashJoinTableData::unshared.

Referenced by MultiExecPrivateHash().
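
Because the tuples themselves live in the dense-allocated chunks and the bucket array holds only pointers, growing the table is cheap: enlarge and zero the array, then walk the chunks re-linking every tuple by its stored hash value. A simplified sketch of that rebuild (ours, not PostgreSQL code):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>

typedef struct Tuple
{
    struct Tuple *next;
    uint32_t    hashvalue;
} Tuple;

int
main(void)
{
    /* a dense "chunk" of six tuples; buckets only point into it */
    Tuple       arena[6];
    uint32_t    nbuckets = 2;
    Tuple     **buckets = calloc(nbuckets, sizeof(Tuple *));

    for (int i = 0; i < 6; i++)
    {
        uint32_t    b;

        arena[i].hashvalue = i * 2654435761u;   /* arbitrary hashes */
        b = arena[i].hashvalue & (nbuckets - 1);
        arena[i].next = buckets[b];
        buckets[b] = &arena[i];
    }

    /* double nbuckets: no tuple copying, just rebuild the links by
     * re-scanning the dense storage, as ExecHashIncreaseNumBuckets does */
    nbuckets = 4;
    buckets = realloc(buckets, nbuckets * sizeof(Tuple *));
    memset(buckets, 0, nbuckets * sizeof(Tuple *));
    for (int i = 0; i < 6; i++)
    {
        uint32_t    b = arena[i].hashvalue & (nbuckets - 1);

        arena[i].next = buckets[b];
        buckets[b] = &arena[i];
    }

    for (uint32_t b = 0; b < nbuckets; b++)
    {
        int         n = 0;

        for (Tuple *t = buckets[b]; t != NULL; t = t->next)
            n++;
        printf("bucket %u: %d tuples\n", (unsigned) b, n);
    }
    free(buckets);
    return 0;
}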

◆ ExecHashInitializeDSM()

void ExecHashInitializeDSM ( HashState *node,
ParallelContext *pcxt 
)

Definition at line 2745 of file nodeHash.c.

2746 {
2747  size_t size;
2748 
2749  /* don't need this if not instrumenting or no workers */
2750  if (!node->ps.instrument || pcxt->nworkers == 0)
2751  return;
2752 
2753  size = offsetof(SharedHashInfo, hinstrument) +
2754  pcxt->nworkers * sizeof(HashInstrumentation);
2755  node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
2756 
2757  /* Each per-worker area must start out as zeroes. */
2758  memset(node->shared_info, 0, size);
2759 
2760  node->shared_info->num_workers = pcxt->nworkers;
2761  shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
2762  node->shared_info);
2763 }
struct HashInstrumentation HashInstrumentation
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
SharedHashInfo * shared_info
Definition: execnodes.h:2671
shm_toc * toc
Definition: parallel.h:45
Plan * plan
Definition: execnodes.h:1037
int plan_node_id
Definition: plannodes.h:152

References PlanState::instrument, SharedHashInfo::num_workers, ParallelContext::nworkers, PlanState::plan, Plan::plan_node_id, HashState::ps, HashState::shared_info, shm_toc_allocate(), shm_toc_insert(), and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

◆ ExecHashInitializeWorker()

void ExecHashInitializeWorker ( HashState *node,
ParallelWorkerContext *pwcxt 
)

Definition at line 2770 of file nodeHash.c.

2771 {
2772  SharedHashInfo *shared_info;
2773 
2774  /* don't need this if not instrumenting */
2775  if (!node->ps.instrument)
2776  return;
2777 
2778  /*
2779  * Find our entry in the shared area, and set up a pointer to it so that
2780  * we'll accumulate stats there when shutting down or rebuilding the hash
2781  * table.
2782  */
2783  shared_info = (SharedHashInfo *)
2784  shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
2785  node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
2786 }
int ParallelWorkerNumber
Definition: parallel.c:114
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
HashInstrumentation * hinstrument
Definition: execnodes.h:2678
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]
Definition: execnodes.h:2652

References SharedHashInfo::hinstrument, HashState::hinstrument, PlanState::instrument, ParallelWorkerNumber, PlanState::plan, Plan::plan_node_id, HashState::ps, shm_toc_lookup(), and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().

◆ ExecHashRemoveNextSkewBucket()

static void ExecHashRemoveNextSkewBucket ( HashJoinTable  hashtable)
static

Definition at line 2612 of file nodeHash.c.

2613 {
2614  int bucketToRemove;
2615  HashSkewBucket *bucket;
2616  uint32 hashvalue;
2617  int bucketno;
2618  int batchno;
2619  HashJoinTuple hashTuple;
2620 
2621  /* Locate the bucket to remove */
2622  bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
2623  bucket = hashtable->skewBucket[bucketToRemove];
2624 
2625  /*
2626  * Calculate which bucket and batch the tuples belong to in the main
2627  * hashtable. They all have the same hash value, so it's the same for all
2628  * of them. Also note that it's not possible for nbatch to increase while
2629  * we are processing the tuples.
2630  */
2631  hashvalue = bucket->hashvalue;
2632  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
2633 
2634  /* Process all tuples in the bucket */
2635  hashTuple = bucket->tuples;
2636  while (hashTuple != NULL)
2637  {
2638  HashJoinTuple nextHashTuple = hashTuple->next.unshared;
2639  MinimalTuple tuple;
2640  Size tupleSize;
2641 
2642  /*
2643  * This code must agree with ExecHashTableInsert. We do not use
2644  * ExecHashTableInsert directly as ExecHashTableInsert expects a
2645  * TupleTableSlot while we already have HashJoinTuples.
2646  */
2647  tuple = HJTUPLE_MINTUPLE(hashTuple);
2648  tupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
2649 
2650  /* Decide whether to put the tuple in the hash table or a temp file */
2651  if (batchno == hashtable->curbatch)
2652  {
2653  /* Move the tuple to the main hash table */
2654  HashJoinTuple copyTuple;
2655 
2656  /*
2657  * We must copy the tuple into the dense storage, else it will not
2658  * be found by, eg, ExecHashIncreaseNumBatches.
2659  */
2660  copyTuple = (HashJoinTuple) dense_alloc(hashtable, tupleSize);
2661  memcpy(copyTuple, hashTuple, tupleSize);
2662  pfree(hashTuple);
2663 
2664  copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
2665  hashtable->buckets.unshared[bucketno] = copyTuple;
2666 
2667  /* We have reduced skew space, but overall space doesn't change */
2668  hashtable->spaceUsedSkew -= tupleSize;
2669  }
2670  else
2671  {
2672  /* Put the tuple into a temp file for later batches */
2673  Assert(batchno > hashtable->curbatch);
2674  ExecHashJoinSaveTuple(tuple, hashvalue,
2675  &hashtable->innerBatchFile[batchno],
2676  hashtable);
2677  pfree(hashTuple);
2678  hashtable->spaceUsed -= tupleSize;
2679  hashtable->spaceUsedSkew -= tupleSize;
2680  }
2681 
2682  hashTuple = nextHashTuple;
2683 
2684  /* allow this loop to be cancellable */
2685  CHECK_FOR_INTERRUPTS();
2686  }
2687 
2688  /*
2689  * Free the bucket struct itself and reset the hashtable entry to NULL.
2690  *
2691  * NOTE: this is not nearly as simple as it looks on the surface, because
2692  * of the possibility of collisions in the hashtable. Suppose that hash
2693  * values A and B collide at a particular hashtable entry, and that A was
2694  * entered first so B gets shifted to a different table entry. If we were
2695  * to remove A first then ExecHashGetSkewBucket would mistakenly start
2696  * reporting that B is not in the hashtable, because it would hit the NULL
2697  * before finding B. However, we always remove entries in the reverse
2698  * order of creation, so this failure cannot happen.
2699  */
2700  hashtable->skewBucket[bucketToRemove] = NULL;
2701  hashtable->nSkewBuckets--;
2702  pfree(bucket);
2703  hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD;
2704  hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD;
2705 
2706  /*
2707  * If we have removed all skew buckets then give up on skew optimization.
2708  * Release the arrays since they aren't useful any more.
2709  */
2710  if (hashtable->nSkewBuckets == 0)
2711  {
2712  hashtable->skewEnabled = false;
2713  pfree(hashtable->skewBucket);
2714  pfree(hashtable->skewBucketNums);
2715  hashtable->skewBucket = NULL;
2716  hashtable->skewBucketNums = NULL;
2717  hashtable->spaceUsed -= hashtable->spaceUsedSkew;
2718  hashtable->spaceUsedSkew = 0;
2719  }
2720 }
size_t Size
Definition: c.h:594

References Assert(), HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, HashJoinTableData::curbatch, dense_alloc(), ExecHashGetBucketAndBatch(), ExecHashJoinSaveTuple(), HashSkewBucket::hashvalue, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::innerBatchFile, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, pfree(), SKEW_BUCKET_OVERHEAD, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashJoinTableData::skewEnabled, HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, MinimalTupleData::t_len, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashSkewTableInsert().
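
The NOTE above about removal order can be demonstrated directly: in an open-addressing table, deleting the most recently created collider leaves earlier entries reachable, while deleting an earlier entry first punches a hole that hides its successors. A sketch (ours), reusing the probe from ExecHashGetSkewBucket:

#include <stdio.h>
#include <stdint.h>

#define LEN 8

/* probe as ExecHashGetSkewBucket does; returns -1 when not found */
static int
probe(const uint32_t *table, uint32_t target)
{
    int         b = target & (LEN - 1);

    while (table[b] != 0 && table[b] != target)
        b = (b + 1) & (LEN - 1);
    return table[b] == target ? b : -1;
}

int
main(void)
{
    uint32_t    table[LEN] = {0};

    /* A = 5 and B = 13 collide at bucket 5; A was created first */
    table[5] = 5;                   /* A */
    table[6] = 13;                  /* B, displaced by the collision */

    /* removing in reverse creation order (B first) keeps A reachable */
    table[6] = 0;
    printf("A after removing B: bucket %d\n", probe(table, 5));    /* 5 */

    /* removing A first instead would hide B behind the hole at 5 */
    table[5] = 0;
    table[6] = 13;
    printf("B after removing A: bucket %d\n", probe(table, 13));   /* -1 */
    return 0;
}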

◆ ExecHashRetrieveInstrumentation()

void ExecHashRetrieveInstrumentation ( HashState *node)

Definition at line 2811 of file nodeHash.c.

2812 {
2813  SharedHashInfo *shared_info = node->shared_info;
2814  size_t size;
2815 
2816  if (shared_info == NULL)
2817  return;
2818 
2819  /* Replace node->shared_info with a copy in backend-local memory. */
2820  size = offsetof(SharedHashInfo, hinstrument) +
2821  shared_info->num_workers * sizeof(HashInstrumentation);
2822  node->shared_info = palloc(size);
2823  memcpy(node->shared_info, shared_info, size);
2824 }
void * palloc(Size size)
Definition: mcxt.c:1226

References SharedHashInfo::num_workers, palloc(), and HashState::shared_info.

Referenced by ExecParallelRetrieveInstrumentation().

◆ ExecHashSkewTableInsert()

static void ExecHashSkewTableInsert ( HashJoinTable  hashtable,
TupleTableSlot *slot,
uint32  hashvalue,
int  bucketNumber 
)
static

Definition at line 2566 of file nodeHash.c.

2570 {
2571  bool shouldFree;
2572  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
2573  HashJoinTuple hashTuple;
2574  int hashTupleSize;
2575 
2576  /* Create the HashJoinTuple */
2577  hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
2578  hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
2579  hashTupleSize);
2580  hashTuple->hashvalue = hashvalue;
2581  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
2582  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
2583 
2584  /* Push it onto the front of the skew bucket's list */
2585  hashTuple->next.unshared = hashtable->skewBucket[bucketNumber]->tuples;
2586  hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
2587  Assert(hashTuple != hashTuple->next.unshared);
2588 
2589  /* Account for space used, and back off if we've used too much */
2590  hashtable->spaceUsed += hashTupleSize;
2591  hashtable->spaceUsedSkew += hashTupleSize;
2592  if (hashtable->spaceUsed > hashtable->spacePeak)
2593  hashtable->spacePeak = hashtable->spaceUsed;
2594  while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
2595  ExecHashRemoveNextSkewBucket(hashtable);
2596 
2597  /* Check we are not over the total spaceAllowed, either */
2598  if (hashtable->spaceUsed > hashtable->spaceAllowed)
2599  ExecHashIncreaseNumBatches(hashtable);
2600 
2601  if (shouldFree)
2602  heap_free_minimal_tuple(tuple);
2603 }
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1693
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1515
#define HeapTupleHeaderClearMatch(tup)
Definition: htup_details.h:524
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
Definition: nodeHash.c:2612
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
Definition: nodeHash.c:917
Size spaceAllowedSkew
Definition: hashjoin.h:360

References Assert(), HashJoinTableData::batchCxt, ExecFetchSlotMinimalTuple(), ExecHashIncreaseNumBatches(), ExecHashRemoveNextSkewBucket(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, MemoryContextAlloc(), HashJoinTupleData::next, HashJoinTableData::skewBucket, HashJoinTableData::spaceAllowed, HashJoinTableData::spaceAllowedSkew, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, MinimalTupleData::t_len, HashSkewBucket::tuples, and HashJoinTupleData::unshared.

Referenced by MultiExecPrivateHash().

◆ ExecHashTableCreate()

HashJoinTable ExecHashTableCreate ( HashState *state,
List *hashOperators,
List *hashCollations,
bool  keepNulls 
)

Definition at line 433 of file nodeHash.c.

434 {
435  Hash *node;
436  HashJoinTable hashtable;
437  Plan *outerNode;
438  size_t space_allowed;
439  int nbuckets;
440  int nbatch;
441  double rows;
442  int num_skew_mcvs;
443  int log2_nbuckets;
444  int nkeys;
445  int i;
446  ListCell *ho;
447  ListCell *hc;
448  MemoryContext oldcxt;
449 
450  /*
451  * Get information about the size of the relation to be hashed (it's the
452  * "outer" subtree of this node, but the inner relation of the hashjoin).
453  * Compute the appropriate size of the hash table.
454  */
455  node = (Hash *) state->ps.plan;
456  outerNode = outerPlan(node);
457 
458  /*
459  * If this is shared hash table with a partial plan, then we can't use
460  * outerNode->plan_rows to estimate its size. We need an estimate of the
461  * total number of rows across all copies of the partial plan.
462  */
463  rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
464 
465  ExecChooseHashTableSize(rows, outerNode->plan_width,
466  OidIsValid(node->skewTable),
467  state->parallel_state != NULL,
468  state->parallel_state != NULL ?
469  state->parallel_state->nparticipants - 1 : 0,
470  &space_allowed,
471  &nbuckets, &nbatch, &num_skew_mcvs);
472 
473  /* nbuckets must be a power of 2 */
474  log2_nbuckets = my_log2(nbuckets);
475  Assert(nbuckets == (1 << log2_nbuckets));
476 
477  /*
478  * Initialize the hash table control block.
479  *
480  * The hashtable control block is just palloc'd from the executor's
481  * per-query memory context. Everything else should be kept inside the
482  * subsidiary hashCxt, batchCxt or spillCxt.
483  */
484  hashtable = palloc_object(HashJoinTableData);
485  hashtable->nbuckets = nbuckets;
486  hashtable->nbuckets_original = nbuckets;
487  hashtable->nbuckets_optimal = nbuckets;
488  hashtable->log2_nbuckets = log2_nbuckets;
489  hashtable->log2_nbuckets_optimal = log2_nbuckets;
490  hashtable->buckets.unshared = NULL;
491  hashtable->keepNulls = keepNulls;
492  hashtable->skewEnabled = false;
493  hashtable->skewBucket = NULL;
494  hashtable->skewBucketLen = 0;
495  hashtable->nSkewBuckets = 0;
496  hashtable->skewBucketNums = NULL;
497  hashtable->nbatch = nbatch;
498  hashtable->curbatch = 0;
499  hashtable->nbatch_original = nbatch;
500  hashtable->nbatch_outstart = nbatch;
501  hashtable->growEnabled = true;
502  hashtable->totalTuples = 0;
503  hashtable->partialTuples = 0;
504  hashtable->skewTuples = 0;
505  hashtable->innerBatchFile = NULL;
506  hashtable->outerBatchFile = NULL;
507  hashtable->spaceUsed = 0;
508  hashtable->spacePeak = 0;
509  hashtable->spaceAllowed = space_allowed;
510  hashtable->spaceUsedSkew = 0;
511  hashtable->spaceAllowedSkew =
512  hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
513  hashtable->chunks = NULL;
514  hashtable->current_chunk = NULL;
515  hashtable->parallel_state = state->parallel_state;
516  hashtable->area = state->ps.state->es_query_dsa;
517  hashtable->batches = NULL;
518 
519 #ifdef HJDEBUG
520  printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
521  hashtable, nbatch, nbuckets);
522 #endif
523 
524  /*
525  * Create temporary memory contexts in which to keep the hashtable working
526  * storage. See notes in executor/hashjoin.h.
527  */
528  hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
529  "HashTableContext",
530  ALLOCSET_DEFAULT_SIZES);
531 
532  hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
533  "HashBatchContext",
534  ALLOCSET_DEFAULT_SIZES);
535 
536  hashtable->spillCxt = AllocSetContextCreate(hashtable->hashCxt,
537  "HashSpillContext",
538  ALLOCSET_DEFAULT_SIZES);
539 
540  /* Allocate data that will live for the life of the hashjoin */
541 
542  oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
543 
544  /*
545  * Get info about the hash functions to be used for each hash key. Also
546  * remember whether the join operators are strict.
547  */
548  nkeys = list_length(hashOperators);
549  hashtable->outer_hashfunctions = palloc_array(FmgrInfo, nkeys);
550  hashtable->inner_hashfunctions = palloc_array(FmgrInfo, nkeys);
551  hashtable->hashStrict = palloc_array(bool, nkeys);
552  hashtable->collations = palloc_array(Oid, nkeys);
553  i = 0;
554  forboth(ho, hashOperators, hc, hashCollations)
555  {
556  Oid hashop = lfirst_oid(ho);
557  Oid left_hashfn;
558  Oid right_hashfn;
559 
560  if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
561  elog(ERROR, "could not find hash function for hash operator %u",
562  hashop);
563  fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
564  fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
565  hashtable->hashStrict[i] = op_strict(hashop);
566  hashtable->collations[i] = lfirst_oid(hc);
567  i++;
568  }
569 
570  if (nbatch > 1 && hashtable->parallel_state == NULL)
571  {
572  MemoryContext oldctx;
573 
574  /*
575  * allocate and initialize the file arrays in hashCxt (not needed for
576  * parallel case which uses shared tuplestores instead of raw files)
577  */
578  oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
579 
580  hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
581  hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
582 
583  MemoryContextSwitchTo(oldctx);
584 
585  /* The files will not be opened until needed... */
586  /* ... but make sure we have temp tablespaces established for them */
587  PrepareTempTablespaces();
588  }
589 
590  MemoryContextSwitchTo(oldcxt);
591 
592  if (hashtable->parallel_state)
593  {
594  ParallelHashJoinState *pstate = hashtable->parallel_state;
595  Barrier *build_barrier;
596 
597  /*
598  * Attach to the build barrier. The corresponding detach operation is
599  * in ExecHashTableDetach. Note that we won't attach to the
600  * batch_barrier for batch 0 yet. We'll attach later and start it out
601  * in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front and
602  * then loaded while hashing (the standard hybrid hash join
603  * algorithm), and we'll coordinate that using build_barrier.
604  */
605  build_barrier = &pstate->build_barrier;
606  BarrierAttach(build_barrier);
607 
608  /*
609  * So far we have no idea whether there are any other participants,
610  * and if so, what phase they are working on. The only thing we care
611  * about at this point is whether someone has already created the
612  * SharedHashJoinBatch objects and the hash table for batch 0. One
613  * backend will be elected to do that now if necessary.
614  */
615  if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECT &&
616  BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
617  {
618  pstate->nbatch = nbatch;
619  pstate->space_allowed = space_allowed;
620  pstate->growth = PHJ_GROWTH_OK;
621 
622  /* Set up the shared state for coordinating batches. */
623  ExecParallelHashJoinSetUpBatches(hashtable, nbatch);
624 
625  /*
626  * Allocate batch 0's hash table up front so we can load it
627  * directly while hashing.
628  */
629  pstate->nbuckets = nbuckets;
630  ExecParallelHashTableAlloc(hashtable, 0);
631  }
632 
633  /*
634  * The next Parallel Hash synchronization point is in
635  * MultiExecParallelHash(), which will progress it all the way to
636  * PHJ_BUILD_RUN. The caller must not return control from this
637  * executor node between now and then.
638  */
639  }
640  else
641  {
642  /*
643  * Prepare context for the first-scan space allocations; allocate the
644  * hashbucket array therein, and set each bucket "empty".
645  */
646  MemoryContextSwitchTo(hashtable->batchCxt);
647 
648  hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets);
649 
650  /*
651  * Set up for skew optimization, if possible and there's a need for
652  * more than one batch. (In a one-batch join, there's no point in
653  * it.)
654  */
655  if (nbatch > 1)
656  ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
657 
658  MemoryContextSwitchTo(oldcxt);
659  }
660 
661  return hashtable;
662 }

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, HashJoinTableData::area, Assert(), BarrierArriveAndWait(), BarrierAttach(), BarrierPhase(), HashJoinTableData::batchCxt, HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, HashJoinTableData::chunks, HashJoinTableData::collations, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, CurrentMemoryContext, elog(), ERROR, ExecChooseHashTableSize(), ExecHashBuildSkewHash(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashTableAlloc(), fmgr_info(), forboth, get_op_hash_functions(), HashJoinTableData::growEnabled, ParallelHashJoinState::growth, HashJoinTableData::hashCxt, HashJoinTableData::hashStrict, i, HashJoinTableData::inner_hashfunctions, HashJoinTableData::innerBatchFile, HashJoinTableData::keepNulls, lfirst_oid, list_length(), HashJoinTableData::log2_nbuckets, HashJoinTableData::log2_nbuckets_optimal, MemoryContextSwitchTo(), my_log2(), ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, HashJoinTableData::nbatch_original, HashJoinTableData::nbatch_outstart, ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, HashJoinTableData::nbuckets_original, HashJoinTableData::nSkewBuckets, OidIsValid, op_strict(), HashJoinTableData::outer_hashfunctions, HashJoinTableData::outerBatchFile, outerPlan, palloc0_array, palloc_array, palloc_object, Plan::parallel_aware, HashJoinTableData::parallel_state, HashJoinTableData::partialTuples, PHJ_BUILD_ELECT, PHJ_GROWTH_OK, Hash::plan, Plan::plan_rows, Plan::plan_width, PrepareTempTablespaces(), printf, Hash::rows_total, SKEW_HASH_MEM_PERCENT, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketLen, HashJoinTableData::skewBucketNums, HashJoinTableData::skewEnabled, Hash::skewTable, HashJoinTableData::skewTuples, ParallelHashJoinState::space_allowed, HashJoinTableData::spaceAllowed, HashJoinTableData::spaceAllowedSkew, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, HashJoinTableData::spaceUsedSkew, HashJoinTableData::spillCxt, HashJoinTableData::totalTuples, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().
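
Two of the invariants established here are easy to check in isolation: nbuckets must be an exact power of two (so bucket numbers can be computed by masking), and the skew budget is a fixed percentage of spaceAllowed. A standalone sketch, with SKEW_PERCENT standing in for SKEW_HASH_MEM_PERCENT (2 in current sources) and my_log2_model for my_log2:

    #include <assert.h>
    #include <stdio.h>

    #define SKEW_PERCENT 2

    /* smallest i such that 2^i >= num, like dynahash.c's my_log2 */
    static int my_log2_model(long num)
    {
        int i = 0;

        while ((1L << i) < num)
            i++;
        return i;
    }

    int main(void)
    {
        long nbuckets = 1024;                   /* from ExecChooseHashTableSize */
        size_t space_allowed = 4 * 1024 * 1024; /* 4MB budget */
        int log2_nbuckets = my_log2_model(nbuckets);

        assert(nbuckets == (1L << log2_nbuckets));  /* power-of-two invariant */
        printf("log2_nbuckets=%d, skew budget=%zu bytes\n",
               log2_nbuckets, space_allowed * SKEW_PERCENT / 100);
        return 0;
    }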

◆ ExecHashTableDestroy()

void ExecHashTableDestroy ( HashJoinTable  hashtable)

Definition at line 884 of file nodeHash.c.

885 {
886  int i;
887 
888  /*
889  * Make sure all the temp files are closed. We skip batch 0, since it
890  * can't have any temp files (and the arrays might not even exist if
891  * nbatch is only 1). Parallel hash joins don't use these files.
892  */
893  if (hashtable->innerBatchFile != NULL)
894  {
895  for (i = 1; i < hashtable->nbatch; i++)
896  {
897  if (hashtable->innerBatchFile[i])
898  BufFileClose(hashtable->innerBatchFile[i]);
899  if (hashtable->outerBatchFile[i])
900  BufFileClose(hashtable->outerBatchFile[i]);
901  }
902  }
903 
904  /* Release working memory (batchCxt is a child, so it goes away too) */
905  MemoryContextDelete(hashtable->hashCxt);
906 
907  /* And drop the control block */
908  pfree(hashtable);
909 }

References BufFileClose(), HashJoinTableData::hashCxt, i, HashJoinTableData::innerBatchFile, MemoryContextDelete(), HashJoinTableData::nbatch, HashJoinTableData::outerBatchFile, and pfree().

Referenced by ExecEndHashJoin(), and ExecReScanHashJoin().

◆ ExecHashTableDetach()

void ExecHashTableDetach ( HashJoinTable  hashtable)

Definition at line 3366 of file nodeHash.c.

3367 {
3368  ParallelHashJoinState *pstate = hashtable->parallel_state;
3369 
3370  /*
3371  * If we're involved in a parallel query, we must either have gotten all
3372  * the way to PHJ_BUILD_RUN, or joined too late and be in PHJ_BUILD_FREE.
3373  */
3374  Assert(!pstate ||
3375  BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN);
3376 
3377  if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUN)
3378  {
3379  int i;
3380 
3381  /* Make sure any temporary files are closed. */
3382  if (hashtable->batches)
3383  {
3384  for (i = 0; i < hashtable->nbatch; ++i)
3385  {
3386  sts_end_write(hashtable->batches[i].inner_tuples);
3387  sts_end_write(hashtable->batches[i].outer_tuples);
3388  sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
3389  sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
3390  }
3391  }
3392 
3393  /* If we're last to detach, clean up shared memory. */
3394  if (BarrierArriveAndDetach(&pstate->build_barrier))
3395  {
3396  /*
3397  * Late joining processes will see this state and give up
3398  * immediately.
3399  */
3400  Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_FREE);
3401 
3402  if (DsaPointerIsValid(pstate->batches))
3403  {
3404  dsa_free(hashtable->area, pstate->batches);
3405  pstate->batches = InvalidDsaPointer;
3406  }
3407  }
3408  }
3409  hashtable->parallel_state = NULL;
3410 }

References HashJoinTableData::area, Assert(), BarrierArriveAndDetach(), BarrierPhase(), ParallelHashJoinState::batches, HashJoinTableData::batches, ParallelHashJoinState::build_barrier, dsa_free(), DsaPointerIsValid, i, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BUILD_FREE, PHJ_BUILD_RUN, sts_end_parallel_scan(), and sts_end_write().

Referenced by ExecHashJoinReInitializeDSM(), and ExecShutdownHashJoin().

◆ ExecHashTableDetachBatch()

void ExecHashTableDetachBatch ( HashJoinTable  hashtable)

Definition at line 3274 of file nodeHash.c.

3275 {
3276  if (hashtable->parallel_state != NULL &&
3277  hashtable->curbatch >= 0)
3278  {
3279  int curbatch = hashtable->curbatch;
3280  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
3281  bool attached = true;
3282 
3283  /* Make sure any temporary files are closed. */
3284  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
3285  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
3286 
3287  /* After attaching we always get at least to PHJ_BATCH_PROBE. */
3288  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
3289  BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
3290 
3291  /*
3292  * If we're abandoning the PHJ_BATCH_PROBE phase early without having
3293  * reached the end of it, it means the plan doesn't want any more
3294  * tuples, and it is happy to abandon any tuples buffered in this
3295  * process's subplans. For correctness, we can't allow any process to
3296  * execute the PHJ_BATCH_SCAN phase, because we will never have the
3297  * complete set of match bits. Therefore we skip emitting unmatched
3298  * tuples in all backends (if this is a full/right join), as if those
3299  * tuples were all due to be emitted by this process and it has
3300  * abandoned them too.
3301  */
3302  if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
3303  !hashtable->batches[curbatch].outer_eof)
3304  {
3305  /*
3306  * This flag may be written to by multiple backends during
3307  * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
3308  * phase so requires no extra locking.
3309  */
3310  batch->skip_unmatched = true;
3311  }
3312 
3313  /*
3314  * Even if we aren't doing a full/right outer join, we'll step through
3315  * the PHJ_BATCH_SCAN phase just to maintain the invariant that
3316  * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
3317  */
3318  if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
3319  attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
3320  if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
3321  {
3322  /*
3323  * We are no longer attached to the batch barrier, but we're the
3324  * process that was chosen to free resources and it's safe to
3325  * assert the current phase. The ParallelHashJoinBatch can't go
3326  * away underneath us while we are attached to the build barrier,
3327  * making this access safe.
3328  */
3329  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
3330 
3331  /* Free shared chunks and buckets. */
3332  while (DsaPointerIsValid(batch->chunks))
3333  {
3334  HashMemoryChunk chunk =
3335  dsa_get_address(hashtable->area, batch->chunks);
3336  dsa_pointer next = chunk->next.shared;
3337 
3338  dsa_free(hashtable->area, batch->chunks);
3339  batch->chunks = next;
3340  }
3341  if (DsaPointerIsValid(batch->buckets))
3342  {
3343  dsa_free(hashtable->area, batch->buckets);
3344  batch->buckets = InvalidDsaPointer;
3345  }
3346  }
3347 
3348  /*
3349  * Track the largest batch we've been attached to. Though each
3350  * backend might see a different subset of batches, explain.c will
3351  * scan the results from all backends to find the largest value.
3352  */
3353  hashtable->spacePeak =
3354  Max(hashtable->spacePeak,
3355  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
3356 
3357  /* Remember that we are not attached to a batch. */
3358  hashtable->curbatch = -1;
3359  }
3360 }

References HashJoinTableData::area, Assert(), BarrierArriveAndDetach(), BarrierArriveAndDetachExceptLast(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, ParallelHashJoinBatch::chunks, HashJoinTableData::curbatch, dsa_free(), dsa_get_address(), DsaPointerIsValid, ParallelHashJoinBatchAccessor::inner_tuples, InvalidDsaPointer, Max, HashJoinTableData::nbuckets, next, HashMemoryChunkData::next, ParallelHashJoinBatchAccessor::outer_eof, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, PHJ_BATCH_FREE, PHJ_BATCH_PROBE, PHJ_BATCH_SCAN, HashMemoryChunkData::shared, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinBatch::skip_unmatched, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinReInitializeDSM(), ExecParallelHashJoinNewBatch(), ExecParallelPrepHashTableForUnmatched(), and ExecShutdownHashJoin().

◆ ExecHashTableInsert()

void ExecHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot * slot,
uint32  hashvalue 
)

Definition at line 1616 of file nodeHash.c.

1619 {
1620  bool shouldFree;
1621  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1622  int bucketno;
1623  int batchno;
1624 
1625  ExecHashGetBucketAndBatch(hashtable, hashvalue,
1626  &bucketno, &batchno);
1627 
1628  /*
1629  * decide whether to put the tuple in the hash table or a temp file
1630  */
1631  if (batchno == hashtable->curbatch)
1632  {
1633  /*
1634  * put the tuple in hash table
1635  */
1636  HashJoinTuple hashTuple;
1637  int hashTupleSize;
1638  double ntuples = (hashtable->totalTuples - hashtable->skewTuples);
1639 
1640  /* Create the HashJoinTuple */
1641  hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1642  hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
1643 
1644  hashTuple->hashvalue = hashvalue;
1645  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1646 
1647  /*
1648  * We always reset the tuple-matched flag on insertion. This is okay
1649  * even when reloading a tuple from a batch file, since the tuple
1650  * could not possibly have been matched to an outer tuple before it
1651  * went into the batch file.
1652  */
1653  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1654 
1655  /* Push it onto the front of the bucket's list */
1656  hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1657  hashtable->buckets.unshared[bucketno] = hashTuple;
1658 
1659  /*
1660  * Increase the (optimal) number of buckets if we just exceeded the
1661  * NTUP_PER_BUCKET threshold, but only when there's still a single
1662  * batch.
1663  */
1664  if (hashtable->nbatch == 1 &&
1665  ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
1666  {
1667  /* Guard against integer overflow and alloc size overflow */
1668  if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
1669  hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
1670  {
1671  hashtable->nbuckets_optimal *= 2;
1672  hashtable->log2_nbuckets_optimal += 1;
1673  }
1674  }
1675 
1676  /* Account for space used, and back off if we've used too much */
1677  hashtable->spaceUsed += hashTupleSize;
1678  if (hashtable->spaceUsed > hashtable->spacePeak)
1679  hashtable->spacePeak = hashtable->spaceUsed;
1680  if (hashtable->spaceUsed +
1681  hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
1682  > hashtable->spaceAllowed)
1683  ExecHashIncreaseNumBatches(hashtable);
1684  }
1685  else
1686  {
1687  /*
1688  * put the tuple into a temp file for later batches
1689  */
1690  Assert(batchno > hashtable->curbatch);
1691  ExecHashJoinSaveTuple(tuple,
1692  hashvalue,
1693  &hashtable->innerBatchFile[batchno],
1694  hashtable);
1695  }
1696 
1697  if (shouldFree)
1698  heap_free_minimal_tuple(tuple);
1699 }

References Assert(), HashJoinTableData::buckets, HashJoinTableData::curbatch, dense_alloc(), ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecHashIncreaseNumBatches(), ExecHashJoinSaveTuple(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::innerBatchFile, HashJoinTableData::log2_nbuckets_optimal, MaxAllocSize, HashJoinTableData::nbatch, HashJoinTableData::nbuckets_optimal, HashJoinTupleData::next, NTUP_PER_BUCKET, HashJoinTableData::skewTuples, HashJoinTableData::spaceAllowed, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, MinimalTupleData::t_len, HashJoinTableData::totalTuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch(), and MultiExecPrivateHash().
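
The bucketno/batchno split computed by ExecHashGetBucketAndBatch works because both counts are powers of two, so modulo reduces to bit masking and the batch number can be taken from hash bits above the bucket bits. A standalone sketch of that idea (current PostgreSQL uses a right rotate of the hash value here rather than a plain shift, but the masking idea is the same):

    #include <stdint.h>
    #include <stdio.h>

    /* split a 32-bit hash into a bucket number and a batch number;
     * nbuckets and nbatch must both be powers of two */
    static void bucket_and_batch(uint32_t hashvalue,
                                 int log2_nbuckets, uint32_t nbatch,
                                 uint32_t *bucketno, uint32_t *batchno)
    {
        uint32_t nbuckets = UINT32_C(1) << log2_nbuckets;

        *bucketno = hashvalue & (nbuckets - 1);
        *batchno = (hashvalue >> log2_nbuckets) & (nbatch - 1);
    }

    int main(void)
    {
        uint32_t bucketno, batchno;

        bucket_and_batch(0xdeadbeef, 10, 4, &bucketno, &batchno);
        printf("bucket=%u batch=%u\n", bucketno, batchno);
        return 0;
    }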

◆ ExecHashTableReset()

void ExecHashTableReset ( HashJoinTable  hashtable)

Definition at line 2291 of file nodeHash.c.

2292 {
2293  MemoryContext oldcxt;
2294  int nbuckets = hashtable->nbuckets;
2295 
2296  /*
2297  * Release all the hash buckets and tuples acquired in the prior pass, and
2298  * reinitialize the context for a new pass.
2299  */
2300  MemoryContextReset(hashtable->batchCxt);
2301  oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
2302 
2303  /* Reallocate and reinitialize the hash bucket headers. */
2304  hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets);
2305 
2306  hashtable->spaceUsed = 0;
2307 
2308  MemoryContextSwitchTo(oldcxt);
2309 
2310  /* Forget the chunks (the memory was freed by the context reset above). */
2311  hashtable->chunks = NULL;
2312 }

References HashJoinTableData::batchCxt, HashJoinTableData::buckets, HashJoinTableData::chunks, MemoryContextReset(), MemoryContextSwitchTo(), HashJoinTableData::nbuckets, palloc0_array, HashJoinTableData::spaceUsed, and HashJoinTableData::unshared.

Referenced by ExecHashJoinNewBatch().
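
The reset relies on the memory-context discipline noted above: every bucket and tuple allocated during a pass lives in batchCxt, so one MemoryContextReset releases the whole pass at once instead of freeing tuples individually. A standalone sketch of that arena-style pattern (a toy fixed-size arena, not PostgreSQL's MemoryContext API):

    #include <stdlib.h>
    #include <stdio.h>

    typedef struct Arena { char *base; size_t used, cap; } Arena;

    /* bump allocation: cheap, and released wholesale by a reset */
    static void *arena_alloc(Arena *a, size_t n)
    {
        void *p;

        if (a->used + n > a->cap)
            return NULL;                /* a real allocator would grow */
        p = a->base + a->used;
        a->used += n;
        return p;
    }

    static void arena_reset(Arena *a)
    {
        a->used = 0;                    /* all prior allocations are gone */
    }

    int main(void)
    {
        Arena a = { malloc(1 << 20), 0, 1 << 20 };

        for (int batch = 0; batch < 3; batch++)
        {
            void *buckets = arena_alloc(&a, 1024 * sizeof(void *));

            printf("batch %d buckets at %p\n", batch, buckets);
            arena_reset(&a);            /* like MemoryContextReset(batchCxt) */
        }
        free(a.base);
        return 0;
    }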

◆ ExecHashTableResetMatchFlags()

void ExecHashTableResetMatchFlags ( HashJoinTable  hashtable)

Definition at line 2319 of file nodeHash.c.

2320 {
2321  HashJoinTuple tuple;
2322  int i;
2323 
2324  /* Reset all flags in the main table ... */
2325  for (i = 0; i < hashtable->nbuckets; i++)
2326  {
2327  for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
2328  tuple = tuple->next.unshared)
2329  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2330  }
2331 
2332  /* ... and the same for the skew buckets, if any */
2333  for (i = 0; i < hashtable->nSkewBuckets; i++)
2334  {
2335  int j = hashtable->skewBucketNums[i];
2336  HashSkewBucket *skewBucket = hashtable->skewBucket[j];
2337 
2338  for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
2339  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2340  }
2341 }

References HashJoinTableData::buckets, HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, i, j, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecReScanHashJoin().
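
The traversal pattern here is the one used throughout this file: each bucket heads a singly linked chain of tuples threaded through next.unshared, so a full reset is just a walk of every chain. A standalone sketch with a boolean standing in for the match bit that HeapTupleHeaderClearMatch clears:

    #include <stdbool.h>
    #include <stdio.h>

    typedef struct Tuple { struct Tuple *next; bool matched; } Tuple;

    static void reset_match_flags(Tuple **buckets, int nbuckets)
    {
        for (int i = 0; i < nbuckets; i++)
            for (Tuple *t = buckets[i]; t != NULL; t = t->next)
                t->matched = false;     /* like HeapTupleHeaderClearMatch */
    }

    int main(void)
    {
        Tuple t2 = {NULL, true}, t1 = {&t2, true};
        Tuple *buckets[2] = {&t1, NULL};

        reset_match_flags(buckets, 2);
        printf("t1=%d t2=%d\n", t1.matched, t2.matched);
        return 0;
    }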

◆ ExecInitHash()

HashState* ExecInitHash ( Hash * node,
EState * estate,
int  eflags 
)

Definition at line 361 of file nodeHash.c.

362 {
363  HashState *hashstate;
364 
365  /* check for unsupported flags */
366  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
367 
368  /*
369  * create state structure
370  */
371  hashstate = makeNode(HashState);
372  hashstate->ps.plan = (Plan *) node;
373  hashstate->ps.state = estate;
374  hashstate->ps.ExecProcNode = ExecHash;
375  hashstate->hashtable = NULL;
376  hashstate->hashkeys = NIL; /* will be set by parent HashJoin */
377 
378  /*
379  * Miscellaneous initialization
380  *
381  * create expression context for node
382  */
383  ExecAssignExprContext(estate, &hashstate->ps);
384 
385  /*
386  * initialize child nodes
387  */
388  outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
389 
390  /*
391  * initialize our result slot and type. No need to build projection
392  * because this node doesn't do projections.
393  */
394  ExecInitResultTupleSlotTL(&hashstate->ps, &TTSOpsMinimalTuple);
395  hashstate->ps.ps_ProjInfo = NULL;
396 
397  /*
398  * initialize child expressions
399  */
400  Assert(node->plan.qual == NIL);
401  hashstate->hashkeys =
402  ExecInitExprList(node->hashkeys, (PlanState *) hashstate);
403 
404  return hashstate;
405 }

References Assert(), EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecAssignExprContext(), ExecHash(), ExecInitExprList(), ExecInitNode(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, HashState::hashkeys, Hash::hashkeys, HashState::hashtable, makeNode, NIL, outerPlan, outerPlanState, PlanState::plan, Hash::plan, HashState::ps, PlanState::ps_ProjInfo, Plan::qual, PlanState::state, and TTSOpsMinimalTuple.

Referenced by ExecInitNode().

◆ ExecParallelHashCloseBatchAccessors()

static void ExecParallelHashCloseBatchAccessors ( HashJoinTable  hashtable)
static

Definition at line 3169 of file nodeHash.c.

3170 {
3171  int i;
3172 
3173  for (i = 0; i < hashtable->nbatch; ++i)
3174  {
3175  /* Make sure no files are left open. */
3176  sts_end_write(hashtable->batches[i].inner_tuples);
3177  sts_end_write(hashtable->batches[i].outer_tuples);
3178  sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
3179  sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
3180  }
3181  pfree(hashtable->batches);
3182  hashtable->batches = NULL;
3183 }

References HashJoinTableData::batches, i, ParallelHashJoinBatchAccessor::inner_tuples, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, pfree(), sts_end_parallel_scan(), and sts_end_write().

Referenced by ExecParallelHashEnsureBatchAccessors(), and ExecParallelHashIncreaseNumBatches().

◆ ExecParallelHashEnsureBatchAccessors()

static void ExecParallelHashEnsureBatchAccessors ( HashJoinTable  hashtable)
static

Definition at line 3190 of file nodeHash.c.

3191 {
3192  ParallelHashJoinState *pstate = hashtable->parallel_state;
3193  ParallelHashJoinBatch *batches;
3194  MemoryContext oldcxt;
3195  int i;
3196 
3197  if (hashtable->batches != NULL)
3198  {
3199  if (hashtable->nbatch == pstate->nbatch)
3200  return;
3201  ExecParallelHashCloseBatchAccessors(hashtable);
3202  }
3203 
3204  /*
3205  * We should never see a state where the batch-tracking array is freed,
3206  * because we should have given up sooner if we join when the build
3207  * barrier has reached the PHJ_BUILD_FREE phase.
3208  */
3209  Assert(DsaPointerIsValid(pstate->batches));
3210 
3211  /*
3212  * Use hash join spill memory context to allocate accessors, including
3213  * buffers for the temporary files.
3214  */
3215  oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
3216 
3217  /* Allocate this backend's accessor array. */
3218  hashtable->nbatch = pstate->nbatch;
3219  hashtable->batches =
3220  palloc0_array(ParallelHashJoinBatchAccessor, hashtable->nbatch);
3221 
3222  /* Find the base of the pseudo-array of ParallelHashJoinBatch objects. */
3223  batches = (ParallelHashJoinBatch *)
3224  dsa_get_address(hashtable->area, pstate->batches);
3225 
3226  /* Set up the accessor array and attach to the tuplestores. */
3227  for (i = 0; i < hashtable->nbatch; ++i)
3228  {
3229  ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
3230  ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);
3231 
3232  accessor->shared = shared;
3233  accessor->preallocated = 0;
3234  accessor->done = false;
3235  accessor->outer_eof = false;
3236  accessor->inner_tuples =
3237  sts_attach(ParallelHashJoinBatchInner(shared),
3238  ParallelWorkerNumber + 1,
3239  &pstate->fileset);
3240  accessor->outer_tuples =
3241  sts_attach(ParallelHashJoinBatchOuter(shared,
3242  pstate->nparticipants),
3243  ParallelWorkerNumber + 1,
3244  &pstate->fileset);
3245  }
3246 
3247  MemoryContextSwitchTo(oldcxt);
3248 }

References HashJoinTableData::area, Assert(), ParallelHashJoinState::batches, HashJoinTableData::batches, ParallelHashJoinBatchAccessor::done, dsa_get_address(), DsaPointerIsValid, ExecParallelHashCloseBatchAccessors(), ParallelHashJoinState::fileset, i, ParallelHashJoinBatchAccessor::inner_tuples, MemoryContextSwitchTo(), ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, ParallelHashJoinState::nparticipants, NthParallelHashJoinBatch, ParallelHashJoinBatchAccessor::outer_eof, ParallelHashJoinBatchAccessor::outer_tuples, palloc0_array, HashJoinTableData::parallel_state, ParallelHashJoinBatchInner, ParallelHashJoinBatchOuter, ParallelWorkerNumber, ParallelHashJoinBatchAccessor::preallocated, ParallelHashJoinBatchAccessor::shared, HashJoinTableData::spillCxt, and sts_attach().

Referenced by ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), and MultiExecParallelHash().

◆ ExecParallelHashFirstTuple()

static HashJoinTuple ExecParallelHashFirstTuple ( HashJoinTable  hashtable,
int  bucketno 
)
inline static

Definition at line 3416 of file nodeHash.c.

3417 {
3418  HashJoinTuple tuple;
3419  dsa_pointer p;
3420 
3421  Assert(hashtable->parallel_state);
3422  p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);
3423  tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p);
3424 
3425  return tuple;
3426 }

References HashJoinTableData::area, Assert(), HashJoinTableData::buckets, dsa_get_address(), dsa_pointer_atomic_read, HashJoinTableData::parallel_state, and HashJoinTableData::shared.

Referenced by ExecParallelScanHashBucket(), and ExecParallelScanHashTableForUnmatched().

◆ ExecParallelHashIncreaseNumBatches()

static void ExecParallelHashIncreaseNumBatches ( HashJoinTable  hashtable)
static

Definition at line 1081 of file nodeHash.c.

1082 {
1083  ParallelHashJoinState *pstate = hashtable->parallel_state;
1084 
1085  Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
1086 
1087  /*
1088  * It's unlikely, but we need to be prepared for new participants to show
1089  * up while we're in the middle of this operation so we need to switch on
1090  * barrier phase here.
1091  */
1092  switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
1093  {
1094  case PHJ_GROW_BATCHES_ELECT:
1095 
1096  /*
1097  * Elect one participant to prepare to grow the number of batches.
1098  * This involves reallocating or resetting the buckets of batch 0
1099  * in preparation for all participants to begin repartitioning the
1100  * tuples.
1101  */
1102  if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
1103  WAIT_EVENT_HASH_GROW_BATCHES_ELECT))
1104  {
1105  dsa_pointer_atomic *buckets;
1106  ParallelHashJoinBatch *old_batch0;
1107  int new_nbatch;
1108  int i;
1109 
1110  /* Move the old batch out of the way. */
1111  old_batch0 = hashtable->batches[0].shared;
1112  pstate->old_batches = pstate->batches;
1113  pstate->old_nbatch = hashtable->nbatch;
1114  pstate->batches = InvalidDsaPointer;
1115 
1116  /* Free this backend's old accessors. */
1117  ExecParallelHashCloseBatchAccessors(hashtable);
1118 
1119  /* Figure out how many batches to use. */
1120  if (hashtable->nbatch == 1)
1121  {
1122  /*
1123  * We are going from single-batch to multi-batch. We need
1124  * to switch from one large combined memory budget to the
1125  * regular hash_mem budget.
1126  */
1127  pstate->space_allowed = get_hash_memory_limit();
1128 
1129  /*
1130  * The combined hash_mem of all participants wasn't
1131  * enough. Therefore one batch per participant would be
1132  * approximately equivalent and would probably also be
1133  * insufficient. So try two batches per participant,
1134  * rounded up to a power of two.
1135  */
1136  new_nbatch = pg_nextpower2_32(pstate->nparticipants * 2);
1137  }
1138  else
1139  {
1140  /*
1141  * We were already multi-batched. Try doubling the number
1142  * of batches.
1143  */
1144  new_nbatch = hashtable->nbatch * 2;
1145  }
1146 
1147  /* Allocate new larger generation of batches. */
1148  Assert(hashtable->nbatch == pstate->nbatch);
1149  ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch);
1150  Assert(hashtable->nbatch == pstate->nbatch);
1151 
1152  /* Replace or recycle batch 0's bucket array. */
1153  if (pstate->old_nbatch == 1)
1154  {
1155  double dtuples;
1156  double dbuckets;
1157  int new_nbuckets;
1158 
1159  /*
1160  * We probably also need a smaller bucket array. How many
1161  * tuples do we expect per batch, assuming we have only
1162  * half of them so far? Normally we don't need to change
1163  * the bucket array's size, because the size of each batch
1164  * stays the same as we add more batches, but in this
1165  * special case we move from a large batch to many smaller
1166  * batches and it would be wasteful to keep the large
1167  * array.
1168  */
1169  dtuples = (old_batch0->ntuples * 2.0) / new_nbatch;
1170  dbuckets = ceil(dtuples / NTUP_PER_BUCKET);
1171  dbuckets = Min(dbuckets,
1172  MaxAllocSize / sizeof(dsa_pointer_atomic));
1173  new_nbuckets = (int) dbuckets;
1174  new_nbuckets = Max(new_nbuckets, 1024);
1175  new_nbuckets = pg_nextpower2_32(new_nbuckets);
1176  dsa_free(hashtable->area, old_batch0->buckets);
1177  hashtable->batches[0].shared->buckets =
1178  dsa_allocate(hashtable->area,
1179  sizeof(dsa_pointer_atomic) * new_nbuckets);
1180  buckets = (dsa_pointer_atomic *)
1181  dsa_get_address(hashtable->area,
1182  hashtable->batches[0].shared->buckets);
1183  for (i = 0; i < new_nbuckets; ++i)
1184  dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
1185  pstate->nbuckets = new_nbuckets;
1186  }
1187  else
1188  {
1189  /* Recycle the existing bucket array. */
1190  hashtable->batches[0].shared->buckets = old_batch0->buckets;
1191  buckets = (dsa_pointer_atomic *)
1192  dsa_get_address(hashtable->area, old_batch0->buckets);
1193  for (i = 0; i < hashtable->nbuckets; ++i)
1194  dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer);
1195  }
1196 
1197  /* Move all chunks to the work queue for parallel processing. */
1198  pstate->chunk_work_queue = old_batch0->chunks;
1199 
1200  /* Disable further growth temporarily while we're growing. */
1201  pstate->growth = PHJ_GROWTH_DISABLED;
1202  }
1203  else
1204  {
1205  /* All other participants just flush their tuples to disk. */
1206  ExecParallelHashCloseBatchAccessors(hashtable);
1207  }
1208  /* Fall through. */
1209 
1210  case PHJ_GROW_BATCHES_REALLOCATE:
1211  /* Wait for the above to be finished. */
1212  BarrierArriveAndWait(&pstate->grow_batches_barrier,
1213  WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE);
1214  /* Fall through. */
1215 
1216  case PHJ_GROW_BATCHES_REPARTITION:
1217  /* Make sure that we have the current dimensions and buckets. */
1218  ExecParallelHashEnsureBatchAccessors(hashtable);
1219  ExecParallelHashTableSetCurrentBatch(hashtable, 0);
1220  /* Then partition, flush counters. */
1221  ExecParallelHashRepartitionFirst(hashtable);
1222  ExecParallelHashRepartitionRest(hashtable);
1223  ExecParallelHashMergeCounters(hashtable);
1224  /* Wait for the above to be finished. */
1225  BarrierArriveAndWait(&pstate->grow_batches_barrier,
1226  WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
1227  /* Fall through. */
1228 
1229  case PHJ_GROW_BATCHES_DECIDE:
1230 
1231  /*
1232  * Elect one participant to clean up and decide whether further
1233  * repartitioning is needed, or should be disabled because it's
1234  * not helping.
1235  */
1236  if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
1237  WAIT_EVENT_HASH_GROW_BATCHES_DECIDE))
1238  {
1239  bool space_exhausted = false;
1240  bool extreme_skew_detected = false;
1241 
1242  /* Make sure that we have the current dimensions and buckets. */
1243  ExecParallelHashEnsureBatchAccessors(hashtable);
1244  ExecParallelHashTableSetCurrentBatch(hashtable, 0);
1245 
1246  /* Are any of the new generation of batches exhausted? */
1247  for (int i = 0; i < hashtable->nbatch; ++i)
1248  {
1249  ParallelHashJoinBatch *batch = hashtable->batches[i].shared;
1250 
1251  if (batch->space_exhausted ||
1252  batch->estimated_size > pstate->space_allowed)
1253  {
1254  int parent;
1255 
1256  space_exhausted = true;
1257 
1258  /*
1259  * Did this batch receive ALL of the tuples from its
1260  * parent batch? That would indicate that further
1261  * repartitioning isn't going to help (the hash values
1262  * are probably all the same).
1263  */
1264  parent = i % pstate->old_nbatch;
1265  if (batch->ntuples == hashtable->batches[parent].shared->old_ntuples)
1266  extreme_skew_detected = true;
1267  }
1268  }
1269 
1270  /* Don't keep growing if it's not helping or we'd overflow. */
1271  if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2)
1272  pstate->growth = PHJ_GROWTH_DISABLED;
1273  else if (space_exhausted)
1274  pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
1275  else
1276  pstate->growth = PHJ_GROWTH_OK;
1277 
1278  /* Free the old batches in shared memory. */
1279  dsa_free(hashtable->area, pstate->old_batches);
1280  pstate->old_batches = InvalidDsaPointer;
1281  }
1282  /* Fall through. */
1283 
1284  case PHJ_GROW_BATCHES_FINISH:
1285  /* Wait for the above to complete. */
1286  BarrierArriveAndWait(&pstate->grow_batches_barrier,
1287  WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
1288  }
1289 }

References HashJoinTableData::area, Assert(), BarrierArriveAndWait(), BarrierPhase(), ParallelHashJoinState::batches, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, ParallelHashJoinState::build_barrier, ParallelHashJoinState::chunk_work_queue, ParallelHashJoinBatch::chunks, dsa_allocate, dsa_free(), dsa_get_address(), dsa_pointer_atomic_init, dsa_pointer_atomic_write, ParallelHashJoinBatch::estimated_size, ExecParallelHashCloseBatchAccessors(), ExecParallelHashEnsureBatchAccessors(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashMergeCounters(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), ExecParallelHashTableSetCurrentBatch(), get_hash_memory_limit(), ParallelHashJoinState::grow_batches_barrier, ParallelHashJoinState::growth, i, InvalidDsaPointer, Max, MaxAllocSize, Min, ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, ParallelHashJoinState::nparticipants, NTUP_PER_BUCKET, ParallelHashJoinBatch::ntuples, ParallelHashJoinState::old_batches, ParallelHashJoinState::old_nbatch, ParallelHashJoinBatch::old_ntuples, HashJoinTableData::parallel_state, pg_nextpower2_32(), PHJ_BUILD_HASH_INNER, PHJ_GROW_BATCHES_DECIDE, PHJ_GROW_BATCHES_ELECT, PHJ_GROW_BATCHES_FINISH, PHJ_GROW_BATCHES_PHASE, PHJ_GROW_BATCHES_REALLOCATE, PHJ_GROW_BATCHES_REPARTITION, PHJ_GROWTH_DISABLED, PHJ_GROWTH_NEED_MORE_BATCHES, PHJ_GROWTH_OK, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinState::space_allowed, and ParallelHashJoinBatch::space_exhausted.

Referenced by ExecParallelHashTupleAlloc(), ExecParallelHashTuplePrealloc(), and MultiExecParallelHash().
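
The new batch count chosen in the ELECT phase follows two rules visible above: the first spill abandons the combined memory budget and jumps straight to two batches per participant (rounded up to a power of two), while later growth simply doubles. A standalone sketch of that decision, with next_pow2 standing in for pg_nextpower2_32:

    #include <stdint.h>
    #include <stdio.h>

    static uint32_t next_pow2(uint32_t v)
    {
        uint32_t p = 1;

        while (p < v)
            p <<= 1;
        return p;
    }

    static int choose_new_nbatch(int nbatch, int nparticipants)
    {
        if (nbatch == 1)                /* single-batch to multi-batch */
            return (int) next_pow2((uint32_t) nparticipants * 2);
        return nbatch * 2;              /* already multi-batch: double */
    }

    int main(void)
    {
        printf("%d\n", choose_new_nbatch(1, 3));    /* 8 = 2*3 rounded up */
        printf("%d\n", choose_new_nbatch(8, 3));    /* 16 */
        return 0;
    }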

◆ ExecParallelHashIncreaseNumBuckets()

static void ExecParallelHashIncreaseNumBuckets ( HashJoinTable  hashtable)
static

Definition at line 1517 of file nodeHash.c.

1518 {
1519  ParallelHashJoinState *pstate = hashtable->parallel_state;
1520  int i;
1521  HashMemoryChunk chunk;
1522  dsa_pointer chunk_s;
1523 
1524  Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
1525 
1526  /*
1527  * It's unlikely, but we need to be prepared for new participants to show
1528  * up while we're in the middle of this operation so we need to switch on
1529  * barrier phase here.
1530  */
1531  switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
1532  {
1533  case PHJ_GROW_BUCKETS_ELECT:
1534  /* Elect one participant to prepare to increase nbuckets. */
1535  if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
1536  WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
1537  {
1538  size_t size;
1539  dsa_pointer_atomic *buckets;
1540 
1541  /* Double the size of the bucket array. */
1542  pstate->nbuckets *= 2;
1543  size = pstate->nbuckets * sizeof(dsa_pointer_atomic);
1544  hashtable->batches[0].shared->size += size / 2;
1545  dsa_free(hashtable->area, hashtable->batches[0].shared->buckets);
1546  hashtable->batches[0].shared->buckets =
1547  dsa_allocate(hashtable->area, size);
1548  buckets = (dsa_pointer_atomic *)
1549  dsa_get_address(hashtable->area,
1550  hashtable->batches[0].shared->buckets);
1551  for (i = 0; i < pstate->nbuckets; ++i)
1552  dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
1553 
1554  /* Put the chunk list onto the work queue. */
1555  pstate->chunk_work_queue = hashtable->batches[0].shared->chunks;
1556 
1557  /* Clear the flag. */
1558  pstate->growth = PHJ_GROWTH_OK;
1559  }
1560  /* Fall through. */
1561 
1562  case PHJ_GROW_BUCKETS_REALLOCATE:
1563  /* Wait for the above to complete. */
1564  BarrierArriveAndWait(&pstate->grow_buckets_barrier,
1565  WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE);
1566  /* Fall through. */
1567 
1568  case PHJ_GROW_BUCKETS_REINSERT:
1569  /* Reinsert all tuples into the hash table. */
1570  ExecParallelHashEnsureBatchAccessors(hashtable);
1571  ExecParallelHashTableSetCurrentBatch(hashtable, 0);
1572  while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s)))
1573  {
1574  size_t idx = 0;
1575 
1576  while (idx < chunk->used)
1577  {
1578  HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
1579  dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx;
1580  int bucketno;
1581  int batchno;
1582 
1583  ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
1584  &bucketno, &batchno);
1585  Assert(batchno == 0);
1586 
1587  /* add the tuple to the proper bucket */
1588  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1589  hashTuple, shared);
1590 
1591  /* advance index past the tuple */
1592  idx += MAXALIGN(HJTUPLE_OVERHEAD +
1593  HJTUPLE_MINTUPLE(hashTuple)->t_len);
1594  }
1595 
1596  /* allow this loop to be cancellable */
1597  CHECK_FOR_INTERRUPTS();
1598  }
1599  BarrierArriveAndWait(&pstate->grow_buckets_barrier,
1600  WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT);
1601  }
1602 }

References HashJoinTableData::area, Assert(), BarrierArriveAndWait(), BarrierPhase(), HashJoinTableData::batches, ParallelHashJoinBatch::buckets, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, CHECK_FOR_INTERRUPTS, ParallelHashJoinState::chunk_work_queue, ParallelHashJoinBatch::chunks, dsa_allocate, dsa_free(), dsa_get_address(), dsa_pointer_atomic_init, ExecHashGetBucketAndBatch(), ExecParallelHashEnsureBatchAccessors(), ExecParallelHashPopChunkQueue(), ExecParallelHashPushTuple(), ExecParallelHashTableSetCurrentBatch(), ParallelHashJoinState::grow_buckets_barrier, ParallelHashJoinState::growth, HASH_CHUNK_DATA, HASH_CHUNK_HEADER_SIZE, HashJoinTupleData::hashvalue, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, i, idx(), InvalidDsaPointer, MAXALIGN, ParallelHashJoinState::nbuckets, HashJoinTableData::parallel_state, PHJ_BUILD_HASH_INNER, PHJ_GROW_BUCKETS_ELECT, PHJ_GROW_BUCKETS_PHASE, PHJ_GROW_BUCKETS_REALLOCATE, PHJ_GROW_BUCKETS_REINSERT, PHJ_GROWTH_OK, ParallelHashJoinBatchAccessor::shared, HashJoinTableData::shared, and ParallelHashJoinBatch::size.

Referenced by ExecParallelHashTupleAlloc(), ExecParallelHashTuplePrealloc(), and MultiExecParallelHash().
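
The reinsertion loop walks raw chunk memory rather than bucket chains: tuples are densely packed, so the next tuple starts at the current offset plus the MAXALIGN'd size of header plus minimal tuple. A standalone sketch of that offset arithmetic (8-byte alignment and a made-up header size; see HJTUPLE_OVERHEAD and MAXALIGN for the real values):

    #include <stdio.h>

    #define ALIGNVAL 8
    #define MAXALIGN_M(x) (((x) + (ALIGNVAL - 1)) & ~((size_t) (ALIGNVAL - 1)))

    int main(void)
    {
        size_t t_len[] = {52, 31, 40};  /* packed tuple payload sizes */
        size_t overhead = 8;            /* stand-in for HJTUPLE_OVERHEAD */
        size_t idx = 0;

        for (int i = 0; i < 3; i++)
        {
            printf("tuple %d at offset %zu\n", i, idx);
            idx += MAXALIGN_M(overhead + t_len[i]);
        }
        printf("chunk used = %zu bytes\n", idx);
        return 0;
    }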

◆ ExecParallelHashJoinSetUpBatches()

static void ExecParallelHashJoinSetUpBatches ( HashJoinTable  hashtable,
int  nbatch 
)
static

Definition at line 3089 of file nodeHash.c.

3090 {
3091  ParallelHashJoinState *pstate = hashtable->parallel_state;
3092  ParallelHashJoinBatch *batches;
3093  MemoryContext oldcxt;
3094  int i;
3095 
3096  Assert(hashtable->batches == NULL);
3097 
3098  /* Allocate space. */
3099  pstate->batches =
3100  dsa_allocate0(hashtable->area,
3101  EstimateParallelHashJoinBatch(hashtable) * nbatch);
3102  pstate->nbatch = nbatch;
3103  batches = dsa_get_address(hashtable->area, pstate->batches);
3104 
3105  /*
3106  * Use hash join spill memory context to allocate accessors, including
3107  * buffers for the temporary files.
3108  */
3109  oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
3110 
3111  /* Allocate this backend's accessor array. */
3112  hashtable->nbatch = nbatch;
3113  hashtable->batches =
3114  palloc0_array(ParallelHashJoinBatchAccessor, hashtable->nbatch);
3115 
3116  /* Set up the shared state, tuplestores and backend-local accessors. */
3117  for (i = 0; i < hashtable->nbatch; ++i)
3118  {
3119  ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
3120  ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);
3121  char name[MAXPGPATH];
3122 
3123  /*
3124  * All members of shared were zero-initialized. We just need to set
3125  * up the Barrier.
3126  */
3127  BarrierInit(&shared->batch_barrier, 0);
3128  if (i == 0)
3129  {
3130  /* Batch 0 doesn't need to be loaded. */
3131  BarrierAttach(&shared->batch_barrier);
3132  while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBE)
3133  BarrierArriveAndWait(&shared->batch_barrier, 0);
3134  BarrierDetach(&shared->batch_barrier);
3135  }
3136 
3137  /* Initialize accessor state. All members were zero-initialized. */
3138  accessor->shared = shared;
3139 
3140  /* Initialize the shared tuplestores. */
3141  snprintf(name, sizeof(name), "i%dof%d", i, hashtable->nbatch);
3142  accessor->inner_tuples =
3143  sts_initialize(ParallelHashJoinBatchInner(shared),
3144  pstate->nparticipants,
3145  ParallelWorkerNumber + 1,
3146  sizeof(uint32),
3147  SHARED_TUPLESTORE_SINGLE_PASS,
3148  &pstate->fileset,
3149  name);
3150  snprintf(name, sizeof(name), "o%dof%d", i, hashtable->nbatch);
3151  accessor->outer_tuples =
3152  sts_initialize(ParallelHashJoinBatchOuter(shared,
3153  pstate->nparticipants),
3154  pstate->nparticipants,
3155  ParallelWorkerNumber + 1,
3156  sizeof(uint32),
3157  SHARED_TUPLESTORE_SINGLE_PASS,
3158  &pstate->fileset,
3159  name);
3160  }
3161 
3162  MemoryContextSwitchTo(oldcxt);
3163 }

References HashJoinTableData::area, Assert(), BarrierArriveAndWait(), BarrierAttach(), BarrierDetach(), BarrierInit(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, ParallelHashJoinState::batches, HashJoinTableData::batches, dsa_allocate0, dsa_get_address(), EstimateParallelHashJoinBatch, ParallelHashJoinState::fileset, i, ParallelHashJoinBatchAccessor::inner_tuples, MAXPGPATH, MemoryContextSwitchTo(), name, ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, ParallelHashJoinState::nparticipants, NthParallelHashJoinBatch, ParallelHashJoinBatchAccessor::outer_tuples, palloc0_array, HashJoinTableData::parallel_state, ParallelHashJoinBatchInner, ParallelHashJoinBatchOuter, ParallelWorkerNumber, PHJ_BATCH_PROBE, ParallelHashJoinBatchAccessor::shared, SHARED_TUPLESTORE_SINGLE_PASS, snprintf, HashJoinTableData::spillCxt, and sts_initialize().

Referenced by ExecHashTableCreate(), and ExecParallelHashIncreaseNumBatches().
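
The batch 0 loop above fast-forwards a freshly initialized barrier so that batch 0, which is built directly in memory and never loaded from disk, begins life in the probe phase. A standalone, single-participant model of that trick (a toy barrier; the phase numbering is illustrative, the real phases are the PHJ_BATCH_* constants in hashjoin.h):

    #include <stdio.h>

    typedef struct { int phase; int participants; } ToyBarrier;

    static void barrier_init(ToyBarrier *b) { b->phase = 0; b->participants = 0; }
    static void barrier_attach(ToyBarrier *b) { b->participants++; }
    static void barrier_detach(ToyBarrier *b) { b->participants--; }

    /* with exactly one participant attached, arrival advances the phase
     * immediately, which is why the loop above never blocks */
    static void barrier_arrive_and_wait(ToyBarrier *b)
    {
        if (b->participants == 1)
            b->phase++;
    }

    int main(void)
    {
        enum { TOY_BATCH_PROBE = 3 };   /* assumed slot of the probe phase */
        ToyBarrier barrier;

        barrier_init(&barrier);
        barrier_attach(&barrier);
        while (barrier.phase < TOY_BATCH_PROBE)
            barrier_arrive_and_wait(&barrier);
        barrier_detach(&barrier);
        printf("batch 0 starts in phase %d\n", barrier.phase);
        return 0;
    }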

◆ ExecParallelHashMergeCounters()

static void ExecParallelHashMergeCounters ( HashJoinTable  hashtable)
static

Definition at line 1424 of file nodeHash.c.

1425 {
1426  ParallelHashJoinState *pstate = hashtable->parallel_state;
1427  int i;
1428 
1429  LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
1430  pstate->total_tuples = 0;
1431  for (i = 0; i < hashtable->nbatch; ++i)
1432  {
1433  ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i];
1434 
1435  batch->shared->size += batch->size;
1436  batch->shared->estimated_size += batch->estimated_size;
1437  batch->shared->ntuples += batch->ntuples;
1438  batch->shared->old_ntuples += batch->old_ntuples;
1439  batch->size = 0;
1440  batch->estimated_size = 0;
1441  batch->ntuples = 0;
1442  batch->old_ntuples = 0;
1443  pstate->total_tuples += batch->shared->ntuples;
1444  }
1445  LWLockRelease(&pstate->lock);
1446 }

References HashJoinTableData::batches, ParallelHashJoinBatch::estimated_size, ParallelHashJoinBatchAccessor::estimated_size, i, ParallelHashJoinState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), HashJoinTableData::nbatch, ParallelHashJoinBatch::ntuples, ParallelHashJoinBatchAccessor::ntuples, ParallelHashJoinBatch::old_ntuples, ParallelHashJoinBatchAccessor::old_ntuples, HashJoinTableData::parallel_state, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinBatchAccessor::size, and ParallelHashJoinState::total_tuples.

Referenced by ExecParallelHashIncreaseNumBatches(), and MultiExecParallelHash().
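
The merge is a per-backend-to-shared counter flush: local counters are added into the shared batch descriptors under one lock and then zeroed, so a repeated merge cannot double-count, and the shared total is recomputed in the same critical section. A standalone pthreads sketch of the same pattern (toy types, a mutex instead of an LWLock; build with -pthread):

    #include <pthread.h>
    #include <stdio.h>

    typedef struct { long ntuples; long size; } Counters;

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static Counters shared_batches[4];

    static long merge_counters(Counters *local, int nbatch)
    {
        long total_tuples = 0;

        pthread_mutex_lock(&lock);
        for (int i = 0; i < nbatch; i++)
        {
            shared_batches[i].ntuples += local[i].ntuples;
            shared_batches[i].size += local[i].size;
            local[i].ntuples = 0;       /* zero so re-merging can't double-count */
            local[i].size = 0;
            total_tuples += shared_batches[i].ntuples;
        }
        pthread_mutex_unlock(&lock);
        return total_tuples;
    }

    int main(void)
    {
        Counters local[4] = {{10, 800}, {7, 560}, {0, 0}, {3, 240}};

        printf("total tuples = %ld\n", merge_counters(local, 4));
        return 0;
    }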

◆ ExecParallelHashNextTuple()

static HashJoinTuple ExecParallelHashNextTuple ( HashJoinTable  hashtable,
HashJoinTuple  tuple 
)
inline static

Definition at line 3432 of file nodeHash.c.

3433 {
3434  HashJoinTuple next;
3435 
3436  Assert(hashtable->parallel_state);
3437  next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared);
3438 
3439  return next;
3440 }

References HashJoinTableData::area, Assert(), dsa_get_address(), next, HashJoinTupleData::next, HashJoinTableData::parallel_state, and HashJoinTupleData::shared.

Referenced by ExecParallelScanHashBucket(), and ExecParallelScanHashTableForUnmatched().

◆ ExecParallelHashPopChunkQueue()

static HashMemoryChunk ExecParallelHashPopChunkQueue ( HashJoinTable  hashtable,
dsa_pointer * shared 
)
static

Definition at line 3485 of file nodeHash.c.

3486 {
3487  ParallelHashJoinState *pstate = hashtable->parallel_state;
3488  HashMemoryChunk chunk;
3489 
3490  LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
3491  if (DsaPointerIsValid(pstate->chunk_work_queue))
3492  {
3493  *shared = pstate->chunk_work_queue;
3494  chunk = (HashMemoryChunk)
3495  dsa_get_address(hashtable->area, *shared);
3496  pstate->chunk_work_queue = chunk->next.shared;
3497  }
3498  else
3499  chunk = NULL;
3500  LWLockRelease(&pstate->lock);
3501 
3502  return chunk;
3503 }

References HashJoinTableData::area, ParallelHashJoinState::chunk_work_queue, dsa_get_address(), DsaPointerIsValid, ParallelHashJoinState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), HashMemoryChunkData::next, HashJoinTableData::parallel_state, and HashMemoryChunkData::shared.

Referenced by ExecParallelHashIncreaseNumBuckets(), and ExecParallelHashRepartitionFirst().
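
This is a lock-protected pop from the head of a shared singly linked list; repartitioning and bucket-growth workers call it in a loop until it returns NULL, which spreads the chunks across whichever backends happen to be participating. A standalone sketch of the queue discipline (ordinary pointers and a pthread mutex instead of dsa_pointer and an LWLock; build with -pthread):

    #include <pthread.h>
    #include <stdio.h>

    typedef struct Chunk { struct Chunk *next; int id; } Chunk;

    static pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
    static Chunk *work_queue;

    static Chunk *pop_chunk(void)
    {
        Chunk *c;

        pthread_mutex_lock(&qlock);
        c = work_queue;
        if (c != NULL)
            work_queue = c->next;       /* unlink the head */
        pthread_mutex_unlock(&qlock);
        return c;
    }

    int main(void)
    {
        Chunk c2 = {NULL, 2}, c1 = {&c2, 1};
        Chunk *c;

        work_queue = &c1;
        while ((c = pop_chunk()) != NULL)
            printf("processing chunk %d\n", c->id);
        return 0;
    }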

◆ ExecParallelHashPushTuple()

static void ExecParallelHashPushTuple ( dsa_pointer_atomic * head,
HashJoinTuple  tuple,
dsa_pointer  tuple_shared 
)
inline static

Definition at line 3446 of file nodeHash.c.

3449 {
3450  for (;;)
3451  {
3452  tuple->next.shared = dsa_pointer_atomic_read(head);
3453  if (dsa_pointer_atomic_compare_exchange(head,
3454  &tuple->next.shared,
3455  tuple_shared))
3456  break;
3457  }
3458 }

References dsa_pointer_atomic_compare_exchange, dsa_pointer_atomic_read, HashJoinTupleData::next, and HashJoinTupleData::shared.

Referenced by ExecParallelHashIncreaseNumBuckets(), ExecParallelHashRepartitionFirst(), ExecParallelHashTableInsert(), and ExecParallelHashTableInsertCurrentBatch().
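
The push is a standard lock-free prepend: read the current head into the tuple's next pointer, then compare-and-exchange the head; on failure another backend won the race, so retry with the freshly observed head. A standalone C11 sketch of the same loop (plain atomics instead of dsa_pointer_atomic):

    #include <stdatomic.h>
    #include <stdio.h>

    typedef struct Tuple { struct Tuple *next; int id; } Tuple;

    static void push_tuple(_Atomic(Tuple *) *head, Tuple *tuple)
    {
        Tuple *old = atomic_load(head);

        do
            tuple->next = old;          /* old is refreshed on CAS failure */
        while (!atomic_compare_exchange_weak(head, &old, tuple));
    }

    int main(void)
    {
        _Atomic(Tuple *) bucket = NULL;
        Tuple a = {NULL, 1}, b = {NULL, 2};

        push_tuple(&bucket, &a);
        push_tuple(&bucket, &b);
        for (Tuple *t = atomic_load(&bucket); t != NULL; t = t->next)
            printf("tuple %d\n", t->id);
        return 0;
    }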

◆ ExecParallelHashRepartitionFirst()

static void ExecParallelHashRepartitionFirst ( HashJoinTable  hashtable)
static

Definition at line 1297 of file nodeHash.c.

1298 {
1299  dsa_pointer chunk_shared;
1300  HashMemoryChunk chunk;
1301 
1302  Assert(hashtable->nbatch == hashtable->parallel_state->nbatch);
1303 
1304  while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared)))
1305  {
1306  size_t idx = 0;
1307 
1308  /* Repartition all tuples in this chunk. */
1309  while (idx < chunk->used)
1310  {
1311  HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
1312  MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
1313  HashJoinTuple copyTuple;
1314  dsa_pointer shared;
1315  int bucketno;
1316  int batchno;
1317 
1318  ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
1319  &bucketno, &batchno);
1320 
1321  Assert(batchno < hashtable->nbatch);
1322  if (batchno == 0)
1323  {
1324  /* It still belongs in batch 0. Copy to a new chunk. */
1325  copyTuple =
1326  ExecParallelHashTupleAlloc(hashtable,
1327  HJTUPLE_OVERHEAD + tuple->t_len,
1328  &shared);
1329  copyTuple->hashvalue = hashTuple->hashvalue;
1330  memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len);
1331  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1332  copyTuple, shared);
1333  }
1334  else
1335  {
1336  size_t tuple_size =
1337  MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1338 
1339  /* It belongs in a later batch. */
1340  hashtable->batches[batchno].estimated_size += tuple_size;
1341  sts_puttuple(hashtable->batches[batchno].inner_tuples,
1342  &hashTuple->hashvalue, tuple);
1343  }
1344 
1345  /* Count this tuple. */
1346  ++hashtable->batches[0].old_ntuples;
1347  ++hashtable->batches[batchno].ntuples;
1348 
1349  idx += MAXALIGN(HJTUPLE_OVERHEAD +
1350  HJTUPLE_MINTUPLE(hashTuple)->t_len);
1351  }
1352 
1353  /* Free this chunk. */
1354  dsa_free(hashtable->area, chunk_shared);
1355 
1357  }
1358 }

References HashJoinTableData::area, Assert(), HashJoinTableData::batches, HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, dsa_free(), ParallelHashJoinBatchAccessor::estimated_size, ExecHashGetBucketAndBatch(), ExecParallelHashPopChunkQueue(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), HASH_CHUNK_DATA, HashJoinTupleData::hashvalue, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, idx(), ParallelHashJoinBatchAccessor::inner_tuples, MAXALIGN, ParallelHashJoinState::nbatch, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::ntuples, ParallelHashJoinBatchAccessor::old_ntuples, HashJoinTableData::parallel_state, HashJoinTableData::shared, sts_puttuple(), and MinimalTupleData::t_len.

Referenced by ExecParallelHashIncreaseNumBatches().
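
Repartitioning works because of how ExecHashGetBucketAndBatch() splits a hash value: the low log2_nbuckets bits choose the bucket, and the bits above them (taken after a right rotation) choose the batch, so when nbatch doubles, each batch-0 tuple either stays in batch 0 or moves to exactly one other batch. A standalone sketch of that arithmetic; rotr32 and get_bucket_and_batch are illustrative stand-ins for pg_rotate_right32() and the real function:

    #include <stdint.h>
    #include <stdio.h>

    /* Rotate right, standing in for pg_rotate_right32(). */
    static uint32_t
    rotr32(uint32_t x, int n)
    {
        return (x >> n) | (x << ((32 - n) & 31));
    }

    /* Split a hash value the way ExecHashGetBucketAndBatch() does: the low
     * log2(nbuckets) bits pick the bucket, the next bits pick the batch. */
    static void
    get_bucket_and_batch(uint32_t hash, uint32_t nbuckets, int log2_nbuckets,
                         uint32_t nbatch, uint32_t *bucketno, uint32_t *batchno)
    {
        *bucketno = hash & (nbuckets - 1);
        *batchno = (nbatch > 1) ? rotr32(hash, log2_nbuckets) & (nbatch - 1) : 0;
    }

    int
    main(void)
    {
        uint32_t hash = 0xDEADBEEF;
        uint32_t bucket, before, after;

        get_bucket_and_batch(hash, 1024, 10, 4, &bucket, &before);
        get_bucket_and_batch(hash, 1024, 10, 8, &bucket, &after);
        printf("bucket %u, batch %u -> %u (stays, or becomes %u)\n",
               (unsigned) bucket, (unsigned) before, (unsigned) after,
               (unsigned) (before + 4));
        return 0;
    }

Running this for one hash value shows the invariant the repartitioning loop relies on: after nbatch doubles, the new batch number is either the old one or the old one plus the previous nbatch.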

◆ ExecParallelHashRepartitionRest()

static void ExecParallelHashRepartitionRest ( HashJoinTable  hashtable)
static

Definition at line 1364 of file nodeHash.c.

1365 {
1366  ParallelHashJoinState *pstate = hashtable->parallel_state;
1367  int old_nbatch = pstate->old_nbatch;
1368  SharedTuplestoreAccessor **old_inner_tuples;
1369  ParallelHashJoinBatch *old_batches;
1370  int i;
1371 
1372  /* Get our hands on the previous generation of batches. */
1373  old_batches = (ParallelHashJoinBatch *)
1374  dsa_get_address(hashtable->area, pstate->old_batches);
1375  old_inner_tuples = palloc0_array(SharedTuplestoreAccessor *, old_nbatch);
1376  for (i = 1; i < old_nbatch; ++i)
1377  {
1378  ParallelHashJoinBatch *shared =
1379  NthParallelHashJoinBatch(old_batches, i);
1380 
1381  old_inner_tuples[i] = sts_attach(ParallelHashJoinBatchInner(shared),
1382  ParallelWorkerNumber + 1,
1383  &pstate->fileset);
1384  }
1385 
1386  /* Join in the effort to repartition them. */
1387  for (i = 1; i < old_nbatch; ++i)
1388  {
1389  MinimalTuple tuple;
1390  uint32 hashvalue;
1391 
1392  /* Scan one partition from the previous generation. */
1393  sts_begin_parallel_scan(old_inner_tuples[i]);
1394  while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue)))
1395  {
1396  size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1397  int bucketno;
1398  int batchno;
1399 
1400  /* Decide which partition it goes to in the new generation. */
1401  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
1402  &batchno);
1403 
1404  hashtable->batches[batchno].estimated_size += tuple_size;
1405  ++hashtable->batches[batchno].ntuples;
1406  ++hashtable->batches[i].old_ntuples;
1407 
1408  /* Store the tuple in its new batch. */
1409  sts_puttuple(hashtable->batches[batchno].inner_tuples,
1410  &hashvalue, tuple);
1411 
1412  CHECK_FOR_INTERRUPTS();
1413  }
1414  sts_end_parallel_scan(old_inner_tuples[i]);
1415  }
1416 
1417  pfree(old_inner_tuples);
1418 }
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)

References HashJoinTableData::area, HashJoinTableData::batches, CHECK_FOR_INTERRUPTS, dsa_get_address(), ParallelHashJoinBatchAccessor::estimated_size, ExecHashGetBucketAndBatch(), ParallelHashJoinState::fileset, HJTUPLE_OVERHEAD, i, ParallelHashJoinBatchAccessor::inner_tuples, MAXALIGN, NthParallelHashJoinBatch, ParallelHashJoinBatchAccessor::ntuples, ParallelHashJoinState::old_batches, ParallelHashJoinState::old_nbatch, ParallelHashJoinBatchAccessor::old_ntuples, palloc0_array, HashJoinTableData::parallel_state, ParallelHashJoinBatchInner, ParallelWorkerNumber, pfree(), sts_attach(), sts_begin_parallel_scan(), sts_end_parallel_scan(), sts_parallel_scan_next(), sts_puttuple(), and MinimalTupleData::t_len.

Referenced by ExecParallelHashIncreaseNumBatches().

◆ ExecParallelHashTableAlloc()

void ExecParallelHashTableAlloc ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3254 of file nodeHash.c.

3255 {
3256  ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
3257  dsa_pointer_atomic *buckets;
3258  int nbuckets = hashtable->parallel_state->nbuckets;
3259  int i;
3260 
3261  batch->buckets =
3262  dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets);
3263  buckets = (dsa_pointer_atomic *)
3264  dsa_get_address(hashtable->area, batch->buckets);
3265  for (i = 0; i < nbuckets; ++i)
3266  dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
3267 }

References HashJoinTableData::area, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, dsa_allocate, dsa_get_address(), dsa_pointer_atomic_init, i, InvalidDsaPointer, ParallelHashJoinState::nbuckets, HashJoinTableData::parallel_state, and ParallelHashJoinBatchAccessor::shared.

Referenced by ExecHashTableCreate(), and ExecParallelHashJoinNewBatch().

◆ ExecParallelHashTableInsert()

void ExecParallelHashTableInsert ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1706 of file nodeHash.c.

1709 {
1710  bool shouldFree;
1711  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1712  dsa_pointer shared;
1713  int bucketno;
1714  int batchno;
1715 
1716 retry:
1717  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1718 
1719  if (batchno == 0)
1720  {
1721  HashJoinTuple hashTuple;
1722 
1723  /* Try to load it into memory. */
1724  Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
1725  PHJ_BUILD_HASH_INNER);
1726  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1727  HJTUPLE_OVERHEAD + tuple->t_len,
1728  &shared);
1729  if (hashTuple == NULL)
1730  goto retry;
1731 
1732  /* Store the hash value in the HashJoinTuple header. */
1733  hashTuple->hashvalue = hashvalue;
1734  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1735  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1736 
1737  /* Push it onto the front of the bucket's list */
1738  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1739  hashTuple, shared);
1740  }
1741  else
1742  {
1743  size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1744 
1745  Assert(batchno > 0);
1746 
1747  /* Try to preallocate space in the batch if necessary. */
1748  if (hashtable->batches[batchno].preallocated < tuple_size)
1749  {
1750  if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
1751  goto retry;
1752  }
1753 
1754  Assert(hashtable->batches[batchno].preallocated >= tuple_size);
1755  hashtable->batches[batchno].preallocated -= tuple_size;
1756  sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
1757  tuple);
1758  }
1759  ++hashtable->batches[batchno].ntuples;
1760 
1761  if (shouldFree)
1762  heap_free_minimal_tuple(tuple);
1763 }
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
Definition: nodeHash.c:3526

References Assert(), BarrierPhase(), HashJoinTableData::batches, HashJoinTableData::buckets, ParallelHashJoinState::build_barrier, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), ExecParallelHashTuplePrealloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, ParallelHashJoinBatchAccessor::inner_tuples, MAXALIGN, ParallelHashJoinBatchAccessor::ntuples, HashJoinTableData::parallel_state, PHJ_BUILD_HASH_INNER, ParallelHashJoinBatchAccessor::preallocated, HashJoinTableData::shared, sts_puttuple(), and MinimalTupleData::t_len.

Referenced by MultiExecParallelHash().
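
The retry label above implements a cooperative protocol: if allocation or preallocation fails because another participant has requested more batches or buckets, this backend first helps complete that growth and then recomputes the bucket and batch (which may have changed) before trying again. A toy single-threaded sketch of the shape of that protocol; grow_pending, try_alloc and help_grow are illustrative, not PostgreSQL APIs:

    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* Illustrative stand-ins for shared parallel-hash state. */
    static bool grow_pending = true;    /* another worker requested growth */

    static void *
    try_alloc(size_t size)
    {
        if (grow_pending)
            return NULL;                /* caller must help grow, then retry */
        return malloc(size);
    }

    static void
    help_grow(void)
    {
        /* In nodeHash.c this is ExecParallelHashIncreaseNumBatches/Buckets(). */
        grow_pending = false;
    }

    int
    main(void)
    {
        void *tuple;

    retry:
        tuple = try_alloc(64);
        if (tuple == NULL)
        {
            help_grow();
            goto retry;                 /* the batch number may have changed */
        }
        puts("allocated after helping grow");
        free(tuple);
        return 0;
    }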

◆ ExecParallelHashTableInsertCurrentBatch()

void ExecParallelHashTableInsertCurrentBatch ( HashJoinTable  hashtable,
TupleTableSlot slot,
uint32  hashvalue 
)

Definition at line 1772 of file nodeHash.c.

1775 {
1776  bool shouldFree;
1777  MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1778  HashJoinTuple hashTuple;
1779  dsa_pointer shared;
1780  int batchno;
1781  int bucketno;
1782 
1783  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1784  Assert(batchno == hashtable->curbatch);
1785  hashTuple = ExecParallelHashTupleAlloc(hashtable,
1786  HJTUPLE_OVERHEAD + tuple->t_len,
1787  &shared);
1788  hashTuple->hashvalue = hashvalue;
1789  memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1790  HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1791  ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1792  hashTuple, shared);
1793 
1794  if (shouldFree)
1795  heap_free_minimal_tuple(tuple);
1796 }

References Assert(), HashJoinTableData::buckets, HashJoinTableData::curbatch, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecParallelHashPushTuple(), ExecParallelHashTupleAlloc(), HashJoinTupleData::hashvalue, heap_free_minimal_tuple(), HeapTupleHeaderClearMatch, HJTUPLE_MINTUPLE, HJTUPLE_OVERHEAD, HashJoinTableData::shared, and MinimalTupleData::t_len.

Referenced by ExecParallelHashJoinNewBatch().

◆ ExecParallelHashTableSetCurrentBatch()

void ExecParallelHashTableSetCurrentBatch ( HashJoinTable  hashtable,
int  batchno 
)

Definition at line 3464 of file nodeHash.c.

3465 {
3466  Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer);
3467 
3468  hashtable->curbatch = batchno;
3469  hashtable->buckets.shared = (dsa_pointer_atomic *)
3470  dsa_get_address(hashtable->area,
3471  hashtable->batches[batchno].shared->buckets);
3472  hashtable->nbuckets = hashtable->parallel_state->nbuckets;
3473  hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
3474  hashtable->current_chunk = NULL;
3475  hashtable->current_chunk_shared = InvalidDsaPointer;
3476  hashtable->batches[batchno].at_least_one_chunk = false;
3477 }
dsa_pointer current_chunk_shared
Definition: hashjoin.h:374

References HashJoinTableData::area, Assert(), ParallelHashJoinBatchAccessor::at_least_one_chunk, HashJoinTableData::batches, ParallelHashJoinBatch::buckets, HashJoinTableData::buckets, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, HashJoinTableData::current_chunk_shared, dsa_get_address(), InvalidDsaPointer, HashJoinTableData::log2_nbuckets, my_log2(), ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, HashJoinTableData::parallel_state, ParallelHashJoinBatchAccessor::shared, and HashJoinTableData::shared.

Referenced by ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashJoinNewBatch(), and MultiExecParallelHash().

◆ ExecParallelHashTupleAlloc()

static HashJoinTuple ExecParallelHashTupleAlloc ( HashJoinTable  hashtable,
size_t  size,
dsa_pointer shared 
)
static

Definition at line 2941 of file nodeHash.c.

2943 {
2944  ParallelHashJoinState *pstate = hashtable->parallel_state;
2945  dsa_pointer chunk_shared;
2946  HashMemoryChunk chunk;
2947  Size chunk_size;
2948  HashJoinTuple result;
2949  int curbatch = hashtable->curbatch;
2950 
2951  size = MAXALIGN(size);
2952 
2953  /*
2954  * Fast path: if there is enough space in this backend's current chunk,
2955  * then we can allocate without any locking.
2956  */
2957  chunk = hashtable->current_chunk;
2958  if (chunk != NULL &&
2959  size <= HASH_CHUNK_THRESHOLD &&
2960  chunk->maxlen - chunk->used >= size)
2961  {
2962 
2963  chunk_shared = hashtable->current_chunk_shared;
2964  Assert(chunk == dsa_get_address(hashtable->area, chunk_shared));
2965  *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used;
2966  result = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + chunk->used);
2967  chunk->used += size;
2968 
2969  Assert(chunk->used <= chunk->maxlen);
2970  Assert(result == dsa_get_address(hashtable->area, *shared));
2971 
2972  return result;
2973  }
2974 
2975  /* Slow path: try to allocate a new chunk. */
2976  LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
2977 
2978  /*
2979  * Check if we need to help increase the number of buckets or batches.
2980  */
2981  if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES ||
2982  pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
2983  {
2984  ParallelHashGrowth growth = pstate->growth;
2985 
2986  hashtable->current_chunk = NULL;
2987  LWLockRelease(&pstate->lock);
2988 
2989  /* Another participant has commanded us to help grow. */
2990  if (growth == PHJ_GROWTH_NEED_MORE_BATCHES)
2991  ExecParallelHashIncreaseNumBatches(hashtable);
2992  else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
2993  ExecParallelHashIncreaseNumBuckets(hashtable);
2994 
2995  /* The caller must retry. */
2996  return NULL;
2997  }
2998 
2999  /* Oversized tuples get their own chunk. */
3000  if (size > HASH_CHUNK_THRESHOLD)
3001  chunk_size = size + HASH_CHUNK_HEADER_SIZE;
3002  else
3003  chunk_size = HASH_CHUNK_SIZE;
3004 
3005  /* Check if it's time to grow batches or buckets. */
3006  if (pstate->growth != PHJ_GROWTH_DISABLED)
3007  {
3008  Assert(curbatch == 0);
3009  Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
3010 
3011  /*
3012  * Check if our space limit would be exceeded. To avoid choking on
3013  * very large tuples or very low hash_mem setting, we'll always allow
3014  * each backend to allocate at least one chunk.
3015  */
3016  if (hashtable->batches[0].at_least_one_chunk &&
3017  hashtable->batches[0].shared->size +
3018  chunk_size > pstate->space_allowed)
3019  {
3020  pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
3021  hashtable->batches[0].shared->space_exhausted = true;
3022  LWLockRelease(&pstate->lock);
3023 
3024  return NULL;
3025  }
3026 
3027  /* Check if our load factor limit would be exceeded. */
3028  if (hashtable->nbatch == 1)
3029  {
3030  hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples;
3031  hashtable->batches[0].ntuples = 0;
3032  /* Guard against integer overflow and alloc size overflow */
3033  if (hashtable->batches[0].shared->ntuples + 1 >
3034  hashtable->nbuckets * NTUP_PER_BUCKET &&
3035  hashtable->nbuckets < (INT_MAX / 2) &&
3036  hashtable->nbuckets * 2 <=
3037  MaxAllocSize / sizeof(dsa_pointer_atomic))
3038  {
3039  pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS;
3040  LWLockRelease(&pstate->lock);
3041 
3042  return NULL;
3043  }
3044  }
3045  }
3046 
3047  /* We are cleared to allocate a new chunk. */
3048  chunk_shared = dsa_allocate(hashtable->area, chunk_size);
3049  hashtable->batches[curbatch].shared->size += chunk_size;
3050  hashtable->batches[curbatch].at_least_one_chunk = true;
3051 
3052  /* Set up the chunk. */
3053  chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared);
3054  *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE;
3055  chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE;
3056  chunk->used = size;
3057 
3058  /*
3059  * Push it onto the list of chunks, so that it can be found if we need to
3060  * increase the number of buckets or batches (batch 0 only) and later for
3061  * freeing the memory (all batches).
3062  */
3063  chunk->next.shared = hashtable->batches[curbatch].shared->chunks;
3064  hashtable->batches[curbatch].shared->chunks = chunk_shared;
3065 
3066  if (size <= HASH_CHUNK_THRESHOLD)
3067  {
3068  /*
3069  * Make this the current chunk so that we can use the fast path to
3070  * fill the rest of it up in future calls.
3071  */
3072  hashtable->current_chunk = chunk;
3073  hashtable->current_chunk_shared = chunk_shared;
3074  }
3075  LWLockRelease(&pstate->lock);
3076 
3077  Assert(HASH_CHUNK_DATA(chunk) == dsa_get_address(hashtable->area, *shared));
3078  result = (HashJoinTuple) HASH_CHUNK_DATA(chunk);
3079 
3080  return result;
3081 }
ParallelHashGrowth
Definition: hashjoin.h:231
@ PHJ_GROWTH_NEED_MORE_BUCKETS
Definition: hashjoin.h:235
static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
Definition: nodeHash.c:1517
static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
Definition: nodeHash.c:1081

References HashJoinTableData::area, Assert(), ParallelHashJoinBatchAccessor::at_least_one_chunk, BarrierPhase(), HashJoinTableData::batches, ParallelHashJoinState::build_barrier, ParallelHashJoinBatch::chunks, HashJoinTableData::curbatch, HashJoinTableData::current_chunk, HashJoinTableData::current_chunk_shared, dsa_allocate, dsa_get_address(), ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ParallelHashJoinState::growth, HASH_CHUNK_DATA, HASH_CHUNK_HEADER_SIZE, HASH_CHUNK_SIZE, HASH_CHUNK_THRESHOLD, ParallelHashJoinState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXALIGN, MaxAllocSize, HashMemoryChunkData::maxlen, HashJoinTableData::nbatch, HashJoinTableData::nbuckets, HashMemoryChunkData::next, NTUP_PER_BUCKET, ParallelHashJoinBatch::ntuples, ParallelHashJoinBatchAccessor::ntuples, HashJoinTableData::parallel_state, PHJ_BUILD_HASH_INNER, PHJ_GROWTH_DISABLED, PHJ_GROWTH_NEED_MORE_BATCHES, PHJ_GROWTH_NEED_MORE_BUCKETS, HashMemoryChunkData::shared, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinState::space_allowed, ParallelHashJoinBatch::space_exhausted, and HashMemoryChunkData::used.

Referenced by ExecParallelHashRepartitionFirst(), ExecParallelHashTableInsert(), and ExecParallelHashTableInsertCurrentBatch().
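
The fast path at the top is plain bump allocation: as long as the backend's current chunk has room, a tuple is carved out by advancing the chunk's used offset, with no locking at all; only a full chunk or an oversized tuple takes the shared LWLock. A minimal standalone sketch of the chunk bump allocator, with illustrative constants standing in for HASH_CHUNK_SIZE and MAXALIGN:

    #include <stddef.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define CHUNK_DATA_SIZE 32768               /* illustrative chunk payload */
    #define ALIGN8(x) (((x) + 7) & ~(size_t) 7) /* stand-in for MAXALIGN */

    typedef struct Chunk
    {
        size_t used;                    /* bump pointer into data[] */
        size_t maxlen;
        char   data[CHUNK_DATA_SIZE];
    } Chunk;

    /* Fast path: carve the next 'size' bytes out of the current chunk. */
    static void *
    chunk_alloc(Chunk *chunk, size_t size)
    {
        void *result;

        size = ALIGN8(size);
        if (chunk->maxlen - chunk->used < size)
            return NULL;                /* slow path: would need a new chunk */
        result = chunk->data + chunk->used;
        chunk->used += size;
        return result;
    }

    int
    main(void)
    {
        Chunk *chunk = malloc(sizeof(Chunk));

        chunk->used = 0;
        chunk->maxlen = CHUNK_DATA_SIZE;
        for (int i = 0; i < 3; i++)
        {
            char *p = chunk_alloc(chunk, 100);

            printf("tuple %d at offset %zu\n", i, (size_t) (p - chunk->data));
        }
        free(chunk);
        return 0;
    }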

◆ ExecParallelHashTuplePrealloc()

static bool ExecParallelHashTuplePrealloc ( HashJoinTable  hashtable,
int  batchno,
size_t  size 
)
static

Definition at line 3526 of file nodeHash.c.

3527 {
3528  ParallelHashJoinState *pstate = hashtable->parallel_state;
3529  ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno];
3530  size_t want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE);
3531 
3532  Assert(batchno > 0);
3533  Assert(batchno < hashtable->nbatch);
3534  Assert(size == MAXALIGN(size));
3535 
3536  LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
3537 
3538  /* Has another participant commanded us to help grow? */
3539  if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES ||
3540  pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
3541  {
3542  ParallelHashGrowth growth = pstate->growth;
3543 
3544  LWLockRelease(&pstate->lock);
3545  if (growth == PHJ_GROWTH_NEED_MORE_BATCHES)
3546  ExecParallelHashIncreaseNumBatches(hashtable);
3547  else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
3548  ExecParallelHashIncreaseNumBuckets(hashtable);
3549 
3550  return false;
3551  }
3552 
3553  if (pstate->growth != PHJ_GROWTH_DISABLED &&
3554  batch->at_least_one_chunk &&
3555  (batch->shared->estimated_size + want + HASH_CHUNK_HEADER_SIZE
3556  > pstate->space_allowed))
3557  {
3558  /*
3559  * We have determined that this batch would exceed the space budget if
3560  * loaded into memory. Command all participants to help repartition.
3561  */
3562  batch->shared->space_exhausted = true;
3563  pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
3564  LWLockRelease(&pstate->lock);
3565 
3566  return false;
3567  }
3568 
3569  batch->at_least_one_chunk = true;
3570  batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE;
3571  batch->preallocated = want;
3572  LWLockRelease(&pstate->lock);
3573 
3574  return true;
3575 }

References Assert(), ParallelHashJoinBatchAccessor::at_least_one_chunk, HashJoinTableData::batches, ParallelHashJoinBatch::estimated_size, ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ParallelHashJoinState::growth, HASH_CHUNK_HEADER_SIZE, HASH_CHUNK_SIZE, ParallelHashJoinState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), Max, MAXALIGN, HashJoinTableData::parallel_state, PHJ_GROWTH_DISABLED, PHJ_GROWTH_NEED_MORE_BATCHES, PHJ_GROWTH_NEED_MORE_BUCKETS, ParallelHashJoinBatchAccessor::preallocated, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinState::space_allowed, and ParallelHashJoinBatch::space_exhausted.

Referenced by ExecParallelHashTableInsert().
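
The function reserves a whole chunk's worth of budget at a time (want is at least HASH_CHUNK_SIZE minus its header) against the shared size estimate while holding the lock; the caller then spends that preallocation tuple by tuple without relocking, coming back only when it runs out. A small single-backend sketch of that accounting, with illustrative constants and variables:

    #include <stdbool.h>
    #include <stdio.h>

    #define CHUNK_PAYLOAD 32760         /* illustrative chunk payload size */
    #define SPACE_ALLOWED 1048576       /* illustrative per-batch budget */

    static size_t estimated_size;       /* shared, updated "under the lock" */
    static size_t preallocated;         /* this backend's local reservation */

    /* Reserve a chunk's worth of budget, as ExecParallelHashTuplePrealloc
     * does; returns false when the batch would exceed its space budget. */
    static bool
    prealloc(size_t size)
    {
        size_t want = size > CHUNK_PAYLOAD ? size : CHUNK_PAYLOAD;

        if (estimated_size + want > SPACE_ALLOWED)
            return false;               /* time to repartition */
        estimated_size += want;
        preallocated = want;
        return true;
    }

    int
    main(void)
    {
        size_t tuple_size = 64;

        for (int i = 0; i < 5; i++)
        {
            if (preallocated < tuple_size && !prealloc(tuple_size))
                break;
            preallocated -= tuple_size; /* "write" one tuple to the batch */
        }
        printf("estimated %zu bytes, %zu still preallocated\n",
               estimated_size, preallocated);
        return 0;
    }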

◆ ExecParallelPrepHashTableForUnmatched()

bool ExecParallelPrepHashTableForUnmatched ( HashJoinState hjstate)

Definition at line 2089 of file nodeHash.c.

2090 {
2091  HashJoinTable hashtable = hjstate->hj_HashTable;
2092  int curbatch = hashtable->curbatch;
2093  ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
2094 
2095  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
2096 
2097  /*
2098  * It would not be deadlock-free to wait on the batch barrier, because it
2099  * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
2100  * already emitted tuples. Therefore, we'll hold a wait-free election:
2101  * only one process can continue to the next phase, and all others detach
2102  from this batch. They can still do any work on other batches, if there
2103  * are any.
2104  */
2105  if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
2106  {
2107  /* This process considers the batch to be done. */
2108  hashtable->batches[hashtable->curbatch].done = true;
2109 
2110  /* Make sure any temporary files are closed. */
2111  sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
2112  sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
2113 
2114  /*
2115  * Track largest batch we've seen, which would normally happen in
2116  * ExecHashTableDetachBatch().
2117  */
2118  hashtable->spacePeak =
2119  Max(hashtable->spacePeak,
2120  batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
2121  hashtable->curbatch = -1;
2122  return false;
2123  }
2124 
2125  /* Now we are alone with this batch. */
2126  Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
2127 
2128  /*
2129  * Has another process decided to give up early and command all processes
2130  * to skip the unmatched scan?
2131  */
2132  if (batch->skip_unmatched)
2133  {
2134  hashtable->batches[hashtable->curbatch].done = true;
2135  ExecHashTableDetachBatch(hashtable);
2136  return false;
2137  }
2138 
2139  /* Now prepare the process local state, just as for non-parallel join. */
2140  ExecPrepHashTableForUnmatched(hjstate);
2141 
2142  return true;
2143 }
void ExecHashTableDetachBatch(HashJoinTable hashtable)
Definition: nodeHash.c:3274
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
Definition: nodeHash.c:2068
HashJoinTable hj_HashTable
Definition: execnodes.h:2110

References Assert(), BarrierArriveAndDetachExceptLast(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, HashJoinTableData::curbatch, ParallelHashJoinBatchAccessor::done, ExecHashTableDetachBatch(), ExecPrepHashTableForUnmatched(), HashJoinState::hj_HashTable, ParallelHashJoinBatchAccessor::inner_tuples, Max, HashJoinTableData::nbuckets, ParallelHashJoinBatchAccessor::outer_tuples, PHJ_BATCH_PROBE, PHJ_BATCH_SCAN, ParallelHashJoinBatchAccessor::shared, ParallelHashJoinBatch::size, ParallelHashJoinBatch::skip_unmatched, HashJoinTableData::spacePeak, and sts_end_parallel_scan().

Referenced by ExecHashJoinImpl().
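
The wait-free election performed by BarrierArriveAndDetachExceptLast() admits exactly one process to the unmatched scan: every arriving process detaches unless it is the last one still attached. A standalone sketch of the same idea with a C11 atomic counter (the real barrier also tracks phases and uses a spinlock internally; the function name here is illustrative):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    static atomic_int attached = 3;     /* illustrative: three workers probing */

    /* Return true for exactly one caller: the last process attached wins the
     * right to run the unmatched scan; every other caller detaches. */
    static bool
    arrive_and_detach_except_last(void)
    {
        int remaining = atomic_load(&attached);

        do
        {
            if (remaining == 1)
                return true;            /* we are the chosen process */
        } while (!atomic_compare_exchange_weak(&attached, &remaining,
                                               remaining - 1));
        return false;                   /* detached; go help other batches */
    }

    int
    main(void)
    {
        for (int worker = 0; worker < 3; worker++)
            printf("worker %d: %s\n", worker,
                   arrive_and_detach_except_last() ? "elected" : "detached");
        return 0;
    }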

◆ ExecParallelScanHashBucket()

bool ExecParallelScanHashBucket ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 2017 of file nodeHash.c.

2019 {
2020  ExprState *hjclauses = hjstate->hashclauses;
2021  HashJoinTable hashtable = hjstate->hj_HashTable;
2022  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2023  uint32 hashvalue = hjstate->hj_CurHashValue;
2024 
2025  /*
2026  * hj_CurTuple is the address of the tuple last returned from the current
2027  * bucket, or NULL if it's time to start scanning a new bucket.
2028  */
2029  if (hashTuple != NULL)
2030  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2031  else
2032  hashTuple = ExecParallelHashFirstTuple(hashtable,
2033  hjstate->hj_CurBucketNo);
2034 
2035  while (hashTuple != NULL)
2036  {
2037  if (hashTuple->hashvalue == hashvalue)
2038  {
2039  TupleTableSlot *inntuple;
2040 
2041  /* insert hashtable's tuple into exec slot so ExecQual sees it */
2042  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2043  hjstate->hj_HashTupleSlot,
2044  false); /* do not pfree */
2045  econtext->ecxt_innertuple = inntuple;
2046 
2047  if (ExecQualAndReset(hjclauses, econtext))
2048  {
2049  hjstate->hj_CurTuple = hashTuple;
2050  return true;
2051  }
2052  }
2053 
2054  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2055  }
2056 
2057  /*
2058  * no match
2059  */
2060  return false;
2061 }
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1447
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:439
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
Definition: nodeHash.c:3416
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
Definition: nodeHash.c:3432
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:251
HashJoinTuple hj_CurTuple
Definition: execnodes.h:2114
ExprState * hashclauses
Definition: execnodes.h:2106
uint32 hj_CurHashValue
Definition: execnodes.h:2111
int hj_CurBucketNo
Definition: execnodes.h:2112
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:2116

References ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, and HJTUPLE_MINTUPLE.

Referenced by ExecHashJoinImpl().

◆ ExecParallelScanHashTableForUnmatched()

bool ExecParallelScanHashTableForUnmatched ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 2228 of file nodeHash.c.

2230 {
2231  HashJoinTable hashtable = hjstate->hj_HashTable;
2232  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2233 
2234  for (;;)
2235  {
2236  /*
2237  * hj_CurTuple is the address of the tuple last returned from the
2238  * current bucket, or NULL if it's time to start scanning a new
2239  * bucket.
2240  */
2241  if (hashTuple != NULL)
2242  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2243  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2244  hashTuple = ExecParallelHashFirstTuple(hashtable,
2245  hjstate->hj_CurBucketNo++);
2246  else
2247  break; /* finished all buckets */
2248 
2249  while (hashTuple != NULL)
2250  {
2251  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2252  {
2253  TupleTableSlot *inntuple;
2254 
2255  /* insert hashtable's tuple into exec slot */
2256  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2257  hjstate->hj_HashTupleSlot,
2258  false); /* do not pfree */
2259  econtext->ecxt_innertuple = inntuple;
2260 
2261  /*
2262  * Reset temp memory each time; although this function doesn't
2263  * do any qual eval, the caller will, so let's keep it
2264  * parallel to ExecScanHashBucket.
2265  */
2266  ResetExprContext(econtext);
2267 
2268  hjstate->hj_CurTuple = hashTuple;
2269  return true;
2270  }
2271 
2272  hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2273  }
2274 
2275  /* allow this loop to be cancellable */
2276  CHECK_FOR_INTERRUPTS();
2277  }
2278 
2279  /*
2280  * no more unmatched tuples
2281  */
2282  return false;
2283 }
#define HeapTupleHeaderHasMatch(tup)
Definition: htup_details.h:514

References CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecParallelHashFirstTuple(), ExecParallelHashNextTuple(), ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, HashJoinTableData::nbuckets, and ResetExprContext.

Referenced by ExecHashJoinImpl().

◆ ExecPrepHashTableForUnmatched()

void ExecPrepHashTableForUnmatched ( HashJoinState hjstate)

Definition at line 2068 of file nodeHash.c.

2069 {
2070  /*----------
2071  * During this scan we use the HashJoinState fields as follows:
2072  *
2073  * hj_CurBucketNo: next regular bucket to scan
2074  * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
2075  * hj_CurTuple: last tuple returned, or NULL to start next bucket
2076  *----------
2077  */
2078  hjstate->hj_CurBucketNo = 0;
2079  hjstate->hj_CurSkewBucketNo = 0;
2080  hjstate->hj_CurTuple = NULL;
2081 }
int hj_CurSkewBucketNo
Definition: execnodes.h:2113

References HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, and HashJoinState::hj_CurTuple.

Referenced by ExecHashJoinImpl(), and ExecParallelPrepHashTableForUnmatched().

◆ ExecReScanHash()

void ExecReScanHash ( HashState node)

Definition at line 2345 of file nodeHash.c.

2346 {
2347  PlanState *outerPlan = outerPlanState(node);
2348 
2349  /*
2350  * if chgParam of subnode is not null then plan will be re-scanned by
2351  * first ExecProcNode.
2352  */
2353  if (outerPlan->chgParam == NULL)
2354  ExecReScan(outerPlan);
2355 }
void ExecReScan(PlanState *node)
Definition: execAmi.c:78

References ExecReScan(), outerPlan, and outerPlanState.

Referenced by ExecReScan().

◆ ExecScanHashBucket()

bool ExecScanHashBucket ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 1956 of file nodeHash.c.

1958 {
1959  ExprState *hjclauses = hjstate->hashclauses;
1960  HashJoinTable hashtable = hjstate->hj_HashTable;
1961  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1962  uint32 hashvalue = hjstate->hj_CurHashValue;
1963 
1964  /*
1965  * hj_CurTuple is the address of the tuple last returned from the current
1966  * bucket, or NULL if it's time to start scanning a new bucket.
1967  *
1968  * If the tuple hashed to a skew bucket then scan the skew bucket
1969  * otherwise scan the standard hashtable bucket.
1970  */
1971  if (hashTuple != NULL)
1972  hashTuple = hashTuple->next.unshared;
1973  else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
1974  hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
1975  else
1976  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
1977 
1978  while (hashTuple != NULL)
1979  {
1980  if (hashTuple->hashvalue == hashvalue)
1981  {
1982  TupleTableSlot *inntuple;
1983 
1984  /* insert hashtable's tuple into exec slot so ExecQual sees it */
1985  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
1986  hjstate->hj_HashTupleSlot,
1987  false); /* do not pfree */
1988  econtext->ecxt_innertuple = inntuple;
1989 
1990  if (ExecQualAndReset(hjclauses, econtext))
1991  {
1992  hjstate->hj_CurTuple = hashTuple;
1993  return true;
1994  }
1995  }
1996 
1997  hashTuple = hashTuple->next.unshared;
1998  }
1999 
2000  /*
2001  * no match
2002  */
2003  return false;
2004 }

References HashJoinTableData::buckets, ExprContext::ecxt_innertuple, ExecQualAndReset(), ExecStoreMinimalTuple(), HashJoinState::hashclauses, HashJoinTupleData::hashvalue, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, INVALID_SKEW_BUCKET_NO, HashJoinTupleData::next, HashJoinTableData::skewBucket, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().
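
A bucket scan is a simple chain walk that compares the stored hash value first, so the comparatively expensive join qualifier only runs on genuine hash matches. A minimal standalone sketch; HashTuple and the integer key comparison are illustrative stand-ins for HashJoinTupleData and ExecQualAndReset():

    #include <stdint.h>
    #include <stdio.h>

    typedef struct HashTuple
    {
        struct HashTuple *next;
        uint32_t          hashvalue;
        int               key;          /* illustrative payload */
    } HashTuple;

    /* Walk one bucket chain, as ExecScanHashBucket does: skip entries whose
     * hash differs, and only then run the (possibly expensive) qual check. */
    static HashTuple *
    scan_bucket(HashTuple *cur, uint32_t hashvalue, int probe_key)
    {
        for (; cur != NULL; cur = cur->next)
        {
            if (cur->hashvalue == hashvalue &&  /* cheap filter first */
                cur->key == probe_key)          /* stand-in for ExecQual */
                return cur;
        }
        return NULL;                            /* no match in this bucket */
    }

    int
    main(void)
    {
        HashTuple t2 = {NULL, 42, 7};
        HashTuple t1 = {&t2, 42, 5};    /* two entries hash-colliding to 42 */
        HashTuple *match = scan_bucket(&t1, 42, 7);

        printf("matched key %d\n", match ? match->key : -1);
        return 0;
    }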

◆ ExecScanHashTableForUnmatched()

bool ExecScanHashTableForUnmatched ( HashJoinState hjstate,
ExprContext econtext 
)

Definition at line 2154 of file nodeHash.c.

2155 {
2156  HashJoinTable hashtable = hjstate->hj_HashTable;
2157  HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2158 
2159  for (;;)
2160  {
2161  /*
2162  * hj_CurTuple is the address of the tuple last returned from the
2163  * current bucket, or NULL if it's time to start scanning a new
2164  * bucket.
2165  */
2166  if (hashTuple != NULL)
2167  hashTuple = hashTuple->next.unshared;
2168  else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2169  {
2170  hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
2171  hjstate->hj_CurBucketNo++;
2172  }
2173  else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
2174  {
2175  int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
2176 
2177  hashTuple = hashtable->skewBucket[j]->tuples;
2178  hjstate->hj_CurSkewBucketNo++;
2179  }
2180  else
2181  break; /* finished all buckets */
2182 
2183  while (hashTuple != NULL)
2184  {
2185  if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2186  {
2187  TupleTableSlot *inntuple;
2188 
2189  /* insert hashtable's tuple into exec slot */
2190  inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2191  hjstate->hj_HashTupleSlot,
2192  false); /* do not pfree */
2193  econtext->ecxt_innertuple = inntuple;
2194 
2195  /*
2196  * Reset temp memory each time; although this function doesn't
2197  * do any qual eval, the caller will, so let's keep it
2198  * parallel to ExecScanHashBucket.
2199  */
2200  ResetExprContext(econtext);
2201 
2202  hjstate->hj_CurTuple = hashTuple;
2203  return true;
2204  }
2205 
2206  hashTuple = hashTuple->next.unshared;
2207  }
2208 
2209  /* allow this loop to be cancellable */
2210  CHECK_FOR_INTERRUPTS();
2211  }
2212 
2213  /*
2214  * no more unmatched tuples
2215  */
2216  return false;
2217 }

References HashJoinTableData::buckets, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_innertuple, ExecStoreMinimalTuple(), HeapTupleHeaderHasMatch, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HJTUPLE_MINTUPLE, j, HashJoinTableData::nbuckets, HashJoinTupleData::next, HashJoinTableData::nSkewBuckets, ResetExprContext, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashSkewBucket::tuples, HashJoinTupleData::unshared, and HashJoinTableData::unshared.

Referenced by ExecHashJoinImpl().
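
For right and full joins, each inner tuple's header carries a match flag (tested above with HeapTupleHeaderHasMatch) that probing sets when the tuple joins; the unmatched scan then walks every bucket and returns only tuples whose flag was never set. A minimal sketch with an explicit boolean in place of the header bit:

    #include <stdbool.h>
    #include <stdio.h>

    typedef struct HashTuple
    {
        struct HashTuple *next;
        bool              matched;      /* set when a probe tuple joined us */
        int               key;
    } HashTuple;

    int
    main(void)
    {
        /* One bucket's chain: key 5 matched during probing, key 7 did not. */
        HashTuple t2 = {NULL, false, 7};
        HashTuple t1 = {&t2, true, 5};
        HashTuple *buckets[] = {&t1};

        /* Emit inner tuples that never matched; the join fills the outer
         * side's columns with NULLs for these. */
        for (int b = 0; b < 1; b++)
            for (HashTuple *t = buckets[b]; t != NULL; t = t->next)
                if (!t->matched)
                    printf("unmatched inner tuple: key %d\n", t->key);
        return 0;
    }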

◆ ExecShutdownHash()

void ExecShutdownHash ( HashState node)

Definition at line 2796 of file nodeHash.c.

2797 {
2798  /* Allocate save space if EXPLAIN'ing and we didn't do so already */
2799  if (node->ps.instrument && !node->hinstrument)
2800  node->hinstrument = palloc0_object(HashInstrumentation);
2801  /* Now accumulate data for the current (final) hash table */
2802  if (node->hinstrument && node->hashtable)
2803  ExecHashAccumInstrumentation(node->hinstrument, node->hashtable);
2804 }
#define palloc0_object(type)
Definition: fe_memutils.h:63
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
Definition: nodeHash.c:2842

References ExecHashAccumInstrumentation(), HashState::hashtable, HashState::hinstrument, PlanState::instrument, palloc0_object, and HashState::ps.

Referenced by ExecShutdownNode_walker().

◆ get_hash_memory_limit()

size_t get_hash_memory_limit ( void  )

Definition at line 3587 of file nodeHash.c.

3588 {
3589  double mem_limit;
3590 
3591  /* Do initial calculation in double arithmetic */
3592  mem_limit = (double) work_mem * hash_mem_multiplier * 1024.0;
3593 
3594  /* Clamp in case it doesn't fit in size_t */
3595  mem_limit = Min(mem_limit, (double) SIZE_MAX);
3596 
3597  return (size_t) mem_limit;
3598 }
double hash_mem_multiplier
Definition: globals.c:126
int work_mem
Definition: globals.c:125

References hash_mem_multiplier, Min, and work_mem.

Referenced by BuildTupleHashTableExt(), choose_hashed_setop(), consider_groupingsets_paths(), cost_memoize_rescan(), create_unique_path(), ExecChooseHashTableSize(), ExecInitMemoize(), ExecParallelHashIncreaseNumBatches(), final_cost_hashjoin(), hash_agg_set_limits(), hash_choose_num_partitions(), subpath_is_hashable(), and subplan_is_hashable().
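
The limit is just work_mem (a kilobyte count) times hash_mem_multiplier, computed in double arithmetic so the product cannot overflow before it is clamped to size_t. A worked example with the default settings (work_mem = 4096 kB, hash_mem_multiplier = 2.0), yielding an 8 MB hash table budget:

    #include <stdint.h>
    #include <stdio.h>

    int
    main(void)
    {
        int    work_mem = 4096;             /* kB; the default GUC value */
        double hash_mem_multiplier = 2.0;   /* the default setting */
        double mem_limit = (double) work_mem * hash_mem_multiplier * 1024.0;

        /* Clamp in case the product doesn't fit in size_t, as the code does. */
        if (mem_limit > (double) SIZE_MAX)
            mem_limit = (double) SIZE_MAX;

        printf("hash memory limit: %.0f bytes (%.0f MB)\n",
               mem_limit, mem_limit / (1024.0 * 1024.0));
        return 0;
    }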

◆ MultiExecHash()

Node* MultiExecHash ( HashState node)

Definition at line 106 of file nodeHash.c.

107 {
108  /* must provide our own instrumentation support */
109  if (node->ps.instrument)
110  InstrStartNode(node->ps.instrument);
111 
112  if (node->parallel_state != NULL)
113  MultiExecParallelHash(node);
114  else
115  MultiExecPrivateHash(node);
116 
117  /* must provide our own instrumentation support */
118  if (node->ps.instrument)
119  InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);
120 
121  /*
122  * We do not return the hash table directly because it's not a subtype of
123  * Node, and so would violate the MultiExecProcNode API. Instead, our
124  * parent Hashjoin node is expected to know how to fish it out of our node
125  * state. Ugly but not really worth cleaning up, since Hashjoin knows
126  * quite a bit more about Hash besides that.
127  */
128  return NULL;
129 }
void InstrStartNode(Instrumentation *instr)
Definition: instrument.c:68
void InstrStopNode(Instrumentation *instr, double nTuples)
Definition: instrument.c:84
static void MultiExecParallelHash(HashState *node)
Definition: nodeHash.c:215
static void MultiExecPrivateHash(HashState *node)
Definition: nodeHash.c:139
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2681

References HashState::hashtable, InstrStartNode(), InstrStopNode(), PlanState::instrument, MultiExecParallelHash(), MultiExecPrivateHash(), HashState::parallel_state, HashJoinTableData::partialTuples, and HashState::ps.

Referenced by MultiExecProcNode().

◆ MultiExecParallelHash()

static void MultiExecParallelHash ( HashState node)
static

Definition at line 215 of file nodeHash.c.

216 {
217  ParallelHashJoinState *pstate;
218  PlanState *outerNode;
219  List *hashkeys;
220  HashJoinTable hashtable;
221  TupleTableSlot *slot;
222  ExprContext *econtext;
223  uint32 hashvalue;
224  Barrier *build_barrier;
225  int i;
226 
227  /*
228  * get state info from node
229  */
230  outerNode = outerPlanState(node);
231  hashtable = node->hashtable;
232 
233  /*
234  * set expression context
235  */
236  hashkeys = node->hashkeys;
237  econtext = node->ps.ps_ExprContext;
238 
239  /*
240  * Synchronize the parallel hash table build. At this stage we know that
241  * the shared hash table has been or is being set up by
242  * ExecHashTableCreate(), but we don't know if our peers have returned
243  * from there or are here in MultiExecParallelHash(), and if so how far
244  * through they are. To find out, we check the build_barrier phase then
245  * and jump to the right step in the build algorithm.
246  */
247  pstate = hashtable->parallel_state;
248  build_barrier = &pstate->build_barrier;
249  Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATE);
250  switch (BarrierPhase(build_barrier))
251  {
252  case PHJ_BUILD_ALLOCATE:
253 
254  /*
255  * Either I just allocated the initial hash table in
256  * ExecHashTableCreate(), or someone else is doing that. Either
257  * way, wait for everyone to arrive here so we can proceed.
258  */
259  BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATE);
260  /* Fall through. */
261 
262  case PHJ_BUILD_HASH_INNER:
263 
264  /*
265  * It's time to begin hashing, or if we just arrived here then
266  * hashing is already underway, so join in that effort. While
267  * hashing we have to be prepared to help increase the number of
268  * batches or buckets at any time, and if we arrived here when
269  * that was already underway we'll have to help complete that work
270  * immediately so that it's safe to access batches and buckets
271  * below.
272  */
273  if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
274  PHJ_GROW_BATCHES_ELECT)
275  ExecParallelHashIncreaseNumBatches(hashtable);
276  if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
277  PHJ_GROW_BUCKETS_ELECT)
278  ExecParallelHashIncreaseNumBuckets(hashtable);
279  ExecParallelHashEnsureBatchAccessors(hashtable);
280  ExecParallelHashTableSetCurrentBatch(hashtable, 0);
281  for (;;)
282  {
283  slot = ExecProcNode(outerNode);
284  if (TupIsNull(slot))
285  break;
286  econtext->ecxt_outertuple = slot;
287  if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
288  false, hashtable->keepNulls,
289  &hashvalue))
290  ExecParallelHashTableInsert(hashtable, slot, hashvalue);
291  hashtable->partialTuples++;
292  }
293 
294  /*
295  * Make sure that any tuples we wrote to disk are visible to
296  * others before anyone tries to load them.
297  */
298  for (i = 0; i < hashtable->nbatch; ++i)
299  sts_end_write(hashtable->batches[i].inner_tuples);
300 
301  /*
302  * Update shared counters. We need an accurate total tuple count
303  * to control the empty table optimization.
304  */
305  ExecParallelHashMergeCounters(hashtable);
306 
307  BarrierDetach(&pstate->grow_buckets_barrier);
308  BarrierDetach(&pstate->grow_batches_barrier);
309 
310  /*
311  * Wait for everyone to finish building and flushing files and
312  * counters.
313  */
314  if (BarrierArriveAndWait(build_barrier,
315  WAIT_EVENT_HASH_BUILD_HASH_INNER))
316  {
317  /*
318  * Elect one backend to disable any further growth. Batches
319  * are now fixed. While building them we made sure they'd fit
320  * in our memory budget when we load them back in later (or we
321  * tried to do that and gave up because we detected extreme
322  * skew).
323  */
324  pstate->growth = PHJ_GROWTH_DISABLED;
325  }
326  }
327 
328  /*
329  * We're not yet attached to a batch. We all agree on the dimensions and
330  * number of inner tuples (for the empty table optimization).
331  */
332  hashtable->curbatch = -1;
333  hashtable->nbuckets = pstate->nbuckets;
334  hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
335  hashtable->totalTuples = pstate->total_tuples;
336 
337  /*
338  * Unless we're completely done and the batch state has been freed, make
339  * sure we have accessors.
340  */
341  if (BarrierPhase(build_barrier) < PHJ_BUILD_FREE)
342  ExecParallelHashEnsureBatchAccessors(hashtable);
343 
344  /*
345  * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
346  * case, which will bring the build phase to PHJ_BUILD_RUN (if it isn't
347  * there already).
348  */
349  Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
350  BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
351  BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
352 }
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:268
#define PHJ_BUILD_HASH_OUTER
Definition: hashjoin.h:272
#define PHJ_BUILD_ALLOCATE
Definition: hashjoin.h:270
void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
Definition: nodeHash.c:1706
bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
Definition: nodeHash.c:1816
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:253
Definition: pg_list.h:54
ExprContext * ps_ExprContext
Definition: execnodes.h:1076
#define TupIsNull(slot)
Definition: tuptable.h:299

References Assert(), BarrierArriveAndWait(), BarrierAttach(), BarrierDetach(), BarrierPhase(), HashJoinTableData::batches, ParallelHashJoinState::build_barrier, HashJoinTableData::curbatch, ExprContext::ecxt_outertuple, ExecHashGetHashValue(), ExecParallelHashEnsureBatchAccessors(), ExecParallelHashIncreaseNumBatches(), ExecParallelHashIncreaseNumBuckets(), ExecParallelHashMergeCounters(), ExecParallelHashTableInsert(), ExecParallelHashTableSetCurrentBatch(), ExecProcNode(), ParallelHashJoinState::grow_batches_barrier, ParallelHashJoinState::grow_buckets_barrier, ParallelHashJoinState::growth, HashState::hashkeys, HashState::hashtable, i, ParallelHashJoinBatchAccessor::inner_tuples, HashJoinTableData::keepNulls, HashJoinTableData::log2_nbuckets, my_log2(), HashJoinTableData::nbatch, ParallelHashJoinState::nbuckets, HashJoinTableData::nbuckets, outerPlanState, HashJoinTableData::parallel_state, HashJoinTableData::partialTuples, PHJ_BUILD_ALLOCATE, PHJ_BUILD_FREE, PHJ_BUILD_HASH_INNER, PHJ_BUILD_HASH_OUTER, PHJ_BUILD_RUN, PHJ_GROW_BATCHES_ELECT, PHJ_GROW_BATCHES_PHASE, PHJ_GROW_BUCKETS_ELECT, PHJ_GROW_BUCKETS_PHASE, PHJ_GROWTH_DISABLED, HashState::ps, PlanState::ps_ExprContext, sts_end_write(), ParallelHashJoinState::total_tuples, HashJoinTableData::totalTuples, and TupIsNull.

Referenced by MultiExecHash().
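
BarrierArriveAndWait() returns true in exactly one of the attached participants, which the code above uses to elect a single backend to set growth to PHJ_GROWTH_DISABLED. POSIX barriers offer the same idiom: pthread_barrier_wait() returns PTHREAD_BARRIER_SERIAL_THREAD in exactly one thread. A standalone sketch of that election (compile with -pthread; the worker body is illustrative):

    #include <pthread.h>
    #include <stdio.h>

    #define NWORKERS 3

    static pthread_barrier_t build_barrier;
    static int growth_disabled = 0;     /* stands in for pstate->growth */

    static void *
    worker(void *arg)
    {
        (void) arg;
        /* ... each worker would hash its share of the inner relation ... */

        /* Exactly one thread is "elected" by the barrier. */
        if (pthread_barrier_wait(&build_barrier) == PTHREAD_BARRIER_SERIAL_THREAD)
        {
            growth_disabled = 1;
            puts("one worker elected: growth disabled");
        }
        return NULL;
    }

    int
    main(void)
    {
        pthread_t threads[NWORKERS];

        pthread_barrier_init(&build_barrier, NULL, NWORKERS);
        for (int i = 0; i < NWORKERS; i++)
            pthread_create(&threads[i], NULL, worker, NULL);
        for (int i = 0; i < NWORKERS; i++)
            pthread_join(threads[i], NULL);
        pthread_barrier_destroy(&build_barrier);
        return growth_disabled ? 0 : 1;
    }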

◆ MultiExecPrivateHash()

static void MultiExecPrivateHash ( HashState node)
static

Definition at line 139 of file nodeHash.c.

140 {
141  PlanState *outerNode;
142  List *hashkeys;
143  HashJoinTable hashtable;
144  TupleTableSlot *slot;
145  ExprContext *econtext;
146  uint32 hashvalue;
147 
148  /*
149  * get state info from node
150  */
151  outerNode = outerPlanState(node);
152  hashtable = node->hashtable;
153 
154  /*
155  * set expression context
156  */
157  hashkeys = node->hashkeys;
158  econtext = node->ps.ps_ExprContext;
159 
160  /*
161  * Get all tuples from the node below the Hash node and insert into the
162  * hash table (or temp files).
163  */
164  for (;;)
165  {
166  slot = ExecProcNode(outerNode);
167  if (TupIsNull(slot))
168  break;
169  /* We have to compute the hash value */
170  econtext->ecxt_outertuple = slot;
171  if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
172  false, hashtable->keepNulls,
173  &hashvalue))
174  {
175  int bucketNumber;
176 
177  bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
178  if (bucketNumber != INVALID_SKEW_BUCKET_NO)
179  {
180  /* It's a skew tuple, so put it into that hash table */
181  ExecHashSkewTableInsert(hashtable, slot, hashvalue,
182  bucketNumber);
183  hashtable->skewTuples += 1;
184  }
185  else
186  {
187  /* Not subject to skew optimization, so insert normally */
188  ExecHashTableInsert(hashtable, slot, hashvalue);
189  }
190  hashtable->totalTuples += 1;
191  }
192  }
193 
194  /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
195  if (hashtable->nbuckets != hashtable->nbuckets_optimal)
196  ExecHashIncreaseNumBuckets(hashtable);
197 
198  /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
199  hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
200  if (hashtable->spaceUsed > hashtable->spacePeak)
201  hashtable->spacePeak = hashtable->spaceUsed;
202 
203  hashtable->partialTuples = hashtable->totalTuples;
204 }
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
Definition: nodeHash.c:1454
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
Definition: nodeHash.c:2520
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
Definition: nodeHash.c:2566
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
Definition: nodeHash.c:1616

References ExprContext::ecxt_outertuple, ExecHashGetHashValue(), ExecHashGetSkewBucket(), ExecHashIncreaseNumBuckets(), ExecHashSkewTableInsert(), ExecHashTableInsert(), ExecProcNode(), HashState::hashkeys, HashState::hashtable, INVALID_SKEW_BUCKET_NO, HashJoinTableData::keepNulls, HashJoinTableData::nbuckets, HashJoinTableData::nbuckets_optimal, outerPlanState, HashJoinTableData::partialTuples, HashState::ps, PlanState::ps_ExprContext, HashJoinTableData::skewTuples, HashJoinTableData::spacePeak, HashJoinTableData::spaceUsed, HashJoinTableData::totalTuples, and TupIsNull.

Referenced by MultiExecHash().
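
The resize check near the end follows the NTUP_PER_BUCKET target: while tuples are inserted, the optimal bucket count is doubled whenever the average chain length would exceed one tuple per bucket, and the table is rebucketed once, after the build, if that count moved away from the allocated one. A small sketch of how nbuckets_optimal tracks the tuple count (the initial size and tuple count are illustrative):

    #include <stdio.h>

    #define NTUP_PER_BUCKET 1           /* target load factor, as in nodeHash.c */

    int
    main(void)
    {
        int  nbuckets = 1024;           /* initial size from planner estimates */
        int  nbuckets_optimal = nbuckets;
        long ntuples = 0;

        /* Simulate inserting tuples the way the build loop counts them. */
        for (int i = 0; i < 5000; i++)
        {
            ntuples++;
            while (ntuples > (long) nbuckets_optimal * NTUP_PER_BUCKET)
                nbuckets_optimal *= 2;  /* remember, but don't rebuild yet */
        }

        if (nbuckets != nbuckets_optimal)
            printf("rebuild once at end: %d -> %d buckets\n",
                   nbuckets, nbuckets_optimal);
        return 0;
    }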