PostgreSQL Source Code git master
Loading...
Searching...
No Matches
hashjoin.h
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * hashjoin.h
4 * internal structures for hash joins
5 *
6 *
7 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * src/include/executor/hashjoin.h
11 *
12 *-------------------------------------------------------------------------
13 */
14#ifndef HASHJOIN_H
15#define HASHJOIN_H
16
17#include "nodes/execnodes.h"
18#include "port/atomics.h"
19#include "storage/barrier.h"
20#include "storage/buffile.h"
21#include "storage/lwlock.h"
22#include "utils/dsa.h"
24
25/* ----------------------------------------------------------------
26 * hash-join hash table structures
27 *
28 * Each active hashjoin has a HashJoinTable structure, which is
29 * palloc'd in the executor's per-query context. Other storage needed for
30 * each hashjoin is kept in child contexts, three for each hashjoin:
31 * - HashTableContext (hashCxt): the parent hash table storage context
32 * - HashSpillContext (spillCxt): storage for temp file buffers
33 * - HashBatchContext (batchCxt): storage for a batch in serial hash join
34 *
35 * The hashtable contexts are made children of the per-query context, ensuring
36 * that they will be discarded at end of statement even if the join is
37 * aborted early by an error. (Likewise, any temporary files we make will
38 * be cleaned up by the virtual file manager in event of an error.)
39 *
40 * Storage that should live through the entire join is allocated from the
41 * "hashCxt" (mainly the hashtable's metadata). Also, the "hashCxt" context is
42 * the parent of "spillCxt" and "batchCxt". This makes it easy and fast to
43 * release the storage when we don't need it anymore.
44 *
45 * Data associated with temp files is allocated in the "spillCxt" context
46 * which lives for the duration of the entire join as batch files'
47 * creation and usage may span batch execution. These files are
48 * explicitly destroyed by calling BufFileClose() when the code is done
49 * with them. The aim of this context is to help accounting for the
50 * memory allocated for temp files and their buffers.
51 *
52 * Finally, data used only during a single batch's execution is allocated
53 * in the "batchCxt". By resetting the batchCxt at the end of each batch,
54 * we free all the per-batch storage reliably and without tedium.
55 *
56 * During first scan of inner relation, we get its tuples from executor.
57 * If nbatch > 1 then tuples that don't belong in first batch get saved
58 * into inner-batch temp files. The same statements apply for the
59 * first scan of the outer relation, except we write tuples to outer-batch
60 * temp files. After finishing the first scan, we do the following for
61 * each remaining batch:
62 * 1. Read tuples from inner batch file, load into hash buckets.
63 * 2. Read tuples from outer batch file, match to hash buckets and output.
64 *
65 * It is possible to increase nbatch on the fly if the in-memory hash table
66 * gets too big. The hash-value-to-batch computation is arranged so that this
67 * can only cause a tuple to go into a later batch than previously thought,
68 * never into an earlier batch. When we increase nbatch, we rescan the hash
69 * table and dump out any tuples that are now of a later batch to the correct
70 * inner batch file. Subsequently, while reading either inner or outer batch
71 * files, we might find tuples that no longer belong to the current batch;
72 * if so, we just dump them out to the correct batch file.
73 *
74 * If an input tuple has a null join key, then it cannot match anything from
75 * the other side of the join. Normally we can just discard such a tuple
76 * immediately, but if it comes from the outer side of an outer join then we
77 * must emit it with null-extension of the other side. For various reasons
78 * it's not convenient to do that immediately on seeing the tuple, so we dump
79 * the tuple into a tuplestore and emit it later. (In the unlikely but
80 * supported case of a non-strict join operator, we treat null keys as normal
81 * data.)
82 * ----------------------------------------------------------------
83 */
84
85/* these are in nodes/execnodes.h: */
86/* typedef struct HashJoinTupleData *HashJoinTuple; */
87/* typedef struct HashJoinTableData *HashJoinTable; */
88
89typedef struct HashJoinTupleData
90{
91 /* link to next tuple in same bucket */
92 union
93 {
97 uint32 hashvalue; /* tuple's hash code */
98 /* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
100
/*
 * HJTUPLE_OVERHEAD is the size of the HashJoinTupleData header, rounded up
 * with MAXALIGN. HJTUPLE_MINTUPLE(hjtup) yields the address that many bytes
 * past hjtup, cast to MinimalTuple.
 */
101#define HJTUPLE_OVERHEAD MAXALIGN(sizeof(HashJoinTupleData))
102#define HJTUPLE_MINTUPLE(hjtup) \
103 ((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
104
105/*
106 * If the outer relation's distribution is sufficiently nonuniform, we attempt
107 * to optimize the join by treating the hash values corresponding to the outer
108 * relation's MCVs specially. Inner relation tuples matching these hash
109 * values go into the "skew" hashtable instead of the main hashtable, and
110 * outer relation tuples with these hash values are matched against that
111 * table instead of the main one. Thus, tuples with these hash values are
112 * effectively handled as part of the first batch and will never go to disk.
113 * The skew hashtable is limited to SKEW_HASH_MEM_PERCENT of the total memory
114 * allowed for the join; while building the hashtables, we decrease the number
115 * of MCVs being specially treated if needed to stay under this limit.
116 *
117 * Note: you might wonder why we look at the outer relation stats for this,
118 * rather than the inner. One reason is that the outer relation is typically
119 * bigger, so we get more I/O savings by optimizing for its most common values.
120 * Also, for similarly-sized relations, the planner prefers to put the more
121 * uniformly distributed relation on the inside, so we're more likely to find
122 * interesting skew in the outer relation.
123 */
124typedef struct HashSkewBucket
125{
126 uint32 hashvalue; /* common hash value */
127 HashJoinTuple tuples; /* linked list of inner-relation tuples */
129
/* size of a HashSkewBucket, rounded up with MAXALIGN */
130#define SKEW_BUCKET_OVERHEAD MAXALIGN(sizeof(HashSkewBucket))
/* NOTE(review): presumably marks "no skew bucket"; confirm at the use sites */
131#define INVALID_SKEW_BUCKET_NO (-1)
/* percentage of the join's total allowed memory the skew hashtable may use */
132#define SKEW_HASH_MEM_PERCENT 2
/* NOTE(review): meaning is not shown in this header; confirm at the use sites */
133#define SKEW_MIN_OUTER_FRACTION 0.01
134
135/*
136 * To reduce palloc overhead, the HashJoinTuples for the current batch are
137 * packed in 32kB buffers instead of pallocing each tuple individually.
138 */
140{
141 int ntuples; /* number of tuples stored in this chunk */
142 size_t maxlen; /* size of the chunk's tuple buffer */
143 size_t used; /* number of buffer bytes already used */
144
145 /* pointer to the next chunk (linked list) */
146 union
147 {
151
152 /*
153 * The chunk's tuple buffer starts after the HashMemoryChunkData struct,
154 * at offset HASH_CHUNK_HEADER_SIZE (which must be maxaligned). Note that
155 * that offset is not included in "maxlen" or "used".
156 */
158
160
/*
 * Chunk geometry. HASH_CHUNK_SIZE is 32kB. HASH_CHUNK_HEADER_SIZE is the
 * size of HashMemoryChunkData rounded up with MAXALIGN, and
 * HASH_CHUNK_DATA(hc) is the address that many bytes past the start of
 * chunk "hc".
 */
161#define HASH_CHUNK_SIZE ((Size) (32 * 1024))
162#define HASH_CHUNK_HEADER_SIZE MAXALIGN(sizeof(HashMemoryChunkData))
163#define HASH_CHUNK_DATA(hc) (((char *) (hc)) + HASH_CHUNK_HEADER_SIZE)
164/*
 * Tuples exceeding HASH_CHUNK_THRESHOLD bytes (one quarter of
 * HASH_CHUNK_SIZE, i.e. 8kB) are put in their own chunk.
 */
165#define HASH_CHUNK_THRESHOLD (HASH_CHUNK_SIZE / 4)
166
167/*
168 * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
169 * object in shared memory to coordinate access to it. Since they are
170 * followed by variable-sized objects, they are arranged in contiguous memory
171 * but not accessed directly as an array.
172 */
174{
175 dsa_pointer buckets; /* array of hash table buckets */
176 Barrier batch_barrier; /* synchronization for joining this batch */
177
178 dsa_pointer chunks; /* chunks of tuples loaded */
179 size_t size; /* size of buckets + chunks in memory */
180 size_t estimated_size; /* size of buckets + chunks while writing */
181 size_t ntuples; /* number of tuples loaded */
182 size_t old_ntuples; /* number of tuples before repartitioning */
184 bool skip_unmatched; /* whether to abandon unmatched scan */
185
186 /*
187 * Variable-sized SharedTuplestore objects follow this struct in memory.
188 * See the accessor macros below.
189 */
191
192/*
 * Accessor for inner batch tuplestore following a ParallelHashJoinBatch.
 * The result is the address MAXALIGN(sizeof(ParallelHashJoinBatch)) bytes
 * past "batch".
 */
193#define ParallelHashJoinBatchInner(batch) \
194 ((SharedTuplestore *) \
195 ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))
196
197/*
 * Accessor for outer batch tuplestore following a ParallelHashJoinBatch.
 * The result is the address MAXALIGN(sts_estimate(nparticipants)) bytes past
 * the inner tuplestore.
 */
198#define ParallelHashJoinBatchOuter(batch, nparticipants) \
199 ((SharedTuplestore *) \
200 ((char *) ParallelHashJoinBatchInner(batch) + \
201 MAXALIGN(sts_estimate(nparticipants))))
202
203/*
 * Total size of a ParallelHashJoinBatch and tuplestores: the maxaligned
 * struct size plus two maxaligned sts_estimate() regions. The participant
 * count is read from hashtable->parallel_state->nparticipants.
 */
204#define EstimateParallelHashJoinBatch(hashtable) \
205 (MAXALIGN(sizeof(ParallelHashJoinBatch)) + \
206 MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)
207
208/*
 * Accessor for the nth ParallelHashJoinBatch given the base.
 *
 * The stride between consecutive batches is
 * EstimateParallelHashJoinBatch(hashtable). Note that "hashtable" is not a
 * parameter of this macro: the expansion refers to whatever variable of that
 * name is in scope where the macro is used.
 */
209#define NthParallelHashJoinBatch(base, n) \
210 ((ParallelHashJoinBatch *) \
211 ((char *) (base) + \
212 EstimateParallelHashJoinBatch(hashtable) * (n)))
213
214/*
215 * Each backend requires a small amount of per-batch state to interact with
216 * each ParallelHashJoinBatch.
217 */
219{
220 ParallelHashJoinBatch *shared; /* pointer to shared state */
221
222 /* Per-backend partial counters to reduce contention. */
223 size_t preallocated; /* pre-allocated space for this backend */
224 size_t ntuples; /* number of tuples */
225 size_t size; /* size of partition in memory */
226 size_t estimated_size; /* size of partition on disk */
227 size_t old_ntuples; /* how many tuples before repartitioning? */
228 bool at_least_one_chunk; /* has this backend allocated a chunk? */
229 bool outer_eof; /* has this process hit end of batch? */
230 bool done; /* flag to remember that a batch is done */
234
235/*
236 * While hashing the inner relation, any participant might determine that it's
237 * time to increase the number of buckets to reduce the load factor or batches
238 * to reduce the memory size. This is indicated by setting the growth flag to
239 * these values.
240 */
242{
243 /* The current dimensions are sufficient. */
245 /* The load factor is too high, so we need to add buckets. */
247 /* The memory budget would be exhausted, so we need to repartition. */
249 /* Repartitioning didn't help last time, so don't try to do that again. */
252
253/*
254 * The shared state used to coordinate a Parallel Hash Join. This is stored
255 * in the DSM segment.
256 */
258{
259 dsa_pointer batches; /* array of ParallelHashJoinBatch */
260 dsa_pointer old_batches; /* previous generation during repartition */
261 int nbatch; /* number of batches now */
262 int old_nbatch; /* previous number of batches */
263 int nbuckets; /* number of buckets */
264 ParallelHashGrowth growth; /* control batch/bucket growth */
265 dsa_pointer chunk_work_queue; /* chunk work queue */
268 size_t total_tuples; /* total number of inner tuples */
269 LWLock lock; /* lock protecting the above */
270
271 Barrier build_barrier; /* synchronization for the build phases */
274 pg_atomic_uint32 distributor; /* counter for load balancing */
275
276 SharedFileSet fileset; /* space for shared temporary files */
278
279/* The phases for building batches, used by build_barrier. */
280#define PHJ_BUILD_ELECT 0
281#define PHJ_BUILD_ALLOCATE 1
282#define PHJ_BUILD_HASH_INNER 2
283#define PHJ_BUILD_HASH_OUTER 3
284#define PHJ_BUILD_RUN 4
285#define PHJ_BUILD_FREE 5
286
287/* The phases for probing each batch, used by batch_barrier. */
288#define PHJ_BATCH_ELECT 0
289#define PHJ_BATCH_ALLOCATE 1
290#define PHJ_BATCH_LOAD 2
291#define PHJ_BATCH_PROBE 3
292#define PHJ_BATCH_SCAN 4
293#define PHJ_BATCH_FREE 5
294
295/* The phases of batch growth while hashing, for grow_batches_barrier. */
296#define PHJ_GROW_BATCHES_ELECT 0
297#define PHJ_GROW_BATCHES_REALLOCATE 1
298#define PHJ_GROW_BATCHES_REPARTITION 2
299#define PHJ_GROW_BATCHES_DECIDE 3
300#define PHJ_GROW_BATCHES_FINISH 4
301#define PHJ_GROW_BATCHES_PHASE(n) ((n) % 5) /* circular phases */
302
303/* The phases of bucket growth while hashing, for grow_buckets_barrier. */
304#define PHJ_GROW_BUCKETS_ELECT 0
305#define PHJ_GROW_BUCKETS_REALLOCATE 1
306#define PHJ_GROW_BUCKETS_REINSERT 2
307#define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */
308
309typedef struct HashJoinTableData
310{
311 int nbuckets; /* # buckets in the in-memory hash table */
312 int log2_nbuckets; /* its log2 (nbuckets must be a power of 2) */
313
314 int nbuckets_original; /* # buckets when starting the first hash */
315 int nbuckets_optimal; /* optimal # buckets (per batch) */
316 int log2_nbuckets_optimal; /* log2(nbuckets_optimal) */
317
318 /* buckets[i] is head of list of tuples in i'th in-memory bucket */
319 union
320 {
321 /* unshared array is per-batch storage, as are all the tuples */
323 /* shared array is per-query DSA area, as are all the tuples */
326
327 bool skewEnabled; /* are we using skew optimization? */
328 HashSkewBucket **skewBucket; /* hashtable of skew buckets */
329 int skewBucketLen; /* size of skewBucket array (a power of 2!) */
330 int nSkewBuckets; /* number of active skew buckets */
331 int *skewBucketNums; /* array indexes of active skew buckets */
332
333 int nbatch; /* number of batches */
334 int curbatch; /* current batch #; 0 during 1st pass */
335
336 int nbatch_original; /* nbatch when we started inner scan */
337 int nbatch_outstart; /* nbatch when we started outer scan */
338
339 bool growEnabled; /* flag to shut off nbatch increases */
340
341 /*
342 * totalTuples is the running total of tuples inserted into either the
343 * main or skew hash tables. reportTuples is the number of tuples that we
344 * want EXPLAIN to show as output from the Hash node (this includes saved
345 * null-keyed tuples as well as those inserted into the hash tables).
346 * skewTuples is the number of tuples present in the skew hash table.
347 */
351
352 /*
353 * These arrays are allocated for the life of the hash join, but only if
354 * nbatch > 1. A file is opened only when we first write a tuple into it
355 * (otherwise its pointer remains NULL). Note that the zero'th array
356 * elements never get used, since we will process rather than dump out any
357 * tuples of batch zero.
358 */
359 BufFile **innerBatchFile; /* buffered virtual temp file per batch */
360 BufFile **outerBatchFile; /* buffered virtual temp file per batch */
361
362 Size spaceUsed; /* memory space currently used by tuples */
363 Size spaceAllowed; /* upper limit for space used */
364 Size spacePeak; /* peak space used */
365 Size spaceUsedSkew; /* skew hash table's current space usage */
366 Size spaceAllowedSkew; /* upper limit for skew hashtable */
367
368 MemoryContext hashCxt; /* context for whole-hash-join storage */
369 MemoryContext batchCxt; /* context for this-batch-only storage */
370 MemoryContext spillCxt; /* context for spilling to temp files */
371
372 /* used for dense allocation of tuples (into linked chunks) */
373 HashMemoryChunk chunks; /* one list for the whole batch */
374
375 /* Shared and private state for Parallel Hash. */
376 HashMemoryChunk current_chunk; /* this backend's current chunk */
377 dsa_area *area; /* DSA area to allocate memory from */
382
383#endif /* HASHJOIN_H */
uint32_t uint32
Definition c.h:618
size_t Size
Definition c.h:691
uint64 dsa_pointer
Definition dsa.h:62
struct HashMemoryChunkData * HashMemoryChunk
Definition hashjoin.h:159
ParallelHashGrowth
Definition hashjoin.h:242
@ PHJ_GROWTH_NEED_MORE_BUCKETS
Definition hashjoin.h:246
@ PHJ_GROWTH_OK
Definition hashjoin.h:244
@ PHJ_GROWTH_NEED_MORE_BATCHES
Definition hashjoin.h:248
@ PHJ_GROWTH_DISABLED
Definition hashjoin.h:250
struct HashJoinTupleData ** unshared
Definition hashjoin.h:322
HashMemoryChunk chunks
Definition hashjoin.h:373
double reportTuples
Definition hashjoin.h:349
ParallelHashJoinBatchAccessor * batches
Definition hashjoin.h:379
MemoryContext hashCxt
Definition hashjoin.h:368
ParallelHashJoinState * parallel_state
Definition hashjoin.h:378
MemoryContext spillCxt
Definition hashjoin.h:370
union HashJoinTableData::@113 buckets
HashMemoryChunk current_chunk
Definition hashjoin.h:376
BufFile ** innerBatchFile
Definition hashjoin.h:359
int log2_nbuckets_optimal
Definition hashjoin.h:316
dsa_pointer_atomic * shared
Definition hashjoin.h:324
dsa_area * area
Definition hashjoin.h:377
BufFile ** outerBatchFile
Definition hashjoin.h:360
dsa_pointer current_chunk_shared
Definition hashjoin.h:380
MemoryContext batchCxt
Definition hashjoin.h:369
HashSkewBucket ** skewBucket
Definition hashjoin.h:328
dsa_pointer shared
Definition hashjoin.h:95
union HashJoinTupleData::@111 next
struct HashJoinTupleData * unshared
Definition hashjoin.h:94
struct HashMemoryChunkData * unshared
Definition hashjoin.h:148
dsa_pointer shared
Definition hashjoin.h:149
union HashMemoryChunkData::@112 next
HashJoinTuple tuples
Definition hashjoin.h:127
uint32 hashvalue
Definition hashjoin.h:126
SharedTuplestoreAccessor * outer_tuples
Definition hashjoin.h:232
ParallelHashJoinBatch * shared
Definition hashjoin.h:220
SharedTuplestoreAccessor * inner_tuples
Definition hashjoin.h:231
dsa_pointer chunks
Definition hashjoin.h:178
dsa_pointer buckets
Definition hashjoin.h:175
Barrier grow_batches_barrier
Definition hashjoin.h:272
dsa_pointer old_batches
Definition hashjoin.h:260
dsa_pointer chunk_work_queue
Definition hashjoin.h:265
Barrier grow_buckets_barrier
Definition hashjoin.h:273
ParallelHashGrowth growth
Definition hashjoin.h:264
pg_atomic_uint32 distributor
Definition hashjoin.h:274
SharedFileSet fileset
Definition hashjoin.h:276
dsa_pointer batches
Definition hashjoin.h:259