PostgreSQL Source Code  git master
nodeHashjoin.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/sharedtuplestore.h"
Include dependency graph for nodeHashjoin.c:

Go to the source code of this file.

Macros

#define HJ_BUILD_HASHTABLE   1
 
#define HJ_NEED_NEW_OUTER   2
 
#define HJ_SCAN_BUCKET   3
 
#define HJ_FILL_OUTER_TUPLE   4
 
#define HJ_FILL_INNER_TUPLES   5
 
#define HJ_NEED_NEW_BATCH   6
 
#define HJ_FILL_OUTER(hjstate)   ((hjstate)->hj_NullInnerTupleSlot != NULL)
 
#define HJ_FILL_INNER(hjstate)   ((hjstate)->hj_NullOuterTupleSlot != NULL)
 

Functions

static TupleTableSlot * ExecHashJoinOuterGetTuple (PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)
 
static TupleTableSlot * ExecParallelHashJoinOuterGetTuple (PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)
 
static TupleTableSlot * ExecHashJoinGetSavedTuple (HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot)
 
static bool ExecHashJoinNewBatch (HashJoinState *hjstate)
 
static bool ExecParallelHashJoinNewBatch (HashJoinState *hjstate)
 
static void ExecParallelHashJoinPartitionOuter (HashJoinState *node)
 
static pg_attribute_always_inline TupleTableSlot * ExecHashJoinImpl (PlanState *pstate, bool parallel)
 
static TupleTableSlot * ExecHashJoin (PlanState *pstate)
 
static TupleTableSlot * ExecParallelHashJoin (PlanState *pstate)
 
HashJoinState * ExecInitHashJoin (HashJoin *node, EState *estate, int eflags)
 
void ExecEndHashJoin (HashJoinState *node)
 
void ExecHashJoinSaveTuple (MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr)
 
void ExecReScanHashJoin (HashJoinState *node)
 
void ExecShutdownHashJoin (HashJoinState *node)
 
void ExecHashJoinEstimate (HashJoinState *state, ParallelContext *pcxt)
 
void ExecHashJoinInitializeDSM (HashJoinState *state, ParallelContext *pcxt)
 
void ExecHashJoinReInitializeDSM (HashJoinState *state, ParallelContext *cxt)
 
void ExecHashJoinInitializeWorker (HashJoinState *state, ParallelWorkerContext *pwcxt)
 

Macro Definition Documentation

◆ HJ_BUILD_HASHTABLE

#define HJ_BUILD_HASHTABLE   1

Definition at line 124 of file nodeHashjoin.c.

Referenced by ExecHashJoinImpl(), ExecInitHashJoin(), and ExecReScanHashJoin().

◆ HJ_FILL_INNER

#define HJ_FILL_INNER (   hjstate)    ((hjstate)->hj_NullOuterTupleSlot != NULL)

Definition at line 134 of file nodeHashjoin.c.

Referenced by ExecHashJoinImpl(), ExecHashJoinNewBatch(), and ExecReScanHashJoin().

◆ HJ_FILL_INNER_TUPLES

#define HJ_FILL_INNER_TUPLES   5

Definition at line 128 of file nodeHashjoin.c.

Referenced by ExecHashJoinImpl().

◆ HJ_FILL_OUTER

#define HJ_FILL_OUTER (   hjstate)    ((hjstate)->hj_NullInnerTupleSlot != NULL)

Definition at line 132 of file nodeHashjoin.c.

◆ HJ_FILL_OUTER_TUPLE

#define HJ_FILL_OUTER_TUPLE   4

Definition at line 127 of file nodeHashjoin.c.

Referenced by ExecHashJoinImpl().

◆ HJ_NEED_NEW_BATCH

#define HJ_NEED_NEW_BATCH   6

Definition at line 129 of file nodeHashjoin.c.

Referenced by ExecHashJoinImpl().

◆ HJ_NEED_NEW_OUTER

#define HJ_NEED_NEW_OUTER   2

Definition at line 125 of file nodeHashjoin.c.

Referenced by ExecHashJoinImpl(), and ExecReScanHashJoin().

◆ HJ_SCAN_BUCKET

#define HJ_SCAN_BUCKET   3

Definition at line 126 of file nodeHashjoin.c.

Referenced by ExecHashJoinImpl().

Function Documentation

◆ ExecEndHashJoin()

void ExecEndHashJoin ( HashJoinState * node)

Definition at line 778 of file nodeHashjoin.c.

References ExecClearTuple(), ExecEndNode(), ExecFreeExprContext(), ExecHashTableDestroy(), HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HashJoinState::hj_OuterTupleSlot, innerPlanState, HashJoinState::js, outerPlanState, JoinState::ps, and PlanState::ps_ResultTupleSlot.

Referenced by ExecEndNode().

779 {
780  /*
781  * Free hash table
782  */
783  if (node->hj_HashTable)
784  {
786  node->hj_HashTable = NULL;
787  }
788 
789  /*
790  * Free the exprcontext
791  */
792  ExecFreeExprContext(&node->js.ps);
793 
794  /*
795  * clean out the tuple table
796  */
800 
801  /*
802  * clean up subtrees
803  */
806 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:424
PlanState ps
Definition: execnodes.h:1769
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:538
TupleTableSlot * hj_OuterTupleSlot
Definition: execnodes.h:1883
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:617
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:976
#define outerPlanState(node)
Definition: execnodes.h:1032
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:1884
HashJoinTable hj_HashTable
Definition: execnodes.h:1878
#define innerPlanState(node)
Definition: execnodes.h:1031
JoinState js
Definition: execnodes.h:1872
void ExecHashTableDestroy(HashJoinTable hashtable)
Definition: nodeHash.c:852

◆ ExecHashJoin()

static TupleTableSlot* ExecHashJoin ( PlanState * pstate)
static

Definition at line 566 of file nodeHashjoin.c.

References ExecHashJoinImpl().

Referenced by ExecInitHashJoin().

567 {
568  /*
569  * On sufficiently smart compilers this should be inlined with the
570  * parallel-aware branches removed.
571  */
572  return ExecHashJoinImpl(pstate, false);
573 }
static pg_attribute_always_inline TupleTableSlot * ExecHashJoinImpl(PlanState *pstate, bool parallel)
Definition: nodeHashjoin.c:165

◆ ExecHashJoinEstimate()

void ExecHashJoinEstimate ( HashJoinState * state,
ParallelContext * pcxt 
)

Definition at line 1444 of file nodeHashjoin.c.

References ParallelContext::estimator, shm_toc_estimate_chunk, and shm_toc_estimate_keys.

Referenced by ExecParallelEstimate().

1445 {
1447  shm_toc_estimate_keys(&pcxt->estimator, 1);
1448 }
shm_toc_estimator estimator
Definition: parallel.h:41
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53

◆ ExecHashJoinGetSavedTuple()

static TupleTableSlot * ExecHashJoinGetSavedTuple ( HashJoinState * hjstate,
BufFile * file,
uint32 * hashvalue,
TupleTableSlot * tupleSlot 
)
static

Definition at line 1265 of file nodeHashjoin.c.

References BufFileRead(), CHECK_FOR_INTERRUPTS, ereport, errcode_for_file_access(), errmsg(), ERROR, ExecClearTuple(), ExecForceStoreMinimalTuple(), header(), palloc(), and MinimalTupleData::t_len.

Referenced by ExecHashJoinNewBatch(), and ExecHashJoinOuterGetTuple().

1269 {
1270  uint32 header[2];
1271  size_t nread;
1272  MinimalTuple tuple;
1273 
1274  /*
1275  * We check for interrupts here because this is typically taken as an
1276  * alternative code path to an ExecProcNode() call, which would include
1277  * such a check.
1278  */
1280 
1281  /*
1282  * Since both the hash value and the MinimalTuple length word are uint32,
1283  * we can read them both in one BufFileRead() call without any type
1284  * cheating.
1285  */
1286  nread = BufFileRead(file, (void *) header, sizeof(header));
1287  if (nread == 0) /* end of file */
1288  {
1289  ExecClearTuple(tupleSlot);
1290  return NULL;
1291  }
1292  if (nread != sizeof(header))
1293  ereport(ERROR,
1295  errmsg("could not read from hash-join temporary file: %m")));
1296  *hashvalue = header[0];
1297  tuple = (MinimalTuple) palloc(header[1]);
1298  tuple->t_len = header[1];
1299  nread = BufFileRead(file,
1300  (void *) ((char *) tuple + sizeof(uint32)),
1301  header[1] - sizeof(uint32));
1302  if (nread != header[1] - sizeof(uint32))
1303  ereport(ERROR,
1305  errmsg("could not read from hash-join temporary file: %m")));
1306  ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
1307  return tupleSlot;
1308 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:424
#define ERROR
Definition: elog.h:43
void ExecForceStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1465
MinimalTupleData * MinimalTuple
Definition: htup.h:27
int errcode_for_file_access(void)
Definition: elog.c:593
unsigned int uint32
Definition: c.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
static void header(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:209
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:784
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:516
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99

◆ ExecHashJoinImpl()

static pg_attribute_always_inline TupleTableSlot* ExecHashJoinImpl ( PlanState * pstate,
bool  parallel 
)
static

Definition at line 165 of file nodeHashjoin.c.

References Assert, BarrierArriveAndWait(), BarrierPhase(), ParallelHashJoinState::build_barrier, castNode, CHECK_FOR_INTERRUPTS, HashJoinTableData::curbatch, ExprContext::ecxt_innertuple, ExprContext::ecxt_outertuple, elog, ERROR, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecHashGetSkewBucket(), ExecHashJoinNewBatch(), ExecHashJoinOuterGetTuple(), ExecHashJoinSaveTuple(), ExecHashTableCreate(), ExecParallelHashJoinNewBatch(), ExecParallelHashJoinOuterGetTuple(), ExecParallelHashJoinPartitionOuter(), ExecParallelScanHashBucket(), ExecPrepHashTableForUnmatched(), ExecProcNode(), ExecProject(), ExecQual(), ExecScanHashBucket(), ExecScanHashTableForUnmatched(), HashState::hashtable, heap_free_minimal_tuple(), HeapTupleHeaderSetMatch, HJ_BUILD_HASHTABLE, HashJoinState::hj_Collations, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HJ_FILL_INNER, HJ_FILL_INNER_TUPLES, HJ_FILL_OUTER, HJ_FILL_OUTER_TUPLE, HashJoinState::hj_FirstOuterTupleSlot, HashJoinState::hj_HashOperators, HashJoinState::hj_HashTable, HashJoinState::hj_JoinState, HashJoinState::hj_MatchedOuter, HJ_NEED_NEW_BATCH, HJ_NEED_NEW_OUTER, HashJoinState::hj_NullInnerTupleSlot, HashJoinState::hj_NullOuterTupleSlot, HashJoinState::hj_OuterNotEmpty, HJ_SCAN_BUCKET, HJTUPLE_MINTUPLE, innerPlanState, InstrCountFiltered1, InstrCountFiltered2, INVALID_SKEW_BUCKET_NO, JOIN_ANTI, JoinState::joinqual, JoinState::jointype, HashJoinState::js, MultiExecProcNode(), HashJoinTableData::nbatch, HashJoinTableData::nbatch_outstart, HashJoinTableData::outerBatchFile, outerPlanState, HashState::parallel_state, PHJ_BUILD_DONE, PHJ_BUILD_HASHING_OUTER, PlanState::plan, JoinState::ps, HashState::ps, PlanState::ps_ExprContext, PlanState::ps_ProjInfo, PlanState::qual, ResetExprContext, JoinState::single_match, Plan::startup_cost, Plan::total_cost, HashJoinTableData::totalTuples, TupIsNull, and WAIT_EVENT_HASH_BUILD_HASHING_OUTER.

Referenced by ExecHashJoin(), and ExecParallelHashJoin().

166 {
167  HashJoinState *node = castNode(HashJoinState, pstate);
168  PlanState *outerNode;
169  HashState *hashNode;
170  ExprState *joinqual;
171  ExprState *otherqual;
172  ExprContext *econtext;
173  HashJoinTable hashtable;
174  TupleTableSlot *outerTupleSlot;
175  uint32 hashvalue;
176  int batchno;
177  ParallelHashJoinState *parallel_state;
178 
179  /*
180  * get information from HashJoin node
181  */
182  joinqual = node->js.joinqual;
183  otherqual = node->js.ps.qual;
184  hashNode = (HashState *) innerPlanState(node);
185  outerNode = outerPlanState(node);
186  hashtable = node->hj_HashTable;
187  econtext = node->js.ps.ps_ExprContext;
188  parallel_state = hashNode->parallel_state;
189 
190  /*
191  * Reset per-tuple memory context to free any expression evaluation
192  * storage allocated in the previous tuple cycle.
193  */
194  ResetExprContext(econtext);
195 
196  /*
197  * run the hash join state machine
198  */
199  for (;;)
200  {
201  /*
202  * It's possible to iterate this loop many times before returning a
203  * tuple, in some pathological cases such as needing to move much of
204  * the current batch to a later batch. So let's check for interrupts
205  * each time through.
206  */
208 
209  switch (node->hj_JoinState)
210  {
211  case HJ_BUILD_HASHTABLE:
212 
213  /*
214  * First time through: build hash table for inner relation.
215  */
216  Assert(hashtable == NULL);
217 
218  /*
219  * If the outer relation is completely empty, and it's not
220  * right/full join, we can quit without building the hash
221  * table. However, for an inner join it is only a win to
222  * check this when the outer relation's startup cost is less
223  * than the projected cost of building the hash table.
224  * Otherwise it's best to build the hash table first and see
225  * if the inner relation is empty. (When it's a left join, we
226  * should always make this check, since we aren't going to be
227  * able to skip the join on the strength of an empty inner
228  * relation anyway.)
229  *
230  * If we are rescanning the join, we make use of information
231  * gained on the previous scan: don't bother to try the
232  * prefetch if the previous scan found the outer relation
233  * nonempty. This is not 100% reliable since with new
234  * parameters the outer relation might yield different
235  * results, but it's a good heuristic.
236  *
237  * The only way to make the check is to try to fetch a tuple
238  * from the outer plan node. If we succeed, we have to stash
239  * it away for later consumption by ExecHashJoinOuterGetTuple.
240  */
241  if (HJ_FILL_INNER(node))
242  {
243  /* no chance to not build the hash table */
244  node->hj_FirstOuterTupleSlot = NULL;
245  }
246  else if (parallel)
247  {
248  /*
249  * The empty-outer optimization is not implemented for
250  * shared hash tables, because no one participant can
251  * determine that there are no outer tuples, and it's not
252  * yet clear that it's worth the synchronization overhead
253  * of reaching consensus to figure that out. So we have
254  * to build the hash table.
255  */
256  node->hj_FirstOuterTupleSlot = NULL;
257  }
258  else if (HJ_FILL_OUTER(node) ||
259  (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
260  !node->hj_OuterNotEmpty))
261  {
262  node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
264  {
265  node->hj_OuterNotEmpty = false;
266  return NULL;
267  }
268  else
269  node->hj_OuterNotEmpty = true;
270  }
271  else
272  node->hj_FirstOuterTupleSlot = NULL;
273 
274  /*
275  * Create the hash table. If using Parallel Hash, then
276  * whoever gets here first will create the hash table and any
277  * later arrivals will merely attach to it.
278  */
279  hashtable = ExecHashTableCreate(hashNode,
280  node->hj_HashOperators,
281  node->hj_Collations,
282  HJ_FILL_INNER(node));
283  node->hj_HashTable = hashtable;
284 
285  /*
286  * Execute the Hash node, to build the hash table. If using
287  * Parallel Hash, then we'll try to help hashing unless we
288  * arrived too late.
289  */
290  hashNode->hashtable = hashtable;
291  (void) MultiExecProcNode((PlanState *) hashNode);
292 
293  /*
294  * If the inner relation is completely empty, and we're not
295  * doing a left outer join, we can quit without scanning the
296  * outer relation.
297  */
298  if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
299  return NULL;
300 
301  /*
302  * need to remember whether nbatch has increased since we
303  * began scanning the outer relation
304  */
305  hashtable->nbatch_outstart = hashtable->nbatch;
306 
307  /*
308  * Reset OuterNotEmpty for scan. (It's OK if we fetched a
309  * tuple above, because ExecHashJoinOuterGetTuple will
310  * immediately set it again.)
311  */
312  node->hj_OuterNotEmpty = false;
313 
314  if (parallel)
315  {
316  Barrier *build_barrier;
317 
318  build_barrier = &parallel_state->build_barrier;
319  Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
320  BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
321  if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
322  {
323  /*
324  * If multi-batch, we need to hash the outer relation
325  * up front.
326  */
327  if (hashtable->nbatch > 1)
329  BarrierArriveAndWait(build_barrier,
331  }
332  Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
333 
334  /* Each backend should now select a batch to work on. */
335  hashtable->curbatch = -1;
337 
338  continue;
339  }
340  else
342 
343  /* FALL THRU */
344 
345  case HJ_NEED_NEW_OUTER:
346 
347  /*
348  * We don't have an outer tuple, try to get the next one
349  */
350  if (parallel)
351  outerTupleSlot =
352  ExecParallelHashJoinOuterGetTuple(outerNode, node,
353  &hashvalue);
354  else
355  outerTupleSlot =
356  ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
357 
358  if (TupIsNull(outerTupleSlot))
359  {
360  /* end of batch, or maybe whole join */
361  if (HJ_FILL_INNER(node))
362  {
363  /* set up to scan for unmatched inner tuples */
366  }
367  else
369  continue;
370  }
371 
372  econtext->ecxt_outertuple = outerTupleSlot;
373  node->hj_MatchedOuter = false;
374 
375  /*
376  * Find the corresponding bucket for this tuple in the main
377  * hash table or skew hash table.
378  */
379  node->hj_CurHashValue = hashvalue;
380  ExecHashGetBucketAndBatch(hashtable, hashvalue,
381  &node->hj_CurBucketNo, &batchno);
382  node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
383  hashvalue);
384  node->hj_CurTuple = NULL;
385 
386  /*
387  * The tuple might not belong to the current batch (where
388  * "current batch" includes the skew buckets if any).
389  */
390  if (batchno != hashtable->curbatch &&
392  {
393  bool shouldFree;
394  MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
395  &shouldFree);
396 
397  /*
398  * Need to postpone this outer tuple to a later batch.
399  * Save it in the corresponding outer-batch file.
400  */
401  Assert(parallel_state == NULL);
402  Assert(batchno > hashtable->curbatch);
403  ExecHashJoinSaveTuple(mintuple, hashvalue,
404  &hashtable->outerBatchFile[batchno]);
405 
406  if (shouldFree)
407  heap_free_minimal_tuple(mintuple);
408 
409  /* Loop around, staying in HJ_NEED_NEW_OUTER state */
410  continue;
411  }
412 
413  /* OK, let's scan the bucket for matches */
415 
416  /* FALL THRU */
417 
418  case HJ_SCAN_BUCKET:
419 
420  /*
421  * Scan the selected hash bucket for matches to current outer
422  */
423  if (parallel)
424  {
425  if (!ExecParallelScanHashBucket(node, econtext))
426  {
427  /* out of matches; check for possible outer-join fill */
429  continue;
430  }
431  }
432  else
433  {
434  if (!ExecScanHashBucket(node, econtext))
435  {
436  /* out of matches; check for possible outer-join fill */
438  continue;
439  }
440  }
441 
442  /*
443  * We've got a match, but still need to test non-hashed quals.
444  * ExecScanHashBucket already set up all the state needed to
445  * call ExecQual.
446  *
447  * If we pass the qual, then save state for next call and have
448  * ExecProject form the projection, store it in the tuple
449  * table, and return the slot.
450  *
451  * Only the joinquals determine tuple match status, but all
452  * quals must pass to actually return the tuple.
453  */
454  if (joinqual == NULL || ExecQual(joinqual, econtext))
455  {
456  node->hj_MatchedOuter = true;
458 
459  /* In an antijoin, we never return a matched tuple */
460  if (node->js.jointype == JOIN_ANTI)
461  {
463  continue;
464  }
465 
466  /*
467  * If we only need to join to the first matching inner
468  * tuple, then consider returning this one, but after that
469  * continue with next outer tuple.
470  */
471  if (node->js.single_match)
473 
474  if (otherqual == NULL || ExecQual(otherqual, econtext))
475  return ExecProject(node->js.ps.ps_ProjInfo);
476  else
477  InstrCountFiltered2(node, 1);
478  }
479  else
480  InstrCountFiltered1(node, 1);
481  break;
482 
483  case HJ_FILL_OUTER_TUPLE:
484 
485  /*
486  * The current outer tuple has run out of matches, so check
487  * whether to emit a dummy outer-join tuple. Whether we emit
488  * one or not, the next state is NEED_NEW_OUTER.
489  */
491 
492  if (!node->hj_MatchedOuter &&
493  HJ_FILL_OUTER(node))
494  {
495  /*
496  * Generate a fake join tuple with nulls for the inner
497  * tuple, and return it if it passes the non-join quals.
498  */
499  econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
500 
501  if (otherqual == NULL || ExecQual(otherqual, econtext))
502  return ExecProject(node->js.ps.ps_ProjInfo);
503  else
504  InstrCountFiltered2(node, 1);
505  }
506  break;
507 
509 
510  /*
511  * We have finished a batch, but we are doing right/full join,
512  * so any unmatched inner tuples in the hashtable have to be
513  * emitted before we continue to the next batch.
514  */
515  if (!ExecScanHashTableForUnmatched(node, econtext))
516  {
517  /* no more unmatched tuples */
519  continue;
520  }
521 
522  /*
523  * Generate a fake join tuple with nulls for the outer tuple,
524  * and return it if it passes the non-join quals.
525  */
526  econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
527 
528  if (otherqual == NULL || ExecQual(otherqual, econtext))
529  return ExecProject(node->js.ps.ps_ProjInfo);
530  else
531  InstrCountFiltered2(node, 1);
532  break;
533 
534  case HJ_NEED_NEW_BATCH:
535 
536  /*
537  * Try to advance to next batch. Done if there are no more.
538  */
539  if (parallel)
540  {
541  if (!ExecParallelHashJoinNewBatch(node))
542  return NULL; /* end of parallel-aware join */
543  }
544  else
545  {
546  if (!ExecHashJoinNewBatch(node))
547  return NULL; /* end of parallel-oblivious join */
548  }
550  break;
551 
552  default:
553  elog(ERROR, "unrecognized hashjoin state: %d",
554  (int) node->hj_JoinState);
555  }
556  }
557 }
JoinType jointype
Definition: execnodes.h:1770
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2220
#define HJ_NEED_NEW_BATCH
Definition: nodeHashjoin.c:129
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:109
#define HJ_SCAN_BUCKET
Definition: nodeHashjoin.c:126
TupleTableSlot * hj_NullInnerTupleSlot
Definition: execnodes.h:1886
ExprState * joinqual
Definition: execnodes.h:1773
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:978
static TupleTableSlot * ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)
Definition: nodeHashjoin.c:895
PlanState ps
Definition: execnodes.h:1769
#define castNode(_type_, nodeptr)
Definition: nodes.h:593
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
Definition: nodeHash.c:2058
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1639
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
Definition: nodeHash.c:2034
ExprContext * ps_ExprContext
Definition: execnodes.h:977
bool single_match
Definition: execnodes.h:1771
HashJoinTable hashtable
Definition: execnodes.h:2212
bool hj_MatchedOuter
Definition: execnodes.h:1889
static TupleTableSlot * ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)
Definition: nodeHashjoin.c:821
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:369
TupleTableSlot * hj_FirstOuterTupleSlot
Definition: execnodes.h:1887
#define PHJ_BUILD_HASHING_OUTER
Definition: hashjoin.h:260
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
Definition: nodeHash.c:2359
HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
Definition: nodeHash.c:428
#define HJ_FILL_INNER(hjstate)
Definition: nodeHashjoin.c:134
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1890
uint32 hj_CurHashValue
Definition: execnodes.h:1879
int hj_CurSkewBucketNo
Definition: execnodes.h:1881
#define ERROR
Definition: elog.h:43
TupleTableSlot * hj_NullOuterTupleSlot
Definition: execnodes.h:1885
static void ExecParallelHashJoinPartitionOuter(HashJoinState *node)
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1427
BufFile ** outerBatchFile
Definition: hashjoin.h:330
#define outerPlanState(node)
Definition: execnodes.h:1032
Cost startup_cost
Definition: plannodes.h:121
HashJoinTuple hj_CurTuple
Definition: execnodes.h:1882
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
Definition: nodeHash.c:1922
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:224
#define TupIsNull(slot)
Definition: tuptable.h:293
unsigned int uint32
Definition: c.h:358
PlanState ps
Definition: execnodes.h:2211
#define InstrCountFiltered1(node, delta)
Definition: execnodes.h:1040
List * hj_HashOperators
Definition: execnodes.h:1876
#define PHJ_BUILD_DONE
Definition: hashjoin.h:261
int hj_CurBucketNo
Definition: execnodes.h:1880
#define HJ_FILL_INNER_TUPLES
Definition: nodeHashjoin.c:128
#define HJ_FILL_OUTER(hjstate)
Definition: nodeHashjoin.c:132
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:239
List * hj_Collations
Definition: execnodes.h:1877
Plan * plan
Definition: execnodes.h:938
double totalTuples
Definition: hashjoin.h:318
#define HJTUPLE_MINTUPLE(hjtup)
Definition: hashjoin.h:80
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:226
bool hj_OuterNotEmpty
Definition: execnodes.h:1890
#define Assert(condition)
Definition: c.h:732
#define InstrCountFiltered2(node, delta)
Definition: execnodes.h:1045
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:243
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
#define HJ_NEED_NEW_OUTER
Definition: nodeHashjoin.c:125
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
ExprState * qual
Definition: execnodes.h:959
#define HJ_BUILD_HASHTABLE
Definition: nodeHashjoin.c:124
HashJoinTable hj_HashTable
Definition: execnodes.h:1878
Node * MultiExecProcNode(PlanState *node)
Definition: execProcnode.c:483
#define elog(elevel,...)
Definition: elog.h:226
Cost total_cost
Definition: plannodes.h:122
#define HeapTupleHeaderSetMatch(tup)
Definition: htup_details.h:521
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define HJ_FILL_OUTER_TUPLE
Definition: nodeHashjoin.c:127
#define innerPlanState(node)
Definition: execnodes.h:1031
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
Definition: nodeHash.c:1983
JoinState js
Definition: execnodes.h:1872
static bool ExecHashJoinNewBatch(HashJoinState *hjstate)
Definition: nodeHashjoin.c:960
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr)
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
Definition: executor.h:332
#define ResetExprContext(econtext)
Definition: executor.h:499

◆ ExecHashJoinInitializeDSM()

void ExecHashJoinInitializeDSM ( HashJoinState * state,
ParallelContext * pcxt 
)

Definition at line 1451 of file nodeHashjoin.c.

References BarrierInit(), ParallelHashJoinState::batches, ParallelHashJoinState::build_barrier, ParallelHashJoinState::chunk_work_queue, ParallelHashJoinState::distributor, ExecParallelHashJoin(), ExecSetExecProcNode(), ParallelHashJoinState::fileset, ParallelHashJoinState::grow_batches_barrier, ParallelHashJoinState::grow_buckets_barrier, ParallelHashJoinState::growth, innerPlanState, InvalidDsaPointer, HashJoinState::js, ParallelHashJoinState::lock, LWLockInitialize(), LWTRANCHE_PARALLEL_HASH_JOIN, ParallelHashJoinState::nbatch, ParallelHashJoinState::nbuckets, ParallelHashJoinState::nparticipants, ParallelContext::nworkers, ParallelHashJoinState::old_batches, HashState::parallel_state, pg_atomic_init_u32(), PHJ_GROWTH_OK, PlanState::plan, Plan::plan_node_id, JoinState::ps, ParallelContext::seg, SharedFileSetInit(), shm_toc_allocate(), shm_toc_insert(), ParallelHashJoinState::space_allowed, ParallelContext::toc, and ParallelHashJoinState::total_tuples.

Referenced by ExecParallelInitializeDSM().

1452 {
1453  int plan_node_id = state->js.ps.plan->plan_node_id;
1454  HashState *hashNode;
1455  ParallelHashJoinState *pstate;
1456 
1457  /*
1458  * Disable shared hash table mode if we failed to create a real DSM
1459  * segment, because that means that we don't have a DSA area to work with.
1460  */
1461  if (pcxt->seg == NULL)
1462  return;
1463 
1465 
1466  /*
1467  * Set up the state needed to coordinate access to the shared hash
1468  * table(s), using the plan node ID as the toc key.
1469  */
1470  pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
1471  shm_toc_insert(pcxt->toc, plan_node_id, pstate);
1472 
1473  /*
1474  * Set up the shared hash join state with no batches initially.
1475  * ExecHashTableCreate() will prepare at least one later and set nbatch
1476  * and space_allowed.
1477  */
1478  pstate->nbatch = 0;
1479  pstate->space_allowed = 0;
1480  pstate->batches = InvalidDsaPointer;
1481  pstate->old_batches = InvalidDsaPointer;
1482  pstate->nbuckets = 0;
1483  pstate->growth = PHJ_GROWTH_OK;
1485  pg_atomic_init_u32(&pstate->distributor, 0);
1486  pstate->nparticipants = pcxt->nworkers + 1;
1487  pstate->total_tuples = 0;
1488  LWLockInitialize(&pstate->lock,
1490  BarrierInit(&pstate->build_barrier, 0);
1491  BarrierInit(&pstate->grow_batches_barrier, 0);
1492  BarrierInit(&pstate->grow_buckets_barrier, 0);
1493 
1494  /* Set up the space we'll use for shared temporary files. */
1495  SharedFileSetInit(&pstate->fileset, pcxt->seg);
1496 
1497  /* Initialize the shared state in the hash node. */
1498  hashNode = (HashState *) innerPlanState(state);
1499  hashNode->parallel_state = pstate;
1500 }
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2220
dsa_pointer chunk_work_queue
Definition: hashjoin.h:242
#define InvalidDsaPointer
Definition: dsa.h:78
void BarrierInit(Barrier *barrier, int participants)
Definition: barrier.c:100
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:49
PlanState ps
Definition: execnodes.h:1769
dsm_segment * seg
Definition: parallel.h:42
int plan_node_id
Definition: plannodes.h:139
SharedFileSet fileset
Definition: hashjoin.h:253
Barrier grow_buckets_barrier
Definition: hashjoin.h:250
dsa_pointer batches
Definition: hashjoin.h:236
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:678
void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function)
Definition: execProcnode.c:406
Plan * plan
Definition: execnodes.h:938
ParallelHashGrowth growth
Definition: hashjoin.h:241
dsa_pointer old_batches
Definition: hashjoin.h:237
static TupleTableSlot * ExecParallelHashJoin(PlanState *pstate)
Definition: nodeHashjoin.c:582
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
pg_atomic_uint32 distributor
Definition: hashjoin.h:251
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition: atomics.h:226
#define innerPlanState(node)
Definition: execnodes.h:1031
JoinState js
Definition: execnodes.h:1872
Barrier grow_batches_barrier
Definition: hashjoin.h:249
shm_toc * toc
Definition: parallel.h:44

◆ ExecHashJoinInitializeWorker()

void ExecHashJoinInitializeWorker ( HashJoinState * state,
ParallelWorkerContext * pwcxt 
)

Definition at line 1542 of file nodeHashjoin.c.

References ExecParallelHashJoin(), ExecSetExecProcNode(), ParallelHashJoinState::fileset, innerPlanState, HashJoinState::js, HashState::parallel_state, PlanState::plan, Plan::plan_node_id, JoinState::ps, ParallelWorkerContext::seg, SharedFileSetAttach(), shm_toc_lookup(), and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().

1544 {
1545  HashState *hashNode;
1546  int plan_node_id = state->js.ps.plan->plan_node_id;
1547  ParallelHashJoinState *pstate =
1548  shm_toc_lookup(pwcxt->toc, plan_node_id, false);
1549 
1550  /* Attach to the space for shared temporary files. */
1551  SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
1552 
1553  /* Attach to the shared state in the hash node. */
1554  hashNode = (HashState *) innerPlanState(state);
1555  hashNode->parallel_state = pstate;
1556 
1558 }
struct ParallelHashJoinState * parallel_state
Definition: execnodes.h:2220
PlanState ps
Definition: execnodes.h:1769
int plan_node_id
Definition: plannodes.h:139
SharedFileSet fileset
Definition: hashjoin.h:253
void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function)
Definition: execProcnode.c:406
Plan * plan
Definition: execnodes.h:938
static TupleTableSlot * ExecParallelHashJoin(PlanState *pstate)
Definition: nodeHashjoin.c:582
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:78
#define innerPlanState(node)
Definition: execnodes.h:1031
JoinState js
Definition: execnodes.h:1872
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
dsm_segment * seg
Definition: parallel.h:52

◆ ExecHashJoinNewBatch()

static bool ExecHashJoinNewBatch ( HashJoinState * hjstate)
static

Definition at line 960 of file nodeHashjoin.c.

References BufFileClose(), BufFileSeek(), HashJoinTableData::curbatch, ereport, errcode_for_file_access(), errmsg(), ERROR, ExecHashJoinGetSavedTuple(), ExecHashTableInsert(), ExecHashTableReset(), HJ_FILL_INNER, HJ_FILL_OUTER, HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, HashJoinTableData::innerBatchFile, HashJoinTableData::nbatch, HashJoinTableData::nbatch_original, HashJoinTableData::nbatch_outstart, HashJoinTableData::nSkewBuckets, HashJoinTableData::outerBatchFile, HashJoinTableData::skewBucket, HashJoinTableData::skewBucketNums, HashJoinTableData::skewEnabled, and HashJoinTableData::spaceUsedSkew.

Referenced by ExecHashJoinImpl().

961 {
962  HashJoinTable hashtable = hjstate->hj_HashTable;
963  int nbatch;
964  int curbatch;
965  BufFile *innerFile;
966  TupleTableSlot *slot;
967  uint32 hashvalue;
968 
969  nbatch = hashtable->nbatch;
970  curbatch = hashtable->curbatch;
971 
972  if (curbatch > 0)
973  {
974  /*
975  * We no longer need the previous outer batch file; close it right
976  * away to free disk space.
977  */
978  if (hashtable->outerBatchFile[curbatch])
979  BufFileClose(hashtable->outerBatchFile[curbatch]);
980  hashtable->outerBatchFile[curbatch] = NULL;
981  }
982  else /* we just finished the first batch */
983  {
984  /*
985  * Reset some of the skew optimization state variables, since we no
986  * longer need to consider skew tuples after the first batch. The
987  * memory context reset we are about to do will release the skew
988  * hashtable itself.
989  */
990  hashtable->skewEnabled = false;
991  hashtable->skewBucket = NULL;
992  hashtable->skewBucketNums = NULL;
993  hashtable->nSkewBuckets = 0;
994  hashtable->spaceUsedSkew = 0;
995  }
996 
997  /*
998  * We can always skip over any batches that are completely empty on both
999  * sides. We can sometimes skip over batches that are empty on only one
1000  * side, but there are exceptions:
1001  *
1002  * 1. In a left/full outer join, we have to process outer batches even if
1003  * the inner batch is empty. Similarly, in a right/full outer join, we
1004  * have to process inner batches even if the outer batch is empty.
1005  *
1006  * 2. If we have increased nbatch since the initial estimate, we have to
1007  * scan inner batches since they might contain tuples that need to be
1008  * reassigned to later inner batches.
1009  *
1010  * 3. Similarly, if we have increased nbatch since starting the outer
1011  * scan, we have to rescan outer batches in case they contain tuples that
1012  * need to be reassigned.
1013  */
1014  curbatch++;
1015  while (curbatch < nbatch &&
1016  (hashtable->outerBatchFile[curbatch] == NULL ||
1017  hashtable->innerBatchFile[curbatch] == NULL))
1018  {
1019  if (hashtable->outerBatchFile[curbatch] &&
1020  HJ_FILL_OUTER(hjstate))
1021  break; /* must process due to rule 1 */
1022  if (hashtable->innerBatchFile[curbatch] &&
1023  HJ_FILL_INNER(hjstate))
1024  break; /* must process due to rule 1 */
1025  if (hashtable->innerBatchFile[curbatch] &&
1026  nbatch != hashtable->nbatch_original)
1027  break; /* must process due to rule 2 */
1028  if (hashtable->outerBatchFile[curbatch] &&
1029  nbatch != hashtable->nbatch_outstart)
1030  break; /* must process due to rule 3 */
1031  /* We can ignore this batch. */
1032  /* Release associated temp files right away. */
1033  if (hashtable->innerBatchFile[curbatch])
1034  BufFileClose(hashtable->innerBatchFile[curbatch]);
1035  hashtable->innerBatchFile[curbatch] = NULL;
1036  if (hashtable->outerBatchFile[curbatch])
1037  BufFileClose(hashtable->outerBatchFile[curbatch]);
1038  hashtable->outerBatchFile[curbatch] = NULL;
1039  curbatch++;
1040  }
1041 
1042  if (curbatch >= nbatch)
1043  return false; /* no more batches */
1044 
1045  hashtable->curbatch = curbatch;
1046 
1047  /*
1048  * Reload the hash table with the new inner batch (which could be empty)
1049  */
1050  ExecHashTableReset(hashtable);
1051 
1052  innerFile = hashtable->innerBatchFile[curbatch];
1053 
1054  if (innerFile != NULL)
1055  {
1056  if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
1057  ereport(ERROR,
1058  (errcode_for_file_access(),
1059  errmsg("could not rewind hash-join temporary file: %m")));
1060 
1061  while ((slot = ExecHashJoinGetSavedTuple(hjstate,
1062  innerFile,
1063  &hashvalue,
1064  hjstate->hj_HashTupleSlot)))
1065  {
1066  /*
1067  * NOTE: some tuples may be sent to future batches. Also, it is
1068  * possible for hashtable->nbatch to be increased here!
1069  */
1070  ExecHashTableInsert(hashtable, slot, hashvalue);
1071  }
1072 
1073  /*
1074  * after we build the hash table, the inner batch file is no longer
1075  * needed
1076  */
1077  BufFileClose(innerFile);
1078  hashtable->innerBatchFile[curbatch] = NULL;
1079  }
1080 
1081  /*
1082  * Rewind outer batch file (if present), so that we can start reading it.
1083  */
1084  if (hashtable->outerBatchFile[curbatch] != NULL)
1085  {
1086  if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
1087  ereport(ERROR,
1088  (errcode_for_file_access(),
1089  errmsg("could not rewind hash-join temporary file: %m")));
1090  }
1091 
1092  return true;
1093 }
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:638
void ExecHashTableReset(HashJoinTable hashtable)
Definition: nodeHash.c:2129
void BufFileClose(BufFile *file)
Definition: buffile.c:379
#define HJ_FILL_INNER(hjstate)
Definition: nodeHashjoin.c:134
int * skewBucketNums
Definition: hashjoin.h:308
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
Definition: nodeHash.c:1592
#define ERROR
Definition: elog.h:43
BufFile ** outerBatchFile
Definition: hashjoin.h:330
int errcode_for_file_access(void)
Definition: elog.c:593
unsigned int uint32
Definition: c.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
#define HJ_FILL_OUTER(hjstate)
Definition: nodeHashjoin.c:132
HashSkewBucket ** skewBucket
Definition: hashjoin.h:305
BufFile ** innerBatchFile
Definition: hashjoin.h:329
static TupleTableSlot * ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot)
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:1884
HashJoinTable hj_HashTable
Definition: execnodes.h:1878
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ ExecHashJoinOuterGetTuple()

static TupleTableSlot * ExecHashJoinOuterGetTuple ( PlanState * outerNode,
HashJoinState * hjstate,
uint32 * hashvalue 
)
static

Definition at line 821 of file nodeHashjoin.c.

References HashJoinTableData::curbatch, ExprContext::ecxt_outertuple, ExecHashGetHashValue(), ExecHashJoinGetSavedTuple(), ExecProcNode(), HJ_FILL_OUTER, HashJoinState::hj_FirstOuterTupleSlot, HashJoinState::hj_HashTable, HashJoinState::hj_OuterHashKeys, HashJoinState::hj_OuterNotEmpty, HashJoinState::hj_OuterTupleSlot, HashJoinState::js, HashJoinTableData::outerBatchFile, JoinState::ps, PlanState::ps_ExprContext, and TupIsNull.

Referenced by ExecHashJoinImpl().

824 {
825  HashJoinTable hashtable = hjstate->hj_HashTable;
826  int curbatch = hashtable->curbatch;
827  TupleTableSlot *slot;
828 
829  if (curbatch == 0) /* if it is the first pass */
830  {
831  /*
832  * Check to see if first outer tuple was already fetched by
833  * ExecHashJoin() and not used yet.
834  */
835  slot = hjstate->hj_FirstOuterTupleSlot;
836  if (!TupIsNull(slot))
837  hjstate->hj_FirstOuterTupleSlot = NULL;
838  else
839  slot = ExecProcNode(outerNode);
840 
841  while (!TupIsNull(slot))
842  {
843  /*
844  * We have to compute the tuple's hash value.
845  */
846  ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
847 
848  econtext->ecxt_outertuple = slot;
849  if (ExecHashGetHashValue(hashtable, econtext,
850  hjstate->hj_OuterHashKeys,
851  true, /* outer tuple */
852  HJ_FILL_OUTER(hjstate),
853  hashvalue))
854  {
855  /* remember outer relation is not empty for possible rescan */
856  hjstate->hj_OuterNotEmpty = true;
857 
858  return slot;
859  }
860 
861  /*
862  * That tuple couldn't match because of a NULL, so discard it and
863  * continue with the next one.
864  */
865  slot = ExecProcNode(outerNode);
866  }
867  }
868  else if (curbatch < hashtable->nbatch)
869  {
870  BufFile *file = hashtable->outerBatchFile[curbatch];
871 
872  /*
873  * In outer-join cases, we could get here even though the batch file
874  * is empty.
875  */
876  if (file == NULL)
877  return NULL;
878 
879  slot = ExecHashJoinGetSavedTuple(hjstate,
880  file,
881  hashvalue,
882  hjstate->hj_OuterTupleSlot);
883  if (!TupIsNull(slot))
884  return slot;
885  }
886 
887  /* End of this batch */
888  return NULL;
889 }
PlanState ps
Definition: execnodes.h:1769
ExprContext * ps_ExprContext
Definition: execnodes.h:977
TupleTableSlot * hj_OuterTupleSlot
Definition: execnodes.h:1883
List * hj_OuterHashKeys
Definition: execnodes.h:1874
TupleTableSlot * hj_FirstOuterTupleSlot
Definition: execnodes.h:1887
BufFile ** outerBatchFile
Definition: hashjoin.h:330
#define TupIsNull(slot)
Definition: tuptable.h:293
#define HJ_FILL_OUTER(hjstate)
Definition: nodeHashjoin.c:132
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:239
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:226
bool hj_OuterNotEmpty
Definition: execnodes.h:1890
static TupleTableSlot * ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot)
HashJoinTable hj_HashTable
Definition: execnodes.h:1878
bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
Definition: nodeHash.c:1786
JoinState js
Definition: execnodes.h:1872

◆ ExecHashJoinReInitializeDSM()

void ExecHashJoinReInitializeDSM ( HashJoinState * state,
ParallelContext * cxt 
)

Definition at line 1509 of file nodeHashjoin.c.

References BarrierInit(), ParallelHashJoinState::build_barrier, ExecHashTableDetach(), ExecHashTableDetachBatch(), ParallelHashJoinState::fileset, HashJoinState::hj_HashTable, HashJoinState::js, PlanState::plan, Plan::plan_node_id, JoinState::ps, SharedFileSetDeleteAll(), shm_toc_lookup(), and ParallelContext::toc.

Referenced by ExecParallelReInitializeDSM().

1510 {
1511  int plan_node_id = state->js.ps.plan->plan_node_id;
1512  ParallelHashJoinState *pstate =
1513  shm_toc_lookup(cxt->toc, plan_node_id, false);
1514 
1515  /*
1516  * It would be possible to reuse the shared hash table in single-batch
1517  * cases by resetting and then fast-forwarding build_barrier to
1518  * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
1519  * currently shared hash tables are already freed by now (by the last
1520  * participant to detach from the batch). We could consider keeping it
1521  * around for single-batch joins. We'd also need to adjust
1522  * finalize_plan() so that it doesn't record a dummy dependency for
1523  * Parallel Hash nodes, preventing the rescan optimization. For now we
1524  * don't try.
1525  */
1526 
1527  /* Detach, freeing any remaining shared memory. */
1528  if (state->hj_HashTable != NULL)
1529  {
1530  ExecHashTableDetachBatch(state->hj_HashTable);
1531  ExecHashTableDetach(state->hj_HashTable);
1532  }
1533 
1534  /* Clear any shared batch files. */
1535  SharedFileSetDeleteAll(&pstate->fileset);
1536 
1537  /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
1538  BarrierInit(&pstate->build_barrier, 0);
1539 }
void BarrierInit(Barrier *barrier, int participants)
Definition: barrier.c:100
PlanState ps
Definition: execnodes.h:1769
void ExecHashTableDetachBatch(HashJoinTable hashtable)
Definition: nodeHash.c:3079
void ExecHashTableDetach(HashJoinTable hashtable)
Definition: nodeHash.c:3136
int plan_node_id
Definition: plannodes.h:139
SharedFileSet fileset
Definition: hashjoin.h:253
Plan * plan
Definition: execnodes.h:938
void SharedFileSetDeleteAll(SharedFileSet *fileset)
HashJoinTable hj_HashTable
Definition: execnodes.h:1878
JoinState js
Definition: execnodes.h:1872
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
shm_toc * toc
Definition: parallel.h:44

◆ ExecHashJoinSaveTuple()

void ExecHashJoinSaveTuple ( MinimalTuple  tuple,
uint32  hashvalue,
BufFile **  fileptr 
)

Definition at line 1231 of file nodeHashjoin.c.

References BufFileCreateTemp(), BufFileWrite(), ereport, errcode_for_file_access(), errmsg(), ERROR, and MinimalTupleData::t_len.

Referenced by ExecHashIncreaseNumBatches(), ExecHashJoinImpl(), ExecHashRemoveNextSkewBucket(), and ExecHashTableInsert().

1233 {
1234  BufFile *file = *fileptr;
1235  size_t written;
1236 
1237  if (file == NULL)
1238  {
1239  /* First write to this batch file, so open it. */
1240  file = BufFileCreateTemp(false);
1241  *fileptr = file;
1242  }
1243 
1244  written = BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
1245  if (written != sizeof(uint32))
1246  ereport(ERROR,
1247  (errcode_for_file_access(),
1248  errmsg("could not write to hash-join temporary file: %m")));
1249 
1250  written = BufFileWrite(file, (void *) tuple, tuple->t_len);
1251  if (written != tuple->t_len)
1252  ereport(ERROR,
1253  (errcode_for_file_access(),
1254  errmsg("could not write to hash-join temporary file: %m")));
1255 }
#define ERROR
Definition: elog.h:43
BufFile * BufFileCreateTemp(bool interXact)
Definition: buffile.c:183
int errcode_for_file_access(void)
Definition: elog.c:593
unsigned int uint32
Definition: c.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:784
size_t BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:563

◆ ExecInitHashJoin()

HashJoinState* ExecInitHashJoin ( HashJoin * node,
EState * estate,
int  eflags 
)

Definition at line 598 of file nodeHashjoin.c.

References OpExpr::args, Assert, elog, ERROR, EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecAssignExprContext(), ExecAssignProjectionInfo(), ExecGetResultSlotOps(), ExecGetResultType(), ExecHashJoin(), ExecInitExpr(), ExecInitExtraTupleSlot(), ExecInitNode(), ExecInitNullTupleSlot(), ExecInitQual(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, HashJoin::hashclauses, HJ_BUILD_HASHTABLE, HashJoinState::hj_HashTupleSlot, HashJoinState::hj_NullInnerTupleSlot, HashJoinState::hj_NullOuterTupleSlot, HashJoinState::hj_OuterTupleSlot, Join::inner_unique, innerPlan, innerPlanState, OpExpr::inputcollid, INVALID_SKEW_BUCKET_NO, HashJoin::join, JOIN_ANTI, JOIN_FULL, JOIN_INNER, JOIN_LEFT, JOIN_RIGHT, JOIN_SEMI, Join::joinqual, Join::jointype, JoinState::jointype, HashJoinState::js, lappend(), lappend_oid(), lfirst_node, linitial, lsecond, makeNode, NIL, OpExpr::opno, outerPlan, outerPlanState, Join::plan, PlanState::plan, JoinState::ps, HashState::ps, PlanState::ps_ResultTupleSlot, Plan::qual, PlanState::qual, JoinState::single_match, PlanState::state, and TTSOpsVirtual.

Referenced by ExecInitNode().

599 {
600  HashJoinState *hjstate;
601  Plan *outerNode;
602  Hash *hashNode;
603  List *lclauses;
604  List *rclauses;
605  List *rhclauses;
606  List *hoperators;
607  List *hcollations;
608  TupleDesc outerDesc,
609  innerDesc;
610  ListCell *l;
611  const TupleTableSlotOps *ops;
612 
613  /* check for unsupported flags */
614  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
615 
616  /*
617  * create state structure
618  */
619  hjstate = makeNode(HashJoinState);
620  hjstate->js.ps.plan = (Plan *) node;
621  hjstate->js.ps.state = estate;
622 
623  /*
624  * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
625  * where this function may be replaced with a parallel version, if we
626  * managed to launch a parallel query.
627  */
628  hjstate->js.ps.ExecProcNode = ExecHashJoin;
629  hjstate->js.jointype = node->join.jointype;
630 
631  /*
632  * Miscellaneous initialization
633  *
634  * create expression context for node
635  */
636  ExecAssignExprContext(estate, &hjstate->js.ps);
637 
638  /*
639  * initialize child nodes
640  *
641  * Note: we could suppress the REWIND flag for the inner input, which
642  * would amount to betting that the hash will be a single batch. Not
643  * clear if this would be a win or not.
644  */
645  outerNode = outerPlan(node);
646  hashNode = (Hash *) innerPlan(node);
647 
648  outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
649  outerDesc = ExecGetResultType(outerPlanState(hjstate));
650  innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
651  innerDesc = ExecGetResultType(innerPlanState(hjstate));
652 
653  /*
654  * Initialize result slot, type and projection.
655  */
656  ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
657  ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
658 
659  /*
660  * tuple table initialization
661  */
662  ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
663  hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
664  ops);
665 
666  /*
667  * detect whether we need only consider the first matching inner tuple
668  */
669  hjstate->js.single_match = (node->join.inner_unique ||
670  node->join.jointype == JOIN_SEMI);
671 
672  /* set up null tuples for outer joins, if needed */
673  switch (node->join.jointype)
674  {
675  case JOIN_INNER:
676  case JOIN_SEMI:
677  break;
678  case JOIN_LEFT:
679  case JOIN_ANTI:
680  hjstate->hj_NullInnerTupleSlot =
681  ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
682  break;
683  case JOIN_RIGHT:
684  hjstate->hj_NullOuterTupleSlot =
685  ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
686  break;
687  case JOIN_FULL:
688  hjstate->hj_NullOuterTupleSlot =
689  ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
690  hjstate->hj_NullInnerTupleSlot =
691  ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
692  break;
693  default:
694  elog(ERROR, "unrecognized join type: %d",
695  (int) node->join.jointype);
696  }
697 
698  /*
699  * now for some voodoo. our temporary tuple slot is actually the result
700  * tuple slot of the Hash node (which is our inner plan). we can do this
701  * because Hash nodes don't return tuples via ExecProcNode() -- instead
702  * the hash join node uses ExecScanHashBucket() to get at the contents of
703  * the hash table. -cim 6/9/91
704  */
705  {
706  HashState *hashstate = (HashState *) innerPlanState(hjstate);
707  TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;
708 
709  hjstate->hj_HashTupleSlot = slot;
710  }
711 
712  /*
713  * initialize child expressions
714  */
715  hjstate->js.ps.qual =
716  ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
717  hjstate->js.joinqual =
718  ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
719  hjstate->hashclauses =
720  ExecInitQual(node->hashclauses, (PlanState *) hjstate);
721 
722  /*
723  * initialize hash-specific info
724  */
725  hjstate->hj_HashTable = NULL;
726  hjstate->hj_FirstOuterTupleSlot = NULL;
727 
728  hjstate->hj_CurHashValue = 0;
729  hjstate->hj_CurBucketNo = 0;
730  hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
731  hjstate->hj_CurTuple = NULL;
732 
733  /*
734  * Deconstruct the hash clauses into outer and inner argument values, so
735  * that we can evaluate those subexpressions separately. Also make a list
736  * of the hash operator OIDs, in preparation for looking up the hash
737  * functions to use.
738  */
739  lclauses = NIL;
740  rclauses = NIL;
741  rhclauses = NIL;
742  hoperators = NIL;
743  hcollations = NIL;
744  foreach(l, node->hashclauses)
745  {
746  OpExpr *hclause = lfirst_node(OpExpr, l);
747 
748  lclauses = lappend(lclauses, ExecInitExpr(linitial(hclause->args),
749  (PlanState *) hjstate));
750  rclauses = lappend(rclauses, ExecInitExpr(lsecond(hclause->args),
751  (PlanState *) hjstate));
752  rhclauses = lappend(rhclauses, ExecInitExpr(lsecond(hclause->args),
753  innerPlanState(hjstate)));
754  hoperators = lappend_oid(hoperators, hclause->opno);
755  hcollations = lappend_oid(hcollations, hclause->inputcollid);
756  }
757  hjstate->hj_OuterHashKeys = lclauses;
758  hjstate->hj_InnerHashKeys = rclauses;
759  hjstate->hj_HashOperators = hoperators;
760  hjstate->hj_Collations = hcollations;
761  /* child Hash node needs to evaluate inner hash keys, too */
762  ((HashState *) innerPlanState(hjstate))->hashkeys = rhclauses;
763 
764  hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
765  hjstate->hj_MatchedOuter = false;
766  hjstate->hj_OuterNotEmpty = false;
767 
768  return hjstate;
769 }
JoinType jointype
Definition: execnodes.h:1770
#define NIL
Definition: pg_list.h:69
List * qual
Definition: plannodes.h:141
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:109
TupleTableSlot * hj_NullInnerTupleSlot
Definition: execnodes.h:1886
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1778
ExprState * joinqual
Definition: execnodes.h:1773
const TupleTableSlotOps * ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)
Definition: execUtils.c:466
PlanState ps
Definition: execnodes.h:1769
List * hashclauses
Definition: plannodes.h:739
bool single_match
Definition: execnodes.h:1771
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
bool hj_MatchedOuter
Definition: execnodes.h:1889
EState * state
Definition: execnodes.h:940
TupleTableSlot * hj_OuterTupleSlot
Definition: execnodes.h:1883
List * hj_OuterHashKeys
Definition: execnodes.h:1874
List * lappend_oid(List *list, Oid datum)
Definition: list.c:164
TupleTableSlot * hj_FirstOuterTupleSlot
Definition: execnodes.h:1887
#define lsecond(l)
Definition: pg_list.h:116
Join join
Definition: plannodes.h:738
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:206
JoinType jointype
Definition: plannodes.h:680
uint32 hj_CurHashValue
Definition: execnodes.h:1879
int hj_CurSkewBucketNo
Definition: execnodes.h:1881
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:976
#define linitial(l)
Definition: pg_list.h:111
#define ERROR
Definition: elog.h:43
TupleTableSlot * hj_NullOuterTupleSlot
Definition: execnodes.h:1885
#define EXEC_FLAG_BACKWARD
Definition: executor.h:57
#define lfirst_node(type, lc)
Definition: pg_list.h:109
#define outerPlanState(node)
Definition: execnodes.h:1032
#define innerPlan(node)
Definition: plannodes.h:169
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:502
HashJoinTuple hj_CurTuple
Definition: execnodes.h:1882
PlanState ps
Definition: execnodes.h:2211
List * hj_HashOperators
Definition: execnodes.h:1876
#define outerPlan(node)
Definition: plannodes.h:170
List * lappend(List *list, void *datum)
Definition: list.c:128
int hj_CurBucketNo
Definition: execnodes.h:1880
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:944
List * hj_Collations
Definition: execnodes.h:1877
Plan * plan
Definition: execnodes.h:938
#define makeNode(_type_)
Definition: nodes.h:572
bool hj_OuterNotEmpty
Definition: execnodes.h:1890
#define Assert(condition)
Definition: c.h:732
#define EXEC_FLAG_MARK
Definition: executor.h:58
TupleTableSlot * ExecInitNullTupleSlot(EState *estate, TupleDesc tupType, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1794
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:447
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1746
Oid inputcollid
Definition: primnodes.h:509
TupleDesc ExecGetResultType(PlanState *planstate)
Definition: execUtils.c:457
ExprState * qual
Definition: execnodes.h:959
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:1884
List * hj_InnerHashKeys
Definition: execnodes.h:1875
#define HJ_BUILD_HASHTABLE
Definition: nodeHashjoin.c:124
HashJoinTable hj_HashTable
Definition: execnodes.h:1878
#define elog(elevel,...)
Definition: elog.h:226
static TupleTableSlot * ExecHashJoin(PlanState *pstate)
Definition: nodeHashjoin.c:566
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:120
Oid opno
Definition: primnodes.h:504
bool inner_unique
Definition: plannodes.h:681
List * args
Definition: primnodes.h:510
#define innerPlanState(node)
Definition: execnodes.h:1031
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:139
Definition: pg_list.h:45
JoinState js
Definition: execnodes.h:1872
List * joinqual
Definition: plannodes.h:682
ExprState * hashclauses
Definition: execnodes.h:1873
Plan plan
Definition: plannodes.h:679

◆ ExecParallelHashJoin()

static TupleTableSlot* ExecParallelHashJoin ( PlanState * pstate)
static

Definition at line 582 of file nodeHashjoin.c.

References ExecHashJoinImpl().

Referenced by ExecHashJoinInitializeDSM(), and ExecHashJoinInitializeWorker().

583 {
584  /*
585  * On sufficiently smart compilers this should be inlined with the
586  * parallel-oblivious branches removed.
587  */
588  return ExecHashJoinImpl(pstate, true);
589 }
static pg_attribute_always_inline TupleTableSlot * ExecHashJoinImpl(PlanState *pstate, bool parallel)
Definition: nodeHashjoin.c:165

◆ ExecParallelHashJoinNewBatch()

static bool ExecParallelHashJoinNewBatch ( HashJoinState * hjstate)
static

Definition at line 1100 of file nodeHashjoin.c.

References BarrierArriveAndWait(), BarrierAttach(), BarrierDetach(), BarrierPhase(), ParallelHashJoinBatch::batch_barrier, HashJoinTableData::batches, HashJoinTableData::curbatch, ParallelHashJoinState::distributor, ParallelHashJoinBatchAccessor::done, elog, ERROR, ExecForceStoreMinimalTuple(), ExecHashTableDetachBatch(), ExecParallelHashTableAlloc(), ExecParallelHashTableInsertCurrentBatch(), ExecParallelHashTableSetCurrentBatch(), HashJoinState::hj_HashTable, HashJoinState::hj_HashTupleSlot, ParallelHashJoinBatchAccessor::inner_tuples, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, HashJoinTableData::parallel_state, pg_atomic_fetch_add_u32(), PHJ_BATCH_ALLOCATING, PHJ_BATCH_DONE, PHJ_BATCH_ELECTING, PHJ_BATCH_LOADING, PHJ_BATCH_PROBING, ParallelHashJoinBatchAccessor::shared, sts_begin_parallel_scan(), sts_end_parallel_scan(), sts_parallel_scan_next(), WAIT_EVENT_HASH_BATCH_ALLOCATING, WAIT_EVENT_HASH_BATCH_ELECTING, and WAIT_EVENT_HASH_BATCH_LOADING.

Referenced by ExecHashJoinImpl().

1101 {
1102  HashJoinTable hashtable = hjstate->hj_HashTable;
1103  int start_batchno;
1104  int batchno;
1105 
1106  /*
1107  * If we started up so late that the batch tracking array has been freed
1108  * already by ExecHashTableDetach(), then we are finished. See also
1109  * ExecParallelHashEnsureBatchAccessors().
1110  */
1111  if (hashtable->batches == NULL)
1112  return false;
1113 
1114  /*
1115  * If we were already attached to a batch, remember not to bother checking
1116  * it again, and detach from it (possibly freeing the hash table if we are
1117  * last to detach).
1118  */
1119  if (hashtable->curbatch >= 0)
1120  {
1121  hashtable->batches[hashtable->curbatch].done = true;
1122  ExecHashTableDetachBatch(hashtable);
1123  }
1124 
1125  /*
1126  * Search for a batch that isn't done. We use an atomic counter to start
1127  * our search at a different batch in every participant when there are
1128  * more batches than participants.
1129  */
1130  batchno = start_batchno =
1131  pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
1132  hashtable->nbatch;
1133  do
1134  {
1135  uint32 hashvalue;
1136  MinimalTuple tuple;
1137  TupleTableSlot *slot;
1138 
1139  if (!hashtable->batches[batchno].done)
1140  {
1141  SharedTuplestoreAccessor *inner_tuples;
1142  Barrier *batch_barrier =
1143  &hashtable->batches[batchno].shared->batch_barrier;
1144 
1145  switch (BarrierAttach(batch_barrier))
1146  {
1147  case PHJ_BATCH_ELECTING:
1148 
1149  /* One backend allocates the hash table. */
1150  if (BarrierArriveAndWait(batch_barrier,
1151  WAIT_EVENT_HASH_BATCH_ELECTING))
1152  ExecParallelHashTableAlloc(hashtable, batchno);
1153  /* Fall through. */
1154 
1155  case PHJ_BATCH_ALLOCATING:
1156  /* Wait for allocation to complete. */
1157  BarrierArriveAndWait(batch_barrier,
1158  WAIT_EVENT_HASH_BATCH_ALLOCATING);
1159  /* Fall through. */
1160 
1161  case PHJ_BATCH_LOADING:
1162  /* Start (or join in) loading tuples. */
1163  ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1164  inner_tuples = hashtable->batches[batchno].inner_tuples;
1165  sts_begin_parallel_scan(inner_tuples);
1166  while ((tuple = sts_parallel_scan_next(inner_tuples,
1167  &hashvalue)))
1168  {
1169  ExecForceStoreMinimalTuple(tuple,
1170  hjstate->hj_HashTupleSlot,
1171  false);
1172  slot = hjstate->hj_HashTupleSlot;
1173  ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
1174  hashvalue);
1175  }
1176  sts_end_parallel_scan(inner_tuples);
1177  BarrierArriveAndWait(batch_barrier,
1178  WAIT_EVENT_HASH_BATCH_LOADING);
1179  /* Fall through. */
1180 
1181  case PHJ_BATCH_PROBING:
1182 
1183  /*
1184  * This batch is ready to probe. Return control to
1185  * caller. We stay attached to batch_barrier so that the
1186  * hash table stays alive until everyone's finished
1187  * probing it, but no participant is allowed to wait at
1188  * this barrier again (or else a deadlock could occur).
1189  * All attached participants must eventually call
1190  * BarrierArriveAndDetach() so that the final phase
1191  * PHJ_BATCH_DONE can be reached.
1192  */
1193  ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1194  sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
1195  return true;
1196 
1197  case PHJ_BATCH_DONE:
1198 
1199  /*
1200  * Already done. Detach and go around again (if any
1201  * remain).
1202  */
1203  BarrierDetach(batch_barrier);
1204  hashtable->batches[batchno].done = true;
1205  hashtable->curbatch = -1;
1206  break;
1207 
1208  default:
1209  elog(ERROR, "unexpected batch phase %d",
1210  BarrierPhase(batch_barrier));
1211  }
1212  }
1213  batchno = (batchno + 1) % hashtable->nbatch;
1214  } while (batchno != start_batchno);
1215 
1216  return false;
1217 }
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:209
#define PHJ_BATCH_DONE
Definition: hashjoin.h:268
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
Definition: nodeHash.c:1746
void ExecHashTableDetachBatch(HashJoinTable hashtable)
Definition: nodeHash.c:3079
#define PHJ_BATCH_ALLOCATING
Definition: hashjoin.h:265
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3221
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
Definition: nodeHash.c:3059
#define PHJ_BATCH_LOADING
Definition: hashjoin.h:266
#define PHJ_BATCH_ELECTING
Definition: hashjoin.h:264
SharedTuplestoreAccessor * inner_tuples
Definition: hashjoin.h:208
#define ERROR
Definition: elog.h:43
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void ExecForceStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1465
unsigned int uint32
Definition: c.h:358
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
int BarrierAttach(Barrier *barrier)
Definition: barrier.c:214
ParallelHashJoinState * parallel_state
Definition: hashjoin.h:357
#define PHJ_BATCH_PROBING
Definition: hashjoin.h:267
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:331
bool BarrierDetach(Barrier *barrier)
Definition: barrier.c:234
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:243
ParallelHashJoinBatch * shared
Definition: hashjoin.h:197
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
TupleTableSlot * hj_HashTupleSlot
Definition: execnodes.h:1884
pg_atomic_uint32 distributor
Definition: hashjoin.h:251
HashJoinTable hj_HashTable
Definition: execnodes.h:1878
#define elog(elevel,...)
Definition: elog.h:226
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)

◆ ExecParallelHashJoinOuterGetTuple()

static TupleTableSlot * ExecParallelHashJoinOuterGetTuple ( PlanState * outerNode,
HashJoinState * hjstate,
uint32 * hashvalue
)
static

Definition at line 895 of file nodeHashjoin.c.

References HashJoinTableData::batches, HashJoinTableData::curbatch, ExprContext::ecxt_outertuple, ExecClearTuple(), ExecForceStoreMinimalTuple(), ExecHashGetHashValue(), ExecProcNode(), HJ_FILL_OUTER, HashJoinState::hj_HashTable, HashJoinState::hj_OuterHashKeys, HashJoinState::hj_OuterTupleSlot, HashJoinState::js, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, JoinState::ps, PlanState::ps_ExprContext, sts_parallel_scan_next(), and TupIsNull.

Referenced by ExecHashJoinImpl().

898 {
899  HashJoinTable hashtable = hjstate->hj_HashTable;
900  int curbatch = hashtable->curbatch;
901  TupleTableSlot *slot;
902 
903  /*
904  * In the Parallel Hash case we only run the outer plan directly for
905  * single-batch hash joins. Otherwise we have to go to batch files, even
906  * for batch 0.
907  */
908  if (curbatch == 0 && hashtable->nbatch == 1)
909  {
910  slot = ExecProcNode(outerNode);
911 
912  while (!TupIsNull(slot))
913  {
914  ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
915 
916  econtext->ecxt_outertuple = slot;
917  if (ExecHashGetHashValue(hashtable, econtext,
918  hjstate->hj_OuterHashKeys,
919  true, /* outer tuple */
920  HJ_FILL_OUTER(hjstate),
921  hashvalue))
922  return slot;
923 
924  /*
925  * That tuple couldn't match because of a NULL, so discard it and
926  * continue with the next one.
927  */
928  slot = ExecProcNode(outerNode);
929  }
930  }
931  else if (curbatch < hashtable->nbatch)
932  {
933  MinimalTuple tuple;
934 
935  tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
936  hashvalue);
937  if (tuple != NULL)
938  {
939  ExecForceStoreMinimalTuple(tuple,
940  hjstate->hj_OuterTupleSlot,
941  false);
942  slot = hjstate->hj_OuterTupleSlot;
943  return slot;
944  }
945  else
946  ExecClearTuple(hjstate->hj_OuterTupleSlot);
947  }
948 
949  /* End of this batch */
950  return NULL;
951 }
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:209
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:424
PlanState ps
Definition: execnodes.h:1769
ExprContext * ps_ExprContext
Definition: execnodes.h:977
TupleTableSlot * hj_OuterTupleSlot
Definition: execnodes.h:1883
List * hj_OuterHashKeys
Definition: execnodes.h:1874
void ExecForceStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1465
#define TupIsNull(slot)
Definition: tuptable.h:293
#define HJ_FILL_OUTER(hjstate)
Definition: nodeHashjoin.c:132
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:239
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:226
HashJoinTable hj_HashTable
Definition: execnodes.h:1878
bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
Definition: nodeHash.c:1786
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
JoinState js
Definition: execnodes.h:1872

◆ ExecParallelHashJoinPartitionOuter()

static void ExecParallelHashJoinPartitionOuter ( HashJoinState * node)
static

Definition at line 1398 of file nodeHashjoin.c.

References Assert, HashJoinTableData::batches, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_outertuple, ExecFetchSlotMinimalTuple(), ExecHashGetBucketAndBatch(), ExecHashGetHashValue(), ExecProcNode(), heap_free_minimal_tuple(), HJ_FILL_OUTER, HashJoinState::hj_FirstOuterTupleSlot, HashJoinState::hj_HashTable, HashJoinState::hj_OuterHashKeys, i, HashJoinState::js, HashJoinTableData::nbatch, ParallelHashJoinBatchAccessor::outer_tuples, outerPlanState, JoinState::ps, PlanState::ps_ExprContext, sts_end_write(), sts_puttuple(), and TupIsNull.

Referenced by ExecHashJoinImpl().

1399 {
1400  PlanState *outerState = outerPlanState(hjstate);
1401  ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1402  HashJoinTable hashtable = hjstate->hj_HashTable;
1403  TupleTableSlot *slot;
1404  uint32 hashvalue;
1405  int i;
1406 
1407  Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
1408 
1409  /* Execute outer plan, writing all tuples to shared tuplestores. */
1410  for (;;)
1411  {
1412  slot = ExecProcNode(outerState);
1413  if (TupIsNull(slot))
1414  break;
1415  econtext->ecxt_outertuple = slot;
1416  if (ExecHashGetHashValue(hashtable, econtext,
1417  hjstate->hj_OuterHashKeys,
1418  true, /* outer tuple */
1419  HJ_FILL_OUTER(hjstate),
1420  &hashvalue))
1421  {
1422  int batchno;
1423  int bucketno;
1424  bool shouldFree;
1425  MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1426 
1427  ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
1428  &batchno);
1429  sts_puttuple(hashtable->batches[batchno].outer_tuples,
1430  &hashvalue, mintup);
1431 
1432  if (shouldFree)
1433  heap_free_minimal_tuple(mintup);
1434  }
1435  CHECK_FOR_INTERRUPTS();
1436  }
1437 
1438  /* Make sure all outer partitions are readable by any backend. */
1439  for (i = 0; i < hashtable->nbatch; ++i)
1440  sts_end_write(hashtable->batches[i].outer_tuples);
1441 }
SharedTuplestoreAccessor * outer_tuples
Definition: hashjoin.h:209
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1639
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
Definition: nodeHash.c:1890
void heap_free_minimal_tuple(MinimalTuple mtup)
Definition: heaptuple.c:1427
#define outerPlanState(node)
Definition: execnodes.h:1032
#define TupIsNull(slot)
Definition: tuptable.h:293
unsigned int uint32
Definition: c.h:358
#define HJ_FILL_OUTER(hjstate)
Definition: nodeHashjoin.c:132
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:239
ParallelHashJoinBatchAccessor * batches
Definition: hashjoin.h:358
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:226
#define Assert(condition)
Definition: c.h:732
int i
bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, bool outer_tuple, bool keep_nulls, uint32 *hashvalue)
Definition: nodeHash.c:1786
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
void sts_end_write(SharedTuplestoreAccessor *accessor)

◆ ExecReScanHashJoin()

void ExecReScanHashJoin ( HashJoinState * node)

Definition at line 1312 of file nodeHashjoin.c.

References PlanState::chgParam, ExecHashTableDestroy(), ExecHashTableResetMatchFlags(), ExecReScan(), HJ_BUILD_HASHTABLE, HashJoinState::hj_CurBucketNo, HashJoinState::hj_CurHashValue, HashJoinState::hj_CurSkewBucketNo, HashJoinState::hj_CurTuple, HJ_FILL_INNER, HashJoinState::hj_FirstOuterTupleSlot, HashJoinState::hj_HashTable, HashJoinState::hj_JoinState, HashJoinState::hj_MatchedOuter, HJ_NEED_NEW_OUTER, HashJoinState::hj_OuterNotEmpty, INVALID_SKEW_BUCKET_NO, HashJoinState::js, PlanState::lefttree, HashJoinTableData::nbatch, JoinState::ps, and PlanState::righttree.

Referenced by ExecReScan().

1313 {
1314  /*
1315  * In a multi-batch join, we currently have to do rescans the hard way,
1316  * primarily because batch temp files may have already been released. But
1317  * if it's a single-batch join, and there is no parameter change for the
1318  * inner subnode, then we can just re-use the existing hash table without
1319  * rebuilding it.
1320  */
1321  if (node->hj_HashTable != NULL)
1322  {
1323  if (node->hj_HashTable->nbatch == 1 &&
1324  node->js.ps.righttree->chgParam == NULL)
1325  {
1326  /*
1327  * Okay to reuse the hash table; needn't rescan inner, either.
1328  *
1329  * However, if it's a right/full join, we'd better reset the
1330  * inner-tuple match flags contained in the table.
1331  */
1332  if (HJ_FILL_INNER(node))
1333  ExecHashTableResetMatchFlags(node->hj_HashTable);
1334 
1335  /*
1336  * Also, we need to reset our state about the emptiness of the
1337  * outer relation, so that the new scan of the outer will update
1338  * it correctly if it turns out to be empty this time. (There's no
1339  * harm in clearing it now because ExecHashJoin won't need the
1340  * info. In the other cases, where the hash table doesn't exist
1341  * or we are destroying it, we leave this state alone because
1342  * ExecHashJoin will need it the first time through.)
1343  */
1344  node->hj_OuterNotEmpty = false;
1345 
1346  /* ExecHashJoin can skip the BUILD_HASHTABLE step */
1347  node->hj_JoinState = HJ_NEED_NEW_OUTER;
1348  }
1349  else
1350  {
1351  /* must destroy and rebuild hash table */
1352  ExecHashTableDestroy(node->hj_HashTable);
1353  node->hj_HashTable = NULL;
1354  node->hj_JoinState = HJ_BUILD_HASHTABLE;
1355 
1356  /*
1357  * if chgParam of subnode is not null then plan will be re-scanned
1358  * by first ExecProcNode.
1359  */
1360  if (node->js.ps.righttree->chgParam == NULL)
1361  ExecReScan(node->js.ps.righttree);
1362  }
1363  }
1364 
1365  /* Always reset intra-tuple state */
1366  node->hj_CurHashValue = 0;
1367  node->hj_CurBucketNo = 0;
1368  node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
1369  node->hj_CurTuple = NULL;
1370 
1371  node->hj_MatchedOuter = false;
1372  node->hj_FirstOuterTupleSlot = NULL;
1373 
1374  /*
1375  * if chgParam of subnode is not null then plan will be re-scanned by
1376  * first ExecProcNode.
1377  */
1378  if (node->js.ps.lefttree->chgParam == NULL)
1379  ExecReScan(node->js.ps.lefttree);
1380 }
#define INVALID_SKEW_BUCKET_NO
Definition: hashjoin.h:109
PlanState ps
Definition: execnodes.h:1769
void ExecReScan(PlanState *node)
Definition: execAmi.c:77
bool hj_MatchedOuter
Definition: execnodes.h:1889
struct PlanState * righttree
Definition: execnodes.h:961
TupleTableSlot * hj_FirstOuterTupleSlot
Definition: execnodes.h:1887
struct PlanState * lefttree
Definition: execnodes.h:960
#define HJ_FILL_INNER(hjstate)
Definition: nodeHashjoin.c:134
uint32 hj_CurHashValue
Definition: execnodes.h:1879
int hj_CurSkewBucketNo
Definition: execnodes.h:1881
HashJoinTuple hj_CurTuple
Definition: execnodes.h:1882
Bitmapset * chgParam
Definition: execnodes.h:970
int hj_CurBucketNo
Definition: execnodes.h:1880
bool hj_OuterNotEmpty
Definition: execnodes.h:1890
#define HJ_NEED_NEW_OUTER
Definition: nodeHashjoin.c:125
#define HJ_BUILD_HASHTABLE
Definition: nodeHashjoin.c:124
HashJoinTable hj_HashTable
Definition: execnodes.h:1878
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
Definition: nodeHash.c:2158
JoinState js
Definition: execnodes.h:1872
void ExecHashTableDestroy(HashJoinTable hashtable)
Definition: nodeHash.c:852

◆ ExecShutdownHashJoin()

void ExecShutdownHashJoin ( HashJoinState * node)

Definition at line 1383 of file nodeHashjoin.c.

References ExecHashTableDetach(), ExecHashTableDetachBatch(), and HashJoinState::hj_HashTable.

Referenced by ExecShutdownNode().

1384 {
1385  if (node->hj_HashTable)
1386  {
1387  /*
1388  * Detach from shared state before DSM memory goes away. This makes
1389  * sure that we don't have any pointers into DSM memory by the time
1390  * ExecEndHashJoin runs.
1391  */
1392  ExecHashTableDetachBatch(node->hj_HashTable);
1393  ExecHashTableDetach(node->hj_HashTable);
1394  }
1395 }
void ExecHashTableDetachBatch(HashJoinTable hashtable)
Definition: nodeHash.c:3079
void ExecHashTableDetach(HashJoinTable hashtable)
Definition: nodeHash.c:3136
HashJoinTable hj_HashTable
Definition: execnodes.h:1878