PostgreSQL Source Code  git master
execAsync.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * execAsync.c
4  * Support routines for asynchronous execution
5  *
6  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * src/backend/executor/execAsync.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "executor/execAsync.h"
18 #include "executor/executor.h"
19 #include "executor/nodeAppend.h"
21 
22 /*
23  * Asynchronously request a tuple from a designed async-capable node.
24  */
25 void
27 {
28  if (areq->requestee->chgParam != NULL) /* something changed? */
29  ExecReScan(areq->requestee); /* let ReScan handle this */
30 
31  /* must provide our own instrumentation support */
32  if (areq->requestee->instrument)
34 
35  switch (nodeTag(areq->requestee))
36  {
37  case T_ForeignScanState:
39  break;
40  default:
41  /* If the node doesn't support async, caller messed up. */
42  elog(ERROR, "unrecognized node type: %d",
43  (int) nodeTag(areq->requestee));
44  }
45 
46  ExecAsyncResponse(areq);
47 
48  /* must provide our own instrumentation support */
49  if (areq->requestee->instrument)
51  TupIsNull(areq->result) ? 0.0 : 1.0);
52 }
53 
54 /*
55  * Give the asynchronous node a chance to configure the file descriptor event
56  * for which it wishes to wait. We expect the node-type specific callback to
57  * make a single call of the following form:
58  *
59  * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
60  */
61 void
63 {
64  /* must provide our own instrumentation support */
65  if (areq->requestee->instrument)
67 
68  switch (nodeTag(areq->requestee))
69  {
70  case T_ForeignScanState:
72  break;
73  default:
74  /* If the node doesn't support async, caller messed up. */
75  elog(ERROR, "unrecognized node type: %d",
76  (int) nodeTag(areq->requestee));
77  }
78 
79  /* must provide our own instrumentation support */
80  if (areq->requestee->instrument)
81  InstrStopNode(areq->requestee->instrument, 0.0);
82 }
83 
84 /*
85  * Call the asynchronous node back when a relevant event has occurred.
86  */
87 void
89 {
90  /* must provide our own instrumentation support */
91  if (areq->requestee->instrument)
93 
94  switch (nodeTag(areq->requestee))
95  {
96  case T_ForeignScanState:
98  break;
99  default:
100  /* If the node doesn't support async, caller messed up. */
101  elog(ERROR, "unrecognized node type: %d",
102  (int) nodeTag(areq->requestee));
103  }
104 
105  ExecAsyncResponse(areq);
106 
107  /* must provide our own instrumentation support */
108  if (areq->requestee->instrument)
110  TupIsNull(areq->result) ? 0.0 : 1.0);
111 }
112 
113 /*
114  * Call the requestor back when an asynchronous node has produced a result.
115  */
116 void
118 {
119  switch (nodeTag(areq->requestor))
120  {
121  case T_AppendState:
123  break;
124  default:
125  /* If the node doesn't support async, caller messed up. */
126  elog(ERROR, "unrecognized node type: %d",
127  (int) nodeTag(areq->requestor));
128  }
129 }
130 
131 /*
132  * A requestee node should call this function to deliver the tuple to its
133  * requestor node. The requestee node can call this from its ExecAsyncRequest
134  * or ExecAsyncNotify callback.
135  */
136 void
138 {
139  areq->request_complete = true;
140  areq->result = result;
141 }
142 
143 /*
144  * A requestee node should call this function to indicate that it is pending
145  * for a callback. The requestee node can call this from its ExecAsyncRequest
146  * or ExecAsyncNotify callback.
147  */
148 void
150 {
151  areq->callback_pending = true;
152  areq->request_complete = false;
153  areq->result = NULL;
154 }
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
void ExecReScan(PlanState *node)
Definition: execAmi.c:76
void ExecAsyncResponse(AsyncRequest *areq)
Definition: execAsync.c:117
void ExecAsyncRequestPending(AsyncRequest *areq)
Definition: execAsync.c:149
void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
Definition: execAsync.c:137
void ExecAsyncRequest(AsyncRequest *areq)
Definition: execAsync.c:26
void ExecAsyncConfigureWait(AsyncRequest *areq)
Definition: execAsync.c:62
void ExecAsyncNotify(AsyncRequest *areq)
Definition: execAsync.c:88
void InstrStartNode(Instrumentation *instr)
Definition: instrument.c:68
void InstrStopNode(Instrumentation *instr, double nTuples)
Definition: instrument.c:84
void ExecAsyncAppendResponse(AsyncRequest *areq)
Definition: nodeAppend.c:1105
void ExecAsyncForeignScanNotify(AsyncRequest *areq)
void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
void ExecAsyncForeignScanRequest(AsyncRequest *areq)
#define nodeTag(nodeptr)
Definition: nodes.h:133
struct PlanState * requestor
Definition: execnodes.h:603
TupleTableSlot * result
Definition: execnodes.h:608
bool request_complete
Definition: execnodes.h:607
bool callback_pending
Definition: execnodes.h:606
struct PlanState * requestee
Definition: execnodes.h:604
Instrumentation * instrument
Definition: execnodes.h:1128
Bitmapset * chgParam
Definition: execnodes.h:1150
#define TupIsNull(slot)
Definition: tuptable.h:306