PostgreSQL Source Code (git master)
nodeAppend.c
/*-------------------------------------------------------------------------
 *
 * nodeAppend.c
 *    routines to handle append nodes.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeAppend.c
 *
 *-------------------------------------------------------------------------
 */
/* INTERFACE ROUTINES
 *      ExecInitAppend   - initialize the append node
 *      ExecAppend       - retrieve the next tuple from the node
 *      ExecEndAppend    - shut down the append node
 *      ExecReScanAppend - rescan the append node
 *
 *   NOTES
 *      Each append node contains a list of one or more subplans which
 *      must be iteratively processed (forwards or backwards).
 *      Tuples are retrieved by executing the 'whichplan'th subplan
 *      until the subplan stops returning tuples, at which point that
 *      plan is shut down and the next started up.
 *
 *      Append nodes don't make use of their left and right
 *      subtrees, rather they maintain a list of subplans so
 *      a typical append node looks like this in the plan tree:
 *
 *                 ...
 *                 /
 *              Append -------+------+------+--- nil
 *              /   \         |      |      |
 *            nil   nil      ...    ...    ...
 *                               subplans
 *
 *      Append nodes are currently used for unions, and to support
 *      inheritance queries, where several relations need to be scanned.
 *      For example, in our standard person/student/employee/student-emp
 *      example, where student and employee inherit from person
 *      and student-emp inherits from student and employee, the
 *      query:
 *
 *              select name from person
 *
 *      generates the plan:
 *
 *                |
 *              Append -------+-------+--------+--------+
 *              /   \         |       |        |        |
 *            nil   nil      Scan    Scan     Scan     Scan
 *                            |       |        |        |
 *                          person employee student student-emp
 */
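
/*
 * As an illustrative sketch of the same shape in a present-day plan (not
 * taken from this query): scanning a partitioned table also produces an
 * Append with one subplan per unpruned partition.  Assuming a hypothetical
 * table "measurement" with two partitions, EXPLAIN might show:
 *
 *      Append
 *        ->  Seq Scan on measurement_y2024
 *        ->  Seq Scan on measurement_y2025
 */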

#include "postgres.h"

#include "executor/execAsync.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"

/* Shared state for parallel-aware Append. */
typedef struct ParallelAppendState
{
    LWLock      pa_lock;        /* mutual exclusion to choose next subplan */
    int         pa_next_plan;   /* next plan to choose by any worker */

    /*
     * pa_finished[i] should be true if no more workers should select subplan
     * i.  For a non-partial plan, this should be set to true as soon as a
     * worker selects the plan; for a partial plan, it remains false until
     * some worker executes the plan to completion.
     */
    bool        pa_finished[FLEXIBLE_ARRAY_MEMBER];
} ParallelAppendState;

#define INVALID_SUBPLAN_INDEX   -1
#define EVENT_BUFFER_SIZE       16

static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
static void ExecAppendAsyncBegin(AppendState *node);
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
static void ExecAppendAsyncEventWait(AppendState *node);
static void classify_matching_subplans(AppendState *node);

/* ----------------------------------------------------------------
 *      ExecInitAppend
 *
 *      Begin all of the subscans of the append node.
 *
 *     (This is potentially wasteful, since the entire result of the
 *      append node may not be scanned, but this way all of the
 *      structures get allocated in the executor's top level memory
 *      block instead of that of the call to ExecAppend.)
 * ----------------------------------------------------------------
 */
AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
    AppendState *appendstate = makeNode(AppendState);
    PlanState **appendplanstates;
    const TupleTableSlotOps *appendops;
    Bitmapset  *validsubplans;
    Bitmapset  *asyncplans;
    int         nplans;
    int         nasyncplans;
    int         firstvalid;
    int         i,
                j;

    /* check for unsupported flags */
    Assert(!(eflags & EXEC_FLAG_MARK));

    /*
     * create new AppendState for our append node
     */
    appendstate->ps.plan = (Plan *) node;
    appendstate->ps.state = estate;
    appendstate->ps.ExecProcNode = ExecAppend;

    /* Let choose_next_subplan_* function handle setting the first subplan */
    appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
    appendstate->as_syncdone = false;
    appendstate->as_begun = false;

    /* If run-time partition pruning is enabled, then set that up now */
    if (node->part_prune_index >= 0)
    {
        PartitionPruneState *prunestate;

        /*
         * Set up pruning data structure.  This also initializes the set of
         * subplans to initialize (validsubplans) by taking into account the
         * result of performing initial pruning if any.
         */
        prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
                                                  list_length(node->appendplans),
                                                  node->part_prune_index,
                                                  node->apprelids,
                                                  &validsubplans);
        appendstate->as_prune_state = prunestate;
        nplans = bms_num_members(validsubplans);

        /*
         * When no run-time pruning is required and there's at least one
         * subplan, we can fill as_valid_subplans immediately, preventing
         * later calls to ExecFindMatchingSubPlans.
         */
        if (!prunestate->do_exec_prune && nplans > 0)
        {
            appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
            appendstate->as_valid_subplans_identified = true;
        }
    }
    else
    {
        nplans = list_length(node->appendplans);

        /*
         * When run-time partition pruning is not enabled we can just mark all
         * subplans as valid; they must also all be initialized.
         */
        Assert(nplans > 0);
        appendstate->as_valid_subplans = validsubplans =
            bms_add_range(NULL, 0, nplans - 1);
        appendstate->as_valid_subplans_identified = true;
        appendstate->as_prune_state = NULL;
    }

    appendplanstates = (PlanState **) palloc(nplans *
                                             sizeof(PlanState *));

    /*
     * call ExecInitNode on each of the valid plans to be executed and save
     * the results into the appendplanstates array.
     *
     * While at it, find out the first valid partial plan.
     */
    j = 0;
    asyncplans = NULL;
    nasyncplans = 0;
    firstvalid = nplans;
    i = -1;
    while ((i = bms_next_member(validsubplans, i)) >= 0)
    {
        Plan       *initNode = (Plan *) list_nth(node->appendplans, i);

        /*
         * Record async subplans.  When executing EvalPlanQual, we treat them
         * as sync ones; don't do this when initializing an EvalPlanQual plan
         * tree.
         */
        if (initNode->async_capable && estate->es_epq_active == NULL)
        {
            asyncplans = bms_add_member(asyncplans, j);
            nasyncplans++;
        }

        /*
         * Record the lowest appendplans index which is a valid partial plan.
         */
        if (i >= node->first_partial_plan && j < firstvalid)
            firstvalid = j;

        appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
    }

    appendstate->as_first_partial_plan = firstvalid;
    appendstate->appendplans = appendplanstates;
    appendstate->as_nplans = nplans;

    /*
     * Initialize Append's result tuple type and slot.  If the child plans
     * all produce the same fixed slot type, we can use that slot type;
     * otherwise make a virtual slot.  (Note that the result slot itself is
     * used only to return a null tuple at end of execution; real tuples are
     * returned to the caller in the children's own result slots.  What we
     * are doing here is allowing the parent plan node to optimize if the
     * Append will return only one kind of slot.)
     */
    appendops = ExecGetCommonSlotOps(appendplanstates, j);
    if (appendops != NULL)
    {
        ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
    }
    else
    {
        ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
        /* show that the output slot type is not fixed */
        appendstate->ps.resultopsset = true;
        appendstate->ps.resultopsfixed = false;
    }

    /* Initialize async state */
    appendstate->as_asyncplans = asyncplans;
    appendstate->as_nasyncplans = nasyncplans;
    appendstate->as_asyncrequests = NULL;
    appendstate->as_asyncresults = NULL;
    appendstate->as_nasyncresults = 0;
    appendstate->as_nasyncremain = 0;
    appendstate->as_needrequest = NULL;
    appendstate->as_eventset = NULL;
    appendstate->as_valid_asyncplans = NULL;

    if (nasyncplans > 0)
    {
        appendstate->as_asyncrequests = (AsyncRequest **)
            palloc0(nplans * sizeof(AsyncRequest *));

        i = -1;
        while ((i = bms_next_member(asyncplans, i)) >= 0)
        {
            AsyncRequest *areq;

            areq = palloc(sizeof(AsyncRequest));
            areq->requestor = (PlanState *) appendstate;
            areq->requestee = appendplanstates[i];
            areq->request_index = i;
            areq->callback_pending = false;
            areq->request_complete = false;
            areq->result = NULL;

            appendstate->as_asyncrequests[i] = areq;
        }

        appendstate->as_asyncresults = (TupleTableSlot **)
            palloc0(nasyncplans * sizeof(TupleTableSlot *));

        if (appendstate->as_valid_subplans_identified)
            classify_matching_subplans(appendstate);
    }

    /*
     * Miscellaneous initialization
     */

    appendstate->ps.ps_ProjInfo = NULL;

    /* For parallel query, this will be overridden later. */
    appendstate->choose_next_subplan = choose_next_subplan_locally;

    return appendstate;
}

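/*
 * Illustrative note on the slot-ops choice above: if every child is, say, a
 * sequential scan returning buffer heap tuples, ExecGetCommonSlotOps would
 * typically return &TTSOpsBufferHeapTuple and the parent can rely on a fixed
 * slot type; with children of mixed slot types it returns NULL and the
 * Append falls back to a virtual result slot.
 */
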
/* ----------------------------------------------------------------
 *     ExecAppend
 *
 *      Handles iteration over multiple subplans.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
    AppendState *node = castNode(AppendState, pstate);
    TupleTableSlot *result;

    /*
     * If this is the first call after Init or ReScan, we need to do the
     * initialization work.
     */
    if (!node->as_begun)
    {
        Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
        Assert(!node->as_syncdone);

        /* Nothing to do if there are no subplans */
        if (node->as_nplans == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);

        /* If there are any async subplans, begin executing them. */
        if (node->as_nasyncplans > 0)
            ExecAppendAsyncBegin(node);

        /*
         * If no sync subplan has been chosen, we must choose one before
         * proceeding.
         */
        if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);

        Assert(node->as_syncdone ||
               (node->as_whichplan >= 0 &&
                node->as_whichplan < node->as_nplans));

        /* And we're initialized. */
        node->as_begun = true;
    }

    for (;;)
    {
        PlanState  *subnode;

        CHECK_FOR_INTERRUPTS();

        /*
         * try to get a tuple from an async subplan if any
         */
        if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
        {
            if (ExecAppendAsyncGetNext(node, &result))
                return result;
            Assert(!node->as_syncdone);
            Assert(bms_is_empty(node->as_needrequest));
        }

        /*
         * figure out which sync subplan we are currently processing
         */
        Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
        subnode = node->appendplans[node->as_whichplan];

        /*
         * get a tuple from the subplan
         */
        result = ExecProcNode(subnode);

        if (!TupIsNull(result))
        {
            /*
             * If the subplan gave us something then return it as-is.  We do
             * NOT make use of the result slot that was set up in
             * ExecInitAppend; there's no need for it.
             */
            return result;
        }

        /*
         * wait or poll for async events if any.  We do this before checking
         * for the end of iteration, because it might drain the remaining
         * async subplans.
         */
        if (node->as_nasyncremain > 0)
            ExecAppendAsyncEventWait(node);

        /* choose new sync subplan; if no sync/async subplans, we're done */
        if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
    }
}

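/*
 * Illustrative trace: with two sync subplans and no async ones, successive
 * ExecAppend calls drain subplan 0, switch via choose_next_subplan, drain
 * subplan 1, and finally return an empty result slot to signal end of scan.
 */
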
/* ----------------------------------------------------------------
 *      ExecEndAppend
 *
 *      Shuts down the subscans of the append node.
 *
 *      Returns nothing of interest.
 * ----------------------------------------------------------------
 */
void
ExecEndAppend(AppendState *node)
{
    PlanState **appendplans;
    int         nplans;
    int         i;

    /*
     * get information from the node
     */
    appendplans = node->appendplans;
    nplans = node->as_nplans;

    /*
     * shut down each of the subscans
     */
    for (i = 0; i < nplans; i++)
        ExecEndNode(appendplans[i]);
}

void
ExecReScanAppend(AppendState *node)
{
    int         nasyncplans = node->as_nasyncplans;
    int         i;

    /*
     * If any PARAM_EXEC Params used in pruning expressions have changed,
     * then we'd better unset the valid subplans so that they are reselected
     * for the new parameter values.
     */
    if (node->as_prune_state &&
        bms_overlap(node->ps.chgParam,
                    node->as_prune_state->execparamids))
    {
        node->as_valid_subplans_identified = false;
        bms_free(node->as_valid_subplans);
        node->as_valid_subplans = NULL;
        bms_free(node->as_valid_asyncplans);
        node->as_valid_asyncplans = NULL;
    }

    for (i = 0; i < node->as_nplans; i++)
    {
        PlanState  *subnode = node->appendplans[i];

        /*
         * ExecReScan doesn't know about my subplans, so I have to do
         * changed-parameter signaling myself.
         */
        if (node->ps.chgParam != NULL)
            UpdateChangedParamSet(subnode, node->ps.chgParam);

        /*
         * If chgParam of subnode is not null then plan will be re-scanned by
         * first ExecProcNode or by first ExecAsyncRequest.
         */
        if (subnode->chgParam == NULL)
            ExecReScan(subnode);
    }

    /* Reset async state */
    if (nasyncplans > 0)
    {
        i = -1;
        while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
        {
            AsyncRequest *areq = node->as_asyncrequests[i];

            areq->callback_pending = false;
            areq->request_complete = false;
            areq->result = NULL;
        }

        node->as_nasyncresults = 0;
        node->as_nasyncremain = 0;
        bms_free(node->as_needrequest);
        node->as_needrequest = NULL;
    }

    /* Let choose_next_subplan_* function handle setting the first subplan */
    node->as_whichplan = INVALID_SUBPLAN_INDEX;
    node->as_syncdone = false;
    node->as_begun = false;
}

/* ----------------------------------------------------------------
 *                      Parallel Append Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *      ExecAppendEstimate
 *
 *      Compute the amount of space we'll need in the parallel
 *      query DSM, and inform pcxt->estimator about our needs.
 * ----------------------------------------------------------------
 */
void
ExecAppendEstimate(AppendState *node,
                   ParallelContext *pcxt)
{
    node->pstate_len =
        add_size(offsetof(ParallelAppendState, pa_finished),
                 sizeof(bool) * node->as_nplans);

    shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

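/*
 * A worked sizing example (illustrative): with as_nplans = 4 the estimate is
 * offsetof(ParallelAppendState, pa_finished) + 4 * sizeof(bool), i.e. the
 * fixed LWLock-plus-int header followed by one pa_finished flag per subplan.
 */
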
/* ----------------------------------------------------------------
 *      ExecAppendInitializeDSM
 *
 *      Set up shared state for Parallel Append.
 * ----------------------------------------------------------------
 */
void
ExecAppendInitializeDSM(AppendState *node,
                        ParallelContext *pcxt)
{
    ParallelAppendState *pstate;

    pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
    memset(pstate, 0, node->pstate_len);
    LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
    shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);

    node->as_pstate = pstate;
    node->choose_next_subplan = choose_next_subplan_for_leader;
}

/* ----------------------------------------------------------------
 *      ExecAppendReInitializeDSM
 *
 *      Reset shared state before beginning a fresh scan.
 * ----------------------------------------------------------------
 */
void
ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
{
    ParallelAppendState *pstate = node->as_pstate;

    pstate->pa_next_plan = 0;
    memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
}

/* ----------------------------------------------------------------
 *      ExecAppendInitializeWorker
 *
 *      Copy relevant information from TOC into planstate, and initialize
 *      whatever is required to choose and execute the optimal subplan.
 * ----------------------------------------------------------------
 */
void
ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
{
    node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
    node->choose_next_subplan = choose_next_subplan_for_worker;
}

/* ----------------------------------------------------------------
 *      choose_next_subplan_locally
 *
 *      Choose next sync subplan for a non-parallel-aware Append,
 *      returning false if there are no more.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_locally(AppendState *node)
{
    int         whichplan = node->as_whichplan;
    int         nextplan;

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    /* Nothing to do if syncdone */
    if (node->as_syncdone)
        return false;

    /*
     * If first call then have the bms member function choose the first valid
     * sync subplan by initializing whichplan to -1.  If there happen to be
     * no valid sync subplans then the bms member function will handle that
     * by returning a negative number which will allow us to exit returning a
     * false value.
     */
    if (whichplan == INVALID_SUBPLAN_INDEX)
    {
        if (node->as_nasyncplans > 0)
        {
            /* We'd have filled as_valid_subplans already */
            Assert(node->as_valid_subplans_identified);
        }
        else if (!node->as_valid_subplans_identified)
        {
            node->as_valid_subplans =
                ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
            node->as_valid_subplans_identified = true;
        }

        whichplan = -1;
    }

    /* Ensure whichplan is within the expected range */
    Assert(whichplan >= -1 && whichplan <= node->as_nplans);

    if (ScanDirectionIsForward(node->ps.state->es_direction))
        nextplan = bms_next_member(node->as_valid_subplans, whichplan);
    else
        nextplan = bms_prev_member(node->as_valid_subplans, whichplan);

    if (nextplan < 0)
    {
        /* Set as_syncdone if in async mode */
        if (node->as_nasyncplans > 0)
            node->as_syncdone = true;
        return false;
    }

    node->as_whichplan = nextplan;

    return true;
}

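/*
 * Illustrative walk-through: with as_valid_subplans = {0, 2, 3} and a
 * forward scan, successive calls set as_whichplan to 0, then 2, then 3; the
 * next call gets a negative result from bms_next_member and returns false
 * (setting as_syncdone if async subplans exist).
 */
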
/* ----------------------------------------------------------------
 *      choose_next_subplan_for_leader
 *
 *      Try to pick a plan which doesn't commit us to doing much
 *      work locally, so that as much work as possible is done in
 *      the workers.  Cheapest subplans are at the end.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_leader(AppendState *node)
{
    ParallelAppendState *pstate = node->as_pstate;

    /* Backward scan is not supported by parallel-aware plans */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
    {
        /* Mark just-completed subplan as finished. */
        node->as_pstate->pa_finished[node->as_whichplan] = true;
    }
    else
    {
        /* Start with last subplan. */
        node->as_whichplan = node->as_nplans - 1;

        /*
         * If we've yet to determine the valid subplans then do so now.  If
         * run-time pruning is disabled then the valid subplans will always
         * be set to all subplans.
         */
        if (!node->as_valid_subplans_identified)
        {
            node->as_valid_subplans =
                ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
            node->as_valid_subplans_identified = true;

            /*
             * Mark each invalid plan as finished to allow the loop below to
             * select the first valid subplan.
             */
            mark_invalid_subplans_as_finished(node);
        }
    }

    /* Loop until we find a subplan to execute. */
    while (pstate->pa_finished[node->as_whichplan])
    {
        if (node->as_whichplan == 0)
        {
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
            node->as_whichplan = INVALID_SUBPLAN_INDEX;
            LWLockRelease(&pstate->pa_lock);
            return false;
        }

        /*
         * We needn't pay attention to as_valid_subplans here as all invalid
         * plans have been marked as finished.
         */
        node->as_whichplan--;
    }

    /* If non-partial, immediately mark as finished. */
    if (node->as_whichplan < node->as_first_partial_plan)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    LWLockRelease(&pstate->pa_lock);

    return true;
}

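/*
 * Illustrative: with subplans 0..4 ordered so the cheapest are last, the
 * leader starts at subplan 4 and walks backward past already-finished
 * entries, leaving the expensive low-numbered subplans for the workers.
 */
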
/* ----------------------------------------------------------------
 *      choose_next_subplan_for_worker
 *
 *      Choose next subplan for a parallel-aware Append, returning
 *      false if there are no more.
 *
 *      We start from the first plan and advance through the list;
 *      when we get back to the end, we loop back to the first
 *      partial plan.  This assigns the non-partial plans first in
 *      order of descending cost and then spreads out the workers
 *      as evenly as possible across the remaining partial plans.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_worker(AppendState *node)
{
    ParallelAppendState *pstate = node->as_pstate;

    /* Backward scan is not supported by parallel-aware plans */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

    /* Mark just-completed subplan as finished. */
    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    /*
     * If we've yet to determine the valid subplans then do so now.  If
     * run-time pruning is disabled then the valid subplans will always be
     * set to all subplans.
     */
    else if (!node->as_valid_subplans_identified)
    {
        node->as_valid_subplans =
            ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
        node->as_valid_subplans_identified = true;

        mark_invalid_subplans_as_finished(node);
    }

    /* If all the plans are already done, we have nothing to do */
    if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
    {
        LWLockRelease(&pstate->pa_lock);
        return false;
    }

    /* Save the plan from which we are starting the search. */
    node->as_whichplan = pstate->pa_next_plan;

    /* Loop until we find a valid subplan to execute. */
    while (pstate->pa_finished[pstate->pa_next_plan])
    {
        int         nextplan;

        nextplan = bms_next_member(node->as_valid_subplans,
                                   pstate->pa_next_plan);
        if (nextplan >= 0)
        {
            /* Advance to the next valid plan. */
            pstate->pa_next_plan = nextplan;
        }
        else if (node->as_whichplan > node->as_first_partial_plan)
        {
            /*
             * Try looping back to the first valid partial plan, if there is
             * one.  If there isn't, arrange to bail out below.
             */
            nextplan = bms_next_member(node->as_valid_subplans,
                                       node->as_first_partial_plan - 1);
            pstate->pa_next_plan =
                nextplan < 0 ? node->as_whichplan : nextplan;
        }
        else
        {
            /*
             * At last plan, and either there are no partial plans or we've
             * tried them all.  Arrange to bail out.
             */
            pstate->pa_next_plan = node->as_whichplan;
        }

        if (pstate->pa_next_plan == node->as_whichplan)
        {
            /* We've tried everything! */
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
            LWLockRelease(&pstate->pa_lock);
            return false;
        }
    }

    /* Pick the plan we found, and advance pa_next_plan one more time. */
    node->as_whichplan = pstate->pa_next_plan;
    pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
                                           pstate->pa_next_plan);

    /*
     * If there are no more valid plans then try setting the next plan to the
     * first valid partial plan.
     */
    if (pstate->pa_next_plan < 0)
    {
        int         nextplan = bms_next_member(node->as_valid_subplans,
                                               node->as_first_partial_plan - 1);

        if (nextplan >= 0)
            pstate->pa_next_plan = nextplan;
        else
        {
            /*
             * There are no valid partial plans, and we already chose the
             * last non-partial plan; so flag that there's nothing more for
             * our fellow workers to do.
             */
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
        }
    }

    /* If non-partial, immediately mark as finished. */
    if (node->as_whichplan < node->as_first_partial_plan)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    LWLockRelease(&pstate->pa_lock);

    return true;
}

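/*
 * Illustrative schedule: with subplans 0..3, as_first_partial_plan = 2, and
 * all plans valid, workers claim the non-partial plans 0 and 1 first (each
 * is marked finished the moment it is chosen), then cycle between partial
 * plans 2 and 3, which stay unfinished until some worker runs them to
 * completion.
 */
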
/*
 * mark_invalid_subplans_as_finished
 *      Marks the ParallelAppendState's pa_finished as true for each invalid
 *      subplan.
 *
 * This function should only be called for parallel Append with run-time
 * pruning enabled.
 */
static void
mark_invalid_subplans_as_finished(AppendState *node)
{
    int         i;

    /* Only valid to call this while in parallel Append mode */
    Assert(node->as_pstate);

    /* Shouldn't have been called when run-time pruning is not enabled */
    Assert(node->as_prune_state);

    /* Nothing to do if all plans are valid */
    if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
        return;

    /* Mark all non-valid plans as finished */
    for (i = 0; i < node->as_nplans; i++)
    {
        if (!bms_is_member(i, node->as_valid_subplans))
            node->as_pstate->pa_finished[i] = true;
    }
}

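/*
 * Illustrative: with as_nplans = 4 and as_valid_subplans = {1, 3}, entries
 * 0 and 2 of pa_finished become true, so no worker ever picks the pruned
 * subplans.
 */
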
/* ----------------------------------------------------------------
 *                      Asynchronous Append Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *      ExecAppendAsyncBegin
 *
 *      Begin executing designated async-capable subplans.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncBegin(AppendState *node)
{
    int         i;

    /* Backward scan is not supported by async-aware Appends. */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    /* We should never be called when there are no async subplans. */
    Assert(node->as_nasyncplans > 0);

    /* If we've yet to determine the valid subplans then do so now. */
    if (!node->as_valid_subplans_identified)
    {
        node->as_valid_subplans =
            ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
        node->as_valid_subplans_identified = true;

        classify_matching_subplans(node);
    }

    /* Initialize state variables. */
    node->as_syncdone = bms_is_empty(node->as_valid_subplans);
    node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);

    /* Nothing to do if there are no valid async subplans. */
    if (node->as_nasyncremain == 0)
        return;

    /* Make a request for each of the valid async subplans. */
    i = -1;
    while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
    {
        AsyncRequest *areq = node->as_asyncrequests[i];

        Assert(areq->request_index == i);
        Assert(!areq->callback_pending);

        /* Do the actual work. */
        ExecAsyncRequest(areq);
    }
}

/* ----------------------------------------------------------------
 *      ExecAppendAsyncGetNext
 *
 *      Get the next tuple from any of the asynchronous subplans.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
    *result = NULL;

    /* We should never be called when there are no valid async subplans. */
    Assert(node->as_nasyncremain > 0);

    /* Request a tuple asynchronously. */
    if (ExecAppendAsyncRequest(node, result))
        return true;

    while (node->as_nasyncremain > 0)
    {
        CHECK_FOR_INTERRUPTS();

        /* Wait or poll for async events. */
        ExecAppendAsyncEventWait(node);

        /* Request a tuple asynchronously. */
        if (ExecAppendAsyncRequest(node, result))
            return true;

        /* Break from loop if there's any sync subplan that isn't complete. */
        if (!node->as_syncdone)
            break;
    }

    /*
     * If all sync subplans are complete, we're totally done scanning the
     * given node.  Otherwise, we're done with the asynchronous stuff but
     * must continue scanning the sync subplans.
     */
    if (node->as_syncdone)
    {
        Assert(node->as_nasyncremain == 0);
        *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
        return true;
    }

    return false;
}

/* ----------------------------------------------------------------
 *      ExecAppendAsyncRequest
 *
 *      Request a tuple asynchronously.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
{
    Bitmapset  *needrequest;
    int         i;

    /* Nothing to do if there are no async subplans needing a new request. */
    if (bms_is_empty(node->as_needrequest))
    {
        Assert(node->as_nasyncresults == 0);
        return false;
    }

    /*
     * If there are any asynchronously-generated results that have not yet
     * been returned, we have nothing to do; just return one of them.
     */
    if (node->as_nasyncresults > 0)
    {
        --node->as_nasyncresults;
        *result = node->as_asyncresults[node->as_nasyncresults];
        return true;
    }

    /* Make a new request for each of the async subplans that need it. */
    needrequest = node->as_needrequest;
    node->as_needrequest = NULL;
    i = -1;
    while ((i = bms_next_member(needrequest, i)) >= 0)
    {
        AsyncRequest *areq = node->as_asyncrequests[i];

        /* Do the actual work. */
        ExecAsyncRequest(areq);
    }
    bms_free(needrequest);

    /* Return one of the asynchronously-generated results if any. */
    if (node->as_nasyncresults > 0)
    {
        --node->as_nasyncresults;
        *result = node->as_asyncresults[node->as_nasyncresults];
        return true;
    }

    return false;
}

/* ----------------------------------------------------------------
 *      ExecAppendAsyncEventWait
 *
 *      Wait or poll for file descriptor events and fire callbacks.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncEventWait(AppendState *node)
{
    int         nevents = node->as_nasyncplans + 1;
    long        timeout = node->as_syncdone ? -1 : 0;
    WaitEvent   occurred_event[EVENT_BUFFER_SIZE];
    int         noccurred;
    int         i;

    /* We should never be called when there are no valid async subplans. */
    Assert(node->as_nasyncremain > 0);

    Assert(node->as_eventset == NULL);
    node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
    AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
                      NULL, NULL);

    /* Give each waiting subplan a chance to add an event. */
    i = -1;
    while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
    {
        AsyncRequest *areq = node->as_asyncrequests[i];

        if (areq->callback_pending)
            ExecAsyncConfigureWait(areq);
    }

    /*
     * No need for further processing if there are no configured events other
     * than the postmaster death event.
     */
    if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
    {
        FreeWaitEventSet(node->as_eventset);
        node->as_eventset = NULL;
        return;
    }

    /* Return at most EVENT_BUFFER_SIZE events in one call. */
    if (nevents > EVENT_BUFFER_SIZE)
        nevents = EVENT_BUFFER_SIZE;

    /*
     * If the timeout is -1, wait until at least one event occurs.  If the
     * timeout is 0, poll for events, but do not wait at all.
     */
    noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
                                 nevents, WAIT_EVENT_APPEND_READY);
    FreeWaitEventSet(node->as_eventset);
    node->as_eventset = NULL;
    if (noccurred == 0)
        return;

    /* Deliver notifications. */
    for (i = 0; i < noccurred; i++)
    {
        WaitEvent  *w = &occurred_event[i];

        /*
         * Each waiting subplan should have registered its wait event with
         * user_data pointing back to its AsyncRequest.
         */
        if ((w->events & WL_SOCKET_READABLE) != 0)
        {
            AsyncRequest *areq = (AsyncRequest *) w->user_data;

            if (areq->callback_pending)
            {
                /*
                 * Mark it as no longer needing a callback.  We must do this
                 * before dispatching the callback in case the callback
                 * resets the flag.
                 */
                areq->callback_pending = false;

                /* Do the actual work. */
                ExecAsyncNotify(areq);
            }
        }
    }
}

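/*
 * Illustrative: with, say, 20 waiting async subplans, nevents is capped at
 * EVENT_BUFFER_SIZE (16) for a single WaitEventSetWait call; file
 * descriptors that were ready but not reported are simply picked up by a
 * later call, since the wait event set is rebuilt each time.
 */
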
/* ----------------------------------------------------------------
 *      ExecAsyncAppendResponse
 *
 *      Receive a response from an asynchronous request we made.
 * ----------------------------------------------------------------
 */
void
ExecAsyncAppendResponse(AsyncRequest *areq)
{
    AppendState *node = (AppendState *) areq->requestor;
    TupleTableSlot *slot = areq->result;

    /* The result should be a TupleTableSlot or NULL. */
    Assert(slot == NULL || IsA(slot, TupleTableSlot));

    /* Nothing to do if the request is pending. */
    if (!areq->request_complete)
    {
        /* The request would have been pending for a callback. */
        Assert(areq->callback_pending);
        return;
    }

    /* If the result is NULL or an empty slot, there's nothing more to do. */
    if (TupIsNull(slot))
    {
        /* The ending subplan wouldn't have been pending for a callback. */
        Assert(!areq->callback_pending);
        --node->as_nasyncremain;
        return;
    }

    /* Save result so we can return it. */
    Assert(node->as_nasyncresults < node->as_nasyncplans);
    node->as_asyncresults[node->as_nasyncresults++] = slot;

    /*
     * Mark the subplan that returned a result as ready for a new request.
     * We don't launch another one here immediately because it might
     * complete.
     */
    node->as_needrequest = bms_add_member(node->as_needrequest,
                                          areq->request_index);
}

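/*
 * Illustrative: results pushed here onto as_asyncresults are popped in LIFO
 * order by ExecAppendAsyncRequest; a subplan that returned a row is only
 * flagged in as_needrequest, and its next request is issued on the next
 * request cycle rather than immediately.
 */
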
/* ----------------------------------------------------------------
 *      classify_matching_subplans
 *
 *      Classify the node's as_valid_subplans into sync ones and
 *      async ones, adjust it to contain sync ones only, and save
 *      async ones in the node's as_valid_asyncplans.
 * ----------------------------------------------------------------
 */
static void
classify_matching_subplans(AppendState *node)
{
    Bitmapset  *valid_asyncplans;

    Assert(node->as_valid_subplans_identified);
    Assert(node->as_valid_asyncplans == NULL);

    /* Nothing to do if there are no valid subplans. */
    if (bms_is_empty(node->as_valid_subplans))
    {
        node->as_syncdone = true;
        node->as_nasyncremain = 0;
        return;
    }

    /* Nothing to do if there are no valid async subplans. */
    if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
    {
        node->as_nasyncremain = 0;
        return;
    }

    /* Get valid async subplans. */
    valid_asyncplans = bms_intersect(node->as_asyncplans,
                                     node->as_valid_subplans);

    /* Adjust the valid subplans to contain sync subplans only. */
    node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
                                              valid_asyncplans);

    /* Save valid async subplans. */
    node->as_valid_asyncplans = valid_asyncplans;
}
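
/*
 * Illustrative: with as_valid_subplans = {0, 1, 2} and as_asyncplans =
 * {1, 3}, bms_intersect yields valid_asyncplans = {1}, and bms_del_members
 * reduces as_valid_subplans to the sync-only set {0, 2}.
 */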