PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
shm_mq.h File Reference
#include "postmaster/bgworker.h"
#include "storage/dsm.h"
#include "storage/proc.h"
Include dependency graph for shm_mq.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  shm_mq_iovec
 

Typedefs

typedef struct shm_mq shm_mq
 
typedef struct shm_mq_handle shm_mq_handle
 

Enumerations

enum  shm_mq_result { SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_MQ_DETACHED }
 

Functions

shm_mqshm_mq_create (void *address, Size size)
 
void shm_mq_set_receiver (shm_mq *mq, PGPROC *)
 
void shm_mq_set_sender (shm_mq *mq, PGPROC *)
 
PGPROCshm_mq_get_receiver (shm_mq *)
 
PGPROCshm_mq_get_sender (shm_mq *)
 
shm_mq_handleshm_mq_attach (shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 
void shm_mq_set_handle (shm_mq_handle *, BackgroundWorkerHandle *)
 
void shm_mq_detach (shm_mq *)
 
shm_mqshm_mq_get_queue (shm_mq_handle *mqh)
 
shm_mq_result shm_mq_send (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
 
shm_mq_result shm_mq_sendv (shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 
shm_mq_result shm_mq_receive (shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
shm_mq_result shm_mq_wait_for_attach (shm_mq_handle *mqh)
 

Variables

PGDLLIMPORT const Size shm_mq_minimum_size
 

Typedef Documentation

Definition at line 22 of file shm_mq.h.

Definition at line 26 of file shm_mq.h.

Enumeration Type Documentation

Enumerator
SHM_MQ_SUCCESS 
SHM_MQ_WOULD_BLOCK 
SHM_MQ_DETACHED 

Definition at line 36 of file shm_mq.h.

37 {
38  SHM_MQ_SUCCESS, /* Sent or received a message. */
39  SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */
40  SHM_MQ_DETACHED /* Other process has detached queue. */
shm_mq_result
Definition: shm_mq.h:36

Function Documentation

shm_mq_handle* shm_mq_attach ( shm_mq mq,
dsm_segment seg,
BackgroundWorkerHandle handle 
)

Definition at line 284 of file shm_mq.c.

References Assert, CurrentMemoryContext, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, MyProc, NULL, on_dsm_detach(), palloc(), PointerGetDatum, and shm_mq_detach_callback().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ParallelWorkerMain(), ReinitializeParallelDSM(), and test_shm_mq_setup().

285 {
286  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
287 
288  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
289  mqh->mqh_queue = mq;
290  mqh->mqh_segment = seg;
291  mqh->mqh_buffer = NULL;
292  mqh->mqh_handle = handle;
293  mqh->mqh_buflen = 0;
294  mqh->mqh_consume_pending = 0;
296  mqh->mqh_partial_bytes = 0;
297  mqh->mqh_length_word_complete = false;
298  mqh->mqh_counterparty_attached = false;
299 
300  if (seg != NULL)
302 
303  return mqh;
304 }
Size mqh_partial_bytes
Definition: shm_mq.c:135
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:562
PGPROC * mq_receiver
Definition: shm_mq.c:72
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:131
Size mqh_consume_pending
Definition: shm_mq.c:134
bool mqh_counterparty_attached
Definition: shm_mq.c:138
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1037
char * mqh_buffer
Definition: shm_mq.c:132
bool mqh_length_word_complete
Definition: shm_mq.c:137
Size mqh_buflen
Definition: shm_mq.c:133
dsm_segment * mqh_segment
Definition: shm_mq.c:130
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
shm_mq * mqh_queue
Definition: shm_mq.c:129
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
void * palloc(Size size)
Definition: mcxt.c:849
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1192
MemoryContext mqh_context
Definition: shm_mq.c:139
PGPROC * mq_sender
Definition: shm_mq.c:73
shm_mq* shm_mq_create ( void *  address,
Size  size 
)

Definition at line 167 of file shm_mq.c.

References Assert, MAXALIGN, MAXALIGN_DOWN, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq::mq_sender, NULL, offsetof, and SpinLockInit.

Referenced by ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

168 {
169  shm_mq *mq = address;
170  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
171 
172  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
173  size = MAXALIGN_DOWN(size);
174 
175  /* Queue size must be large enough to hold some data. */
176  Assert(size > data_offset);
177 
178  /* Initialize queue header. */
179  SpinLockInit(&mq->mq_mutex);
180  mq->mq_receiver = NULL;
181  mq->mq_sender = NULL;
182  mq->mq_bytes_read = 0;
183  mq->mq_bytes_written = 0;
184  mq->mq_ring_size = size - data_offset;
185  mq->mq_detached = false;
186  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
187 
188  return mq;
189 }
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * mq_receiver
Definition: shm_mq.c:72
uint64 mq_bytes_written
Definition: shm_mq.c:75
slock_t mq_mutex
Definition: shm_mq.c:71
bool mq_detached
Definition: shm_mq.c:77
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
size_t Size
Definition: c.h:356
#define MAXALIGN(LEN)
Definition: c.h:588
Size mq_ring_size
Definition: shm_mq.c:76
Definition: shm_mq.c:69
uint8 mq_ring_offset
Definition: shm_mq.c:78
uint64 mq_bytes_read
Definition: shm_mq.c:74
#define offsetof(type, field)
Definition: c.h:555
#define MAXALIGN_DOWN(LEN)
Definition: c.h:600
PGPROC * mq_sender
Definition: shm_mq.c:73
void shm_mq_detach ( shm_mq )

Definition at line 778 of file shm_mq.c.

References Assert, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, MyProc, NULL, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by DestroyTupleQueueReader(), mq_putmessage(), shm_mq_detach_callback(), and tqueueShutdownReceiver().

779 {
780  volatile shm_mq *vmq = mq;
781  PGPROC *victim;
782 
783  SpinLockAcquire(&mq->mq_mutex);
784  if (vmq->mq_sender == MyProc)
785  victim = vmq->mq_receiver;
786  else
787  {
788  Assert(vmq->mq_receiver == MyProc);
789  victim = vmq->mq_sender;
790  }
791  vmq->mq_detached = true;
792  SpinLockRelease(&mq->mq_mutex);
793 
794  if (victim != NULL)
795  SetLatch(&victim->procLatch);
796 }
PGPROC * MyProc
Definition: proc.c:67
PGPROC * mq_receiver
Definition: shm_mq.c:72
Latch procLatch
Definition: proc.h:103
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
bool mq_detached
Definition: shm_mq.c:77
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
Definition: shm_mq.c:69
Definition: proc.h:94
PGPROC * mq_sender
Definition: shm_mq.c:73
shm_mq* shm_mq_get_queue ( shm_mq_handle mqh)

Definition at line 802 of file shm_mq.c.

References shm_mq_handle::mqh_queue.

Referenced by DestroyTupleQueueReader(), pq_redirect_to_shm_mq(), and tqueueShutdownReceiver().

803 {
804  return mqh->mqh_queue;
805 }
shm_mq * mqh_queue
Definition: shm_mq.c:129
PGPROC* shm_mq_get_receiver ( shm_mq )

Definition at line 234 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_receiver, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_send_bytes(), and shm_mq_wait_for_attach().

235 {
236  volatile shm_mq *vmq = mq;
237  PGPROC *receiver;
238 
239  SpinLockAcquire(&mq->mq_mutex);
240  receiver = vmq->mq_receiver;
241  SpinLockRelease(&mq->mq_mutex);
242 
243  return receiver;
244 }
PGPROC * mq_receiver
Definition: shm_mq.c:72
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: shm_mq.c:69
Definition: proc.h:94
PGPROC* shm_mq_get_sender ( shm_mq )

Definition at line 250 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_sender, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_receive(), and shm_mq_wait_for_attach().

251 {
252  volatile shm_mq *vmq = mq;
253  PGPROC *sender;
254 
255  SpinLockAcquire(&mq->mq_mutex);
256  sender = vmq->mq_sender;
257  SpinLockRelease(&mq->mq_mutex);
258 
259  return sender;
260 }
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: shm_mq.c:69
Definition: proc.h:94
PGPROC * mq_sender
Definition: shm_mq.c:73
shm_mq_result shm_mq_receive ( shm_mq_handle mqh,
Size nbytesp,
void **  datap,
bool  nowait 
)

Definition at line 518 of file shm_mq.c.

References Assert, Max, MAXALIGN, MemoryContextAlloc(), shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, MQH_INITIAL_BUFSIZE, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, NULL, pfree(), shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_sender(), shm_mq_inc_bytes_read(), shm_mq_receive_bytes(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), and SHM_MQ_WOULD_BLOCK.

Referenced by copy_messages(), HandleParallelMessages(), test_shm_mq(), test_shm_mq_pipelined(), and TupleQueueReaderNext().

519 {
520  shm_mq *mq = mqh->mqh_queue;
521  shm_mq_result res;
522  Size rb = 0;
523  Size nbytes;
524  void *rawdata;
525 
526  Assert(mq->mq_receiver == MyProc);
527 
528  /* We can't receive data until the sender has attached. */
529  if (!mqh->mqh_counterparty_attached)
530  {
531  if (nowait)
532  {
533  int counterparty_gone;
534 
535  /*
536  * We shouldn't return at this point at all unless the sender
537  * hasn't attached yet. However, the correct return value depends
538  * on whether the sender is still attached. If we first test
539  * whether the sender has ever attached and then test whether the
540  * sender has detached, there's a race condition: a sender that
541  * attaches and detaches very quickly might fool us into thinking
542  * the sender never attached at all. So, test whether our
543  * counterparty is definitively gone first, and only afterwards
544  * check whether the sender ever attached in the first place.
545  */
546  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
547  if (shm_mq_get_sender(mq) == NULL)
548  {
549  if (counterparty_gone)
550  return SHM_MQ_DETACHED;
551  else
552  return SHM_MQ_WOULD_BLOCK;
553  }
554  }
555  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
556  && shm_mq_get_sender(mq) == NULL)
557  {
558  mq->mq_detached = true;
559  return SHM_MQ_DETACHED;
560  }
561  mqh->mqh_counterparty_attached = true;
562  }
563 
564  /* Consume any zero-copy data from previous receive operation. */
565  if (mqh->mqh_consume_pending > 0)
566  {
568  mqh->mqh_consume_pending = 0;
569  }
570 
571  /* Try to read, or finish reading, the length word from the buffer. */
572  while (!mqh->mqh_length_word_complete)
573  {
574  /* Try to receive the message length word. */
575  Assert(mqh->mqh_partial_bytes < sizeof(Size));
576  res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
577  nowait, &rb, &rawdata);
578  if (res != SHM_MQ_SUCCESS)
579  return res;
580 
581  /*
582  * Hopefully, we'll receive the entire message length word at once.
583  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
584  * multiple reads.
585  */
586  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
587  {
588  Size needed;
589 
590  nbytes = *(Size *) rawdata;
591 
592  /* If we've already got the whole message, we're done. */
593  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
594  if (rb >= needed)
595  {
596  /*
597  * Technically, we could consume the message length
598  * information at this point, but the extra write to shared
599  * memory wouldn't be free and in most cases we would reap no
600  * benefit.
601  */
602  mqh->mqh_consume_pending = needed;
603  *nbytesp = nbytes;
604  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
605  return SHM_MQ_SUCCESS;
606  }
607 
608  /*
609  * We don't have the whole message, but we at least have the whole
610  * length word.
611  */
612  mqh->mqh_expected_bytes = nbytes;
613  mqh->mqh_length_word_complete = true;
614  shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
615  rb -= MAXALIGN(sizeof(Size));
616  }
617  else
618  {
619  Size lengthbytes;
620 
621  /* Can't be split unless bigger than required alignment. */
622  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
623 
624  /* Message word is split; need buffer to reassemble. */
625  if (mqh->mqh_buffer == NULL)
626  {
630  }
631  Assert(mqh->mqh_buflen >= sizeof(Size));
632 
633  /* Copy and consume partial length word. */
634  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
635  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
636  else
637  lengthbytes = rb;
638  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
639  lengthbytes);
640  mqh->mqh_partial_bytes += lengthbytes;
641  shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
642  rb -= lengthbytes;
643 
644  /* If we now have the whole word, we're ready to read payload. */
645  if (mqh->mqh_partial_bytes >= sizeof(Size))
646  {
647  Assert(mqh->mqh_partial_bytes == sizeof(Size));
648  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
649  mqh->mqh_length_word_complete = true;
650  mqh->mqh_partial_bytes = 0;
651  }
652  }
653  }
654  nbytes = mqh->mqh_expected_bytes;
655 
656  if (mqh->mqh_partial_bytes == 0)
657  {
658  /*
659  * Try to obtain the whole message in a single chunk. If this works,
660  * we need not copy the data and can return a pointer directly into
661  * shared memory.
662  */
663  res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
664  if (res != SHM_MQ_SUCCESS)
665  return res;
666  if (rb >= nbytes)
667  {
668  mqh->mqh_length_word_complete = false;
669  mqh->mqh_consume_pending = MAXALIGN(nbytes);
670  *nbytesp = nbytes;
671  *datap = rawdata;
672  return SHM_MQ_SUCCESS;
673  }
674 
675  /*
676  * The message has wrapped the buffer. We'll need to copy it in order
677  * to return it to the client in one chunk. First, make sure we have
678  * a large enough buffer available.
679  */
680  if (mqh->mqh_buflen < nbytes)
681  {
682  Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
683 
684  while (newbuflen < nbytes)
685  newbuflen *= 2;
686 
687  if (mqh->mqh_buffer != NULL)
688  {
689  pfree(mqh->mqh_buffer);
690  mqh->mqh_buffer = NULL;
691  mqh->mqh_buflen = 0;
692  }
693  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
694  mqh->mqh_buflen = newbuflen;
695  }
696  }
697 
698  /* Loop until we've copied the entire message. */
699  for (;;)
700  {
701  Size still_needed;
702 
703  /* Copy as much as we can. */
704  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
705  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
706  mqh->mqh_partial_bytes += rb;
707 
708  /*
709  * Update count of bytes read, with alignment padding. Note that this
710  * will never actually insert any padding except at the end of a
711  * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
712  * and each read and write is as well.
713  */
714  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
716 
717  /* If we got all the data, exit the loop. */
718  if (mqh->mqh_partial_bytes >= nbytes)
719  break;
720 
721  /* Wait for some more data. */
722  still_needed = nbytes - mqh->mqh_partial_bytes;
723  res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
724  if (res != SHM_MQ_SUCCESS)
725  return res;
726  if (rb > still_needed)
727  rb = still_needed;
728  }
729 
730  /* Return the complete message, and reset for next message. */
731  *nbytesp = nbytes;
732  *datap = mqh->mqh_buffer;
733  mqh->mqh_length_word_complete = false;
734  mqh->mqh_partial_bytes = 0;
735  return SHM_MQ_SUCCESS;
736 }
Size mqh_partial_bytes
Definition: shm_mq.c:135
PGPROC * MyProc
Definition: proc.c:67
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:250
PGPROC * mq_receiver
Definition: shm_mq.c:72
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:131
Size mqh_consume_pending
Definition: shm_mq.c:134
bool mqh_counterparty_attached
Definition: shm_mq.c:138
static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:947
char * mqh_buffer
Definition: shm_mq.c:132
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1056
bool mqh_length_word_complete
Definition: shm_mq.c:137
void pfree(void *pointer)
Definition: mcxt.c:950
static bool shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1009
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:161
Size mqh_buflen
Definition: shm_mq.c:133
Size mqh_expected_bytes
Definition: shm_mq.c:136
shm_mq * mqh_queue
Definition: shm_mq.c:129
bool mq_detached
Definition: shm_mq.c:77
shm_mq_result
Definition: shm_mq.h:36
#define Max(x, y)
Definition: c.h:800
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
Definition: shm_mq.c:1127
size_t Size
Definition: c.h:356
#define MAXALIGN(LEN)
Definition: c.h:588
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
Definition: shm_mq.c:69
MemoryContext mqh_context
Definition: shm_mq.c:139
PGPROC * mq_sender
Definition: shm_mq.c:73
shm_mq_result shm_mq_send ( shm_mq_handle mqh,
Size  nbytes,
const void *  data,
bool  nowait 
)

Definition at line 321 of file shm_mq.c.

References shm_mq_iovec::data, shm_mq_iovec::len, and shm_mq_sendv().

Referenced by copy_messages(), test_shm_mq(), test_shm_mq_pipelined(), TQSendRecordInfo(), and tqueueReceiveSlot().

322 {
323  shm_mq_iovec iov;
324 
325  iov.data = data;
326  iov.len = nbytes;
327 
328  return shm_mq_sendv(mqh, &iov, 1, nowait);
329 }
Size len
Definition: shm_mq.h:32
const char * data
Definition: shm_mq.h:31
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
Definition: shm_mq.c:347
shm_mq_result shm_mq_sendv ( shm_mq_handle mqh,
shm_mq_iovec iov,
int  iovcnt,
bool  nowait 
)

Definition at line 347 of file shm_mq.c.

References Assert, shm_mq_iovec::data, i, shm_mq_iovec::len, MAXALIGN_DOWN, shm_mq::mq_sender, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, SHM_MQ_DETACHED, shm_mq_notify_receiver(), shm_mq_send_bytes(), SHM_MQ_SUCCESS, and tmpbuf.

Referenced by mq_putmessage(), and shm_mq_send().

348 {
349  shm_mq_result res;
350  shm_mq *mq = mqh->mqh_queue;
351  Size nbytes = 0;
352  Size bytes_written;
353  int i;
354  int which_iov = 0;
355  Size offset;
356 
357  Assert(mq->mq_sender == MyProc);
358 
359  /* Compute total size of write. */
360  for (i = 0; i < iovcnt; ++i)
361  nbytes += iov[i].len;
362 
363  /* Try to write, or finish writing, the length word into the buffer. */
364  while (!mqh->mqh_length_word_complete)
365  {
366  Assert(mqh->mqh_partial_bytes < sizeof(Size));
367  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
368  ((char *) &nbytes) +mqh->mqh_partial_bytes,
369  nowait, &bytes_written);
370 
371  if (res == SHM_MQ_DETACHED)
372  {
373  /* Reset state in case caller tries to send another message. */
374  mqh->mqh_partial_bytes = 0;
375  mqh->mqh_length_word_complete = false;
376  return res;
377  }
378  mqh->mqh_partial_bytes += bytes_written;
379 
380  if (mqh->mqh_partial_bytes >= sizeof(Size))
381  {
382  Assert(mqh->mqh_partial_bytes == sizeof(Size));
383 
384  mqh->mqh_partial_bytes = 0;
385  mqh->mqh_length_word_complete = true;
386  }
387 
388  if (res != SHM_MQ_SUCCESS)
389  return res;
390 
391  /* Length word can't be split unless bigger than required alignment. */
392  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
393  }
394 
395  /* Write the actual data bytes into the buffer. */
396  Assert(mqh->mqh_partial_bytes <= nbytes);
397  offset = mqh->mqh_partial_bytes;
398  do
399  {
400  Size chunksize;
401 
402  /* Figure out which bytes need to be sent next. */
403  if (offset >= iov[which_iov].len)
404  {
405  offset -= iov[which_iov].len;
406  ++which_iov;
407  if (which_iov >= iovcnt)
408  break;
409  continue;
410  }
411 
412  /*
413  * We want to avoid copying the data if at all possible, but every
414  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
415  * the last. Thus, if a chunk other than the last one ends on a
416  * non-MAXALIGN'd boundary, we have to combine the tail end of its
417  * data with data from one or more following chunks until we either
418  * reach the last chunk or accumulate a number of bytes which is
419  * MAXALIGN'd.
420  */
421  if (which_iov + 1 < iovcnt &&
422  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
423  {
424  char tmpbuf[MAXIMUM_ALIGNOF];
425  int j = 0;
426 
427  for (;;)
428  {
429  if (offset < iov[which_iov].len)
430  {
431  tmpbuf[j] = iov[which_iov].data[offset];
432  j++;
433  offset++;
434  if (j == MAXIMUM_ALIGNOF)
435  break;
436  }
437  else
438  {
439  offset -= iov[which_iov].len;
440  which_iov++;
441  if (which_iov >= iovcnt)
442  break;
443  }
444  }
445 
446  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
447 
448  if (res == SHM_MQ_DETACHED)
449  {
450  /* Reset state in case caller tries to send another message. */
451  mqh->mqh_partial_bytes = 0;
452  mqh->mqh_length_word_complete = false;
453  return res;
454  }
455 
456  mqh->mqh_partial_bytes += bytes_written;
457  if (res != SHM_MQ_SUCCESS)
458  return res;
459  continue;
460  }
461 
462  /*
463  * If this is the last chunk, we can write all the data, even if it
464  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
465  * MAXALIGN_DOWN the write size.
466  */
467  chunksize = iov[which_iov].len - offset;
468  if (which_iov + 1 < iovcnt)
469  chunksize = MAXALIGN_DOWN(chunksize);
470  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
471  nowait, &bytes_written);
472 
473  if (res == SHM_MQ_DETACHED)
474  {
475  /* Reset state in case caller tries to send another message. */
476  mqh->mqh_length_word_complete = false;
477  mqh->mqh_partial_bytes = 0;
478  return res;
479  }
480 
481  mqh->mqh_partial_bytes += bytes_written;
482  offset += bytes_written;
483  if (res != SHM_MQ_SUCCESS)
484  return res;
485  } while (mqh->mqh_partial_bytes < nbytes);
486 
487  /* Reset for next message. */
488  mqh->mqh_partial_bytes = 0;
489  mqh->mqh_length_word_complete = false;
490 
491  /* Notify receiver of the newly-written data, and return. */
492  return shm_mq_notify_receiver(mq);
493 }
Size mqh_partial_bytes
Definition: shm_mq.c:135
PGPROC * MyProc
Definition: proc.c:67
static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq)
Definition: shm_mq.c:1173
Size len
Definition: shm_mq.h:32
bool mqh_length_word_complete
Definition: shm_mq.c:137
const char * data
Definition: shm_mq.h:31
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:811
shm_mq * mqh_queue
Definition: shm_mq.c:129
shm_mq_result
Definition: shm_mq.h:36
#define Assert(condition)
Definition: c.h:675
size_t Size
Definition: c.h:356
static StringInfoData tmpbuf
Definition: walsender.c:155
int i
Definition: shm_mq.c:69
#define MAXALIGN_DOWN(LEN)
Definition: c.h:600
PGPROC * mq_sender
Definition: shm_mq.c:73
void shm_mq_set_handle ( shm_mq_handle ,
BackgroundWorkerHandle  
)

Definition at line 311 of file shm_mq.c.

References Assert, shm_mq_handle::mqh_handle, and NULL.

Referenced by ExecGather(), ExecGatherMerge(), and LaunchParallelWorkers().

312 {
313  Assert(mqh->mqh_handle == NULL);
314  mqh->mqh_handle = handle;
315 }
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
void shm_mq_set_receiver ( shm_mq mq,
PGPROC  
)

Definition at line 196 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, NULL, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

197 {
198  volatile shm_mq *vmq = mq;
199  PGPROC *sender;
200 
202  Assert(vmq->mq_receiver == NULL);
203  vmq->mq_receiver = proc;
204  sender = vmq->mq_sender;
206 
207  if (sender != NULL)
208  SetLatch(&sender->procLatch);
209 }
PGPROC * mq_receiver
Definition: shm_mq.c:72
Latch procLatch
Definition: proc.h:103
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:71
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
Definition: shm_mq.c:69
Definition: proc.h:94
PGPROC * mq_sender
Definition: shm_mq.c:73
void shm_mq_set_sender ( shm_mq mq,
PGPROC  
)

Definition at line 215 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, NULL, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ParallelWorkerMain(), and setup_dynamic_shared_memory().

216 {
217  volatile shm_mq *vmq = mq;
218  PGPROC *receiver;
219 
221  Assert(vmq->mq_sender == NULL);
222  vmq->mq_sender = proc;
223  receiver = vmq->mq_receiver;
225 
226  if (receiver != NULL)
227  SetLatch(&receiver->procLatch);
228 }
PGPROC * mq_receiver
Definition: shm_mq.c:72
Latch procLatch
Definition: proc.h:103
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:71
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
Definition: shm_mq.c:69
Definition: proc.h:94
PGPROC * mq_sender
Definition: shm_mq.c:73
shm_mq_result shm_mq_wait_for_attach ( shm_mq_handle mqh)

Definition at line 748 of file shm_mq.c.

References Assert, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, MyProc, SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_get_sender(), SHM_MQ_SUCCESS, and shm_mq_wait_internal().

749 {
750  shm_mq *mq = mqh->mqh_queue;
751  PGPROC **victim;
752 
753  if (shm_mq_get_receiver(mq) == MyProc)
754  victim = &mq->mq_sender;
755  else
756  {
758  victim = &mq->mq_receiver;
759  }
760 
761  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
762  return SHM_MQ_SUCCESS;
763  else
764  return SHM_MQ_DETACHED;
765 }
PGPROC * MyProc
Definition: proc.c:67
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:250
PGPROC * mq_receiver
Definition: shm_mq.c:72
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:234
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:131
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1056
shm_mq * mqh_queue
Definition: shm_mq.c:129
#define Assert(condition)
Definition: c.h:675
Definition: shm_mq.c:69
Definition: proc.h:94
PGPROC * mq_sender
Definition: shm_mq.c:73

Variable Documentation

PGDLLIMPORT const Size shm_mq_minimum_size

Definition at line 158 of file shm_mq.c.

Referenced by setup_dynamic_shared_memory().