PostgreSQL Source Code git master
Loading...
Searching...
No Matches
nodeAppend.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * nodeAppend.c
4 * routines to handle append nodes.
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/executor/nodeAppend.c
12 *
13 *-------------------------------------------------------------------------
14 */
15/* INTERFACE ROUTINES
16 * ExecInitAppend - initialize the append node
17 * ExecAppend - retrieve the next tuple from the node
18 * ExecEndAppend - shut down the append node
19 * ExecReScanAppend - rescan the append node
20 *
21 * NOTES
22 * Each append node contains a list of one or more subplans which
23 * must be iteratively processed (forwards or backwards).
24 * Tuples are retrieved by executing the 'whichplan'th subplan
25 * until the subplan stops returning tuples, at which point that
26 * plan is shut down and the next started up.
27 *
28 * Append nodes don't make use of their left and right
29 * subtrees, rather they maintain a list of subplans so
30 * a typical append node looks like this in the plan tree:
31 *
32 * ...
33 * /
34 * Append -------+------+------+--- nil
35 * / \ | | |
36 * nil nil ... ... ...
37 * subplans
38 *
39 * Append nodes are currently used for unions, and to support
40 * inheritance queries, where several relations need to be scanned.
41 * For example, in our standard person/student/employee/student-emp
42 * example, where student and employee inherit from person
43 * and student-emp inherits from student and employee, the
44 * query:
45 *
46 * select name from person
47 *
48 * generates the plan:
49 *
50 * |
51 * Append -------+-------+--------+--------+
52 * / \ | | | |
53 * nil nil Scan Scan Scan Scan
54 * | | | |
55 * person employee student student-emp
56 */
57
58#include "postgres.h"
59
60#include "executor/execAsync.h"
62#include "executor/executor.h"
63#include "executor/nodeAppend.h"
64#include "miscadmin.h"
65#include "pgstat.h"
66#include "storage/latch.h"
67#include "storage/lwlock.h"
68
69/* Shared state for parallel-aware Append. */
71{
72 LWLock pa_lock; /* mutual exclusion to choose next subplan */
73 int pa_next_plan; /* next plan to choose by any worker */
74
75 /*
76 * pa_finished[i] should be true if no more workers should select subplan
77 * i. for a non-partial plan, this should be set to true as soon as a
78 * worker selects the plan; for a partial plan, it remains false until
79 * some worker executes the plan to completion.
80 */
82};
83
/*
 * NOTE(review): extraction gap — orig. lines 88-91 and 96 (the static
 * prototypes for the choose_next_subplan_* functions,
 * mark_invalid_subplans_as_finished, and classify_matching_subplans,
 * presumably — TODO confirm against the real file) are missing.
 */
84#define INVALID_SUBPLAN_INDEX -1
85#define EVENT_BUFFER_SIZE 16
86
87static TupleTableSlot *ExecAppend(PlanState *pstate);
92static void ExecAppendAsyncBegin(AppendState *node);
93static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
94static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
95static void ExecAppendAsyncEventWait(AppendState *node);
97
98/* ----------------------------------------------------------------
99 * ExecInitAppend
100 *
101 * Begin all of the subscans of the append node.
102 *
103 * (This is potentially wasteful, since the entire result of the
104 * append node may not be scanned, but this way all of the
105 * structures get allocated in the executor's top level memory
106 * block instead of that of the call to ExecAppend.)
107 * ----------------------------------------------------------------
108 */
/*
 * NOTE(review): Doxygen-scrape artifact.  The orig. numbering skips at
 * lines 109 (return type "AppendState *"), 112-116 (local declarations,
 * incl. the AppendState allocation — "appendstate" is used below but never
 * declared in this dump), 141, 148-149, 152, 154 (the
 * ExecInitPartitionExecPruning() call and nplans assignment), 192, 207,
 * 233, 236, 240, 265, 267 and 282.  Not compilable as-is; consult the
 * real src/backend/executor/nodeAppend.c.
 */
110ExecInitAppend(Append *node, EState *estate, int eflags)
111{
117 int nplans;
118 int nasyncplans;
119 int firstvalid;
120 int i,
121 j;
122
123 /* check for unsupported flags */
124 Assert(!(eflags & EXEC_FLAG_MARK))
125
126 /*
127 * create new AppendState for our append node
128 */
129 appendstate->ps.plan = (Plan *) node;
130 appendstate->ps.state = estate;
131 appendstate->ps.ExecProcNode = ExecAppend;
132
133 /* Let choose_next_subplan_* function handle setting the first subplan */
134 appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
135 appendstate->as_syncdone = false;
136 appendstate->as_begun = false;
137
138 /* If run-time partition pruning is enabled, then set that up now */
139 if (node->part_prune_index >= 0)
140 {
142
143 /*
144 * Set up pruning data structure. This also initializes the set of
145 * subplans to initialize (validsubplans) by taking into account the
146 * result of performing initial pruning if any.
147 */
/* [extraction gap: orig. 148-149 — the ExecInitPartitionExecPruning() call] */
150 node->part_prune_index,
151 node->apprelids,
153 appendstate->as_prune_state = prunestate;
155
156 /*
157 * When no run-time pruning is required and there's at least one
158 * subplan, we can fill as_valid_subplans immediately, preventing
159 * later calls to ExecFindMatchingSubPlans.
160 */
161 if (!prunestate->do_exec_prune && nplans > 0)
162 {
163 appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
164 appendstate->as_valid_subplans_identified = true;
165 }
166 }
167 else
168 {
169 nplans = list_length(node->appendplans);
170
171 /*
172 * When run-time partition pruning is not enabled we can just mark all
173 * subplans as valid; they must also all be initialized.
174 */
175 Assert(nplans > 0);
176 appendstate->as_valid_subplans = validsubplans =
177 bms_add_range(NULL, 0, nplans - 1);
178 appendstate->as_valid_subplans_identified = true;
179 appendstate->as_prune_state = NULL;
180 }
181
182 appendplanstates = (PlanState **) palloc(nplans *
183 sizeof(PlanState *));
184
185 /*
186 * call ExecInitNode on each of the valid plans to be executed and save
187 * the results into the appendplanstates array.
188 *
189 * While at it, find out the first valid partial plan.
190 */
191 j = 0;
193 nasyncplans = 0;
194 firstvalid = nplans;
195 i = -1;
196 while ((i = bms_next_member(validsubplans, i)) >= 0)
197 {
198 Plan *initNode = (Plan *) list_nth(node->appendplans, i);
199
200 /*
201 * Record async subplans. When executing EvalPlanQual, we treat them
202 * as sync ones; don't do this when initializing an EvalPlanQual plan
203 * tree.
204 */
205 if (initNode->async_capable && estate->es_epq_active == NULL)
206 {
/* [extraction gap: orig. 207 — presumably bms_add_member on asyncplans; verify] */
208 nasyncplans++;
209 }
210
211 /*
212 * Record the lowest appendplans index which is a valid partial plan.
213 */
214 if (i >= node->first_partial_plan && j < firstvalid)
215 firstvalid = j;
216
217 appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
218 }
219
220 appendstate->as_first_partial_plan = firstvalid;
221 appendstate->appendplans = appendplanstates;
222 appendstate->as_nplans = nplans;
223
224 /*
225 * Initialize Append's result tuple type and slot. If the child plans all
226 * produce the same fixed slot type, we can use that slot type; otherwise
227 * make a virtual slot. (Note that the result slot itself is used only to
228 * return a null tuple at end of execution; real tuples are returned to
229 * the caller in the children's own result slots. What we are doing here
230 * is allowing the parent plan node to optimize if the Append will return
231 * only one kind of slot.)
232 */
234 if (appendops != NULL)
235 {
237 }
238 else
239 {
241 /* show that the output slot type is not fixed */
242 appendstate->ps.resultopsset = true;
243 appendstate->ps.resultopsfixed = false;
244 }
245
246 /* Initialize async state */
247 appendstate->as_asyncplans = asyncplans;
248 appendstate->as_nasyncplans = nasyncplans;
249 appendstate->as_asyncrequests = NULL;
250 appendstate->as_asyncresults = NULL;
251 appendstate->as_nasyncresults = 0;
252 appendstate->as_nasyncremain = 0;
253 appendstate->as_needrequest = NULL;
254 appendstate->as_eventset = NULL;
255 appendstate->as_valid_asyncplans = NULL;
256
257 if (nasyncplans > 0)
258 {
259 appendstate->as_asyncrequests = (AsyncRequest **)
260 palloc0(nplans * sizeof(AsyncRequest *));
261
262 i = -1;
263 while ((i = bms_next_member(asyncplans, i)) >= 0)
264 {
266
268 areq->requestor = (PlanState *) appendstate;
269 areq->requestee = appendplanstates[i];
270 areq->request_index = i;
271 areq->callback_pending = false;
272 areq->request_complete = false;
273 areq->result = NULL;
274
275 appendstate->as_asyncrequests[i] = areq;
276 }
277
278 appendstate->as_asyncresults = (TupleTableSlot **)
279 palloc0(nasyncplans * sizeof(TupleTableSlot *));
280
281 if (appendstate->as_valid_subplans_identified)
/* [extraction gap: orig. 282 — presumably classify_matching_subplans(appendstate); verify] */
283 }
284
285 /*
286 * Miscellaneous initialization
287 */
288
289 appendstate->ps.ps_ProjInfo = NULL;
290
291 /* For parallel query, this will be overridden later. */
292 appendstate->choose_next_subplan = choose_next_subplan_locally;
293
294 return appendstate;
295}
296
297/* ----------------------------------------------------------------
298 * ExecAppend
299 *
300 * Handles iteration over multiple subplans.
301 * ----------------------------------------------------------------
302 */
/*
 * NOTE(review): extraction gaps — orig. 304 (the function-name line
 * "ExecAppend(PlanState *pstate)"), 315, 320, 324, 331, 343-345 (the
 * per-iteration declarations, incl. "subnode" used below), 355, 385 and
 * 389 are missing; several if-statements below have lost their bodies.
 * Not compilable as-is.
 */
303static TupleTableSlot *
305{
306 AppendState *node = castNode(AppendState, pstate);
307 TupleTableSlot *result;
308
309 /*
310 * If this is the first call after Init or ReScan, we need to do the
311 * initialization work.
312 */
313 if (!node->as_begun)
314 {
316 Assert(!node->as_syncdone);
317
318 /* Nothing to do if there are no subplans */
319 if (node->as_nplans == 0)
/* [extraction gap: orig. 320 — body of the no-subplans case] */
321
322 /* If there are any async subplans, begin executing them. */
323 if (node->as_nasyncplans > 0)
/* [extraction gap: orig. 324 — presumably ExecAppendAsyncBegin(node); verify] */
325
326 /*
327 * If no sync subplan has been chosen, we must choose one before
328 * proceeding.
329 */
330 if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
/* [extraction gap: orig. 331-332 — early-return body] */
332
333 Assert(node->as_syncdone ||
334 (node->as_whichplan >= 0 &&
335 node->as_whichplan < node->as_nplans));
336
337 /* And we're initialized. */
338 node->as_begun = true;
339 }
340
341 for (;;)
342 {
344
346
347 /*
348 * try to get a tuple from an async subplan if any
349 */
350 if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
351 {
352 if (ExecAppendAsyncGetNext(node, &result))
353 return result;
354 Assert(!node->as_syncdone);
356 }
357
358 /*
359 * figure out which sync subplan we are currently processing
360 */
361 Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
362 subnode = node->appendplans[node->as_whichplan];
363
364 /*
365 * get a tuple from the subplan
366 */
367 result = ExecProcNode(subnode);
368
369 if (!TupIsNull(result))
370 {
371 /*
372 * If the subplan gave us something then return it as-is. We do
373 * NOT make use of the result slot that was set up in
374 * ExecInitAppend; there's no need for it.
375 */
376 return result;
377 }
378
379 /*
380 * wait or poll for async events if any. We do this before checking
381 * for the end of iteration, because it might drain the remaining
382 * async subplans.
383 */
384 if (node->as_nasyncremain > 0)
/* [extraction gap: orig. 385 — presumably ExecAppendAsyncEventWait(node); verify] */
386
387 /* choose new sync subplan; if no sync/async subplans, we're done */
388 if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
/* [extraction gap: orig. 389 — end-of-scan return] */
390 }
391}
392
393/* ----------------------------------------------------------------
394 * ExecEndAppend
395 *
396 * Shuts down the subscans of the append node.
397 *
398 * Returns nothing of interest.
399 * ----------------------------------------------------------------
400 */
/*
 * NOTE(review): orig. line 402 — the function-name line, presumably
 * "ExecEndAppend(AppendState *node)" — is missing from this scrape; the
 * body itself appears complete.
 */
401void
403{
404 PlanState **appendplans;
405 int nplans;
406 int i;
407
408 /*
409 * get information from the node
410 */
411 appendplans = node->appendplans;
412 nplans = node->as_nplans;
413
414 /*
415 * shut down each of the subscans
416 */
417 for (i = 0; i < nplans; i++)
418 ExecEndNode(appendplans[i]);
419}
420
/*
 * NOTE(review): extraction gaps — orig. 422 (name line, presumably
 * "ExecReScanAppend(AppendState *node)"), 434 (second bms_overlap
 * argument), 437, 439-440 (bms_free / valid-asyncplans reset), 452 (the
 * UpdateChangedParamSet call), 459 (the ExecReScan call), 468, 477 and
 * 482 (as_whichplan reset) are missing.  Not compilable as-is.
 */
421void
423{
424 int nasyncplans = node->as_nasyncplans;
425 int i;
426
427 /*
428 * If any PARAM_EXEC Params used in pruning expressions have changed, then
429 * we'd better unset the valid subplans so that they are reselected for
430 * the new parameter values.
431 */
432 if (node->as_prune_state &&
433 bms_overlap(node->ps.chgParam,
435 {
436 node->as_valid_subplans_identified = false;
438 node->as_valid_subplans = NULL;
441 }
442
443 for (i = 0; i < node->as_nplans; i++)
444 {
445 PlanState *subnode = node->appendplans[i];
446
447 /*
448 * ExecReScan doesn't know about my subplans, so I have to do
449 * changed-parameter signaling myself.
450 */
451 if (node->ps.chgParam != NULL)
/* [extraction gap: orig. 452 — presumably UpdateChangedParamSet(subnode, ...); verify] */
453
454 /*
455 * If chgParam of subnode is not null then plan will be re-scanned by
456 * first ExecProcNode or by first ExecAsyncRequest.
457 */
458 if (subnode->chgParam == NULL)
/* [extraction gap: orig. 459 — presumably ExecReScan(subnode); verify] */
460 }
461
462 /* Reset async state */
463 if (nasyncplans > 0)
464 {
465 i = -1;
466 while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
467 {
469
470 areq->callback_pending = false;
471 areq->request_complete = false;
472 areq->result = NULL;
473 }
474
475 node->as_nasyncresults = 0;
476 node->as_nasyncremain = 0;
478 node->as_needrequest = NULL;
479 }
480
481 /* Let choose_next_subplan_* function handle setting the first subplan */
483 node->as_syncdone = false;
484 node->as_begun = false;
485}
486
487/* ----------------------------------------------------------------
488 * Parallel Append Support
489 * ----------------------------------------------------------------
490 */
491
492/* ----------------------------------------------------------------
493 * ExecAppendEstimate
494 *
495 * Compute the amount of space we'll need in the parallel
496 * query DSM, and inform pcxt->estimator about our needs.
497 * ----------------------------------------------------------------
498 */
/*
 * NOTE(review): extraction gaps — orig. 500 (name line,
 * "ExecAppendEstimate(AppendState *node," presumably), 504 (the
 * add_size/offsetof expression feeding pstate_len) and 507-508 (the
 * shm_toc_estimate_* calls, presumably) are missing.
 */
499void
501 ParallelContext *pcxt)
502{
503 node->pstate_len =
505 sizeof(bool) * node->as_nplans);
506
509}
510
511
512/* ----------------------------------------------------------------
513 * ExecAppendInitializeDSM
514 *
515 * Set up shared state for Parallel Append.
516 * ----------------------------------------------------------------
517 */
/*
 * NOTE(review): extraction gaps — orig. 519 (name line), 526 (presumably
 * the LWLockInitialize(&pstate->pa_lock, ...) call; verify) and 530
 * (presumably the choose_next_subplan override for the leader) are
 * missing.
 */
518void
520 ParallelContext *pcxt)
521{
522 ParallelAppendState *pstate;
523
524 pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
525 memset(pstate, 0, node->pstate_len);
527 shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
528
529 node->as_pstate = pstate;
531}
532
533/* ----------------------------------------------------------------
534 * ExecAppendReInitializeDSM
535 *
536 * Reset shared state before beginning a fresh scan.
537 * ----------------------------------------------------------------
538 */
/*
 * NOTE(review): orig. line 540 — the function-name/parameter line — is
 * missing from this scrape; the body (reset pa_next_plan and clear all
 * pa_finished flags before a fresh parallel scan) appears complete.
 */
539void
541{
542 ParallelAppendState *pstate = node->as_pstate;
543
544 pstate->pa_next_plan = 0;
545 memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
546}
547
548/* ----------------------------------------------------------------
549 * ExecAppendInitializeWorker
550 *
551 * Copy relevant information from TOC into planstate, and initialize
552 * whatever is required to choose and execute the optimal subplan.
553 * ----------------------------------------------------------------
554 */
/*
 * NOTE(review): orig. lines 556-560 — the entire name line and body of
 * ExecAppendInitializeWorker (per the banner above: look up the shared
 * state in the TOC and install the worker's subplan chooser) — are
 * missing from this scrape.
 */
555void
561
562/* ----------------------------------------------------------------
563 * choose_next_subplan_locally
564 *
565 * Choose next sync subplan for a non-parallel-aware Append,
566 * returning false if there are no more.
567 * ----------------------------------------------------------------
568 */
/*
 * NOTE(review): extraction gaps — orig. 570 (name line), 589 (the
 * first-call condition, presumably "if (whichplan == INVALID_SUBPLAN_INDEX)"),
 * 594, 599 (the ExecFindMatchingSubPlans call), 609-610 and 612 (the
 * forward/backward bms_next_member/bms_prev_member calls that compute
 * nextplan) are missing.  Not compilable as-is.
 */
569static bool
571{
572 int whichplan = node->as_whichplan;
573 int nextplan;
574
575 /* We should never be called when there are no subplans */
576 Assert(node->as_nplans > 0);
577
578 /* Nothing to do if syncdone */
579 if (node->as_syncdone)
580 return false;
581
582 /*
583 * If first call then have the bms member function choose the first valid
584 * sync subplan by initializing whichplan to -1. If there happen to be no
585 * valid sync subplans then the bms member function will handle that by
586 * returning a negative number which will allow us to exit returning a
587 * false value.
588 */
590 {
591 if (node->as_nasyncplans > 0)
592 {
593 /* We'd have filled as_valid_subplans already */
595 }
596 else if (!node->as_valid_subplans_identified)
597 {
598 node->as_valid_subplans =
600 node->as_valid_subplans_identified = true;
601 }
602
603 whichplan = -1;
604 }
605
606 /* Ensure whichplan is within the expected range */
607 Assert(whichplan >= -1 && whichplan <= node->as_nplans);
608
/* [extraction gap: orig. 609-613 — the ScanDirectionIsForward branch computing nextplan] */
611 else
613
614 if (nextplan < 0)
615 {
616 /* Set as_syncdone if in async mode */
617 if (node->as_nasyncplans > 0)
618 node->as_syncdone = true;
619 return false;
620 }
621
622 node->as_whichplan = nextplan;
623
624 return true;
625}
626
627/* ----------------------------------------------------------------
628 * choose_next_subplan_for_leader
629 *
630 * Try to pick a plan which doesn't commit us to doing much
631 * work locally, so that as much work as possible is done in
632 * the workers. Cheapest subplans are at the end.
633 * ----------------------------------------------------------------
634 */
/*
 * NOTE(review): extraction gaps — orig. 636 (name line), 641 (the
 * backward-scan Assert), 646 (the LWLockAcquire(&pstate->pa_lock, ...)
 * call), 648 (the "whichplan != INVALID_SUBPLAN_INDEX" condition), 663
 * (the valid-subplans-identified condition), 666 (the
 * ExecFindMatchingSubPlans call), 673 (the
 * mark_invalid_subplans_as_finished call) and 682-683 (marking
 * pa_next_plan invalid before bailing) are missing.
 */
635static bool
637{
638 ParallelAppendState *pstate = node->as_pstate;
639
640 /* Backward scan is not supported by parallel-aware plans */
642
643 /* We should never be called when there are no subplans */
644 Assert(node->as_nplans > 0);
645
647
649 {
650 /* Mark just-completed subplan as finished. */
651 node->as_pstate->pa_finished[node->as_whichplan] = true;
652 }
653 else
654 {
655 /* Start with last subplan. */
656 node->as_whichplan = node->as_nplans - 1;
657
658 /*
659 * If we've yet to determine the valid subplans then do so now. If
660 * run-time pruning is disabled then the valid subplans will always be
661 * set to all subplans.
662 */
664 {
665 node->as_valid_subplans =
667 node->as_valid_subplans_identified = true;
668
669 /*
670 * Mark each invalid plan as finished to allow the loop below to
671 * select the first valid subplan.
672 */
674 }
675 }
676
677 /* Loop until we find a subplan to execute. */
678 while (pstate->pa_finished[node->as_whichplan])
679 {
680 if (node->as_whichplan == 0)
681 {
684 LWLockRelease(&pstate->pa_lock);
685 return false;
686 }
687
688 /*
689 * We needn't pay attention to as_valid_subplans here as all invalid
690 * plans have been marked as finished.
691 */
692 node->as_whichplan--;
693 }
694
695 /* If non-partial, immediately mark as finished. */
696 if (node->as_whichplan < node->as_first_partial_plan)
697 node->as_pstate->pa_finished[node->as_whichplan] = true;
698
699 LWLockRelease(&pstate->pa_lock);
700
701 return true;
702}
703
704/* ----------------------------------------------------------------
705 * choose_next_subplan_for_worker
706 *
707 * Choose next subplan for a parallel-aware Append, returning
708 * false if there are no more.
709 *
710 * We start from the first plan and advance through the list;
711 * when we get back to the end, we loop back to the first
712 * partial plan. This assigns the non-partial plans first in
713 * order of descending cost and then spreads out the workers
714 * as evenly as possible across the remaining partial plans.
715 * ----------------------------------------------------------------
716 */
/*
 * NOTE(review): extraction gaps — orig. 718 (name line), 723 (backward-
 * scan Assert), 728 (LWLockAcquire), 731 (the "whichplan valid" condition
 * guarding the finished-marking), 742 (ExecFindMatchingSubPlans call),
 * 745 (mark_invalid_subplans_as_finished call), 763, 776, 793
 * (pa_next_plan = INVALID_SUBPLAN_INDEX, presumably), 801 and 810
 * (bms_next_member / bms_prev_member calls) are missing.
 */
717static bool
719{
720 ParallelAppendState *pstate = node->as_pstate;
721
722 /* Backward scan is not supported by parallel-aware plans */
724
725 /* We should never be called when there are no subplans */
726 Assert(node->as_nplans > 0);
727
729
730 /* Mark just-completed subplan as finished. */
732 node->as_pstate->pa_finished[node->as_whichplan] = true;
733
734 /*
735 * If we've yet to determine the valid subplans then do so now. If
736 * run-time pruning is disabled then the valid subplans will always be set
737 * to all subplans.
738 */
739 else if (!node->as_valid_subplans_identified)
740 {
741 node->as_valid_subplans =
743 node->as_valid_subplans_identified = true;
744
746 }
747
748 /* If all the plans are already done, we have nothing to do */
749 if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
750 {
751 LWLockRelease(&pstate->pa_lock);
752 return false;
753 }
754
755 /* Save the plan from which we are starting the search. */
756 node->as_whichplan = pstate->pa_next_plan;
757
758 /* Loop until we find a valid subplan to execute. */
759 while (pstate->pa_finished[pstate->pa_next_plan])
760 {
761 int nextplan;
762
764 pstate->pa_next_plan);
765 if (nextplan >= 0)
766 {
767 /* Advance to the next valid plan. */
768 pstate->pa_next_plan = nextplan;
769 }
770 else if (node->as_whichplan > node->as_first_partial_plan)
771 {
772 /*
773 * Try looping back to the first valid partial plan, if there is
774 * one. If there isn't, arrange to bail out below.
775 */
777 node->as_first_partial_plan - 1);
778 pstate->pa_next_plan =
779 nextplan < 0 ? node->as_whichplan : nextplan;
780 }
781 else
782 {
783 /*
784 * At last plan, and either there are no partial plans or we've
785 * tried them all. Arrange to bail out.
786 */
787 pstate->pa_next_plan = node->as_whichplan;
788 }
789
790 if (pstate->pa_next_plan == node->as_whichplan)
791 {
792 /* We've tried everything! */
794 LWLockRelease(&pstate->pa_lock);
795 return false;
796 }
797 }
798
799 /* Pick the plan we found, and advance pa_next_plan one more time. */
800 node->as_whichplan = pstate->pa_next_plan;
802 pstate->pa_next_plan);
803
804 /*
805 * If there are no more valid plans then try setting the next plan to the
806 * first valid partial plan.
807 */
808 if (pstate->pa_next_plan < 0)
809 {
811 node->as_first_partial_plan - 1);
812
813 if (nextplan >= 0)
814 pstate->pa_next_plan = nextplan;
815 else
816 {
817 /*
818 * There are no valid partial plans, and we already chose the last
819 * non-partial plan; so flag that there's nothing more for our
820 * fellow workers to do.
821 */
823 }
824 }
825
826 /* If non-partial, immediately mark as finished. */
827 if (node->as_whichplan < node->as_first_partial_plan)
828 node->as_pstate->pa_finished[node->as_whichplan] = true;
829
830 LWLockRelease(&pstate->pa_lock);
831
832 return true;
833}
834
835/*
836 * mark_invalid_subplans_as_finished
837 * Marks the ParallelAppendState's pa_finished as true for each invalid
838 * subplan.
839 *
840 * This function should only be called for parallel Append with run-time
841 * pruning enabled.
842 */
/*
 * NOTE(review): orig. line 844 — the function-name line, presumably
 * "mark_invalid_subplans_as_finished(AppendState *node)" — is missing
 * from this scrape; the body appears complete.
 */
843static void
845{
846 int i;
847
848 /* Only valid to call this while in parallel Append mode */
849 Assert(node->as_pstate);
850
851 /* Shouldn't have been called when run-time pruning is not enabled */
852 Assert(node->as_prune_state);
853
854 /* Nothing to do if all plans are valid */
855 if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
856 return;
857
858 /* Mark all non-valid plans as finished */
859 for (i = 0; i < node->as_nplans; i++)
860 {
861 if (!bms_is_member(i, node->as_valid_subplans))
862 node->as_pstate->pa_finished[i] = true;
863 }
864}
865
866/* ----------------------------------------------------------------
867 * Asynchronous Append Support
868 * ----------------------------------------------------------------
869 */
870
871/* ----------------------------------------------------------------
872 * ExecAppendAsyncBegin
873 *
874 * Begin executing designed async-capable subplans.
875 * ----------------------------------------------------------------
876 */
/*
 * NOTE(review): extraction gaps — orig. 878 (name line), 883 (backward-
 * scan Assert), 892 (the valid-subplans condition), 895
 * (ExecFindMatchingSubPlans call), 898 (classify_matching_subplans,
 * presumably), 902-903 (the as_nasyncremain initialization), 913 (the
 * AsyncRequest lookup) and 919 (the ExecAsyncRequest call, presumably)
 * are missing.
 */
877static void
879{
880 int i;
881
882 /* Backward scan is not supported by async-aware Appends. */
884
885 /* We should never be called when there are no subplans */
886 Assert(node->as_nplans > 0);
887
888 /* We should never be called when there are no async subplans. */
889 Assert(node->as_nasyncplans > 0);
890
891 /* If we've yet to determine the valid subplans then do so now. */
893 {
894 node->as_valid_subplans =
896 node->as_valid_subplans_identified = true;
897
899 }
900
901 /* Initialize state variables. */
904
905 /* Nothing to do if there are no valid async subplans. */
906 if (node->as_nasyncremain == 0)
907 return;
908
909 /* Make a request for each of the valid async subplans. */
910 i = -1;
911 while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
912 {
914
915 Assert(areq->request_index == i);
916 Assert(!areq->callback_pending);
917
918 /* Do the actual work. */
920 }
921}
922
923/* ----------------------------------------------------------------
924 * ExecAppendAsyncGetNext
925 *
926 * Get the next tuple from any of the asynchronous subplans.
927 * ----------------------------------------------------------------
928 */
/*
 * NOTE(review): extraction gaps — orig. 930 (name line), 943 (per-
 * iteration interrupt check, presumably CHECK_FOR_INTERRUPTS()) and 946
 * (the ExecAppendAsyncEventWait call, presumably) are missing.
 */
929static bool
931{
932 *result = NULL;
933
934 /* We should never be called when there are no valid async subplans. */
935 Assert(node->as_nasyncremain > 0);
936
937 /* Request a tuple asynchronously. */
938 if (ExecAppendAsyncRequest(node, result))
939 return true;
940
941 while (node->as_nasyncremain > 0)
942 {
944
945 /* Wait or poll for async events. */
947
948 /* Request a tuple asynchronously. */
949 if (ExecAppendAsyncRequest(node, result))
950 return true;
951
952 /* Break from loop if there's any sync subplan that isn't complete. */
953 if (!node->as_syncdone)
954 break;
955 }
956
957 /*
958 * If all sync subplans are complete, we're totally done scanning the
959 * given node. Otherwise, we're done with the asynchronous stuff but must
960 * continue scanning the sync subplans.
961 */
962 if (node->as_syncdone)
963 {
964 Assert(node->as_nasyncremain == 0);
965 *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
966 return true;
967 }
968
969 return false;
970}
971
972/* ----------------------------------------------------------------
973 * ExecAppendAsyncRequest
974 *
975 * Request a tuple asynchronously.
976 * ----------------------------------------------------------------
977 */
/*
 * NOTE(review): extraction gaps — orig. 979 (name line), 981 (the
 * "Bitmapset *needrequest" declaration — needrequest is used below but
 * never declared in this dump), 1003 (the assignment transferring
 * as_needrequest into needrequest, presumably), 1008 (the AsyncRequest
 * lookup), 1011 (the ExecAsyncRequest call, presumably) and 1013
 * (bms_free, presumably) are missing.
 */
978static bool
980{
982 int i;
983
984 /* Nothing to do if there are no async subplans needing a new request. */
985 if (bms_is_empty(node->as_needrequest))
986 {
987 Assert(node->as_nasyncresults == 0);
988 return false;
989 }
990
991 /*
992 * If there are any asynchronously-generated results that have not yet
993 * been returned, we have nothing to do; just return one of them.
994 */
995 if (node->as_nasyncresults > 0)
996 {
997 --node->as_nasyncresults;
998 *result = node->as_asyncresults[node->as_nasyncresults];
999 return true;
1000 }
1001
1002 /* Make a new request for each of the async subplans that need it. */
1004 node->as_needrequest = NULL;
1005 i = -1;
1006 while ((i = bms_next_member(needrequest, i)) >= 0)
1007 {
1009
1010 /* Do the actual work. */
1012 }
1014
1015 /* Return one of the asynchronously-generated results if any. */
1016 if (node->as_nasyncresults > 0)
1017 {
1018 --node->as_nasyncresults;
1019 *result = node->as_asyncresults[node->as_nasyncresults];
1020 return true;
1021 }
1022
1023 return false;
1024}
1025
1026/* ----------------------------------------------------------------
1027 * ExecAppendAsyncEventWait
1028 *
1029 * Wait or poll for file descriptor events and fire callbacks.
1030 * ----------------------------------------------------------------
1031 */
/*
 * NOTE(review): extraction gaps — orig. 1033 (name line), 1037 (the
 * "WaitEvent occurred_event[EVENT_BUFFER_SIZE]" declaration —
 * occurred_event is used below but never declared in this dump),
 * 1045-1046 (the CreateWaitEventSet call), 1053 (AsyncRequest lookup),
 * 1056 (the ExecAsyncConfigureWait call, presumably), 1063 and 1065
 * (the no-events check and FreeWaitEventSet), 1082 (the
 * AddWaitEventToSet(..., WL_LATCH_SET, ...) call), 1093 and 1095
 * (WaitEventSetWait and FreeWaitEventSet), 1111 (user_data cast to
 * AsyncRequest), 1123 (the ExecAsyncNotify call, presumably) and
 * 1130-1131 (ResetLatch / CHECK_FOR_INTERRUPTS, presumably) are missing.
 */
1032static void
1034{
1035 int nevents = node->as_nasyncplans + 2;
1036 long timeout = node->as_syncdone ? -1 : 0;
1038 int noccurred;
1039 int i;
1040
1041 /* We should never be called when there are no valid async subplans. */
1042 Assert(node->as_nasyncremain > 0);
1043
1044 Assert(node->as_eventset == NULL);
1047 NULL, NULL);
1048
1049 /* Give each waiting subplan a chance to add an event. */
1050 i = -1;
1051 while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1052 {
1054
1055 if (areq->callback_pending)
1057 }
1058
1059 /*
1060 * No need for further processing if none of the subplans configured any
1061 * events.
1062 */
1064 {
1066 node->as_eventset = NULL;
1067 return;
1068 }
1069
1070 /*
1071 * Add the process latch to the set, so that we wake up to process the
1072 * standard interrupts with CHECK_FOR_INTERRUPTS().
1073 *
1074 * NOTE: For historical reasons, it's important that this is added to the
1075 * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely,
1076 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
1077 * any other events are in the set. That's a poor design, it's
1078 * questionable for postgres_fdw to be doing that in the first place, but
1079 * we cannot change it now. The pattern has possibly been copied to other
1080 * extensions too.
1081 */
1083 MyLatch, NULL);
1084
1085 /* Return at most EVENT_BUFFER_SIZE events in one call. */
1086 if (nevents > EVENT_BUFFER_SIZE)
1087 nevents = EVENT_BUFFER_SIZE;
1088
1089 /*
1090 * If the timeout is -1, wait until at least one event occurs. If the
1091 * timeout is 0, poll for events, but do not wait at all.
1092 */
1094 nevents, WAIT_EVENT_APPEND_READY);
1096 node->as_eventset = NULL;
1097 if (noccurred == 0)
1098 return;
1099
1100 /* Deliver notifications. */
1101 for (i = 0; i < noccurred; i++)
1102 {
1103 WaitEvent *w = &occurred_event[i];
1104
1105 /*
1106 * Each waiting subplan should have registered its wait event with
1107 * user_data pointing back to its AsyncRequest.
1108 */
1109 if ((w->events & WL_SOCKET_READABLE) != 0)
1110 {
1112
1113 if (areq->callback_pending)
1114 {
1115 /*
1116 * Mark it as no longer needing a callback. We must do this
1117 * before dispatching the callback in case the callback resets
1118 * the flag.
1119 */
1120 areq->callback_pending = false;
1121
1122 /* Do the actual work. */
1124 }
1125 }
1126
1127 /* Handle standard interrupts */
1128 if ((w->events & WL_LATCH_SET) != 0)
1129 {
1132 }
1133 }
1134}
1135
1136/* ----------------------------------------------------------------
1137 * ExecAsyncAppendResponse
1138 *
1139 * Receive a response from an asynchronous request we made.
1140 * ----------------------------------------------------------------
1141 */
/*
 * NOTE(review): extraction gaps — orig. 1143 (name line, presumably
 * "ExecAsyncAppendResponse(AsyncRequest *areq)") and 1176 (the
 * bms_add_member on as_needrequest, presumably) are missing.
 */
1142void
1144{
1145 AppendState *node = (AppendState *) areq->requestor;
1146 TupleTableSlot *slot = areq->result;
1147
1148 /* The result should be a TupleTableSlot or NULL. */
1149 Assert(slot == NULL || IsA(slot, TupleTableSlot));
1150
1151 /* Nothing to do if the request is pending. */
1152 if (!areq->request_complete)
1153 {
1154 /* The request would have been pending for a callback. */
1155 Assert(areq->callback_pending);
1156 return;
1157 }
1158
1159 /* If the result is NULL or an empty slot, there's nothing more to do. */
1160 if (TupIsNull(slot))
1161 {
1162 /* The ending subplan wouldn't have been pending for a callback. */
1163 Assert(!areq->callback_pending);
1164 --node->as_nasyncremain;
1165 return;
1166 }
1167
1168 /* Save result so we can return it. */
1169 Assert(node->as_nasyncresults < node->as_nasyncplans);
1170 node->as_asyncresults[node->as_nasyncresults++] = slot;
1171
1172 /*
1173 * Mark the subplan that returned a result as ready for a new request. We
1174 * don't launch another one here immediately because it might complete.
1175 */
1177 areq->request_index);
1178}
1179
1180/* ----------------------------------------------------------------
1181 * classify_matching_subplans
1182 *
1183 * Classify the node's as_valid_subplans into sync ones and
1184 * async ones, adjust it to contain sync ones only, and save
1185 * async ones in the node's as_valid_asyncplans.
1186 * ----------------------------------------------------------------
1187 */
/*
 * NOTE(review): extraction gaps — orig. 1189 (name line), 1191 (a local
 * Bitmapset declaration, presumably "Bitmapset *valid_asyncplans"),
 * 1193-1194 (Asserts, presumably), 1197 (the bms_is_empty check on
 * as_valid_subplans), 1212 (the bms_intersect call), 1216-1217 (the
 * bms_del_members adjustment) and 1220 (the final assignment to
 * as_valid_asyncplans) are missing.
 */
1188static void
1190{
1192
1195
1196 /* Nothing to do if there are no valid subplans. */
1198 {
1199 node->as_syncdone = true;
1200 node->as_nasyncremain = 0;
1201 return;
1202 }
1203
1204 /* Nothing to do if there are no valid async subplans. */
1205 if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1206 {
1207 node->as_nasyncremain = 0;
1208 return;
1209 }
1210
1211 /* Get valid async subplans. */
1213 node->as_valid_subplans);
1214
1215 /* Adjust the valid subplans to contain sync subplans only. */
1218
1219 /* Save valid async subplans. */
1221}
int bms_prev_member(const Bitmapset *a, int prevbit)
Definition bitmapset.c:1350
Bitmapset * bms_intersect(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:292
int bms_next_member(const Bitmapset *a, int prevbit)
Definition bitmapset.c:1290
Bitmapset * bms_del_members(Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:1145
Bitmapset * bms_add_range(Bitmapset *a, int lower, int upper)
Definition bitmapset.c:1003
void bms_free(Bitmapset *a)
Definition bitmapset.c:239
int bms_num_members(const Bitmapset *a)
Definition bitmapset.c:744
bool bms_is_member(int x, const Bitmapset *a)
Definition bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:575
#define bms_is_empty(a)
Definition bitmapset.h:118
#define Assert(condition)
Definition c.h:885
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:492
void ExecReScan(PlanState *node)
Definition execAmi.c:77
void ExecAsyncRequest(AsyncRequest *areq)
Definition execAsync.c:26
void ExecAsyncConfigureWait(AsyncRequest *areq)
Definition execAsync.c:62
void ExecAsyncNotify(AsyncRequest *areq)
Definition execAsync.c:88
PartitionPruneState * ExecInitPartitionExecPruning(PlanState *planstate, int n_total_subplans, int part_prune_index, Bitmapset *relids, Bitmapset **initially_valid_subplans)
Bitmapset * ExecFindMatchingSubPlans(PartitionPruneState *prunestate, bool initial_prune, Bitmapset **validsubplan_rtis)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps * ExecGetCommonSlotOps(PlanState **planstates, int nplans)
Definition execUtils.c:536
void UpdateChangedParamSet(PlanState *node, Bitmapset *newchg)
Definition execUtils.c:910
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition executor.h:314
#define EXEC_FLAG_MARK
Definition executor.h:70
#define palloc_object(type)
Definition fe_memutils.h:74
struct Latch * MyLatch
Definition globals.c:63
int j
Definition isn.c:78
int i
Definition isn.c:77
void ResetLatch(Latch *latch)
Definition latch.c:374
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:698
@ LW_EXCLUSIVE
Definition lwlock.h:112
void * palloc0(Size size)
Definition mcxt.c:1417
void * palloc(Size size)
Definition mcxt.c:1387
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
void ExecEndAppend(AppendState *node)
Definition nodeAppend.c:402
static void ExecAppendAsyncBegin(AppendState *node)
Definition nodeAppend.c:878
static void ExecAppendAsyncEventWait(AppendState *node)
void ExecReScanAppend(AppendState *node)
Definition nodeAppend.c:422
static void classify_matching_subplans(AppendState *node)
static void mark_invalid_subplans_as_finished(AppendState *node)
Definition nodeAppend.c:844
static TupleTableSlot * ExecAppend(PlanState *pstate)
Definition nodeAppend.c:304
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
Definition nodeAppend.c:979
static bool choose_next_subplan_for_leader(AppendState *node)
Definition nodeAppend.c:636
static bool choose_next_subplan_for_worker(AppendState *node)
Definition nodeAppend.c:718
void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
Definition nodeAppend.c:540
#define INVALID_SUBPLAN_INDEX
Definition nodeAppend.c:84
void ExecAsyncAppendResponse(AsyncRequest *areq)
#define EVENT_BUFFER_SIZE
Definition nodeAppend.c:85
AppendState * ExecInitAppend(Append *node, EState *estate, int eflags)
Definition nodeAppend.c:110
static bool choose_next_subplan_locally(AppendState *node)
Definition nodeAppend.c:570
void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
Definition nodeAppend.c:556
void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt)
Definition nodeAppend.c:519
void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt)
Definition nodeAppend.c:500
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
Definition nodeAppend.c:930
#define IsA(nodeptr, _type_)
Definition nodes.h:164
#define makeNode(_type_)
Definition nodes.h:161
#define castNode(_type_, nodeptr)
Definition nodes.h:182
static int list_length(const List *l)
Definition pg_list.h:152
static void * list_nth(const List *list, int n)
Definition pg_list.h:299
#define PGINVALID_SOCKET
Definition port.h:31
static int fb(int x)
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
#define ScanDirectionIsForward(direction)
Definition sdir.h:64
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition shm_toc.c:171
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition shm_toc.c:232
#define shm_toc_estimate_chunk(e, sz)
Definition shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition shm_toc.h:53
Size add_size(Size s1, Size s2)
Definition shmem.c:482
struct PartitionPruneState * as_prune_state
Definition execnodes.h:1519
Bitmapset * as_valid_asyncplans
Definition execnodes.h:1522
Bitmapset * as_needrequest
Definition execnodes.h:1512
bool as_syncdone
Definition execnodes.h:1509
AsyncRequest ** as_asyncrequests
Definition execnodes.h:1506
Bitmapset * as_asyncplans
Definition execnodes.h:1504
int as_nasyncresults
Definition execnodes.h:1508
struct WaitEventSet * as_eventset
Definition execnodes.h:1513
bool(* choose_next_subplan)(AppendState *)
Definition execnodes.h:1523
PlanState ** appendplans
Definition execnodes.h:1500
int as_first_partial_plan
Definition execnodes.h:1515
PlanState ps
Definition execnodes.h:1499
int as_nasyncremain
Definition execnodes.h:1511
ParallelAppendState * as_pstate
Definition execnodes.h:1517
Bitmapset * as_valid_subplans
Definition execnodes.h:1521
Size pstate_len
Definition execnodes.h:1518
TupleTableSlot ** as_asyncresults
Definition execnodes.h:1507
int as_nasyncplans
Definition execnodes.h:1505
bool as_valid_subplans_identified
Definition execnodes.h:1520
int first_partial_plan
Definition plannodes.h:416
int part_prune_index
Definition plannodes.h:423
Bitmapset * apprelids
Definition plannodes.h:401
List * appendplans
Definition plannodes.h:407
bool callback_pending
Definition execnodes.h:645
ScanDirection es_direction
Definition execnodes.h:662
struct EPQState * es_epq_active
Definition execnodes.h:745
bool pa_finished[FLEXIBLE_ARRAY_MEMBER]
Definition nodeAppend.c:81
shm_toc_estimator estimator
Definition parallel.h:43
shm_toc * toc
Definition parallel.h:46
Bitmapset * execparamids
Plan * plan
Definition execnodes.h:1168
EState * state
Definition execnodes.h:1170
Bitmapset * chgParam
Definition execnodes.h:1200
TupleTableSlot * ps_ResultTupleSlot
Definition execnodes.h:1206
int plan_node_id
Definition plannodes.h:233
void * user_data
uint32 events
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:457
#define TupIsNull(slot)
Definition tuptable.h:309
int GetNumRegisteredWaitEvents(WaitEventSet *set)
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
void FreeWaitEventSet(WaitEventSet *set)
WaitEventSet * CreateWaitEventSet(ResourceOwner resowner, int nevents)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET