PostgreSQL Source Code  git master
tqueue.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * tqueue.c
4  * Use shm_mq to send & receive tuples between parallel backends
5  *
6  * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
7  * under the hood, writes tuples from the executor to a shm_mq.
8  *
9  * A TupleQueueReader reads tuples from a shm_mq and returns the tuples.
10  *
11  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  * IDENTIFICATION
15  * src/backend/executor/tqueue.c
16  *
17  *-------------------------------------------------------------------------
18  */
19 
20 #include "postgres.h"
21 
22 #include "access/htup_details.h"
23 #include "executor/tqueue.h"
24 
25 /*
26  * DestReceiver object's private contents
27  *
28  * queue is a pointer to data supplied by DestReceiver's caller.
29  */
30 typedef struct TQueueDestReceiver
31 {
32  DestReceiver pub; /* public fields */
33  shm_mq_handle *queue; /* shm_mq to send to */
35 
36 /*
37  * TupleQueueReader object's private contents
38  *
39  * queue is a pointer to data supplied by reader's caller.
40  *
41  * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
42  */
44 {
45  shm_mq_handle *queue; /* shm_mq to receive from */
46 };
47 
48 /*
49  * Receive a tuple from a query, and send it to the designated shm_mq.
50  *
51  * Returns true if successful, false if shm_mq has been detached.
52  */
53 static bool
55 {
56  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
57  HeapTuple tuple;
58  shm_mq_result result;
59  bool should_free;
60 
61  /* Send the tuple itself. */
62  tuple = ExecFetchSlotHeapTuple(slot, true, &should_free);
63  result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
64 
65  if (should_free)
66  heap_freetuple(tuple);
67 
68  /* Check for failure. */
69  if (result == SHM_MQ_DETACHED)
70  return false;
71  else if (result != SHM_MQ_SUCCESS)
72  ereport(ERROR,
73  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
74  errmsg("could not send tuple to shared-memory queue")));
75 
76  return true;
77 }
78 
79 /*
80  * Prepare to receive tuples from executor.
81  */
82 static void
83 tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
84 {
85  /* do nothing */
86 }
87 
88 /*
89  * Clean up at end of an executor run
90  */
91 static void
93 {
94  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
95 
96  if (tqueue->queue != NULL)
97  shm_mq_detach(tqueue->queue);
98  tqueue->queue = NULL;
99 }
100 
101 /*
102  * Destroy receiver when done with it
103  */
104 static void
106 {
107  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
108 
109  /* We probably already detached from queue, but let's be sure */
110  if (tqueue->queue != NULL)
111  shm_mq_detach(tqueue->queue);
112  pfree(self);
113 }
114 
115 /*
116  * Create a DestReceiver that writes tuples to a tuple queue.
117  */
118 DestReceiver *
120 {
121  TQueueDestReceiver *self;
122 
123  self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
124 
125  self->pub.receiveSlot = tqueueReceiveSlot;
126  self->pub.rStartup = tqueueStartupReceiver;
127  self->pub.rShutdown = tqueueShutdownReceiver;
128  self->pub.rDestroy = tqueueDestroyReceiver;
129  self->pub.mydest = DestTupleQueue;
130  self->queue = handle;
131 
132  return (DestReceiver *) self;
133 }
134 
135 /*
136  * Create a tuple queue reader.
137  */
140 {
141  TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
142 
143  reader->queue = handle;
144 
145  return reader;
146 }
147 
148 /*
149  * Destroy a tuple queue reader.
150  *
151  * Note: cleaning up the underlying shm_mq is the caller's responsibility.
152  * We won't access it here, as it may be detached already.
153  */
154 void
156 {
157  pfree(reader);
158 }
159 
160 /*
161  * Fetch a tuple from a tuple queue reader.
162  *
163  * The return value is NULL if there are no remaining tuples or if
164  * nowait = true and no tuple is ready to return. *done, if not NULL,
165  * is set to true when there are no remaining tuples and otherwise to false.
166  *
167  * The returned tuple, if any, is allocated in CurrentMemoryContext.
168  * Note that this routine must not leak memory! (We used to allow that,
169  * but not any more.)
170  *
171  * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
172  * accumulate bytes from a partially-read message, so it's useful to call
173  * this with nowait = true even if nothing is returned.
174  */
175 HeapTuple
176 TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
177 {
178  HeapTupleData htup;
179  shm_mq_result result;
180  Size nbytes;
181  void *data;
182 
183  if (done != NULL)
184  *done = false;
185 
186  /* Attempt to read a message. */
187  result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
188 
189  /* If queue is detached, set *done and return NULL. */
190  if (result == SHM_MQ_DETACHED)
191  {
192  if (done != NULL)
193  *done = true;
194  return NULL;
195  }
196 
197  /* In non-blocking mode, bail out if no message ready yet. */
198  if (result == SHM_MQ_WOULD_BLOCK)
199  return NULL;
200  Assert(result == SHM_MQ_SUCCESS);
201 
202  /*
203  * Set up a dummy HeapTupleData pointing to the data from the shm_mq
204  * (which had better be sufficiently aligned).
205  */
207  htup.t_tableOid = InvalidOid;
208  htup.t_len = nbytes;
209  htup.t_data = data;
210 
211  return heap_copytuple(&htup);
212 }
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:680
static void tqueueDestroyReceiver(DestReceiver *self)
Definition: tqueue.c:105
void DestroyTupleQueueReader(TupleQueueReader *reader)
Definition: tqueue.c:155
static void tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: tqueue.c:83
shm_mq_handle * queue
Definition: tqueue.c:45
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:793
HeapTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Definition: tqueue.c:176
int errcode(int sqlerrcode)
Definition: elog.c:608
static bool tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
Definition: tqueue.c:54
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
HeapTupleHeader t_data
Definition: htup.h:68
void pfree(void *pointer)
Definition: mcxt.c:1056
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
Oid t_tableOid
Definition: htup.h:66
#define ereport(elevel, rest)
Definition: elog.h:141
HeapTuple ExecFetchSlotHeapTuple(TupleTableSlot *slot, bool materialize, bool *shouldFree)
Definition: execTuples.c:1614
TupleQueueReader * CreateTupleQueueReader(shm_mq_handle *handle)
Definition: tqueue.c:139
void * palloc0(Size size)
Definition: mcxt.c:980
#define InvalidOid
Definition: postgres_ext.h:36
shm_mq_result
Definition: shm_mq.h:36
shm_mq_handle * queue
Definition: tqueue.c:33
#define Assert(condition)
Definition: c.h:739
size_t Size
Definition: c.h:467
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
Definition: shm_mq.c:320
static void tqueueShutdownReceiver(DestReceiver *self)
Definition: tqueue.c:92
struct TQueueDestReceiver TQueueDestReceiver
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
int errmsg(const char *fmt,...)
Definition: elog.c:822
DestReceiver pub
Definition: tqueue.c:32
DestReceiver * CreateTupleQueueDestReceiver(shm_mq_handle *handle)
Definition: tqueue.c:119
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:540