PostgreSQL Source Code  git master
shm_mq.h File Reference
#include "postmaster/bgworker.h"
#include "storage/dsm.h"
#include "storage/proc.h"
Include dependency graph for shm_mq.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  shm_mq_iovec
 

Typedefs

typedef struct shm_mq shm_mq
 
typedef struct shm_mq_handle shm_mq_handle
 

Enumerations

enum  shm_mq_result { SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_MQ_DETACHED }
 

Functions

shm_mq * shm_mq_create (void *address, Size size)
 
void shm_mq_set_receiver (shm_mq *mq, PGPROC *)
 
void shm_mq_set_sender (shm_mq *mq, PGPROC *)
 
PGPROC * shm_mq_get_receiver (shm_mq *)
 
PGPROC * shm_mq_get_sender (shm_mq *)
 
shm_mq_handle * shm_mq_attach (shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 
void shm_mq_set_handle (shm_mq_handle *, BackgroundWorkerHandle *)
 
void shm_mq_detach (shm_mq_handle *mqh)
 
shm_mq * shm_mq_get_queue (shm_mq_handle *mqh)
 
shm_mq_result shm_mq_send (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
 
shm_mq_result shm_mq_sendv (shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 
shm_mq_result shm_mq_receive (shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
shm_mq_result shm_mq_wait_for_attach (shm_mq_handle *mqh)
 

Variables

PGDLLIMPORT const Size shm_mq_minimum_size
 

Typedef Documentation

◆ shm_mq

typedef struct shm_mq shm_mq

Definition at line 22 of file shm_mq.h.

◆ shm_mq_handle

typedef struct shm_mq_handle shm_mq_handle

Definition at line 26 of file shm_mq.h.

Enumeration Type Documentation

◆ shm_mq_result

Enumerator
SHM_MQ_SUCCESS 
SHM_MQ_WOULD_BLOCK 
SHM_MQ_DETACHED 

Definition at line 36 of file shm_mq.h.

37 {
38  SHM_MQ_SUCCESS, /* Sent or received a message. */
39  SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */
40  SHM_MQ_DETACHED /* Other process has detached queue. */
41 } shm_mq_result;
shm_mq_result
Definition: shm_mq.h:36

Function Documentation

◆ shm_mq_attach()

shm_mq_handle* shm_mq_attach ( shm_mq * mq,
dsm_segment * seg,
BackgroundWorkerHandle * handle
)

Definition at line 283 of file shm_mq.c.

References Assert, CurrentMemoryContext, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, MyProc, on_dsm_detach(), palloc(), PointerGetDatum, and shm_mq_detach_callback().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ParallelWorkerMain(), ReinitializeParallelDSM(), and test_shm_mq_setup().

284 {
285  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
286 
287  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
288  mqh->mqh_queue = mq;
289  mqh->mqh_segment = seg;
290  mqh->mqh_handle = handle;
291  mqh->mqh_buffer = NULL;
292  mqh->mqh_buflen = 0;
293  mqh->mqh_consume_pending = 0;
294  mqh->mqh_partial_bytes = 0;
295  mqh->mqh_expected_bytes = 0;
296  mqh->mqh_length_word_complete = false;
297  mqh->mqh_counterparty_attached = false;
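298  mqh->mqh_context = CurrentMemoryContext;
299 
300  if (seg != NULL)
301  on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
302 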
303  return mqh;
304 }
Size mqh_partial_bytes
Definition: shm_mq.c:139
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:556
PGPROC * mq_receiver
Definition: shm_mq.c:74
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:135
Size mqh_consume_pending
Definition: shm_mq.c:138
bool mqh_counterparty_attached
Definition: shm_mq.c:142
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1091
char * mqh_buffer
Definition: shm_mq.c:136
bool mqh_length_word_complete
Definition: shm_mq.c:141
Size mqh_buflen
Definition: shm_mq.c:137
dsm_segment * mqh_segment
Definition: shm_mq.c:134
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
Size mqh_expected_bytes
Definition: shm_mq.c:140
shm_mq * mqh_queue
Definition: shm_mq.c:133
#define Assert(condition)
Definition: c.h:800
void * palloc(Size size)
Definition: mcxt.c:950
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1280
MemoryContext mqh_context
Definition: shm_mq.c:143
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_create()

shm_mq* shm_mq_create ( void *  address,
Size  size 
)

Definition at line 170 of file shm_mq.c.

References Assert, MAXALIGN, MAXALIGN_DOWN, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq::mq_sender, offsetof, pg_atomic_init_u64(), and SpinLockInit.

Referenced by ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

171 {
172  shm_mq *mq = address;
173  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
174 
175  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
176  size = MAXALIGN_DOWN(size);
177 
178  /* Queue size must be large enough to hold some data. */
179  Assert(size > data_offset);
180 
181  /* Initialize queue header. */
182  SpinLockInit(&mq->mq_mutex);
183  mq->mq_receiver = NULL;
184  mq->mq_sender = NULL;
185  pg_atomic_init_u64(&mq->mq_bytes_read, 0);
186  pg_atomic_init_u64(&mq->mq_bytes_written, 0);
187  mq->mq_ring_size = size - data_offset;
188  mq->mq_detached = false;
189  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
190 
191  return mq;
192 }
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * mq_receiver
Definition: shm_mq.c:74
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:415
slock_t mq_mutex
Definition: shm_mq.c:73
bool mq_detached
Definition: shm_mq.c:79
#define Assert(condition)
Definition: c.h:800
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:76
size_t Size
Definition: c.h:528
#define MAXALIGN(LEN)
Definition: c.h:753
Size mq_ring_size
Definition: shm_mq.c:78
Definition: shm_mq.c:71
uint8 mq_ring_offset
Definition: shm_mq.c:80
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:77
#define offsetof(type, field)
Definition: c.h:723
#define MAXALIGN_DOWN(LEN)
Definition: c.h:765
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_detach()

void shm_mq_detach ( shm_mq_handle * mqh)

Definition at line 817 of file shm_mq.c.

References cancel_on_dsm_detach(), shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, pfree(), PointerGetDatum, shm_mq_detach_callback(), and shm_mq_detach_internal().

Referenced by DestroyParallelContext(), ExecParallelFinish(), HandleParallelMessage(), LaunchParallelWorkers(), mq_putmessage(), tqueueDestroyReceiver(), and tqueueShutdownReceiver().

818 {
819  /* Notify counterparty that we're outta here. */
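820  shm_mq_detach_internal(mqh->mqh_queue);
821 
822  /* Cancel on_dsm_detach callback, if any. */
823  if (mqh->mqh_segment)
824  cancel_on_dsm_detach(mqh->mqh_segment,
825  shm_mq_detach_callback,
826  PointerGetDatum(mqh->mqh_queue));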
827 
828  /* Release local memory associated with handle. */
829  if (mqh->mqh_buffer != NULL)
830  pfree(mqh->mqh_buffer);
831  pfree(mqh);
832 }
#define PointerGetDatum(X)
Definition: postgres.h:556
char * mqh_buffer
Definition: shm_mq.c:136
void pfree(void *pointer)
Definition: mcxt.c:1057
dsm_segment * mqh_segment
Definition: shm_mq.c:134
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:849
shm_mq * mqh_queue
Definition: shm_mq.c:133
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1280
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1106

◆ shm_mq_get_queue()

shm_mq* shm_mq_get_queue ( shm_mq_handle * mqh)

Definition at line 872 of file shm_mq.c.

References shm_mq_handle::mqh_queue.

Referenced by WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

873 {
874  return mqh->mqh_queue;
875 }
shm_mq * mqh_queue
Definition: shm_mq.c:133

◆ shm_mq_get_receiver()

PGPROC* shm_mq_get_receiver ( shm_mq * )

Definition at line 235 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_receiver, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_send_bytes(), and shm_mq_wait_for_attach().

236 {
237  PGPROC *receiver;
238 
239  SpinLockAcquire(&mq->mq_mutex);
240  receiver = mq->mq_receiver;
241  SpinLockRelease(&mq->mq_mutex);
242 
243  return receiver;
244 }
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: proc.h:120

◆ shm_mq_get_sender()

PGPROC* shm_mq_get_sender ( shm_mq * )

Definition at line 250 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_sender, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_receive(), shm_mq_wait_for_attach(), WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

251 {
252  PGPROC *sender;
253 
254  SpinLockAcquire(&mq->mq_mutex);
255  sender = mq->mq_sender;
256  SpinLockRelease(&mq->mq_mutex);
257 
258  return sender;
259 }
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: proc.h:120

◆ shm_mq_receive()

shm_mq_result shm_mq_receive ( shm_mq_handle * mqh,
Size * nbytesp,
void **  datap,
bool  nowait 
)

Definition at line 548 of file shm_mq.c.

References Assert, ereport, errcode(), errmsg(), ERROR, Max, MAXALIGN, MaxAllocSize, MemoryContextAlloc(), Min, shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_ring_size, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, MQH_INITIAL_BUFSIZE, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, pfree(), shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_sender(), shm_mq_inc_bytes_read(), shm_mq_receive_bytes(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), and SHM_MQ_WOULD_BLOCK.

Referenced by copy_messages(), HandleParallelMessages(), test_shm_mq(), test_shm_mq_pipelined(), and TupleQueueReaderNext().

549 {
550  shm_mq *mq = mqh->mqh_queue;
551  shm_mq_result res;
552  Size rb = 0;
553  Size nbytes;
554  void *rawdata;
555 
556  Assert(mq->mq_receiver == MyProc);
557 
558  /* We can't receive data until the sender has attached. */
559  if (!mqh->mqh_counterparty_attached)
560  {
561  if (nowait)
562  {
563  int counterparty_gone;
564 
565  /*
566  * We shouldn't return at this point at all unless the sender
567  * hasn't attached yet. However, the correct return value depends
568  * on whether the sender is still attached. If we first test
569  * whether the sender has ever attached and then test whether the
570  * sender has detached, there's a race condition: a sender that
571  * attaches and detaches very quickly might fool us into thinking
572  * the sender never attached at all. So, test whether our
573  * counterparty is definitively gone first, and only afterwards
574  * check whether the sender ever attached in the first place.
575  */
576  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
577  if (shm_mq_get_sender(mq) == NULL)
578  {
579  if (counterparty_gone)
580  return SHM_MQ_DETACHED;
581  else
582  return SHM_MQ_WOULD_BLOCK;
583  }
584  }
585  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
586  && shm_mq_get_sender(mq) == NULL)
587  {
588  mq->mq_detached = true;
589  return SHM_MQ_DETACHED;
590  }
591  mqh->mqh_counterparty_attached = true;
592  }
593 
594  /*
595  * If we've consumed an amount of data greater than 1/4th of the ring
596  * size, mark it consumed in shared memory. We try to avoid doing this
597  * unnecessarily when only a small amount of data has been consumed,
598  * because SetLatch() is fairly expensive and we don't want to do it too
599  * often.
600  */
601  if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
602  {
603  shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
604  mqh->mqh_consume_pending = 0;
605  }
606 
607  /* Try to read, or finish reading, the length word from the buffer. */
608  while (!mqh->mqh_length_word_complete)
609  {
610  /* Try to receive the message length word. */
611  Assert(mqh->mqh_partial_bytes < sizeof(Size));
612  res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
613  nowait, &rb, &rawdata);
614  if (res != SHM_MQ_SUCCESS)
615  return res;
616 
617  /*
618  * Hopefully, we'll receive the entire message length word at once.
619  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
620  * multiple reads.
621  */
622  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
623  {
624  Size needed;
625 
626  nbytes = *(Size *) rawdata;
627 
628  /* If we've already got the whole message, we're done. */
629  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
630  if (rb >= needed)
631  {
632  mqh->mqh_consume_pending += needed;
633  *nbytesp = nbytes;
634  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
635  return SHM_MQ_SUCCESS;
636  }
637 
638  /*
639  * We don't have the whole message, but we at least have the whole
640  * length word.
641  */
642  mqh->mqh_expected_bytes = nbytes;
643  mqh->mqh_length_word_complete = true;
644  mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
645  rb -= MAXALIGN(sizeof(Size));
646  }
647  else
648  {
649  Size lengthbytes;
650 
651  /* Can't be split unless bigger than required alignment. */
652  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
653 
654  /* Message word is split; need buffer to reassemble. */
655  if (mqh->mqh_buffer == NULL)
656  {
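657  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
658  MQH_INITIAL_BUFSIZE);
659  mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
660  }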
661  Assert(mqh->mqh_buflen >= sizeof(Size));
662 
663  /* Copy partial length word; remember to consume it. */
664  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
665  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
666  else
667  lengthbytes = rb;
668  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
669  lengthbytes);
670  mqh->mqh_partial_bytes += lengthbytes;
671  mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
672  rb -= lengthbytes;
673 
674  /* If we now have the whole word, we're ready to read payload. */
675  if (mqh->mqh_partial_bytes >= sizeof(Size))
676  {
677  Assert(mqh->mqh_partial_bytes == sizeof(Size));
678  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
679  mqh->mqh_length_word_complete = true;
680  mqh->mqh_partial_bytes = 0;
681  }
682  }
683  }
684  nbytes = mqh->mqh_expected_bytes;
685 
686  /*
687  * Should be disallowed on the sending side already, but better check and
688  * error out on the receiver side as well rather than trying to read a
689  * prohibitively large message.
690  */
691  if (nbytes > MaxAllocSize)
692  ereport(ERROR,
693  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
694  errmsg("invalid message size %zu in shared memory queue",
695  nbytes)));
696 
697  if (mqh->mqh_partial_bytes == 0)
698  {
699  /*
700  * Try to obtain the whole message in a single chunk. If this works,
701  * we need not copy the data and can return a pointer directly into
702  * shared memory.
703  */
704  res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
705  if (res != SHM_MQ_SUCCESS)
706  return res;
707  if (rb >= nbytes)
708  {
709  mqh->mqh_length_word_complete = false;
710  mqh->mqh_consume_pending += MAXALIGN(nbytes);
711  *nbytesp = nbytes;
712  *datap = rawdata;
713  return SHM_MQ_SUCCESS;
714  }
715 
716  /*
717  * The message has wrapped the buffer. We'll need to copy it in order
718  * to return it to the client in one chunk. First, make sure we have
719  * a large enough buffer available.
720  */
721  if (mqh->mqh_buflen < nbytes)
722  {
723  Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
724 
725  /*
726  * Double the buffer size until the payload fits, but limit to
727  * MaxAllocSize.
728  */
729  while (newbuflen < nbytes)
730  newbuflen *= 2;
731  newbuflen = Min(newbuflen, MaxAllocSize);
732 
733  if (mqh->mqh_buffer != NULL)
734  {
735  pfree(mqh->mqh_buffer);
736  mqh->mqh_buffer = NULL;
737  mqh->mqh_buflen = 0;
738  }
739  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
740  mqh->mqh_buflen = newbuflen;
741  }
742  }
743 
744  /* Loop until we've copied the entire message. */
745  for (;;)
746  {
747  Size still_needed;
748 
749  /* Copy as much as we can. */
750  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
751  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
752  mqh->mqh_partial_bytes += rb;
753 
754  /*
755  * Update count of bytes that can be consumed, accounting for
756  * alignment padding. Note that this will never actually insert any
757  * padding except at the end of a message, because the buffer size is
758  * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
759  */
760  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
761  mqh->mqh_consume_pending += MAXALIGN(rb);
762 
763  /* If we got all the data, exit the loop. */
764  if (mqh->mqh_partial_bytes >= nbytes)
765  break;
766 
767  /* Wait for some more data. */
768  still_needed = nbytes - mqh->mqh_partial_bytes;
769  res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
770  if (res != SHM_MQ_SUCCESS)
771  return res;
772  if (rb > still_needed)
773  rb = still_needed;
774  }
775 
776  /* Return the complete message, and reset for next message. */
777  *nbytesp = nbytes;
778  *datap = mqh->mqh_buffer;
779  mqh->mqh_length_word_complete = false;
780  mqh->mqh_partial_bytes = 0;
781  return SHM_MQ_SUCCESS;
782 }
Size mqh_partial_bytes
Definition: shm_mq.c:139
PGPROC * MyProc
Definition: proc.c:67
#define Min(x, y)
Definition: c.h:982
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:250
int errcode(int sqlerrcode)
Definition: elog.c:691
PGPROC * mq_receiver
Definition: shm_mq.c:74
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:135
Size mqh_consume_pending
Definition: shm_mq.c:138
bool mqh_counterparty_attached
Definition: shm_mq.c:142
char * mqh_buffer
Definition: shm_mq.c:136
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1175
bool mqh_length_word_complete
Definition: shm_mq.c:141
void pfree(void *pointer)
Definition: mcxt.c:1057
#define ERROR
Definition: elog.h:43
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:164
Size mqh_buflen
Definition: shm_mq.c:137
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:1036
Size mqh_expected_bytes
Definition: shm_mq.c:140
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1227
#define MaxAllocSize
Definition: memutils.h:40
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1136
shm_mq * mqh_queue
Definition: shm_mq.c:133
bool mq_detached
Definition: shm_mq.c:79
#define ereport(elevel,...)
Definition: elog.h:155
shm_mq_result
Definition: shm_mq.h:36
#define Max(x, y)
Definition: c.h:976
#define Assert(condition)
Definition: c.h:800
size_t Size
Definition: c.h:528
#define MAXALIGN(LEN)
Definition: c.h:753
Size mq_ring_size
Definition: shm_mq.c:78
int errmsg(const char *fmt,...)
Definition: elog.c:902
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
Definition: shm_mq.c:71
MemoryContext mqh_context
Definition: shm_mq.c:143
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_send()

shm_mq_result shm_mq_send ( shm_mq_handle * mqh,
Size  nbytes,
const void *  data,
bool  nowait 
)

Definition at line 321 of file shm_mq.c.

References shm_mq_iovec::data, shm_mq_iovec::len, and shm_mq_sendv().

Referenced by copy_messages(), test_shm_mq(), test_shm_mq_pipelined(), and tqueueReceiveSlot().

322 {
323  shm_mq_iovec iov;
324 
325  iov.data = data;
326  iov.len = nbytes;
327 
328  return shm_mq_sendv(mqh, &iov, 1, nowait);
329 }
Size len
Definition: shm_mq.h:32
const char * data
Definition: shm_mq.h:31
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
Definition: shm_mq.c:347

◆ shm_mq_sendv()

shm_mq_result shm_mq_sendv ( shm_mq_handle * mqh,
shm_mq_iovec * iov,
int  iovcnt,
bool  nowait 
)

Definition at line 347 of file shm_mq.c.

References Assert, shm_mq_iovec::data, ereport, errcode(), errmsg(), ERROR, i, shm_mq_iovec::len, MAXALIGN_DOWN, MaxAllocSize, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, PGPROC::procLatch, SetLatch(), SHM_MQ_DETACHED, shm_mq_send_bytes(), SHM_MQ_SUCCESS, SpinLockAcquire, SpinLockRelease, and tmpbuf.

Referenced by mq_putmessage(), and shm_mq_send().

348 {
349  shm_mq_result res;
350  shm_mq *mq = mqh->mqh_queue;
351  PGPROC *receiver;
352  Size nbytes = 0;
353  Size bytes_written;
354  int i;
355  int which_iov = 0;
356  Size offset;
357 
358  Assert(mq->mq_sender == MyProc);
359 
360  /* Compute total size of write. */
361  for (i = 0; i < iovcnt; ++i)
362  nbytes += iov[i].len;
363 
364  /* Prevent writing messages overwhelming the receiver. */
365  if (nbytes > MaxAllocSize)
366  ereport(ERROR,
367  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
368  errmsg("cannot send a message of size %zu via shared memory queue",
369  nbytes)));
370 
371  /* Try to write, or finish writing, the length word into the buffer. */
372  while (!mqh->mqh_length_word_complete)
373  {
374  Assert(mqh->mqh_partial_bytes < sizeof(Size));
375  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
376  ((char *) &nbytes) + mqh->mqh_partial_bytes,
377  nowait, &bytes_written);
378 
379  if (res == SHM_MQ_DETACHED)
380  {
381  /* Reset state in case caller tries to send another message. */
382  mqh->mqh_partial_bytes = 0;
383  mqh->mqh_length_word_complete = false;
384  return res;
385  }
386  mqh->mqh_partial_bytes += bytes_written;
387 
388  if (mqh->mqh_partial_bytes >= sizeof(Size))
389  {
390  Assert(mqh->mqh_partial_bytes == sizeof(Size));
391 
392  mqh->mqh_partial_bytes = 0;
393  mqh->mqh_length_word_complete = true;
394  }
395 
396  if (res != SHM_MQ_SUCCESS)
397  return res;
398 
399  /* Length word can't be split unless bigger than required alignment. */
400  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
401  }
402 
403  /* Write the actual data bytes into the buffer. */
404  Assert(mqh->mqh_partial_bytes <= nbytes);
405  offset = mqh->mqh_partial_bytes;
406  do
407  {
408  Size chunksize;
409 
410  /* Figure out which bytes need to be sent next. */
411  if (offset >= iov[which_iov].len)
412  {
413  offset -= iov[which_iov].len;
414  ++which_iov;
415  if (which_iov >= iovcnt)
416  break;
417  continue;
418  }
419 
420  /*
421  * We want to avoid copying the data if at all possible, but every
422  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
423  * the last. Thus, if a chunk other than the last one ends on a
424  * non-MAXALIGN'd boundary, we have to combine the tail end of its
425  * data with data from one or more following chunks until we either
426  * reach the last chunk or accumulate a number of bytes which is
427  * MAXALIGN'd.
428  */
429  if (which_iov + 1 < iovcnt &&
430  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
431  {
432  char tmpbuf[MAXIMUM_ALIGNOF];
433  int j = 0;
434 
435  for (;;)
436  {
437  if (offset < iov[which_iov].len)
438  {
439  tmpbuf[j] = iov[which_iov].data[offset];
440  j++;
441  offset++;
442  if (j == MAXIMUM_ALIGNOF)
443  break;
444  }
445  else
446  {
447  offset -= iov[which_iov].len;
448  which_iov++;
449  if (which_iov >= iovcnt)
450  break;
451  }
452  }
453 
454  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
455 
456  if (res == SHM_MQ_DETACHED)
457  {
458  /* Reset state in case caller tries to send another message. */
459  mqh->mqh_partial_bytes = 0;
460  mqh->mqh_length_word_complete = false;
461  return res;
462  }
463 
464  mqh->mqh_partial_bytes += bytes_written;
465  if (res != SHM_MQ_SUCCESS)
466  return res;
467  continue;
468  }
469 
470  /*
471  * If this is the last chunk, we can write all the data, even if it
472  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
473  * MAXALIGN_DOWN the write size.
474  */
475  chunksize = iov[which_iov].len - offset;
476  if (which_iov + 1 < iovcnt)
477  chunksize = MAXALIGN_DOWN(chunksize);
478  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
479  nowait, &bytes_written);
480 
481  if (res == SHM_MQ_DETACHED)
482  {
483  /* Reset state in case caller tries to send another message. */
484  mqh->mqh_length_word_complete = false;
485  mqh->mqh_partial_bytes = 0;
486  return res;
487  }
488 
489  mqh->mqh_partial_bytes += bytes_written;
490  offset += bytes_written;
491  if (res != SHM_MQ_SUCCESS)
492  return res;
493  } while (mqh->mqh_partial_bytes < nbytes);
494 
495  /* Reset for next message. */
496  mqh->mqh_partial_bytes = 0;
497  mqh->mqh_length_word_complete = false;
498 
499  /* If queue has been detached, let caller know. */
500  if (mq->mq_detached)
501  return SHM_MQ_DETACHED;
502 
503  /*
504  * If the counterparty is known to have attached, we can read mq_receiver
505  * without acquiring the spinlock and assume it isn't NULL. Otherwise,
506  * more caution is needed.
507  */
508  if (mqh->mqh_counterparty_attached)
509  receiver = mq->mq_receiver;
510  else
511  {
512  SpinLockAcquire(&mq->mq_mutex);
513  receiver = mq->mq_receiver;
514  SpinLockRelease(&mq->mq_mutex);
515  if (receiver == NULL)
516  return SHM_MQ_SUCCESS;
517  mqh->mqh_counterparty_attached = true;
518  }
519 
520  /* Notify receiver of the newly-written data, and return. */
521  SetLatch(&receiver->procLatch);
522  return SHM_MQ_SUCCESS;
523 }
Size mqh_partial_bytes
Definition: shm_mq.c:139
PGPROC * MyProc
Definition: proc.c:67
int errcode(int sqlerrcode)
Definition: elog.c:691
PGPROC * mq_receiver
Definition: shm_mq.c:74
void SetLatch(Latch *latch)
Definition: latch.c:505
bool mqh_counterparty_attached
Definition: shm_mq.c:142
Latch procLatch
Definition: proc.h:129
Size len
Definition: shm_mq.h:32
#define SpinLockAcquire(lock)
Definition: spin.h:62
bool mqh_length_word_complete
Definition: shm_mq.c:141
const char * data
Definition: shm_mq.h:31
#define ERROR
Definition: elog.h:43
slock_t mq_mutex
Definition: shm_mq.c:73
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:881
#define MaxAllocSize
Definition: memutils.h:40
#define SpinLockRelease(lock)
Definition: spin.h:64
shm_mq * mqh_queue
Definition: shm_mq.c:133
bool mq_detached
Definition: shm_mq.c:79
#define ereport(elevel,...)
Definition: elog.h:155
shm_mq_result
Definition: shm_mq.h:36
#define Assert(condition)
Definition: c.h:800
size_t Size
Definition: c.h:528
static StringInfoData tmpbuf
Definition: walsender.c:159
int errmsg(const char *fmt,...)
Definition: elog.c:902
int i
Definition: shm_mq.c:71
Definition: proc.h:120
#define MAXALIGN_DOWN(LEN)
Definition: c.h:765
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_set_handle()

void shm_mq_set_handle ( shm_mq_handle * ,
BackgroundWorkerHandle * 
)

Definition at line 311 of file shm_mq.c.

References Assert, and shm_mq_handle::mqh_handle.

Referenced by ExecParallelCreateReaders(), and LaunchParallelWorkers().

312 {
313  Assert(mqh->mqh_handle == NULL);
314  mqh->mqh_handle = handle;
315 }
#define Assert(condition)
Definition: c.h:800

◆ shm_mq_set_receiver()

void shm_mq_set_receiver ( shm_mq * mq,
PGPROC * 
)

Definition at line 199 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

200 {
201  PGPROC *sender;
202 
203  SpinLockAcquire(&mq->mq_mutex);
204  Assert(mq->mq_receiver == NULL);
205  mq->mq_receiver = proc;
206  sender = mq->mq_sender;
207  SpinLockRelease(&mq->mq_mutex);
208 
209  if (sender != NULL)
210  SetLatch(&sender->procLatch);
211 }
PGPROC * mq_receiver
Definition: shm_mq.c:74
void SetLatch(Latch *latch)
Definition: latch.c:505
Latch procLatch
Definition: proc.h:129
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:73
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:800
Definition: proc.h:120
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_set_sender()

void shm_mq_set_sender ( shm_mq * mq,
PGPROC * 
)

Definition at line 217 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ParallelWorkerMain(), and setup_dynamic_shared_memory().

218 {
219  PGPROC *receiver;
220 
221  SpinLockAcquire(&mq->mq_mutex);
222  Assert(mq->mq_sender == NULL);
223  mq->mq_sender = proc;
224  receiver = mq->mq_receiver;
225  SpinLockRelease(&mq->mq_mutex);
226 
227  if (receiver != NULL)
228  SetLatch(&receiver->procLatch);
229 }
PGPROC * mq_receiver
Definition: shm_mq.c:74
void SetLatch(Latch *latch)
Definition: latch.c:505
Latch procLatch
Definition: proc.h:129
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:73
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:800
Definition: proc.h:120
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_wait_for_attach()

shm_mq_result shm_mq_wait_for_attach ( shm_mq_handle * mqh)

Definition at line 794 of file shm_mq.c.

References Assert, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, MyProc, SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_get_sender(), SHM_MQ_SUCCESS, and shm_mq_wait_internal().

795 {
796  shm_mq *mq = mqh->mqh_queue;
797  PGPROC **victim;
798 
799  if (shm_mq_get_receiver(mq) == MyProc)
800  victim = &mq->mq_sender;
801  else
802  {
803  Assert(shm_mq_get_sender(mq) == MyProc);
804  victim = &mq->mq_receiver;
805  }
806 
807  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
808  return SHM_MQ_SUCCESS;
809  else
810  return SHM_MQ_DETACHED;
811 }
PGPROC * MyProc
Definition: proc.c:67
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:250
PGPROC * mq_receiver
Definition: shm_mq.c:74
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:235
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:135
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1175
shm_mq * mqh_queue
Definition: shm_mq.c:133
#define Assert(condition)
Definition: c.h:800
Definition: shm_mq.c:71
Definition: proc.h:120
PGPROC * mq_sender
Definition: shm_mq.c:75

Variable Documentation

◆ shm_mq_minimum_size

PGDLLIMPORT const Size shm_mq_minimum_size

Definition at line 161 of file shm_mq.c.

Referenced by setup_dynamic_shared_memory().