PostgreSQL Source Code  git master
nodeAppend.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeAppend.c
4  * routines to handle append nodes.
5  *
6  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/executor/nodeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  * ExecInitAppend - initialize the append node
17  * ExecAppend - retrieve the next tuple from the node
18  * ExecEndAppend - shut down the append node
19  * ExecReScanAppend - rescan the append node
20  *
21  * NOTES
22  * Each append node contains a list of one or more subplans which
23  * must be iteratively processed (forwards or backwards).
 *		Tuples are retrieved by executing the 'whichplan'th subplan
 *		until the subplan stops returning tuples, at which point the
 *		next subplan is started up.  (Subplans are not shut down as
 *		each one finishes; they are all shut down in ExecEndAppend.)
27  *
28  * Append nodes don't make use of their left and right
29  * subtrees, rather they maintain a list of subplans so
30  * a typical append node looks like this in the plan tree:
31  *
 *                 ...
 *                /
 *              Append -------+------+------+--- nil
 *              /   \         |      |      |
 *            nil   nil      ...    ...    ...
 *                               subplans
38  *
39  * Append nodes are currently used for unions, and to support
40  * inheritance queries, where several relations need to be scanned.
41  * For example, in our standard person/student/employee/student-emp
42  * example, where student and employee inherit from person
43  * and student-emp inherits from student and employee, the
44  * query:
45  *
46  * select name from person
47  *
48  * generates the plan:
49  *
 *                |
 *              Append -------+-------+--------+--------+
 *              /   \         |       |        |        |
 *            nil   nil      Scan    Scan     Scan     Scan
 *                            |       |        |        |
 *                          person employee student student-emp
56  */
57 
58 #include "postgres.h"
59 
60 #include "executor/execAsync.h"
61 #include "executor/execdebug.h"
62 #include "executor/execPartition.h"
63 #include "executor/nodeAppend.h"
64 #include "miscadmin.h"
65 #include "pgstat.h"
66 #include "storage/latch.h"
67 
68 /* Shared state for parallel-aware Append. */
70 {
71  LWLock pa_lock; /* mutual exclusion to choose next subplan */
72  int pa_next_plan; /* next plan to choose by any worker */
73 
74  /*
75  * pa_finished[i] should be true if no more workers should select subplan
76  * i. for a non-partial plan, this should be set to true as soon as a
77  * worker selects the plan; for a partial plan, it remains false until
78  * some worker executes the plan to completion.
79  */
81 };
82 
83 #define INVALID_SUBPLAN_INDEX -1
84 #define EVENT_BUFFER_SIZE 16
85 
86 static TupleTableSlot *ExecAppend(PlanState *pstate);
87 static bool choose_next_subplan_locally(AppendState *node);
91 static void ExecAppendAsyncBegin(AppendState *node);
92 static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
93 static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
94 static void ExecAppendAsyncEventWait(AppendState *node);
95 static void classify_matching_subplans(AppendState *node);
96 
97 /* ----------------------------------------------------------------
98  * ExecInitAppend
99  *
100  * Begin all of the subscans of the append node.
101  *
102  * (This is potentially wasteful, since the entire result of the
103  * append node may not be scanned, but this way all of the
104  * structures get allocated in the executor's top level memory
105  * block instead of that of the call to ExecAppend.)
106  * ----------------------------------------------------------------
107  */
108 AppendState *
109 ExecInitAppend(Append *node, EState *estate, int eflags)
110 {
111  AppendState *appendstate = makeNode(AppendState);
112  PlanState **appendplanstates;
113  Bitmapset *validsubplans;
114  Bitmapset *asyncplans;
115  int nplans;
116  int nasyncplans;
117  int firstvalid;
118  int i,
119  j;
120 
121  /* check for unsupported flags */
122  Assert(!(eflags & EXEC_FLAG_MARK));
123 
124  /*
125  * create new AppendState for our append node
126  */
127  appendstate->ps.plan = (Plan *) node;
128  appendstate->ps.state = estate;
129  appendstate->ps.ExecProcNode = ExecAppend;
130 
131  /* Let choose_next_subplan_* function handle setting the first subplan */
132  appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
133  appendstate->as_syncdone = false;
134  appendstate->as_begun = false;
135 
136  /* If run-time partition pruning is enabled, then set that up now */
137  if (node->part_prune_info != NULL)
138  {
139  PartitionPruneState *prunestate;
140 
141  /*
142  * Set up pruning data structure. This also initializes the set of
143  * subplans to initialize (validsubplans) by taking into account the
144  * result of performing initial pruning if any.
145  */
146  prunestate = ExecInitPartitionPruning(&appendstate->ps,
147  list_length(node->appendplans),
148  node->part_prune_info,
149  &validsubplans);
150  appendstate->as_prune_state = prunestate;
151  nplans = bms_num_members(validsubplans);
152 
153  /*
154  * When no run-time pruning is required and there's at least one
155  * subplan, we can fill as_valid_subplans immediately, preventing
156  * later calls to ExecFindMatchingSubPlans.
157  */
158  if (!prunestate->do_exec_prune && nplans > 0)
159  appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
160  }
161  else
162  {
163  nplans = list_length(node->appendplans);
164 
165  /*
166  * When run-time partition pruning is not enabled we can just mark all
167  * subplans as valid; they must also all be initialized.
168  */
169  Assert(nplans > 0);
170  appendstate->as_valid_subplans = validsubplans =
171  bms_add_range(NULL, 0, nplans - 1);
172  appendstate->as_prune_state = NULL;
173  }
174 
175  /*
176  * Initialize result tuple type and slot.
177  */
178  ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
179 
180  /* node returns slots from each of its subnodes, therefore not fixed */
181  appendstate->ps.resultopsset = true;
182  appendstate->ps.resultopsfixed = false;
183 
184  appendplanstates = (PlanState **) palloc(nplans *
185  sizeof(PlanState *));
186 
187  /*
188  * call ExecInitNode on each of the valid plans to be executed and save
189  * the results into the appendplanstates array.
190  *
191  * While at it, find out the first valid partial plan.
192  */
193  j = 0;
194  asyncplans = NULL;
195  nasyncplans = 0;
196  firstvalid = nplans;
197  i = -1;
198  while ((i = bms_next_member(validsubplans, i)) >= 0)
199  {
200  Plan *initNode = (Plan *) list_nth(node->appendplans, i);
201 
202  /*
203  * Record async subplans. When executing EvalPlanQual, we treat them
204  * as sync ones; don't do this when initializing an EvalPlanQual plan
205  * tree.
206  */
207  if (initNode->async_capable && estate->es_epq_active == NULL)
208  {
209  asyncplans = bms_add_member(asyncplans, j);
210  nasyncplans++;
211  }
212 
213  /*
214  * Record the lowest appendplans index which is a valid partial plan.
215  */
216  if (i >= node->first_partial_plan && j < firstvalid)
217  firstvalid = j;
218 
219  appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
220  }
221 
222  appendstate->as_first_partial_plan = firstvalid;
223  appendstate->appendplans = appendplanstates;
224  appendstate->as_nplans = nplans;
225 
226  /* Initialize async state */
227  appendstate->as_asyncplans = asyncplans;
228  appendstate->as_nasyncplans = nasyncplans;
229  appendstate->as_asyncrequests = NULL;
230  appendstate->as_asyncresults = NULL;
231  appendstate->as_nasyncresults = 0;
232  appendstate->as_nasyncremain = 0;
233  appendstate->as_needrequest = NULL;
234  appendstate->as_eventset = NULL;
235  appendstate->as_valid_asyncplans = NULL;
236 
237  if (nasyncplans > 0)
238  {
239  appendstate->as_asyncrequests = (AsyncRequest **)
240  palloc0(nplans * sizeof(AsyncRequest *));
241 
242  i = -1;
243  while ((i = bms_next_member(asyncplans, i)) >= 0)
244  {
245  AsyncRequest *areq;
246 
247  areq = palloc(sizeof(AsyncRequest));
248  areq->requestor = (PlanState *) appendstate;
249  areq->requestee = appendplanstates[i];
250  areq->request_index = i;
251  areq->callback_pending = false;
252  areq->request_complete = false;
253  areq->result = NULL;
254 
255  appendstate->as_asyncrequests[i] = areq;
256  }
257 
258  appendstate->as_asyncresults = (TupleTableSlot **)
259  palloc0(nasyncplans * sizeof(TupleTableSlot *));
260 
261  if (appendstate->as_valid_subplans != NULL)
262  classify_matching_subplans(appendstate);
263  }
264 
265  /*
266  * Miscellaneous initialization
267  */
268 
269  appendstate->ps.ps_ProjInfo = NULL;
270 
271  /* For parallel query, this will be overridden later. */
273 
274  return appendstate;
275 }
276 
277 /* ----------------------------------------------------------------
278  * ExecAppend
279  *
280  * Handles iteration over multiple subplans.
281  * ----------------------------------------------------------------
282  */
283 static TupleTableSlot *
285 {
286  AppendState *node = castNode(AppendState, pstate);
287  TupleTableSlot *result;
288 
289  /*
290  * If this is the first call after Init or ReScan, we need to do the
291  * initialization work.
292  */
293  if (!node->as_begun)
294  {
296  Assert(!node->as_syncdone);
297 
298  /* Nothing to do if there are no subplans */
299  if (node->as_nplans == 0)
300  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
301 
302  /* If there are any async subplans, begin executing them. */
303  if (node->as_nasyncplans > 0)
304  ExecAppendAsyncBegin(node);
305 
306  /*
307  * If no sync subplan has been chosen, we must choose one before
308  * proceeding.
309  */
310  if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
311  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
312 
313  Assert(node->as_syncdone ||
314  (node->as_whichplan >= 0 &&
315  node->as_whichplan < node->as_nplans));
316 
317  /* And we're initialized. */
318  node->as_begun = true;
319  }
320 
321  for (;;)
322  {
323  PlanState *subnode;
324 
326 
327  /*
328  * try to get a tuple from an async subplan if any
329  */
330  if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
331  {
332  if (ExecAppendAsyncGetNext(node, &result))
333  return result;
334  Assert(!node->as_syncdone);
336  }
337 
338  /*
339  * figure out which sync subplan we are currently processing
340  */
341  Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
342  subnode = node->appendplans[node->as_whichplan];
343 
344  /*
345  * get a tuple from the subplan
346  */
347  result = ExecProcNode(subnode);
348 
349  if (!TupIsNull(result))
350  {
351  /*
352  * If the subplan gave us something then return it as-is. We do
353  * NOT make use of the result slot that was set up in
354  * ExecInitAppend; there's no need for it.
355  */
356  return result;
357  }
358 
359  /*
360  * wait or poll for async events if any. We do this before checking
361  * for the end of iteration, because it might drain the remaining
362  * async subplans.
363  */
364  if (node->as_nasyncremain > 0)
366 
367  /* choose new sync subplan; if no sync/async subplans, we're done */
368  if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
369  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
370  }
371 }
372 
373 /* ----------------------------------------------------------------
374  * ExecEndAppend
375  *
376  * Shuts down the subscans of the append node.
377  *
378  * Returns nothing of interest.
379  * ----------------------------------------------------------------
380  */
381 void
383 {
384  PlanState **appendplans;
385  int nplans;
386  int i;
387 
388  /*
389  * get information from the node
390  */
391  appendplans = node->appendplans;
392  nplans = node->as_nplans;
393 
394  /*
395  * shut down each of the subscans
396  */
397  for (i = 0; i < nplans; i++)
398  ExecEndNode(appendplans[i]);
399 }
400 
401 void
403 {
404  int nasyncplans = node->as_nasyncplans;
405  int i;
406 
407  /*
408  * If any PARAM_EXEC Params used in pruning expressions have changed, then
409  * we'd better unset the valid subplans so that they are reselected for
410  * the new parameter values.
411  */
412  if (node->as_prune_state &&
413  bms_overlap(node->ps.chgParam,
415  {
417  node->as_valid_subplans = NULL;
418  if (nasyncplans > 0)
419  {
421  node->as_valid_asyncplans = NULL;
422  }
423  }
424 
425  for (i = 0; i < node->as_nplans; i++)
426  {
427  PlanState *subnode = node->appendplans[i];
428 
429  /*
430  * ExecReScan doesn't know about my subplans, so I have to do
431  * changed-parameter signaling myself.
432  */
433  if (node->ps.chgParam != NULL)
434  UpdateChangedParamSet(subnode, node->ps.chgParam);
435 
436  /*
437  * If chgParam of subnode is not null then plan will be re-scanned by
438  * first ExecProcNode or by first ExecAsyncRequest.
439  */
440  if (subnode->chgParam == NULL)
441  ExecReScan(subnode);
442  }
443 
444  /* Reset async state */
445  if (nasyncplans > 0)
446  {
447  i = -1;
448  while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
449  {
450  AsyncRequest *areq = node->as_asyncrequests[i];
451 
452  areq->callback_pending = false;
453  areq->request_complete = false;
454  areq->result = NULL;
455  }
456 
457  node->as_nasyncresults = 0;
458  node->as_nasyncremain = 0;
459  bms_free(node->as_needrequest);
460  node->as_needrequest = NULL;
461  }
462 
463  /* Let choose_next_subplan_* function handle setting the first subplan */
465  node->as_syncdone = false;
466  node->as_begun = false;
467 }
468 
469 /* ----------------------------------------------------------------
470  * Parallel Append Support
471  * ----------------------------------------------------------------
472  */
473 
474 /* ----------------------------------------------------------------
475  * ExecAppendEstimate
476  *
477  * Compute the amount of space we'll need in the parallel
478  * query DSM, and inform pcxt->estimator about our needs.
479  * ----------------------------------------------------------------
480  */
481 void
483  ParallelContext *pcxt)
484 {
485  node->pstate_len =
486  add_size(offsetof(ParallelAppendState, pa_finished),
487  sizeof(bool) * node->as_nplans);
488 
490  shm_toc_estimate_keys(&pcxt->estimator, 1);
491 }
492 
493 
494 /* ----------------------------------------------------------------
495  * ExecAppendInitializeDSM
496  *
497  * Set up shared state for Parallel Append.
498  * ----------------------------------------------------------------
499  */
500 void
502  ParallelContext *pcxt)
503 {
504  ParallelAppendState *pstate;
505 
506  pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
507  memset(pstate, 0, node->pstate_len);
509  shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
510 
511  node->as_pstate = pstate;
513 }
514 
515 /* ----------------------------------------------------------------
516  * ExecAppendReInitializeDSM
517  *
518  * Reset shared state before beginning a fresh scan.
519  * ----------------------------------------------------------------
520  */
521 void
523 {
524  ParallelAppendState *pstate = node->as_pstate;
525 
526  pstate->pa_next_plan = 0;
527  memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
528 }
529 
530 /* ----------------------------------------------------------------
531  * ExecAppendInitializeWorker
532  *
533  * Copy relevant information from TOC into planstate, and initialize
534  * whatever is required to choose and execute the optimal subplan.
535  * ----------------------------------------------------------------
536  */
537 void
539 {
540  node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
542 }
543 
544 /* ----------------------------------------------------------------
545  * choose_next_subplan_locally
546  *
547  * Choose next sync subplan for a non-parallel-aware Append,
548  * returning false if there are no more.
549  * ----------------------------------------------------------------
550  */
551 static bool
553 {
554  int whichplan = node->as_whichplan;
555  int nextplan;
556 
557  /* We should never be called when there are no subplans */
558  Assert(node->as_nplans > 0);
559 
560  /* Nothing to do if syncdone */
561  if (node->as_syncdone)
562  return false;
563 
564  /*
565  * If first call then have the bms member function choose the first valid
566  * sync subplan by initializing whichplan to -1. If there happen to be no
567  * valid sync subplans then the bms member function will handle that by
568  * returning a negative number which will allow us to exit returning a
569  * false value.
570  */
571  if (whichplan == INVALID_SUBPLAN_INDEX)
572  {
573  if (node->as_nasyncplans > 0)
574  {
575  /* We'd have filled as_valid_subplans already */
576  Assert(node->as_valid_subplans);
577  }
578  else if (node->as_valid_subplans == NULL)
579  node->as_valid_subplans =
581 
582  whichplan = -1;
583  }
584 
585  /* Ensure whichplan is within the expected range */
586  Assert(whichplan >= -1 && whichplan <= node->as_nplans);
587 
589  nextplan = bms_next_member(node->as_valid_subplans, whichplan);
590  else
591  nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
592 
593  if (nextplan < 0)
594  {
595  /* Set as_syncdone if in async mode */
596  if (node->as_nasyncplans > 0)
597  node->as_syncdone = true;
598  return false;
599  }
600 
601  node->as_whichplan = nextplan;
602 
603  return true;
604 }
605 
606 /* ----------------------------------------------------------------
607  * choose_next_subplan_for_leader
608  *
609  * Try to pick a plan which doesn't commit us to doing much
610  * work locally, so that as much work as possible is done in
611  * the workers. Cheapest subplans are at the end.
612  * ----------------------------------------------------------------
613  */
614 static bool
616 {
617  ParallelAppendState *pstate = node->as_pstate;
618 
619  /* Backward scan is not supported by parallel-aware plans */
621 
622  /* We should never be called when there are no subplans */
623  Assert(node->as_nplans > 0);
624 
626 
627  if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
628  {
629  /* Mark just-completed subplan as finished. */
630  node->as_pstate->pa_finished[node->as_whichplan] = true;
631  }
632  else
633  {
634  /* Start with last subplan. */
635  node->as_whichplan = node->as_nplans - 1;
636 
637  /*
638  * If we've yet to determine the valid subplans then do so now. If
639  * run-time pruning is disabled then the valid subplans will always be
640  * set to all subplans.
641  */
642  if (node->as_valid_subplans == NULL)
643  {
644  node->as_valid_subplans =
646 
647  /*
648  * Mark each invalid plan as finished to allow the loop below to
649  * select the first valid subplan.
650  */
652  }
653  }
654 
655  /* Loop until we find a subplan to execute. */
656  while (pstate->pa_finished[node->as_whichplan])
657  {
658  if (node->as_whichplan == 0)
659  {
662  LWLockRelease(&pstate->pa_lock);
663  return false;
664  }
665 
666  /*
667  * We needn't pay attention to as_valid_subplans here as all invalid
668  * plans have been marked as finished.
669  */
670  node->as_whichplan--;
671  }
672 
673  /* If non-partial, immediately mark as finished. */
674  if (node->as_whichplan < node->as_first_partial_plan)
675  node->as_pstate->pa_finished[node->as_whichplan] = true;
676 
677  LWLockRelease(&pstate->pa_lock);
678 
679  return true;
680 }
681 
682 /* ----------------------------------------------------------------
683  * choose_next_subplan_for_worker
684  *
685  * Choose next subplan for a parallel-aware Append, returning
686  * false if there are no more.
687  *
688  * We start from the first plan and advance through the list;
689  * when we get back to the end, we loop back to the first
690  * partial plan. This assigns the non-partial plans first in
691  * order of descending cost and then spreads out the workers
692  * as evenly as possible across the remaining partial plans.
693  * ----------------------------------------------------------------
694  */
695 static bool
697 {
698  ParallelAppendState *pstate = node->as_pstate;
699 
700  /* Backward scan is not supported by parallel-aware plans */
702 
703  /* We should never be called when there are no subplans */
704  Assert(node->as_nplans > 0);
705 
707 
708  /* Mark just-completed subplan as finished. */
709  if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
710  node->as_pstate->pa_finished[node->as_whichplan] = true;
711 
712  /*
713  * If we've yet to determine the valid subplans then do so now. If
714  * run-time pruning is disabled then the valid subplans will always be set
715  * to all subplans.
716  */
717  else if (node->as_valid_subplans == NULL)
718  {
719  node->as_valid_subplans =
722  }
723 
724  /* If all the plans are already done, we have nothing to do */
725  if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
726  {
727  LWLockRelease(&pstate->pa_lock);
728  return false;
729  }
730 
731  /* Save the plan from which we are starting the search. */
732  node->as_whichplan = pstate->pa_next_plan;
733 
734  /* Loop until we find a valid subplan to execute. */
735  while (pstate->pa_finished[pstate->pa_next_plan])
736  {
737  int nextplan;
738 
739  nextplan = bms_next_member(node->as_valid_subplans,
740  pstate->pa_next_plan);
741  if (nextplan >= 0)
742  {
743  /* Advance to the next valid plan. */
744  pstate->pa_next_plan = nextplan;
745  }
746  else if (node->as_whichplan > node->as_first_partial_plan)
747  {
748  /*
749  * Try looping back to the first valid partial plan, if there is
750  * one. If there isn't, arrange to bail out below.
751  */
752  nextplan = bms_next_member(node->as_valid_subplans,
753  node->as_first_partial_plan - 1);
754  pstate->pa_next_plan =
755  nextplan < 0 ? node->as_whichplan : nextplan;
756  }
757  else
758  {
759  /*
760  * At last plan, and either there are no partial plans or we've
761  * tried them all. Arrange to bail out.
762  */
763  pstate->pa_next_plan = node->as_whichplan;
764  }
765 
766  if (pstate->pa_next_plan == node->as_whichplan)
767  {
768  /* We've tried everything! */
770  LWLockRelease(&pstate->pa_lock);
771  return false;
772  }
773  }
774 
775  /* Pick the plan we found, and advance pa_next_plan one more time. */
776  node->as_whichplan = pstate->pa_next_plan;
778  pstate->pa_next_plan);
779 
780  /*
781  * If there are no more valid plans then try setting the next plan to the
782  * first valid partial plan.
783  */
784  if (pstate->pa_next_plan < 0)
785  {
786  int nextplan = bms_next_member(node->as_valid_subplans,
787  node->as_first_partial_plan - 1);
788 
789  if (nextplan >= 0)
790  pstate->pa_next_plan = nextplan;
791  else
792  {
793  /*
794  * There are no valid partial plans, and we already chose the last
795  * non-partial plan; so flag that there's nothing more for our
796  * fellow workers to do.
797  */
799  }
800  }
801 
802  /* If non-partial, immediately mark as finished. */
803  if (node->as_whichplan < node->as_first_partial_plan)
804  node->as_pstate->pa_finished[node->as_whichplan] = true;
805 
806  LWLockRelease(&pstate->pa_lock);
807 
808  return true;
809 }
810 
811 /*
812  * mark_invalid_subplans_as_finished
813  * Marks the ParallelAppendState's pa_finished as true for each invalid
814  * subplan.
815  *
816  * This function should only be called for parallel Append with run-time
817  * pruning enabled.
818  */
819 static void
821 {
822  int i;
823 
824  /* Only valid to call this while in parallel Append mode */
825  Assert(node->as_pstate);
826 
827  /* Shouldn't have been called when run-time pruning is not enabled */
828  Assert(node->as_prune_state);
829 
830  /* Nothing to do if all plans are valid */
831  if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
832  return;
833 
834  /* Mark all non-valid plans as finished */
835  for (i = 0; i < node->as_nplans; i++)
836  {
837  if (!bms_is_member(i, node->as_valid_subplans))
838  node->as_pstate->pa_finished[i] = true;
839  }
840 }
841 
842 /* ----------------------------------------------------------------
843  * Asynchronous Append Support
844  * ----------------------------------------------------------------
845  */
846 
847 /* ----------------------------------------------------------------
848  * ExecAppendAsyncBegin
849  *
850  * Begin executing designed async-capable subplans.
851  * ----------------------------------------------------------------
852  */
853 static void
855 {
856  int i;
857 
858  /* Backward scan is not supported by async-aware Appends. */
860 
861  /* We should never be called when there are no subplans */
862  Assert(node->as_nplans > 0);
863 
864  /* We should never be called when there are no async subplans. */
865  Assert(node->as_nasyncplans > 0);
866 
867  /* If we've yet to determine the valid subplans then do so now. */
868  if (node->as_valid_subplans == NULL)
869  {
870  node->as_valid_subplans =
872 
874  }
875 
876  /* Initialize state variables. */
879 
880  /* Nothing to do if there are no valid async subplans. */
881  if (node->as_nasyncremain == 0)
882  return;
883 
884  /* Make a request for each of the valid async subplans. */
885  i = -1;
886  while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
887  {
888  AsyncRequest *areq = node->as_asyncrequests[i];
889 
890  Assert(areq->request_index == i);
891  Assert(!areq->callback_pending);
892 
893  /* Do the actual work. */
894  ExecAsyncRequest(areq);
895  }
896 }
897 
898 /* ----------------------------------------------------------------
899  * ExecAppendAsyncGetNext
900  *
901  * Get the next tuple from any of the asynchronous subplans.
902  * ----------------------------------------------------------------
903  */
904 static bool
906 {
907  *result = NULL;
908 
909  /* We should never be called when there are no valid async subplans. */
910  Assert(node->as_nasyncremain > 0);
911 
912  /* Request a tuple asynchronously. */
913  if (ExecAppendAsyncRequest(node, result))
914  return true;
915 
916  while (node->as_nasyncremain > 0)
917  {
919 
920  /* Wait or poll for async events. */
922 
923  /* Request a tuple asynchronously. */
924  if (ExecAppendAsyncRequest(node, result))
925  return true;
926 
927  /* Break from loop if there's any sync subplan that isn't complete. */
928  if (!node->as_syncdone)
929  break;
930  }
931 
932  /*
933  * If all sync subplans are complete, we're totally done scanning the
934  * given node. Otherwise, we're done with the asynchronous stuff but must
935  * continue scanning the sync subplans.
936  */
937  if (node->as_syncdone)
938  {
939  Assert(node->as_nasyncremain == 0);
940  *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
941  return true;
942  }
943 
944  return false;
945 }
946 
947 /* ----------------------------------------------------------------
948  * ExecAppendAsyncRequest
949  *
950  * Request a tuple asynchronously.
951  * ----------------------------------------------------------------
952  */
953 static bool
955 {
956  Bitmapset *needrequest;
957  int i;
958 
959  /* Nothing to do if there are no async subplans needing a new request. */
960  if (bms_is_empty(node->as_needrequest))
961  {
962  Assert(node->as_nasyncresults == 0);
963  return false;
964  }
965 
966  /*
967  * If there are any asynchronously-generated results that have not yet
968  * been returned, we have nothing to do; just return one of them.
969  */
970  if (node->as_nasyncresults > 0)
971  {
972  --node->as_nasyncresults;
973  *result = node->as_asyncresults[node->as_nasyncresults];
974  return true;
975  }
976 
977  /* Make a new request for each of the async subplans that need it. */
978  needrequest = node->as_needrequest;
979  node->as_needrequest = NULL;
980  i = -1;
981  while ((i = bms_next_member(needrequest, i)) >= 0)
982  {
983  AsyncRequest *areq = node->as_asyncrequests[i];
984 
985  /* Do the actual work. */
986  ExecAsyncRequest(areq);
987  }
988  bms_free(needrequest);
989 
990  /* Return one of the asynchronously-generated results if any. */
991  if (node->as_nasyncresults > 0)
992  {
993  --node->as_nasyncresults;
994  *result = node->as_asyncresults[node->as_nasyncresults];
995  return true;
996  }
997 
998  return false;
999 }
1000 
1001 /* ----------------------------------------------------------------
1002  * ExecAppendAsyncEventWait
1003  *
1004  * Wait or poll for file descriptor events and fire callbacks.
1005  * ----------------------------------------------------------------
1006  */
1007 static void
1009 {
1010  int nevents = node->as_nasyncplans + 1;
1011  long timeout = node->as_syncdone ? -1 : 0;
1012  WaitEvent occurred_event[EVENT_BUFFER_SIZE];
1013  int noccurred;
1014  int i;
1015 
1016  /* We should never be called when there are no valid async subplans. */
1017  Assert(node->as_nasyncremain > 0);
1018 
1021  NULL, NULL);
1022 
1023  /* Give each waiting subplan a chance to add an event. */
1024  i = -1;
1025  while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1026  {
1027  AsyncRequest *areq = node->as_asyncrequests[i];
1028 
1029  if (areq->callback_pending)
1030  ExecAsyncConfigureWait(areq);
1031  }
1032 
1033  /*
1034  * No need for further processing if there are no configured events other
1035  * than the postmaster death event.
1036  */
1037  if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1038  {
1040  node->as_eventset = NULL;
1041  return;
1042  }
1043 
1044  /* We wait on at most EVENT_BUFFER_SIZE events. */
1045  if (nevents > EVENT_BUFFER_SIZE)
1046  nevents = EVENT_BUFFER_SIZE;
1047 
1048  /*
1049  * If the timeout is -1, wait until at least one event occurs. If the
1050  * timeout is 0, poll for events, but do not wait at all.
1051  */
1052  noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1053  nevents, WAIT_EVENT_APPEND_READY);
1055  node->as_eventset = NULL;
1056  if (noccurred == 0)
1057  return;
1058 
1059  /* Deliver notifications. */
1060  for (i = 0; i < noccurred; i++)
1061  {
1062  WaitEvent *w = &occurred_event[i];
1063 
1064  /*
1065  * Each waiting subplan should have registered its wait event with
1066  * user_data pointing back to its AsyncRequest.
1067  */
1068  if ((w->events & WL_SOCKET_READABLE) != 0)
1069  {
1070  AsyncRequest *areq = (AsyncRequest *) w->user_data;
1071 
1072  if (areq->callback_pending)
1073  {
1074  /*
1075  * Mark it as no longer needing a callback. We must do this
1076  * before dispatching the callback in case the callback resets
1077  * the flag.
1078  */
1079  areq->callback_pending = false;
1080 
1081  /* Do the actual work. */
1082  ExecAsyncNotify(areq);
1083  }
1084  }
1085  }
1086 }
1087 
1088 /* ----------------------------------------------------------------
1089  * ExecAsyncAppendResponse
1090  *
1091  * Receive a response from an asynchronous request we made.
1092  * ----------------------------------------------------------------
1093  */
1094 void
1096 {
1097  AppendState *node = (AppendState *) areq->requestor;
1098  TupleTableSlot *slot = areq->result;
1099 
1100  /* The result should be a TupleTableSlot or NULL. */
1101  Assert(slot == NULL || IsA(slot, TupleTableSlot));
1102 
1103  /* Nothing to do if the request is pending. */
1104  if (!areq->request_complete)
1105  {
1106  /* The request would have been pending for a callback. */
1107  Assert(areq->callback_pending);
1108  return;
1109  }
1110 
1111  /* If the result is NULL or an empty slot, there's nothing more to do. */
1112  if (TupIsNull(slot))
1113  {
1114  /* The ending subplan wouldn't have been pending for a callback. */
1115  Assert(!areq->callback_pending);
1116  --node->as_nasyncremain;
1117  return;
1118  }
1119 
1120  /* Save result so we can return it. */
1121  Assert(node->as_nasyncresults < node->as_nasyncplans);
1122  node->as_asyncresults[node->as_nasyncresults++] = slot;
1123 
1124  /*
1125  * Mark the subplan that returned a result as ready for a new request. We
1126  * don't launch another one here immediately because it might complete.
1127  */
1129  areq->request_index);
1130 }
1131 
1132 /* ----------------------------------------------------------------
1133  * classify_matching_subplans
1134  *
1135  * Classify the node's as_valid_subplans into sync ones and
1136  * async ones, adjust it to contain sync ones only, and save
1137  * async ones in the node's as_valid_asyncplans.
1138  * ----------------------------------------------------------------
1139  */
1140 static void
1142 {
1143  Bitmapset *valid_asyncplans;
1144 
1145  Assert(node->as_valid_asyncplans == NULL);
1146 
1147  /* Nothing to do if there are no valid subplans. */
1148  if (bms_is_empty(node->as_valid_subplans))
1149  {
1150  node->as_syncdone = true;
1151  node->as_nasyncremain = 0;
1152  return;
1153  }
1154 
1155  /* Nothing to do if there are no valid async subplans. */
1156  if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1157  {
1158  node->as_nasyncremain = 0;
1159  return;
1160  }
1161 
1162  /* Get valid async subplans. */
1163  valid_asyncplans = bms_copy(node->as_asyncplans);
1164  valid_asyncplans = bms_int_members(valid_asyncplans,
1165  node->as_valid_subplans);
1166 
1167  /* Adjust the valid subplans to contain sync subplans only. */
1169  valid_asyncplans);
1170 
1171  /* Save valid async subplans. */
1172  node->as_valid_asyncplans = valid_asyncplans;
1173 }
int bms_prev_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1104
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1045
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:648
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:427
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:738
Bitmapset * bms_del_members(Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:930
Bitmapset * bms_int_members(Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:904
bool bms_is_empty(const Bitmapset *a)
Definition: bitmapset.c:703
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:494
Bitmapset * bms_copy(const Bitmapset *a)
Definition: bitmapset.c:74
Bitmapset * bms_add_range(Bitmapset *a, int lower, int upper)
Definition: bitmapset.c:836
#define offsetof(type, field)
Definition: c.h:727
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:350
void ExecReScan(PlanState *node)
Definition: execAmi.c:78
void ExecAsyncRequest(AsyncRequest *areq)
Definition: execAsync.c:26
void ExecAsyncConfigureWait(AsyncRequest *areq)
Definition: execAsync.c:62
void ExecAsyncNotify(AsyncRequest *areq)
Definition: execAsync.c:88
Bitmapset * ExecFindMatchingSubPlans(PartitionPruneState *prunestate, bool initial_prune)
PartitionPruneState * ExecInitPartitionPruning(PlanState *planstate, int n_total_subplans, PartitionPruneInfo *pruneinfo, Bitmapset **initially_valid_subplans)
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:556
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:141
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1799
void UpdateChangedParamSet(PlanState *node, Bitmapset *newchg)
Definition: execUtils.c:864
#define EXEC_FLAG_MARK
Definition: executor.h:59
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:254
int j
Definition: isn.c:74
int i
Definition: isn.c:73
int GetNumRegisteredWaitEvents(WaitEventSet *set)
Definition: latch.c:2081
WaitEventSet * CreateWaitEventSet(MemoryContext context, int nevents)
Definition: latch.c:682
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
Definition: latch.c:861
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1320
void FreeWaitEventSet(WaitEventSet *set)
Definition: latch.c:796
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1196
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1800
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:734
@ LWTRANCHE_PARALLEL_APPEND
Definition: lwlock.h:191
@ LW_EXCLUSIVE
Definition: lwlock.h:104
void * palloc0(Size size)
Definition: mcxt.c:1099
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
void * palloc(Size size)
Definition: mcxt.c:1068
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
void ExecEndAppend(AppendState *node)
Definition: nodeAppend.c:382
static void ExecAppendAsyncBegin(AppendState *node)
Definition: nodeAppend.c:854
static void ExecAppendAsyncEventWait(AppendState *node)
Definition: nodeAppend.c:1008
void ExecReScanAppend(AppendState *node)
Definition: nodeAppend.c:402
static void classify_matching_subplans(AppendState *node)
Definition: nodeAppend.c:1141
AppendState * ExecInitAppend(Append *node, EState *estate, int eflags)
Definition: nodeAppend.c:109
static void mark_invalid_subplans_as_finished(AppendState *node)
Definition: nodeAppend.c:820
static TupleTableSlot * ExecAppend(PlanState *pstate)
Definition: nodeAppend.c:284
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
Definition: nodeAppend.c:954
static bool choose_next_subplan_for_leader(AppendState *node)
Definition: nodeAppend.c:615
static bool choose_next_subplan_for_worker(AppendState *node)
Definition: nodeAppend.c:696
void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
Definition: nodeAppend.c:522
#define INVALID_SUBPLAN_INDEX
Definition: nodeAppend.c:83
void ExecAsyncAppendResponse(AsyncRequest *areq)
Definition: nodeAppend.c:1095
#define EVENT_BUFFER_SIZE
Definition: nodeAppend.c:84
static bool choose_next_subplan_locally(AppendState *node)
Definition: nodeAppend.c:552
void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
Definition: nodeAppend.c:538
void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt)
Definition: nodeAppend.c:501
void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt)
Definition: nodeAppend.c:482
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
Definition: nodeAppend.c:905
#define IsA(nodeptr, _type_)
Definition: nodes.h:624
#define makeNode(_type_)
Definition: nodes.h:621
#define castNode(_type_, nodeptr)
Definition: nodes.h:642
static int list_length(const List *l)
Definition: pg_list.h:149
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
#define PGINVALID_SOCKET
Definition: port.h:31
#define ScanDirectionIsForward(direction)
Definition: sdir.h:55
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
struct PartitionPruneState * as_prune_state
Definition: execnodes.h:1319
Bitmapset * as_valid_asyncplans
Definition: execnodes.h:1321
Bitmapset * as_needrequest
Definition: execnodes.h:1312
bool as_syncdone
Definition: execnodes.h:1309
AsyncRequest ** as_asyncrequests
Definition: execnodes.h:1306
bool as_begun
Definition: execnodes.h:1303
Bitmapset * as_asyncplans
Definition: execnodes.h:1304
int as_whichplan
Definition: execnodes.h:1302
int as_nasyncresults
Definition: execnodes.h:1308
struct WaitEventSet * as_eventset
Definition: execnodes.h:1313
bool(* choose_next_subplan)(AppendState *)
Definition: execnodes.h:1322
PlanState ** appendplans
Definition: execnodes.h:1300
int as_first_partial_plan
Definition: execnodes.h:1315
PlanState ps
Definition: execnodes.h:1299
int as_nasyncremain
Definition: execnodes.h:1311
ParallelAppendState * as_pstate
Definition: execnodes.h:1317
Bitmapset * as_valid_subplans
Definition: execnodes.h:1320
Size pstate_len
Definition: execnodes.h:1318
TupleTableSlot ** as_asyncresults
Definition: execnodes.h:1307
int as_nasyncplans
Definition: execnodes.h:1305
int first_partial_plan
Definition: plannodes.h:263
struct PartitionPruneInfo * part_prune_info
Definition: plannodes.h:266
List * appendplans
Definition: plannodes.h:256
struct PlanState * requestor
Definition: execnodes.h:569
TupleTableSlot * result
Definition: execnodes.h:574
bool request_complete
Definition: execnodes.h:573
int request_index
Definition: execnodes.h:571
bool callback_pending
Definition: execnodes.h:572
struct PlanState * requestee
Definition: execnodes.h:570
ScanDirection es_direction
Definition: execnodes.h:589
struct EPQState * es_epq_active
Definition: execnodes.h:661
Definition: lwlock.h:32
bool pa_finished[FLEXIBLE_ARRAY_MEMBER]
Definition: nodeAppend.c:80
shm_toc_estimator estimator
Definition: parallel.h:42
shm_toc * toc
Definition: parallel.h:45
Bitmapset * execparamids
bool resultopsset
Definition: execnodes.h:1083
Plan * plan
Definition: execnodes.h:998
EState * state
Definition: execnodes.h:1000
Bitmapset * chgParam
Definition: execnodes.h:1030
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:1036
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1038
bool resultopsfixed
Definition: execnodes.h:1079
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:1004
bool async_capable
Definition: plannodes.h:136
int plan_node_id
Definition: plannodes.h:141
void * user_data
Definition: latch.h:148
uint32 events
Definition: latch.h:146
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
#define TupIsNull(slot)
Definition: tuptable.h:292
@ WAIT_EVENT_APPEND_READY
Definition: wait_event.h:81