PostgreSQL Source Code  git master
shm_mq.h File Reference
#include "postmaster/bgworker.h"
#include "storage/dsm.h"
#include "storage/proc.h"
Include dependency graph for shm_mq.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  shm_mq_iovec
 

Typedefs

typedef struct shm_mq shm_mq
 
typedef struct shm_mq_handle shm_mq_handle
 

Enumerations

enum  shm_mq_result { SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_MQ_DETACHED }
 

Functions

shm_mq * shm_mq_create (void *address, Size size)
 
void shm_mq_set_receiver (shm_mq *mq, PGPROC *)
 
void shm_mq_set_sender (shm_mq *mq, PGPROC *)
 
PGPROC * shm_mq_get_receiver (shm_mq *)
 
PGPROC * shm_mq_get_sender (shm_mq *)
 
shm_mq_handle * shm_mq_attach (shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 
void shm_mq_set_handle (shm_mq_handle *, BackgroundWorkerHandle *)
 
void shm_mq_detach (shm_mq_handle *mqh)
 
shm_mq * shm_mq_get_queue (shm_mq_handle *mqh)
 
shm_mq_result shm_mq_send (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
 
shm_mq_result shm_mq_sendv (shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 
shm_mq_result shm_mq_receive (shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
shm_mq_result shm_mq_wait_for_attach (shm_mq_handle *mqh)
 

Variables

PGDLLIMPORT const Size shm_mq_minimum_size
 

Typedef Documentation

◆ shm_mq

typedef struct shm_mq shm_mq

Definition at line 22 of file shm_mq.h.

◆ shm_mq_handle

typedef struct shm_mq_handle shm_mq_handle

Definition at line 26 of file shm_mq.h.

Enumeration Type Documentation

◆ shm_mq_result

Enumerator
SHM_MQ_SUCCESS 
SHM_MQ_WOULD_BLOCK 
SHM_MQ_DETACHED 

Definition at line 36 of file shm_mq.h.

37 {
38  SHM_MQ_SUCCESS, /* Sent or received a message. */
39  SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */
40  SHM_MQ_DETACHED /* Other process has detached queue. */
41 } shm_mq_result;
shm_mq_result
Definition: shm_mq.h:36

Function Documentation

◆ shm_mq_attach()

shm_mq_handle* shm_mq_attach ( shm_mq *  mq,
dsm_segment *  seg,
BackgroundWorkerHandle *  handle 
)

Definition at line 284 of file shm_mq.c.

References Assert, CurrentMemoryContext, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, MyProc, on_dsm_detach(), palloc(), PointerGetDatum, and shm_mq_detach_callback().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ParallelWorkerMain(), ReinitializeParallelDSM(), and test_shm_mq_setup().

285 {
286  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
287 
288  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
289  mqh->mqh_queue = mq;
290  mqh->mqh_segment = seg;
291  mqh->mqh_handle = handle;
292  mqh->mqh_buffer = NULL;
293  mqh->mqh_buflen = 0;
294  mqh->mqh_consume_pending = 0;
295  mqh->mqh_partial_bytes = 0;
296  mqh->mqh_expected_bytes = 0;
297  mqh->mqh_length_word_complete = false;
298  mqh->mqh_counterparty_attached = false;
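299  mqh->mqh_context = CurrentMemoryContext;
300 
301  if (seg != NULL)
302  on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
303 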
304  return mqh;
305 }
Size mqh_partial_bytes
Definition: shm_mq.c:140
PGPROC * MyProc
Definition: proc.c:68
#define PointerGetDatum(X)
Definition: postgres.h:600
PGPROC * mq_receiver
Definition: shm_mq.c:75
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:136
Size mqh_consume_pending
Definition: shm_mq.c:139
bool mqh_counterparty_attached
Definition: shm_mq.c:143
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1096
char * mqh_buffer
Definition: shm_mq.c:137
bool mqh_length_word_complete
Definition: shm_mq.c:142
Size mqh_buflen
Definition: shm_mq.c:138
dsm_segment * mqh_segment
Definition: shm_mq.c:135
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
Size mqh_expected_bytes
Definition: shm_mq.c:141
shm_mq * mqh_queue
Definition: shm_mq.c:134
#define Assert(condition)
Definition: c.h:804
void * palloc(Size size)
Definition: mcxt.c:1062
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1280
MemoryContext mqh_context
Definition: shm_mq.c:144
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_create()

shm_mq* shm_mq_create ( void *  address,
Size  size 
)

Definition at line 171 of file shm_mq.c.

References Assert, MAXALIGN, MAXALIGN_DOWN, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq::mq_sender, offsetof, pg_atomic_init_u64(), and SpinLockInit.

Referenced by ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

172 {
173  shm_mq *mq = address;
174  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
175 
176  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
177  size = MAXALIGN_DOWN(size);
178 
179  /* Queue size must be large enough to hold some data. */
180  Assert(size > data_offset);
181 
182  /* Initialize queue header. */
183  SpinLockInit(&mq->mq_mutex);
184  mq->mq_receiver = NULL;
185  mq->mq_sender = NULL;
186  pg_atomic_init_u64(&mq->mq_bytes_read, 0);
187  pg_atomic_init_u64(&mq->mq_bytes_written, 0);
188  mq->mq_ring_size = size - data_offset;
189  mq->mq_detached = false;
190  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
191 
192  return mq;
193 }
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * mq_receiver
Definition: shm_mq.c:75
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:415
slock_t mq_mutex
Definition: shm_mq.c:74
bool mq_detached
Definition: shm_mq.c:80
#define Assert(condition)
Definition: c.h:804
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:77
size_t Size
Definition: c.h:540
#define MAXALIGN(LEN)
Definition: c.h:757
Size mq_ring_size
Definition: shm_mq.c:79
Definition: shm_mq.c:72
uint8 mq_ring_offset
Definition: shm_mq.c:81
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:78
#define offsetof(type, field)
Definition: c.h:727
#define MAXALIGN_DOWN(LEN)
Definition: c.h:769
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_detach()

void shm_mq_detach ( shm_mq_handle *  mqh)

Definition at line 817 of file shm_mq.c.

References cancel_on_dsm_detach(), shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, pfree(), PointerGetDatum, shm_mq_detach_callback(), and shm_mq_detach_internal().

Referenced by DestroyParallelContext(), ExecParallelFinish(), HandleParallelMessage(), LaunchParallelWorkers(), mq_putmessage(), tqueueDestroyReceiver(), and tqueueShutdownReceiver().

818 {
819  /* Notify counterparty that we're outta here. */
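820  shm_mq_detach_internal(mqh->mqh_queue);
821 
822  /* Cancel on_dsm_detach callback, if any. */
823  if (mqh->mqh_segment)
824  cancel_on_dsm_detach(mqh->mqh_segment,
825  shm_mq_detach_callback,
826  PointerGetDatum(mqh->mqh_queue));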
827 
828  /* Release local memory associated with handle. */
829  if (mqh->mqh_buffer != NULL)
830  pfree(mqh->mqh_buffer);
831  pfree(mqh);
832 }
#define PointerGetDatum(X)
Definition: postgres.h:600
char * mqh_buffer
Definition: shm_mq.c:137
void pfree(void *pointer)
Definition: mcxt.c:1169
dsm_segment * mqh_segment
Definition: shm_mq.c:135
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:849
shm_mq * mqh_queue
Definition: shm_mq.c:134
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1280
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1111

◆ shm_mq_get_queue()

shm_mq* shm_mq_get_queue ( shm_mq_handle *  mqh)

Definition at line 872 of file shm_mq.c.

References shm_mq_handle::mqh_queue.

Referenced by WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

873 {
874  return mqh->mqh_queue;
875 }
shm_mq * mqh_queue
Definition: shm_mq.c:134

◆ shm_mq_get_receiver()

PGPROC* shm_mq_get_receiver ( shm_mq *  mq)

Definition at line 236 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_receiver, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_send_bytes(), and shm_mq_wait_for_attach().

237 {
238  PGPROC *receiver;
239 
240  SpinLockAcquire(&mq->mq_mutex);
241  receiver = mq->mq_receiver;
242  SpinLockRelease(&mq->mq_mutex);
243 
244  return receiver;
245 }
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: proc.h:121

◆ shm_mq_get_sender()

PGPROC* shm_mq_get_sender ( shm_mq *  mq)

Definition at line 251 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_sender, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_receive(), shm_mq_wait_for_attach(), WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

252 {
253  PGPROC *sender;
254 
255  SpinLockAcquire(&mq->mq_mutex);
256  sender = mq->mq_sender;
257  SpinLockRelease(&mq->mq_mutex);
258 
259  return sender;
260 }
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: proc.h:121

◆ shm_mq_receive()

shm_mq_result shm_mq_receive ( shm_mq_handle *  mqh,
Size *  nbytesp,
void **  datap,
bool  nowait 
)

Definition at line 549 of file shm_mq.c.

References Assert, ereport, errcode(), errmsg(), ERROR, MAXALIGN, MaxAllocSize, MemoryContextAlloc(), Min, shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_ring_size, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, MQH_INITIAL_BUFSIZE, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, pfree(), pg_nextpower2_size_t, shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_sender(), shm_mq_inc_bytes_read(), shm_mq_receive_bytes(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), and SHM_MQ_WOULD_BLOCK.

Referenced by copy_messages(), HandleParallelMessages(), test_shm_mq(), test_shm_mq_pipelined(), and TupleQueueReaderNext().

550 {
551  shm_mq *mq = mqh->mqh_queue;
552  shm_mq_result res;
553  Size rb = 0;
554  Size nbytes;
555  void *rawdata;
556 
557  Assert(mq->mq_receiver == MyProc);
558 
559  /* We can't receive data until the sender has attached. */
560  if (!mqh->mqh_counterparty_attached)
561  {
562  if (nowait)
563  {
564  int counterparty_gone;
565 
566  /*
567  * We shouldn't return at this point at all unless the sender
568  * hasn't attached yet. However, the correct return value depends
569  * on whether the sender is still attached. If we first test
570  * whether the sender has ever attached and then test whether the
571  * sender has detached, there's a race condition: a sender that
572  * attaches and detaches very quickly might fool us into thinking
573  * the sender never attached at all. So, test whether our
574  * counterparty is definitively gone first, and only afterwards
575  * check whether the sender ever attached in the first place.
576  */
577  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
578  if (shm_mq_get_sender(mq) == NULL)
579  {
580  if (counterparty_gone)
581  return SHM_MQ_DETACHED;
582  else
583  return SHM_MQ_WOULD_BLOCK;
584  }
585  }
586  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
587  && shm_mq_get_sender(mq) == NULL)
588  {
589  mq->mq_detached = true;
590  return SHM_MQ_DETACHED;
591  }
592  mqh->mqh_counterparty_attached = true;
593  }
594 
595  /*
596  * If we've consumed an amount of data greater than 1/4th of the ring
597  * size, mark it consumed in shared memory. We try to avoid doing this
598  * unnecessarily when only a small amount of data has been consumed,
599  * because SetLatch() is fairly expensive and we don't want to do it too
600  * often.
601  */
602  if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
603  {
604  shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
605  mqh->mqh_consume_pending = 0;
606  }
607 
608  /* Try to read, or finish reading, the length word from the buffer. */
609  while (!mqh->mqh_length_word_complete)
610  {
611  /* Try to receive the message length word. */
612  Assert(mqh->mqh_partial_bytes < sizeof(Size));
613  res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
614  nowait, &rb, &rawdata);
615  if (res != SHM_MQ_SUCCESS)
616  return res;
617 
618  /*
619  * Hopefully, we'll receive the entire message length word at once.
620  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
621  * multiple reads.
622  */
623  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
624  {
625  Size needed;
626 
627  nbytes = *(Size *) rawdata;
628 
629  /* If we've already got the whole message, we're done. */
630  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
631  if (rb >= needed)
632  {
633  mqh->mqh_consume_pending += needed;
634  *nbytesp = nbytes;
635  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
636  return SHM_MQ_SUCCESS;
637  }
638 
639  /*
640  * We don't have the whole message, but we at least have the whole
641  * length word.
642  */
643  mqh->mqh_expected_bytes = nbytes;
644  mqh->mqh_length_word_complete = true;
645  mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
646  rb -= MAXALIGN(sizeof(Size));
647  }
648  else
649  {
650  Size lengthbytes;
651 
652  /* Can't be split unless bigger than required alignment. */
653  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
654 
655  /* Message word is split; need buffer to reassemble. */
656  if (mqh->mqh_buffer == NULL)
657  {
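658  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
659  MQH_INITIAL_BUFSIZE);
660  mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
661  }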
662  Assert(mqh->mqh_buflen >= sizeof(Size));
663 
664  /* Copy partial length word; remember to consume it. */
665  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
666  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
667  else
668  lengthbytes = rb;
669  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
670  lengthbytes);
671  mqh->mqh_partial_bytes += lengthbytes;
672  mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
673  rb -= lengthbytes;
674 
675  /* If we now have the whole word, we're ready to read payload. */
676  if (mqh->mqh_partial_bytes >= sizeof(Size))
677  {
678  Assert(mqh->mqh_partial_bytes == sizeof(Size));
679  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
680  mqh->mqh_length_word_complete = true;
681  mqh->mqh_partial_bytes = 0;
682  }
683  }
684  }
685  nbytes = mqh->mqh_expected_bytes;
686 
687  /*
688  * Should be disallowed on the sending side already, but better check and
689  * error out on the receiver side as well rather than trying to read a
690  * prohibitively large message.
691  */
692  if (nbytes > MaxAllocSize)
693  ereport(ERROR,
694  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
695  errmsg("invalid message size %zu in shared memory queue",
696  nbytes)));
697 
698  if (mqh->mqh_partial_bytes == 0)
699  {
700  /*
701  * Try to obtain the whole message in a single chunk. If this works,
702  * we need not copy the data and can return a pointer directly into
703  * shared memory.
704  */
705  res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
706  if (res != SHM_MQ_SUCCESS)
707  return res;
708  if (rb >= nbytes)
709  {
710  mqh->mqh_length_word_complete = false;
711  mqh->mqh_consume_pending += MAXALIGN(nbytes);
712  *nbytesp = nbytes;
713  *datap = rawdata;
714  return SHM_MQ_SUCCESS;
715  }
716 
717  /*
718  * The message has wrapped the buffer. We'll need to copy it in order
719  * to return it to the client in one chunk. First, make sure we have
720  * a large enough buffer available.
721  */
722  if (mqh->mqh_buflen < nbytes)
723  {
724  Size newbuflen;
725 
726  /*
727  * Increase size to the next power of 2 that's >= nbytes, but
728  * limit to MaxAllocSize.
729  */
730  newbuflen = pg_nextpower2_size_t(nbytes);
731  newbuflen = Min(newbuflen, MaxAllocSize);
732 
733  if (mqh->mqh_buffer != NULL)
734  {
735  pfree(mqh->mqh_buffer);
736  mqh->mqh_buffer = NULL;
737  mqh->mqh_buflen = 0;
738  }
739  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
740  mqh->mqh_buflen = newbuflen;
741  }
742  }
743 
744  /* Loop until we've copied the entire message. */
745  for (;;)
746  {
747  Size still_needed;
748 
749  /* Copy as much as we can. */
750  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
751  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
752  mqh->mqh_partial_bytes += rb;
753 
754  /*
755  * Update count of bytes that can be consumed, accounting for
756  * alignment padding. Note that this will never actually insert any
757  * padding except at the end of a message, because the buffer size is
758  * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
759  */
760  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
761  mqh->mqh_consume_pending += MAXALIGN(rb);
762 
763  /* If we got all the data, exit the loop. */
764  if (mqh->mqh_partial_bytes >= nbytes)
765  break;
766 
767  /* Wait for some more data. */
768  still_needed = nbytes - mqh->mqh_partial_bytes;
769  res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
770  if (res != SHM_MQ_SUCCESS)
771  return res;
772  if (rb > still_needed)
773  rb = still_needed;
774  }
775 
776  /* Return the complete message, and reset for next message. */
777  *nbytesp = nbytes;
778  *datap = mqh->mqh_buffer;
779  mqh->mqh_length_word_complete = false;
780  mqh->mqh_partial_bytes = 0;
781  return SHM_MQ_SUCCESS;
782 }
Size mqh_partial_bytes
Definition: shm_mq.c:140
#define pg_nextpower2_size_t(num)
Definition: pg_bitutils.h:191
PGPROC * MyProc
Definition: proc.c:68
#define Min(x, y)
Definition: c.h:986
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:251
int errcode(int sqlerrcode)
Definition: elog.c:698
PGPROC * mq_receiver
Definition: shm_mq.c:75
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:136
Size mqh_consume_pending
Definition: shm_mq.c:139
bool mqh_counterparty_attached
Definition: shm_mq.c:143
char * mqh_buffer
Definition: shm_mq.c:137
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1175
bool mqh_length_word_complete
Definition: shm_mq.c:142
void pfree(void *pointer)
Definition: mcxt.c:1169
#define ERROR
Definition: elog.h:46
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:165
Size mqh_buflen
Definition: shm_mq.c:138
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:1036
Size mqh_expected_bytes
Definition: shm_mq.c:141
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1227
#define MaxAllocSize
Definition: memutils.h:40
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1136
shm_mq * mqh_queue
Definition: shm_mq.c:134
bool mq_detached
Definition: shm_mq.c:80
#define ereport(elevel,...)
Definition: elog.h:157
shm_mq_result
Definition: shm_mq.h:36
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
#define MAXALIGN(LEN)
Definition: c.h:757
Size mq_ring_size
Definition: shm_mq.c:79
int errmsg(const char *fmt,...)
Definition: elog.c:909
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
Definition: shm_mq.c:72
MemoryContext mqh_context
Definition: shm_mq.c:144
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_send()

shm_mq_result shm_mq_send ( shm_mq_handle *  mqh,
Size  nbytes,
const void *  data,
bool  nowait 
)

Definition at line 322 of file shm_mq.c.

References shm_mq_iovec::data, shm_mq_iovec::len, and shm_mq_sendv().

Referenced by copy_messages(), test_shm_mq(), test_shm_mq_pipelined(), and tqueueReceiveSlot().

323 {
324  shm_mq_iovec iov;
325 
326  iov.data = data;
327  iov.len = nbytes;
328 
329  return shm_mq_sendv(mqh, &iov, 1, nowait);
330 }
Size len
Definition: shm_mq.h:32
const char * data
Definition: shm_mq.h:31
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
Definition: shm_mq.c:348

◆ shm_mq_sendv()

shm_mq_result shm_mq_sendv ( shm_mq_handle *  mqh,
shm_mq_iovec *  iov,
int  iovcnt,
bool  nowait 
)

Definition at line 348 of file shm_mq.c.

References Assert, shm_mq_iovec::data, ereport, errcode(), errmsg(), ERROR, i, shm_mq_iovec::len, MAXALIGN_DOWN, MaxAllocSize, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, PGPROC::procLatch, SetLatch(), SHM_MQ_DETACHED, shm_mq_send_bytes(), SHM_MQ_SUCCESS, SpinLockAcquire, SpinLockRelease, and tmpbuf.

Referenced by mq_putmessage(), and shm_mq_send().

349 {
350  shm_mq_result res;
351  shm_mq *mq = mqh->mqh_queue;
352  PGPROC *receiver;
353  Size nbytes = 0;
354  Size bytes_written;
355  int i;
356  int which_iov = 0;
357  Size offset;
358 
359  Assert(mq->mq_sender == MyProc);
360 
361  /* Compute total size of write. */
362  for (i = 0; i < iovcnt; ++i)
363  nbytes += iov[i].len;
364 
365  /* Prevent writing messages overwhelming the receiver. */
366  if (nbytes > MaxAllocSize)
367  ereport(ERROR,
368  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
369  errmsg("cannot send a message of size %zu via shared memory queue",
370  nbytes)));
371 
372  /* Try to write, or finish writing, the length word into the buffer. */
373  while (!mqh->mqh_length_word_complete)
374  {
375  Assert(mqh->mqh_partial_bytes < sizeof(Size));
376  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
377  ((char *) &nbytes) + mqh->mqh_partial_bytes,
378  nowait, &bytes_written);
379 
380  if (res == SHM_MQ_DETACHED)
381  {
382  /* Reset state in case caller tries to send another message. */
383  mqh->mqh_partial_bytes = 0;
384  mqh->mqh_length_word_complete = false;
385  return res;
386  }
387  mqh->mqh_partial_bytes += bytes_written;
388 
389  if (mqh->mqh_partial_bytes >= sizeof(Size))
390  {
391  Assert(mqh->mqh_partial_bytes == sizeof(Size));
392 
393  mqh->mqh_partial_bytes = 0;
394  mqh->mqh_length_word_complete = true;
395  }
396 
397  if (res != SHM_MQ_SUCCESS)
398  return res;
399 
400  /* Length word can't be split unless bigger than required alignment. */
401  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
402  }
403 
404  /* Write the actual data bytes into the buffer. */
405  Assert(mqh->mqh_partial_bytes <= nbytes);
406  offset = mqh->mqh_partial_bytes;
407  do
408  {
409  Size chunksize;
410 
411  /* Figure out which bytes need to be sent next. */
412  if (offset >= iov[which_iov].len)
413  {
414  offset -= iov[which_iov].len;
415  ++which_iov;
416  if (which_iov >= iovcnt)
417  break;
418  continue;
419  }
420 
421  /*
422  * We want to avoid copying the data if at all possible, but every
423  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
424  * the last. Thus, if a chunk other than the last one ends on a
425  * non-MAXALIGN'd boundary, we have to combine the tail end of its
426  * data with data from one or more following chunks until we either
427  * reach the last chunk or accumulate a number of bytes which is
428  * MAXALIGN'd.
429  */
430  if (which_iov + 1 < iovcnt &&
431  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
432  {
433  char tmpbuf[MAXIMUM_ALIGNOF];
434  int j = 0;
435 
436  for (;;)
437  {
438  if (offset < iov[which_iov].len)
439  {
440  tmpbuf[j] = iov[which_iov].data[offset];
441  j++;
442  offset++;
443  if (j == MAXIMUM_ALIGNOF)
444  break;
445  }
446  else
447  {
448  offset -= iov[which_iov].len;
449  which_iov++;
450  if (which_iov >= iovcnt)
451  break;
452  }
453  }
454 
455  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
456 
457  if (res == SHM_MQ_DETACHED)
458  {
459  /* Reset state in case caller tries to send another message. */
460  mqh->mqh_partial_bytes = 0;
461  mqh->mqh_length_word_complete = false;
462  return res;
463  }
464 
465  mqh->mqh_partial_bytes += bytes_written;
466  if (res != SHM_MQ_SUCCESS)
467  return res;
468  continue;
469  }
470 
471  /*
472  * If this is the last chunk, we can write all the data, even if it
473  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
474  * MAXALIGN_DOWN the write size.
475  */
476  chunksize = iov[which_iov].len - offset;
477  if (which_iov + 1 < iovcnt)
478  chunksize = MAXALIGN_DOWN(chunksize);
479  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
480  nowait, &bytes_written);
481 
482  if (res == SHM_MQ_DETACHED)
483  {
484  /* Reset state in case caller tries to send another message. */
485  mqh->mqh_length_word_complete = false;
486  mqh->mqh_partial_bytes = 0;
487  return res;
488  }
489 
490  mqh->mqh_partial_bytes += bytes_written;
491  offset += bytes_written;
492  if (res != SHM_MQ_SUCCESS)
493  return res;
494  } while (mqh->mqh_partial_bytes < nbytes);
495 
496  /* Reset for next message. */
497  mqh->mqh_partial_bytes = 0;
498  mqh->mqh_length_word_complete = false;
499 
500  /* If queue has been detached, let caller know. */
501  if (mq->mq_detached)
502  return SHM_MQ_DETACHED;
503 
504  /*
505  * If the counterparty is known to have attached, we can read mq_receiver
506  * without acquiring the spinlock and assume it isn't NULL. Otherwise,
507  * more caution is needed.
508  */
509  if (mqh->mqh_counterparty_attached)
510  receiver = mq->mq_receiver;
511  else
512  {
513  SpinLockAcquire(&mq->mq_mutex);
514  receiver = mq->mq_receiver;
515  SpinLockRelease(&mq->mq_mutex);
516  if (receiver == NULL)
517  return SHM_MQ_SUCCESS;
518  mqh->mqh_counterparty_attached = true;
519  }
520 
521  /* Notify receiver of the newly-written data, and return. */
522  SetLatch(&receiver->procLatch);
523  return SHM_MQ_SUCCESS;
524 }
Size mqh_partial_bytes
Definition: shm_mq.c:140
PGPROC * MyProc
Definition: proc.c:68
int errcode(int sqlerrcode)
Definition: elog.c:698
PGPROC * mq_receiver
Definition: shm_mq.c:75
void SetLatch(Latch *latch)
Definition: latch.c:567
bool mqh_counterparty_attached
Definition: shm_mq.c:143
Latch procLatch
Definition: proc.h:130
Size len
Definition: shm_mq.h:32
#define SpinLockAcquire(lock)
Definition: spin.h:62
bool mqh_length_word_complete
Definition: shm_mq.c:142
const char * data
Definition: shm_mq.h:31
#define ERROR
Definition: elog.h:46
slock_t mq_mutex
Definition: shm_mq.c:74
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:881
#define MaxAllocSize
Definition: memutils.h:40
#define SpinLockRelease(lock)
Definition: spin.h:64
shm_mq * mqh_queue
Definition: shm_mq.c:134
bool mq_detached
Definition: shm_mq.c:80
#define ereport(elevel,...)
Definition: elog.h:157
shm_mq_result
Definition: shm_mq.h:36
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
static StringInfoData tmpbuf
Definition: walsender.c:159
int errmsg(const char *fmt,...)
Definition: elog.c:909
int i
Definition: shm_mq.c:72
Definition: proc.h:121
#define MAXALIGN_DOWN(LEN)
Definition: c.h:769
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_set_handle()

void shm_mq_set_handle ( shm_mq_handle *  mqh,
BackgroundWorkerHandle *  handle 
)

Definition at line 312 of file shm_mq.c.

References Assert, and shm_mq_handle::mqh_handle.

Referenced by ExecParallelCreateReaders(), and LaunchParallelWorkers().

313 {
314  Assert(mqh->mqh_handle == NULL);
315  mqh->mqh_handle = handle;
316 }
#define Assert(condition)
Definition: c.h:804

◆ shm_mq_set_receiver()

void shm_mq_set_receiver ( shm_mq *  mq,
PGPROC *  proc 
)

Definition at line 200 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

201 {
202  PGPROC *sender;
203 
204  SpinLockAcquire(&mq->mq_mutex);
205  Assert(mq->mq_receiver == NULL);
206  mq->mq_receiver = proc;
207  sender = mq->mq_sender;
208  SpinLockRelease(&mq->mq_mutex);
209 
210  if (sender != NULL)
211  SetLatch(&sender->procLatch);
212 }
PGPROC * mq_receiver
Definition: shm_mq.c:75
void SetLatch(Latch *latch)
Definition: latch.c:567
Latch procLatch
Definition: proc.h:130
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:804
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_set_sender()

void shm_mq_set_sender ( shm_mq *  mq,
PGPROC *  proc 
)

Definition at line 218 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ParallelWorkerMain(), and setup_dynamic_shared_memory().

219 {
220  PGPROC *receiver;
221 
222  SpinLockAcquire(&mq->mq_mutex);
223  Assert(mq->mq_sender == NULL);
224  mq->mq_sender = proc;
225  receiver = mq->mq_receiver;
226  SpinLockRelease(&mq->mq_mutex);
227 
228  if (receiver != NULL)
229  SetLatch(&receiver->procLatch);
230 }
PGPROC * mq_receiver
Definition: shm_mq.c:75
void SetLatch(Latch *latch)
Definition: latch.c:567
Latch procLatch
Definition: proc.h:130
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:804
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_wait_for_attach()

shm_mq_result shm_mq_wait_for_attach ( shm_mq_handle *  mqh)

Definition at line 794 of file shm_mq.c.

References Assert, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, MyProc, SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_get_sender(), SHM_MQ_SUCCESS, and shm_mq_wait_internal().

795 {
796  shm_mq *mq = mqh->mqh_queue;
797  PGPROC **victim;
798 
799  if (shm_mq_get_receiver(mq) == MyProc)
800  victim = &mq->mq_sender;
801  else
802  {
803  Assert(shm_mq_get_sender(mq) == MyProc);
804  victim = &mq->mq_receiver;
805  }
806 
807  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
808  return SHM_MQ_SUCCESS;
809  else
810  return SHM_MQ_DETACHED;
811 }
PGPROC * MyProc
Definition: proc.c:68
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:251
PGPROC * mq_receiver
Definition: shm_mq.c:75
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:236
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:136
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1175
shm_mq * mqh_queue
Definition: shm_mq.c:134
#define Assert(condition)
Definition: c.h:804
Definition: shm_mq.c:72
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

Variable Documentation

◆ shm_mq_minimum_size

PGDLLIMPORT const Size shm_mq_minimum_size

Definition at line 162 of file shm_mq.c.

Referenced by setup_dynamic_shared_memory().