PostgreSQL Source Code git master
Loading...
Searching...
No Matches
shm_mq.h File Reference
#include "postmaster/bgworker.h"
#include "storage/dsm.h"
Include dependency graph for shm_mq.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  shm_mq_iovec
 

Typedefs

typedef struct PGPROC PGPROC
 
typedef struct shm_mq shm_mq
 
typedef struct shm_mq_handle shm_mq_handle
 

Enumerations

enum  shm_mq_result { SHM_MQ_SUCCESS , SHM_MQ_WOULD_BLOCK , SHM_MQ_DETACHED }
 

Functions

shm_mqshm_mq_create (void *address, Size size)
 
void shm_mq_set_receiver (shm_mq *mq, PGPROC *)
 
void shm_mq_set_sender (shm_mq *mq, PGPROC *)
 
PGPROCshm_mq_get_receiver (shm_mq *)
 
PGPROCshm_mq_get_sender (shm_mq *)
 
shm_mq_handleshm_mq_attach (shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 
void shm_mq_set_handle (shm_mq_handle *, BackgroundWorkerHandle *)
 
void shm_mq_detach (shm_mq_handle *mqh)
 
shm_mqshm_mq_get_queue (shm_mq_handle *mqh)
 
shm_mq_result shm_mq_send (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
 
shm_mq_result shm_mq_sendv (shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
 
shm_mq_result shm_mq_receive (shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
shm_mq_result shm_mq_wait_for_attach (shm_mq_handle *mqh)
 

Variables

PGDLLIMPORT const Size shm_mq_minimum_size
 

Typedef Documentation

◆ PGPROC

Definition at line 20 of file shm_mq.h.

◆ shm_mq

Definition at line 24 of file shm_mq.h.

◆ shm_mq_handle

Definition at line 28 of file shm_mq.h.

Enumeration Type Documentation

◆ shm_mq_result

Enumerator
SHM_MQ_SUCCESS 
SHM_MQ_WOULD_BLOCK 
SHM_MQ_DETACHED 

Definition at line 38 of file shm_mq.h.

39{
40 SHM_MQ_SUCCESS, /* Sent or received a message. */
41 SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */
42 SHM_MQ_DETACHED, /* Other process has detached queue. */
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_WOULD_BLOCK
Definition shm_mq.h:41
@ SHM_MQ_DETACHED
Definition shm_mq.h:42

Function Documentation

◆ shm_mq_attach()

shm_mq_handle * shm_mq_attach ( shm_mq mq,
dsm_segment seg,
BackgroundWorkerHandle handle 
)
extern

Definition at line 292 of file shm_mq.c.

293{
295
296 Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
297 mqh->mqh_queue = mq;
298 mqh->mqh_segment = seg;
299 mqh->mqh_handle = handle;
300 mqh->mqh_buffer = NULL;
301 mqh->mqh_buflen = 0;
302 mqh->mqh_consume_pending = 0;
303 mqh->mqh_send_pending = 0;
304 mqh->mqh_partial_bytes = 0;
305 mqh->mqh_expected_bytes = 0;
306 mqh->mqh_length_word_complete = false;
307 mqh->mqh_counterparty_attached = false;
308 mqh->mqh_context = CurrentMemoryContext;
309
310 if (seg != NULL)
312
313 return mqh;
314}
#define Assert(condition)
Definition c.h:906
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition dsm.c:1132
#define palloc_object(type)
Definition fe_memutils.h:74
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
static Datum PointerGetDatum(const void *X)
Definition postgres.h:352
static int fb(int x)
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition shm_mq.c:1325
PGPROC * MyProc
Definition proc.c:68

References Assert, CurrentMemoryContext, fb(), MyProc, on_dsm_detach(), palloc_object, PointerGetDatum(), and shm_mq_detach_callback().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), pa_setup_dsm(), ParallelApplyWorkerMain(), ParallelWorkerMain(), ReinitializeParallelDSM(), and test_shm_mq_setup().

◆ shm_mq_create()

shm_mq * shm_mq_create ( void address,
Size  size 
)
extern

Definition at line 179 of file shm_mq.c.

180{
181 shm_mq *mq = address;
183
184 /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
185 size = MAXALIGN_DOWN(size);
186
187 /* Queue size must be large enough to hold some data. */
188 Assert(size > data_offset);
189
190 /* Initialize queue header. */
191 SpinLockInit(&mq->mq_mutex);
192 mq->mq_receiver = NULL;
193 mq->mq_sender = NULL;
194 pg_atomic_init_u64(&mq->mq_bytes_read, 0);
195 pg_atomic_init_u64(&mq->mq_bytes_written, 0);
196 mq->mq_ring_size = size - data_offset;
197 mq->mq_detached = false;
198 mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
199
200 return mq;
201}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
#define MAXALIGN_DOWN(LEN)
Definition c.h:871
#define MAXALIGN(LEN)
Definition c.h:859
size_t Size
Definition c.h:652
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50

References Assert, fb(), MAXALIGN, MAXALIGN_DOWN, pg_atomic_init_u64(), and SpinLockInit().

Referenced by ExecParallelSetupTupleQueues(), InitializeParallelDSM(), pa_setup_dsm(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

◆ shm_mq_detach()

void shm_mq_detach ( shm_mq_handle mqh)
extern

Definition at line 845 of file shm_mq.c.

846{
847 /* Before detaching, notify the receiver about any already-written data. */
848 if (mqh->mqh_send_pending > 0)
849 {
850 shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
851 mqh->mqh_send_pending = 0;
852 }
853
854 /* Notify counterparty that we're outta here. */
855 shm_mq_detach_internal(mqh->mqh_queue);
856
857 /* Cancel on_dsm_detach callback, if any. */
858 if (mqh->mqh_segment)
859 cancel_on_dsm_detach(mqh->mqh_segment,
861 PointerGetDatum(mqh->mqh_queue));
862
863 /* Release local memory associated with handle. */
864 if (mqh->mqh_buffer != NULL)
865 pfree(mqh->mqh_buffer);
866 pfree(mqh);
867}
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition dsm.c:1147
void pfree(void *pointer)
Definition mcxt.c:1616
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
Definition shm_mq.c:1305
static void shm_mq_detach_internal(shm_mq *mq)
Definition shm_mq.c:884

References cancel_on_dsm_detach(), fb(), pfree(), PointerGetDatum(), shm_mq_detach_callback(), shm_mq_detach_internal(), and shm_mq_inc_bytes_written().

Referenced by DestroyParallelContext(), ExecParallelFinish(), LaunchParallelWorkers(), logicalrep_pa_worker_stop(), mq_putmessage(), pa_detach_all_error_mq(), pa_free_worker_info(), ProcessParallelMessage(), tqueueDestroyReceiver(), and tqueueShutdownReceiver().

◆ shm_mq_get_queue()

shm_mq * shm_mq_get_queue ( shm_mq_handle mqh)
extern

Definition at line 907 of file shm_mq.c.

908{
909 return mqh->mqh_queue;
910}

References fb().

Referenced by WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

◆ shm_mq_get_receiver()

PGPROC * shm_mq_get_receiver ( shm_mq mq)
extern

Definition at line 244 of file shm_mq.c.

245{
247
248 SpinLockAcquire(&mq->mq_mutex);
249 receiver = mq->mq_receiver;
250 SpinLockRelease(&mq->mq_mutex);
251
252 return receiver;
253}
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
Definition proc.h:176

References fb(), SpinLockAcquire(), and SpinLockRelease().

Referenced by shm_mq_send_bytes(), and shm_mq_wait_for_attach().

◆ shm_mq_get_sender()

PGPROC * shm_mq_get_sender ( shm_mq mq)
extern

Definition at line 259 of file shm_mq.c.

260{
261 PGPROC *sender;
262
263 SpinLockAcquire(&mq->mq_mutex);
264 sender = mq->mq_sender;
265 SpinLockRelease(&mq->mq_mutex);
266
267 return sender;
268}

References fb(), SpinLockAcquire(), and SpinLockRelease().

Referenced by shm_mq_receive(), shm_mq_wait_for_attach(), WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

◆ shm_mq_receive()

shm_mq_result shm_mq_receive ( shm_mq_handle mqh,
Size nbytesp,
void **  datap,
bool  nowait 
)
extern

Definition at line 574 of file shm_mq.c.

575{
576 shm_mq *mq = mqh->mqh_queue;
577 shm_mq_result res;
578 Size rb = 0;
579 Size nbytes;
580 void *rawdata;
581
582 Assert(mq->mq_receiver == MyProc);
583
584 /* We can't receive data until the sender has attached. */
585 if (!mqh->mqh_counterparty_attached)
586 {
587 if (nowait)
588 {
590
591 /*
592 * We shouldn't return at this point at all unless the sender
593 * hasn't attached yet. However, the correct return value depends
594 * on whether the sender is still attached. If we first test
595 * whether the sender has ever attached and then test whether the
596 * sender has detached, there's a race condition: a sender that
597 * attaches and detaches very quickly might fool us into thinking
598 * the sender never attached at all. So, test whether our
599 * counterparty is definitively gone first, and only afterwards
600 * check whether the sender ever attached in the first place.
601 */
603 if (shm_mq_get_sender(mq) == NULL)
604 {
606 return SHM_MQ_DETACHED;
607 else
608 return SHM_MQ_WOULD_BLOCK;
609 }
610 }
611 else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
613 {
614 mq->mq_detached = true;
615 return SHM_MQ_DETACHED;
616 }
617 mqh->mqh_counterparty_attached = true;
618 }
619
620 /*
621 * If we've consumed an amount of data greater than 1/4th of the ring
622 * size, mark it consumed in shared memory. We try to avoid doing this
623 * unnecessarily when only a small amount of data has been consumed,
624 * because SetLatch() is fairly expensive and we don't want to do it too
625 * often.
626 */
627 if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
628 {
629 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
630 mqh->mqh_consume_pending = 0;
631 }
632
633 /* Try to read, or finish reading, the length word from the buffer. */
634 while (!mqh->mqh_length_word_complete)
635 {
636 /* Try to receive the message length word. */
637 Assert(mqh->mqh_partial_bytes < sizeof(Size));
638 res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
639 nowait, &rb, &rawdata);
640 if (res != SHM_MQ_SUCCESS)
641 return res;
642
643 /*
644 * Hopefully, we'll receive the entire message length word at once.
645 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
646 * multiple reads.
647 */
648 if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
649 {
650 Size needed;
651
652 nbytes = *(Size *) rawdata;
653
654 /* If we've already got the whole message, we're done. */
655 needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
656 if (rb >= needed)
657 {
658 mqh->mqh_consume_pending += needed;
659 *nbytesp = nbytes;
660 *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
661 return SHM_MQ_SUCCESS;
662 }
663
664 /*
665 * We don't have the whole message, but we at least have the whole
666 * length word.
667 */
668 mqh->mqh_expected_bytes = nbytes;
669 mqh->mqh_length_word_complete = true;
670 mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
671 rb -= MAXALIGN(sizeof(Size));
672 }
673 else
674 {
676
677 /* Can't be split unless bigger than required alignment. */
678 Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
679
680 /* Message word is split; need buffer to reassemble. */
681 if (mqh->mqh_buffer == NULL)
682 {
683 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
685 mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
686 }
687 Assert(mqh->mqh_buflen >= sizeof(Size));
688
689 /* Copy partial length word; remember to consume it. */
690 if (mqh->mqh_partial_bytes + rb > sizeof(Size))
691 lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
692 else
693 lengthbytes = rb;
694 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
696 mqh->mqh_partial_bytes += lengthbytes;
697 mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
698 rb -= lengthbytes;
699
700 /* If we now have the whole word, we're ready to read payload. */
701 if (mqh->mqh_partial_bytes >= sizeof(Size))
702 {
703 Assert(mqh->mqh_partial_bytes == sizeof(Size));
704 mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
705 mqh->mqh_length_word_complete = true;
706 mqh->mqh_partial_bytes = 0;
707 }
708 }
709 }
710 nbytes = mqh->mqh_expected_bytes;
711
712 /*
713 * Should be disallowed on the sending side already, but better check and
714 * error out on the receiver side as well rather than trying to read a
715 * prohibitively large message.
716 */
717 if (nbytes > MaxAllocSize)
720 errmsg("invalid message size %zu in shared memory queue",
721 nbytes)));
722
723 if (mqh->mqh_partial_bytes == 0)
724 {
725 /*
726 * Try to obtain the whole message in a single chunk. If this works,
727 * we need not copy the data and can return a pointer directly into
728 * shared memory.
729 */
730 res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
731 if (res != SHM_MQ_SUCCESS)
732 return res;
733 if (rb >= nbytes)
734 {
735 mqh->mqh_length_word_complete = false;
736 mqh->mqh_consume_pending += MAXALIGN(nbytes);
737 *nbytesp = nbytes;
738 *datap = rawdata;
739 return SHM_MQ_SUCCESS;
740 }
741
742 /*
743 * The message has wrapped the buffer. We'll need to copy it in order
744 * to return it to the client in one chunk. First, make sure we have
745 * a large enough buffer available.
746 */
747 if (mqh->mqh_buflen < nbytes)
748 {
750
751 /*
752 * Increase size to the next power of 2 that's >= nbytes, but
753 * limit to MaxAllocSize.
754 */
757
758 if (mqh->mqh_buffer != NULL)
759 {
760 pfree(mqh->mqh_buffer);
761 mqh->mqh_buffer = NULL;
762 mqh->mqh_buflen = 0;
763 }
764 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
765 mqh->mqh_buflen = newbuflen;
766 }
767 }
768
769 /* Loop until we've copied the entire message. */
770 for (;;)
771 {
773
774 /* Copy as much as we can. */
775 Assert(mqh->mqh_partial_bytes + rb <= nbytes);
776 if (rb > 0)
777 {
778 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
779 mqh->mqh_partial_bytes += rb;
780 }
781
782 /*
783 * Update count of bytes that can be consumed, accounting for
784 * alignment padding. Note that this will never actually insert any
785 * padding except at the end of a message, because the buffer size is
786 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
787 */
788 Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
789 mqh->mqh_consume_pending += MAXALIGN(rb);
790
791 /* If we got all the data, exit the loop. */
792 if (mqh->mqh_partial_bytes >= nbytes)
793 break;
794
795 /* Wait for some more data. */
796 still_needed = nbytes - mqh->mqh_partial_bytes;
797 res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
798 if (res != SHM_MQ_SUCCESS)
799 return res;
800 if (rb > still_needed)
802 }
803
804 /* Return the complete message, and reset for next message. */
805 *nbytesp = nbytes;
806 *datap = mqh->mqh_buffer;
807 mqh->mqh_length_word_complete = false;
808 mqh->mqh_partial_bytes = 0;
809 return SHM_MQ_SUCCESS;
810}
#define Min(x, y)
Definition c.h:1054
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
#define MaxAllocSize
Definition fe_memutils.h:22
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
static char * errmsg
#define pg_nextpower2_size_t
#define MQH_INITIAL_BUFSIZE
Definition shm_mq.c:173
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition shm_mq.c:259
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition shm_mq.c:1181
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition shm_mq.c:1220
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition shm_mq.c:1272
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition shm_mq.c:1081

References Assert, ereport, errcode(), errmsg, ERROR, fb(), MAXALIGN, MaxAllocSize, MemoryContextAlloc(), Min, MQH_INITIAL_BUFSIZE, MyProc, pfree(), pg_nextpower2_size_t, shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_sender(), shm_mq_inc_bytes_read(), shm_mq_receive_bytes(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), and SHM_MQ_WOULD_BLOCK.

Referenced by copy_messages(), LogicalParallelApplyLoop(), ProcessParallelApplyMessages(), ProcessParallelMessages(), test_shm_mq(), test_shm_mq_pipelined(), and TupleQueueReaderNext().

◆ shm_mq_send()

shm_mq_result shm_mq_send ( shm_mq_handle mqh,
Size  nbytes,
const void data,
bool  nowait,
bool  force_flush 
)
extern

Definition at line 331 of file shm_mq.c.

333{
335
336 iov.data = data;
337 iov.len = nbytes;
338
339 return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
340}
const void * data
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
Definition shm_mq.c:363
const char * data
Definition shm_mq.h:33

References shm_mq_iovec::data, data, fb(), and shm_mq_sendv().

Referenced by copy_messages(), pa_send_data(), test_shm_mq(), test_shm_mq_pipelined(), and tqueueReceiveSlot().

◆ shm_mq_sendv()

shm_mq_result shm_mq_sendv ( shm_mq_handle mqh,
shm_mq_iovec iov,
int  iovcnt,
bool  nowait,
bool  force_flush 
)
extern

Definition at line 363 of file shm_mq.c.

365{
366 shm_mq_result res;
367 shm_mq *mq = mqh->mqh_queue;
369 Size nbytes = 0;
371 int i;
372 int which_iov = 0;
373 Size offset;
374
375 Assert(mq->mq_sender == MyProc);
376
377 /* Compute total size of write. */
378 for (i = 0; i < iovcnt; ++i)
379 nbytes += iov[i].len;
380
381 /* Prevent writing messages overwhelming the receiver. */
382 if (nbytes > MaxAllocSize)
385 errmsg("cannot send a message of size %zu via shared memory queue",
386 nbytes)));
387
388 /* Try to write, or finish writing, the length word into the buffer. */
389 while (!mqh->mqh_length_word_complete)
390 {
391 Assert(mqh->mqh_partial_bytes < sizeof(Size));
392 res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
393 ((char *) &nbytes) + mqh->mqh_partial_bytes,
394 nowait, &bytes_written);
395
396 if (res == SHM_MQ_DETACHED)
397 {
398 /* Reset state in case caller tries to send another message. */
399 mqh->mqh_partial_bytes = 0;
400 mqh->mqh_length_word_complete = false;
401 return res;
402 }
403 mqh->mqh_partial_bytes += bytes_written;
404
405 if (mqh->mqh_partial_bytes >= sizeof(Size))
406 {
407 Assert(mqh->mqh_partial_bytes == sizeof(Size));
408
409 mqh->mqh_partial_bytes = 0;
410 mqh->mqh_length_word_complete = true;
411 }
412
413 if (res != SHM_MQ_SUCCESS)
414 return res;
415
416 /* Length word can't be split unless bigger than required alignment. */
417 Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
418 }
419
420 /* Write the actual data bytes into the buffer. */
421 Assert(mqh->mqh_partial_bytes <= nbytes);
422 offset = mqh->mqh_partial_bytes;
423 do
424 {
426
427 /* Figure out which bytes need to be sent next. */
428 if (offset >= iov[which_iov].len)
429 {
430 offset -= iov[which_iov].len;
431 ++which_iov;
432 if (which_iov >= iovcnt)
433 break;
434 continue;
435 }
436
437 /*
438 * We want to avoid copying the data if at all possible, but every
439 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
440 * the last. Thus, if a chunk other than the last one ends on a
441 * non-MAXALIGN'd boundary, we have to combine the tail end of its
442 * data with data from one or more following chunks until we either
443 * reach the last chunk or accumulate a number of bytes which is
444 * MAXALIGN'd.
445 */
446 if (which_iov + 1 < iovcnt &&
447 offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
448 {
450 int j = 0;
451
452 for (;;)
453 {
454 if (offset < iov[which_iov].len)
455 {
456 tmpbuf[j] = iov[which_iov].data[offset];
457 j++;
458 offset++;
459 if (j == MAXIMUM_ALIGNOF)
460 break;
461 }
462 else
463 {
464 offset -= iov[which_iov].len;
465 which_iov++;
466 if (which_iov >= iovcnt)
467 break;
468 }
469 }
470
471 res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
472
473 if (res == SHM_MQ_DETACHED)
474 {
475 /* Reset state in case caller tries to send another message. */
476 mqh->mqh_partial_bytes = 0;
477 mqh->mqh_length_word_complete = false;
478 return res;
479 }
480
481 mqh->mqh_partial_bytes += bytes_written;
482 if (res != SHM_MQ_SUCCESS)
483 return res;
484 continue;
485 }
486
487 /*
488 * If this is the last chunk, we can write all the data, even if it
489 * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
490 * MAXALIGN_DOWN the write size.
491 */
492 chunksize = iov[which_iov].len - offset;
493 if (which_iov + 1 < iovcnt)
496 nowait, &bytes_written);
497
498 if (res == SHM_MQ_DETACHED)
499 {
500 /* Reset state in case caller tries to send another message. */
501 mqh->mqh_length_word_complete = false;
502 mqh->mqh_partial_bytes = 0;
503 return res;
504 }
505
506 mqh->mqh_partial_bytes += bytes_written;
507 offset += bytes_written;
508 if (res != SHM_MQ_SUCCESS)
509 return res;
510 } while (mqh->mqh_partial_bytes < nbytes);
511
512 /* Reset for next message. */
513 mqh->mqh_partial_bytes = 0;
514 mqh->mqh_length_word_complete = false;
515
516 /* If queue has been detached, let caller know. */
517 if (mq->mq_detached)
518 return SHM_MQ_DETACHED;
519
520 /*
521 * If the counterparty is known to have attached, we can read mq_receiver
522 * without acquiring the spinlock. Otherwise, more caution is needed.
523 */
524 if (mqh->mqh_counterparty_attached)
525 receiver = mq->mq_receiver;
526 else
527 {
528 SpinLockAcquire(&mq->mq_mutex);
529 receiver = mq->mq_receiver;
530 SpinLockRelease(&mq->mq_mutex);
531 if (receiver != NULL)
532 mqh->mqh_counterparty_attached = true;
533 }
534
535 /*
536 * If the caller has requested force flush or we have written more than
537 * 1/4 of the ring size, mark it as written in shared memory and notify
538 * the receiver.
539 */
540 if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
541 {
542 shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
543 if (receiver != NULL)
544 SetLatch(&receiver->procLatch);
545 mqh->mqh_send_pending = 0;
546 }
547
548 return SHM_MQ_SUCCESS;
549}
int j
Definition isn.c:78
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
const void size_t len
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition shm_mq.c:916
static StringInfoData tmpbuf
Definition walsender.c:179

References Assert, StringInfoData::data, data, ereport, errcode(), errmsg, ERROR, fb(), i, j, len, MAXALIGN_DOWN, MaxAllocSize, MyProc, SetLatch(), SHM_MQ_DETACHED, shm_mq_inc_bytes_written(), shm_mq_send_bytes(), SHM_MQ_SUCCESS, SpinLockAcquire(), SpinLockRelease(), and tmpbuf.

Referenced by mq_putmessage(), and shm_mq_send().

◆ shm_mq_set_handle()

void shm_mq_set_handle ( shm_mq_handle mqh,
BackgroundWorkerHandle handle 
)
extern

Definition at line 321 of file shm_mq.c.

322{
323 Assert(mqh->mqh_handle == NULL);
324 mqh->mqh_handle = handle;
325}

References Assert, and fb().

Referenced by ExecParallelCreateReaders(), and LaunchParallelWorkers().

◆ shm_mq_set_receiver()

void shm_mq_set_receiver ( shm_mq mq,
PGPROC proc 
)
extern

Definition at line 208 of file shm_mq.c.

209{
210 PGPROC *sender;
211
212 SpinLockAcquire(&mq->mq_mutex);
213 Assert(mq->mq_receiver == NULL);
214 mq->mq_receiver = proc;
215 sender = mq->mq_sender;
216 SpinLockRelease(&mq->mq_mutex);
217
218 if (sender != NULL)
219 SetLatch(&sender->procLatch);
220}

References Assert, fb(), SetLatch(), SpinLockAcquire(), and SpinLockRelease().

Referenced by attach_to_queues(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), pa_setup_dsm(), ParallelApplyWorkerMain(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

◆ shm_mq_set_sender()

void shm_mq_set_sender ( shm_mq mq,
PGPROC proc 
)
extern

Definition at line 226 of file shm_mq.c.

227{
229
230 SpinLockAcquire(&mq->mq_mutex);
231 Assert(mq->mq_sender == NULL);
232 mq->mq_sender = proc;
233 receiver = mq->mq_receiver;
234 SpinLockRelease(&mq->mq_mutex);
235
236 if (receiver != NULL)
237 SetLatch(&receiver->procLatch);
238}

References Assert, fb(), SetLatch(), SpinLockAcquire(), and SpinLockRelease().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), pa_setup_dsm(), ParallelApplyWorkerMain(), ParallelWorkerMain(), and setup_dynamic_shared_memory().

◆ shm_mq_wait_for_attach()

shm_mq_result shm_mq_wait_for_attach ( shm_mq_handle mqh)
extern

Definition at line 822 of file shm_mq.c.

823{
824 shm_mq *mq = mqh->mqh_queue;
825 PGPROC **victim;
826
828 victim = &mq->mq_sender;
829 else
830 {
832 victim = &mq->mq_receiver;
833 }
834
835 if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
836 return SHM_MQ_SUCCESS;
837 else
838 return SHM_MQ_DETACHED;
839}
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition shm_mq.c:244

References Assert, fb(), MyProc, SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_get_sender(), SHM_MQ_SUCCESS, and shm_mq_wait_internal().

Variable Documentation

◆ shm_mq_minimum_size

PGDLLIMPORT const Size shm_mq_minimum_size
extern

Definition at line 170 of file shm_mq.c.

Referenced by setup_dynamic_shared_memory().