execParallel.c
1/*-------------------------------------------------------------------------
2 *
3 * execParallel.c
4 * Support routines for parallel execution.
5 *
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * This file contains routines that are intended to support setting up,
10 * using, and tearing down a ParallelContext from within the PostgreSQL
11 * executor. The ParallelContext machinery will handle starting the
12 * workers and ensuring that their state generally matches that of the
13 * leader; see src/backend/access/transam/README.parallel for details.
14 * However, we must save and restore relevant executor state, such as
15 * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 * the actual plan to be passed down to the worker.
17 *
18 * IDENTIFICATION
19 * src/backend/executor/execParallel.c
20 *
21 *-------------------------------------------------------------------------
22 */
23
24#include "postgres.h"
25
26#include "executor/execParallel.h"
27#include "executor/executor.h"
28#include "executor/nodeAgg.h"
29#include "executor/nodeAppend.h"
30#include "executor/nodeBitmapHeapscan.h"
31#include "executor/nodeBitmapIndexscan.h"
32#include "executor/nodeCustom.h"
33#include "executor/nodeForeignscan.h"
34#include "executor/nodeHash.h"
35#include "executor/nodeHashjoin.h"
36#include "executor/nodeIncrementalSort.h"
37#include "executor/nodeIndexonlyscan.h"
38#include "executor/nodeIndexscan.h"
39#include "executor/nodeMemoize.h"
40#include "executor/nodeSeqscan.h"
41#include "executor/nodeSort.h"
42#include "executor/nodeSubplan.h"
43#include "executor/tqueue.h"
44#include "jit/jit.h"
45#include "nodes/nodeFuncs.h"
46#include "pgstat.h"
47#include "tcop/tcopprot.h"
48#include "utils/datum.h"
49#include "utils/dsa.h"
50#include "utils/lsyscache.h"
51#include "utils/snapmgr.h"
52
53/*
54 * Magic numbers for parallel executor communication. We use constants
55 * greater than any 32-bit integer here so that values < 2^32 can be used
56 * by individual parallel nodes to store their own state.
57 */
58#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
59#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
60#define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
61#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
62#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
63#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
64#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
65#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
66#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
67#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
68
69#define PARALLEL_TUPLE_QUEUE_SIZE 65536
70
71/*
72 * Fixed-size random stuff that we need to pass to parallel workers.
73 */
74typedef struct FixedParallelExecutorState
75{
76 int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
77 dsa_pointer param_exec;
78 int eflags;
79 int jit_flags;
80} FixedParallelExecutorState;
81
82/*
83 * DSM structure for accumulating per-PlanState instrumentation.
84 *
85 * instrument_options: Same meaning here as in instrument.c.
86 *
87 * instrument_offset: Offset, relative to the start of this structure,
88 * of the first Instrumentation object. This will depend on the length of
89 * the plan_node_id array.
90 *
91 * num_workers: Number of workers.
92 *
93 * num_plan_nodes: Number of plan nodes.
94 *
95 * plan_node_id: Array of plan nodes for which we are gathering instrumentation
96 * from parallel workers. The length of this array is given by num_plan_nodes.
97 */
98struct SharedExecutorInstrumentation
99{
100 int instrument_options;
101 int instrument_offset;
102 int num_workers;
103 int num_plan_nodes;
104 int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
105 /* array of num_plan_nodes * num_workers Instrumentation objects follows */
106};
107#define GetInstrumentationArray(sei) \
108 (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
109 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
110
111/* Context object for ExecParallelEstimate. */
112typedef struct ExecParallelEstimateContext
113{
114 ParallelContext *pcxt;
115 int nnodes;
116} ExecParallelEstimateContext;
117
118/* Context object for ExecParallelInitializeDSM. */
119typedef struct ExecParallelInitializeDSMContext
120{
121 ParallelContext *pcxt;
122 SharedExecutorInstrumentation *instrumentation;
123 int nnodes;
124} ExecParallelInitializeDSMContext;
125
126/* Helper functions that run in the parallel leader. */
127static char *ExecSerializePlan(Plan *plan, EState *estate);
128static bool ExecParallelEstimate(PlanState *planstate,
129 ExecParallelEstimateContext *e);
130static bool ExecParallelInitializeDSM(PlanState *planstate,
131 ExecParallelInitializeDSMContext *d);
132static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
133 bool reinitialize);
134static bool ExecParallelReInitializeDSM(PlanState *planstate,
135 ParallelContext *pcxt);
136static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
137 SharedExecutorInstrumentation *instrumentation);
138
139/* Helper function that runs in the parallel worker. */
140static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
141
142/*
143 * Create a serialized representation of the plan to be sent to each worker.
144 */
145static char *
146ExecSerializePlan(Plan *plan, EState *estate)
147{
148 PlannedStmt *pstmt;
149 ListCell *lc;
150
151 /* We can't scribble on the original plan, so make a copy. */
152 plan = copyObject(plan);
153
154 /*
155 * The worker will start its own copy of the executor, and that copy will
156 * insert a junk filter if the toplevel node has any resjunk entries. We
157 * don't want that to happen, because while resjunk columns shouldn't be
158 * sent back to the user, here the tuples are coming back to another
159 * backend which may very well need them. So mutate the target list
160 * accordingly. This is sort of a hack; there might be better ways to do
161 * this...
162 */
163 foreach(lc, plan->targetlist)
164 {
165 TargetEntry *tle = lfirst_node(TargetEntry, lc);
166
167 tle->resjunk = false;
168 }
169
170 /*
171 * Create a dummy PlannedStmt. Most of the fields don't need to be valid
172 * for our purposes, but the worker will need at least a minimal
173 * PlannedStmt to start the executor.
174 */
175 pstmt = makeNode(PlannedStmt);
176 pstmt->commandType = CMD_SELECT;
177 pstmt->queryId = pgstat_get_my_query_id();
178 pstmt->planId = pgstat_get_my_plan_id();
179 pstmt->hasReturning = false;
180 pstmt->hasModifyingCTE = false;
181 pstmt->canSetTag = true;
182 pstmt->transientPlan = false;
183 pstmt->dependsOnRole = false;
184 pstmt->parallelModeNeeded = false;
185 pstmt->planTree = plan;
186 pstmt->partPruneInfos = estate->es_part_prune_infos;
187 pstmt->rtable = estate->es_range_table;
188 pstmt->unprunableRelids = estate->es_unpruned_relids;
189 pstmt->permInfos = estate->es_rteperminfos;
190 pstmt->resultRelations = NIL;
191 pstmt->appendRelations = NIL;
192
193 /*
194 * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
195 * for unsafe ones (so that the list indexes of the safe ones are
196 * preserved). This positively ensures that the worker won't try to run,
197 * or even do ExecInitNode on, an unsafe subplan. That's important to
198 * protect, eg, non-parallel-aware FDWs from getting into trouble.
199 */
200 pstmt->subplans = NIL;
201 foreach(lc, estate->es_plannedstmt->subplans)
202 {
203 Plan *subplan = (Plan *) lfirst(lc);
204
205 if (subplan && !subplan->parallel_safe)
206 subplan = NULL;
207 pstmt->subplans = lappend(pstmt->subplans, subplan);
208 }
209
210 pstmt->rewindPlanIDs = NULL;
211 pstmt->rowMarks = NIL;
212 pstmt->relationOids = NIL;
213 pstmt->invalItems = NIL; /* workers can't replan anyway... */
214 pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
215 pstmt->utilityStmt = NULL;
216 pstmt->stmt_location = -1;
217 pstmt->stmt_len = -1;
218
219 /* Return serialized copy of our dummy PlannedStmt. */
220 return nodeToString(pstmt);
221}
222
223/*
224 * Parallel-aware plan nodes (and occasionally others) may need some state
225 * which is shared across all parallel workers. Before we size the DSM, give
226 * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
227 * &pcxt->estimator.
228 *
229 * While we're at it, count the number of PlanState nodes in the tree, so
230 * we know how many Instrumentation structures we need.
231 */
232static bool
233ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
234{
235 if (planstate == NULL)
236 return false;
237
238 /* Count this node. */
239 e->nnodes++;
240
241 switch (nodeTag(planstate))
242 {
243 case T_SeqScanState:
244 if (planstate->plan->parallel_aware)
245 ExecSeqScanEstimate((SeqScanState *) planstate,
246 e->pcxt);
247 break;
248 case T_IndexScanState:
249 /* even when not parallel-aware, for EXPLAIN ANALYZE */
250 ExecIndexScanEstimate((IndexScanState *) planstate,
251 e->pcxt);
252 break;
253 case T_IndexOnlyScanState:
254 /* even when not parallel-aware, for EXPLAIN ANALYZE */
255 ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
256 e->pcxt);
257 break;
258 case T_BitmapIndexScanState:
259 /* even when not parallel-aware, for EXPLAIN ANALYZE */
260 ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
261 e->pcxt);
262 break;
263 case T_ForeignScanState:
264 if (planstate->plan->parallel_aware)
265 ExecForeignScanEstimate((ForeignScanState *) planstate,
266 e->pcxt);
267 break;
268 case T_AppendState:
269 if (planstate->plan->parallel_aware)
270 ExecAppendEstimate((AppendState *) planstate,
271 e->pcxt);
272 break;
273 case T_CustomScanState:
274 if (planstate->plan->parallel_aware)
275 ExecCustomScanEstimate((CustomScanState *) planstate,
276 e->pcxt);
277 break;
278 case T_BitmapHeapScanState:
279 if (planstate->plan->parallel_aware)
280 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
281 e->pcxt);
282 break;
283 case T_HashJoinState:
284 if (planstate->plan->parallel_aware)
285 ExecHashJoinEstimate((HashJoinState *) planstate,
286 e->pcxt);
287 break;
288 case T_HashState:
289 /* even when not parallel-aware, for EXPLAIN ANALYZE */
290 ExecHashEstimate((HashState *) planstate, e->pcxt);
291 break;
292 case T_SortState:
293 /* even when not parallel-aware, for EXPLAIN ANALYZE */
294 ExecSortEstimate((SortState *) planstate, e->pcxt);
295 break;
296 case T_IncrementalSortState:
297 /* even when not parallel-aware, for EXPLAIN ANALYZE */
298 ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
299 break;
300 case T_AggState:
301 /* even when not parallel-aware, for EXPLAIN ANALYZE */
302 ExecAggEstimate((AggState *) planstate, e->pcxt);
303 break;
304 case T_MemoizeState:
305 /* even when not parallel-aware, for EXPLAIN ANALYZE */
306 ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
307 break;
308 default:
309 break;
310 }
311
312 return planstate_tree_walker(planstate, ExecParallelEstimate, e);
313}
314
315/*
316 * Estimate the amount of space required to serialize the indicated parameters.
317 */
318static Size
319EstimateParamExecSpace(EState *estate, Bitmapset *params)
320{
321 int paramid;
322 Size sz = sizeof(int);
323
324 paramid = -1;
325 while ((paramid = bms_next_member(params, paramid)) >= 0)
326 {
327 Oid typeOid;
328 int16 typLen;
329 bool typByVal;
330 ParamExecData *prm;
331
332 prm = &(estate->es_param_exec_vals[paramid]);
333 typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
334 paramid);
335
336 sz = add_size(sz, sizeof(int)); /* space for paramid */
337
338 /* space for datum/isnull */
339 if (OidIsValid(typeOid))
340 get_typlenbyval(typeOid, &typLen, &typByVal);
341 else
342 {
343 /* If no type OID, assume by-value, like copyParamList does. */
344 typLen = sizeof(Datum);
345 typByVal = true;
346 }
347 sz = add_size(sz,
348 datumEstimateSpace(prm->value, prm->isnull,
349 typByVal, typLen));
350 }
351 return sz;
352}
353
354/*
355 * Serialize specified PARAM_EXEC parameters.
356 *
357 * We write the number of parameters first, as a 4-byte integer, and then
358 * write details for each parameter in turn. The details for each parameter
359 * consist of a 4-byte paramid (location of param in execution time internal
360 * parameter array) and then the datum as serialized by datumSerialize().
361 */
362static dsa_pointer
363SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
364{
365 Size size;
366 int nparams;
367 int paramid;
368 ParamExecData *prm;
369 dsa_pointer handle;
370 char *start_address;
371
372 /* Allocate enough space for the current parameter values. */
373 size = EstimateParamExecSpace(estate, params);
374 handle = dsa_allocate(area, size);
375 start_address = dsa_get_address(area, handle);
376
377 /* First write the number of parameters as a 4-byte integer. */
378 nparams = bms_num_members(params);
379 memcpy(start_address, &nparams, sizeof(int));
380 start_address += sizeof(int);
381
382 /* Write details for each parameter in turn. */
383 paramid = -1;
384 while ((paramid = bms_next_member(params, paramid)) >= 0)
385 {
386 Oid typeOid;
387 int16 typLen;
388 bool typByVal;
389
390 prm = &(estate->es_param_exec_vals[paramid]);
391 typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
392 paramid);
393
394 /* Write paramid. */
395 memcpy(start_address, &paramid, sizeof(int));
396 start_address += sizeof(int);
397
398 /* Write datum/isnull */
399 if (OidIsValid(typeOid))
400 get_typlenbyval(typeOid, &typLen, &typByVal);
401 else
402 {
403 /* If no type OID, assume by-value, like copyParamList does. */
404 typLen = sizeof(Datum);
405 typByVal = true;
406 }
407 datumSerialize(prm->value, prm->isnull, typByVal, typLen,
408 &start_address);
409 }
410
411 return handle;
412}
413
414/*
415 * Restore specified PARAM_EXEC parameters.
416 */
417static void
418RestoreParamExecParams(char *start_address, EState *estate)
419{
420 int nparams;
421 int i;
422 int paramid;
423
424 memcpy(&nparams, start_address, sizeof(int));
425 start_address += sizeof(int);
426
427 for (i = 0; i < nparams; i++)
428 {
429 ParamExecData *prm;
430
431 /* Read paramid */
432 memcpy(&paramid, start_address, sizeof(int));
433 start_address += sizeof(int);
434 prm = &(estate->es_param_exec_vals[paramid]);
435
436 /* Read datum/isnull. */
437 prm->value = datumRestore(&start_address, &prm->isnull);
438 prm->execPlan = NULL;
439 }
440}
441
442/*
443 * Initialize the dynamic shared memory segment that will be used to control
444 * parallel execution.
445 */
446static bool
447ExecParallelInitializeDSM(PlanState *planstate,
448 ExecParallelInitializeDSMContext *d)
449{
450 if (planstate == NULL)
451 return false;
452
453 /* If instrumentation is enabled, initialize slot for this node. */
454 if (d->instrumentation != NULL)
455 d->instrumentation->plan_node_id[d->nnodes] =
456 planstate->plan->plan_node_id;
457
458 /* Count this node. */
459 d->nnodes++;
460
461 /*
462 * Call initializers for DSM-using plan nodes.
463 *
464 * Most plan nodes won't do anything here, but plan nodes that allocated
465 * DSM may need to initialize shared state in the DSM before parallel
466 * workers are launched. They can allocate the space they previously
467 * estimated using shm_toc_allocate, and add the keys they previously
468 * estimated using shm_toc_insert, in each case targeting pcxt->toc.
469 */
470 switch (nodeTag(planstate))
471 {
472 case T_SeqScanState:
473 if (planstate->plan->parallel_aware)
474 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
475 d->pcxt);
476 break;
477 case T_IndexScanState:
478 /* even when not parallel-aware, for EXPLAIN ANALYZE */
479 ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
480 break;
481 case T_IndexOnlyScanState:
482 /* even when not parallel-aware, for EXPLAIN ANALYZE */
483 ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
484 d->pcxt);
485 break;
486 case T_BitmapIndexScanState:
487 /* even when not parallel-aware, for EXPLAIN ANALYZE */
488 ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
489 break;
490 case T_ForeignScanState:
491 if (planstate->plan->parallel_aware)
492 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
493 d->pcxt);
494 break;
495 case T_AppendState:
496 if (planstate->plan->parallel_aware)
497 ExecAppendInitializeDSM((AppendState *) planstate,
498 d->pcxt);
499 break;
500 case T_CustomScanState:
501 if (planstate->plan->parallel_aware)
502 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
503 d->pcxt);
504 break;
505 case T_BitmapHeapScanState:
506 if (planstate->plan->parallel_aware)
507 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
508 d->pcxt);
509 break;
510 case T_HashJoinState:
511 if (planstate->plan->parallel_aware)
512 ExecHashJoinInitializeDSM((HashJoinState *) planstate,
513 d->pcxt);
514 break;
515 case T_HashState:
516 /* even when not parallel-aware, for EXPLAIN ANALYZE */
517 ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
518 break;
519 case T_SortState:
520 /* even when not parallel-aware, for EXPLAIN ANALYZE */
521 ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
522 break;
523 case T_IncrementalSortState:
524 /* even when not parallel-aware, for EXPLAIN ANALYZE */
525 ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
526 break;
527 case T_AggState:
528 /* even when not parallel-aware, for EXPLAIN ANALYZE */
529 ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
530 break;
531 case T_MemoizeState:
532 /* even when not parallel-aware, for EXPLAIN ANALYZE */
533 ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
534 break;
535 default:
536 break;
537 }
538
539 return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
540}
541
542/*
543 * Set up the response queues that backend workers will use to return
544 * tuples to the main backend.
545 */
546static shm_mq_handle **
547ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
548{
549 shm_mq_handle **responseq;
550 char *tqueuespace;
551 int i;
552
553 /* Skip this if no workers. */
554 if (pcxt->nworkers == 0)
555 return NULL;
556
557 /* Allocate memory for shared memory queue handles. */
558 responseq = (shm_mq_handle **)
559 palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
560
561 /*
562 * If not reinitializing, allocate space from the DSM for the queues;
563 * otherwise, find the already allocated space.
564 */
565 if (!reinitialize)
566 tqueuespace =
567 shm_toc_allocate(pcxt->toc,
568 mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
569 pcxt->nworkers));
570 else
571 tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
572
573 /* Create the queues, and become the receiver for each. */
574 for (i = 0; i < pcxt->nworkers; ++i)
575 {
576 shm_mq *mq;
577
578 mq = shm_mq_create(tqueuespace +
579 ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
580 (Size) PARALLEL_TUPLE_QUEUE_SIZE);
581
582 shm_mq_set_receiver(mq, MyProc);
583 responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
584 }
585
586 /* Add array of queues to shm_toc, so others can find it. */
587 if (!reinitialize)
588 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
589
590 /* Return array of handles. */
591 return responseq;
592}
593
594/*
595 * Sets up the required infrastructure for backend workers to perform
596 * execution and return results to the main backend.
597 */
598ParallelExecutorInfo *
599ExecInitParallelPlan(PlanState *planstate, EState *estate,
600 Bitmapset *sendParams, int nworkers,
601 int64 tuples_needed)
602{
603 ParallelExecutorInfo *pei;
604 ParallelContext *pcxt;
605 ExecParallelEstimateContext e;
606 ExecParallelInitializeDSMContext d;
607 FixedParallelExecutorState *fpes;
608 char *pstmt_data;
609 char *pstmt_space;
610 char *paramlistinfo_space;
611 BufferUsage *bufusage_space;
612 WalUsage *walusage_space;
613 SharedExecutorInstrumentation *instrumentation = NULL;
614 SharedJitInstrumentation *jit_instrumentation = NULL;
615 int pstmt_len;
616 int paramlistinfo_len;
617 int instrumentation_len = 0;
618 int jit_instrumentation_len = 0;
619 int instrument_offset = 0;
620 Size dsa_minsize = dsa_minimum_size();
621 char *query_string;
622 int query_len;
623
624 /*
625 * Force any initplan outputs that we're going to pass to workers to be
626 * evaluated, if they weren't already.
627 *
628 * For simplicity, we use the EState's per-output-tuple ExprContext here.
629 * That risks intra-query memory leakage, since we might pass through here
630 * many times before that ExprContext gets reset; but ExecSetParamPlan
631 * doesn't normally leak any memory in the context (see its comments), so
632 * it doesn't seem worth complicating this function's API to pass it a
633 * shorter-lived ExprContext. This might need to change someday.
634 */
635 ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
636
637 /* Allocate object for return value. */
638 pei = palloc0(sizeof(ParallelExecutorInfo));
639 pei->finished = false;
640 pei->planstate = planstate;
641
642 /* Fix up and serialize plan to be sent to workers. */
643 pstmt_data = ExecSerializePlan(planstate->plan, estate);
644
645 /* Create a parallel context. */
646 pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
647 pei->pcxt = pcxt;
648
649 /*
650 * Before telling the parallel context to create a dynamic shared memory
651 * segment, we need to figure out how big it should be. Estimate space
652 * for the various things we need to store.
653 */
654
655 /* Estimate space for fixed-size state. */
656 shm_toc_estimate_chunk(&pcxt->estimator,
657 sizeof(FixedParallelExecutorState));
658 shm_toc_estimate_keys(&pcxt->estimator, 1);
659
660 /* Estimate space for query text. */
661 query_len = strlen(estate->es_sourceText);
662 shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
663 shm_toc_estimate_keys(&pcxt->estimator, 1);
664
665 /* Estimate space for serialized PlannedStmt. */
666 pstmt_len = strlen(pstmt_data) + 1;
667 shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
668 shm_toc_estimate_keys(&pcxt->estimator, 1);
669
670 /* Estimate space for serialized ParamListInfo. */
671 paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
672 shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
673 shm_toc_estimate_keys(&pcxt->estimator, 1);
674
675 /*
676 * Estimate space for BufferUsage.
677 *
678 * If EXPLAIN is not in use and there are no extensions loaded that care,
679 * we could skip this. But we have no way of knowing whether anyone's
680 * looking at pgBufferUsage, so do it unconditionally.
681 */
682 shm_toc_estimate_chunk(&pcxt->estimator,
683 mul_size(sizeof(BufferUsage), pcxt->nworkers));
684 shm_toc_estimate_keys(&pcxt->estimator, 1);
685
686 /*
687 * Same thing for WalUsage.
688 */
689 shm_toc_estimate_chunk(&pcxt->estimator,
690 mul_size(sizeof(WalUsage), pcxt->nworkers));
691 shm_toc_estimate_keys(&pcxt->estimator, 1);
692
693 /* Estimate space for tuple queues. */
694 shm_toc_estimate_chunk(&pcxt->estimator,
695 mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
696 shm_toc_estimate_keys(&pcxt->estimator, 1);
697
698 /*
699 * Give parallel-aware nodes a chance to add to the estimates, and get a
700 * count of how many PlanState nodes there are.
701 */
702 e.pcxt = pcxt;
703 e.nnodes = 0;
704 ExecParallelEstimate(planstate, &e);
705
706 /* Estimate space for instrumentation, if required. */
707 if (estate->es_instrument)
708 {
709 instrumentation_len =
710 offsetof(SharedExecutorInstrumentation, plan_node_id) +
711 sizeof(int) * e.nnodes;
712 instrumentation_len = MAXALIGN(instrumentation_len);
713 instrument_offset = instrumentation_len;
714 instrumentation_len +=
715 mul_size(sizeof(Instrumentation),
716 mul_size(e.nnodes, nworkers));
717 shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
718 shm_toc_estimate_keys(&pcxt->estimator, 1);
719
720 /* Estimate space for JIT instrumentation, if required. */
721 if (estate->es_jit_flags != PGJIT_NONE)
722 {
723 jit_instrumentation_len =
724 offsetof(SharedJitInstrumentation, jit_instr) +
725 sizeof(JitInstrumentation) * nworkers;
726 shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
727 shm_toc_estimate_keys(&pcxt->estimator, 1);
728 }
729 }
730
731 /* Estimate space for DSA area. */
732 shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
733 shm_toc_estimate_keys(&pcxt->estimator, 1);
734
735 /*
736 * InitializeParallelDSM() passes the active snapshot to the parallel
737 * worker, which uses it to set es_snapshot. Make sure we don't set
738 * es_snapshot differently in the child.
739 */
740 Assert(GetActiveSnapshot() == estate->es_snapshot);
741
742 /* Everyone's had a chance to ask for space, so now create the DSM. */
743 InitializeParallelDSM(pcxt);
744
745 /*
746 * OK, now we have a dynamic shared memory segment, and it should be big
747 * enough to store all of the data we estimated we would want to put into
748 * it, plus whatever general stuff (not specifically executor-related) the
749 * ParallelContext itself needs to store there. None of the space we
750 * asked for has been allocated or initialized yet, though, so do that.
751 */
752
753 /* Store fixed-size state. */
754 fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
755 fpes->tuples_needed = tuples_needed;
756 fpes->param_exec = InvalidDsaPointer;
757 fpes->eflags = estate->es_top_eflags;
758 fpes->jit_flags = estate->es_jit_flags;
759 shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
760
761 /* Store query string */
762 query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
763 memcpy(query_string, estate->es_sourceText, query_len + 1);
764 shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
765
766 /* Store serialized PlannedStmt. */
767 pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
768 memcpy(pstmt_space, pstmt_data, pstmt_len);
769 shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
770
771 /* Store serialized ParamListInfo. */
772 paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
773 shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
774 SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
775
776 /* Allocate space for each worker's BufferUsage; no need to initialize. */
777 bufusage_space = shm_toc_allocate(pcxt->toc,
778 mul_size(sizeof(BufferUsage), pcxt->nworkers));
779 shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
780 pei->buffer_usage = bufusage_space;
781
782 /* Same for WalUsage. */
783 walusage_space = shm_toc_allocate(pcxt->toc,
784 mul_size(sizeof(WalUsage), pcxt->nworkers));
785 shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
786 pei->wal_usage = walusage_space;
787
788 /* Set up the tuple queues that the workers will write into. */
789 pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
790
791 /* We don't need the TupleQueueReaders yet, though. */
792 pei->reader = NULL;
793
794 /*
795 * If instrumentation options were supplied, allocate space for the data.
796 * It only gets partially initialized here; the rest happens during
797 * ExecParallelInitializeDSM.
798 */
799 if (estate->es_instrument)
800 {
801 Instrumentation *instrument;
802 int i;
803
804 instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
805 instrumentation->instrument_options = estate->es_instrument;
806 instrumentation->instrument_offset = instrument_offset;
807 instrumentation->num_workers = nworkers;
808 instrumentation->num_plan_nodes = e.nnodes;
809 instrument = GetInstrumentationArray(instrumentation);
810 for (i = 0; i < nworkers * e.nnodes; ++i)
811 InstrInit(&instrument[i], estate->es_instrument);
812 shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
813 instrumentation);
814 pei->instrumentation = instrumentation;
815
816 if (estate->es_jit_flags != PGJIT_NONE)
817 {
818 jit_instrumentation = shm_toc_allocate(pcxt->toc,
819 jit_instrumentation_len);
820 jit_instrumentation->num_workers = nworkers;
821 memset(jit_instrumentation->jit_instr, 0,
822 sizeof(JitInstrumentation) * nworkers);
823 shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
824 jit_instrumentation);
825 pei->jit_instrumentation = jit_instrumentation;
826 }
827 }
828
829 /*
830 * Create a DSA area that can be used by the leader and all workers.
831 * (However, if we failed to create a DSM and are using private memory
832 * instead, then skip this.)
833 */
834 if (pcxt->seg != NULL)
835 {
836 char *area_space;
837
838 area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
839 shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
840 pei->area = dsa_create_in_place(area_space, dsa_minsize,
841 LWTRANCHE_PARALLEL_QUERY_DSA,
842 pcxt->seg);
843
844 /*
845 * Serialize parameters, if any, using DSA storage. We don't dare use
846 * the main parallel query DSM for this because we might relaunch
847 * workers after the values have changed (and thus the amount of
848 * storage required has changed).
849 */
850 if (!bms_is_empty(sendParams))
851 {
852 pei->param_exec = SerializeParamExecParams(estate, sendParams,
853 pei->area);
854 fpes->param_exec = pei->param_exec;
855 }
856 }
857
858 /*
859 * Give parallel-aware nodes a chance to initialize their shared data.
860 * This also initializes the elements of instrumentation->ps_instrument,
861 * if it exists.
862 */
863 d.pcxt = pcxt;
864 d.instrumentation = instrumentation;
865 d.nnodes = 0;
866
867 /* Install our DSA area while initializing the plan. */
868 estate->es_query_dsa = pei->area;
869 ExecParallelInitializeDSM(planstate, &d);
870 estate->es_query_dsa = NULL;
871
872 /*
873 * Make sure that the world hasn't shifted under our feet. This could
874 * probably just be an Assert(), but let's be conservative for now.
875 */
876 if (e.nnodes != d.nnodes)
877 elog(ERROR, "inconsistent count of PlanState nodes");
878
879 /* OK, we're ready to rock and roll. */
880 return pei;
881}
882
883/*
884 * Set up tuple queue readers to read the results of a parallel subplan.
885 *
886 * This is separate from ExecInitParallelPlan() because we can launch the
887 * worker processes and let them start doing something before we do this.
888 */
889void
890ExecParallelCreateReaders(ParallelExecutorInfo *pei)
891{
892 int nworkers = pei->pcxt->nworkers_launched;
893 int i;
894
895 Assert(pei->reader == NULL);
896
897 if (nworkers > 0)
898 {
899 pei->reader = (TupleQueueReader **)
900 palloc(nworkers * sizeof(TupleQueueReader *));
901
902 for (i = 0; i < nworkers; i++)
903 {
904 shm_mq_set_handle(pei->tqueue[i],
905 pei->pcxt->worker[i].bgwhandle);
906 pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
907 }
908 }
909}
910
911/*
912 * Re-initialize the parallel executor shared memory state before launching
913 * a fresh batch of workers.
914 */
915void
916ExecParallelReinitialize(PlanState *planstate,
917 ParallelExecutorInfo *pei,
918 Bitmapset *sendParams)
919{
920 EState *estate = planstate->state;
921 FixedParallelExecutorState *fpes;
922
923 /* Old workers must already be shut down */
924 Assert(pei->finished);
925
926 /*
927 * Force any initplan outputs that we're going to pass to workers to be
928 * evaluated, if they weren't already (see comments in
929 * ExecInitParallelPlan).
930 */
931 ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
932
933 ReinitializeParallelDSM(pei->pcxt);
934 pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
935 pei->reader = NULL;
936 pei->finished = false;
937
938 fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
939
940 /* Free any serialized parameters from the last round. */
941 if (DsaPointerIsValid(fpes->param_exec))
942 {
943 dsa_free(pei->area, fpes->param_exec);
944 fpes->param_exec = InvalidDsaPointer;
945 }
946
947 /* Serialize current parameter values if required. */
948 if (!bms_is_empty(sendParams))
949 {
950 pei->param_exec = SerializeParamExecParams(estate, sendParams,
951 pei->area);
952 fpes->param_exec = pei->param_exec;
953 }
954
955 /* Traverse plan tree and let each child node reset associated state. */
956 estate->es_query_dsa = pei->area;
957 ExecParallelReInitializeDSM(planstate, pei->pcxt);
958 estate->es_query_dsa = NULL;
959}
960
961/*
962 * Traverse plan tree to reinitialize per-node dynamic shared memory state
963 */
964static bool
965ExecParallelReInitializeDSM(PlanState *planstate,
966 ParallelContext *pcxt)
967{
968 if (planstate == NULL)
969 return false;
970
971 /*
972 * Call reinitializers for DSM-using plan nodes.
973 */
974 switch (nodeTag(planstate))
975 {
976 case T_SeqScanState:
977 if (planstate->plan->parallel_aware)
978 ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
979 pcxt);
980 break;
981 case T_IndexScanState:
982 if (planstate->plan->parallel_aware)
983 ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
984 pcxt);
985 break;
986 case T_IndexOnlyScanState:
987 if (planstate->plan->parallel_aware)
988 ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
989 pcxt);
990 break;
991 case T_ForeignScanState:
992 if (planstate->plan->parallel_aware)
993 ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
994 pcxt);
995 break;
996 case T_AppendState:
997 if (planstate->plan->parallel_aware)
998 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
999 break;
1000 case T_CustomScanState:
1001 if (planstate->plan->parallel_aware)
1002 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
1003 pcxt);
1004 break;
1005 case T_BitmapHeapScanState:
1006 if (planstate->plan->parallel_aware)
1007 ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
1008 pcxt);
1009 break;
1010 case T_HashJoinState:
1011 if (planstate->plan->parallel_aware)
1012 ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1013 pcxt);
1014 break;
1015 case T_BitmapIndexScanState:
1016 case T_HashState:
1017 case T_SortState:
1018 case T_IncrementalSortState:
1019 case T_MemoizeState:
1020 /* these nodes have DSM state, but no reinitialization is required */
1021 break;
1022
1023 default:
1024 break;
1025 }
1026
1027 return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1028}
1029
1030/*
1031 * Copy instrumentation information about this node and its descendants from
1032 * dynamic shared memory.
1033 */
1034static bool
1035ExecParallelRetrieveInstrumentation(PlanState *planstate,
1036 SharedExecutorInstrumentation *instrumentation)
1037{
1038 Instrumentation *instrument;
1039 int i;
1040 int n;
1041 int ibytes;
1042 int plan_node_id = planstate->plan->plan_node_id;
1043 MemoryContext oldcontext;
1044
1045 /* Find the instrumentation for this node. */
1046 for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1047 if (instrumentation->plan_node_id[i] == plan_node_id)
1048 break;
1049 if (i >= instrumentation->num_plan_nodes)
1050 elog(ERROR, "plan node %d not found", plan_node_id);
1051
1052 /* Accumulate the statistics from all workers. */
1053 instrument = GetInstrumentationArray(instrumentation);
1054 instrument += i * instrumentation->num_workers;
1055 for (n = 0; n < instrumentation->num_workers; ++n)
1056 InstrAggNode(planstate->instrument, &instrument[n]);
1057
1058 /*
1059 * Also store the per-worker detail.
1060 *
1061 * Worker instrumentation should be allocated in the same context as the
1062 * regular instrumentation information, which is the per-query context.
1063 * Switch into per-query memory context.
1064 */
1065 oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1066 ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1067 planstate->worker_instrument =
1068 palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1069 MemoryContextSwitchTo(oldcontext);
1070
1071 planstate->worker_instrument->num_workers = instrumentation->num_workers;
1072 memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1073
1074 /* Perform any node-type-specific work that needs to be done. */
1075 switch (nodeTag(planstate))
1076 {
1077 case T_IndexScanState:
1078 ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
1079 break;
1080 case T_IndexOnlyScanState:
1081 ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
1082 break;
1083 case T_BitmapIndexScanState:
1084 ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
1085 break;
1086 case T_SortState:
1087 ExecSortRetrieveInstrumentation((SortState *) planstate);
1088 break;
1089 case T_IncrementalSortState:
1090 ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1091 break;
1092 case T_HashState:
1093 ExecHashRetrieveInstrumentation((HashState *) planstate);
1094 break;
1095 case T_AggState:
1096 ExecAggRetrieveInstrumentation((AggState *) planstate);
1097 break;
1098 case T_MemoizeState:
1099 ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1100 break;
1101 case T_BitmapHeapScanState:
1102 ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
1103 break;
1104 default:
1105 break;
1106 }
1107
1108 return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1109 instrumentation);
1110}
1111
1112/*
1113 * Add up the workers' JIT instrumentation from dynamic shared memory.
1114 */
1115static void
1116ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1117 SharedJitInstrumentation *shared_jit)
1118{
1119 JitInstrumentation *combined;
1120 int ibytes;
1121
1122 int n;
1123
1124 /*
1125 * Accumulate worker JIT instrumentation into the combined JIT
1126 * instrumentation, allocating it if required.
1127 */
1128 if (!planstate->state->es_jit_worker_instr)
1129 planstate->state->es_jit_worker_instr =
1130 MemoryContextAllocZero(TopMemoryContext, sizeof(JitInstrumentation));
1131 combined = planstate->state->es_jit_worker_instr;
1132
1133 /* Accumulate all the workers' instrumentations. */
1134 for (n = 0; n < shared_jit->num_workers; ++n)
1135 InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1136
1137 /*
1138 * Store the per-worker detail.
1139 *
1140 * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1141 * instrumentation in per-query context.
1142 */
1143 ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1144 + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1145 planstate->worker_jit_instrument =
1146 MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1147
1148 memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1149}
1150
1151/*
1152 * Finish parallel execution. We wait for parallel workers to finish, and
1153 * accumulate their buffer/WAL usage.
1154 */
1155void
1156ExecParallelFinish(ParallelExecutorInfo *pei)
1157{
1158 int nworkers = pei->pcxt->nworkers_launched;
1159 int i;
1160
1161 /* Make this be a no-op if called twice in a row. */
1162 if (pei->finished)
1163 return;
1164
1165 /*
1166 * Detach from tuple queues ASAP, so that any still-active workers will
1167 * notice that no further results are wanted.
1168 */
1169 if (pei->tqueue != NULL)
1170 {
1171 for (i = 0; i < nworkers; i++)
1172 shm_mq_detach(pei->tqueue[i]);
1173 pfree(pei->tqueue);
1174 pei->tqueue = NULL;
1175 }
1176
1177 /*
1178 * While we're waiting for the workers to finish, let's get rid of the
1179 * tuple queue readers. (Any other local cleanup could be done here too.)
1180 */
1181 if (pei->reader != NULL)
1182 {
1183 for (i = 0; i < nworkers; i++)
1184 DestroyTupleQueueReader(pei->reader[i]);
1185 pfree(pei->reader);
1186 pei->reader = NULL;
1187 }
1188
1189 /* Now wait for the workers to finish. */
1190 WaitForParallelWorkersToFinish(pei->pcxt);
1191
1192 /*
1193 * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1194 * finish, or we might get incomplete data.)
1195 */
1196 for (i = 0; i < nworkers; i++)
1197 InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1198
1199 pei->finished = true;
1200}
1201
1202/*
1203 * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1204 * resources still exist after ExecParallelFinish. We separate these
1205 * routines because someone might want to examine the contents of the DSM
1206 * after ExecParallelFinish and before calling this routine.
1207 */
1208void
1209ExecParallelCleanup(ParallelExecutorInfo *pei)
1210{
1211 /* Accumulate instrumentation, if any. */
1212 if (pei->instrumentation)
1213 ExecParallelRetrieveInstrumentation(pei->planstate,
1214 pei->instrumentation);
1215
1216 /* Accumulate JIT instrumentation, if any. */
1217 if (pei->jit_instrumentation)
1218 ExecParallelRetrieveJitInstrumentation(pei->planstate,
1219 pei->jit_instrumentation);
1220
1221 /* Free any serialized parameters. */
1222 if (DsaPointerIsValid(pei->param_exec))
1223 {
1224 dsa_free(pei->area, pei->param_exec);
1225 pei->param_exec = InvalidDsaPointer;
1226 }
1227 if (pei->area != NULL)
1228 {
1229 dsa_detach(pei->area);
1230 pei->area = NULL;
1231 }
1232 if (pei->pcxt != NULL)
1233 {
1234 DestroyParallelContext(pei->pcxt);
1235 pei->pcxt = NULL;
1236 }
1237 pfree(pei);
1238}
1239
1240/*
1241 * Create a DestReceiver to write tuples we produce to the shm_mq designated
1242 * for that purpose.
1243 */
1244static DestReceiver *
1245ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1246{
1247 char *mqspace;
1248 shm_mq *mq;
1249
1250 mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1251 mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1252 mq = (shm_mq *) mqspace;
1253 shm_mq_set_sender(mq, MyProc);
1254 return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1255}
1256
1257/*
1258 * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1259 */
1260static QueryDesc *
1261ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1262 int instrument_options)
1263{
1264 char *pstmtspace;
1265 char *paramspace;
1266 PlannedStmt *pstmt;
1267 ParamListInfo paramLI;
1268 char *queryString;
1269
1270 /* Get the query string from shared memory */
1271 queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1272
1273 /* Reconstruct leader-supplied PlannedStmt. */
1274 pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1275 pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1276
1277 /* Reconstruct ParamListInfo. */
1278 paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1279 paramLI = RestoreParamList(&paramspace);
1280
1281 /*
1282 * Create a QueryDesc for the query. We pass NULL for cachedplan, because
1283 * we don't have a pointer to the CachedPlan in the leader's process. It's
1284 * fine because the only reason the executor needs to see it is to decide
1285 * if it should take locks on certain relations, but parallel workers
1286 * always take locks anyway.
1287 */
1288 return CreateQueryDesc(pstmt,
1289 NULL,
1290 queryString,
1291 GetActiveSnapshot(), InvalidSnapshot,
1292 receiver, paramLI, NULL, instrument_options);
1293}
1294
1295/*
1296 * Copy instrumentation information from this node and its descendants into
1297 * dynamic shared memory, so that the parallel leader can retrieve it.
1298 */
1299static bool
1300ExecParallelReportInstrumentation(PlanState *planstate,
1301 SharedExecutorInstrumentation *instrumentation)
1302{
1303 int i;
1304 int plan_node_id = planstate->plan->plan_node_id;
1305 Instrumentation *instrument;
1306
1307 InstrEndLoop(planstate->instrument);
1308
1309 /*
1310 * If we shuffled the plan_node_id values in ps_instrument into sorted
1311 * order, we could use binary search here. This might matter someday if
1312 * we're pushing down sufficiently large plan trees. For now, do it the
1313 * slow, dumb way.
1314 */
1315 for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1316 if (instrumentation->plan_node_id[i] == plan_node_id)
1317 break;
1318 if (i >= instrumentation->num_plan_nodes)
1319 elog(ERROR, "plan node %d not found", plan_node_id);
1320
1321 /*
1322 * Add our statistics to the per-node, per-worker totals. It's possible
1323 * that this could happen more than once if we relaunched workers.
1324 */
1325 instrument = GetInstrumentationArray(instrumentation);
1326 instrument += i * instrumentation->num_workers;
1327 Assert(IsParallelWorker());
1328 Assert(ParallelWorkerNumber < instrumentation->num_workers);
1329 InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1330
1331 return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1332 instrumentation);
1333}
1334
1335/*
1336 * Initialize the PlanState and its descendants with the information
1337 * retrieved from shared memory. This has to be done once the PlanState
1338 * is allocated and initialized by executor; that is, after ExecutorStart().
1339 */
1340static bool
1341ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1342{
1343 if (planstate == NULL)
1344 return false;
1345
1346 switch (nodeTag(planstate))
1347 {
1348 case T_SeqScanState:
1349 if (planstate->plan->parallel_aware)
1350 ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1351 break;
1352 case T_IndexScanState:
1353 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1354 ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
1355 break;
1356 case T_IndexOnlyScanState:
1357 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1358 ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1359 pwcxt);
1360 break;
1361 case T_BitmapIndexScanState:
1362 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1363 ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
1364 pwcxt);
1365 break;
1366 case T_ForeignScanState:
1367 if (planstate->plan->parallel_aware)
1368 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1369 pwcxt);
1370 break;
1371 case T_AppendState:
1372 if (planstate->plan->parallel_aware)
1373 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1374 break;
1375 case T_CustomScanState:
1376 if (planstate->plan->parallel_aware)
1377 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1378 pwcxt);
1379 break;
1380 case T_BitmapHeapScanState:
1381 if (planstate->plan->parallel_aware)
1382 ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1383 pwcxt);
1384 break;
1385 case T_HashJoinState:
1386 if (planstate->plan->parallel_aware)
1387 ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1388 pwcxt);
1389 break;
1390 case T_HashState:
1391 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1392 ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1393 break;
1394 case T_SortState:
1395 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1396 ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1397 break;
1398 case T_IncrementalSortState:
1399 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1400 ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1401 pwcxt);
1402 break;
1403 case T_AggState:
1404 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1405 ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1406 break;
1407 case T_MemoizeState:
1408 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1409 ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1410 break;
1411 default:
1412 break;
1413 }
1414
1415 return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1416 pwcxt);
1417}
1418
1419/*
1420 * Main entrypoint for parallel query worker processes.
1421 *
1422 * We reach this function from ParallelWorkerMain, so the setup necessary to
1423 * create a sensible parallel environment has already been done;
1424 * ParallelWorkerMain worries about stuff like the transaction state, combo
1425 * CID mappings, and GUC values, so we don't need to deal with any of that
1426 * here.
1427 *
1428 * Our job is to deal with concerns specific to the executor. The parallel
1429 * group leader will have stored a serialized PlannedStmt, and it's our job
1430 * to execute that plan and write the resulting tuples to the appropriate
1431 * tuple queue. Various bits of supporting information that we need in order
1432 * to do this are also stored in the dsm_segment and can be accessed through
1433 * the shm_toc.
1434 */
1435void
1436ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1437{
1438 FixedParallelExecutorState *fpes;
1439 BufferUsage *buffer_usage;
1440 WalUsage *wal_usage;
1441 DestReceiver *receiver;
1442 QueryDesc *queryDesc;
1443 SharedExecutorInstrumentation *instrumentation;
1444 SharedJitInstrumentation *jit_instrumentation;
1445 int instrument_options = 0;
1446 void *area_space;
1447 dsa_area *area;
1448 ParallelWorkerContext pwcxt;
1449
1450 /* Get fixed-size state. */
1451 fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1452
1453 /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1454 receiver = ExecParallelGetReceiver(seg, toc);
1455 instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1456 if (instrumentation != NULL)
1457 instrument_options = instrumentation->instrument_options;
1458 jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1459 true);
1460 queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1461
1462 /* Setting debug_query_string for individual workers */
1463 debug_query_string = queryDesc->sourceText;
1464
1465 /* Report workers' query for monitoring purposes */
1466 pgstat_report_activity(STATE_RUNNING, debug_query_string);
1467
1468 /* Attach to the dynamic shared memory area. */
1469 area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1470 area = dsa_attach_in_place(area_space, seg);
1471
1472 /* Start up the executor */
1473 queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1474 if (!ExecutorStart(queryDesc, fpes->eflags))
1475 elog(ERROR, "ExecutorStart() failed unexpectedly");
1476
1477 /* Special executor initialization steps for parallel workers */
1478 queryDesc->planstate->state->es_query_dsa = area;
1479 if (DsaPointerIsValid(fpes->param_exec))
1480 {
1481 char *paramexec_space;
1482
1483 paramexec_space = dsa_get_address(area, fpes->param_exec);
1484 RestoreParamExecParams(paramexec_space, queryDesc->estate);
1485 }
1486 pwcxt.toc = toc;
1487 pwcxt.seg = seg;
1488 ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1489
1490 /* Pass down any tuple bound */
1491 ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1492
1493 /*
1494 * Prepare to track buffer/WAL usage during query execution.
1495 *
1496 * We do this after starting up the executor to match what happens in the
1497 * leader, which also doesn't count buffer accesses and WAL activity that
1498 * occur during executor startup.
1499 */
1500 InstrStartParallelQuery();
1501
1502 /*
1503 * Run the plan. If we specified a tuple bound, be careful not to demand
1504 * more tuples than that.
1505 */
1506 ExecutorRun(queryDesc,
1507 ForwardScanDirection,
1508 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
1509
1510 /* Shut down the executor */
1511 ExecutorFinish(queryDesc);
1512
1513 /* Report buffer/WAL usage during parallel execution. */
1514 buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1515 wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1516 InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1517 &wal_usage[ParallelWorkerNumber]);
1518
1519 /* Report instrumentation data if any instrumentation options are set. */
1520 if (instrumentation != NULL)
1521 ExecParallelReportInstrumentation(queryDesc->planstate,
1522 instrumentation);
1523
1524 /* Report JIT instrumentation data if any */
1525 if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1526 {
1527 Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1528 jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1529 queryDesc->estate->es_jit->instr;
1530 }
1531
1532 /* Must do this after capturing instrumentation. */
1533 ExecutorEnd(queryDesc);
1534
1535 /* Cleanup. */
1536 dsa_detach(area);
1537 FreeQueryDesc(queryDesc);
1538 receiver->rDestroy(receiver);
1539}
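
For context, a minimal sketch of how a Gather-like caller (cf. nodeGather.c) typically drives the leader-side routines defined above; the node, estate, sendParams, nworkers, and tuples_needed variables are placeholders rather than anything declared in this file:

    ParallelExecutorInfo *pei;

    /* Size and populate the DSM, then launch the background workers. */
    pei = ExecInitParallelPlan(outerPlanState(node), estate,
                               sendParams, nworkers, tuples_needed);
    LaunchParallelWorkers(pei->pcxt);
    if (pei->pcxt->nworkers_launched > 0)
        ExecParallelCreateReaders(pei);   /* attach tuple queue readers */

    /* ... fetch tuples via pei->reader[] while also scanning locally ... */

    /* Wait for workers and fold their buffer/WAL usage into the leader. */
    ExecParallelFinish(pei);

    /* On rescan, reuse the same DSM for a fresh batch of workers. */
    ExecParallelReinitialize(outerPlanState(node), pei, sendParams);

    /* At node shutdown: collect instrumentation and release resources. */
    ExecParallelCleanup(pei);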