PostgreSQL Source Code  git master
nodeGather.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *	  Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In this case, the Gather node will
 * run the plan in one worker and will not execute the plan itself.  In
 * this case, it simply returns whatever tuples were returned by the worker.
 * If a worker cannot be obtained, then it will run the plan itself and
 * return the results.  Therefore, a plan used with a single-copy Gather
 * node need not be parallel-aware.
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */
30 
31 #include "postgres.h"
32 
33 #include "access/relscan.h"
34 #include "access/xact.h"
35 #include "executor/execdebug.h"
36 #include "executor/execParallel.h"
37 #include "executor/nodeGather.h"
38 #include "executor/nodeSubplan.h"
39 #include "executor/tqueue.h"
40 #include "miscadmin.h"
41 #include "optimizer/optimizer.h"
42 #include "pgstat.h"
43 #include "utils/memutils.h"
44 #include "utils/rel.h"
45 
46 
47 static TupleTableSlot *ExecGather(PlanState *pstate);
48 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
49 static HeapTuple gather_readnext(GatherState *gatherstate);
50 static void ExecShutdownGatherWorkers(GatherState *node);
51 
52 
53 /* ----------------------------------------------------------------
54  * ExecInitGather
55  * ----------------------------------------------------------------
56  */
58 ExecInitGather(Gather *node, EState *estate, int eflags)
59 {
60  GatherState *gatherstate;
61  Plan *outerNode;
62  TupleDesc tupDesc;
63 
64  /* Gather node doesn't have innerPlan node. */
65  Assert(innerPlan(node) == NULL);
66 
67  /*
68  * create state structure
69  */
70  gatherstate = makeNode(GatherState);
71  gatherstate->ps.plan = (Plan *) node;
72  gatherstate->ps.state = estate;
73  gatherstate->ps.ExecProcNode = ExecGather;
74 
75  gatherstate->initialized = false;
76  gatherstate->need_to_scan_locally =
78  gatherstate->tuples_needed = -1;
79 
80  /*
81  * Miscellaneous initialization
82  *
83  * create expression context for node
84  */
85  ExecAssignExprContext(estate, &gatherstate->ps);
86 
87  /*
88  * now initialize outer plan
89  */
90  outerNode = outerPlan(node);
91  outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
92  tupDesc = ExecGetResultType(outerPlanState(gatherstate));
93 
94  /*
95  * Leader may access ExecProcNode result directly (if
96  * need_to_scan_locally), or from workers via tuple queue. So we can't
97  * trivially rely on the slot type being fixed for expressions evaluated
98  * within this node.
99  */
100  gatherstate->ps.outeropsset = true;
101  gatherstate->ps.outeropsfixed = false;
102 
103  /*
104  * Initialize result type and projection.
105  */
106  ExecInitResultTypeTL(&gatherstate->ps);
107  ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
108 
109  /*
110  * Without projections result slot type is not trivially known, see
111  * comment above.
112  */
113  if (gatherstate->ps.ps_ProjInfo == NULL)
114  {
115  gatherstate->ps.resultopsset = true;
116  gatherstate->ps.resultopsfixed = false;
117  }
118 
119  /*
120  * Initialize funnel slot to same tuple descriptor as outer plan.
121  */
122  gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
123  &TTSOpsHeapTuple);
124 
125  /*
126  * Gather doesn't support checking a qual (it's always more efficient to
127  * do it in the child node).
128  */
129  Assert(!node->plan.qual);
130 
131  return gatherstate;
132 }
133 
134 /* ----------------------------------------------------------------
135  * ExecGather(node)
136  *
137  * Scans the relation via multiple workers and returns
138  * the next qualifying tuple.
139  * ----------------------------------------------------------------
140  */
141 static TupleTableSlot *
143 {
144  GatherState *node = castNode(GatherState, pstate);
145  TupleTableSlot *slot;
146  ExprContext *econtext;
147 
149 
150  /*
151  * Initialize the parallel context and workers on first execution. We do
152  * this on first execution rather than during node initialization, as it
153  * needs to allocate a large dynamic segment, so it is better to do it
154  * only if it is really needed.
155  */
156  if (!node->initialized)
157  {
158  EState *estate = node->ps.state;
159  Gather *gather = (Gather *) node->ps.plan;
160 
161  /*
162  * Sometimes we might have to run without parallelism; but if parallel
163  * mode is active then we can try to fire up some workers.
164  */
165  if (gather->num_workers > 0 && estate->es_use_parallel_mode)
166  {
167  ParallelContext *pcxt;
168 
169  /* Initialize, or re-initialize, shared state needed by workers. */
170  if (!node->pei)
171  node->pei = ExecInitParallelPlan(node->ps.lefttree,
172  estate,
173  gather->initParam,
174  gather->num_workers,
175  node->tuples_needed);
176  else
178  node->pei,
179  gather->initParam);
180 
181  /*
182  * Register backend workers. We might not get as many as we
183  * requested, or indeed any at all.
184  */
185  pcxt = node->pei->pcxt;
186  LaunchParallelWorkers(pcxt);
187  /* We save # workers launched for the benefit of EXPLAIN */
188  node->nworkers_launched = pcxt->nworkers_launched;
189 
190  /* Set up tuple queue readers to read the results. */
191  if (pcxt->nworkers_launched > 0)
192  {
194  /* Make a working array showing the active readers */
195  node->nreaders = pcxt->nworkers_launched;
196  node->reader = (TupleQueueReader **)
197  palloc(node->nreaders * sizeof(TupleQueueReader *));
198  memcpy(node->reader, node->pei->reader,
199  node->nreaders * sizeof(TupleQueueReader *));
200  }
201  else
202  {
203  /* No workers? Then never mind. */
204  node->nreaders = 0;
205  node->reader = NULL;
206  }
207  node->nextreader = 0;
208  }
209 
210  /* Run plan locally if no workers or enabled and not single-copy. */
211  node->need_to_scan_locally = (node->nreaders == 0)
213  node->initialized = true;
214  }
215 
216  /*
217  * Reset per-tuple memory context to free any expression evaluation
218  * storage allocated in the previous tuple cycle.
219  */
220  econtext = node->ps.ps_ExprContext;
221  ResetExprContext(econtext);
222 
223  /*
224  * Get next tuple, either from one of our workers, or by running the plan
225  * ourselves.
226  */
227  slot = gather_getnext(node);
228  if (TupIsNull(slot))
229  return NULL;
230 
231  /* If no projection is required, we're done. */
232  if (node->ps.ps_ProjInfo == NULL)
233  return slot;
234 
235  /*
236  * Form the result tuple using ExecProject(), and return it.
237  */
238  econtext->ecxt_outertuple = slot;
239  return ExecProject(node->ps.ps_ProjInfo);
240 }
241 
242 /* ----------------------------------------------------------------
243  * ExecEndGather
244  *
245  * frees any storage allocated through C routines.
246  * ----------------------------------------------------------------
247  */
248 void
250 {
251  ExecEndNode(outerPlanState(node)); /* let children clean up first */
252  ExecShutdownGather(node);
253  ExecFreeExprContext(&node->ps);
254  if (node->ps.ps_ResultTupleSlot)
256 }
257 
258 /*
259  * Read the next tuple. We might fetch a tuple from one of the tuple queues
260  * using gather_readnext, or if no tuple queue contains a tuple and the
261  * single_copy flag is not set, we might generate one locally instead.
262  */
263 static TupleTableSlot *
265 {
266  PlanState *outerPlan = outerPlanState(gatherstate);
267  TupleTableSlot *outerTupleSlot;
268  TupleTableSlot *fslot = gatherstate->funnel_slot;
269  HeapTuple tup;
270 
271  while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
272  {
274 
275  if (gatherstate->nreaders > 0)
276  {
277  tup = gather_readnext(gatherstate);
278 
279  if (HeapTupleIsValid(tup))
280  {
281  ExecStoreHeapTuple(tup, /* tuple to store */
282  fslot, /* slot to store the tuple */
283  true); /* pfree tuple when done with it */
284  return fslot;
285  }
286  }
287 
288  if (gatherstate->need_to_scan_locally)
289  {
290  EState *estate = gatherstate->ps.state;
291 
292  /* Install our DSA area while executing the plan. */
293  estate->es_query_dsa =
294  gatherstate->pei ? gatherstate->pei->area : NULL;
295  outerTupleSlot = ExecProcNode(outerPlan);
296  estate->es_query_dsa = NULL;
297 
298  if (!TupIsNull(outerTupleSlot))
299  return outerTupleSlot;
300 
301  gatherstate->need_to_scan_locally = false;
302  }
303  }
304 
305  return ExecClearTuple(fslot);
306 }
307 
308 /*
309  * Attempt to read a tuple from one of our parallel workers.
310  */
311 static HeapTuple
313 {
314  int nvisited = 0;
315 
316  for (;;)
317  {
318  TupleQueueReader *reader;
319  HeapTuple tup;
320  bool readerdone;
321 
322  /* Check for async events, particularly messages from workers. */
324 
325  /*
326  * Attempt to read a tuple, but don't block if none is available.
327  *
328  * Note that TupleQueueReaderNext will just return NULL for a worker
329  * which fails to initialize. We'll treat that worker as having
330  * produced no tuples; WaitForParallelWorkersToFinish will error out
331  * when we get there.
332  */
333  Assert(gatherstate->nextreader < gatherstate->nreaders);
334  reader = gatherstate->reader[gatherstate->nextreader];
335  tup = TupleQueueReaderNext(reader, true, &readerdone);
336 
337  /*
338  * If this reader is done, remove it from our working array of active
339  * readers. If all readers are done, we're outta here.
340  */
341  if (readerdone)
342  {
343  Assert(!tup);
344  --gatherstate->nreaders;
345  if (gatherstate->nreaders == 0)
346  {
347  ExecShutdownGatherWorkers(gatherstate);
348  return NULL;
349  }
350  memmove(&gatherstate->reader[gatherstate->nextreader],
351  &gatherstate->reader[gatherstate->nextreader + 1],
352  sizeof(TupleQueueReader *)
353  * (gatherstate->nreaders - gatherstate->nextreader));
354  if (gatherstate->nextreader >= gatherstate->nreaders)
355  gatherstate->nextreader = 0;
356  continue;
357  }
358 
359  /* If we got a tuple, return it. */
360  if (tup)
361  return tup;
362 
363  /*
364  * Advance nextreader pointer in round-robin fashion. Note that we
365  * only reach this code if we weren't able to get a tuple from the
366  * current worker. We used to advance the nextreader pointer after
367  * every tuple, but it turns out to be much more efficient to keep
368  * reading from the same queue until that would require blocking.
369  */
370  gatherstate->nextreader++;
371  if (gatherstate->nextreader >= gatherstate->nreaders)
372  gatherstate->nextreader = 0;
373 
374  /* Have we visited every (surviving) TupleQueueReader? */
375  nvisited++;
376  if (nvisited >= gatherstate->nreaders)
377  {
378  /*
379  * If (still) running plan locally, return NULL so caller can
380  * generate another tuple from the local copy of the plan.
381  */
382  if (gatherstate->need_to_scan_locally)
383  return NULL;
384 
385  /* Nothing to do except wait for developments. */
389  nvisited = 0;
390  }
391  }
392 }
393 
394 /* ----------------------------------------------------------------
395  * ExecShutdownGatherWorkers
396  *
397  * Stop all the parallel workers.
398  * ----------------------------------------------------------------
399  */
400 static void
402 {
403  if (node->pei != NULL)
404  ExecParallelFinish(node->pei);
405 
406  /* Flush local copy of reader array */
407  if (node->reader)
408  pfree(node->reader);
409  node->reader = NULL;
410 }
411 
412 /* ----------------------------------------------------------------
413  * ExecShutdownGather
414  *
415  * Destroy the setup for parallel workers including parallel context.
416  * ----------------------------------------------------------------
417  */
418 void
420 {
422 
423  /* Now destroy the parallel context. */
424  if (node->pei != NULL)
425  {
426  ExecParallelCleanup(node->pei);
427  node->pei = NULL;
428  }
429 }
430 
431 /* ----------------------------------------------------------------
432  * Join Support
433  * ----------------------------------------------------------------
434  */
435 
436 /* ----------------------------------------------------------------
437  * ExecReScanGather
438  *
439  * Prepare to re-scan the result of a Gather.
440  * ----------------------------------------------------------------
441  */
442 void
444 {
445  Gather *gather = (Gather *) node->ps.plan;
447 
448  /* Make sure any existing workers are gracefully shut down */
450 
451  /* Mark node so that shared state will be rebuilt at next call */
452  node->initialized = false;
453 
454  /*
455  * Set child node's chgParam to tell it that the next scan might deliver a
456  * different set of rows within the leader process. (The overall rowset
457  * shouldn't change, but the leader process's subset might; hence nodes
458  * between here and the parallel table scan node mustn't optimize on the
459  * assumption of an unchanging rowset.)
460  */
461  if (gather->rescan_param >= 0)
462  outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
463  gather->rescan_param);
464 
465  /*
466  * If chgParam of subnode is not null then plan will be re-scanned by
467  * first ExecProcNode. Note: because this does nothing if we have a
468  * rescan_param, it's currently guaranteed that parallel-aware child nodes
469  * will not see a ReScan call until after they get a ReInitializeDSM call.
470  * That ordering might not be something to rely on, though. A good rule
471  * of thumb is that ReInitializeDSM should reset only shared state, ReScan
472  * should reset only local state, and anything that depends on both of
473  * those steps being finished must wait until the first ExecProcNode call.
474  */
475  if (outerPlan->chgParam == NULL)
477 }
List * qual
Definition: plannodes.h:141
struct dsa_area * es_query_dsa
Definition: execnodes.h:583
ParallelContext * pcxt
Definition: execParallel.h:27
static TupleTableSlot * ExecGather(PlanState *pstate)
Definition: nodeGather.c:142
int nworkers_launched
Definition: execnodes.h:2203
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:979
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
void ExecParallelFinish(ParallelExecutorInfo *pei)
#define castNode(_type_, nodeptr)
Definition: nodes.h:594
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:537
ExprContext * ps_ExprContext
Definition: execnodes.h:978
void ExecReScan(PlanState *node)
Definition: execAmi.c:75
bool need_to_scan_locally
Definition: execnodes.h:2197
HeapTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Definition: tqueue.c:176
struct TupleQueueReader ** reader
Definition: execnodes.h:2206
EState * state
Definition: execnodes.h:941
void ExecConditionalAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc, Index varno)
Definition: execUtils.c:519
GatherState * ExecInitGather(Gather *node, EState *estate, int eflags)
Definition: nodeGather.c:58
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:614
void ResetLatch(Latch *latch)
Definition: latch.c:519
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:344
struct PlanState * lefttree
Definition: execnodes.h:961
bool es_use_parallel_mode
Definition: execnodes.h:580
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:977
void pfree(void *pointer)
Definition: mcxt.c:1056
bool initialized
Definition: execnodes.h:2196
static void ExecShutdownGatherWorkers(GatherState *node)
Definition: nodeGather.c:401
Plan plan
Definition: plannodes.h:871
bool single_copy
Definition: plannodes.h:874
struct ParallelExecutorInfo * pei
Definition: execnodes.h:2201
#define outerPlanState(node)
Definition: execnodes.h:1033
#define innerPlan(node)
Definition: plannodes.h:169
void ExecParallelCreateReaders(ParallelExecutorInfo *pei)
Definition: execParallel.c:831
#define memmove(d, s, c)
Definition: c.h:1267
struct TupleQueueReader ** reader
Definition: execParallel.h:36
void ExecInitResultTypeTL(PlanState *planstate)
Definition: execTuples.c:1725
#define TupIsNull(slot)
Definition: tuptable.h:292
PlanState ps
Definition: execnodes.h:2195
bool outeropsset
Definition: execnodes.h:1020
int nworkers_launched
Definition: parallel.h:37
void LaunchParallelWorkers(ParallelContext *pcxt)
Definition: parallel.c:493
static HeapTuple gather_readnext(GatherState *gatherstate)
Definition: nodeGather.c:312
bool resultopsset
Definition: execnodes.h:1022
#define outerPlan(node)
Definition: plannodes.h:170
void ExecReScanGather(GatherState *node)
Definition: nodeGather.c:443
void ExecShutdownGather(GatherState *node)
Definition: nodeGather.c:419
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:945
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:235
TupleTableSlot * funnel_slot
Definition: execnodes.h:2200
Plan * plan
Definition: execnodes.h:939
int num_workers
Definition: plannodes.h:872
bool parallel_leader_participation
Definition: planner.c:71
#define makeNode(_type_)
Definition: nodes.h:573
void ExecParallelCleanup(ParallelExecutorInfo *pei)
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:227
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:739
int rescan_param
Definition: plannodes.h:873
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:444
bool outeropsfixed
Definition: execnodes.h:1016
void ExecParallelReinitialize(PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)
Definition: execParallel.c:857
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
TupleDesc ExecGetResultType(PlanState *planstate)
Definition: execUtils.c:454
static TupleTableSlot * gather_getnext(GatherState *gatherstate)
Definition: nodeGather.c:264
Bitmapset * initParam
Definition: plannodes.h:876
bool resultopsfixed
Definition: execnodes.h:1018
int nextreader
Definition: execnodes.h:2205
void * palloc(Size size)
Definition: mcxt.c:949
const TupleTableSlotOps TTSOpsHeapTuple
Definition: execTuples.c:84
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)
Definition: execParallel.c:561
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:138
#define WL_LATCH_SET
Definition: latch.h:124
#define OUTER_VAR
Definition: primnodes.h:158
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1322
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
Definition: executor.h:328
void ExecEndGather(GatherState *node)
Definition: nodeGather.c:249
#define ResetExprContext(econtext)
Definition: executor.h:495
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
int64 tuples_needed
Definition: execnodes.h:2198