PostgreSQL Source Code  git master
nodeGather.c File Reference
#include "postgres.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/planmain.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for nodeGather.c (graph omitted).


Functions

static TupleTableSlot * ExecGather (PlanState *pstate)
 
static TupleTableSlot * gather_getnext (GatherState *gatherstate)
 
static HeapTuple gather_readnext (GatherState *gatherstate)
 
static void ExecShutdownGatherWorkers (GatherState *node)
 
GatherState * ExecInitGather (Gather *node, EState *estate, int eflags)
 
void ExecEndGather (GatherState *node)
 
void ExecShutdownGather (GatherState *node)
 
void ExecReScanGather (GatherState *node)
 

Function Documentation

◆ ExecEndGather()

void ExecEndGather ( GatherState * node )

Definition at line 229 of file nodeGather.c.

References ExecClearTuple(), ExecEndNode(), ExecFreeExprContext(), ExecShutdownGather(), outerPlanState, GatherState::ps, and PlanState::ps_ResultTupleSlot.

Referenced by ExecEndNode().

230 {
231  ExecEndNode(outerPlanState(node)); /* let children clean up first */
232  ExecShutdownGather(node);
233  ExecFreeExprContext(&node->ps);
234  ExecClearTuple(node->ps.ps_ResultTupleSlot);
235 }

◆ ExecGather()

static TupleTableSlot * ExecGather ( PlanState * pstate )
static

Definition at line 122 of file nodeGather.c.

References castNode, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_outertuple, EState::es_use_parallel_mode, ExecInitParallelPlan(), ExecParallelCreateReaders(), ExecParallelReinitialize(), ExecProject(), gather_getnext(), GatherState::initialized, Gather::initParam, LaunchParallelWorkers(), PlanState::lefttree, GatherState::need_to_scan_locally, GatherState::nextreader, GatherState::nreaders, Gather::num_workers, ParallelContext::nworkers_launched, GatherState::nworkers_launched, palloc(), parallel_leader_participation, ParallelExecutorInfo::pcxt, GatherState::pei, PlanState::plan, GatherState::ps, PlanState::ps_ExprContext, PlanState::ps_ProjInfo, ParallelExecutorInfo::reader, GatherState::reader, ResetExprContext, Gather::single_copy, PlanState::state, TupIsNull, and GatherState::tuples_needed.

Referenced by ExecInitGather().

123 {
124  GatherState *node = castNode(GatherState, pstate);
125  TupleTableSlot *slot;
126  ExprContext *econtext;
127 
128  CHECK_FOR_INTERRUPTS();
129 
130  /*
131  * Initialize the parallel context and workers on first execution. We do
132  * this on first execution rather than during node initialization, as it
133  * needs to allocate a large dynamic segment, so it is better to do it
134  * only if it is really needed.
135  */
136  if (!node->initialized)
137  {
138  EState *estate = node->ps.state;
139  Gather *gather = (Gather *) node->ps.plan;
140 
141  /*
142  * Sometimes we might have to run without parallelism; but if parallel
143  * mode is active then we can try to fire up some workers.
144  */
145  if (gather->num_workers > 0 && estate->es_use_parallel_mode)
146  {
147  ParallelContext *pcxt;
148 
149  /* Initialize, or re-initialize, shared state needed by workers. */
150  if (!node->pei)
151  node->pei = ExecInitParallelPlan(node->ps.lefttree,
152  estate,
153  gather->initParam,
154  gather->num_workers,
155  node->tuples_needed);
156  else
157  ExecParallelReinitialize(node->ps.lefttree,
158  node->pei,
159  gather->initParam);
160 
161  /*
162  * Register backend workers. We might not get as many as we
163  * requested, or indeed any at all.
164  */
165  pcxt = node->pei->pcxt;
166  LaunchParallelWorkers(pcxt);
167  /* We save # workers launched for the benefit of EXPLAIN */
168  node->nworkers_launched = pcxt->nworkers_launched;
169 
170  /* Set up tuple queue readers to read the results. */
171  if (pcxt->nworkers_launched > 0)
172  {
173  ExecParallelCreateReaders(node->pei);
174  /* Make a working array showing the active readers */
175  node->nreaders = pcxt->nworkers_launched;
176  node->reader = (TupleQueueReader **)
177  palloc(node->nreaders * sizeof(TupleQueueReader *));
178  memcpy(node->reader, node->pei->reader,
179  node->nreaders * sizeof(TupleQueueReader *));
180  }
181  else
182  {
183  /* No workers? Then never mind. */
184  node->nreaders = 0;
185  node->reader = NULL;
186  }
187  node->nextreader = 0;
188  }
189 
190  /* Run plan locally if no workers or enabled and not single-copy. */
191  node->need_to_scan_locally = (node->nreaders == 0)
192  || (!gather->single_copy && parallel_leader_participation);
193  node->initialized = true;
194  }
195 
196  /*
197  * Reset per-tuple memory context to free any expression evaluation
198  * storage allocated in the previous tuple cycle.
199  */
200  econtext = node->ps.ps_ExprContext;
201  ResetExprContext(econtext);
202 
203  /*
204  * Get next tuple, either from one of our workers, or by running the plan
205  * ourselves.
206  */
207  slot = gather_getnext(node);
208  if (TupIsNull(slot))
209  return NULL;
210 
211  /* If no projection is required, we're done. */
212  if (node->ps.ps_ProjInfo == NULL)
213  return slot;
214 
215  /*
216  * Form the result tuple using ExecProject(), and return it.
217  */
218  econtext->ecxt_outertuple = slot;
219  return ExecProject(node->ps.ps_ProjInfo);
220 }
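The test at lines 190-193 decides whether the leader itself will execute the plan: it must do so when no workers could be launched, and it may also participate when the plan is not single_copy and parallel_leader_participation is enabled. A minimal, self-contained sketch of that predicate (plain C for illustration only, not PostgreSQL code; the function name is invented here):

#include <stdbool.h>
#include <stdio.h>

/* Illustrative stand-in for the need_to_scan_locally decision above. */
static bool
leader_scans_locally(int nreaders, bool single_copy, bool leader_participation)
{
	/* run the plan in the leader if no workers were launched, or if the
	 * leader may participate and the plan is not marked single_copy */
	return (nreaders == 0) || (!single_copy && leader_participation);
}

int
main(void)
{
	printf("%d\n", leader_scans_locally(0, false, false));	/* 1: no workers launched */
	printf("%d\n", leader_scans_locally(2, true, true));	/* 0: single_copy plan */
	printf("%d\n", leader_scans_locally(2, false, true));	/* 1: leader helps out */
	return 0;
}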

◆ ExecInitGather()

GatherState * ExecInitGather ( Gather * node,
EState * estate,
int  eflags 
)

Definition at line 58 of file nodeGather.c.

References Assert, ExecAssignExprContext(), ExecConditionalAssignProjectionInfo(), ExecGather(), ExecGetResultType(), ExecInitExtraTupleSlot(), ExecInitNode(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, GatherState::funnel_slot, GatherState::initialized, innerPlan, makeNode, GatherState::need_to_scan_locally, OUTER_VAR, outerPlan, outerPlanState, parallel_leader_participation, Gather::plan, PlanState::plan, GatherState::ps, Plan::qual, Gather::single_copy, PlanState::state, and GatherState::tuples_needed.

Referenced by ExecInitNode().

59 {
60  GatherState *gatherstate;
61  Plan *outerNode;
62  TupleDesc tupDesc;
63 
64  /* Gather node doesn't have innerPlan node. */
65  Assert(innerPlan(node) == NULL);
66 
67  /*
68  * create state structure
69  */
70  gatherstate = makeNode(GatherState);
71  gatherstate->ps.plan = (Plan *) node;
72  gatherstate->ps.state = estate;
73  gatherstate->ps.ExecProcNode = ExecGather;
74 
75  gatherstate->initialized = false;
76  gatherstate->need_to_scan_locally =
77  !node->single_copy && parallel_leader_participation;
78  gatherstate->tuples_needed = -1;
79 
80  /*
81  * Miscellaneous initialization
82  *
83  * create expression context for node
84  */
85  ExecAssignExprContext(estate, &gatherstate->ps);
86 
87  /*
88  * now initialize outer plan
89  */
90  outerNode = outerPlan(node);
91  outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
92  tupDesc = ExecGetResultType(outerPlanState(gatherstate));
93 
94  /*
95  * Initialize result slot, type and projection.
96  */
97  ExecInitResultTupleSlotTL(estate, &gatherstate->ps);
98  ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
99 
100  /*
101  * Initialize funnel slot to same tuple descriptor as outer plan.
102  */
103  gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc);
104 
105  /*
106  * Gather doesn't support checking a qual (it's always more efficient to
107  * do it in the child node).
108  */
109  Assert(!node->plan.qual);
110 
111  return gatherstate;
112 }
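ExecInitGather() installs ExecGather() as the node's ExecProcNode callback, so ordinary fetches reach it through the generic dispatcher. The sketch below illustrates the resulting node lifecycle. It is not how the server actually invokes these functions (the executor goes through ExecInitNode(), ExecProcNode(), ExecShutdownNode() and ExecEndNode() dispatch); it assumes a valid Gather plan node, EState and eflags from normal executor startup and calls the Gather-specific entry points directly for clarity:

#include "postgres.h"

#include "executor/executor.h"
#include "executor/nodeGather.h"
#include "executor/tuptable.h"

/* Sketch only: drive a Gather node from initialization to teardown. */
static void
drive_gather(Gather *plan, EState *estate, int eflags)
{
	GatherState *gstate = ExecInitGather(plan, estate, eflags);
	TupleTableSlot *slot;

	for (;;)
	{
		/* dispatches to ExecGather() through gstate->ps.ExecProcNode */
		slot = ExecProcNode(&gstate->ps);
		if (TupIsNull(slot))
			break;				/* workers and any local scan are exhausted */
		/* ... consume the returned slot here ... */
	}

	ExecShutdownGather(gstate);	/* release workers and the parallel context */
	ExecEndGather(gstate);		/* then tear down the node itself
								 * (it would call ExecShutdownGather() again,
								 * which is harmless once pei is NULL) */
}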

◆ ExecReScanGather()

void ExecReScanGather ( GatherState * node )

Definition at line 420 of file nodeGather.c.

References bms_add_member(), ExecReScan(), ExecShutdownGatherWorkers(), GatherState::initialized, outerPlan, outerPlanState, PlanState::plan, GatherState::ps, and Gather::rescan_param.

Referenced by ExecReScan().

421 {
422  Gather *gather = (Gather *) node->ps.plan;
423  PlanState *outerPlan = outerPlanState(node);
424 
425  /* Make sure any existing workers are gracefully shut down */
426  ExecShutdownGatherWorkers(node);
427 
428  /* Mark node so that shared state will be rebuilt at next call */
429  node->initialized = false;
430 
431  /*
432  * Set child node's chgParam to tell it that the next scan might deliver a
433  * different set of rows within the leader process. (The overall rowset
434  * shouldn't change, but the leader process's subset might; hence nodes
435  * between here and the parallel table scan node mustn't optimize on the
436  * assumption of an unchanging rowset.)
437  */
438  if (gather->rescan_param >= 0)
439  outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
440  gather->rescan_param);
441 
442  /*
443  * If chgParam of subnode is not null then plan will be re-scanned by
444  * first ExecProcNode. Note: because this does nothing if we have a
445  * rescan_param, it's currently guaranteed that parallel-aware child nodes
446  * will not see a ReScan call until after they get a ReInitializeDSM call.
447  * That ordering might not be something to rely on, though. A good rule
448  * of thumb is that ReInitializeDSM should reset only shared state, ReScan
449  * should reset only local state, and anything that depends on both of
450  * those steps being finished must wait until the first ExecProcNode call.
451  */
452  if (outerPlan->chgParam == NULL)
453  ExecReScan(outerPlan);
454 }
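ExecReScanGather() is normally reached through ExecReScan() dispatch from a parent node. Because it only resets node->initialized, the workers and tuple queues are not rebuilt here; that happens lazily in ExecGather() on the next fetch. A minimal sketch of that sequence, assuming gstate is an already-initialized GatherState (hypothetical caller, for illustration only):

#include "postgres.h"

#include "executor/executor.h"
#include "executor/tuptable.h"

static TupleTableSlot *
restart_gather(GatherState *gstate)
{
	/* ExecReScan() dispatches to ExecReScanGather() for a GatherState */
	ExecReScan(&gstate->ps);

	/*
	 * The first fetch after the rescan goes through ExecGather(), which sees
	 * initialized == false and therefore rebuilds the shared state,
	 * relaunches the workers and recreates the tuple queue readers before
	 * returning a tuple.
	 */
	return ExecProcNode(&gstate->ps);
}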

◆ ExecShutdownGather()

void ExecShutdownGather ( GatherState * node )

Definition at line 396 of file nodeGather.c.

References ExecParallelCleanup(), ExecShutdownGatherWorkers(), and GatherState::pei.

Referenced by ExecEndGather(), and ExecShutdownNode().

397 {
398  ExecShutdownGatherWorkers(node);
399 
400  /* Now destroy the parallel context. */
401  if (node->pei != NULL)
402  {
403  ExecParallelCleanup(node->pei);
404  node->pei = NULL;
405  }
406 }

◆ ExecShutdownGatherWorkers()

static void ExecShutdownGatherWorkers ( GatherState * node )
static

Definition at line 378 of file nodeGather.c.

References ExecParallelFinish(), GatherState::pei, pfree(), and GatherState::reader.

Referenced by ExecReScanGather(), and ExecShutdownGather().

379 {
380  if (node->pei != NULL)
381  ExecParallelFinish(node->pei);
382 
383  /* Flush local copy of reader array */
384  if (node->reader)
385  pfree(node->reader);
386  node->reader = NULL;
387 }

◆ gather_getnext()

static TupleTableSlot * gather_getnext ( GatherState * gatherstate )
static

Definition at line 243 of file nodeGather.c.

References ParallelExecutorInfo::area, CHECK_FOR_INTERRUPTS, EState::es_query_dsa, ExecClearTuple(), ExecProcNode(), ExecStoreTuple(), GatherState::funnel_slot, gather_readnext(), HeapTupleIsValid, InvalidBuffer, GatherState::need_to_scan_locally, GatherState::nreaders, outerPlan, outerPlanState, GatherState::pei, GatherState::ps, PlanState::state, and TupIsNull.

Referenced by ExecGather().

244 {
245  PlanState *outerPlan = outerPlanState(gatherstate);
246  TupleTableSlot *outerTupleSlot;
247  TupleTableSlot *fslot = gatherstate->funnel_slot;
248  HeapTuple tup;
249 
250  while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
251  {
252  CHECK_FOR_INTERRUPTS();
253 
254  if (gatherstate->nreaders > 0)
255  {
256  tup = gather_readnext(gatherstate);
257 
258  if (HeapTupleIsValid(tup))
259  {
260  ExecStoreTuple(tup, /* tuple to store */
261  fslot, /* slot in which to store the tuple */
262  InvalidBuffer, /* buffer associated with this
263  * tuple */
264  true); /* pfree tuple when done with it */
265  return fslot;
266  }
267  }
268 
269  if (gatherstate->need_to_scan_locally)
270  {
271  EState *estate = gatherstate->ps.state;
272 
273  /* Install our DSA area while executing the plan. */
274  estate->es_query_dsa =
275  gatherstate->pei ? gatherstate->pei->area : NULL;
276  outerTupleSlot = ExecProcNode(outerPlan);
277  estate->es_query_dsa = NULL;
278 
279  if (!TupIsNull(outerTupleSlot))
280  return outerTupleSlot;
281 
282  gatherstate->need_to_scan_locally = false;
283  }
284  }
285 
286  return ExecClearTuple(fslot);
287 }

◆ gather_readnext()

static HeapTuple gather_readnext ( GatherState * gatherstate )
static

Definition at line 293 of file nodeGather.c.

References Assert, CHECK_FOR_INTERRUPTS, memmove, MyLatch, GatherState::need_to_scan_locally, GatherState::nextreader, GatherState::nreaders, GatherState::reader, ResetLatch(), TupleQueueReaderNext(), WAIT_EVENT_EXECUTE_GATHER, WaitLatch(), and WL_LATCH_SET.

Referenced by gather_getnext().

294 {
295  int nvisited = 0;
296 
297  for (;;)
298  {
299  TupleQueueReader *reader;
300  HeapTuple tup;
301  bool readerdone;
302 
303  /* Check for async events, particularly messages from workers. */
304  CHECK_FOR_INTERRUPTS();
305 
306  /*
307  * Attempt to read a tuple, but don't block if none is available.
308  *
309  * Note that TupleQueueReaderNext will just return NULL for a worker
310  * which fails to initialize. We'll treat that worker as having
311  * produced no tuples; WaitForParallelWorkersToFinish will error out
312  * when we get there.
313  */
314  Assert(gatherstate->nextreader < gatherstate->nreaders);
315  reader = gatherstate->reader[gatherstate->nextreader];
316  tup = TupleQueueReaderNext(reader, true, &readerdone);
317 
318  /*
319  * If this reader is done, remove it from our working array of active
320  * readers. If all readers are done, we're outta here.
321  */
322  if (readerdone)
323  {
324  Assert(!tup);
325  --gatherstate->nreaders;
326  if (gatherstate->nreaders == 0)
327  return NULL;
328  memmove(&gatherstate->reader[gatherstate->nextreader],
329  &gatherstate->reader[gatherstate->nextreader + 1],
330  sizeof(TupleQueueReader *)
331  * (gatherstate->nreaders - gatherstate->nextreader));
332  if (gatherstate->nextreader >= gatherstate->nreaders)
333  gatherstate->nextreader = 0;
334  continue;
335  }
336 
337  /* If we got a tuple, return it. */
338  if (tup)
339  return tup;
340 
341  /*
342  * Advance nextreader pointer in round-robin fashion. Note that we
343  * only reach this code if we weren't able to get a tuple from the
344  * current worker. We used to advance the nextreader pointer after
345  * every tuple, but it turns out to be much more efficient to keep
346  * reading from the same queue until that would require blocking.
347  */
348  gatherstate->nextreader++;
349  if (gatherstate->nextreader >= gatherstate->nreaders)
350  gatherstate->nextreader = 0;
351 
352  /* Have we visited every (surviving) TupleQueueReader? */
353  nvisited++;
354  if (nvisited >= gatherstate->nreaders)
355  {
356  /*
357  * If (still) running plan locally, return NULL so caller can
358  * generate another tuple from the local copy of the plan.
359  */
360  if (gatherstate->need_to_scan_locally)
361  return NULL;
362 
363  /* Nothing to do except wait for developments. */
364  WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
365  ResetLatch(MyLatch);
366  nvisited = 0;
367  }
368  }
369 }
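The loop above polls the surviving tuple queue readers without blocking: it keeps reading from the current queue until that would block, then rotates round-robin, removes exhausted readers from the working array, and only waits on the process latch after a full pass found nothing. The following self-contained toy program (plain C, not PostgreSQL code; toyqueue, queue_try_read and wait_for_activity are invented stand-ins for TupleQueueReader, TupleQueueReaderNext() and WaitLatch()/ResetLatch()) demonstrates the same scheduling policy:

#include <stdbool.h>
#include <stdio.h>

#define NQUEUES 3
#define QLEN    4

/* Toy stand-in for a tuple queue: items[] are payloads, ready[i] says
 * whether items[i] can be read without blocking right now. */
struct toyqueue
{
	int		items[QLEN];
	bool	ready[QLEN];
	int		pos;			/* read cursor */
};

/* Non-blocking read; *done becomes true once the queue is fully drained. */
static bool
queue_try_read(struct toyqueue *q, int *item, bool *done)
{
	*done = (q->pos >= QLEN);
	if (*done || !q->ready[q->pos])
		return false;
	*item = q->items[q->pos++];
	return true;
}

/* Stand-in for WaitLatch()/ResetLatch(): pretend the producers caught up. */
static void
wait_for_activity(struct toyqueue **qs, int n)
{
	for (int i = 0; i < n; i++)
		for (int j = 0; j < QLEN; j++)
			qs[i]->ready[j] = true;
}

/* Same policy as gather_readnext(): stick with the current queue until it
 * would block, rotate round-robin, drop exhausted queues, and only sleep
 * after a full pass over every surviving queue found nothing. */
static bool
read_next(struct toyqueue **qs, int *nqueues, int *next, int *item)
{
	int		nvisited = 0;

	while (*nqueues > 0)
	{
		bool	done;

		if (queue_try_read(qs[*next], item, &done))
			return true;		/* stay on this queue for the next call */

		if (done)
		{
			/* remove the exhausted queue from the working array */
			for (int i = *next; i < *nqueues - 1; i++)
				qs[i] = qs[i + 1];
			if (--(*nqueues) == 0)
				return false;
			if (*next >= *nqueues)
				*next = 0;
			continue;
		}

		if (++(*next) >= *nqueues)
			*next = 0;

		if (++nvisited >= *nqueues)
		{
			wait_for_activity(qs, *nqueues);	/* full empty pass: "block" */
			nvisited = 0;
		}
	}
	return false;
}

int
main(void)
{
	struct toyqueue q[NQUEUES];
	struct toyqueue *readers[NQUEUES];
	int		nreaders = NQUEUES, next = 0, item;

	for (int i = 0; i < NQUEUES; i++)
	{
		for (int j = 0; j < QLEN; j++)
		{
			q[i].items[j] = i * 100 + j;
			q[i].ready[j] = (j < 2);	/* only half is ready at first */
		}
		q[i].pos = 0;
		readers[i] = &q[i];
	}

	while (read_next(readers, &nreaders, &next, &item))
		printf("got %d\n", item);
	return 0;
}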