PostgreSQL Source Code  git master
shm_mq.h File Reference
#include "postmaster/bgworker.h"
#include "storage/dsm.h"
#include "storage/proc.h"
Include dependency graph for shm_mq.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  shm_mq_iovec
 

Typedefs

typedef struct shm_mq shm_mq
 
typedef struct shm_mq_handle shm_mq_handle
 

Enumerations

enum  shm_mq_result { SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_MQ_DETACHED }
 

Functions

shm_mq * shm_mq_create (void *address, Size size)
 
void shm_mq_set_receiver (shm_mq *mq, PGPROC *)
 
void shm_mq_set_sender (shm_mq *mq, PGPROC *)
 
PGPROC * shm_mq_get_receiver (shm_mq *)
 
PGPROC * shm_mq_get_sender (shm_mq *)
 
shm_mq_handle * shm_mq_attach (shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 
void shm_mq_set_handle (shm_mq_handle *, BackgroundWorkerHandle *)
 
void shm_mq_detach (shm_mq_handle *mqh)
 
shm_mq * shm_mq_get_queue (shm_mq_handle *mqh)
 
shm_mq_result shm_mq_send (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
 
shm_mq_result shm_mq_sendv (shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 
shm_mq_result shm_mq_receive (shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
shm_mq_result shm_mq_wait_for_attach (shm_mq_handle *mqh)
 

Variables

PGDLLIMPORT const Size shm_mq_minimum_size
 

Typedef Documentation

◆ shm_mq

Definition at line 22 of file shm_mq.h.

◆ shm_mq_handle

Definition at line 26 of file shm_mq.h.

Enumeration Type Documentation

◆ shm_mq_result

Enumerator
SHM_MQ_SUCCESS 
SHM_MQ_WOULD_BLOCK 
SHM_MQ_DETACHED 

Definition at line 36 of file shm_mq.h.

37 {
38  SHM_MQ_SUCCESS, /* Sent or received a message. */
39  SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */
40  SHM_MQ_DETACHED /* Other process has detached queue. */
41 } shm_mq_result;
shm_mq_result
Definition: shm_mq.h:36

Function Documentation

◆ shm_mq_attach()

shm_mq_handle* shm_mq_attach ( shm_mq * mq,
dsm_segment * seg,
BackgroundWorkerHandle * handle 
)

Definition at line 287 of file shm_mq.c.

References Assert, CurrentMemoryContext, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, MyProc, on_dsm_detach(), palloc(), PointerGetDatum, and shm_mq_detach_callback().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ParallelWorkerMain(), ReinitializeParallelDSM(), and test_shm_mq_setup().

288 {
289  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
290 
291  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
292  mqh->mqh_queue = mq;
293  mqh->mqh_segment = seg;
294  mqh->mqh_handle = handle;
295  mqh->mqh_buffer = NULL;
296  mqh->mqh_buflen = 0;
297  mqh->mqh_consume_pending = 0;
298  mqh->mqh_partial_bytes = 0;
299  mqh->mqh_expected_bytes = 0;
300  mqh->mqh_length_word_complete = false;
301  mqh->mqh_counterparty_attached = false;
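302  mqh->mqh_context = CurrentMemoryContext;
303 
304  if (seg != NULL)
305  on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
306 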
307  return mqh;
308 }
Size mqh_partial_bytes
Definition: shm_mq.c:137
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:562
PGPROC * mq_receiver
Definition: shm_mq.c:72
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:133
Size mqh_consume_pending
Definition: shm_mq.c:136
bool mqh_counterparty_attached
Definition: shm_mq.c:140
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1024
char * mqh_buffer
Definition: shm_mq.c:134
bool mqh_length_word_complete
Definition: shm_mq.c:139
Size mqh_buflen
Definition: shm_mq.c:135
dsm_segment * mqh_segment
Definition: shm_mq.c:132
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
Size mqh_expected_bytes
Definition: shm_mq.c:138
shm_mq * mqh_queue
Definition: shm_mq.c:131
#define Assert(condition)
Definition: c.h:680
void * palloc(Size size)
Definition: mcxt.c:835
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1221
MemoryContext mqh_context
Definition: shm_mq.c:141
PGPROC * mq_sender
Definition: shm_mq.c:73

◆ shm_mq_create()

shm_mq* shm_mq_create ( void *  address,
Size  size 
)

Definition at line 170 of file shm_mq.c.

References Assert, MAXALIGN, MAXALIGN_DOWN, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq::mq_sender, offsetof, and SpinLockInit.

Referenced by ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

171 {
172  shm_mq *mq = address;
173  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
174 
175  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
176  size = MAXALIGN_DOWN(size);
177 
178  /* Queue size must be large enough to hold some data. */
179  Assert(size > data_offset);
180 
181  /* Initialize queue header. */
182  SpinLockInit(&mq->mq_mutex);
183  mq->mq_receiver = NULL;
184  mq->mq_sender = NULL;
185  mq->mq_bytes_read = 0;
186  mq->mq_bytes_written = 0;
187  mq->mq_ring_size = size - data_offset;
188  mq->mq_detached = false;
189  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
190 
191  return mq;
192 }
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * mq_receiver
Definition: shm_mq.c:72
uint64 mq_bytes_written
Definition: shm_mq.c:75
slock_t mq_mutex
Definition: shm_mq.c:71
bool mq_detached
Definition: shm_mq.c:77
#define Assert(condition)
Definition: c.h:680
size_t Size
Definition: c.h:414
#define MAXALIGN(LEN)
Definition: c.h:633
Size mq_ring_size
Definition: shm_mq.c:76
Definition: shm_mq.c:69
uint8 mq_ring_offset
Definition: shm_mq.c:78
uint64 mq_bytes_read
Definition: shm_mq.c:74
#define offsetof(type, field)
Definition: c.h:603
#define MAXALIGN_DOWN(LEN)
Definition: c.h:645
PGPROC * mq_sender
Definition: shm_mq.c:73

◆ shm_mq_detach()

void shm_mq_detach ( shm_mq_handle * mqh)

Definition at line 775 of file shm_mq.c.

References cancel_on_dsm_detach(), shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, pfree(), PointerGetDatum, shm_mq_detach_callback(), and shm_mq_detach_internal().

Referenced by DestroyParallelContext(), ExecParallelFinish(), HandleParallelMessage(), LaunchParallelWorkers(), mq_putmessage(), tqueueDestroyReceiver(), and tqueueShutdownReceiver().

776 {
777  /* Notify counterparty that we're outta here. */
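778  shm_mq_detach_internal(mqh->mqh_queue);
779 
780  /* Cancel on_dsm_detach callback, if any. */
781  if (mqh->mqh_segment)
782  cancel_on_dsm_detach(mqh->mqh_segment,
783  shm_mq_detach_callback,
784  PointerGetDatum(mqh->mqh_queue));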
785 
786  /* Release local memory associated with handle. */
787  if (mqh->mqh_buffer != NULL)
788  pfree(mqh->mqh_buffer);
789  pfree(mqh);
790 }
#define PointerGetDatum(X)
Definition: postgres.h:562
char * mqh_buffer
Definition: shm_mq.c:134
void pfree(void *pointer)
Definition: mcxt.c:936
dsm_segment * mqh_segment
Definition: shm_mq.c:132
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:807
shm_mq * mqh_queue
Definition: shm_mq.c:131
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1221
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1039

◆ shm_mq_get_queue()

shm_mq* shm_mq_get_queue ( shm_mq_handle * mqh)

Definition at line 831 of file shm_mq.c.

References shm_mq_handle::mqh_queue.

832 {
833  return mqh->mqh_queue;
834 }
shm_mq * mqh_queue
Definition: shm_mq.c:131

◆ shm_mq_get_receiver()

PGPROC* shm_mq_get_receiver ( shm_mq * )

Definition at line 237 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_receiver, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_send_bytes(), and shm_mq_wait_for_attach().

238 {
239  volatile shm_mq *vmq = mq;
240  PGPROC *receiver;
241 
242  SpinLockAcquire(&mq->mq_mutex);
243  receiver = vmq->mq_receiver;
244  SpinLockRelease(&mq->mq_mutex);
245 
246  return receiver;
247 }
PGPROC * mq_receiver
Definition: shm_mq.c:72
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: shm_mq.c:69
Definition: proc.h:95

◆ shm_mq_get_sender()

PGPROC* shm_mq_get_sender ( shm_mq * )

Definition at line 253 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_sender, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_receive(), and shm_mq_wait_for_attach().

254 {
255  volatile shm_mq *vmq = mq;
256  PGPROC *sender;
257 
258  SpinLockAcquire(&mq->mq_mutex);
259  sender = vmq->mq_sender;
260  SpinLockRelease(&mq->mq_mutex);
261 
262  return sender;
263 }
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: shm_mq.c:69
Definition: proc.h:95
PGPROC * mq_sender
Definition: shm_mq.c:73

◆ shm_mq_receive()

shm_mq_result shm_mq_receive ( shm_mq_handle * mqh,
Size * nbytesp,
void **  datap,
bool  nowait 
)

Definition at line 522 of file shm_mq.c.

References Assert, Max, MAXALIGN, MemoryContextAlloc(), shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, MQH_INITIAL_BUFSIZE, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, pfree(), shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_sender(), shm_mq_inc_bytes_read(), shm_mq_receive_bytes(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), and SHM_MQ_WOULD_BLOCK.

Referenced by copy_messages(), HandleParallelMessages(), test_shm_mq(), test_shm_mq_pipelined(), and TupleQueueReaderNext().

523 {
524  shm_mq *mq = mqh->mqh_queue;
525  shm_mq_result res;
526  Size rb = 0;
527  Size nbytes;
528  void *rawdata;
529 
530  Assert(mq->mq_receiver == MyProc);
531 
532  /* We can't receive data until the sender has attached. */
533  if (!mqh->mqh_counterparty_attached)
534  {
535  if (nowait)
536  {
537  int counterparty_gone;
538 
539  /*
540  * We shouldn't return at this point at all unless the sender
541  * hasn't attached yet. However, the correct return value depends
542  * on whether the sender is still attached. If we first test
543  * whether the sender has ever attached and then test whether the
544  * sender has detached, there's a race condition: a sender that
545  * attaches and detaches very quickly might fool us into thinking
546  * the sender never attached at all. So, test whether our
547  * counterparty is definitively gone first, and only afterwards
548  * check whether the sender ever attached in the first place.
549  */
550  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
551  if (shm_mq_get_sender(mq) == NULL)
552  {
553  if (counterparty_gone)
554  return SHM_MQ_DETACHED;
555  else
556  return SHM_MQ_WOULD_BLOCK;
557  }
558  }
559  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
560  && shm_mq_get_sender(mq) == NULL)
561  {
562  mq->mq_detached = true;
563  return SHM_MQ_DETACHED;
564  }
565  mqh->mqh_counterparty_attached = true;
566  }
567 
568  /* Consume any zero-copy data from previous receive operation. */
569  if (mqh->mqh_consume_pending > 0)
570  {
571  shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
572  mqh->mqh_consume_pending = 0;
573  }
574 
575  /* Try to read, or finish reading, the length word from the buffer. */
576  while (!mqh->mqh_length_word_complete)
577  {
578  /* Try to receive the message length word. */
579  Assert(mqh->mqh_partial_bytes < sizeof(Size));
580  res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
581  nowait, &rb, &rawdata);
582  if (res != SHM_MQ_SUCCESS)
583  return res;
584 
585  /*
586  * Hopefully, we'll receive the entire message length word at once.
587  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
588  * multiple reads.
589  */
590  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
591  {
592  Size needed;
593 
594  nbytes = *(Size *) rawdata;
595 
596  /* If we've already got the whole message, we're done. */
597  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
598  if (rb >= needed)
599  {
600  /*
601  * Technically, we could consume the message length
602  * information at this point, but the extra write to shared
603  * memory wouldn't be free and in most cases we would reap no
604  * benefit.
605  */
606  mqh->mqh_consume_pending = needed;
607  *nbytesp = nbytes;
608  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
609  return SHM_MQ_SUCCESS;
610  }
611 
612  /*
613  * We don't have the whole message, but we at least have the whole
614  * length word.
615  */
616  mqh->mqh_expected_bytes = nbytes;
617  mqh->mqh_length_word_complete = true;
618  shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
619  rb -= MAXALIGN(sizeof(Size));
620  }
621  else
622  {
623  Size lengthbytes;
624 
625  /* Can't be split unless bigger than required alignment. */
626  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
627 
628  /* Message word is split; need buffer to reassemble. */
629  if (mqh->mqh_buffer == NULL)
630  {
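631  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
632  MQH_INITIAL_BUFSIZE);
633  mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
634  }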
635  Assert(mqh->mqh_buflen >= sizeof(Size));
636 
637  /* Copy and consume partial length word. */
638  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
639  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
640  else
641  lengthbytes = rb;
642  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
643  lengthbytes);
644  mqh->mqh_partial_bytes += lengthbytes;
645  shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
646  rb -= lengthbytes;
647 
648  /* If we now have the whole word, we're ready to read payload. */
649  if (mqh->mqh_partial_bytes >= sizeof(Size))
650  {
651  Assert(mqh->mqh_partial_bytes == sizeof(Size));
652  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
653  mqh->mqh_length_word_complete = true;
654  mqh->mqh_partial_bytes = 0;
655  }
656  }
657  }
658  nbytes = mqh->mqh_expected_bytes;
659 
660  if (mqh->mqh_partial_bytes == 0)
661  {
662  /*
663  * Try to obtain the whole message in a single chunk. If this works,
664  * we need not copy the data and can return a pointer directly into
665  * shared memory.
666  */
667  res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
668  if (res != SHM_MQ_SUCCESS)
669  return res;
670  if (rb >= nbytes)
671  {
672  mqh->mqh_length_word_complete = false;
673  mqh->mqh_consume_pending = MAXALIGN(nbytes);
674  *nbytesp = nbytes;
675  *datap = rawdata;
676  return SHM_MQ_SUCCESS;
677  }
678 
679  /*
680  * The message has wrapped the buffer. We'll need to copy it in order
681  * to return it to the client in one chunk. First, make sure we have
682  * a large enough buffer available.
683  */
684  if (mqh->mqh_buflen < nbytes)
685  {
686  Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
687 
688  while (newbuflen < nbytes)
689  newbuflen *= 2;
690 
691  if (mqh->mqh_buffer != NULL)
692  {
693  pfree(mqh->mqh_buffer);
694  mqh->mqh_buffer = NULL;
695  mqh->mqh_buflen = 0;
696  }
697  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
698  mqh->mqh_buflen = newbuflen;
699  }
700  }
701 
702  /* Loop until we've copied the entire message. */
703  for (;;)
704  {
705  Size still_needed;
706 
707  /* Copy as much as we can. */
708  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
709  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
710  mqh->mqh_partial_bytes += rb;
711 
712  /*
713  * Update count of bytes read, with alignment padding. Note that this
714  * will never actually insert any padding except at the end of a
715  * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
716  * and each read and write is as well.
717  */
718  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
719  shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
720 
721  /* If we got all the data, exit the loop. */
722  if (mqh->mqh_partial_bytes >= nbytes)
723  break;
724 
725  /* Wait for some more data. */
726  still_needed = nbytes - mqh->mqh_partial_bytes;
727  res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
728  if (res != SHM_MQ_SUCCESS)
729  return res;
730  if (rb > still_needed)
731  rb = still_needed;
732  }
733 
734  /* Return the complete message, and reset for next message. */
735  *nbytesp = nbytes;
736  *datap = mqh->mqh_buffer;
737  mqh->mqh_length_word_complete = false;
738  mqh->mqh_partial_bytes = 0;
739  return SHM_MQ_SUCCESS;
740 }
Size mqh_partial_bytes
Definition: shm_mq.c:137
PGPROC * MyProc
Definition: proc.c:67
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:253
PGPROC * mq_receiver
Definition: shm_mq.c:72
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:133
Size mqh_consume_pending
Definition: shm_mq.c:136
bool mqh_counterparty_attached
Definition: shm_mq.c:140
static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:976
char * mqh_buffer
Definition: shm_mq.c:134
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1085
bool mqh_length_word_complete
Definition: shm_mq.c:139
void pfree(void *pointer)
Definition: mcxt.c:936
static bool shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1038
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:164
Size mqh_buflen
Definition: shm_mq.c:135
Size mqh_expected_bytes
Definition: shm_mq.c:138
shm_mq * mqh_queue
Definition: shm_mq.c:131
bool mq_detached
Definition: shm_mq.c:77
shm_mq_result
Definition: shm_mq.h:36
#define Max(x, y)
Definition: c.h:806
#define Assert(condition)
Definition: c.h:680
static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
Definition: shm_mq.c:1156
size_t Size
Definition: c.h:414
#define MAXALIGN(LEN)
Definition: c.h:633
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693
Definition: shm_mq.c:69
MemoryContext mqh_context
Definition: shm_mq.c:141
PGPROC * mq_sender
Definition: shm_mq.c:73

◆ shm_mq_send()

shm_mq_result shm_mq_send ( shm_mq_handle mqh,
Size  nbytes,
const void *  data,
bool  nowait 
)

Definition at line 325 of file shm_mq.c.

References shm_mq_iovec::data, shm_mq_iovec::len, and shm_mq_sendv().

Referenced by copy_messages(), test_shm_mq(), test_shm_mq_pipelined(), and tqueueReceiveSlot().

326 {
327  shm_mq_iovec iov;
328 
329  iov.data = data;
330  iov.len = nbytes;
331 
332  return shm_mq_sendv(mqh, &iov, 1, nowait);
333 }
Size len
Definition: shm_mq.h:32
const char * data
Definition: shm_mq.h:31
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
Definition: shm_mq.c:351

◆ shm_mq_sendv()

shm_mq_result shm_mq_sendv ( shm_mq_handle * mqh,
shm_mq_iovec * iov,
int  iovcnt,
bool  nowait 
)

Definition at line 351 of file shm_mq.c.

References Assert, shm_mq_iovec::data, i, shm_mq_iovec::len, MAXALIGN_DOWN, shm_mq::mq_sender, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, SHM_MQ_DETACHED, shm_mq_notify_receiver(), shm_mq_send_bytes(), SHM_MQ_SUCCESS, and tmpbuf.

Referenced by mq_putmessage(), and shm_mq_send().

352 {
353  shm_mq_result res;
354  shm_mq *mq = mqh->mqh_queue;
355  Size nbytes = 0;
356  Size bytes_written;
357  int i;
358  int which_iov = 0;
359  Size offset;
360 
361  Assert(mq->mq_sender == MyProc);
362 
363  /* Compute total size of write. */
364  for (i = 0; i < iovcnt; ++i)
365  nbytes += iov[i].len;
366 
367  /* Try to write, or finish writing, the length word into the buffer. */
368  while (!mqh->mqh_length_word_complete)
369  {
370  Assert(mqh->mqh_partial_bytes < sizeof(Size));
371  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
372  ((char *) &nbytes) + mqh->mqh_partial_bytes,
373  nowait, &bytes_written);
374 
375  if (res == SHM_MQ_DETACHED)
376  {
377  /* Reset state in case caller tries to send another message. */
378  mqh->mqh_partial_bytes = 0;
379  mqh->mqh_length_word_complete = false;
380  return res;
381  }
382  mqh->mqh_partial_bytes += bytes_written;
383 
384  if (mqh->mqh_partial_bytes >= sizeof(Size))
385  {
386  Assert(mqh->mqh_partial_bytes == sizeof(Size));
387 
388  mqh->mqh_partial_bytes = 0;
389  mqh->mqh_length_word_complete = true;
390  }
391 
392  if (res != SHM_MQ_SUCCESS)
393  return res;
394 
395  /* Length word can't be split unless bigger than required alignment. */
396  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
397  }
398 
399  /* Write the actual data bytes into the buffer. */
400  Assert(mqh->mqh_partial_bytes <= nbytes);
401  offset = mqh->mqh_partial_bytes;
402  do
403  {
404  Size chunksize;
405 
406  /* Figure out which bytes need to be sent next. */
407  if (offset >= iov[which_iov].len)
408  {
409  offset -= iov[which_iov].len;
410  ++which_iov;
411  if (which_iov >= iovcnt)
412  break;
413  continue;
414  }
415 
416  /*
417  * We want to avoid copying the data if at all possible, but every
418  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
419  * the last. Thus, if a chunk other than the last one ends on a
420  * non-MAXALIGN'd boundary, we have to combine the tail end of its
421  * data with data from one or more following chunks until we either
422  * reach the last chunk or accumulate a number of bytes which is
423  * MAXALIGN'd.
424  */
425  if (which_iov + 1 < iovcnt &&
426  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
427  {
428  char tmpbuf[MAXIMUM_ALIGNOF];
429  int j = 0;
430 
431  for (;;)
432  {
433  if (offset < iov[which_iov].len)
434  {
435  tmpbuf[j] = iov[which_iov].data[offset];
436  j++;
437  offset++;
438  if (j == MAXIMUM_ALIGNOF)
439  break;
440  }
441  else
442  {
443  offset -= iov[which_iov].len;
444  which_iov++;
445  if (which_iov >= iovcnt)
446  break;
447  }
448  }
449 
450  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
451 
452  if (res == SHM_MQ_DETACHED)
453  {
454  /* Reset state in case caller tries to send another message. */
455  mqh->mqh_partial_bytes = 0;
456  mqh->mqh_length_word_complete = false;
457  return res;
458  }
459 
460  mqh->mqh_partial_bytes += bytes_written;
461  if (res != SHM_MQ_SUCCESS)
462  return res;
463  continue;
464  }
465 
466  /*
467  * If this is the last chunk, we can write all the data, even if it
468  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
469  * MAXALIGN_DOWN the write size.
470  */
471  chunksize = iov[which_iov].len - offset;
472  if (which_iov + 1 < iovcnt)
473  chunksize = MAXALIGN_DOWN(chunksize);
474  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
475  nowait, &bytes_written);
476 
477  if (res == SHM_MQ_DETACHED)
478  {
479  /* Reset state in case caller tries to send another message. */
480  mqh->mqh_length_word_complete = false;
481  mqh->mqh_partial_bytes = 0;
482  return res;
483  }
484 
485  mqh->mqh_partial_bytes += bytes_written;
486  offset += bytes_written;
487  if (res != SHM_MQ_SUCCESS)
488  return res;
489  } while (mqh->mqh_partial_bytes < nbytes);
490 
491  /* Reset for next message. */
492  mqh->mqh_partial_bytes = 0;
493  mqh->mqh_length_word_complete = false;
494 
495  /* Notify receiver of the newly-written data, and return. */
496  return shm_mq_notify_receiver(mq);
497 }
Size mqh_partial_bytes
Definition: shm_mq.c:137
PGPROC * MyProc
Definition: proc.c:67
static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq)
Definition: shm_mq.c:1202
Size len
Definition: shm_mq.h:32
bool mqh_length_word_complete
Definition: shm_mq.c:139
const char * data
Definition: shm_mq.h:31
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:840
shm_mq * mqh_queue
Definition: shm_mq.c:131
shm_mq_result
Definition: shm_mq.h:36
#define Assert(condition)
Definition: c.h:680
size_t Size
Definition: c.h:414
static StringInfoData tmpbuf
Definition: walsender.c:162
int i
Definition: shm_mq.c:69
#define MAXALIGN_DOWN(LEN)
Definition: c.h:645
PGPROC * mq_sender
Definition: shm_mq.c:73

◆ shm_mq_set_handle()

void shm_mq_set_handle ( shm_mq_handle * ,
BackgroundWorkerHandle *  
)

Definition at line 315 of file shm_mq.c.

References Assert, and shm_mq_handle::mqh_handle.

Referenced by ExecParallelCreateReaders(), and LaunchParallelWorkers().

316 {
317  Assert(mqh->mqh_handle == NULL);
318  mqh->mqh_handle = handle;
319 }
#define Assert(condition)
Definition: c.h:680

◆ shm_mq_set_receiver()

void shm_mq_set_receiver ( shm_mq * mq,
PGPROC *  
)

Definition at line 199 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

200 {
201  volatile shm_mq *vmq = mq;
202  PGPROC *sender;
203 
204  SpinLockAcquire(&mq->mq_mutex);
205  Assert(vmq->mq_receiver == NULL);
206  vmq->mq_receiver = proc;
207  sender = vmq->mq_sender;
208  SpinLockRelease(&mq->mq_mutex);
209 
210  if (sender != NULL)
211  SetLatch(&sender->procLatch);
212 }
PGPROC * mq_receiver
Definition: shm_mq.c:72
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:71
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
#define Assert(condition)
Definition: c.h:680
Definition: shm_mq.c:69
Definition: proc.h:95
PGPROC * mq_sender
Definition: shm_mq.c:73

◆ shm_mq_set_sender()

void shm_mq_set_sender ( shm_mq * mq,
PGPROC *  
)

Definition at line 218 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ParallelWorkerMain(), and setup_dynamic_shared_memory().

219 {
220  volatile shm_mq *vmq = mq;
221  PGPROC *receiver;
222 
223  SpinLockAcquire(&mq->mq_mutex);
224  Assert(vmq->mq_sender == NULL);
225  vmq->mq_sender = proc;
226  receiver = vmq->mq_receiver;
227  SpinLockRelease(&mq->mq_mutex);
228 
229  if (receiver != NULL)
230  SetLatch(&receiver->procLatch);
231 }
PGPROC * mq_receiver
Definition: shm_mq.c:72
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:71
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
#define Assert(condition)
Definition: c.h:680
Definition: shm_mq.c:69
Definition: proc.h:95
PGPROC * mq_sender
Definition: shm_mq.c:73

◆ shm_mq_wait_for_attach()

shm_mq_result shm_mq_wait_for_attach ( shm_mq_handle * mqh)

Definition at line 752 of file shm_mq.c.

References Assert, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, MyProc, SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_get_sender(), SHM_MQ_SUCCESS, and shm_mq_wait_internal().

753 {
754  shm_mq *mq = mqh->mqh_queue;
755  PGPROC **victim;
756 
757  if (shm_mq_get_receiver(mq) == MyProc)
758  victim = &mq->mq_sender;
759  else
760  {
761  Assert(shm_mq_get_sender(mq) == MyProc);
762  victim = &mq->mq_receiver;
763  }
764 
765  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
766  return SHM_MQ_SUCCESS;
767  else
768  return SHM_MQ_DETACHED;
769 }
PGPROC * MyProc
Definition: proc.c:67
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:253
PGPROC * mq_receiver
Definition: shm_mq.c:72
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:237
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:133
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1085
shm_mq * mqh_queue
Definition: shm_mq.c:131
#define Assert(condition)
Definition: c.h:680
Definition: shm_mq.c:69
Definition: proc.h:95
PGPROC * mq_sender
Definition: shm_mq.c:73

Variable Documentation

◆ shm_mq_minimum_size

PGDLLIMPORT const Size shm_mq_minimum_size

Definition at line 161 of file shm_mq.c.

Referenced by setup_dynamic_shared_memory().