PostgreSQL Source Code git master
Loading...
Searching...
No Matches
shm_mq.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/pg_bitutils.h"
#include "postmaster/bgworker.h"
#include "storage/proc.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
#include "utils/memutils.h"
Include dependency graph for shm_mq.c:

Go to the source code of this file.

Data Structures

struct  shm_mq
 
struct  shm_mq_handle
 

Macros

#define MQH_INITIAL_BUFSIZE   8192
 

Functions

static void shm_mq_detach_internal (shm_mq *mq)
 
static shm_mq_result shm_mq_send_bytes (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
 
static shm_mq_result shm_mq_receive_bytes (shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
 
static bool shm_mq_counterparty_gone (shm_mq *mq, BackgroundWorkerHandle *handle)
 
static bool shm_mq_wait_internal (shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
 
static void shm_mq_inc_bytes_read (shm_mq *mq, Size n)
 
static void shm_mq_inc_bytes_written (shm_mq *mq, Size n)
 
static void shm_mq_detach_callback (dsm_segment *seg, Datum arg)
 
shm_mq * shm_mq_create (void *address, Size size)
 
void shm_mq_set_receiver (shm_mq *mq, PGPROC *proc)
 
void shm_mq_set_sender (shm_mq *mq, PGPROC *proc)
 
PGPROC * shm_mq_get_receiver (shm_mq *mq)
 
PGPROC * shm_mq_get_sender (shm_mq *mq)
 
shm_mq_handle * shm_mq_attach (shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 
void shm_mq_set_handle (shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
 
shm_mq_result shm_mq_send (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
 
shm_mq_result shm_mq_sendv (shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
 
shm_mq_result shm_mq_receive (shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
shm_mq_result shm_mq_wait_for_attach (shm_mq_handle *mqh)
 
void shm_mq_detach (shm_mq_handle *mqh)
 
shm_mq * shm_mq_get_queue (shm_mq_handle *mqh)
 

Variables

const Size shm_mq_minimum_size
 

Macro Definition Documentation

◆ MQH_INITIAL_BUFSIZE

#define MQH_INITIAL_BUFSIZE   8192

Definition at line 172 of file shm_mq.c.

Function Documentation

◆ shm_mq_attach()

shm_mq_handle * shm_mq_attach ( shm_mq *mq,
dsm_segment *seg,
BackgroundWorkerHandle *handle 
)

Definition at line 291 of file shm_mq.c.

292{
294
295 Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
296 mqh->mqh_queue = mq;
297 mqh->mqh_segment = seg;
298 mqh->mqh_handle = handle;
299 mqh->mqh_buffer = NULL;
300 mqh->mqh_buflen = 0;
301 mqh->mqh_consume_pending = 0;
302 mqh->mqh_send_pending = 0;
303 mqh->mqh_partial_bytes = 0;
304 mqh->mqh_expected_bytes = 0;
305 mqh->mqh_length_word_complete = false;
306 mqh->mqh_counterparty_attached = false;
307 mqh->mqh_context = CurrentMemoryContext;
308
309 if (seg != NULL)
311
312 return mqh;
313}
#define Assert(condition)
Definition c.h:885
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition dsm.c:1132
#define palloc_object(type)
Definition fe_memutils.h:74
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
static Datum PointerGetDatum(const void *X)
Definition postgres.h:352
static int fb(int x)
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition shm_mq.c:1324
PGPROC * MyProc
Definition proc.c:67

References Assert, CurrentMemoryContext, fb(), MyProc, on_dsm_detach(), palloc_object, PointerGetDatum(), and shm_mq_detach_callback().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), pa_setup_dsm(), ParallelApplyWorkerMain(), ParallelWorkerMain(), ReinitializeParallelDSM(), and test_shm_mq_setup().

◆ shm_mq_counterparty_gone()

static bool shm_mq_counterparty_gone ( shm_mq *mq,
BackgroundWorkerHandle *handle 
)
static

Definition at line 1180 of file shm_mq.c.

1181{
1182 pid_t pid;
1183
1184 /* If the queue has been detached, counterparty is definitely gone. */
1185 if (mq->mq_detached)
1186 return true;
1187
1188 /* If there's a handle, check worker status. */
1189 if (handle != NULL)
1190 {
1191 BgwHandleStatus status;
1192
1193 /* Check for unexpected worker death. */
1194 status = GetBackgroundWorkerPid(handle, &pid);
1195 if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1196 {
1197 /* Mark it detached, just to make it official. */
1198 mq->mq_detached = true;
1199 return true;
1200 }
1201 }
1202
1203 /* Counterparty is not definitively gone. */
1204 return false;
1205}
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition bgworker.c:1164
BgwHandleStatus
Definition bgworker.h:111
@ BGWH_STARTED
Definition bgworker.h:112
@ BGWH_NOT_YET_STARTED
Definition bgworker.h:113

References BGWH_NOT_YET_STARTED, BGWH_STARTED, fb(), and GetBackgroundWorkerPid().

Referenced by shm_mq_receive(), and shm_mq_send_bytes().

◆ shm_mq_create()

shm_mq * shm_mq_create ( void *address,
Size  size 
)

Definition at line 178 of file shm_mq.c.

179{
180 shm_mq *mq = address;
182
183 /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
184 size = MAXALIGN_DOWN(size);
185
186 /* Queue size must be large enough to hold some data. */
187 Assert(size > data_offset);
188
189 /* Initialize queue header. */
190 SpinLockInit(&mq->mq_mutex);
191 mq->mq_receiver = NULL;
192 mq->mq_sender = NULL;
193 pg_atomic_init_u64(&mq->mq_bytes_read, 0);
194 pg_atomic_init_u64(&mq->mq_bytes_written, 0);
195 mq->mq_ring_size = size - data_offset;
196 mq->mq_detached = false;
197 mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
198
199 return mq;
200}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
#define MAXALIGN_DOWN(LEN)
Definition c.h:850
#define MAXALIGN(LEN)
Definition c.h:838
size_t Size
Definition c.h:631
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50

References Assert, fb(), MAXALIGN, MAXALIGN_DOWN, pg_atomic_init_u64(), and SpinLockInit().

Referenced by ExecParallelSetupTupleQueues(), InitializeParallelDSM(), pa_setup_dsm(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

◆ shm_mq_detach()

void shm_mq_detach ( shm_mq_handle *mqh)

Definition at line 844 of file shm_mq.c.

845{
846 /* Before detaching, notify the receiver about any already-written data. */
847 if (mqh->mqh_send_pending > 0)
848 {
849 shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
850 mqh->mqh_send_pending = 0;
851 }
852
853 /* Notify counterparty that we're outta here. */
854 shm_mq_detach_internal(mqh->mqh_queue);
855
856 /* Cancel on_dsm_detach callback, if any. */
857 if (mqh->mqh_segment)
858 cancel_on_dsm_detach(mqh->mqh_segment,
860 PointerGetDatum(mqh->mqh_queue));
861
862 /* Release local memory associated with handle. */
863 if (mqh->mqh_buffer != NULL)
864 pfree(mqh->mqh_buffer);
865 pfree(mqh);
866}
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition dsm.c:1147
void pfree(void *pointer)
Definition mcxt.c:1616
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
Definition shm_mq.c:1304
static void shm_mq_detach_internal(shm_mq *mq)
Definition shm_mq.c:883

References cancel_on_dsm_detach(), fb(), pfree(), PointerGetDatum(), shm_mq_detach_callback(), shm_mq_detach_internal(), and shm_mq_inc_bytes_written().

Referenced by DestroyParallelContext(), ExecParallelFinish(), LaunchParallelWorkers(), logicalrep_pa_worker_stop(), mq_putmessage(), pa_detach_all_error_mq(), pa_free_worker_info(), ProcessParallelMessage(), tqueueDestroyReceiver(), and tqueueShutdownReceiver().

◆ shm_mq_detach_callback()

static void shm_mq_detach_callback ( dsm_segment *seg,
Datum  arg 
)
static

Definition at line 1324 of file shm_mq.c.

1325{
1327
1329}
Datum arg
Definition elog.c:1322
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:342

References arg, DatumGetPointer(), fb(), and shm_mq_detach_internal().

Referenced by shm_mq_attach(), and shm_mq_detach().

◆ shm_mq_detach_internal()

static void shm_mq_detach_internal ( shm_mq *mq)
static

Definition at line 883 of file shm_mq.c.

884{
885 PGPROC *victim;
886
887 SpinLockAcquire(&mq->mq_mutex);
888 if (mq->mq_sender == MyProc)
889 victim = mq->mq_receiver;
890 else
891 {
892 Assert(mq->mq_receiver == MyProc);
893 victim = mq->mq_sender;
894 }
895 mq->mq_detached = true;
896 SpinLockRelease(&mq->mq_mutex);
897
898 if (victim != NULL)
899 SetLatch(&victim->procLatch);
900}
void SetLatch(Latch *latch)
Definition latch.c:290
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
Definition proc.h:176

References Assert, fb(), MyProc, SetLatch(), SpinLockAcquire(), and SpinLockRelease().

Referenced by shm_mq_detach(), and shm_mq_detach_callback().

◆ shm_mq_get_queue()

shm_mq * shm_mq_get_queue ( shm_mq_handle *mqh)

Definition at line 906 of file shm_mq.c.

907{
908 return mqh->mqh_queue;
909}

References fb().

Referenced by WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

◆ shm_mq_get_receiver()

PGPROC * shm_mq_get_receiver ( shm_mq *mq)

Definition at line 243 of file shm_mq.c.

244{
246
247 SpinLockAcquire(&mq->mq_mutex);
248 receiver = mq->mq_receiver;
249 SpinLockRelease(&mq->mq_mutex);
250
251 return receiver;
252}

References fb(), SpinLockAcquire(), and SpinLockRelease().

Referenced by shm_mq_send_bytes(), and shm_mq_wait_for_attach().

◆ shm_mq_get_sender()

PGPROC * shm_mq_get_sender ( shm_mq *mq)

Definition at line 258 of file shm_mq.c.

259{
260 PGPROC *sender;
261
262 SpinLockAcquire(&mq->mq_mutex);
263 sender = mq->mq_sender;
264 SpinLockRelease(&mq->mq_mutex);
265
266 return sender;
267}

References fb(), SpinLockAcquire(), and SpinLockRelease().

Referenced by shm_mq_receive(), shm_mq_wait_for_attach(), WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

◆ shm_mq_inc_bytes_read()

static void shm_mq_inc_bytes_read ( shm_mq *mq,
Size  n 
)
static

Definition at line 1271 of file shm_mq.c.

1272{
1273 PGPROC *sender;
1274
1275 /*
1276 * Separate prior reads of mq_ring from the increment of mq_bytes_read
1277 * which follows. This pairs with the full barrier in
1278 * shm_mq_send_bytes(). We only need a read barrier here because the
1279 * increment of mq_bytes_read is actually a read followed by a dependent
1280 * write.
1281 */
1283
1284 /*
1285 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1286 * else can be changing this value. This method should be cheaper.
1287 */
1288 pg_atomic_write_u64(&mq->mq_bytes_read,
1289 pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1290
1291 /*
1292 * We shouldn't have any bytes to read without a sender, so we can read
1293 * mq_sender here without a lock. Once it's initialized, it can't change.
1294 */
1295 sender = mq->mq_sender;
1296 Assert(sender != NULL);
1297 SetLatch(&sender->procLatch);
1298}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
#define pg_read_barrier()
Definition atomics.h:154
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467

References Assert, fb(), pg_atomic_read_u64(), pg_atomic_write_u64(), pg_read_barrier, and SetLatch().

Referenced by shm_mq_receive(), and shm_mq_receive_bytes().

◆ shm_mq_inc_bytes_written()

static void shm_mq_inc_bytes_written ( shm_mq *mq,
Size  n 
)
static

Definition at line 1304 of file shm_mq.c.

1305{
1306 /*
1307 * Separate prior reads of mq_ring from the write of mq_bytes_written
1308 * which we're about to do. Pairs with the read barrier found in
1309 * shm_mq_receive_bytes.
1310 */
1312
1313 /*
1314 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1315 * else can be changing this value. This method avoids taking the bus
1316 * lock unnecessarily.
1317 */
1318 pg_atomic_write_u64(&mq->mq_bytes_written,
1319 pg_atomic_read_u64(&mq->mq_bytes_written) + n);
1320}
#define pg_write_barrier()
Definition atomics.h:155

References fb(), pg_atomic_read_u64(), pg_atomic_write_u64(), and pg_write_barrier.

Referenced by shm_mq_detach(), shm_mq_send_bytes(), and shm_mq_sendv().

◆ shm_mq_receive()

shm_mq_result shm_mq_receive ( shm_mq_handle *mqh,
Size *nbytesp,
void **  datap,
bool  nowait 
)

Definition at line 573 of file shm_mq.c.

574{
575 shm_mq *mq = mqh->mqh_queue;
576 shm_mq_result res;
577 Size rb = 0;
578 Size nbytes;
579 void *rawdata;
580
581 Assert(mq->mq_receiver == MyProc);
582
583 /* We can't receive data until the sender has attached. */
584 if (!mqh->mqh_counterparty_attached)
585 {
586 if (nowait)
587 {
589
590 /*
591 * We shouldn't return at this point at all unless the sender
592 * hasn't attached yet. However, the correct return value depends
593 * on whether the sender is still attached. If we first test
594 * whether the sender has ever attached and then test whether the
595 * sender has detached, there's a race condition: a sender that
596 * attaches and detaches very quickly might fool us into thinking
597 * the sender never attached at all. So, test whether our
598 * counterparty is definitively gone first, and only afterwards
599 * check whether the sender ever attached in the first place.
600 */
602 if (shm_mq_get_sender(mq) == NULL)
603 {
605 return SHM_MQ_DETACHED;
606 else
607 return SHM_MQ_WOULD_BLOCK;
608 }
609 }
610 else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
612 {
613 mq->mq_detached = true;
614 return SHM_MQ_DETACHED;
615 }
616 mqh->mqh_counterparty_attached = true;
617 }
618
619 /*
620 * If we've consumed an amount of data greater than 1/4th of the ring
621 * size, mark it consumed in shared memory. We try to avoid doing this
622 * unnecessarily when only a small amount of data has been consumed,
623 * because SetLatch() is fairly expensive and we don't want to do it too
624 * often.
625 */
626 if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
627 {
628 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
629 mqh->mqh_consume_pending = 0;
630 }
631
632 /* Try to read, or finish reading, the length word from the buffer. */
633 while (!mqh->mqh_length_word_complete)
634 {
635 /* Try to receive the message length word. */
636 Assert(mqh->mqh_partial_bytes < sizeof(Size));
637 res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
638 nowait, &rb, &rawdata);
639 if (res != SHM_MQ_SUCCESS)
640 return res;
641
642 /*
643 * Hopefully, we'll receive the entire message length word at once.
644 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
645 * multiple reads.
646 */
647 if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
648 {
649 Size needed;
650
651 nbytes = *(Size *) rawdata;
652
653 /* If we've already got the whole message, we're done. */
654 needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
655 if (rb >= needed)
656 {
657 mqh->mqh_consume_pending += needed;
658 *nbytesp = nbytes;
659 *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
660 return SHM_MQ_SUCCESS;
661 }
662
663 /*
664 * We don't have the whole message, but we at least have the whole
665 * length word.
666 */
667 mqh->mqh_expected_bytes = nbytes;
668 mqh->mqh_length_word_complete = true;
669 mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
670 rb -= MAXALIGN(sizeof(Size));
671 }
672 else
673 {
675
676 /* Can't be split unless bigger than required alignment. */
677 Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
678
679 /* Message word is split; need buffer to reassemble. */
680 if (mqh->mqh_buffer == NULL)
681 {
682 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
684 mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
685 }
686 Assert(mqh->mqh_buflen >= sizeof(Size));
687
688 /* Copy partial length word; remember to consume it. */
689 if (mqh->mqh_partial_bytes + rb > sizeof(Size))
690 lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
691 else
692 lengthbytes = rb;
693 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
695 mqh->mqh_partial_bytes += lengthbytes;
696 mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
697 rb -= lengthbytes;
698
699 /* If we now have the whole word, we're ready to read payload. */
700 if (mqh->mqh_partial_bytes >= sizeof(Size))
701 {
702 Assert(mqh->mqh_partial_bytes == sizeof(Size));
703 mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
704 mqh->mqh_length_word_complete = true;
705 mqh->mqh_partial_bytes = 0;
706 }
707 }
708 }
709 nbytes = mqh->mqh_expected_bytes;
710
711 /*
712 * Should be disallowed on the sending side already, but better check and
713 * error out on the receiver side as well rather than trying to read a
714 * prohibitively large message.
715 */
716 if (nbytes > MaxAllocSize)
719 errmsg("invalid message size %zu in shared memory queue",
720 nbytes)));
721
722 if (mqh->mqh_partial_bytes == 0)
723 {
724 /*
725 * Try to obtain the whole message in a single chunk. If this works,
726 * we need not copy the data and can return a pointer directly into
727 * shared memory.
728 */
729 res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
730 if (res != SHM_MQ_SUCCESS)
731 return res;
732 if (rb >= nbytes)
733 {
734 mqh->mqh_length_word_complete = false;
735 mqh->mqh_consume_pending += MAXALIGN(nbytes);
736 *nbytesp = nbytes;
737 *datap = rawdata;
738 return SHM_MQ_SUCCESS;
739 }
740
741 /*
742 * The message has wrapped the buffer. We'll need to copy it in order
743 * to return it to the client in one chunk. First, make sure we have
744 * a large enough buffer available.
745 */
746 if (mqh->mqh_buflen < nbytes)
747 {
749
750 /*
751 * Increase size to the next power of 2 that's >= nbytes, but
752 * limit to MaxAllocSize.
753 */
756
757 if (mqh->mqh_buffer != NULL)
758 {
759 pfree(mqh->mqh_buffer);
760 mqh->mqh_buffer = NULL;
761 mqh->mqh_buflen = 0;
762 }
763 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
764 mqh->mqh_buflen = newbuflen;
765 }
766 }
767
768 /* Loop until we've copied the entire message. */
769 for (;;)
770 {
772
773 /* Copy as much as we can. */
774 Assert(mqh->mqh_partial_bytes + rb <= nbytes);
775 if (rb > 0)
776 {
777 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
778 mqh->mqh_partial_bytes += rb;
779 }
780
781 /*
782 * Update count of bytes that can be consumed, accounting for
783 * alignment padding. Note that this will never actually insert any
784 * padding except at the end of a message, because the buffer size is
785 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
786 */
787 Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
788 mqh->mqh_consume_pending += MAXALIGN(rb);
789
790 /* If we got all the data, exit the loop. */
791 if (mqh->mqh_partial_bytes >= nbytes)
792 break;
793
794 /* Wait for some more data. */
795 still_needed = nbytes - mqh->mqh_partial_bytes;
796 res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
797 if (res != SHM_MQ_SUCCESS)
798 return res;
799 if (rb > still_needed)
801 }
802
803 /* Return the complete message, and reset for next message. */
804 *nbytesp = nbytes;
805 *datap = mqh->mqh_buffer;
806 mqh->mqh_length_word_complete = false;
807 mqh->mqh_partial_bytes = 0;
808 return SHM_MQ_SUCCESS;
809}
#define Min(x, y)
Definition c.h:1019
int errcode(int sqlerrcode)
Definition elog.c:874
int errmsg(const char *fmt,...)
Definition elog.c:1093
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
#define MaxAllocSize
Definition fe_memutils.h:22
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
#define pg_nextpower2_size_t
#define MQH_INITIAL_BUFSIZE
Definition shm_mq.c:172
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition shm_mq.c:258
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition shm_mq.c:1180
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition shm_mq.c:1219
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition shm_mq.c:1271
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition shm_mq.c:1080
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_WOULD_BLOCK
Definition shm_mq.h:41
@ SHM_MQ_DETACHED
Definition shm_mq.h:42

References Assert, ereport, errcode(), errmsg(), ERROR, fb(), MAXALIGN, MaxAllocSize, MemoryContextAlloc(), Min, MQH_INITIAL_BUFSIZE, MyProc, pfree(), pg_nextpower2_size_t, shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_sender(), shm_mq_inc_bytes_read(), shm_mq_receive_bytes(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), and SHM_MQ_WOULD_BLOCK.

Referenced by copy_messages(), LogicalParallelApplyLoop(), ProcessParallelApplyMessages(), ProcessParallelMessages(), test_shm_mq(), test_shm_mq_pipelined(), and TupleQueueReaderNext().

◆ shm_mq_receive_bytes()

static shm_mq_result shm_mq_receive_bytes ( shm_mq_handle *mqh,
Size  bytes_needed,
bool  nowait,
Size *nbytesp,
void **  datap 
)
static

Definition at line 1080 of file shm_mq.c.

1082{
1083 shm_mq *mq = mqh->mqh_queue;
1085 uint64 used;
1087
1088 for (;;)
1089 {
1090 Size offset;
1091 uint64 read;
1092
1093 /* Get bytes written, so we can compute what's available to read. */
1094 written = pg_atomic_read_u64(&mq->mq_bytes_written);
1095
1096 /*
1097 * Get bytes read. Include bytes we could consume but have not yet
1098 * consumed.
1099 */
1100 read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1101 mqh->mqh_consume_pending;
1102 used = written - read;
1103 Assert(used <= ringsize);
1104 offset = read % (uint64) ringsize;
1105
1106 /* If we have enough data or buffer has wrapped, we're done. */
1107 if (used >= bytes_needed || offset + used >= ringsize)
1108 {
1109 *nbytesp = Min(used, ringsize - offset);
1110 *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1111
1112 /*
1113 * Separate the read of mq_bytes_written, above, from caller's
1114 * attempt to read the data itself. Pairs with the barrier in
1115 * shm_mq_inc_bytes_written.
1116 */
1118 return SHM_MQ_SUCCESS;
1119 }
1120
1121 /*
1122 * Fall out before waiting if the queue has been detached.
1123 *
1124 * Note that we don't check for this until *after* considering whether
1125 * the data already available is enough, since the receiver can finish
1126 * receiving a message stored in the buffer even after the sender has
1127 * detached.
1128 */
1129 if (mq->mq_detached)
1130 {
1131 /*
1132 * If the writer advanced mq_bytes_written and then set
1133 * mq_detached, we might not have read the final value of
1134 * mq_bytes_written above. Insert a read barrier and then check
1135 * again if mq_bytes_written has advanced.
1136 */
1138 if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1139 continue;
1140
1141 return SHM_MQ_DETACHED;
1142 }
1143
1144 /*
1145 * We didn't get enough data to satisfy the request, so mark any data
1146 * previously-consumed as read to make more buffer space.
1147 */
1148 if (mqh->mqh_consume_pending > 0)
1149 {
1150 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
1151 mqh->mqh_consume_pending = 0;
1152 }
1153
1154 /* Skip manipulation of our latch if nowait = true. */
1155 if (nowait)
1156 return SHM_MQ_WOULD_BLOCK;
1157
1158 /*
1159 * Wait for our latch to be set. It might already be set for some
1160 * unrelated reason, but that'll just result in one extra trip through
1161 * the loop. It's worth it to avoid resetting the latch at top of
1162 * loop, because setting an already-set latch is much cheaper than
1163 * setting one that has been reset.
1164 */
1167
1168 /* Reset the latch so we don't spin. */
1170
1171 /* An interrupt may have occurred while we were waiting. */
1173 }
1174}
uint64_t uint64
Definition c.h:559
struct Latch * MyLatch
Definition globals.c:63
#define read(a, b, c)
Definition win32.h:13
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
Size mq_ring_size
Definition shm_mq.c:79
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET

References Assert, CHECK_FOR_INTERRUPTS, fb(), Min, shm_mq::mq_ring_size, MyLatch, pg_atomic_read_u64(), pg_read_barrier, read, ResetLatch(), SHM_MQ_DETACHED, shm_mq_inc_bytes_read(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_receive().

◆ shm_mq_send()

shm_mq_result shm_mq_send ( shm_mq_handle *mqh,
Size  nbytes,
const void *data,
bool  nowait,
bool  force_flush 
)

Definition at line 330 of file shm_mq.c.

332{
334
335 iov.data = data;
336 iov.len = nbytes;
337
338 return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
339}
const void * data
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
Definition shm_mq.c:362
const char * data
Definition shm_mq.h:33

References shm_mq_iovec::data, data, fb(), and shm_mq_sendv().

Referenced by copy_messages(), pa_send_data(), test_shm_mq(), test_shm_mq_pipelined(), and tqueueReceiveSlot().

◆ shm_mq_send_bytes()

static shm_mq_result shm_mq_send_bytes ( shm_mq_handle *mqh,
Size  nbytes,
const void *data,
bool  nowait,
Size *bytes_written 
)
static

Definition at line 915 of file shm_mq.c.

917{
918 shm_mq *mq = mqh->mqh_queue;
919 Size sent = 0;
920 uint64 used;
923
924 while (sent < nbytes)
925 {
926 uint64 rb;
927 uint64 wb;
928
929 /* Compute number of ring buffer bytes used and available. */
930 rb = pg_atomic_read_u64(&mq->mq_bytes_read);
931 wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
932 Assert(wb >= rb);
933 used = wb - rb;
934 Assert(used <= ringsize);
935 available = Min(ringsize - used, nbytes - sent);
936
937 /*
938 * Bail out if the queue has been detached. Note that we would be in
939 * trouble if the compiler decided to cache the value of
940 * mq->mq_detached in a register or on the stack across loop
941 * iterations. It probably shouldn't do that anyway since we'll
942 * always return, call an external function that performs a system
943 * call, or reach a memory barrier at some point later in the loop,
944 * but just to be sure, insert a compiler barrier here.
945 */
947 if (mq->mq_detached)
948 {
950 return SHM_MQ_DETACHED;
951 }
952
953 if (available == 0 && !mqh->mqh_counterparty_attached)
954 {
955 /*
956 * The queue is full, so if the receiver isn't yet known to be
957 * attached, we must wait for that to happen.
958 */
959 if (nowait)
960 {
961 if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
962 {
964 return SHM_MQ_DETACHED;
965 }
967 {
969 return SHM_MQ_WOULD_BLOCK;
970 }
971 }
972 else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
973 mqh->mqh_handle))
974 {
975 mq->mq_detached = true;
977 return SHM_MQ_DETACHED;
978 }
979 mqh->mqh_counterparty_attached = true;
980
981 /*
982 * The receiver may have read some data after attaching, so we
983 * must not wait without rechecking the queue state.
984 */
985 }
986 else if (available == 0)
987 {
988 /* Update the pending send bytes in the shared memory. */
989 shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
990
991 /*
992 * Since mq->mqh_counterparty_attached is known to be true at this
993 * point, mq_receiver has been set, and it can't change once set.
994 * Therefore, we can read it without acquiring the spinlock.
995 */
996 Assert(mqh->mqh_counterparty_attached);
997 SetLatch(&mq->mq_receiver->procLatch);
998
999 /*
1000 * We have just updated the mqh_send_pending bytes in the shared
1001 * memory so reset it.
1002 */
1003 mqh->mqh_send_pending = 0;
1004
1005 /* Skip manipulation of our latch if nowait = true. */
1006 if (nowait)
1007 {
1009 return SHM_MQ_WOULD_BLOCK;
1010 }
1011
1012 /*
1013 * Wait for our latch to be set. It might already be set for some
1014 * unrelated reason, but that'll just result in one extra trip
1015 * through the loop. It's worth it to avoid resetting the latch
1016 * at top of loop, because setting an already-set latch is much
1017 * cheaper than setting one that has been reset.
1018 */
1021
1022 /* Reset the latch so we don't spin. */
1024
1025 /* An interrupt may have occurred while we were waiting. */
1027 }
1028 else
1029 {
1030 Size offset;
1031 Size sendnow;
1032
1033 offset = wb % (uint64) ringsize;
1034 sendnow = Min(available, ringsize - offset);
1035
1036 /*
1037 * Write as much data as we can via a single memcpy(). Make sure
1038 * these writes happen after the read of mq_bytes_read, above.
1039 * This barrier pairs with the one in shm_mq_inc_bytes_read.
1040 * (Since we're separating the read of mq_bytes_read from a
1041 * subsequent write to mq_ring, we need a full barrier here.)
1042 */
1044 memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1045 (const char *) data + sent, sendnow);
1046 sent += sendnow;
1047
1048 /*
1049 * Update count of bytes written, with alignment padding. Note
1050 * that this will never actually insert any padding except at the
1051 * end of a run of bytes, because the buffer size is a multiple of
1052 * MAXIMUM_ALIGNOF, and each read is as well.
1053 */
1054 Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1055
1056 /*
1057 * For efficiency, we don't update the bytes written in the shared
1058 * memory and also don't set the reader's latch here. Refer to
1059 * the comments atop the shm_mq_handle structure for more
1060 * information.
1061 */
1062 mqh->mqh_send_pending += MAXALIGN(sendnow);
1063 }
1064 }
1065
1067 return SHM_MQ_SUCCESS;
1068}
#define pg_memory_barrier()
Definition atomics.h:141
#define pg_compiler_barrier()
Definition atomics.h:129
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition shm_mq.c:243

References Assert, CHECK_FOR_INTERRUPTS, data, fb(), MAXALIGN, Min, shm_mq::mq_ring_size, MyLatch, pg_atomic_read_u64(), pg_compiler_barrier, pg_memory_barrier, ResetLatch(), SetLatch(), shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_inc_bytes_written(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), SHM_MQ_WOULD_BLOCK, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_sendv().

◆ shm_mq_sendv()

shm_mq_result shm_mq_sendv ( shm_mq_handle mqh,
shm_mq_iovec iov,
int  iovcnt,
bool  nowait,
bool  force_flush 
)

Definition at line 362 of file shm_mq.c.

/*
 * Write a message spanning multiple locations (iovec-style gather) into the
 * shared message queue.  The message is preceded by its total length (a
 * Size), and every chunk written except the last is MAXALIGN'd.  Returns
 * SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK (only when nowait is true, propagated
 * from shm_mq_send_bytes), or SHM_MQ_DETACHED; raises ERROR for messages
 * larger than MaxAllocSize.  When force_flush is false, written bytes may be
 * left pending in mqh_send_pending rather than published to the receiver.
 */
364{
365 shm_mq_result res;
366 shm_mq *mq = mqh->mqh_queue;
/* NOTE(review): extraction dropped two declaration lines here; usage below
 * (lines 393 and 524) shows they declare "receiver" (a PGPROC *) and
 * "bytes_written" (a Size) — confirm against upstream shm_mq.c. */
368 Size nbytes = 0;
370 int i;
371 int which_iov = 0;
372 Size offset;
373
374 Assert(mq->mq_sender == MyProc);
375
376 /* Compute total size of write. */
377 for (i = 0; i < iovcnt; ++i)
378 nbytes += iov[i].len;
379
380 /* Prevent writing messages overwhelming the receiver. */
381 if (nbytes > MaxAllocSize)
/* NOTE(review): the ereport(ERROR, (errcode(...), head of this error call was
 * dropped by extraction (References list ereport/errcode/ERROR); only the
 * errmsg continuation lines survive below. */
384 errmsg("cannot send a message of size %zu via shared memory queue",
385 nbytes)));
386
387 /* Try to write, or finish writing, the length word into the buffer. */
388 while (!mqh->mqh_length_word_complete)
389 {
390 Assert(mqh->mqh_partial_bytes < sizeof(Size));
391 res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
392 ((char *) &nbytes) + mqh->mqh_partial_bytes,
393 nowait, &bytes_written);
394
395 if (res == SHM_MQ_DETACHED)
396 {
397 /* Reset state in case caller tries to send another message. */
398 mqh->mqh_partial_bytes = 0;
399 mqh->mqh_length_word_complete = false;
400 return res;
401 }
402 mqh->mqh_partial_bytes += bytes_written;
403
404 if (mqh->mqh_partial_bytes >= sizeof(Size))
405 {
406 Assert(mqh->mqh_partial_bytes == sizeof(Size));
407
408 mqh->mqh_partial_bytes = 0;
409 mqh->mqh_length_word_complete = true;
410 }
411
412 if (res != SHM_MQ_SUCCESS)
413 return res;
414
415 /* Length word can't be split unless bigger than required alignment. */
416 Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
417 }
418
419 /* Write the actual data bytes into the buffer. */
420 Assert(mqh->mqh_partial_bytes <= nbytes);
421 offset = mqh->mqh_partial_bytes;
422 do
423 {
/* NOTE(review): a declaration line was dropped here; use at line 491 shows it
 * declares "chunksize" (a Size). */
425
426 /* Figure out which bytes need to be sent next. */
427 if (offset >= iov[which_iov].len)
428 {
429 offset -= iov[which_iov].len;
430 ++which_iov;
431 if (which_iov >= iovcnt)
432 break;
433 continue;
434 }
435
436 /*
437 * We want to avoid copying the data if at all possible, but every
438 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
439 * the last. Thus, if a chunk other than the last one ends on a
440 * non-MAXALIGN'd boundary, we have to combine the tail end of its
441 * data with data from one or more following chunks until we either
442 * reach the last chunk or accumulate a number of bytes which is
443 * MAXALIGN'd.
444 */
445 if (which_iov + 1 < iovcnt &&
446 offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
447 {
/* NOTE(review): the declaration of tmpbuf was dropped here — presumably a
 * MAXIMUM_ALIGNOF-sized local char buffer (the "tmpbuf ... walsender.c"
 * cross-reference below is a doxygen mislink, not this variable). */
449 int j = 0;
450
451 for (;;)
452 {
453 if (offset < iov[which_iov].len)
454 {
455 tmpbuf[j] = iov[which_iov].data[offset];
456 j++;
457 offset++;
458 if (j == MAXIMUM_ALIGNOF)
459 break;
460 }
461 else
462 {
463 offset -= iov[which_iov].len;
464 which_iov++;
465 if (which_iov >= iovcnt)
466 break;
467 }
468 }
469
470 res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
471
472 if (res == SHM_MQ_DETACHED)
473 {
474 /* Reset state in case caller tries to send another message. */
475 mqh->mqh_partial_bytes = 0;
476 mqh->mqh_length_word_complete = false;
477 return res;
478 }
479
480 mqh->mqh_partial_bytes += bytes_written;
481 if (res != SHM_MQ_SUCCESS)
482 return res;
483 continue;
484 }
485
486 /*
487 * If this is the last chunk, we can write all the data, even if it
488 * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
489 * MAXALIGN_DOWN the write size.
490 */
491 chunksize = iov[which_iov].len - offset;
492 if (which_iov + 1 < iovcnt)
/* NOTE(review): two lines were dropped here; the References list includes
 * MAXALIGN_DOWN and shm_mq_send_bytes, and line 495 is a call continuation,
 * so presumably: chunksize = MAXALIGN_DOWN(chunksize); followed by the head
 * of res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
 * — confirm against upstream shm_mq.c. */
495 nowait, &bytes_written);
496
497 if (res == SHM_MQ_DETACHED)
498 {
499 /* Reset state in case caller tries to send another message. */
500 mqh->mqh_length_word_complete = false;
501 mqh->mqh_partial_bytes = 0;
502 return res;
503 }
504
505 mqh->mqh_partial_bytes += bytes_written;
506 offset += bytes_written;
507 if (res != SHM_MQ_SUCCESS)
508 return res;
509 } while (mqh->mqh_partial_bytes < nbytes);
510
511 /* Reset for next message. */
512 mqh->mqh_partial_bytes = 0;
513 mqh->mqh_length_word_complete = false;
514
515 /* If queue has been detached, let caller know. */
516 if (mq->mq_detached)
517 return SHM_MQ_DETACHED;
518
519 /*
520 * If the counterparty is known to have attached, we can read mq_receiver
521 * without acquiring the spinlock. Otherwise, more caution is needed.
522 */
523 if (mqh->mqh_counterparty_attached)
524 receiver = mq->mq_receiver;
525 else
526 {
527 SpinLockAcquire(&mq->mq_mutex);
528 receiver = mq->mq_receiver;
529 SpinLockRelease(&mq->mq_mutex);
530 if (receiver != NULL)
531 mqh->mqh_counterparty_attached = true;
532 }
533
534 /*
535 * If the caller has requested force flush or we have written more than
536 * 1/4 of the ring size, mark it as written in shared memory and notify
537 * the receiver.
538 */
539 if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
540 {
541 shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
542 if (receiver != NULL)
543 SetLatch(&receiver->procLatch);
544 mqh->mqh_send_pending = 0;
545 }
546
547 return SHM_MQ_SUCCESS;
548}
int j
Definition isn.c:78
int i
Definition isn.c:77
const void size_t len
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition shm_mq.c:915
static StringInfoData tmpbuf
Definition walsender.c:178

References Assert, StringInfoData::data, data, ereport, errcode(), errmsg(), ERROR, fb(), i, j, len, MAXALIGN_DOWN, MaxAllocSize, MyProc, SetLatch(), SHM_MQ_DETACHED, shm_mq_inc_bytes_written(), shm_mq_send_bytes(), SHM_MQ_SUCCESS, SpinLockAcquire(), SpinLockRelease(), and tmpbuf.

Referenced by mq_putmessage(), and shm_mq_send().

◆ shm_mq_set_handle()

/*
 * Associate a BackgroundWorkerHandle with a shm_mq_handle, so that waits on
 * this queue can detect unexpected death of the counterpart worker (see
 * shm_mq_wait_internal, which probes the handle via GetBackgroundWorkerPid).
 * May be called at most once per shm_mq_handle.
 */
void
shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
{
	/* A handle may only be installed once. */
	Assert(mqh->mqh_handle == NULL);
	mqh->mqh_handle = handle;
}

References Assert, and fb().

Referenced by ExecParallelCreateReaders(), and LaunchParallelWorkers().

◆ shm_mq_set_receiver()

/*
 * Set the identity of the process that will receive from a shared message
 * queue.  If a sender has already attached, wake it via its latch so it can
 * notice that the counterparty is now present.
 */
void
shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
{
	PGPROC	   *counterpart;

	/* Publish the receiver and snapshot the sender under the spinlock. */
	SpinLockAcquire(&mq->mq_mutex);
	Assert(mq->mq_receiver == NULL);
	counterpart = mq->mq_sender;
	mq->mq_receiver = proc;
	SpinLockRelease(&mq->mq_mutex);

	/* If a sender is already attached, let it know we have arrived. */
	if (counterpart != NULL)
		SetLatch(&counterpart->procLatch);
}

References Assert, fb(), SetLatch(), SpinLockAcquire(), and SpinLockRelease().

Referenced by attach_to_queues(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), pa_setup_dsm(), ParallelApplyWorkerMain(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

◆ shm_mq_set_sender()

/*
 * Set the identity of the process that will send to a shared message queue.
 * If a receiver has already attached, wake it via its latch so it can notice
 * that the counterparty is now present.
 */
void
shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
{
	PGPROC	   *counterpart;

	/* Publish the sender and snapshot the receiver under the spinlock. */
	SpinLockAcquire(&mq->mq_mutex);
	Assert(mq->mq_sender == NULL);
	counterpart = mq->mq_receiver;
	mq->mq_sender = proc;
	SpinLockRelease(&mq->mq_mutex);

	/* If a receiver is already attached, let it know we have arrived. */
	if (counterpart != NULL)
		SetLatch(&counterpart->procLatch);
}

References Assert, fb(), SetLatch(), SpinLockAcquire(), and SpinLockRelease().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), pa_setup_dsm(), ParallelApplyWorkerMain(), ParallelWorkerMain(), and setup_dynamic_shared_memory().

◆ shm_mq_wait_for_attach()

/*
 * Wait for the counterparty (whichever end of the queue we are not) to
 * attach.  Returns SHM_MQ_SUCCESS once the other side has set itself as
 * sender or receiver, or SHM_MQ_DETACHED if the queue is detached or an
 * associated background worker dies first.
 *
 * NOTE(review): the test selecting which pointer to wait on was dropped by
 * the page extraction; reconstructed here from the function's reference list
 * (shm_mq_get_receiver/shm_mq_get_sender/MyProc) — confirm against upstream.
 */
shm_mq_result
shm_mq_wait_for_attach(shm_mq_handle *mqh)
{
	shm_mq	   *mq = mqh->mqh_queue;
	PGPROC	  **victim;

	/* We wait on whichever end of the queue we are not. */
	if (shm_mq_get_receiver(mq) == MyProc)
		victim = &mq->mq_sender;
	else
	{
		Assert(shm_mq_get_sender(mq) == MyProc);
		victim = &mq->mq_receiver;
	}

	return shm_mq_wait_internal(mq, victim, mqh->mqh_handle)
		? SHM_MQ_SUCCESS
		: SHM_MQ_DETACHED;
}

References Assert, fb(), MyProc, SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_get_sender(), SHM_MQ_SUCCESS, and shm_mq_wait_internal().

◆ shm_mq_wait_internal()

/*
 * Loop until *ptr is observed non-NULL (the awaited process has attached),
 * the queue is marked detached, or an associated background worker has died.
 * Returns true only in the first case.  Between checks we sleep on our
 * process latch, which the attaching process sets (see shm_mq_set_receiver /
 * shm_mq_set_sender).
 *
 * NOTE(review): the WaitLatch/ResetLatch/CHECK_FOR_INTERRUPTS calls were
 * dropped by the page extraction; reconstructed from this function's
 * reference list.  The wait-event name in particular should be confirmed
 * against upstream shm_mq.c.
 */
static bool
shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
{
	bool		attached = false;

	for (;;)
	{
		BgwHandleStatus status;
		pid_t		pid;

		/* Hold the spinlock only long enough to sample the pointer. */
		SpinLockAcquire(&mq->mq_mutex);
		attached = (*ptr != NULL);
		SpinLockRelease(&mq->mq_mutex);

		/* A detached queue means failure, even if the pointer got set. */
		if (mq->mq_detached)
		{
			attached = false;
			break;
		}
		if (attached)
			break;

		/* If we can monitor the worker, bail out on unexpected death. */
		if (handle != NULL)
		{
			status = GetBackgroundWorkerPid(handle, &pid);
			if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
			{
				attached = false;
				break;
			}
		}

		/* Sleep until signaled. */
		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
						 WAIT_EVENT_MESSAGE_QUEUE_INTERNAL);

		/* Reset the latch so we don't spin. */
		ResetLatch(MyLatch);

		/* An interrupt may have occurred while we were waiting. */
		CHECK_FOR_INTERRUPTS();
	}

	return attached;
}

References BGWH_NOT_YET_STARTED, BGWH_STARTED, CHECK_FOR_INTERRUPTS, fb(), GetBackgroundWorkerPid(), MyLatch, ResetLatch(), SpinLockAcquire(), SpinLockRelease(), WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_receive(), shm_mq_send_bytes(), and shm_mq_wait_for_attach().

Variable Documentation

◆ shm_mq_minimum_size

const Size shm_mq_minimum_size
Initial value:

Definition at line 169 of file shm_mq.c.

Referenced by setup_dynamic_shared_memory().