PostgreSQL Source Code  git master
shm_mq.h File Reference
#include "postmaster/bgworker.h"
#include "storage/dsm.h"
#include "storage/proc.h"
Include dependency graph for shm_mq.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  shm_mq_iovec
 

Typedefs

typedef struct shm_mq shm_mq
 
typedef struct shm_mq_handle shm_mq_handle
 

Enumerations

enum  shm_mq_result { SHM_MQ_SUCCESS , SHM_MQ_WOULD_BLOCK , SHM_MQ_DETACHED }
 

Functions

shm_mqshm_mq_create (void *address, Size size)
 
void shm_mq_set_receiver (shm_mq *mq, PGPROC *)
 
void shm_mq_set_sender (shm_mq *mq, PGPROC *)
 
PGPROCshm_mq_get_receiver (shm_mq *)
 
PGPROCshm_mq_get_sender (shm_mq *)
 
shm_mq_handleshm_mq_attach (shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 
void shm_mq_set_handle (shm_mq_handle *, BackgroundWorkerHandle *)
 
void shm_mq_detach (shm_mq_handle *mqh)
 
shm_mqshm_mq_get_queue (shm_mq_handle *mqh)
 
shm_mq_result shm_mq_send (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
 
shm_mq_result shm_mq_sendv (shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
 
shm_mq_result shm_mq_receive (shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
shm_mq_result shm_mq_wait_for_attach (shm_mq_handle *mqh)
 

Variables

PGDLLIMPORT const Size shm_mq_minimum_size
 

Typedef Documentation

◆ shm_mq

typedef struct shm_mq shm_mq

Definition at line 1 of file shm_mq.h.

◆ shm_mq_handle

typedef struct shm_mq_handle shm_mq_handle

Definition at line 1 of file shm_mq.h.

Enumeration Type Documentation

◆ shm_mq_result

Enumerator
SHM_MQ_SUCCESS 
SHM_MQ_WOULD_BLOCK 
SHM_MQ_DETACHED 

Definition at line 36 of file shm_mq.h.

37 {
38  SHM_MQ_SUCCESS, /* Sent or received a message. */
39  SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */
40  SHM_MQ_DETACHED, /* Other process has detached queue. */
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40

Function Documentation

◆ shm_mq_attach()

shm_mq_handle* shm_mq_attach ( shm_mq mq,
dsm_segment seg,
BackgroundWorkerHandle handle 
)

Definition at line 290 of file shm_mq.c.

291 {
292  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
293 
294  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
295  mqh->mqh_queue = mq;
296  mqh->mqh_segment = seg;
297  mqh->mqh_handle = handle;
298  mqh->mqh_buffer = NULL;
299  mqh->mqh_buflen = 0;
300  mqh->mqh_consume_pending = 0;
301  mqh->mqh_send_pending = 0;
302  mqh->mqh_partial_bytes = 0;
303  mqh->mqh_expected_bytes = 0;
304  mqh->mqh_length_word_complete = false;
305  mqh->mqh_counterparty_attached = false;
307 
308  if (seg != NULL)
310 
311  return mqh;
312 }
#define Assert(condition)
Definition: c.h:858
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1132
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * palloc(Size size)
Definition: mcxt.c:1317
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1323
PGPROC * MyProc
Definition: proc.c:67
Size mqh_consume_pending
Definition: shm_mq.c:144
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:141
char * mqh_buffer
Definition: shm_mq.c:142
Size mqh_send_pending
Definition: shm_mq.c:145
Size mqh_expected_bytes
Definition: shm_mq.c:147
bool mqh_counterparty_attached
Definition: shm_mq.c:149
shm_mq * mqh_queue
Definition: shm_mq.c:139
dsm_segment * mqh_segment
Definition: shm_mq.c:140
bool mqh_length_word_complete
Definition: shm_mq.c:148
Size mqh_buflen
Definition: shm_mq.c:143
MemoryContext mqh_context
Definition: shm_mq.c:150
Size mqh_partial_bytes
Definition: shm_mq.c:146
PGPROC * mq_sender
Definition: shm_mq.c:75
PGPROC * mq_receiver
Definition: shm_mq.c:74

References Assert, CurrentMemoryContext, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, shm_mq_handle::mqh_send_pending, MyProc, on_dsm_detach(), palloc(), PointerGetDatum(), and shm_mq_detach_callback().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), pa_setup_dsm(), ParallelApplyWorkerMain(), ParallelWorkerMain(), ReinitializeParallelDSM(), and test_shm_mq_setup().

◆ shm_mq_create()

shm_mq* shm_mq_create ( void *  address,
Size  size 
)

Definition at line 177 of file shm_mq.c.

178 {
179  shm_mq *mq = address;
180  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
181 
182  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
184 
185  /* Queue size must be large enough to hold some data. */
186  Assert(size > data_offset);
187 
188  /* Initialize queue header. */
189  SpinLockInit(&mq->mq_mutex);
190  mq->mq_receiver = NULL;
191  mq->mq_sender = NULL;
194  mq->mq_ring_size = size - data_offset;
195  mq->mq_detached = false;
196  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
197 
198  return mq;
199 }
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:453
#define MAXALIGN_DOWN(LEN)
Definition: c.h:823
#define MAXALIGN(LEN)
Definition: c.h:811
size_t Size
Definition: c.h:605
static pg_noinline void Size size
Definition: slab.c:607
#define SpinLockInit(lock)
Definition: spin.h:57
Definition: shm_mq.c:72
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:77
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:76
bool mq_detached
Definition: shm_mq.c:79
uint8 mq_ring_offset
Definition: shm_mq.c:80
slock_t mq_mutex
Definition: shm_mq.c:73
Size mq_ring_size
Definition: shm_mq.c:78

References Assert, MAXALIGN, MAXALIGN_DOWN, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq::mq_sender, pg_atomic_init_u64(), size, and SpinLockInit.

Referenced by ExecParallelSetupTupleQueues(), InitializeParallelDSM(), pa_setup_dsm(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

◆ shm_mq_detach()

void shm_mq_detach ( shm_mq_handle mqh)

Definition at line 843 of file shm_mq.c.

844 {
845  /* Before detaching, notify the receiver about any already-written data. */
846  if (mqh->mqh_send_pending > 0)
847  {
849  mqh->mqh_send_pending = 0;
850  }
851 
852  /* Notify counterparty that we're outta here. */
854 
855  /* Cancel on_dsm_detach callback, if any. */
856  if (mqh->mqh_segment)
859  PointerGetDatum(mqh->mqh_queue));
860 
861  /* Release local memory associated with handle. */
862  if (mqh->mqh_buffer != NULL)
863  pfree(mqh->mqh_buffer);
864  pfree(mqh);
865 }
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1147
void pfree(void *pointer)
Definition: mcxt.c:1521
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
Definition: shm_mq.c:1303
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:882

References cancel_on_dsm_detach(), shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, shm_mq_handle::mqh_send_pending, pfree(), PointerGetDatum(), shm_mq_detach_callback(), shm_mq_detach_internal(), and shm_mq_inc_bytes_written().

Referenced by DestroyParallelContext(), ExecParallelFinish(), HandleParallelMessage(), LaunchParallelWorkers(), logicalrep_pa_worker_stop(), mq_putmessage(), pa_detach_all_error_mq(), pa_free_worker_info(), tqueueDestroyReceiver(), and tqueueShutdownReceiver().

◆ shm_mq_get_queue()

shm_mq* shm_mq_get_queue ( shm_mq_handle mqh)

Definition at line 905 of file shm_mq.c.

906 {
907  return mqh->mqh_queue;
908 }

References shm_mq_handle::mqh_queue.

Referenced by WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

◆ shm_mq_get_receiver()

PGPROC* shm_mq_get_receiver ( shm_mq mq)

Definition at line 242 of file shm_mq.c.

243 {
244  PGPROC *receiver;
245 
247  receiver = mq->mq_receiver;
249 
250  return receiver;
251 }
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
Definition: proc.h:157

References shm_mq::mq_mutex, shm_mq::mq_receiver, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_send_bytes(), and shm_mq_wait_for_attach().

◆ shm_mq_get_sender()

PGPROC* shm_mq_get_sender ( shm_mq mq)

Definition at line 257 of file shm_mq.c.

258 {
259  PGPROC *sender;
260 
262  sender = mq->mq_sender;
264 
265  return sender;
266 }

References shm_mq::mq_mutex, shm_mq::mq_sender, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_receive(), shm_mq_wait_for_attach(), WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

◆ shm_mq_receive()

shm_mq_result shm_mq_receive ( shm_mq_handle mqh,
Size nbytesp,
void **  datap,
bool  nowait 
)

Definition at line 572 of file shm_mq.c.

573 {
574  shm_mq *mq = mqh->mqh_queue;
576  Size rb = 0;
577  Size nbytes;
578  void *rawdata;
579 
580  Assert(mq->mq_receiver == MyProc);
581 
582  /* We can't receive data until the sender has attached. */
583  if (!mqh->mqh_counterparty_attached)
584  {
585  if (nowait)
586  {
587  int counterparty_gone;
588 
589  /*
590  * We shouldn't return at this point at all unless the sender
591  * hasn't attached yet. However, the correct return value depends
592  * on whether the sender is still attached. If we first test
593  * whether the sender has ever attached and then test whether the
594  * sender has detached, there's a race condition: a sender that
595  * attaches and detaches very quickly might fool us into thinking
596  * the sender never attached at all. So, test whether our
597  * counterparty is definitively gone first, and only afterwards
598  * check whether the sender ever attached in the first place.
599  */
600  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
601  if (shm_mq_get_sender(mq) == NULL)
602  {
603  if (counterparty_gone)
604  return SHM_MQ_DETACHED;
605  else
606  return SHM_MQ_WOULD_BLOCK;
607  }
608  }
609  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
610  && shm_mq_get_sender(mq) == NULL)
611  {
612  mq->mq_detached = true;
613  return SHM_MQ_DETACHED;
614  }
615  mqh->mqh_counterparty_attached = true;
616  }
617 
618  /*
619  * If we've consumed an amount of data greater than 1/4th of the ring
620  * size, mark it consumed in shared memory. We try to avoid doing this
621  * unnecessarily when only a small amount of data has been consumed,
622  * because SetLatch() is fairly expensive and we don't want to do it too
623  * often.
624  */
625  if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
626  {
628  mqh->mqh_consume_pending = 0;
629  }
630 
631  /* Try to read, or finish reading, the length word from the buffer. */
632  while (!mqh->mqh_length_word_complete)
633  {
634  /* Try to receive the message length word. */
635  Assert(mqh->mqh_partial_bytes < sizeof(Size));
636  res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
637  nowait, &rb, &rawdata);
638  if (res != SHM_MQ_SUCCESS)
639  return res;
640 
641  /*
642  * Hopefully, we'll receive the entire message length word at once.
643  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
644  * multiple reads.
645  */
646  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
647  {
648  Size needed;
649 
650  nbytes = *(Size *) rawdata;
651 
652  /* If we've already got the whole message, we're done. */
653  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
654  if (rb >= needed)
655  {
656  mqh->mqh_consume_pending += needed;
657  *nbytesp = nbytes;
658  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
659  return SHM_MQ_SUCCESS;
660  }
661 
662  /*
663  * We don't have the whole message, but we at least have the whole
664  * length word.
665  */
666  mqh->mqh_expected_bytes = nbytes;
667  mqh->mqh_length_word_complete = true;
668  mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
669  rb -= MAXALIGN(sizeof(Size));
670  }
671  else
672  {
673  Size lengthbytes;
674 
675  /* Can't be split unless bigger than required alignment. */
676  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
677 
678  /* Message word is split; need buffer to reassemble. */
679  if (mqh->mqh_buffer == NULL)
680  {
684  }
685  Assert(mqh->mqh_buflen >= sizeof(Size));
686 
687  /* Copy partial length word; remember to consume it. */
688  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
689  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
690  else
691  lengthbytes = rb;
692  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
693  lengthbytes);
694  mqh->mqh_partial_bytes += lengthbytes;
695  mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
696  rb -= lengthbytes;
697 
698  /* If we now have the whole word, we're ready to read payload. */
699  if (mqh->mqh_partial_bytes >= sizeof(Size))
700  {
701  Assert(mqh->mqh_partial_bytes == sizeof(Size));
702  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
703  mqh->mqh_length_word_complete = true;
704  mqh->mqh_partial_bytes = 0;
705  }
706  }
707  }
708  nbytes = mqh->mqh_expected_bytes;
709 
710  /*
711  * Should be disallowed on the sending side already, but better check and
712  * error out on the receiver side as well rather than trying to read a
713  * prohibitively large message.
714  */
715  if (nbytes > MaxAllocSize)
716  ereport(ERROR,
717  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
718  errmsg("invalid message size %zu in shared memory queue",
719  nbytes)));
720 
721  if (mqh->mqh_partial_bytes == 0)
722  {
723  /*
724  * Try to obtain the whole message in a single chunk. If this works,
725  * we need not copy the data and can return a pointer directly into
726  * shared memory.
727  */
728  res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
729  if (res != SHM_MQ_SUCCESS)
730  return res;
731  if (rb >= nbytes)
732  {
733  mqh->mqh_length_word_complete = false;
734  mqh->mqh_consume_pending += MAXALIGN(nbytes);
735  *nbytesp = nbytes;
736  *datap = rawdata;
737  return SHM_MQ_SUCCESS;
738  }
739 
740  /*
741  * The message has wrapped the buffer. We'll need to copy it in order
742  * to return it to the client in one chunk. First, make sure we have
743  * a large enough buffer available.
744  */
745  if (mqh->mqh_buflen < nbytes)
746  {
747  Size newbuflen;
748 
749  /*
750  * Increase size to the next power of 2 that's >= nbytes, but
751  * limit to MaxAllocSize.
752  */
753  newbuflen = pg_nextpower2_size_t(nbytes);
754  newbuflen = Min(newbuflen, MaxAllocSize);
755 
756  if (mqh->mqh_buffer != NULL)
757  {
758  pfree(mqh->mqh_buffer);
759  mqh->mqh_buffer = NULL;
760  mqh->mqh_buflen = 0;
761  }
762  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
763  mqh->mqh_buflen = newbuflen;
764  }
765  }
766 
767  /* Loop until we've copied the entire message. */
768  for (;;)
769  {
770  Size still_needed;
771 
772  /* Copy as much as we can. */
773  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
774  if (rb > 0)
775  {
776  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
777  mqh->mqh_partial_bytes += rb;
778  }
779 
780  /*
781  * Update count of bytes that can be consumed, accounting for
782  * alignment padding. Note that this will never actually insert any
783  * padding except at the end of a message, because the buffer size is
784  * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
785  */
786  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
787  mqh->mqh_consume_pending += MAXALIGN(rb);
788 
789  /* If we got all the data, exit the loop. */
790  if (mqh->mqh_partial_bytes >= nbytes)
791  break;
792 
793  /* Wait for some more data. */
794  still_needed = nbytes - mqh->mqh_partial_bytes;
795  res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
796  if (res != SHM_MQ_SUCCESS)
797  return res;
798  if (rb > still_needed)
799  rb = still_needed;
800  }
801 
802  /* Return the complete message, and reset for next message. */
803  *nbytesp = nbytes;
804  *datap = mqh->mqh_buffer;
805  mqh->mqh_length_word_complete = false;
806  mqh->mqh_partial_bytes = 0;
807  return SHM_MQ_SUCCESS;
808 }
#define Min(x, y)
Definition: c.h:1004
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
#define MaxAllocSize
Definition: memutils.h:40
#define pg_nextpower2_size_t
Definition: pg_bitutils.h:417
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:171
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1179
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1218
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:257
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1270
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:1079

References Assert, ereport, errcode(), errmsg(), ERROR, MAXALIGN, MaxAllocSize, MemoryContextAlloc(), Min, shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_ring_size, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, MQH_INITIAL_BUFSIZE, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, pfree(), pg_nextpower2_size_t, res, shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_sender(), shm_mq_inc_bytes_read(), shm_mq_receive_bytes(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), and SHM_MQ_WOULD_BLOCK.

Referenced by copy_messages(), HandleParallelApplyMessages(), HandleParallelMessages(), LogicalParallelApplyLoop(), test_shm_mq(), test_shm_mq_pipelined(), and TupleQueueReaderNext().

◆ shm_mq_send()

shm_mq_result shm_mq_send ( shm_mq_handle mqh,
Size  nbytes,
const void *  data,
bool  nowait,
bool  force_flush 
)

Definition at line 329 of file shm_mq.c.

331 {
332  shm_mq_iovec iov;
333 
334  iov.data = data;
335  iov.len = nbytes;
336 
337  return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
338 }
const void * data
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
Definition: shm_mq.c:361
const char * data
Definition: shm_mq.h:31
Size len
Definition: shm_mq.h:32

References shm_mq_iovec::data, data, shm_mq_iovec::len, and shm_mq_sendv().

Referenced by copy_messages(), pa_send_data(), test_shm_mq(), test_shm_mq_pipelined(), and tqueueReceiveSlot().

◆ shm_mq_sendv()

shm_mq_result shm_mq_sendv ( shm_mq_handle mqh,
shm_mq_iovec iov,
int  iovcnt,
bool  nowait,
bool  force_flush 
)

Definition at line 361 of file shm_mq.c.

363 {
365  shm_mq *mq = mqh->mqh_queue;
366  PGPROC *receiver;
367  Size nbytes = 0;
368  Size bytes_written;
369  int i;
370  int which_iov = 0;
371  Size offset;
372 
373  Assert(mq->mq_sender == MyProc);
374 
375  /* Compute total size of write. */
376  for (i = 0; i < iovcnt; ++i)
377  nbytes += iov[i].len;
378 
379  /* Prevent writing messages overwhelming the receiver. */
380  if (nbytes > MaxAllocSize)
381  ereport(ERROR,
382  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
383  errmsg("cannot send a message of size %zu via shared memory queue",
384  nbytes)));
385 
386  /* Try to write, or finish writing, the length word into the buffer. */
387  while (!mqh->mqh_length_word_complete)
388  {
389  Assert(mqh->mqh_partial_bytes < sizeof(Size));
390  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
391  ((char *) &nbytes) + mqh->mqh_partial_bytes,
392  nowait, &bytes_written);
393 
394  if (res == SHM_MQ_DETACHED)
395  {
396  /* Reset state in case caller tries to send another message. */
397  mqh->mqh_partial_bytes = 0;
398  mqh->mqh_length_word_complete = false;
399  return res;
400  }
401  mqh->mqh_partial_bytes += bytes_written;
402 
403  if (mqh->mqh_partial_bytes >= sizeof(Size))
404  {
405  Assert(mqh->mqh_partial_bytes == sizeof(Size));
406 
407  mqh->mqh_partial_bytes = 0;
408  mqh->mqh_length_word_complete = true;
409  }
410 
411  if (res != SHM_MQ_SUCCESS)
412  return res;
413 
414  /* Length word can't be split unless bigger than required alignment. */
415  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
416  }
417 
418  /* Write the actual data bytes into the buffer. */
419  Assert(mqh->mqh_partial_bytes <= nbytes);
420  offset = mqh->mqh_partial_bytes;
421  do
422  {
423  Size chunksize;
424 
425  /* Figure out which bytes need to be sent next. */
426  if (offset >= iov[which_iov].len)
427  {
428  offset -= iov[which_iov].len;
429  ++which_iov;
430  if (which_iov >= iovcnt)
431  break;
432  continue;
433  }
434 
435  /*
436  * We want to avoid copying the data if at all possible, but every
437  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
438  * the last. Thus, if a chunk other than the last one ends on a
439  * non-MAXALIGN'd boundary, we have to combine the tail end of its
440  * data with data from one or more following chunks until we either
441  * reach the last chunk or accumulate a number of bytes which is
442  * MAXALIGN'd.
443  */
444  if (which_iov + 1 < iovcnt &&
445  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
446  {
447  char tmpbuf[MAXIMUM_ALIGNOF];
448  int j = 0;
449 
450  for (;;)
451  {
452  if (offset < iov[which_iov].len)
453  {
454  tmpbuf[j] = iov[which_iov].data[offset];
455  j++;
456  offset++;
457  if (j == MAXIMUM_ALIGNOF)
458  break;
459  }
460  else
461  {
462  offset -= iov[which_iov].len;
463  which_iov++;
464  if (which_iov >= iovcnt)
465  break;
466  }
467  }
468 
469  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
470 
471  if (res == SHM_MQ_DETACHED)
472  {
473  /* Reset state in case caller tries to send another message. */
474  mqh->mqh_partial_bytes = 0;
475  mqh->mqh_length_word_complete = false;
476  return res;
477  }
478 
479  mqh->mqh_partial_bytes += bytes_written;
480  if (res != SHM_MQ_SUCCESS)
481  return res;
482  continue;
483  }
484 
485  /*
486  * If this is the last chunk, we can write all the data, even if it
487  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
488  * MAXALIGN_DOWN the write size.
489  */
490  chunksize = iov[which_iov].len - offset;
491  if (which_iov + 1 < iovcnt)
492  chunksize = MAXALIGN_DOWN(chunksize);
493  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
494  nowait, &bytes_written);
495 
496  if (res == SHM_MQ_DETACHED)
497  {
498  /* Reset state in case caller tries to send another message. */
499  mqh->mqh_length_word_complete = false;
500  mqh->mqh_partial_bytes = 0;
501  return res;
502  }
503 
504  mqh->mqh_partial_bytes += bytes_written;
505  offset += bytes_written;
506  if (res != SHM_MQ_SUCCESS)
507  return res;
508  } while (mqh->mqh_partial_bytes < nbytes);
509 
510  /* Reset for next message. */
511  mqh->mqh_partial_bytes = 0;
512  mqh->mqh_length_word_complete = false;
513 
514  /* If queue has been detached, let caller know. */
515  if (mq->mq_detached)
516  return SHM_MQ_DETACHED;
517 
518  /*
519  * If the counterparty is known to have attached, we can read mq_receiver
520  * without acquiring the spinlock. Otherwise, more caution is needed.
521  */
522  if (mqh->mqh_counterparty_attached)
523  receiver = mq->mq_receiver;
524  else
525  {
527  receiver = mq->mq_receiver;
529  if (receiver != NULL)
530  mqh->mqh_counterparty_attached = true;
531  }
532 
533  /*
534  * If the caller has requested force flush or we have written more than
535  * 1/4 of the ring size, mark it as written in shared memory and notify
536  * the receiver.
537  */
538  if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
539  {
541  if (receiver != NULL)
542  SetLatch(&receiver->procLatch);
543  mqh->mqh_send_pending = 0;
544  }
545 
546  return SHM_MQ_SUCCESS;
547 }
int j
Definition: isn.c:74
int i
Definition: isn.c:73
void SetLatch(Latch *latch)
Definition: latch.c:632
const void size_t len
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:914
Latch procLatch
Definition: proc.h:164
static StringInfoData tmpbuf
Definition: walsender.c:170

References Assert, shm_mq_iovec::data, data, ereport, errcode(), errmsg(), ERROR, i, j, shm_mq_iovec::len, len, MAXALIGN_DOWN, MaxAllocSize, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring_size, shm_mq::mq_sender, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_send_pending, MyProc, PGPROC::procLatch, res, SetLatch(), SHM_MQ_DETACHED, shm_mq_inc_bytes_written(), shm_mq_send_bytes(), SHM_MQ_SUCCESS, SpinLockAcquire, SpinLockRelease, and tmpbuf.

Referenced by mq_putmessage(), and shm_mq_send().

◆ shm_mq_set_handle()

void shm_mq_set_handle ( shm_mq_handle mqh,
BackgroundWorkerHandle handle 
)

Definition at line 319 of file shm_mq.c.

320 {
321  Assert(mqh->mqh_handle == NULL);
322  mqh->mqh_handle = handle;
323 }

References Assert, and shm_mq_handle::mqh_handle.

Referenced by ExecParallelCreateReaders(), and LaunchParallelWorkers().

◆ shm_mq_set_receiver()

void shm_mq_set_receiver ( shm_mq mq,
PGPROC proc 
)

Definition at line 206 of file shm_mq.c.

207 {
208  PGPROC *sender;
209 
211  Assert(mq->mq_receiver == NULL);
212  mq->mq_receiver = proc;
213  sender = mq->mq_sender;
215 
216  if (sender != NULL)
217  SetLatch(&sender->procLatch);
218 }

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), pa_setup_dsm(), ParallelApplyWorkerMain(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

◆ shm_mq_set_sender()

void shm_mq_set_sender ( shm_mq mq,
PGPROC proc 
)

Definition at line 224 of file shm_mq.c.

225 {
226  PGPROC *receiver;
227 
229  Assert(mq->mq_sender == NULL);
230  mq->mq_sender = proc;
231  receiver = mq->mq_receiver;
233 
234  if (receiver != NULL)
235  SetLatch(&receiver->procLatch);
236 }

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelGetReceiver(), pa_setup_dsm(), ParallelApplyWorkerMain(), ParallelWorkerMain(), and setup_dynamic_shared_memory().

◆ shm_mq_wait_for_attach()

shm_mq_result shm_mq_wait_for_attach ( shm_mq_handle mqh)

Definition at line 820 of file shm_mq.c.

821 {
822  shm_mq *mq = mqh->mqh_queue;
823  PGPROC **victim;
824 
825  if (shm_mq_get_receiver(mq) == MyProc)
826  victim = &mq->mq_sender;
827  else
828  {
830  victim = &mq->mq_receiver;
831  }
832 
833  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
834  return SHM_MQ_SUCCESS;
835  else
836  return SHM_MQ_DETACHED;
837 }
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:242

References Assert, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, MyProc, SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_get_sender(), SHM_MQ_SUCCESS, and shm_mq_wait_internal().

Variable Documentation

◆ shm_mq_minimum_size

PGDLLIMPORT const Size shm_mq_minimum_size
extern

Definition at line 168 of file shm_mq.c.

Referenced by setup_dynamic_shared_memory().