nodeGather.c
/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *    Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In this case, the Gather node will
 * run the plan in one worker and will not execute the plan itself; it
 * simply returns whatever tuples the worker returns.  If a worker cannot
 * be obtained, then it will run the plan itself and return the results.
 * Therefore, a plan used with a single-copy Gather node need not be
 * parallel-aware.
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */
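The contract described in this header is worth making concrete: each call into the node yields one tuple, taken either from a worker's tuple queue or from the leader's own execution of the plan, until both sources are exhausted. A minimal sketch of a caller draining the node, assuming the usual executor headers (my illustration, not part of nodeGather.c):

    #include "postgres.h"

    #include "executor/executor.h"
    #include "executor/nodeGather.h"

    /* Hypothetical helper for illustration only; not part of PostgreSQL. */
    static void
    drain_gather(GatherState *gatherstate)
    {
        TupleTableSlot *slot;

        for (;;)
        {
            /* One call returns one merged tuple, from a worker or the leader. */
            slot = ExecGather(gatherstate);
            if (TupIsNull(slot))    /* both workers and the local scan are done */
                break;

            /* ... hand the slot to the node above here ... */
        }
    }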

#include "postgres.h"

#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"


static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);


/* ----------------------------------------------------------------
 *        ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
    GatherState *gatherstate;
    Plan       *outerNode;
    bool        hasoid;
    TupleDesc   tupDesc;

    /* Gather node doesn't have innerPlan node. */
    Assert(innerPlan(node) == NULL);

    /*
     * create state structure
     */
    gatherstate = makeNode(GatherState);
    gatherstate->ps.plan = (Plan *) node;
    gatherstate->ps.state = estate;
    gatherstate->need_to_scan_locally = !node->single_copy;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &gatherstate->ps);

    /*
     * initialize child expressions
     */
    gatherstate->ps.qual =
        ExecInitQual(node->plan.qual, (PlanState *) gatherstate);

    /*
     * tuple table initialization
     */
    gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
    ExecInitResultTupleSlot(estate, &gatherstate->ps);

    /*
     * now initialize outer plan
     */
    outerNode = outerPlan(node);
    outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);

    /*
     * Initialize result tuple type and projection info.
     */
    ExecAssignResultTypeFromTL(&gatherstate->ps);
    ExecAssignProjectionInfo(&gatherstate->ps, NULL);

    /*
     * Initialize funnel slot to same tuple descriptor as outer plan.
     */
    if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
        hasoid = false;
    tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
    ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);

    return gatherstate;
}

/* ----------------------------------------------------------------
 *        ExecGather(node)
 *
 *        Scans the relation via multiple workers and returns
 *        the next qualifying tuple.
 * ----------------------------------------------------------------
 */
TupleTableSlot *
ExecGather(GatherState *node)
{
    TupleTableSlot *fslot = node->funnel_slot;
    int         i;
    TupleTableSlot *slot;
    ExprContext *econtext;

    /*
     * Initialize the parallel context and workers on first execution. We do
     * this on first execution rather than during node initialization, as it
     * needs to allocate a large dynamic segment, so it is better to do it
     * only if it is really needed.
     */
    if (!node->initialized)
    {
        EState     *estate = node->ps.state;
        Gather     *gather = (Gather *) node->ps.plan;

        /*
         * Sometimes we might have to run without parallelism; but if parallel
         * mode is active then we can try to fire up some workers.
         */
        if (gather->num_workers > 0 && IsInParallelMode())
        {
            ParallelContext *pcxt;

            /* Initialize the workers required to execute Gather node. */
            if (!node->pei)
                node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                 estate,
                                                 gather->num_workers);

            /*
             * Register backend workers. We might not get as many as we
             * requested, or indeed any at all.
             */
            pcxt = node->pei->pcxt;
            LaunchParallelWorkers(pcxt);
            node->nworkers_launched = pcxt->nworkers_launched;

            /* Set up tuple queue readers to read the results. */
            if (pcxt->nworkers_launched > 0)
            {
                node->nreaders = 0;
                node->nextreader = 0;
                node->reader =
                    palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *));

                for (i = 0; i < pcxt->nworkers_launched; ++i)
                {
                    shm_mq_set_handle(node->pei->tqueue[i],
                                      pcxt->worker[i].bgwhandle);
                    node->reader[node->nreaders++] =
                        CreateTupleQueueReader(node->pei->tqueue[i],
                                               fslot->tts_tupleDescriptor);
                }
            }
            else
            {
                /* No workers?  Then never mind. */
                ExecShutdownGatherWorkers(node);
            }
        }

        /* Run plan locally if no workers or not single-copy. */
        node->need_to_scan_locally = (node->reader == NULL)
            || !gather->single_copy;
        node->initialized = true;
    }

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.  This will also clear
     * any previous tuple returned by a TupleQueueReader; to make sure we
     * don't leave a dangling pointer around, clear the working slot first.
     */
    ExecClearTuple(fslot);
    econtext = node->ps.ps_ExprContext;
    ResetExprContext(econtext);

    /*
     * Get next tuple, either from one of our workers, or by running the plan
     * ourselves.
     */
    slot = gather_getnext(node);
    if (TupIsNull(slot))
        return NULL;

    /*
     * Form the result tuple using ExecProject(), and return it.
     */
    econtext->ecxt_outertuple = slot;
    return ExecProject(node->ps.ps_ProjInfo);
}
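The initialization block above must cope with being granted fewer workers than gather->num_workers requested, possibly none at all: the reader array is sized by nworkers_launched, and the leader decides whether to run the plan itself via (node->reader == NULL) || !gather->single_copy. A standalone sketch of just that decision, in plain C with no PostgreSQL dependencies (illustrative only; the helper name is mine):

    #include <stdbool.h>
    #include <stdio.h>

    /* Mirrors: need_to_scan_locally = (node->reader == NULL) || !gather->single_copy */
    static bool
    leader_runs_plan(int nworkers_launched, bool single_copy)
    {
        bool        have_readers = (nworkers_launched > 0);

        return !have_readers || !single_copy;
    }

    int
    main(void)
    {
        printf("%d\n", leader_runs_plan(0, false)); /* 1: no workers launched, leader scans */
        printf("%d\n", leader_runs_plan(2, false)); /* 1: leader contributes alongside workers */
        printf("%d\n", leader_runs_plan(1, true));  /* 0: single-copy worker does all the work */
        return 0;
    }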

/* ----------------------------------------------------------------
 *        ExecEndGather
 *
 *        frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
    ExecEndNode(outerPlanState(node));  /* let children clean up first */
    ExecShutdownGather(node);
    ExecFreeExprContext(&node->ps);
    ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
    PlanState  *outerPlan = outerPlanState(gatherstate);
    TupleTableSlot *outerTupleSlot;
    TupleTableSlot *fslot = gatherstate->funnel_slot;
    MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
    HeapTuple   tup;

    while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
    {
        if (gatherstate->reader != NULL)
        {
            MemoryContext oldContext;

            /* Run TupleQueueReaders in per-tuple context */
            oldContext = MemoryContextSwitchTo(tupleContext);
            tup = gather_readnext(gatherstate);
            MemoryContextSwitchTo(oldContext);

            if (HeapTupleIsValid(tup))
            {
                ExecStoreTuple(tup, /* tuple to store */
                               fslot,   /* slot in which to store the tuple */
                               InvalidBuffer,   /* buffer associated with this
                                                 * tuple */
                               false);  /* slot should not pfree tuple */
                return fslot;
            }
        }

        if (gatherstate->need_to_scan_locally)
        {
            outerTupleSlot = ExecProcNode(outerPlan);

            if (!TupIsNull(outerTupleSlot))
                return outerTupleSlot;

            gatherstate->need_to_scan_locally = false;
        }
    }

    return ExecClearTuple(fslot);
}

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static HeapTuple
gather_readnext(GatherState *gatherstate)
{
    int         nvisited = 0;

    for (;;)
    {
        TupleQueueReader *reader;
        HeapTuple   tup;
        bool        readerdone;

        /* Check for async events, particularly messages from workers. */
        CHECK_FOR_INTERRUPTS();

        /* Attempt to read a tuple, but don't block if none is available. */
        Assert(gatherstate->nextreader < gatherstate->nreaders);
        reader = gatherstate->reader[gatherstate->nextreader];
        tup = TupleQueueReaderNext(reader, true, &readerdone);

        /*
         * If this reader is done, remove it.  If all readers are done, clean
         * up remaining worker state.
         */
        if (readerdone)
        {
            Assert(!tup);
            DestroyTupleQueueReader(reader);
            --gatherstate->nreaders;
            if (gatherstate->nreaders == 0)
            {
                ExecShutdownGatherWorkers(gatherstate);
                return NULL;
            }
            memmove(&gatherstate->reader[gatherstate->nextreader],
                    &gatherstate->reader[gatherstate->nextreader + 1],
                    sizeof(TupleQueueReader *)
                    * (gatherstate->nreaders - gatherstate->nextreader));
            if (gatherstate->nextreader >= gatherstate->nreaders)
                gatherstate->nextreader = 0;
            continue;
        }

        /* If we got a tuple, return it. */
        if (tup)
            return tup;

        /*
         * Advance nextreader pointer in round-robin fashion.  Note that we
         * only reach this code if we weren't able to get a tuple from the
         * current worker.  We used to advance the nextreader pointer after
         * every tuple, but it turns out to be much more efficient to keep
         * reading from the same queue until that would require blocking.
         */
        gatherstate->nextreader++;
        if (gatherstate->nextreader >= gatherstate->nreaders)
            gatherstate->nextreader = 0;

        /* Have we visited every (surviving) TupleQueueReader? */
        nvisited++;
        if (nvisited >= gatherstate->nreaders)
        {
            /*
             * If (still) running plan locally, return NULL so caller can
             * generate another tuple from the local copy of the plan.
             */
            if (gatherstate->need_to_scan_locally)
                return NULL;

            /* Nothing to do except wait for developments. */
            WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
            ResetLatch(MyLatch);
            nvisited = 0;
        }
    }
}
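The bookkeeping in gather_readnext() is easy to lose in the detail: an exhausted reader is removed by sliding the tail of the array down over it, nextreader wraps to zero if it now points past the end, and nvisited counts fruitless polls so the leader knows when to fall back to the local scan or sleep on its latch. A standalone sketch of the array manipulation alone (illustrative, not PostgreSQL code):

    #include <stdio.h>
    #include <string.h>

    /*
     * Remove readers[*next] by sliding the tail down, as the memmove above
     * does.  Returns the new count; *next wraps to 0 if it fell off the end.
     */
    static int
    remove_reader(int *readers, int nreaders, int *next)
    {
        memmove(&readers[*next], &readers[*next + 1],
                sizeof(int) * (nreaders - *next - 1));
        nreaders--;
        if (*next >= nreaders)
            *next = 0;
        return nreaders;
    }

    int
    main(void)
    {
        int         readers[] = {10, 11, 12, 13};
        int         n = 4;
        int         next = 3;

        n = remove_reader(readers, n, &next);   /* drops 13; next wraps to 0 */
        printf("n=%d next=%d first=%d\n", n, next, readers[next]);  /* n=3 next=0 first=10 */
        return 0;
    }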

/* ----------------------------------------------------------------
 *        ExecShutdownGatherWorkers
 *
 *        Destroy the parallel workers.  Collect all the stats after
 *        workers are stopped, else some work done by workers won't be
 *        accounted.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
    /* Shut down tuple queue readers before shutting down workers. */
    if (node->reader != NULL)
    {
        int         i;

        for (i = 0; i < node->nreaders; ++i)
            DestroyTupleQueueReader(node->reader[i]);

        pfree(node->reader);
        node->reader = NULL;
    }

    /* Now shut down the workers. */
    if (node->pei != NULL)
        ExecParallelFinish(node->pei);
}

/* ----------------------------------------------------------------
 *        ExecShutdownGather
 *
 *        Destroy the setup for parallel workers including parallel context.
 *        Collect all the stats after workers are stopped, else some work
 *        done by workers won't be accounted.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
    ExecShutdownGatherWorkers(node);

    /* Now destroy the parallel context. */
    if (node->pei != NULL)
    {
        ExecParallelCleanup(node->pei);
        node->pei = NULL;
    }
}

/* ----------------------------------------------------------------
 *        Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *        ExecReScanGather
 *
 *        Re-initializes the workers and rescans the relation via them.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
    /*
     * Re-initialize the parallel workers to perform a rescan of the
     * relation.  We want to shut down all the workers gracefully so that
     * they are able to propagate any error or other information to the
     * master backend before dying.  The parallel context will be reused
     * for the rescan.
     */
    ExecShutdownGatherWorkers(node);

    node->initialized = false;

    if (node->pei)
        ExecParallelReinitialize(node->pei);

    ExecReScan(node->ps.lefttree);
}
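When the node above a Gather needs to restart the scan, it calls ExecReScan(), which dispatches here: the old workers are shut down gracefully, the parallel context is kept, and because initialized is reset the next ExecGather() call launches a fresh set of workers. A sketch of that cycle from the caller's side, assuming the usual executor headers (my illustration, not PostgreSQL code):

    #include "postgres.h"

    #include "executor/executor.h"
    #include "executor/nodeGather.h"

    /* Hypothetical helper for illustration only; not part of PostgreSQL. */
    static void
    restart_and_drain(GatherState *gatherstate)
    {
        TupleTableSlot *slot;

        /* Dispatches to ExecReScanGather(): workers stop, parallel context stays. */
        ExecReScan(&gatherstate->ps);

        for (;;)
        {
            /* The first call after a rescan launches a fresh set of workers. */
            slot = ExecGather(gatherstate);
            if (TupIsNull(slot))
                break;

            /* ... consume the tuple ... */
        }
    }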