PostgreSQL Source Code git master
Loading...
Searching...
No Matches
nodeAppend.c File Reference
#include "postgres.h"
#include "executor/execAsync.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "utils/wait_event.h"
Include dependency graph for nodeAppend.c:

Go to the source code of this file.

Data Structures

struct  ParallelAppendState
 

Macros

#define INVALID_SUBPLAN_INDEX   -1
 
#define EVENT_BUFFER_SIZE   16
 

Functions

static TupleTableSlot * ExecAppend (PlanState *pstate)
 
static bool choose_next_subplan_locally (AppendState *node)
 
static bool choose_next_subplan_for_leader (AppendState *node)
 
static bool choose_next_subplan_for_worker (AppendState *node)
 
static void mark_invalid_subplans_as_finished (AppendState *node)
 
static void ExecAppendAsyncBegin (AppendState *node)
 
static bool ExecAppendAsyncGetNext (AppendState *node, TupleTableSlot **result)
 
static bool ExecAppendAsyncRequest (AppendState *node, TupleTableSlot **result)
 
static void ExecAppendAsyncEventWait (AppendState *node)
 
static void classify_matching_subplans (AppendState *node)
 
AppendState * ExecInitAppend (Append *node, EState *estate, int eflags)
 
void ExecEndAppend (AppendState *node)
 
void ExecReScanAppend (AppendState *node)
 
void ExecAppendEstimate (AppendState *node, ParallelContext *pcxt)
 
void ExecAppendInitializeDSM (AppendState *node, ParallelContext *pcxt)
 
void ExecAppendReInitializeDSM (AppendState *node, ParallelContext *pcxt)
 
void ExecAppendInitializeWorker (AppendState *node, ParallelWorkerContext *pwcxt)
 
void ExecAsyncAppendResponse (AsyncRequest *areq)
 

Macro Definition Documentation

◆ EVENT_BUFFER_SIZE

#define EVENT_BUFFER_SIZE   16

Definition at line 87 of file nodeAppend.c.

◆ INVALID_SUBPLAN_INDEX

#define INVALID_SUBPLAN_INDEX   -1

Definition at line 86 of file nodeAppend.c.

Function Documentation

◆ choose_next_subplan_for_leader()

static bool choose_next_subplan_for_leader ( AppendState * node)
static

Definition at line 638 of file nodeAppend.c.

639{
640 ParallelAppendState *pstate = node->as_pstate;
641
642 /* Backward scan is not supported by parallel-aware plans */
644
645 /* We should never be called when there are no subplans */
646 Assert(node->as_nplans > 0);
647
649
651 {
652 /* Mark just-completed subplan as finished. */
653 node->as_pstate->pa_finished[node->as_whichplan] = true;
654 }
655 else
656 {
657 /* Start with last subplan. */
658 node->as_whichplan = node->as_nplans - 1;
659
660 /*
661 * If we've yet to determine the valid subplans then do so now. If
662 * run-time pruning is disabled then the valid subplans will always be
663 * set to all subplans.
664 */
666 {
667 node->as_valid_subplans =
669 node->as_valid_subplans_identified = true;
670
671 /*
672 * Mark each invalid plan as finished to allow the loop below to
673 * select the first valid subplan.
674 */
676 }
677 }
678
679 /* Loop until we find a subplan to execute. */
680 while (pstate->pa_finished[node->as_whichplan])
681 {
682 if (node->as_whichplan == 0)
683 {
686 LWLockRelease(&pstate->pa_lock);
687 return false;
688 }
689
690 /*
691 * We needn't pay attention to as_valid_subplans here as all invalid
692 * plans have been marked as finished.
693 */
694 node->as_whichplan--;
695 }
696
697 /* If non-partial, immediately mark as finished. */
698 if (node->as_whichplan < node->as_first_partial_plan)
699 node->as_pstate->pa_finished[node->as_whichplan] = true;
700
701 LWLockRelease(&pstate->pa_lock);
702
703 return true;
704}
#define Assert(condition)
Definition c.h:943
Bitmapset * ExecFindMatchingSubPlans(PartitionPruneState *prunestate, bool initial_prune, Bitmapset **validsubplan_rtis)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
static void mark_invalid_subplans_as_finished(AppendState *node)
Definition nodeAppend.c:846
#define INVALID_SUBPLAN_INDEX
Definition nodeAppend.c:86
static int fb(int x)
#define ScanDirectionIsForward(direction)
Definition sdir.h:64
struct PartitionPruneState * as_prune_state
Definition execnodes.h:1554
int as_first_partial_plan
Definition execnodes.h:1550
PlanState ps
Definition execnodes.h:1534
ParallelAppendState * as_pstate
Definition execnodes.h:1552
Bitmapset * as_valid_subplans
Definition execnodes.h:1556
bool as_valid_subplans_identified
Definition execnodes.h:1555
ScanDirection es_direction
Definition execnodes.h:695
bool pa_finished[FLEXIBLE_ARRAY_MEMBER]
Definition nodeAppend.c:83
EState * state
Definition execnodes.h:1203

References AppendState::as_first_partial_plan, AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_pstate, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, AppendState::as_whichplan, Assert, EState::es_direction, ExecFindMatchingSubPlans(), fb(), INVALID_SUBPLAN_INDEX, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), mark_invalid_subplans_as_finished(), ParallelAppendState::pa_finished, ParallelAppendState::pa_lock, ParallelAppendState::pa_next_plan, AppendState::ps, ScanDirectionIsForward, and PlanState::state.

Referenced by ExecAppendInitializeDSM().

◆ choose_next_subplan_for_worker()

static bool choose_next_subplan_for_worker ( AppendState * node)
static

Definition at line 720 of file nodeAppend.c.

721{
722 ParallelAppendState *pstate = node->as_pstate;
723
724 /* Backward scan is not supported by parallel-aware plans */
726
727 /* We should never be called when there are no subplans */
728 Assert(node->as_nplans > 0);
729
731
732 /* Mark just-completed subplan as finished. */
734 node->as_pstate->pa_finished[node->as_whichplan] = true;
735
736 /*
737 * If we've yet to determine the valid subplans then do so now. If
738 * run-time pruning is disabled then the valid subplans will always be set
739 * to all subplans.
740 */
741 else if (!node->as_valid_subplans_identified)
742 {
743 node->as_valid_subplans =
745 node->as_valid_subplans_identified = true;
746
748 }
749
750 /* If all the plans are already done, we have nothing to do */
751 if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
752 {
753 LWLockRelease(&pstate->pa_lock);
754 return false;
755 }
756
757 /* Save the plan from which we are starting the search. */
758 node->as_whichplan = pstate->pa_next_plan;
759
760 /* Loop until we find a valid subplan to execute. */
761 while (pstate->pa_finished[pstate->pa_next_plan])
762 {
763 int nextplan;
764
766 pstate->pa_next_plan);
767 if (nextplan >= 0)
768 {
769 /* Advance to the next valid plan. */
770 pstate->pa_next_plan = nextplan;
771 }
772 else if (node->as_whichplan > node->as_first_partial_plan)
773 {
774 /*
775 * Try looping back to the first valid partial plan, if there is
776 * one. If there isn't, arrange to bail out below.
777 */
779 node->as_first_partial_plan - 1);
780 pstate->pa_next_plan =
781 nextplan < 0 ? node->as_whichplan : nextplan;
782 }
783 else
784 {
785 /*
786 * At last plan, and either there are no partial plans or we've
787 * tried them all. Arrange to bail out.
788 */
789 pstate->pa_next_plan = node->as_whichplan;
790 }
791
792 if (pstate->pa_next_plan == node->as_whichplan)
793 {
794 /* We've tried everything! */
796 LWLockRelease(&pstate->pa_lock);
797 return false;
798 }
799 }
800
801 /* Pick the plan we found, and advance pa_next_plan one more time. */
802 node->as_whichplan = pstate->pa_next_plan;
804 pstate->pa_next_plan);
805
806 /*
807 * If there are no more valid plans then try setting the next plan to the
808 * first valid partial plan.
809 */
810 if (pstate->pa_next_plan < 0)
811 {
813 node->as_first_partial_plan - 1);
814
815 if (nextplan >= 0)
816 pstate->pa_next_plan = nextplan;
817 else
818 {
819 /*
820 * There are no valid partial plans, and we already chose the last
821 * non-partial plan; so flag that there's nothing more for our
822 * fellow workers to do.
823 */
825 }
826 }
827
828 /* If non-partial, immediately mark as finished. */
829 if (node->as_whichplan < node->as_first_partial_plan)
830 node->as_pstate->pa_finished[node->as_whichplan] = true;
831
832 LWLockRelease(&pstate->pa_lock);
833
834 return true;
835}
int bms_next_member(const Bitmapset *a, int prevbit)
Definition bitmapset.c:1290

References AppendState::as_first_partial_plan, AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_pstate, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, AppendState::as_whichplan, Assert, bms_next_member(), EState::es_direction, ExecFindMatchingSubPlans(), fb(), INVALID_SUBPLAN_INDEX, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), mark_invalid_subplans_as_finished(), ParallelAppendState::pa_finished, ParallelAppendState::pa_lock, ParallelAppendState::pa_next_plan, AppendState::ps, ScanDirectionIsForward, and PlanState::state.

Referenced by ExecAppendInitializeWorker().

◆ choose_next_subplan_locally()

static bool choose_next_subplan_locally ( AppendState * node)
static

Definition at line 572 of file nodeAppend.c.

573{
574 int whichplan = node->as_whichplan;
575 int nextplan;
576
577 /* We should never be called when there are no subplans */
578 Assert(node->as_nplans > 0);
579
580 /* Nothing to do if syncdone */
581 if (node->as_syncdone)
582 return false;
583
584 /*
585 * If first call then have the bms member function choose the first valid
586 * sync subplan by initializing whichplan to -1. If there happen to be no
587 * valid sync subplans then the bms member function will handle that by
588 * returning a negative number which will allow us to exit returning a
589 * false value.
590 */
592 {
593 if (node->as_nasyncplans > 0)
594 {
595 /* We'd have filled as_valid_subplans already */
597 }
598 else if (!node->as_valid_subplans_identified)
599 {
600 node->as_valid_subplans =
602 node->as_valid_subplans_identified = true;
603 }
604
605 whichplan = -1;
606 }
607
608 /* Ensure whichplan is within the expected range */
609 Assert(whichplan >= -1 && whichplan <= node->as_nplans);
610
613 else
615
616 if (nextplan < 0)
617 {
618 /* Set as_syncdone if in async mode */
619 if (node->as_nasyncplans > 0)
620 node->as_syncdone = true;
621 return false;
622 }
623
624 node->as_whichplan = nextplan;
625
626 return true;
627}
int bms_prev_member(const Bitmapset *a, int prevbit)
Definition bitmapset.c:1352
bool as_syncdone
Definition execnodes.h:1544
int as_nasyncplans
Definition execnodes.h:1540

References AppendState::as_nasyncplans, AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_syncdone, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, AppendState::as_whichplan, Assert, bms_next_member(), bms_prev_member(), EState::es_direction, ExecFindMatchingSubPlans(), fb(), INVALID_SUBPLAN_INDEX, AppendState::ps, ScanDirectionIsForward, and PlanState::state.

Referenced by ExecInitAppend().

◆ classify_matching_subplans()

static void classify_matching_subplans ( AppendState * node)
static

Definition at line 1191 of file nodeAppend.c.

1192{
1194
1197
1198 /* Nothing to do if there are no valid subplans. */
1200 {
1201 node->as_syncdone = true;
1202 node->as_nasyncremain = 0;
1203 return;
1204 }
1205
1206 /* Nothing to do if there are no valid async subplans. */
1207 if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1208 {
1209 node->as_nasyncremain = 0;
1210 return;
1211 }
1212
1213 /* Get valid async subplans. */
1215 node->as_valid_subplans);
1216
1217 /* Adjust the valid subplans to contain sync subplans only. */
1220
1221 /* Save valid async subplans. */
1223}
Bitmapset * bms_intersect(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:292
Bitmapset * bms_del_members(Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:1145
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:575
#define bms_is_empty(a)
Definition bitmapset.h:118
Bitmapset * as_valid_asyncplans
Definition execnodes.h:1557
Bitmapset * as_asyncplans
Definition execnodes.h:1539
int as_nasyncremain
Definition execnodes.h:1546

References AppendState::as_asyncplans, AppendState::as_nasyncremain, AppendState::as_syncdone, AppendState::as_valid_asyncplans, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, Assert, bms_del_members(), bms_intersect(), bms_is_empty, bms_overlap(), and fb().

Referenced by ExecAppendAsyncBegin(), and ExecInitAppend().

◆ ExecAppend()

static TupleTableSlot * ExecAppend ( PlanState * pstate)
static

Definition at line 306 of file nodeAppend.c.

307{
308 AppendState *node = castNode(AppendState, pstate);
310
311 /*
312 * If this is the first call after Init or ReScan, we need to do the
313 * initialization work.
314 */
315 if (!node->as_begun)
316 {
318 Assert(!node->as_syncdone);
319
320 /* Nothing to do if there are no subplans */
321 if (node->as_nplans == 0)
323
324 /* If there are any async subplans, begin executing them. */
325 if (node->as_nasyncplans > 0)
327
328 /*
329 * If no sync subplan has been chosen, we must choose one before
330 * proceeding.
331 */
332 if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
334
335 Assert(node->as_syncdone ||
336 (node->as_whichplan >= 0 &&
337 node->as_whichplan < node->as_nplans));
338
339 /* And we're initialized. */
340 node->as_begun = true;
341 }
342
343 for (;;)
344 {
346
348
349 /*
350 * try to get a tuple from an async subplan if any
351 */
352 if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
353 {
354 if (ExecAppendAsyncGetNext(node, &result))
355 return result;
356 Assert(!node->as_syncdone);
358 }
359
360 /*
361 * figure out which sync subplan we are currently processing
362 */
363 Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
364 subnode = node->appendplans[node->as_whichplan];
365
366 /*
367 * get a tuple from the subplan
368 */
370
371 if (!TupIsNull(result))
372 {
373 /*
374 * If the subplan gave us something then return it as-is. We do
375 * NOT make use of the result slot that was set up in
376 * ExecInitAppend; there's no need for it.
377 */
378 return result;
379 }
380
381 /*
382 * wait or poll for async events if any. We do this before checking
383 * for the end of iteration, because it might drain the remaining
384 * async subplans.
385 */
386 if (node->as_nasyncremain > 0)
388
389 /* choose new sync subplan; if no sync/async subplans, we're done */
390 if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
392 }
393}
uint32 result
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition executor.h:322
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static void ExecAppendAsyncBegin(AppendState *node)
Definition nodeAppend.c:880
static void ExecAppendAsyncEventWait(AppendState *node)
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
Definition nodeAppend.c:932
#define castNode(_type_, nodeptr)
Definition nodes.h:182
Bitmapset * as_needrequest
Definition execnodes.h:1547
bool(* choose_next_subplan)(AppendState *)
Definition execnodes.h:1558
PlanState ** appendplans
Definition execnodes.h:1535
TupleTableSlot * ps_ResultTupleSlot
Definition execnodes.h:1241
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
#define TupIsNull(slot)
Definition tuptable.h:325

References AppendState::appendplans, AppendState::as_begun, AppendState::as_nasyncplans, AppendState::as_nasyncremain, AppendState::as_needrequest, AppendState::as_nplans, AppendState::as_syncdone, AppendState::as_whichplan, Assert, bms_is_empty, castNode, CHECK_FOR_INTERRUPTS, AppendState::choose_next_subplan, ExecAppendAsyncBegin(), ExecAppendAsyncEventWait(), ExecAppendAsyncGetNext(), ExecClearTuple(), ExecProcNode(), fb(), INVALID_SUBPLAN_INDEX, AppendState::ps, PlanState::ps_ResultTupleSlot, result, and TupIsNull.

Referenced by ExecInitAppend().

◆ ExecAppendAsyncBegin()

static void ExecAppendAsyncBegin ( AppendState * node)
static

Definition at line 880 of file nodeAppend.c.

881{
882 int i;
883
884 /* Backward scan is not supported by async-aware Appends. */
886
887 /* We should never be called when there are no subplans */
888 Assert(node->as_nplans > 0);
889
890 /* We should never be called when there are no async subplans. */
891 Assert(node->as_nasyncplans > 0);
892
893 /* If we've yet to determine the valid subplans then do so now. */
895 {
896 node->as_valid_subplans =
898 node->as_valid_subplans_identified = true;
899
901 }
902
903 /* Initialize state variables. */
906
907 /* Nothing to do if there are no valid async subplans. */
908 if (node->as_nasyncremain == 0)
909 return;
910
911 /* Make a request for each of the valid async subplans. */
912 i = -1;
913 while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
914 {
916
917 Assert(areq->request_index == i);
918 Assert(!areq->callback_pending);
919
920 /* Do the actual work. */
922 }
923}
int bms_num_members(const Bitmapset *a)
Definition bitmapset.c:744
void ExecAsyncRequest(AsyncRequest *areq)
Definition execAsync.c:27
int i
Definition isn.c:77
static void classify_matching_subplans(AppendState *node)
AsyncRequest ** as_asyncrequests
Definition execnodes.h:1541

References AppendState::as_asyncrequests, AppendState::as_nasyncplans, AppendState::as_nasyncremain, AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_syncdone, AppendState::as_valid_asyncplans, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, Assert, bms_is_empty, bms_next_member(), bms_num_members(), classify_matching_subplans(), EState::es_direction, ExecAsyncRequest(), ExecFindMatchingSubPlans(), fb(), i, AppendState::ps, ScanDirectionIsForward, and PlanState::state.

Referenced by ExecAppend().

◆ ExecAppendAsyncEventWait()

static void ExecAppendAsyncEventWait ( AppendState * node)
static

Definition at line 1035 of file nodeAppend.c.

1036{
1037 int nevents = node->as_nasyncplans + 2;
1038 long timeout = node->as_syncdone ? -1 : 0;
1040 int noccurred;
1041 int i;
1042
1043 /* We should never be called when there are no valid async subplans. */
1044 Assert(node->as_nasyncremain > 0);
1045
1046 Assert(node->as_eventset == NULL);
1049 NULL, NULL);
1050
1051 /* Give each waiting subplan a chance to add an event. */
1052 i = -1;
1053 while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1054 {
1056
1057 if (areq->callback_pending)
1059 }
1060
1061 /*
1062 * No need for further processing if none of the subplans configured any
1063 * events.
1064 */
1066 {
1068 node->as_eventset = NULL;
1069 return;
1070 }
1071
1072 /*
1073 * Add the process latch to the set, so that we wake up to process the
1074 * standard interrupts with CHECK_FOR_INTERRUPTS().
1075 *
1076 * NOTE: For historical reasons, it's important that this is added to the
1077 * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely,
1078 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
1079 * any other events are in the set. That's a poor design, it's
1080 * questionable for postgres_fdw to be doing that in the first place, but
1081 * we cannot change it now. The pattern has possibly been copied to other
1082 * extensions too.
1083 */
1085 MyLatch, NULL);
1086
1087 /* Return at most EVENT_BUFFER_SIZE events in one call. */
1088 if (nevents > EVENT_BUFFER_SIZE)
1089 nevents = EVENT_BUFFER_SIZE;
1090
1091 /*
1092 * If the timeout is -1, wait until at least one event occurs. If the
1093 * timeout is 0, poll for events, but do not wait at all.
1094 */
1096 nevents, WAIT_EVENT_APPEND_READY);
1098 node->as_eventset = NULL;
1099 if (noccurred == 0)
1100 return;
1101
1102 /* Deliver notifications. */
1103 for (i = 0; i < noccurred; i++)
1104 {
1105 WaitEvent *w = &occurred_event[i];
1106
1107 /*
1108 * Each waiting subplan should have registered its wait event with
1109 * user_data pointing back to its AsyncRequest.
1110 */
1111 if ((w->events & WL_SOCKET_READABLE) != 0)
1112 {
1114
1115 if (areq->callback_pending)
1116 {
1117 /*
1118 * Mark it as no longer needing a callback. We must do this
1119 * before dispatching the callback in case the callback resets
1120 * the flag.
1121 */
1122 areq->callback_pending = false;
1123
1124 /* Do the actual work. */
1126 }
1127 }
1128
1129 /* Handle standard interrupts */
1130 if ((w->events & WL_LATCH_SET) != 0)
1131 {
1134 }
1135 }
1136}
void ExecAsyncConfigureWait(AsyncRequest *areq)
Definition execAsync.c:63
void ExecAsyncNotify(AsyncRequest *areq)
Definition execAsync.c:89
struct Latch * MyLatch
Definition globals.c:65
void ResetLatch(Latch *latch)
Definition latch.c:374
#define EVENT_BUFFER_SIZE
Definition nodeAppend.c:87
#define PGINVALID_SOCKET
Definition port.h:31
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
struct WaitEventSet * as_eventset
Definition execnodes.h:1548
void * user_data
uint32 events
int GetNumRegisteredWaitEvents(WaitEventSet *set)
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
void FreeWaitEventSet(WaitEventSet *set)
WaitEventSet * CreateWaitEventSet(ResourceOwner resowner, int nevents)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET

References AddWaitEventToSet(), AppendState::as_asyncplans, AppendState::as_asyncrequests, AppendState::as_eventset, AppendState::as_nasyncplans, AppendState::as_nasyncremain, AppendState::as_syncdone, Assert, bms_next_member(), CHECK_FOR_INTERRUPTS, CreateWaitEventSet(), CurrentResourceOwner, EVENT_BUFFER_SIZE, WaitEvent::events, ExecAsyncConfigureWait(), ExecAsyncNotify(), fb(), FreeWaitEventSet(), GetNumRegisteredWaitEvents(), i, MyLatch, PGINVALID_SOCKET, ResetLatch(), WaitEvent::user_data, WaitEventSetWait(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by ExecAppend(), and ExecAppendAsyncGetNext().

◆ ExecAppendAsyncGetNext()

static bool ExecAppendAsyncGetNext ( AppendState * node,
TupleTableSlot **  result 
)
static

Definition at line 932 of file nodeAppend.c.

933{
934 *result = NULL;
935
936 /* We should never be called when there are no valid async subplans. */
937 Assert(node->as_nasyncremain > 0);
938
939 /* Request a tuple asynchronously. */
941 return true;
942
943 while (node->as_nasyncremain > 0)
944 {
946
947 /* Wait or poll for async events. */
949
950 /* Request a tuple asynchronously. */
952 return true;
953
954 /* Break from loop if there's any sync subplan that isn't complete. */
955 if (!node->as_syncdone)
956 break;
957 }
958
959 /*
960 * If all sync subplans are complete, we're totally done scanning the
961 * given node. Otherwise, we're done with the asynchronous stuff but must
962 * continue scanning the sync subplans.
963 */
964 if (node->as_syncdone)
965 {
966 Assert(node->as_nasyncremain == 0);
968 return true;
969 }
970
971 return false;
972}
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
Definition nodeAppend.c:981

References AppendState::as_nasyncremain, AppendState::as_syncdone, Assert, CHECK_FOR_INTERRUPTS, ExecAppendAsyncEventWait(), ExecAppendAsyncRequest(), ExecClearTuple(), fb(), AppendState::ps, PlanState::ps_ResultTupleSlot, and result.

Referenced by ExecAppend().

◆ ExecAppendAsyncRequest()

static bool ExecAppendAsyncRequest ( AppendState * node,
TupleTableSlot **  result 
)
static

Definition at line 981 of file nodeAppend.c.

982{
984 int i;
985
986 /* Nothing to do if there are no async subplans needing a new request. */
987 if (bms_is_empty(node->as_needrequest))
988 {
989 Assert(node->as_nasyncresults == 0);
990 return false;
991 }
992
993 /*
994 * If there are any asynchronously-generated results that have not yet
995 * been returned, we have nothing to do; just return one of them.
996 */
997 if (node->as_nasyncresults > 0)
998 {
999 --node->as_nasyncresults;
1000 *result = node->as_asyncresults[node->as_nasyncresults];
1001 return true;
1002 }
1003
1004 /* Make a new request for each of the async subplans that need it. */
1006 node->as_needrequest = NULL;
1007 i = -1;
1008 while ((i = bms_next_member(needrequest, i)) >= 0)
1009 {
1011
1012 /* Do the actual work. */
1014 }
1016
1017 /* Return one of the asynchronously-generated results if any. */
1018 if (node->as_nasyncresults > 0)
1019 {
1020 --node->as_nasyncresults;
1021 *result = node->as_asyncresults[node->as_nasyncresults];
1022 return true;
1023 }
1024
1025 return false;
1026}
void bms_free(Bitmapset *a)
Definition bitmapset.c:239
int as_nasyncresults
Definition execnodes.h:1543
TupleTableSlot ** as_asyncresults
Definition execnodes.h:1542

References AppendState::as_asyncrequests, AppendState::as_asyncresults, AppendState::as_nasyncresults, AppendState::as_needrequest, Assert, bms_free(), bms_is_empty, bms_next_member(), ExecAsyncRequest(), fb(), i, and result.

Referenced by ExecAppendAsyncGetNext().

◆ ExecAppendEstimate()

void ExecAppendEstimate ( AppendState * node,
ParallelContext * pcxt 
)

Definition at line 502 of file nodeAppend.c.

504{
505 node->pstate_len =
507 sizeof(bool) * node->as_nplans);
508
511}
Size add_size(Size s1, Size s2)
Definition mcxt.c:1733
#define shm_toc_estimate_chunk(e, sz)
Definition shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition shm_toc.h:53
Size pstate_len
Definition execnodes.h:1553
shm_toc_estimator estimator
Definition parallel.h:43

References add_size(), AppendState::as_nplans, ParallelContext::estimator, fb(), AppendState::pstate_len, shm_toc_estimate_chunk, and shm_toc_estimate_keys.

Referenced by ExecParallelEstimate().

◆ ExecAppendInitializeDSM()

void ExecAppendInitializeDSM ( AppendState * node,
ParallelContext * pcxt 
)

Definition at line 521 of file nodeAppend.c.

523{
524 ParallelAppendState *pstate;
525
526 pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
527 memset(pstate, 0, node->pstate_len);
529 shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
530
531 node->as_pstate = pstate;
533}
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:670
static bool choose_next_subplan_for_leader(AppendState *node)
Definition nodeAppend.c:638
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition shm_toc.c:171
shm_toc * toc
Definition parallel.h:46
Plan * plan
Definition execnodes.h:1201
int plan_node_id
Definition plannodes.h:233

References AppendState::as_pstate, AppendState::choose_next_subplan, choose_next_subplan_for_leader(), fb(), LWLockInitialize(), ParallelAppendState::pa_lock, PlanState::plan, Plan::plan_node_id, AppendState::ps, AppendState::pstate_len, shm_toc_allocate(), shm_toc_insert(), and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

◆ ExecAppendInitializeWorker()

void ExecAppendInitializeWorker ( AppendState * node,
ParallelWorkerContext * pwcxt 
)

Definition at line 558 of file nodeAppend.c.

559{
560 node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
561 node->choose_next_subplan = choose_next_subplan_for_worker;
562}
static bool choose_next_subplan_for_worker(AppendState *node)
Definition nodeAppend.c:720
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition shm_toc.c:239

References AppendState::as_pstate, AppendState::choose_next_subplan, choose_next_subplan_for_worker(), fb(), PlanState::plan, Plan::plan_node_id, AppendState::ps, and shm_toc_lookup().

Referenced by ExecParallelInitializeWorker().

◆ ExecAppendReInitializeDSM()

void ExecAppendReInitializeDSM ( AppendState * node,
ParallelContext * pcxt 
)

Definition at line 542 of file nodeAppend.c.

543{
544 ParallelAppendState *pstate = node->as_pstate;
545
546 pstate->pa_next_plan = 0;
547 memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
548}

References AppendState::as_nplans, AppendState::as_pstate, fb(), ParallelAppendState::pa_finished, and ParallelAppendState::pa_next_plan.

Referenced by ExecParallelReInitializeDSM().

◆ ExecAsyncAppendResponse()

void ExecAsyncAppendResponse ( AsyncRequest * areq)

Definition at line 1145 of file nodeAppend.c.

1146{
1147 AppendState *node = (AppendState *) areq->requestor;
1148 TupleTableSlot *slot = areq->result;
1149
1150 /* The result should be a TupleTableSlot or NULL. */
1151 Assert(slot == NULL || IsA(slot, TupleTableSlot));
1152
1153 /* Nothing to do if the request is pending. */
1154 if (!areq->request_complete)
1155 {
1156 /* The request would have been pending for a callback. */
1157 Assert(areq->callback_pending);
1158 return;
1159 }
1160
1161 /* If the result is NULL or an empty slot, there's nothing more to do. */
1162 if (TupIsNull(slot))
1163 {
1164 /* The ending subplan wouldn't have been pending for a callback. */
1165 Assert(!areq->callback_pending);
1166 --node->as_nasyncremain;
1167 return;
1168 }
1169
1170 /* Save result so we can return it. */
1171 Assert(node->as_nasyncresults < node->as_nasyncplans);
1172 node->as_asyncresults[node->as_nasyncresults++] = slot;
1173
1174 /*
1175 * Mark the subplan that returned a result as ready for a new request. We
1176 * don't launch another one here immediately because it might complete.
1177 */
1178 node->as_needrequest = bms_add_member(node->as_needrequest,
1179 areq->request_index);
1180}
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
#define IsA(nodeptr, _type_)
Definition nodes.h:164

References AppendState::as_asyncresults, AppendState::as_nasyncplans, AppendState::as_nasyncremain, AppendState::as_nasyncresults, AppendState::as_needrequest, Assert, bms_add_member(), fb(), IsA, and TupIsNull.

Referenced by ExecAsyncResponse().

◆ ExecEndAppend()

void ExecEndAppend ( AppendState * node)

Definition at line 404 of file nodeAppend.c.

405{
406 PlanState **appendplans;
407 int nplans;
408 int i;
409
410 /*
411 * get information from the node
412 */
413 appendplans = node->appendplans;
414 nplans = node->as_nplans;
415
416 /*
417 * shut down each of the subscans
418 */
419 for (i = 0; i < nplans; i++)
420 ExecEndNode(appendplans[i]);
421}
void ExecEndNode(PlanState *node)

References AppendState::appendplans, AppendState::as_nplans, ExecEndNode(), and i.

Referenced by ExecEndNode().

◆ ExecInitAppend()

AppendState * ExecInitAppend ( Append *node,
EState *estate,
int  eflags 
)

Definition at line 112 of file nodeAppend.c.

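113{
114 AppendState *appendstate = makeNode(AppendState);
115 PlanState **appendplanstates;
116 const TupleTableSlotOps *appendops;
117 Bitmapset *validsubplans;
118 Bitmapset *asyncplans;
119 int nplans;
120 int nasyncplans;
121 int firstvalid;
122 int i,
123 j;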
124
125 /* check for unsupported flags */
126 Assert(!(eflags & EXEC_FLAG_MARK));
127
128 /*
129 * create new AppendState for our append node
130 */
131 appendstate->ps.plan = (Plan *) node;
132 appendstate->ps.state = estate;
133 appendstate->ps.ExecProcNode = ExecAppend;
134
135 /* Let choose_next_subplan_* function handle setting the first subplan */
136 appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
137 appendstate->as_syncdone = false;
138 appendstate->as_begun = false;
139
140 /* If run-time partition pruning is enabled, then set that up now */
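141 if (node->part_prune_index >= 0)
142 {
143 PartitionPruneState *prunestate;
144
145 /*
146 * Set up pruning data structure. This also initializes the set of
147 * subplans to initialize (validsubplans) by taking into account the
148 * result of performing initial pruning if any.
149 */
150 prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
151 list_length(node->appendplans),
152 node->part_prune_index,
153 node->apprelids,
154 &validsubplans);
155 appendstate->as_prune_state = prunestate;
156 nplans = bms_num_members(validsubplans);
157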
158 /*
159 * When no run-time pruning is required and there's at least one
160 * subplan, we can fill as_valid_subplans immediately, preventing
161 * later calls to ExecFindMatchingSubPlans.
162 */
163 if (!prunestate->do_exec_prune && nplans > 0)
164 {
165 appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
166 appendstate->as_valid_subplans_identified = true;
167 }
168 }
169 else
170 {
171 nplans = list_length(node->appendplans);
172
173 /*
174 * When run-time partition pruning is not enabled we can just mark all
175 * subplans as valid; they must also all be initialized.
176 */
177 Assert(nplans > 0);
178 appendstate->as_valid_subplans = validsubplans =
179 bms_add_range(NULL, 0, nplans - 1);
180 appendstate->as_valid_subplans_identified = true;
181 appendstate->as_prune_state = NULL;
182 }
183
184 appendplanstates = (PlanState **) palloc(nplans *
185 sizeof(PlanState *));
186
187 /*
188 * call ExecInitNode on each of the valid plans to be executed and save
189 * the results into the appendplanstates array.
190 *
191 * While at it, find out the first valid partial plan.
192 */
193 j = 0;
194 asyncplans = NULL;
195 nasyncplans = 0;
196 firstvalid = nplans;
197 i = -1;
198 while ((i = bms_next_member(validsubplans, i)) >= 0)
199 {
200 Plan *initNode = (Plan *) list_nth(node->appendplans, i);
201
202 /*
203 * Record async subplans. When executing EvalPlanQual, we treat them
204 * as sync ones; don't do this when initializing an EvalPlanQual plan
205 * tree.
206 */
207 if (initNode->async_capable && estate->es_epq_active == NULL)
208 {
209 asyncplans = bms_add_member(asyncplans, j);
210 nasyncplans++;
211 }
212
213 /*
214 * Record the lowest appendplans index which is a valid partial plan.
215 */
216 if (i >= node->first_partial_plan && j < firstvalid)
217 firstvalid = j;
218
219 appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
220 }
221
222 appendstate->as_first_partial_plan = firstvalid;
223 appendstate->appendplans = appendplanstates;
224 appendstate->as_nplans = nplans;
225
226 /*
227 * Initialize Append's result tuple type and slot. If the child plans all
228 * produce the same fixed slot type, we can use that slot type; otherwise
229 * make a virtual slot. (Note that the result slot itself is used only to
230 * return a null tuple at end of execution; real tuples are returned to
231 * the caller in the children's own result slots. What we are doing here
232 * is allowing the parent plan node to optimize if the Append will return
233 * only one kind of slot.)
234 */
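235 appendops = ExecGetCommonSlotOps(appendplanstates, j);
236 if (appendops != NULL)
237 {
238 ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
239 }
240 else
241 {
242 ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
243 /* show that the output slot type is not fixed */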
244 appendstate->ps.resultopsset = true;
245 appendstate->ps.resultopsfixed = false;
246 }
247
248 /* Initialize async state */
249 appendstate->as_asyncplans = asyncplans;
250 appendstate->as_nasyncplans = nasyncplans;
251 appendstate->as_asyncrequests = NULL;
252 appendstate->as_asyncresults = NULL;
253 appendstate->as_nasyncresults = 0;
254 appendstate->as_nasyncremain = 0;
255 appendstate->as_needrequest = NULL;
256 appendstate->as_eventset = NULL;
257 appendstate->as_valid_asyncplans = NULL;
258
259 if (nasyncplans > 0)
260 {
261 appendstate->as_asyncrequests = (AsyncRequest **)
262 palloc0(nplans * sizeof(AsyncRequest *));
263
264 i = -1;
265 while ((i = bms_next_member(asyncplans, i)) >= 0)
266 {
267 AsyncRequest *areq;
268
269 areq = palloc_object(AsyncRequest);
270 areq->requestor = (PlanState *) appendstate;
271 areq->requestee = appendplanstates[i];
272 areq->request_index = i;
273 areq->callback_pending = false;
274 areq->request_complete = false;
275 areq->result = NULL;
276
277 appendstate->as_asyncrequests[i] = areq;
278 }
279
280 appendstate->as_asyncresults = (TupleTableSlot **)
281 palloc0(nasyncplans * sizeof(TupleTableSlot *));
282
283 if (appendstate->as_valid_subplans_identified)
284 classify_matching_subplans(appendstate);
285 }
286
287 /*
288 * Miscellaneous initialization
289 */
290
291 appendstate->ps.ps_ProjInfo = NULL;
292
293 /* For parallel query, this will be overridden later. */
294 appendstate->choose_next_subplan = choose_next_subplan_locally;
295
296 return appendstate;
297}
Bitmapset * bms_add_range(Bitmapset *a, int lower, int upper)
Definition bitmapset.c:1003
PartitionPruneState * ExecInitPartitionExecPruning(PlanState *planstate, int n_total_subplans, int part_prune_index, Bitmapset *relids, Bitmapset **initially_valid_subplans)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps * ExecGetCommonSlotOps(PlanState **planstates, int nplans)
Definition execUtils.c:541
#define EXEC_FLAG_MARK
Definition executor.h:71
#define palloc_object(type)
Definition fe_memutils.h:89
int j
Definition isn.c:78
void * palloc0(Size size)
Definition mcxt.c:1420
void * palloc(Size size)
Definition mcxt.c:1390
static TupleTableSlot * ExecAppend(PlanState *pstate)
Definition nodeAppend.c:306
static bool choose_next_subplan_locally(AppendState *node)
Definition nodeAppend.c:572
#define makeNode(_type_)
Definition nodes.h:161
static int list_length(const List *l)
Definition pg_list.h:152
static void * list_nth(const List *list, int n)
Definition pg_list.h:331
int first_partial_plan
Definition plannodes.h:418
int part_prune_index
Definition plannodes.h:425
Bitmapset * apprelids
Definition plannodes.h:403
List * appendplans
Definition plannodes.h:409
struct EPQState * es_epq_active
Definition execnodes.h:778

References Append::appendplans, Append::apprelids, Assert, bms_add_member(), bms_add_range(), bms_next_member(), bms_num_members(), choose_next_subplan_locally(), classify_matching_subplans(), EState::es_epq_active, EXEC_FLAG_MARK, ExecAppend(), ExecGetCommonSlotOps(), ExecInitNode(), ExecInitPartitionExecPruning(), ExecInitResultTupleSlotTL(), fb(), Append::first_partial_plan, i, INVALID_SUBPLAN_INDEX, j, list_length(), list_nth(), makeNode, palloc(), palloc0(), palloc_object, Append::part_prune_index, and TTSOpsVirtual.

Referenced by ExecInitNode().

◆ ExecReScanAppend()

void ExecReScanAppend ( AppendState *node)

Definition at line 424 of file nodeAppend.c.

425{
426 int nasyncplans = node->as_nasyncplans;
427 int i;
428
429 /*
430 * If any PARAM_EXEC Params used in pruning expressions have changed, then
431 * we'd better unset the valid subplans so that they are reselected for
432 * the new parameter values.
433 */
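434 if (node->as_prune_state &&
435 bms_overlap(node->ps.chgParam,
436 node->as_prune_state->execparamids))
437 {
438 node->as_valid_subplans_identified = false;
439 bms_free(node->as_valid_subplans);
440 node->as_valid_subplans = NULL;
441 bms_free(node->as_valid_asyncplans);
442 node->as_valid_asyncplans = NULL;
443 }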
444
445 for (i = 0; i < node->as_nplans; i++)
446 {
447 PlanState *subnode = node->appendplans[i];
448
449 /*
450 * ExecReScan doesn't know about my subplans, so I have to do
451 * changed-parameter signaling myself.
452 */
453 if (node->ps.chgParam != NULL)
454 UpdateChangedParamSet(subnode, node->ps.chgParam);
455
456 /*
457 * If chgParam of subnode is not null then plan will be re-scanned by
458 * first ExecProcNode or by first ExecAsyncRequest.
459 */
460 if (subnode->chgParam == NULL)
461 ExecReScan(subnode);
462 }
463
464 /* Reset async state */
465 if (nasyncplans > 0)
466 {
467 i = -1;
468 while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
469 {
470 AsyncRequest *areq = node->as_asyncrequests[i];
471
472 areq->callback_pending = false;
473 areq->request_complete = false;
474 areq->result = NULL;
475 }
476
477 node->as_nasyncresults = 0;
478 node->as_nasyncremain = 0;
479 bms_free(node->as_needrequest);
480 node->as_needrequest = NULL;
481 }
482
483 /* Let choose_next_subplan_* function handle setting the first subplan */
484 node->as_whichplan = INVALID_SUBPLAN_INDEX;
485 node->as_syncdone = false;
486 node->as_begun = false;
487}
void ExecReScan(PlanState *node)
Definition execAmi.c:78
void UpdateChangedParamSet(PlanState *node, Bitmapset *newchg)
Definition execUtils.c:936
bool callback_pending
Definition execnodes.h:678
Bitmapset * execparamids
Bitmapset * chgParam
Definition execnodes.h:1235

References AppendState::appendplans, AppendState::as_asyncplans, AppendState::as_asyncrequests, AppendState::as_begun, AppendState::as_nasyncplans, AppendState::as_nasyncremain, AppendState::as_nasyncresults, AppendState::as_needrequest, AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_syncdone, AppendState::as_valid_asyncplans, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, AppendState::as_whichplan, bms_free(), bms_next_member(), bms_overlap(), AsyncRequest::callback_pending, PlanState::chgParam, PartitionPruneState::execparamids, ExecReScan(), fb(), i, INVALID_SUBPLAN_INDEX, AppendState::ps, and UpdateChangedParamSet().

Referenced by ExecReScan().

◆ mark_invalid_subplans_as_finished()

static void mark_invalid_subplans_as_finished ( AppendState *node)
static

Definition at line 846 of file nodeAppend.c.

847{
848 int i;
849
850 /* Only valid to call this while in parallel Append mode */
851 Assert(node->as_pstate);
852
853 /* Shouldn't have been called when run-time pruning is not enabled */
854 Assert(node->as_prune_state);
855
856 /* Nothing to do if all plans are valid */
857 if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
858 return;
859
860 /* Mark all non-valid plans as finished */
861 for (i = 0; i < node->as_nplans; i++)
862 {
863 if (!bms_is_member(i, node->as_valid_subplans))
864 node->as_pstate->pa_finished[i] = true;
865 }
866}
bool bms_is_member(int x, const Bitmapset *a)
Definition bitmapset.c:510

References AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_pstate, AppendState::as_valid_subplans, Assert, bms_is_member(), bms_num_members(), i, and ParallelAppendState::pa_finished.

Referenced by choose_next_subplan_for_leader(), and choose_next_subplan_for_worker().