PostgreSQL Source Code git master
tqueue.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * tqueue.c
4 * Use shm_mq to send & receive tuples between parallel backends
5 *
6 * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
7 * under the hood, writes tuples from the executor to a shm_mq.
8 *
9 * A TupleQueueReader reads tuples from a shm_mq and returns the tuples.
10 *
11 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
13 *
14 * IDENTIFICATION
15 * src/backend/executor/tqueue.c
16 *
17 *-------------------------------------------------------------------------
18 */
19
20#include "postgres.h"
21
22#include "access/htup_details.h"
23#include "executor/tqueue.h"
24
25/*
26 * DestReceiver object's private contents
27 *
28 * queue is a pointer to data supplied by DestReceiver's caller.
29 */
30typedef struct TQueueDestReceiver
31{
32 DestReceiver pub; /* public fields */
33 shm_mq_handle *queue; /* shm_mq to send to */
35
36/*
37 * TupleQueueReader object's private contents
38 *
39 * queue is a pointer to data supplied by reader's caller.
40 *
41 * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
42 */
44{
45 shm_mq_handle *queue; /* shm_mq to receive from */
46};
47
48/*
49 * Receive a tuple from a query, and send it to the designated shm_mq.
50 *
51 * Returns true if successful, false if shm_mq has been detached.
52 */
53static bool
55{
56 TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
57 MinimalTuple tuple;
58 shm_mq_result result;
59 bool should_free;
60
61 /* Send the tuple itself. */
62 tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
63 result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
64
65 if (should_free)
66 pfree(tuple);
67
68 /* Check for failure. */
69 if (result == SHM_MQ_DETACHED)
70 return false;
71 else if (result != SHM_MQ_SUCCESS)
73 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
74 errmsg("could not send tuple to shared-memory queue")));
75
76 return true;
77}
78
79/*
80 * Prepare to receive tuples from executor.
81 */
82static void
83tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
84{
85 /* do nothing */
86}
87
88/*
89 * Clean up at end of an executor run
90 */
91static void
93{
94 TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
95
96 if (tqueue->queue != NULL)
97 shm_mq_detach(tqueue->queue);
98 tqueue->queue = NULL;
99}
100
101/*
102 * Destroy receiver when done with it
103 */
104static void
106{
107 TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
108
109 /* We probably already detached from queue, but let's be sure */
110 if (tqueue->queue != NULL)
111 shm_mq_detach(tqueue->queue);
112 pfree(self);
113}
114
115/*
116 * Create a DestReceiver that writes tuples to a tuple queue.
117 */
120{
121 TQueueDestReceiver *self;
122
124
129 self->pub.mydest = DestTupleQueue;
130 self->queue = handle;
131
132 return (DestReceiver *) self;
133}
134
135/*
136 * Create a tuple queue reader.
137 */
140{
141 TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
142
143 reader->queue = handle;
144
145 return reader;
146}
147
148/*
149 * Destroy a tuple queue reader.
150 *
151 * Note: cleaning up the underlying shm_mq is the caller's responsibility.
152 * We won't access it here, as it may be detached already.
153 */
154void
156{
157 pfree(reader);
158}
159
160/*
161 * Fetch a tuple from a tuple queue reader.
162 *
163 * The return value is NULL if there are no remaining tuples or if
164 * nowait = true and no tuple is ready to return. *done, if not NULL,
165 * is set to true when there are no remaining tuples and otherwise to false.
166 *
167 * The returned tuple, if any, is either in shared memory or a private buffer
168 * and should not be freed. The pointer is invalid after the next call to
169 * TupleQueueReaderNext().
170 *
171 * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
172 * accumulate bytes from a partially-read message, so it's useful to call
173 * this with nowait = true even if nothing is returned.
174 */
176TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
177{
178 MinimalTuple tuple;
179 shm_mq_result result;
180 Size nbytes;
181 void *data;
182
183 if (done != NULL)
184 *done = false;
185
186 /* Attempt to read a message. */
187 result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
188
189 /* If queue is detached, set *done and return NULL. */
190 if (result == SHM_MQ_DETACHED)
191 {
192 if (done != NULL)
193 *done = true;
194 return NULL;
195 }
196
197 /* In non-blocking mode, bail out if no message ready yet. */
198 if (result == SHM_MQ_WOULD_BLOCK)
199 return NULL;
200 Assert(result == SHM_MQ_SUCCESS);
201
202 /*
203 * Return a pointer to the queue memory directly (which had better be
204 * sufficiently aligned).
205 */
206 tuple = (MinimalTuple) data;
207 Assert(tuple->t_len == nbytes);
208
209 return tuple;
210}
size_t Size
Definition: c.h:576
@ DestTupleQueue
Definition: dest.h:98
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1879
Assert(PointerIsAligned(start, uint64))
MinimalTupleData * MinimalTuple
Definition: htup.h:27
void pfree(void *pointer)
Definition: mcxt.c:1524
void * palloc0(Size size)
Definition: mcxt.c:1347
const void * data
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:843
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:572
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:329
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40
DestReceiver pub
Definition: tqueue.c:32
shm_mq_handle * queue
Definition: tqueue.c:33
shm_mq_handle * queue
Definition: tqueue.c:45
void(* rStartup)(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: dest.h:121
void(* rShutdown)(DestReceiver *self)
Definition: dest.h:124
bool(* receiveSlot)(TupleTableSlot *slot, DestReceiver *self)
Definition: dest.h:118
void(* rDestroy)(DestReceiver *self)
Definition: dest.h:126
CommandDest mydest
Definition: dest.h:128
struct TQueueDestReceiver TQueueDestReceiver
static void tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: tqueue.c:83
static void tqueueShutdownReceiver(DestReceiver *self)
Definition: tqueue.c:92
MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Definition: tqueue.c:176
DestReceiver * CreateTupleQueueDestReceiver(shm_mq_handle *handle)
Definition: tqueue.c:119
static bool tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
Definition: tqueue.c:54
TupleQueueReader * CreateTupleQueueReader(shm_mq_handle *handle)
Definition: tqueue.c:139
void DestroyTupleQueueReader(TupleQueueReader *reader)
Definition: tqueue.c:155
static void tqueueDestroyReceiver(DestReceiver *self)
Definition: tqueue.c:105