PostgreSQL Source Code (git master)
nodeGather.c File Reference
#include "postgres.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/planmain.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for nodeGather.c:

Go to the source code of this file.

Functions

static TupleTableSlot * ExecGather (PlanState *pstate)
 
static TupleTableSlot * gather_getnext (GatherState *gatherstate)
 
static HeapTuple gather_readnext (GatherState *gatherstate)
 
static void ExecShutdownGatherWorkers (GatherState *node)
 
GatherState * ExecInitGather (Gather *node, EState *estate, int eflags)
 
void ExecEndGather (GatherState *node)
 
void ExecShutdownGather (GatherState *node)
 
void ExecReScanGather (GatherState *node)
 

Function Documentation

◆ ExecEndGather()

void ExecEndGather (GatherState *node)

Definition at line 238 of file nodeGather.c.

References ExecClearTuple(), ExecEndNode(), ExecFreeExprContext(), ExecShutdownGather(), outerPlanState, GatherState::ps, and PlanState::ps_ResultTupleSlot.

Referenced by ExecEndNode().

239 {
240  ExecEndNode(outerPlanState(node)); /* let children clean up first */
241  ExecShutdownGather(node);
242  ExecFreeExprContext(&node->ps);
243  ExecClearTuple(node->ps.ps_ResultTupleSlot);
244 }

◆ ExecGather()

static TupleTableSlot * ExecGather (PlanState *pstate)
static

Definition at line 131 of file nodeGather.c.

References castNode, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_outertuple, EState::es_use_parallel_mode, ExecInitParallelPlan(), ExecParallelCreateReaders(), ExecParallelReinitialize(), ExecProject(), gather_getnext(), GatherState::initialized, Gather::initParam, LaunchParallelWorkers(), PlanState::lefttree, GatherState::need_to_scan_locally, GatherState::nextreader, GatherState::nreaders, Gather::num_workers, ParallelContext::nworkers_launched, GatherState::nworkers_launched, palloc(), parallel_leader_participation, ParallelExecutorInfo::pcxt, GatherState::pei, PlanState::plan, GatherState::ps, PlanState::ps_ExprContext, PlanState::ps_ProjInfo, ParallelExecutorInfo::reader, GatherState::reader, ResetExprContext, Gather::single_copy, PlanState::state, TupIsNull, and GatherState::tuples_needed.

Referenced by ExecInitGather().

132 {
133  GatherState *node = castNode(GatherState, pstate);
134  TupleTableSlot *slot;
135  ExprContext *econtext;
136 
137  CHECK_FOR_INTERRUPTS();
138 
139  /*
140  * Initialize the parallel context and workers on first execution. We do
141  * this on first execution rather than during node initialization, as it
142  * needs to allocate a large dynamic segment, so it is better to do it
143  * only if it is really needed.
144  */
145  if (!node->initialized)
146  {
147  EState *estate = node->ps.state;
148  Gather *gather = (Gather *) node->ps.plan;
149 
150  /*
151  * Sometimes we might have to run without parallelism; but if parallel
152  * mode is active then we can try to fire up some workers.
153  */
154  if (gather->num_workers > 0 && estate->es_use_parallel_mode)
155  {
156  ParallelContext *pcxt;
157 
158  /* Initialize, or re-initialize, shared state needed by workers. */
159  if (!node->pei)
160  node->pei = ExecInitParallelPlan(node->ps.lefttree,
161  estate,
162  gather->initParam,
163  gather->num_workers,
164  node->tuples_needed);
165  else
166  ExecParallelReinitialize(node->ps.lefttree,
167  node->pei,
168  gather->initParam);
169 
170  /*
171  * Register backend workers. We might not get as many as we
172  * requested, or indeed any at all.
173  */
174  pcxt = node->pei->pcxt;
175  LaunchParallelWorkers(pcxt);
176  /* We save # workers launched for the benefit of EXPLAIN */
177  node->nworkers_launched = pcxt->nworkers_launched;
178 
179  /* Set up tuple queue readers to read the results. */
180  if (pcxt->nworkers_launched > 0)
181  {
182  ExecParallelCreateReaders(node->pei);
183  /* Make a working array showing the active readers */
184  node->nreaders = pcxt->nworkers_launched;
185  node->reader = (TupleQueueReader **)
186  palloc(node->nreaders * sizeof(TupleQueueReader *));
187  memcpy(node->reader, node->pei->reader,
188  node->nreaders * sizeof(TupleQueueReader *));
189  }
190  else
191  {
192  /* No workers? Then never mind. */
193  node->nreaders = 0;
194  node->reader = NULL;
195  }
196  node->nextreader = 0;
197  }
198 
199  /* Run plan locally if no workers or enabled and not single-copy. */
200  node->need_to_scan_locally = (node->nreaders == 0)
201  || (!gather->single_copy && parallel_leader_participation);
202  node->initialized = true;
203  }
204 
205  /*
206  * Reset per-tuple memory context to free any expression evaluation
207  * storage allocated in the previous tuple cycle.
208  */
209  econtext = node->ps.ps_ExprContext;
210  ResetExprContext(econtext);
211 
212  /*
213  * Get next tuple, either from one of our workers, or by running the plan
214  * ourselves.
215  */
216  slot = gather_getnext(node);
217  if (TupIsNull(slot))
218  return NULL;
219 
220  /* If no projection is required, we're done. */
221  if (node->ps.ps_ProjInfo == NULL)
222  return slot;
223 
224  /*
225  * Form the result tuple using ExecProject(), and return it.
226  */
227  econtext->ecxt_outertuple = slot;
228  return ExecProject(node->ps.ps_ProjInfo);
229 }
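The assignment at lines 200-201 decides whether the leader process itself runs the plan: it must do so when no workers were launched, and it may also help out when the plan is not single_copy and parallel_leader_participation is enabled. The following is a minimal standalone sketch of that rule, using plain stand-in variables rather than PostgreSQL's GatherState and Gather structs; it is illustrative only, not the server's code.

#include <stdbool.h>
#include <stdio.h>

/*
 * Toy model of the need_to_scan_locally rule in ExecGather: the leader runs
 * the plan itself when no workers were launched, or when the plan is not
 * single_copy and parallel_leader_participation is on.  Plain variables
 * stand in for the real executor state.
 */
static bool
leader_should_scan(int nreaders, bool single_copy, bool leader_participation)
{
    return (nreaders == 0)
        || (!single_copy && leader_participation);
}

int
main(void)
{
    /* No workers launched: leader must scan, even for single_copy plans. */
    printf("%d\n", leader_should_scan(0, true, false));    /* prints 1 */
    /* Workers launched, single_copy: leader must not duplicate the scan. */
    printf("%d\n", leader_should_scan(2, true, true));     /* prints 0 */
    /* Workers launched, leader participation enabled: leader helps out. */
    printf("%d\n", leader_should_scan(2, false, true));    /* prints 1 */
    return 0;
}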

◆ ExecInitGather()

GatherState * ExecInitGather (Gather *node, EState *estate, int eflags)

Definition at line 58 of file nodeGather.c.

References Assert, ExecAssignExprContext(), ExecAssignResultTypeFromTL(), ExecConditionalAssignProjectionInfo(), ExecContextForcesOids(), ExecGather(), ExecInitExtraTupleSlot(), ExecInitNode(), ExecInitResultTupleSlot(), PlanState::ExecProcNode, ExecSetSlotDescriptor(), ExecTypeFromTL(), GatherState::funnel_slot, GatherState::initialized, innerPlan, makeNode, GatherState::need_to_scan_locally, OUTER_VAR, outerPlan, outerPlanState, parallel_leader_participation, Gather::plan, PlanState::plan, GatherState::ps, Plan::qual, Gather::single_copy, PlanState::state, Plan::targetlist, and GatherState::tuples_needed.

Referenced by ExecInitNode().

59 {
60  GatherState *gatherstate;
61  Plan *outerNode;
62  bool hasoid;
63  TupleDesc tupDesc;
64 
65  /* Gather node doesn't have innerPlan node. */
66  Assert(innerPlan(node) == NULL);
67 
68  /*
69  * create state structure
70  */
71  gatherstate = makeNode(GatherState);
72  gatherstate->ps.plan = (Plan *) node;
73  gatherstate->ps.state = estate;
74  gatherstate->ps.ExecProcNode = ExecGather;
75 
76  gatherstate->initialized = false;
77  gatherstate->need_to_scan_locally =
78  !node->single_copy && parallel_leader_participation;
79  gatherstate->tuples_needed = -1;
80 
81  /*
82  * Miscellaneous initialization
83  *
84  * create expression context for node
85  */
86  ExecAssignExprContext(estate, &gatherstate->ps);
87 
88  /*
89  * Gather doesn't support checking a qual (it's always more efficient to
90  * do it in the child node).
91  */
92  Assert(!node->plan.qual);
93 
94  /*
95  * tuple table initialization
96  */
97  gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
98  ExecInitResultTupleSlot(estate, &gatherstate->ps);
99 
100  /*
101  * now initialize outer plan
102  */
103  outerNode = outerPlan(node);
104  outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
105 
106  /*
107  * Initialize funnel slot to same tuple descriptor as outer plan.
108  */
109  if (!ExecContextForcesOids(outerPlanState(gatherstate), &hasoid))
110  hasoid = false;
111  tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
112  ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
113 
114  /*
115  * Initialize result tuple type and projection info.
116  */
117  ExecAssignResultTypeFromTL(&gatherstate->ps);
118  ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
119 
120  return gatherstate;
121 }
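Line 74 installs ExecGather as this node's ExecProcNode callback, so subsequent tuple requests dispatch through a per-node function pointer rather than a type switch. Below is a small self-contained sketch of that dispatch pattern with simplified stand-in types (Node, ExecDemoGather); it is not the executor's real PlanState machinery.

#include <stdio.h>

/*
 * Simplified stand-in for PlanState: each node carries its own ExecProcNode
 * callback, assigned once at init time, just as ExecInitGather assigns
 * ExecGather to gatherstate->ps.ExecProcNode.
 */
typedef struct Node Node;
typedef const char *(*ExecProcNodeMtd) (Node *node);

struct Node
{
    ExecProcNodeMtd ExecProcNode;
    const char *name;
};

static const char *
ExecDemoGather(Node *node)
{
    return node->name;          /* a real node would return a tuple slot */
}

static const char *
ExecProcNode(Node *node)
{
    return node->ExecProcNode(node);    /* one indirect call, no switch */
}

int
main(void)
{
    Node gather = {ExecDemoGather, "Gather"};

    printf("%s\n", ExecProcNode(&gather));
    return 0;
}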

◆ ExecReScanGather()

void ExecReScanGather (GatherState *node)

Definition at line 422 of file nodeGather.c.

References bms_add_member(), ExecReScan(), ExecShutdownGatherWorkers(), GatherState::initialized, outerPlan, outerPlanState, PlanState::plan, GatherState::ps, and Gather::rescan_param.

Referenced by ExecReScan().

423 {
424  Gather *gather = (Gather *) node->ps.plan;
425  PlanState  *outerPlan = outerPlanState(node);
426 
427  /* Make sure any existing workers are gracefully shut down */
428  ExecShutdownGatherWorkers(node);
429 
430  /* Mark node so that shared state will be rebuilt at next call */
431  node->initialized = false;
432 
433  /*
434  * Set child node's chgParam to tell it that the next scan might deliver a
435  * different set of rows within the leader process. (The overall rowset
436  * shouldn't change, but the leader process's subset might; hence nodes
437  * between here and the parallel table scan node mustn't optimize on the
438  * assumption of an unchanging rowset.)
439  */
440  if (gather->rescan_param >= 0)
441  outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
442  gather->rescan_param);
443 
444  /*
445  * If chgParam of subnode is not null then plan will be re-scanned by
446  * first ExecProcNode. Note: because this does nothing if we have a
447  * rescan_param, it's currently guaranteed that parallel-aware child nodes
448  * will not see a ReScan call until after they get a ReInitializeDSM call.
449  * That ordering might not be something to rely on, though. A good rule
450  * of thumb is that ReInitializeDSM should reset only shared state, ReScan
451  * should reset only local state, and anything that depends on both of
452  * those steps being finished must wait until the first ExecProcNode call.
453  */
454  if (outerPlan->chgParam == NULL)
455  ExecReScan(outerPlan);
456 }
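The comment at lines 444-453 explains why the child is not necessarily rescanned here: when rescan_param is set, the rescan parameter is merely added to the child's chgParam, and the child resets itself lazily at its next ExecProcNode call. A standalone sketch of that "mark now, rescan on next fetch" convention follows, using toy types (ChildState, mark_rescan) rather than the real executor structures.

#include <stdbool.h>
#include <stdio.h>

/*
 * Toy model of the chgParam convention: the rescan entry point only marks
 * the child as having a changed parameter; the child performs the actual
 * reset the next time it is asked for a tuple.
 */
typedef struct ChildState
{
    bool        chg_param_set;  /* stand-in for outerPlan->chgParam != NULL */
    int         next_row;
} ChildState;

static void
mark_rescan(ChildState *child)
{
    child->chg_param_set = true;    /* like bms_add_member(chgParam, ...) */
}

static int
child_next_row(ChildState *child)
{
    if (child->chg_param_set)
    {
        child->next_row = 0;        /* lazy rescan on first fetch */
        child->chg_param_set = false;
    }
    return child->next_row++;
}

int
main(void)
{
    ChildState child = {false, 0};

    printf("%d %d\n", child_next_row(&child), child_next_row(&child)); /* 0 1 */
    mark_rescan(&child);            /* what the rescan call does */
    printf("%d\n", child_next_row(&child));                            /* 0 */
    return 0;
}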

◆ ExecShutdownGather()

void ExecShutdownGather (GatherState *node)

Definition at line 398 of file nodeGather.c.

References ExecParallelCleanup(), ExecShutdownGatherWorkers(), and GatherState::pei.

Referenced by ExecEndGather(), and ExecShutdownNode().

399 {
400  ExecShutdownGatherWorkers(node);
401 
402  /* Now destroy the parallel context. */
403  if (node->pei != NULL)
404  {
405  ExecParallelCleanup(node->pei);
406  node->pei = NULL;
407  }
408 }

◆ ExecShutdownGatherWorkers()

static void ExecShutdownGatherWorkers (GatherState *node)
static

Definition at line 380 of file nodeGather.c.

References ExecParallelFinish(), GatherState::pei, pfree(), and GatherState::reader.

Referenced by ExecReScanGather(), and ExecShutdownGather().

381 {
382  if (node->pei != NULL)
383  ExecParallelFinish(node->pei);
384 
385  /* Flush local copy of reader array */
386  if (node->reader)
387  pfree(node->reader);
388  node->reader = NULL;
389 }

◆ gather_getnext()

static TupleTableSlot * gather_getnext (GatherState *gatherstate)
static

Definition at line 252 of file nodeGather.c.

References ParallelExecutorInfo::area, CHECK_FOR_INTERRUPTS, EState::es_query_dsa, ExecClearTuple(), ExecProcNode(), ExecStoreTuple(), GatherState::funnel_slot, gather_readnext(), HeapTupleIsValid, InvalidBuffer, GatherState::need_to_scan_locally, GatherState::nreaders, outerPlan, outerPlanState, GatherState::pei, GatherState::ps, PlanState::state, and TupIsNull.

Referenced by ExecGather().

253 {
254  PlanState *outerPlan = outerPlanState(gatherstate);
255  TupleTableSlot *outerTupleSlot;
256  TupleTableSlot *fslot = gatherstate->funnel_slot;
257  HeapTuple tup;
258 
259  while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
260  {
261  CHECK_FOR_INTERRUPTS();
262 
263  if (gatherstate->nreaders > 0)
264  {
265  tup = gather_readnext(gatherstate);
266 
267  if (HeapTupleIsValid(tup))
268  {
269  ExecStoreTuple(tup, /* tuple to store */
270  fslot, /* slot in which to store the tuple */
271  InvalidBuffer, /* buffer associated with this
272  * tuple */
273  true); /* pfree tuple when done with it */
274  return fslot;
275  }
276  }
277 
278  if (gatherstate->need_to_scan_locally)
279  {
280  EState *estate = gatherstate->ps.state;
281 
282  /* Install our DSA area while executing the plan. */
283  estate->es_query_dsa =
284  gatherstate->pei ? gatherstate->pei->area : NULL;
285  outerTupleSlot = ExecProcNode(outerPlan);
286  estate->es_query_dsa = NULL;
287 
288  if (!TupIsNull(outerTupleSlot))
289  return outerTupleSlot;
290 
291  gatherstate->need_to_scan_locally = false;
292  }
293  }
294 
295  return ExecClearTuple(fslot);
296 }
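gather_getnext prefers tuples arriving from worker queues and falls back to running the plan in the leader, returning an empty slot only once both sources are exhausted. The compact standalone model below mirrors that control flow with integer "tuples" and toy sources (worker_tuples, local_tuples); it is a sketch of the loop structure, not of tuple queues or slots.

#include <stdbool.h>
#include <stdio.h>

/*
 * Toy model of gather_getnext's control flow: try the worker queues first,
 * then run the plan locally, and report "no tuple" only when both sources
 * are done.  0 stands in for "this source is exhausted".
 */
static int  worker_tuples[] = {11, 12, 0};
static int  local_tuples[]  = {21, 0};
static int  wpos = 0, lpos = 0;
static bool have_workers = true, scan_locally = true;

static int
demo_getnext(void)
{
    while (have_workers || scan_locally)
    {
        if (have_workers)
        {
            int         tup = worker_tuples[wpos++];

            if (tup != 0)
                return tup;
            have_workers = false;   /* like nreaders dropping to zero */
        }
        if (scan_locally)
        {
            int         tup = local_tuples[lpos++];

            if (tup != 0)
                return tup;
            scan_locally = false;   /* need_to_scan_locally = false */
        }
    }
    return 0;                       /* like returning a cleared slot */
}

int
main(void)
{
    for (int tup = demo_getnext(); tup != 0; tup = demo_getnext())
        printf("%d\n", tup);        /* prints 11, 12, 21 */
    return 0;
}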

◆ gather_readnext()

static HeapTuple gather_readnext (GatherState *gatherstate)
static

Definition at line 302 of file nodeGather.c.

References Assert, CHECK_FOR_INTERRUPTS, memmove, MyLatch, GatherState::need_to_scan_locally, GatherState::nextreader, GatherState::nreaders, GatherState::reader, ResetLatch(), TupleQueueReaderNext(), WAIT_EVENT_EXECUTE_GATHER, WaitLatch(), and WL_LATCH_SET.

Referenced by gather_getnext().

303 {
304  int nvisited = 0;
305 
306  for (;;)
307  {
308  TupleQueueReader *reader;
309  HeapTuple tup;
310  bool readerdone;
311 
312  /* Check for async events, particularly messages from workers. */
313  CHECK_FOR_INTERRUPTS();
314 
315  /* Attempt to read a tuple, but don't block if none is available. */
316  Assert(gatherstate->nextreader < gatherstate->nreaders);
317  reader = gatherstate->reader[gatherstate->nextreader];
318  tup = TupleQueueReaderNext(reader, true, &readerdone);
319 
320  /*
321  * If this reader is done, remove it from our working array of active
322  * readers. If all readers are done, we're outta here.
323  */
324  if (readerdone)
325  {
326  Assert(!tup);
327  --gatherstate->nreaders;
328  if (gatherstate->nreaders == 0)
329  return NULL;
330  memmove(&gatherstate->reader[gatherstate->nextreader],
331  &gatherstate->reader[gatherstate->nextreader + 1],
332  sizeof(TupleQueueReader *)
333  * (gatherstate->nreaders - gatherstate->nextreader));
334  if (gatherstate->nextreader >= gatherstate->nreaders)
335  gatherstate->nextreader = 0;
336  continue;
337  }
338 
339  /* If we got a tuple, return it. */
340  if (tup)
341  return tup;
342 
343  /*
344  * Advance nextreader pointer in round-robin fashion. Note that we
345  * only reach this code if we weren't able to get a tuple from the
346  * current worker. We used to advance the nextreader pointer after
347  * every tuple, but it turns out to be much more efficient to keep
348  * reading from the same queue until that would require blocking.
349  */
350  gatherstate->nextreader++;
351  if (gatherstate->nextreader >= gatherstate->nreaders)
352  gatherstate->nextreader = 0;
353 
354  /* Have we visited every (surviving) TupleQueueReader? */
355  nvisited++;
356  if (nvisited >= gatherstate->nreaders)
357  {
358  /*
359  * If (still) running plan locally, return NULL so caller can
360  * generate another tuple from the local copy of the plan.
361  */
362  if (gatherstate->need_to_scan_locally)
363  return NULL;
364 
365  /* Nothing to do except wait for developments. */
366  WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
367  ResetLatch(MyLatch);
368  nvisited = 0;
369  }
370  }
371 }
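The distinctive part of gather_readnext is how it retires finished readers: the working array is compacted with memmove, nextreader wraps around, and nvisited detects a fruitless full pass (at which point the real code sleeps on the process latch). The standalone sketch below models only that array management, with plain int arrays (queues, reader) standing in for tuple queue readers and no latch handling.

#include <stdio.h>
#include <string.h>

/*
 * Toy model of gather_readnext's reader-array management: read from several
 * queues, keep reading from the current queue while it produces tuples, and
 * compact the pointer array with memmove when a queue is exhausted.  Queues
 * are arrays of ints; 0 marks a finished queue.  The real code additionally
 * polls without blocking, advances round-robin, and waits on its latch when
 * a full pass over the surviving readers yields nothing.
 */
#define NQUEUES 3

static int  queues[NQUEUES][4] = {{1, 2, 0}, {3, 0}, {4, 5, 6, 0}};
static int *reader[NQUEUES] = {queues[0], queues[1], queues[2]};
static int  nreaders = NQUEUES;
static int  nextreader = 0;

static int
demo_readnext(void)
{
    while (nreaders > 0)
    {
        int         tup = *reader[nextreader];

        if (tup == 0)
        {
            /* This reader is done: drop it from the working array. */
            --nreaders;
            memmove(&reader[nextreader], &reader[nextreader + 1],
                    sizeof(int *) * (nreaders - nextreader));
            if (nextreader >= nreaders)
                nextreader = 0;
            continue;
        }
        ++reader[nextreader];       /* consume the tuple */
        return tup;
    }
    return 0;                       /* all queues exhausted */
}

int
main(void)
{
    for (int tup = demo_readnext(); tup != 0; tup = demo_readnext())
        printf("%d ", tup);
    printf("\n");                   /* prints: 1 2 3 4 5 6 */
    return 0;
}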