PostgreSQL Source Code  git master
nodeGather.c
/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *	  Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In this case, the Gather node will
 * run the plan in one worker and will not execute the plan itself; it
 * simply returns whatever tuples the worker sends back.  If a worker
 * cannot be obtained, then it will run the plan itself and return the
 * results.  Therefore, a plan used with a single-copy Gather node need
 * not be parallel-aware.
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"


static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);


/* ----------------------------------------------------------------
 *		ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
	GatherState *gatherstate;
	Plan	   *outerNode;
	bool		hasoid;
	TupleDesc	tupDesc;

	/* Gather node doesn't have innerPlan node. */
	Assert(innerPlan(node) == NULL);

	/*
	 * create state structure
	 */
	gatherstate = makeNode(GatherState);
	gatherstate->ps.plan = (Plan *) node;
	gatherstate->ps.state = estate;
	gatherstate->ps.ExecProcNode = ExecGather;

	gatherstate->initialized = false;
	gatherstate->need_to_scan_locally = !node->single_copy;
	gatherstate->tuples_needed = -1;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &gatherstate->ps);

	/*
	 * Gather doesn't support checking a qual (it's always more efficient to
	 * do it in the child node).
	 */
	Assert(!node->plan.qual);

	/*
	 * tuple table initialization
	 */
	gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
	ExecInitResultTupleSlot(estate, &gatherstate->ps);

	/*
	 * now initialize outer plan
	 */
	outerNode = outerPlan(node);
	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);

	/*
	 * Initialize result tuple type and projection info.
	 */
	ExecAssignResultTypeFromTL(&gatherstate->ps);
	ExecAssignProjectionInfo(&gatherstate->ps, NULL);

	/*
	 * Initialize funnel slot to same tuple descriptor as outer plan.
	 */
	if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
		hasoid = false;
	tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
	ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);

	return gatherstate;
}

/* ----------------------------------------------------------------
 *		ExecGather(node)
 *
 *		Scans the relation via multiple workers and returns
 *		the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
	GatherState *node = castNode(GatherState, pstate);
	TupleTableSlot *fslot = node->funnel_slot;
	TupleTableSlot *slot;
	ExprContext *econtext;

	CHECK_FOR_INTERRUPTS();

	/*
	 * Initialize the parallel context and workers on first execution. We do
	 * this on first execution rather than during node initialization, as it
	 * needs to allocate a large dynamic segment, so it is better to do it
	 * only if it is really needed.
	 */
	if (!node->initialized)
	{
		EState	   *estate = node->ps.state;
		Gather	   *gather = (Gather *) node->ps.plan;

		/*
		 * Sometimes we might have to run without parallelism; but if parallel
		 * mode is active then we can try to fire up some workers.
		 */
		if (gather->num_workers > 0 && IsInParallelMode())
		{
			ParallelContext *pcxt;

			/* Initialize, or re-initialize, shared state needed by workers. */
			if (!node->pei)
				node->pei = ExecInitParallelPlan(node->ps.lefttree,
												 estate,
												 gather->num_workers,
												 node->tuples_needed);
			else
				ExecParallelReinitialize(node->ps.lefttree,
										 node->pei);

			/*
			 * Register backend workers. We might not get as many as we
			 * requested, or indeed any at all.
			 */
			pcxt = node->pei->pcxt;
			LaunchParallelWorkers(pcxt);
			/* We save # workers launched for the benefit of EXPLAIN */
			node->nworkers_launched = pcxt->nworkers_launched;

			/* Set up tuple queue readers to read the results. */
			if (pcxt->nworkers_launched > 0)
			{
				ExecParallelCreateReaders(node->pei);
				/* Make a working array showing the active readers */
				node->nreaders = pcxt->nworkers_launched;
				node->reader = (TupleQueueReader **)
					palloc(node->nreaders * sizeof(TupleQueueReader *));
				memcpy(node->reader, node->pei->reader,
					   node->nreaders * sizeof(TupleQueueReader *));
			}
			else
			{
				/* No workers?  Then never mind. */
				node->nreaders = 0;
				node->reader = NULL;
			}
			node->nextreader = 0;
		}

		/* Run plan locally if no workers or not single-copy. */
		node->need_to_scan_locally = (node->nreaders == 0)
			|| !gather->single_copy;
		node->initialized = true;
	}

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle. This will also clear
	 * any previous tuple returned by a TupleQueueReader; to make sure we
	 * don't leave a dangling pointer around, clear the working slot first.
	 */
	ExecClearTuple(fslot);
	econtext = node->ps.ps_ExprContext;
	ResetExprContext(econtext);

	/*
	 * Get next tuple, either from one of our workers, or by running the plan
	 * ourselves.
	 */
	slot = gather_getnext(node);
	if (TupIsNull(slot))
		return NULL;

	/*
	 * Form the result tuple using ExecProject(), and return it.
	 */
	econtext->ecxt_outertuple = slot;
	return ExecProject(node->ps.ps_ProjInfo);
}
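
/*
 * Illustrative aside (not part of the PostgreSQL sources): ExecGather above
 * defers building the parallel context until the first fetch, rather than
 * doing it in ExecInitGather, because the large dynamic shared memory
 * segment is expensive and may never be needed.  A minimal standalone
 * sketch of that "initialize on first fetch" pattern follows; LazyCounter
 * and lazy_counter_next() are hypothetical names invented for this example.
 */
#include <stdbool.h>

typedef struct LazyCounter
{
	bool		initialized;	/* has the expensive state been built yet? */
	int			values[4];		/* filled in lazily on first fetch */
	int			nvalues;
	int			pos;
} LazyCounter;

/* Return the next value, building the expensive state only when needed. */
static int
lazy_counter_next(LazyCounter *state)
{
	if (!state->initialized)
	{
		/* Stand-in for expensive setup; runs only if a value is wanted. */
		state->nvalues = 4;
		for (int i = 0; i < state->nvalues; i++)
			state->values[i] = i * 10;
		state->pos = 0;
		state->initialized = true;
	}
	return (state->pos < state->nvalues) ? state->values[state->pos++] : -1;
}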

/* ----------------------------------------------------------------
 *		ExecEndGather
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
	ExecEndNode(outerPlanState(node));	/* let children clean up first */
	ExecShutdownGather(node);
	ExecFreeExprContext(&node->ps);
	ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/*
 * Read the next tuple. We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
	PlanState  *outerPlan = outerPlanState(gatherstate);
	TupleTableSlot *outerTupleSlot;
	TupleTableSlot *fslot = gatherstate->funnel_slot;
	MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
	HeapTuple	tup;

	while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
	{
		CHECK_FOR_INTERRUPTS();

		if (gatherstate->nreaders > 0)
		{
			MemoryContext oldContext;

			/* Run TupleQueueReaders in per-tuple context */
			oldContext = MemoryContextSwitchTo(tupleContext);
			tup = gather_readnext(gatherstate);
			MemoryContextSwitchTo(oldContext);

			if (HeapTupleIsValid(tup))
			{
				ExecStoreTuple(tup,		/* tuple to store */
							   fslot,	/* slot in which to store the tuple */
							   InvalidBuffer,	/* buffer associated with this
												 * tuple */
							   false);	/* slot should not pfree tuple */
				return fslot;
			}
		}

		if (gatherstate->need_to_scan_locally)
		{
			outerTupleSlot = ExecProcNode(outerPlan);

			if (!TupIsNull(outerTupleSlot))
				return outerTupleSlot;

			gatherstate->need_to_scan_locally = false;
		}
	}

	return ExecClearTuple(fslot);
}
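
/*
 * Illustrative aside (not part of the PostgreSQL sources): the control flow
 * of gather_getnext -- keep looping while either source might still produce,
 * prefer rows from the workers, and drop each source from consideration once
 * it is exhausted -- can be sketched in a simplified, standalone form as
 * below.  MergedSource and merged_source_next() are hypothetical names, and
 * "0" stands in for end-of-stream.
 */
#include <stdbool.h>

typedef struct MergedSource
{
	bool		remote_active;			/* any worker rows still expected? */
	bool		need_to_scan_locally;	/* may the leader produce rows too? */
	const int  *remote;					/* pending worker rows, 0-terminated */
	const int  *local;					/* pending leader rows, 0-terminated */
} MergedSource;

/* Return the next row from either source, or 0 once both are exhausted. */
static int
merged_source_next(MergedSource *src)
{
	while (src->remote_active || src->need_to_scan_locally)
	{
		if (src->remote_active)
		{
			if (*src->remote != 0)
				return *(src->remote++);	/* prefer rows from workers */
			src->remote_active = false;		/* worker side is done */
		}
		if (src->need_to_scan_locally)
		{
			if (*src->local != 0)
				return *(src->local++);		/* fall back to local rows */
			src->need_to_scan_locally = false;	/* local side is done */
		}
	}
	return 0;
}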

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static HeapTuple
gather_readnext(GatherState *gatherstate)
{
	int			nvisited = 0;

	for (;;)
	{
		TupleQueueReader *reader;
		HeapTuple	tup;
		bool		readerdone;

		/* Check for async events, particularly messages from workers. */
		CHECK_FOR_INTERRUPTS();

		/* Attempt to read a tuple, but don't block if none is available. */
		Assert(gatherstate->nextreader < gatherstate->nreaders);
		reader = gatherstate->reader[gatherstate->nextreader];
		tup = TupleQueueReaderNext(reader, true, &readerdone);

		/*
		 * If this reader is done, remove it from our working array of active
		 * readers. If all readers are done, we're outta here.
		 */
		if (readerdone)
		{
			Assert(!tup);
			--gatherstate->nreaders;
			if (gatherstate->nreaders == 0)
				return NULL;
			memmove(&gatherstate->reader[gatherstate->nextreader],
					&gatherstate->reader[gatherstate->nextreader + 1],
					sizeof(TupleQueueReader *)
					* (gatherstate->nreaders - gatherstate->nextreader));
			if (gatherstate->nextreader >= gatherstate->nreaders)
				gatherstate->nextreader = 0;
			continue;
		}

		/* If we got a tuple, return it. */
		if (tup)
			return tup;

		/*
		 * Advance nextreader pointer in round-robin fashion. Note that we
		 * only reach this code if we weren't able to get a tuple from the
		 * current worker. We used to advance the nextreader pointer after
		 * every tuple, but it turns out to be much more efficient to keep
		 * reading from the same queue until that would require blocking.
		 */
		gatherstate->nextreader++;
		if (gatherstate->nextreader >= gatherstate->nreaders)
			gatherstate->nextreader = 0;

		/* Have we visited every (surviving) TupleQueueReader? */
		nvisited++;
		if (nvisited >= gatherstate->nreaders)
		{
			/*
			 * If (still) running plan locally, return NULL so caller can
			 * generate another tuple from the local copy of the plan.
			 */
			if (gatherstate->need_to_scan_locally)
				return NULL;

			/* Nothing to do except wait for developments. */
			WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
			ResetLatch(MyLatch);
			nvisited = 0;
		}
	}
}
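
/*
 * Illustrative aside (not part of the PostgreSQL sources): when a queue
 * reports that it is done, gather_readnext removes its reader from the
 * working array with memmove() and keeps polling the survivors round-robin.
 * A minimal standalone sketch of that compaction step follows; Reader and
 * drop_reader() are hypothetical names invented for this example.
 */
#include <string.h>

typedef struct ReaderAder_unused, Reader;	/* opaque; only pointers stored */

/*
 * Remove readers[idx] from an array holding nreaders entries, preserving
 * the order of the survivors, and return the new count.  The caller should
 * wrap its scan position back to 0 if it now equals the returned count,
 * just as gather_readnext does with nextreader.
 */
static int
drop_reader(Reader **readers, int nreaders, int idx)
{
	nreaders--;					/* one fewer active reader */
	memmove(&readers[idx], &readers[idx + 1],
			sizeof(Reader *) * (nreaders - idx));
	return nreaders;
}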

/* ----------------------------------------------------------------
 *		ExecShutdownGatherWorkers
 *
 *		Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
	if (node->pei != NULL)
		ExecParallelFinish(node->pei);

	/* Flush local copy of reader array */
	if (node->reader)
		pfree(node->reader);
	node->reader = NULL;
}

/* ----------------------------------------------------------------
 *		ExecShutdownGather
 *
 *		Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
	ExecShutdownGatherWorkers(node);

	/* Now destroy the parallel context. */
	if (node->pei != NULL)
	{
		ExecParallelCleanup(node->pei);
		node->pei = NULL;
	}
}

/* ----------------------------------------------------------------
 *		Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecReScanGather
 *
 *		Prepare to re-scan the result of a Gather.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
	Gather	   *gather = (Gather *) node->ps.plan;
	PlanState  *outerPlan = outerPlanState(node);

	/* Make sure any existing workers are gracefully shut down */
	ExecShutdownGatherWorkers(node);

	/* Mark node so that shared state will be rebuilt at next call */
	node->initialized = false;

	/*
	 * Set child node's chgParam to tell it that the next scan might deliver a
	 * different set of rows within the leader process. (The overall rowset
	 * shouldn't change, but the leader process's subset might; hence nodes
	 * between here and the parallel table scan node mustn't optimize on the
	 * assumption of an unchanging rowset.)
	 */
	if (gather->rescan_param >= 0)
		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
											 gather->rescan_param);

	/*
	 * If chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode. Note: because this does nothing if we have a
	 * rescan_param, it's currently guaranteed that parallel-aware child nodes
	 * will not see a ReScan call until after they get a ReInitializeDSM call.
	 * That ordering might not be something to rely on, though. A good rule
	 * of thumb is that ReInitializeDSM should reset only shared state, ReScan
	 * should reset only local state, and anything that depends on both of
	 * those steps being finished must wait until the first ExecProcNode call.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}