PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
nodeGather.c File Reference
#include "postgres.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for nodeGather.c:

Go to the source code of this file.

Functions

static TupleTableSlot * gather_getnext (GatherState *gatherstate)
 
static HeapTuple gather_readnext (GatherState *gatherstate)
 
static void ExecShutdownGatherWorkers (GatherState *node)
 
GatherState * ExecInitGather (Gather *node, EState *estate, int eflags)
 
TupleTableSlot * ExecGather (GatherState *node)
 
void ExecEndGather (GatherState *node)
 
void ExecShutdownGather (GatherState *node)
 
void ExecReScanGather (GatherState *node)
 

Function Documentation

void ExecEndGather ( GatherState * node)

Definition at line 230 of file nodeGather.c.

References ExecClearTuple(), ExecEndNode(), ExecFreeExprContext(), ExecShutdownGather(), outerPlanState, GatherState::ps, and PlanState::ps_ResultTupleSlot.

Referenced by ExecEndNode().

231 {
232  ExecShutdownGather(node);
233  ExecFreeExprContext(&node->ps);
234  ExecClearTuple(node->ps.ps_ResultTupleSlot);
235  ExecEndNode(outerPlanState(node));
236 }
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:624
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:684
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:1076
#define outerPlanState(node)
Definition: execnodes.h:1089
PlanState ps
Definition: execnodes.h:2003
void ExecShutdownGather(GatherState *node)
Definition: nodeGather.c:404
TupleTableSlot* ExecGather ( GatherState * node)

Definition at line 128 of file nodeGather.c.

References ParallelWorkerInfo::bgwhandle, CreateTupleQueueReader(), ExprContext::ecxt_outertuple, ExecClearTuple(), ExecInitParallelPlan(), ExecProject(), ExecShutdownGatherWorkers(), GatherState::funnel_slot, gather_getnext(), i, GatherState::initialized, IsInParallelMode(), LaunchParallelWorkers(), PlanState::lefttree, GatherState::need_to_scan_locally, GatherState::nextreader, GatherState::nreaders, NULL, Gather::num_workers, ParallelContext::nworkers_launched, GatherState::nworkers_launched, palloc(), ParallelExecutorInfo::pcxt, GatherState::pei, PlanState::plan, GatherState::ps, PlanState::ps_ExprContext, PlanState::ps_ProjInfo, GatherState::reader, ResetExprContext, shm_mq_set_handle(), Gather::single_copy, PlanState::state, ParallelExecutorInfo::tqueue, TupleTableSlot::tts_tupleDescriptor, TupIsNull, and ParallelContext::worker.

Referenced by ExecProcNode().

129 {
130  TupleTableSlot *fslot = node->funnel_slot;
131  int i;
132  TupleTableSlot *slot;
133  ExprContext *econtext;
134 
135  /*
136  * Initialize the parallel context and workers on first execution. We do
137  * this on first execution rather than during node initialization, as it
138  * needs to allocate a large dynamic segment, so it is better to do it
139  * only if it is really needed.
140  */
141  if (!node->initialized)
142  {
143  EState *estate = node->ps.state;
144  Gather *gather = (Gather *) node->ps.plan;
145 
146  /*
147  * Sometimes we might have to run without parallelism; but if parallel
148  * mode is active then we can try to fire up some workers.
149  */
150  if (gather->num_workers > 0 && IsInParallelMode())
151  {
152  ParallelContext *pcxt;
153 
154  /* Initialize the workers required to execute Gather node. */
155  if (!node->pei)
156  node->pei = ExecInitParallelPlan(node->ps.lefttree,
157  estate,
158  gather->num_workers);
159 
160  /*
161  * Register backend workers. We might not get as many as we
162  * requested, or indeed any at all.
163  */
164  pcxt = node->pei->pcxt;
165  LaunchParallelWorkers(pcxt);
166  node->nworkers_launched = pcxt->nworkers_launched;
167 
168  /* Set up tuple queue readers to read the results. */
169  if (pcxt->nworkers_launched > 0)
170  {
171  node->nreaders = 0;
172  node->nextreader = 0;
173  node->reader =
174  palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *));
175 
176  for (i = 0; i < pcxt->nworkers_launched; ++i)
177  {
178  shm_mq_set_handle(node->pei->tqueue[i],
179  pcxt->worker[i].bgwhandle);
180  node->reader[node->nreaders++] =
181  CreateTupleQueueReader(node->pei->tqueue[i],
182  fslot->tts_tupleDescriptor);
183  }
184  }
185  else
186  {
187  /* No workers? Then never mind. */
188  ExecShutdownGatherWorkers(node);
189  }
190  }
191 
192  /* Run plan locally if no workers or not single-copy. */
193  node->need_to_scan_locally = (node->reader == NULL)
194  || !gather->single_copy;
195  node->initialized = true;
196  }
197 
198  /*
199  * Reset per-tuple memory context to free any expression evaluation
200  * storage allocated in the previous tuple cycle. This will also clear
201  * any previous tuple returned by a TupleQueueReader; to make sure we
202  * don't leave a dangling pointer around, clear the working slot first.
203  */
204  ExecClearTuple(fslot);
205  econtext = node->ps.ps_ExprContext;
206  ResetExprContext(econtext);
207 
208  /*
209  * Get next tuple, either from one of our workers, or by running the plan
210  * ourselves.
211  */
212  slot = gather_getnext(node);
213  if (TupIsNull(slot))
214  return NULL;
215 
216  /*
217  * Form the result tuple using ExecProject(), and return it.
218  */
219  econtext->ecxt_outertuple = slot;
220  return ExecProject(node->ps.ps_ProjInfo);
221 }
TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
Definition: execQual.c:5214
ParallelContext * pcxt
Definition: execParallel.h:27
int nworkers_launched
Definition: execnodes.h:2008
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1078
ExprContext * ps_ExprContext
Definition: execnodes.h:1077
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
bool need_to_scan_locally
Definition: execnodes.h:2011
struct TupleQueueReader ** reader
Definition: execnodes.h:2009
ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
Definition: execParallel.c:355
EState * state
Definition: execnodes.h:1048
struct PlanState * lefttree
Definition: execnodes.h:1062
ParallelWorkerInfo * worker
Definition: parallel.h:46
bool IsInParallelMode(void)
Definition: xact.c:912
bool initialized
Definition: execnodes.h:2004
static void ExecShutdownGatherWorkers(GatherState *node)
Definition: nodeGather.c:376
bool single_copy
Definition: plannodes.h:784
struct ParallelExecutorInfo * pei
Definition: execnodes.h:2005
BackgroundWorkerHandle * bgwhandle
Definition: parallel.h:27
#define TupIsNull(slot)
Definition: tuptable.h:138
PlanState ps
Definition: execnodes.h:2003
int nworkers_launched
Definition: parallel.h:37
void LaunchParallelWorkers(ParallelContext *pcxt)
Definition: parallel.c:433
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
TupleTableSlot * funnel_slot
Definition: execnodes.h:2010
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:311
Plan * plan
Definition: execnodes.h:1046
int num_workers
Definition: plannodes.h:783
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:130
#define NULL
Definition: c.h:226
static TupleTableSlot * gather_getnext(GatherState *gatherstate)
Definition: nodeGather.c:244
int nextreader
Definition: execnodes.h:2007
void * palloc(Size size)
Definition: mcxt.c:891
int i
TupleQueueReader * CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
Definition: tqueue.c:633
shm_mq_handle ** tqueue
Definition: execParallel.h:30
#define ResetExprContext(econtext)
Definition: executor.h:332
GatherState* ExecInitGather ( Gather * node,
EState * estate,
int  eflags 
)

Definition at line 56 of file nodeGather.c.

References Assert, ExecAssignExprContext(), ExecAssignProjectionInfo(), ExecAssignResultTypeFromTL(), ExecContextForcesOids(), ExecInitExpr(), ExecInitExtraTupleSlot(), ExecInitNode(), ExecInitResultTupleSlot(), ExecSetSlotDescriptor(), ExecTypeFromTL(), GatherState::funnel_slot, innerPlan, makeNode, GatherState::need_to_scan_locally, NULL, outerPlan, outerPlanState, Gather::plan, PlanState::plan, GatherState::ps, Plan::qual, PlanState::qual, Gather::single_copy, PlanState::state, Plan::targetlist, and PlanState::targetlist.

Referenced by ExecInitNode().

57 {
58  GatherState *gatherstate;
59  Plan *outerNode;
60  bool hasoid;
61  TupleDesc tupDesc;
62 
63  /* Gather node doesn't have innerPlan node. */
64  Assert(innerPlan(node) == NULL);
65 
66  /*
67  * create state structure
68  */
69  gatherstate = makeNode(GatherState);
70  gatherstate->ps.plan = (Plan *) node;
71  gatherstate->ps.state = estate;
72  gatherstate->need_to_scan_locally = !node->single_copy;
73 
74  /*
75  * Miscellaneous initialization
76  *
77  * create expression context for node
78  */
79  ExecAssignExprContext(estate, &gatherstate->ps);
80 
81  /*
82  * initialize child expressions
83  */
84  gatherstate->ps.targetlist = (List *)
85  ExecInitExpr((Expr *) node->plan.targetlist,
86  (PlanState *) gatherstate);
87  gatherstate->ps.qual = (List *)
88  ExecInitExpr((Expr *) node->plan.qual,
89  (PlanState *) gatherstate);
90 
91  /*
92  * tuple table initialization
93  */
94  gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
95  ExecInitResultTupleSlot(estate, &gatherstate->ps);
96 
97  /*
98  * now initialize outer plan
99  */
100  outerNode = outerPlan(node);
101  outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
102 
103  /*
104  * Initialize result tuple type and projection info.
105  */
106  ExecAssignResultTypeFromTL(&gatherstate->ps);
107  ExecAssignProjectionInfo(&gatherstate->ps, NULL);
108 
109  /*
110  * Initialize funnel slot to same tuple descriptor as outer plan.
111  */
112  if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
113  hasoid = false;
114  tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
115  ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
116 
117  return gatherstate;
118 }
List * qual
Definition: plannodes.h:130
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
List * qual
Definition: execnodes.h:1061
bool need_to_scan_locally
Definition: execnodes.h:2011
List * targetlist
Definition: execnodes.h:1060
EState * state
Definition: execnodes.h:1048
void ExecAssignResultTypeFromTL(PlanState *planstate)
Definition: execUtils.c:429
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execQual.c:4266
Plan plan
Definition: plannodes.h:782
bool single_copy
Definition: plannodes.h:784
void ExecInitResultTupleSlot(EState *estate, PlanState *planstate)
Definition: execTuples.c:832
#define outerPlanState(node)
Definition: execnodes.h:1089
#define innerPlan(node)
Definition: plannodes.h:158
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:656
PlanState ps
Definition: execnodes.h:2003
TupleDesc ExecTypeFromTL(List *targetList, bool hasoid)
Definition: execTuples.c:888
#define outerPlan(node)
Definition: plannodes.h:159
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
TupleTableSlot * funnel_slot
Definition: execnodes.h:2010
Plan * plan
Definition: execnodes.h:1046
#define makeNode(_type_)
Definition: nodes.h:556
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:670
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:407
List * targetlist
Definition: plannodes.h:129
bool ExecContextForcesOids(PlanState *planstate, bool *hasoids)
Definition: execMain.c:1404
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:137
Definition: pg_list.h:45
void ExecReScanGather ( GatherState * node)

Definition at line 428 of file nodeGather.c.

References ExecParallelReinitialize(), ExecReScan(), ExecShutdownGatherWorkers(), GatherState::initialized, PlanState::lefttree, GatherState::pei, and GatherState::ps.

Referenced by ExecReScan().

429 {
430  /*
431  * Re-initialize the parallel workers to perform rescan of relation. We
432  * want to gracefully shutdown all the workers so that they should be able
433  * to propagate any error or other information to master backend before
434  * dying. Parallel context will be reused for rescan.
435  */
436  ExecShutdownGatherWorkers(node);
437 
438  node->initialized = false;
439 
440  if (node->pei)
441  ExecParallelReinitialize(node->pei);
442 
443  ExecReScan(node->ps.lefttree);
444 }
void ExecReScan(PlanState *node)
Definition: execAmi.c:73
struct PlanState * lefttree
Definition: execnodes.h:1062
bool initialized
Definition: execnodes.h:2004
static void ExecShutdownGatherWorkers(GatherState *node)
Definition: nodeGather.c:376
struct ParallelExecutorInfo * pei
Definition: execnodes.h:2005
PlanState ps
Definition: execnodes.h:2003
void ExecParallelReinitialize(ParallelExecutorInfo *pei)
Definition: execParallel.c:343
void ExecShutdownGather ( GatherState * node)

Definition at line 404 of file nodeGather.c.

References ExecParallelCleanup(), ExecShutdownGatherWorkers(), NULL, and GatherState::pei.

Referenced by ExecEndGather(), and ExecShutdownNode().

405 {
406  ExecShutdownGatherWorkers(node);
407 
408  /* Now destroy the parallel context. */
409  if (node->pei != NULL)
410  {
411  ExecParallelCleanup(node->pei);
412  node->pei = NULL;
413  }
414 }
static void ExecShutdownGatherWorkers(GatherState *node)
Definition: nodeGather.c:376
struct ParallelExecutorInfo * pei
Definition: execnodes.h:2005
void ExecParallelCleanup(ParallelExecutorInfo *pei)
Definition: execParallel.c:621
#define NULL
Definition: c.h:226
static void ExecShutdownGatherWorkers ( GatherState * node)
static

Definition at line 376 of file nodeGather.c.

References DestroyTupleQueueReader(), ExecParallelFinish(), i, GatherState::nreaders, NULL, GatherState::pei, pfree(), and GatherState::reader.

Referenced by ExecGather(), ExecReScanGather(), ExecShutdownGather(), and gather_readnext().

377 {
378  /* Shut down tuple queue readers before shutting down workers. */
379  if (node->reader != NULL)
380  {
381  int i;
382 
383  for (i = 0; i < node->nreaders; ++i)
384  DestroyTupleQueueReader(node->reader[i]);
385 
386  pfree(node->reader);
387  node->reader = NULL;
388  }
389 
390  /* Now shut down the workers. */
391  if (node->pei != NULL)
392  ExecParallelFinish(node->pei);
393 }
void DestroyTupleQueueReader(TupleQueueReader *reader)
Definition: tqueue.c:651
void ExecParallelFinish(ParallelExecutorInfo *pei)
Definition: execParallel.c:592
struct TupleQueueReader ** reader
Definition: execnodes.h:2009
void pfree(void *pointer)
Definition: mcxt.c:992
struct ParallelExecutorInfo * pei
Definition: execnodes.h:2005
#define NULL
Definition: c.h:226
int i
static TupleTableSlot * gather_getnext ( GatherState * gatherstate)
static

Definition at line 244 of file nodeGather.c.

References ExprContext::ecxt_per_tuple_memory, ExecClearTuple(), ExecProcNode(), ExecStoreTuple(), GatherState::funnel_slot, gather_readnext(), HeapTupleIsValid, InvalidBuffer, MemoryContextSwitchTo(), GatherState::need_to_scan_locally, NULL, outerPlan, outerPlanState, GatherState::ps, PlanState::ps_ExprContext, GatherState::reader, and TupIsNull.

Referenced by ExecGather().

245 {
246  PlanState *outerPlan = outerPlanState(gatherstate);
247  TupleTableSlot *outerTupleSlot;
248  TupleTableSlot *fslot = gatherstate->funnel_slot;
249  MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
250  HeapTuple tup;
251 
252  while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
253  {
254  if (gatherstate->reader != NULL)
255  {
256  MemoryContext oldContext;
257 
258  /* Run TupleQueueReaders in per-tuple context */
259  oldContext = MemoryContextSwitchTo(tupleContext);
260  tup = gather_readnext(gatherstate);
261  MemoryContextSwitchTo(oldContext);
262 
263  if (HeapTupleIsValid(tup))
264  {
265  ExecStoreTuple(tup, /* tuple to store */
266  fslot, /* slot in which to store the tuple */
267  InvalidBuffer, /* buffer associated with this
268  * tuple */
269  false); /* slot should not pfree tuple */
270  return fslot;
271  }
272  }
273 
274  if (gatherstate->need_to_scan_locally)
275  {
276  outerTupleSlot = ExecProcNode(outerPlan);
277 
278  if (!TupIsNull(outerTupleSlot))
279  return outerTupleSlot;
280 
281  gatherstate->need_to_scan_locally = false;
282  }
283  }
284 
285  return ExecClearTuple(fslot);
286 }
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
TupleTableSlot * ExecProcNode(PlanState *node)
Definition: execProcnode.c:380
ExprContext * ps_ExprContext
Definition: execnodes.h:1077
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:134
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define InvalidBuffer
Definition: buf.h:25
bool need_to_scan_locally
Definition: execnodes.h:2011
struct TupleQueueReader ** reader
Definition: execnodes.h:2009
#define outerPlanState(node)
Definition: execnodes.h:1089
#define TupIsNull(slot)
Definition: tuptable.h:138
PlanState ps
Definition: execnodes.h:2003
static HeapTuple gather_readnext(GatherState *gatherstate)
Definition: nodeGather.c:292
#define outerPlan(node)
Definition: plannodes.h:159
TupleTableSlot * funnel_slot
Definition: execnodes.h:2010
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:226
static HeapTuple gather_readnext ( GatherState * gatherstate)
static

Definition at line 292 of file nodeGather.c.

References Assert, CHECK_FOR_INTERRUPTS, DestroyTupleQueueReader(), ExecShutdownGatherWorkers(), memmove, MyLatch, GatherState::need_to_scan_locally, GatherState::nextreader, GatherState::nreaders, NULL, GatherState::reader, ResetLatch(), TupleQueueReaderNext(), WAIT_EVENT_EXECUTE_GATHER, WaitLatch(), and WL_LATCH_SET.

Referenced by gather_getnext().

293 {
294  int nvisited = 0;
295 
296  for (;;)
297  {
298  TupleQueueReader *reader;
299  HeapTuple tup;
300  bool readerdone;
301 
302  /* Check for async events, particularly messages from workers. */
303  CHECK_FOR_INTERRUPTS();
304 
305  /* Attempt to read a tuple, but don't block if none is available. */
306  Assert(gatherstate->nextreader < gatherstate->nreaders);
307  reader = gatherstate->reader[gatherstate->nextreader];
308  tup = TupleQueueReaderNext(reader, true, &readerdone);
309 
310  /*
311  * If this reader is done, remove it. If all readers are done, clean
312  * up remaining worker state.
313  */
314  if (readerdone)
315  {
316  Assert(!tup);
317  DestroyTupleQueueReader(reader);
318  --gatherstate->nreaders;
319  if (gatherstate->nreaders == 0)
320  {
321  ExecShutdownGatherWorkers(gatherstate);
322  return NULL;
323  }
324  memmove(&gatherstate->reader[gatherstate->nextreader],
325  &gatherstate->reader[gatherstate->nextreader + 1],
326  sizeof(TupleQueueReader *)
327  * (gatherstate->nreaders - gatherstate->nextreader));
328  if (gatherstate->nextreader >= gatherstate->nreaders)
329  gatherstate->nextreader = 0;
330  continue;
331  }
332 
333  /* If we got a tuple, return it. */
334  if (tup)
335  return tup;
336 
337  /*
338  * Advance nextreader pointer in round-robin fashion. Note that we
339  * only reach this code if we weren't able to get a tuple from the
340  * current worker. We used to advance the nextreader pointer after
341  * every tuple, but it turns out to be much more efficient to keep
342  * reading from the same queue until that would require blocking.
343  */
344  gatherstate->nextreader++;
345  if (gatherstate->nextreader >= gatherstate->nreaders)
346  gatherstate->nextreader = 0;
347 
348  /* Have we visited every (surviving) TupleQueueReader? */
349  nvisited++;
350  if (nvisited >= gatherstate->nreaders)
351  {
352  /*
353  * If (still) running plan locally, return NULL so caller can
354  * generate another tuple from the local copy of the plan.
355  */
356  if (gatherstate->need_to_scan_locally)
357  return NULL;
358 
359  /* Nothing to do except wait for developments. */
360  WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
361  ResetLatch(MyLatch);
362  nvisited = 0;
363  }
364  }
365 }
void DestroyTupleQueueReader(TupleQueueReader *reader)
Definition: tqueue.c:651
bool need_to_scan_locally
Definition: execnodes.h:2011
HeapTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Definition: tqueue.c:679
struct TupleQueueReader ** reader
Definition: execnodes.h:2009
void ResetLatch(volatile Latch *latch)
Definition: latch.c:462
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:301
static void ExecShutdownGatherWorkers(GatherState *node)
Definition: nodeGather.c:376
#define memmove(d, s, c)
Definition: c.h:1057
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:670
int nextreader
Definition: execnodes.h:2007
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define WL_LATCH_SET
Definition: latch.h:124