PostgreSQL Source Code git master
Loading...
Searching...
No Matches
nodeAppend.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * nodeAppend.c
4 * routines to handle append nodes.
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/executor/nodeAppend.c
12 *
13 *-------------------------------------------------------------------------
14 */
15/*
16 * INTERFACE ROUTINES
17 * ExecInitAppend - initialize the append node
18 * ExecAppend - retrieve the next tuple from the node
19 * ExecEndAppend - shut down the append node
20 * ExecReScanAppend - rescan the append node
21 *
22 * NOTES
23 * Each append node contains a list of one or more subplans which
24 * must be iteratively processed (forwards or backwards).
25 * Tuples are retrieved by executing the 'whichplan'th subplan
26 * until the subplan stops returning tuples, at which point that
27 * plan is shut down and the next started up.
28 *
29 * Append nodes don't make use of their left and right
30 * subtrees, rather they maintain a list of subplans so
31 * a typical append node looks like this in the plan tree:
32 *
33 *                  ...
34 *                 /
35 *              Append -------+------+------+--- nil
36 *              /    \        |      |      |
37 *            nil    nil     ...    ...    ...
38 *                               subplans
39 *
40 * Append nodes are currently used for unions, and to support
41 * inheritance queries, where several relations need to be scanned.
42 * For example, in our standard person/student/employee/student-emp
43 * example, where student and employee inherit from person
44 * and student-emp inherits from student and employee, the
45 * query:
46 *
47 * select name from person
48 *
49 * generates the plan:
50 *
51 *                |
52 *              Append -------+-------+--------+--------+
53 *              /    \        |       |        |        |
54 *            nil    nil     Scan    Scan     Scan     Scan
55 *                            |       |        |        |
56 *                         person employee  student student-emp
57 */
58
59#include "postgres.h"
60
61#include "executor/execAsync.h"
63#include "executor/executor.h"
64#include "executor/nodeAppend.h"
65#include "miscadmin.h"
66#include "pgstat.h"
67#include "storage/latch.h"
68#include "storage/lwlock.h"
69#include "utils/wait_event.h"
70
71/* Shared state for parallel-aware Append. */
73{
74 LWLock pa_lock; /* mutual exclusion to choose next subplan */
75 int pa_next_plan; /* next plan to choose by any worker */
76
77 /*
78 * pa_finished[i] should be true if no more workers should select subplan
79 * i. For a non-partial plan, this should be set to true as soon as a
80 * worker selects the plan; for a partial plan, it remains false until
81 * some worker executes the plan to completion.
82 */
84};
85
86#define INVALID_SUBPLAN_INDEX -1
87#define EVENT_BUFFER_SIZE 16
88
89static TupleTableSlot *ExecAppend(PlanState *pstate);
94static void ExecAppendAsyncBegin(AppendState *node);
97static void ExecAppendAsyncEventWait(AppendState *node);
99
100/* ----------------------------------------------------------------
101 * ExecInitAppend
102 *
103 * Begin all of the subscans of the append node.
104 *
105 * (This is potentially wasteful, since the entire result of the
106 * append node may not be scanned, but this way all of the
107 * structures get allocated in the executor's top level memory
108 * block instead of that of the call to ExecAppend.)
109 * ----------------------------------------------------------------
110 */
112ExecInitAppend(Append *node, EState *estate, int eflags)
113{
119 int nplans;
120 int nasyncplans;
121 int firstvalid;
122 int i,
123 j;
124
125 /* check for unsupported flags */
126 Assert(!(eflags & EXEC_FLAG_MARK));
127
128 /*
129 * create new AppendState for our append node
130 */
131 appendstate->ps.plan = (Plan *) node;
132 appendstate->ps.state = estate;
133 appendstate->ps.ExecProcNode = ExecAppend;
134
135 /* Let choose_next_subplan_* function handle setting the first subplan */
136 appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
137 appendstate->as_syncdone = false;
138 appendstate->as_begun = false;
139
140 /* If run-time partition pruning is enabled, then set that up now */
141 if (node->part_prune_index >= 0)
142 {
144
145 /*
146 * Set up pruning data structure. This also initializes the set of
147 * subplans to initialize (validsubplans) by taking into account the
148 * result of performing initial pruning if any.
149 */
152 node->part_prune_index,
153 node->apprelids,
155 appendstate->as_prune_state = prunestate;
157
158 /*
159 * When no run-time pruning is required and there's at least one
160 * subplan, we can fill as_valid_subplans immediately, preventing
161 * later calls to ExecFindMatchingSubPlans.
162 */
163 if (!prunestate->do_exec_prune && nplans > 0)
164 {
165 appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
166 appendstate->as_valid_subplans_identified = true;
167 }
168 }
169 else
170 {
171 nplans = list_length(node->appendplans);
172
173 /*
174 * When run-time partition pruning is not enabled we can just mark all
175 * subplans as valid; they must also all be initialized.
176 */
177 Assert(nplans > 0);
178 appendstate->as_valid_subplans = validsubplans =
179 bms_add_range(NULL, 0, nplans - 1);
180 appendstate->as_valid_subplans_identified = true;
181 appendstate->as_prune_state = NULL;
182 }
183
184 appendplanstates = (PlanState **) palloc(nplans *
185 sizeof(PlanState *));
186
187 /*
188 * call ExecInitNode on each of the valid plans to be executed and save
189 * the results into the appendplanstates array.
190 *
191 * While at it, find out the first valid partial plan.
192 */
193 j = 0;
195 nasyncplans = 0;
196 firstvalid = nplans;
197 i = -1;
198 while ((i = bms_next_member(validsubplans, i)) >= 0)
199 {
200 Plan *initNode = (Plan *) list_nth(node->appendplans, i);
201
202 /*
203 * Record async subplans. When executing EvalPlanQual, we treat them
204 * as sync ones; don't do this when initializing an EvalPlanQual plan
205 * tree.
206 */
207 if (initNode->async_capable && estate->es_epq_active == NULL)
208 {
210 nasyncplans++;
211 }
212
213 /*
214 * Record the lowest appendplans index which is a valid partial plan.
215 */
216 if (i >= node->first_partial_plan && j < firstvalid)
217 firstvalid = j;
218
219 appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
220 }
221
222 appendstate->as_first_partial_plan = firstvalid;
223 appendstate->appendplans = appendplanstates;
224 appendstate->as_nplans = nplans;
225
226 /*
227 * Initialize Append's result tuple type and slot. If the child plans all
228 * produce the same fixed slot type, we can use that slot type; otherwise
229 * make a virtual slot. (Note that the result slot itself is used only to
230 * return a null tuple at end of execution; real tuples are returned to
231 * the caller in the children's own result slots. What we are doing here
232 * is allowing the parent plan node to optimize if the Append will return
233 * only one kind of slot.)
234 */
236 if (appendops != NULL)
237 {
239 }
240 else
241 {
243 /* show that the output slot type is not fixed */
244 appendstate->ps.resultopsset = true;
245 appendstate->ps.resultopsfixed = false;
246 }
247
248 /* Initialize async state */
249 appendstate->as_asyncplans = asyncplans;
250 appendstate->as_nasyncplans = nasyncplans;
251 appendstate->as_asyncrequests = NULL;
252 appendstate->as_asyncresults = NULL;
253 appendstate->as_nasyncresults = 0;
254 appendstate->as_nasyncremain = 0;
255 appendstate->as_needrequest = NULL;
256 appendstate->as_eventset = NULL;
257 appendstate->as_valid_asyncplans = NULL;
258
259 if (nasyncplans > 0)
260 {
261 appendstate->as_asyncrequests = (AsyncRequest **)
262 palloc0(nplans * sizeof(AsyncRequest *));
263
264 i = -1;
265 while ((i = bms_next_member(asyncplans, i)) >= 0)
266 {
268
270 areq->requestor = (PlanState *) appendstate;
271 areq->requestee = appendplanstates[i];
272 areq->request_index = i;
273 areq->callback_pending = false;
274 areq->request_complete = false;
275 areq->result = NULL;
276
277 appendstate->as_asyncrequests[i] = areq;
278 }
279
280 appendstate->as_asyncresults = (TupleTableSlot **)
281 palloc0(nasyncplans * sizeof(TupleTableSlot *));
282
283 if (appendstate->as_valid_subplans_identified)
285 }
286
287 /*
288 * Miscellaneous initialization
289 */
290
291 appendstate->ps.ps_ProjInfo = NULL;
292
293 /* For parallel query, this will be overridden later. */
294 appendstate->choose_next_subplan = choose_next_subplan_locally;
295
296 return appendstate;
297}
298
299/* ----------------------------------------------------------------
300 * ExecAppend
301 *
302 * Handles iteration over multiple subplans.
303 * ----------------------------------------------------------------
304 */
305static TupleTableSlot *
307{
308 AppendState *node = castNode(AppendState, pstate);
310
311 /*
312 * If this is the first call after Init or ReScan, we need to do the
313 * initialization work.
314 */
315 if (!node->as_begun)
316 {
318 Assert(!node->as_syncdone);
319
320 /* Nothing to do if there are no subplans */
321 if (node->as_nplans == 0)
323
324 /* If there are any async subplans, begin executing them. */
325 if (node->as_nasyncplans > 0)
327
328 /*
329 * If no sync subplan has been chosen, we must choose one before
330 * proceeding.
331 */
332 if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
334
335 Assert(node->as_syncdone ||
336 (node->as_whichplan >= 0 &&
337 node->as_whichplan < node->as_nplans));
338
339 /* And we're initialized. */
340 node->as_begun = true;
341 }
342
343 for (;;)
344 {
346
348
349 /*
350 * try to get a tuple from an async subplan if any
351 */
352 if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
353 {
354 if (ExecAppendAsyncGetNext(node, &result))
355 return result;
356 Assert(!node->as_syncdone);
358 }
359
360 /*
361 * figure out which sync subplan we are currently processing
362 */
363 Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
364 subnode = node->appendplans[node->as_whichplan];
365
366 /*
367 * get a tuple from the subplan
368 */
370
371 if (!TupIsNull(result))
372 {
373 /*
374 * If the subplan gave us something then return it as-is. We do
375 * NOT make use of the result slot that was set up in
376 * ExecInitAppend; there's no need for it.
377 */
378 return result;
379 }
380
381 /*
382 * wait or poll for async events if any. We do this before checking
383 * for the end of iteration, because it might drain the remaining
384 * async subplans.
385 */
386 if (node->as_nasyncremain > 0)
388
389 /* choose new sync subplan; if no sync/async subplans, we're done */
390 if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
392 }
393}
394
395/* ----------------------------------------------------------------
396 * ExecEndAppend
397 *
398 * Shuts down the subscans of the append node.
399 *
400 * Returns nothing of interest.
401 * ----------------------------------------------------------------
402 */
403void
405{
406 PlanState **appendplans;
407 int nplans;
408 int i;
409
410 /*
411 * get information from the node
412 */
413 appendplans = node->appendplans;
414 nplans = node->as_nplans;
415
416 /*
417 * shut down each of the subscans
418 */
419 for (i = 0; i < nplans; i++)
420 ExecEndNode(appendplans[i]);
421}
422
423void
425{
426 int nasyncplans = node->as_nasyncplans;
427 int i;
428
429 /*
430 * If any PARAM_EXEC Params used in pruning expressions have changed, then
431 * we'd better unset the valid subplans so that they are reselected for
432 * the new parameter values.
433 */
434 if (node->as_prune_state &&
435 bms_overlap(node->ps.chgParam,
437 {
438 node->as_valid_subplans_identified = false;
440 node->as_valid_subplans = NULL;
443 }
444
445 for (i = 0; i < node->as_nplans; i++)
446 {
447 PlanState *subnode = node->appendplans[i];
448
449 /*
450 * ExecReScan doesn't know about my subplans, so I have to do
451 * changed-parameter signaling myself.
452 */
453 if (node->ps.chgParam != NULL)
455
456 /*
457 * If chgParam of subnode is not null then plan will be re-scanned by
458 * first ExecProcNode or by first ExecAsyncRequest.
459 */
460 if (subnode->chgParam == NULL)
462 }
463
464 /* Reset async state */
465 if (nasyncplans > 0)
466 {
467 i = -1;
468 while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
469 {
471
472 areq->callback_pending = false;
473 areq->request_complete = false;
474 areq->result = NULL;
475 }
476
477 node->as_nasyncresults = 0;
478 node->as_nasyncremain = 0;
480 node->as_needrequest = NULL;
481 }
482
483 /* Let choose_next_subplan_* function handle setting the first subplan */
485 node->as_syncdone = false;
486 node->as_begun = false;
487}
488
489/* ----------------------------------------------------------------
490 * Parallel Append Support
491 * ----------------------------------------------------------------
492 */
493
494/* ----------------------------------------------------------------
495 * ExecAppendEstimate
496 *
497 * Compute the amount of space we'll need in the parallel
498 * query DSM, and inform pcxt->estimator about our needs.
499 * ----------------------------------------------------------------
500 */
501void
503 ParallelContext *pcxt)
504{
505 node->pstate_len =
507 sizeof(bool) * node->as_nplans);
508
511}
512
513
514/* ----------------------------------------------------------------
515 * ExecAppendInitializeDSM
516 *
517 * Set up shared state for Parallel Append.
518 * ----------------------------------------------------------------
519 */
520void
522 ParallelContext *pcxt)
523{
524 ParallelAppendState *pstate;
525
526 pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
527 memset(pstate, 0, node->pstate_len);
529 shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
530
531 node->as_pstate = pstate;
533}
534
535/* ----------------------------------------------------------------
536 * ExecAppendReInitializeDSM
537 *
538 * Reset shared state before beginning a fresh scan.
539 * ----------------------------------------------------------------
540 */
541void
543{
544 ParallelAppendState *pstate = node->as_pstate;
545
546 pstate->pa_next_plan = 0;
547 memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
548}
549
550/* ----------------------------------------------------------------
551 * ExecAppendInitializeWorker
552 *
553 * Copy relevant information from TOC into planstate, and initialize
554 * whatever is required to choose and execute the optimal subplan.
555 * ----------------------------------------------------------------
556 */
557void
563
564/* ----------------------------------------------------------------
565 * choose_next_subplan_locally
566 *
567 * Choose next sync subplan for a non-parallel-aware Append,
568 * returning false if there are no more.
569 * ----------------------------------------------------------------
570 */
571static bool
573{
574 int whichplan = node->as_whichplan;
575 int nextplan;
576
577 /* We should never be called when there are no subplans */
578 Assert(node->as_nplans > 0);
579
580 /* Nothing to do if syncdone */
581 if (node->as_syncdone)
582 return false;
583
584 /*
585 * If first call then have the bms member function choose the first valid
586 * sync subplan by initializing whichplan to -1. If there happen to be no
587 * valid sync subplans then the bms member function will handle that by
588 * returning a negative number which will allow us to exit returning a
589 * false value.
590 */
592 {
593 if (node->as_nasyncplans > 0)
594 {
595 /* We'd have filled as_valid_subplans already */
597 }
598 else if (!node->as_valid_subplans_identified)
599 {
600 node->as_valid_subplans =
602 node->as_valid_subplans_identified = true;
603 }
604
605 whichplan = -1;
606 }
607
608 /* Ensure whichplan is within the expected range */
609 Assert(whichplan >= -1 && whichplan <= node->as_nplans);
610
613 else
615
616 if (nextplan < 0)
617 {
618 /* Set as_syncdone if in async mode */
619 if (node->as_nasyncplans > 0)
620 node->as_syncdone = true;
621 return false;
622 }
623
624 node->as_whichplan = nextplan;
625
626 return true;
627}
628
629/* ----------------------------------------------------------------
630 * choose_next_subplan_for_leader
631 *
632 * Try to pick a plan which doesn't commit us to doing much
633 * work locally, so that as much work as possible is done in
634 * the workers. Cheapest subplans are at the end.
635 * ----------------------------------------------------------------
636 */
637static bool
639{
640 ParallelAppendState *pstate = node->as_pstate;
641
642 /* Backward scan is not supported by parallel-aware plans */
644
645 /* We should never be called when there are no subplans */
646 Assert(node->as_nplans > 0);
647
649
651 {
652 /* Mark just-completed subplan as finished. */
653 node->as_pstate->pa_finished[node->as_whichplan] = true;
654 }
655 else
656 {
657 /* Start with last subplan. */
658 node->as_whichplan = node->as_nplans - 1;
659
660 /*
661 * If we've yet to determine the valid subplans then do so now. If
662 * run-time pruning is disabled then the valid subplans will always be
663 * set to all subplans.
664 */
666 {
667 node->as_valid_subplans =
669 node->as_valid_subplans_identified = true;
670
671 /*
672 * Mark each invalid plan as finished to allow the loop below to
673 * select the first valid subplan.
674 */
676 }
677 }
678
679 /* Loop until we find a subplan to execute. */
680 while (pstate->pa_finished[node->as_whichplan])
681 {
682 if (node->as_whichplan == 0)
683 {
686 LWLockRelease(&pstate->pa_lock);
687 return false;
688 }
689
690 /*
691 * We needn't pay attention to as_valid_subplans here as all invalid
692 * plans have been marked as finished.
693 */
694 node->as_whichplan--;
695 }
696
697 /* If non-partial, immediately mark as finished. */
698 if (node->as_whichplan < node->as_first_partial_plan)
699 node->as_pstate->pa_finished[node->as_whichplan] = true;
700
701 LWLockRelease(&pstate->pa_lock);
702
703 return true;
704}
705
706/* ----------------------------------------------------------------
707 * choose_next_subplan_for_worker
708 *
709 * Choose next subplan for a parallel-aware Append, returning
710 * false if there are no more.
711 *
712 * We start from the first plan and advance through the list;
713 * when we reach the end, we loop back to the first
714 * partial plan. This assigns the non-partial plans first in
715 * order of descending cost and then spreads out the workers
716 * as evenly as possible across the remaining partial plans.
717 * ----------------------------------------------------------------
718 */
719static bool
721{
722 ParallelAppendState *pstate = node->as_pstate;
723
724 /* Backward scan is not supported by parallel-aware plans */
726
727 /* We should never be called when there are no subplans */
728 Assert(node->as_nplans > 0);
729
731
732 /* Mark just-completed subplan as finished. */
734 node->as_pstate->pa_finished[node->as_whichplan] = true;
735
736 /*
737 * If we've yet to determine the valid subplans then do so now. If
738 * run-time pruning is disabled then the valid subplans will always be set
739 * to all subplans.
740 */
741 else if (!node->as_valid_subplans_identified)
742 {
743 node->as_valid_subplans =
745 node->as_valid_subplans_identified = true;
746
748 }
749
750 /* If all the plans are already done, we have nothing to do */
751 if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
752 {
753 LWLockRelease(&pstate->pa_lock);
754 return false;
755 }
756
757 /* Save the plan from which we are starting the search. */
758 node->as_whichplan = pstate->pa_next_plan;
759
760 /* Loop until we find a valid subplan to execute. */
761 while (pstate->pa_finished[pstate->pa_next_plan])
762 {
763 int nextplan;
764
766 pstate->pa_next_plan);
767 if (nextplan >= 0)
768 {
769 /* Advance to the next valid plan. */
770 pstate->pa_next_plan = nextplan;
771 }
772 else if (node->as_whichplan > node->as_first_partial_plan)
773 {
774 /*
775 * Try looping back to the first valid partial plan, if there is
776 * one. If there isn't, arrange to bail out below.
777 */
779 node->as_first_partial_plan - 1);
780 pstate->pa_next_plan =
781 nextplan < 0 ? node->as_whichplan : nextplan;
782 }
783 else
784 {
785 /*
786 * At last plan, and either there are no partial plans or we've
787 * tried them all. Arrange to bail out.
788 */
789 pstate->pa_next_plan = node->as_whichplan;
790 }
791
792 if (pstate->pa_next_plan == node->as_whichplan)
793 {
794 /* We've tried everything! */
796 LWLockRelease(&pstate->pa_lock);
797 return false;
798 }
799 }
800
801 /* Pick the plan we found, and advance pa_next_plan one more time. */
802 node->as_whichplan = pstate->pa_next_plan;
804 pstate->pa_next_plan);
805
806 /*
807 * If there are no more valid plans then try setting the next plan to the
808 * first valid partial plan.
809 */
810 if (pstate->pa_next_plan < 0)
811 {
813 node->as_first_partial_plan - 1);
814
815 if (nextplan >= 0)
816 pstate->pa_next_plan = nextplan;
817 else
818 {
819 /*
820 * There are no valid partial plans, and we already chose the last
821 * non-partial plan; so flag that there's nothing more for our
822 * fellow workers to do.
823 */
825 }
826 }
827
828 /* If non-partial, immediately mark as finished. */
829 if (node->as_whichplan < node->as_first_partial_plan)
830 node->as_pstate->pa_finished[node->as_whichplan] = true;
831
832 LWLockRelease(&pstate->pa_lock);
833
834 return true;
835}
836
837/*
838 * mark_invalid_subplans_as_finished
839 * Marks the ParallelAppendState's pa_finished as true for each invalid
840 * subplan.
841 *
842 * This function should only be called for parallel Append with run-time
843 * pruning enabled.
844 */
845static void
847{
848 int i;
849
850 /* Only valid to call this while in parallel Append mode */
851 Assert(node->as_pstate);
852
853 /* Shouldn't have been called when run-time pruning is not enabled */
854 Assert(node->as_prune_state);
855
856 /* Nothing to do if all plans are valid */
857 if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
858 return;
859
860 /* Mark all non-valid plans as finished */
861 for (i = 0; i < node->as_nplans; i++)
862 {
863 if (!bms_is_member(i, node->as_valid_subplans))
864 node->as_pstate->pa_finished[i] = true;
865 }
866}
867
868/* ----------------------------------------------------------------
869 * Asynchronous Append Support
870 * ----------------------------------------------------------------
871 */
872
873/* ----------------------------------------------------------------
874 * ExecAppendAsyncBegin
875 *
876 * Begin executing designated async-capable subplans.
877 * ----------------------------------------------------------------
878 */
879static void
881{
882 int i;
883
884 /* Backward scan is not supported by async-aware Appends. */
886
887 /* We should never be called when there are no subplans */
888 Assert(node->as_nplans > 0);
889
890 /* We should never be called when there are no async subplans. */
891 Assert(node->as_nasyncplans > 0);
892
893 /* If we've yet to determine the valid subplans then do so now. */
895 {
896 node->as_valid_subplans =
898 node->as_valid_subplans_identified = true;
899
901 }
902
903 /* Initialize state variables. */
906
907 /* Nothing to do if there are no valid async subplans. */
908 if (node->as_nasyncremain == 0)
909 return;
910
911 /* Make a request for each of the valid async subplans. */
912 i = -1;
913 while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
914 {
916
917 Assert(areq->request_index == i);
918 Assert(!areq->callback_pending);
919
920 /* Do the actual work. */
922 }
923}
924
925/* ----------------------------------------------------------------
926 * ExecAppendAsyncGetNext
927 *
928 * Get the next tuple from any of the asynchronous subplans.
929 * ----------------------------------------------------------------
930 */
931static bool
933{
934 *result = NULL;
935
936 /* We should never be called when there are no valid async subplans. */
937 Assert(node->as_nasyncremain > 0);
938
939 /* Request a tuple asynchronously. */
941 return true;
942
943 while (node->as_nasyncremain > 0)
944 {
946
947 /* Wait or poll for async events. */
949
950 /* Request a tuple asynchronously. */
952 return true;
953
954 /* Break from loop if there's any sync subplan that isn't complete. */
955 if (!node->as_syncdone)
956 break;
957 }
958
959 /*
960 * If all sync subplans are complete, we're totally done scanning the
961 * given node. Otherwise, we're done with the asynchronous stuff but must
962 * continue scanning the sync subplans.
963 */
964 if (node->as_syncdone)
965 {
966 Assert(node->as_nasyncremain == 0);
968 return true;
969 }
970
971 return false;
972}
973
974/* ----------------------------------------------------------------
975 * ExecAppendAsyncRequest
976 *
977 * Request a tuple asynchronously.
978 * ----------------------------------------------------------------
979 */
980static bool
982{
984 int i;
985
986 /* Nothing to do if there are no async subplans needing a new request. */
987 if (bms_is_empty(node->as_needrequest))
988 {
989 Assert(node->as_nasyncresults == 0);
990 return false;
991 }
992
993 /*
994 * If there are any asynchronously-generated results that have not yet
995 * been returned, we have nothing to do; just return one of them.
996 */
997 if (node->as_nasyncresults > 0)
998 {
999 --node->as_nasyncresults;
1000 *result = node->as_asyncresults[node->as_nasyncresults];
1001 return true;
1002 }
1003
1004 /* Make a new request for each of the async subplans that need it. */
1006 node->as_needrequest = NULL;
1007 i = -1;
1008 while ((i = bms_next_member(needrequest, i)) >= 0)
1009 {
1011
1012 /* Do the actual work. */
1014 }
1016
1017 /* Return one of the asynchronously-generated results if any. */
1018 if (node->as_nasyncresults > 0)
1019 {
1020 --node->as_nasyncresults;
1021 *result = node->as_asyncresults[node->as_nasyncresults];
1022 return true;
1023 }
1024
1025 return false;
1026}
1027
1028/* ----------------------------------------------------------------
1029 * ExecAppendAsyncEventWait
1030 *
1031 * Wait or poll for file descriptor events and fire callbacks.
1032 * ----------------------------------------------------------------
1033 */
1034static void
1036{
1037 int nevents = node->as_nasyncplans + 2;
1038 long timeout = node->as_syncdone ? -1 : 0;
1040 int noccurred;
1041 int i;
1042
1043 /* We should never be called when there are no valid async subplans. */
1044 Assert(node->as_nasyncremain > 0);
1045
1046 Assert(node->as_eventset == NULL);
1049 NULL, NULL);
1050
1051 /* Give each waiting subplan a chance to add an event. */
1052 i = -1;
1053 while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1054 {
1056
1057 if (areq->callback_pending)
1059 }
1060
1061 /*
1062 * No need for further processing if none of the subplans configured any
1063 * events.
1064 */
1066 {
1068 node->as_eventset = NULL;
1069 return;
1070 }
1071
1072 /*
1073 * Add the process latch to the set, so that we wake up to process the
1074 * standard interrupts with CHECK_FOR_INTERRUPTS().
1075 *
1076 * NOTE: For historical reasons, it's important that this is added to the
1077 * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely,
1078 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
1079 * any other events are in the set. That's a poor design; it's
1080 * questionable for postgres_fdw to be doing that in the first place, but
1081 * we cannot change it now. The pattern has possibly been copied to other
1082 * extensions too.
1083 */
1085 MyLatch, NULL);
1086
1087 /* Return at most EVENT_BUFFER_SIZE events in one call. */
1088 if (nevents > EVENT_BUFFER_SIZE)
1089 nevents = EVENT_BUFFER_SIZE;
1090
1091 /*
1092 * If the timeout is -1, wait until at least one event occurs. If the
1093 * timeout is 0, poll for events, but do not wait at all.
1094 */
1096 nevents, WAIT_EVENT_APPEND_READY);
1098 node->as_eventset = NULL;
1099 if (noccurred == 0)
1100 return;
1101
1102 /* Deliver notifications. */
1103 for (i = 0; i < noccurred; i++)
1104 {
1105 WaitEvent *w = &occurred_event[i];
1106
1107 /*
1108 * Each waiting subplan should have registered its wait event with
1109 * user_data pointing back to its AsyncRequest.
1110 */
1111 if ((w->events & WL_SOCKET_READABLE) != 0)
1112 {
1114
1115 if (areq->callback_pending)
1116 {
1117 /*
1118 * Mark it as no longer needing a callback. We must do this
1119 * before dispatching the callback in case the callback resets
1120 * the flag.
1121 */
1122 areq->callback_pending = false;
1123
1124 /* Do the actual work. */
1126 }
1127 }
1128
1129 /* Handle standard interrupts */
1130 if ((w->events & WL_LATCH_SET) != 0)
1131 {
1134 }
1135 }
1136}
1137
1138/* ----------------------------------------------------------------
1139 * ExecAsyncAppendResponse
1140 *
1141 * Receive a response from an asynchronous request we made.
1142 * ----------------------------------------------------------------
1143 */
1144void
1146{
1147 AppendState *node = (AppendState *) areq->requestor;
1148 TupleTableSlot *slot = areq->result;
1149
1150 /* The result should be a TupleTableSlot or NULL. */
1151 Assert(slot == NULL || IsA(slot, TupleTableSlot));
1152
1153 /* Nothing to do if the request is pending. */
1154 if (!areq->request_complete)
1155 {
1156 /* The request would have been pending for a callback. */
1157 Assert(areq->callback_pending);
1158 return;
1159 }
1160
1161 /* If the result is NULL or an empty slot, there's nothing more to do. */
1162 if (TupIsNull(slot))
1163 {
1164 /* The ending subplan wouldn't have been pending for a callback. */
1165 Assert(!areq->callback_pending);
1166 --node->as_nasyncremain;
1167 return;
1168 }
1169
1170 /* Save result so we can return it. */
1171 Assert(node->as_nasyncresults < node->as_nasyncplans);
1172 node->as_asyncresults[node->as_nasyncresults++] = slot;
1173
1174 /*
1175 * Mark the subplan that returned a result as ready for a new request. We
1176 * don't launch another one here immediately because it might complete.
1177 */
1179 areq->request_index);
1180}
1181
1182/* ----------------------------------------------------------------
1183 * classify_matching_subplans
1184 *
1185 * Classify the node's as_valid_subplans into sync ones and
1186 * async ones, adjust it to contain sync ones only, and save
1187 * async ones in the node's as_valid_asyncplans.
1188 * ----------------------------------------------------------------
1189 */
1190static void
1192{
1194
1197
1198 /* Nothing to do if there are no valid subplans. */
1200 {
1201 node->as_syncdone = true;
1202 node->as_nasyncremain = 0;
1203 return;
1204 }
1205
1206 /* Nothing to do if there are no valid async subplans. */
1207 if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1208 {
1209 node->as_nasyncremain = 0;
1210 return;
1211 }
1212
1213 /* Get valid async subplans. */
1215 node->as_valid_subplans);
1216
1217 /* Adjust the valid subplans to contain sync subplans only. */
1220
1221 /* Save valid async subplans. */
1223}
int bms_prev_member(const Bitmapset *a, int prevbit)
Definition bitmapset.c:1352
Bitmapset * bms_intersect(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:292
int bms_next_member(const Bitmapset *a, int prevbit)
Definition bitmapset.c:1290
Bitmapset * bms_del_members(Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:1145
Bitmapset * bms_add_range(Bitmapset *a, int lower, int upper)
Definition bitmapset.c:1003
void bms_free(Bitmapset *a)
Definition bitmapset.c:239
int bms_num_members(const Bitmapset *a)
Definition bitmapset.c:744
bool bms_is_member(int x, const Bitmapset *a)
Definition bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:575
#define bms_is_empty(a)
Definition bitmapset.h:118
#define Assert(condition)
Definition c.h:943
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:558
uint32 result
void ExecReScan(PlanState *node)
Definition execAmi.c:78
void ExecAsyncRequest(AsyncRequest *areq)
Definition execAsync.c:27
void ExecAsyncConfigureWait(AsyncRequest *areq)
Definition execAsync.c:63
void ExecAsyncNotify(AsyncRequest *areq)
Definition execAsync.c:89
PartitionPruneState * ExecInitPartitionExecPruning(PlanState *planstate, int n_total_subplans, int part_prune_index, Bitmapset *relids, Bitmapset **initially_valid_subplans)
Bitmapset * ExecFindMatchingSubPlans(PartitionPruneState *prunestate, bool initial_prune, Bitmapset **validsubplan_rtis)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps * ExecGetCommonSlotOps(PlanState **planstates, int nplans)
Definition execUtils.c:541
void UpdateChangedParamSet(PlanState *node, Bitmapset *newchg)
Definition execUtils.c:936
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition executor.h:322
#define EXEC_FLAG_MARK
Definition executor.h:71
#define palloc_object(type)
Definition fe_memutils.h:89
struct Latch * MyLatch
Definition globals.c:65
int j
Definition isn.c:78
int i
Definition isn.c:77
void ResetLatch(Latch *latch)
Definition latch.c:374
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:670
@ LW_EXCLUSIVE
Definition lwlock.h:104
Size add_size(Size s1, Size s2)
Definition mcxt.c:1733
void * palloc0(Size size)
Definition mcxt.c:1420
void * palloc(Size size)
Definition mcxt.c:1390
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
void ExecEndAppend(AppendState *node)
Definition nodeAppend.c:404
static void ExecAppendAsyncBegin(AppendState *node)
Definition nodeAppend.c:880
static void ExecAppendAsyncEventWait(AppendState *node)
void ExecReScanAppend(AppendState *node)
Definition nodeAppend.c:424
static void classify_matching_subplans(AppendState *node)
static void mark_invalid_subplans_as_finished(AppendState *node)
Definition nodeAppend.c:846
static TupleTableSlot * ExecAppend(PlanState *pstate)
Definition nodeAppend.c:306
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
Definition nodeAppend.c:981
static bool choose_next_subplan_for_leader(AppendState *node)
Definition nodeAppend.c:638
static bool choose_next_subplan_for_worker(AppendState *node)
Definition nodeAppend.c:720
void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
Definition nodeAppend.c:542
#define INVALID_SUBPLAN_INDEX
Definition nodeAppend.c:86
void ExecAsyncAppendResponse(AsyncRequest *areq)
#define EVENT_BUFFER_SIZE
Definition nodeAppend.c:87
AppendState * ExecInitAppend(Append *node, EState *estate, int eflags)
Definition nodeAppend.c:112
static bool choose_next_subplan_locally(AppendState *node)
Definition nodeAppend.c:572
void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
Definition nodeAppend.c:558
void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt)
Definition nodeAppend.c:521
void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt)
Definition nodeAppend.c:502
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
Definition nodeAppend.c:932
#define IsA(nodeptr, _type_)
Definition nodes.h:164
#define makeNode(_type_)
Definition nodes.h:161
#define castNode(_type_, nodeptr)
Definition nodes.h:182
static int list_length(const List *l)
Definition pg_list.h:152
static void * list_nth(const List *list, int n)
Definition pg_list.h:331
#define PGINVALID_SOCKET
Definition port.h:31
static int fb(int x)
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
#define ScanDirectionIsForward(direction)
Definition sdir.h:64
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition shm_toc.c:171
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition shm_toc.c:239
#define shm_toc_estimate_chunk(e, sz)
Definition shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition shm_toc.h:53
struct PartitionPruneState * as_prune_state
Definition execnodes.h:1554
Bitmapset * as_valid_asyncplans
Definition execnodes.h:1557
Bitmapset * as_needrequest
Definition execnodes.h:1547
bool as_syncdone
Definition execnodes.h:1544
AsyncRequest ** as_asyncrequests
Definition execnodes.h:1541
Bitmapset * as_asyncplans
Definition execnodes.h:1539
int as_nasyncresults
Definition execnodes.h:1543
struct WaitEventSet * as_eventset
Definition execnodes.h:1548
bool(* choose_next_subplan)(AppendState *)
Definition execnodes.h:1558
PlanState ** appendplans
Definition execnodes.h:1535
int as_first_partial_plan
Definition execnodes.h:1550
PlanState ps
Definition execnodes.h:1534
int as_nasyncremain
Definition execnodes.h:1546
ParallelAppendState * as_pstate
Definition execnodes.h:1552
Bitmapset * as_valid_subplans
Definition execnodes.h:1556
Size pstate_len
Definition execnodes.h:1553
TupleTableSlot ** as_asyncresults
Definition execnodes.h:1542
int as_nasyncplans
Definition execnodes.h:1540
bool as_valid_subplans_identified
Definition execnodes.h:1555
int first_partial_plan
Definition plannodes.h:418
int part_prune_index
Definition plannodes.h:425
Bitmapset * apprelids
Definition plannodes.h:403
List * appendplans
Definition plannodes.h:409
bool callback_pending
Definition execnodes.h:678
ScanDirection es_direction
Definition execnodes.h:695
struct EPQState * es_epq_active
Definition execnodes.h:778
bool pa_finished[FLEXIBLE_ARRAY_MEMBER]
Definition nodeAppend.c:83
shm_toc_estimator estimator
Definition parallel.h:43
shm_toc * toc
Definition parallel.h:46
Bitmapset * execparamids
Plan * plan
Definition execnodes.h:1201
EState * state
Definition execnodes.h:1203
Bitmapset * chgParam
Definition execnodes.h:1235
TupleTableSlot * ps_ResultTupleSlot
Definition execnodes.h:1241
int plan_node_id
Definition plannodes.h:233
void * user_data
uint32 events
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
#define TupIsNull(slot)
Definition tuptable.h:325
int GetNumRegisteredWaitEvents(WaitEventSet *set)
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
void FreeWaitEventSet(WaitEventSet *set)
WaitEventSet * CreateWaitEventSet(ResourceOwner resowner, int nevents)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET