/*-------------------------------------------------------------------------
 *
 * nodeAppend.c
 *	  routines to handle append nodes.
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeAppend.c
 *
 *-------------------------------------------------------------------------
 */
/* INTERFACE ROUTINES
 *		ExecInitAppend	- initialize the append node
 *		ExecAppend		- retrieve the next tuple from the node
 *		ExecEndAppend	- shut down the append node
 *		ExecReScanAppend - rescan the append node
 *
 * NOTES
 *		Each append node contains a list of one or more subplans which
 *		must be iteratively processed (forwards or backwards).
 *		Tuples are retrieved by executing the 'whichplan'th subplan
 *		until the subplan stops returning tuples, at which point that
 *		plan is shut down and the next started up.
 *
 *		Append nodes don't make use of their left and right
 *		subtrees, rather they maintain a list of subplans so
 *		a typical append node looks like this in the plan tree:
 *
 *				   ...
 *				   /
 *				Append -------+------+------+--- nil
 *				/	\		  |		 |		|
 *			  nil	nil		 ...    ...    ...
 *								 subplans
 *
 *		Append nodes are currently used for unions, and to support
 *		inheritance queries, where several relations need to be scanned.
 *		For example, in our standard person/student/employee/student-emp
 *		example, where student and employee inherit from person
 *		and student-emp inherits from student and employee, the
 *		query:
 *
 *				select name from person
 *
 *		generates the plan:
 *
 *				  |
 *				Append -------+-------+--------+--------+
 *				/	\		  |		  |		   |		|
 *			  nil	nil		 Scan	 Scan	  Scan	   Scan
 *							  |		  |		   |		|
 *							person employee student student-emp
 */
57 
#include "postgres.h"

#include "executor/execAsync.h"
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
67 
68 /* Shared state for parallel-aware Append. */
70 {
71  LWLock pa_lock; /* mutual exclusion to choose next subplan */
72  int pa_next_plan; /* next plan to choose by any worker */
73 
74  /*
75  * pa_finished[i] should be true if no more workers should select subplan
76  * i. for a non-partial plan, this should be set to true as soon as a
77  * worker selects the plan; for a partial plan, it remains false until
78  * some worker executes the plan to completion.
79  */
81 };
82 
83 #define INVALID_SUBPLAN_INDEX -1
84 #define EVENT_BUFFER_SIZE 16
85 
86 static TupleTableSlot *ExecAppend(PlanState *pstate);
87 static bool choose_next_subplan_locally(AppendState *node);
91 static void ExecAppendAsyncBegin(AppendState *node);
92 static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
93 static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
94 static void ExecAppendAsyncEventWait(AppendState *node);
95 static void classify_matching_subplans(AppendState *node);
96 
97 /* ----------------------------------------------------------------
98  * ExecInitAppend
99  *
100  * Begin all of the subscans of the append node.
101  *
102  * (This is potentially wasteful, since the entire result of the
103  * append node may not be scanned, but this way all of the
104  * structures get allocated in the executor's top level memory
105  * block instead of that of the call to ExecAppend.)
106  * ----------------------------------------------------------------
107  */
108 AppendState *
109 ExecInitAppend(Append *node, EState *estate, int eflags)
110 {
111  AppendState *appendstate = makeNode(AppendState);
112  PlanState **appendplanstates;
113  Bitmapset *validsubplans;
114  Bitmapset *asyncplans;
115  int nplans;
116  int nasyncplans;
117  int firstvalid;
118  int i,
119  j;
120 
121  /* check for unsupported flags */
122  Assert(!(eflags & EXEC_FLAG_MARK));
123 
124  /*
125  * create new AppendState for our append node
126  */
127  appendstate->ps.plan = (Plan *) node;
128  appendstate->ps.state = estate;
129  appendstate->ps.ExecProcNode = ExecAppend;
130 
131  /* Let choose_next_subplan_* function handle setting the first subplan */
132  appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
133  appendstate->as_syncdone = false;
134  appendstate->as_begun = false;
135 
136  /* If run-time partition pruning is enabled, then set that up now */
137  if (node->part_prune_info != NULL)
138  {
139  PartitionPruneState *prunestate;
140 
141  /* We may need an expression context to evaluate partition exprs */
142  ExecAssignExprContext(estate, &appendstate->ps);
143 
144  /* Create the working data structure for pruning. */
145  prunestate = ExecCreatePartitionPruneState(&appendstate->ps,
146  node->part_prune_info);
147  appendstate->as_prune_state = prunestate;
148 
149  /* Perform an initial partition prune, if required. */
150  if (prunestate->do_initial_prune)
151  {
152  /* Determine which subplans survive initial pruning */
153  validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
154  list_length(node->appendplans));
155 
156  nplans = bms_num_members(validsubplans);
157  }
158  else
159  {
160  /* We'll need to initialize all subplans */
161  nplans = list_length(node->appendplans);
162  Assert(nplans > 0);
163  validsubplans = bms_add_range(NULL, 0, nplans - 1);
164  }
165 
166  /*
167  * When no run-time pruning is required and there's at least one
168  * subplan, we can fill as_valid_subplans immediately, preventing
169  * later calls to ExecFindMatchingSubPlans.
170  */
171  if (!prunestate->do_exec_prune && nplans > 0)
172  appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
173  }
174  else
175  {
176  nplans = list_length(node->appendplans);
177 
178  /*
179  * When run-time partition pruning is not enabled we can just mark all
180  * subplans as valid; they must also all be initialized.
181  */
182  Assert(nplans > 0);
183  appendstate->as_valid_subplans = validsubplans =
184  bms_add_range(NULL, 0, nplans - 1);
185  appendstate->as_prune_state = NULL;
186  }
187 
188  /*
189  * Initialize result tuple type and slot.
190  */
191  ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
192 
193  /* node returns slots from each of its subnodes, therefore not fixed */
194  appendstate->ps.resultopsset = true;
195  appendstate->ps.resultopsfixed = false;
196 
197  appendplanstates = (PlanState **) palloc(nplans *
198  sizeof(PlanState *));
199 
200  /*
201  * call ExecInitNode on each of the valid plans to be executed and save
202  * the results into the appendplanstates array.
203  *
204  * While at it, find out the first valid partial plan.
205  */
206  j = 0;
207  asyncplans = NULL;
208  nasyncplans = 0;
209  firstvalid = nplans;
210  i = -1;
211  while ((i = bms_next_member(validsubplans, i)) >= 0)
212  {
213  Plan *initNode = (Plan *) list_nth(node->appendplans, i);
214 
215  /*
216  * Record async subplans. When executing EvalPlanQual, we treat them
217  * as sync ones; don't do this when initializing an EvalPlanQual plan
218  * tree.
219  */
220  if (initNode->async_capable && estate->es_epq_active == NULL)
221  {
222  asyncplans = bms_add_member(asyncplans, j);
223  nasyncplans++;
224  }
225 
226  /*
227  * Record the lowest appendplans index which is a valid partial plan.
228  */
229  if (i >= node->first_partial_plan && j < firstvalid)
230  firstvalid = j;
231 
232  appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
233  }
234 
235  appendstate->as_first_partial_plan = firstvalid;
236  appendstate->appendplans = appendplanstates;
237  appendstate->as_nplans = nplans;
238 
239  /* Initialize async state */
240  appendstate->as_asyncplans = asyncplans;
241  appendstate->as_nasyncplans = nasyncplans;
242  appendstate->as_asyncrequests = NULL;
243  appendstate->as_asyncresults = NULL;
244  appendstate->as_nasyncresults = 0;
245  appendstate->as_nasyncremain = 0;
246  appendstate->as_needrequest = NULL;
247  appendstate->as_eventset = NULL;
248  appendstate->as_valid_asyncplans = NULL;
249 
250  if (nasyncplans > 0)
251  {
252  appendstate->as_asyncrequests = (AsyncRequest **)
253  palloc0(nplans * sizeof(AsyncRequest *));
254 
255  i = -1;
256  while ((i = bms_next_member(asyncplans, i)) >= 0)
257  {
258  AsyncRequest *areq;
259 
260  areq = palloc(sizeof(AsyncRequest));
261  areq->requestor = (PlanState *) appendstate;
262  areq->requestee = appendplanstates[i];
263  areq->request_index = i;
264  areq->callback_pending = false;
265  areq->request_complete = false;
266  areq->result = NULL;
267 
268  appendstate->as_asyncrequests[i] = areq;
269  }
270 
271  appendstate->as_asyncresults = (TupleTableSlot **)
272  palloc0(nasyncplans * sizeof(TupleTableSlot *));
273 
274  if (appendstate->as_valid_subplans != NULL)
275  classify_matching_subplans(appendstate);
276  }
277 
278  /*
279  * Miscellaneous initialization
280  */
281 
282  appendstate->ps.ps_ProjInfo = NULL;
283 
284  /* For parallel query, this will be overridden later. */
286 
287  return appendstate;
288 }
289 
290 /* ----------------------------------------------------------------
291  * ExecAppend
292  *
293  * Handles iteration over multiple subplans.
294  * ----------------------------------------------------------------
295  */
296 static TupleTableSlot *
298 {
299  AppendState *node = castNode(AppendState, pstate);
300  TupleTableSlot *result;
301 
302  /*
303  * If this is the first call after Init or ReScan, we need to do the
304  * initialization work.
305  */
306  if (!node->as_begun)
307  {
309  Assert(!node->as_syncdone);
310 
311  /* Nothing to do if there are no subplans */
312  if (node->as_nplans == 0)
313  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
314 
315  /* If there are any async subplans, begin executing them. */
316  if (node->as_nasyncplans > 0)
317  ExecAppendAsyncBegin(node);
318 
319  /*
320  * If no sync subplan has been chosen, we must choose one before
321  * proceeding.
322  */
323  if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
324  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
325 
326  Assert(node->as_syncdone ||
327  (node->as_whichplan >= 0 &&
328  node->as_whichplan < node->as_nplans));
329 
330  /* And we're initialized. */
331  node->as_begun = true;
332  }
333 
334  for (;;)
335  {
336  PlanState *subnode;
337 
339 
340  /*
341  * try to get a tuple from an async subplan if any
342  */
343  if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
344  {
345  if (ExecAppendAsyncGetNext(node, &result))
346  return result;
347  Assert(!node->as_syncdone);
349  }
350 
351  /*
352  * figure out which sync subplan we are currently processing
353  */
354  Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
355  subnode = node->appendplans[node->as_whichplan];
356 
357  /*
358  * get a tuple from the subplan
359  */
360  result = ExecProcNode(subnode);
361 
362  if (!TupIsNull(result))
363  {
364  /*
365  * If the subplan gave us something then return it as-is. We do
366  * NOT make use of the result slot that was set up in
367  * ExecInitAppend; there's no need for it.
368  */
369  return result;
370  }
371 
372  /*
373  * wait or poll for async events if any. We do this before checking
374  * for the end of iteration, because it might drain the remaining
375  * async subplans.
376  */
377  if (node->as_nasyncremain > 0)
379 
380  /* choose new sync subplan; if no sync/async subplans, we're done */
381  if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
382  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
383  }
384 }
385 
386 /* ----------------------------------------------------------------
387  * ExecEndAppend
388  *
389  * Shuts down the subscans of the append node.
390  *
391  * Returns nothing of interest.
392  * ----------------------------------------------------------------
393  */
394 void
396 {
397  PlanState **appendplans;
398  int nplans;
399  int i;
400 
401  /*
402  * get information from the node
403  */
404  appendplans = node->appendplans;
405  nplans = node->as_nplans;
406 
407  /*
408  * shut down each of the subscans
409  */
410  for (i = 0; i < nplans; i++)
411  ExecEndNode(appendplans[i]);
412 }
413 
414 void
416 {
417  int nasyncplans = node->as_nasyncplans;
418  int i;
419 
420  /*
421  * If any PARAM_EXEC Params used in pruning expressions have changed, then
422  * we'd better unset the valid subplans so that they are reselected for
423  * the new parameter values.
424  */
425  if (node->as_prune_state &&
426  bms_overlap(node->ps.chgParam,
428  {
430  node->as_valid_subplans = NULL;
431  if (nasyncplans > 0)
432  {
434  node->as_valid_asyncplans = NULL;
435  }
436  }
437 
438  for (i = 0; i < node->as_nplans; i++)
439  {
440  PlanState *subnode = node->appendplans[i];
441 
442  /*
443  * ExecReScan doesn't know about my subplans, so I have to do
444  * changed-parameter signaling myself.
445  */
446  if (node->ps.chgParam != NULL)
447  UpdateChangedParamSet(subnode, node->ps.chgParam);
448 
449  /*
450  * If chgParam of subnode is not null then plan will be re-scanned by
451  * first ExecProcNode or by first ExecAsyncRequest.
452  */
453  if (subnode->chgParam == NULL)
454  ExecReScan(subnode);
455  }
456 
457  /* Reset async state */
458  if (nasyncplans > 0)
459  {
460  i = -1;
461  while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
462  {
463  AsyncRequest *areq = node->as_asyncrequests[i];
464 
465  areq->callback_pending = false;
466  areq->request_complete = false;
467  areq->result = NULL;
468  }
469 
470  node->as_nasyncresults = 0;
471  node->as_nasyncremain = 0;
472  bms_free(node->as_needrequest);
473  node->as_needrequest = NULL;
474  }
475 
476  /* Let choose_next_subplan_* function handle setting the first subplan */
478  node->as_syncdone = false;
479  node->as_begun = false;
480 }
481 
482 /* ----------------------------------------------------------------
483  * Parallel Append Support
484  * ----------------------------------------------------------------
485  */
486 
487 /* ----------------------------------------------------------------
488  * ExecAppendEstimate
489  *
490  * Compute the amount of space we'll need in the parallel
491  * query DSM, and inform pcxt->estimator about our needs.
492  * ----------------------------------------------------------------
493  */
494 void
496  ParallelContext *pcxt)
497 {
498  node->pstate_len =
500  sizeof(bool) * node->as_nplans);
501 
503  shm_toc_estimate_keys(&pcxt->estimator, 1);
504 }
505 
506 
507 /* ----------------------------------------------------------------
508  * ExecAppendInitializeDSM
509  *
510  * Set up shared state for Parallel Append.
511  * ----------------------------------------------------------------
512  */
513 void
515  ParallelContext *pcxt)
516 {
517  ParallelAppendState *pstate;
518 
519  pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
520  memset(pstate, 0, node->pstate_len);
522  shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
523 
524  node->as_pstate = pstate;
526 }
527 
528 /* ----------------------------------------------------------------
529  * ExecAppendReInitializeDSM
530  *
531  * Reset shared state before beginning a fresh scan.
532  * ----------------------------------------------------------------
533  */
534 void
536 {
537  ParallelAppendState *pstate = node->as_pstate;
538 
539  pstate->pa_next_plan = 0;
540  memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
541 }
542 
543 /* ----------------------------------------------------------------
544  * ExecAppendInitializeWorker
545  *
546  * Copy relevant information from TOC into planstate, and initialize
547  * whatever is required to choose and execute the optimal subplan.
548  * ----------------------------------------------------------------
549  */
550 void
552 {
553  node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
555 }
556 
557 /* ----------------------------------------------------------------
558  * choose_next_subplan_locally
559  *
560  * Choose next sync subplan for a non-parallel-aware Append,
561  * returning false if there are no more.
562  * ----------------------------------------------------------------
563  */
564 static bool
566 {
567  int whichplan = node->as_whichplan;
568  int nextplan;
569 
570  /* We should never be called when there are no subplans */
571  Assert(node->as_nplans > 0);
572 
573  /* Nothing to do if syncdone */
574  if (node->as_syncdone)
575  return false;
576 
577  /*
578  * If first call then have the bms member function choose the first valid
579  * sync subplan by initializing whichplan to -1. If there happen to be no
580  * valid sync subplans then the bms member function will handle that by
581  * returning a negative number which will allow us to exit returning a
582  * false value.
583  */
584  if (whichplan == INVALID_SUBPLAN_INDEX)
585  {
586  if (node->as_nasyncplans > 0)
587  {
588  /* We'd have filled as_valid_subplans already */
589  Assert(node->as_valid_subplans);
590  }
591  else if (node->as_valid_subplans == NULL)
592  node->as_valid_subplans =
594 
595  whichplan = -1;
596  }
597 
598  /* Ensure whichplan is within the expected range */
599  Assert(whichplan >= -1 && whichplan <= node->as_nplans);
600 
602  nextplan = bms_next_member(node->as_valid_subplans, whichplan);
603  else
604  nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
605 
606  if (nextplan < 0)
607  {
608  /* Set as_syncdone if in async mode */
609  if (node->as_nasyncplans > 0)
610  node->as_syncdone = true;
611  return false;
612  }
613 
614  node->as_whichplan = nextplan;
615 
616  return true;
617 }
618 
619 /* ----------------------------------------------------------------
620  * choose_next_subplan_for_leader
621  *
622  * Try to pick a plan which doesn't commit us to doing much
623  * work locally, so that as much work as possible is done in
624  * the workers. Cheapest subplans are at the end.
625  * ----------------------------------------------------------------
626  */
627 static bool
629 {
630  ParallelAppendState *pstate = node->as_pstate;
631 
632  /* Backward scan is not supported by parallel-aware plans */
634 
635  /* We should never be called when there are no subplans */
636  Assert(node->as_nplans > 0);
637 
639 
640  if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
641  {
642  /* Mark just-completed subplan as finished. */
643  node->as_pstate->pa_finished[node->as_whichplan] = true;
644  }
645  else
646  {
647  /* Start with last subplan. */
648  node->as_whichplan = node->as_nplans - 1;
649 
650  /*
651  * If we've yet to determine the valid subplans then do so now. If
652  * run-time pruning is disabled then the valid subplans will always be
653  * set to all subplans.
654  */
655  if (node->as_valid_subplans == NULL)
656  {
657  node->as_valid_subplans =
659 
660  /*
661  * Mark each invalid plan as finished to allow the loop below to
662  * select the first valid subplan.
663  */
665  }
666  }
667 
668  /* Loop until we find a subplan to execute. */
669  while (pstate->pa_finished[node->as_whichplan])
670  {
671  if (node->as_whichplan == 0)
672  {
675  LWLockRelease(&pstate->pa_lock);
676  return false;
677  }
678 
679  /*
680  * We needn't pay attention to as_valid_subplans here as all invalid
681  * plans have been marked as finished.
682  */
683  node->as_whichplan--;
684  }
685 
686  /* If non-partial, immediately mark as finished. */
687  if (node->as_whichplan < node->as_first_partial_plan)
688  node->as_pstate->pa_finished[node->as_whichplan] = true;
689 
690  LWLockRelease(&pstate->pa_lock);
691 
692  return true;
693 }
694 
695 /* ----------------------------------------------------------------
696  * choose_next_subplan_for_worker
697  *
698  * Choose next subplan for a parallel-aware Append, returning
699  * false if there are no more.
700  *
701  * We start from the first plan and advance through the list;
702  * when we get back to the end, we loop back to the first
703  * partial plan. This assigns the non-partial plans first in
704  * order of descending cost and then spreads out the workers
705  * as evenly as possible across the remaining partial plans.
706  * ----------------------------------------------------------------
707  */
708 static bool
710 {
711  ParallelAppendState *pstate = node->as_pstate;
712 
713  /* Backward scan is not supported by parallel-aware plans */
715 
716  /* We should never be called when there are no subplans */
717  Assert(node->as_nplans > 0);
718 
720 
721  /* Mark just-completed subplan as finished. */
722  if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
723  node->as_pstate->pa_finished[node->as_whichplan] = true;
724 
725  /*
726  * If we've yet to determine the valid subplans then do so now. If
727  * run-time pruning is disabled then the valid subplans will always be set
728  * to all subplans.
729  */
730  else if (node->as_valid_subplans == NULL)
731  {
732  node->as_valid_subplans =
735  }
736 
737  /* If all the plans are already done, we have nothing to do */
738  if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
739  {
740  LWLockRelease(&pstate->pa_lock);
741  return false;
742  }
743 
744  /* Save the plan from which we are starting the search. */
745  node->as_whichplan = pstate->pa_next_plan;
746 
747  /* Loop until we find a valid subplan to execute. */
748  while (pstate->pa_finished[pstate->pa_next_plan])
749  {
750  int nextplan;
751 
752  nextplan = bms_next_member(node->as_valid_subplans,
753  pstate->pa_next_plan);
754  if (nextplan >= 0)
755  {
756  /* Advance to the next valid plan. */
757  pstate->pa_next_plan = nextplan;
758  }
759  else if (node->as_whichplan > node->as_first_partial_plan)
760  {
761  /*
762  * Try looping back to the first valid partial plan, if there is
763  * one. If there isn't, arrange to bail out below.
764  */
765  nextplan = bms_next_member(node->as_valid_subplans,
766  node->as_first_partial_plan - 1);
767  pstate->pa_next_plan =
768  nextplan < 0 ? node->as_whichplan : nextplan;
769  }
770  else
771  {
772  /*
773  * At last plan, and either there are no partial plans or we've
774  * tried them all. Arrange to bail out.
775  */
776  pstate->pa_next_plan = node->as_whichplan;
777  }
778 
779  if (pstate->pa_next_plan == node->as_whichplan)
780  {
781  /* We've tried everything! */
783  LWLockRelease(&pstate->pa_lock);
784  return false;
785  }
786  }
787 
788  /* Pick the plan we found, and advance pa_next_plan one more time. */
789  node->as_whichplan = pstate->pa_next_plan;
791  pstate->pa_next_plan);
792 
793  /*
794  * If there are no more valid plans then try setting the next plan to the
795  * first valid partial plan.
796  */
797  if (pstate->pa_next_plan < 0)
798  {
799  int nextplan = bms_next_member(node->as_valid_subplans,
800  node->as_first_partial_plan - 1);
801 
802  if (nextplan >= 0)
803  pstate->pa_next_plan = nextplan;
804  else
805  {
806  /*
807  * There are no valid partial plans, and we already chose the last
808  * non-partial plan; so flag that there's nothing more for our
809  * fellow workers to do.
810  */
812  }
813  }
814 
815  /* If non-partial, immediately mark as finished. */
816  if (node->as_whichplan < node->as_first_partial_plan)
817  node->as_pstate->pa_finished[node->as_whichplan] = true;
818 
819  LWLockRelease(&pstate->pa_lock);
820 
821  return true;
822 }
823 
824 /*
825  * mark_invalid_subplans_as_finished
826  * Marks the ParallelAppendState's pa_finished as true for each invalid
827  * subplan.
828  *
829  * This function should only be called for parallel Append with run-time
830  * pruning enabled.
831  */
832 static void
834 {
835  int i;
836 
837  /* Only valid to call this while in parallel Append mode */
838  Assert(node->as_pstate);
839 
840  /* Shouldn't have been called when run-time pruning is not enabled */
841  Assert(node->as_prune_state);
842 
843  /* Nothing to do if all plans are valid */
844  if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
845  return;
846 
847  /* Mark all non-valid plans as finished */
848  for (i = 0; i < node->as_nplans; i++)
849  {
850  if (!bms_is_member(i, node->as_valid_subplans))
851  node->as_pstate->pa_finished[i] = true;
852  }
853 }
854 
855 /* ----------------------------------------------------------------
856  * Asynchronous Append Support
857  * ----------------------------------------------------------------
858  */
859 
860 /* ----------------------------------------------------------------
861  * ExecAppendAsyncBegin
862  *
863  * Begin executing designed async-capable subplans.
864  * ----------------------------------------------------------------
865  */
866 static void
868 {
869  int i;
870 
871  /* Backward scan is not supported by async-aware Appends. */
873 
874  /* We should never be called when there are no subplans */
875  Assert(node->as_nplans > 0);
876 
877  /* We should never be called when there are no async subplans. */
878  Assert(node->as_nasyncplans > 0);
879 
880  /* If we've yet to determine the valid subplans then do so now. */
881  if (node->as_valid_subplans == NULL)
882  {
883  node->as_valid_subplans =
885 
887  }
888 
889  /* Initialize state variables. */
892 
893  /* Nothing to do if there are no valid async subplans. */
894  if (node->as_nasyncremain == 0)
895  return;
896 
897  /* Make a request for each of the valid async subplans. */
898  i = -1;
899  while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
900  {
901  AsyncRequest *areq = node->as_asyncrequests[i];
902 
903  Assert(areq->request_index == i);
904  Assert(!areq->callback_pending);
905 
906  /* Do the actual work. */
907  ExecAsyncRequest(areq);
908  }
909 }
910 
911 /* ----------------------------------------------------------------
912  * ExecAppendAsyncGetNext
913  *
914  * Get the next tuple from any of the asynchronous subplans.
915  * ----------------------------------------------------------------
916  */
917 static bool
919 {
920  *result = NULL;
921 
922  /* We should never be called when there are no valid async subplans. */
923  Assert(node->as_nasyncremain > 0);
924 
925  /* Request a tuple asynchronously. */
926  if (ExecAppendAsyncRequest(node, result))
927  return true;
928 
929  while (node->as_nasyncremain > 0)
930  {
932 
933  /* Wait or poll for async events. */
935 
936  /* Request a tuple asynchronously. */
937  if (ExecAppendAsyncRequest(node, result))
938  return true;
939 
940  /* Break from loop if there's any sync subplan that isn't complete. */
941  if (!node->as_syncdone)
942  break;
943  }
944 
945  /*
946  * If all sync subplans are complete, we're totally done scanning the
947  * given node. Otherwise, we're done with the asynchronous stuff but must
948  * continue scanning the sync subplans.
949  */
950  if (node->as_syncdone)
951  {
952  Assert(node->as_nasyncremain == 0);
953  *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
954  return true;
955  }
956 
957  return false;
958 }
959 
960 /* ----------------------------------------------------------------
961  * ExecAppendAsyncRequest
962  *
963  * Request a tuple asynchronously.
964  * ----------------------------------------------------------------
965  */
966 static bool
968 {
969  Bitmapset *needrequest;
970  int i;
971 
972  /* Nothing to do if there are no async subplans needing a new request. */
973  if (bms_is_empty(node->as_needrequest))
974  {
975  Assert(node->as_nasyncresults == 0);
976  return false;
977  }
978 
979  /*
980  * If there are any asynchronously-generated results that have not yet
981  * been returned, we have nothing to do; just return one of them.
982  */
983  if (node->as_nasyncresults > 0)
984  {
985  --node->as_nasyncresults;
986  *result = node->as_asyncresults[node->as_nasyncresults];
987  return true;
988  }
989 
990  /* Make a new request for each of the async subplans that need it. */
991  needrequest = node->as_needrequest;
992  node->as_needrequest = NULL;
993  i = -1;
994  while ((i = bms_next_member(needrequest, i)) >= 0)
995  {
996  AsyncRequest *areq = node->as_asyncrequests[i];
997 
998  /* Do the actual work. */
999  ExecAsyncRequest(areq);
1000  }
1001  bms_free(needrequest);
1002 
1003  /* Return one of the asynchronously-generated results if any. */
1004  if (node->as_nasyncresults > 0)
1005  {
1006  --node->as_nasyncresults;
1007  *result = node->as_asyncresults[node->as_nasyncresults];
1008  return true;
1009  }
1010 
1011  return false;
1012 }
1013 
1014 /* ----------------------------------------------------------------
1015  * ExecAppendAsyncEventWait
1016  *
1017  * Wait or poll for file descriptor events and fire callbacks.
1018  * ----------------------------------------------------------------
1019  */
1020 static void
1022 {
1023  int nevents = node->as_nasyncplans + 1;
1024  long timeout = node->as_syncdone ? -1 : 0;
1025  WaitEvent occurred_event[EVENT_BUFFER_SIZE];
1026  int noccurred;
1027  int i;
1028 
1029  /* We should never be called when there are no valid async subplans. */
1030  Assert(node->as_nasyncremain > 0);
1031 
1034  NULL, NULL);
1035 
1036  /* Give each waiting subplan a chance to add an event. */
1037  i = -1;
1038  while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1039  {
1040  AsyncRequest *areq = node->as_asyncrequests[i];
1041 
1042  if (areq->callback_pending)
1043  ExecAsyncConfigureWait(areq);
1044  }
1045 
1046  /*
1047  * No need for further processing if there are no configured events other
1048  * than the postmaster death event.
1049  */
1050  if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1051  {
1053  node->as_eventset = NULL;
1054  return;
1055  }
1056 
1057  /* We wait on at most EVENT_BUFFER_SIZE events. */
1058  if (nevents > EVENT_BUFFER_SIZE)
1059  nevents = EVENT_BUFFER_SIZE;
1060 
1061  /*
1062  * If the timeout is -1, wait until at least one event occurs. If the
1063  * timeout is 0, poll for events, but do not wait at all.
1064  */
1065  noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1066  nevents, WAIT_EVENT_APPEND_READY);
1068  node->as_eventset = NULL;
1069  if (noccurred == 0)
1070  return;
1071 
1072  /* Deliver notifications. */
1073  for (i = 0; i < noccurred; i++)
1074  {
1075  WaitEvent *w = &occurred_event[i];
1076 
1077  /*
1078  * Each waiting subplan should have registered its wait event with
1079  * user_data pointing back to its AsyncRequest.
1080  */
1081  if ((w->events & WL_SOCKET_READABLE) != 0)
1082  {
1083  AsyncRequest *areq = (AsyncRequest *) w->user_data;
1084 
1085  if (areq->callback_pending)
1086  {
1087  /*
1088  * Mark it as no longer needing a callback. We must do this
1089  * before dispatching the callback in case the callback resets
1090  * the flag.
1091  */
1092  areq->callback_pending = false;
1093 
1094  /* Do the actual work. */
1095  ExecAsyncNotify(areq);
1096  }
1097  }
1098  }
1099 }
1100 
1101 /* ----------------------------------------------------------------
1102  * ExecAsyncAppendResponse
1103  *
1104  * Receive a response from an asynchronous request we made.
1105  * ----------------------------------------------------------------
1106  */
1107 void
1109 {
1110  AppendState *node = (AppendState *) areq->requestor;
1111  TupleTableSlot *slot = areq->result;
1112 
1113  /* The result should be a TupleTableSlot or NULL. */
1114  Assert(slot == NULL || IsA(slot, TupleTableSlot));
1115 
1116  /* Nothing to do if the request is pending. */
1117  if (!areq->request_complete)
1118  {
1119  /* The request would have been pending for a callback. */
1120  Assert(areq->callback_pending);
1121  return;
1122  }
1123 
1124  /* If the result is NULL or an empty slot, there's nothing more to do. */
1125  if (TupIsNull(slot))
1126  {
1127  /* The ending subplan wouldn't have been pending for a callback. */
1128  Assert(!areq->callback_pending);
1129  --node->as_nasyncremain;
1130  return;
1131  }
1132 
1133  /* Save result so we can return it. */
1134  Assert(node->as_nasyncresults < node->as_nasyncplans);
1135  node->as_asyncresults[node->as_nasyncresults++] = slot;
1136 
1137  /*
1138  * Mark the subplan that returned a result as ready for a new request. We
1139  * don't launch another one here immediately because it might complete.
1140  */
1142  areq->request_index);
1143 }
1144 
1145 /* ----------------------------------------------------------------
1146  * classify_matching_subplans
1147  *
1148  * Classify the node's as_valid_subplans into sync ones and
1149  * async ones, adjust it to contain sync ones only, and save
1150  * async ones in the node's as_valid_asyncplans.
1151  * ----------------------------------------------------------------
1152  */
1153 static void
1155 {
1156  Bitmapset *valid_asyncplans;
1157 
1158  Assert(node->as_valid_asyncplans == NULL);
1159 
1160  /* Nothing to do if there are no valid subplans. */
1161  if (bms_is_empty(node->as_valid_subplans))
1162  {
1163  node->as_syncdone = true;
1164  node->as_nasyncremain = 0;
1165  return;
1166  }
1167 
1168  /* Nothing to do if there are no valid async subplans. */
1169  if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1170  {
1171  node->as_nasyncremain = 0;
1172  return;
1173  }
1174 
1175  /* Get valid async subplans. */
1176  valid_asyncplans = bms_copy(node->as_asyncplans);
1177  valid_asyncplans = bms_int_members(valid_asyncplans,
1178  node->as_valid_subplans);
1179 
1180  /* Adjust the valid subplans to contain sync subplans only. */
1182  valid_asyncplans);
1183 
1184  /* Save valid async subplans. */
1185  node->as_valid_asyncplans = valid_asyncplans;
1186 }
void ExecAsyncRequest(AsyncRequest *areq)
Definition: execAsync.c:26
Size pstate_len
Definition: execnodes.h:1273
static void classify_matching_subplans(AppendState *node)
Definition: nodeAppend.c:1154
Definition: lwlock.h:31
#define IsA(nodeptr, _type_)
Definition: nodes.h:587
void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
Definition: nodeAppend.c:551
static TupleTableSlot * ExecAppend(PlanState *pstate)
Definition: nodeAppend.c:297
Bitmapset * bms_copy(const Bitmapset *a)
Definition: bitmapset.c:74
int as_nasyncremain
Definition: execnodes.h:1266
int as_nasyncplans
Definition: execnodes.h:1260
void FreeWaitEventSet(WaitEventSet *set)
Definition: latch.c:798
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1007
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
#define ScanDirectionIsForward(direction)
Definition: sdir.h:55
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
Definition: latch.c:862
struct PartitionPruneState * as_prune_state
Definition: execnodes.h:1274
Bitmapset * as_asyncplans
Definition: execnodes.h:1259
#define castNode(_type_, nodeptr)
Definition: nodes.h:605
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:556
Bitmapset * as_valid_asyncplans
Definition: execnodes.h:1276
struct PlanState * requestor
Definition: execnodes.h:538
void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
Definition: nodeAppend.c:535
static void ExecAppendAsyncEventWait(AppendState *node)
Definition: nodeAppend.c:1021
shm_toc_estimator estimator
Definition: parallel.h:42
static bool choose_next_subplan_locally(AppendState *node)
Definition: nodeAppend.c:565
void ExecReScan(PlanState *node)
Definition: execAmi.c:78
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1043
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
#define EVENT_BUFFER_SIZE
Definition: nodeAppend.c:84
int plan_node_id
Definition: plannodes.h:140
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:350
AppendState * ExecInitAppend(Append *node, EState *estate, int eflags)
Definition: nodeAppend.c:109
#define WL_SOCKET_READABLE
Definition: latch.h:126
EState * state
Definition: execnodes.h:969
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
int as_nasyncresults
Definition: execnodes.h:1263
void ExecAsyncConfigureWait(AsyncRequest *areq)
Definition: execAsync.c:62
ScanDirection es_direction
Definition: execnodes.h:558
struct EPQState * es_epq_active
Definition: execnodes.h:630
PlanState ps
Definition: execnodes.h:1254
bool as_begun
Definition: execnodes.h:1258
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
List * appendplans
Definition: plannodes.h:253
WaitEventSet * CreateWaitEventSet(MemoryContext context, int nevents)
Definition: latch.c:684
bool async_capable
Definition: plannodes.h:135
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:1005
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
Definition: nodeAppend.c:918
struct WaitEventSet * as_eventset
Definition: execnodes.h:1268
bool as_syncdone
Definition: execnodes.h:1264
Bitmapset * bms_add_range(Bitmapset *a, int lower, int upper)
Definition: bitmapset.c:834
struct PlanState * requestee
Definition: execnodes.h:539
ParallelAppendState * as_pstate
Definition: execnodes.h:1272
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
void ExecEndAppend(AppendState *node)
Definition: nodeAppend.c:395
uint32 events
Definition: latch.h:145
int first_partial_plan
Definition: plannodes.h:260
void ExecAsyncNotify(AsyncRequest *areq)
Definition: execAsync.c:88
bool(* choose_next_subplan)(AppendState *)
Definition: execnodes.h:1277
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:646
Bitmapset * ExecFindInitialMatchingSubPlans(PartitionPruneState *prunestate, int nsubplans)
#define TupIsNull(slot)
Definition: tuptable.h:292
int as_first_partial_plan
Definition: execnodes.h:1270
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
Bitmapset * execparamids
bool resultopsset
Definition: execnodes.h:1052
Bitmapset * chgParam
Definition: execnodes.h:999
static bool choose_next_subplan_for_worker(AppendState *node)
Definition: nodeAppend.c:709
int GetNumRegisteredWaitEvents(WaitEventSet *set)
Definition: latch.c:2022
#define INVALID_SUBPLAN_INDEX
Definition: nodeAppend.c:83
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:736
bool bms_is_empty(const Bitmapset *a)
Definition: bitmapset.c:701
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
Definition: nodeAppend.c:967
TupleTableSlot * result
Definition: execnodes.h:543
void * palloc0(Size size)
Definition: mcxt.c:1093
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:973
Bitmapset * as_valid_subplans
Definition: execnodes.h:1275
void ExecReScanAppend(AppendState *node)
Definition: nodeAppend.c:415
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
#define PGINVALID_SOCKET
Definition: port.h:33
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:252
bool pa_finished[FLEXIBLE_ARRAY_MEMBER]
Definition: nodeAppend.c:80
Plan * plan
Definition: execnodes.h:967
int bms_prev_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1102
void UpdateChangedParamSet(PlanState *node, Bitmapset *newchg)
Definition: execUtils.c:864
void ExecAsyncAppendResponse(AsyncRequest *areq)
Definition: nodeAppend.c:1108
void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt)
Definition: nodeAppend.c:514
bool callback_pending
Definition: execnodes.h:541
static void mark_invalid_subplans_as_finished(AppendState *node)
Definition: nodeAppend.c:833
void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt)
Definition: nodeAppend.c:495
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
#define makeNode(_type_)
Definition: nodes.h:584
struct PartitionPruneInfo * part_prune_info
Definition: plannodes.h:263
#define Assert(condition)
Definition: c.h:804
#define EXEC_FLAG_MARK
Definition: executor.h:59
static bool choose_next_subplan_for_leader(AppendState *node)
Definition: nodeAppend.c:628
int as_whichplan
Definition: execnodes.h:1257
static void ExecAppendAsyncBegin(AppendState *node)
Definition: nodeAppend.c:867
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:480
int request_index
Definition: execnodes.h:540
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
static int list_length(const List *l)
Definition: pg_list.h:149
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1799
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
void * user_data
Definition: latch.h:147
Bitmapset * bms_del_members(Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:928
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:494
bool resultopsfixed
Definition: execnodes.h:1048
AsyncRequest ** as_asyncrequests
Definition: execnodes.h:1261
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * palloc(Size size)
Definition: mcxt.c:1062
PlanState ** appendplans
Definition: execnodes.h:1255
int i
Bitmapset * as_needrequest
Definition: execnodes.h:1267
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
PartitionPruneState * ExecCreatePartitionPruneState(PlanState *planstate, PartitionPruneInfo *partitionpruneinfo)
Bitmapset * bms_int_members(Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:902
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:141
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:427
bool request_complete
Definition: execnodes.h:542
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
#define offsetof(type, field)
Definition: c.h:727
shm_toc * toc
Definition: parallel.h:45
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1306
Bitmapset * ExecFindMatchingSubPlans(PartitionPruneState *prunestate)
TupleTableSlot ** as_asyncresults
Definition: execnodes.h:1262