PostgreSQL Source Code git master
Loading...
Searching...
No Matches
shm_mq.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * shm_mq.c
4 * single-reader, single-writer shared memory message queue
5 *
6 * Both the sender and the receiver must have a PGPROC; their respective
7 * process latches are used for synchronization. Only the sender may send,
8 * and only the receiver may receive. This is intended to allow a user
9 * backend to communicate with worker backends that it has registered.
10 *
11 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
13 *
14 * src/backend/storage/ipc/shm_mq.c
15 *
16 *-------------------------------------------------------------------------
17 */
18
19#include "postgres.h"
20
21#include "miscadmin.h"
22#include "pgstat.h"
23#include "port/pg_bitutils.h"
24#include "postmaster/bgworker.h"
25#include "storage/proc.h"
26#include "storage/shm_mq.h"
27#include "storage/spin.h"
28#include "utils/memutils.h"
29#include "utils/wait_event.h"
30
31/*
32 * This structure represents the actual queue, stored in shared memory.
33 *
34 * Some notes on synchronization:
35 *
36 * mq_receiver and mq_bytes_read can only be changed by the receiver; and
37 * mq_sender and mq_bytes_written can only be changed by the sender.
38 * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
39 * they cannot change once set, and thus may be read without a lock once this
40 * is known to be the case.
41 *
42 * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
43 * they are written atomically using 8 byte loads and stores. Memory barriers
44 * must be carefully used to synchronize reads and writes of these values with
45 * reads and writes of the actual data in mq_ring.
46 *
47 * mq_detached needs no locking. It can be set by either the sender or the
48 * receiver, but only ever from false to true, so redundant writes don't
49 * matter. It is important that if we set mq_detached and then set the
50 * counterparty's latch, the counterparty must be certain to see the change
51 * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
52 * ends with one, this should be OK.
53 *
54 * mq_ring_size and mq_ring_offset never change after initialization, and
55 * can therefore be read without the lock.
56 *
57 * Importantly, mq_ring can be safely read and written without a lock.
58 * At any given time, the difference between mq_bytes_read and
59 * mq_bytes_written defines the number of bytes within mq_ring that contain
60 * unread data, and mq_bytes_read defines the position where those bytes
61 * begin. The sender can increase the number of unread bytes at any time,
62 * but only the receiver can give license to overwrite those bytes, by
63 * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
64 * the unread bytes it knows to be present without the lock. Conversely,
65 * the sender can write to the unused portion of the ring buffer without
66 * the lock, because nobody else can be reading or writing those bytes. The
67 * receiver could be making more bytes unused by incrementing mq_bytes_read,
68 * but that's OK. Note that it would be unsafe for the receiver to read any
69 * data it's already marked as read, or to write any data; and it would be
70 * unsafe for the sender to reread any data after incrementing
71 * mq_bytes_written, but fortunately there's no need for any of that.
72 */
85
86/*
87 * This structure is a backend-private handle for access to a queue.
88 *
89 * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
90 * an optional pointer to the dynamic shared memory segment that contains it.
91 * (If mqh_segment is provided, we register an on_dsm_detach callback to
92 * make sure we detach from the queue before detaching from DSM.)
93 *
94 * If this queue is intended to connect the current process with a background
95 * worker that started it, the user can pass a pointer to the worker handle
96 * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
97 * is to allow us to begin sending to or receiving from that queue before the
98 * process we'll be communicating with has even been started. If it fails
99 * to start, the handle will allow us to notice that and fail cleanly, rather
100 * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
101 * simple cases - e.g. where there are just 2 processes communicating; in
102 * more complex scenarios, every process may not have a BackgroundWorkerHandle
103 * available, or may need to watch for the failure of more than one other
104 * process at a time.
105 *
106 * When a message exists as a contiguous chunk of bytes in the queue - that is,
107 * it is smaller than the size of the ring buffer and does not wrap around
108 * the end - we return the message to the caller as a pointer into the buffer.
109 * For messages that are larger or happen to wrap, we reassemble the message
110 * locally by copying the chunks into a backend-local buffer. mqh_buffer is
111 * the buffer, and mqh_buflen is the number of bytes allocated for it.
112 *
 * mqh_send_pending is the number of bytes that have been written to the
 * queue but not yet recorded in shared memory. We will not update it until the written
115 * data is 1/4th of the ring size or the tuple queue is full. This will
116 * prevent frequent CPU cache misses, and it will also avoid frequent
117 * SetLatch() calls, which are quite expensive.
118 *
119 * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
120 * are used to track the state of non-blocking operations. When the caller
121 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
122 * are expected to retry the call at a later time with the same argument;
123 * we need to retain enough state to pick up where we left off.
124 * mqh_length_word_complete tracks whether we are done sending or receiving
125 * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
126 * the number of bytes read or written for either the length word or the
127 * message itself, and mqh_expected_bytes - which is used only for reads -
128 * tracks the expected total size of the payload.
129 *
130 * mqh_counterparty_attached tracks whether we know the counterparty to have
131 * attached to the queue at some previous point. This lets us avoid some
132 * mutex acquisitions.
133 *
134 * mqh_context is the memory context in effect at the time we attached to
135 * the shm_mq. The shm_mq_handle itself is allocated in this context, and
136 * we make sure any other allocations we do happen in this context as well,
137 * to avoid nasty surprises.
138 */
154
155static void shm_mq_detach_internal(shm_mq *mq);
157 const void *data, bool nowait, Size *bytes_written);
159 Size bytes_needed, bool nowait, Size *nbytesp,
160 void **datap);
162 BackgroundWorkerHandle *handle);
163static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
164 BackgroundWorkerHandle *handle);
165static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
166static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
168
169/* Minimum queue size is enough for header and at least one chunk of data. */
172
173#define MQH_INITIAL_BUFSIZE 8192
174
175/*
176 * Initialize a new shared message queue.
177 */
178shm_mq *
179shm_mq_create(void *address, Size size)
180{
181 shm_mq *mq = address;
183
184 /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
185 size = MAXALIGN_DOWN(size);
186
187 /* Queue size must be large enough to hold some data. */
188 Assert(size > data_offset);
189
190 /* Initialize queue header. */
191 SpinLockInit(&mq->mq_mutex);
192 mq->mq_receiver = NULL;
193 mq->mq_sender = NULL;
194 pg_atomic_init_u64(&mq->mq_bytes_read, 0);
195 pg_atomic_init_u64(&mq->mq_bytes_written, 0);
196 mq->mq_ring_size = size - data_offset;
197 mq->mq_detached = false;
198 mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
199
200 return mq;
201}
202
203/*
204 * Set the identity of the process that will receive from a shared message
205 * queue.
206 */
207void
209{
210 PGPROC *sender;
211
212 SpinLockAcquire(&mq->mq_mutex);
213 Assert(mq->mq_receiver == NULL);
214 mq->mq_receiver = proc;
215 sender = mq->mq_sender;
216 SpinLockRelease(&mq->mq_mutex);
217
218 if (sender != NULL)
219 SetLatch(&sender->procLatch);
220}
221
222/*
223 * Set the identity of the process that will send to a shared message queue.
224 */
225void
227{
229
230 SpinLockAcquire(&mq->mq_mutex);
231 Assert(mq->mq_sender == NULL);
232 mq->mq_sender = proc;
233 receiver = mq->mq_receiver;
234 SpinLockRelease(&mq->mq_mutex);
235
236 if (receiver != NULL)
237 SetLatch(&receiver->procLatch);
238}
239
240/*
241 * Get the configured receiver.
242 */
243PGPROC *
245{
247
248 SpinLockAcquire(&mq->mq_mutex);
249 receiver = mq->mq_receiver;
250 SpinLockRelease(&mq->mq_mutex);
251
252 return receiver;
253}
254
255/*
256 * Get the configured sender.
257 */
258PGPROC *
260{
261 PGPROC *sender;
262
263 SpinLockAcquire(&mq->mq_mutex);
264 sender = mq->mq_sender;
265 SpinLockRelease(&mq->mq_mutex);
266
267 return sender;
268}
269
270/*
271 * Attach to a shared message queue so we can send or receive messages.
272 *
273 * The memory context in effect at the time this function is called should
274 * be one which will last for at least as long as the message queue itself.
275 * We'll allocate the handle in that context, and future allocations that
276 * are needed to buffer incoming data will happen in that context as well.
277 *
278 * If seg != NULL, the queue will be automatically detached when that dynamic
279 * shared memory segment is detached.
280 *
281 * If handle != NULL, the queue can be read or written even before the
282 * other process has attached. We'll wait for it to do so if needed. The
283 * handle must be for a background worker initialized with bgw_notify_pid
284 * equal to our PID.
285 *
286 * shm_mq_detach() should be called when done. This will free the
287 * shm_mq_handle and mark the queue itself as detached, so that our
288 * counterpart won't get stuck waiting for us to fill or drain the queue
289 * after we've already lost interest.
290 */
293{
295
296 Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
297 mqh->mqh_queue = mq;
298 mqh->mqh_segment = seg;
299 mqh->mqh_handle = handle;
300 mqh->mqh_buffer = NULL;
301 mqh->mqh_buflen = 0;
302 mqh->mqh_consume_pending = 0;
303 mqh->mqh_send_pending = 0;
304 mqh->mqh_partial_bytes = 0;
305 mqh->mqh_expected_bytes = 0;
306 mqh->mqh_length_word_complete = false;
307 mqh->mqh_counterparty_attached = false;
308 mqh->mqh_context = CurrentMemoryContext;
309
310 if (seg != NULL)
312
313 return mqh;
314}
315
316/*
317 * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
318 * been passed to shm_mq_attach.
319 */
320void
322{
323 Assert(mqh->mqh_handle == NULL);
324 mqh->mqh_handle = handle;
325}
326
327/*
328 * Write a message into a shared message queue.
329 */
331shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
332 bool force_flush)
333{
335
336 iov.data = data;
337 iov.len = nbytes;
338
339 return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
340}
341
342/*
343 * Write a message into a shared message queue, gathered from multiple
344 * addresses.
345 *
346 * When nowait = false, we'll wait on our process latch when the ring buffer
347 * fills up, and then continue writing once the receiver has drained some data.
348 * The process latch is reset after each wait.
349 *
350 * When nowait = true, we do not manipulate the state of the process latch;
351 * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
352 * this case, the caller should call this function again, with the same
353 * arguments, each time the process latch is set. (Once begun, the sending
354 * of a message cannot be aborted except by detaching from the queue; changing
355 * the length or payload will corrupt the queue.)
356 *
357 * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
358 * and notify the receiver (if it is already attached). Otherwise, we don't
359 * update it until we have written an amount of data greater than 1/4th of the
360 * ring size.
361 */
364 bool force_flush)
365{
366 shm_mq_result res;
367 shm_mq *mq = mqh->mqh_queue;
369 Size nbytes = 0;
371 int i;
372 int which_iov = 0;
373 Size offset;
374
375 Assert(mq->mq_sender == MyProc);
376
377 /* Compute total size of write. */
378 for (i = 0; i < iovcnt; ++i)
379 nbytes += iov[i].len;
380
381 /* Prevent writing messages overwhelming the receiver. */
382 if (nbytes > MaxAllocSize)
385 errmsg("cannot send a message of size %zu via shared memory queue",
386 nbytes)));
387
388 /* Try to write, or finish writing, the length word into the buffer. */
389 while (!mqh->mqh_length_word_complete)
390 {
391 Assert(mqh->mqh_partial_bytes < sizeof(Size));
392 res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
393 ((char *) &nbytes) + mqh->mqh_partial_bytes,
394 nowait, &bytes_written);
395
396 if (res == SHM_MQ_DETACHED)
397 {
398 /* Reset state in case caller tries to send another message. */
399 mqh->mqh_partial_bytes = 0;
400 mqh->mqh_length_word_complete = false;
401 return res;
402 }
403 mqh->mqh_partial_bytes += bytes_written;
404
405 if (mqh->mqh_partial_bytes >= sizeof(Size))
406 {
407 Assert(mqh->mqh_partial_bytes == sizeof(Size));
408
409 mqh->mqh_partial_bytes = 0;
410 mqh->mqh_length_word_complete = true;
411 }
412
413 if (res != SHM_MQ_SUCCESS)
414 return res;
415
416 /* Length word can't be split unless bigger than required alignment. */
417 Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
418 }
419
420 /* Write the actual data bytes into the buffer. */
421 Assert(mqh->mqh_partial_bytes <= nbytes);
422 offset = mqh->mqh_partial_bytes;
423 do
424 {
426
427 /* Figure out which bytes need to be sent next. */
428 if (offset >= iov[which_iov].len)
429 {
430 offset -= iov[which_iov].len;
431 ++which_iov;
432 if (which_iov >= iovcnt)
433 break;
434 continue;
435 }
436
437 /*
438 * We want to avoid copying the data if at all possible, but every
439 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
440 * the last. Thus, if a chunk other than the last one ends on a
441 * non-MAXALIGN'd boundary, we have to combine the tail end of its
442 * data with data from one or more following chunks until we either
443 * reach the last chunk or accumulate a number of bytes which is
444 * MAXALIGN'd.
445 */
446 if (which_iov + 1 < iovcnt &&
447 offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
448 {
450 int j = 0;
451
452 for (;;)
453 {
454 if (offset < iov[which_iov].len)
455 {
456 tmpbuf[j] = iov[which_iov].data[offset];
457 j++;
458 offset++;
459 if (j == MAXIMUM_ALIGNOF)
460 break;
461 }
462 else
463 {
464 offset -= iov[which_iov].len;
465 which_iov++;
466 if (which_iov >= iovcnt)
467 break;
468 }
469 }
470
471 res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
472
473 if (res == SHM_MQ_DETACHED)
474 {
475 /* Reset state in case caller tries to send another message. */
476 mqh->mqh_partial_bytes = 0;
477 mqh->mqh_length_word_complete = false;
478 return res;
479 }
480
481 mqh->mqh_partial_bytes += bytes_written;
482 if (res != SHM_MQ_SUCCESS)
483 return res;
484 continue;
485 }
486
487 /*
488 * If this is the last chunk, we can write all the data, even if it
489 * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
490 * MAXALIGN_DOWN the write size.
491 */
492 chunksize = iov[which_iov].len - offset;
493 if (which_iov + 1 < iovcnt)
496 nowait, &bytes_written);
497
498 if (res == SHM_MQ_DETACHED)
499 {
500 /* Reset state in case caller tries to send another message. */
501 mqh->mqh_length_word_complete = false;
502 mqh->mqh_partial_bytes = 0;
503 return res;
504 }
505
506 mqh->mqh_partial_bytes += bytes_written;
507 offset += bytes_written;
508 if (res != SHM_MQ_SUCCESS)
509 return res;
510 } while (mqh->mqh_partial_bytes < nbytes);
511
512 /* Reset for next message. */
513 mqh->mqh_partial_bytes = 0;
514 mqh->mqh_length_word_complete = false;
515
516 /* If queue has been detached, let caller know. */
517 if (mq->mq_detached)
518 return SHM_MQ_DETACHED;
519
520 /*
521 * If the counterparty is known to have attached, we can read mq_receiver
522 * without acquiring the spinlock. Otherwise, more caution is needed.
523 */
524 if (mqh->mqh_counterparty_attached)
525 receiver = mq->mq_receiver;
526 else
527 {
528 SpinLockAcquire(&mq->mq_mutex);
529 receiver = mq->mq_receiver;
530 SpinLockRelease(&mq->mq_mutex);
531 if (receiver != NULL)
532 mqh->mqh_counterparty_attached = true;
533 }
534
535 /*
536 * If the caller has requested force flush or we have written more than
537 * 1/4 of the ring size, mark it as written in shared memory and notify
538 * the receiver.
539 */
540 if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
541 {
542 shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
543 if (receiver != NULL)
544 SetLatch(&receiver->procLatch);
545 mqh->mqh_send_pending = 0;
546 }
547
548 return SHM_MQ_SUCCESS;
549}
550
551/*
552 * Receive a message from a shared message queue.
553 *
554 * We set *nbytes to the message length and *data to point to the message
555 * payload. If the entire message exists in the queue as a single,
556 * contiguous chunk, *data will point directly into shared memory; otherwise,
557 * it will point to a temporary buffer. This mostly avoids data copying in
558 * the hoped-for case where messages are short compared to the buffer size,
559 * while still allowing longer messages. In either case, the return value
560 * remains valid until the next receive operation is performed on the queue.
561 *
562 * When nowait = false, we'll wait on our process latch when the ring buffer
563 * is empty and we have not yet received a full message. The sender will
564 * set our process latch after more data has been written, and we'll resume
565 * processing. Each call will therefore return a complete message
566 * (unless the sender detaches the queue).
567 *
568 * When nowait = true, we do not manipulate the state of the process latch;
569 * instead, whenever the buffer is empty and we need to read from it, we
570 * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
571 * function again after the process latch has been set.
572 */
575{
576 shm_mq *mq = mqh->mqh_queue;
577 shm_mq_result res;
578 Size rb = 0;
579 Size nbytes;
580 void *rawdata;
581
582 Assert(mq->mq_receiver == MyProc);
583
584 /* We can't receive data until the sender has attached. */
585 if (!mqh->mqh_counterparty_attached)
586 {
587 if (nowait)
588 {
590
591 /*
592 * We shouldn't return at this point at all unless the sender
593 * hasn't attached yet. However, the correct return value depends
594 * on whether the sender is still attached. If we first test
595 * whether the sender has ever attached and then test whether the
596 * sender has detached, there's a race condition: a sender that
597 * attaches and detaches very quickly might fool us into thinking
598 * the sender never attached at all. So, test whether our
599 * counterparty is definitively gone first, and only afterwards
600 * check whether the sender ever attached in the first place.
601 */
603 if (shm_mq_get_sender(mq) == NULL)
604 {
606 return SHM_MQ_DETACHED;
607 else
608 return SHM_MQ_WOULD_BLOCK;
609 }
610 }
611 else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
613 {
614 mq->mq_detached = true;
615 return SHM_MQ_DETACHED;
616 }
617 mqh->mqh_counterparty_attached = true;
618 }
619
620 /*
621 * If we've consumed an amount of data greater than 1/4th of the ring
622 * size, mark it consumed in shared memory. We try to avoid doing this
623 * unnecessarily when only a small amount of data has been consumed,
624 * because SetLatch() is fairly expensive and we don't want to do it too
625 * often.
626 */
627 if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
628 {
629 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
630 mqh->mqh_consume_pending = 0;
631 }
632
633 /* Try to read, or finish reading, the length word from the buffer. */
634 while (!mqh->mqh_length_word_complete)
635 {
636 /* Try to receive the message length word. */
637 Assert(mqh->mqh_partial_bytes < sizeof(Size));
638 res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
639 nowait, &rb, &rawdata);
640 if (res != SHM_MQ_SUCCESS)
641 return res;
642
643 /*
644 * Hopefully, we'll receive the entire message length word at once.
645 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
646 * multiple reads.
647 */
648 if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
649 {
650 Size needed;
651
652 nbytes = *(Size *) rawdata;
653
654 /* If we've already got the whole message, we're done. */
655 needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
656 if (rb >= needed)
657 {
658 mqh->mqh_consume_pending += needed;
659 *nbytesp = nbytes;
660 *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
661 return SHM_MQ_SUCCESS;
662 }
663
664 /*
665 * We don't have the whole message, but we at least have the whole
666 * length word.
667 */
668 mqh->mqh_expected_bytes = nbytes;
669 mqh->mqh_length_word_complete = true;
670 mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
671 rb -= MAXALIGN(sizeof(Size));
672 }
673 else
674 {
676
677 /* Can't be split unless bigger than required alignment. */
678 Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
679
680 /* Message word is split; need buffer to reassemble. */
681 if (mqh->mqh_buffer == NULL)
682 {
683 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
685 mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
686 }
687 Assert(mqh->mqh_buflen >= sizeof(Size));
688
689 /* Copy partial length word; remember to consume it. */
690 if (mqh->mqh_partial_bytes + rb > sizeof(Size))
691 lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
692 else
693 lengthbytes = rb;
694 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
696 mqh->mqh_partial_bytes += lengthbytes;
697 mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
698 rb -= lengthbytes;
699
700 /* If we now have the whole word, we're ready to read payload. */
701 if (mqh->mqh_partial_bytes >= sizeof(Size))
702 {
703 Assert(mqh->mqh_partial_bytes == sizeof(Size));
704 mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
705 mqh->mqh_length_word_complete = true;
706 mqh->mqh_partial_bytes = 0;
707 }
708 }
709 }
710 nbytes = mqh->mqh_expected_bytes;
711
712 /*
713 * Should be disallowed on the sending side already, but better check and
714 * error out on the receiver side as well rather than trying to read a
715 * prohibitively large message.
716 */
717 if (nbytes > MaxAllocSize)
720 errmsg("invalid message size %zu in shared memory queue",
721 nbytes)));
722
723 if (mqh->mqh_partial_bytes == 0)
724 {
725 /*
726 * Try to obtain the whole message in a single chunk. If this works,
727 * we need not copy the data and can return a pointer directly into
728 * shared memory.
729 */
730 res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
731 if (res != SHM_MQ_SUCCESS)
732 return res;
733 if (rb >= nbytes)
734 {
735 mqh->mqh_length_word_complete = false;
736 mqh->mqh_consume_pending += MAXALIGN(nbytes);
737 *nbytesp = nbytes;
738 *datap = rawdata;
739 return SHM_MQ_SUCCESS;
740 }
741
742 /*
743 * The message has wrapped the buffer. We'll need to copy it in order
744 * to return it to the client in one chunk. First, make sure we have
745 * a large enough buffer available.
746 */
747 if (mqh->mqh_buflen < nbytes)
748 {
750
751 /*
752 * Increase size to the next power of 2 that's >= nbytes, but
753 * limit to MaxAllocSize.
754 */
757
758 if (mqh->mqh_buffer != NULL)
759 {
760 pfree(mqh->mqh_buffer);
761 mqh->mqh_buffer = NULL;
762 mqh->mqh_buflen = 0;
763 }
764 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
765 mqh->mqh_buflen = newbuflen;
766 }
767 }
768
769 /* Loop until we've copied the entire message. */
770 for (;;)
771 {
773
774 /* Copy as much as we can. */
775 Assert(mqh->mqh_partial_bytes + rb <= nbytes);
776 if (rb > 0)
777 {
778 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
779 mqh->mqh_partial_bytes += rb;
780 }
781
782 /*
783 * Update count of bytes that can be consumed, accounting for
784 * alignment padding. Note that this will never actually insert any
785 * padding except at the end of a message, because the buffer size is
786 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
787 */
788 Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
789 mqh->mqh_consume_pending += MAXALIGN(rb);
790
791 /* If we got all the data, exit the loop. */
792 if (mqh->mqh_partial_bytes >= nbytes)
793 break;
794
795 /* Wait for some more data. */
796 still_needed = nbytes - mqh->mqh_partial_bytes;
797 res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
798 if (res != SHM_MQ_SUCCESS)
799 return res;
800 if (rb > still_needed)
802 }
803
804 /* Return the complete message, and reset for next message. */
805 *nbytesp = nbytes;
806 *datap = mqh->mqh_buffer;
807 mqh->mqh_length_word_complete = false;
808 mqh->mqh_partial_bytes = 0;
809 return SHM_MQ_SUCCESS;
810}
811
812/*
813 * Wait for the other process that's supposed to use this queue to attach
814 * to it.
815 *
816 * The return value is SHM_MQ_DETACHED if the worker has already detached or
817 * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
818 * Note that we will only be able to detect that the worker has died before
819 * attaching if a background worker handle was passed to shm_mq_attach().
820 */
823{
824 shm_mq *mq = mqh->mqh_queue;
825 PGPROC **victim;
826
828 victim = &mq->mq_sender;
829 else
830 {
832 victim = &mq->mq_receiver;
833 }
834
835 if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
836 return SHM_MQ_SUCCESS;
837 else
838 return SHM_MQ_DETACHED;
839}
840
841/*
842 * Detach from a shared message queue, and destroy the shm_mq_handle.
843 */
844void
846{
847 /* Before detaching, notify the receiver about any already-written data. */
848 if (mqh->mqh_send_pending > 0)
849 {
850 shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
851 mqh->mqh_send_pending = 0;
852 }
853
854 /* Notify counterparty that we're outta here. */
855 shm_mq_detach_internal(mqh->mqh_queue);
856
857 /* Cancel on_dsm_detach callback, if any. */
858 if (mqh->mqh_segment)
859 cancel_on_dsm_detach(mqh->mqh_segment,
861 PointerGetDatum(mqh->mqh_queue));
862
863 /* Release local memory associated with handle. */
864 if (mqh->mqh_buffer != NULL)
865 pfree(mqh->mqh_buffer);
866 pfree(mqh);
867}
868
869/*
870 * Notify counterparty that we're detaching from shared message queue.
871 *
872 * The purpose of this function is to make sure that the process
873 * with which we're communicating doesn't block forever waiting for us to
874 * fill or drain the queue once we've lost interest. When the sender
875 * detaches, the receiver can read any messages remaining in the queue;
876 * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
877 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
878 *
879 * This is separated out from shm_mq_detach() because if the on_dsm_detach
880 * callback fires, we only want to do this much. We do not try to touch
881 * the local shm_mq_handle, as it may have been pfree'd already.
882 */
883static void
885{
886 PGPROC *victim;
887
888 SpinLockAcquire(&mq->mq_mutex);
889 if (mq->mq_sender == MyProc)
890 victim = mq->mq_receiver;
891 else
892 {
893 Assert(mq->mq_receiver == MyProc);
894 victim = mq->mq_sender;
895 }
896 mq->mq_detached = true;
897 SpinLockRelease(&mq->mq_mutex);
898
899 if (victim != NULL)
900 SetLatch(&victim->procLatch);
901}
902
903/*
904 * Get the shm_mq from handle.
905 */
906shm_mq *
908{
909 return mqh->mqh_queue;
910}
911
912/*
913 * Write bytes into a shared message queue.
914 */
915static shm_mq_result
917 bool nowait, Size *bytes_written)
918{
919 shm_mq *mq = mqh->mqh_queue;
920 Size sent = 0;
921 uint64 used;
924
925 while (sent < nbytes)
926 {
927 uint64 rb;
928 uint64 wb;
929
930 /* Compute number of ring buffer bytes used and available. */
931 rb = pg_atomic_read_u64(&mq->mq_bytes_read);
932 wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
933 Assert(wb >= rb);
934 used = wb - rb;
935 Assert(used <= ringsize);
936 available = Min(ringsize - used, nbytes - sent);
937
938 /*
939 * Bail out if the queue has been detached. Note that we would be in
940 * trouble if the compiler decided to cache the value of
941 * mq->mq_detached in a register or on the stack across loop
942 * iterations. It probably shouldn't do that anyway since we'll
943 * always return, call an external function that performs a system
944 * call, or reach a memory barrier at some point later in the loop,
945 * but just to be sure, insert a compiler barrier here.
946 */
948 if (mq->mq_detached)
949 {
951 return SHM_MQ_DETACHED;
952 }
953
954 if (available == 0 && !mqh->mqh_counterparty_attached)
955 {
956 /*
957 * The queue is full, so if the receiver isn't yet known to be
958 * attached, we must wait for that to happen.
959 */
960 if (nowait)
961 {
962 if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
963 {
965 return SHM_MQ_DETACHED;
966 }
968 {
970 return SHM_MQ_WOULD_BLOCK;
971 }
972 }
973 else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
974 mqh->mqh_handle))
975 {
976 mq->mq_detached = true;
978 return SHM_MQ_DETACHED;
979 }
980 mqh->mqh_counterparty_attached = true;
981
982 /*
983 * The receiver may have read some data after attaching, so we
984 * must not wait without rechecking the queue state.
985 */
986 }
987 else if (available == 0)
988 {
989 /* Update the pending send bytes in the shared memory. */
990 shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
991
992 /*
993 * Since mq->mqh_counterparty_attached is known to be true at this
994 * point, mq_receiver has been set, and it can't change once set.
995 * Therefore, we can read it without acquiring the spinlock.
996 */
997 Assert(mqh->mqh_counterparty_attached);
998 SetLatch(&mq->mq_receiver->procLatch);
999
1000 /*
1001 * We have just updated the mqh_send_pending bytes in the shared
1002 * memory so reset it.
1003 */
1004 mqh->mqh_send_pending = 0;
1005
1006 /* Skip manipulation of our latch if nowait = true. */
1007 if (nowait)
1008 {
1010 return SHM_MQ_WOULD_BLOCK;
1011 }
1012
1013 /*
1014 * Wait for our latch to be set. It might already be set for some
1015 * unrelated reason, but that'll just result in one extra trip
1016 * through the loop. It's worth it to avoid resetting the latch
1017 * at top of loop, because setting an already-set latch is much
1018 * cheaper than setting one that has been reset.
1019 */
1022
1023 /* Reset the latch so we don't spin. */
1025
1026 /* An interrupt may have occurred while we were waiting. */
1028 }
1029 else
1030 {
1031 Size offset;
1032 Size sendnow;
1033
1034 offset = wb % (uint64) ringsize;
1035 sendnow = Min(available, ringsize - offset);
1036
1037 /*
1038 * Write as much data as we can via a single memcpy(). Make sure
1039 * these writes happen after the read of mq_bytes_read, above.
1040 * This barrier pairs with the one in shm_mq_inc_bytes_read.
1041 * (Since we're separating the read of mq_bytes_read from a
1042 * subsequent write to mq_ring, we need a full barrier here.)
1043 */
1045 memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1046 (const char *) data + sent, sendnow);
1047 sent += sendnow;
1048
1049 /*
1050 * Update count of bytes written, with alignment padding. Note
1051 * that this will never actually insert any padding except at the
1052 * end of a run of bytes, because the buffer size is a multiple of
1053 * MAXIMUM_ALIGNOF, and each read is as well.
1054 */
1055 Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1056
1057 /*
1058 * For efficiency, we don't update the bytes written in the shared
1059 * memory and also don't set the reader's latch here. Refer to
1060 * the comments atop the shm_mq_handle structure for more
1061 * information.
1062 */
1063 mqh->mqh_send_pending += MAXALIGN(sendnow);
1064 }
1065 }
1066
1068 return SHM_MQ_SUCCESS;
1069}
1070
1071/*
1072 * Wait until at least *nbytesp bytes are available to be read from the
1073 * shared message queue, or until the buffer wraps around. If the queue is
1074 * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
1075 * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
1076 * to the location at which data bytes can be read, *nbytesp is set to the
1077 * number of bytes which can be read at that address, and the return value
1078 * is SHM_MQ_SUCCESS.
1079 */
1080static shm_mq_result
1082 Size *nbytesp, void **datap)
1083{
1084 shm_mq *mq = mqh->mqh_queue;
1086 uint64 used;
1088
1089 for (;;)
1090 {
1091 Size offset;
1092 uint64 read;
1093
1094 /* Get bytes written, so we can compute what's available to read. */
1095 written = pg_atomic_read_u64(&mq->mq_bytes_written);
1096
1097 /*
1098 * Get bytes read. Include bytes we could consume but have not yet
1099 * consumed.
1100 */
1101 read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1102 mqh->mqh_consume_pending;
1103 used = written - read;
1104 Assert(used <= ringsize);
1105 offset = read % (uint64) ringsize;
1106
1107 /* If we have enough data or buffer has wrapped, we're done. */
1108 if (used >= bytes_needed || offset + used >= ringsize)
1109 {
1110 *nbytesp = Min(used, ringsize - offset);
1111 *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1112
1113 /*
1114 * Separate the read of mq_bytes_written, above, from caller's
1115 * attempt to read the data itself. Pairs with the barrier in
1116 * shm_mq_inc_bytes_written.
1117 */
1119 return SHM_MQ_SUCCESS;
1120 }
1121
1122 /*
1123 * Fall out before waiting if the queue has been detached.
1124 *
1125 * Note that we don't check for this until *after* considering whether
1126 * the data already available is enough, since the receiver can finish
1127 * receiving a message stored in the buffer even after the sender has
1128 * detached.
1129 */
1130 if (mq->mq_detached)
1131 {
1132 /*
1133 * If the writer advanced mq_bytes_written and then set
1134 * mq_detached, we might not have read the final value of
1135 * mq_bytes_written above. Insert a read barrier and then check
1136 * again if mq_bytes_written has advanced.
1137 */
1139 if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1140 continue;
1141
1142 return SHM_MQ_DETACHED;
1143 }
1144
1145 /*
1146 * We didn't get enough data to satisfy the request, so mark any data
1147 * previously-consumed as read to make more buffer space.
1148 */
1149 if (mqh->mqh_consume_pending > 0)
1150 {
1151 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
1152 mqh->mqh_consume_pending = 0;
1153 }
1154
1155 /* Skip manipulation of our latch if nowait = true. */
1156 if (nowait)
1157 return SHM_MQ_WOULD_BLOCK;
1158
1159 /*
1160 * Wait for our latch to be set. It might already be set for some
1161 * unrelated reason, but that'll just result in one extra trip through
1162 * the loop. It's worth it to avoid resetting the latch at top of
1163 * loop, because setting an already-set latch is much cheaper than
1164 * setting one that has been reset.
1165 */
1168
1169 /* Reset the latch so we don't spin. */
1171
1172 /* An interrupt may have occurred while we were waiting. */
1174 }
1175}
1176
1177/*
1178 * Test whether a counterparty who may not even be alive yet is definitely gone.
1179 */
1180static bool
1182{
1183 pid_t pid;
1184
1185 /* If the queue has been detached, counterparty is definitely gone. */
1186 if (mq->mq_detached)
1187 return true;
1188
1189 /* If there's a handle, check worker status. */
1190 if (handle != NULL)
1191 {
1192 BgwHandleStatus status;
1193
1194 /* Check for unexpected worker death. */
1195 status = GetBackgroundWorkerPid(handle, &pid);
1196 if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1197 {
1198 /* Mark it detached, just to make it official. */
1199 mq->mq_detached = true;
1200 return true;
1201 }
1202 }
1203
1204 /* Counterparty is not definitively gone. */
1205 return false;
1206}
1207
1208/*
1209 * This is used when a process is waiting for its counterpart to attach to the
1210 * queue. We exit when the other process attaches as expected, or, if
1211 * handle != NULL, when the referenced background process or the postmaster
1212 * dies. Note that if handle == NULL, and the process fails to attach, we'll
1213 * potentially get stuck here forever waiting for a process that may never
1214 * start. We do check for interrupts, though.
1215 *
1216 * ptr is a pointer to the memory address that we're expecting to become
1217 * non-NULL when our counterpart attaches to the queue.
1218 */
1219static bool
1221{
1222 bool result = false;
1223
1224 for (;;)
1225 {
1226 BgwHandleStatus status;
1227 pid_t pid;
1228
1229 /* Acquire the lock just long enough to check the pointer. */
1230 SpinLockAcquire(&mq->mq_mutex);
1231 result = (*ptr != NULL);
1232 SpinLockRelease(&mq->mq_mutex);
1233
1234 /* Fail if detached; else succeed if initialized. */
1235 if (mq->mq_detached)
1236 {
1237 result = false;
1238 break;
1239 }
1240 if (result)
1241 break;
1242
1243 if (handle != NULL)
1244 {
1245 /* Check for unexpected worker death. */
1246 status = GetBackgroundWorkerPid(handle, &pid);
1247 if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1248 {
1249 result = false;
1250 break;
1251 }
1252 }
1253
1254 /* Wait to be signaled. */
1257
1258 /* Reset the latch so we don't spin. */
1260
1261 /* An interrupt may have occurred while we were waiting. */
1263 }
1264
1265 return result;
1266}
1267
1268/*
1269 * Increment the number of bytes read.
1270 */
1271static void
1273{
1274 PGPROC *sender;
1275
1276 /*
1277 * Separate prior reads of mq_ring from the increment of mq_bytes_read
1278 * which follows. This pairs with the full barrier in
1279 * shm_mq_send_bytes(). We only need a read barrier here because the
1280 * increment of mq_bytes_read is actually a read followed by a dependent
1281 * write.
1282 */
1284
1285 /*
1286 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1287 * else can be changing this value. This method should be cheaper.
1288 */
1289 pg_atomic_write_u64(&mq->mq_bytes_read,
1290 pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1291
1292 /*
1293 * We shouldn't have any bytes to read without a sender, so we can read
1294 * mq_sender here without a lock. Once it's initialized, it can't change.
1295 */
1296 sender = mq->mq_sender;
1297 Assert(sender != NULL);
1298 SetLatch(&sender->procLatch);
1299}
1300
1301/*
1302 * Increment the number of bytes written.
1303 */
1304static void
1306{
1307 /*
1308 * Separate prior reads of mq_ring from the write of mq_bytes_written
1309 * which we're about to do. Pairs with the read barrier found in
1310 * shm_mq_receive_bytes.
1311 */
1313
1314 /*
1315 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1316 * else can be changing this value. This method avoids taking the bus
1317 * lock unnecessarily.
1318 */
1319 pg_atomic_write_u64(&mq->mq_bytes_written,
1320 pg_atomic_read_u64(&mq->mq_bytes_written) + n);
1321}
1322
1323/* Shim for on_dsm_detach callback. */
1324static void
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
#define pg_memory_barrier()
Definition atomics.h:141
#define pg_read_barrier()
Definition atomics.h:154
#define pg_write_barrier()
Definition atomics.h:155
#define pg_compiler_barrier()
Definition atomics.h:129
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition bgworker.c:1165
BgwHandleStatus
Definition bgworker.h:111
@ BGWH_STARTED
Definition bgworker.h:112
@ BGWH_NOT_YET_STARTED
Definition bgworker.h:113
#define MAXALIGN_DOWN(LEN)
Definition c.h:910
#define Min(x, y)
Definition c.h:1093
#define MAXALIGN(LEN)
Definition c.h:898
uint8_t uint8
Definition c.h:616
#define Assert(condition)
Definition c.h:945
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:552
uint64_t uint64
Definition c.h:619
size_t Size
Definition c.h:691
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition dsm.c:1132
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition dsm.c:1147
Datum arg
Definition elog.c:1322
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
#define palloc_object(type)
Definition fe_memutils.h:74
#define MaxAllocSize
Definition fe_memutils.h:22
struct Latch * MyLatch
Definition globals.c:63
#define read(a, b, c)
Definition win32.h:13
int j
Definition isn.c:78
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
void pfree(void *pointer)
Definition mcxt.c:1616
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
static char * errmsg
#define pg_nextpower2_size_t
const void size_t len
const void * data
static Datum PointerGetDatum(const void *X)
Definition postgres.h:342
uint64_t Datum
Definition postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
static int fb(int x)
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition shm_mq.c:916
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
Definition shm_mq.c:907
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
Definition shm_mq.c:1305
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:226
shm_mq * shm_mq_create(void *address, Size size)
Definition shm_mq.c:179
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition shm_mq.c:244
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
Definition shm_mq.c:321
#define MQH_INITIAL_BUFSIZE
Definition shm_mq.c:173
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition shm_mq.c:259
void shm_mq_detach(shm_mq_handle *mqh)
Definition shm_mq.c:845
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition shm_mq.c:1325
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition shm_mq.c:1181
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
Definition shm_mq.c:363
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition shm_mq.c:1220
static void shm_mq_detach_internal(shm_mq *mq)
Definition shm_mq.c:884
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:208
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:574
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition shm_mq.c:331
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition shm_mq.c:1272
shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh)
Definition shm_mq.c:822
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition shm_mq.c:292
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition shm_mq.c:1081
const Size shm_mq_minimum_size
Definition shm_mq.c:170
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_WOULD_BLOCK
Definition shm_mq.h:41
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
PGPROC * MyProc
Definition proc.c:68
Definition proc.h:176
Size mqh_consume_pending
Definition shm_mq.c:146
BackgroundWorkerHandle * mqh_handle
Definition shm_mq.c:143
char * mqh_buffer
Definition shm_mq.c:144
Size mqh_send_pending
Definition shm_mq.c:147
Size mqh_expected_bytes
Definition shm_mq.c:149
bool mqh_counterparty_attached
Definition shm_mq.c:151
shm_mq * mqh_queue
Definition shm_mq.c:141
dsm_segment * mqh_segment
Definition shm_mq.c:142
bool mqh_length_word_complete
Definition shm_mq.c:150
Size mqh_buflen
Definition shm_mq.c:145
MemoryContext mqh_context
Definition shm_mq.c:152
Size mqh_partial_bytes
Definition shm_mq.c:148
const char * data
Definition shm_mq.h:33
pg_atomic_uint64 mq_bytes_written
Definition shm_mq.c:79
pg_atomic_uint64 mq_bytes_read
Definition shm_mq.c:78
bool mq_detached
Definition shm_mq.c:81
PGPROC * mq_sender
Definition shm_mq.c:77
uint8 mq_ring_offset
Definition shm_mq.c:82
slock_t mq_mutex
Definition shm_mq.c:75
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
Definition shm_mq.c:83
Size mq_ring_size
Definition shm_mq.c:80
PGPROC * mq_receiver
Definition shm_mq.c:76
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
static StringInfoData tmpbuf
Definition walsender.c:179