PostgreSQL Source Code git master
tqueue.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "executor/tqueue.h"
Include dependency graph for tqueue.c:

Go to the source code of this file.

Data Structures

struct  TQueueDestReceiver
 
struct  TupleQueueReader
 

Typedefs

typedef struct TQueueDestReceiver TQueueDestReceiver
 

Functions

static bool tqueueReceiveSlot (TupleTableSlot *slot, DestReceiver *self)
 
static void tqueueStartupReceiver (DestReceiver *self, int operation, TupleDesc typeinfo)
 
static void tqueueShutdownReceiver (DestReceiver *self)
 
static void tqueueDestroyReceiver (DestReceiver *self)
 
DestReceiver * CreateTupleQueueDestReceiver (shm_mq_handle *handle)
 
TupleQueueReader * CreateTupleQueueReader (shm_mq_handle *handle)
 
void DestroyTupleQueueReader (TupleQueueReader *reader)
 
MinimalTuple TupleQueueReaderNext (TupleQueueReader *reader, bool nowait, bool *done)
 

Typedef Documentation

◆ TQueueDestReceiver

Function Documentation

◆ CreateTupleQueueDestReceiver()

DestReceiver * CreateTupleQueueDestReceiver ( shm_mq_handle * handle)

Definition at line 119 of file tqueue.c.

120{
121 TQueueDestReceiver *self;
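122
123 self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
124
125 self->pub.receiveSlot = tqueueReceiveSlot;
126 self->pub.rStartup = tqueueStartupReceiver;
127 self->pub.rShutdown = tqueueShutdownReceiver;
128 self->pub.rDestroy = tqueueDestroyReceiver;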
129 self->pub.mydest = DestTupleQueue;
130 self->queue = handle;
131
132 return (DestReceiver *) self;
133}
@ DestTupleQueue
Definition: dest.h:98
void * palloc0(Size size)
Definition: mcxt.c:1347
DestReceiver pub
Definition: tqueue.c:32
shm_mq_handle * queue
Definition: tqueue.c:33
void(* rStartup)(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: dest.h:121
void(* rShutdown)(DestReceiver *self)
Definition: dest.h:124
bool(* receiveSlot)(TupleTableSlot *slot, DestReceiver *self)
Definition: dest.h:118
void(* rDestroy)(DestReceiver *self)
Definition: dest.h:126
CommandDest mydest
Definition: dest.h:128
static void tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: tqueue.c:83
static void tqueueShutdownReceiver(DestReceiver *self)
Definition: tqueue.c:92
static bool tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
Definition: tqueue.c:54
static void tqueueDestroyReceiver(DestReceiver *self)
Definition: tqueue.c:105

References DestTupleQueue, _DestReceiver::mydest, palloc0(), TQueueDestReceiver::pub, TQueueDestReceiver::queue, _DestReceiver::rDestroy, _DestReceiver::receiveSlot, _DestReceiver::rShutdown, _DestReceiver::rStartup, tqueueDestroyReceiver(), tqueueReceiveSlot(), tqueueShutdownReceiver(), and tqueueStartupReceiver().

Referenced by CreateDestReceiver(), and ExecParallelGetReceiver().

◆ CreateTupleQueueReader()

TupleQueueReader * CreateTupleQueueReader ( shm_mq_handle * handle)

Definition at line 139 of file tqueue.c.

140{
141 TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
142
143 reader->queue = handle;
144
145 return reader;
146}
shm_mq_handle * queue
Definition: tqueue.c:45

References palloc0(), and TupleQueueReader::queue.

Referenced by ExecParallelCreateReaders().

◆ DestroyTupleQueueReader()

void DestroyTupleQueueReader ( TupleQueueReader * reader)

Definition at line 155 of file tqueue.c.

156{
157 pfree(reader);
158}
void pfree(void *pointer)
Definition: mcxt.c:1521

References pfree().

Referenced by ExecParallelFinish().

◆ tqueueDestroyReceiver()

static void tqueueDestroyReceiver ( DestReceiver * self)
static

Definition at line 105 of file tqueue.c.

106{
107 TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
108
109 /* We probably already detached from queue, but let's be sure */
110 if (tqueue->queue != NULL)
111 shm_mq_detach(tqueue->queue);
112 pfree(self);
113}
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:843

References pfree(), TQueueDestReceiver::queue, and shm_mq_detach().

Referenced by CreateTupleQueueDestReceiver().

◆ tqueueReceiveSlot()

static bool tqueueReceiveSlot ( TupleTableSlot * slot,
DestReceiver * self 
)
static

Definition at line 54 of file tqueue.c.

55{
56 TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
57 MinimalTuple tuple;
58 shm_mq_result result;
59 bool should_free;
60
61 /* Send the tuple itself. */
62 tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
63 result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
64
65 if (should_free)
66 pfree(tuple);
67
68 /* Check for failure. */
69 if (result == SHM_MQ_DETACHED)
70 return false;
71 else if (result != SHM_MQ_SUCCESS)
72 ereport(ERROR,
73 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
74 errmsg("could not send tuple to shared-memory queue")));
75
76 return true;
77}
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1879
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:329
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40

References ereport, errcode(), errmsg(), ERROR, ExecFetchSlotMinimalTuple(), pfree(), TQueueDestReceiver::queue, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, and MinimalTupleData::t_len.

Referenced by CreateTupleQueueDestReceiver().

◆ tqueueShutdownReceiver()

static void tqueueShutdownReceiver ( DestReceiver * self)
static

Definition at line 92 of file tqueue.c.

93{
94 TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
95
96 if (tqueue->queue != NULL)
97 shm_mq_detach(tqueue->queue);
98 tqueue->queue = NULL;
99}

References TQueueDestReceiver::queue, and shm_mq_detach().

Referenced by CreateTupleQueueDestReceiver().

◆ tqueueStartupReceiver()

static void tqueueStartupReceiver ( DestReceiver * self,
int  operation,
TupleDesc  typeinfo 
)
static

Definition at line 83 of file tqueue.c.

84{
85 /* do nothing */
86}

Referenced by CreateTupleQueueDestReceiver().

◆ TupleQueueReaderNext()

MinimalTuple TupleQueueReaderNext ( TupleQueueReader * reader,
bool  nowait,
bool *  done 
)

Definition at line 176 of file tqueue.c.

177{
178 MinimalTuple tuple;
179 shm_mq_result result;
180 Size nbytes;
181 void *data;
182
183 if (done != NULL)
184 *done = false;
185
186 /* Attempt to read a message. */
187 result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
188
189 /* If queue is detached, set *done and return NULL. */
190 if (result == SHM_MQ_DETACHED)
191 {
192 if (done != NULL)
193 *done = true;
194 return NULL;
195 }
196
197 /* In non-blocking mode, bail out if no message ready yet. */
198 if (result == SHM_MQ_WOULD_BLOCK)
199 return NULL;
200 Assert(result == SHM_MQ_SUCCESS);
201
202 /*
203 * Return a pointer to the queue memory directly (which had better be
204 * sufficiently aligned).
205 */
206 tuple = (MinimalTuple) data;
207 Assert(tuple->t_len == nbytes);
208
209 return tuple;
210}
#define Assert(condition)
Definition: c.h:815
size_t Size
Definition: c.h:562
MinimalTupleData * MinimalTuple
Definition: htup.h:27
const void * data
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:572
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39

References Assert, data, TupleQueueReader::queue, SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and MinimalTupleData::t_len.

Referenced by gather_readnext(), and gm_readnext_tuple().