PostgreSQL Source Code  git master
nodeHashjoin.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeHashjoin.c
4  * Routines to handle hash join nodes
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/executor/nodeHashjoin.c
12  *
13  * PARALLELISM
14  *
15  * Hash joins can participate in parallel query execution in several ways. A
16  * parallel-oblivious hash join is one where the node is unaware that it is
17  * part of a parallel plan. In this case, a copy of the inner plan is used to
18  * build a copy of the hash table in every backend, and the outer plan could
19  * either be built from a partial or complete path, so that the results of the
20  * hash join are correspondingly either partial or complete. A parallel-aware
21  * hash join is one that behaves differently, coordinating work between
22  * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
23  * Hash Join always appears with a Parallel Hash node.
24  *
25  * Parallel-aware hash joins use the same per-backend state machine to track
26  * progress through the hash join algorithm as parallel-oblivious hash joins.
27  * In a parallel-aware hash join, there is also a shared state machine that
28  * co-operating backends use to synchronize their local state machines and
29  * program counters. The shared state machine is managed with a Barrier IPC
30  * primitive. When all attached participants arrive at a barrier, the phase
31  * advances and all waiting participants are released.
32  *
33  * When a participant begins working on a parallel hash join, it must first
34  * figure out how much progress has already been made, because participants
35  * don't wait for each other to begin. For this reason there are switch
36  * statements at key points in the code where we have to synchronize our local
37  * state machine with the phase, and then jump to the correct part of the
38  * algorithm so that we can get started.
39  *
40  * One barrier called build_barrier is used to coordinate the hashing phases.
41  * The phase is represented by an integer which begins at zero and increments
42  * one by one, but in the code it is referred to by symbolic names as follows:
43  *
44  * PHJ_BUILD_ELECTING -- initial state
45  * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0
46  * PHJ_BUILD_HASHING_INNER -- all hash the inner rel
47  * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer
48  * PHJ_BUILD_DONE -- building done, probing can begin
49  *
50  * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
51  * be used repeatedly as required to coordinate expansions in the number of
52  * batches or buckets. Their phases are as follows:
53  *
54  * PHJ_GROW_BATCHES_ELECTING -- initial state
55  * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches
56  * PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
57  * PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew
58  *
59  * PHJ_GROW_BUCKETS_ELECTING -- initial state
60  * PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets
61  * PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples
62  *
63  * If the planner got the number of batches and buckets right, those won't be
64  * necessary, but on the other hand we might finish up needing to expand the
65  * buckets or batches multiple times while hashing the inner relation to stay
66  * within our memory budget and load factor target. For that reason it's a
67  * separate pair of barriers using circular phases.
68  *
69  * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
70  * because we need to divide the outer relation into batches up front in order
71  * to be able to process batches entirely independently. In contrast, the
72  * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
73  * batches whenever it encounters them while scanning and probing, which it
74  * can do because it processes batches in serial order.
75  *
76  * Once PHJ_BUILD_DONE is reached, backends then split up and process
77  * different batches, or gang up and work together on probing batches if there
78  * aren't enough to go around. For each batch there is a separate barrier
79  * with the following phases:
80  *
81  * PHJ_BATCH_ELECTING -- initial state
82  * PHJ_BATCH_ALLOCATING -- one allocates buckets
83  * PHJ_BATCH_LOADING -- all load the hash table from disk
84  * PHJ_BATCH_PROBING -- all probe
85  * PHJ_BATCH_DONE -- end
86  *
87  * Batch 0 is a special case, because it starts out in phase
88  * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
89  * PHJ_BUILD_HASHING_INNER so we can skip loading.
90  *
91  * Initially we try to plan for a single-batch hash join using the combined
92  * work_mem of all participants to create a large shared hash table. If that
93  * turns out either at planning or execution time to be impossible then we
94  * fall back to regular work_mem sized hash tables.
95  *
96  * To avoid deadlocks, we never wait for any barrier unless it is known that
97  * all other backends attached to it are actively executing the node or have
98  * already arrived. Practically, that means that we never return a tuple
99  * while attached to a barrier, unless the barrier has reached its final
100  * state. In the slightly special case of the per-batch barrier, we return
101  * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
102  * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
103  *
104  *-------------------------------------------------------------------------
105  */
106 
107 #include "postgres.h"
108 
109 #include "access/htup_details.h"
110 #include "access/parallel.h"
111 #include "executor/executor.h"
112 #include "executor/hashjoin.h"
113 #include "executor/nodeHash.h"
114 #include "executor/nodeHashjoin.h"
115 #include "miscadmin.h"
116 #include "pgstat.h"
117 #include "utils/memutils.h"
118 #include "utils/sharedtuplestore.h"
119 
120 
/*
 * States of the ExecHashJoin state machine
 */
#define HJ_BUILD_HASHTABLE		1
#define HJ_NEED_NEW_OUTER		2
#define HJ_SCAN_BUCKET			3
#define HJ_FILL_OUTER_TUPLE		4
#define HJ_FILL_INNER_TUPLES	5
#define HJ_NEED_NEW_BATCH		6

/* Returns true if doing null-fill on outer relation */
#define HJ_FILL_OUTER(hjstate)	((hjstate)->hj_NullInnerTupleSlot != NULL)
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate)	((hjstate)->hj_NullOuterTupleSlot != NULL)
135 
137  HashJoinState *hjstate,
138  uint32 *hashvalue);
140  HashJoinState *hjstate,
141  uint32 *hashvalue);
143  BufFile *file,
144  uint32 *hashvalue,
145  TupleTableSlot *tupleSlot);
146 static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
147 static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
149 
150 
151 /* ----------------------------------------------------------------
152  * ExecHashJoinImpl
153  *
154  * This function implements the Hybrid Hashjoin algorithm. It is marked
155  * with an always-inline attribute so that ExecHashJoin() and
156  * ExecParallelHashJoin() can inline it. Compilers that respect the
157  * attribute should create versions specialized for parallel == true and
158  * parallel == false with unnecessary branches removed.
159  *
160  * Note: the relation we build hash table on is the "inner"
161  * the other one is "outer".
162  * ----------------------------------------------------------------
163  */
165 static inline TupleTableSlot *
166 ExecHashJoinImpl(PlanState *pstate, bool parallel)
167 {
168  HashJoinState *node = castNode(HashJoinState, pstate);
169  PlanState *outerNode;
170  HashState *hashNode;
171  ExprState *joinqual;
172  ExprState *otherqual;
173  ExprContext *econtext;
174  HashJoinTable hashtable;
175  TupleTableSlot *outerTupleSlot;
176  uint32 hashvalue;
177  int batchno;
178  ParallelHashJoinState *parallel_state;
179 
180  /*
181  * get information from HashJoin node
182  */
183  joinqual = node->js.joinqual;
184  otherqual = node->js.ps.qual;
185  hashNode = (HashState *) innerPlanState(node);
186  outerNode = outerPlanState(node);
187  hashtable = node->hj_HashTable;
188  econtext = node->js.ps.ps_ExprContext;
189  parallel_state = hashNode->parallel_state;
190 
191  /*
192  * Reset per-tuple memory context to free any expression evaluation
193  * storage allocated in the previous tuple cycle.
194  */
195  ResetExprContext(econtext);
196 
197  /*
198  * run the hash join state machine
199  */
200  for (;;)
201  {
202  /*
203  * It's possible to iterate this loop many times before returning a
204  * tuple, in some pathological cases such as needing to move much of
205  * the current batch to a later batch. So let's check for interrupts
206  * each time through.
207  */
209 
210  switch (node->hj_JoinState)
211  {
212  case HJ_BUILD_HASHTABLE:
213 
214  /*
215  * First time through: build hash table for inner relation.
216  */
217  Assert(hashtable == NULL);
218 
219  /*
220  * If the outer relation is completely empty, and it's not
221  * right/full join, we can quit without building the hash
222  * table. However, for an inner join it is only a win to
223  * check this when the outer relation's startup cost is less
224  * than the projected cost of building the hash table.
225  * Otherwise it's best to build the hash table first and see
226  * if the inner relation is empty. (When it's a left join, we
227  * should always make this check, since we aren't going to be
228  * able to skip the join on the strength of an empty inner
229  * relation anyway.)
230  *
231  * If we are rescanning the join, we make use of information
232  * gained on the previous scan: don't bother to try the
233  * prefetch if the previous scan found the outer relation
234  * nonempty. This is not 100% reliable since with new
235  * parameters the outer relation might yield different
236  * results, but it's a good heuristic.
237  *
238  * The only way to make the check is to try to fetch a tuple
239  * from the outer plan node. If we succeed, we have to stash
240  * it away for later consumption by ExecHashJoinOuterGetTuple.
241  */
242  if (HJ_FILL_INNER(node))
243  {
244  /* no chance to not build the hash table */
245  node->hj_FirstOuterTupleSlot = NULL;
246  }
247  else if (parallel)
248  {
249  /*
250  * The empty-outer optimization is not implemented for
251  * shared hash tables, because no one participant can
252  * determine that there are no outer tuples, and it's not
253  * yet clear that it's worth the synchronization overhead
254  * of reaching consensus to figure that out. So we have
255  * to build the hash table.
256  */
257  node->hj_FirstOuterTupleSlot = NULL;
258  }
259  else if (HJ_FILL_OUTER(node) ||
260  (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
261  !node->hj_OuterNotEmpty))
262  {
263  node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
265  {
266  node->hj_OuterNotEmpty = false;
267  return NULL;
268  }
269  else
270  node->hj_OuterNotEmpty = true;
271  }
272  else
273  node->hj_FirstOuterTupleSlot = NULL;
274 
275  /*
276  * Create the hash table. If using Parallel Hash, then
277  * whoever gets here first will create the hash table and any
278  * later arrivals will merely attach to it.
279  */
280  hashtable = ExecHashTableCreate(hashNode,
281  node->hj_HashOperators,
282  HJ_FILL_INNER(node));
283  node->hj_HashTable = hashtable;
284 
285  /*
286  * Execute the Hash node, to build the hash table. If using
287  * Parallel Hash, then we'll try to help hashing unless we
288  * arrived too late.
289  */
290  hashNode->hashtable = hashtable;
291  (void) MultiExecProcNode((PlanState *) hashNode);
292 
293  /*
294  * If the inner relation is completely empty, and we're not
295  * doing a left outer join, we can quit without scanning the
296  * outer relation.
297  */
298  if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
299  return NULL;
300 
301  /*
302  * need to remember whether nbatch has increased since we
303  * began scanning the outer relation
304  */
305  hashtable->nbatch_outstart = hashtable->nbatch;
306 
307  /*
308  * Reset OuterNotEmpty for scan. (It's OK if we fetched a
309  * tuple above, because ExecHashJoinOuterGetTuple will
310  * immediately set it again.)
311  */
312  node->hj_OuterNotEmpty = false;
313 
314  if (parallel)
315  {
316  Barrier *build_barrier;
317 
318  build_barrier = &parallel_state->build_barrier;
319  Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
320  BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
321  if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
322  {
323  /*
324  * If multi-batch, we need to hash the outer relation
325  * up front.
326  */
327  if (hashtable->nbatch > 1)
329  BarrierArriveAndWait(build_barrier,
331  }
332  Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
333 
334  /* Each backend should now select a batch to work on. */
335  hashtable->curbatch = -1;
337 
338  continue;
339  }
340  else
342 
343  /* FALL THRU */
344 
345  case HJ_NEED_NEW_OUTER:
346 
347  /*
348  * We don't have an outer tuple, try to get the next one
349  */
350  if (parallel)
351  outerTupleSlot =
352  ExecParallelHashJoinOuterGetTuple(outerNode, node,
353  &hashvalue);
354  else
355  outerTupleSlot =
356  ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
357 
358  if (TupIsNull(outerTupleSlot))
359  {
360  /* end of batch, or maybe whole join */
361  if (HJ_FILL_INNER(node))
362  {
363  /* set up to scan for unmatched inner tuples */
366  }
367  else
369  continue;
370  }
371 
372  econtext->ecxt_outertuple = outerTupleSlot;
373  node->hj_MatchedOuter = false;
374 
375  /*
376  * Find the corresponding bucket for this tuple in the main
377  * hash table or skew hash table.
378  */
379  node->hj_CurHashValue = hashvalue;
380  ExecHashGetBucketAndBatch(hashtable, hashvalue,
381  &node->hj_CurBucketNo, &batchno);
382  node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
383  hashvalue);
384  node->hj_CurTuple = NULL;
385 
386  /*
387  * The tuple might not belong to the current batch (where
388  * "current batch" includes the skew buckets if any).
389  */
390  if (batchno != hashtable->curbatch &&
392  {
393  /*
394  * Need to postpone this outer tuple to a later batch.
395  * Save it in the corresponding outer-batch file.
396  */
397  Assert(parallel_state == NULL);
398  Assert(batchno > hashtable->curbatch);
400  hashvalue,
401  &hashtable->outerBatchFile[batchno]);
402 
403  /* Loop around, staying in HJ_NEED_NEW_OUTER state */
404  continue;
405  }
406 
407  /* OK, let's scan the bucket for matches */
409 
410  /* FALL THRU */
411 
412  case HJ_SCAN_BUCKET:
413 
414  /*
415  * Scan the selected hash bucket for matches to current outer
416  */
417  if (parallel)
418  {
419  if (!ExecParallelScanHashBucket(node, econtext))
420  {
421  /* out of matches; check for possible outer-join fill */
423  continue;
424  }
425  }
426  else
427  {
428  if (!ExecScanHashBucket(node, econtext))
429  {
430  /* out of matches; check for possible outer-join fill */
432  continue;
433  }
434  }
435 
436  /*
437  * We've got a match, but still need to test non-hashed quals.
438  * ExecScanHashBucket already set up all the state needed to
439  * call ExecQual.
440  *
441  * If we pass the qual, then save state for next call and have
442  * ExecProject form the projection, store it in the tuple
443  * table, and return the slot.
444  *
445  * Only the joinquals determine tuple match status, but all
446  * quals must pass to actually return the tuple.
447  */
448  if (joinqual == NULL || ExecQual(joinqual, econtext))
449  {
450  node->hj_MatchedOuter = true;
452 
453  /* In an antijoin, we never return a matched tuple */
454  if (node->js.jointype == JOIN_ANTI)
455  {
457  continue;
458  }
459 
460  /*
461  * If we only need to join to the first matching inner
462  * tuple, then consider returning this one, but after that
463  * continue with next outer tuple.
464  */
465  if (node->js.single_match)
467 
468  if (otherqual == NULL || ExecQual(otherqual, econtext))
469  return ExecProject(node->js.ps.ps_ProjInfo);
470  else
471  InstrCountFiltered2(node, 1);
472  }
473  else
474  InstrCountFiltered1(node, 1);
475  break;
476 
477  case HJ_FILL_OUTER_TUPLE:
478 
479  /*
480  * The current outer tuple has run out of matches, so check
481  * whether to emit a dummy outer-join tuple. Whether we emit
482  * one or not, the next state is NEED_NEW_OUTER.
483  */
485 
486  if (!node->hj_MatchedOuter &&
487  HJ_FILL_OUTER(node))
488  {
489  /*
490  * Generate a fake join tuple with nulls for the inner
491  * tuple, and return it if it passes the non-join quals.
492  */
493  econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
494 
495  if (otherqual == NULL || ExecQual(otherqual, econtext))
496  return ExecProject(node->js.ps.ps_ProjInfo);
497  else
498  InstrCountFiltered2(node, 1);
499  }
500  break;
501 
503 
504  /*
505  * We have finished a batch, but we are doing right/full join,
506  * so any unmatched inner tuples in the hashtable have to be
507  * emitted before we continue to the next batch.
508  */
509  if (!ExecScanHashTableForUnmatched(node, econtext))
510  {
511  /* no more unmatched tuples */
513  continue;
514  }
515 
516  /*
517  * Generate a fake join tuple with nulls for the outer tuple,
518  * and return it if it passes the non-join quals.
519  */
520  econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
521 
522  if (otherqual == NULL || ExecQual(otherqual, econtext))
523  return ExecProject(node->js.ps.ps_ProjInfo);
524  else
525  InstrCountFiltered2(node, 1);
526  break;
527 
528  case HJ_NEED_NEW_BATCH:
529 
530  /*
531  * Try to advance to next batch. Done if there are no more.
532  */
533  if (parallel)
534  {
535  if (!ExecParallelHashJoinNewBatch(node))
536  return NULL; /* end of parallel-aware join */
537  }
538  else
539  {
540  if (!ExecHashJoinNewBatch(node))
541  return NULL; /* end of parallel-oblivious join */
542  }
544  break;
545 
546  default:
547  elog(ERROR, "unrecognized hashjoin state: %d",
548  (int) node->hj_JoinState);
549  }
550  }
551 }
552 
553 /* ----------------------------------------------------------------
554  * ExecHashJoin
555  *
556  * Parallel-oblivious version.
557  * ----------------------------------------------------------------
558  */
559 static TupleTableSlot * /* return: a tuple or NULL */
561 {
562  /*
563  * On sufficiently smart compilers this should be inlined with the
564  * parallel-aware branches removed.
565  */
566  return ExecHashJoinImpl(pstate, false);
567 }
568 
569 /* ----------------------------------------------------------------
570  * ExecParallelHashJoin
571  *
572  * Parallel-aware version.
573  * ----------------------------------------------------------------
574  */
575 static TupleTableSlot * /* return: a tuple or NULL */
577 {
578  /*
579  * On sufficiently smart compilers this should be inlined with the
580  * parallel-oblivious branches removed.
581  */
582  return ExecHashJoinImpl(pstate, true);
583 }
584 
585 /* ----------------------------------------------------------------
586  * ExecInitHashJoin
587  *
588  * Init routine for HashJoin node.
589  * ----------------------------------------------------------------
590  */
592 ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
593 {
594  HashJoinState *hjstate;
595  Plan *outerNode;
596  Hash *hashNode;
597  List *lclauses;
598  List *rclauses;
599  List *hoperators;
600  ListCell *l;
601 
602  /* check for unsupported flags */
603  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
604 
605  /*
606  * create state structure
607  */
608  hjstate = makeNode(HashJoinState);
609  hjstate->js.ps.plan = (Plan *) node;
610  hjstate->js.ps.state = estate;
611 
612  /*
613  * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
614  * where this function may be replaced with a parallel version, if we
615  * managed to launch a parallel query.
616  */
617  hjstate->js.ps.ExecProcNode = ExecHashJoin;
618 
619  /*
620  * Miscellaneous initialization
621  *
622  * create expression context for node
623  */
624  ExecAssignExprContext(estate, &hjstate->js.ps);
625 
626  /*
627  * initialize child expressions
628  */
629  hjstate->js.ps.qual =
630  ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
631  hjstate->js.jointype = node->join.jointype;
632  hjstate->js.joinqual =
633  ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
634  hjstate->hashclauses =
635  ExecInitQual(node->hashclauses, (PlanState *) hjstate);
636 
637  /*
638  * initialize child nodes
639  *
640  * Note: we could suppress the REWIND flag for the inner input, which
641  * would amount to betting that the hash will be a single batch. Not
642  * clear if this would be a win or not.
643  */
644  outerNode = outerPlan(node);
645  hashNode = (Hash *) innerPlan(node);
646 
647  outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
648  innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
649 
650  /*
651  * tuple table initialization
652  */
653  ExecInitResultTupleSlot(estate, &hjstate->js.ps);
654  hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate);
655 
656  /*
657  * detect whether we need only consider the first matching inner tuple
658  */
659  hjstate->js.single_match = (node->join.inner_unique ||
660  node->join.jointype == JOIN_SEMI);
661 
662  /* set up null tuples for outer joins, if needed */
663  switch (node->join.jointype)
664  {
665  case JOIN_INNER:
666  case JOIN_SEMI:
667  break;
668  case JOIN_LEFT:
669  case JOIN_ANTI:
670  hjstate->hj_NullInnerTupleSlot =
671  ExecInitNullTupleSlot(estate,
673  break;
674  case JOIN_RIGHT:
675  hjstate->hj_NullOuterTupleSlot =
676  ExecInitNullTupleSlot(estate,
678  break;
679  case JOIN_FULL:
680  hjstate->hj_NullOuterTupleSlot =
681  ExecInitNullTupleSlot(estate,
683  hjstate->hj_NullInnerTupleSlot =
684  ExecInitNullTupleSlot(estate,
686  break;
687  default:
688  elog(ERROR, "unrecognized join type: %d",
689  (int) node->join.jointype);
690  }
691 
692  /*
693  * now for some voodoo. our temporary tuple slot is actually the result
694  * tuple slot of the Hash node (which is our inner plan). we can do this
695  * because Hash nodes don't return tuples via ExecProcNode() -- instead
696  * the hash join node uses ExecScanHashBucket() to get at the contents of
697  * the hash table. -cim 6/9/91
698  */
699  {
700  HashState *hashstate = (HashState *) innerPlanState(hjstate);
701  TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;
702 
703  hjstate->hj_HashTupleSlot = slot;
704  }
705 
706  /*
707  * initialize tuple type and projection info
708  */
709  ExecAssignResultTypeFromTL(&hjstate->js.ps);
710  ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
711 
712  ExecSetSlotDescriptor(hjstate->hj_OuterTupleSlot,
714 
715  /*
716  * initialize hash-specific info
717  */
718  hjstate->hj_HashTable = NULL;
719  hjstate->hj_FirstOuterTupleSlot = NULL;
720 
721  hjstate->hj_CurHashValue = 0;
722  hjstate->hj_CurBucketNo = 0;
723  hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
724  hjstate->hj_CurTuple = NULL;
725 
726  /*
727  * Deconstruct the hash clauses into outer and inner argument values, so
728  * that we can evaluate those subexpressions separately. Also make a list
729  * of the hash operator OIDs, in preparation for looking up the hash
730  * functions to use.
731  */
732  lclauses = NIL;
733  rclauses = NIL;
734  hoperators = NIL;
735  foreach(l, node->hashclauses)
736  {
737  OpExpr *hclause = lfirst_node(OpExpr, l);
738 
739  lclauses = lappend(lclauses, ExecInitExpr(linitial(hclause->args),
740  (PlanState *) hjstate));
741  rclauses = lappend(rclauses, ExecInitExpr(lsecond(hclause->args),
742  (PlanState *) hjstate));
743  hoperators = lappend_oid(hoperators, hclause->opno);
744  }
745  hjstate->hj_OuterHashKeys = lclauses;
746  hjstate->hj_InnerHashKeys = rclauses;
747  hjstate->hj_HashOperators = hoperators;
748  /* child Hash node needs to evaluate inner hash keys, too */
749  ((HashState *) innerPlanState(hjstate))->hashkeys = rclauses;
750 
751  hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
752  hjstate->hj_MatchedOuter = false;
753  hjstate->hj_OuterNotEmpty = false;
754 
755  return hjstate;
756 }
757 
758 /* ----------------------------------------------------------------
759  * ExecEndHashJoin
760  *
761  * clean up routine for HashJoin node
762  * ----------------------------------------------------------------
763  */
764 void
766 {
767  /*
768  * Free hash table
769  */
770  if (node->hj_HashTable)
771  {
773  node->hj_HashTable = NULL;
774  }
775 
776  /*
777  * Free the exprcontext
778  */
779  ExecFreeExprContext(&node->js.ps);
780 
781  /*
782  * clean out the tuple table
783  */
787 
788  /*
789  * clean up subtrees
790  */
793 }
794 
795 /*
796  * ExecHashJoinOuterGetTuple
797  *
798  * get the next outer tuple for a parallel oblivious hashjoin: either by
799  * executing the outer plan node in the first pass, or from the temp
800  * files for the hashjoin batches.
801  *
802  * Returns a null slot if no more outer tuples (within the current batch).
803  *
804  * On success, the tuple's hash value is stored at *hashvalue --- this is
805  * either originally computed, or re-read from the temp file.
806  */
807 static TupleTableSlot *
809  HashJoinState *hjstate,
810  uint32 *hashvalue)
811 {
812  HashJoinTable hashtable = hjstate->hj_HashTable;
813  int curbatch = hashtable->curbatch;
814  TupleTableSlot *slot;
815 
816  if (curbatch == 0) /* if it is the first pass */
817  {
818  /*
819  * Check to see if first outer tuple was already fetched by
820  * ExecHashJoin() and not used yet.
821  */
822  slot = hjstate->hj_FirstOuterTupleSlot;
823  if (!TupIsNull(slot))
824  hjstate->hj_FirstOuterTupleSlot = NULL;
825  else
826  slot = ExecProcNode(outerNode);
827 
828  while (!TupIsNull(slot))
829  {
830  /*
831  * We have to compute the tuple's hash value.
832  */
833  ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
834 
835  econtext->ecxt_outertuple = slot;
836  if (ExecHashGetHashValue(hashtable, econtext,
837  hjstate->hj_OuterHashKeys,
838  true, /* outer tuple */
839  HJ_FILL_OUTER(hjstate),
840  hashvalue))
841  {
842  /* remember outer relation is not empty for possible rescan */
843  hjstate->hj_OuterNotEmpty = true;
844 
845  return slot;
846  }
847 
848  /*
849  * That tuple couldn't match because of a NULL, so discard it and
850  * continue with the next one.
851  */
852  slot = ExecProcNode(outerNode);
853  }
854  }
855  else if (curbatch < hashtable->nbatch)
856  {
857  BufFile *file = hashtable->outerBatchFile[curbatch];
858 
859  /*
860  * In outer-join cases, we could get here even though the batch file
861  * is empty.
862  */
863  if (file == NULL)
864  return NULL;
865 
866  slot = ExecHashJoinGetSavedTuple(hjstate,
867  file,
868  hashvalue,
869  hjstate->hj_OuterTupleSlot);
870  if (!TupIsNull(slot))
871  return slot;
872  }
873 
874  /* End of this batch */
875  return NULL;
876 }
877 
878 /*
879  * ExecHashJoinOuterGetTuple variant for the parallel case.
880  */
881 static TupleTableSlot *
883  HashJoinState *hjstate,
884  uint32 *hashvalue)
885 {
886  HashJoinTable hashtable = hjstate->hj_HashTable;
887  int curbatch = hashtable->curbatch;
888  TupleTableSlot *slot;
889 
890  /*
891  * In the Parallel Hash case we only run the outer plan directly for
892  * single-batch hash joins. Otherwise we have to go to batch files, even
893  * for batch 0.
894  */
895  if (curbatch == 0 && hashtable->nbatch == 1)
896  {
897  slot = ExecProcNode(outerNode);
898 
899  while (!TupIsNull(slot))
900  {
901  ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
902 
903  econtext->ecxt_outertuple = slot;
904  if (ExecHashGetHashValue(hashtable, econtext,
905  hjstate->hj_OuterHashKeys,
906  true, /* outer tuple */
907  HJ_FILL_OUTER(hjstate),
908  hashvalue))
909  return slot;
910 
911  /*
912  * That tuple couldn't match because of a NULL, so discard it and
913  * continue with the next one.
914  */
915  slot = ExecProcNode(outerNode);
916  }
917  }
918  else if (curbatch < hashtable->nbatch)
919  {
920  MinimalTuple tuple;
921 
922  tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
923  hashvalue);
924  if (tuple != NULL)
925  {
926  slot = ExecStoreMinimalTuple(tuple,
927  hjstate->hj_OuterTupleSlot,
928  false);
929  return slot;
930  }
931  else
933  }
934 
935  /* End of this batch */
936  return NULL;
937 }
938 
939 /*
940  * ExecHashJoinNewBatch
941  * switch to a new hashjoin batch
942  *
943  * Returns true if successful, false if there are no more batches.
944  */
945 static bool
947 {
948  HashJoinTable hashtable = hjstate->hj_HashTable;
949  int nbatch;
950  int curbatch;
951  BufFile *innerFile;
952  TupleTableSlot *slot;
953  uint32 hashvalue;
954 
955  nbatch = hashtable->nbatch;
956  curbatch = hashtable->curbatch;
957 
958  if (curbatch > 0)
959  {
960  /*
961  * We no longer need the previous outer batch file; close it right
962  * away to free disk space.
963  */
964  if (hashtable->outerBatchFile[curbatch])
965  BufFileClose(hashtable->outerBatchFile[curbatch]);
966  hashtable->outerBatchFile[curbatch] = NULL;
967  }
968  else /* we just finished the first batch */
969  {
970  /*
971  * Reset some of the skew optimization state variables, since we no
972  * longer need to consider skew tuples after the first batch. The
973  * memory context reset we are about to do will release the skew
974  * hashtable itself.
975  */
976  hashtable->skewEnabled = false;
977  hashtable->skewBucket = NULL;
978  hashtable->skewBucketNums = NULL;
979  hashtable->nSkewBuckets = 0;
980  hashtable->spaceUsedSkew = 0;
981  }
982 
983  /*
984  * We can always skip over any batches that are completely empty on both
985  * sides. We can sometimes skip over batches that are empty on only one
986  * side, but there are exceptions:
987  *
988  * 1. In a left/full outer join, we have to process outer batches even if
989  * the inner batch is empty. Similarly, in a right/full outer join, we
990  * have to process inner batches even if the outer batch is empty.
991  *
992  * 2. If we have increased nbatch since the initial estimate, we have to
993  * scan inner batches since they might contain tuples that need to be
994  * reassigned to later inner batches.
995  *
996  * 3. Similarly, if we have increased nbatch since starting the outer
997  * scan, we have to rescan outer batches in case they contain tuples that
998  * need to be reassigned.
999  */
1000  curbatch++;
1001  while (curbatch < nbatch &&
1002  (hashtable->outerBatchFile[curbatch] == NULL ||
1003  hashtable->innerBatchFile[curbatch] == NULL))
1004  {
1005  if (hashtable->outerBatchFile[curbatch] &&
1006  HJ_FILL_OUTER(hjstate))
1007  break; /* must process due to rule 1 */
1008  if (hashtable->innerBatchFile[curbatch] &&
1009  HJ_FILL_INNER(hjstate))
1010  break; /* must process due to rule 1 */
1011  if (hashtable->innerBatchFile[curbatch] &&
1012  nbatch != hashtable->nbatch_original)
1013  break; /* must process due to rule 2 */
1014  if (hashtable->outerBatchFile[curbatch] &&
1015  nbatch != hashtable->nbatch_outstart)
1016  break; /* must process due to rule 3 */
1017  /* We can ignore this batch. */
1018  /* Release associated temp files right away. */
1019  if (hashtable->innerBatchFile[curbatch])
1020  BufFileClose(hashtable->innerBatchFile[curbatch]);
1021  hashtable->innerBatchFile[curbatch] = NULL;
1022  if (hashtable->outerBatchFile[curbatch])
1023  BufFileClose(hashtable->outerBatchFile[curbatch]);
1024  hashtable->outerBatchFile[curbatch] = NULL;
1025  curbatch++;
1026  }
1027 
1028  if (curbatch >= nbatch)
1029  return false; /* no more batches */
1030 
1031  hashtable->curbatch = curbatch;
1032 
1033  /*
1034  * Reload the hash table with the new inner batch (which could be empty)
1035  */
1036  ExecHashTableReset(hashtable);
1037 
1038  innerFile = hashtable->innerBatchFile[curbatch];
1039 
1040  if (innerFile != NULL)
1041  {
1042  if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
1043  ereport(ERROR,
1045  errmsg("could not rewind hash-join temporary file: %m")));
1046 
1047  while ((slot = ExecHashJoinGetSavedTuple(hjstate,
1048  innerFile,
1049  &hashvalue,
1050  hjstate->hj_HashTupleSlot)))
1051  {
1052  /*
1053  * NOTE: some tuples may be sent to future batches. Also, it is
1054  * possible for hashtable->nbatch to be increased here!
1055  */
1056  ExecHashTableInsert(hashtable, slot, hashvalue);
1057  }
1058 
1059  /*
1060  * after we build the hash table, the inner batch file is no longer
1061  * needed
1062  */
1063  BufFileClose(innerFile);
1064  hashtable->innerBatchFile[curbatch] = NULL;
1065  }
1066 
1067  /*
1068  * Rewind outer batch file (if present), so that we can start reading it.
1069  */
1070  if (hashtable->outerBatchFile[curbatch] != NULL)
1071  {
1072  if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
1073  ereport(ERROR,
1075  errmsg("could not rewind hash-join temporary file: %m")));
1076  }
1077 
1078  return true;
1079 }
1080 
1081 /*
1082  * Choose a batch to work on, and attach to it. Returns true if successful,
1083  * false if there are no more batches.
1084  */
1085 static bool
1087 {
1088  HashJoinTable hashtable = hjstate->hj_HashTable;
1089  int start_batchno;
1090  int batchno;
1091 
1092  /*
1093  * If we started up so late that the batch tracking array has been freed
1094  * already by ExecHashTableDetach(), then we are finished. See also
1095  * ExecParallelHashEnsureBatchAccessors().
1096  */
1097  if (hashtable->batches == NULL)
1098  return false;
1099 
1100  /*
1101  * If we were already attached to a batch, remember not to bother checking
1102  * it again, and detach from it (possibly freeing the hash table if we are
1103  * last to detach).
1104  */
1105  if (hashtable->curbatch >= 0)
1106  {
1107  hashtable->batches[hashtable->curbatch].done = true;
1108  ExecHashTableDetachBatch(hashtable);
1109  }
1110 
1111  /*
1112  * Search for a batch that isn't done. We use an atomic counter to start
1113  * our search at a different batch in every participant when there are
1114  * more batches than participants.
1115  */
1116  batchno = start_batchno =
1118  hashtable->nbatch;
1119  do
1120  {
1121  uint32 hashvalue;
1122  MinimalTuple tuple;
1123  TupleTableSlot *slot;
1124 
1125  if (!hashtable->batches[batchno].done)
1126  {
1127  SharedTuplestoreAccessor *inner_tuples;
1128  Barrier *batch_barrier =
1129  &hashtable->batches[batchno].shared->batch_barrier;
1130 
1131  switch (BarrierAttach(batch_barrier))
1132  {
1133  case PHJ_BATCH_ELECTING:
1134 
1135  /* One backend allocates the hash table. */
1136  if (BarrierArriveAndWait(batch_barrier,
1138  ExecParallelHashTableAlloc(hashtable, batchno);
1139  /* Fall through. */
1140 
1141  case PHJ_BATCH_ALLOCATING:
1142  /* Wait for allocation to complete. */
1143  BarrierArriveAndWait(batch_barrier,
1145  /* Fall through. */
1146 
1147  case PHJ_BATCH_LOADING:
1148  /* Start (or join in) loading tuples. */
1149  ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1150  inner_tuples = hashtable->batches[batchno].inner_tuples;
1151  sts_begin_parallel_scan(inner_tuples);
1152  while ((tuple = sts_parallel_scan_next(inner_tuples,
1153  &hashvalue)))
1154  {
1155  slot = ExecStoreMinimalTuple(tuple,
1156  hjstate->hj_HashTupleSlot,
1157  false);
1159  hashvalue);
1160  }
1161  sts_end_parallel_scan(inner_tuples);
1162  BarrierArriveAndWait(batch_barrier,
1164  /* Fall through. */
1165 
1166  case PHJ_BATCH_PROBING:
1167 
1168  /*
1169  * This batch is ready to probe. Return control to
1170  * caller. We stay attached to batch_barrier so that the
1171  * hash table stays alive until everyone's finished
1172  * probing it, but no participant is allowed to wait at
1173  * this barrier again (or else a deadlock could occur).
1174  * All attached participants must eventually call
1175  * BarrierArriveAndDetach() so that the final phase
1176  * PHJ_BATCH_DONE can be reached.
1177  */
1178  ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1179  sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
1180  return true;
1181 
1182  case PHJ_BATCH_DONE:
1183 
1184  /*
1185  * Already done. Detach and go around again (if any
1186  * remain).
1187  */
1188  BarrierDetach(batch_barrier);
1189  hashtable->batches[batchno].done = true;
1190  hashtable->curbatch = -1;
1191  break;
1192 
1193  default:
1194  elog(ERROR, "unexpected batch phase %d",
1195  BarrierPhase(batch_barrier));
1196  }
1197  }
1198  batchno = (batchno + 1) % hashtable->nbatch;
1199  } while (batchno != start_batchno);
1200 
1201  return false;
1202 }
1203 
1204 /*
1205  * ExecHashJoinSaveTuple
1206  * save a tuple to a batch file.
1207  *
1208  * The data recorded in the file for each tuple is its hash value,
1209  * then the tuple in MinimalTuple format.
1210  *
1211  * Note: it is important always to call this in the regular executor
1212  * context, not in a shorter-lived context; else the temp file buffers
1213  * will get messed up.
1214  */
1215 void
1217  BufFile **fileptr)
1218 {
1219  BufFile *file = *fileptr;
1220  size_t written;
1221 
1222  if (file == NULL)
1223  {
1224  /* First write to this batch file, so open it. */
1225  file = BufFileCreateTemp(false);
1226  *fileptr = file;
1227  }
1228 
1229  written = BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
1230  if (written != sizeof(uint32))
1231  ereport(ERROR,
1233  errmsg("could not write to hash-join temporary file: %m")));
1234 
1235  written = BufFileWrite(file, (void *) tuple, tuple->t_len);
1236  if (written != tuple->t_len)
1237  ereport(ERROR,
1239  errmsg("could not write to hash-join temporary file: %m")));
1240 }
1241 
1242 /*
1243  * ExecHashJoinGetSavedTuple
1244  * read the next tuple from a batch file. Return NULL if no more.
1245  *
1246  * On success, *hashvalue is set to the tuple's hash value, and the tuple
1247  * itself is stored in the given slot.
1248  */
1249 static TupleTableSlot *
1251  BufFile *file,
1252  uint32 *hashvalue,
1253  TupleTableSlot *tupleSlot)
1254 {
1255  uint32 header[2];
1256  size_t nread;
1257  MinimalTuple tuple;
1258 
1259  /*
1260  * We check for interrupts here because this is typically taken as an
1261  * alternative code path to an ExecProcNode() call, which would include
1262  * such a check.
1263  */
1265 
1266  /*
1267  * Since both the hash value and the MinimalTuple length word are uint32,
1268  * we can read them both in one BufFileRead() call without any type
1269  * cheating.
1270  */
1271  nread = BufFileRead(file, (void *) header, sizeof(header));
1272  if (nread == 0) /* end of file */
1273  {
1274  ExecClearTuple(tupleSlot);
1275  return NULL;
1276  }
1277  if (nread != sizeof(header))
1278  ereport(ERROR,
1280  errmsg("could not read from hash-join temporary file: %m")));
1281  *hashvalue = header[0];
1282  tuple = (MinimalTuple) palloc(header[1]);
1283  tuple->t_len = header[1];
1284  nread = BufFileRead(file,
1285  (void *) ((char *) tuple + sizeof(uint32)),
1286  header[1] - sizeof(uint32));
1287  if (nread != header[1] - sizeof(uint32))
1288  ereport(ERROR,
1290  errmsg("could not read from hash-join temporary file: %m")));
1291  return ExecStoreMinimalTuple(tuple, tupleSlot, true);
1292 }
1293 
1294 
1295 void
1297 {
1298  /*
1299  * In a multi-batch join, we currently have to do rescans the hard way,
1300  * primarily because batch temp files may have already been released. But
1301  * if it's a single-batch join, and there is no parameter change for the
1302  * inner subnode, then we can just re-use the existing hash table without
1303  * rebuilding it.
1304  */
1305  if (node->hj_HashTable != NULL)
1306  {
1307  if (node->hj_HashTable->nbatch == 1 &&
1308  node->js.ps.righttree->chgParam == NULL)
1309  {
1310  /*
1311  * Okay to reuse the hash table; needn't rescan inner, either.
1312  *
1313  * However, if it's a right/full join, we'd better reset the
1314  * inner-tuple match flags contained in the table.
1315  */
1316  if (HJ_FILL_INNER(node))
1318 
1319  /*
1320  * Also, we need to reset our state about the emptiness of the
1321  * outer relation, so that the new scan of the outer will update
1322  * it correctly if it turns out to be empty this time. (There's no
1323  * harm in clearing it now because ExecHashJoin won't need the
1324  * info. In the other cases, where the hash table doesn't exist
1325  * or we are destroying it, we leave this state alone because
1326  * ExecHashJoin will need it the first time through.)
1327  */
1328  node->hj_OuterNotEmpty = false;
1329 
1330  /* ExecHashJoin can skip the BUILD_HASHTABLE step */
1332  }
1333  else
1334  {
1335  /* must destroy and rebuild hash table */
1337  node->hj_HashTable = NULL;
1339 
1340  /*
1341  * if chgParam of subnode is not null then plan will be re-scanned
1342  * by first ExecProcNode.
1343  */
1344  if (node->js.ps.righttree->chgParam == NULL)
1345  ExecReScan(node->js.ps.righttree);
1346  }
1347  }
1348 
1349  /* Always reset intra-tuple state */
1350  node->hj_CurHashValue = 0;
1351  node->hj_CurBucketNo = 0;
1353  node->hj_CurTuple = NULL;
1354 
1355  node->hj_MatchedOuter = false;
1356  node->hj_FirstOuterTupleSlot = NULL;
1357 
1358  /*
1359  * if chgParam of subnode is not null then plan will be re-scanned by
1360  * first ExecProcNode.
1361  */
1362  if (node->js.ps.lefttree->chgParam == NULL)
1363  ExecReScan(node->js.ps.lefttree);
1364 }
1365 
1366 void
1368 {
1369  if (node->hj_HashTable)
1370  {
1371  /*
1372  * Detach from shared state before DSM memory goes away. This makes
1373  * sure that we don't have any pointers into DSM memory by the time
1374  * ExecEndHashJoin runs.
1375  */
1378  }
1379 }
1380 
1381 static void
1383 {
1384  PlanState *outerState = outerPlanState(hjstate);
1385  ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1386  HashJoinTable hashtable = hjstate->hj_HashTable;
1387  TupleTableSlot *slot;
1388  uint32 hashvalue;
1389  int i;
1390 
1391  Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
1392 
1393  /* Execute outer plan, writing all tuples to shared tuplestores. */
1394  for (;;)
1395  {
1396  slot = ExecProcNode(outerState);
1397  if (TupIsNull(slot))
1398  break;
1399  econtext->ecxt_outertuple = slot;
1400  if (ExecHashGetHashValue(hashtable, econtext,
1401  hjstate->hj_OuterHashKeys,
1402  true, /* outer tuple */
1403  false, /* outer join, currently unsupported */
1404  &hashvalue))
1405  {
1406  int batchno;
1407  int bucketno;
1408 
1409  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
1410  &batchno);
1411  sts_puttuple(hashtable->batches[batchno].outer_tuples,
1412  &hashvalue, ExecFetchSlotMinimalTuple(slot));
1413  }
1415  }
1416 
1417  /* Make sure all outer partitions are readable by any backend. */
1418  for (i = 0; i < hashtable->nbatch; ++i)
1419  sts_end_write(hashtable->batches[i].outer_tuples);
1420 }
1421 
1422 void
1424 {
1426  shm_toc_estimate_keys(&pcxt->estimator, 1);
1427 }
1428 
1429 void
1431 {
1432  int plan_node_id = state->js.ps.plan->plan_node_id;
1433  HashState *hashNode;
1434  ParallelHashJoinState *pstate;
1435 
1436  /*
1437  * Disable shared hash table mode if we failed to create a real DSM
1438  * segment, because that means that we don't have a DSA area to work with.
1439  */
1440  if (pcxt->seg == NULL)
1441  return;
1442 
1444 
1445  /*
1446  * Set up the state needed to coordinate access to the shared hash
1447  * table(s), using the plan node ID as the toc key.
1448  */
1449  pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
1450  shm_toc_insert(pcxt->toc, plan_node_id, pstate);
1451 
1452  /*
1453  * Set up the shared hash join state with no batches initially.
1454  * ExecHashTableCreate() will prepare at least one later and set nbatch
1455  * and space_allowed.
1456  */
1457  pstate->nbatch = 0;
1458  pstate->space_allowed = 0;
1459  pstate->batches = InvalidDsaPointer;
1460  pstate->old_batches = InvalidDsaPointer;
1461  pstate->nbuckets = 0;
1462  pstate->growth = PHJ_GROWTH_OK;
1464  pg_atomic_init_u32(&pstate->distributor, 0);
1465  pstate->nparticipants = pcxt->nworkers + 1;
1466  pstate->total_tuples = 0;
1467  LWLockInitialize(&pstate->lock,
1469  BarrierInit(&pstate->build_barrier, 0);
1470  BarrierInit(&pstate->grow_batches_barrier, 0);
1471  BarrierInit(&pstate->grow_buckets_barrier, 0);
1472 
1473  /* Set up the space we'll use for shared temporary files. */
1474  SharedFileSetInit(&pstate->fileset, pcxt->seg);
1475 
1476  /* Initialize the shared state in the hash node. */
1477  hashNode = (HashState *) innerPlanState(state);
1478  hashNode->parallel_state = pstate;
1479 }
1480 
1481 /* ----------------------------------------------------------------
1482  * ExecHashJoinReInitializeDSM
1483  *
1484  * Reset shared state before beginning a fresh scan.
1485  * ----------------------------------------------------------------
1486  */
1487 void
1489 {
1490  int plan_node_id = state->js.ps.plan->plan_node_id;
1491  ParallelHashJoinState *pstate =
1492  shm_toc_lookup(cxt->toc, plan_node_id, false);
1493 
1494  /*
1495  * It would be possible to reuse the shared hash table in single-batch
1496  * cases by resetting and then fast-forwarding build_barrier to
1497  * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
1498  * currently shared hash tables are already freed by now (by the last
1499  * participant to detach from the batch). We could consider keeping it
1500  * around for single-batch joins. We'd also need to adjust
1501  * finalize_plan() so that it doesn't record a dummy dependency for
1502  * Parallel Hash nodes, preventing the rescan optimization. For now we
1503  * don't try.
1504  */
1505 
1506  /* Detach, freeing any remaining shared memory. */
1507  if (state->hj_HashTable != NULL)
1508  {
1511  }
1512 
1513  /* Clear any shared batch files. */
1514  SharedFileSetDeleteAll(&pstate->fileset);
1515 
1516  /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
1517  BarrierInit(&pstate->build_barrier, 0);
1518 }
1519 
1520 void
1522  ParallelWorkerContext *pwcxt)
1523 {
1524  HashState *hashNode;
1525  int plan_node_id = state->js.ps.plan->plan_node_id;
1526  ParallelHashJoinState *pstate =
1527  shm_toc_lookup(pwcxt->toc, plan_node_id, false);
1528 
1529  /* Attach to the space for shared temporary files. */
1530  SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
1531 
1532  /* Attach to the shared state in the hash node. */
1533  hashNode = (HashState *) innerPlanState(state);
1534  hashNode->parallel_state = pstate;
1535 
1537 }
JoinType jointype
Definition: execnodes.h:1612
HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
Definition: nodeHash.c:433
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2037
#define NIL
Definition: pg_list.h:69
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:209
#define HJ_NEED_NEW_BATCH
Definition: nodeHashjoin.c:129
#define PHJ_BATCH_DONE
Definition: hashjoin.h:268
List * qual
Definition: plannodes.h:145
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:109
#define HJ_SCAN_BUCKET
Definition: nodeHashjoin.c:126
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
dsa_pointer chunk_work_queue
Definition: hashjoin.h:242
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:384
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
Definition: nodeHash.c:1739
TupleTableSlot * hj_NullInnerTupleSlot
Definition: execnodes.h:1727
ExprState * joinqual
Definition: execnodes.h:1615
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:893
#define InvalidDsaPointer
Definition: dsa.h:78
void BarrierInit(Barrier *barrier, int participants)
Definition: barrier.c:100
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:47
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:676
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
static TupleTableSlot * ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)
Definition: nodeHashjoin.c:882
PlanState ps
Definition: execnodes.h:1611
#define castNode(_type_, nodeptr)
Definition: nodes.h:581
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:539
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
Definition: nodeHash.c:2053
void ExecHashTableDetachBatch(HashJoinTable hashtable)
Definition: nodeHash.c:3051
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot)
Definition: execTuples.c:652
void ExecHashTableDetach(HashJoinTable hashtable)
Definition: nodeHash.c:3108
dsm_segment * seg
Definition: parallel.h:42
List * hashclauses
Definition: plannodes.h:727
#define PHJ_BATCH_ALLOCATING
Definition: hashjoin.h:265
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
Definition: nodeHash.c:2029
ExprContext * ps_ExprContext
Definition: execnodes.h:892
void ExecHashTableReset(HashJoinTable hashtable)
Definition: nodeHash.c:2124
shm_toc_estimator estimator
Definition: parallel.h:41
void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3193
bool single_match
Definition: execnodes.h:1613
HashJoinTable hashtable
Definition: execnodes.h:2029
void ExecReScan(PlanState *node)
Definition: execAmi.c:76
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
int plan_node_id
Definition: plannodes.h:143
void ExecShutdownHashJoin(HashJoinState *node)
bool hj_MatchedOuter
Definition: execnodes.h:1730
static TupleTableSlot * ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)
Definition: nodeHashjoin.c:808
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3031
EState * state
Definition: execnodes.h:860
TupleTableSlot * hj_OuterTupleSlot
Definition: execnodes.h:1724
struct PlanState * righttree
Definition: execnodes.h:878
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:363
List * hj_OuterHashKeys
Definition: execnodes.h:1716
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
List * lappend_oid(List *list, Oid datum)
Definition: list.c:164
SharedFileSet fileset
Definition: hashjoin.h:253
TupleTableSlot * hj_FirstOuterTupleSlot
Definition: execnodes.h:1728
#define PHJ_BUILD_HASHING_OUTER
Definition: hashjoin.h:260
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:603
#define lsecond(l)
Definition: pg_list.h:116
Join join
Definition: plannodes.h:726
void BufFileClose(BufFile *file)
Definition: buffile.c:397
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
Definition: nodeHash.c:2353
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:204
void ExecAssignResultTypeFromTL(PlanState *planstate)
Definition: execUtils.c:448
#define PHJ_BATCH_LOADING
Definition: hashjoin.h:266
#define PHJ_BATCH_ELECTING
Definition: hashjoin.h:264
struct PlanState * lefttree
Definition: execnodes.h:877
void ExecEndHashJoin(HashJoinState *node)
Definition: nodeHashjoin.c:765
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:208
#define HJ_FILL_INNER(hjstate)
Definition: nodeHashjoin.c:134
int * skewBucketNums
Definition: hashjoin.h:308
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
Definition: nodeHash.c:1593
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1879
JoinType jointype
Definition: plannodes.h:668
uint32 hj_CurHashValue
Definition: execnodes.h:1720
int hj_CurSkewBucketNo
Definition: execnodes.h:1722
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:891
TupleTableSlot * ExecInitNullTupleSlot(EState *estate, TupleDesc tupType)
Definition: execTuples.c:866
#define linitial(l)
Definition: pg_list.h:111
Barrier grow_buckets_barrier
Definition: hashjoin.h:250
#define ERROR
Definition: elog.h:43
TupleTableSlot * hj_NullOuterTupleSlot
Definition: execnodes.h:1726
dsa_pointer batches
Definition: hashjoin.h:236
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
static void ExecParallelHashJoinPartitionOuter(HashJoinState *node)
BufFile * BufFileCreateTemp(bool interXact)
Definition: buffile.c:182
void ExecInitResultTupleSlot(EState *estate, PlanState *planstate)
Definition: execTuples.c:832
#define EXEC_FLAG_BACKWARD
Definition: executor.h:60
BufFile ** outerBatchFile
Definition: hashjoin.h:330
#define lfirst_node(type, lc)
Definition: pg_list.h:109
#define outerPlanState(node)
Definition: execnodes.h:904
#define innerPlan(node)
Definition: plannodes.h:173
Cost startup_cost
Definition: plannodes.h:125
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:495
HashJoinTuple hj_CurTuple
Definition: execnodes.h:1723
MinimalTupleData * MinimalTuple
Definition: htup.h:27
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
Definition: nodeHash.c:1911
int errcode_for_file_access(void)
Definition: elog.c:598
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:209
#define TupIsNull(slot)
Definition: tuptable.h:138
unsigned int uint32
Definition: c.h:306
PlanState ps
Definition: execnodes.h:2028
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
#define InstrCountFiltered1(node, delta)
Definition: execnodes.h:907
#define ereport(elevel, rest)
Definition: elog.h:122
List * hj_HashOperators
Definition: execnodes.h:1718
Bitmapset * chgParam
Definition: execnodes.h:886
#define outerPlan(node)
Definition: plannodes.h:174
List * lappend(List *list, void *datum)
Definition: list.c:128
#define PHJ_BUILD_DONE
Definition: hashjoin.h:261
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:676
int hj_CurBucketNo
Definition: execnodes.h:1721
#define HJ_FILL_INNER_TUPLES
Definition: nodeHashjoin.c:128
void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
#define HJ_FILL_OUTER(hjstate)
Definition: nodeHashjoin.c:132
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
HashJoinState * ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
Definition: nodeHashjoin.c:592
int BarrierAttach(Barrier *barrier)
Definition: barrier.c:214
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:864
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:356
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:240
#define PHJ_BATCH_PROBING
Definition: hashjoin.h:267
void ExecHashJoinInitializeWorker(HashJoinState *state, ParallelWorkerContext *pwcxt)
void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function)
Definition: execProcnode.c:406
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:357
Plan * plan
Definition: execnodes.h:858
double totalTuples
Definition: hashjoin.h:318
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:339
#define makeNode(_type_)
Definition: nodes.h:560
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:210
void SharedFileSetDeleteAll(SharedFileSet *fileset)
bool hj_OuterNotEmpty
Definition: execnodes.h:1731
#define Assert(condition)
Definition: c.h:680
#define EXEC_FLAG_MARK
Definition: executor.h:61
Definition: regguts.h:298
ParallelHashGrowth growth
Definition: hashjoin.h:241
#define InstrCountFiltered2(node, delta)
Definition: execnodes.h:912
dsa_pointer old_batches
Definition: hashjoin.h:237
bool BarrierDetach(Barrier *barrier)
Definition: barrier.c:234
static pg_attribute_always_inline TupleTableSlot * ExecHashJoinImpl(PlanState *pstate, bool parallel)
Definition: nodeHashjoin.c:166
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:426
BufFile ** innerBatchFile
Definition: hashjoin.h:329
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:243
static TupleTableSlot * ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot)
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
static TupleTableSlot * ExecParallelHashJoin(PlanState *pstate)
Definition: nodeHashjoin.c:576
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
static void header(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:208
TupleDesc ExecGetResultType(PlanState *planstate)
Definition: execUtils.c:477
ParallelHashJoinBatch * shared
Definition: hashjoin.h:197
#define HJ_NEED_NEW_OUTER
Definition: nodeHashjoin.c:125
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
ExprState * qual
Definition: execnodes.h:876
void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:1725
pg_atomic_uint32 distributor
Definition: hashjoin.h:251
#define HJ_BUILD_HASHTABLE
Definition: nodeHashjoin.c:124
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * palloc(Size size)
Definition: mcxt.c:835
HashJoinTable hj_HashTable
Definition: execnodes.h:1719
int errmsg(const char *fmt,...)
Definition: elog.c:797
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:76
Node * MultiExecProcNode(PlanState *node)
Definition: execProcnode.c:484
int i
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:554
Cost total_cost
Definition: plannodes.h:126
#define HeapTupleHeaderSetMatch(tup)
Definition: htup_details.h:527
size_t BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:601
static TupleTableSlot * ExecHashJoin(PlanState *pstate)
Definition: nodeHashjoin.c:560
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
Definition: nodeHash.c:2153
#define pg_attribute_always_inline
Definition: c.h:156
bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
Definition: nodeHash.c:1775
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:118
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
Oid opno
Definition: primnodes.h:496
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition: atomics.h:234
#define HJ_FILL_OUTER_TUPLE
Definition: nodeHashjoin.c:127
#define elog
Definition: elog.h:219
bool inner_unique
Definition: plannodes.h:669
List * args
Definition: primnodes.h:502
#define innerPlanState(node)
Definition: execnodes.h:903
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:139
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
Definition: nodeHash.c:1975
Definition: pg_list.h:45
JoinState js
Definition: execnodes.h:1714
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
List * joinqual
Definition: plannodes.h:670
static bool ExecHashJoinNewBatch(HashJoinState *hjstate)
Definition: nodeHashjoin.c:946
Barrier grow_batches_barrier
Definition: hashjoin.h:249
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr)
void ExecHashTableDestroy(HashJoinTable hashtable)
Definition: nodeHash.c:853
dsm_segment * seg
Definition: parallel.h:50
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
Definition: executor.h:326
#define ResetExprContext(econtext)
Definition: executor.h:468
shm_toc * toc
Definition: parallel.h:44
Plan plan
Definition: plannodes.h:667
void sts_end_write(SharedTuplestoreAccessor *accessor)
void ExecReScanHashJoin(HashJoinState *node)