/*-------------------------------------------------------------------------
 *
 * hashjoin.h
 *	  internal structures for hash joins
 *
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/include/executor/hashjoin.h
 *
 *-------------------------------------------------------------------------
 */
#ifndef HASHJOIN_H
#define HASHJOIN_H

#include "nodes/execnodes.h"
#include "port/atomics.h"
#include "storage/barrier.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"

/* ----------------------------------------------------------------
 *				hash-join hash table structures
 *
 * Each active hashjoin has a HashJoinTable control block, which is
 * palloc'd in the executor's per-query context.  All other storage needed
 * for the hashjoin is kept in private memory contexts, two for each hashjoin.
 * This makes it easy and fast to release the storage when we don't need it
 * anymore.  (Exception: data associated with the temp files lives in the
 * per-query context too, since we always call buffile.c in that context.)
 *
 * The hashtable contexts are made children of the per-query context, ensuring
 * that they will be discarded at end of statement even if the join is
 * aborted early by an error.  (Likewise, any temporary files we make will
 * be cleaned up by the virtual file manager in event of an error.)
 *
 * Storage that should live through the entire join is allocated from the
 * "hashCxt", while storage that is only wanted for the current batch is
 * allocated in the "batchCxt".  By resetting the batchCxt at the end of
 * each batch, we free all the per-batch storage reliably and without tedium.
 *
 * During the first scan of the inner relation, we get its tuples from the
 * executor.  If nbatch > 1 then tuples that don't belong in the first batch
 * get saved into inner-batch temp files.  The same statements apply for the
 * first scan of the outer relation, except we write tuples to outer-batch
 * temp files.  After finishing the first scan, we do the following for
 * each remaining batch:
 *	1. Read tuples from inner batch file, load into hash buckets.
 *	2. Read tuples from outer batch file, match to hash buckets and output.
 *
 * It is possible to increase nbatch on the fly if the in-memory hash table
 * gets too big.  The hash-value-to-batch computation is arranged so that this
 * can only cause a tuple to go into a later batch than previously thought,
 * never into an earlier batch.  When we increase nbatch, we rescan the hash
 * table and dump out any tuples that are now of a later batch to the correct
 * inner batch file.  Subsequently, while reading either inner or outer batch
 * files, we might find tuples that no longer belong to the current batch;
 * if so, we just dump them out to the correct batch file.  The sketch below
 * illustrates the hash-value-to-batch computation.
 * ----------------------------------------------------------------
 */
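
/*
 * Illustrative sketch (not part of this header): the batch number is taken
 * from the hash bits above the bucket bits.  Because nbatch only ever
 * doubles, increasing it can only add high-order bits to a tuple's batch
 * number, so a tuple can move to a later batch but never to an earlier one.
 * This mirrors the logic of ExecHashGetBucketAndBatch() in nodeHash.c,
 * reduced to standalone C.
 */
#include <assert.h>
#include <stdint.h>

static void
example_get_bucket_and_batch(uint32_t hashvalue, int log2_nbuckets,
							 int nbatch, int *bucketno, int *batchno)
{
	int			nbuckets = 1 << log2_nbuckets;

	/* low-order bits select the bucket; nbuckets is a power of 2 */
	*bucketno = hashvalue & (nbuckets - 1);
	/* the bits above them select the batch; nbatch is a power of 2 */
	*batchno = (nbatch > 1) ? ((hashvalue >> log2_nbuckets) & (nbatch - 1)) : 0;
}

static void
example_check_batch_monotonicity(uint32_t hashvalue)
{
	int			bucket,
				batch4,
				batch8;

	example_get_bucket_and_batch(hashvalue, 10, 4, &bucket, &batch4);
	example_get_bucket_and_batch(hashvalue, 10, 8, &bucket, &batch8);
	/* doubling nbatch at most adds one high bit: batch8 is batch4 or later */
	assert((batch8 & 3) == batch4 && batch8 >= batch4);
}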

/* these are in nodes/execnodes.h: */
/* typedef struct HashJoinTupleData *HashJoinTuple; */
/* typedef struct HashJoinTableData *HashJoinTable; */

typedef struct HashJoinTupleData
{
	/* link to next tuple in same bucket */
	union
	{
		struct HashJoinTupleData *unshared; /* pointer in private memory */
		dsa_pointer shared;		/* DSA pointer, for Parallel Hash */
	}			next;
	uint32		hashvalue;		/* tuple's hash code */
	/* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
}			HashJoinTupleData;

#define HJTUPLE_OVERHEAD		MAXALIGN(sizeof(HashJoinTupleData))
#define HJTUPLE_MINTUPLE(hjtup) \
	((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
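
/*
 * Illustrative sketch (not part of this header): a hash join tuple is one
 * allocation holding the HashJoinTupleData header with the MinimalTuple
 * payload right behind it, so HJTUPLE_MINTUPLE is plain pointer arithmetic
 * past the maxaligned header.  A simplified standalone analogue, with a
 * stand-in header type, a local alignment macro, and malloc in place of a
 * memory-context allocation:
 */
#include <stdlib.h>
#include <string.h>

#define EXAMPLE_ALIGNOF		8	/* stand-in for MAXIMUM_ALIGNOF */
#define EXAMPLE_MAXALIGN(len) \
	(((len) + EXAMPLE_ALIGNOF - 1) & ~((size_t) EXAMPLE_ALIGNOF - 1))

typedef struct ExampleTuple
{
	struct ExampleTuple *next;	/* stand-in for the "next" union */
	unsigned int hashvalue;
	/* payload follows on an aligned boundary */
} ExampleTuple;

#define EXAMPLE_OVERHEAD	EXAMPLE_MAXALIGN(sizeof(ExampleTuple))
#define EXAMPLE_PAYLOAD(t)	((char *) (t) + EXAMPLE_OVERHEAD)

static ExampleTuple *
example_make_tuple(unsigned int hashvalue, const char *data, size_t len)
{
	/* one allocation: header plus payload, payload copied in behind it */
	ExampleTuple *tuple = malloc(EXAMPLE_OVERHEAD + len);

	tuple->next = NULL;
	tuple->hashvalue = hashvalue;
	memcpy(EXAMPLE_PAYLOAD(tuple), data, len);
	return tuple;
}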

/*
 * If the outer relation's distribution is sufficiently nonuniform, we attempt
 * to optimize the join by treating the hash values corresponding to the outer
 * relation's MCVs specially.  Inner relation tuples matching these hash
 * values go into the "skew" hashtable instead of the main hashtable, and
 * outer relation tuples with these hash values are matched against that
 * table instead of the main one.  Thus, tuples with these hash values are
 * effectively handled as part of the first batch and will never go to disk.
 * The skew hashtable is limited to SKEW_WORK_MEM_PERCENT of the total memory
 * allowed for the join; while building the hashtables, we decrease the number
 * of MCVs being specially treated if needed to stay under this limit.
 *
 * Note: you might wonder why we look at the outer relation stats for this,
 * rather than the inner.  One reason is that the outer relation is typically
 * bigger, so we get more I/O savings by optimizing for its most common values.
 * Also, for similarly-sized relations, the planner prefers to put the more
 * uniformly distributed relation on the inside, so we're more likely to find
 * interesting skew in the outer relation.
 */
typedef struct HashSkewBucket
{
	uint32		hashvalue;		/* common hash value */
	HashJoinTuple tuples;		/* linked list of inner-relation tuples */
} HashSkewBucket;

#define SKEW_BUCKET_OVERHEAD	MAXALIGN(sizeof(HashSkewBucket))
#define INVALID_SKEW_BUCKET_NO	(-1)
#define SKEW_WORK_MEM_PERCENT	2
#define SKEW_MIN_OUTER_FRACTION 0.01
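
/*
 * Illustrative sketch (not part of this header): the skew table is an open
 * hash table indexed by hash value with linear probing; a miss yields
 * INVALID_SKEW_BUCKET_NO, and the tuple is then handled by the main
 * hashtable.  This mirrors the probe loop of ExecHashGetSkewBucket() in
 * nodeHash.c, with stand-in parameter names:
 */
static int
example_get_skew_bucket(HashSkewBucket **skew_bucket, int skew_bucket_len,
						uint32 hashvalue)
{
	/* skew_bucket_len is a power of 2, so masking acts as modulo */
	int			bucket = hashvalue & (skew_bucket_len - 1);

	/* linear probing: skip slots occupied by other hash values */
	while (skew_bucket[bucket] != NULL &&
		   skew_bucket[bucket]->hashvalue != hashvalue)
		bucket = (bucket + 1) & (skew_bucket_len - 1);

	return (skew_bucket[bucket] != NULL) ? bucket : INVALID_SKEW_BUCKET_NO;
}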

/*
 * To reduce palloc overhead, the HashJoinTuples for the current batch are
 * packed in 32kB buffers instead of pallocing each tuple individually.
 */
typedef struct HashMemoryChunkData
{
	int			ntuples;		/* number of tuples stored in this chunk */
	size_t		maxlen;			/* size of the chunk's tuple buffer */
	size_t		used;			/* number of buffer bytes already used */

	/* pointer to the next chunk (linked list) */
	union
	{
		struct HashMemoryChunkData *unshared;	/* pointer in private memory */
		dsa_pointer shared;		/* DSA pointer, for Parallel Hash */
	}			next;

	/*
	 * The chunk's tuple buffer starts after the HashMemoryChunkData struct,
	 * at offset HASH_CHUNK_HEADER_SIZE (which must be maxaligned).  Note that
	 * that offset is not included in "maxlen" or "used".
	 */
}			HashMemoryChunkData;

typedef struct HashMemoryChunkData *HashMemoryChunk;

#define HASH_CHUNK_SIZE			(32 * 1024L)
#define HASH_CHUNK_HEADER_SIZE	MAXALIGN(sizeof(HashMemoryChunkData))
#define HASH_CHUNK_DATA(hc)		(((char *) (hc)) + HASH_CHUNK_HEADER_SIZE)
/* tuples exceeding HASH_CHUNK_THRESHOLD bytes are put in their own chunk */
#define HASH_CHUNK_THRESHOLD	(HASH_CHUNK_SIZE / 4)
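
/*
 * Illustrative sketch (not part of this header): tuples are bump-allocated
 * out of the current chunk; when one doesn't fit, a fresh 32kB chunk is
 * pushed onto the list, and an oversized tuple (> HASH_CHUNK_THRESHOLD)
 * gets a dedicated chunk linked in behind the current one.  This follows
 * the shape of dense_alloc() in nodeHash.c, with malloc standing in for
 * the batch memory context:
 */
#include <stdlib.h>

static void *
example_dense_alloc(HashMemoryChunk *chunks, size_t size)
{
	HashMemoryChunk chunk = *chunks;
	char	   *ptr;

	size = MAXALIGN(size);

	/* oversized tuples get their own chunk, behind the current one */
	if (size > HASH_CHUNK_THRESHOLD)
	{
		HashMemoryChunk big = malloc(HASH_CHUNK_HEADER_SIZE + size);

		big->ntuples = 1;
		big->maxlen = size;
		big->used = size;
		big->next.unshared = (chunk != NULL) ? chunk->next.unshared : NULL;
		if (chunk != NULL)
			chunk->next.unshared = big;
		else
			*chunks = big;
		return HASH_CHUNK_DATA(big);
	}

	/* start a new chunk if there is none or the current one is too full */
	if (chunk == NULL || chunk->maxlen - chunk->used < size)
	{
		HashMemoryChunk fresh = malloc(HASH_CHUNK_HEADER_SIZE + HASH_CHUNK_SIZE);

		fresh->ntuples = 0;
		fresh->maxlen = HASH_CHUNK_SIZE;
		fresh->used = 0;
		fresh->next.unshared = chunk;
		*chunks = chunk = fresh;
	}

	/* bump-allocate from the current chunk */
	ptr = HASH_CHUNK_DATA(chunk) + chunk->used;
	chunk->used += size;
	chunk->ntuples++;
	return ptr;
}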

/*
 * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
 * object in shared memory to coordinate access to it.  Since they are
 * followed by variable-sized objects, they are arranged in contiguous memory
 * but not accessed directly as an array.
 */
typedef struct ParallelHashJoinBatch
{
	dsa_pointer buckets;		/* array of hash table buckets */
	Barrier		batch_barrier;	/* synchronization for joining this batch */

	dsa_pointer chunks;			/* chunks of tuples loaded */
	size_t		size;			/* size of buckets + chunks in memory */
	size_t		estimated_size; /* size of buckets + chunks while writing */
	size_t		ntuples;		/* number of tuples loaded */
	size_t		old_ntuples;	/* number of tuples before repartitioning */
	bool		space_exhausted;	/* has the memory budget been hit? */

	/*
	 * Variable-sized SharedTuplestore objects follow this struct in memory.
	 * See the accessor macros below.
	 */
} ParallelHashJoinBatch;

/* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */
#define ParallelHashJoinBatchInner(batch)							\
	((SharedTuplestore *)											\
	 ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))

/* Accessor for outer batch tuplestore following a ParallelHashJoinBatch. */
#define ParallelHashJoinBatchOuter(batch, nparticipants)			\
	((SharedTuplestore *)											\
	 ((char *) ParallelHashJoinBatchInner(batch) +					\
	  MAXALIGN(sts_estimate(nparticipants))))

/* Total size of a ParallelHashJoinBatch and tuplestores. */
#define EstimateParallelHashJoinBatch(hashtable)					\
	(MAXALIGN(sizeof(ParallelHashJoinBatch)) +						\
	 MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)

/* Accessor for the nth ParallelHashJoinBatch given the base. */
#define NthParallelHashJoinBatch(base, n)							\
	((ParallelHashJoinBatch *)										\
	 ((char *) (base) +												\
	  EstimateParallelHashJoinBatch(hashtable) * (n)))
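
/*
 * Illustrative sketch (not part of this header): batch n does not live at
 * base[n]; each ParallelHashJoinBatch is followed by its two variable-sized
 * SharedTuplestores, so batches sit at computed byte offsets.  Note that
 * NthParallelHashJoinBatch expands to a reference to a variable named
 * "hashtable" at the call site.  A hypothetical walk over all batches,
 * assuming sharedtuplestore.h is in scope and "base" points at the DSA
 * memory holding the batches:
 */
static void
example_walk_batches(HashJoinTable hashtable, char *base)
{
	int			i;

	for (i = 0; i < hashtable->parallel_state->nbatch; i++)
	{
		ParallelHashJoinBatch *batch = NthParallelHashJoinBatch(base, i);
		SharedTuplestore *inner = ParallelHashJoinBatchInner(batch);
		SharedTuplestore *outer =
		ParallelHashJoinBatchOuter(batch,
								   hashtable->parallel_state->nparticipants);

		/* ... attach to the inner and outer tuplestores here ... */
		(void) inner;
		(void) outer;
	}
}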

/*
 * Each backend requires a small amount of per-batch state to interact with
 * each ParallelHashJoinBatch.
 */
typedef struct ParallelHashJoinBatchAccessor
{
	ParallelHashJoinBatch *shared;	/* pointer to shared state */

	/* Per-backend partial counters to reduce contention. */
	size_t		preallocated;	/* pre-allocated space for this backend */
	size_t		ntuples;		/* number of tuples */
	size_t		size;			/* size of partition in memory */
	size_t		estimated_size; /* size of partition on disk */
	size_t		old_ntuples;	/* how many tuples before repartitioning? */
	bool		at_least_one_chunk; /* has this backend allocated a chunk? */

	bool		done;			/* flag to remember that a batch is done */
	SharedTuplestoreAccessor *inner_tuples; /* our access to inner tuples */
	SharedTuplestoreAccessor *outer_tuples; /* our access to outer tuples */
} ParallelHashJoinBatchAccessor;

/*
 * While hashing the inner relation, any participant might determine that it's
 * time to increase the number of buckets to reduce the load factor, or the
 * number of batches to reduce the memory size.  This is indicated by setting
 * the growth flag to one of these values.
 */
typedef enum ParallelHashGrowth
{
	/* The current dimensions are sufficient. */
	PHJ_GROWTH_OK,
	/* The load factor is too high, so we need to add buckets. */
	PHJ_GROWTH_NEED_MORE_BUCKETS,
	/* The memory budget would be exhausted, so we need to repartition. */
	PHJ_GROWTH_NEED_MORE_BATCHES,
	/* Repartitioning didn't help last time, so don't try to do that again. */
	PHJ_GROWTH_DISABLED
} ParallelHashGrowth;

/*
 * The shared state used to coordinate a Parallel Hash Join.  This is stored
 * in the DSM segment.
 */
typedef struct ParallelHashJoinState
{
	dsa_pointer batches;		/* array of ParallelHashJoinBatch */
	dsa_pointer old_batches;	/* previous generation during repartition */
	int			nbatch;			/* number of batches now */
	int			old_nbatch;		/* previous number of batches */
	int			nbuckets;		/* number of buckets */
	ParallelHashGrowth growth;	/* control batch/bucket growth */
	dsa_pointer chunk_work_queue;	/* chunk work queue */
	int			nparticipants;	/* number of participating backends */
	size_t		space_allowed;	/* memory budget for this join */
	size_t		total_tuples;	/* total number of inner tuples */
	LWLock		lock;			/* lock protecting the above */

	Barrier		build_barrier;	/* synchronization for the build phases */
	Barrier		grow_batches_barrier;	/* synchronization for batch growth */
	Barrier		grow_buckets_barrier;	/* synchronization for bucket growth */
	pg_atomic_uint32 distributor;	/* counter for load balancing */

	SharedFileSet fileset;		/* space for shared temporary files */
} ParallelHashJoinState;

/* The phases for building batches, used by build_barrier. */
#define PHJ_BUILD_ELECTING				0
#define PHJ_BUILD_ALLOCATING			1
#define PHJ_BUILD_HASHING_INNER			2
#define PHJ_BUILD_HASHING_OUTER			3
#define PHJ_BUILD_DONE					4

/* The phases for probing each batch, used by batch_barrier. */
#define PHJ_BATCH_ELECTING				0
#define PHJ_BATCH_ALLOCATING			1
#define PHJ_BATCH_LOADING				2
#define PHJ_BATCH_PROBING				3
#define PHJ_BATCH_DONE					4
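
/*
 * Illustrative sketch (not part of this header): each barrier works as a
 * phase machine.  A participant attaches, switches on the phase it observes,
 * helps with or skips the remaining phases, and BarrierArriveAndWait()
 * advances all attached participants to the next phase together.  A
 * simplified shape of batch processing (the real logic, including detaching
 * and proper wait events, lives in nodeHashjoin.c):
 */
static void
example_work_on_batch(Barrier *batch_barrier)
{
	switch (BarrierAttach(batch_barrier))
	{
		case PHJ_BATCH_ELECTING:
			/* one worker is elected to allocate the bucket array */
			/* FALLTHROUGH */
		case PHJ_BATCH_ALLOCATING:
			/* 0 stands in for a wait event reported in pg_stat_activity */
			BarrierArriveAndWait(batch_barrier, 0);
			/* FALLTHROUGH */
		case PHJ_BATCH_LOADING:
			/* load inner tuples into buckets, then wait for the others */
			BarrierArriveAndWait(batch_barrier, 0);
			/* FALLTHROUGH */
		case PHJ_BATCH_PROBING:
			/* probe the hash table with outer tuples */
			break;
		case PHJ_BATCH_DONE:
			/* we arrived too late to help with this batch */
			break;
	}
}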

/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING		0
#define PHJ_GROW_BATCHES_ALLOCATING		1
#define PHJ_GROW_BATCHES_REPARTITIONING 2
#define PHJ_GROW_BATCHES_DECIDING		3
#define PHJ_GROW_BATCHES_FINISHING		4
#define PHJ_GROW_BATCHES_PHASE(n)		((n) % 5)	/* circular phases */

/* The phases of bucket growth while hashing, for grow_buckets_barrier. */
#define PHJ_GROW_BUCKETS_ELECTING		0
#define PHJ_GROW_BUCKETS_ALLOCATING		1
#define PHJ_GROW_BUCKETS_REINSERTING	2
#define PHJ_GROW_BUCKETS_PHASE(n)		((n) % 3)	/* circular phases */
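
/*
 * Illustrative sketch (not part of this header): a barrier's phase number
 * increases monotonically across repeated growth rounds, so the circular
 * macros reduce the raw phase to a position within the current round.  For
 * instance, in the second round of batch growth the raw phases are 5..9:
 */
#include <assert.h>

static void
example_circular_phases(void)
{
	assert(PHJ_GROW_BATCHES_PHASE(0) == PHJ_GROW_BATCHES_ELECTING);
	assert(PHJ_GROW_BATCHES_PHASE(7) == PHJ_GROW_BATCHES_REPARTITIONING);
	assert(PHJ_GROW_BUCKETS_PHASE(5) == PHJ_GROW_BUCKETS_REINSERTING);
}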

typedef struct HashJoinTableData
{
	int			nbuckets;		/* # buckets in the in-memory hash table */
	int			log2_nbuckets;	/* its log2 (nbuckets must be a power of 2) */

	int			nbuckets_original;	/* # buckets when starting the first hash */
	int			nbuckets_optimal;	/* optimal # buckets (per batch) */
	int			log2_nbuckets_optimal;	/* log2(nbuckets_optimal) */

	/* buckets[i] is head of list of tuples in i'th in-memory bucket */
	union
	{
		/* unshared array is per-batch storage, as are all the tuples */
		struct HashJoinTupleData **unshared;
		/* shared array is per-query DSA area, as are all the tuples */
		dsa_pointer_atomic *shared;
	}			buckets;

	bool		keepNulls;		/* true to store unmatchable NULL tuples */

	bool		skewEnabled;	/* are we using skew optimization? */
	HashSkewBucket **skewBucket;	/* hashtable of skew buckets */
	int			skewBucketLen;	/* size of skewBucket array (a power of 2!) */
	int			nSkewBuckets;	/* number of active skew buckets */
	int		   *skewBucketNums; /* array indexes of active skew buckets */

	int			nbatch;			/* number of batches */
	int			curbatch;		/* current batch #; 0 during 1st pass */

	int			nbatch_original;	/* nbatch when we started inner scan */
	int			nbatch_outstart;	/* nbatch when we started outer scan */

	bool		growEnabled;	/* flag to shut off nbatch increases */

	double		totalTuples;	/* # tuples obtained from inner plan */
	double		partialTuples;	/* # tuples obtained from inner plan by me */
	double		skewTuples;		/* # tuples inserted into skew hashtable */

	/*
	 * These arrays are allocated for the life of the hash join, but only if
	 * nbatch > 1.  A file is opened only when we first write a tuple into it
	 * (otherwise its pointer remains NULL).  Note that the zero'th array
	 * elements never get used, since we will process rather than dump out any
	 * tuples of batch zero.
	 */
	BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
	BufFile   **outerBatchFile; /* buffered virtual temp file per batch */

	/*
	 * Info about the datatype-specific hash functions for the datatypes being
	 * hashed.  These are arrays of the same length as the number of hash join
	 * clauses (hash keys).
	 */
	FmgrInfo   *outer_hashfunctions;	/* lookup data for hash functions */
	FmgrInfo   *inner_hashfunctions;	/* lookup data for hash functions */
	bool	   *hashStrict;		/* is each hash join operator strict? */

	Size		spaceUsed;		/* memory space currently used by tuples */
	Size		spaceAllowed;	/* upper limit for space used */
	Size		spacePeak;		/* peak space used */
	Size		spaceUsedSkew;	/* skew hash table's current space usage */
	Size		spaceAllowedSkew;	/* upper limit for skew hashtable */

	MemoryContext hashCxt;		/* context for whole-hash-join storage */
	MemoryContext batchCxt;		/* context for this-batch-only storage */

	/* used for dense allocation of tuples (into linked chunks) */
	HashMemoryChunk chunks;		/* one list for the whole batch */

	/* Shared and private state for Parallel Hash. */
	HashMemoryChunk current_chunk;	/* this backend's current chunk */
	dsa_area   *area;			/* DSA area to allocate memory from */
	ParallelHashJoinState *parallel_state;	/* shared coordination state */
	ParallelHashJoinBatchAccessor *batches; /* this backend's batch accessors */
	dsa_pointer current_chunk_shared;	/* DSA pointer to current_chunk */
}			HashJoinTableData;

#endif							/* HASHJOIN_H */