nodeGather.c
/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *    Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In this case, the Gather node will
 * run the plan in one worker and will not execute the plan itself.  In
 * this case, it simply returns whatever tuples were returned by the worker.
 * If a worker cannot be obtained, then it will run the plan itself and
 * return the results.  Therefore, a plan used with a single-copy Gather
 * node need not be parallel-aware.
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */
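/*
 * For illustration (a typical plan shape, not a fixed rule): a parallel-aware
 * sequential scan under Gather might appear in EXPLAIN output as
 *
 *     Gather
 *       Workers Planned: 2
 *       ->  Parallel Seq Scan on some_table
 *             Filter: (...)
 *
 * Each worker (and, usually, the leader itself) runs its own copy of the
 * Parallel Seq Scan, and the Gather node interleaves whatever tuples those
 * copies produce into a single output stream.
 */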

#include "postgres.h"

#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeGather.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "utils/wait_event.h"


static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static MinimalTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);


/* ----------------------------------------------------------------
 *      ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
    GatherState *gatherstate;
    Plan       *outerNode;
    TupleDesc   tupDesc;

    /* Gather node doesn't have innerPlan node. */
    Assert(innerPlan(node) == NULL);

    /*
     * create state structure
     */
    gatherstate = makeNode(GatherState);
    gatherstate->ps.plan = (Plan *) node;
    gatherstate->ps.state = estate;
    gatherstate->ps.ExecProcNode = ExecGather;

    gatherstate->initialized = false;
    gatherstate->need_to_scan_locally =
        !node->single_copy && parallel_leader_participation;
    gatherstate->tuples_needed = -1;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &gatherstate->ps);

    /*
     * now initialize outer plan
     */
    outerNode = outerPlan(node);
    outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
    tupDesc = ExecGetResultType(outerPlanState(gatherstate));

    /*
     * Leader may access ExecProcNode result directly (if
     * need_to_scan_locally), or from workers via tuple queue.  So we can't
     * trivially rely on the slot type being fixed for expressions evaluated
     * within this node.
     */
    gatherstate->ps.outeropsset = true;
    gatherstate->ps.outeropsfixed = false;

    /*
     * Initialize result type and projection.
     */
    ExecInitResultTypeTL(&gatherstate->ps);
    ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);

    /*
     * Without projections result slot type is not trivially known, see
     * comment above.
     */
    if (gatherstate->ps.ps_ProjInfo == NULL)
    {
        gatherstate->ps.resultopsset = true;
        gatherstate->ps.resultopsfixed = false;
    }

    /*
     * Initialize funnel slot to same tuple descriptor as outer plan.
     */
    gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
                                                      &TTSOpsMinimalTuple);

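    /*
     * Note: the funnel slot uses TTSOpsMinimalTuple because the tuples the
     * leader pulls from the workers' tuple queues arrive as MinimalTuples;
     * gather_getnext() stores them into this slot with
     * ExecStoreMinimalTuple().
     */
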
    /*
     * Gather doesn't support checking a qual (it's always more efficient to
     * do it in the child node).
     */
    Assert(!node->plan.qual);

    return gatherstate;
}

/* ----------------------------------------------------------------
 *      ExecGather(node)
 *
 *      Scans the relation via multiple workers and returns
 *      the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
    GatherState *node = castNode(GatherState, pstate);
    TupleTableSlot *slot;
    ExprContext *econtext;

    CHECK_FOR_INTERRUPTS();

    /*
     * Initialize the parallel context and workers on first execution. We do
     * this on first execution rather than during node initialization, as it
     * needs to allocate a large dynamic segment, so it is better to do it
     * only if it is really needed.
     */
    if (!node->initialized)
    {
        EState     *estate = node->ps.state;
        Gather     *gather = (Gather *) node->ps.plan;

        /*
         * Sometimes we might have to run without parallelism; but if parallel
         * mode is active then we can try to fire up some workers.
         */
        if (gather->num_workers > 0 && estate->es_use_parallel_mode)
        {
            ParallelContext *pcxt;

            /* Initialize, or re-initialize, shared state needed by workers. */
            if (!node->pei)
                node->pei = ExecInitParallelPlan(outerPlanState(node),
                                                 estate,
                                                 gather->initParam,
                                                 gather->num_workers,
                                                 node->tuples_needed);
            else
                ExecParallelReinitialize(outerPlanState(node),
                                         node->pei,
                                         gather->initParam);

            /*
             * Register backend workers. We might not get as many as we
             * requested, or indeed any at all.
             */
            pcxt = node->pei->pcxt;
            LaunchParallelWorkers(pcxt);
            /* We save # workers launched for the benefit of EXPLAIN */
            node->nworkers_launched = pcxt->nworkers_launched;

            /*
             * Count number of workers originally wanted and actually
             * launched.
             */
            estate->es_parallel_workers_to_launch += pcxt->nworkers_to_launch;
            estate->es_parallel_workers_launched += pcxt->nworkers_launched;

            /* Set up tuple queue readers to read the results. */
            if (pcxt->nworkers_launched > 0)
            {
                ExecParallelCreateReaders(node->pei);
                /* Make a working array showing the active readers */
                node->nreaders = pcxt->nworkers_launched;
                node->reader = (TupleQueueReader **)
                    palloc(node->nreaders * sizeof(TupleQueueReader *));
                memcpy(node->reader, node->pei->reader,
                       node->nreaders * sizeof(TupleQueueReader *));
            }
            else
            {
                /* No workers?  Then never mind. */
                node->nreaders = 0;
                node->reader = NULL;
            }
            node->nextreader = 0;
        }

        /* Run plan locally if no workers or enabled and not single-copy. */
        node->need_to_scan_locally = (node->nreaders == 0)
            || (!gather->single_copy && parallel_leader_participation);
        node->initialized = true;
    }
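
    /*
     * Roughly, the possible states after the initialization above are:
     *
     *   nreaders > 0 && need_to_scan_locally    leader and workers each run
     *                                           a copy of the plan
     *   nreaders > 0 && !need_to_scan_locally   workers only (single_copy, or
     *                                           parallel_leader_participation
     *                                           is off)
     *   nreaders == 0                           leader runs the plan alone
     *                                           (no workers launched)
     */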

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    econtext = node->ps.ps_ExprContext;
    ResetExprContext(econtext);

    /*
     * Get next tuple, either from one of our workers, or by running the plan
     * ourselves.
     */
    slot = gather_getnext(node);
    if (TupIsNull(slot))
        return NULL;

    /* If no projection is required, we're done. */
    if (node->ps.ps_ProjInfo == NULL)
        return slot;

    /*
     * Form the result tuple using ExecProject(), and return it.
     */
    econtext->ecxt_outertuple = slot;
    return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *      ExecEndGather
 *
 *      frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
    ExecEndNode(outerPlanState(node));  /* let children clean up first */
    ExecShutdownGather(node);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
    PlanState  *outerPlan = outerPlanState(gatherstate);
    TupleTableSlot *outerTupleSlot;
    TupleTableSlot *fslot = gatherstate->funnel_slot;
    MinimalTuple tup;

    while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
    {
        CHECK_FOR_INTERRUPTS();

        if (gatherstate->nreaders > 0)
        {
            tup = gather_readnext(gatherstate);

            if (HeapTupleIsValid(tup))
            {
                ExecStoreMinimalTuple(tup,      /* tuple to store */
                                      fslot,    /* slot to store the tuple */
                                      false);   /* don't pfree tuple */
                return fslot;
            }
        }

        if (gatherstate->need_to_scan_locally)
        {
            EState     *estate = gatherstate->ps.state;

            /* Install our DSA area while executing the plan. */
            estate->es_query_dsa =
                gatherstate->pei ? gatherstate->pei->area : NULL;
            outerTupleSlot = ExecProcNode(outerPlan);
            estate->es_query_dsa = NULL;
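            /*
             * While the leader runs its local copy of the plan, any
             * parallel-aware nodes in that copy locate the query's dynamic
             * shared memory area through estate->es_query_dsa, which is why
             * it is pointed at the parallel context's area for the duration
             * of the ExecProcNode call and cleared again afterwards.
             */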

            if (!TupIsNull(outerTupleSlot))
                return outerTupleSlot;

            gatherstate->need_to_scan_locally = false;
        }
    }

    return ExecClearTuple(fslot);
}

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static MinimalTuple
gather_readnext(GatherState *gatherstate)
{
    int         nvisited = 0;

    for (;;)
    {
        TupleQueueReader *reader;
        MinimalTuple tup;
        bool        readerdone;

        /* Check for async events, particularly messages from workers. */
        CHECK_FOR_INTERRUPTS();

        /*
         * Attempt to read a tuple, but don't block if none is available.
         *
         * Note that TupleQueueReaderNext will just return NULL for a worker
         * which fails to initialize.  We'll treat that worker as having
         * produced no tuples; WaitForParallelWorkersToFinish will error out
         * when we get there.
         */
        Assert(gatherstate->nextreader < gatherstate->nreaders);
        reader = gatherstate->reader[gatherstate->nextreader];
        tup = TupleQueueReaderNext(reader, true, &readerdone);

        /*
         * If this reader is done, remove it from our working array of active
         * readers.  If all readers are done, we're outta here.
         */
        if (readerdone)
        {
            Assert(!tup);
            --gatherstate->nreaders;
            if (gatherstate->nreaders == 0)
            {
                ExecShutdownGatherWorkers(gatherstate);
                return NULL;
            }
            memmove(&gatherstate->reader[gatherstate->nextreader],
                    &gatherstate->reader[gatherstate->nextreader + 1],
                    sizeof(TupleQueueReader *)
                    * (gatherstate->nreaders - gatherstate->nextreader));
            if (gatherstate->nextreader >= gatherstate->nreaders)
                gatherstate->nextreader = 0;
            continue;
        }
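
        /*
         * A worked example of the compaction above (hypothetical state, for
         * illustration): with readers [A B C D], nextreader = 1, and B
         * reporting done, nreaders drops to 3 and the memmove shifts C and D
         * down to give [A C D]; nextreader still points at slot 1 (now C).
         * Only when the removed reader was the last slot does nextreader
         * wrap back to 0.
         */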

        /* If we got a tuple, return it. */
        if (tup)
            return tup;

        /*
         * Advance nextreader pointer in round-robin fashion.  Note that we
         * only reach this code if we weren't able to get a tuple from the
         * current worker.  We used to advance the nextreader pointer after
         * every tuple, but it turns out to be much more efficient to keep
         * reading from the same queue until that would require blocking.
         */
        gatherstate->nextreader++;
        if (gatherstate->nextreader >= gatherstate->nreaders)
            gatherstate->nextreader = 0;

        /* Have we visited every (surviving) TupleQueueReader? */
        nvisited++;
        if (nvisited >= gatherstate->nreaders)
        {
            /*
             * If (still) running plan locally, return NULL so caller can
             * generate another tuple from the local copy of the plan.
             */
            if (gatherstate->need_to_scan_locally)
                return NULL;

            /* Nothing to do except wait for developments. */
            (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                             WAIT_EVENT_EXECUTE_GATHER);
            ResetLatch(MyLatch);
            nvisited = 0;
        }
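
        /*
         * The WaitLatch call above returns once the leader's latch is set,
         * which (assuming the usual shm_mq behaviour) happens when a worker
         * writes new data into its tuple queue, detaches from it, or sends
         * the leader a message; the loop then polls the queues again.
         */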
    }
}

/* ----------------------------------------------------------------
 *      ExecShutdownGatherWorkers
 *
 *      Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
    if (node->pei != NULL)
        ExecParallelFinish(node->pei);

    /* Flush local copy of reader array */
    if (node->reader)
        pfree(node->reader);
    node->reader = NULL;
}

/* ----------------------------------------------------------------
 *      ExecShutdownGather
 *
 *      Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
    ExecShutdownGatherWorkers(node);

    /* Now destroy the parallel context. */
    if (node->pei != NULL)
    {
        ExecParallelCleanup(node->pei);
        node->pei = NULL;
    }
}

/* ----------------------------------------------------------------
 *      Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *      ExecReScanGather
 *
 *      Prepare to re-scan the result of a Gather.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
    Gather     *gather = (Gather *) node->ps.plan;
    PlanState  *outerPlan = outerPlanState(node);

    /* Make sure any existing workers are gracefully shut down */
    ExecShutdownGatherWorkers(node);

    /* Mark node so that shared state will be rebuilt at next call */
    node->initialized = false;

    /*
     * Set child node's chgParam to tell it that the next scan might deliver a
     * different set of rows within the leader process.  (The overall rowset
     * shouldn't change, but the leader process's subset might; hence nodes
     * between here and the parallel table scan node mustn't optimize on the
     * assumption of an unchanging rowset.)
     */
    if (gather->rescan_param >= 0)
        outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
                                             gather->rescan_param);

    /*
     * If chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.  Note: because this does nothing if we have a
     * rescan_param, it's currently guaranteed that parallel-aware child nodes
     * will not see a ReScan call until after they get a ReInitializeDSM call.
     * That ordering might not be something to rely on, though.  A good rule
     * of thumb is that ReInitializeDSM should reset only shared state, ReScan
     * should reset only local state, and anything that depends on both of
     * those steps being finished must wait until the first ExecProcNode call.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}