PostgreSQL Source Code  git master
nodeAppend.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * nodeAppend.c
 *	  routines to handle append nodes.
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeAppend.c
 *
 *-------------------------------------------------------------------------
 */
/* INTERFACE ROUTINES
 *		ExecInitAppend	- initialize the append node
 *		ExecAppend		- retrieve the next tuple from the node
 *		ExecEndAppend	- shut down the append node
 *		ExecReScanAppend - rescan the append node
 *
 *	 NOTES
 *		Each append node contains a list of one or more subplans which
 *		must be iteratively processed (forwards or backwards).
 *		Tuples are retrieved by executing the 'whichplan'th subplan
 *		until the subplan stops returning tuples, at which point that
 *		plan is shut down and the next started up.
 *
 *		Append nodes don't make use of their left and right
 *		subtrees, rather they maintain a list of subplans so
 *		a typical append node looks like this in the plan tree:
 *
 *				   ...
 *				   /
 *				Append -------+------+------+--- nil
 *				/	\		  |		 |		|
 *			  nil	nil		 ...    ...    ...
 *								 subplans
 *
 *		Append nodes are currently used for unions, and to support
 *		inheritance queries, where several relations need to be scanned.
 *		For example, in our standard person/student/employee/student-emp
 *		example, where student and employee inherit from person
 *		and student-emp inherits from student and employee, the
 *		query:
 *
 *				select name from person
 *
 *		generates the plan:
 *
 *				  |
 *				Append -------+-------+--------+--------+
 *				/	\		  |		  |		   |		|
 *			  nil	nil		 Scan	 Scan	  Scan	   Scan
 *							  |		  |		   |		|
 *							person employee student student-emp
 */
57 
58 #include "postgres.h"
59 
60 #include "executor/execAsync.h"
61 #include "executor/execdebug.h"
62 #include "executor/execPartition.h"
63 #include "executor/nodeAppend.h"
64 #include "miscadmin.h"
65 #include "pgstat.h"
66 #include "storage/latch.h"
67 
68 /* Shared state for parallel-aware Append. */
70 {
71  LWLock pa_lock; /* mutual exclusion to choose next subplan */
72  int pa_next_plan; /* next plan to choose by any worker */
73 
74  /*
75  * pa_finished[i] should be true if no more workers should select subplan
76  * i. for a non-partial plan, this should be set to true as soon as a
77  * worker selects the plan; for a partial plan, it remains false until
78  * some worker executes the plan to completion.
79  */
81 };
82 
83 #define INVALID_SUBPLAN_INDEX -1
84 #define EVENT_BUFFER_SIZE 16
85 
86 static TupleTableSlot *ExecAppend(PlanState *pstate);
87 static bool choose_next_subplan_locally(AppendState *node);
91 static void ExecAppendAsyncBegin(AppendState *node);
92 static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
93 static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
94 static void ExecAppendAsyncEventWait(AppendState *node);
95 static void classify_matching_subplans(AppendState *node);
96 
97 /* ----------------------------------------------------------------
98  * ExecInitAppend
99  *
100  * Begin all of the subscans of the append node.
101  *
102  * (This is potentially wasteful, since the entire result of the
103  * append node may not be scanned, but this way all of the
104  * structures get allocated in the executor's top level memory
105  * block instead of that of the call to ExecAppend.)
106  * ----------------------------------------------------------------
107  */
108 AppendState *
109 ExecInitAppend(Append *node, EState *estate, int eflags)
110 {
111  AppendState *appendstate = makeNode(AppendState);
112  PlanState **appendplanstates;
113  Bitmapset *validsubplans;
114  Bitmapset *asyncplans;
115  int nplans;
116  int nasyncplans;
117  int firstvalid;
118  int i,
119  j;
120 
121  /* check for unsupported flags */
122  Assert(!(eflags & EXEC_FLAG_MARK));
123 
124  /*
125  * create new AppendState for our append node
126  */
127  appendstate->ps.plan = (Plan *) node;
128  appendstate->ps.state = estate;
129  appendstate->ps.ExecProcNode = ExecAppend;
130 
131  /* Let choose_next_subplan_* function handle setting the first subplan */
132  appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
133  appendstate->as_syncdone = false;
134  appendstate->as_begun = false;
135 
136  /* If run-time partition pruning is enabled, then set that up now */
137  if (node->part_prune_index >= 0)
138  {
139  PartitionPruneState *prunestate;
140 
141  /*
142  * Set up pruning data structure. This also initializes the set of
143  * subplans to initialize (validsubplans) by taking into account the
144  * result of performing initial pruning if any.
145  */
146  prunestate = ExecInitPartitionPruning(&appendstate->ps,
147  list_length(node->appendplans),
148  node->part_prune_index,
149  node->apprelids,
150  &validsubplans);
151  appendstate->as_prune_state = prunestate;
152  nplans = bms_num_members(validsubplans);
153 
154  /*
155  * When no run-time pruning is required and there's at least one
156  * subplan, we can fill as_valid_subplans immediately, preventing
157  * later calls to ExecFindMatchingSubPlans.
158  */
159  if (!prunestate->do_exec_prune && nplans > 0)
160  appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
161  }
162  else
163  {
164  nplans = list_length(node->appendplans);
165 
166  /*
167  * When run-time partition pruning is not enabled we can just mark all
168  * subplans as valid; they must also all be initialized.
169  */
170  Assert(nplans > 0);
171  appendstate->as_valid_subplans = validsubplans =
172  bms_add_range(NULL, 0, nplans - 1);
173  appendstate->as_prune_state = NULL;
174  }
175 
176  /*
177  * Initialize result tuple type and slot.
178  */
179  ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
180 
181  /* node returns slots from each of its subnodes, therefore not fixed */
182  appendstate->ps.resultopsset = true;
183  appendstate->ps.resultopsfixed = false;
184 
185  appendplanstates = (PlanState **) palloc(nplans *
186  sizeof(PlanState *));
187 
188  /*
189  * call ExecInitNode on each of the valid plans to be executed and save
190  * the results into the appendplanstates array.
191  *
192  * While at it, find out the first valid partial plan.
193  */
194  j = 0;
195  asyncplans = NULL;
196  nasyncplans = 0;
197  firstvalid = nplans;
198  i = -1;
199  while ((i = bms_next_member(validsubplans, i)) >= 0)
200  {
201  Plan *initNode = (Plan *) list_nth(node->appendplans, i);
202 
203  /*
204  * Record async subplans. When executing EvalPlanQual, we treat them
205  * as sync ones; don't do this when initializing an EvalPlanQual plan
206  * tree.
207  */
208  if (initNode->async_capable && estate->es_epq_active == NULL)
209  {
210  asyncplans = bms_add_member(asyncplans, j);
211  nasyncplans++;
212  }
213 
214  /*
215  * Record the lowest appendplans index which is a valid partial plan.
216  */
217  if (i >= node->first_partial_plan && j < firstvalid)
218  firstvalid = j;
219 
220  appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
221  }
222 
223  appendstate->as_first_partial_plan = firstvalid;
224  appendstate->appendplans = appendplanstates;
225  appendstate->as_nplans = nplans;
226 
227  /* Initialize async state */
228  appendstate->as_asyncplans = asyncplans;
229  appendstate->as_nasyncplans = nasyncplans;
230  appendstate->as_asyncrequests = NULL;
231  appendstate->as_asyncresults = NULL;
232  appendstate->as_nasyncresults = 0;
233  appendstate->as_nasyncremain = 0;
234  appendstate->as_needrequest = NULL;
235  appendstate->as_eventset = NULL;
236  appendstate->as_valid_asyncplans = NULL;
237 
238  if (nasyncplans > 0)
239  {
240  appendstate->as_asyncrequests = (AsyncRequest **)
241  palloc0(nplans * sizeof(AsyncRequest *));
242 
243  i = -1;
244  while ((i = bms_next_member(asyncplans, i)) >= 0)
245  {
246  AsyncRequest *areq;
247 
248  areq = palloc(sizeof(AsyncRequest));
249  areq->requestor = (PlanState *) appendstate;
250  areq->requestee = appendplanstates[i];
251  areq->request_index = i;
252  areq->callback_pending = false;
253  areq->request_complete = false;
254  areq->result = NULL;
255 
256  appendstate->as_asyncrequests[i] = areq;
257  }
258 
259  appendstate->as_asyncresults = (TupleTableSlot **)
260  palloc0(nasyncplans * sizeof(TupleTableSlot *));
261 
262  if (appendstate->as_valid_subplans != NULL)
263  classify_matching_subplans(appendstate);
264  }
265 
266  /*
267  * Miscellaneous initialization
268  */
269 
270  appendstate->ps.ps_ProjInfo = NULL;
271 
272  /* For parallel query, this will be overridden later. */
274 
275  return appendstate;
276 }
277 
278 /* ----------------------------------------------------------------
279  * ExecAppend
280  *
281  * Handles iteration over multiple subplans.
282  * ----------------------------------------------------------------
283  */
284 static TupleTableSlot *
286 {
287  AppendState *node = castNode(AppendState, pstate);
288  TupleTableSlot *result;
289 
290  /*
291  * If this is the first call after Init or ReScan, we need to do the
292  * initialization work.
293  */
294  if (!node->as_begun)
295  {
297  Assert(!node->as_syncdone);
298 
299  /* Nothing to do if there are no subplans */
300  if (node->as_nplans == 0)
301  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
302 
303  /* If there are any async subplans, begin executing them. */
304  if (node->as_nasyncplans > 0)
305  ExecAppendAsyncBegin(node);
306 
307  /*
308  * If no sync subplan has been chosen, we must choose one before
309  * proceeding.
310  */
311  if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
312  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
313 
314  Assert(node->as_syncdone ||
315  (node->as_whichplan >= 0 &&
316  node->as_whichplan < node->as_nplans));
317 
318  /* And we're initialized. */
319  node->as_begun = true;
320  }
321 
322  for (;;)
323  {
324  PlanState *subnode;
325 
327 
328  /*
329  * try to get a tuple from an async subplan if any
330  */
331  if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
332  {
333  if (ExecAppendAsyncGetNext(node, &result))
334  return result;
335  Assert(!node->as_syncdone);
337  }
338 
339  /*
340  * figure out which sync subplan we are currently processing
341  */
342  Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
343  subnode = node->appendplans[node->as_whichplan];
344 
345  /*
346  * get a tuple from the subplan
347  */
348  result = ExecProcNode(subnode);
349 
350  if (!TupIsNull(result))
351  {
352  /*
353  * If the subplan gave us something then return it as-is. We do
354  * NOT make use of the result slot that was set up in
355  * ExecInitAppend; there's no need for it.
356  */
357  return result;
358  }
359 
360  /*
361  * wait or poll for async events if any. We do this before checking
362  * for the end of iteration, because it might drain the remaining
363  * async subplans.
364  */
365  if (node->as_nasyncremain > 0)
367 
368  /* choose new sync subplan; if no sync/async subplans, we're done */
369  if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
370  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
371  }
372 }
373 
374 /* ----------------------------------------------------------------
375  * ExecEndAppend
376  *
377  * Shuts down the subscans of the append node.
378  *
379  * Returns nothing of interest.
380  * ----------------------------------------------------------------
381  */
382 void
384 {
385  PlanState **appendplans;
386  int nplans;
387  int i;
388 
389  /*
390  * get information from the node
391  */
392  appendplans = node->appendplans;
393  nplans = node->as_nplans;
394 
395  /*
396  * shut down each of the subscans
397  */
398  for (i = 0; i < nplans; i++)
399  ExecEndNode(appendplans[i]);
400 }
401 
402 void
404 {
405  int nasyncplans = node->as_nasyncplans;
406  int i;
407 
408  /*
409  * If any PARAM_EXEC Params used in pruning expressions have changed, then
410  * we'd better unset the valid subplans so that they are reselected for
411  * the new parameter values.
412  */
413  if (node->as_prune_state &&
414  bms_overlap(node->ps.chgParam,
416  {
418  node->as_valid_subplans = NULL;
419  if (nasyncplans > 0)
420  {
422  node->as_valid_asyncplans = NULL;
423  }
424  }
425 
426  for (i = 0; i < node->as_nplans; i++)
427  {
428  PlanState *subnode = node->appendplans[i];
429 
430  /*
431  * ExecReScan doesn't know about my subplans, so I have to do
432  * changed-parameter signaling myself.
433  */
434  if (node->ps.chgParam != NULL)
435  UpdateChangedParamSet(subnode, node->ps.chgParam);
436 
437  /*
438  * If chgParam of subnode is not null then plan will be re-scanned by
439  * first ExecProcNode or by first ExecAsyncRequest.
440  */
441  if (subnode->chgParam == NULL)
442  ExecReScan(subnode);
443  }
444 
445  /* Reset async state */
446  if (nasyncplans > 0)
447  {
448  i = -1;
449  while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
450  {
451  AsyncRequest *areq = node->as_asyncrequests[i];
452 
453  areq->callback_pending = false;
454  areq->request_complete = false;
455  areq->result = NULL;
456  }
457 
458  node->as_nasyncresults = 0;
459  node->as_nasyncremain = 0;
460  bms_free(node->as_needrequest);
461  node->as_needrequest = NULL;
462  }
463 
464  /* Let choose_next_subplan_* function handle setting the first subplan */
466  node->as_syncdone = false;
467  node->as_begun = false;
468 }
469 
470 /* ----------------------------------------------------------------
471  * Parallel Append Support
472  * ----------------------------------------------------------------
473  */
474 
475 /* ----------------------------------------------------------------
476  * ExecAppendEstimate
477  *
478  * Compute the amount of space we'll need in the parallel
479  * query DSM, and inform pcxt->estimator about our needs.
480  * ----------------------------------------------------------------
481  */
482 void
484  ParallelContext *pcxt)
485 {
486  node->pstate_len =
487  add_size(offsetof(ParallelAppendState, pa_finished),
488  sizeof(bool) * node->as_nplans);
489 
491  shm_toc_estimate_keys(&pcxt->estimator, 1);
492 }
493 
494 
495 /* ----------------------------------------------------------------
496  * ExecAppendInitializeDSM
497  *
498  * Set up shared state for Parallel Append.
499  * ----------------------------------------------------------------
500  */
501 void
503  ParallelContext *pcxt)
504 {
505  ParallelAppendState *pstate;
506 
507  pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
508  memset(pstate, 0, node->pstate_len);
510  shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
511 
512  node->as_pstate = pstate;
514 }
515 
516 /* ----------------------------------------------------------------
517  * ExecAppendReInitializeDSM
518  *
519  * Reset shared state before beginning a fresh scan.
520  * ----------------------------------------------------------------
521  */
522 void
524 {
525  ParallelAppendState *pstate = node->as_pstate;
526 
527  pstate->pa_next_plan = 0;
528  memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
529 }
530 
531 /* ----------------------------------------------------------------
532  * ExecAppendInitializeWorker
533  *
534  * Copy relevant information from TOC into planstate, and initialize
535  * whatever is required to choose and execute the optimal subplan.
536  * ----------------------------------------------------------------
537  */
538 void
540 {
541  node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
543 }
544 
545 /* ----------------------------------------------------------------
546  * choose_next_subplan_locally
547  *
548  * Choose next sync subplan for a non-parallel-aware Append,
549  * returning false if there are no more.
550  * ----------------------------------------------------------------
551  */
552 static bool
554 {
555  int whichplan = node->as_whichplan;
556  int nextplan;
557 
558  /* We should never be called when there are no subplans */
559  Assert(node->as_nplans > 0);
560 
561  /* Nothing to do if syncdone */
562  if (node->as_syncdone)
563  return false;
564 
565  /*
566  * If first call then have the bms member function choose the first valid
567  * sync subplan by initializing whichplan to -1. If there happen to be no
568  * valid sync subplans then the bms member function will handle that by
569  * returning a negative number which will allow us to exit returning a
570  * false value.
571  */
572  if (whichplan == INVALID_SUBPLAN_INDEX)
573  {
574  if (node->as_nasyncplans > 0)
575  {
576  /* We'd have filled as_valid_subplans already */
577  Assert(node->as_valid_subplans);
578  }
579  else if (node->as_valid_subplans == NULL)
580  node->as_valid_subplans =
582 
583  whichplan = -1;
584  }
585 
586  /* Ensure whichplan is within the expected range */
587  Assert(whichplan >= -1 && whichplan <= node->as_nplans);
588 
590  nextplan = bms_next_member(node->as_valid_subplans, whichplan);
591  else
592  nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
593 
594  if (nextplan < 0)
595  {
596  /* Set as_syncdone if in async mode */
597  if (node->as_nasyncplans > 0)
598  node->as_syncdone = true;
599  return false;
600  }
601 
602  node->as_whichplan = nextplan;
603 
604  return true;
605 }
606 
607 /* ----------------------------------------------------------------
608  * choose_next_subplan_for_leader
609  *
610  * Try to pick a plan which doesn't commit us to doing much
611  * work locally, so that as much work as possible is done in
612  * the workers. Cheapest subplans are at the end.
613  * ----------------------------------------------------------------
614  */
615 static bool
617 {
618  ParallelAppendState *pstate = node->as_pstate;
619 
620  /* Backward scan is not supported by parallel-aware plans */
622 
623  /* We should never be called when there are no subplans */
624  Assert(node->as_nplans > 0);
625 
627 
628  if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
629  {
630  /* Mark just-completed subplan as finished. */
631  node->as_pstate->pa_finished[node->as_whichplan] = true;
632  }
633  else
634  {
635  /* Start with last subplan. */
636  node->as_whichplan = node->as_nplans - 1;
637 
638  /*
639  * If we've yet to determine the valid subplans then do so now. If
640  * run-time pruning is disabled then the valid subplans will always be
641  * set to all subplans.
642  */
643  if (node->as_valid_subplans == NULL)
644  {
645  node->as_valid_subplans =
647 
648  /*
649  * Mark each invalid plan as finished to allow the loop below to
650  * select the first valid subplan.
651  */
653  }
654  }
655 
656  /* Loop until we find a subplan to execute. */
657  while (pstate->pa_finished[node->as_whichplan])
658  {
659  if (node->as_whichplan == 0)
660  {
663  LWLockRelease(&pstate->pa_lock);
664  return false;
665  }
666 
667  /*
668  * We needn't pay attention to as_valid_subplans here as all invalid
669  * plans have been marked as finished.
670  */
671  node->as_whichplan--;
672  }
673 
674  /* If non-partial, immediately mark as finished. */
675  if (node->as_whichplan < node->as_first_partial_plan)
676  node->as_pstate->pa_finished[node->as_whichplan] = true;
677 
678  LWLockRelease(&pstate->pa_lock);
679 
680  return true;
681 }
682 
683 /* ----------------------------------------------------------------
684  * choose_next_subplan_for_worker
685  *
686  * Choose next subplan for a parallel-aware Append, returning
687  * false if there are no more.
688  *
689  * We start from the first plan and advance through the list;
690  * when we get back to the end, we loop back to the first
691  * partial plan. This assigns the non-partial plans first in
692  * order of descending cost and then spreads out the workers
693  * as evenly as possible across the remaining partial plans.
694  * ----------------------------------------------------------------
695  */
696 static bool
698 {
699  ParallelAppendState *pstate = node->as_pstate;
700 
701  /* Backward scan is not supported by parallel-aware plans */
703 
704  /* We should never be called when there are no subplans */
705  Assert(node->as_nplans > 0);
706 
708 
709  /* Mark just-completed subplan as finished. */
710  if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
711  node->as_pstate->pa_finished[node->as_whichplan] = true;
712 
713  /*
714  * If we've yet to determine the valid subplans then do so now. If
715  * run-time pruning is disabled then the valid subplans will always be set
716  * to all subplans.
717  */
718  else if (node->as_valid_subplans == NULL)
719  {
720  node->as_valid_subplans =
723  }
724 
725  /* If all the plans are already done, we have nothing to do */
726  if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
727  {
728  LWLockRelease(&pstate->pa_lock);
729  return false;
730  }
731 
732  /* Save the plan from which we are starting the search. */
733  node->as_whichplan = pstate->pa_next_plan;
734 
735  /* Loop until we find a valid subplan to execute. */
736  while (pstate->pa_finished[pstate->pa_next_plan])
737  {
738  int nextplan;
739 
740  nextplan = bms_next_member(node->as_valid_subplans,
741  pstate->pa_next_plan);
742  if (nextplan >= 0)
743  {
744  /* Advance to the next valid plan. */
745  pstate->pa_next_plan = nextplan;
746  }
747  else if (node->as_whichplan > node->as_first_partial_plan)
748  {
749  /*
750  * Try looping back to the first valid partial plan, if there is
751  * one. If there isn't, arrange to bail out below.
752  */
753  nextplan = bms_next_member(node->as_valid_subplans,
754  node->as_first_partial_plan - 1);
755  pstate->pa_next_plan =
756  nextplan < 0 ? node->as_whichplan : nextplan;
757  }
758  else
759  {
760  /*
761  * At last plan, and either there are no partial plans or we've
762  * tried them all. Arrange to bail out.
763  */
764  pstate->pa_next_plan = node->as_whichplan;
765  }
766 
767  if (pstate->pa_next_plan == node->as_whichplan)
768  {
769  /* We've tried everything! */
771  LWLockRelease(&pstate->pa_lock);
772  return false;
773  }
774  }
775 
776  /* Pick the plan we found, and advance pa_next_plan one more time. */
777  node->as_whichplan = pstate->pa_next_plan;
779  pstate->pa_next_plan);
780 
781  /*
782  * If there are no more valid plans then try setting the next plan to the
783  * first valid partial plan.
784  */
785  if (pstate->pa_next_plan < 0)
786  {
787  int nextplan = bms_next_member(node->as_valid_subplans,
788  node->as_first_partial_plan - 1);
789 
790  if (nextplan >= 0)
791  pstate->pa_next_plan = nextplan;
792  else
793  {
794  /*
795  * There are no valid partial plans, and we already chose the last
796  * non-partial plan; so flag that there's nothing more for our
797  * fellow workers to do.
798  */
800  }
801  }
802 
803  /* If non-partial, immediately mark as finished. */
804  if (node->as_whichplan < node->as_first_partial_plan)
805  node->as_pstate->pa_finished[node->as_whichplan] = true;
806 
807  LWLockRelease(&pstate->pa_lock);
808 
809  return true;
810 }
811 
812 /*
813  * mark_invalid_subplans_as_finished
814  * Marks the ParallelAppendState's pa_finished as true for each invalid
815  * subplan.
816  *
817  * This function should only be called for parallel Append with run-time
818  * pruning enabled.
819  */
820 static void
822 {
823  int i;
824 
825  /* Only valid to call this while in parallel Append mode */
826  Assert(node->as_pstate);
827 
828  /* Shouldn't have been called when run-time pruning is not enabled */
829  Assert(node->as_prune_state);
830 
831  /* Nothing to do if all plans are valid */
832  if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
833  return;
834 
835  /* Mark all non-valid plans as finished */
836  for (i = 0; i < node->as_nplans; i++)
837  {
838  if (!bms_is_member(i, node->as_valid_subplans))
839  node->as_pstate->pa_finished[i] = true;
840  }
841 }
842 
843 /* ----------------------------------------------------------------
844  * Asynchronous Append Support
845  * ----------------------------------------------------------------
846  */
847 
848 /* ----------------------------------------------------------------
849  * ExecAppendAsyncBegin
850  *
851  * Begin executing designed async-capable subplans.
852  * ----------------------------------------------------------------
853  */
854 static void
856 {
857  int i;
858 
859  /* Backward scan is not supported by async-aware Appends. */
861 
862  /* We should never be called when there are no subplans */
863  Assert(node->as_nplans > 0);
864 
865  /* We should never be called when there are no async subplans. */
866  Assert(node->as_nasyncplans > 0);
867 
868  /* If we've yet to determine the valid subplans then do so now. */
869  if (node->as_valid_subplans == NULL)
870  {
871  node->as_valid_subplans =
873 
875  }
876 
877  /* Initialize state variables. */
880 
881  /* Nothing to do if there are no valid async subplans. */
882  if (node->as_nasyncremain == 0)
883  return;
884 
885  /* Make a request for each of the valid async subplans. */
886  i = -1;
887  while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
888  {
889  AsyncRequest *areq = node->as_asyncrequests[i];
890 
891  Assert(areq->request_index == i);
892  Assert(!areq->callback_pending);
893 
894  /* Do the actual work. */
895  ExecAsyncRequest(areq);
896  }
897 }
898 
899 /* ----------------------------------------------------------------
900  * ExecAppendAsyncGetNext
901  *
902  * Get the next tuple from any of the asynchronous subplans.
903  * ----------------------------------------------------------------
904  */
905 static bool
907 {
908  *result = NULL;
909 
910  /* We should never be called when there are no valid async subplans. */
911  Assert(node->as_nasyncremain > 0);
912 
913  /* Request a tuple asynchronously. */
914  if (ExecAppendAsyncRequest(node, result))
915  return true;
916 
917  while (node->as_nasyncremain > 0)
918  {
920 
921  /* Wait or poll for async events. */
923 
924  /* Request a tuple asynchronously. */
925  if (ExecAppendAsyncRequest(node, result))
926  return true;
927 
928  /* Break from loop if there's any sync subplan that isn't complete. */
929  if (!node->as_syncdone)
930  break;
931  }
932 
933  /*
934  * If all sync subplans are complete, we're totally done scanning the
935  * given node. Otherwise, we're done with the asynchronous stuff but must
936  * continue scanning the sync subplans.
937  */
938  if (node->as_syncdone)
939  {
940  Assert(node->as_nasyncremain == 0);
941  *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
942  return true;
943  }
944 
945  return false;
946 }
947 
948 /* ----------------------------------------------------------------
949  * ExecAppendAsyncRequest
950  *
951  * Request a tuple asynchronously.
952  * ----------------------------------------------------------------
953  */
954 static bool
956 {
957  Bitmapset *needrequest;
958  int i;
959 
960  /* Nothing to do if there are no async subplans needing a new request. */
961  if (bms_is_empty(node->as_needrequest))
962  {
963  Assert(node->as_nasyncresults == 0);
964  return false;
965  }
966 
967  /*
968  * If there are any asynchronously-generated results that have not yet
969  * been returned, we have nothing to do; just return one of them.
970  */
971  if (node->as_nasyncresults > 0)
972  {
973  --node->as_nasyncresults;
974  *result = node->as_asyncresults[node->as_nasyncresults];
975  return true;
976  }
977 
978  /* Make a new request for each of the async subplans that need it. */
979  needrequest = node->as_needrequest;
980  node->as_needrequest = NULL;
981  i = -1;
982  while ((i = bms_next_member(needrequest, i)) >= 0)
983  {
984  AsyncRequest *areq = node->as_asyncrequests[i];
985 
986  /* Do the actual work. */
987  ExecAsyncRequest(areq);
988  }
989  bms_free(needrequest);
990 
991  /* Return one of the asynchronously-generated results if any. */
992  if (node->as_nasyncresults > 0)
993  {
994  --node->as_nasyncresults;
995  *result = node->as_asyncresults[node->as_nasyncresults];
996  return true;
997  }
998 
999  return false;
1000 }
1001 
1002 /* ----------------------------------------------------------------
1003  * ExecAppendAsyncEventWait
1004  *
1005  * Wait or poll for file descriptor events and fire callbacks.
1006  * ----------------------------------------------------------------
1007  */
1008 static void
1010 {
1011  int nevents = node->as_nasyncplans + 1;
1012  long timeout = node->as_syncdone ? -1 : 0;
1013  WaitEvent occurred_event[EVENT_BUFFER_SIZE];
1014  int noccurred;
1015  int i;
1016 
1017  /* We should never be called when there are no valid async subplans. */
1018  Assert(node->as_nasyncremain > 0);
1019 
1022  NULL, NULL);
1023 
1024  /* Give each waiting subplan a chance to add an event. */
1025  i = -1;
1026  while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1027  {
1028  AsyncRequest *areq = node->as_asyncrequests[i];
1029 
1030  if (areq->callback_pending)
1031  ExecAsyncConfigureWait(areq);
1032  }
1033 
1034  /*
1035  * No need for further processing if there are no configured events other
1036  * than the postmaster death event.
1037  */
1038  if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1039  {
1041  node->as_eventset = NULL;
1042  return;
1043  }
1044 
1045  /* We wait on at most EVENT_BUFFER_SIZE events. */
1046  if (nevents > EVENT_BUFFER_SIZE)
1047  nevents = EVENT_BUFFER_SIZE;
1048 
1049  /*
1050  * If the timeout is -1, wait until at least one event occurs. If the
1051  * timeout is 0, poll for events, but do not wait at all.
1052  */
1053  noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1054  nevents, WAIT_EVENT_APPEND_READY);
1056  node->as_eventset = NULL;
1057  if (noccurred == 0)
1058  return;
1059 
1060  /* Deliver notifications. */
1061  for (i = 0; i < noccurred; i++)
1062  {
1063  WaitEvent *w = &occurred_event[i];
1064 
1065  /*
1066  * Each waiting subplan should have registered its wait event with
1067  * user_data pointing back to its AsyncRequest.
1068  */
1069  if ((w->events & WL_SOCKET_READABLE) != 0)
1070  {
1071  AsyncRequest *areq = (AsyncRequest *) w->user_data;
1072 
1073  if (areq->callback_pending)
1074  {
1075  /*
1076  * Mark it as no longer needing a callback. We must do this
1077  * before dispatching the callback in case the callback resets
1078  * the flag.
1079  */
1080  areq->callback_pending = false;
1081 
1082  /* Do the actual work. */
1083  ExecAsyncNotify(areq);
1084  }
1085  }
1086  }
1087 }
1088 
1089 /* ----------------------------------------------------------------
1090  * ExecAsyncAppendResponse
1091  *
1092  * Receive a response from an asynchronous request we made.
1093  * ----------------------------------------------------------------
1094  */
1095 void
1097 {
1098  AppendState *node = (AppendState *) areq->requestor;
1099  TupleTableSlot *slot = areq->result;
1100 
1101  /* The result should be a TupleTableSlot or NULL. */
1102  Assert(slot == NULL || IsA(slot, TupleTableSlot));
1103 
1104  /* Nothing to do if the request is pending. */
1105  if (!areq->request_complete)
1106  {
1107  /* The request would have been pending for a callback. */
1108  Assert(areq->callback_pending);
1109  return;
1110  }
1111 
1112  /* If the result is NULL or an empty slot, there's nothing more to do. */
1113  if (TupIsNull(slot))
1114  {
1115  /* The ending subplan wouldn't have been pending for a callback. */
1116  Assert(!areq->callback_pending);
1117  --node->as_nasyncremain;
1118  return;
1119  }
1120 
1121  /* Save result so we can return it. */
1122  Assert(node->as_nasyncresults < node->as_nasyncplans);
1123  node->as_asyncresults[node->as_nasyncresults++] = slot;
1124 
1125  /*
1126  * Mark the subplan that returned a result as ready for a new request. We
1127  * don't launch another one here immediately because it might complete.
1128  */
1130  areq->request_index);
1131 }
1132 
1133 /* ----------------------------------------------------------------
1134  * classify_matching_subplans
1135  *
1136  * Classify the node's as_valid_subplans into sync ones and
1137  * async ones, adjust it to contain sync ones only, and save
1138  * async ones in the node's as_valid_asyncplans.
1139  * ----------------------------------------------------------------
1140  */
1141 static void
1143 {
1144  Bitmapset *valid_asyncplans;
1145 
1146  Assert(node->as_valid_asyncplans == NULL);
1147 
1148  /* Nothing to do if there are no valid subplans. */
1149  if (bms_is_empty(node->as_valid_subplans))
1150  {
1151  node->as_syncdone = true;
1152  node->as_nasyncremain = 0;
1153  return;
1154  }
1155 
1156  /* Nothing to do if there are no valid async subplans. */
1157  if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1158  {
1159  node->as_nasyncremain = 0;
1160  return;
1161  }
1162 
1163  /* Get valid async subplans. */
1164  valid_asyncplans = bms_copy(node->as_asyncplans);
1165  valid_asyncplans = bms_int_members(valid_asyncplans,
1166  node->as_valid_subplans);
1167 
1168  /* Adjust the valid subplans to contain sync subplans only. */
1170  valid_asyncplans);
1171 
1172  /* Save valid async subplans. */
1173  node->as_valid_asyncplans = valid_asyncplans;
1174 }
int bms_prev_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1106
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1047
void bms_free(Bitmapset *a)
Definition: bitmapset.c:209
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:649
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:428
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:739
Bitmapset * bms_del_members(Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:932
Bitmapset * bms_int_members(Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:906
bool bms_is_empty(const Bitmapset *a)
Definition: bitmapset.c:704
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:495
Bitmapset * bms_copy(const Bitmapset *a)
Definition: bitmapset.c:74
Bitmapset * bms_add_range(Bitmapset *a, int lower, int upper)
Definition: bitmapset.c:837
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:382
void ExecReScan(PlanState *node)
Definition: execAmi.c:78
void ExecAsyncRequest(AsyncRequest *areq)
Definition: execAsync.c:26
void ExecAsyncConfigureWait(AsyncRequest *areq)
Definition: execAsync.c:62
void ExecAsyncNotify(AsyncRequest *areq)
Definition: execAsync.c:88
Bitmapset * ExecFindMatchingSubPlans(PartitionPruneState *prunestate, bool initial_prune)
PartitionPruneState * ExecInitPartitionPruning(PlanState *planstate, int n_total_subplans, int part_prune_index, Bitmapset *root_parent_relids, Bitmapset **initially_valid_subplans)
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:557
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:142
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1799
void UpdateChangedParamSet(PlanState *node, Bitmapset *newchg)
Definition: execUtils.c:871
#define EXEC_FLAG_MARK
Definition: executor.h:59
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:257
int j
Definition: isn.c:74
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
int GetNumRegisteredWaitEvents(WaitEventSet *set)
Definition: latch.c:2153
WaitEventSet * CreateWaitEventSet(MemoryContext context, int nevents)
Definition: latch.c:723
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
Definition: latch.c:922
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1383
void FreeWaitEventSet(WaitEventSet *set)
Definition: latch.c:837
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:730
@ LWTRANCHE_PARALLEL_APPEND
Definition: lwlock.h:202
@ LW_EXCLUSIVE
Definition: lwlock.h:115
void * palloc0(Size size)
Definition: mcxt.c:1241
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void * palloc(Size size)
Definition: mcxt.c:1210
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
void ExecEndAppend(AppendState *node)
Definition: nodeAppend.c:383
static void ExecAppendAsyncBegin(AppendState *node)
Definition: nodeAppend.c:855
static void ExecAppendAsyncEventWait(AppendState *node)
Definition: nodeAppend.c:1009
void ExecReScanAppend(AppendState *node)
Definition: nodeAppend.c:403
static void classify_matching_subplans(AppendState *node)
Definition: nodeAppend.c:1142
AppendState * ExecInitAppend(Append *node, EState *estate, int eflags)
Definition: nodeAppend.c:109
static void mark_invalid_subplans_as_finished(AppendState *node)
Definition: nodeAppend.c:821
static TupleTableSlot * ExecAppend(PlanState *pstate)
Definition: nodeAppend.c:285
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
Definition: nodeAppend.c:955
static bool choose_next_subplan_for_leader(AppendState *node)
Definition: nodeAppend.c:616
static bool choose_next_subplan_for_worker(AppendState *node)
Definition: nodeAppend.c:697
void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
Definition: nodeAppend.c:523
#define INVALID_SUBPLAN_INDEX
Definition: nodeAppend.c:83
void ExecAsyncAppendResponse(AsyncRequest *areq)
Definition: nodeAppend.c:1096
#define EVENT_BUFFER_SIZE
Definition: nodeAppend.c:84
static bool choose_next_subplan_locally(AppendState *node)
Definition: nodeAppend.c:553
void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
Definition: nodeAppend.c:539
void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt)
Definition: nodeAppend.c:502
void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt)
Definition: nodeAppend.c:483
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
Definition: nodeAppend.c:906
#define IsA(nodeptr, _type_)
Definition: nodes.h:179
#define makeNode(_type_)
Definition: nodes.h:176
#define castNode(_type_, nodeptr)
Definition: nodes.h:197
static int list_length(const List *l)
Definition: pg_list.h:152
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
#define PGINVALID_SOCKET
Definition: port.h:31
#define ScanDirectionIsForward(direction)
Definition: sdir.h:64
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
struct PartitionPruneState * as_prune_state
Definition: execnodes.h:1352
Bitmapset * as_valid_asyncplans
Definition: execnodes.h:1354
Bitmapset * as_needrequest
Definition: execnodes.h:1345
bool as_syncdone
Definition: execnodes.h:1342
AsyncRequest ** as_asyncrequests
Definition: execnodes.h:1339
bool as_begun
Definition: execnodes.h:1336
Bitmapset * as_asyncplans
Definition: execnodes.h:1337
int as_whichplan
Definition: execnodes.h:1335
int as_nasyncresults
Definition: execnodes.h:1341
struct WaitEventSet * as_eventset
Definition: execnodes.h:1346
bool(* choose_next_subplan)(AppendState *)
Definition: execnodes.h:1355
PlanState ** appendplans
Definition: execnodes.h:1333
int as_first_partial_plan
Definition: execnodes.h:1348
PlanState ps
Definition: execnodes.h:1332
int as_nasyncremain
Definition: execnodes.h:1344
ParallelAppendState * as_pstate
Definition: execnodes.h:1350
Bitmapset * as_valid_subplans
Definition: execnodes.h:1353
Size pstate_len
Definition: execnodes.h:1351
TupleTableSlot ** as_asyncresults
Definition: execnodes.h:1340
int as_nasyncplans
Definition: execnodes.h:1338
int first_partial_plan
Definition: plannodes.h:277
int part_prune_index
Definition: plannodes.h:280
Bitmapset * apprelids
Definition: plannodes.h:269
List * appendplans
Definition: plannodes.h:270
struct PlanState * requestor
Definition: execnodes.h:591
TupleTableSlot * result
Definition: execnodes.h:596
bool request_complete
Definition: execnodes.h:595
int request_index
Definition: execnodes.h:593
bool callback_pending
Definition: execnodes.h:594
struct PlanState * requestee
Definition: execnodes.h:592
ScanDirection es_direction
Definition: execnodes.h:611
struct EPQState * es_epq_active
Definition: execnodes.h:685
Definition: lwlock.h:40
bool pa_finished[FLEXIBLE_ARRAY_MEMBER]
Definition: nodeAppend.c:80
shm_toc_estimator estimator
Definition: parallel.h:42
shm_toc * toc
Definition: parallel.h:45
Bitmapset * execparamids
bool resultopsset
Definition: execnodes.h:1116
Plan * plan
Definition: execnodes.h:1031
EState * state
Definition: execnodes.h:1033
Bitmapset * chgParam
Definition: execnodes.h:1063
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:1069
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1071
bool resultopsfixed
Definition: execnodes.h:1112
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:1037
bool async_capable
Definition: plannodes.h:150
int plan_node_id
Definition: plannodes.h:155
void * user_data
Definition: latch.h:155
uint32 events
Definition: latch.h:153
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:433
#define TupIsNull(slot)
Definition: tuptable.h:300
@ WAIT_EVENT_APPEND_READY
Definition: wait_event.h:82