PostgreSQL Source Code git master
Loading...
Searching...
No Matches
nodeAppend.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * nodeAppend.c
4 * routines to handle append nodes.
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/executor/nodeAppend.c
12 *
13 *-------------------------------------------------------------------------
14 */
15/* INTERFACE ROUTINES
16 * ExecInitAppend - initialize the append node
17 * ExecAppend - retrieve the next tuple from the node
18 * ExecEndAppend - shut down the append node
19 * ExecReScanAppend - rescan the append node
20 *
21 * NOTES
22 * Each append node contains a list of one or more subplans which
23 * must be iteratively processed (forwards or backwards).
24 * Tuples are retrieved by executing the 'whichplan'th subplan
25 * until the subplan stops returning tuples, at which point that
26 * plan is shut down and the next started up.
27 *
28 * Append nodes don't make use of their left and right
29 * subtrees, rather they maintain a list of subplans so
30 * a typical append node looks like this in the plan tree:
31 *
32 * ...
33 * /
34 * Append -------+------+------+--- nil
35 * / \ | | |
36 * nil nil ... ... ...
37 * subplans
38 *
39 * Append nodes are currently used for unions, and to support
40 * inheritance queries, where several relations need to be scanned.
41 * For example, in our standard person/student/employee/student-emp
42 * example, where student and employee inherit from person
43 * and student-emp inherits from student and employee, the
44 * query:
45 *
46 * select name from person
47 *
48 * generates the plan:
49 *
50 * |
51 * Append -------+-------+--------+--------+
52 * / \ | | | |
53 * nil nil Scan Scan Scan Scan
54 * | | | |
55 * person employee student student-emp
56 */
57
#include "postgres.h"

#include "executor/execAsync.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "utils/wait_event.h"
69
70/* Shared state for parallel-aware Append. */
72{
73 LWLock pa_lock; /* mutual exclusion to choose next subplan */
74 int pa_next_plan; /* next plan to choose by any worker */
75
76 /*
77 * pa_finished[i] should be true if no more workers should select subplan
78 * i. for a non-partial plan, this should be set to true as soon as a
79 * worker selects the plan; for a partial plan, it remains false until
80 * some worker executes the plan to completion.
81 */
83};
84
85#define INVALID_SUBPLAN_INDEX -1
86#define EVENT_BUFFER_SIZE 16
87
88static TupleTableSlot *ExecAppend(PlanState *pstate);
93static void ExecAppendAsyncBegin(AppendState *node);
94static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
95static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
96static void ExecAppendAsyncEventWait(AppendState *node);
98
99/* ----------------------------------------------------------------
100 * ExecInitAppend
101 *
102 * Begin all of the subscans of the append node.
103 *
104 * (This is potentially wasteful, since the entire result of the
105 * append node may not be scanned, but this way all of the
106 * structures get allocated in the executor's top level memory
107 * block instead of that of the call to ExecAppend.)
108 * ----------------------------------------------------------------
109 */
111ExecInitAppend(Append *node, EState *estate, int eflags)
112{
118 int nplans;
119 int nasyncplans;
120 int firstvalid;
121 int i,
122 j;
123
124 /* check for unsupported flags */
125 Assert(!(eflags & EXEC_FLAG_MARK));
126
127 /*
128 * create new AppendState for our append node
129 */
130 appendstate->ps.plan = (Plan *) node;
131 appendstate->ps.state = estate;
132 appendstate->ps.ExecProcNode = ExecAppend;
133
134 /* Let choose_next_subplan_* function handle setting the first subplan */
135 appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
136 appendstate->as_syncdone = false;
137 appendstate->as_begun = false;
138
139 /* If run-time partition pruning is enabled, then set that up now */
140 if (node->part_prune_index >= 0)
141 {
143
144 /*
145 * Set up pruning data structure. This also initializes the set of
146 * subplans to initialize (validsubplans) by taking into account the
147 * result of performing initial pruning if any.
148 */
151 node->part_prune_index,
152 node->apprelids,
154 appendstate->as_prune_state = prunestate;
156
157 /*
158 * When no run-time pruning is required and there's at least one
159 * subplan, we can fill as_valid_subplans immediately, preventing
160 * later calls to ExecFindMatchingSubPlans.
161 */
162 if (!prunestate->do_exec_prune && nplans > 0)
163 {
164 appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
165 appendstate->as_valid_subplans_identified = true;
166 }
167 }
168 else
169 {
170 nplans = list_length(node->appendplans);
171
172 /*
173 * When run-time partition pruning is not enabled we can just mark all
174 * subplans as valid; they must also all be initialized.
175 */
176 Assert(nplans > 0);
177 appendstate->as_valid_subplans = validsubplans =
178 bms_add_range(NULL, 0, nplans - 1);
179 appendstate->as_valid_subplans_identified = true;
180 appendstate->as_prune_state = NULL;
181 }
182
183 appendplanstates = (PlanState **) palloc(nplans *
184 sizeof(PlanState *));
185
186 /*
187 * call ExecInitNode on each of the valid plans to be executed and save
188 * the results into the appendplanstates array.
189 *
190 * While at it, find out the first valid partial plan.
191 */
192 j = 0;
194 nasyncplans = 0;
195 firstvalid = nplans;
196 i = -1;
197 while ((i = bms_next_member(validsubplans, i)) >= 0)
198 {
199 Plan *initNode = (Plan *) list_nth(node->appendplans, i);
200
201 /*
202 * Record async subplans. When executing EvalPlanQual, we treat them
203 * as sync ones; don't do this when initializing an EvalPlanQual plan
204 * tree.
205 */
206 if (initNode->async_capable && estate->es_epq_active == NULL)
207 {
209 nasyncplans++;
210 }
211
212 /*
213 * Record the lowest appendplans index which is a valid partial plan.
214 */
215 if (i >= node->first_partial_plan && j < firstvalid)
216 firstvalid = j;
217
218 appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
219 }
220
221 appendstate->as_first_partial_plan = firstvalid;
222 appendstate->appendplans = appendplanstates;
223 appendstate->as_nplans = nplans;
224
225 /*
226 * Initialize Append's result tuple type and slot. If the child plans all
227 * produce the same fixed slot type, we can use that slot type; otherwise
228 * make a virtual slot. (Note that the result slot itself is used only to
229 * return a null tuple at end of execution; real tuples are returned to
230 * the caller in the children's own result slots. What we are doing here
231 * is allowing the parent plan node to optimize if the Append will return
232 * only one kind of slot.)
233 */
235 if (appendops != NULL)
236 {
238 }
239 else
240 {
242 /* show that the output slot type is not fixed */
243 appendstate->ps.resultopsset = true;
244 appendstate->ps.resultopsfixed = false;
245 }
246
247 /* Initialize async state */
248 appendstate->as_asyncplans = asyncplans;
249 appendstate->as_nasyncplans = nasyncplans;
250 appendstate->as_asyncrequests = NULL;
251 appendstate->as_asyncresults = NULL;
252 appendstate->as_nasyncresults = 0;
253 appendstate->as_nasyncremain = 0;
254 appendstate->as_needrequest = NULL;
255 appendstate->as_eventset = NULL;
256 appendstate->as_valid_asyncplans = NULL;
257
258 if (nasyncplans > 0)
259 {
260 appendstate->as_asyncrequests = (AsyncRequest **)
261 palloc0(nplans * sizeof(AsyncRequest *));
262
263 i = -1;
264 while ((i = bms_next_member(asyncplans, i)) >= 0)
265 {
267
269 areq->requestor = (PlanState *) appendstate;
270 areq->requestee = appendplanstates[i];
271 areq->request_index = i;
272 areq->callback_pending = false;
273 areq->request_complete = false;
274 areq->result = NULL;
275
276 appendstate->as_asyncrequests[i] = areq;
277 }
278
279 appendstate->as_asyncresults = (TupleTableSlot **)
280 palloc0(nasyncplans * sizeof(TupleTableSlot *));
281
282 if (appendstate->as_valid_subplans_identified)
284 }
285
286 /*
287 * Miscellaneous initialization
288 */
289
290 appendstate->ps.ps_ProjInfo = NULL;
291
292 /* For parallel query, this will be overridden later. */
293 appendstate->choose_next_subplan = choose_next_subplan_locally;
294
295 return appendstate;
296}
297
298/* ----------------------------------------------------------------
299 * ExecAppend
300 *
301 * Handles iteration over multiple subplans.
302 * ----------------------------------------------------------------
303 */
304static TupleTableSlot *
306{
307 AppendState *node = castNode(AppendState, pstate);
308 TupleTableSlot *result;
309
310 /*
311 * If this is the first call after Init or ReScan, we need to do the
312 * initialization work.
313 */
314 if (!node->as_begun)
315 {
317 Assert(!node->as_syncdone);
318
319 /* Nothing to do if there are no subplans */
320 if (node->as_nplans == 0)
322
323 /* If there are any async subplans, begin executing them. */
324 if (node->as_nasyncplans > 0)
326
327 /*
328 * If no sync subplan has been chosen, we must choose one before
329 * proceeding.
330 */
331 if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
333
334 Assert(node->as_syncdone ||
335 (node->as_whichplan >= 0 &&
336 node->as_whichplan < node->as_nplans));
337
338 /* And we're initialized. */
339 node->as_begun = true;
340 }
341
342 for (;;)
343 {
345
347
348 /*
349 * try to get a tuple from an async subplan if any
350 */
351 if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
352 {
353 if (ExecAppendAsyncGetNext(node, &result))
354 return result;
355 Assert(!node->as_syncdone);
357 }
358
359 /*
360 * figure out which sync subplan we are currently processing
361 */
362 Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
363 subnode = node->appendplans[node->as_whichplan];
364
365 /*
366 * get a tuple from the subplan
367 */
368 result = ExecProcNode(subnode);
369
370 if (!TupIsNull(result))
371 {
372 /*
373 * If the subplan gave us something then return it as-is. We do
374 * NOT make use of the result slot that was set up in
375 * ExecInitAppend; there's no need for it.
376 */
377 return result;
378 }
379
380 /*
381 * wait or poll for async events if any. We do this before checking
382 * for the end of iteration, because it might drain the remaining
383 * async subplans.
384 */
385 if (node->as_nasyncremain > 0)
387
388 /* choose new sync subplan; if no sync/async subplans, we're done */
389 if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
391 }
392}
393
394/* ----------------------------------------------------------------
395 * ExecEndAppend
396 *
397 * Shuts down the subscans of the append node.
398 *
399 * Returns nothing of interest.
400 * ----------------------------------------------------------------
401 */
402void
404{
405 PlanState **appendplans;
406 int nplans;
407 int i;
408
409 /*
410 * get information from the node
411 */
412 appendplans = node->appendplans;
413 nplans = node->as_nplans;
414
415 /*
416 * shut down each of the subscans
417 */
418 for (i = 0; i < nplans; i++)
419 ExecEndNode(appendplans[i]);
420}
421
422void
424{
425 int nasyncplans = node->as_nasyncplans;
426 int i;
427
428 /*
429 * If any PARAM_EXEC Params used in pruning expressions have changed, then
430 * we'd better unset the valid subplans so that they are reselected for
431 * the new parameter values.
432 */
433 if (node->as_prune_state &&
434 bms_overlap(node->ps.chgParam,
436 {
437 node->as_valid_subplans_identified = false;
439 node->as_valid_subplans = NULL;
442 }
443
444 for (i = 0; i < node->as_nplans; i++)
445 {
446 PlanState *subnode = node->appendplans[i];
447
448 /*
449 * ExecReScan doesn't know about my subplans, so I have to do
450 * changed-parameter signaling myself.
451 */
452 if (node->ps.chgParam != NULL)
454
455 /*
456 * If chgParam of subnode is not null then plan will be re-scanned by
457 * first ExecProcNode or by first ExecAsyncRequest.
458 */
459 if (subnode->chgParam == NULL)
461 }
462
463 /* Reset async state */
464 if (nasyncplans > 0)
465 {
466 i = -1;
467 while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
468 {
470
471 areq->callback_pending = false;
472 areq->request_complete = false;
473 areq->result = NULL;
474 }
475
476 node->as_nasyncresults = 0;
477 node->as_nasyncremain = 0;
479 node->as_needrequest = NULL;
480 }
481
482 /* Let choose_next_subplan_* function handle setting the first subplan */
484 node->as_syncdone = false;
485 node->as_begun = false;
486}
487
488/* ----------------------------------------------------------------
489 * Parallel Append Support
490 * ----------------------------------------------------------------
491 */
492
493/* ----------------------------------------------------------------
494 * ExecAppendEstimate
495 *
496 * Compute the amount of space we'll need in the parallel
497 * query DSM, and inform pcxt->estimator about our needs.
498 * ----------------------------------------------------------------
499 */
500void
502 ParallelContext *pcxt)
503{
504 node->pstate_len =
506 sizeof(bool) * node->as_nplans);
507
510}
511
512
513/* ----------------------------------------------------------------
514 * ExecAppendInitializeDSM
515 *
516 * Set up shared state for Parallel Append.
517 * ----------------------------------------------------------------
518 */
519void
521 ParallelContext *pcxt)
522{
523 ParallelAppendState *pstate;
524
525 pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
526 memset(pstate, 0, node->pstate_len);
528 shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
529
530 node->as_pstate = pstate;
532}
533
534/* ----------------------------------------------------------------
535 * ExecAppendReInitializeDSM
536 *
537 * Reset shared state before beginning a fresh scan.
538 * ----------------------------------------------------------------
539 */
540void
542{
543 ParallelAppendState *pstate = node->as_pstate;
544
545 pstate->pa_next_plan = 0;
546 memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
547}
548
549/* ----------------------------------------------------------------
550 * ExecAppendInitializeWorker
551 *
552 * Copy relevant information from TOC into planstate, and initialize
553 * whatever is required to choose and execute the optimal subplan.
554 * ----------------------------------------------------------------
555 */
556void
562
563/* ----------------------------------------------------------------
564 * choose_next_subplan_locally
565 *
566 * Choose next sync subplan for a non-parallel-aware Append,
567 * returning false if there are no more.
568 * ----------------------------------------------------------------
569 */
570static bool
572{
573 int whichplan = node->as_whichplan;
574 int nextplan;
575
576 /* We should never be called when there are no subplans */
577 Assert(node->as_nplans > 0);
578
579 /* Nothing to do if syncdone */
580 if (node->as_syncdone)
581 return false;
582
583 /*
584 * If first call then have the bms member function choose the first valid
585 * sync subplan by initializing whichplan to -1. If there happen to be no
586 * valid sync subplans then the bms member function will handle that by
587 * returning a negative number which will allow us to exit returning a
588 * false value.
589 */
591 {
592 if (node->as_nasyncplans > 0)
593 {
594 /* We'd have filled as_valid_subplans already */
596 }
597 else if (!node->as_valid_subplans_identified)
598 {
599 node->as_valid_subplans =
601 node->as_valid_subplans_identified = true;
602 }
603
604 whichplan = -1;
605 }
606
607 /* Ensure whichplan is within the expected range */
608 Assert(whichplan >= -1 && whichplan <= node->as_nplans);
609
612 else
614
615 if (nextplan < 0)
616 {
617 /* Set as_syncdone if in async mode */
618 if (node->as_nasyncplans > 0)
619 node->as_syncdone = true;
620 return false;
621 }
622
623 node->as_whichplan = nextplan;
624
625 return true;
626}
627
628/* ----------------------------------------------------------------
629 * choose_next_subplan_for_leader
630 *
631 * Try to pick a plan which doesn't commit us to doing much
632 * work locally, so that as much work as possible is done in
633 * the workers. Cheapest subplans are at the end.
634 * ----------------------------------------------------------------
635 */
636static bool
638{
639 ParallelAppendState *pstate = node->as_pstate;
640
641 /* Backward scan is not supported by parallel-aware plans */
643
644 /* We should never be called when there are no subplans */
645 Assert(node->as_nplans > 0);
646
648
650 {
651 /* Mark just-completed subplan as finished. */
652 node->as_pstate->pa_finished[node->as_whichplan] = true;
653 }
654 else
655 {
656 /* Start with last subplan. */
657 node->as_whichplan = node->as_nplans - 1;
658
659 /*
660 * If we've yet to determine the valid subplans then do so now. If
661 * run-time pruning is disabled then the valid subplans will always be
662 * set to all subplans.
663 */
665 {
666 node->as_valid_subplans =
668 node->as_valid_subplans_identified = true;
669
670 /*
671 * Mark each invalid plan as finished to allow the loop below to
672 * select the first valid subplan.
673 */
675 }
676 }
677
678 /* Loop until we find a subplan to execute. */
679 while (pstate->pa_finished[node->as_whichplan])
680 {
681 if (node->as_whichplan == 0)
682 {
685 LWLockRelease(&pstate->pa_lock);
686 return false;
687 }
688
689 /*
690 * We needn't pay attention to as_valid_subplans here as all invalid
691 * plans have been marked as finished.
692 */
693 node->as_whichplan--;
694 }
695
696 /* If non-partial, immediately mark as finished. */
697 if (node->as_whichplan < node->as_first_partial_plan)
698 node->as_pstate->pa_finished[node->as_whichplan] = true;
699
700 LWLockRelease(&pstate->pa_lock);
701
702 return true;
703}
704
705/* ----------------------------------------------------------------
706 * choose_next_subplan_for_worker
707 *
708 * Choose next subplan for a parallel-aware Append, returning
709 * false if there are no more.
710 *
711 * We start from the first plan and advance through the list;
712 * when we get back to the end, we loop back to the first
713 * partial plan. This assigns the non-partial plans first in
714 * order of descending cost and then spreads out the workers
715 * as evenly as possible across the remaining partial plans.
716 * ----------------------------------------------------------------
717 */
718static bool
720{
721 ParallelAppendState *pstate = node->as_pstate;
722
723 /* Backward scan is not supported by parallel-aware plans */
725
726 /* We should never be called when there are no subplans */
727 Assert(node->as_nplans > 0);
728
730
731 /* Mark just-completed subplan as finished. */
733 node->as_pstate->pa_finished[node->as_whichplan] = true;
734
735 /*
736 * If we've yet to determine the valid subplans then do so now. If
737 * run-time pruning is disabled then the valid subplans will always be set
738 * to all subplans.
739 */
740 else if (!node->as_valid_subplans_identified)
741 {
742 node->as_valid_subplans =
744 node->as_valid_subplans_identified = true;
745
747 }
748
749 /* If all the plans are already done, we have nothing to do */
750 if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
751 {
752 LWLockRelease(&pstate->pa_lock);
753 return false;
754 }
755
756 /* Save the plan from which we are starting the search. */
757 node->as_whichplan = pstate->pa_next_plan;
758
759 /* Loop until we find a valid subplan to execute. */
760 while (pstate->pa_finished[pstate->pa_next_plan])
761 {
762 int nextplan;
763
765 pstate->pa_next_plan);
766 if (nextplan >= 0)
767 {
768 /* Advance to the next valid plan. */
769 pstate->pa_next_plan = nextplan;
770 }
771 else if (node->as_whichplan > node->as_first_partial_plan)
772 {
773 /*
774 * Try looping back to the first valid partial plan, if there is
775 * one. If there isn't, arrange to bail out below.
776 */
778 node->as_first_partial_plan - 1);
779 pstate->pa_next_plan =
780 nextplan < 0 ? node->as_whichplan : nextplan;
781 }
782 else
783 {
784 /*
785 * At last plan, and either there are no partial plans or we've
786 * tried them all. Arrange to bail out.
787 */
788 pstate->pa_next_plan = node->as_whichplan;
789 }
790
791 if (pstate->pa_next_plan == node->as_whichplan)
792 {
793 /* We've tried everything! */
795 LWLockRelease(&pstate->pa_lock);
796 return false;
797 }
798 }
799
800 /* Pick the plan we found, and advance pa_next_plan one more time. */
801 node->as_whichplan = pstate->pa_next_plan;
803 pstate->pa_next_plan);
804
805 /*
806 * If there are no more valid plans then try setting the next plan to the
807 * first valid partial plan.
808 */
809 if (pstate->pa_next_plan < 0)
810 {
812 node->as_first_partial_plan - 1);
813
814 if (nextplan >= 0)
815 pstate->pa_next_plan = nextplan;
816 else
817 {
818 /*
819 * There are no valid partial plans, and we already chose the last
820 * non-partial plan; so flag that there's nothing more for our
821 * fellow workers to do.
822 */
824 }
825 }
826
827 /* If non-partial, immediately mark as finished. */
828 if (node->as_whichplan < node->as_first_partial_plan)
829 node->as_pstate->pa_finished[node->as_whichplan] = true;
830
831 LWLockRelease(&pstate->pa_lock);
832
833 return true;
834}
835
836/*
837 * mark_invalid_subplans_as_finished
838 * Marks the ParallelAppendState's pa_finished as true for each invalid
839 * subplan.
840 *
841 * This function should only be called for parallel Append with run-time
842 * pruning enabled.
843 */
844static void
846{
847 int i;
848
849 /* Only valid to call this while in parallel Append mode */
850 Assert(node->as_pstate);
851
852 /* Shouldn't have been called when run-time pruning is not enabled */
853 Assert(node->as_prune_state);
854
855 /* Nothing to do if all plans are valid */
856 if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
857 return;
858
859 /* Mark all non-valid plans as finished */
860 for (i = 0; i < node->as_nplans; i++)
861 {
862 if (!bms_is_member(i, node->as_valid_subplans))
863 node->as_pstate->pa_finished[i] = true;
864 }
865}
866
867/* ----------------------------------------------------------------
868 * Asynchronous Append Support
869 * ----------------------------------------------------------------
870 */
871
872/* ----------------------------------------------------------------
873 * ExecAppendAsyncBegin
874 *
875 * Begin executing designed async-capable subplans.
876 * ----------------------------------------------------------------
877 */
878static void
880{
881 int i;
882
883 /* Backward scan is not supported by async-aware Appends. */
885
886 /* We should never be called when there are no subplans */
887 Assert(node->as_nplans > 0);
888
889 /* We should never be called when there are no async subplans. */
890 Assert(node->as_nasyncplans > 0);
891
892 /* If we've yet to determine the valid subplans then do so now. */
894 {
895 node->as_valid_subplans =
897 node->as_valid_subplans_identified = true;
898
900 }
901
902 /* Initialize state variables. */
905
906 /* Nothing to do if there are no valid async subplans. */
907 if (node->as_nasyncremain == 0)
908 return;
909
910 /* Make a request for each of the valid async subplans. */
911 i = -1;
912 while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
913 {
915
916 Assert(areq->request_index == i);
917 Assert(!areq->callback_pending);
918
919 /* Do the actual work. */
921 }
922}
923
924/* ----------------------------------------------------------------
925 * ExecAppendAsyncGetNext
926 *
927 * Get the next tuple from any of the asynchronous subplans.
928 * ----------------------------------------------------------------
929 */
930static bool
932{
933 *result = NULL;
934
935 /* We should never be called when there are no valid async subplans. */
936 Assert(node->as_nasyncremain > 0);
937
938 /* Request a tuple asynchronously. */
939 if (ExecAppendAsyncRequest(node, result))
940 return true;
941
942 while (node->as_nasyncremain > 0)
943 {
945
946 /* Wait or poll for async events. */
948
949 /* Request a tuple asynchronously. */
950 if (ExecAppendAsyncRequest(node, result))
951 return true;
952
953 /* Break from loop if there's any sync subplan that isn't complete. */
954 if (!node->as_syncdone)
955 break;
956 }
957
958 /*
959 * If all sync subplans are complete, we're totally done scanning the
960 * given node. Otherwise, we're done with the asynchronous stuff but must
961 * continue scanning the sync subplans.
962 */
963 if (node->as_syncdone)
964 {
965 Assert(node->as_nasyncremain == 0);
966 *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
967 return true;
968 }
969
970 return false;
971}
972
973/* ----------------------------------------------------------------
974 * ExecAppendAsyncRequest
975 *
976 * Request a tuple asynchronously.
977 * ----------------------------------------------------------------
978 */
979static bool
981{
983 int i;
984
985 /* Nothing to do if there are no async subplans needing a new request. */
986 if (bms_is_empty(node->as_needrequest))
987 {
988 Assert(node->as_nasyncresults == 0);
989 return false;
990 }
991
992 /*
993 * If there are any asynchronously-generated results that have not yet
994 * been returned, we have nothing to do; just return one of them.
995 */
996 if (node->as_nasyncresults > 0)
997 {
998 --node->as_nasyncresults;
999 *result = node->as_asyncresults[node->as_nasyncresults];
1000 return true;
1001 }
1002
1003 /* Make a new request for each of the async subplans that need it. */
1005 node->as_needrequest = NULL;
1006 i = -1;
1007 while ((i = bms_next_member(needrequest, i)) >= 0)
1008 {
1010
1011 /* Do the actual work. */
1013 }
1015
1016 /* Return one of the asynchronously-generated results if any. */
1017 if (node->as_nasyncresults > 0)
1018 {
1019 --node->as_nasyncresults;
1020 *result = node->as_asyncresults[node->as_nasyncresults];
1021 return true;
1022 }
1023
1024 return false;
1025}
1026
1027/* ----------------------------------------------------------------
1028 * ExecAppendAsyncEventWait
1029 *
1030 * Wait or poll for file descriptor events and fire callbacks.
1031 * ----------------------------------------------------------------
1032 */
1033static void
1035{
1036 int nevents = node->as_nasyncplans + 2;
1037 long timeout = node->as_syncdone ? -1 : 0;
1039 int noccurred;
1040 int i;
1041
1042 /* We should never be called when there are no valid async subplans. */
1043 Assert(node->as_nasyncremain > 0);
1044
1045 Assert(node->as_eventset == NULL);
1048 NULL, NULL);
1049
1050 /* Give each waiting subplan a chance to add an event. */
1051 i = -1;
1052 while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1053 {
1055
1056 if (areq->callback_pending)
1058 }
1059
1060 /*
1061 * No need for further processing if none of the subplans configured any
1062 * events.
1063 */
1065 {
1067 node->as_eventset = NULL;
1068 return;
1069 }
1070
1071 /*
1072 * Add the process latch to the set, so that we wake up to process the
1073 * standard interrupts with CHECK_FOR_INTERRUPTS().
1074 *
1075 * NOTE: For historical reasons, it's important that this is added to the
1076 * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely,
1077 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
1078 * any other events are in the set. That's a poor design, it's
1079 * questionable for postgres_fdw to be doing that in the first place, but
1080 * we cannot change it now. The pattern has possibly been copied to other
1081 * extensions too.
1082 */
1084 MyLatch, NULL);
1085
1086 /* Return at most EVENT_BUFFER_SIZE events in one call. */
1087 if (nevents > EVENT_BUFFER_SIZE)
1088 nevents = EVENT_BUFFER_SIZE;
1089
1090 /*
1091 * If the timeout is -1, wait until at least one event occurs. If the
1092 * timeout is 0, poll for events, but do not wait at all.
1093 */
1095 nevents, WAIT_EVENT_APPEND_READY);
1097 node->as_eventset = NULL;
1098 if (noccurred == 0)
1099 return;
1100
1101 /* Deliver notifications. */
1102 for (i = 0; i < noccurred; i++)
1103 {
1104 WaitEvent *w = &occurred_event[i];
1105
1106 /*
1107 * Each waiting subplan should have registered its wait event with
1108 * user_data pointing back to its AsyncRequest.
1109 */
1110 if ((w->events & WL_SOCKET_READABLE) != 0)
1111 {
1113
1114 if (areq->callback_pending)
1115 {
1116 /*
1117 * Mark it as no longer needing a callback. We must do this
1118 * before dispatching the callback in case the callback resets
1119 * the flag.
1120 */
1121 areq->callback_pending = false;
1122
1123 /* Do the actual work. */
1125 }
1126 }
1127
1128 /* Handle standard interrupts */
1129 if ((w->events & WL_LATCH_SET) != 0)
1130 {
1133 }
1134 }
1135}
1136
1137/* ----------------------------------------------------------------
1138 * ExecAsyncAppendResponse
1139 *
1140 * Receive a response from an asynchronous request we made.
1141 * ----------------------------------------------------------------
1142 */
1143void
1145{
1146 AppendState *node = (AppendState *) areq->requestor;
1147 TupleTableSlot *slot = areq->result;
1148
1149 /* The result should be a TupleTableSlot or NULL. */
1150 Assert(slot == NULL || IsA(slot, TupleTableSlot));
1151
1152 /* Nothing to do if the request is pending. */
1153 if (!areq->request_complete)
1154 {
1155 /* The request would have been pending for a callback. */
1156 Assert(areq->callback_pending);
1157 return;
1158 }
1159
1160 /* If the result is NULL or an empty slot, there's nothing more to do. */
1161 if (TupIsNull(slot))
1162 {
1163 /* The ending subplan wouldn't have been pending for a callback. */
1164 Assert(!areq->callback_pending);
1165 --node->as_nasyncremain;
1166 return;
1167 }
1168
1169 /* Save result so we can return it. */
1170 Assert(node->as_nasyncresults < node->as_nasyncplans);
1171 node->as_asyncresults[node->as_nasyncresults++] = slot;
1172
1173 /*
1174 * Mark the subplan that returned a result as ready for a new request. We
1175 * don't launch another one here immediately because it might complete.
1176 */
1178 areq->request_index);
1179}
1180
1181/* ----------------------------------------------------------------
1182 * classify_matching_subplans
1183 *
1184 * Classify the node's as_valid_subplans into sync ones and
1185 * async ones, adjust it to contain sync ones only, and save
1186 * async ones in the node's as_valid_asyncplans.
1187 * ----------------------------------------------------------------
1188 */
1189static void
1191{
1193
1196
1197 /* Nothing to do if there are no valid subplans. */
1199 {
1200 node->as_syncdone = true;
1201 node->as_nasyncremain = 0;
1202 return;
1203 }
1204
1205 /* Nothing to do if there are no valid async subplans. */
1206 if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1207 {
1208 node->as_nasyncremain = 0;
1209 return;
1210 }
1211
1212 /* Get valid async subplans. */
1214 node->as_valid_subplans);
1215
1216 /* Adjust the valid subplans to contain sync subplans only. */
1219
1220 /* Save valid async subplans. */
1222}
int bms_prev_member(const Bitmapset *a, int prevbit)
Definition bitmapset.c:1350
Bitmapset * bms_intersect(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:292
int bms_next_member(const Bitmapset *a, int prevbit)
Definition bitmapset.c:1290
Bitmapset * bms_del_members(Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:1145
Bitmapset * bms_add_range(Bitmapset *a, int lower, int upper)
Definition bitmapset.c:1003
void bms_free(Bitmapset *a)
Definition bitmapset.c:239
int bms_num_members(const Bitmapset *a)
Definition bitmapset.c:744
bool bms_is_member(int x, const Bitmapset *a)
Definition bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:575
#define bms_is_empty(a)
Definition bitmapset.h:118
#define Assert(condition)
Definition c.h:945
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:552
void ExecReScan(PlanState *node)
Definition execAmi.c:78
void ExecAsyncRequest(AsyncRequest *areq)
Definition execAsync.c:27
void ExecAsyncConfigureWait(AsyncRequest *areq)
Definition execAsync.c:63
void ExecAsyncNotify(AsyncRequest *areq)
Definition execAsync.c:89
PartitionPruneState * ExecInitPartitionExecPruning(PlanState *planstate, int n_total_subplans, int part_prune_index, Bitmapset *relids, Bitmapset **initially_valid_subplans)
Bitmapset * ExecFindMatchingSubPlans(PartitionPruneState *prunestate, bool initial_prune, Bitmapset **validsubplan_rtis)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps * ExecGetCommonSlotOps(PlanState **planstates, int nplans)
Definition execUtils.c:541
void UpdateChangedParamSet(PlanState *node, Bitmapset *newchg)
Definition execUtils.c:915
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition executor.h:315
#define EXEC_FLAG_MARK
Definition executor.h:71
#define palloc_object(type)
Definition fe_memutils.h:74
struct Latch * MyLatch
Definition globals.c:63
int j
Definition isn.c:78
int i
Definition isn.c:77
void ResetLatch(Latch *latch)
Definition latch.c:374
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:699
@ LW_EXCLUSIVE
Definition lwlock.h:112
void * palloc0(Size size)
Definition mcxt.c:1417
void * palloc(Size size)
Definition mcxt.c:1387
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
void ExecEndAppend(AppendState *node)
Definition nodeAppend.c:403
static void ExecAppendAsyncBegin(AppendState *node)
Definition nodeAppend.c:879
static void ExecAppendAsyncEventWait(AppendState *node)
void ExecReScanAppend(AppendState *node)
Definition nodeAppend.c:423
static void classify_matching_subplans(AppendState *node)
static void mark_invalid_subplans_as_finished(AppendState *node)
Definition nodeAppend.c:845
static TupleTableSlot * ExecAppend(PlanState *pstate)
Definition nodeAppend.c:305
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
Definition nodeAppend.c:980
static bool choose_next_subplan_for_leader(AppendState *node)
Definition nodeAppend.c:637
static bool choose_next_subplan_for_worker(AppendState *node)
Definition nodeAppend.c:719
void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
Definition nodeAppend.c:541
#define INVALID_SUBPLAN_INDEX
Definition nodeAppend.c:85
void ExecAsyncAppendResponse(AsyncRequest *areq)
#define EVENT_BUFFER_SIZE
Definition nodeAppend.c:86
AppendState * ExecInitAppend(Append *node, EState *estate, int eflags)
Definition nodeAppend.c:111
static bool choose_next_subplan_locally(AppendState *node)
Definition nodeAppend.c:571
void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
Definition nodeAppend.c:557
void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt)
Definition nodeAppend.c:520
void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt)
Definition nodeAppend.c:501
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
Definition nodeAppend.c:931
#define IsA(nodeptr, _type_)
Definition nodes.h:164
#define makeNode(_type_)
Definition nodes.h:161
#define castNode(_type_, nodeptr)
Definition nodes.h:182
static int list_length(const List *l)
Definition pg_list.h:152
static void * list_nth(const List *list, int n)
Definition pg_list.h:299
#define PGINVALID_SOCKET
Definition port.h:31
static int fb(int x)
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
#define ScanDirectionIsForward(direction)
Definition sdir.h:64
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition shm_toc.c:171
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition shm_toc.c:232
#define shm_toc_estimate_chunk(e, sz)
Definition shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition shm_toc.h:53
Size add_size(Size s1, Size s2)
Definition shmem.c:485
struct PartitionPruneState * as_prune_state
Definition execnodes.h:1528
Bitmapset * as_valid_asyncplans
Definition execnodes.h:1531
Bitmapset * as_needrequest
Definition execnodes.h:1521
bool as_syncdone
Definition execnodes.h:1518
AsyncRequest ** as_asyncrequests
Definition execnodes.h:1515
Bitmapset * as_asyncplans
Definition execnodes.h:1513
int as_nasyncresults
Definition execnodes.h:1517
struct WaitEventSet * as_eventset
Definition execnodes.h:1522
bool(* choose_next_subplan)(AppendState *)
Definition execnodes.h:1532
PlanState ** appendplans
Definition execnodes.h:1509
int as_first_partial_plan
Definition execnodes.h:1524
PlanState ps
Definition execnodes.h:1508
int as_nasyncremain
Definition execnodes.h:1520
ParallelAppendState * as_pstate
Definition execnodes.h:1526
Bitmapset * as_valid_subplans
Definition execnodes.h:1530
Size pstate_len
Definition execnodes.h:1527
TupleTableSlot ** as_asyncresults
Definition execnodes.h:1516
int as_nasyncplans
Definition execnodes.h:1514
bool as_valid_subplans_identified
Definition execnodes.h:1529
int first_partial_plan
Definition plannodes.h:414
int part_prune_index
Definition plannodes.h:421
Bitmapset * apprelids
Definition plannodes.h:399
List * appendplans
Definition plannodes.h:405
bool callback_pending
Definition execnodes.h:654
ScanDirection es_direction
Definition execnodes.h:671
struct EPQState * es_epq_active
Definition execnodes.h:754
bool pa_finished[FLEXIBLE_ARRAY_MEMBER]
Definition nodeAppend.c:82
shm_toc_estimator estimator
Definition parallel.h:43
shm_toc * toc
Definition parallel.h:46
Bitmapset * execparamids
Plan * plan
Definition execnodes.h:1177
EState * state
Definition execnodes.h:1179
Bitmapset * chgParam
Definition execnodes.h:1209
TupleTableSlot * ps_ResultTupleSlot
Definition execnodes.h:1215
int plan_node_id
Definition plannodes.h:231
void * user_data
uint32 events
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
#define TupIsNull(slot)
Definition tuptable.h:325
int GetNumRegisteredWaitEvents(WaitEventSet *set)
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
void FreeWaitEventSet(WaitEventSet *set)
WaitEventSet * CreateWaitEventSet(ResourceOwner resowner, int nevents)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET