PostgreSQL Source Code git master
shm_mq.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/pg_bitutils.h"
#include "postmaster/bgworker.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
#include "utils/memutils.h"
Include dependency graph for shm_mq.c:

Go to the source code of this file.

Data Structures

struct  shm_mq
 
struct  shm_mq_handle
 

Macros

#define MQH_INITIAL_BUFSIZE   8192
 

Functions

static void shm_mq_detach_internal (shm_mq *mq)
 
static shm_mq_result shm_mq_send_bytes (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
 
static shm_mq_result shm_mq_receive_bytes (shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
 
static bool shm_mq_counterparty_gone (shm_mq *mq, BackgroundWorkerHandle *handle)
 
static bool shm_mq_wait_internal (shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
 
static void shm_mq_inc_bytes_read (shm_mq *mq, Size n)
 
static void shm_mq_inc_bytes_written (shm_mq *mq, Size n)
 
static void shm_mq_detach_callback (dsm_segment *seg, Datum arg)
 
shm_mq * shm_mq_create (void *address, Size size)
 
void shm_mq_set_receiver (shm_mq *mq, PGPROC *proc)
 
void shm_mq_set_sender (shm_mq *mq, PGPROC *proc)
 
PGPROC * shm_mq_get_receiver (shm_mq *mq)
 
PGPROC * shm_mq_get_sender (shm_mq *mq)
 
shm_mq_handle * shm_mq_attach (shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 
void shm_mq_set_handle (shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
 
shm_mq_result shm_mq_send (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
 
shm_mq_result shm_mq_sendv (shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
 
shm_mq_result shm_mq_receive (shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
shm_mq_result shm_mq_wait_for_attach (shm_mq_handle *mqh)
 
void shm_mq_detach (shm_mq_handle *mqh)
 
shm_mq * shm_mq_get_queue (shm_mq_handle *mqh)
 

Variables

const Size shm_mq_minimum_size
 

Macro Definition Documentation

◆ MQH_INITIAL_BUFSIZE

#define MQH_INITIAL_BUFSIZE   8192

Definition at line 171 of file shm_mq.c.

Function Documentation

◆ shm_mq_attach()

shm_mq_handle * shm_mq_attach ( shm_mq * mq,
dsm_segment * seg,
BackgroundWorkerHandle * handle 
)

Definition at line 290 of file shm_mq.c.

291{
292 shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
293
294 Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
295 mqh->mqh_queue = mq;
296 mqh->mqh_segment = seg;
297 mqh->mqh_handle = handle;
298 mqh->mqh_buffer = NULL;
299 mqh->mqh_buflen = 0;
300 mqh->mqh_consume_pending = 0;
301 mqh->mqh_send_pending = 0;
302 mqh->mqh_partial_bytes = 0;
303 mqh->mqh_expected_bytes = 0;
304 mqh->mqh_length_word_complete = false;
305 mqh->mqh_counterparty_attached = false;
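306 mqh->mqh_context = CurrentMemoryContext;
307
308 if (seg != NULL)
309 on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
310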
311 return mqh;
312}
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1132
Assert(PointerIsAligned(start, uint64))
void * palloc(Size size)
Definition: mcxt.c:1317
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:327
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1323
PGPROC * MyProc
Definition: proc.c:66
Size mqh_consume_pending
Definition: shm_mq.c:144
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:141
char * mqh_buffer
Definition: shm_mq.c:142
Size mqh_send_pending
Definition: shm_mq.c:145
Size mqh_expected_bytes
Definition: shm_mq.c:147
bool mqh_counterparty_attached
Definition: shm_mq.c:149
shm_mq * mqh_queue
Definition: shm_mq.c:139
dsm_segment * mqh_segment
Definition: shm_mq.c:140
bool mqh_length_word_complete
Definition: shm_mq.c:148
Size mqh_buflen
Definition: shm_mq.c:143
MemoryContext mqh_context
Definition: shm_mq.c:150
Size mqh_partial_bytes
Definition: shm_mq.c:146
PGPROC * mq_sender
Definition: shm_mq.c:75
PGPROC * mq_receiver
Definition: shm_mq.c:74

References Assert(), CurrentMemoryContext, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, shm_mq_handle::mqh_send_pending, MyProc, on_dsm_detach(), palloc(), PointerGetDatum(), and shm_mq_detach_callback().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), pa_setup_dsm(), ParallelApplyWorkerMain(), ParallelWorkerMain(), ReinitializeParallelDSM(), and test_shm_mq_setup().

◆ shm_mq_counterparty_gone()

static bool shm_mq_counterparty_gone ( shm_mq * mq,
BackgroundWorkerHandle * handle 
)
static

Definition at line 1179 of file shm_mq.c.

1180{
1181 pid_t pid;
1182
1183 /* If the queue has been detached, counterparty is definitely gone. */
1184 if (mq->mq_detached)
1185 return true;
1186
1187 /* If there's a handle, check worker status. */
1188 if (handle != NULL)
1189 {
1190 BgwHandleStatus status;
1191
1192 /* Check for unexpected worker death. */
1193 status = GetBackgroundWorkerPid(handle, &pid);
1194 if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1195 {
1196 /* Mark it detached, just to make it official. */
1197 mq->mq_detached = true;
1198 return true;
1199 }
1200 }
1201
1202 /* Counterparty is not definitively gone. */
1203 return false;
1204}
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1157
BgwHandleStatus
Definition: bgworker.h:104
@ BGWH_STARTED
Definition: bgworker.h:105
@ BGWH_NOT_YET_STARTED
Definition: bgworker.h:106
bool mq_detached
Definition: shm_mq.c:79

References BGWH_NOT_YET_STARTED, BGWH_STARTED, GetBackgroundWorkerPid(), and shm_mq::mq_detached.

Referenced by shm_mq_receive(), and shm_mq_send_bytes().

◆ shm_mq_create()

shm_mq * shm_mq_create ( void *  address,
Size  size 
)

Definition at line 177 of file shm_mq.c.

178{
179 shm_mq *mq = address;
180 Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
181
182 /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
183 size = MAXALIGN_DOWN(size);
184
185 /* Queue size must be large enough to hold some data. */
186 Assert(size > data_offset);
187
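188 /* Initialize queue header. */
189 SpinLockInit(&mq->mq_mutex);
190 mq->mq_receiver = NULL;
191 mq->mq_sender = NULL;
192 pg_atomic_init_u64(&mq->mq_bytes_read, 0);
193 pg_atomic_init_u64(&mq->mq_bytes_written, 0);
194 mq->mq_ring_size = size - data_offset;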
195 mq->mq_detached = false;
196 mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
197
198 return mq;
199}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:453
#define MAXALIGN_DOWN(LEN)
Definition: c.h:794
#define MAXALIGN(LEN)
Definition: c.h:782
size_t Size
Definition: c.h:576
#define SpinLockInit(lock)
Definition: spin.h:57
Definition: shm_mq.c:72
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:77
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:76
uint8 mq_ring_offset
Definition: shm_mq.c:80
slock_t mq_mutex
Definition: shm_mq.c:73
Size mq_ring_size
Definition: shm_mq.c:78

References Assert(), MAXALIGN, MAXALIGN_DOWN, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq::mq_sender, pg_atomic_init_u64(), and SpinLockInit.

Referenced by ExecParallelSetupTupleQueues(), InitializeParallelDSM(), pa_setup_dsm(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

◆ shm_mq_detach()

void shm_mq_detach ( shm_mq_handle * mqh)

Definition at line 843 of file shm_mq.c.

844{
845 /* Before detaching, notify the receiver about any already-written data. */
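846 if (mqh->mqh_send_pending > 0)
847 {
848 shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
849 mqh->mqh_send_pending = 0;
850 }
851
852 /* Notify counterparty that we're outta here. */
853 shm_mq_detach_internal(mqh->mqh_queue);
854
855 /* Cancel on_dsm_detach callback, if any. */
856 if (mqh->mqh_segment)
857 cancel_on_dsm_detach(mqh->mqh_segment,
858 shm_mq_detach_callback,
859 PointerGetDatum(mqh->mqh_queue));
860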
861 /* Release local memory associated with handle. */
862 if (mqh->mqh_buffer != NULL)
863 pfree(mqh->mqh_buffer);
864 pfree(mqh);
865}
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1147
void pfree(void *pointer)
Definition: mcxt.c:1524
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
Definition: shm_mq.c:1303
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:882

References cancel_on_dsm_detach(), shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, shm_mq_handle::mqh_send_pending, pfree(), PointerGetDatum(), shm_mq_detach_callback(), shm_mq_detach_internal(), and shm_mq_inc_bytes_written().

Referenced by DestroyParallelContext(), ExecParallelFinish(), LaunchParallelWorkers(), logicalrep_pa_worker_stop(), mq_putmessage(), pa_detach_all_error_mq(), pa_free_worker_info(), ProcessParallelMessage(), tqueueDestroyReceiver(), and tqueueShutdownReceiver().

◆ shm_mq_detach_callback()

static void shm_mq_detach_callback ( dsm_segment * seg,
Datum  arg 
)
static

Definition at line 1323 of file shm_mq.c.

1324{
1325 shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1326
1327 shm_mq_detach_internal(mq);
1328}
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:317

References arg, DatumGetPointer(), and shm_mq_detach_internal().

Referenced by shm_mq_attach(), and shm_mq_detach().

◆ shm_mq_detach_internal()

static void shm_mq_detach_internal ( shm_mq * mq)
static

Definition at line 882 of file shm_mq.c.

883{
884 PGPROC *victim;
885
886 SpinLockAcquire(&mq->mq_mutex);
887 if (mq->mq_sender == MyProc)
888 victim = mq->mq_receiver;
889 else
890 {
891 Assert(mq->mq_receiver == MyProc);
892 victim = mq->mq_sender;
893 }
894 mq->mq_detached = true;
895 SpinLockRelease(&mq->mq_mutex);
896
897 if (victim != NULL)
898 SetLatch(&victim->procLatch);
899}
void SetLatch(Latch *latch)
Definition: latch.c:288
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
Definition: proc.h:163
Latch procLatch
Definition: proc.h:170

References Assert(), shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, MyProc, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_detach(), and shm_mq_detach_callback().

◆ shm_mq_get_queue()

shm_mq * shm_mq_get_queue ( shm_mq_handle * mqh)

Definition at line 905 of file shm_mq.c.

906{
907 return mqh->mqh_queue;
908}

References shm_mq_handle::mqh_queue.

Referenced by WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

◆ shm_mq_get_receiver()

PGPROC * shm_mq_get_receiver ( shm_mq * mq)

Definition at line 242 of file shm_mq.c.

243{
244 PGPROC *receiver;
245
246 SpinLockAcquire(&mq->mq_mutex);
247 receiver = mq->mq_receiver;
248 SpinLockRelease(&mq->mq_mutex);
249
250 return receiver;
251}

References shm_mq::mq_mutex, shm_mq::mq_receiver, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_send_bytes(), and shm_mq_wait_for_attach().

◆ shm_mq_get_sender()

PGPROC * shm_mq_get_sender ( shm_mq * mq)

Definition at line 257 of file shm_mq.c.

258{
259 PGPROC *sender;
260
261 SpinLockAcquire(&mq->mq_mutex);
262 sender = mq->mq_sender;
263 SpinLockRelease(&mq->mq_mutex);
264
265 return sender;
266}

References shm_mq::mq_mutex, shm_mq::mq_sender, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_receive(), shm_mq_wait_for_attach(), WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

◆ shm_mq_inc_bytes_read()

static void shm_mq_inc_bytes_read ( shm_mq * mq,
Size  n 
)
static

Definition at line 1270 of file shm_mq.c.

1271{
1272 PGPROC *sender;
1273
1274 /*
1275 * Separate prior reads of mq_ring from the increment of mq_bytes_read
1276 * which follows. This pairs with the full barrier in
1277 * shm_mq_send_bytes(). We only need a read barrier here because the
1278 * increment of mq_bytes_read is actually a read followed by a dependent
1279 * write.
1280 */
1281 pg_read_barrier();
1282
1283 /*
1284 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1285 * else can be changing this value. This method should be cheaper.
1286 */
1287 pg_atomic_write_u64(&mq->mq_bytes_read,
1288 pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1289
1290 /*
1291 * We shouldn't have any bytes to read without a sender, so we can read
1292 * mq_sender here without a lock. Once it's initialized, it can't change.
1293 */
1294 sender = mq->mq_sender;
1295 Assert(sender != NULL);
1296 SetLatch(&sender->procLatch);
1297}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:485
#define pg_read_barrier()
Definition: atomics.h:156
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:467

References Assert(), shm_mq::mq_bytes_read, shm_mq::mq_sender, pg_atomic_read_u64(), pg_atomic_write_u64(), pg_read_barrier, PGPROC::procLatch, and SetLatch().

Referenced by shm_mq_receive(), and shm_mq_receive_bytes().

◆ shm_mq_inc_bytes_written()

static void shm_mq_inc_bytes_written ( shm_mq * mq,
Size  n 
)
static

Definition at line 1303 of file shm_mq.c.

1304{
1305 /*
1306 * Separate prior reads of mq_ring from the write of mq_bytes_written
1307 * which we're about to do. Pairs with the read barrier found in
1308 * shm_mq_receive_bytes.
1309 */
1310 pg_write_barrier();
1311
1312 /*
1313 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1314 * else can be changing this value. This method avoids taking the bus
1315 * lock unnecessarily.
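1316 */
1317 pg_atomic_write_u64(&mq->mq_bytes_written,
1318 pg_atomic_read_u64(&mq->mq_bytes_written) + n);
1319}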
#define pg_write_barrier()
Definition: atomics.h:157

References shm_mq::mq_bytes_written, pg_atomic_read_u64(), pg_atomic_write_u64(), and pg_write_barrier.

Referenced by shm_mq_detach(), shm_mq_send_bytes(), and shm_mq_sendv().

◆ shm_mq_receive()

shm_mq_result shm_mq_receive ( shm_mq_handle * mqh,
Size * nbytesp,
void **  datap,
bool  nowait 
)

Definition at line 572 of file shm_mq.c.

573{
574 shm_mq *mq = mqh->mqh_queue;
575 shm_mq_result res;
576 Size rb = 0;
577 Size nbytes;
578 void *rawdata;
579
580 Assert(mq->mq_receiver == MyProc);
581
582 /* We can't receive data until the sender has attached. */
583 if (!mqh->mqh_counterparty_attached)
584 {
585 if (nowait)
586 {
587 int counterparty_gone;
588
589 /*
590 * We shouldn't return at this point at all unless the sender
591 * hasn't attached yet. However, the correct return value depends
592 * on whether the sender is still attached. If we first test
593 * whether the sender has ever attached and then test whether the
594 * sender has detached, there's a race condition: a sender that
595 * attaches and detaches very quickly might fool us into thinking
596 * the sender never attached at all. So, test whether our
597 * counterparty is definitively gone first, and only afterwards
598 * check whether the sender ever attached in the first place.
599 */
600 counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
601 if (shm_mq_get_sender(mq) == NULL)
602 {
603 if (counterparty_gone)
604 return SHM_MQ_DETACHED;
605 else
606 return SHM_MQ_WOULD_BLOCK;
607 }
608 }
609 else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
610 && shm_mq_get_sender(mq) == NULL)
611 {
612 mq->mq_detached = true;
613 return SHM_MQ_DETACHED;
614 }
615 mqh->mqh_counterparty_attached = true;
616 }
617
618 /*
619 * If we've consumed an amount of data greater than 1/4th of the ring
620 * size, mark it consumed in shared memory. We try to avoid doing this
621 * unnecessarily when only a small amount of data has been consumed,
622 * because SetLatch() is fairly expensive and we don't want to do it too
623 * often.
624 */
625 if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
626 {
627 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
628 mqh->mqh_consume_pending = 0;
629 }
630
631 /* Try to read, or finish reading, the length word from the buffer. */
632 while (!mqh->mqh_length_word_complete)
633 {
634 /* Try to receive the message length word. */
635 Assert(mqh->mqh_partial_bytes < sizeof(Size));
636 res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
637 nowait, &rb, &rawdata);
638 if (res != SHM_MQ_SUCCESS)
639 return res;
640
641 /*
642 * Hopefully, we'll receive the entire message length word at once.
643 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
644 * multiple reads.
645 */
646 if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
647 {
648 Size needed;
649
650 nbytes = *(Size *) rawdata;
651
652 /* If we've already got the whole message, we're done. */
653 needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
654 if (rb >= needed)
655 {
656 mqh->mqh_consume_pending += needed;
657 *nbytesp = nbytes;
658 *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
659 return SHM_MQ_SUCCESS;
660 }
661
662 /*
663 * We don't have the whole message, but we at least have the whole
664 * length word.
665 */
666 mqh->mqh_expected_bytes = nbytes;
667 mqh->mqh_length_word_complete = true;
668 mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
669 rb -= MAXALIGN(sizeof(Size));
670 }
671 else
672 {
673 Size lengthbytes;
674
675 /* Can't be split unless bigger than required alignment. */
676 Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
677
678 /* Message word is split; need buffer to reassemble. */
679 if (mqh->mqh_buffer == NULL)
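680 {
681 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
682 MQH_INITIAL_BUFSIZE);
683 mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
684 }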
685 Assert(mqh->mqh_buflen >= sizeof(Size));
686
687 /* Copy partial length word; remember to consume it. */
688 if (mqh->mqh_partial_bytes + rb > sizeof(Size))
689 lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
690 else
691 lengthbytes = rb;
692 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
693 lengthbytes);
694 mqh->mqh_partial_bytes += lengthbytes;
695 mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
696 rb -= lengthbytes;
697
698 /* If we now have the whole word, we're ready to read payload. */
699 if (mqh->mqh_partial_bytes >= sizeof(Size))
700 {
701 Assert(mqh->mqh_partial_bytes == sizeof(Size));
702 mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
703 mqh->mqh_length_word_complete = true;
704 mqh->mqh_partial_bytes = 0;
705 }
706 }
707 }
708 nbytes = mqh->mqh_expected_bytes;
709
710 /*
711 * Should be disallowed on the sending side already, but better check and
712 * error out on the receiver side as well rather than trying to read a
713 * prohibitively large message.
714 */
715 if (nbytes > MaxAllocSize)
716 ereport(ERROR,
717 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
718 errmsg("invalid message size %zu in shared memory queue",
719 nbytes)));
720
721 if (mqh->mqh_partial_bytes == 0)
722 {
723 /*
724 * Try to obtain the whole message in a single chunk. If this works,
725 * we need not copy the data and can return a pointer directly into
726 * shared memory.
727 */
728 res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
729 if (res != SHM_MQ_SUCCESS)
730 return res;
731 if (rb >= nbytes)
732 {
733 mqh->mqh_length_word_complete = false;
734 mqh->mqh_consume_pending += MAXALIGN(nbytes);
735 *nbytesp = nbytes;
736 *datap = rawdata;
737 return SHM_MQ_SUCCESS;
738 }
739
740 /*
741 * The message has wrapped the buffer. We'll need to copy it in order
742 * to return it to the client in one chunk. First, make sure we have
743 * a large enough buffer available.
744 */
745 if (mqh->mqh_buflen < nbytes)
746 {
747 Size newbuflen;
748
749 /*
750 * Increase size to the next power of 2 that's >= nbytes, but
751 * limit to MaxAllocSize.
752 */
753 newbuflen = pg_nextpower2_size_t(nbytes);
754 newbuflen = Min(newbuflen, MaxAllocSize);
755
756 if (mqh->mqh_buffer != NULL)
757 {
758 pfree(mqh->mqh_buffer);
759 mqh->mqh_buffer = NULL;
760 mqh->mqh_buflen = 0;
761 }
762 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
763 mqh->mqh_buflen = newbuflen;
764 }
765 }
766
767 /* Loop until we've copied the entire message. */
768 for (;;)
769 {
770 Size still_needed;
771
772 /* Copy as much as we can. */
773 Assert(mqh->mqh_partial_bytes + rb <= nbytes);
774 if (rb > 0)
775 {
776 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
777 mqh->mqh_partial_bytes += rb;
778 }
779
780 /*
781 * Update count of bytes that can be consumed, accounting for
782 * alignment padding. Note that this will never actually insert any
783 * padding except at the end of a message, because the buffer size is
784 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
785 */
786 Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
787 mqh->mqh_consume_pending += MAXALIGN(rb);
788
789 /* If we got all the data, exit the loop. */
790 if (mqh->mqh_partial_bytes >= nbytes)
791 break;
792
793 /* Wait for some more data. */
794 still_needed = nbytes - mqh->mqh_partial_bytes;
795 res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
796 if (res != SHM_MQ_SUCCESS)
797 return res;
798 if (rb > still_needed)
799 rb = still_needed;
800 }
801
802 /* Return the complete message, and reset for next message. */
803 *nbytesp = nbytes;
804 *datap = mqh->mqh_buffer;
805 mqh->mqh_length_word_complete = false;
806 mqh->mqh_partial_bytes = 0;
807 return SHM_MQ_SUCCESS;
808}
#define Min(x, y)
Definition: c.h:975
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define MaxAllocSize
Definition: fe_memutils.h:22
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
#define pg_nextpower2_size_t
Definition: pg_bitutils.h:415
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:171
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:257
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1179
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1218
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1270
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:1079
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40

References Assert(), ereport, errcode(), errmsg(), ERROR, MAXALIGN, MaxAllocSize, MemoryContextAlloc(), Min, shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_ring_size, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, MQH_INITIAL_BUFSIZE, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, pfree(), pg_nextpower2_size_t, shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_sender(), shm_mq_inc_bytes_read(), shm_mq_receive_bytes(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), and SHM_MQ_WOULD_BLOCK.

Referenced by copy_messages(), LogicalParallelApplyLoop(), ProcessParallelApplyMessages(), ProcessParallelMessages(), test_shm_mq(), test_shm_mq_pipelined(), and TupleQueueReaderNext().

◆ shm_mq_receive_bytes()

static shm_mq_result shm_mq_receive_bytes ( shm_mq_handle * mqh,
Size  bytes_needed,
bool  nowait,
Size * nbytesp,
void **  datap 
)
static

Definition at line 1079 of file shm_mq.c.

1081{
1082 shm_mq *mq = mqh->mqh_queue;
1083 Size ringsize = mq->mq_ring_size;
1084 uint64 used;
1085 uint64 written;
1086
1087 for (;;)
1088 {
1089 Size offset;
1090 uint64 read;
1091
1092 /* Get bytes written, so we can compute what's available to read. */
1093 written = pg_atomic_read_u64(&mq->mq_bytes_written);
1094
1095 /*
1096 * Get bytes read. Include bytes we could consume but have not yet
1097 * consumed.
1098 */
1099 read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1100 mqh->mqh_consume_pending;
1101 used = written - read;
1102 Assert(used <= ringsize);
1103 offset = read % (uint64) ringsize;
1104
1105 /* If we have enough data or buffer has wrapped, we're done. */
1106 if (used >= bytes_needed || offset + used >= ringsize)
1107 {
1108 *nbytesp = Min(used, ringsize - offset);
1109 *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1110
1111 /*
1112 * Separate the read of mq_bytes_written, above, from caller's
1113 * attempt to read the data itself. Pairs with the barrier in
1114 * shm_mq_inc_bytes_written.
1115 */
1116 pg_read_barrier();
1117 return SHM_MQ_SUCCESS;
1118 }
1119
1120 /*
1121 * Fall out before waiting if the queue has been detached.
1122 *
1123 * Note that we don't check for this until *after* considering whether
1124 * the data already available is enough, since the receiver can finish
1125 * receiving a message stored in the buffer even after the sender has
1126 * detached.
1127 */
1128 if (mq->mq_detached)
1129 {
1130 /*
1131 * If the writer advanced mq_bytes_written and then set
1132 * mq_detached, we might not have read the final value of
1133 * mq_bytes_written above. Insert a read barrier and then check
1134 * again if mq_bytes_written has advanced.
1135 */
1136 pg_read_barrier();
1137 if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1138 continue;
1139
1140 return SHM_MQ_DETACHED;
1141 }
1142
1143 /*
1144 * We didn't get enough data to satisfy the request, so mark any data
1145 * previously-consumed as read to make more buffer space.
1146 */
1147 if (mqh->mqh_consume_pending > 0)
1148 {
1149 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
1150 mqh->mqh_consume_pending = 0;
1151 }
1152
1153 /* Skip manipulation of our latch if nowait = true. */
1154 if (nowait)
1155 return SHM_MQ_WOULD_BLOCK;
1156
1157 /*
1158 * Wait for our latch to be set. It might already be set for some
1159 * unrelated reason, but that'll just result in one extra trip through
1160 * the loop. It's worth it to avoid resetting the latch at top of
1161 * loop, because setting an already-set latch is much cheaper than
1162 * setting one that has been reset.
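1163 */
1164 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1165 WAIT_EVENT_MESSAGE_QUEUE_RECEIVE);
1166
1167 /* Reset the latch so we don't spin. */
1168 ResetLatch(MyLatch);
1169
1170 /* An interrupt may have occurred while we were waiting. */
1171 CHECK_FOR_INTERRUPTS();
1172 }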
1173}
uint64_t uint64
Definition: c.h:503
struct Latch * MyLatch
Definition: globals.c:62
#define read(a, b, c)
Definition: win32.h:13
void ResetLatch(Latch *latch)
Definition: latch.c:372
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
Definition: shm_mq.c:81
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34

References Assert(), CHECK_FOR_INTERRUPTS, Min, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_queue, MyLatch, pg_atomic_read_u64(), pg_read_barrier, read, ResetLatch(), SHM_MQ_DETACHED, shm_mq_inc_bytes_read(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_receive().

◆ shm_mq_send()

shm_mq_result shm_mq_send ( shm_mq_handle * mqh,
Size  nbytes,
const void *  data,
bool  nowait,
bool  force_flush 
)

Definition at line 329 of file shm_mq.c.

331{
332 shm_mq_iovec iov;
333
334 iov.data = data;
335 iov.len = nbytes;
336
337 return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
338}
const void * data
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
Definition: shm_mq.c:361
const char * data
Definition: shm_mq.h:31
Size len
Definition: shm_mq.h:32

References shm_mq_iovec::data, data, shm_mq_iovec::len, and shm_mq_sendv().

Referenced by copy_messages(), pa_send_data(), test_shm_mq(), test_shm_mq_pipelined(), and tqueueReceiveSlot().

◆ shm_mq_send_bytes()

static shm_mq_result shm_mq_send_bytes ( shm_mq_handle * mqh,
Size  nbytes,
const void *  data,
bool  nowait,
Size * bytes_written 
)
static

Definition at line 914 of file shm_mq.c.

916{
917 shm_mq *mq = mqh->mqh_queue;
918 Size sent = 0;
919 uint64 used;
920 Size ringsize = mq->mq_ring_size;
921 Size available;
922
923 while (sent < nbytes)
924 {
925 uint64 rb;
926 uint64 wb;
927
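928 /* Compute number of ring buffer bytes used and available. */
929 rb = pg_atomic_read_u64(&mq->mq_bytes_read);
930 wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
931 Assert(wb >= rb);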
932 used = wb - rb;
933 Assert(used <= ringsize);
934 available = Min(ringsize - used, nbytes - sent);
935
936 /*
937 * Bail out if the queue has been detached. Note that we would be in
938 * trouble if the compiler decided to cache the value of
939 * mq->mq_detached in a register or on the stack across loop
940 * iterations. It probably shouldn't do that anyway since we'll
941 * always return, call an external function that performs a system
942 * call, or reach a memory barrier at some point later in the loop,
943 * but just to be sure, insert a compiler barrier here.
944 */
945 pg_compiler_barrier();
946 if (mq->mq_detached)
947 {
948 *bytes_written = sent;
949 return SHM_MQ_DETACHED;
950 }
951
952 if (available == 0 && !mqh->mqh_counterparty_attached)
953 {
954 /*
955 * The queue is full, so if the receiver isn't yet known to be
956 * attached, we must wait for that to happen.
957 */
958 if (nowait)
959 {
960 if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
961 {
962 *bytes_written = sent;
963 return SHM_MQ_DETACHED;
964 }
965 if (shm_mq_get_receiver(mq) == NULL)
966 {
967 *bytes_written = sent;
968 return SHM_MQ_WOULD_BLOCK;
969 }
970 }
971 else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
972 mqh->mqh_handle))
973 {
974 mq->mq_detached = true;
975 *bytes_written = sent;
976 return SHM_MQ_DETACHED;
977 }
978 mqh->mqh_counterparty_attached = true;
979
980 /*
981 * The receiver may have read some data after attaching, so we
982 * must not wait without rechecking the queue state.
983 */
984 }
985 else if (available == 0)
986 {
987 /* Update the pending send bytes in the shared memory. */
988 shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
989
990 /*
991 * Since mq->mqh_counterparty_attached is known to be true at this
992 * point, mq_receiver has been set, and it can't change once set.
993 * Therefore, we can read it without acquiring the spinlock.
994 */
995 Assert(mqh->mqh_counterparty_attached);
996 SetLatch(&mq->mq_receiver->procLatch);
997
998 /*
999 * We have just updated the mqh_send_pending bytes in the shared
1000 * memory so reset it.
1001 */
1002 mqh->mqh_send_pending = 0;
1003
1004 /* Skip manipulation of our latch if nowait = true. */
1005 if (nowait)
1006 {
1007 *bytes_written = sent;
1008 return SHM_MQ_WOULD_BLOCK;
1009 }
1010
1011 /*
1012 * Wait for our latch to be set. It might already be set for some
1013 * unrelated reason, but that'll just result in one extra trip
1014 * through the loop. It's worth it to avoid resetting the latch
1015 * at top of loop, because setting an already-set latch is much
1016 * cheaper than setting one that has been reset.
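1017 */
1018 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1019 WAIT_EVENT_MESSAGE_QUEUE_SEND);
1020
1021 /* Reset the latch so we don't spin. */
1022 ResetLatch(MyLatch);
1023
1024 /* An interrupt may have occurred while we were waiting. */
1025 CHECK_FOR_INTERRUPTS();
1026 }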
1027 else
1028 {
1029 Size offset;
1030 Size sendnow;
1031
1032 offset = wb % (uint64) ringsize;
1033 sendnow = Min(available, ringsize - offset);
1034
1035 /*
1036 * Write as much data as we can via a single memcpy(). Make sure
1037 * these writes happen after the read of mq_bytes_read, above.
1038 * This barrier pairs with the one in shm_mq_inc_bytes_read.
1039 * (Since we're separating the read of mq_bytes_read from a
1040 * subsequent write to mq_ring, we need a full barrier here.)
1041 */
1042 pg_memory_barrier();
1043 memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1044 (char *) data + sent, sendnow);
1045 sent += sendnow;
1046
1047 /*
1048 * Update count of bytes written, with alignment padding. Note
1049 * that this will never actually insert any padding except at the
1050 * end of a run of bytes, because the buffer size is a multiple of
1051 * MAXIMUM_ALIGNOF, and each read is as well.
1052 */
1053 Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1054
1055 /*
1056 * For efficiency, we don't update the bytes written in the shared
1057 * memory and also don't set the reader's latch here. Refer to
1058 * the comments atop the shm_mq_handle structure for more
1059 * information.
1060 */
1061 mqh->mqh_send_pending += MAXALIGN(sendnow);
1062 }
1063 }
1064
1065 *bytes_written = sent;
1066 return SHM_MQ_SUCCESS;
1067}
#define pg_memory_barrier()
Definition: atomics.h:143
#define pg_compiler_barrier()
Definition: atomics.h:131
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:242

References Assert(), CHECK_FOR_INTERRUPTS, data, MAXALIGN, Min, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_send_pending, MyLatch, pg_atomic_read_u64(), pg_compiler_barrier, pg_memory_barrier, PGPROC::procLatch, ResetLatch(), SetLatch(), shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_inc_bytes_written(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), SHM_MQ_WOULD_BLOCK, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_sendv().

◆ shm_mq_sendv()

shm_mq_result shm_mq_sendv ( shm_mq_handle *  mqh,
shm_mq_iovec *  iov,
int  iovcnt,
bool  nowait,
bool  force_flush 
)

Definition at line 361 of file shm_mq.c.

363{
364 shm_mq_result res;
365 shm_mq *mq = mqh->mqh_queue;
366 PGPROC *receiver;
367 Size nbytes = 0;
368 Size bytes_written;
369 int i;
370 int which_iov = 0;
371 Size offset;
372
373 Assert(mq->mq_sender == MyProc);
374
375 /* Compute total size of write. */
376 for (i = 0; i < iovcnt; ++i)
377 nbytes += iov[i].len;
378
379 /* Prevent writing messages overwhelming the receiver. */
380 if (nbytes > MaxAllocSize)
381 ereport(ERROR,
382 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
383 errmsg("cannot send a message of size %zu via shared memory queue",
384 nbytes)));
385
386 /* Try to write, or finish writing, the length word into the buffer. */
387 while (!mqh->mqh_length_word_complete)
388 {
389 Assert(mqh->mqh_partial_bytes < sizeof(Size));
390 res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
391 ((char *) &nbytes) + mqh->mqh_partial_bytes,
392 nowait, &bytes_written);
393
394 if (res == SHM_MQ_DETACHED)
395 {
396 /* Reset state in case caller tries to send another message. */
397 mqh->mqh_partial_bytes = 0;
398 mqh->mqh_length_word_complete = false;
399 return res;
400 }
401 mqh->mqh_partial_bytes += bytes_written;
402
403 if (mqh->mqh_partial_bytes >= sizeof(Size))
404 {
405 Assert(mqh->mqh_partial_bytes == sizeof(Size));
406
407 mqh->mqh_partial_bytes = 0;
408 mqh->mqh_length_word_complete = true;
409 }
410
411 if (res != SHM_MQ_SUCCESS)
412 return res;
413
414 /* Length word can't be split unless bigger than required alignment. */
415 Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
416 }
417
418 /* Write the actual data bytes into the buffer. */
419 Assert(mqh->mqh_partial_bytes <= nbytes);
420 offset = mqh->mqh_partial_bytes;
421 do
422 {
423 Size chunksize;
424
425 /* Figure out which bytes need to be sent next. */
426 if (offset >= iov[which_iov].len)
427 {
428 offset -= iov[which_iov].len;
429 ++which_iov;
430 if (which_iov >= iovcnt)
431 break;
432 continue;
433 }
434
435 /*
436 * We want to avoid copying the data if at all possible, but every
437 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
438 * the last. Thus, if a chunk other than the last one ends on a
439 * non-MAXALIGN'd boundary, we have to combine the tail end of its
440 * data with data from one or more following chunks until we either
441 * reach the last chunk or accumulate a number of bytes which is
442 * MAXALIGN'd.
443 */
444 if (which_iov + 1 < iovcnt &&
445 offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
446 {
447 char tmpbuf[MAXIMUM_ALIGNOF];
448 int j = 0;
449
450 for (;;)
451 {
452 if (offset < iov[which_iov].len)
453 {
454 tmpbuf[j] = iov[which_iov].data[offset];
455 j++;
456 offset++;
457 if (j == MAXIMUM_ALIGNOF)
458 break;
459 }
460 else
461 {
462 offset -= iov[which_iov].len;
463 which_iov++;
464 if (which_iov >= iovcnt)
465 break;
466 }
467 }
468
469 res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
470
471 if (res == SHM_MQ_DETACHED)
472 {
473 /* Reset state in case caller tries to send another message. */
474 mqh->mqh_partial_bytes = 0;
475 mqh->mqh_length_word_complete = false;
476 return res;
477 }
478
479 mqh->mqh_partial_bytes += bytes_written;
480 if (res != SHM_MQ_SUCCESS)
481 return res;
482 continue;
483 }
484
485 /*
486 * If this is the last chunk, we can write all the data, even if it
487 * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
488 * MAXALIGN_DOWN the write size.
489 */
490 chunksize = iov[which_iov].len - offset;
491 if (which_iov + 1 < iovcnt)
492 chunksize = MAXALIGN_DOWN(chunksize);
493 res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
494 nowait, &bytes_written);
495
496 if (res == SHM_MQ_DETACHED)
497 {
498 /* Reset state in case caller tries to send another message. */
499 mqh->mqh_length_word_complete = false;
500 mqh->mqh_partial_bytes = 0;
501 return res;
502 }
503
504 mqh->mqh_partial_bytes += bytes_written;
505 offset += bytes_written;
506 if (res != SHM_MQ_SUCCESS)
507 return res;
508 } while (mqh->mqh_partial_bytes < nbytes);
509
510 /* Reset for next message. */
511 mqh->mqh_partial_bytes = 0;
512 mqh->mqh_length_word_complete = false;
513
514 /* If queue has been detached, let caller know. */
515 if (mq->mq_detached)
516 return SHM_MQ_DETACHED;
517
518 /*
519 * If the counterparty is known to have attached, we can read mq_receiver
520 * without acquiring the spinlock. Otherwise, more caution is needed.
521 */
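522 if (mqh->mqh_counterparty_attached)
523 receiver = mq->mq_receiver;
524 else
525 {
526 SpinLockAcquire(&mq->mq_mutex);
527 receiver = mq->mq_receiver;
528 SpinLockRelease(&mq->mq_mutex);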
529 if (receiver != NULL)
530 mqh->mqh_counterparty_attached = true;
531 }
532
533 /*
534 * If the caller has requested force flush or we have written more than
535 * 1/4 of the ring size, mark it as written in shared memory and notify
536 * the receiver.
537 */
538 if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
539 {
540 shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
541 if (receiver != NULL)
542 SetLatch(&receiver->procLatch);
543 mqh->mqh_send_pending = 0;
544 }
545
546 return SHM_MQ_SUCCESS;
547}
int j
Definition: isn.c:75
int i
Definition: isn.c:74
const void size_t len
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:914
static StringInfoData tmpbuf
Definition: walsender.c:171

References Assert(), shm_mq_iovec::data, data, ereport, errcode(), errmsg(), ERROR, i, j, shm_mq_iovec::len, len, MAXALIGN_DOWN, MaxAllocSize, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring_size, shm_mq::mq_sender, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_send_pending, MyProc, PGPROC::procLatch, SetLatch(), SHM_MQ_DETACHED, shm_mq_inc_bytes_written(), shm_mq_send_bytes(), SHM_MQ_SUCCESS, SpinLockAcquire, SpinLockRelease, and tmpbuf.

Referenced by mq_putmessage(), and shm_mq_send().

◆ shm_mq_set_handle()

void shm_mq_set_handle ( shm_mq_handle *  mqh,
BackgroundWorkerHandle *  handle 
)

Definition at line 319 of file shm_mq.c.

320{
321 Assert(mqh->mqh_handle == NULL);
322 mqh->mqh_handle = handle;
323}

References Assert(), and shm_mq_handle::mqh_handle.

Referenced by ExecParallelCreateReaders(), and LaunchParallelWorkers().

◆ shm_mq_set_receiver()

void shm_mq_set_receiver ( shm_mq *  mq,
PGPROC *  proc 
)

◆ shm_mq_set_sender()

void shm_mq_set_sender ( shm_mq *  mq,
PGPROC *  proc 
)

Definition at line 224 of file shm_mq.c.

225{
226 PGPROC *receiver;
227
228 SpinLockAcquire(&mq->mq_mutex);
229 Assert(mq->mq_sender == NULL);
230 mq->mq_sender = proc;
231 receiver = mq->mq_receiver;
232 SpinLockRelease(&mq->mq_mutex);
233
234 if (receiver != NULL)
235 SetLatch(&receiver->procLatch);
236}

References Assert(), shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelGetReceiver(), pa_setup_dsm(), ParallelApplyWorkerMain(), ParallelWorkerMain(), and setup_dynamic_shared_memory().

◆ shm_mq_wait_for_attach()

shm_mq_result shm_mq_wait_for_attach ( shm_mq_handle *  mqh)

Definition at line 820 of file shm_mq.c.

821{
822 shm_mq *mq = mqh->mqh_queue;
823 PGPROC **victim;
824
825 if (shm_mq_get_receiver(mq) == MyProc)
826 victim = &mq->mq_sender;
827 else
828 {
829 Assert(shm_mq_get_sender(mq) == MyProc);
830 victim = &mq->mq_receiver;
831 }
832
833 if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
834 return SHM_MQ_SUCCESS;
835 else
836 return SHM_MQ_DETACHED;
837}

References Assert(), shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, MyProc, SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_get_sender(), SHM_MQ_SUCCESS, and shm_mq_wait_internal().

◆ shm_mq_wait_internal()

static bool shm_mq_wait_internal ( shm_mq *  mq,
PGPROC **  ptr,
BackgroundWorkerHandle *  handle 
)
static

Definition at line 1218 of file shm_mq.c.

1219{
1220 bool result = false;
1221
1222 for (;;)
1223 {
1224 BgwHandleStatus status;
1225 pid_t pid;
1226
1227 /* Acquire the lock just long enough to check the pointer. */
1228 SpinLockAcquire(&mq->mq_mutex);
1229 result = (*ptr != NULL);
1230 SpinLockRelease(&mq->mq_mutex);
1231
1232 /* Fail if detached; else succeed if initialized. */
1233 if (mq->mq_detached)
1234 {
1235 result = false;
1236 break;
1237 }
1238 if (result)
1239 break;
1240
1241 if (handle != NULL)
1242 {
1243 /* Check for unexpected worker death. */
1244 status = GetBackgroundWorkerPid(handle, &pid);
1245 if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1246 {
1247 result = false;
1248 break;
1249 }
1250 }
1251
1252 /* Wait to be signaled. */
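1253 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1254 WAIT_EVENT_MESSAGE_QUEUE_INTERNAL);
1255
1256 /* Reset the latch so we don't spin. */
1257 ResetLatch(MyLatch);
1258
1259 /* An interrupt may have occurred while we were waiting. */
1260 CHECK_FOR_INTERRUPTS();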
1261 }
1262
1263 return result;
1264}

References BGWH_NOT_YET_STARTED, BGWH_STARTED, CHECK_FOR_INTERRUPTS, GetBackgroundWorkerPid(), shm_mq::mq_detached, shm_mq::mq_mutex, MyLatch, ResetLatch(), SpinLockAcquire, SpinLockRelease, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_receive(), shm_mq_send_bytes(), and shm_mq_wait_for_attach().

Variable Documentation

◆ shm_mq_minimum_size

const Size shm_mq_minimum_size
Initial value:
=
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF

Definition at line 168 of file shm_mq.c.

Referenced by setup_dynamic_shared_memory().