PostgreSQL Source Code git master
Loading...
Searching...
No Matches
nodeGather.c File Reference
#include "postgres.h"
#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeGather.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "storage/latch.h"
#include "utils/wait_event.h"
Include dependency graph for nodeGather.c:

Go to the source code of this file.

Functions

static TupleTableSlot * ExecGather (PlanState *pstate)
 
static TupleTableSlot * gather_getnext (GatherState *gatherstate)
 
static MinimalTuple gather_readnext (GatherState *gatherstate)
 
static void ExecShutdownGatherWorkers (GatherState *node)
 
GatherState * ExecInitGather (Gather *node, EState *estate, int eflags)
 
void ExecEndGather (GatherState *node)
 
void ExecShutdownGather (GatherState *node)
 
void ExecReScanGather (GatherState *node)
 

Function Documentation

◆ ExecEndGather()

void ExecEndGather ( GatherState * node)

Definition at line 252 of file nodeGather.c.

253{
254 ExecEndNode(outerPlanState(node)); /* let children clean up first */
255 ExecShutdownGather(node);
256}
void ExecEndNode(PlanState *node)
#define outerPlanState(node)
Definition execnodes.h:1264
void ExecShutdownGather(GatherState *node)
Definition nodeGather.c:419

References ExecEndNode(), ExecShutdownGather(), and outerPlanState.

Referenced by ExecEndNode().

◆ ExecGather()

static TupleTableSlot * ExecGather ( PlanState * pstate)
static

Definition at line 138 of file nodeGather.c.

139{
140 GatherState *node = castNode(GatherState, pstate);
141 TupleTableSlot *slot;
142 ExprContext *econtext;
143
145
146 /*
147 * Initialize the parallel context and workers on first execution. We do
148 * this on first execution rather than during node initialization, as it
149 * needs to allocate a large dynamic segment, so it is better to do it
150 * only if it is really needed.
151 */
152 if (!node->initialized)
153 {
154 EState *estate = node->ps.state;
155 Gather *gather = (Gather *) node->ps.plan;
156
157 /*
158 * Sometimes we might have to run without parallelism; but if parallel
159 * mode is active then we can try to fire up some workers.
160 */
161 if (gather->num_workers > 0 && estate->es_use_parallel_mode)
162 {
163 ParallelContext *pcxt;
164
165 /* Initialize, or re-initialize, shared state needed by workers. */
166 if (!node->pei)
168 estate,
169 gather->initParam,
170 gather->num_workers,
171 node->tuples_needed);
172 else
174 node->pei,
175 gather->initParam);
176
177 /*
178 * Register backend workers. We might not get as many as we
179 * requested, or indeed any at all.
180 */
181 pcxt = node->pei->pcxt;
183 /* We save # workers launched for the benefit of EXPLAIN */
185
186 /*
187 * Count number of workers originally wanted and actually
188 * launched.
189 */
190 estate->es_parallel_workers_to_launch += pcxt->nworkers_to_launch;
191 estate->es_parallel_workers_launched += pcxt->nworkers_launched;
192
193 /* Set up tuple queue readers to read the results. */
194 if (pcxt->nworkers_launched > 0)
195 {
197 /* Make a working array showing the active readers */
198 node->nreaders = pcxt->nworkers_launched;
199 node->reader = (TupleQueueReader **)
200 palloc(node->nreaders * sizeof(TupleQueueReader *));
201 memcpy(node->reader, node->pei->reader,
202 node->nreaders * sizeof(TupleQueueReader *));
203 }
204 else
205 {
206 /* No workers? Then never mind. */
207 node->nreaders = 0;
208 node->reader = NULL;
209 }
210 node->nextreader = 0;
211 }
212
213 /* Run plan locally if no workers or enabled and not single-copy. */
214 node->need_to_scan_locally = (node->nreaders == 0)
215 || (!gather->single_copy && parallel_leader_participation);
216 node->initialized = true;
217 }
218
219 /*
220 * Reset per-tuple memory context to free any expression evaluation
221 * storage allocated in the previous tuple cycle.
222 */
223 econtext = node->ps.ps_ExprContext;
224 ResetExprContext(econtext);
225
226 /*
227 * Get next tuple, either from one of our workers, or by running the plan
228 * ourselves.
229 */
230 slot = gather_getnext(node);
231 if (TupIsNull(slot))
232 return NULL;
233
234 /* If no projection is required, we're done. */
235 if (node->ps.ps_ProjInfo == NULL)
236 return slot;
237
238 /*
239 * Form the result tuple using ExecProject(), and return it.
240 */
241 econtext->ecxt_outertuple = slot;
242 return ExecProject(node->ps.ps_ProjInfo);
243}
void LaunchParallelWorkers(ParallelContext *pcxt)
Definition parallel.c:582
void ExecParallelReinitialize(PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)
void ExecParallelCreateReaders(ParallelExecutorInfo *pei)
ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
Definition executor.h:483
#define ResetExprContext(econtext)
Definition executor.h:650
void * palloc(Size size)
Definition mcxt.c:1387
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
static TupleTableSlot * gather_getnext(GatherState *gatherstate)
Definition nodeGather.c:264
#define castNode(_type_, nodeptr)
Definition nodes.h:182
bool parallel_leader_participation
Definition planner.c:70
static int fb(int x)
TupleTableSlot * ecxt_outertuple
Definition execnodes.h:279
bool initialized
Definition execnodes.h:2631
struct ParallelExecutorInfo * pei
Definition execnodes.h:2636
int nworkers_launched
Definition execnodes.h:2638
PlanState ps
Definition execnodes.h:2630
struct TupleQueueReader ** reader
Definition execnodes.h:2641
int64 tuples_needed
Definition execnodes.h:2633
bool need_to_scan_locally
Definition execnodes.h:2632
int nworkers_launched
Definition parallel.h:39
int nworkers_to_launch
Definition parallel.h:38
ParallelContext * pcxt
struct TupleQueueReader ** reader
Plan * plan
Definition execnodes.h:1168
EState * state
Definition execnodes.h:1170
ExprContext * ps_ExprContext
Definition execnodes.h:1207
ProjectionInfo * ps_ProjInfo
Definition execnodes.h:1208
#define TupIsNull(slot)
Definition tuptable.h:309

References castNode, CHECK_FOR_INTERRUPTS, ExprContext::ecxt_outertuple, EState::es_parallel_workers_launched, EState::es_parallel_workers_to_launch, EState::es_use_parallel_mode, ExecInitParallelPlan(), ExecParallelCreateReaders(), ExecParallelReinitialize(), ExecProject(), fb(), gather_getnext(), GatherState::initialized, LaunchParallelWorkers(), GatherState::need_to_scan_locally, GatherState::nextreader, GatherState::nreaders, ParallelContext::nworkers_launched, GatherState::nworkers_launched, ParallelContext::nworkers_to_launch, outerPlanState, palloc(), parallel_leader_participation, ParallelExecutorInfo::pcxt, GatherState::pei, PlanState::plan, GatherState::ps, PlanState::ps_ExprContext, PlanState::ps_ProjInfo, ParallelExecutorInfo::reader, GatherState::reader, ResetExprContext, PlanState::state, TupIsNull, and GatherState::tuples_needed.

Referenced by ExecInitGather().

◆ ExecInitGather()

GatherState * ExecInitGather ( Gather * node,
EState * estate,
int  eflags 
)

Definition at line 54 of file nodeGather.c.

55{
58 TupleDesc tupDesc;
59
60 /* Gather node doesn't have innerPlan node. */
61 Assert(innerPlan(node) == NULL);
62
63 /*
64 * create state structure
65 */
67 gatherstate->ps.plan = (Plan *) node;
68 gatherstate->ps.state = estate;
69 gatherstate->ps.ExecProcNode = ExecGather;
70
71 gatherstate->initialized = false;
72 gatherstate->need_to_scan_locally =
74 gatherstate->tuples_needed = -1;
75
76 /*
77 * Miscellaneous initialization
78 *
79 * create expression context for node
80 */
82
83 /*
84 * now initialize outer plan
85 */
86 outerNode = outerPlan(node);
89
90 /*
91 * Leader may access ExecProcNode result directly (if
92 * need_to_scan_locally), or from workers via tuple queue. So we can't
93 * trivially rely on the slot type being fixed for expressions evaluated
94 * within this node.
95 */
96 gatherstate->ps.outeropsset = true;
97 gatherstate->ps.outeropsfixed = false;
98
99 /*
100 * Initialize result type and projection.
101 */
104
105 /*
106 * Without projections result slot type is not trivially known, see
107 * comment above.
108 */
109 if (gatherstate->ps.ps_ProjInfo == NULL)
110 {
111 gatherstate->ps.resultopsset = true;
112 gatherstate->ps.resultopsfixed = false;
113 }
114
115 /*
116 * Initialize funnel slot to same tuple descriptor as outer plan.
117 */
118 gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
120
121 /*
122 * Gather doesn't support checking a qual (it's always more efficient to
123 * do it in the child node).
124 */
125 Assert(!node->plan.qual);
126
127 return gatherstate;
128}
#define Assert(condition)
Definition c.h:885
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
void ExecInitResultTypeTL(PlanState *planstate)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
Definition execTuples.c:86
TupleDesc ExecGetResultType(PlanState *planstate)
Definition execUtils.c:495
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition execUtils.c:485
void ExecConditionalAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc, int varno)
Definition execUtils.c:603
static TupleTableSlot * ExecGather(PlanState *pstate)
Definition nodeGather.c:138
#define makeNode(_type_)
Definition nodes.h:161
#define innerPlan(node)
Definition plannodes.h:266
#define outerPlan(node)
Definition plannodes.h:267
#define OUTER_VAR
Definition primnodes.h:244
bool single_copy
Definition plannodes.h:1362
Plan plan
Definition plannodes.h:1356
List * qual
Definition plannodes.h:237

References Assert, ExecAssignExprContext(), ExecConditionalAssignProjectionInfo(), ExecGather(), ExecGetResultType(), ExecInitExtraTupleSlot(), ExecInitNode(), ExecInitResultTypeTL(), fb(), innerPlan, makeNode, OUTER_VAR, outerPlan, outerPlanState, parallel_leader_participation, Gather::plan, Plan::qual, Gather::single_copy, and TTSOpsMinimalTuple.

Referenced by ExecInitNode().

◆ ExecReScanGather()

void ExecReScanGather ( GatherState * node)

Definition at line 443 of file nodeGather.c.

444{
445 Gather *gather = (Gather *) node->ps.plan;
447
448 /* Make sure any existing workers are gracefully shut down */
450
451 /* Mark node so that shared state will be rebuilt at next call */
452 node->initialized = false;
453
454 /*
455 * Set child node's chgParam to tell it that the next scan might deliver a
456 * different set of rows within the leader process. (The overall rowset
457 * shouldn't change, but the leader process's subset might; hence nodes
458 * between here and the parallel table scan node mustn't optimize on the
459 * assumption of an unchanging rowset.)
460 */
461 if (gather->rescan_param >= 0)
462 outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
463 gather->rescan_param);
464
465 /*
466 * If chgParam of subnode is not null then plan will be re-scanned by
467 * first ExecProcNode. Note: because this does nothing if we have a
468 * rescan_param, it's currently guaranteed that parallel-aware child nodes
469 * will not see a ReScan call until after they get a ReInitializeDSM call.
470 * That ordering might not be something to rely on, though. A good rule
471 * of thumb is that ReInitializeDSM should reset only shared state, ReScan
472 * should reset only local state, and anything that depends on both of
473 * those steps being finished must wait until the first ExecProcNode call.
474 */
475 if (outerPlan->chgParam == NULL)
477}
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
void ExecReScan(PlanState *node)
Definition execAmi.c:77
static void ExecShutdownGatherWorkers(GatherState *node)
Definition nodeGather.c:401

References bms_add_member(), ExecReScan(), ExecShutdownGatherWorkers(), fb(), GatherState::initialized, outerPlan, outerPlanState, PlanState::plan, and GatherState::ps.

Referenced by ExecReScan().

◆ ExecShutdownGather()

void ExecShutdownGather ( GatherState * node)

Definition at line 419 of file nodeGather.c.

420{
422
423 /* Now destroy the parallel context. */
424 if (node->pei != NULL)
425 {
427 node->pei = NULL;
428 }
429}
void ExecParallelCleanup(ParallelExecutorInfo *pei)

References ExecParallelCleanup(), ExecShutdownGatherWorkers(), fb(), and GatherState::pei.

Referenced by ExecEndGather(), and ExecShutdownNode_walker().

◆ ExecShutdownGatherWorkers()

static void ExecShutdownGatherWorkers ( GatherState * node)
static

Definition at line 401 of file nodeGather.c.

402{
403 if (node->pei != NULL)
404 ExecParallelFinish(node->pei);
405
406 /* Flush local copy of reader array */
407 if (node->reader)
408 pfree(node->reader);
409 node->reader = NULL;
410}
void ExecParallelFinish(ParallelExecutorInfo *pei)
void pfree(void *pointer)
Definition mcxt.c:1616

References ExecParallelFinish(), fb(), GatherState::pei, pfree(), and GatherState::reader.

Referenced by ExecReScanGather(), ExecShutdownGather(), and gather_readnext().

◆ gather_getnext()

static TupleTableSlot * gather_getnext ( GatherState * gatherstate)
static

Definition at line 264 of file nodeGather.c.

265{
268 TupleTableSlot *fslot = gatherstate->funnel_slot;
270
271 while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
272 {
274
275 if (gatherstate->nreaders > 0)
276 {
278
280 {
281 ExecStoreMinimalTuple(tup, /* tuple to store */
282 fslot, /* slot to store the tuple */
283 false); /* don't pfree tuple */
284 return fslot;
285 }
286 }
287
288 if (gatherstate->need_to_scan_locally)
289 {
290 EState *estate = gatherstate->ps.state;
291
292 /* Install our DSA area while executing the plan. */
293 estate->es_query_dsa =
294 gatherstate->pei ? gatherstate->pei->area : NULL;
296 estate->es_query_dsa = NULL;
297
299 return outerTupleSlot;
300
301 gatherstate->need_to_scan_locally = false;
302 }
303 }
304
305 return ExecClearTuple(fslot);
306}
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition executor.h:314
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static MinimalTuple gather_readnext(GatherState *gatherstate)
Definition nodeGather.c:312
struct dsa_area * es_query_dsa
Definition execnodes.h:755
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:457

References CHECK_FOR_INTERRUPTS, EState::es_query_dsa, ExecClearTuple(), ExecProcNode(), ExecStoreMinimalTuple(), fb(), gather_readnext(), HeapTupleIsValid, outerPlan, outerPlanState, and TupIsNull.

Referenced by ExecGather().

◆ gather_readnext()

static MinimalTuple gather_readnext ( GatherState * gatherstate)
static

Definition at line 312 of file nodeGather.c.

313{
314 int nvisited = 0;
315
316 for (;;)
317 {
318 TupleQueueReader *reader;
320 bool readerdone;
321
322 /* Check for async events, particularly messages from workers. */
324
325 /*
326 * Attempt to read a tuple, but don't block if none is available.
327 *
328 * Note that TupleQueueReaderNext will just return NULL for a worker
329 * which fails to initialize. We'll treat that worker as having
330 * produced no tuples; WaitForParallelWorkersToFinish will error out
331 * when we get there.
332 */
333 Assert(gatherstate->nextreader < gatherstate->nreaders);
334 reader = gatherstate->reader[gatherstate->nextreader];
335 tup = TupleQueueReaderNext(reader, true, &readerdone);
336
337 /*
338 * If this reader is done, remove it from our working array of active
339 * readers. If all readers are done, we're outta here.
340 */
341 if (readerdone)
342 {
343 Assert(!tup);
344 --gatherstate->nreaders;
345 if (gatherstate->nreaders == 0)
346 {
348 return NULL;
349 }
350 memmove(&gatherstate->reader[gatherstate->nextreader],
351 &gatherstate->reader[gatherstate->nextreader + 1],
352 sizeof(TupleQueueReader *)
353 * (gatherstate->nreaders - gatherstate->nextreader));
354 if (gatherstate->nextreader >= gatherstate->nreaders)
355 gatherstate->nextreader = 0;
356 continue;
357 }
358
359 /* If we got a tuple, return it. */
360 if (tup)
361 return tup;
362
363 /*
364 * Advance nextreader pointer in round-robin fashion. Note that we
365 * only reach this code if we weren't able to get a tuple from the
366 * current worker. We used to advance the nextreader pointer after
367 * every tuple, but it turns out to be much more efficient to keep
368 * reading from the same queue until that would require blocking.
369 */
370 gatherstate->nextreader++;
371 if (gatherstate->nextreader >= gatherstate->nreaders)
372 gatherstate->nextreader = 0;
373
374 /* Have we visited every (surviving) TupleQueueReader? */
375 nvisited++;
376 if (nvisited >= gatherstate->nreaders)
377 {
378 /*
379 * If (still) running plan locally, return NULL so caller can
380 * generate another tuple from the local copy of the plan.
381 */
382 if (gatherstate->need_to_scan_locally)
383 return NULL;
384
385 /* Nothing to do except wait for developments. */
389 nvisited = 0;
390 }
391 }
392}
struct Latch * MyLatch
Definition globals.c:63
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Definition tqueue.c:176
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET

References Assert, CHECK_FOR_INTERRUPTS, ExecShutdownGatherWorkers(), fb(), MyLatch, ResetLatch(), TupleQueueReaderNext(), WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by gather_getnext().