PostgreSQL Source Code git master
Loading...
Searching...
No Matches
nodeGather.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * nodeGather.c
4 * Support routines for scanning a plan via multiple workers.
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * A Gather executor launches parallel workers to run multiple copies of a
10 * plan. It can also run the plan itself, if the workers are not available
11 * or have not started up yet. It then merges all of the results it produces
12 * and the results from the workers into a single output stream. Therefore,
13 * it will normally be used with a plan where running multiple copies of the
14 * same plan does not produce duplicate output, such as parallel-aware
15 * SeqScan.
16 *
17 * Alternatively, a Gather node can be configured to use just one worker
18 * and the single-copy flag can be set. In this case, the Gather node will
19 * run the plan in one worker and will not execute the plan itself. In
20 * this case, it simply returns whatever tuples were returned by the worker.
21 * If a worker cannot be obtained, then it will run the plan itself and
22 * return the results. Therefore, a plan used with a single-copy Gather
23 * node need not be parallel-aware.
24 *
25 * IDENTIFICATION
26 * src/backend/executor/nodeGather.c
27 *
28 *-------------------------------------------------------------------------
29 */
30
#include "postgres.h"

#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeGather.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "storage/latch.h"
#include "utils/wait_event.h"
41
42
43static TupleTableSlot *ExecGather(PlanState *pstate);
46static void ExecShutdownGatherWorkers(GatherState *node);
47
48
49/* ----------------------------------------------------------------
50 * ExecInitGather
51 * ----------------------------------------------------------------
52 */
54ExecInitGather(Gather *node, EState *estate, int eflags)
55{
58 TupleDesc tupDesc;
59
60 /* Gather node doesn't have innerPlan node. */
61 Assert(innerPlan(node) == NULL);
62
63 /*
64 * create state structure
65 */
67 gatherstate->ps.plan = (Plan *) node;
68 gatherstate->ps.state = estate;
69 gatherstate->ps.ExecProcNode = ExecGather;
70
71 gatherstate->initialized = false;
72 gatherstate->need_to_scan_locally =
74 gatherstate->tuples_needed = -1;
75
76 /*
77 * Miscellaneous initialization
78 *
79 * create expression context for node
80 */
82
83 /*
84 * now initialize outer plan
85 */
86 outerNode = outerPlan(node);
89
90 /*
91 * Leader may access ExecProcNode result directly (if
92 * need_to_scan_locally), or from workers via tuple queue. So we can't
93 * trivially rely on the slot type being fixed for expressions evaluated
94 * within this node.
95 */
96 gatherstate->ps.outeropsset = true;
97 gatherstate->ps.outeropsfixed = false;
98
99 /*
100 * Initialize result type and projection.
101 */
104
105 /*
106 * Without projections result slot type is not trivially known, see
107 * comment above.
108 */
109 if (gatherstate->ps.ps_ProjInfo == NULL)
110 {
111 gatherstate->ps.resultopsset = true;
112 gatherstate->ps.resultopsfixed = false;
113 }
114
115 /*
116 * Initialize funnel slot to same tuple descriptor as outer plan.
117 */
118 gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
120
121 /*
122 * Gather doesn't support checking a qual (it's always more efficient to
123 * do it in the child node).
124 */
125 Assert(!node->plan.qual);
126
127 return gatherstate;
128}
129
130/* ----------------------------------------------------------------
131 * ExecGather(node)
132 *
133 * Scans the relation via multiple workers and returns
134 * the next qualifying tuple.
135 * ----------------------------------------------------------------
136 */
137static TupleTableSlot *
139{
140 GatherState *node = castNode(GatherState, pstate);
141 TupleTableSlot *slot;
142 ExprContext *econtext;
143
145
146 /*
147 * Initialize the parallel context and workers on first execution. We do
148 * this on first execution rather than during node initialization, as it
149 * needs to allocate a large dynamic segment, so it is better to do it
150 * only if it is really needed.
151 */
152 if (!node->initialized)
153 {
154 EState *estate = node->ps.state;
155 Gather *gather = (Gather *) node->ps.plan;
156
157 /*
158 * Sometimes we might have to run without parallelism; but if parallel
159 * mode is active then we can try to fire up some workers.
160 */
161 if (gather->num_workers > 0 && estate->es_use_parallel_mode)
162 {
163 ParallelContext *pcxt;
164
165 /* Initialize, or re-initialize, shared state needed by workers. */
166 if (!node->pei)
168 estate,
169 gather->initParam,
170 gather->num_workers,
171 node->tuples_needed);
172 else
174 node->pei,
175 gather->initParam);
176
177 /*
178 * Register backend workers. We might not get as many as we
179 * requested, or indeed any at all.
180 */
181 pcxt = node->pei->pcxt;
183 /* We save # workers launched for the benefit of EXPLAIN */
185
186 /*
187 * Count number of workers originally wanted and actually
188 * launched.
189 */
192
193 /* Set up tuple queue readers to read the results. */
194 if (pcxt->nworkers_launched > 0)
195 {
197 /* Make a working array showing the active readers */
198 node->nreaders = pcxt->nworkers_launched;
199 node->reader = (TupleQueueReader **)
200 palloc(node->nreaders * sizeof(TupleQueueReader *));
201 memcpy(node->reader, node->pei->reader,
202 node->nreaders * sizeof(TupleQueueReader *));
203 }
204 else
205 {
206 /* No workers? Then never mind. */
207 node->nreaders = 0;
208 node->reader = NULL;
209 }
210 node->nextreader = 0;
211 }
212
213 /* Run plan locally if no workers or enabled and not single-copy. */
214 node->need_to_scan_locally = (node->nreaders == 0)
215 || (!gather->single_copy && parallel_leader_participation);
216 node->initialized = true;
217 }
218
219 /*
220 * Reset per-tuple memory context to free any expression evaluation
221 * storage allocated in the previous tuple cycle.
222 */
223 econtext = node->ps.ps_ExprContext;
224 ResetExprContext(econtext);
225
226 /*
227 * Get next tuple, either from one of our workers, or by running the plan
228 * ourselves.
229 */
230 slot = gather_getnext(node);
231 if (TupIsNull(slot))
232 return NULL;
233
234 /* If no projection is required, we're done. */
235 if (node->ps.ps_ProjInfo == NULL)
236 return slot;
237
238 /*
239 * Form the result tuple using ExecProject(), and return it.
240 */
241 econtext->ecxt_outertuple = slot;
242 return ExecProject(node->ps.ps_ProjInfo);
243}
244
245/* ----------------------------------------------------------------
246 * ExecEndGather
247 *
248 * frees any storage allocated through C routines.
249 * ----------------------------------------------------------------
250 */
251void
253{
254 ExecEndNode(outerPlanState(node)); /* let children clean up first */
255 ExecShutdownGather(node);
256}
257
258/*
259 * Read the next tuple. We might fetch a tuple from one of the tuple queues
260 * using gather_readnext, or if no tuple queue contains a tuple and the
261 * single_copy flag is not set, we might generate one locally instead.
262 */
263static TupleTableSlot *
265{
268 TupleTableSlot *fslot = gatherstate->funnel_slot;
270
271 while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
272 {
274
275 if (gatherstate->nreaders > 0)
276 {
278
280 {
281 ExecStoreMinimalTuple(tup, /* tuple to store */
282 fslot, /* slot to store the tuple */
283 false); /* don't pfree tuple */
284 return fslot;
285 }
286 }
287
288 if (gatherstate->need_to_scan_locally)
289 {
290 EState *estate = gatherstate->ps.state;
291
292 /* Install our DSA area while executing the plan. */
293 estate->es_query_dsa =
294 gatherstate->pei ? gatherstate->pei->area : NULL;
296 estate->es_query_dsa = NULL;
297
299 return outerTupleSlot;
300
301 gatherstate->need_to_scan_locally = false;
302 }
303 }
304
305 return ExecClearTuple(fslot);
306}
307
308/*
309 * Attempt to read a tuple from one of our parallel workers.
310 */
311static MinimalTuple
313{
314 int nvisited = 0;
315
316 for (;;)
317 {
318 TupleQueueReader *reader;
320 bool readerdone;
321
322 /* Check for async events, particularly messages from workers. */
324
325 /*
326 * Attempt to read a tuple, but don't block if none is available.
327 *
328 * Note that TupleQueueReaderNext will just return NULL for a worker
329 * which fails to initialize. We'll treat that worker as having
330 * produced no tuples; WaitForParallelWorkersToFinish will error out
331 * when we get there.
332 */
333 Assert(gatherstate->nextreader < gatherstate->nreaders);
334 reader = gatherstate->reader[gatherstate->nextreader];
335 tup = TupleQueueReaderNext(reader, true, &readerdone);
336
337 /*
338 * If this reader is done, remove it from our working array of active
339 * readers. If all readers are done, we're outta here.
340 */
341 if (readerdone)
342 {
343 Assert(!tup);
344 --gatherstate->nreaders;
345 if (gatherstate->nreaders == 0)
346 {
348 return NULL;
349 }
350 memmove(&gatherstate->reader[gatherstate->nextreader],
351 &gatherstate->reader[gatherstate->nextreader + 1],
352 sizeof(TupleQueueReader *)
353 * (gatherstate->nreaders - gatherstate->nextreader));
354 if (gatherstate->nextreader >= gatherstate->nreaders)
355 gatherstate->nextreader = 0;
356 continue;
357 }
358
359 /* If we got a tuple, return it. */
360 if (tup)
361 return tup;
362
363 /*
364 * Advance nextreader pointer in round-robin fashion. Note that we
365 * only reach this code if we weren't able to get a tuple from the
366 * current worker. We used to advance the nextreader pointer after
367 * every tuple, but it turns out to be much more efficient to keep
368 * reading from the same queue until that would require blocking.
369 */
370 gatherstate->nextreader++;
371 if (gatherstate->nextreader >= gatherstate->nreaders)
372 gatherstate->nextreader = 0;
373
374 /* Have we visited every (surviving) TupleQueueReader? */
375 nvisited++;
376 if (nvisited >= gatherstate->nreaders)
377 {
378 /*
379 * If (still) running plan locally, return NULL so caller can
380 * generate another tuple from the local copy of the plan.
381 */
382 if (gatherstate->need_to_scan_locally)
383 return NULL;
384
385 /* Nothing to do except wait for developments. */
389 nvisited = 0;
390 }
391 }
392}
393
394/* ----------------------------------------------------------------
395 * ExecShutdownGatherWorkers
396 *
397 * Stop all the parallel workers.
398 * ----------------------------------------------------------------
399 */
400static void
402{
403 if (node->pei != NULL)
404 ExecParallelFinish(node->pei);
405
406 /* Flush local copy of reader array */
407 if (node->reader)
408 pfree(node->reader);
409 node->reader = NULL;
410}
411
412/* ----------------------------------------------------------------
413 * ExecShutdownGather
414 *
415 * Destroy the setup for parallel workers including parallel context.
416 * ----------------------------------------------------------------
417 */
418void
420{
422
423 /* Now destroy the parallel context. */
424 if (node->pei != NULL)
425 {
427 node->pei = NULL;
428 }
429}
430
431/* ----------------------------------------------------------------
432 * Join Support
433 * ----------------------------------------------------------------
434 */
435
436/* ----------------------------------------------------------------
437 * ExecReScanGather
438 *
439 * Prepare to re-scan the result of a Gather.
440 * ----------------------------------------------------------------
441 */
442void
444{
445 Gather *gather = (Gather *) node->ps.plan;
447
448 /* Make sure any existing workers are gracefully shut down */
450
451 /* Mark node so that shared state will be rebuilt at next call */
452 node->initialized = false;
453
454 /*
455 * Set child node's chgParam to tell it that the next scan might deliver a
456 * different set of rows within the leader process. (The overall rowset
457 * shouldn't change, but the leader process's subset might; hence nodes
458 * between here and the parallel table scan node mustn't optimize on the
459 * assumption of an unchanging rowset.)
460 */
461 if (gather->rescan_param >= 0)
462 outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
463 gather->rescan_param);
464
465 /*
466 * If chgParam of subnode is not null then plan will be re-scanned by
467 * first ExecProcNode. Note: because this does nothing if we have a
468 * rescan_param, it's currently guaranteed that parallel-aware child nodes
469 * will not see a ReScan call until after they get a ReInitializeDSM call.
470 * That ordering might not be something to rely on, though. A good rule
471 * of thumb is that ReInitializeDSM should reset only shared state, ReScan
472 * should reset only local state, and anything that depends on both of
473 * those steps being finished must wait until the first ExecProcNode call.
474 */
475 if (outerPlan->chgParam == NULL)
477}
void LaunchParallelWorkers(ParallelContext *pcxt)
Definition parallel.c:582
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
#define Assert(condition)
Definition c.h:885
void ExecReScan(PlanState *node)
Definition execAmi.c:77
void ExecParallelCleanup(ParallelExecutorInfo *pei)
void ExecParallelReinitialize(PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)
void ExecParallelCreateReaders(ParallelExecutorInfo *pei)
ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)
void ExecParallelFinish(ParallelExecutorInfo *pei)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
void ExecInitResultTypeTL(PlanState *planstate)
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
Definition execTuples.c:86
TupleDesc ExecGetResultType(PlanState *planstate)
Definition execUtils.c:495
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition execUtils.c:485
void ExecConditionalAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc, int varno)
Definition execUtils.c:603
#define outerPlanState(node)
Definition execnodes.h:1264
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
Definition executor.h:483
#define ResetExprContext(econtext)
Definition executor.h:650
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition executor.h:314
struct Latch * MyLatch
Definition globals.c:63
#define HeapTupleIsValid(tuple)
Definition htup.h:78
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc(Size size)
Definition mcxt.c:1387
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
void ExecEndGather(GatherState *node)
Definition nodeGather.c:252
static MinimalTuple gather_readnext(GatherState *gatherstate)
Definition nodeGather.c:312
static TupleTableSlot * ExecGather(PlanState *pstate)
Definition nodeGather.c:138
static void ExecShutdownGatherWorkers(GatherState *node)
Definition nodeGather.c:401
void ExecShutdownGather(GatherState *node)
Definition nodeGather.c:419
void ExecReScanGather(GatherState *node)
Definition nodeGather.c:443
GatherState * ExecInitGather(Gather *node, EState *estate, int eflags)
Definition nodeGather.c:54
static TupleTableSlot * gather_getnext(GatherState *gatherstate)
Definition nodeGather.c:264
#define makeNode(_type_)
Definition nodes.h:161
#define castNode(_type_, nodeptr)
Definition nodes.h:182
bool parallel_leader_participation
Definition planner.c:70
#define innerPlan(node)
Definition plannodes.h:266
#define outerPlan(node)
Definition plannodes.h:267
static int fb(int x)
#define OUTER_VAR
Definition primnodes.h:244
struct dsa_area * es_query_dsa
Definition execnodes.h:755
int es_parallel_workers_to_launch
Definition execnodes.h:749
bool es_use_parallel_mode
Definition execnodes.h:747
int es_parallel_workers_launched
Definition execnodes.h:751
TupleTableSlot * ecxt_outertuple
Definition execnodes.h:279
bool initialized
Definition execnodes.h:2631
struct ParallelExecutorInfo * pei
Definition execnodes.h:2636
int nworkers_launched
Definition execnodes.h:2638
PlanState ps
Definition execnodes.h:2630
struct TupleQueueReader ** reader
Definition execnodes.h:2641
int64 tuples_needed
Definition execnodes.h:2633
bool need_to_scan_locally
Definition execnodes.h:2632
bool single_copy
Definition plannodes.h:1362
Plan plan
Definition plannodes.h:1356
int nworkers_launched
Definition parallel.h:39
int nworkers_to_launch
Definition parallel.h:38
ParallelContext * pcxt
struct TupleQueueReader ** reader
Plan * plan
Definition execnodes.h:1168
EState * state
Definition execnodes.h:1170
ExprContext * ps_ExprContext
Definition execnodes.h:1207
ProjectionInfo * ps_ProjInfo
Definition execnodes.h:1208
List * qual
Definition plannodes.h:237
MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Definition tqueue.c:176
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:457
#define TupIsNull(slot)
Definition tuptable.h:309
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET