PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
execParallel.h File Reference
#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "nodes/parsenodes.h"
#include "nodes/plannodes.h"
#include "utils/dsa.h"
Include dependency graph for execParallel.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ParallelExecutorInfo
 

Typedefs

typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation
 
typedef struct ParallelExecutorInfo ParallelExecutorInfo
 

Functions

ParallelExecutorInfo * ExecInitParallelPlan (PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)
 
void ExecParallelCreateReaders (ParallelExecutorInfo *pei)
 
void ExecParallelFinish (ParallelExecutorInfo *pei)
 
void ExecParallelCleanup (ParallelExecutorInfo *pei)
 
void ExecParallelReinitialize (PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)
 
void ParallelQueryMain (dsm_segment *seg, shm_toc *toc)
 

Typedef Documentation

◆ ParallelExecutorInfo

◆ SharedExecutorInstrumentation

Function Documentation

◆ ExecInitParallelPlan()

ParallelExecutorInfo * ExecInitParallelPlan ( PlanState *planstate,
EState *estate,
Bitmapset *sendParams,
int  nworkers,
int64  tuples_needed 
)

Definition at line 599 of file execParallel.c.

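602{
603 ParallelExecutorInfo *pei;
604 ParallelContext *pcxt;
605 ExecParallelEstimateContext e;
606 ExecParallelInitializeDSMContext d;
607 FixedParallelExecutorState *fpes;
608 char *pstmt_data;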
609 char *pstmt_space;
610 char *paramlistinfo_space;
611 BufferUsage *bufusage_space;
612 WalUsage *walusage_space;
613 SharedExecutorInstrumentation *instrumentation = NULL;
614 SharedJitInstrumentation *jit_instrumentation = NULL;
615 int pstmt_len;
616 int paramlistinfo_len;
617 int instrumentation_len = 0;
618 int jit_instrumentation_len = 0;
619 int instrument_offset = 0;
620 Size dsa_minsize = dsa_minimum_size();
621 char *query_string;
622 int query_len;
623
624 /*
625 * Force any initplan outputs that we're going to pass to workers to be
626 * evaluated, if they weren't already.
627 *
628 * For simplicity, we use the EState's per-output-tuple ExprContext here.
629 * That risks intra-query memory leakage, since we might pass through here
630 * many times before that ExprContext gets reset; but ExecSetParamPlan
631 * doesn't normally leak any memory in the context (see its comments), so
632 * it doesn't seem worth complicating this function's API to pass it a
633 * shorter-lived ExprContext. This might need to change someday.
634 */
635 ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
636
637 /* Allocate object for return value. */
638 pei = palloc0(sizeof(ParallelExecutorInfo));
639 pei->finished = false;
640 pei->planstate = planstate;
641
642 /* Fix up and serialize plan to be sent to workers. */
643 pstmt_data = ExecSerializePlan(planstate->plan, estate);
644
645 /* Create a parallel context. */
646 pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
647 pei->pcxt = pcxt;
648
649 /*
650 * Before telling the parallel context to create a dynamic shared memory
651 * segment, we need to figure out how big it should be. Estimate space
652 * for the various things we need to store.
653 */
654
655 /* Estimate space for fixed-size state. */
656 shm_toc_estimate_chunk(&pcxt->estimator,
657 sizeof(FixedParallelExecutorState));
658 shm_toc_estimate_keys(&pcxt->estimator, 1);
659
660 /* Estimate space for query text. */
661 query_len = strlen(estate->es_sourceText);
662 shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
663 shm_toc_estimate_keys(&pcxt->estimator, 1);
664
665 /* Estimate space for serialized PlannedStmt. */
666 pstmt_len = strlen(pstmt_data) + 1;
667 shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
668 shm_toc_estimate_keys(&pcxt->estimator, 1);
669
670 /* Estimate space for serialized ParamListInfo. */
671 paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
672 shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
673 shm_toc_estimate_keys(&pcxt->estimator, 1);
674
675 /*
676 * Estimate space for BufferUsage.
677 *
678 * If EXPLAIN is not in use and there are no extensions loaded that care,
679 * we could skip this. But we have no way of knowing whether anyone's
680 * looking at pgBufferUsage, so do it unconditionally.
681 */
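682 shm_toc_estimate_chunk(&pcxt->estimator,
683 mul_size(sizeof(BufferUsage), pcxt->nworkers));
684 shm_toc_estimate_keys(&pcxt->estimator, 1);
685
686 /*
687 * Same thing for WalUsage.
688 */
689 shm_toc_estimate_chunk(&pcxt->estimator,
690 mul_size(sizeof(WalUsage), pcxt->nworkers));
691 shm_toc_estimate_keys(&pcxt->estimator, 1);
692
693 /* Estimate space for tuple queues. */
694 shm_toc_estimate_chunk(&pcxt->estimator,
695 mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
696 shm_toc_estimate_keys(&pcxt->estimator, 1);
697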
698 /*
699 * Give parallel-aware nodes a chance to add to the estimates, and get a
700 * count of how many PlanState nodes there are.
701 */
702 e.pcxt = pcxt;
703 e.nnodes = 0;
704 ExecParallelEstimate(planstate, &e);
705
706 /* Estimate space for instrumentation, if required. */
707 if (estate->es_instrument)
708 {
709 instrumentation_len =
710 offsetof(SharedExecutorInstrumentation, plan_node_id) +
711 sizeof(int) * e.nnodes;
712 instrumentation_len = MAXALIGN(instrumentation_len);
713 instrument_offset = instrumentation_len;
714 instrumentation_len +=
715 mul_size(sizeof(Instrumentation),
716 mul_size(e.nnodes, nworkers));
717 shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
718 shm_toc_estimate_keys(&pcxt->estimator, 1);
719
720 /* Estimate space for JIT instrumentation, if required. */
721 if (estate->es_jit_flags != PGJIT_NONE)
722 {
723 jit_instrumentation_len =
724 offsetof(SharedJitInstrumentation, jit_instr) +
725 sizeof(JitInstrumentation) * nworkers;
726 shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
727 shm_toc_estimate_keys(&pcxt->estimator, 1);
728 }
729 }
730
731 /* Estimate space for DSA area. */
732 shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
733 shm_toc_estimate_keys(&pcxt->estimator, 1);
734
735 /*
736 * InitializeParallelDSM() passes the active snapshot to the parallel
737 * worker, which uses it to set es_snapshot. Make sure we don't set
738 * es_snapshot differently in the child.
739 */
740 Assert(GetActiveSnapshot() == estate->es_snapshot);
741
742 /* Everyone's had a chance to ask for space, so now create the DSM. */
743 InitializeParallelDSM(pcxt);
744
745 /*
746 * OK, now we have a dynamic shared memory segment, and it should be big
747 * enough to store all of the data we estimated we would want to put into
748 * it, plus whatever general stuff (not specifically executor-related) the
749 * ParallelContext itself needs to store there. None of the space we
750 * asked for has been allocated or initialized yet, though, so do that.
751 */
752
753 /* Store fixed-size state. */
754 fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
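755 fpes->tuples_needed = tuples_needed;
756 fpes->param_exec = InvalidDsaPointer;
757 fpes->eflags = estate->es_top_eflags;
758 fpes->jit_flags = estate->es_jit_flags;
759 shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
760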
761 /* Store query string */
762 query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
763 memcpy(query_string, estate->es_sourceText, query_len + 1);
764 shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
765
766 /* Store serialized PlannedStmt. */
767 pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
768 memcpy(pstmt_space, pstmt_data, pstmt_len);
769 shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
770
771 /* Store serialized ParamListInfo. */
772 paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
773 shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
774 SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
775
776 /* Allocate space for each worker's BufferUsage; no need to initialize. */
777 bufusage_space = shm_toc_allocate(pcxt->toc,
778 mul_size(sizeof(BufferUsage), pcxt->nworkers));
779 shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
780 pei->buffer_usage = bufusage_space;
781
782 /* Same for WalUsage. */
783 walusage_space = shm_toc_allocate(pcxt->toc,
784 mul_size(sizeof(WalUsage), pcxt->nworkers));
785 shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
786 pei->wal_usage = walusage_space;
787
788 /* Set up the tuple queues that the workers will write into. */
789 pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
790
791 /* We don't need the TupleQueueReaders yet, though. */
792 pei->reader = NULL;
793
794 /*
795 * If instrumentation options were supplied, allocate space for the data.
796 * It only gets partially initialized here; the rest happens during
797 * ExecParallelInitializeDSM.
798 */
799 if (estate->es_instrument)
800 {
801 Instrumentation *instrument;
802 int i;
803
804 instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
805 instrumentation->instrument_options = estate->es_instrument;
806 instrumentation->instrument_offset = instrument_offset;
807 instrumentation->num_workers = nworkers;
808 instrumentation->num_plan_nodes = e.nnodes;
809 instrument = GetInstrumentationArray(instrumentation);
810 for (i = 0; i < nworkers * e.nnodes; ++i)
811 InstrInit(&instrument[i], estate->es_instrument);
812 shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
813 instrumentation);
814 pei->instrumentation = instrumentation;
815
816 if (estate->es_jit_flags != PGJIT_NONE)
817 {
818 jit_instrumentation = shm_toc_allocate(pcxt->toc,
819 jit_instrumentation_len);
820 jit_instrumentation->num_workers = nworkers;
821 memset(jit_instrumentation->jit_instr, 0,
822 sizeof(JitInstrumentation) * nworkers);
823 shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
824 jit_instrumentation);
825 pei->jit_instrumentation = jit_instrumentation;
826 }
827 }
828
829 /*
830 * Create a DSA area that can be used by the leader and all workers.
831 * (However, if we failed to create a DSM and are using private memory
832 * instead, then skip this.)
833 */
834 if (pcxt->seg != NULL)
835 {
836 char *area_space;
837
838 area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
839 shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
840 pei->area = dsa_create_in_place(area_space, dsa_minsize,
841 LWTRANCHE_PARALLEL_QUERY_DSA,
842 pcxt->seg);
843
844 /*
845 * Serialize parameters, if any, using DSA storage. We don't dare use
846 * the main parallel query DSM for this because we might relaunch
847 * workers after the values have changed (and thus the amount of
848 * storage required has changed).
849 */
850 if (!bms_is_empty(sendParams))
851 {
852 pei->param_exec = SerializeParamExecParams(estate, sendParams,
853 pei->area);
854 fpes->param_exec = pei->param_exec;
855 }
856 }
857
858 /*
859 * Give parallel-aware nodes a chance to initialize their shared data.
860 * This also initializes the elements of instrumentation->ps_instrument,
861 * if it exists.
862 */
863 d.pcxt = pcxt;
864 d.instrumentation = instrumentation;
865 d.nnodes = 0;
866
867 /* Install our DSA area while initializing the plan. */
868 estate->es_query_dsa = pei->area;
869 ExecParallelInitializeDSM(planstate, &d);
870 estate->es_query_dsa = NULL;
871
872 /*
873 * Make sure that the world hasn't shifted under our feet. This could
874 * probably just be an Assert(), but let's be conservative for now.
875 */
876 if (e.nnodes != d.nnodes)
877 elog(ERROR, "inconsistent count of PlanState nodes");
878
879 /* OK, we're ready to rock and roll. */
880 return pei;
881}
void InitializeParallelDSM(ParallelContext *pcxt)
Definition: parallel.c:211
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
Definition: parallel.c:173
#define bms_is_empty(a)
Definition: bitmapset.h:118
#define MAXALIGN(LEN)
Definition: c.h:782
size_t Size
Definition: c.h:576
size_t dsa_minimum_size(void)
Definition: dsa.c:1196
#define dsa_create_in_place(place, size, tranch_id, segment)
Definition: dsa.h:122
#define InvalidDsaPointer
Definition: dsa.h:78
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define PARALLEL_KEY_BUFFER_USAGE
Definition: execParallel.c:61
#define PARALLEL_KEY_JIT_INSTRUMENTATION
Definition: execParallel.c:66
#define PARALLEL_KEY_PARAMLISTINFO
Definition: execParallel.c:60
#define PARALLEL_TUPLE_QUEUE_SIZE
Definition: execParallel.c:69
static dsa_pointer SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
Definition: execParallel.c:363
#define PARALLEL_KEY_INSTRUMENTATION
Definition: execParallel.c:63
static shm_mq_handle ** ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
Definition: execParallel.c:547
#define PARALLEL_KEY_PLANNEDSTMT
Definition: execParallel.c:59
static bool ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
Definition: execParallel.c:233
#define GetInstrumentationArray(sei)
Definition: execParallel.c:107
#define PARALLEL_KEY_DSA
Definition: execParallel.c:64
#define PARALLEL_KEY_EXECUTOR_FIXED
Definition: execParallel.c:58
static char * ExecSerializePlan(Plan *plan, EState *estate)
Definition: execParallel.c:146
#define PARALLEL_KEY_QUERY_TEXT
Definition: execParallel.c:65
#define PARALLEL_KEY_WAL_USAGE
Definition: execParallel.c:67
static bool ExecParallelInitializeDSM(PlanState *planstate, ExecParallelInitializeDSMContext *d)
Definition: execParallel.c:447
#define GetPerTupleExprContext(estate)
Definition: executor.h:678
Assert(PointerIsAligned(start, uint64))
void InstrInit(Instrumentation *instr, int instrument_options)
Definition: instrument.c:58
int i
Definition: isn.c:77
struct JitInstrumentation JitInstrumentation
#define PGJIT_NONE
Definition: jit.h:19
@ LWTRANCHE_PARALLEL_QUERY_DSA
Definition: lwlock.h:200
void * palloc0(Size size)
Definition: mcxt.c:1973
void ExecSetParamPlanMulti(const Bitmapset *params, ExprContext *econtext)
Definition: nodeSubplan.c:1276
Size EstimateParamListSpace(ParamListInfo paramLI)
Definition: params.c:167
void SerializeParamList(ParamListInfo paramLI, char **start_address)
Definition: params.c:229
e
Definition: preproc-init.c:82
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:787
struct dsa_area * es_query_dsa
Definition: execnodes.h:751
int es_top_eflags
Definition: execnodes.h:717
int es_instrument
Definition: execnodes.h:718
ParamListInfo es_param_list_info
Definition: execnodes.h:702
int es_jit_flags
Definition: execnodes.h:762
const char * es_sourceText
Definition: execnodes.h:675
Snapshot es_snapshot
Definition: execnodes.h:657
SharedExecutorInstrumentation * instrumentation
Definition: execParallel.c:122
dsm_segment * seg
Definition: parallel.h:42
shm_toc_estimator estimator
Definition: parallel.h:41
shm_toc * toc
Definition: parallel.h:44
PlanState * planstate
Definition: execParallel.h:26
struct SharedJitInstrumentation * jit_instrumentation
Definition: execParallel.h:31
BufferUsage * buffer_usage
Definition: execParallel.h:28
dsa_pointer param_exec
Definition: execParallel.h:33
ParallelContext * pcxt
Definition: execParallel.h:27
WalUsage * wal_usage
Definition: execParallel.h:29
shm_mq_handle ** tqueue
Definition: execParallel.h:36
SharedExecutorInstrumentation * instrumentation
Definition: execParallel.h:30
struct TupleQueueReader ** reader
Definition: execParallel.h:37
Plan * plan
Definition: execnodes.h:1159
JitInstrumentation jit_instr[FLEXIBLE_ARRAY_MEMBER]
Definition: jit.h:54

References ParallelExecutorInfo::area, Assert(), bms_is_empty, ParallelExecutorInfo::buffer_usage, CreateParallelContext(), dsa_create_in_place, dsa_minimum_size(), FixedParallelExecutorState::eflags, elog, ERROR, EState::es_instrument, EState::es_jit_flags, EState::es_param_list_info, EState::es_query_dsa, EState::es_snapshot, EState::es_sourceText, EState::es_top_eflags, EstimateParamListSpace(), ParallelContext::estimator, ExecParallelEstimate(), ExecParallelInitializeDSM(), ExecParallelSetupTupleQueues(), ExecSerializePlan(), ExecSetParamPlanMulti(), ParallelExecutorInfo::finished, GetActiveSnapshot(), GetInstrumentationArray, GetPerTupleExprContext, i, InitializeParallelDSM(), InstrInit(), SharedExecutorInstrumentation::instrument_offset, SharedExecutorInstrumentation::instrument_options, ExecParallelInitializeDSMContext::instrumentation, ParallelExecutorInfo::instrumentation, InvalidDsaPointer, FixedParallelExecutorState::jit_flags, SharedJitInstrumentation::jit_instr, ParallelExecutorInfo::jit_instrumentation, LWTRANCHE_PARALLEL_QUERY_DSA, MAXALIGN, mul_size(), ExecParallelInitializeDSMContext::nnodes, SharedExecutorInstrumentation::num_plan_nodes, SharedExecutorInstrumentation::num_workers, SharedJitInstrumentation::num_workers, ParallelContext::nworkers, palloc0(), PARALLEL_KEY_BUFFER_USAGE, PARALLEL_KEY_DSA, PARALLEL_KEY_EXECUTOR_FIXED, PARALLEL_KEY_INSTRUMENTATION, PARALLEL_KEY_JIT_INSTRUMENTATION, PARALLEL_KEY_PARAMLISTINFO, PARALLEL_KEY_PLANNEDSTMT, PARALLEL_KEY_QUERY_TEXT, PARALLEL_KEY_WAL_USAGE, PARALLEL_TUPLE_QUEUE_SIZE, FixedParallelExecutorState::param_exec, ParallelExecutorInfo::param_exec, ExecParallelInitializeDSMContext::pcxt, ParallelExecutorInfo::pcxt, PGJIT_NONE, PlanState::plan, ParallelExecutorInfo::planstate, ParallelExecutorInfo::reader, ParallelContext::seg, SerializeParamExecParams(), SerializeParamList(), shm_toc_allocate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_insert(), ParallelContext::toc, 
ParallelExecutorInfo::tqueue, FixedParallelExecutorState::tuples_needed, and ParallelExecutorInfo::wal_usage.

Referenced by ExecGather(), and ExecGatherMerge().

◆ ExecParallelCleanup()

void ExecParallelCleanup ( ParallelExecutorInfo *pei)

Definition at line 1209 of file execParallel.c.

1210{
1211 /* Accumulate instrumentation, if any. */
1212 if (pei->instrumentation)
1213 ExecParallelRetrieveInstrumentation(pei->planstate,
1214 pei->instrumentation);
1215
1216 /* Accumulate JIT instrumentation, if any. */
1217 if (pei->jit_instrumentation)
1218 ExecParallelRetrieveJitInstrumentation(pei->planstate,
1219 pei->jit_instrumentation);
1220
1221 /* Free any serialized parameters. */
1222 if (DsaPointerIsValid(pei->param_exec))
1223 {
1224 dsa_free(pei->area, pei->param_exec);
1225 pei->param_exec = InvalidDsaPointer;
1226 }
1227 if (pei->area != NULL)
1228 {
1229 dsa_detach(pei->area);
1230 pei->area = NULL;
1231 }
1232 if (pei->pcxt != NULL)
1233 {
1234 DestroyParallelContext(pei->pcxt);
1235 pei->pcxt = NULL;
1236 }
1237 pfree(pei);
1238}
void DestroyParallelContext(ParallelContext *pcxt)
Definition: parallel.c:950
void dsa_detach(dsa_area *area)
Definition: dsa.c:1952
void dsa_free(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:826
#define DsaPointerIsValid(x)
Definition: dsa.h:106
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation)
static void ExecParallelRetrieveJitInstrumentation(PlanState *planstate, SharedJitInstrumentation *shared_jit)
void pfree(void *pointer)
Definition: mcxt.c:2150

References ParallelExecutorInfo::area, DestroyParallelContext(), dsa_detach(), dsa_free(), DsaPointerIsValid, ExecParallelRetrieveInstrumentation(), ExecParallelRetrieveJitInstrumentation(), ParallelExecutorInfo::instrumentation, InvalidDsaPointer, ParallelExecutorInfo::jit_instrumentation, ParallelExecutorInfo::param_exec, ParallelExecutorInfo::pcxt, pfree(), and ParallelExecutorInfo::planstate.

Referenced by ExecShutdownGather(), and ExecShutdownGatherMerge().

◆ ExecParallelCreateReaders()

void ExecParallelCreateReaders ( ParallelExecutorInfo *pei)

Definition at line 890 of file execParallel.c.

891{
892 int nworkers = pei->pcxt->nworkers_launched;
893 int i;
894
895 Assert(pei->reader == NULL);
896
897 if (nworkers > 0)
898 {
899 pei->reader = (TupleQueueReader **)
900 palloc(nworkers * sizeof(TupleQueueReader *));
901
902 for (i = 0; i < nworkers; i++)
903 {
904 shm_mq_set_handle(pei->tqueue[i],
905 pei->pcxt->worker[i].bgwhandle);
906 pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
907 }
908 }
909}
void * palloc(Size size)
Definition: mcxt.c:1943
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:319
ParallelWorkerInfo * worker
Definition: parallel.h:45
int nworkers_launched
Definition: parallel.h:37
BackgroundWorkerHandle * bgwhandle
Definition: parallel.h:27
TupleQueueReader * CreateTupleQueueReader(shm_mq_handle *handle)
Definition: tqueue.c:139

References Assert(), ParallelWorkerInfo::bgwhandle, CreateTupleQueueReader(), i, ParallelContext::nworkers_launched, palloc(), ParallelExecutorInfo::pcxt, ParallelExecutorInfo::reader, shm_mq_set_handle(), ParallelExecutorInfo::tqueue, and ParallelContext::worker.

Referenced by ExecGather(), and ExecGatherMerge().

◆ ExecParallelFinish()

void ExecParallelFinish ( ParallelExecutorInfo *pei)

Definition at line 1156 of file execParallel.c.

1157{
1158 int nworkers = pei->pcxt->nworkers_launched;
1159 int i;
1160
1161 /* Make this be a no-op if called twice in a row. */
1162 if (pei->finished)
1163 return;
1164
1165 /*
1166 * Detach from tuple queues ASAP, so that any still-active workers will
1167 * notice that no further results are wanted.
1168 */
1169 if (pei->tqueue != NULL)
1170 {
1171 for (i = 0; i < nworkers; i++)
1172 shm_mq_detach(pei->tqueue[i]);
1173 pfree(pei->tqueue);
1174 pei->tqueue = NULL;
1175 }
1176
1177 /*
1178 * While we're waiting for the workers to finish, let's get rid of the
1179 * tuple queue readers. (Any other local cleanup could be done here too.)
1180 */
1181 if (pei->reader != NULL)
1182 {
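1183 for (i = 0; i < nworkers; i++)
1184 DestroyTupleQueueReader(pei->reader[i]);
1185 pfree(pei->reader);
1186 pei->reader = NULL;
1187 }
1188
1189 /* Now wait for the workers to finish. */
1190 WaitForParallelWorkersToFinish(pei->pcxt);
1191
1192 /*
1193 * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1194 * finish, or we might get incomplete data.)
1195 */
1196 for (i = 0; i < nworkers; i++)
1197 InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1198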
1199 pei->finished = true;
1200}
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
Definition: parallel.c:796
void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
Definition: instrument.c:218
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:843
void DestroyTupleQueueReader(TupleQueueReader *reader)
Definition: tqueue.c:155

References ParallelExecutorInfo::buffer_usage, DestroyTupleQueueReader(), ParallelExecutorInfo::finished, i, InstrAccumParallelQuery(), ParallelContext::nworkers_launched, ParallelExecutorInfo::pcxt, pfree(), ParallelExecutorInfo::reader, shm_mq_detach(), ParallelExecutorInfo::tqueue, WaitForParallelWorkersToFinish(), and ParallelExecutorInfo::wal_usage.

Referenced by ExecShutdownGatherMergeWorkers(), and ExecShutdownGatherWorkers().

◆ ExecParallelReinitialize()

void ExecParallelReinitialize ( PlanState *planstate,
ParallelExecutorInfo *pei,
Bitmapset *sendParams 
)

Definition at line 916 of file execParallel.c.

919{
920 EState *estate = planstate->state;
921 FixedParallelExecutorState *fpes;
922
923 /* Old workers must already be shut down */
924 Assert(pei->finished);
925
926 /*
927 * Force any initplan outputs that we're going to pass to workers to be
928 * evaluated, if they weren't already (see comments in
929 * ExecInitParallelPlan).
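930 */
931 ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
932
933 ReinitializeParallelDSM(pei->pcxt);
934 pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
935 pei->reader = NULL;
936 pei->finished = false;
937
938 fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
939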
940 /* Free any serialized parameters from the last round. */
941 if (DsaPointerIsValid(fpes->param_exec))
942 {
943 dsa_free(pei->area, fpes->param_exec);
944 fpes->param_exec = InvalidDsaPointer;
945 }
946
947 /* Serialize current parameter values if required. */
948 if (!bms_is_empty(sendParams))
949 {
950 pei->param_exec = SerializeParamExecParams(estate, sendParams,
951 pei->area);
952 fpes->param_exec = pei->param_exec;
953 }
954
955 /* Traverse plan tree and let each child node reset associated state. */
956 estate->es_query_dsa = pei->area;
957 ExecParallelReInitializeDSM(planstate, pei->pcxt);
958 estate->es_query_dsa = NULL;
959}
void ReinitializeParallelDSM(ParallelContext *pcxt)
Definition: parallel.c:508
static bool ExecParallelReInitializeDSM(PlanState *planstate, ParallelContext *pcxt)
Definition: execParallel.c:965
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
EState * state
Definition: execnodes.h:1161

References ParallelExecutorInfo::area, Assert(), bms_is_empty, dsa_free(), DsaPointerIsValid, EState::es_query_dsa, ExecParallelReInitializeDSM(), ExecParallelSetupTupleQueues(), ExecSetParamPlanMulti(), ParallelExecutorInfo::finished, GetPerTupleExprContext, InvalidDsaPointer, PARALLEL_KEY_EXECUTOR_FIXED, FixedParallelExecutorState::param_exec, ParallelExecutorInfo::param_exec, ParallelExecutorInfo::pcxt, ParallelExecutorInfo::reader, ReinitializeParallelDSM(), SerializeParamExecParams(), shm_toc_lookup(), PlanState::state, ParallelContext::toc, and ParallelExecutorInfo::tqueue.

Referenced by ExecGather(), and ExecGatherMerge().

◆ ParallelQueryMain()

void ParallelQueryMain ( dsm_segment *seg,
shm_toc *toc 
)

Definition at line 1436 of file execParallel.c.

1437{
1438 FixedParallelExecutorState *fpes;
1439 BufferUsage *buffer_usage;
1440 WalUsage *wal_usage;
1441 DestReceiver *receiver;
1442 QueryDesc *queryDesc;
1443 SharedExecutorInstrumentation *instrumentation;
1444 SharedJitInstrumentation *jit_instrumentation;
1445 int instrument_options = 0;
1446 void *area_space;
1447 dsa_area *area;
1448 ParallelWorkerContext pwcxt;
1449
1450 /* Get fixed-size state. */
1451 fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1452
1453 /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1454 receiver = ExecParallelGetReceiver(seg, toc);
1455 instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1456 if (instrumentation != NULL)
1457 instrument_options = instrumentation->instrument_options;
1458 jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1459 true);
1460 queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1461
1462 /* Setting debug_query_string for individual workers */
1463 debug_query_string = queryDesc->sourceText;
1464
1465 /* Report workers' query for monitoring purposes */
1466 pgstat_report_activity(STATE_RUNNING, debug_query_string);
1467
1468 /* Attach to the dynamic shared memory area. */
1469 area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1470 area = dsa_attach_in_place(area_space, seg);
1471
1472 /* Start up the executor */
1473 queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1474 if (!ExecutorStart(queryDesc, fpes->eflags))
1475 elog(ERROR, "ExecutorStart() failed unexpectedly");
1476
1477 /* Special executor initialization steps for parallel workers */
1478 queryDesc->planstate->state->es_query_dsa = area;
1479 if (DsaPointerIsValid(fpes->param_exec))
1480 {
1481 char *paramexec_space;
1482
1483 paramexec_space = dsa_get_address(area, fpes->param_exec);
1484 RestoreParamExecParams(paramexec_space, queryDesc->estate);
1485 }
1486 pwcxt.toc = toc;
1487 pwcxt.seg = seg;
1488 ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1489
1490 /* Pass down any tuple bound */
1491 ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1492
1493 /*
1494 * Prepare to track buffer/WAL usage during query execution.
1495 *
1496 * We do this after starting up the executor to match what happens in the
1497 * leader, which also doesn't count buffer accesses and WAL activity that
1498 * occur during executor startup.
1499 */
1500 InstrStartParallelQuery();
1501
1502 /*
1503 * Run the plan. If we specified a tuple bound, be careful not to demand
1504 * more tuples than that.
1505 */
1506 ExecutorRun(queryDesc,
1507 ForwardScanDirection,
1508 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
1509
1510 /* Shut down the executor */
1511 ExecutorFinish(queryDesc);
1512
1513 /* Report buffer/WAL usage during parallel execution. */
1514 buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1515 wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1516 InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1517 &wal_usage[ParallelWorkerNumber]);
1518
1519 /* Report instrumentation data if any instrumentation options are set. */
1520 if (instrumentation != NULL)
1521 ExecParallelReportInstrumentation(queryDesc->planstate,
1522 instrumentation);
1523
1524 /* Report JIT instrumentation data if any */
1525 if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1526 {
1527 Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1528 jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1529 queryDesc->estate->es_jit->instr;
1530 }
1531
1532 /* Must do this after capturing instrumentation. */
1533 ExecutorEnd(queryDesc);
1534
1535 /* Cleanup. */
1536 dsa_detach(area);
1537 FreeQueryDesc(queryDesc);
1538 receiver->rDestroy(receiver);
1539}
int ParallelWorkerNumber
Definition: parallel.c:115
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
int64_t int64
Definition: c.h:499
dsa_area * dsa_attach_in_place(void *place, dsm_segment *segment)
Definition: dsa.c:545
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
Definition: dsa.c:942
bool ExecutorStart(QueryDesc *queryDesc, int eflags)
Definition: execMain.c:128
void ExecutorEnd(QueryDesc *queryDesc)
Definition: execMain.c:538
void ExecutorFinish(QueryDesc *queryDesc)
Definition: execMain.c:475
void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
Definition: execMain.c:365
static QueryDesc * ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, int instrument_options)
static DestReceiver * ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
static bool ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
static bool ExecParallelReportInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation)
static void RestoreParamExecParams(char *start_address, EState *estate)
Definition: execParallel.c:418
void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
Definition: execProcnode.c:848
void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
Definition: instrument.c:208
void InstrStartParallelQuery(void)
Definition: instrument.c:200
const char * debug_query_string
Definition: postgres.c:88
void FreeQueryDesc(QueryDesc *qdesc)
Definition: pquery.c:112
@ ForwardScanDirection
Definition: sdir.h:28
struct JitContext * es_jit
Definition: execnodes.h:763
JitInstrumentation instr
Definition: jit.h:62
dsm_segment * seg
Definition: parallel.h:52
int jitFlags
Definition: plannodes.h:80
const char * sourceText
Definition: execdesc.h:39
EState * estate
Definition: execdesc.h:49
PlannedStmt * plannedstmt
Definition: execdesc.h:37
PlanState * planstate
Definition: execdesc.h:50
void(* rDestroy)(DestReceiver *self)
Definition: dest.h:126
Definition: dsa.c:348

References Assert(), debug_query_string, dsa_attach_in_place(), dsa_detach(), dsa_get_address(), DsaPointerIsValid, FixedParallelExecutorState::eflags, elog, ERROR, EState::es_jit, EState::es_query_dsa, QueryDesc::estate, ExecParallelGetQueryDesc(), ExecParallelGetReceiver(), ExecParallelInitializeWorker(), ExecParallelReportInstrumentation(), ExecSetTupleBound(), ExecutorEnd(), ExecutorFinish(), ExecutorRun(), ExecutorStart(), ForwardScanDirection, FreeQueryDesc(), JitContext::instr, InstrEndParallelQuery(), InstrStartParallelQuery(), SharedExecutorInstrumentation::instrument_options, FixedParallelExecutorState::jit_flags, SharedJitInstrumentation::jit_instr, PlannedStmt::jitFlags, PARALLEL_KEY_BUFFER_USAGE, PARALLEL_KEY_DSA, PARALLEL_KEY_EXECUTOR_FIXED, PARALLEL_KEY_INSTRUMENTATION, PARALLEL_KEY_JIT_INSTRUMENTATION, PARALLEL_KEY_WAL_USAGE, ParallelWorkerNumber, FixedParallelExecutorState::param_exec, pgstat_report_activity(), QueryDesc::plannedstmt, QueryDesc::planstate, _DestReceiver::rDestroy, RestoreParamExecParams(), ParallelWorkerContext::seg, shm_toc_lookup(), QueryDesc::sourceText, PlanState::state, STATE_RUNNING, ParallelWorkerContext::toc, and FixedParallelExecutorState::tuples_needed.