PostgreSQL Source Code  git master
hashjoin.h
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * hashjoin.h
4  * internal structures for hash joins
5  *
6  *
7  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * src/include/executor/hashjoin.h
11  *
12  *-------------------------------------------------------------------------
13  */
14 #ifndef HASHJOIN_H
15 #define HASHJOIN_H
16 
17 #include "nodes/execnodes.h"
18 #include "port/atomics.h"
19 #include "storage/barrier.h"
20 #include "storage/buffile.h"
21 #include "storage/lwlock.h"
22 
23 /* ----------------------------------------------------------------
24  * hash-join hash table structures
25  *
26  * Each active hashjoin has a HashJoinTable structure, which is
27  * palloc'd in the executor's per-query context. Other storage needed for
28  * each hashjoin is kept in child contexts, three for each hashjoin:
29  * - HashTableContext (hashCxt): the parent hash table storage context
30  * - HashSpillContext (spillCxt): storage for temp file buffers
31  * - HashBatchContext (batchCxt): storage for a batch in serial hash join
32  *
33  * The hashtable contexts are made children of the per-query context, ensuring
34  * that they will be discarded at end of statement even if the join is
35  * aborted early by an error. (Likewise, any temporary files we make will
36  * be cleaned up by the virtual file manager in event of an error.)
37  *
38  * Storage that should live through the entire join is allocated from the
39  * "hashCxt" (mainly the hashtable's metadata). Also, the "hashCxt" context is
40  * the parent of "spillCxt" and "batchCxt". It makes it easy and fast to
41  * release the storage when we don't need it anymore.
42  *
43  * Data associated with temp files is allocated in the "spillCxt" context
44  * which lives for the duration of the entire join as batch files'
45  * creation and usage may span batch execution. These files are
46  * explicitly destroyed by calling BufFileClose() when the code is done
47  * with them. The aim of this context is to help accounting for the
48  * memory allocated for temp files and their buffers.
49  *
50  * Finally, data used only during a single batch's execution is allocated
51  * in the "batchCxt". By resetting the batchCxt at the end of each batch,
52  * we free all the per-batch storage reliably and without tedium.
53  *
54  * During first scan of inner relation, we get its tuples from executor.
55  * If nbatch > 1 then tuples that don't belong in first batch get saved
56  * into inner-batch temp files. The same statements apply for the
57  * first scan of the outer relation, except we write tuples to outer-batch
58  * temp files. After finishing the first scan, we do the following for
59  * each remaining batch:
60  * 1. Read tuples from inner batch file, load into hash buckets.
61  * 2. Read tuples from outer batch file, match to hash buckets and output.
62  *
63  * It is possible to increase nbatch on the fly if the in-memory hash table
64  * gets too big. The hash-value-to-batch computation is arranged so that this
65  * can only cause a tuple to go into a later batch than previously thought,
66  * never into an earlier batch. When we increase nbatch, we rescan the hash
67  * table and dump out any tuples that are now of a later batch to the correct
68  * inner batch file. Subsequently, while reading either inner or outer batch
69  * files, we might find tuples that no longer belong to the current batch;
70  * if so, we just dump them out to the correct batch file.
71  * ----------------------------------------------------------------
72  */
73 
74 /* these are in nodes/execnodes.h: */
75 /* typedef struct HashJoinTupleData *HashJoinTuple; */
76 /* typedef struct HashJoinTableData *HashJoinTable; */
77 
78 typedef struct HashJoinTupleData
79 {
80  /* link to next tuple in same bucket */
81  union
82  {
85  } next;
86  uint32 hashvalue; /* tuple's hash code */
87  /* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
89 
/* Size of the HashJoinTupleData header, rounded up with MAXALIGN. */
90 #define HJTUPLE_OVERHEAD MAXALIGN(sizeof(HashJoinTupleData))
/*
 * Address of the MinimalTuple that is stored HJTUPLE_OVERHEAD bytes past the
 * start of the HashJoinTupleData pointed to by "hjtup".
 */
91 #define HJTUPLE_MINTUPLE(hjtup) \
92  ((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
93 
94 /*
95  * If the outer relation's distribution is sufficiently nonuniform, we attempt
96  * to optimize the join by treating the hash values corresponding to the outer
97  * relation's MCVs specially. Inner relation tuples matching these hash
98  * values go into the "skew" hashtable instead of the main hashtable, and
99  * outer relation tuples with these hash values are matched against that
100  * table instead of the main one. Thus, tuples with these hash values are
101  * effectively handled as part of the first batch and will never go to disk.
102  * The skew hashtable is limited to SKEW_HASH_MEM_PERCENT of the total memory
103  * allowed for the join; while building the hashtables, we decrease the number
104  * of MCVs being specially treated if needed to stay under this limit.
105  *
106  * Note: you might wonder why we look at the outer relation stats for this,
107  * rather than the inner. One reason is that the outer relation is typically
108  * bigger, so we get more I/O savings by optimizing for its most common values.
109  * Also, for similarly-sized relations, the planner prefers to put the more
110  * uniformly distributed relation on the inside, so we're more likely to find
111  * interesting skew in the outer relation.
112  */
113 typedef struct HashSkewBucket
114 {
115  uint32 hashvalue; /* common hash value */
116  HashJoinTuple tuples; /* linked list of inner-relation tuples */
118 
/* Size of a HashSkewBucket, rounded up with MAXALIGN. */
119 #define SKEW_BUCKET_OVERHEAD MAXALIGN(sizeof(HashSkewBucket))
/* NOTE(review): presumably the "no skew bucket" sentinel; usage is not shown in this header -- confirm. */
120 #define INVALID_SKEW_BUCKET_NO (-1)
/* Percentage of the join's total memory allowance that the skew hashtable may use. */
121 #define SKEW_HASH_MEM_PERCENT 2
/* NOTE(review): usage is not shown in this header; presumably a minimum fraction of the outer relation -- confirm. */
122 #define SKEW_MIN_OUTER_FRACTION 0.01
123 
124 /*
125  * To reduce palloc overhead, the HashJoinTuples for the current batch are
126  * packed in 32kB buffers instead of pallocing each tuple individually.
127  */
128 typedef struct HashMemoryChunkData
129 {
130  int ntuples; /* number of tuples stored in this chunk */
131  size_t maxlen; /* size of the chunk's tuple buffer */
132  size_t used; /* number of buffer bytes already used */
133 
134  /* pointer to the next chunk (linked list) */
135  union
136  {
139  } next;
140 
141  /*
142  * The chunk's tuple buffer starts after the HashMemoryChunkData struct,
143  * at offset HASH_CHUNK_HEADER_SIZE (which must be maxaligned). Note that
144  * that offset is not included in "maxlen" or "used".
145  */
147 
149 
/* Chunk size in bytes (32kB). */
150 #define HASH_CHUNK_SIZE (32 * 1024L)
/* Size of the HashMemoryChunkData header, rounded up with MAXALIGN. */
151 #define HASH_CHUNK_HEADER_SIZE MAXALIGN(sizeof(HashMemoryChunkData))
/* Start of the tuple buffer, HASH_CHUNK_HEADER_SIZE bytes past the chunk "hc". */
152 #define HASH_CHUNK_DATA(hc) (((char *) (hc)) + HASH_CHUNK_HEADER_SIZE)
153 /*
 * Tuples exceeding HASH_CHUNK_THRESHOLD bytes are put in their own chunk.
 * The threshold is one quarter of HASH_CHUNK_SIZE.
 */
154 #define HASH_CHUNK_THRESHOLD (HASH_CHUNK_SIZE / 4)
155 
156 /*
157  * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
158  * object in shared memory to coordinate access to it. Since they are
159  * followed by variable-sized objects, they are arranged in contiguous memory
160  * but not accessed directly as an array.
161  */
162 typedef struct ParallelHashJoinBatch
163 {
164  dsa_pointer buckets; /* array of hash table buckets */
165  Barrier batch_barrier; /* synchronization for joining this batch */
166 
167  dsa_pointer chunks; /* chunks of tuples loaded */
168  size_t size; /* size of buckets + chunks in memory */
169  size_t estimated_size; /* size of buckets + chunks while writing */
170  size_t ntuples; /* number of tuples loaded */
171  size_t old_ntuples; /* number of tuples before repartitioning */
173  bool skip_unmatched; /* whether to abandon unmatched scan */
174 
175  /*
176  * Variable-sized SharedTuplestore objects follow this struct in memory.
177  * See the accessor macros below.
178  */
180 
181 /*
 * Accessor for inner batch tuplestore following a ParallelHashJoinBatch.
 *
 * The result is the address MAXALIGN(sizeof(ParallelHashJoinBatch)) bytes
 * past the start of "batch", cast to SharedTuplestore *.
 */
182 #define ParallelHashJoinBatchInner(batch) \
183  ((SharedTuplestore *) \
184  ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))
185 
186 /*
 * Accessor for outer batch tuplestore following a ParallelHashJoinBatch.
 *
 * The result is the address MAXALIGN(sts_estimate(nparticipants)) bytes past
 * the inner tuplestore returned by ParallelHashJoinBatchInner(batch), so
 * "nparticipants" must be the value that was used to size the inner
 * tuplestore.
 */
187 #define ParallelHashJoinBatchOuter(batch, nparticipants) \
188  ((SharedTuplestore *) \
189  ((char *) ParallelHashJoinBatchInner(batch) + \
190  MAXALIGN(sts_estimate(nparticipants))))
191 
192 /*
 * Total size of a ParallelHashJoinBatch and tuplestores.
 *
 * This is the maxaligned struct size plus two maxaligned tuplestore
 * estimates, one each for the inner and outer tuplestores that follow the
 * struct.  The participant count is read from
 * (hashtable)->parallel_state->nparticipants.
 */
193 #define EstimateParallelHashJoinBatch(hashtable) \
194  (MAXALIGN(sizeof(ParallelHashJoinBatch)) + \
195  MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)
196 
197 /*
 * Accessor for the nth ParallelHashJoinBatch given the base.
 *
 * The batches are not a plain array: each one occupies
 * EstimateParallelHashJoinBatch(hashtable) bytes, so the nth batch is found
 * by byte arithmetic from "base".
 *
 * NOTE: the expansion references a variable named "hashtable" that is NOT a
 * macro parameter.  A suitable "hashtable" must therefore be in scope at
 * every place this macro is used.
 */
198 #define NthParallelHashJoinBatch(base, n) \
199  ((ParallelHashJoinBatch *) \
200  ((char *) (base) + \
201  EstimateParallelHashJoinBatch(hashtable) * (n)))
202 
203 /*
204  * Each backend requires a small amount of per-batch state to interact with
205  * each ParallelHashJoinBatch.
206  */
208 {
209  ParallelHashJoinBatch *shared; /* pointer to shared state */
210 
211  /* Per-backend partial counters to reduce contention. */
212  size_t preallocated; /* pre-allocated space for this backend */
213  size_t ntuples; /* number of tuples */
214  size_t size; /* size of partition in memory */
215  size_t estimated_size; /* size of partition on disk */
216  size_t old_ntuples; /* how many tuples before repartitioning? */
217  bool at_least_one_chunk; /* has this backend allocated a chunk? */
218  bool outer_eof; /* has this process hit end of batch? */
219  bool done; /* flag to remember that a batch is done */
223 
224 /*
225  * While hashing the inner relation, any participant might determine that it's
226  * time to increase the number of buckets to reduce the load factor or batches
227  * to reduce the memory size. This is indicated by setting the growth flag to
228  * these values.
229  */
230 typedef enum ParallelHashGrowth
231 {
232  /* The current dimensions are sufficient. */
234  /* The load factor is too high, so we need to add buckets. */
236  /* The memory budget would be exhausted, so we need to repartition. */
238  /* Repartitioning didn't help last time, so don't try to do that again. */
241 
242 /*
243  * The shared state used to coordinate a Parallel Hash Join. This is stored
244  * in the DSM segment.
245  */
246 typedef struct ParallelHashJoinState
247 {
248  dsa_pointer batches; /* array of ParallelHashJoinBatch */
249  dsa_pointer old_batches; /* previous generation during repartition */
250  int nbatch; /* number of batches now */
251  int old_nbatch; /* previous number of batches */
252  int nbuckets; /* number of buckets */
253  ParallelHashGrowth growth; /* control batch/bucket growth */
254  dsa_pointer chunk_work_queue; /* chunk work queue */
257  size_t total_tuples; /* total number of inner tuples */
258  LWLock lock; /* lock protecting the above */
259 
260  Barrier build_barrier; /* synchronization for the build phases */
263  pg_atomic_uint32 distributor; /* counter for load balancing */
264 
265  SharedFileSet fileset; /* space for shared temporary files */
267 
268 /*
 * The phases for building batches, used by build_barrier.
 * Phases are numbered consecutively from 0 in the order listed.
 */
269 #define PHJ_BUILD_ELECT 0
270 #define PHJ_BUILD_ALLOCATE 1
271 #define PHJ_BUILD_HASH_INNER 2
272 #define PHJ_BUILD_HASH_OUTER 3
273 #define PHJ_BUILD_RUN 4
274 #define PHJ_BUILD_FREE 5
275 
276 /*
 * The phases for probing each batch, used by batch_barrier.
 * Phases are numbered consecutively from 0 in the order listed.
 */
277 #define PHJ_BATCH_ELECT 0
278 #define PHJ_BATCH_ALLOCATE 1
279 #define PHJ_BATCH_LOAD 2
280 #define PHJ_BATCH_PROBE 3
281 #define PHJ_BATCH_SCAN 4
282 #define PHJ_BATCH_FREE 5
283 
284 /*
 * The phases of batch growth while hashing, for grow_batches_barrier.
 *
 * The phases are circular: PHJ_GROW_BATCHES_PHASE() reduces a phase number
 * modulo 5, the number of phases defined here, so the modulus must be kept
 * in step with this list if phases are added or removed.
 */
285 #define PHJ_GROW_BATCHES_ELECT 0
286 #define PHJ_GROW_BATCHES_REALLOCATE 1
287 #define PHJ_GROW_BATCHES_REPARTITION 2
288 #define PHJ_GROW_BATCHES_DECIDE 3
289 #define PHJ_GROW_BATCHES_FINISH 4
290 #define PHJ_GROW_BATCHES_PHASE(n) ((n) % 5) /* circular phases */
291 
292 /*
 * The phases of bucket growth while hashing, for grow_buckets_barrier.
 *
 * The phases are circular: PHJ_GROW_BUCKETS_PHASE() reduces a phase number
 * modulo 3, the number of phases defined here, so the modulus must be kept
 * in step with this list if phases are added or removed.
 */
293 #define PHJ_GROW_BUCKETS_ELECT 0
294 #define PHJ_GROW_BUCKETS_REALLOCATE 1
295 #define PHJ_GROW_BUCKETS_REINSERT 2
296 #define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */
297 
298 typedef struct HashJoinTableData
299 {
300  int nbuckets; /* # buckets in the in-memory hash table */
301  int log2_nbuckets; /* its log2 (nbuckets must be a power of 2) */
302 
303  int nbuckets_original; /* # buckets when starting the first hash */
304  int nbuckets_optimal; /* optimal # buckets (per batch) */
305  int log2_nbuckets_optimal; /* log2(nbuckets_optimal) */
306 
307  /* buckets[i] is head of list of tuples in i'th in-memory bucket */
308  union
309  {
310  /* unshared array is per-batch storage, as are all the tuples */
312  /* shared array is per-query DSA area, as are all the tuples */
315 
316  bool keepNulls; /* true to store unmatchable NULL tuples */
317 
318  bool skewEnabled; /* are we using skew optimization? */
319  HashSkewBucket **skewBucket; /* hashtable of skew buckets */
320  int skewBucketLen; /* size of skewBucket array (a power of 2!) */
321  int nSkewBuckets; /* number of active skew buckets */
322  int *skewBucketNums; /* array indexes of active skew buckets */
323 
324  int nbatch; /* number of batches */
325  int curbatch; /* current batch #; 0 during 1st pass */
326 
327  int nbatch_original; /* nbatch when we started inner scan */
328  int nbatch_outstart; /* nbatch when we started outer scan */
329 
330  bool growEnabled; /* flag to shut off nbatch increases */
331 
332  double totalTuples; /* # tuples obtained from inner plan */
333  double partialTuples; /* # tuples obtained from inner plan by me */
334  double skewTuples; /* # tuples inserted into skew tuples */
335 
336  /*
337  * These arrays are allocated for the life of the hash join, but only if
338  * nbatch > 1. A file is opened only when we first write a tuple into it
339  * (otherwise its pointer remains NULL). Note that the zero'th array
340  * elements never get used, since we will process rather than dump out any
341  * tuples of batch zero.
342  */
343  BufFile **innerBatchFile; /* buffered virtual temp file per batch */
344  BufFile **outerBatchFile; /* buffered virtual temp file per batch */
345 
346  /*
347  * Info about the datatype-specific hash functions for the datatypes being
348  * hashed. These are arrays of the same length as the number of hash join
349  * clauses (hash keys).
350  */
351  FmgrInfo *outer_hashfunctions; /* lookup data for hash functions */
352  FmgrInfo *inner_hashfunctions; /* lookup data for hash functions */
353  bool *hashStrict; /* is each hash join operator strict? */
355 
356  Size spaceUsed; /* memory space currently used by tuples */
357  Size spaceAllowed; /* upper limit for space used */
358  Size spacePeak; /* peak space used */
359  Size spaceUsedSkew; /* skew hash table's current space usage */
360  Size spaceAllowedSkew; /* upper limit for skew hashtable */
361 
362  MemoryContext hashCxt; /* context for whole-hash-join storage */
363  MemoryContext batchCxt; /* context for this-batch-only storage */
364  MemoryContext spillCxt; /* context for spilling to temp files */
365 
366  /* used for dense allocation of tuples (into linked chunks) */
367  HashMemoryChunk chunks; /* one list for the whole batch */
368 
369  /* Shared and private state for Parallel Hash. */
370  HashMemoryChunk current_chunk; /* this backend's current chunk */
371  dsa_area *area; /* DSA area to allocate memory from */
376 
377 #endif /* HASHJOIN_H */
unsigned int uint32
Definition: c.h:506
size_t Size
Definition: c.h:605
uint64 dsa_pointer
Definition: dsa.h:62
struct HashMemoryChunkData * HashMemoryChunk
Definition: hashjoin.h:148
struct HashJoinTupleData HashJoinTupleData
struct ParallelHashJoinBatch ParallelHashJoinBatch
struct HashSkewBucket HashSkewBucket
struct ParallelHashJoinBatchAccessor ParallelHashJoinBatchAccessor
struct HashJoinTableData HashJoinTableData
struct ParallelHashJoinState ParallelHashJoinState
ParallelHashGrowth
Definition: hashjoin.h:231
@ PHJ_GROWTH_NEED_MORE_BUCKETS
Definition: hashjoin.h:235
@ PHJ_GROWTH_OK
Definition: hashjoin.h:233
@ PHJ_GROWTH_NEED_MORE_BATCHES
Definition: hashjoin.h:237
@ PHJ_GROWTH_DISABLED
Definition: hashjoin.h:239
struct HashMemoryChunkData HashMemoryChunkData
unsigned int Oid
Definition: postgres_ext.h:31
Definition: fmgr.h:57
struct HashJoinTupleData ** unshared
Definition: hashjoin.h:311
FmgrInfo * outer_hashfunctions
Definition: hashjoin.h:351
HashMemoryChunk chunks
Definition: hashjoin.h:367
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:373
MemoryContext hashCxt
Definition: hashjoin.h:362
double totalTuples
Definition: hashjoin.h:332
double partialTuples
Definition: hashjoin.h:333
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:372
MemoryContext spillCxt
Definition: hashjoin.h:364
HashMemoryChunk current_chunk
Definition: hashjoin.h:370
bool * hashStrict
Definition: hashjoin.h:353
Size spaceAllowedSkew
Definition: hashjoin.h:360
int * skewBucketNums
Definition: hashjoin.h:322
BufFile ** innerBatchFile
Definition: hashjoin.h:343
int log2_nbuckets_optimal
Definition: hashjoin.h:305
dsa_pointer_atomic * shared
Definition: hashjoin.h:313
union HashJoinTableData::@104 buckets
dsa_area * area
Definition: hashjoin.h:371
BufFile ** outerBatchFile
Definition: hashjoin.h:344
FmgrInfo * inner_hashfunctions
Definition: hashjoin.h:352
dsa_pointer current_chunk_shared
Definition: hashjoin.h:374
MemoryContext batchCxt
Definition: hashjoin.h:363
double skewTuples
Definition: hashjoin.h:334
HashSkewBucket ** skewBucket
Definition: hashjoin.h:319
dsa_pointer shared
Definition: hashjoin.h:84
union HashJoinTupleData::@102 next
uint32 hashvalue
Definition: hashjoin.h:86
struct HashJoinTupleData * unshared
Definition: hashjoin.h:83
struct HashMemoryChunkData * unshared
Definition: hashjoin.h:137
dsa_pointer shared
Definition: hashjoin.h:138
union HashMemoryChunkData::@103 next
HashJoinTuple tuples
Definition: hashjoin.h:116
uint32 hashvalue
Definition: hashjoin.h:115
Definition: lwlock.h:42
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:221
ParallelHashJoinBatch * shared
Definition: hashjoin.h:209
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:220
dsa_pointer chunks
Definition: hashjoin.h:167
dsa_pointer buckets
Definition: hashjoin.h:164
Barrier grow_batches_barrier
Definition: hashjoin.h:261
dsa_pointer old_batches
Definition: hashjoin.h:249
dsa_pointer chunk_work_queue
Definition: hashjoin.h:254
Barrier grow_buckets_barrier
Definition: hashjoin.h:262
ParallelHashGrowth growth
Definition: hashjoin.h:253
pg_atomic_uint32 distributor
Definition: hashjoin.h:263
SharedFileSet fileset
Definition: hashjoin.h:265
dsa_pointer batches
Definition: hashjoin.h:248
Definition: dsa.c:348