PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
tqueue.h File Reference
#include "storage/shm_mq.h"
#include "tcop/dest.h"
Include dependency graph for tqueue.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Typedefs

typedef struct TupleQueueReader TupleQueueReader
 

Functions

DestReceiverCreateTupleQueueDestReceiver (shm_mq_handle *handle)
 
TupleQueueReaderCreateTupleQueueReader (shm_mq_handle *handle, TupleDesc tupledesc)
 
void DestroyTupleQueueReader (TupleQueueReader *reader)
 
HeapTuple TupleQueueReaderNext (TupleQueueReader *reader, bool nowait, bool *done)
 

Typedef Documentation

Definition at line 21 of file tqueue.h.

Function Documentation

DestReceiver* CreateTupleQueueDestReceiver ( shm_mq_handle handle)

Definition at line 606 of file tqueue.c.

References CurrentMemoryContext, DestTupleQueue, NULL, palloc0(), tqueueDestroyReceiver(), tqueueReceiveSlot(), tqueueShutdownReceiver(), tqueueStartupReceiver(), and TUPLE_QUEUE_MODE_DATA.

Referenced by CreateDestReceiver(), and ExecParallelGetReceiver().

607 {
608  TQueueDestReceiver *self;
609 
610  self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
611 
612  self->pub.receiveSlot = tqueueReceiveSlot;
613  self->pub.rStartup = tqueueStartupReceiver;
614  self->pub.rShutdown = tqueueShutdownReceiver;
615  self->pub.rDestroy = tqueueDestroyReceiver;
616  self->pub.mydest = DestTupleQueue;
617  self->queue = handle;
618  self->mycontext = CurrentMemoryContext;
619  self->tmpcontext = NULL;
620  self->recordhtab = NULL;
621  self->mode = TUPLE_QUEUE_MODE_DATA;
622  /* Top-level tupledesc is not known yet */
623  self->tupledesc = NULL;
624  self->field_remapinfo = NULL;
625 
626  return (DestReceiver *) self;
627 }
static void tqueueDestroyReceiver(DestReceiver *self)
Definition: tqueue.c:588
static void tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: tqueue.c:568
static bool tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
Definition: tqueue.c:224
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
void * palloc0(Size size)
Definition: mcxt.c:878
#define NULL
Definition: c.h:229
static void tqueueShutdownReceiver(DestReceiver *self)
Definition: tqueue.c:577
#define TUPLE_QUEUE_MODE_DATA
Definition: tqueue.c:61
TupleQueueReader* CreateTupleQueueReader ( shm_mq_handle handle,
TupleDesc  tupledesc 
)

Definition at line 633 of file tqueue.c.

References BuildFieldRemapInfo(), CurrentMemoryContext, TupleQueueReader::field_remapinfo, TupleQueueReader::mode, TupleQueueReader::mycontext, NULL, palloc0(), TupleQueueReader::queue, TUPLE_QUEUE_MODE_DATA, TupleQueueReader::tupledesc, and TupleQueueReader::typmodmap.

Referenced by ExecGather(), and ExecGatherMerge().

634 {
635  TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
636 
637  reader->queue = handle;
639  reader->typmodmap = NULL;
640  reader->mode = TUPLE_QUEUE_MODE_DATA;
641  reader->tupledesc = tupledesc;
642  reader->field_remapinfo = BuildFieldRemapInfo(tupledesc, reader->mycontext);
643 
644  return reader;
645 }
shm_mq_handle * queue
Definition: tqueue.c:170
TupleDesc tupledesc
Definition: tqueue.c:174
HTAB * typmodmap
Definition: tqueue.c:172
static TupleRemapInfo ** BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
Definition: tqueue.c:1244
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TupleRemapInfo ** field_remapinfo
Definition: tqueue.c:175
void * palloc0(Size size)
Definition: mcxt.c:878
MemoryContext mycontext
Definition: tqueue.c:171
#define NULL
Definition: c.h:229
#define TUPLE_QUEUE_MODE_DATA
Definition: tqueue.c:61
void DestroyTupleQueueReader ( TupleQueueReader reader)

Definition at line 651 of file tqueue.c.

References TupleQueueReader::field_remapinfo, hash_destroy(), NULL, pfree(), TupleQueueReader::queue, shm_mq_detach(), shm_mq_get_queue(), and TupleQueueReader::typmodmap.

Referenced by ExecShutdownGatherMergeWorkers(), ExecShutdownGatherWorkers(), gather_merge_readnext(), and gather_readnext().

652 {
654  if (reader->typmodmap != NULL)
655  hash_destroy(reader->typmodmap);
656  /* Is it worth trying to free substructure of the remap tree? */
657  if (reader->field_remapinfo != NULL)
658  pfree(reader->field_remapinfo);
659  pfree(reader);
660 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:793
shm_mq_handle * queue
Definition: tqueue.c:170
void shm_mq_detach(shm_mq *mq)
Definition: shm_mq.c:778
void pfree(void *pointer)
Definition: mcxt.c:950
HTAB * typmodmap
Definition: tqueue.c:172
TupleRemapInfo ** field_remapinfo
Definition: tqueue.c:175
#define NULL
Definition: c.h:229
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
Definition: shm_mq.c:802
HeapTuple TupleQueueReaderNext ( TupleQueueReader reader,
bool  nowait,
bool done 
)

Definition at line 679 of file tqueue.c.

References Assert, elog, ERROR, TupleQueueReader::mode, NULL, TupleQueueReader::queue, result, SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, TUPLE_QUEUE_MODE_CONTROL, TUPLE_QUEUE_MODE_DATA, TupleQueueHandleControlMessage(), and TupleQueueHandleDataMessage().

Referenced by gather_readnext(), and gm_readnext_tuple().

680 {
682 
683  if (done != NULL)
684  *done = false;
685 
686  for (;;)
687  {
688  Size nbytes;
689  void *data;
690 
691  /* Attempt to read a message. */
692  result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
693 
694  /* If queue is detached, set *done and return NULL. */
695  if (result == SHM_MQ_DETACHED)
696  {
697  if (done != NULL)
698  *done = true;
699  return NULL;
700  }
701 
702  /* In non-blocking mode, bail out if no message ready yet. */
703  if (result == SHM_MQ_WOULD_BLOCK)
704  return NULL;
705  Assert(result == SHM_MQ_SUCCESS);
706 
707  /*
708  * We got a message (see message spec at top of file). Process it.
709  */
710  if (nbytes == 1)
711  {
712  /* Mode switch message. */
713  reader->mode = ((char *) data)[0];
714  }
715  else if (reader->mode == TUPLE_QUEUE_MODE_DATA)
716  {
717  /* Tuple data. */
718  return TupleQueueHandleDataMessage(reader, nbytes, data);
719  }
720  else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL)
721  {
722  /* Control message, describing a transient record type. */
723  TupleQueueHandleControlMessage(reader, nbytes, data);
724  }
725  else
726  elog(ERROR, "unrecognized tqueue mode: %d", (int) reader->mode);
727  }
728 }
static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader, Size nbytes, HeapTupleHeader data)
Definition: tqueue.c:734
shm_mq_handle * queue
Definition: tqueue.c:170
return result
Definition: formatting.c:1618
#define ERROR
Definition: elog.h:43
shm_mq_result
Definition: shm_mq.h:36
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
size_t Size
Definition: c.h:356
#define TUPLE_QUEUE_MODE_CONTROL
Definition: tqueue.c:60
static void TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes, char *data)
Definition: tqueue.c:1050
#define TUPLE_QUEUE_MODE_DATA
Definition: tqueue.c:61
#define elog
Definition: elog.h:219
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:518