PostgreSQL Source Code  git master
nodeHashjoin.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeHashjoin.c
4  * Routines to handle hash join nodes
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/executor/nodeHashjoin.c
12  *
13  * PARALLELISM
14  *
15  * Hash joins can participate in parallel query execution in several ways. A
16  * parallel-oblivious hash join is one where the node is unaware that it is
17  * part of a parallel plan. In this case, a copy of the inner plan is used to
18  * build a copy of the hash table in every backend, and the outer plan could
19  * either be built from a partial or complete path, so that the results of the
20  * hash join are correspondingly either partial or complete. A parallel-aware
21  * hash join is one that behaves differently, coordinating work between
22  * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
23  * Hash Join always appears with a Parallel Hash node.
24  *
25  * Parallel-aware hash joins use the same per-backend state machine to track
26  * progress through the hash join algorithm as parallel-oblivious hash joins.
27  * In a parallel-aware hash join, there is also a shared state machine that
28  * co-operating backends use to synchronize their local state machines and
29  * program counters. The shared state machine is managed with a Barrier IPC
30  * primitive. When all attached participants arrive at a barrier, the phase
31  * advances and all waiting participants are released.
32  *
33  * When a participant begins working on a parallel hash join, it must first
34  * figure out how much progress has already been made, because participants
35  * don't wait for each other to begin. For this reason there are switch
36  * statements at key points in the code where we have to synchronize our local
37  * state machine with the phase, and then jump to the correct part of the
38  * algorithm so that we can get started.
39  *
40  * One barrier called build_barrier is used to coordinate the hashing phases.
41  * The phase is represented by an integer which begins at zero and increments
42  * one by one, but in the code it is referred to by symbolic names as follows:
43  *
44  * PHJ_BUILD_ELECTING -- initial state
45  * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0
46  * PHJ_BUILD_HASHING_INNER -- all hash the inner rel
47  * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer
48  * PHJ_BUILD_DONE -- building done, probing can begin
49  *
50  * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
51  * be used repeatedly as required to coordinate expansions in the number of
52  * batches or buckets. Their phases are as follows:
53  *
54  * PHJ_GROW_BATCHES_ELECTING -- initial state
55  * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches
56  * PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
57  * PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew
58  *
59  * PHJ_GROW_BUCKETS_ELECTING -- initial state
60  * PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets
61  * PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples
62  *
63  * If the planner got the number of batches and buckets right, those won't be
64  * necessary, but on the other hand we might finish up needing to expand the
65  * buckets or batches multiple times while hashing the inner relation to stay
66  * within our memory budget and load factor target. For that reason it's a
67  * separate pair of barriers using circular phases.
68  *
69  * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
70  * because we need to divide the outer relation into batches up front in order
71  * to be able to process batches entirely independently. In contrast, the
72  * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
73  * batches whenever it encounters them while scanning and probing, which it
74  * can do because it processes batches in serial order.
75  *
76  * Once PHJ_BUILD_DONE is reached, backends then split up and process
77  * different batches, or gang up and work together on probing batches if there
78  * aren't enough to go around. For each batch there is a separate barrier
79  * with the following phases:
80  *
81  * PHJ_BATCH_ELECTING -- initial state
82  * PHJ_BATCH_ALLOCATING -- one allocates buckets
83  * PHJ_BATCH_LOADING -- all load the hash table from disk
84  * PHJ_BATCH_PROBING -- all probe
85  * PHJ_BATCH_DONE -- end
86  *
87  * Batch 0 is a special case, because it starts out in phase
88  * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
89  * PHJ_BUILD_HASHING_INNER so we can skip loading.
90  *
91  * Initially we try to plan for a single-batch hash join using the combined
92  * hash_mem of all participants to create a large shared hash table. If that
93  * turns out either at planning or execution time to be impossible then we
94  * fall back to regular hash_mem sized hash tables.
95  *
96  * To avoid deadlocks, we never wait for any barrier unless it is known that
97  * all other backends attached to it are actively executing the node or have
98  * already arrived. Practically, that means that we never return a tuple
99  * while attached to a barrier, unless the barrier has reached its final
100  * state. In the slightly special case of the per-batch barrier, we return
101  * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
102  * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
103  *
104  *-------------------------------------------------------------------------
105  */
106 
107 #include "postgres.h"
108 
109 #include "access/htup_details.h"
110 #include "access/parallel.h"
111 #include "executor/executor.h"
112 #include "executor/hashjoin.h"
113 #include "executor/nodeHash.h"
114 #include "executor/nodeHashjoin.h"
115 #include "miscadmin.h"
116 #include "pgstat.h"
117 #include "utils/memutils.h"
118 #include "utils/sharedtuplestore.h"
119 
120 
/*
 * States of the ExecHashJoin state machine.
 *
 * HashJoinState.hj_JoinState holds one of these values; ExecHashJoinImpl()
 * dispatches on it once per loop iteration.  ExecInitHashJoin() starts every
 * node in HJ_BUILD_HASHTABLE.
 */
#define HJ_BUILD_HASHTABLE		1
#define HJ_NEED_NEW_OUTER		2
#define HJ_SCAN_BUCKET			3
#define HJ_FILL_OUTER_TUPLE		4
#define HJ_FILL_INNER_TUPLES	5
#define HJ_NEED_NEW_BATCH		6

/*
 * Returns true if doing null-fill on outer relation.
 *
 * Unmatched outer rows are emitted padded with a null *inner* tuple, so the
 * presence of hj_NullInnerTupleSlot (created for LEFT, ANTI and FULL joins)
 * is what signals outer-side fill.
 */
#define HJ_FILL_OUTER(hjstate)	((hjstate)->hj_NullInnerTupleSlot != NULL)

/*
 * Returns true if doing null-fill on inner relation.
 *
 * Unmatched inner rows are emitted padded with a null *outer* tuple, so the
 * presence of hj_NullOuterTupleSlot (created for RIGHT and FULL joins)
 * is what signals inner-side fill.
 */
#define HJ_FILL_INNER(hjstate)	((hjstate)->hj_NullOuterTupleSlot != NULL)
135 
137  HashJoinState *hjstate,
138  uint32 *hashvalue);
140  HashJoinState *hjstate,
141  uint32 *hashvalue);
143  BufFile *file,
144  uint32 *hashvalue,
145  TupleTableSlot *tupleSlot);
146 static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
147 static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
149 
150 
151 /* ----------------------------------------------------------------
152  * ExecHashJoinImpl
153  *
154  * This function implements the Hybrid Hashjoin algorithm. It is marked
155  * with an always-inline attribute so that ExecHashJoin() and
156  * ExecParallelHashJoin() can inline it. Compilers that respect the
157  * attribute should create versions specialized for parallel == true and
158  * parallel == false with unnecessary branches removed.
159  *
160  * Note: the relation we build the hash table on is the "inner";
161  * the other one is the "outer".
 *
 * pstate: the HashJoinState of this node (cast with castNode below).
 * parallel: true selects the Parallel Hash paths (shared hash table and
 * barrier synchronization); false selects the parallel-oblivious paths.
 *
 * Returns the next projected join tuple (built by ExecProject), or NULL
 * once there is nothing more to return.  Progress between calls is kept in
 * node->hj_JoinState, which the for (;;) loop below dispatches on.
162  * ----------------------------------------------------------------
163  */
/*
 * NOTE(review): this copy is a Doxygen rendering: each line carries its
 * original line number, and lines that were hyperlinks are missing.
 * Original line 164 (the storage class / return type of this definition)
 * is absent; ExecHashJoin() returns this function's result as a
 * TupleTableSlot *, and the header says it is marked always-inline.
 * Restore the missing lines from upstream before building.
 */
165 ExecHashJoinImpl(PlanState *pstate, bool parallel)
166 {
167  HashJoinState *node = castNode(HashJoinState, pstate);
168  PlanState *outerNode;
169  HashState *hashNode;
170  ExprState *joinqual;
171  ExprState *otherqual;
172  ExprContext *econtext;
173  HashJoinTable hashtable;
174  TupleTableSlot *outerTupleSlot;
175  uint32 hashvalue;
176  int batchno;
177  ParallelHashJoinState *parallel_state;
178 
179  /*
180  * get information from HashJoin node
181  */
182  joinqual = node->js.joinqual;
183  otherqual = node->js.ps.qual;
184  hashNode = (HashState *) innerPlanState(node);
185  outerNode = outerPlanState(node);
186  hashtable = node->hj_HashTable;
187  econtext = node->js.ps.ps_ExprContext;
188  parallel_state = hashNode->parallel_state;
189 
190  /*
191  * Reset per-tuple memory context to free any expression evaluation
192  * storage allocated in the previous tuple cycle.
193  */
194  ResetExprContext(econtext);
195 
196  /*
197  * run the hash join state machine
198  */
199  for (;;)
200  {
201  /*
202  * It's possible to iterate this loop many times before returning a
203  * tuple, in some pathological cases such as needing to move much of
204  * the current batch to a later batch. So let's check for interrupts
205  * each time through.
206  */
  /* NOTE(review): original line 207 is missing here; per the comment
   * above it should be the per-iteration interrupt check. */
208 
209  switch (node->hj_JoinState)
210  {
211  case HJ_BUILD_HASHTABLE:
212 
213  /*
214  * First time through: build hash table for inner relation.
215  */
216  Assert(hashtable == NULL);
217 
218  /*
219  * If the outer relation is completely empty, and it's not
220  * right/full join, we can quit without building the hash
221  * table. However, for an inner join it is only a win to
222  * check this when the outer relation's startup cost is less
223  * than the projected cost of building the hash table.
224  * Otherwise it's best to build the hash table first and see
225  * if the inner relation is empty. (When it's a left join, we
226  * should always make this check, since we aren't going to be
227  * able to skip the join on the strength of an empty inner
228  * relation anyway.)
229  *
230  * If we are rescanning the join, we make use of information
231  * gained on the previous scan: don't bother to try the
232  * prefetch if the previous scan found the outer relation
233  * nonempty. This is not 100% reliable since with new
234  * parameters the outer relation might yield different
235  * results, but it's a good heuristic.
236  *
237  * The only way to make the check is to try to fetch a tuple
238  * from the outer plan node. If we succeed, we have to stash
239  * it away for later consumption by ExecHashJoinOuterGetTuple.
240  */
241  if (HJ_FILL_INNER(node))
242  {
243  /* no chance to not build the hash table */
244  node->hj_FirstOuterTupleSlot = NULL;
245  }
246  else if (parallel)
247  {
248  /*
249  * The empty-outer optimization is not implemented for
250  * shared hash tables, because no one participant can
251  * determine that there are no outer tuples, and it's not
252  * yet clear that it's worth the synchronization overhead
253  * of reaching consensus to figure that out. So we have
254  * to build the hash table.
255  */
256  node->hj_FirstOuterTupleSlot = NULL;
257  }
258  else if (HJ_FILL_OUTER(node) ||
259  (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
260  !node->hj_OuterNotEmpty))
261  {
262  node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
  /* NOTE(review): original line 263 is missing: presumably the test of
   * whether the prefetched outer tuple is empty, which guards the
   * early-exit block below. Confirm against upstream. */
264  {
265  node->hj_OuterNotEmpty = false;
266  return NULL;
267  }
268  else
269  node->hj_OuterNotEmpty = true;
270  }
271  else
272  node->hj_FirstOuterTupleSlot = NULL;
273 
274  /*
275  * Create the hash table. If using Parallel Hash, then
276  * whoever gets here first will create the hash table and any
277  * later arrivals will merely attach to it.
278  */
279  hashtable = ExecHashTableCreate(hashNode,
280  node->hj_HashOperators,
281  node->hj_Collations,
282  HJ_FILL_INNER(node));
283  node->hj_HashTable = hashtable;
284 
285  /*
286  * Execute the Hash node, to build the hash table. If using
287  * Parallel Hash, then we'll try to help hashing unless we
288  * arrived too late.
289  */
290  hashNode->hashtable = hashtable;
291  (void) MultiExecProcNode((PlanState *) hashNode);
292 
293  /*
294  * If the inner relation is completely empty, and we're not
295  * doing a left outer join, we can quit without scanning the
296  * outer relation.
297  */
298  if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
299  return NULL;
300 
301  /*
302  * need to remember whether nbatch has increased since we
303  * began scanning the outer relation
304  */
305  hashtable->nbatch_outstart = hashtable->nbatch;
306 
307  /*
308  * Reset OuterNotEmpty for scan. (It's OK if we fetched a
309  * tuple above, because ExecHashJoinOuterGetTuple will
310  * immediately set it again.)
311  */
312  node->hj_OuterNotEmpty = false;
313 
314  if (parallel)
315  {
316  Barrier *build_barrier;
317 
318  build_barrier = &parallel_state->build_barrier;
319  Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
320  BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
321  if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
322  {
323  /*
324  * If multi-batch, we need to hash the outer relation
325  * up front.
326  */
327  if (hashtable->nbatch > 1)
  /* NOTE(review): original line 328 (the body of the 'if' above, which
   * per its comment partitions the outer relation into batches) is
   * missing, so as shown the 'if' wrongly governs the
   * BarrierArriveAndWait() call below. */
329  BarrierArriveAndWait(build_barrier,
  /* NOTE(review): original line 330 (the final, wait-event argument
   * of this call) is missing. */
331  }
332  Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
333 
334  /* Each backend should now select a batch to work on. */
335  hashtable->curbatch = -1;
  /* NOTE(review): original line 336 is missing; presumably it sets
   * hj_JoinState to HJ_NEED_NEW_BATCH, per the comment above. */
337 
338  continue;
339  }
340  else
  /* NOTE(review): original line 341 (the body of this 'else',
   * presumably setting hj_JoinState to HJ_NEED_NEW_OUTER before
   * falling through) is missing. */
342 
343  /* FALL THRU */
344 
345  case HJ_NEED_NEW_OUTER:
346 
347  /*
348  * We don't have an outer tuple, try to get the next one
349  */
350  if (parallel)
351  outerTupleSlot =
352  ExecParallelHashJoinOuterGetTuple(outerNode, node,
353  &hashvalue);
354  else
355  outerTupleSlot =
356  ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
357 
358  if (TupIsNull(outerTupleSlot))
359  {
360  /* end of batch, or maybe whole join */
361  if (HJ_FILL_INNER(node))
362  {
363  /* set up to scan for unmatched inner tuples */
  /* NOTE(review): original lines 364-365 are missing; presumably they
   * prepare the unmatched-inner scan and set hj_JoinState to
   * HJ_FILL_INNER_TUPLES. */
366  }
367  else
  /* NOTE(review): original line 368 (the body of this 'else',
   * presumably setting hj_JoinState to HJ_NEED_NEW_BATCH) is missing;
   * as shown, the 'continue' below has become the else body. */
369  continue;
370  }
371 
372  econtext->ecxt_outertuple = outerTupleSlot;
373  node->hj_MatchedOuter = false;
374 
375  /*
376  * Find the corresponding bucket for this tuple in the main
377  * hash table or skew hash table.
378  */
379  node->hj_CurHashValue = hashvalue;
380  ExecHashGetBucketAndBatch(hashtable, hashvalue,
381  &node->hj_CurBucketNo, &batchno);
382  node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
383  hashvalue);
384  node->hj_CurTuple = NULL;
385 
386  /*
387  * The tuple might not belong to the current batch (where
388  * "current batch" includes the skew buckets if any).
389  */
390  if (batchno != hashtable->curbatch &&
  /* NOTE(review): original line 391 (the second operand of this '&&')
   * is missing; per the comment above it presumably also requires that
   * the tuple did not land in a skew bucket (hj_CurSkewBucketNo). */
392  {
393  bool shouldFree;
394  MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
395  &shouldFree);
396 
397  /*
398  * Need to postpone this outer tuple to a later batch.
399  * Save it in the corresponding outer-batch file.
400  */
401  Assert(parallel_state == NULL);
402  Assert(batchno > hashtable->curbatch);
403  ExecHashJoinSaveTuple(mintuple, hashvalue,
404  &hashtable->outerBatchFile[batchno]);
405 
406  if (shouldFree)
407  heap_free_minimal_tuple(mintuple);
408 
409  /* Loop around, staying in HJ_NEED_NEW_OUTER state */
410  continue;
411  }
412 
413  /* OK, let's scan the bucket for matches */
  /* NOTE(review): original line 414 is missing; presumably it sets
   * hj_JoinState to HJ_SCAN_BUCKET before falling through. */
415 
416  /* FALL THRU */
417 
418  case HJ_SCAN_BUCKET:
419 
420  /*
421  * Scan the selected hash bucket for matches to current outer
422  */
423  if (parallel)
424  {
425  if (!ExecParallelScanHashBucket(node, econtext))
426  {
427  /* out of matches; check for possible outer-join fill */
  /* NOTE(review): original line 428 is missing; presumably it sets
   * hj_JoinState to HJ_FILL_OUTER_TUPLE. */
429  continue;
430  }
431  }
432  else
433  {
434  if (!ExecScanHashBucket(node, econtext))
435  {
436  /* out of matches; check for possible outer-join fill */
  /* NOTE(review): original line 437 is missing; presumably it sets
   * hj_JoinState to HJ_FILL_OUTER_TUPLE. */
438  continue;
439  }
440  }
441 
442  /*
443  * We've got a match, but still need to test non-hashed quals.
444  * ExecScanHashBucket already set up all the state needed to
445  * call ExecQual.
446  *
447  * If we pass the qual, then save state for next call and have
448  * ExecProject form the projection, store it in the tuple
449  * table, and return the slot.
450  *
451  * Only the joinquals determine tuple match status, but all
452  * quals must pass to actually return the tuple.
453  */
454  if (joinqual == NULL || ExecQual(joinqual, econtext))
455  {
456  node->hj_MatchedOuter = true;
457 
458  if (parallel)
459  {
460  /*
461  * Full/right outer joins are currently not supported
462  * for parallel joins, so we don't need to set the
463  * match bit. Experiments show that it's worth
464  * avoiding the shared memory traffic on large
465  * systems.
466  */
467  Assert(!HJ_FILL_INNER(node));
468  }
469  else
470  {
471  /*
472  * This is really only needed if HJ_FILL_INNER(node),
473  * but we'll avoid the branch and just set it always.
474  */
  /* NOTE(review): original line 475 (the statement that sets the match
   * bit on the current inner tuple, node->hj_CurTuple) is missing. */
476  }
477 
478  /* In an antijoin, we never return a matched tuple */
479  if (node->js.jointype == JOIN_ANTI)
480  {
  /* NOTE(review): original line 481 is missing; presumably it sets
   * hj_JoinState to HJ_NEED_NEW_OUTER. */
482  continue;
483  }
484 
485  /*
486  * If we only need to join to the first matching inner
487  * tuple, then consider returning this one, but after that
488  * continue with next outer tuple.
489  */
490  if (node->js.single_match)
  /* NOTE(review): original line 491 (the body of this 'if', presumably
   * setting hj_JoinState to HJ_NEED_NEW_OUTER) is missing; as shown,
   * the 'if' wrongly governs the otherqual test below. */
492 
493  if (otherqual == NULL || ExecQual(otherqual, econtext))
494  return ExecProject(node->js.ps.ps_ProjInfo);
495  else
496  InstrCountFiltered2(node, 1);
497  }
498  else
499  InstrCountFiltered1(node, 1);
500  break;
501 
502  case HJ_FILL_OUTER_TUPLE:
503 
504  /*
505  * The current outer tuple has run out of matches, so check
506  * whether to emit a dummy outer-join tuple. Whether we emit
507  * one or not, the next state is NEED_NEW_OUTER.
508  */
  /* NOTE(review): original line 509 is missing; presumably it sets
   * hj_JoinState to HJ_NEED_NEW_OUTER, as the comment above states. */
510 
511  if (!node->hj_MatchedOuter &&
512  HJ_FILL_OUTER(node))
513  {
514  /*
515  * Generate a fake join tuple with nulls for the inner
516  * tuple, and return it if it passes the non-join quals.
517  */
518  econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
519 
520  if (otherqual == NULL || ExecQual(otherqual, econtext))
521  return ExecProject(node->js.ps.ps_ProjInfo);
522  else
523  InstrCountFiltered2(node, 1);
524  }
525  break;
526 
  /* NOTE(review): original line 527 is missing; presumably it is the
   * 'case HJ_FILL_INNER_TUPLES:' label for the code below (as shown,
   * that code is unreachable after the preceding 'break'). */
528 
529  /*
530  * We have finished a batch, but we are doing right/full join,
531  * so any unmatched inner tuples in the hashtable have to be
532  * emitted before we continue to the next batch.
533  */
534  if (!ExecScanHashTableForUnmatched(node, econtext))
535  {
536  /* no more unmatched tuples */
  /* NOTE(review): original line 537 is missing; presumably it sets
   * hj_JoinState to HJ_NEED_NEW_BATCH. */
538  continue;
539  }
540 
541  /*
542  * Generate a fake join tuple with nulls for the outer tuple,
543  * and return it if it passes the non-join quals.
544  */
545  econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
546 
547  if (otherqual == NULL || ExecQual(otherqual, econtext))
548  return ExecProject(node->js.ps.ps_ProjInfo);
549  else
550  InstrCountFiltered2(node, 1);
551  break;
552 
553  case HJ_NEED_NEW_BATCH:
554 
555  /*
556  * Try to advance to next batch. Done if there are no more.
557  */
558  if (parallel)
559  {
560  if (!ExecParallelHashJoinNewBatch(node))
561  return NULL; /* end of parallel-aware join */
562  }
563  else
564  {
565  if (!ExecHashJoinNewBatch(node))
566  return NULL; /* end of parallel-oblivious join */
567  }
  /* NOTE(review): original line 568 is missing; presumably it sets
   * hj_JoinState to HJ_NEED_NEW_OUTER now that a batch is selected. */
569  break;
570 
571  default:
572  elog(ERROR, "unrecognized hashjoin state: %d",
573  (int) node->hj_JoinState);
574  }
575  }
576 }
577 
578 /* ----------------------------------------------------------------
579  * ExecHashJoin
580  *
581  * Parallel-oblivious version.
582  * ----------------------------------------------------------------
583  */
584 static TupleTableSlot * /* return: a tuple or NULL */
586 {
587  /*
588  * On sufficiently smart compilers this should be inlined with the
589  * parallel-aware branches removed.
590  */
591  return ExecHashJoinImpl(pstate, false);
592 }
593 
594 /* ----------------------------------------------------------------
595  * ExecParallelHashJoin
596  *
597  * Parallel-aware version.
598  * ----------------------------------------------------------------
599  */
600 static TupleTableSlot * /* return: a tuple or NULL */
602 {
603  /*
604  * On sufficiently smart compilers this should be inlined with the
605  * parallel-oblivious branches removed.
606  */
607  return ExecHashJoinImpl(pstate, true);
608 }
609 
610 /* ----------------------------------------------------------------
611  * ExecInitHashJoin
612  *
613  * Init routine for HashJoin node.
 *
 * Builds the HashJoinState for 'node': initializes the outer child and the
 * inner Hash child, the outer tuple slot, the null-fill slots required by
 * the join type, the quals and outer hash keys, and starts the state
 * machine in HJ_BUILD_HASHTABLE.  The hash table itself is not built here;
 * hj_HashTable stays NULL until the first ExecProcNode call.
 * Backward scan and mark/restore are not supported (asserted below).
614  * ----------------------------------------------------------------
615  */
/*
 * NOTE(review): this copy is a Doxygen rendering with line numbers embedded
 * and hyperlinked lines dropped.  Original line 616 (the return type; the
 * function returns 'hjstate', a HashJoinState *) is missing here.
 */
617 ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
618 {
619  HashJoinState *hjstate;
620  Plan *outerNode;
621  Hash *hashNode;
622  TupleDesc outerDesc,
623  innerDesc;
624  const TupleTableSlotOps *ops;
625 
626  /* check for unsupported flags */
627  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
628 
629  /*
630  * create state structure
631  */
632  hjstate = makeNode(HashJoinState);
633  hjstate->js.ps.plan = (Plan *) node;
634  hjstate->js.ps.state = estate;
635 
636  /*
637  * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
638  * where this function may be replaced with a parallel version, if we
639  * managed to launch a parallel query.
640  */
641  hjstate->js.ps.ExecProcNode = ExecHashJoin;
642  hjstate->js.jointype = node->join.jointype;
643 
644  /*
645  * Miscellaneous initialization
646  *
647  * create expression context for node
648  */
649  ExecAssignExprContext(estate, &hjstate->js.ps);
650 
651  /*
652  * initialize child nodes
653  *
654  * Note: we could suppress the REWIND flag for the inner input, which
655  * would amount to betting that the hash will be a single batch. Not
656  * clear if this would be a win or not.
657  */
658  outerNode = outerPlan(node);
659  hashNode = (Hash *) innerPlan(node);
660 
661  outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
662  outerDesc = ExecGetResultType(outerPlanState(hjstate));
663  innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
664  innerDesc = ExecGetResultType(innerPlanState(hjstate));
665 
666  /*
667  * Initialize result slot, type and projection.
668  */
  /* NOTE(review): original line 669 (the result-slot initialization the
   * comment above refers to) is missing; only the projection setup below
   * survived.  Restore from upstream. */
670  ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
671 
672  /*
673  * tuple table initialization
674  */
675  ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
676  hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
677  ops);
678 
679  /*
680  * detect whether we need only consider the first matching inner tuple
681  */
682  hjstate->js.single_match = (node->join.inner_unique ||
683  node->join.jointype == JOIN_SEMI);
684 
685  /* set up null tuples for outer joins, if needed */
  /* The slot that is created here is what the HJ_FILL_OUTER/HJ_FILL_INNER
   * macros test: a null *inner* slot means outer rows get null-filled,
   * and vice versa. */
686  switch (node->join.jointype)
687  {
688  case JOIN_INNER:
689  case JOIN_SEMI:
690  break;
691  case JOIN_LEFT:
692  case JOIN_ANTI:
693  hjstate->hj_NullInnerTupleSlot =
694  ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
695  break;
696  case JOIN_RIGHT:
697  hjstate->hj_NullOuterTupleSlot =
698  ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
699  break;
700  case JOIN_FULL:
701  hjstate->hj_NullOuterTupleSlot =
702  ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
703  hjstate->hj_NullInnerTupleSlot =
704  ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
705  break;
706  default:
707  elog(ERROR, "unrecognized join type: %d",
708  (int) node->join.jointype);
709  }
710 
711  /*
712  * now for some voodoo. our temporary tuple slot is actually the result
713  * tuple slot of the Hash node (which is our inner plan). we can do this
714  * because Hash nodes don't return tuples via ExecProcNode() -- instead
715  * the hash join node uses ExecScanHashBucket() to get at the contents of
716  * the hash table. -cim 6/9/91
717  */
718  {
719  HashState *hashstate = (HashState *) innerPlanState(hjstate);
720  TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;
721 
722  hjstate->hj_HashTupleSlot = slot;
723  }
724 
725  /*
726  * initialize child expressions
727  */
728  hjstate->js.ps.qual =
729  ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
730  hjstate->js.joinqual =
731  ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
732  hjstate->hashclauses =
733  ExecInitQual(node->hashclauses, (PlanState *) hjstate);
734 
735  /*
736  * initialize hash-specific info
737  */
738  hjstate->hj_HashTable = NULL;
739  hjstate->hj_FirstOuterTupleSlot = NULL;
740 
741  hjstate->hj_CurHashValue = 0;
742  hjstate->hj_CurBucketNo = 0;
743  hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
744  hjstate->hj_CurTuple = NULL;
745 
746  hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
747  (PlanState *) hjstate);
748  hjstate->hj_HashOperators = node->hashoperators;
749  hjstate->hj_Collations = node->hashcollations;
750 
751  hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
752  hjstate->hj_MatchedOuter = false;
753  hjstate->hj_OuterNotEmpty = false;
754 
755  return hjstate;
756 }
757 
758 /* ----------------------------------------------------------------
759  * ExecEndHashJoin
760  *
761  * clean up routine for HashJoin node
 *
 * Per its section comments, this releases the hash table (if one was
 * created), the expression context, the node's tuple slots and the child
 * plan states.
762  * ----------------------------------------------------------------
763  */
764 void
/*
 * NOTE(review): Doxygen rendering -- original line 765 (the declarator,
 * presumably taking the HashJoinState as 'node', which the body uses) is
 * missing, as are the cleanup calls noted below.  As shown, this function
 * only clears hj_HashTable and frees the expression context.
 */
766 {
767  /*
768  * Free hash table
769  */
770  if (node->hj_HashTable)
771  {
  /* NOTE(review): original line 772 (the call that actually releases the
   * hash table) is missing; as shown the pointer is merely cleared. */
773  node->hj_HashTable = NULL;
774  }
775 
776  /*
777  * Free the exprcontext
778  */
779  ExecFreeExprContext(&node->js.ps);
780 
781  /*
782  * clean out the tuple table
783  */
  /* NOTE(review): original lines 784-786 (the statements that clear the
   * node's tuple slots) are missing. */
787 
788  /*
789  * clean up subtrees
790  */
  /* NOTE(review): original lines 791-792 (the calls that shut down the
   * outer and inner child plan states) are missing. */
793 }
794 
795 /*
796  * ExecHashJoinOuterGetTuple
797  *
798  * get the next outer tuple for a parallel oblivious hashjoin: either by
799  * executing the outer plan node in the first pass, or from the temp
800  * files for the hashjoin batches.
801  *
802  * Returns a null slot if no more outer tuples (within the current batch).
803  *
804  * On success, the tuple's hash value is stored at *hashvalue --- this is
805  * either originally computed, or re-read from the temp file.
806  */
807 static TupleTableSlot *
809  HashJoinState *hjstate,
810  uint32 *hashvalue)
811 {
812  HashJoinTable hashtable = hjstate->hj_HashTable;
813  int curbatch = hashtable->curbatch;
814  TupleTableSlot *slot;
815 
816  if (curbatch == 0) /* if it is the first pass */
817  {
818  /*
819  * Check to see if first outer tuple was already fetched by
820  * ExecHashJoin() and not used yet.
821  */
822  slot = hjstate->hj_FirstOuterTupleSlot;
823  if (!TupIsNull(slot))
824  hjstate->hj_FirstOuterTupleSlot = NULL;
825  else
826  slot = ExecProcNode(outerNode);
827 
828  while (!TupIsNull(slot))
829  {
830  /*
831  * We have to compute the tuple's hash value.
832  */
833  ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
834 
835  econtext->ecxt_outertuple = slot;
836  if (ExecHashGetHashValue(hashtable, econtext,
837  hjstate->hj_OuterHashKeys,
838  true, /* outer tuple */
839  HJ_FILL_OUTER(hjstate),
840  hashvalue))
841  {
842  /* remember outer relation is not empty for possible rescan */
843  hjstate->hj_OuterNotEmpty = true;
844 
845  return slot;
846  }
847 
848  /*
849  * That tuple couldn't match because of a NULL, so discard it and
850  * continue with the next one.
851  */
852  slot = ExecProcNode(outerNode);
853  }
854  }
855  else if (curbatch < hashtable->nbatch)
856  {
857  BufFile *file = hashtable->outerBatchFile[curbatch];
858 
859  /*
860  * In outer-join cases, we could get here even though the batch file
861  * is empty.
862  */
863  if (file == NULL)
864  return NULL;
865 
866  slot = ExecHashJoinGetSavedTuple(hjstate,
867  file,
868  hashvalue,
869  hjstate->hj_OuterTupleSlot);
870  if (!TupIsNull(slot))
871  return slot;
872  }
873 
874  /* End of this batch */
875  return NULL;
876 }
877 
878 /*
879  * ExecHashJoinOuterGetTuple variant for the parallel case.
 *
 * Returns the next outer tuple for the current batch, or NULL at the end
 * of the batch; on success *hashvalue holds the tuple's hash value.  For a
 * single-batch join the outer plan is run directly (skipping tuples whose
 * hash keys rule out a match); otherwise tuples are read from the batch's
 * shared outer tuple store via sts_parallel_scan_next(), even for batch 0.
 *
 * NOTE(review): Doxygen rendering -- original lines 882, 926 and 933 are
 * missing (see notes below); this definition does not compile as shown.
880  */
881 static TupleTableSlot *
/* NOTE(review): original line 882 (the function name and its first
 * parameter, 'outerNode', which the body uses) is missing. */
883  HashJoinState *hjstate,
884  uint32 *hashvalue)
885 {
886  HashJoinTable hashtable = hjstate->hj_HashTable;
887  int curbatch = hashtable->curbatch;
888  TupleTableSlot *slot;
889 
890  /*
891  * In the Parallel Hash case we only run the outer plan directly for
892  * single-batch hash joins. Otherwise we have to go to batch files, even
893  * for batch 0.
894  */
895  if (curbatch == 0 && hashtable->nbatch == 1)
896  {
897  slot = ExecProcNode(outerNode);
898 
899  while (!TupIsNull(slot))
900  {
901  ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
902 
903  econtext->ecxt_outertuple = slot;
904  if (ExecHashGetHashValue(hashtable, econtext,
905  hjstate->hj_OuterHashKeys,
906  true, /* outer tuple */
907  HJ_FILL_OUTER(hjstate),
908  hashvalue))
909  return slot;
910 
911  /*
912  * That tuple couldn't match because of a NULL, so discard it and
913  * continue with the next one.
914  */
915  slot = ExecProcNode(outerNode);
916  }
917  }
918  else if (curbatch < hashtable->nbatch)
919  {
920  MinimalTuple tuple;
921 
922  tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
923  hashvalue);
924  if (tuple != NULL)
925  {
  /* NOTE(review): original line 926 (the start of the call that stores
   * 'tuple' into hjstate->hj_OuterTupleSlot; the next two lines are its
   * trailing arguments) is missing. */
927  hjstate->hj_OuterTupleSlot,
928  false);
929  slot = hjstate->hj_OuterTupleSlot;
930  return slot;
931  }
932  else
  /* NOTE(review): original line 933 (the body of this 'else', presumably
   * ending the shared scan of this batch's outer tuples) is missing; as
   * shown, 'else' is followed directly by '}'. */
934  }
935 
936  /* End of this batch */
937  return NULL;
938 }
939 
940 /*
941  * ExecHashJoinNewBatch
942  * switch to a new hashjoin batch
943  *
944  * Returns true if successful, false if there are no more batches.
 *
 * Closes the outer batch file of the batch just finished (or, after batch
 * 0, drops the skew-optimization state), skips forward over batches that
 * rules 1-3 below allow skipping (closing their temp files), then resets
 * the hash table, reloads it from the new batch's inner file and rewinds
 * the new batch's outer file.  Errors out if a temp file cannot be rewound.
945  */
946 static bool
/* NOTE(review): Doxygen rendering -- original line 947 (the declarator,
 * presumably taking the HashJoinState as 'hjstate', which the body uses)
 * is missing. */
948 {
949  HashJoinTable hashtable = hjstate->hj_HashTable;
950  int nbatch;
951  int curbatch;
952  BufFile *innerFile;
953  TupleTableSlot *slot;
954  uint32 hashvalue;
955 
956  nbatch = hashtable->nbatch;
957  curbatch = hashtable->curbatch;
958 
959  if (curbatch > 0)
960  {
961  /*
962  * We no longer need the previous outer batch file; close it right
963  * away to free disk space.
964  */
965  if (hashtable->outerBatchFile[curbatch])
966  BufFileClose(hashtable->outerBatchFile[curbatch]);
967  hashtable->outerBatchFile[curbatch] = NULL;
968  }
969  else /* we just finished the first batch */
970  {
971  /*
972  * Reset some of the skew optimization state variables, since we no
973  * longer need to consider skew tuples after the first batch. The
974  * memory context reset we are about to do will release the skew
975  * hashtable itself.
976  */
977  hashtable->skewEnabled = false;
978  hashtable->skewBucket = NULL;
979  hashtable->skewBucketNums = NULL;
980  hashtable->nSkewBuckets = 0;
981  hashtable->spaceUsedSkew = 0;
982  }
983 
984  /*
985  * We can always skip over any batches that are completely empty on both
986  * sides. We can sometimes skip over batches that are empty on only one
987  * side, but there are exceptions:
988  *
989  * 1. In a left/full outer join, we have to process outer batches even if
990  * the inner batch is empty. Similarly, in a right/full outer join, we
991  * have to process inner batches even if the outer batch is empty.
992  *
993  * 2. If we have increased nbatch since the initial estimate, we have to
994  * scan inner batches since they might contain tuples that need to be
995  * reassigned to later inner batches.
996  *
997  * 3. Similarly, if we have increased nbatch since starting the outer
998  * scan, we have to rescan outer batches in case they contain tuples that
999  * need to be reassigned.
1000  */
1001  curbatch++;
1002  while (curbatch < nbatch &&
1003  (hashtable->outerBatchFile[curbatch] == NULL ||
1004  hashtable->innerBatchFile[curbatch] == NULL))
1005  {
1006  if (hashtable->outerBatchFile[curbatch] &&
1007  HJ_FILL_OUTER(hjstate))
1008  break; /* must process due to rule 1 */
1009  if (hashtable->innerBatchFile[curbatch] &&
1010  HJ_FILL_INNER(hjstate))
1011  break; /* must process due to rule 1 */
1012  if (hashtable->innerBatchFile[curbatch] &&
1013  nbatch != hashtable->nbatch_original)
1014  break; /* must process due to rule 2 */
1015  if (hashtable->outerBatchFile[curbatch] &&
1016  nbatch != hashtable->nbatch_outstart)
1017  break; /* must process due to rule 3 */
1018  /* We can ignore this batch. */
1019  /* Release associated temp files right away. */
1020  if (hashtable->innerBatchFile[curbatch])
1021  BufFileClose(hashtable->innerBatchFile[curbatch]);
1022  hashtable->innerBatchFile[curbatch] = NULL;
1023  if (hashtable->outerBatchFile[curbatch])
1024  BufFileClose(hashtable->outerBatchFile[curbatch]);
1025  hashtable->outerBatchFile[curbatch] = NULL;
1026  curbatch++;
1027  }
1028 
1029  if (curbatch >= nbatch)
1030  return false; /* no more batches */
1031 
1032  hashtable->curbatch = curbatch;
1033 
1034  /*
1035  * Reload the hash table with the new inner batch (which could be empty)
1036  */
1037  ExecHashTableReset(hashtable);
1038 
1039  innerFile = hashtable->innerBatchFile[curbatch];
1040 
1041  if (innerFile != NULL)
1042  {
1043  if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
1044  ereport(ERROR,
  /* NOTE(review): original line 1045 is missing: it opened ereport's
   * auxiliary argument list (the errmsg line below closes one more
   * parenthesis than is visibly opened) and may have supplied an
   * error code.  Restore from upstream. */
1046  errmsg("could not rewind hash-join temporary file")));
1047 
1048  while ((slot = ExecHashJoinGetSavedTuple(hjstate,
1049  innerFile,
1050  &hashvalue,
1051  hjstate->hj_HashTupleSlot)))
1052  {
1053  /*
1054  * NOTE: some tuples may be sent to future batches. Also, it is
1055  * possible for hashtable->nbatch to be increased here!
1056  */
1057  ExecHashTableInsert(hashtable, slot, hashvalue);
1058  }
1059 
1060  /*
1061  * after we build the hash table, the inner batch file is no longer
1062  * needed
1063  */
1064  BufFileClose(innerFile);
1065  hashtable->innerBatchFile[curbatch] = NULL;
1066  }
1067 
1068  /*
1069  * Rewind outer batch file (if present), so that we can start reading it.
1070  */
1071  if (hashtable->outerBatchFile[curbatch] != NULL)
1072  {
1073  if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
1074  ereport(ERROR,
  /* NOTE(review): original line 1075 is missing, as at line 1045 above:
   * the opening of ereport's auxiliary argument list. */
1076  errmsg("could not rewind hash-join temporary file")));
1077  }
1078 
1079  return true;
1080 }
1081 
1082 /*
1083  * Choose a batch to work on, and attach to it. Returns true if successful,
1084  * false if there are no more batches.
1085  */
1086 static bool
1088 {
1089  HashJoinTable hashtable = hjstate->hj_HashTable;
1090  int start_batchno;
1091  int batchno;
1092 
1093  /*
1094  * If we started up so late that the batch tracking array has been freed
1095  * already by ExecHashTableDetach(), then we are finished. See also
1096  * ExecParallelHashEnsureBatchAccessors().
1097  */
1098  if (hashtable->batches == NULL)
1099  return false;
1100 
1101  /*
1102  * If we were already attached to a batch, remember not to bother checking
1103  * it again, and detach from it (possibly freeing the hash table if we are
1104  * last to detach).
1105  */
1106  if (hashtable->curbatch >= 0)
1107  {
1108  hashtable->batches[hashtable->curbatch].done = true;
1109  ExecHashTableDetachBatch(hashtable);
1110  }
1111 
1112  /*
1113  * Search for a batch that isn't done. We use an atomic counter to start
1114  * our search at a different batch in every participant when there are
1115  * more batches than participants.
1116  */
1117  batchno = start_batchno =
1119  hashtable->nbatch;
1120  do
1121  {
1122  uint32 hashvalue;
1123  MinimalTuple tuple;
1124  TupleTableSlot *slot;
1125 
1126  if (!hashtable->batches[batchno].done)
1127  {
1128  SharedTuplestoreAccessor *inner_tuples;
1129  Barrier *batch_barrier =
1130  &hashtable->batches[batchno].shared->batch_barrier;
1131 
1132  switch (BarrierAttach(batch_barrier))
1133  {
1134  case PHJ_BATCH_ELECTING:
1135 
1136  /* One backend allocates the hash table. */
1137  if (BarrierArriveAndWait(batch_barrier,
1139  ExecParallelHashTableAlloc(hashtable, batchno);
1140  /* Fall through. */
1141 
1142  case PHJ_BATCH_ALLOCATING:
1143  /* Wait for allocation to complete. */
1144  BarrierArriveAndWait(batch_barrier,
1146  /* Fall through. */
1147 
1148  case PHJ_BATCH_LOADING:
1149  /* Start (or join in) loading tuples. */
1150  ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1151  inner_tuples = hashtable->batches[batchno].inner_tuples;
1152  sts_begin_parallel_scan(inner_tuples);
1153  while ((tuple = sts_parallel_scan_next(inner_tuples,
1154  &hashvalue)))
1155  {
1157  hjstate->hj_HashTupleSlot,
1158  false);
1159  slot = hjstate->hj_HashTupleSlot;
1161  hashvalue);
1162  }
1163  sts_end_parallel_scan(inner_tuples);
1164  BarrierArriveAndWait(batch_barrier,
1166  /* Fall through. */
1167 
1168  case PHJ_BATCH_PROBING:
1169 
1170  /*
1171  * This batch is ready to probe. Return control to
1172  * caller. We stay attached to batch_barrier so that the
1173  * hash table stays alive until everyone's finished
1174  * probing it, but no participant is allowed to wait at
1175  * this barrier again (or else a deadlock could occur).
1176  * All attached participants must eventually call
1177  * BarrierArriveAndDetach() so that the final phase
1178  * PHJ_BATCH_DONE can be reached.
1179  */
1180  ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1181  sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
1182  return true;
1183 
1184  case PHJ_BATCH_DONE:
1185 
1186  /*
1187  * Already done. Detach and go around again (if any
1188  * remain).
1189  */
1190  BarrierDetach(batch_barrier);
1191  hashtable->batches[batchno].done = true;
1192  hashtable->curbatch = -1;
1193  break;
1194 
1195  default:
1196  elog(ERROR, "unexpected batch phase %d",
1197  BarrierPhase(batch_barrier));
1198  }
1199  }
1200  batchno = (batchno + 1) % hashtable->nbatch;
1201  } while (batchno != start_batchno);
1202 
1203  return false;
1204 }
1205 
1206 /*
1207  * ExecHashJoinSaveTuple
1208  * save a tuple to a batch file.
1209  *
1210  * The data recorded in the file for each tuple is its hash value,
1211  * then the tuple in MinimalTuple format.
1212  *
1213  * Note: it is important always to call this in the regular executor
1214  * context, not in a shorter-lived context; else the temp file buffers
1215  * will get messed up.
1216  */
1217 void
1219  BufFile **fileptr)
1220 {
1221  BufFile *file = *fileptr;
1222 
1223  if (file == NULL)
1224  {
1225  /* First write to this batch file, so open it. */
1226  file = BufFileCreateTemp(false);
1227  *fileptr = file;
1228  }
1229 
1230  BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
1231  BufFileWrite(file, (void *) tuple, tuple->t_len);
1232 }
1233 
1234 /*
1235  * ExecHashJoinGetSavedTuple
1236  * read the next tuple from a batch file. Return NULL if no more.
1237  *
1238  * On success, *hashvalue is set to the tuple's hash value, and the tuple
1239  * itself is stored in the given slot.
1240  */
1241 static TupleTableSlot *
1243  BufFile *file,
1244  uint32 *hashvalue,
1245  TupleTableSlot *tupleSlot)
1246 {
1247  uint32 header[2];
1248  size_t nread;
1249  MinimalTuple tuple;
1250 
1251  /*
1252  * We check for interrupts here because this is typically taken as an
1253  * alternative code path to an ExecProcNode() call, which would include
1254  * such a check.
1255  */
1257 
1258  /*
1259  * Since both the hash value and the MinimalTuple length word are uint32,
1260  * we can read them both in one BufFileRead() call without any type
1261  * cheating.
1262  */
1263  nread = BufFileRead(file, (void *) header, sizeof(header));
1264  if (nread == 0) /* end of file */
1265  {
1266  ExecClearTuple(tupleSlot);
1267  return NULL;
1268  }
1269  if (nread != sizeof(header))
1270  ereport(ERROR,
1272  errmsg("could not read from hash-join temporary file: read only %zu of %zu bytes",
1273  nread, sizeof(header))));
1274  *hashvalue = header[0];
1275  tuple = (MinimalTuple) palloc(header[1]);
1276  tuple->t_len = header[1];
1277  nread = BufFileRead(file,
1278  (void *) ((char *) tuple + sizeof(uint32)),
1279  header[1] - sizeof(uint32));
1280  if (nread != header[1] - sizeof(uint32))
1281  ereport(ERROR,
1283  errmsg("could not read from hash-join temporary file: read only %zu of %zu bytes",
1284  nread, header[1] - sizeof(uint32))));
1285  ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
1286  return tupleSlot;
1287 }
1288 
1289 
1290 void
1292 {
1293  /*
1294  * In a multi-batch join, we currently have to do rescans the hard way,
1295  * primarily because batch temp files may have already been released. But
1296  * if it's a single-batch join, and there is no parameter change for the
1297  * inner subnode, then we can just re-use the existing hash table without
1298  * rebuilding it.
1299  */
1300  if (node->hj_HashTable != NULL)
1301  {
1302  if (node->hj_HashTable->nbatch == 1 &&
1303  node->js.ps.righttree->chgParam == NULL)
1304  {
1305  /*
1306  * Okay to reuse the hash table; needn't rescan inner, either.
1307  *
1308  * However, if it's a right/full join, we'd better reset the
1309  * inner-tuple match flags contained in the table.
1310  */
1311  if (HJ_FILL_INNER(node))
1313 
1314  /*
1315  * Also, we need to reset our state about the emptiness of the
1316  * outer relation, so that the new scan of the outer will update
1317  * it correctly if it turns out to be empty this time. (There's no
1318  * harm in clearing it now because ExecHashJoin won't need the
1319  * info. In the other cases, where the hash table doesn't exist
1320  * or we are destroying it, we leave this state alone because
1321  * ExecHashJoin will need it the first time through.)
1322  */
1323  node->hj_OuterNotEmpty = false;
1324 
1325  /* ExecHashJoin can skip the BUILD_HASHTABLE step */
1327  }
1328  else
1329  {
1330  /* must destroy and rebuild hash table */
1331  HashState *hashNode = castNode(HashState, innerPlanState(node));
1332 
1333  Assert(hashNode->hashtable == node->hj_HashTable);
1334  /* accumulate stats from old hash table, if wanted */
1335  /* (this should match ExecShutdownHash) */
1336  if (hashNode->ps.instrument && !hashNode->hinstrument)
1337  hashNode->hinstrument = (HashInstrumentation *)
1338  palloc0(sizeof(HashInstrumentation));
1339  if (hashNode->hinstrument)
1341  hashNode->hashtable);
1342  /* for safety, be sure to clear child plan node's pointer too */
1343  hashNode->hashtable = NULL;
1344 
1346  node->hj_HashTable = NULL;
1348 
1349  /*
1350  * if chgParam of subnode is not null then plan will be re-scanned
1351  * by first ExecProcNode.
1352  */
1353  if (node->js.ps.righttree->chgParam == NULL)
1354  ExecReScan(node->js.ps.righttree);
1355  }
1356  }
1357 
1358  /* Always reset intra-tuple state */
1359  node->hj_CurHashValue = 0;
1360  node->hj_CurBucketNo = 0;
1362  node->hj_CurTuple = NULL;
1363 
1364  node->hj_MatchedOuter = false;
1365  node->hj_FirstOuterTupleSlot = NULL;
1366 
1367  /*
1368  * if chgParam of subnode is not null then plan will be re-scanned by
1369  * first ExecProcNode.
1370  */
1371  if (node->js.ps.lefttree->chgParam == NULL)
1372  ExecReScan(node->js.ps.lefttree);
1373 }
1374 
1375 void
1377 {
1378  if (node->hj_HashTable)
1379  {
1380  /*
1381  * Detach from shared state before DSM memory goes away. This makes
1382  * sure that we don't have any pointers into DSM memory by the time
1383  * ExecEndHashJoin runs.
1384  */
1387  }
1388 }
1389 
1390 static void
1392 {
1393  PlanState *outerState = outerPlanState(hjstate);
1394  ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1395  HashJoinTable hashtable = hjstate->hj_HashTable;
1396  TupleTableSlot *slot;
1397  uint32 hashvalue;
1398  int i;
1399 
1400  Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
1401 
1402  /* Execute outer plan, writing all tuples to shared tuplestores. */
1403  for (;;)
1404  {
1405  slot = ExecProcNode(outerState);
1406  if (TupIsNull(slot))
1407  break;
1408  econtext->ecxt_outertuple = slot;
1409  if (ExecHashGetHashValue(hashtable, econtext,
1410  hjstate->hj_OuterHashKeys,
1411  true, /* outer tuple */
1412  HJ_FILL_OUTER(hjstate),
1413  &hashvalue))
1414  {
1415  int batchno;
1416  int bucketno;
1417  bool shouldFree;
1418  MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1419 
1420  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
1421  &batchno);
1422  sts_puttuple(hashtable->batches[batchno].outer_tuples,
1423  &hashvalue, mintup);
1424 
1425  if (shouldFree)
1426  heap_free_minimal_tuple(mintup);
1427  }
1429  }
1430 
1431  /* Make sure all outer partitions are readable by any backend. */
1432  for (i = 0; i < hashtable->nbatch; ++i)
1433  sts_end_write(hashtable->batches[i].outer_tuples);
1434 }
1435 
1436 void
1438 {
1440  shm_toc_estimate_keys(&pcxt->estimator, 1);
1441 }
1442 
1443 void
1445 {
1446  int plan_node_id = state->js.ps.plan->plan_node_id;
1447  HashState *hashNode;
1448  ParallelHashJoinState *pstate;
1449 
1450  /*
1451  * Disable shared hash table mode if we failed to create a real DSM
1452  * segment, because that means that we don't have a DSA area to work with.
1453  */
1454  if (pcxt->seg == NULL)
1455  return;
1456 
1458 
1459  /*
1460  * Set up the state needed to coordinate access to the shared hash
1461  * table(s), using the plan node ID as the toc key.
1462  */
1463  pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
1464  shm_toc_insert(pcxt->toc, plan_node_id, pstate);
1465 
1466  /*
1467  * Set up the shared hash join state with no batches initially.
1468  * ExecHashTableCreate() will prepare at least one later and set nbatch
1469  * and space_allowed.
1470  */
1471  pstate->nbatch = 0;
1472  pstate->space_allowed = 0;
1473  pstate->batches = InvalidDsaPointer;
1474  pstate->old_batches = InvalidDsaPointer;
1475  pstate->nbuckets = 0;
1476  pstate->growth = PHJ_GROWTH_OK;
1478  pg_atomic_init_u32(&pstate->distributor, 0);
1479  pstate->nparticipants = pcxt->nworkers + 1;
1480  pstate->total_tuples = 0;
1481  LWLockInitialize(&pstate->lock,
1483  BarrierInit(&pstate->build_barrier, 0);
1484  BarrierInit(&pstate->grow_batches_barrier, 0);
1485  BarrierInit(&pstate->grow_buckets_barrier, 0);
1486 
1487  /* Set up the space we'll use for shared temporary files. */
1488  SharedFileSetInit(&pstate->fileset, pcxt->seg);
1489 
1490  /* Initialize the shared state in the hash node. */
1491  hashNode = (HashState *) innerPlanState(state);
1492  hashNode->parallel_state = pstate;
1493 }
1494 
1495 /* ----------------------------------------------------------------
1496  * ExecHashJoinReInitializeDSM
1497  *
1498  * Reset shared state before beginning a fresh scan.
1499  * ----------------------------------------------------------------
1500  */
1501 void
1503 {
1504  int plan_node_id = state->js.ps.plan->plan_node_id;
1505  ParallelHashJoinState *pstate =
1506  shm_toc_lookup(cxt->toc, plan_node_id, false);
1507 
1508  /*
1509  * It would be possible to reuse the shared hash table in single-batch
1510  * cases by resetting and then fast-forwarding build_barrier to
1511  * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
1512  * currently shared hash tables are already freed by now (by the last
1513  * participant to detach from the batch). We could consider keeping it
1514  * around for single-batch joins. We'd also need to adjust
1515  * finalize_plan() so that it doesn't record a dummy dependency for
1516  * Parallel Hash nodes, preventing the rescan optimization. For now we
1517  * don't try.
1518  */
1519 
1520  /* Detach, freeing any remaining shared memory. */
1521  if (state->hj_HashTable != NULL)
1522  {
1525  }
1526 
1527  /* Clear any shared batch files. */
1528  SharedFileSetDeleteAll(&pstate->fileset);
1529 
1530  /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
1531  BarrierInit(&pstate->build_barrier, 0);
1532 }
1533 
1534 void
1536  ParallelWorkerContext *pwcxt)
1537 {
1538  HashState *hashNode;
1539  int plan_node_id = state->js.ps.plan->plan_node_id;
1540  ParallelHashJoinState *pstate =
1541  shm_toc_lookup(pwcxt->toc, plan_node_id, false);
1542 
1543  /* Attach to the space for shared temporary files. */
1544  SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
1545 
1546  /* Attach to the shared state in the hash node. */
1547  hashNode = (HashState *) innerPlanState(state);
1548  hashNode->parallel_state = pstate;
1549 
1551 }
JoinType jointype
Definition: execnodes.h:1838
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2423
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:209
#define HJ_NEED_NEW_BATCH
Definition: nodeHashjoin.c:129
#define PHJ_BATCH_DONE
Definition: hashjoin.h:268
List * qual
Definition: plannodes.h:137
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:109
#define HJ_SCAN_BUCKET
Definition: nodeHashjoin.c:126
dsa_pointer chunk_work_queue
Definition: hashjoin.h:242
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
Definition: nodeHash.c:1750
TupleTableSlot * hj_NullInnerTupleSlot
Definition: execnodes.h:1952
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
ExprState * joinqual
Definition: execnodes.h:1841
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:979
Instrumentation * instrument
Definition: execnodes.h:949
#define InvalidDsaPointer
Definition: dsa.h:78
const TupleTableSlotOps * ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)
Definition: execUtils.c:498
List * hashkeys
Definition: plannodes.h:751
void BarrierInit(Barrier *barrier, int participants)
Definition: barrier.c:100
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:64
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:650
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
static TupleTableSlot * ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)
Definition: nodeHashjoin.c:882
PlanState ps
Definition: execnodes.h:1837
#define castNode(_type_, nodeptr)
Definition: nodes.h:597
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:543
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
Definition: nodeHash.c:2070
void ExecHashTableDetachBatch(HashJoinTable hashtable)
Definition: nodeHash.c:3119
void ExecHashTableDetach(HashJoinTable hashtable)
Definition: nodeHash.c:3176
dsm_segment * seg
Definition: parallel.h:43
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1662
List * hashclauses
Definition: plannodes.h:743
#define PHJ_BATCH_ALLOCATING
Definition: hashjoin.h:265
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
Definition: nodeHash.c:2046
ExprContext * ps_ExprContext
Definition: execnodes.h:978
void ExecHashTableReset(HashJoinTable hashtable)
Definition: nodeHash.c:2141
shm_toc_estimator estimator
Definition: parallel.h:42
void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3261
bool single_match
Definition: execnodes.h:1839
HashJoinTable hashtable
Definition: execnodes.h:2404
void ExecReScan(PlanState *node)
Definition: execAmi.c:76
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
int plan_node_id
Definition: plannodes.h:135
void ExecShutdownHashJoin(HashJoinState *node)
bool hj_MatchedOuter
Definition: execnodes.h:1955
static TupleTableSlot * ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)
Definition: nodeHashjoin.c:808
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3099
EState * state
Definition: execnodes.h:941
TupleTableSlot * hj_OuterTupleSlot
Definition: execnodes.h:1949
struct PlanState * righttree
Definition: execnodes.h:962
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:369
List * hj_OuterHashKeys
Definition: execnodes.h:1941
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
SharedFileSet fileset
Definition: hashjoin.h:253
TupleTableSlot * hj_FirstOuterTupleSlot
Definition: execnodes.h:1953
#define PHJ_BUILD_HASHING_OUTER
Definition: hashjoin.h:260
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:649
Join join
Definition: plannodes.h:742
void BufFileClose(BufFile *file)
Definition: buffile.c:395
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
Definition: nodeHash.c:2369
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:209
#define PHJ_BATCH_LOADING
Definition: hashjoin.h:266
#define PHJ_BATCH_ELECTING
Definition: hashjoin.h:264
struct PlanState * lefttree
Definition: execnodes.h:961
void ExecEndHashJoin(HashJoinState *node)
Definition: nodeHashjoin.c:765
HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
Definition: nodeHash.c:431
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:208
#define HJ_FILL_INNER(hjstate)
Definition: nodeHashjoin.c:134
int * skewBucketNums
Definition: hashjoin.h:308
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
Definition: nodeHash.c:1596
List * hashcollations
Definition: plannodes.h:745
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1902
JoinType jointype
Definition: plannodes.h:684
uint32 hj_CurHashValue
Definition: execnodes.h:1945
int hj_CurSkewBucketNo
Definition: execnodes.h:1947
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:977
Barrier grow_buckets_barrier
Definition: hashjoin.h:250
#define ERROR
Definition: elog.h:43
TupleTableSlot * hj_NullOuterTupleSlot
Definition: execnodes.h:1951
dsa_pointer batches
Definition: hashjoin.h:236
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
static void ExecParallelHashJoinPartitionOuter(HashJoinState *node)
void ExecForceStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1482
BufFile * BufFileCreateTemp(bool interXact)
Definition: buffile.c:188
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1427
List * hashoperators
Definition: plannodes.h:744
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
Definition: nodeHash.c:2691
#define EXEC_FLAG_BACKWARD
Definition: executor.h:58
BufFile ** outerBatchFile
Definition: hashjoin.h:330
#define outerPlanState(node)
Definition: execnodes.h:1033
#define innerPlan(node)
Definition: plannodes.h:165
Cost startup_cost
Definition: plannodes.h:117
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:534
HashJoinTuple hj_CurTuple
Definition: execnodes.h:1948
MinimalTupleData * MinimalTuple
Definition: htup.h:27
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
Definition: nodeHash.c:1934
int errcode_for_file_access(void)
Definition: elog.c:633
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:227
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:318
#define TupIsNull(slot)
Definition: tuptable.h:292
unsigned int uint32
Definition: c.h:375
PlanState ps
Definition: execnodes.h:2403
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
#define InstrCountFiltered1(node, delta)
Definition: execnodes.h:1041
HashInstrumentation * hinstrument
Definition: execnodes.h:2420
List * hj_HashOperators
Definition: execnodes.h:1942
Bitmapset * chgParam
Definition: execnodes.h:971
#define outerPlan(node)
Definition: plannodes.h:166
#define PHJ_BUILD_DONE
Definition: hashjoin.h:261
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:745
int hj_CurBucketNo
Definition: execnodes.h:1946
#define HJ_FILL_INNER_TUPLES
Definition: nodeHashjoin.c:128
void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
#define HJ_FILL_OUTER(hjstate)
Definition: nodeHashjoin.c:132
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
HashJoinState * ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
Definition: nodeHashjoin.c:617
int BarrierAttach(Barrier *barrier)
Definition: barrier.c:214
void * palloc0(Size size)
Definition: mcxt.c:981
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:945
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:239
#define PHJ_BATCH_PROBING
Definition: hashjoin.h:267
void ExecHashJoinInitializeWorker(HashJoinState *state, ParallelWorkerContext *pwcxt)
void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function)
Definition: execProcnode.c:411
List * hj_Collations
Definition: execnodes.h:1943
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
Plan * plan
Definition: execnodes.h:939
#define ereport(elevel,...)
Definition: elog.h:144
double totalTuples
Definition: hashjoin.h:318
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:328
#define makeNode(_type_)
Definition: nodes.h:576
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:229
void SharedFileSetDeleteAll(SharedFileSet *fileset)
bool hj_OuterNotEmpty
Definition: execnodes.h:1956
#define Assert(condition)
Definition: c.h:746
#define EXEC_FLAG_MARK
Definition: executor.h:59
Definition: regguts.h:298
TupleTableSlot * ExecInitNullTupleSlot(EState *estate, TupleDesc tupType, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1817
ParallelHashGrowth growth
Definition: hashjoin.h:241
#define InstrCountFiltered2(node, delta)
Definition: execnodes.h:1046
dsa_pointer old_batches
Definition: hashjoin.h:237
bool BarrierDetach(Barrier *barrier)
Definition: barrier.c:234
static pg_attribute_always_inline TupleTableSlot * ExecHashJoinImpl(PlanState *pstate, bool parallel)
Definition: nodeHashjoin.c:165
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:479
BufFile ** innerBatchFile
Definition: hashjoin.h:329
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:243
static TupleTableSlot * ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1769
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
static TupleTableSlot * ExecParallelHashJoin(PlanState *pstate)
Definition: nodeHashjoin.c:601
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
static void header(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:210
TupleDesc ExecGetResultType(PlanState *planstate)
Definition: execUtils.c:489
ParallelHashJoinBatch * shared
Definition: hashjoin.h:197
#define HJ_NEED_NEW_OUTER
Definition: nodeHashjoin.c:125
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
ExprState * qual
Definition: execnodes.h:960
void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:1950
pg_atomic_uint32 distributor
Definition: hashjoin.h:251
#define HJ_BUILD_HASHTABLE
Definition: nodeHashjoin.c:124
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * palloc(Size size)
Definition: mcxt.c:950
HashJoinTable hj_HashTable
Definition: execnodes.h:1944
int errmsg(const char *fmt,...)
Definition: elog.c:821
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
Node * MultiExecProcNode(PlanState *node)
Definition: execProcnode.c:488
#define elog(elevel,...)
Definition: elog.h:214
int i
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:543
Cost total_cost
Definition: plannodes.h:118
#define HeapTupleHeaderSetMatch(tup)
Definition: htup_details.h:521
static TupleTableSlot * ExecHashJoin(PlanState *pstate)
Definition: nodeHashjoin.c:585
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
Definition: nodeHash.c:2170
#define pg_attribute_always_inline
Definition: c.h:163
bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
Definition: nodeHash.c:1794
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition: atomics.h:223
#define HJ_FILL_OUTER_TUPLE
Definition: nodeHashjoin.c:127
bool inner_unique
Definition: plannodes.h:685
void BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:586
#define innerPlanState(node)
Definition: execnodes.h:1032
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:139
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
Definition: nodeHash.c:1995
JoinState js
Definition: execnodes.h:1939
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
List * joinqual
Definition: plannodes.h:686
static bool ExecHashJoinNewBatch(HashJoinState *hjstate)
Definition: nodeHashjoin.c:947
Barrier grow_batches_barrier
Definition: hashjoin.h:249
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr)
void ExecHashTableDestroy(HashJoinTable hashtable)
Definition: nodeHash.c:854
dsm_segment * seg
Definition: parallel.h:53
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
Definition: executor.h:332
#define ResetExprContext(econtext)
Definition: executor.h:500
shm_toc * toc
Definition: parallel.h:45
Plan plan
Definition: plannodes.h:683
void sts_end_write(SharedTuplestoreAccessor *accessor)
void ExecReScanHashJoin(HashJoinState *node)