/*-------------------------------------------------------------------------
 *
 * shm_mq.c
 *    single-reader, single-writer shared memory message queue
 *
 * Both the sender and the receiver must have a PGPROC; their respective
 * process latches are used for synchronization. Only the sender may send,
 * and only the receiver may receive. This is intended to allow a user
 * backend to communicate with worker backends that it has registered.
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/storage/ipc/shm_mq.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"
#include "port/pg_bitutils.h"
#include "postmaster/bgworker.h"
#include "storage/procsignal.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
#include "utils/memutils.h"

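/*
 * A minimal sketch of the intended usage, from the side that creates the
 * queue: allocate a DSM segment, create the queue inside it, declare
 * ourselves the receiver, and attach.  (The worker side would dsm_attach
 * the segment, call shm_mq_set_sender, and attach likewise.)  This block
 * is illustrative only and compiled out; "segment_size" is a caller-chosen
 * value, and error handling is omitted.
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static shm_mq_handle *
setup_queue_as_receiver(Size segment_size)
{
    dsm_segment *seg = dsm_create(segment_size, 0);
    shm_mq     *mq = shm_mq_create(dsm_segment_address(seg), segment_size);

    /* We will read from this queue; some worker will write into it. */
    shm_mq_set_receiver(mq, MyProc);
    return shm_mq_attach(mq, seg, NULL);
}
#endif
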
/*
 * This structure represents the actual queue, stored in shared memory.
 *
 * Some notes on synchronization:
 *
 * mq_receiver and mq_bytes_read can only be changed by the receiver; and
 * mq_sender and mq_bytes_written can only be changed by the sender.
 * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
 * they cannot change once set, and thus may be read without a lock once this
 * is known to be the case.
 *
 * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
 * they are written atomically using 8 byte loads and stores. Memory barriers
 * must be carefully used to synchronize reads and writes of these values with
 * reads and writes of the actual data in mq_ring.
 *
 * mq_detached needs no locking. It can be set by either the sender or the
 * receiver, but only ever from false to true, so redundant writes don't
 * matter. It is important that if we set mq_detached and then set the
 * counterparty's latch, the counterparty must be certain to see the change
 * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
 * ends with one, this should be OK.
 *
 * mq_ring_size and mq_ring_offset never change after initialization, and
 * can therefore be read without the lock.
 *
 * Importantly, mq_ring can be safely read and written without a lock.
 * At any given time, the difference between mq_bytes_read and
 * mq_bytes_written defines the number of bytes within mq_ring that contain
 * unread data, and mq_bytes_read defines the position where those bytes
 * begin. The sender can increase the number of unread bytes at any time,
 * but only the receiver can give license to overwrite those bytes, by
 * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
 * the unread bytes it knows to be present without the lock. Conversely,
 * the sender can write to the unused portion of the ring buffer without
 * the lock, because nobody else can be reading or writing those bytes. The
 * receiver could be making more bytes unused by incrementing mq_bytes_read,
 * but that's OK. Note that it would be unsafe for the receiver to read any
 * data it's already marked as read, or to write any data; and it would be
 * unsafe for the sender to reread any data after incrementing
 * mq_bytes_written, but fortunately there's no need for any of that.
 */
struct shm_mq
{
    slock_t     mq_mutex;
    PGPROC     *mq_receiver;
    PGPROC     *mq_sender;
    pg_atomic_uint64 mq_bytes_read;
    pg_atomic_uint64 mq_bytes_written;
    Size        mq_ring_size;
    bool        mq_detached;
    uint8       mq_ring_offset;
    char        mq_ring[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * This structure is a backend-private handle for access to a queue.
 *
 * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
 * an optional pointer to the dynamic shared memory segment that contains it.
 * (If mqh_segment is provided, we register an on_dsm_detach callback to
 * make sure we detach from the queue before detaching from DSM.)
 *
 * If this queue is intended to connect the current process with a background
 * worker that started it, the user can pass a pointer to the worker handle
 * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
 * is to allow us to begin sending to or receiving from that queue before the
 * process we'll be communicating with has even been started. If it fails
 * to start, the handle will allow us to notice that and fail cleanly, rather
 * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
 * simple cases - e.g. where there are just 2 processes communicating; in
 * more complex scenarios, every process may not have a BackgroundWorkerHandle
 * available, or may need to watch for the failure of more than one other
 * process at a time.
 *
 * When a message exists as a contiguous chunk of bytes in the queue - that is,
 * it is smaller than the size of the ring buffer and does not wrap around
 * the end - we return the message to the caller as a pointer into the buffer.
 * For messages that are larger or happen to wrap, we reassemble the message
 * locally by copying the chunks into a backend-local buffer. mqh_buffer is
 * the buffer, and mqh_buflen is the number of bytes allocated for it.
 *
 * mqh_send_pending is the number of bytes that have been written to the
 * queue but not yet reflected in the shared-memory counter mq_bytes_written.
 * We don't update that counter until the pending bytes amount to at least
 * 1/4th of the ring size or the queue is full. This prevents frequent CPU
 * cache misses, and it also avoids frequent SetLatch() calls, which are
 * quite expensive.
 *
 * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
 * are used to track the state of non-blocking operations. When the caller
 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
 * are expected to retry the call at a later time with the same argument;
 * we need to retain enough state to pick up where we left off.
 * mqh_length_word_complete tracks whether we are done sending or receiving
 * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
 * the number of bytes read or written for either the length word or the
 * message itself, and mqh_expected_bytes - which is used only for reads -
 * tracks the expected total size of the payload.
 *
 * mqh_counterparty_attached tracks whether we know the counterparty to have
 * attached to the queue at some previous point. This lets us avoid some
 * mutex acquisitions.
 *
 * mqh_context is the memory context in effect at the time we attached to
 * the shm_mq. The shm_mq_handle itself is allocated in this context, and
 * we make sure any other allocations we do happen in this context as well,
 * to avoid nasty surprises.
 */
struct shm_mq_handle
{
    shm_mq     *mqh_queue;
    dsm_segment *mqh_segment;
    BackgroundWorkerHandle *mqh_handle;
    char       *mqh_buffer;
    Size        mqh_buflen;
    Size        mqh_consume_pending;
    Size        mqh_send_pending;
    Size        mqh_partial_bytes;
    Size        mqh_expected_bytes;
    bool        mqh_length_word_complete;
    bool        mqh_counterparty_attached;
    MemoryContext mqh_context;
};

static void shm_mq_detach_internal(shm_mq *mq);
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
                                       const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
                                          Size bytes_needed, bool nowait, Size *nbytesp,
                                          void **datap);
static bool shm_mq_counterparty_gone(shm_mq *mq,
                                     BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
                                 BackgroundWorkerHandle *handle);
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);

/* Minimum queue size is enough for header and at least one chunk of data. */
const Size  shm_mq_minimum_size =
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;

#define MQH_INITIAL_BUFSIZE 8192

/*
 * Initialize a new shared message queue.
 */
shm_mq *
shm_mq_create(void *address, Size size)
{
    shm_mq     *mq = address;
    Size        data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));

    /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
    size = MAXALIGN_DOWN(size);

    /* Queue size must be large enough to hold some data. */
    Assert(size > data_offset);

    /* Initialize queue header. */
    SpinLockInit(&mq->mq_mutex);
    mq->mq_receiver = NULL;
    mq->mq_sender = NULL;
    pg_atomic_init_u64(&mq->mq_bytes_read, 0);
    pg_atomic_init_u64(&mq->mq_bytes_written, 0);
    mq->mq_ring_size = size - data_offset;
    mq->mq_detached = false;
    mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);

    return mq;
}
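
/*
 * A minimal sketch (compiled out) of creating a queue inside an
 * already-mapped chunk of shared memory.  The address and size would
 * typically come from a dsm_segment or a shm_toc lookup; both names here
 * belong to the caller.  Anything smaller than shm_mq_minimum_size cannot
 * hold even one MAXALIGN'd chunk of data.
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static shm_mq *
create_queue_in_segment(void *address, Size size)
{
    Assert(size >= shm_mq_minimum_size);
    return shm_mq_create(address, size);
}
#endif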

/*
 * Set the identity of the process that will receive from a shared message
 * queue.
 */
void
shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
{
    PGPROC     *sender;

    SpinLockAcquire(&mq->mq_mutex);
    Assert(mq->mq_receiver == NULL);
    mq->mq_receiver = proc;
    sender = mq->mq_sender;
    SpinLockRelease(&mq->mq_mutex);

    if (sender != NULL)
        SetLatch(&sender->procLatch);
}

/*
 * Set the identity of the process that will send to a shared message queue.
 */
void
shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
{
    PGPROC     *receiver;

    SpinLockAcquire(&mq->mq_mutex);
    Assert(mq->mq_sender == NULL);
    mq->mq_sender = proc;
    receiver = mq->mq_receiver;
    SpinLockRelease(&mq->mq_mutex);

    if (receiver != NULL)
        SetLatch(&receiver->procLatch);
}

/*
 * Get the configured receiver.
 */
PGPROC *
shm_mq_get_receiver(shm_mq *mq)
{
    PGPROC     *receiver;

    SpinLockAcquire(&mq->mq_mutex);
    receiver = mq->mq_receiver;
    SpinLockRelease(&mq->mq_mutex);

    return receiver;
}

/*
 * Get the configured sender.
 */
PGPROC *
shm_mq_get_sender(shm_mq *mq)
{
    PGPROC     *sender;

    SpinLockAcquire(&mq->mq_mutex);
    sender = mq->mq_sender;
    SpinLockRelease(&mq->mq_mutex);

    return sender;
}

/*
 * Attach to a shared message queue so we can send or receive messages.
 *
 * The memory context in effect at the time this function is called should
 * be one which will last for at least as long as the message queue itself.
 * We'll allocate the handle in that context, and future allocations that
 * are needed to buffer incoming data will happen in that context as well.
 *
 * If seg != NULL, the queue will be automatically detached when that dynamic
 * shared memory segment is detached.
 *
 * If handle != NULL, the queue can be read or written even before the
 * other process has attached. We'll wait for it to do so if needed. The
 * handle must be for a background worker initialized with bgw_notify_pid
 * equal to our PID.
 *
 * shm_mq_detach() should be called when done. This will free the
 * shm_mq_handle and mark the queue itself as detached, so that our
 * counterpart won't get stuck waiting for us to fill or drain the queue
 * after we've already lost interest.
 */
shm_mq_handle *
shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
{
    shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));

    Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
    mqh->mqh_queue = mq;
    mqh->mqh_segment = seg;
    mqh->mqh_handle = handle;
    mqh->mqh_buffer = NULL;
    mqh->mqh_buflen = 0;
    mqh->mqh_consume_pending = 0;
    mqh->mqh_send_pending = 0;
    mqh->mqh_partial_bytes = 0;
    mqh->mqh_expected_bytes = 0;
    mqh->mqh_length_word_complete = false;
    mqh->mqh_counterparty_attached = false;
    mqh->mqh_context = CurrentMemoryContext;

    if (seg != NULL)
        on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));

    return mqh;
}
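
/*
 * A minimal sketch (compiled out) of the worker side of an attach,
 * assuming the queue occupies the start of the segment and that the
 * dsm_handle reached the worker some other way, e.g. via bgw_main_arg.
 * Error handling is omitted.
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static shm_mq_handle *
worker_attach_as_sender(dsm_handle segment_handle)
{
    dsm_segment *seg = dsm_attach(segment_handle);
    shm_mq     *mq = (shm_mq *) dsm_segment_address(seg);

    shm_mq_set_sender(mq, MyProc);
    return shm_mq_attach(mq, seg, NULL);
}
#endif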

/*
 * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
 * been passed to shm_mq_attach.
 */
void
shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
{
    Assert(mqh->mqh_handle == NULL);
    mqh->mqh_handle = handle;
}
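
/*
 * A minimal sketch (compiled out) of why shm_mq_set_handle exists: a caller
 * may attach before registering the worker, then supply the handle once
 * registration succeeds.  "worker" is assumed to be a fully filled-in
 * BackgroundWorker with bgw_notify_pid set to our PID.
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static void
attach_then_register(shm_mq_handle *mqh, BackgroundWorker *worker)
{
    BackgroundWorkerHandle *handle;

    if (RegisterDynamicBackgroundWorker(worker, &handle))
        shm_mq_set_handle(mqh, handle);
}
#endif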

/*
 * Write a message into a shared message queue.
 */
shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
            bool force_flush)
{
    shm_mq_iovec iov;

    iov.data = data;
    iov.len = nbytes;

    return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
}
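
/*
 * A minimal sketch (compiled out) of a blocking send: wait for queue space
 * if necessary, and flush immediately so the receiver's latch is set.  With
 * nowait = false, the only non-success result is SHM_MQ_DETACHED.
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static void
send_one_message(shm_mq_handle *mqh, const char *msg, Size len)
{
    shm_mq_result res = shm_mq_send(mqh, len, msg, false, true);

    if (res == SHM_MQ_DETACHED)
        ereport(ERROR,
                (errmsg("receiver detached from shared message queue")));
}
#endif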

/*
 * Write a message into a shared message queue, gathered from multiple
 * addresses.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * fills up, and then continue writing once the receiver has drained some data.
 * The process latch is reset after each wait.
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
 * this case, the caller should call this function again, with the same
 * arguments, each time the process latch is set. (Once begun, the sending
 * of a message cannot be aborted except by detaching from the queue; changing
 * the length or payload will corrupt the queue.)
 *
 * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
 * and notify the receiver (if it is already attached). Otherwise, we don't
 * update it until we have written an amount of data greater than 1/4th of the
 * ring size.
 */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
             bool force_flush)
{
    shm_mq_result res;
    shm_mq     *mq = mqh->mqh_queue;
    PGPROC     *receiver;
    Size        nbytes = 0;
    Size        bytes_written;
    int         i;
    int         which_iov = 0;
    Size        offset;

    Assert(mq->mq_sender == MyProc);

    /* Compute total size of write. */
    for (i = 0; i < iovcnt; ++i)
        nbytes += iov[i].len;

    /* Prevent writing messages overwhelming the receiver. */
    if (nbytes > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("cannot send a message of size %zu via shared memory queue",
                        nbytes)));

    /* Try to write, or finish writing, the length word into the buffer. */
    while (!mqh->mqh_length_word_complete)
    {
        Assert(mqh->mqh_partial_bytes < sizeof(Size));
        res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                ((char *) &nbytes) + mqh->mqh_partial_bytes,
                                nowait, &bytes_written);

        if (res == SHM_MQ_DETACHED)
        {
            /* Reset state in case caller tries to send another message. */
            mqh->mqh_partial_bytes = 0;
            mqh->mqh_length_word_complete = false;
            return res;
        }
        mqh->mqh_partial_bytes += bytes_written;

        if (mqh->mqh_partial_bytes >= sizeof(Size))
        {
            Assert(mqh->mqh_partial_bytes == sizeof(Size));

            mqh->mqh_partial_bytes = 0;
            mqh->mqh_length_word_complete = true;
        }

        if (res != SHM_MQ_SUCCESS)
            return res;

        /* Length word can't be split unless bigger than required alignment. */
        Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
    }

    /* Write the actual data bytes into the buffer. */
    Assert(mqh->mqh_partial_bytes <= nbytes);
    offset = mqh->mqh_partial_bytes;
    do
    {
        Size        chunksize;

        /* Figure out which bytes need to be sent next. */
        if (offset >= iov[which_iov].len)
        {
            offset -= iov[which_iov].len;
            ++which_iov;
            if (which_iov >= iovcnt)
                break;
            continue;
        }

        /*
         * We want to avoid copying the data if at all possible, but every
         * chunk of bytes we write into the queue has to be MAXALIGN'd, except
         * the last. Thus, if a chunk other than the last one ends on a
         * non-MAXALIGN'd boundary, we have to combine the tail end of its
         * data with data from one or more following chunks until we either
         * reach the last chunk or accumulate a number of bytes which is
         * MAXALIGN'd.
         */
        if (which_iov + 1 < iovcnt &&
            offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
        {
            char        tmpbuf[MAXIMUM_ALIGNOF];
            int         j = 0;

            for (;;)
            {
                if (offset < iov[which_iov].len)
                {
                    tmpbuf[j] = iov[which_iov].data[offset];
                    j++;
                    offset++;
                    if (j == MAXIMUM_ALIGNOF)
                        break;
                }
                else
                {
                    offset -= iov[which_iov].len;
                    which_iov++;
                    if (which_iov >= iovcnt)
                        break;
                }
            }

            res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);

            if (res == SHM_MQ_DETACHED)
            {
                /* Reset state in case caller tries to send another message. */
                mqh->mqh_partial_bytes = 0;
                mqh->mqh_length_word_complete = false;
                return res;
            }

            mqh->mqh_partial_bytes += bytes_written;
            if (res != SHM_MQ_SUCCESS)
                return res;
            continue;
        }

        /*
         * If this is the last chunk, we can write all the data, even if it
         * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
         * MAXALIGN_DOWN the write size.
         */
        chunksize = iov[which_iov].len - offset;
        if (which_iov + 1 < iovcnt)
            chunksize = MAXALIGN_DOWN(chunksize);
        res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
                                nowait, &bytes_written);

        if (res == SHM_MQ_DETACHED)
        {
            /* Reset state in case caller tries to send another message. */
            mqh->mqh_length_word_complete = false;
            mqh->mqh_partial_bytes = 0;
            return res;
        }

        mqh->mqh_partial_bytes += bytes_written;
        offset += bytes_written;
        if (res != SHM_MQ_SUCCESS)
            return res;
    } while (mqh->mqh_partial_bytes < nbytes);

    /* Reset for next message. */
    mqh->mqh_partial_bytes = 0;
    mqh->mqh_length_word_complete = false;

    /* If queue has been detached, let caller know. */
    if (mq->mq_detached)
        return SHM_MQ_DETACHED;

    /*
     * If the counterparty is known to have attached, we can read mq_receiver
     * without acquiring the spinlock. Otherwise, more caution is needed.
     */
    if (mqh->mqh_counterparty_attached)
        receiver = mq->mq_receiver;
    else
    {
        SpinLockAcquire(&mq->mq_mutex);
        receiver = mq->mq_receiver;
        SpinLockRelease(&mq->mq_mutex);
        if (receiver != NULL)
            mqh->mqh_counterparty_attached = true;
    }

    /*
     * If the caller has requested force flush or we have written more than
     * 1/4 of the ring size, mark it as written in shared memory and notify
     * the receiver.
     */
    if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
    {
        shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
        if (receiver != NULL)
            SetLatch(&receiver->procLatch);
        mqh->mqh_send_pending = 0;
    }

    return SHM_MQ_SUCCESS;
}
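
/*
 * A minimal sketch (compiled out) of a gathered write: a header and a
 * payload are sent as one message without first being copied into a
 * single buffer.
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static shm_mq_result
send_two_part_message(shm_mq_handle *mqh,
                      const char *hdr, Size hdrlen,
                      const char *payload, Size payloadlen)
{
    shm_mq_iovec iov[2];

    iov[0].data = hdr;
    iov[0].len = hdrlen;
    iov[1].data = payload;
    iov[1].len = payloadlen;

    /* nowait = false: block until sent; force_flush = true: set latch now. */
    return shm_mq_sendv(mqh, iov, 2, false, true);
}
#endif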

/*
 * Receive a message from a shared message queue.
 *
 * We set *nbytes to the message length and *data to point to the message
 * payload. If the entire message exists in the queue as a single,
 * contiguous chunk, *data will point directly into shared memory; otherwise,
 * it will point to a temporary buffer. This mostly avoids data copying in
 * the hoped-for case where messages are short compared to the buffer size,
 * while still allowing longer messages. In either case, the return value
 * remains valid until the next receive operation is performed on the queue.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * is empty and we have not yet received a full message. The sender will
 * set our process latch after more data has been written, and we'll resume
 * processing. Each call will therefore return a complete message
 * (unless the sender detaches the queue).
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, whenever the buffer is empty and we need to read from it, we
 * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
 * function again after the process latch has been set.
 */
shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{
    shm_mq     *mq = mqh->mqh_queue;
    shm_mq_result res;
    Size        rb = 0;
    Size        nbytes;
    void       *rawdata;

    Assert(mq->mq_receiver == MyProc);

    /* We can't receive data until the sender has attached. */
    if (!mqh->mqh_counterparty_attached)
    {
        if (nowait)
        {
            int         counterparty_gone;

            /*
             * We shouldn't return at this point at all unless the sender
             * hasn't attached yet. However, the correct return value depends
             * on whether the sender is still attached. If we first test
             * whether the sender has ever attached and then test whether the
             * sender has detached, there's a race condition: a sender that
             * attaches and detaches very quickly might fool us into thinking
             * the sender never attached at all. So, test whether our
             * counterparty is definitively gone first, and only afterwards
             * check whether the sender ever attached in the first place.
             */
            counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
            if (shm_mq_get_sender(mq) == NULL)
            {
                if (counterparty_gone)
                    return SHM_MQ_DETACHED;
                else
                    return SHM_MQ_WOULD_BLOCK;
            }
        }
        else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
                 && shm_mq_get_sender(mq) == NULL)
        {
            mq->mq_detached = true;
            return SHM_MQ_DETACHED;
        }
        mqh->mqh_counterparty_attached = true;
    }

    /*
     * If we've consumed an amount of data greater than 1/4th of the ring
     * size, mark it consumed in shared memory. We try to avoid doing this
     * unnecessarily when only a small amount of data has been consumed,
     * because SetLatch() is fairly expensive and we don't want to do it too
     * often.
     */
    if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
    {
        shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
        mqh->mqh_consume_pending = 0;
    }

    /* Try to read, or finish reading, the length word from the buffer. */
    while (!mqh->mqh_length_word_complete)
    {
        /* Try to receive the message length word. */
        Assert(mqh->mqh_partial_bytes < sizeof(Size));
        res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                   nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;

        /*
         * Hopefully, we'll receive the entire message length word at once.
         * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
         * multiple reads.
         */
        if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
        {
            Size        needed;

            nbytes = *(Size *) rawdata;

            /* If we've already got the whole message, we're done. */
            needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
            if (rb >= needed)
            {
                mqh->mqh_consume_pending += needed;
                *nbytesp = nbytes;
                *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
                return SHM_MQ_SUCCESS;
            }

            /*
             * We don't have the whole message, but we at least have the whole
             * length word.
             */
            mqh->mqh_expected_bytes = nbytes;
            mqh->mqh_length_word_complete = true;
            mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
            rb -= MAXALIGN(sizeof(Size));
        }
        else
        {
            Size        lengthbytes;

            /* Can't be split unless bigger than required alignment. */
            Assert(sizeof(Size) > MAXIMUM_ALIGNOF);

            /* Message word is split; need buffer to reassemble. */
            if (mqh->mqh_buffer == NULL)
            {
                mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
                                                     MQH_INITIAL_BUFSIZE);
                mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
            }
            Assert(mqh->mqh_buflen >= sizeof(Size));

            /* Copy partial length word; remember to consume it. */
            if (mqh->mqh_partial_bytes + rb > sizeof(Size))
                lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
            else
                lengthbytes = rb;
            memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
                   lengthbytes);
            mqh->mqh_partial_bytes += lengthbytes;
            mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
            rb -= lengthbytes;

            /* If we now have the whole word, we're ready to read payload. */
            if (mqh->mqh_partial_bytes >= sizeof(Size))
            {
                Assert(mqh->mqh_partial_bytes == sizeof(Size));
                mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
                mqh->mqh_length_word_complete = true;
                mqh->mqh_partial_bytes = 0;
            }
        }
    }
    nbytes = mqh->mqh_expected_bytes;

    /*
     * Should be disallowed on the sending side already, but better check and
     * error out on the receiver side as well rather than trying to read a
     * prohibitively large message.
     */
    if (nbytes > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("invalid message size %zu in shared memory queue",
                        nbytes)));

    if (mqh->mqh_partial_bytes == 0)
    {
        /*
         * Try to obtain the whole message in a single chunk. If this works,
         * we need not copy the data and can return a pointer directly into
         * shared memory.
         */
        res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;
        if (rb >= nbytes)
        {
            mqh->mqh_length_word_complete = false;
            mqh->mqh_consume_pending += MAXALIGN(nbytes);
            *nbytesp = nbytes;
            *datap = rawdata;
            return SHM_MQ_SUCCESS;
        }

        /*
         * The message has wrapped the buffer. We'll need to copy it in order
         * to return it to the client in one chunk. First, make sure we have
         * a large enough buffer available.
         */
        if (mqh->mqh_buflen < nbytes)
        {
            Size        newbuflen;

            /*
             * Increase size to the next power of 2 that's >= nbytes, but
             * limit to MaxAllocSize.
             */
            newbuflen = pg_nextpower2_size_t(nbytes);
            newbuflen = Min(newbuflen, MaxAllocSize);

            if (mqh->mqh_buffer != NULL)
            {
                pfree(mqh->mqh_buffer);
                mqh->mqh_buffer = NULL;
                mqh->mqh_buflen = 0;
            }
            mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
            mqh->mqh_buflen = newbuflen;
        }
    }

    /* Loop until we've copied the entire message. */
    for (;;)
    {
        Size        still_needed;

        /* Copy as much as we can. */
        Assert(mqh->mqh_partial_bytes + rb <= nbytes);
        if (rb > 0)
        {
            memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
            mqh->mqh_partial_bytes += rb;
        }

        /*
         * Update count of bytes that can be consumed, accounting for
         * alignment padding. Note that this will never actually insert any
         * padding except at the end of a message, because the buffer size is
         * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
         */
        Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
        mqh->mqh_consume_pending += MAXALIGN(rb);

        /* If we got all the data, exit the loop. */
        if (mqh->mqh_partial_bytes >= nbytes)
            break;

        /* Wait for some more data. */
        still_needed = nbytes - mqh->mqh_partial_bytes;
        res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;
        if (rb > still_needed)
            rb = still_needed;
    }

    /* Return the complete message, and reset for next message. */
    *nbytesp = nbytes;
    *datap = mqh->mqh_buffer;
    mqh->mqh_length_word_complete = false;
    mqh->mqh_partial_bytes = 0;
    return SHM_MQ_SUCCESS;
}
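
/*
 * A minimal sketch (compiled out) of a non-blocking receive loop;
 * "process_message" stands in for whatever the caller does with each
 * message.  Note that *data remains valid only until the next call to
 * shm_mq_receive.
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static void
drain_queue(shm_mq_handle *mqh, void (*process_message) (void *, Size))
{
    for (;;)
    {
        Size        nbytes;
        void       *data;
        shm_mq_result res = shm_mq_receive(mqh, &nbytes, &data, true);

        if (res == SHM_MQ_WOULD_BLOCK)
            break;              /* wait for our latch and call again */
        if (res == SHM_MQ_DETACHED)
            break;              /* sender is gone for good */
        process_message(data, nbytes);
    }
}
#endif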

/*
 * Wait for the other process that's supposed to use this queue to attach
 * to it.
 *
 * The return value is SHM_MQ_DETACHED if the worker has already detached or
 * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
 * Note that we will only be able to detect that the worker has died before
 * attaching if a background worker handle was passed to shm_mq_attach().
 */
shm_mq_result
shm_mq_wait_for_attach(shm_mq_handle *mqh)
{
    shm_mq     *mq = mqh->mqh_queue;
    PGPROC    **victim;

    if (shm_mq_get_receiver(mq) == MyProc)
        victim = &mq->mq_sender;
    else
    {
        Assert(shm_mq_get_sender(mq) == MyProc);
        victim = &mq->mq_receiver;
    }

    if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
        return SHM_MQ_SUCCESS;
    else
        return SHM_MQ_DETACHED;
}
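
/*
 * A minimal sketch (compiled out) of turning a failed attach wait into an
 * error, rather than proceeding to communicate with a peer that will never
 * appear.
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static void
require_counterparty(shm_mq_handle *mqh)
{
    if (shm_mq_wait_for_attach(mqh) != SHM_MQ_SUCCESS)
        ereport(ERROR,
                (errmsg("shared message queue counterparty never attached")));
}
#endif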

/*
 * Detach from a shared message queue, and destroy the shm_mq_handle.
 */
void
shm_mq_detach(shm_mq_handle *mqh)
{
    /* Before detaching, notify the receiver about any already-written data. */
    if (mqh->mqh_send_pending > 0)
    {
        shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
        mqh->mqh_send_pending = 0;
    }

    /* Notify counterparty that we're outta here. */
    shm_mq_detach_internal(mqh->mqh_queue);

    /* Cancel on_dsm_detach callback, if any. */
    if (mqh->mqh_segment)
        cancel_on_dsm_detach(mqh->mqh_segment,
                             shm_mq_detach_callback,
                             PointerGetDatum(mqh->mqh_queue));

    /* Release local memory associated with handle. */
    if (mqh->mqh_buffer != NULL)
        pfree(mqh->mqh_buffer);
    pfree(mqh);
}

/*
 * Notify counterparty that we're detaching from shared message queue.
 *
 * The purpose of this function is to make sure that the process
 * with which we're communicating doesn't block forever waiting for us to
 * fill or drain the queue once we've lost interest. When the sender
 * detaches, the receiver can read any messages remaining in the queue;
 * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
 *
 * This is separated out from shm_mq_detach() because if the on_dsm_detach
 * callback fires, we only want to do this much. We do not try to touch
 * the local shm_mq_handle, as it may have been pfree'd already.
 */
static void
shm_mq_detach_internal(shm_mq *mq)
{
    PGPROC     *victim;

    SpinLockAcquire(&mq->mq_mutex);
    if (mq->mq_sender == MyProc)
        victim = mq->mq_receiver;
    else
    {
        Assert(mq->mq_receiver == MyProc);
        victim = mq->mq_sender;
    }
    mq->mq_detached = true;
    SpinLockRelease(&mq->mq_mutex);

    if (victim != NULL)
        SetLatch(&victim->procLatch);
}

/*
 * Get the shm_mq from handle.
 */
shm_mq *
shm_mq_get_queue(shm_mq_handle *mqh)
{
    return mqh->mqh_queue;
}

/*
 * Write bytes into a shared message queue.
 */
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
                  bool nowait, Size *bytes_written)
{
    shm_mq     *mq = mqh->mqh_queue;
    Size        sent = 0;
    uint64      used;
    Size        ringsize = mq->mq_ring_size;
    Size        available;

    while (sent < nbytes)
    {
        uint64      rb;
        uint64      wb;

        /* Compute number of ring buffer bytes used and available. */
        rb = pg_atomic_read_u64(&mq->mq_bytes_read);
        wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
        Assert(wb >= rb);
        used = wb - rb;
        Assert(used <= ringsize);
        available = Min(ringsize - used, nbytes - sent);

        /*
         * Bail out if the queue has been detached. Note that we would be in
         * trouble if the compiler decided to cache the value of
         * mq->mq_detached in a register or on the stack across loop
         * iterations. It probably shouldn't do that anyway since we'll
         * always return, call an external function that performs a system
         * call, or reach a memory barrier at some point later in the loop,
         * but just to be sure, insert a compiler barrier here.
         */
        pg_compiler_barrier();
        if (mq->mq_detached)
        {
            *bytes_written = sent;
            return SHM_MQ_DETACHED;
        }

        if (available == 0 && !mqh->mqh_counterparty_attached)
        {
            /*
             * The queue is full, so if the receiver isn't yet known to be
             * attached, we must wait for that to happen.
             */
            if (nowait)
            {
                if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
                {
                    *bytes_written = sent;
                    return SHM_MQ_DETACHED;
                }
                if (shm_mq_get_receiver(mq) == NULL)
                {
                    *bytes_written = sent;
                    return SHM_MQ_WOULD_BLOCK;
                }
            }
            else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
                                           mqh->mqh_handle))
            {
                mq->mq_detached = true;
                *bytes_written = sent;
                return SHM_MQ_DETACHED;
            }
            mqh->mqh_counterparty_attached = true;

            /*
             * The receiver may have read some data after attaching, so we
             * must not wait without rechecking the queue state.
             */
        }
        else if (available == 0)
        {
            /* Update the pending send bytes in the shared memory. */
            shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);

            /*
             * Since mqh->mqh_counterparty_attached is known to be true at
             * this point, mq_receiver has been set, and it can't change once
             * set. Therefore, we can read it without acquiring the spinlock.
             */
            Assert(mqh->mqh_counterparty_attached);
            SetLatch(&mq->mq_receiver->procLatch);

            /*
             * We have just updated the mqh_send_pending bytes in the shared
             * memory so reset it.
             */
            mqh->mqh_send_pending = 0;

            /* Skip manipulation of our latch if nowait = true. */
            if (nowait)
            {
                *bytes_written = sent;
                return SHM_MQ_WOULD_BLOCK;
            }

            /*
             * Wait for our latch to be set. It might already be set for some
             * unrelated reason, but that'll just result in one extra trip
             * through the loop. It's worth it to avoid resetting the latch
             * at top of loop, because setting an already-set latch is much
             * cheaper than setting one that has been reset.
             */
            (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                             WAIT_EVENT_MESSAGE_QUEUE_SEND);

            /* Reset the latch so we don't spin. */
            ResetLatch(MyLatch);

            /* An interrupt may have occurred while we were waiting. */
            CHECK_FOR_INTERRUPTS();
        }
        else
        {
            Size        offset;
            Size        sendnow;

            offset = wb % (uint64) ringsize;
            sendnow = Min(available, ringsize - offset);

            /*
             * Write as much data as we can via a single memcpy(). Make sure
             * these writes happen after the read of mq_bytes_read, above.
             * This barrier pairs with the one in shm_mq_inc_bytes_read.
             * (Since we're separating the read of mq_bytes_read from a
             * subsequent write to mq_ring, we need a full barrier here.)
             */
            pg_memory_barrier();
            memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
                   (char *) data + sent, sendnow);
            sent += sendnow;

            /*
             * Update count of bytes written, with alignment padding. Note
             * that this will never actually insert any padding except at the
             * end of a run of bytes, because the buffer size is a multiple of
             * MAXIMUM_ALIGNOF, and each read is as well.
             */
            Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));

            /*
             * For efficiency, we don't update the bytes written in the shared
             * memory and also don't set the reader's latch here. Refer to
             * the comments atop the shm_mq_handle structure for more
             * information.
             */
            mqh->mqh_send_pending += MAXALIGN(sendnow);
        }
    }

    *bytes_written = sent;
    return SHM_MQ_SUCCESS;
}

/*
 * Wait until at least *nbytesp bytes are available to be read from the
 * shared message queue, or until the buffer wraps around. If the queue is
 * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
 * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
 * to the location at which data bytes can be read, *nbytesp is set to the
 * number of bytes which can be read at that address, and the return value
 * is SHM_MQ_SUCCESS.
 */
static shm_mq_result
shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
                     Size *nbytesp, void **datap)
{
    shm_mq     *mq = mqh->mqh_queue;
    Size        ringsize = mq->mq_ring_size;
    uint64      used;
    uint64      written;

    for (;;)
    {
        Size        offset;
        uint64      read;

        /* Get bytes written, so we can compute what's available to read. */
        written = pg_atomic_read_u64(&mq->mq_bytes_written);

        /*
         * Get bytes read. Include bytes we could consume but have not yet
         * consumed.
         */
        read = pg_atomic_read_u64(&mq->mq_bytes_read) +
            mqh->mqh_consume_pending;
        used = written - read;
        Assert(used <= ringsize);
        offset = read % (uint64) ringsize;

        /* If we have enough data or buffer has wrapped, we're done. */
        if (used >= bytes_needed || offset + used >= ringsize)
        {
            *nbytesp = Min(used, ringsize - offset);
            *datap = &mq->mq_ring[mq->mq_ring_offset + offset];

            /*
             * Separate the read of mq_bytes_written, above, from caller's
             * attempt to read the data itself. Pairs with the barrier in
             * shm_mq_inc_bytes_written.
             */
            pg_read_barrier();
            return SHM_MQ_SUCCESS;
        }

        /*
         * Fall out before waiting if the queue has been detached.
         *
         * Note that we don't check for this until *after* considering whether
         * the data already available is enough, since the receiver can finish
         * receiving a message stored in the buffer even after the sender has
         * detached.
         */
        if (mq->mq_detached)
        {
            /*
             * If the writer advanced mq_bytes_written and then set
             * mq_detached, we might not have read the final value of
             * mq_bytes_written above. Insert a read barrier and then check
             * again if mq_bytes_written has advanced.
             */
            pg_read_barrier();
            if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
                continue;

            return SHM_MQ_DETACHED;
        }

        /*
         * We didn't get enough data to satisfy the request, so mark any data
         * previously-consumed as read to make more buffer space.
         */
        if (mqh->mqh_consume_pending > 0)
        {
            shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
            mqh->mqh_consume_pending = 0;
        }

        /* Skip manipulation of our latch if nowait = true. */
        if (nowait)
            return SHM_MQ_WOULD_BLOCK;

        /*
         * Wait for our latch to be set. It might already be set for some
         * unrelated reason, but that'll just result in one extra trip through
         * the loop. It's worth it to avoid resetting the latch at top of
         * loop, because setting an already-set latch is much cheaper than
         * setting one that has been reset.
         */
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                         WAIT_EVENT_MESSAGE_QUEUE_RECEIVE);

        /* Reset the latch so we don't spin. */
        ResetLatch(MyLatch);

        /* An interrupt may have occurred while we were waiting. */
        CHECK_FOR_INTERRUPTS();
    }
}

/*
 * Test whether a counterparty who may not even be alive yet is definitely gone.
 */
static bool
shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
{
    pid_t       pid;

    /* If the queue has been detached, counterparty is definitely gone. */
    if (mq->mq_detached)
        return true;

    /* If there's a handle, check worker status. */
    if (handle != NULL)
    {
        BgwHandleStatus status;

        /* Check for unexpected worker death. */
        status = GetBackgroundWorkerPid(handle, &pid);
        if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
        {
            /* Mark it detached, just to make it official. */
            mq->mq_detached = true;
            return true;
        }
    }

    /* Counterparty is not definitively gone. */
    return false;
}

/*
 * This is used when a process is waiting for its counterpart to attach to the
 * queue. We exit when the other process attaches as expected, or, if
 * handle != NULL, when the referenced background process or the postmaster
 * dies. Note that if handle == NULL, and the process fails to attach, we'll
 * potentially get stuck here forever waiting for a process that may never
 * start. We do check for interrupts, though.
 *
 * ptr is a pointer to the memory address that we're expecting to become
 * non-NULL when our counterpart attaches to the queue.
 */
static bool
shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
{
    bool        result = false;

    for (;;)
    {
        BgwHandleStatus status;
        pid_t       pid;

        /* Acquire the lock just long enough to check the pointer. */
        SpinLockAcquire(&mq->mq_mutex);
        result = (*ptr != NULL);
        SpinLockRelease(&mq->mq_mutex);

        /* Fail if detached; else succeed if initialized. */
        if (mq->mq_detached)
        {
            result = false;
            break;
        }
        if (result)
            break;

        if (handle != NULL)
        {
            /* Check for unexpected worker death. */
            status = GetBackgroundWorkerPid(handle, &pid);
            if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
            {
                result = false;
                break;
            }
        }

        /* Wait to be signaled. */
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                         WAIT_EVENT_MESSAGE_QUEUE_INTERNAL);

        /* Reset the latch so we don't spin. */
        ResetLatch(MyLatch);

        /* An interrupt may have occurred while we were waiting. */
        CHECK_FOR_INTERRUPTS();
    }

    return result;
}

/*
 * Increment the number of bytes read.
 */
static void
shm_mq_inc_bytes_read(shm_mq *mq, Size n)
{
    PGPROC     *sender;

    /*
     * Separate prior reads of mq_ring from the increment of mq_bytes_read
     * which follows. This pairs with the full barrier in
     * shm_mq_send_bytes(). We only need a read barrier here because the
     * increment of mq_bytes_read is actually a read followed by a dependent
     * write.
     */
    pg_read_barrier();

    /*
     * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
     * else can be changing this value. This method should be cheaper.
     */
    pg_atomic_write_u64(&mq->mq_bytes_read,
                        pg_atomic_read_u64(&mq->mq_bytes_read) + n);

    /*
     * We shouldn't have any bytes to read without a sender, so we can read
     * mq_sender here without a lock. Once it's initialized, it can't change.
     */
    sender = mq->mq_sender;
    Assert(sender != NULL);
    SetLatch(&sender->procLatch);
}

/*
 * Increment the number of bytes written.
 */
static void
shm_mq_inc_bytes_written(shm_mq *mq, Size n)
{
    /*
     * Separate prior reads of mq_ring from the write of mq_bytes_written
     * which we're about to do. Pairs with the read barrier found in
     * shm_mq_receive_bytes.
     */
    pg_write_barrier();

    /*
     * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
     * else can be changing this value. This method avoids taking the bus
     * lock unnecessarily.
     */
    pg_atomic_write_u64(&mq->mq_bytes_written,
                        pg_atomic_read_u64(&mq->mq_bytes_written) + n);
}

/* Shim for on_dsm_detach callback. */
static void
shm_mq_detach_callback(dsm_segment *seg, Datum arg)
{
    shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);

    shm_mq_detach_internal(mq);
}