PostgreSQL Source Code (git master)
nodeAppend.c File Reference
#include "postgres.h"
#include "executor/execAsync.h"
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
Go to the source code of this file.

Data Structures

struct  ParallelAppendState
 

Macros

#define INVALID_SUBPLAN_INDEX   -1
 
#define EVENT_BUFFER_SIZE   16
 

Functions

static TupleTableSlot * ExecAppend (PlanState *pstate)
 
static bool choose_next_subplan_locally (AppendState *node)
 
static bool choose_next_subplan_for_leader (AppendState *node)
 
static bool choose_next_subplan_for_worker (AppendState *node)
 
static void mark_invalid_subplans_as_finished (AppendState *node)
 
static void ExecAppendAsyncBegin (AppendState *node)
 
static bool ExecAppendAsyncGetNext (AppendState *node, TupleTableSlot **result)
 
static bool ExecAppendAsyncRequest (AppendState *node, TupleTableSlot **result)
 
static void ExecAppendAsyncEventWait (AppendState *node)
 
static void classify_matching_subplans (AppendState *node)
 
AppendState * ExecInitAppend (Append *node, EState *estate, int eflags)
 
void ExecEndAppend (AppendState *node)
 
void ExecReScanAppend (AppendState *node)
 
void ExecAppendEstimate (AppendState *node, ParallelContext *pcxt)
 
void ExecAppendInitializeDSM (AppendState *node, ParallelContext *pcxt)
 
void ExecAppendReInitializeDSM (AppendState *node, ParallelContext *pcxt)
 
void ExecAppendInitializeWorker (AppendState *node, ParallelWorkerContext *pwcxt)
 
void ExecAsyncAppendResponse (AsyncRequest *areq)
 

Macro Definition Documentation

◆ EVENT_BUFFER_SIZE

#define EVENT_BUFFER_SIZE   16

Definition at line 84 of file nodeAppend.c.

◆ INVALID_SUBPLAN_INDEX

#define INVALID_SUBPLAN_INDEX   -1

Definition at line 83 of file nodeAppend.c.

Function Documentation

◆ choose_next_subplan_for_leader()

static bool choose_next_subplan_for_leader ( AppendState * node )
static

Definition at line 620 of file nodeAppend.c.

{
    ParallelAppendState *pstate = node->as_pstate;

    /* Backward scan is not supported by parallel-aware plans */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
    {
        /* Mark just-completed subplan as finished. */
        node->as_pstate->pa_finished[node->as_whichplan] = true;
    }
    else
    {
        /* Start with last subplan. */
        node->as_whichplan = node->as_nplans - 1;

        /*
         * If we've yet to determine the valid subplans then do so now.  If
         * run-time pruning is disabled then the valid subplans will always be
         * set to all subplans.
         */
        if (!node->as_valid_subplans_identified)
        {
            node->as_valid_subplans =
                ExecFindMatchingSubPlans(node->as_prune_state, false);
            node->as_valid_subplans_identified = true;

            /*
             * Mark each invalid plan as finished to allow the loop below to
             * select the first valid subplan.
             */
            mark_invalid_subplans_as_finished(node);
        }
    }

    /* Loop until we find a subplan to execute. */
    while (pstate->pa_finished[node->as_whichplan])
    {
        if (node->as_whichplan == 0)
        {
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
            node->as_whichplan = INVALID_SUBPLAN_INDEX;
            LWLockRelease(&pstate->pa_lock);
            return false;
        }

        /*
         * We needn't pay attention to as_valid_subplans here as all invalid
         * plans have been marked as finished.
         */
        node->as_whichplan--;
    }

    /* If non-partial, immediately mark as finished. */
    if (node->as_whichplan < node->as_first_partial_plan)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    LWLockRelease(&pstate->pa_lock);

    return true;
}

References AppendState::as_first_partial_plan, AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_pstate, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, AppendState::as_whichplan, Assert(), EState::es_direction, ExecFindMatchingSubPlans(), INVALID_SUBPLAN_INDEX, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), mark_invalid_subplans_as_finished(), ParallelAppendState::pa_finished, ParallelAppendState::pa_lock, ParallelAppendState::pa_next_plan, AppendState::ps, ScanDirectionIsForward, and PlanState::state.

Referenced by ExecAppendInitializeDSM().
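
To see the leader's policy in isolation, here is a minimal standalone C sketch. It is a toy model, not the PostgreSQL API: leader_pick, finished[], and first_partial are illustrative names. The leader starts at the highest-numbered subplan and walks downward past finished ones, which biases it toward the partial plans kept at the tail of the array; a non-partial plan is claimed outright since only one process may run it.

#include <stdbool.h>
#include <stdio.h>

/* Toy model of the leader's selection; all names are illustrative. */
static int
leader_pick(bool finished[], int nplans, int first_partial)
{
    int which = nplans - 1;     /* start with the last subplan */

    while (finished[which])
    {
        if (which == 0)
            return -1;          /* every subplan is finished */
        which--;
    }

    /* Non-partial plans run in exactly one process: claim this one now. */
    if (which < first_partial)
        finished[which] = true;

    return which;
}

int
main(void)
{
    bool finished[4] = {false, false, false, true};

    /* subplans 2 and 3 are partial; 3 is finished, so the leader takes 2 */
    printf("leader picks subplan %d\n", leader_pick(finished, 4, 2));
    return 0;
}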

◆ choose_next_subplan_for_worker()

static bool choose_next_subplan_for_worker ( AppendState * node )
static

Definition at line 702 of file nodeAppend.c.

{
    ParallelAppendState *pstate = node->as_pstate;

    /* Backward scan is not supported by parallel-aware plans */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

    /* Mark just-completed subplan as finished. */
    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    /*
     * If we've yet to determine the valid subplans then do so now.  If
     * run-time pruning is disabled then the valid subplans will always be set
     * to all subplans.
     */
    else if (!node->as_valid_subplans_identified)
    {
        node->as_valid_subplans =
            ExecFindMatchingSubPlans(node->as_prune_state, false);
        node->as_valid_subplans_identified = true;

        mark_invalid_subplans_as_finished(node);
    }

    /* If all the plans are already done, we have nothing to do */
    if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
    {
        LWLockRelease(&pstate->pa_lock);
        return false;
    }

    /* Save the plan from which we are starting the search. */
    node->as_whichplan = pstate->pa_next_plan;

    /* Loop until we find a valid subplan to execute. */
    while (pstate->pa_finished[pstate->pa_next_plan])
    {
        int nextplan;

        nextplan = bms_next_member(node->as_valid_subplans,
                                   pstate->pa_next_plan);
        if (nextplan >= 0)
        {
            /* Advance to the next valid plan. */
            pstate->pa_next_plan = nextplan;
        }
        else if (node->as_whichplan > node->as_first_partial_plan)
        {
            /*
             * Try looping back to the first valid partial plan, if there is
             * one.  If there isn't, arrange to bail out below.
             */
            nextplan = bms_next_member(node->as_valid_subplans,
                                       node->as_first_partial_plan - 1);
            pstate->pa_next_plan =
                nextplan < 0 ? node->as_whichplan : nextplan;
        }
        else
        {
            /*
             * At last plan, and either there are no partial plans or we've
             * tried them all.  Arrange to bail out.
             */
            pstate->pa_next_plan = node->as_whichplan;
        }

        if (pstate->pa_next_plan == node->as_whichplan)
        {
            /* We've tried everything! */
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
            LWLockRelease(&pstate->pa_lock);
            return false;
        }
    }

    /* Pick the plan we found, and advance pa_next_plan one more time. */
    node->as_whichplan = pstate->pa_next_plan;
    pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
                                           pstate->pa_next_plan);

    /*
     * If there are no more valid plans then try setting the next plan to the
     * first valid partial plan.
     */
    if (pstate->pa_next_plan < 0)
    {
        int nextplan = bms_next_member(node->as_valid_subplans,
                                       node->as_first_partial_plan - 1);

        if (nextplan >= 0)
            pstate->pa_next_plan = nextplan;
        else
        {
            /*
             * There are no valid partial plans, and we already chose the last
             * non-partial plan; so flag that there's nothing more for our
             * fellow workers to do.
             */
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
        }
    }

    /* If non-partial, immediately mark as finished. */
    if (node->as_whichplan < node->as_first_partial_plan)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    LWLockRelease(&pstate->pa_lock);

    return true;
}

References AppendState::as_first_partial_plan, AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_pstate, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, AppendState::as_whichplan, Assert(), bms_next_member(), EState::es_direction, ExecFindMatchingSubPlans(), INVALID_SUBPLAN_INDEX, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), mark_invalid_subplans_as_finished(), ParallelAppendState::pa_finished, ParallelAppendState::pa_lock, ParallelAppendState::pa_next_plan, AppendState::ps, ScanDirectionIsForward, and PlanState::state.

Referenced by ExecAppendInitializeWorker().
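
The wraparound search is easier to follow without the Bitmapset machinery. Below is a hedged standalone sketch of the same idea, with toy names (worker_pick, next_plan) and a dense int index in place of bms_next_member: advance the shared cursor forward, wrap back to the first partial plan when running off the end, and bail out once the search returns to its starting point.

#include <stdbool.h>
#include <stdio.h>

/* Toy model of the worker search; assumes a dense plan array. */
static int
worker_pick(bool finished[], int nplans, int first_partial, int *next_plan)
{
    int start;
    int which;

    if (*next_plan < 0)
        return -1;              /* a previous search exhausted all plans */

    start = *next_plan;

    while (finished[*next_plan])
    {
        int candidate = *next_plan + 1;

        if (candidate >= nplans)
        {
            /*
             * Past the end: wrap to the partial plans, or aim back at the
             * start so the check below notices we have tried everything.
             */
            candidate = (start > first_partial) ? first_partial : start;
        }

        if (candidate == start)
        {
            *next_plan = -1;    /* nothing left for anyone */
            return -1;
        }

        *next_plan = candidate;
    }

    which = *next_plan;

    /* Advance the cursor for the next caller, wrapping to partial plans. */
    *next_plan = which + 1;
    if (*next_plan >= nplans)
        *next_plan = (first_partial < nplans) ? first_partial : -1;

    /* Non-partial plans run in exactly one process: claim this one now. */
    if (which < first_partial)
        finished[which] = true;

    return which;
}

int
main(void)
{
    bool finished[4] = {false, false, false, false};
    int  next_plan = 0;

    /* Two pretend workers: the first claims non-partial plan 0 outright. */
    printf("worker A gets %d\n", worker_pick(finished, 4, 2, &next_plan));
    printf("worker B gets %d\n", worker_pick(finished, 4, 2, &next_plan));
    return 0;
}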

◆ choose_next_subplan_locally()

static bool choose_next_subplan_locally ( AppendState * node )
static

Definition at line 554 of file nodeAppend.c.

{
    int whichplan = node->as_whichplan;
    int nextplan;

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    /* Nothing to do if syncdone */
    if (node->as_syncdone)
        return false;

    /*
     * If first call then have the bms member function choose the first valid
     * sync subplan by initializing whichplan to -1.  If there happen to be no
     * valid sync subplans then the bms member function will handle that by
     * returning a negative number which will allow us to exit returning a
     * false value.
     */
    if (whichplan == INVALID_SUBPLAN_INDEX)
    {
        if (node->as_nasyncplans > 0)
        {
            /* We'd have filled as_valid_subplans already */
            Assert(node->as_valid_subplans_identified);
        }
        else if (!node->as_valid_subplans_identified)
        {
            node->as_valid_subplans =
                ExecFindMatchingSubPlans(node->as_prune_state, false);
            node->as_valid_subplans_identified = true;
        }

        whichplan = -1;
    }

    /* Ensure whichplan is within the expected range */
    Assert(whichplan >= -1 && whichplan <= node->as_nplans);

    if (ScanDirectionIsForward(node->ps.state->es_direction))
        nextplan = bms_next_member(node->as_valid_subplans, whichplan);
    else
        nextplan = bms_prev_member(node->as_valid_subplans, whichplan);

    if (nextplan < 0)
    {
        /* Set as_syncdone if in async mode */
        if (node->as_nasyncplans > 0)
            node->as_syncdone = true;
        return false;
    }

    node->as_whichplan = nextplan;

    return true;
}

References AppendState::as_nasyncplans, AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_syncdone, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, AppendState::as_whichplan, Assert(), bms_next_member(), bms_prev_member(), EState::es_direction, ExecFindMatchingSubPlans(), INVALID_SUBPLAN_INDEX, AppendState::ps, ScanDirectionIsForward, and PlanState::state.

Referenced by ExecInitAppend().
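
The "-1 seed" convention mentioned in the comment is just how bms_next_member works: passing prevbit = -1 asks for the smallest member. A hedged standalone analogue over a plain bool array (next_member and prev_member are toy stand-ins, not the bitmapset API):

#include <stdbool.h>
#include <stdio.h>

/* Toy analogues of bms_next_member/bms_prev_member for a bool array. */
static int
next_member(const bool valid[], int n, int prev)
{
    for (int i = prev + 1; i < n; i++)
        if (valid[i])
            return i;
    return -1;                  /* no more members */
}

static int
prev_member(const bool valid[], int n, int prev)
{
    for (int i = (prev > n ? n : prev) - 1; i >= 0; i--)
        if (valid[i])
            return i;
    return -1;
}

int
main(void)
{
    bool valid[5] = {false, true, false, true, false};

    /* Seeding with -1 yields the first valid subplan, as in the comment. */
    printf("forward from -1: %d\n", next_member(valid, 5, -1));    /* 1 */

    /* For a backward scan, seed with nplans to get the last valid one. */
    printf("backward from 5: %d\n", prev_member(valid, 5, 5));     /* 3 */
    return 0;
}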

◆ classify_matching_subplans()

static void classify_matching_subplans ( AppendState * node )
static

Definition at line 1150 of file nodeAppend.c.

{
    Bitmapset  *valid_asyncplans;

    Assert(node->as_valid_subplans_identified);
    Assert(node->as_valid_asyncplans == NULL);

    /* Nothing to do if there are no valid subplans. */
    if (bms_is_empty(node->as_valid_subplans))
    {
        node->as_syncdone = true;
        node->as_nasyncremain = 0;
        return;
    }

    /* Nothing to do if there are no valid async subplans. */
    if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
    {
        node->as_nasyncremain = 0;
        return;
    }

    /* Get valid async subplans. */
    valid_asyncplans = bms_intersect(node->as_asyncplans,
                                     node->as_valid_subplans);

    /* Adjust the valid subplans to contain sync subplans only. */
    node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
                                              valid_asyncplans);

    /* Save valid async subplans. */
    node->as_valid_asyncplans = valid_asyncplans;
}

References AppendState::as_asyncplans, AppendState::as_nasyncremain, AppendState::as_syncdone, AppendState::as_valid_asyncplans, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, Assert(), bms_del_members(), bms_intersect(), bms_is_empty, and bms_overlap().

Referenced by ExecAppendAsyncBegin(), and ExecInitAppend().
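
The net effect is to split the valid set into disjoint sync and async halves. With small sets this is plain bit arithmetic; a hedged standalone illustration using unsigned bitmasks in place of Bitmapsets (the masks and values are made up for the example):

#include <stdio.h>

int
main(void)
{
    /* Toy stand-ins for Bitmapsets: bit i set means subplan i is in the set. */
    unsigned valid = 0x0Du;     /* subplans {0,2,3} survived pruning */
    unsigned async = 0x06u;     /* subplans {1,2} are async-capable  */

    unsigned valid_async = valid & async;   /* like bms_intersect:   {2}   */
    unsigned valid_sync  = valid & ~async;  /* like bms_del_members: {0,3} */

    printf("valid_async = 0x%02X, valid_sync = 0x%02X\n",
           valid_async, valid_sync);
    return 0;
}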

◆ ExecAppend()

static TupleTableSlot * ExecAppend ( PlanState * pstate )
static

Definition at line 288 of file nodeAppend.c.

{
    AppendState *node = castNode(AppendState, pstate);
    TupleTableSlot *result;

    /*
     * If this is the first call after Init or ReScan, we need to do the
     * initialization work.
     */
    if (!node->as_begun)
    {
        Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
        Assert(!node->as_syncdone);

        /* Nothing to do if there are no subplans */
        if (node->as_nplans == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);

        /* If there are any async subplans, begin executing them. */
        if (node->as_nasyncplans > 0)
            ExecAppendAsyncBegin(node);

        /*
         * If no sync subplan has been chosen, we must choose one before
         * proceeding.
         */
        if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);

        Assert(node->as_syncdone ||
               (node->as_whichplan >= 0 &&
                node->as_whichplan < node->as_nplans));

        /* And we're initialized. */
        node->as_begun = true;
    }

    for (;;)
    {
        PlanState  *subnode;

        CHECK_FOR_INTERRUPTS();

        /*
         * try to get a tuple from an async subplan if any
         */
        if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
        {
            if (ExecAppendAsyncGetNext(node, &result))
                return result;
            Assert(!node->as_syncdone);
            Assert(bms_is_empty(node->as_needrequest));
        }

        /*
         * figure out which sync subplan we are currently processing
         */
        Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
        subnode = node->appendplans[node->as_whichplan];

        /*
         * get a tuple from the subplan
         */
        result = ExecProcNode(subnode);

        if (!TupIsNull(result))
        {
            /*
             * If the subplan gave us something then return it as-is. We do
             * NOT make use of the result slot that was set up in
             * ExecInitAppend; there's no need for it.
             */
            return result;
        }

        /*
         * wait or poll for async events if any. We do this before checking
         * for the end of iteration, because it might drain the remaining
         * async subplans.
         */
        if (node->as_nasyncremain > 0)
            ExecAppendAsyncEventWait(node);

        /* choose new sync subplan; if no sync/async subplans, we're done */
        if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
    }
}

References AppendState::appendplans, AppendState::as_begun, AppendState::as_nasyncplans, AppendState::as_nasyncremain, AppendState::as_needrequest, AppendState::as_nplans, AppendState::as_syncdone, AppendState::as_whichplan, Assert(), bms_is_empty, castNode, CHECK_FOR_INTERRUPTS, AppendState::choose_next_subplan, ExecAppendAsyncBegin(), ExecAppendAsyncEventWait(), ExecAppendAsyncGetNext(), ExecClearTuple(), ExecProcNode(), INVALID_SUBPLAN_INDEX, AppendState::ps, PlanState::ps_ResultTupleSlot, and TupIsNull.

Referenced by ExecInitAppend().
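
Ignoring the async machinery, the sync path of ExecAppend reduces to a familiar pattern: pull from the current source until it is exhausted, then ask the chooser for the next one. A minimal standalone model of just that shape (toy data and a trivial sequential chooser, not executor code):

#include <stdio.h>

#define NPLANS 3

/* Toy "subplans": each yields its values in order, then 0 meaning EOF. */
static int data[NPLANS][3] = {{1, 2, 0}, {0, 0, 0}, {7, 0, 0}};
static int pos[NPLANS];

static int
pull(int plan)                  /* stand-in for ExecProcNode on a subplan */
{
    return data[plan][pos[plan]++];
}

int
main(void)
{
    int which = 0;

    for (;;)
    {
        int v = pull(which);

        if (v != 0)             /* got a tuple: return it to the caller */
            printf("tuple %d from subplan %d\n", v, which);
        else if (++which >= NPLANS)     /* trivial "choose next subplan" */
            break;              /* no subplans left: emit the empty slot */
    }
    return 0;
}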

◆ ExecAppendAsyncBegin()

static void ExecAppendAsyncBegin ( AppendState * node )
static

Definition at line 862 of file nodeAppend.c.

{
    int i;

    /* Backward scan is not supported by async-aware Appends. */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    /* We should never be called when there are no async subplans. */
    Assert(node->as_nasyncplans > 0);

    /* If we've yet to determine the valid subplans then do so now. */
    if (!node->as_valid_subplans_identified)
    {
        node->as_valid_subplans =
            ExecFindMatchingSubPlans(node->as_prune_state, false);
        node->as_valid_subplans_identified = true;

        classify_matching_subplans(node);
    }

    /* Initialize state variables. */
    node->as_syncdone = bms_is_empty(node->as_valid_subplans);
    node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);

    /* Nothing to do if there are no valid async subplans. */
    if (node->as_nasyncremain == 0)
        return;

    /* Make a request for each of the valid async subplans. */
    i = -1;
    while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
    {
        AsyncRequest *areq = node->as_asyncrequests[i];

        Assert(areq->request_index == i);
        Assert(!areq->callback_pending);

        /* Do the actual work. */
        ExecAsyncRequest(areq);
    }
}

References AppendState::as_asyncrequests, AppendState::as_nasyncplans, AppendState::as_nasyncremain, AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_syncdone, AppendState::as_valid_asyncplans, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, Assert(), bms_is_empty, bms_next_member(), bms_num_members(), AsyncRequest::callback_pending, classify_matching_subplans(), EState::es_direction, ExecAsyncRequest(), ExecFindMatchingSubPlans(), i, AppendState::ps, AsyncRequest::request_index, ScanDirectionIsForward, and PlanState::state.

Referenced by ExecAppend().

◆ ExecAppendAsyncEventWait()

static void ExecAppendAsyncEventWait ( AppendState * node )
static

Definition at line 1017 of file nodeAppend.c.

{
    int nevents = node->as_nasyncplans + 1;
    long timeout = node->as_syncdone ? -1 : 0;
    WaitEvent occurred_event[EVENT_BUFFER_SIZE];
    int noccurred;
    int i;

    /* We should never be called when there are no valid async subplans. */
    Assert(node->as_nasyncremain > 0);

    node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents);
    AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
                      NULL, NULL);

    /* Give each waiting subplan a chance to add an event. */
    i = -1;
    while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
    {
        AsyncRequest *areq = node->as_asyncrequests[i];

        if (areq->callback_pending)
            ExecAsyncConfigureWait(areq);
    }

    /*
     * No need for further processing if there are no configured events other
     * than the postmaster death event.
     */
    if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
    {
        FreeWaitEventSet(node->as_eventset);
        node->as_eventset = NULL;
        return;
    }

    /* We wait on at most EVENT_BUFFER_SIZE events. */
    if (nevents > EVENT_BUFFER_SIZE)
        nevents = EVENT_BUFFER_SIZE;

    /*
     * If the timeout is -1, wait until at least one event occurs.  If the
     * timeout is 0, poll for events, but do not wait at all.
     */
    noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
                                 nevents, WAIT_EVENT_APPEND_READY);
    FreeWaitEventSet(node->as_eventset);
    node->as_eventset = NULL;
    if (noccurred == 0)
        return;

    /* Deliver notifications. */
    for (i = 0; i < noccurred; i++)
    {
        WaitEvent  *w = &occurred_event[i];

        /*
         * Each waiting subplan should have registered its wait event with
         * user_data pointing back to its AsyncRequest.
         */
        if ((w->events & WL_SOCKET_READABLE) != 0)
        {
            AsyncRequest *areq = (AsyncRequest *) w->user_data;

            if (areq->callback_pending)
            {
                /*
                 * Mark it as no longer needing a callback. We must do this
                 * before dispatching the callback in case the callback resets
                 * the flag.
                 */
                areq->callback_pending = false;

                /* Do the actual work. */
                ExecAsyncNotify(areq);
            }
        }
    }
}

References AddWaitEventToSet(), AppendState::as_asyncplans, AppendState::as_asyncrequests, AppendState::as_eventset, AppendState::as_nasyncplans, AppendState::as_nasyncremain, AppendState::as_syncdone, Assert(), bms_next_member(), AsyncRequest::callback_pending, CreateWaitEventSet(), CurrentMemoryContext, EVENT_BUFFER_SIZE, WaitEvent::events, ExecAsyncConfigureWait(), ExecAsyncNotify(), FreeWaitEventSet(), GetNumRegisteredWaitEvents(), i, if(), PGINVALID_SOCKET, WaitEvent::user_data, WAIT_EVENT_APPEND_READY, WaitEventSetWait(), WL_EXIT_ON_PM_DEATH, and WL_SOCKET_READABLE.

Referenced by ExecAppend(), and ExecAppendAsyncGetNext().
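
The timeout choice at the top is the crux: -1 blocks when only async subplans remain, while 0 merely polls so sync subplans are not starved. The same two-mode timeout convention exists in POSIX poll(2); a hedged standalone illustration of that convention only, not of the WaitEventSet API:

#include <poll.h>
#include <stdbool.h>
#include <stdio.h>

int
main(void)
{
    struct pollfd fds[1];       /* no descriptors registered in this toy */
    bool sync_done = false;

    /*
     * As in ExecAppendAsyncEventWait: block indefinitely (-1) only when no
     * sync subplan could produce tuples meanwhile; otherwise poll (0).
     * Kept false here so the example terminates immediately.
     */
    int timeout = sync_done ? -1 : 0;

    /* With zero descriptors and timeout 0 this returns 0 at once. */
    int rc = poll(fds, 0, timeout);

    printf("poll returned %d with timeout %d\n", rc, timeout);
    return 0;
}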

◆ ExecAppendAsyncGetNext()

static bool ExecAppendAsyncGetNext ( AppendState * node,
TupleTableSlot **  result 
)
static

Definition at line 914 of file nodeAppend.c.

{
    *result = NULL;

    /* We should never be called when there are no valid async subplans. */
    Assert(node->as_nasyncremain > 0);

    /* Request a tuple asynchronously. */
    if (ExecAppendAsyncRequest(node, result))
        return true;

    while (node->as_nasyncremain > 0)
    {
        CHECK_FOR_INTERRUPTS();

        /* Wait or poll for async events. */
        ExecAppendAsyncEventWait(node);

        /* Request a tuple asynchronously. */
        if (ExecAppendAsyncRequest(node, result))
            return true;

        /* Break from loop if there's any sync subplan that isn't complete. */
        if (!node->as_syncdone)
            break;
    }

    /*
     * If all sync subplans are complete, we're totally done scanning the
     * given node. Otherwise, we're done with the asynchronous stuff but must
     * continue scanning the sync subplans.
     */
    if (node->as_syncdone)
    {
        Assert(node->as_nasyncremain == 0);
        *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
        return true;
    }

    return false;
}

References AppendState::as_nasyncremain, AppendState::as_syncdone, Assert(), CHECK_FOR_INTERRUPTS, ExecAppendAsyncEventWait(), ExecAppendAsyncRequest(), ExecClearTuple(), AppendState::ps, and PlanState::ps_ResultTupleSlot.

Referenced by ExecAppend().

◆ ExecAppendAsyncRequest()

static bool ExecAppendAsyncRequest ( AppendState * node,
TupleTableSlot **  result 
)
static

Definition at line 963 of file nodeAppend.c.

{
    Bitmapset  *needrequest;
    int i;

    /* Nothing to do if there are no async subplans needing a new request. */
    if (bms_is_empty(node->as_needrequest))
    {
        Assert(node->as_nasyncresults == 0);
        return false;
    }

    /*
     * If there are any asynchronously-generated results that have not yet
     * been returned, we have nothing to do; just return one of them.
     */
    if (node->as_nasyncresults > 0)
    {
        --node->as_nasyncresults;
        *result = node->as_asyncresults[node->as_nasyncresults];
        return true;
    }

    /* Make a new request for each of the async subplans that need it. */
    needrequest = node->as_needrequest;
    node->as_needrequest = NULL;
    i = -1;
    while ((i = bms_next_member(needrequest, i)) >= 0)
    {
        AsyncRequest *areq = node->as_asyncrequests[i];

        /* Do the actual work. */
        ExecAsyncRequest(areq);
    }
    bms_free(needrequest);

    /* Return one of the asynchronously-generated results if any. */
    if (node->as_nasyncresults > 0)
    {
        --node->as_nasyncresults;
        *result = node->as_asyncresults[node->as_nasyncresults];
        return true;
    }

    return false;
}

References AppendState::as_asyncrequests, AppendState::as_asyncresults, AppendState::as_nasyncresults, AppendState::as_needrequest, Assert(), bms_free(), bms_is_empty, bms_next_member(), ExecAsyncRequest(), and i.

Referenced by ExecAppendAsyncGetNext().
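
as_asyncresults is consumed as a simple stack: ExecAsyncAppendResponse pushes completed slots, and this function pops them with --as_nasyncresults. A toy standalone model of that hand-off (push_result/pop_result and the string "slots" are illustrative only):

#include <stdio.h>

#define MAX_RESULTS 4

static const char *results[MAX_RESULTS];    /* toy as_asyncresults  */
static int nresults;                        /* toy as_nasyncresults */

static void
push_result(const char *slot)       /* what the response callback does */
{
    results[nresults++] = slot;
}

static const char *
pop_result(void)                    /* what ExecAppendAsyncRequest does */
{
    return nresults > 0 ? results[--nresults] : NULL;
}

int
main(void)
{
    push_result("tuple from subplan 2");
    push_result("tuple from subplan 0");

    /* Results come back LIFO; order among async subplans is unspecified. */
    printf("%s\n", pop_result());
    printf("%s\n", pop_result());
    return 0;
}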

◆ ExecAppendEstimate()

void ExecAppendEstimate ( AppendState * node,
ParallelContext * pcxt
)

Definition at line 484 of file nodeAppend.c.

{
    node->pstate_len =
        add_size(offsetof(ParallelAppendState, pa_finished),
                 sizeof(bool) * node->as_nplans);

    shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

References add_size(), AppendState::as_nplans, ParallelContext::estimator, AppendState::pstate_len, shm_toc_estimate_chunk, and shm_toc_estimate_keys.

Referenced by ExecParallelEstimate().
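
The size computation is the standard C flexible-array-member idiom: offsetof gives the fixed header, then one flag per subplan is added. A standalone sketch with a toy struct (the real ParallelAppendState also embeds an LWLock, so this is illustrative only):

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef struct ToyParallelAppendState
{
    int  pa_next_plan;                  /* fixed-size header fields...  */
    bool pa_finished[];                 /* ...then one flag per subplan */
} ToyParallelAppendState;

int
main(void)
{
    int    nplans = 8;
    size_t len = offsetof(ToyParallelAppendState, pa_finished)
        + sizeof(bool) * nplans;

    printf("need %zu bytes of shared memory for %d subplans\n", len, nplans);
    return 0;
}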

◆ ExecAppendInitializeDSM()

void ExecAppendInitializeDSM ( AppendState * node,
ParallelContext * pcxt
)

Definition at line 503 of file nodeAppend.c.

{
    ParallelAppendState *pstate;

    pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
    memset(pstate, 0, node->pstate_len);
    LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
    shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);

    node->as_pstate = pstate;
    node->choose_next_subplan = choose_next_subplan_for_leader;
}

References AppendState::as_pstate, AppendState::choose_next_subplan, choose_next_subplan_for_leader(), LWLockInitialize(), LWTRANCHE_PARALLEL_APPEND, ParallelAppendState::pa_lock, PlanState::plan, Plan::plan_node_id, AppendState::ps, AppendState::pstate_len, shm_toc_allocate(), shm_toc_insert(), and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

◆ ExecAppendInitializeWorker()

void ExecAppendInitializeWorker ( AppendState * node,
ParallelWorkerContext * pwcxt
)

Definition at line 540 of file nodeAppend.c.

{
    node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
    node->choose_next_subplan = choose_next_subplan_for_worker;
}

References AppendState::as_pstate, AppendState::choose_next_subplan, choose_next_subplan_for_worker(), PlanState::plan, Plan::plan_node_id, AppendState::ps, shm_toc_lookup(), and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().

◆ ExecAppendReInitializeDSM()

void ExecAppendReInitializeDSM ( AppendState * node,
ParallelContext * pcxt
)

Definition at line 524 of file nodeAppend.c.

{
    ParallelAppendState *pstate = node->as_pstate;

    pstate->pa_next_plan = 0;
    memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
}

References AppendState::as_nplans, AppendState::as_pstate, ParallelAppendState::pa_finished, and ParallelAppendState::pa_next_plan.

Referenced by ExecParallelReInitializeDSM().

◆ ExecAsyncAppendResponse()

void ExecAsyncAppendResponse ( AsyncRequest * areq )

Definition at line 1104 of file nodeAppend.c.

{
    AppendState *node = (AppendState *) areq->requestor;
    TupleTableSlot *slot = areq->result;

    /* The result should be a TupleTableSlot or NULL. */
    Assert(slot == NULL || IsA(slot, TupleTableSlot));

    /* Nothing to do if the request is pending. */
    if (!areq->request_complete)
    {
        /* The request would have been pending for a callback. */
        Assert(areq->callback_pending);
        return;
    }

    /* If the result is NULL or an empty slot, there's nothing more to do. */
    if (TupIsNull(slot))
    {
        /* The ending subplan wouldn't have been pending for a callback. */
        Assert(!areq->callback_pending);
        --node->as_nasyncremain;
        return;
    }

    /* Save result so we can return it. */
    Assert(node->as_nasyncresults < node->as_nasyncplans);
    node->as_asyncresults[node->as_nasyncresults++] = slot;

    /*
     * Mark the subplan that returned a result as ready for a new request. We
     * don't launch another one here immediately because it might complete.
     */
    node->as_needrequest = bms_add_member(node->as_needrequest,
                                          areq->request_index);
}

References AppendState::as_asyncresults, AppendState::as_nasyncplans, AppendState::as_nasyncremain, AppendState::as_nasyncresults, AppendState::as_needrequest, Assert(), bms_add_member(), AsyncRequest::callback_pending, IsA, AsyncRequest::request_complete, AsyncRequest::request_index, AsyncRequest::requestor, AsyncRequest::result, and TupIsNull.

Referenced by ExecAsyncResponse().

◆ ExecEndAppend()

void ExecEndAppend ( AppendState * node )

Definition at line 386 of file nodeAppend.c.

{
    PlanState **appendplans;
    int nplans;
    int i;

    /*
     * get information from the node
     */
    appendplans = node->appendplans;
    nplans = node->as_nplans;

    /*
     * shut down each of the subscans
     */
    for (i = 0; i < nplans; i++)
        ExecEndNode(appendplans[i]);
}

References AppendState::appendplans, AppendState::as_nplans, ExecEndNode(), and i.

Referenced by ExecEndNode().

◆ ExecInitAppend()

AppendState * ExecInitAppend ( Append * node,
EState * estate,
int eflags
)

Definition at line 109 of file nodeAppend.c.

{
    AppendState *appendstate = makeNode(AppendState);
    PlanState **appendplanstates;
    Bitmapset  *validsubplans;
    Bitmapset  *asyncplans;
    int nplans;
    int nasyncplans;
    int firstvalid;
    int i,
        j;

    /* check for unsupported flags */
    Assert(!(eflags & EXEC_FLAG_MARK));

    /*
     * create new AppendState for our append node
     */
    appendstate->ps.plan = (Plan *) node;
    appendstate->ps.state = estate;
    appendstate->ps.ExecProcNode = ExecAppend;

    /* Let choose_next_subplan_* function handle setting the first subplan */
    appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
    appendstate->as_syncdone = false;
    appendstate->as_begun = false;

    /* If run-time partition pruning is enabled, then set that up now */
    if (node->part_prune_info != NULL)
    {
        PartitionPruneState *prunestate;

        /*
         * Set up pruning data structure. This also initializes the set of
         * subplans to initialize (validsubplans) by taking into account the
         * result of performing initial pruning if any.
         */
        prunestate = ExecInitPartitionPruning(&appendstate->ps,
                                              list_length(node->appendplans),
                                              node->part_prune_info,
                                              &validsubplans);
        appendstate->as_prune_state = prunestate;
        nplans = bms_num_members(validsubplans);

        /*
         * When no run-time pruning is required and there's at least one
         * subplan, we can fill as_valid_subplans immediately, preventing
         * later calls to ExecFindMatchingSubPlans.
         */
        if (!prunestate->do_exec_prune && nplans > 0)
        {
            appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
            appendstate->as_valid_subplans_identified = true;
        }
    }
    else
    {
        nplans = list_length(node->appendplans);

        /*
         * When run-time partition pruning is not enabled we can just mark all
         * subplans as valid; they must also all be initialized.
         */
        Assert(nplans > 0);
        appendstate->as_valid_subplans = validsubplans =
            bms_add_range(NULL, 0, nplans - 1);
        appendstate->as_valid_subplans_identified = true;
        appendstate->as_prune_state = NULL;
    }

    /*
     * Initialize result tuple type and slot.
     */
    ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);

    /* node returns slots from each of its subnodes, therefore not fixed */
    appendstate->ps.resultopsset = true;
    appendstate->ps.resultopsfixed = false;

    appendplanstates = (PlanState **) palloc(nplans *
                                             sizeof(PlanState *));

    /*
     * call ExecInitNode on each of the valid plans to be executed and save
     * the results into the appendplanstates array.
     *
     * While at it, find out the first valid partial plan.
     */
    j = 0;
    asyncplans = NULL;
    nasyncplans = 0;
    firstvalid = nplans;
    i = -1;
    while ((i = bms_next_member(validsubplans, i)) >= 0)
    {
        Plan       *initNode = (Plan *) list_nth(node->appendplans, i);

        /*
         * Record async subplans. When executing EvalPlanQual, we treat them
         * as sync ones; don't do this when initializing an EvalPlanQual plan
         * tree.
         */
        if (initNode->async_capable && estate->es_epq_active == NULL)
        {
            asyncplans = bms_add_member(asyncplans, j);
            nasyncplans++;
        }

        /*
         * Record the lowest appendplans index which is a valid partial plan.
         */
        if (i >= node->first_partial_plan && j < firstvalid)
            firstvalid = j;

        appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
    }

    appendstate->as_first_partial_plan = firstvalid;
    appendstate->appendplans = appendplanstates;
    appendstate->as_nplans = nplans;

    /* Initialize async state */
    appendstate->as_asyncplans = asyncplans;
    appendstate->as_nasyncplans = nasyncplans;
    appendstate->as_asyncrequests = NULL;
    appendstate->as_asyncresults = NULL;
    appendstate->as_nasyncresults = 0;
    appendstate->as_nasyncremain = 0;
    appendstate->as_needrequest = NULL;
    appendstate->as_eventset = NULL;
    appendstate->as_valid_asyncplans = NULL;

    if (nasyncplans > 0)
    {
        appendstate->as_asyncrequests = (AsyncRequest **)
            palloc0(nplans * sizeof(AsyncRequest *));

        i = -1;
        while ((i = bms_next_member(asyncplans, i)) >= 0)
        {
            AsyncRequest *areq;

            areq = palloc(sizeof(AsyncRequest));
            areq->requestor = (PlanState *) appendstate;
            areq->requestee = appendplanstates[i];
            areq->request_index = i;
            areq->callback_pending = false;
            areq->request_complete = false;
            areq->result = NULL;

            appendstate->as_asyncrequests[i] = areq;
        }

        appendstate->as_asyncresults = (TupleTableSlot **)
            palloc0(nasyncplans * sizeof(TupleTableSlot *));

        if (appendstate->as_valid_subplans_identified)
            classify_matching_subplans(appendstate);
    }

    /*
     * Miscellaneous initialization
     */

    appendstate->ps.ps_ProjInfo = NULL;

    /* For parallel query, this will be overridden later. */
    appendstate->choose_next_subplan = choose_next_subplan_locally;

    return appendstate;
}

References AppendState::appendplans, Append::appendplans, AppendState::as_asyncplans, AppendState::as_asyncrequests, AppendState::as_asyncresults, AppendState::as_begun, AppendState::as_eventset, AppendState::as_first_partial_plan, AppendState::as_nasyncplans, AppendState::as_nasyncremain, AppendState::as_nasyncresults, AppendState::as_needrequest, AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_syncdone, AppendState::as_valid_asyncplans, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, AppendState::as_whichplan, Assert(), Plan::async_capable, bms_add_member(), bms_add_range(), bms_next_member(), bms_num_members(), AsyncRequest::callback_pending, AppendState::choose_next_subplan, choose_next_subplan_locally(), classify_matching_subplans(), PartitionPruneState::do_exec_prune, EState::es_epq_active, EXEC_FLAG_MARK, ExecAppend(), ExecInitNode(), ExecInitPartitionPruning(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, Append::first_partial_plan, i, INVALID_SUBPLAN_INDEX, j, list_length(), list_nth(), makeNode, palloc(), palloc0(), Append::part_prune_info, PlanState::plan, AppendState::ps, PlanState::ps_ProjInfo, AsyncRequest::request_complete, AsyncRequest::request_index, AsyncRequest::requestee, AsyncRequest::requestor, AsyncRequest::result, PlanState::resultopsfixed, PlanState::resultopsset, PlanState::state, and TTSOpsVirtual.

Referenced by ExecInitNode().

◆ ExecReScanAppend()

void ExecReScanAppend ( AppendState * node )

Definition at line 406 of file nodeAppend.c.

{
    int nasyncplans = node->as_nasyncplans;
    int i;

    /*
     * If any PARAM_EXEC Params used in pruning expressions have changed, then
     * we'd better unset the valid subplans so that they are reselected for
     * the new parameter values.
     */
    if (node->as_prune_state &&
        bms_overlap(node->ps.chgParam,
                    node->as_prune_state->execparamids))
    {
        node->as_valid_subplans_identified = false;
        bms_free(node->as_valid_subplans);
        node->as_valid_subplans = NULL;
        bms_free(node->as_valid_asyncplans);
        node->as_valid_asyncplans = NULL;
    }

    for (i = 0; i < node->as_nplans; i++)
    {
        PlanState  *subnode = node->appendplans[i];

        /*
         * ExecReScan doesn't know about my subplans, so I have to do
         * changed-parameter signaling myself.
         */
        if (node->ps.chgParam != NULL)
            UpdateChangedParamSet(subnode, node->ps.chgParam);

        /*
         * If chgParam of subnode is not null then plan will be re-scanned by
         * first ExecProcNode or by first ExecAsyncRequest.
         */
        if (subnode->chgParam == NULL)
            ExecReScan(subnode);
    }

    /* Reset async state */
    if (nasyncplans > 0)
    {
        i = -1;
        while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
        {
            AsyncRequest *areq = node->as_asyncrequests[i];

            areq->callback_pending = false;
            areq->request_complete = false;
            areq->result = NULL;
        }

        node->as_nasyncresults = 0;
        node->as_nasyncremain = 0;
        bms_free(node->as_needrequest);
        node->as_needrequest = NULL;
    }

    /* Let choose_next_subplan_* function handle setting the first subplan */
    node->as_whichplan = INVALID_SUBPLAN_INDEX;
    node->as_syncdone = false;
    node->as_begun = false;
}

References AppendState::appendplans, AppendState::as_asyncplans, AppendState::as_asyncrequests, AppendState::as_begun, AppendState::as_nasyncplans, AppendState::as_nasyncremain, AppendState::as_nasyncresults, AppendState::as_needrequest, AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_syncdone, AppendState::as_valid_asyncplans, AppendState::as_valid_subplans, AppendState::as_valid_subplans_identified, AppendState::as_whichplan, bms_free(), bms_next_member(), bms_overlap(), AsyncRequest::callback_pending, PlanState::chgParam, PartitionPruneState::execparamids, ExecReScan(), i, INVALID_SUBPLAN_INDEX, AppendState::ps, AsyncRequest::request_complete, AsyncRequest::result, and UpdateChangedParamSet().

Referenced by ExecReScan().

◆ mark_invalid_subplans_as_finished()

static void mark_invalid_subplans_as_finished ( AppendState * node )
static

Definition at line 828 of file nodeAppend.c.

{
    int i;

    /* Only valid to call this while in parallel Append mode */
    Assert(node->as_pstate);

    /* Shouldn't have been called when run-time pruning is not enabled */
    Assert(node->as_prune_state);

    /* Nothing to do if all plans are valid */
    if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
        return;

    /* Mark all non-valid plans as finished */
    for (i = 0; i < node->as_nplans; i++)
    {
        if (!bms_is_member(i, node->as_valid_subplans))
            node->as_pstate->pa_finished[i] = true;
    }
}

References AppendState::as_nplans, AppendState::as_prune_state, AppendState::as_pstate, AppendState::as_valid_subplans, Assert(), bms_is_member(), bms_num_members(), i, and ParallelAppendState::pa_finished.

Referenced by choose_next_subplan_for_leader(), and choose_next_subplan_for_worker().