PostgreSQL Source Code  git master
nodeGather.c File Reference
#include "postgres.h"
#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeGather.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "utils/wait_event.h"
Include dependency graph for nodeGather.c:

Go to the source code of this file.

Functions

static TupleTableSlot * ExecGather (PlanState *pstate)
 
static TupleTableSlot * gather_getnext (GatherState *gatherstate)
 
static MinimalTuple gather_readnext (GatherState *gatherstate)
 
static void ExecShutdownGatherWorkers (GatherState *node)
 
GatherState * ExecInitGather (Gather *node, EState *estate, int eflags)
 
void ExecEndGather (GatherState *node)
 
void ExecShutdownGather (GatherState *node)
 
void ExecReScanGather (GatherState *node)
 

Function Documentation

◆ ExecEndGather()

void ExecEndGather ( GatherState * node)

Definition at line 244 of file nodeGather.c.

245 {
246  ExecEndNode(outerPlanState(node)); /* let children clean up first */
247  ExecShutdownGather(node);
248 }
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:562
#define outerPlanState(node)
Definition: execnodes.h:1216
void ExecShutdownGather(GatherState *node)
Definition: nodeGather.c:411

References ExecEndNode(), ExecShutdownGather(), and outerPlanState.

Referenced by ExecEndNode().

◆ ExecGather()

static TupleTableSlot * ExecGather ( PlanState * pstate)
static

Definition at line 137 of file nodeGather.c.

138 {
139  GatherState *node = castNode(GatherState, pstate);
140  TupleTableSlot *slot;
141  ExprContext *econtext;
142 
143  CHECK_FOR_INTERRUPTS();
144 
145  /*
146  * Initialize the parallel context and workers on first execution. We do
147  * this on first execution rather than during node initialization, as it
148  * needs to allocate a large dynamic segment, so it is better to do it
149  * only if it is really needed.
150  */
151  if (!node->initialized)
152  {
153  EState *estate = node->ps.state;
154  Gather *gather = (Gather *) node->ps.plan;
155 
156  /*
157  * Sometimes we might have to run without parallelism; but if parallel
158  * mode is active then we can try to fire up some workers.
159  */
160  if (gather->num_workers > 0 && estate->es_use_parallel_mode)
161  {
162  ParallelContext *pcxt;
163 
164  /* Initialize, or re-initialize, shared state needed by workers. */
165  if (!node->pei)
166  node->pei = ExecInitParallelPlan(outerPlanState(node),
167  estate,
168  gather->initParam,
169  gather->num_workers,
170  node->tuples_needed);
171  else
172  ExecParallelReinitialize(outerPlanState(node),
173  node->pei,
174  gather->initParam);
175 
176  /*
177  * Register backend workers. We might not get as many as we
178  * requested, or indeed any at all.
179  */
180  pcxt = node->pei->pcxt;
181  LaunchParallelWorkers(pcxt);
182  /* We save # workers launched for the benefit of EXPLAIN */
183  node->nworkers_launched = pcxt->nworkers_launched;
184 
185  /* Set up tuple queue readers to read the results. */
186  if (pcxt->nworkers_launched > 0)
187  {
188  ExecParallelCreateReaders(node->pei);
189  /* Make a working array showing the active readers */
190  node->nreaders = pcxt->nworkers_launched;
191  node->reader = (TupleQueueReader **)
192  palloc(node->nreaders * sizeof(TupleQueueReader *));
193  memcpy(node->reader, node->pei->reader,
194  node->nreaders * sizeof(TupleQueueReader *));
195  }
196  else
197  {
198  /* No workers? Then never mind. */
199  node->nreaders = 0;
200  node->reader = NULL;
201  }
202  node->nextreader = 0;
203  }
204 
205  /* Run plan locally if no workers or enabled and not single-copy. */
206  node->need_to_scan_locally = (node->nreaders == 0)
207  || (!gather->single_copy && parallel_leader_participation);
208  node->initialized = true;
209  }
210 
211  /*
212  * Reset per-tuple memory context to free any expression evaluation
213  * storage allocated in the previous tuple cycle.
214  */
215  econtext = node->ps.ps_ExprContext;
216  ResetExprContext(econtext);
217 
218  /*
219  * Get next tuple, either from one of our workers, or by running the plan
220  * ourselves.
221  */
222  slot = gather_getnext(node);
223  if (TupIsNull(slot))
224  return NULL;
225 
226  /* If no projection is required, we're done. */
227  if (node->ps.ps_ProjInfo == NULL)
228  return slot;
229 
230  /*
231  * Form the result tuple using ExecProject(), and return it.
232  */
233  econtext->ecxt_outertuple = slot;
234  return ExecProject(node->ps.ps_ProjInfo);
235 }
void LaunchParallelWorkers(ParallelContext *pcxt)
Definition: parallel.c:552
ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)
Definition: execParallel.c:587
void ExecParallelReinitialize(PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)
Definition: execParallel.c:904
void ExecParallelCreateReaders(ParallelExecutorInfo *pei)
Definition: execParallel.c:878
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
Definition: executor.h:387
#define ResetExprContext(econtext)
Definition: executor.h:555
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
void * palloc(Size size)
Definition: mcxt.c:1317
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
static TupleTableSlot * gather_getnext(GatherState *gatherstate)
Definition: nodeGather.c:256
#define castNode(_type_, nodeptr)
Definition: nodes.h:176
bool parallel_leader_participation
Definition: planner.c:69
bool es_use_parallel_mode
Definition: execnodes.h:706
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:262
bool initialized
Definition: execnodes.h:2699
struct ParallelExecutorInfo * pei
Definition: execnodes.h:2704
int nextreader
Definition: execnodes.h:2708
int nworkers_launched
Definition: execnodes.h:2706
PlanState ps
Definition: execnodes.h:2698
struct TupleQueueReader ** reader
Definition: execnodes.h:2709
int64 tuples_needed
Definition: execnodes.h:2701
bool need_to_scan_locally
Definition: execnodes.h:2700
int num_workers
Definition: plannodes.h:1144
Bitmapset * initParam
Definition: plannodes.h:1148
bool single_copy
Definition: plannodes.h:1146
int nworkers_launched
Definition: parallel.h:37
ParallelContext * pcxt
Definition: execParallel.h:27
struct TupleQueueReader ** reader
Definition: execParallel.h:37
Plan * plan
Definition: execnodes.h:1120
EState * state
Definition: execnodes.h:1122
ExprContext * ps_ExprContext
Definition: execnodes.h:1159
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1160
#define TupIsNull(slot)
Definition: tuptable.h:306

References castNode, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_outertuple, EState::es_use_parallel_mode, ExecInitParallelPlan(), ExecParallelCreateReaders(), ExecParallelReinitialize(), ExecProject(), gather_getnext(), if(), GatherState::initialized, Gather::initParam, LaunchParallelWorkers(), GatherState::need_to_scan_locally, GatherState::nextreader, GatherState::nreaders, Gather::num_workers, ParallelContext::nworkers_launched, GatherState::nworkers_launched, outerPlanState, palloc(), parallel_leader_participation, ParallelExecutorInfo::pcxt, GatherState::pei, PlanState::plan, GatherState::ps, PlanState::ps_ExprContext, PlanState::ps_ProjInfo, ParallelExecutorInfo::reader, GatherState::reader, ResetExprContext, Gather::single_copy, PlanState::state, TupIsNull, and GatherState::tuples_needed.

Referenced by ExecInitGather().

◆ ExecInitGather()

GatherState * ExecInitGather ( Gather * node,
EState * estate,
int  eflags 
)

Definition at line 53 of file nodeGather.c.

54 {
55  GatherState *gatherstate;
56  Plan *outerNode;
57  TupleDesc tupDesc;
58 
59  /* Gather node doesn't have innerPlan node. */
60  Assert(innerPlan(node) == NULL);
61 
62  /*
63  * create state structure
64  */
65  gatherstate = makeNode(GatherState);
66  gatherstate->ps.plan = (Plan *) node;
67  gatherstate->ps.state = estate;
68  gatherstate->ps.ExecProcNode = ExecGather;
69 
70  gatherstate->initialized = false;
71  gatherstate->need_to_scan_locally =
72  !node->single_copy && parallel_leader_participation;
73  gatherstate->tuples_needed = -1;
74 
75  /*
76  * Miscellaneous initialization
77  *
78  * create expression context for node
79  */
80  ExecAssignExprContext(estate, &gatherstate->ps);
81 
82  /*
83  * now initialize outer plan
84  */
85  outerNode = outerPlan(node);
86  outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
87  tupDesc = ExecGetResultType(outerPlanState(gatherstate));
88 
89  /*
90  * Leader may access ExecProcNode result directly (if
91  * need_to_scan_locally), or from workers via tuple queue. So we can't
92  * trivially rely on the slot type being fixed for expressions evaluated
93  * within this node.
94  */
95  gatherstate->ps.outeropsset = true;
96  gatherstate->ps.outeropsfixed = false;
97 
98  /*
99  * Initialize result type and projection.
100  */
101  ExecInitResultTypeTL(&gatherstate->ps);
102  ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
103 
104  /*
105  * Without projections result slot type is not trivially known, see
106  * comment above.
107  */
108  if (gatherstate->ps.ps_ProjInfo == NULL)
109  {
110  gatherstate->ps.resultopsset = true;
111  gatherstate->ps.resultopsfixed = false;
112  }
113 
114  /*
115  * Initialize funnel slot to same tuple descriptor as outer plan.
116  */
117  gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
118  &TTSOpsMinimalTuple);
119 
120  /*
121  * Gather doesn't support checking a qual (it's always more efficient to
122  * do it in the child node).
123  */
124  Assert(!node->plan.qual);
125 
126  return gatherstate;
127 }
#define Assert(condition)
Definition: c.h:858
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:142
void ExecInitResultTypeTL(PlanState *planstate)
Definition: execTuples.c:1842
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1918
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
TupleDesc ExecGetResultType(PlanState *planstate)
Definition: execUtils.c:493
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:483
void ExecConditionalAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc, int varno)
Definition: execUtils.c:558
static TupleTableSlot * ExecGather(PlanState *pstate)
Definition: nodeGather.c:137
#define makeNode(_type_)
Definition: nodes.h:155
#define innerPlan(node)
Definition: plannodes.h:182
#define outerPlan(node)
Definition: plannodes.h:183
#define OUTER_VAR
Definition: primnodes.h:237
TupleTableSlot * funnel_slot
Definition: execnodes.h:2703
Plan plan
Definition: plannodes.h:1143
bool outeropsset
Definition: execnodes.h:1203
bool resultopsset
Definition: execnodes.h:1205
bool outeropsfixed
Definition: execnodes.h:1199
bool resultopsfixed
Definition: execnodes.h:1201
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:1126
List * qual
Definition: plannodes.h:154

References Assert, ExecAssignExprContext(), ExecConditionalAssignProjectionInfo(), ExecGather(), ExecGetResultType(), ExecInitExtraTupleSlot(), ExecInitNode(), ExecInitResultTypeTL(), PlanState::ExecProcNode, GatherState::funnel_slot, GatherState::initialized, innerPlan, makeNode, GatherState::need_to_scan_locally, OUTER_VAR, PlanState::outeropsfixed, PlanState::outeropsset, outerPlan, outerPlanState, parallel_leader_participation, PlanState::plan, Gather::plan, GatherState::ps, PlanState::ps_ProjInfo, Plan::qual, PlanState::resultopsfixed, PlanState::resultopsset, Gather::single_copy, PlanState::state, TTSOpsMinimalTuple, and GatherState::tuples_needed.

Referenced by ExecInitNode().

◆ ExecReScanGather()

void ExecReScanGather ( GatherState * node)

Definition at line 435 of file nodeGather.c.

436 {
437  Gather *gather = (Gather *) node->ps.plan;
438  PlanState *outerPlan = outerPlanState(node);
439 
440  /* Make sure any existing workers are gracefully shut down */
441  ExecShutdownGatherWorkers(node);
442 
443  /* Mark node so that shared state will be rebuilt at next call */
444  node->initialized = false;
445 
446  /*
447  * Set child node's chgParam to tell it that the next scan might deliver a
448  * different set of rows within the leader process. (The overall rowset
449  * shouldn't change, but the leader process's subset might; hence nodes
450  * between here and the parallel table scan node mustn't optimize on the
451  * assumption of an unchanging rowset.)
452  */
453  if (gather->rescan_param >= 0)
454  outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
455  gather->rescan_param);
456 
457  /*
458  * If chgParam of subnode is not null then plan will be re-scanned by
459  * first ExecProcNode. Note: because this does nothing if we have a
460  * rescan_param, it's currently guaranteed that parallel-aware child nodes
461  * will not see a ReScan call until after they get a ReInitializeDSM call.
462  * That ordering might not be something to rely on, though. A good rule
463  * of thumb is that ReInitializeDSM should reset only shared state, ReScan
464  * should reset only local state, and anything that depends on both of
465  * those steps being finished must wait until the first ExecProcNode call.
466  */
467  if (outerPlan->chgParam == NULL)
468  ExecReScan(outerPlan);
469 }
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
void ExecReScan(PlanState *node)
Definition: execAmi.c:76
static void ExecShutdownGatherWorkers(GatherState *node)
Definition: nodeGather.c:393
int rescan_param
Definition: plannodes.h:1145

References bms_add_member(), ExecReScan(), ExecShutdownGatherWorkers(), GatherState::initialized, outerPlan, outerPlanState, PlanState::plan, GatherState::ps, and Gather::rescan_param.

Referenced by ExecReScan().

◆ ExecShutdownGather()

void ExecShutdownGather ( GatherState * node)

Definition at line 411 of file nodeGather.c.

412 {
413  ExecShutdownGatherWorkers(node);
414 
415  /* Now destroy the parallel context. */
416  if (node->pei != NULL)
417  {
418  ExecParallelCleanup(node->pei);
419  node->pei = NULL;
420  }
421 }
void ExecParallelCleanup(ParallelExecutorInfo *pei)

References ExecParallelCleanup(), ExecShutdownGatherWorkers(), and GatherState::pei.

Referenced by ExecEndGather(), and ExecShutdownNode_walker().

◆ ExecShutdownGatherWorkers()

static void ExecShutdownGatherWorkers ( GatherState * node)
static

Definition at line 393 of file nodeGather.c.

394 {
395  if (node->pei != NULL)
396  ExecParallelFinish(node->pei);
397 
398  /* Flush local copy of reader array */
399  if (node->reader)
400  pfree(node->reader);
401  node->reader = NULL;
402 }
void ExecParallelFinish(ParallelExecutorInfo *pei)
void pfree(void *pointer)
Definition: mcxt.c:1521

References ExecParallelFinish(), GatherState::pei, pfree(), and GatherState::reader.

Referenced by ExecReScanGather(), ExecShutdownGather(), and gather_readnext().

◆ gather_getnext()

static TupleTableSlot * gather_getnext ( GatherState * gatherstate)
static

Definition at line 256 of file nodeGather.c.

257 {
258  PlanState *outerPlan = outerPlanState(gatherstate);
259  TupleTableSlot *outerTupleSlot;
260  TupleTableSlot *fslot = gatherstate->funnel_slot;
261  MinimalTuple tup;
262 
263  while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
264  {
265  CHECK_FOR_INTERRUPTS();
266 
267  if (gatherstate->nreaders > 0)
268  {
269  tup = gather_readnext(gatherstate);
270 
271  if (HeapTupleIsValid(tup))
272  {
273  ExecStoreMinimalTuple(tup, /* tuple to store */
274  fslot, /* slot to store the tuple */
275  false); /* don't pfree tuple */
276  return fslot;
277  }
278  }
279 
280  if (gatherstate->need_to_scan_locally)
281  {
282  EState *estate = gatherstate->ps.state;
283 
284  /* Install our DSA area while executing the plan. */
285  estate->es_query_dsa =
286  gatherstate->pei ? gatherstate->pei->area : NULL;
287  outerTupleSlot = ExecProcNode(outerPlan);
288  estate->es_query_dsa = NULL;
289 
290  if (!TupIsNull(outerTupleSlot))
291  return outerTupleSlot;
292 
293  gatherstate->need_to_scan_locally = false;
294  }
295  }
296 
297  return ExecClearTuple(fslot);
298 }
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1533
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:273
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static MinimalTuple gather_readnext(GatherState *gatherstate)
Definition: nodeGather.c:304
struct dsa_area * es_query_dsa
Definition: execnodes.h:709
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454

References ParallelExecutorInfo::area, CHECK_FOR_INTERRUPTS, EState::es_query_dsa, ExecClearTuple(), ExecProcNode(), ExecStoreMinimalTuple(), GatherState::funnel_slot, gather_readnext(), HeapTupleIsValid, GatherState::need_to_scan_locally, GatherState::nreaders, outerPlan, outerPlanState, GatherState::pei, GatherState::ps, PlanState::state, and TupIsNull.

Referenced by ExecGather().

◆ gather_readnext()

static MinimalTuple gather_readnext ( GatherState * gatherstate)
static

Definition at line 304 of file nodeGather.c.

305 {
306  int nvisited = 0;
307 
308  for (;;)
309  {
310  TupleQueueReader *reader;
311  MinimalTuple tup;
312  bool readerdone;
313 
314  /* Check for async events, particularly messages from workers. */
315  CHECK_FOR_INTERRUPTS();
316 
317  /*
318  * Attempt to read a tuple, but don't block if none is available.
319  *
320  * Note that TupleQueueReaderNext will just return NULL for a worker
321  * which fails to initialize. We'll treat that worker as having
322  * produced no tuples; WaitForParallelWorkersToFinish will error out
323  * when we get there.
324  */
325  Assert(gatherstate->nextreader < gatherstate->nreaders);
326  reader = gatherstate->reader[gatherstate->nextreader];
327  tup = TupleQueueReaderNext(reader, true, &readerdone);
328 
329  /*
330  * If this reader is done, remove it from our working array of active
331  * readers. If all readers are done, we're outta here.
332  */
333  if (readerdone)
334  {
335  Assert(!tup);
336  --gatherstate->nreaders;
337  if (gatherstate->nreaders == 0)
338  {
339  ExecShutdownGatherWorkers(gatherstate);
340  return NULL;
341  }
342  memmove(&gatherstate->reader[gatherstate->nextreader],
343  &gatherstate->reader[gatherstate->nextreader + 1],
344  sizeof(TupleQueueReader *)
345  * (gatherstate->nreaders - gatherstate->nextreader));
346  if (gatherstate->nextreader >= gatherstate->nreaders)
347  gatherstate->nextreader = 0;
348  continue;
349  }
350 
351  /* If we got a tuple, return it. */
352  if (tup)
353  return tup;
354 
355  /*
356  * Advance nextreader pointer in round-robin fashion. Note that we
357  * only reach this code if we weren't able to get a tuple from the
358  * current worker. We used to advance the nextreader pointer after
359  * every tuple, but it turns out to be much more efficient to keep
360  * reading from the same queue until that would require blocking.
361  */
362  gatherstate->nextreader++;
363  if (gatherstate->nextreader >= gatherstate->nreaders)
364  gatherstate->nextreader = 0;
365 
366  /* Have we visited every (surviving) TupleQueueReader? */
367  nvisited++;
368  if (nvisited >= gatherstate->nreaders)
369  {
370  /*
371  * If (still) running plan locally, return NULL so caller can
372  * generate another tuple from the local copy of the plan.
373  */
374  if (gatherstate->need_to_scan_locally)
375  return NULL;
376 
377  /* Nothing to do except wait for developments. */
378  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
379  WAIT_EVENT_EXECUTE_GATHER);
380  ResetLatch(MyLatch);
381  nvisited = 0;
382  }
383  }
384 }
struct Latch * MyLatch
Definition: globals.c:62
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Definition: tqueue.c:176

References Assert, CHECK_FOR_INTERRUPTS, ExecShutdownGatherWorkers(), MyLatch, GatherState::need_to_scan_locally, GatherState::nextreader, GatherState::nreaders, GatherState::reader, ResetLatch(), TupleQueueReaderNext(), WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by gather_getnext().