PostgreSQL Source Code  git master
shm_mq.h File Reference
#include "postmaster/bgworker.h"
#include "storage/dsm.h"
#include "storage/proc.h"
Include dependency graph for shm_mq.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  shm_mq_iovec
 

Typedefs

typedef struct shm_mq shm_mq
 
typedef struct shm_mq_handle shm_mq_handle
 

Enumerations

enum  shm_mq_result { SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_MQ_DETACHED }
 

Functions

shm_mqshm_mq_create (void *address, Size size)
 
void shm_mq_set_receiver (shm_mq *mq, PGPROC *)
 
void shm_mq_set_sender (shm_mq *mq, PGPROC *)
 
PGPROCshm_mq_get_receiver (shm_mq *)
 
PGPROCshm_mq_get_sender (shm_mq *)
 
shm_mq_handleshm_mq_attach (shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 
void shm_mq_set_handle (shm_mq_handle *, BackgroundWorkerHandle *)
 
void shm_mq_detach (shm_mq_handle *mqh)
 
shm_mqshm_mq_get_queue (shm_mq_handle *mqh)
 
shm_mq_result shm_mq_send (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
 
shm_mq_result shm_mq_sendv (shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 
shm_mq_result shm_mq_receive (shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
shm_mq_result shm_mq_wait_for_attach (shm_mq_handle *mqh)
 

Variables

PGDLLIMPORT const Size shm_mq_minimum_size
 

Typedef Documentation

◆ shm_mq

Definition at line 22 of file shm_mq.h.

◆ shm_mq_handle

Definition at line 26 of file shm_mq.h.

Enumeration Type Documentation

◆ shm_mq_result

Enumerator
SHM_MQ_SUCCESS 
SHM_MQ_WOULD_BLOCK 
SHM_MQ_DETACHED 

Definition at line 36 of file shm_mq.h.

37 {
38  SHM_MQ_SUCCESS, /* Sent or received a message. */
39  SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */
40  SHM_MQ_DETACHED /* Other process has detached queue. */
shm_mq_result
Definition: shm_mq.h:36

Function Documentation

◆ shm_mq_attach()

shm_mq_handle* shm_mq_attach ( shm_mq mq,
dsm_segment seg,
BackgroundWorkerHandle handle 
)

Definition at line 282 of file shm_mq.c.

References Assert, CurrentMemoryContext, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, MyProc, on_dsm_detach(), palloc(), PointerGetDatum, and shm_mq_detach_callback().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ParallelWorkerMain(), ReinitializeParallelDSM(), and test_shm_mq_setup().

283 {
284  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
285 
286  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
287  mqh->mqh_queue = mq;
288  mqh->mqh_segment = seg;
289  mqh->mqh_handle = handle;
290  mqh->mqh_buffer = NULL;
291  mqh->mqh_buflen = 0;
292  mqh->mqh_consume_pending = 0;
293  mqh->mqh_partial_bytes = 0;
294  mqh->mqh_expected_bytes = 0;
295  mqh->mqh_length_word_complete = false;
296  mqh->mqh_counterparty_attached = false;
298 
299  if (seg != NULL)
301 
302  return mqh;
303 }
Size mqh_partial_bytes
Definition: shm_mq.c:138
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:541
PGPROC * mq_receiver
Definition: shm_mq.c:73
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:134
Size mqh_consume_pending
Definition: shm_mq.c:137
bool mqh_counterparty_attached
Definition: shm_mq.c:141
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1024
char * mqh_buffer
Definition: shm_mq.c:135
bool mqh_length_word_complete
Definition: shm_mq.c:140
Size mqh_buflen
Definition: shm_mq.c:136
dsm_segment * mqh_segment
Definition: shm_mq.c:133
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
Size mqh_expected_bytes
Definition: shm_mq.c:139
shm_mq * mqh_queue
Definition: shm_mq.c:132
#define Assert(condition)
Definition: c.h:699
void * palloc(Size size)
Definition: mcxt.c:924
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1253
MemoryContext mqh_context
Definition: shm_mq.c:142
PGPROC * mq_sender
Definition: shm_mq.c:74

◆ shm_mq_create()

shm_mq* shm_mq_create ( void *  address,
Size  size 
)

Definition at line 169 of file shm_mq.c.

References Assert, MAXALIGN, MAXALIGN_DOWN, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq::mq_sender, offsetof, pg_atomic_init_u64(), and SpinLockInit.

Referenced by ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

170 {
171  shm_mq *mq = address;
172  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
173 
174  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
175  size = MAXALIGN_DOWN(size);
176 
177  /* Queue size must be large enough to hold some data. */
178  Assert(size > data_offset);
179 
180  /* Initialize queue header. */
181  SpinLockInit(&mq->mq_mutex);
182  mq->mq_receiver = NULL;
183  mq->mq_sender = NULL;
186  mq->mq_ring_size = size - data_offset;
187  mq->mq_detached = false;
188  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
189 
190  return mq;
191 }
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * mq_receiver
Definition: shm_mq.c:73
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:418
slock_t mq_mutex
Definition: shm_mq.c:72
bool mq_detached
Definition: shm_mq.c:78
#define Assert(condition)
Definition: c.h:699
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:75
size_t Size
Definition: c.h:433
#define MAXALIGN(LEN)
Definition: c.h:652
Size mq_ring_size
Definition: shm_mq.c:77
Definition: shm_mq.c:70
uint8 mq_ring_offset
Definition: shm_mq.c:79
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:76
#define offsetof(type, field)
Definition: c.h:622
#define MAXALIGN_DOWN(LEN)
Definition: c.h:664
PGPROC * mq_sender
Definition: shm_mq.c:74

◆ shm_mq_detach()

void shm_mq_detach ( shm_mq_handle mqh)

Definition at line 793 of file shm_mq.c.

References cancel_on_dsm_detach(), shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, pfree(), PointerGetDatum, shm_mq_detach_callback(), and shm_mq_detach_internal().

Referenced by DestroyParallelContext(), ExecParallelFinish(), HandleParallelMessage(), LaunchParallelWorkers(), mq_putmessage(), tqueueDestroyReceiver(), and tqueueShutdownReceiver().

794 {
795  /* Notify counterparty that we're outta here. */
797 
798  /* Cancel on_dsm_detach callback, if any. */
799  if (mqh->mqh_segment)
802  PointerGetDatum(mqh->mqh_queue));
803 
804  /* Release local memory associated with handle. */
805  if (mqh->mqh_buffer != NULL)
806  pfree(mqh->mqh_buffer);
807  pfree(mqh);
808 }
#define PointerGetDatum(X)
Definition: postgres.h:541
char * mqh_buffer
Definition: shm_mq.c:135
void pfree(void *pointer)
Definition: mcxt.c:1031
dsm_segment * mqh_segment
Definition: shm_mq.c:133
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:825
shm_mq * mqh_queue
Definition: shm_mq.c:132
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1253
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1039

◆ shm_mq_get_queue()

shm_mq* shm_mq_get_queue ( shm_mq_handle mqh)

Definition at line 848 of file shm_mq.c.

References shm_mq_handle::mqh_queue.

Referenced by WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

849 {
850  return mqh->mqh_queue;
851 }
shm_mq * mqh_queue
Definition: shm_mq.c:132

◆ shm_mq_get_receiver()

PGPROC* shm_mq_get_receiver ( shm_mq )

Definition at line 234 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_receiver, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_send_bytes(), and shm_mq_wait_for_attach().

235 {
236  PGPROC *receiver;
237 
238  SpinLockAcquire(&mq->mq_mutex);
239  receiver = mq->mq_receiver;
240  SpinLockRelease(&mq->mq_mutex);
241 
242  return receiver;
243 }
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: proc.h:95

◆ shm_mq_get_sender()

PGPROC* shm_mq_get_sender ( shm_mq )

Definition at line 249 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_sender, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_receive(), shm_mq_wait_for_attach(), WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

250 {
251  PGPROC *sender;
252 
253  SpinLockAcquire(&mq->mq_mutex);
254  sender = mq->mq_sender;
255  SpinLockRelease(&mq->mq_mutex);
256 
257  return sender;
258 }
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: proc.h:95

◆ shm_mq_receive()

shm_mq_result shm_mq_receive ( shm_mq_handle mqh,
Size nbytesp,
void **  datap,
bool  nowait 
)

Definition at line 540 of file shm_mq.c.

References Assert, Max, MAXALIGN, MemoryContextAlloc(), shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_ring_size, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, MQH_INITIAL_BUFSIZE, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, pfree(), shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_sender(), shm_mq_inc_bytes_read(), shm_mq_receive_bytes(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), and SHM_MQ_WOULD_BLOCK.

Referenced by copy_messages(), HandleParallelMessages(), test_shm_mq(), test_shm_mq_pipelined(), and TupleQueueReaderNext().

541 {
542  shm_mq *mq = mqh->mqh_queue;
543  shm_mq_result res;
544  Size rb = 0;
545  Size nbytes;
546  void *rawdata;
547 
548  Assert(mq->mq_receiver == MyProc);
549 
550  /* We can't receive data until the sender has attached. */
551  if (!mqh->mqh_counterparty_attached)
552  {
553  if (nowait)
554  {
555  int counterparty_gone;
556 
557  /*
558  * We shouldn't return at this point at all unless the sender
559  * hasn't attached yet. However, the correct return value depends
560  * on whether the sender is still attached. If we first test
561  * whether the sender has ever attached and then test whether the
562  * sender has detached, there's a race condition: a sender that
563  * attaches and detaches very quickly might fool us into thinking
564  * the sender never attached at all. So, test whether our
565  * counterparty is definitively gone first, and only afterwards
566  * check whether the sender ever attached in the first place.
567  */
568  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
569  if (shm_mq_get_sender(mq) == NULL)
570  {
571  if (counterparty_gone)
572  return SHM_MQ_DETACHED;
573  else
574  return SHM_MQ_WOULD_BLOCK;
575  }
576  }
577  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
578  && shm_mq_get_sender(mq) == NULL)
579  {
580  mq->mq_detached = true;
581  return SHM_MQ_DETACHED;
582  }
583  mqh->mqh_counterparty_attached = true;
584  }
585 
586  /*
587  * If we've consumed an amount of data greater than 1/4th of the ring
588  * size, mark it consumed in shared memory. We try to avoid doing this
589  * unnecessarily when only a small amount of data has been consumed,
590  * because SetLatch() is fairly expensive and we don't want to do it too
591  * often.
592  */
593  if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
594  {
596  mqh->mqh_consume_pending = 0;
597  }
598 
599  /* Try to read, or finish reading, the length word from the buffer. */
600  while (!mqh->mqh_length_word_complete)
601  {
602  /* Try to receive the message length word. */
603  Assert(mqh->mqh_partial_bytes < sizeof(Size));
604  res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
605  nowait, &rb, &rawdata);
606  if (res != SHM_MQ_SUCCESS)
607  return res;
608 
609  /*
610  * Hopefully, we'll receive the entire message length word at once.
611  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
612  * multiple reads.
613  */
614  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
615  {
616  Size needed;
617 
618  nbytes = *(Size *) rawdata;
619 
620  /* If we've already got the whole message, we're done. */
621  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
622  if (rb >= needed)
623  {
624  mqh->mqh_consume_pending += needed;
625  *nbytesp = nbytes;
626  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
627  return SHM_MQ_SUCCESS;
628  }
629 
630  /*
631  * We don't have the whole message, but we at least have the whole
632  * length word.
633  */
634  mqh->mqh_expected_bytes = nbytes;
635  mqh->mqh_length_word_complete = true;
636  mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
637  rb -= MAXALIGN(sizeof(Size));
638  }
639  else
640  {
641  Size lengthbytes;
642 
643  /* Can't be split unless bigger than required alignment. */
644  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
645 
646  /* Message word is split; need buffer to reassemble. */
647  if (mqh->mqh_buffer == NULL)
648  {
652  }
653  Assert(mqh->mqh_buflen >= sizeof(Size));
654 
655  /* Copy partial length word; remember to consume it. */
656  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
657  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
658  else
659  lengthbytes = rb;
660  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
661  lengthbytes);
662  mqh->mqh_partial_bytes += lengthbytes;
663  mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
664  rb -= lengthbytes;
665 
666  /* If we now have the whole word, we're ready to read payload. */
667  if (mqh->mqh_partial_bytes >= sizeof(Size))
668  {
669  Assert(mqh->mqh_partial_bytes == sizeof(Size));
670  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
671  mqh->mqh_length_word_complete = true;
672  mqh->mqh_partial_bytes = 0;
673  }
674  }
675  }
676  nbytes = mqh->mqh_expected_bytes;
677 
678  if (mqh->mqh_partial_bytes == 0)
679  {
680  /*
681  * Try to obtain the whole message in a single chunk. If this works,
682  * we need not copy the data and can return a pointer directly into
683  * shared memory.
684  */
685  res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
686  if (res != SHM_MQ_SUCCESS)
687  return res;
688  if (rb >= nbytes)
689  {
690  mqh->mqh_length_word_complete = false;
691  mqh->mqh_consume_pending += MAXALIGN(nbytes);
692  *nbytesp = nbytes;
693  *datap = rawdata;
694  return SHM_MQ_SUCCESS;
695  }
696 
697  /*
698  * The message has wrapped the buffer. We'll need to copy it in order
699  * to return it to the client in one chunk. First, make sure we have
700  * a large enough buffer available.
701  */
702  if (mqh->mqh_buflen < nbytes)
703  {
704  Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
705 
706  while (newbuflen < nbytes)
707  newbuflen *= 2;
708 
709  if (mqh->mqh_buffer != NULL)
710  {
711  pfree(mqh->mqh_buffer);
712  mqh->mqh_buffer = NULL;
713  mqh->mqh_buflen = 0;
714  }
715  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
716  mqh->mqh_buflen = newbuflen;
717  }
718  }
719 
720  /* Loop until we've copied the entire message. */
721  for (;;)
722  {
723  Size still_needed;
724 
725  /* Copy as much as we can. */
726  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
727  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
728  mqh->mqh_partial_bytes += rb;
729 
730  /*
731  * Update count of bytes that can be consumed, accounting for
732  * alignment padding. Note that this will never actually insert any
733  * padding except at the end of a message, because the buffer size is
734  * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
735  */
736  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
737  mqh->mqh_consume_pending += MAXALIGN(rb);
738 
739  /* If we got all the data, exit the loop. */
740  if (mqh->mqh_partial_bytes >= nbytes)
741  break;
742 
743  /* Wait for some more data. */
744  still_needed = nbytes - mqh->mqh_partial_bytes;
745  res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
746  if (res != SHM_MQ_SUCCESS)
747  return res;
748  if (rb > still_needed)
749  rb = still_needed;
750  }
751 
752  /* Return the complete message, and reset for next message. */
753  *nbytesp = nbytes;
754  *datap = mqh->mqh_buffer;
755  mqh->mqh_length_word_complete = false;
756  mqh->mqh_partial_bytes = 0;
757  return SHM_MQ_SUCCESS;
758 }
Size mqh_partial_bytes
Definition: shm_mq.c:138
PGPROC * MyProc
Definition: proc.c:67
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:249
PGPROC * mq_receiver
Definition: shm_mq.c:73
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:134
Size mqh_consume_pending
Definition: shm_mq.c:137
bool mqh_counterparty_attached
Definition: shm_mq.c:141
char * mqh_buffer
Definition: shm_mq.c:135
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1149
bool mqh_length_word_complete
Definition: shm_mq.c:140
void pfree(void *pointer)
Definition: mcxt.c:1031
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:163
Size mqh_buflen
Definition: shm_mq.c:136
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:1011
Size mqh_expected_bytes
Definition: shm_mq.c:139
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1200
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1110
shm_mq * mqh_queue
Definition: shm_mq.c:132
bool mq_detached
Definition: shm_mq.c:78
shm_mq_result
Definition: shm_mq.h:36
#define Max(x, y)
Definition: c.h:851
#define Assert(condition)
Definition: c.h:699
size_t Size
Definition: c.h:433
#define MAXALIGN(LEN)
Definition: c.h:652
Size mq_ring_size
Definition: shm_mq.c:77
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771
Definition: shm_mq.c:70
MemoryContext mqh_context
Definition: shm_mq.c:142
PGPROC * mq_sender
Definition: shm_mq.c:74

◆ shm_mq_send()

shm_mq_result shm_mq_send ( shm_mq_handle mqh,
Size  nbytes,
const void *  data,
bool  nowait 
)

Definition at line 320 of file shm_mq.c.

References shm_mq_iovec::data, shm_mq_iovec::len, and shm_mq_sendv().

Referenced by copy_messages(), test_shm_mq(), test_shm_mq_pipelined(), and tqueueReceiveSlot().

321 {
322  shm_mq_iovec iov;
323 
324  iov.data = data;
325  iov.len = nbytes;
326 
327  return shm_mq_sendv(mqh, &iov, 1, nowait);
328 }
Size len
Definition: shm_mq.h:32
const char * data
Definition: shm_mq.h:31
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
Definition: shm_mq.c:346

◆ shm_mq_sendv()

shm_mq_result shm_mq_sendv ( shm_mq_handle mqh,
shm_mq_iovec iov,
int  iovcnt,
bool  nowait 
)

Definition at line 346 of file shm_mq.c.

References Assert, shm_mq_iovec::data, i, shm_mq_iovec::len, MAXALIGN_DOWN, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, PGPROC::procLatch, SetLatch(), SHM_MQ_DETACHED, shm_mq_send_bytes(), SHM_MQ_SUCCESS, SpinLockAcquire, SpinLockRelease, and tmpbuf.

Referenced by mq_putmessage(), and shm_mq_send().

347 {
348  shm_mq_result res;
349  shm_mq *mq = mqh->mqh_queue;
350  PGPROC *receiver;
351  Size nbytes = 0;
352  Size bytes_written;
353  int i;
354  int which_iov = 0;
355  Size offset;
356 
357  Assert(mq->mq_sender == MyProc);
358 
359  /* Compute total size of write. */
360  for (i = 0; i < iovcnt; ++i)
361  nbytes += iov[i].len;
362 
363  /* Try to write, or finish writing, the length word into the buffer. */
364  while (!mqh->mqh_length_word_complete)
365  {
366  Assert(mqh->mqh_partial_bytes < sizeof(Size));
367  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
368  ((char *) &nbytes) + mqh->mqh_partial_bytes,
369  nowait, &bytes_written);
370 
371  if (res == SHM_MQ_DETACHED)
372  {
373  /* Reset state in case caller tries to send another message. */
374  mqh->mqh_partial_bytes = 0;
375  mqh->mqh_length_word_complete = false;
376  return res;
377  }
378  mqh->mqh_partial_bytes += bytes_written;
379 
380  if (mqh->mqh_partial_bytes >= sizeof(Size))
381  {
382  Assert(mqh->mqh_partial_bytes == sizeof(Size));
383 
384  mqh->mqh_partial_bytes = 0;
385  mqh->mqh_length_word_complete = true;
386  }
387 
388  if (res != SHM_MQ_SUCCESS)
389  return res;
390 
391  /* Length word can't be split unless bigger than required alignment. */
392  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
393  }
394 
395  /* Write the actual data bytes into the buffer. */
396  Assert(mqh->mqh_partial_bytes <= nbytes);
397  offset = mqh->mqh_partial_bytes;
398  do
399  {
400  Size chunksize;
401 
402  /* Figure out which bytes need to be sent next. */
403  if (offset >= iov[which_iov].len)
404  {
405  offset -= iov[which_iov].len;
406  ++which_iov;
407  if (which_iov >= iovcnt)
408  break;
409  continue;
410  }
411 
412  /*
413  * We want to avoid copying the data if at all possible, but every
414  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
415  * the last. Thus, if a chunk other than the last one ends on a
416  * non-MAXALIGN'd boundary, we have to combine the tail end of its
417  * data with data from one or more following chunks until we either
418  * reach the last chunk or accumulate a number of bytes which is
419  * MAXALIGN'd.
420  */
421  if (which_iov + 1 < iovcnt &&
422  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
423  {
424  char tmpbuf[MAXIMUM_ALIGNOF];
425  int j = 0;
426 
427  for (;;)
428  {
429  if (offset < iov[which_iov].len)
430  {
431  tmpbuf[j] = iov[which_iov].data[offset];
432  j++;
433  offset++;
434  if (j == MAXIMUM_ALIGNOF)
435  break;
436  }
437  else
438  {
439  offset -= iov[which_iov].len;
440  which_iov++;
441  if (which_iov >= iovcnt)
442  break;
443  }
444  }
445 
446  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
447 
448  if (res == SHM_MQ_DETACHED)
449  {
450  /* Reset state in case caller tries to send another message. */
451  mqh->mqh_partial_bytes = 0;
452  mqh->mqh_length_word_complete = false;
453  return res;
454  }
455 
456  mqh->mqh_partial_bytes += bytes_written;
457  if (res != SHM_MQ_SUCCESS)
458  return res;
459  continue;
460  }
461 
462  /*
463  * If this is the last chunk, we can write all the data, even if it
464  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
465  * MAXALIGN_DOWN the write size.
466  */
467  chunksize = iov[which_iov].len - offset;
468  if (which_iov + 1 < iovcnt)
469  chunksize = MAXALIGN_DOWN(chunksize);
470  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
471  nowait, &bytes_written);
472 
473  if (res == SHM_MQ_DETACHED)
474  {
475  /* Reset state in case caller tries to send another message. */
476  mqh->mqh_length_word_complete = false;
477  mqh->mqh_partial_bytes = 0;
478  return res;
479  }
480 
481  mqh->mqh_partial_bytes += bytes_written;
482  offset += bytes_written;
483  if (res != SHM_MQ_SUCCESS)
484  return res;
485  } while (mqh->mqh_partial_bytes < nbytes);
486 
487  /* Reset for next message. */
488  mqh->mqh_partial_bytes = 0;
489  mqh->mqh_length_word_complete = false;
490 
491  /* If queue has been detached, let caller know. */
492  if (mq->mq_detached)
493  return SHM_MQ_DETACHED;
494 
495  /*
496  * If the counterparty is known to have attached, we can read mq_receiver
497  * without acquiring the spinlock and assume it isn't NULL. Otherwise,
498  * more caution is needed.
499  */
500  if (mqh->mqh_counterparty_attached)
501  receiver = mq->mq_receiver;
502  else
503  {
505  receiver = mq->mq_receiver;
507  if (receiver == NULL)
508  return SHM_MQ_SUCCESS;
509  mqh->mqh_counterparty_attached = true;
510  }
511 
512  /* Notify receiver of the newly-written data, and return. */
513  SetLatch(&receiver->procLatch);
514  return SHM_MQ_SUCCESS;
515 }
Size mqh_partial_bytes
Definition: shm_mq.c:138
PGPROC * MyProc
Definition: proc.c:67
PGPROC * mq_receiver
Definition: shm_mq.c:73
bool mqh_counterparty_attached
Definition: shm_mq.c:141
Latch procLatch
Definition: proc.h:104
Size len
Definition: shm_mq.h:32
#define SpinLockAcquire(lock)
Definition: spin.h:62
bool mqh_length_word_complete
Definition: shm_mq.c:140
const char * data
Definition: shm_mq.h:31
slock_t mq_mutex
Definition: shm_mq.c:72
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:857
#define SpinLockRelease(lock)
Definition: spin.h:64
shm_mq * mqh_queue
Definition: shm_mq.c:132
bool mq_detached
Definition: shm_mq.c:78
shm_mq_result
Definition: shm_mq.h:36
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
#define Assert(condition)
Definition: c.h:699
size_t Size
Definition: c.h:433
static StringInfoData tmpbuf
Definition: walsender.c:163
int i
Definition: shm_mq.c:70
Definition: proc.h:95
#define MAXALIGN_DOWN(LEN)
Definition: c.h:664
PGPROC * mq_sender
Definition: shm_mq.c:74

◆ shm_mq_set_handle()

void shm_mq_set_handle ( shm_mq_handle ,
BackgroundWorkerHandle  
)

Definition at line 310 of file shm_mq.c.

References Assert, and shm_mq_handle::mqh_handle.

Referenced by ExecParallelCreateReaders(), and LaunchParallelWorkers().

311 {
312  Assert(mqh->mqh_handle == NULL);
313  mqh->mqh_handle = handle;
314 }
#define Assert(condition)
Definition: c.h:699

◆ shm_mq_set_receiver()

void shm_mq_set_receiver ( shm_mq mq,
PGPROC  
)

Definition at line 198 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

199 {
200  PGPROC *sender;
201 
203  Assert(mq->mq_receiver == NULL);
204  mq->mq_receiver = proc;
205  sender = mq->mq_sender;
207 
208  if (sender != NULL)
209  SetLatch(&sender->procLatch);
210 }
PGPROC * mq_receiver
Definition: shm_mq.c:73
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:72
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
#define Assert(condition)
Definition: c.h:699
Definition: proc.h:95
PGPROC * mq_sender
Definition: shm_mq.c:74

◆ shm_mq_set_sender()

void shm_mq_set_sender ( shm_mq mq,
PGPROC  
)

Definition at line 216 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ParallelWorkerMain(), and setup_dynamic_shared_memory().

217 {
218  PGPROC *receiver;
219 
221  Assert(mq->mq_sender == NULL);
222  mq->mq_sender = proc;
223  receiver = mq->mq_receiver;
225 
226  if (receiver != NULL)
227  SetLatch(&receiver->procLatch);
228 }
PGPROC * mq_receiver
Definition: shm_mq.c:73
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:72
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
#define Assert(condition)
Definition: c.h:699
Definition: proc.h:95
PGPROC * mq_sender
Definition: shm_mq.c:74

◆ shm_mq_wait_for_attach()

shm_mq_result shm_mq_wait_for_attach ( shm_mq_handle mqh)

Definition at line 770 of file shm_mq.c.

References Assert, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, MyProc, SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_get_sender(), SHM_MQ_SUCCESS, and shm_mq_wait_internal().

771 {
772  shm_mq *mq = mqh->mqh_queue;
773  PGPROC **victim;
774 
775  if (shm_mq_get_receiver(mq) == MyProc)
776  victim = &mq->mq_sender;
777  else
778  {
780  victim = &mq->mq_receiver;
781  }
782 
783  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
784  return SHM_MQ_SUCCESS;
785  else
786  return SHM_MQ_DETACHED;
787 }
PGPROC * MyProc
Definition: proc.c:67
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:249
PGPROC * mq_receiver
Definition: shm_mq.c:73
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:234
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:134
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1149
shm_mq * mqh_queue
Definition: shm_mq.c:132
#define Assert(condition)
Definition: c.h:699
Definition: shm_mq.c:70
Definition: proc.h:95
PGPROC * mq_sender
Definition: shm_mq.c:74

Variable Documentation

◆ shm_mq_minimum_size

PGDLLIMPORT const Size shm_mq_minimum_size

Definition at line 160 of file shm_mq.c.

Referenced by setup_dynamic_shared_memory().