/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *	  Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are unavailable
 * or have not started up yet.  It then merges the results it produces
 * itself with the results from the workers into a single output stream.
 * Therefore, it will normally be used with a plan where running multiple
 * copies of the same plan does not produce duplicate output, such as a
 * parallel-aware SeqScan.  (An illustrative plan shape appears below.)
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In that case, the Gather node runs
 * the plan in one worker and does not execute the plan itself; it simply
 * returns whatever tuples the worker produces.  If a worker cannot be
 * obtained, the Gather node runs the plan itself and returns the results.
 * Therefore, a plan used with a single-copy Gather node need not be
 * parallel-aware.
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */
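
/*
 * Illustrative plan shape (a hypothetical example, not generated from any
 * particular query): a Gather node typically appears in EXPLAIN output as
 *
 *		Gather
 *		  Workers Planned: 2
 *		  ->  Parallel Seq Scan on some_table
 *
 * where each launched worker (and, unless single_copy is set, the leader as
 * well) runs its own copy of the parallel-aware scan, and the Gather node
 * merges the resulting tuple streams.
 */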

#include "postgres.h"

#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/planmain.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"


static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);

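/*
 * These routines follow the usual executor life cycle: ExecInitGather is
 * called once at executor startup to build the GatherState, ExecGather is
 * invoked repeatedly (via ExecProcNode) to return tuples, ExecReScanGather
 * prepares the node for a rescan, and ExecShutdownGather / ExecEndGather
 * release the parallel machinery and the node's local state at shutdown.
 */
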
/* ----------------------------------------------------------------
 *		ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
	GatherState *gatherstate;
	Plan	   *outerNode;
	TupleDesc	tupDesc;

	/* Gather node doesn't have innerPlan node. */
	Assert(innerPlan(node) == NULL);

	/*
	 * create state structure
	 */
	gatherstate = makeNode(GatherState);
	gatherstate->ps.plan = (Plan *) node;
	gatherstate->ps.state = estate;
	gatherstate->ps.ExecProcNode = ExecGather;

	gatherstate->initialized = false;
	gatherstate->need_to_scan_locally =
		!node->single_copy && parallel_leader_participation;
	gatherstate->tuples_needed = -1;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &gatherstate->ps);

	/*
	 * now initialize outer plan
	 */
	outerNode = outerPlan(node);
	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
	tupDesc = ExecGetResultType(outerPlanState(gatherstate));

	/*
	 * Initialize result slot, type and projection.
	 */
	ExecInitResultTupleSlotTL(estate, &gatherstate->ps);
	ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);

	/*
	 * Initialize funnel slot to same tuple descriptor as outer plan.
	 */
	gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc);

	/*
	 * Gather doesn't support checking a qual (it's always more efficient to
	 * do it in the child node).
	 */
	Assert(!node->plan.qual);

	return gatherstate;
}

/* ----------------------------------------------------------------
 *		ExecGather(node)
 *
 *		Scans the relation via multiple workers and returns
 *		the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
	GatherState *node = castNode(GatherState, pstate);
	TupleTableSlot *slot;
	ExprContext *econtext;

	CHECK_FOR_INTERRUPTS();

	/*
	 * Initialize the parallel context and workers on first execution. We do
	 * this on first execution rather than during node initialization, as it
	 * needs to allocate a large dynamic segment, so it is better to do it
	 * only if it is really needed.
	 */
	if (!node->initialized)
	{
		EState	   *estate = node->ps.state;
		Gather	   *gather = (Gather *) node->ps.plan;

		/*
		 * Sometimes we might have to run without parallelism; but if parallel
		 * mode is active then we can try to fire up some workers.
		 */
		if (gather->num_workers > 0 && estate->es_use_parallel_mode)
		{
			ParallelContext *pcxt;

			/* Initialize, or re-initialize, shared state needed by workers. */
			if (!node->pei)
				node->pei = ExecInitParallelPlan(node->ps.lefttree,
												 estate,
												 gather->initParam,
												 gather->num_workers,
												 node->tuples_needed);
			else
				ExecParallelReinitialize(node->ps.lefttree,
										 node->pei,
										 gather->initParam);

			/*
			 * Register backend workers. We might not get as many as we
			 * requested, or indeed any at all.
			 */
			pcxt = node->pei->pcxt;
			LaunchParallelWorkers(pcxt);
			/* We save # workers launched for the benefit of EXPLAIN */
			node->nworkers_launched = pcxt->nworkers_launched;

			/* Set up tuple queue readers to read the results. */
			if (pcxt->nworkers_launched > 0)
			{
				ExecParallelCreateReaders(node->pei);
				/* Make a working array showing the active readers */
				node->nreaders = pcxt->nworkers_launched;
				node->reader = (TupleQueueReader **)
					palloc(node->nreaders * sizeof(TupleQueueReader *));
				memcpy(node->reader, node->pei->reader,
					   node->nreaders * sizeof(TupleQueueReader *));
			}
			else
			{
				/* No workers?  Then never mind. */
				node->nreaders = 0;
				node->reader = NULL;
			}
			node->nextreader = 0;
		}

		/* Run plan locally if no workers or enabled and not single-copy. */
		node->need_to_scan_locally = (node->nreaders == 0)
			|| (!gather->single_copy && parallel_leader_participation);
		node->initialized = true;
	}

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	econtext = node->ps.ps_ExprContext;
	ResetExprContext(econtext);

	/*
	 * Get next tuple, either from one of our workers, or by running the plan
	 * ourselves.
	 */
	slot = gather_getnext(node);
	if (TupIsNull(slot))
		return NULL;

	/* If no projection is required, we're done. */
	if (node->ps.ps_ProjInfo == NULL)
		return slot;

	/*
	 * Form the result tuple using ExecProject(), and return it.
	 */
	econtext->ecxt_outertuple = slot;
	return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *		ExecEndGather
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
	ExecEndNode(outerPlanState(node));	/* let children clean up first */
	ExecShutdownGather(node);
	ExecFreeExprContext(&node->ps);
	ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
	PlanState  *outerPlan = outerPlanState(gatherstate);
	TupleTableSlot *outerTupleSlot;
	TupleTableSlot *fslot = gatherstate->funnel_slot;
	HeapTuple	tup;

	while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
	{
		CHECK_FOR_INTERRUPTS();

		if (gatherstate->nreaders > 0)
		{
			tup = gather_readnext(gatherstate);

			if (HeapTupleIsValid(tup))
			{
				ExecStoreTuple(tup, /* tuple to store */
							   fslot,	/* slot in which to store the tuple */
							   InvalidBuffer,	/* buffer associated with this
												 * tuple */
							   true);	/* pfree tuple when done with it */
				return fslot;
			}
		}

		if (gatherstate->need_to_scan_locally)
		{
			EState	   *estate = gatherstate->ps.state;

			/* Install our DSA area while executing the plan. */
			estate->es_query_dsa =
				gatherstate->pei ? gatherstate->pei->area : NULL;
			outerTupleSlot = ExecProcNode(outerPlan);
			estate->es_query_dsa = NULL;

			if (!TupIsNull(outerTupleSlot))
				return outerTupleSlot;

			gatherstate->need_to_scan_locally = false;
		}
	}

	return ExecClearTuple(fslot);
}

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static HeapTuple
gather_readnext(GatherState *gatherstate)
{
	int			nvisited = 0;

	for (;;)
	{
		TupleQueueReader *reader;
		HeapTuple	tup;
		bool		readerdone;

		/* Check for async events, particularly messages from workers. */
		CHECK_FOR_INTERRUPTS();

		/*
		 * Attempt to read a tuple, but don't block if none is available.
		 *
		 * Note that TupleQueueReaderNext will just return NULL for a worker
		 * which fails to initialize.  We'll treat that worker as having
		 * produced no tuples; WaitForParallelWorkersToFinish will error out
		 * when we get there.
		 */
		Assert(gatherstate->nextreader < gatherstate->nreaders);
		reader = gatherstate->reader[gatherstate->nextreader];
		tup = TupleQueueReaderNext(reader, true, &readerdone);

		/*
		 * If this reader is done, remove it from our working array of active
		 * readers.  If all readers are done, we're outta here.
		 */
		if (readerdone)
		{
			Assert(!tup);
			--gatherstate->nreaders;
			if (gatherstate->nreaders == 0)
				return NULL;
			memmove(&gatherstate->reader[gatherstate->nextreader],
					&gatherstate->reader[gatherstate->nextreader + 1],
					sizeof(TupleQueueReader *)
					* (gatherstate->nreaders - gatherstate->nextreader));
			if (gatherstate->nextreader >= gatherstate->nreaders)
				gatherstate->nextreader = 0;
			continue;
		}

		/* If we got a tuple, return it. */
		if (tup)
			return tup;

		/*
		 * Advance nextreader pointer in round-robin fashion.  Note that we
		 * only reach this code if we weren't able to get a tuple from the
		 * current worker.  We used to advance the nextreader pointer after
		 * every tuple, but it turns out to be much more efficient to keep
		 * reading from the same queue until that would require blocking.
		 */
		gatherstate->nextreader++;
		if (gatherstate->nextreader >= gatherstate->nreaders)
			gatherstate->nextreader = 0;

		/* Have we visited every (surviving) TupleQueueReader? */
		nvisited++;
		if (nvisited >= gatherstate->nreaders)
		{
			/*
			 * If (still) running plan locally, return NULL so caller can
			 * generate another tuple from the local copy of the plan.
			 */
			if (gatherstate->need_to_scan_locally)
				return NULL;

			/* Nothing to do except wait for developments. */
			(void) WaitLatch(MyLatch, WL_LATCH_SET, 0,
							 WAIT_EVENT_EXECUTE_GATHER);
			ResetLatch(MyLatch);
			nvisited = 0;
		}
	}
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherWorkers
 *
 *		Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
	if (node->pei != NULL)
		ExecParallelFinish(node->pei);

	/* Flush local copy of reader array */
	if (node->reader)
		pfree(node->reader);
	node->reader = NULL;
}

/* ----------------------------------------------------------------
 *		ExecShutdownGather
 *
 *		Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
	ExecShutdownGatherWorkers(node);

	/* Now destroy the parallel context. */
	if (node->pei != NULL)
	{
		ExecParallelCleanup(node->pei);
		node->pei = NULL;
	}
}

/* ----------------------------------------------------------------
 *						Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecReScanGather
 *
 *		Prepare to re-scan the result of a Gather.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
	Gather	   *gather = (Gather *) node->ps.plan;
	PlanState  *outerPlan = outerPlanState(node);

	/* Make sure any existing workers are gracefully shut down */
	ExecShutdownGatherWorkers(node);

	/* Mark node so that shared state will be rebuilt at next call */
	node->initialized = false;

	/*
	 * Set child node's chgParam to tell it that the next scan might deliver a
	 * different set of rows within the leader process.  (The overall rowset
	 * shouldn't change, but the leader process's subset might; hence nodes
	 * between here and the parallel table scan node mustn't optimize on the
	 * assumption of an unchanging rowset.)
	 */
	if (gather->rescan_param >= 0)
		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
											 gather->rescan_param);

	/*
	 * If chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.  Note: because this does nothing if we have a
	 * rescan_param, it's currently guaranteed that parallel-aware child nodes
	 * will not see a ReScan call until after they get a ReInitializeDSM call.
	 * That ordering might not be something to rely on, though.  A good rule
	 * of thumb is that ReInitializeDSM should reset only shared state, ReScan
	 * should reset only local state, and anything that depends on both of
	 * those steps being finished must wait until the first ExecProcNode call.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}
Definition: execnodes.h:2056