PostgreSQL Source Code  git master
nodeGather.c File Reference
#include "postgres.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for nodeGather.c:


Functions

static TupleTableSlot * ExecGather (PlanState *pstate)
 
static TupleTableSlot * gather_getnext (GatherState *gatherstate)
 
static HeapTuple gather_readnext (GatherState *gatherstate)
 
static void ExecShutdownGatherWorkers (GatherState *node)
 
GatherState * ExecInitGather (Gather *node, EState *estate, int eflags)
 
void ExecEndGather (GatherState *node)
 
void ExecShutdownGather (GatherState *node)
 
void ExecReScanGather (GatherState *node)
 

Function Documentation

void ExecEndGather ( GatherState * node )

Definition at line 231 of file nodeGather.c.

References ExecClearTuple(), ExecEndNode(), ExecFreeExprContext(), ExecShutdownGather(), outerPlanState, GatherState::ps, and PlanState::ps_ResultTupleSlot.

Referenced by ExecEndNode().

232 {
233  ExecEndNode(outerPlanState(node)); /* let children clean up first */
234  ExecShutdownGather(node);
235  ExecFreeExprContext(&node->ps);
236  ExecClearTuple(node->ps.ps_ResultTupleSlot);
237 }
static TupleTableSlot * ExecGather ( PlanState * pstate )
static

Definition at line 126 of file nodeGather.c.

References ParallelWorkerInfo::bgwhandle, castNode, CHECK_FOR_INTERRUPTS, CreateTupleQueueReader(), ExprContext::ecxt_outertuple, ExecClearTuple(), ExecInitParallelPlan(), ExecProject(), ExecShutdownGatherWorkers(), GatherState::funnel_slot, gather_getnext(), i, GatherState::initialized, IsInParallelMode(), LaunchParallelWorkers(), PlanState::lefttree, GatherState::need_to_scan_locally, GatherState::nextreader, GatherState::nreaders, NULL, Gather::num_workers, ParallelContext::nworkers_launched, GatherState::nworkers_launched, palloc(), ParallelExecutorInfo::pcxt, GatherState::pei, PlanState::plan, GatherState::ps, PlanState::ps_ExprContext, PlanState::ps_ProjInfo, GatherState::reader, ResetExprContext, shm_mq_set_handle(), Gather::single_copy, PlanState::state, ParallelExecutorInfo::tqueue, TupleTableSlot::tts_tupleDescriptor, TupIsNull, and ParallelContext::worker.

Referenced by ExecInitGather().

127 {
128  GatherState *node = castNode(GatherState, pstate);
129  TupleTableSlot *fslot = node->funnel_slot;
130  int i;
131  TupleTableSlot *slot;
132  ExprContext *econtext;
133 
134  CHECK_FOR_INTERRUPTS();
135 
136  /*
137  * Initialize the parallel context and workers on first execution. We do
138  * this on first execution rather than during node initialization, as it
139  * needs to allocate a large dynamic segment, so it is better to do it
140  * only if it is really needed.
141  */
142  if (!node->initialized)
143  {
144  EState *estate = node->ps.state;
145  Gather *gather = (Gather *) node->ps.plan;
146 
147  /*
148  * Sometimes we might have to run without parallelism; but if parallel
149  * mode is active then we can try to fire up some workers.
150  */
151  if (gather->num_workers > 0 && IsInParallelMode())
152  {
153  ParallelContext *pcxt;
154 
155  /* Initialize the workers required to execute Gather node. */
156  if (!node->pei)
157  node->pei = ExecInitParallelPlan(node->ps.lefttree,
158  estate,
159  gather->num_workers);
160 
161  /*
162  * Register backend workers. We might not get as many as we
163  * requested, or indeed any at all.
164  */
165  pcxt = node->pei->pcxt;
166  LaunchParallelWorkers(pcxt);
167  node->nworkers_launched = pcxt->nworkers_launched;
168 
169  /* Set up tuple queue readers to read the results. */
170  if (pcxt->nworkers_launched > 0)
171  {
172  node->nreaders = 0;
173  node->nextreader = 0;
174  node->reader =
175  palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *));
176 
177  for (i = 0; i < pcxt->nworkers_launched; ++i)
178  {
179  shm_mq_set_handle(node->pei->tqueue[i],
180  pcxt->worker[i].bgwhandle);
181  node->reader[node->nreaders++] =
182  CreateTupleQueueReader(node->pei->tqueue[i],
183  fslot->tts_tupleDescriptor);
184  }
185  }
186  else
187  {
188  /* No workers? Then never mind. */
189  ExecShutdownGatherWorkers(node);
190  }
191  }
192 
193  /* Run plan locally if no workers or not single-copy. */
194  node->need_to_scan_locally = (node->reader == NULL)
195  || !gather->single_copy;
196  node->initialized = true;
197  }
198 
199  /*
200  * Reset per-tuple memory context to free any expression evaluation
201  * storage allocated in the previous tuple cycle. This will also clear
202  * any previous tuple returned by a TupleQueueReader; to make sure we
203  * don't leave a dangling pointer around, clear the working slot first.
204  */
205  ExecClearTuple(fslot);
206  econtext = node->ps.ps_ExprContext;
207  ResetExprContext(econtext);
208 
209  /*
210  * Get next tuple, either from one of our workers, or by running the plan
211  * ourselves.
212  */
213  slot = gather_getnext(node);
214  if (TupIsNull(slot))
215  return NULL;
216 
217  /*
218  * Form the result tuple using ExecProject(), and return it.
219  */
220  econtext->ecxt_outertuple = slot;
221  return ExecProject(node->ps.ps_ProjInfo);
222 }
GatherState * ExecInitGather ( Gather * node,
EState * estate,
int  eflags 
)

Definition at line 57 of file nodeGather.c.

References Assert, ExecAssignExprContext(), ExecAssignProjectionInfo(), ExecAssignResultTypeFromTL(), ExecContextForcesOids(), ExecGather(), ExecInitExtraTupleSlot(), ExecInitNode(), ExecInitQual(), ExecInitResultTupleSlot(), PlanState::ExecProcNode, ExecSetSlotDescriptor(), ExecTypeFromTL(), GatherState::funnel_slot, innerPlan, makeNode, GatherState::need_to_scan_locally, NULL, outerPlan, outerPlanState, Gather::plan, PlanState::plan, GatherState::ps, Plan::qual, PlanState::qual, Gather::single_copy, PlanState::state, and Plan::targetlist.

Referenced by ExecInitNode().

58 {
59  GatherState *gatherstate;
60  Plan *outerNode;
61  bool hasoid;
62  TupleDesc tupDesc;
63 
64  /* Gather node doesn't have innerPlan node. */
65  Assert(innerPlan(node) == NULL);
66 
67  /*
68  * create state structure
69  */
70  gatherstate = makeNode(GatherState);
71  gatherstate->ps.plan = (Plan *) node;
72  gatherstate->ps.state = estate;
73  gatherstate->ps.ExecProcNode = ExecGather;
74  gatherstate->need_to_scan_locally = !node->single_copy;
75 
76  /*
77  * Miscellaneous initialization
78  *
79  * create expression context for node
80  */
81  ExecAssignExprContext(estate, &gatherstate->ps);
82 
83  /*
84  * initialize child expressions
85  */
86  gatherstate->ps.qual =
87  ExecInitQual(node->plan.qual, (PlanState *) gatherstate);
88 
89  /*
90  * tuple table initialization
91  */
92  gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
93  ExecInitResultTupleSlot(estate, &gatherstate->ps);
94 
95  /*
96  * now initialize outer plan
97  */
98  outerNode = outerPlan(node);
99  outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
100 
101  /*
102  * Initialize result tuple type and projection info.
103  */
104  ExecAssignResultTypeFromTL(&gatherstate->ps);
105  ExecAssignProjectionInfo(&gatherstate->ps, NULL);
106 
107  /*
108  * Initialize funnel slot to same tuple descriptor as outer plan.
109  */
110  if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
111  hasoid = false;
112  tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
113  ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
114 
115  return gatherstate;
116 }
void ExecReScanGather ( GatherState * node )

Definition at line 431 of file nodeGather.c.

References ExecParallelReinitialize(), ExecReScan(), ExecShutdownGatherWorkers(), GatherState::initialized, PlanState::lefttree, GatherState::pei, and GatherState::ps.

Referenced by ExecReScan().

432 {
433  /*
434  * Re-initialize the parallel workers to perform rescan of relation. We
435  * want to gracefully shutdown all the workers so that they should be able
436  * to propagate any error or other information to master backend before
437  * dying. Parallel context will be reused for rescan.
438  */
439  ExecShutdownGatherWorkers(node);
440 
441  node->initialized = false;
442 
443  if (node->pei)
444  ExecParallelReinitialize(node->pei);
445 
446  ExecReScan(node->ps.lefttree);
447 }
void ExecShutdownGather ( GatherState * node )

Definition at line 407 of file nodeGather.c.

References ExecParallelCleanup(), ExecShutdownGatherWorkers(), NULL, and GatherState::pei.

Referenced by ExecEndGather(), and ExecShutdownNode().

408 {
409  ExecShutdownGatherWorkers(node);
410 
411  /* Now destroy the parallel context. */
412  if (node->pei != NULL)
413  {
414  ExecParallelCleanup(node->pei);
415  node->pei = NULL;
416  }
417 }
static void ExecShutdownGatherWorkers ( GatherState * node )
static

Definition at line 379 of file nodeGather.c.

References DestroyTupleQueueReader(), ExecParallelFinish(), i, GatherState::nreaders, NULL, GatherState::pei, pfree(), and GatherState::reader.

Referenced by ExecGather(), ExecReScanGather(), ExecShutdownGather(), and gather_readnext().

380 {
381  /* Shut down tuple queue readers before shutting down workers. */
382  if (node->reader != NULL)
383  {
384  int i;
385 
386  for (i = 0; i < node->nreaders; ++i)
387  DestroyTupleQueueReader(node->reader[i]);
388 
389  pfree(node->reader);
390  node->reader = NULL;
391  }
392 
393  /* Now shut down the workers. */
394  if (node->pei != NULL)
395  ExecParallelFinish(node->pei);
396 }
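
Two properties of this function carry its callers: the readers are torn down before ExecParallelFinish waits for the workers, and the function is idempotent, since node->reader is set to NULL and node->pei is checked, so it can safely be reached from gather_readnext, ExecShutdownGather, and ExecReScanGather in any order. A toy sketch of that guarded, repeat-safe teardown; plain allocations stand in for readers, not the real shm_mq API:

#include <stdio.h>
#include <stdlib.h>

typedef struct ToyNode
{
    int  **reader;              /* NULL once the readers are gone */
    int    nreaders;
} ToyNode;

static void
toy_shutdown(ToyNode *node)
{
    if (node->reader != NULL)
    {
        int i;

        for (i = 0; i < node->nreaders; ++i)
            free(node->reader[i]);  /* DestroyTupleQueueReader, roughly */
        free(node->reader);
        node->reader = NULL;        /* makes a second call a no-op */
    }
    /* the real code likewise guards this phase with node->pei != NULL */
    printf("ExecParallelFinish would run here\n");
}

int
main(void)
{
    ToyNode n;
    int     i;

    n.nreaders = 2;
    n.reader = malloc(n.nreaders * sizeof(int *));
    for (i = 0; i < n.nreaders; ++i)
        n.reader[i] = malloc(sizeof(int));

    toy_shutdown(&n);
    toy_shutdown(&n);           /* safe to call again, as from other paths */
    return 0;
}
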
static TupleTableSlot * gather_getnext ( GatherState * gatherstate )
static

Definition at line 245 of file nodeGather.c.

References CHECK_FOR_INTERRUPTS, ExprContext::ecxt_per_tuple_memory, ExecClearTuple(), ExecProcNode(), ExecStoreTuple(), GatherState::funnel_slot, gather_readnext(), HeapTupleIsValid, InvalidBuffer, MemoryContextSwitchTo(), GatherState::need_to_scan_locally, NULL, outerPlan, outerPlanState, GatherState::ps, PlanState::ps_ExprContext, GatherState::reader, and TupIsNull.

Referenced by ExecGather().

246 {
247  PlanState *outerPlan = outerPlanState(gatherstate);
248  TupleTableSlot *outerTupleSlot;
249  TupleTableSlot *fslot = gatherstate->funnel_slot;
250  MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
251  HeapTuple tup;
252 
253  while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
254  {
255  CHECK_FOR_INTERRUPTS();
256 
257  if (gatherstate->reader != NULL)
258  {
259  MemoryContext oldContext;
260 
261  /* Run TupleQueueReaders in per-tuple context */
262  oldContext = MemoryContextSwitchTo(tupleContext);
263  tup = gather_readnext(gatherstate);
264  MemoryContextSwitchTo(oldContext);
265 
266  if (HeapTupleIsValid(tup))
267  {
268  ExecStoreTuple(tup, /* tuple to store */
269  fslot, /* slot in which to store the tuple */
270  InvalidBuffer, /* buffer associated with this
271  * tuple */
272  false); /* slot should not pfree tuple */
273  return fslot;
274  }
275  }
276 
277  if (gatherstate->need_to_scan_locally)
278  {
279  outerTupleSlot = ExecProcNode(outerPlan);
280 
281  if (!TupIsNull(outerTupleSlot))
282  return outerTupleSlot;
283 
284  gatherstate->need_to_scan_locally = false;
285  }
286  }
287 
288  return ExecClearTuple(fslot);
289 }
static HeapTuple gather_readnext ( GatherState * gatherstate )
static

Definition at line 295 of file nodeGather.c.

References Assert, CHECK_FOR_INTERRUPTS, DestroyTupleQueueReader(), ExecShutdownGatherWorkers(), memmove, MyLatch, GatherState::need_to_scan_locally, GatherState::nextreader, GatherState::nreaders, NULL, GatherState::reader, ResetLatch(), TupleQueueReaderNext(), WAIT_EVENT_EXECUTE_GATHER, WaitLatch(), and WL_LATCH_SET.

Referenced by gather_getnext().

296 {
297  int nvisited = 0;
298 
299  for (;;)
300  {
301  TupleQueueReader *reader;
302  HeapTuple tup;
303  bool readerdone;
304 
305  /* Check for async events, particularly messages from workers. */
306  CHECK_FOR_INTERRUPTS();
307 
308  /* Attempt to read a tuple, but don't block if none is available. */
309  Assert(gatherstate->nextreader < gatherstate->nreaders);
310  reader = gatherstate->reader[gatherstate->nextreader];
311  tup = TupleQueueReaderNext(reader, true, &readerdone);
312 
313  /*
314  * If this reader is done, remove it. If all readers are done, clean
315  * up remaining worker state.
316  */
317  if (readerdone)
318  {
319  Assert(!tup);
320  DestroyTupleQueueReader(reader);
321  --gatherstate->nreaders;
322  if (gatherstate->nreaders == 0)
323  {
324  ExecShutdownGatherWorkers(gatherstate);
325  return NULL;
326  }
327  memmove(&gatherstate->reader[gatherstate->nextreader],
328  &gatherstate->reader[gatherstate->nextreader + 1],
329  sizeof(TupleQueueReader *)
330  * (gatherstate->nreaders - gatherstate->nextreader));
331  if (gatherstate->nextreader >= gatherstate->nreaders)
332  gatherstate->nextreader = 0;
333  continue;
334  }
335 
336  /* If we got a tuple, return it. */
337  if (tup)
338  return tup;
339 
340  /*
341  * Advance nextreader pointer in round-robin fashion. Note that we
342  * only reach this code if we weren't able to get a tuple from the
343  * current worker. We used to advance the nextreader pointer after
344  * every tuple, but it turns out to be much more efficient to keep
345  * reading from the same queue until that would require blocking.
346  */
347  gatherstate->nextreader++;
348  if (gatherstate->nextreader >= gatherstate->nreaders)
349  gatherstate->nextreader = 0;
350 
351  /* Have we visited every (surviving) TupleQueueReader? */
352  nvisited++;
353  if (nvisited >= gatherstate->nreaders)
354  {
355  /*
356  * If (still) running plan locally, return NULL so caller can
357  * generate another tuple from the local copy of the plan.
358  */
359  if (gatherstate->need_to_scan_locally)
360  return NULL;
361 
362  /* Nothing to do except wait for developments. */
363  WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
364  ResetLatch(MyLatch);
365  nvisited = 0;
366  }
367  }
368 }