PostgreSQL Source Code  git master
tqueue.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "executor/tqueue.h"
Include dependency graph for tqueue.c:

Go to the source code of this file.

Data Structures

struct  TQueueDestReceiver
 
struct  TupleQueueReader
 

Typedefs

typedef struct TQueueDestReceiver TQueueDestReceiver
 

Functions

static bool tqueueReceiveSlot (TupleTableSlot *slot, DestReceiver *self)
 
static void tqueueStartupReceiver (DestReceiver *self, int operation, TupleDesc typeinfo)
 
static void tqueueShutdownReceiver (DestReceiver *self)
 
static void tqueueDestroyReceiver (DestReceiver *self)
 
DestReceiverCreateTupleQueueDestReceiver (shm_mq_handle *handle)
 
TupleQueueReaderCreateTupleQueueReader (shm_mq_handle *handle)
 
void DestroyTupleQueueReader (TupleQueueReader *reader)
 
MinimalTuple TupleQueueReaderNext (TupleQueueReader *reader, bool nowait, bool *done)
 

Typedef Documentation

◆ TQueueDestReceiver

Function Documentation

◆ CreateTupleQueueDestReceiver()

DestReceiver* CreateTupleQueueDestReceiver ( shm_mq_handle handle)

Definition at line 119 of file tqueue.c.

120 {
121  TQueueDestReceiver *self;
122 
123  self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
124 
125  self->pub.receiveSlot = tqueueReceiveSlot;
126  self->pub.rStartup = tqueueStartupReceiver;
127  self->pub.rShutdown = tqueueShutdownReceiver;
128  self->pub.rDestroy = tqueueDestroyReceiver;
129  self->pub.mydest = DestTupleQueue;
130  self->queue = handle;
131 
132  return (DestReceiver *) self;
133 }
@ DestTupleQueue
Definition: dest.h:98
void * palloc0(Size size)
Definition: mcxt.c:1347
static void tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: tqueue.c:83
static void tqueueShutdownReceiver(DestReceiver *self)
Definition: tqueue.c:92
static bool tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
Definition: tqueue.c:54
static void tqueueDestroyReceiver(DestReceiver *self)
Definition: tqueue.c:105

References DestTupleQueue, palloc0(), tqueueDestroyReceiver(), tqueueReceiveSlot(), tqueueShutdownReceiver(), and tqueueStartupReceiver().

Referenced by CreateDestReceiver(), and ExecParallelGetReceiver().

◆ CreateTupleQueueReader()

TupleQueueReader* CreateTupleQueueReader ( shm_mq_handle handle)

Definition at line 139 of file tqueue.c.

140 {
141  TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
142 
143  reader->queue = handle;
144 
145  return reader;
146 }
shm_mq_handle * queue
Definition: tqueue.c:45

References palloc0(), and TupleQueueReader::queue.

Referenced by ExecParallelCreateReaders().

◆ DestroyTupleQueueReader()

void DestroyTupleQueueReader ( TupleQueueReader reader)

Definition at line 155 of file tqueue.c.

156 {
157  pfree(reader);
158 }
void pfree(void *pointer)
Definition: mcxt.c:1521

References pfree().

Referenced by ExecParallelFinish().

◆ tqueueDestroyReceiver()

static void tqueueDestroyReceiver ( DestReceiver self)
static

Definition at line 105 of file tqueue.c.

106 {
107  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
108 
109  /* We probably already detached from queue, but let's be sure */
110  if (tqueue->queue != NULL)
111  shm_mq_detach(tqueue->queue);
112  pfree(self);
113 }
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:843
shm_mq_handle * queue
Definition: tqueue.c:33

References pfree(), TQueueDestReceiver::queue, and shm_mq_detach().

Referenced by CreateTupleQueueDestReceiver().

◆ tqueueReceiveSlot()

static bool tqueueReceiveSlot ( TupleTableSlot slot,
DestReceiver self 
)
static

Definition at line 54 of file tqueue.c.

55 {
56  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
57  MinimalTuple tuple;
58  shm_mq_result result;
59  bool should_free;
60 
61  /* Send the tuple itself. */
62  tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
63  result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
64 
65  if (should_free)
66  pfree(tuple);
67 
68  /* Check for failure. */
69  if (result == SHM_MQ_DETACHED)
70  return false;
71  else if (result != SHM_MQ_SUCCESS)
72  ereport(ERROR,
73  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
74  errmsg("could not send tuple to shared-memory queue")));
75 
76  return true;
77 }
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1779
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:329
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40

References ereport, errcode(), errmsg(), ERROR, ExecFetchSlotMinimalTuple(), pfree(), TQueueDestReceiver::queue, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, and MinimalTupleData::t_len.

Referenced by CreateTupleQueueDestReceiver().

◆ tqueueShutdownReceiver()

static void tqueueShutdownReceiver ( DestReceiver self)
static

Definition at line 92 of file tqueue.c.

93 {
94  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
95 
96  if (tqueue->queue != NULL)
97  shm_mq_detach(tqueue->queue);
98  tqueue->queue = NULL;
99 }

References TQueueDestReceiver::queue, and shm_mq_detach().

Referenced by CreateTupleQueueDestReceiver().

◆ tqueueStartupReceiver()

static void tqueueStartupReceiver ( DestReceiver self,
int  operation,
TupleDesc  typeinfo 
)
static

Definition at line 83 of file tqueue.c.

84 {
85  /* do nothing */
86 }

Referenced by CreateTupleQueueDestReceiver().

◆ TupleQueueReaderNext()

MinimalTuple TupleQueueReaderNext ( TupleQueueReader reader,
bool  nowait,
bool done 
)

Definition at line 176 of file tqueue.c.

177 {
178  MinimalTuple tuple;
179  shm_mq_result result;
180  Size nbytes;
181  void *data;
182 
183  if (done != NULL)
184  *done = false;
185 
186  /* Attempt to read a message. */
187  result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
188 
189  /* If queue is detached, set *done and return NULL. */
190  if (result == SHM_MQ_DETACHED)
191  {
192  if (done != NULL)
193  *done = true;
194  return NULL;
195  }
196 
197  /* In non-blocking mode, bail out if no message ready yet. */
198  if (result == SHM_MQ_WOULD_BLOCK)
199  return NULL;
200  Assert(result == SHM_MQ_SUCCESS);
201 
202  /*
203  * Return a pointer to the queue memory directly (which had better be
204  * sufficiently aligned).
205  */
206  tuple = (MinimalTuple) data;
207  Assert(tuple->t_len == nbytes);
208 
209  return tuple;
210 }
#define Assert(condition)
Definition: c.h:858
size_t Size
Definition: c.h:605
MinimalTupleData * MinimalTuple
Definition: htup.h:27
const void * data
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:572
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39

References Assert, data, TupleQueueReader::queue, SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and MinimalTupleData::t_len.

Referenced by gather_readnext(), and gm_readnext_tuple().