PostgreSQL Source Code  git master
tqueue.h File Reference
#include "storage/shm_mq.h"
#include "tcop/dest.h"
Include dependency graph for tqueue.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Typedefs

typedef struct TupleQueueReader TupleQueueReader
 

Functions

DestReceiverCreateTupleQueueDestReceiver (shm_mq_handle *handle)
 
TupleQueueReaderCreateTupleQueueReader (shm_mq_handle *handle)
 
void DestroyTupleQueueReader (TupleQueueReader *reader)
 
HeapTuple TupleQueueReaderNext (TupleQueueReader *reader, bool nowait, bool *done)
 

Typedef Documentation

◆ TupleQueueReader

Definition at line 21 of file tqueue.h.

Function Documentation

◆ CreateTupleQueueDestReceiver()

DestReceiver* CreateTupleQueueDestReceiver ( shm_mq_handle handle)

Definition at line 119 of file tqueue.c.

References DestTupleQueue, palloc0(), tqueueDestroyReceiver(), tqueueReceiveSlot(), tqueueShutdownReceiver(), and tqueueStartupReceiver().

Referenced by CreateDestReceiver(), and ExecParallelGetReceiver().

120 {
121  TQueueDestReceiver *self;
122 
123  self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
124 
125  self->pub.receiveSlot = tqueueReceiveSlot;
126  self->pub.rStartup = tqueueStartupReceiver;
127  self->pub.rShutdown = tqueueShutdownReceiver;
128  self->pub.rDestroy = tqueueDestroyReceiver;
129  self->pub.mydest = DestTupleQueue;
130  self->queue = handle;
131 
132  return (DestReceiver *) self;
133 }
static void tqueueDestroyReceiver(DestReceiver *self)
Definition: tqueue.c:105
static void tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: tqueue.c:83
static bool tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
Definition: tqueue.c:54
void * palloc0(Size size)
Definition: mcxt.c:980
static void tqueueShutdownReceiver(DestReceiver *self)
Definition: tqueue.c:92

◆ CreateTupleQueueReader()

TupleQueueReader* CreateTupleQueueReader ( shm_mq_handle handle)

Definition at line 139 of file tqueue.c.

References palloc0(), and TupleQueueReader::queue.

Referenced by ExecParallelCreateReaders().

140 {
141  TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
142 
143  reader->queue = handle;
144 
145  return reader;
146 }
shm_mq_handle * queue
Definition: tqueue.c:45
void * palloc0(Size size)
Definition: mcxt.c:980

◆ DestroyTupleQueueReader()

void DestroyTupleQueueReader ( TupleQueueReader reader)

Definition at line 155 of file tqueue.c.

References pfree().

Referenced by ExecParallelFinish().

156 {
157  pfree(reader);
158 }
void pfree(void *pointer)
Definition: mcxt.c:1056

◆ TupleQueueReaderNext()

HeapTuple TupleQueueReaderNext ( TupleQueueReader reader,
bool  nowait,
bool done 
)

Definition at line 176 of file tqueue.c.

References Assert, heap_copytuple(), InvalidOid, ItemPointerSetInvalid, TupleQueueReader::queue, SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, and HeapTupleData::t_tableOid.

Referenced by gather_readnext(), and gm_readnext_tuple().

177 {
178  HeapTupleData htup;
179  shm_mq_result result;
180  Size nbytes;
181  void *data;
182 
183  if (done != NULL)
184  *done = false;
185 
186  /* Attempt to read a message. */
187  result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
188 
189  /* If queue is detached, set *done and return NULL. */
190  if (result == SHM_MQ_DETACHED)
191  {
192  if (done != NULL)
193  *done = true;
194  return NULL;
195  }
196 
197  /* In non-blocking mode, bail out if no message ready yet. */
198  if (result == SHM_MQ_WOULD_BLOCK)
199  return NULL;
200  Assert(result == SHM_MQ_SUCCESS);
201 
202  /*
203  * Set up a dummy HeapTupleData pointing to the data from the shm_mq
204  * (which had better be sufficiently aligned).
205  */
207  htup.t_tableOid = InvalidOid;
208  htup.t_len = nbytes;
209  htup.t_data = data;
210 
211  return heap_copytuple(&htup);
212 }
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:680
shm_mq_handle * queue
Definition: tqueue.c:45
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
Oid t_tableOid
Definition: htup.h:66
#define InvalidOid
Definition: postgres_ext.h:36
shm_mq_result
Definition: shm_mq.h:36
#define Assert(condition)
Definition: c.h:739
size_t Size
Definition: c.h:467
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:540