PostgreSQL Source Code  git master
tqueue.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "executor/tqueue.h"
Include dependency graph for tqueue.c:

Go to the source code of this file.

Data Structures

struct  TQueueDestReceiver
 
struct  TupleQueueReader
 

Typedefs

typedef struct TQueueDestReceiver TQueueDestReceiver
 

Functions

static bool tqueueReceiveSlot (TupleTableSlot *slot, DestReceiver *self)
 
static void tqueueStartupReceiver (DestReceiver *self, int operation, TupleDesc typeinfo)
 
static void tqueueShutdownReceiver (DestReceiver *self)
 
static void tqueueDestroyReceiver (DestReceiver *self)
 
DestReceiver * CreateTupleQueueDestReceiver (shm_mq_handle *handle)
 
TupleQueueReader * CreateTupleQueueReader (shm_mq_handle *handle)
 
void DestroyTupleQueueReader (TupleQueueReader *reader)
 
HeapTuple TupleQueueReaderNext (TupleQueueReader *reader, bool nowait, bool *done)
 

Typedef Documentation

◆ TQueueDestReceiver

Function Documentation

◆ CreateTupleQueueDestReceiver()

DestReceiver* CreateTupleQueueDestReceiver ( shm_mq_handle * handle)

Definition at line 115 of file tqueue.c.

References DestTupleQueue, palloc0(), tqueueDestroyReceiver(), tqueueReceiveSlot(), tqueueShutdownReceiver(), and tqueueStartupReceiver().

Referenced by CreateDestReceiver(), and ExecParallelGetReceiver().

116 {
117  TQueueDestReceiver *self;
118 
119  self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
120 
121  self->pub.receiveSlot = tqueueReceiveSlot;
122  self->pub.rStartup = tqueueStartupReceiver;
123  self->pub.rShutdown = tqueueShutdownReceiver;
124  self->pub.rDestroy = tqueueDestroyReceiver;
125  self->pub.mydest = DestTupleQueue;
126  self->queue = handle;
127 
128  return (DestReceiver *) self;
129 }
static void tqueueDestroyReceiver(DestReceiver *self)
Definition: tqueue.c:101
static void tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: tqueue.c:79
static bool tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
Definition: tqueue.c:54
void * palloc0(Size size)
Definition: mcxt.c:877
static void tqueueShutdownReceiver(DestReceiver *self)
Definition: tqueue.c:88

◆ CreateTupleQueueReader()

TupleQueueReader* CreateTupleQueueReader ( shm_mq_handle * handle)

Definition at line 135 of file tqueue.c.

References palloc0(), and TupleQueueReader::queue.

Referenced by ExecParallelCreateReaders().

136 {
137  TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
138 
139  reader->queue = handle;
140 
141  return reader;
142 }
shm_mq_handle * queue
Definition: tqueue.c:45
void * palloc0(Size size)
Definition: mcxt.c:877

◆ DestroyTupleQueueReader()

void DestroyTupleQueueReader ( TupleQueueReader * reader)

Definition at line 151 of file tqueue.c.

References pfree().

Referenced by ExecParallelFinish().

152 {
153  pfree(reader);
154 }
void pfree(void *pointer)
Definition: mcxt.c:949

◆ tqueueDestroyReceiver()

static void tqueueDestroyReceiver ( DestReceiver * self)
static

Definition at line 101 of file tqueue.c.

References pfree(), TQueueDestReceiver::queue, and shm_mq_detach().

Referenced by CreateTupleQueueDestReceiver().

102 {
103  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
104 
105  /* We probably already detached from queue, but let's be sure */
106  if (tqueue->queue != NULL)
107  shm_mq_detach(tqueue->queue);
108  pfree(self);
109 }
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:775
void pfree(void *pointer)
Definition: mcxt.c:949
shm_mq_handle * queue
Definition: tqueue.c:33

◆ tqueueReceiveSlot()

static bool tqueueReceiveSlot ( TupleTableSlot * slot,
DestReceiver * self 
)
static

Definition at line 54 of file tqueue.c.

References ereport, errcode(), errmsg(), ERROR, ExecMaterializeSlot(), TQueueDestReceiver::queue, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, HeapTupleData::t_data, and HeapTupleData::t_len.

Referenced by CreateTupleQueueDestReceiver().

55 {
56  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
57  HeapTuple tuple;
58  shm_mq_result result;
59 
60  /* Send the tuple itself. */
61  tuple = ExecMaterializeSlot(slot);
62  result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
63 
64  /* Check for failure. */
65  if (result == SHM_MQ_DETACHED)
66  return false;
67  else if (result != SHM_MQ_SUCCESS)
68  ereport(ERROR,
69  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
70  errmsg("could not send tuple to shared-memory queue")));
71 
72  return true;
73 }
int errcode(int sqlerrcode)
Definition: elog.c:575
HeapTupleHeader t_data
Definition: htup.h:67
#define ERROR
Definition: elog.h:43
uint32 t_len
Definition: htup.h:64
#define ereport(elevel, rest)
Definition: elog.h:122
shm_mq_result
Definition: shm_mq.h:36
shm_mq_handle * queue
Definition: tqueue.c:33
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
Definition: shm_mq.c:325
HeapTuple ExecMaterializeSlot(TupleTableSlot *slot)
Definition: execTuples.c:725
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ tqueueShutdownReceiver()

static void tqueueShutdownReceiver ( DestReceiver * self)
static

Definition at line 88 of file tqueue.c.

References TQueueDestReceiver::queue, and shm_mq_detach().

Referenced by CreateTupleQueueDestReceiver().

89 {
90  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
91 
92  if (tqueue->queue != NULL)
93  shm_mq_detach(tqueue->queue);
94  tqueue->queue = NULL;
95 }
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:775
shm_mq_handle * queue
Definition: tqueue.c:33

◆ tqueueStartupReceiver()

static void tqueueStartupReceiver ( DestReceiver * self,
int  operation,
TupleDesc  typeinfo 
)
static

Definition at line 79 of file tqueue.c.

Referenced by CreateTupleQueueDestReceiver().

80 {
81  /* do nothing */
82 }

◆ TupleQueueReaderNext()

HeapTuple TupleQueueReaderNext ( TupleQueueReader * reader,
bool  nowait,
bool * done 
)

Definition at line 170 of file tqueue.c.

References Assert, heap_copytuple(), InvalidOid, ItemPointerSetInvalid, TupleQueueReader::queue, SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, and HeapTupleData::t_tableOid.

Referenced by gather_readnext(), and gm_readnext_tuple().

171 {
172  HeapTupleData htup;
173  shm_mq_result result;
174  Size nbytes;
175  void *data;
176 
177  if (done != NULL)
178  *done = false;
179 
180  /* Attempt to read a message. */
181  result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
182 
183  /* If queue is detached, set *done and return NULL. */
184  if (result == SHM_MQ_DETACHED)
185  {
186  if (done != NULL)
187  *done = true;
188  return NULL;
189  }
190 
191  /* In non-blocking mode, bail out if no message ready yet. */
192  if (result == SHM_MQ_WOULD_BLOCK)
193  return NULL;
194  Assert(result == SHM_MQ_SUCCESS);
195 
196  /*
197  * Set up a dummy HeapTupleData pointing to the data from the shm_mq
198  * (which had better be sufficiently aligned).
199  */
200  ItemPointerSetInvalid(&htup.t_self);
201  htup.t_tableOid = InvalidOid;
202  htup.t_len = nbytes;
203  htup.t_data = data;
204 
205  return heap_copytuple(&htup);
206 }
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:611
shm_mq_handle * queue
Definition: tqueue.c:45
HeapTupleHeader t_data
Definition: htup.h:67
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
Oid t_tableOid
Definition: htup.h:66
#define InvalidOid
Definition: postgres_ext.h:36
shm_mq_result
Definition: shm_mq.h:36
#define Assert(condition)
Definition: c.h:670
size_t Size
Definition: c.h:404
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:150
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:522