PostgreSQL Source Code  git master
shm_mq.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * shm_mq.c
4  * single-reader, single-writer shared memory message queue
5  *
6  * Both the sender and the receiver must have a PGPROC; their respective
7  * process latches are used for synchronization. Only the sender may send,
8  * and only the receiver may receive. This is intended to allow a user
9  * backend to communicate with worker backends that it has registered.
10  *
11  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  * src/backend/storage/ipc/shm_mq.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "port/pg_bitutils.h"
24 #include "postmaster/bgworker.h"
25 #include "storage/shm_mq.h"
26 #include "storage/spin.h"
27 #include "utils/memutils.h"
28 
29 /*
30  * This structure represents the actual queue, stored in shared memory.
31  *
32  * Some notes on synchronization:
33  *
34  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
35  * mq_sender and mq_bytes_written can only be changed by the sender.
36  * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
37  * they cannot change once set, and thus may be read without a lock once this
38  * is known to be the case.
39  *
40  * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
41  * they are read and written atomically using 8-byte loads and stores. Memory
42  * barriers must be carefully used to synchronize reads and writes of these
43  * values with reads and writes of the actual data in mq_ring.
44  *
45  * mq_detached needs no locking. It can be set by either the sender or the
46  * receiver, but only ever from false to true, so redundant writes don't
47  * matter. It is important that if we set mq_detached and then set the
48  * counterparty's latch, the counterparty must be certain to see the change
49  * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
50  * ends with one, this should be OK.
51  *
52  * mq_ring_size and mq_ring_offset never change after initialization, and
53  * can therefore be read without the lock.
54  *
55  * Importantly, mq_ring can be safely read and written without a lock.
56  * At any given time, the difference between mq_bytes_read and
57  * mq_bytes_written defines the number of bytes within mq_ring that contain
58  * unread data, and mq_bytes_read defines the position where those bytes
59  * begin. The sender can increase the number of unread bytes at any time,
60  * but only the receiver can give license to overwrite those bytes, by
61  * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
62  * the unread bytes it knows to be present without the lock. Conversely,
63  * the sender can write to the unused portion of the ring buffer without
64  * the lock, because nobody else can be reading or writing those bytes. The
65  * receiver could be making more bytes unused by incrementing mq_bytes_read,
66  * but that's OK. Note that it would be unsafe for the receiver to read any
67  * data it's already marked as read, or to write any data; and it would be
68  * unsafe for the sender to reread any data after incrementing
69  * mq_bytes_written, but fortunately there's no need for any of that.
70  */
71 struct shm_mq
72 {
82 };
83 
84 /*
85  * This structure is a backend-private handle for access to a queue.
86  *
87  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
88  * an optional pointer to the dynamic shared memory segment that contains it.
89  * (If mqh_segment is provided, we register an on_dsm_detach callback to
90  * make sure we detach from the queue before detaching from DSM.)
91  *
92  * If this queue is intended to connect the current process with a background
93  * worker that started it, the user can pass a pointer to the worker handle
94  * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
95  * is to allow us to begin sending to or receiving from that queue before the
96  * process we'll be communicating with has even been started. If it fails
97  * to start, the handle will allow us to notice that and fail cleanly, rather
98  * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
99  * simple cases - e.g. where there are just 2 processes communicating; in
100  * more complex scenarios, every process may not have a BackgroundWorkerHandle
101  * available, or may need to watch for the failure of more than one other
102  * process at a time.
103  *
104  * When a message exists as a contiguous chunk of bytes in the queue - that is,
105  * it is smaller than the size of the ring buffer and does not wrap around
106  * the end - we return the message to the caller as a pointer into the buffer.
107  * For messages that are larger or happen to wrap, we reassemble the message
108  * locally by copying the chunks into a backend-local buffer. mqh_buffer is
109  * the buffer, and mqh_buflen is the number of bytes allocated for it.
110  *
111  * mqh_send_pending is the number of bytes that have been written to the
112  * queue but not yet reflected in shared memory. We do not update the shared
113  * counter until the pending data exceeds 1/4th of the ring size, the queue
114  * is full, or the caller forces a flush. This prevents frequent CPU cache
115  * misses, and it also avoids frequent SetLatch() calls, which are quite expensive.
116  *
117  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
118  * are used to track the state of non-blocking operations. When the caller
119  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
120  * are expected to retry the call at a later time with the same argument;
121  * we need to retain enough state to pick up where we left off.
122  * mqh_length_word_complete tracks whether we are done sending or receiving
123  * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
124  * the number of bytes read or written for either the length word or the
125  * message itself, and mqh_expected_bytes - which is used only for reads -
126  * tracks the expected total size of the payload.
127  *
128  * mqh_counterparty_attached tracks whether we know the counterparty to have
129  * attached to the queue at some previous point. This lets us avoid some
130  * mutex acquisitions.
131  *
132  * mqh_context is the memory context in effect at the time we attached to
133  * the shm_mq. The shm_mq_handle itself is allocated in this context, and
134  * we make sure any other allocations we do happen in this context as well,
135  * to avoid nasty surprises.
136  */
138 {
142  char *mqh_buffer;
151 };
152 
153 static void shm_mq_detach_internal(shm_mq *mq);
155  const void *data, bool nowait, Size *bytes_written);
157  Size bytes_needed, bool nowait, Size *nbytesp,
158  void **datap);
159 static bool shm_mq_counterparty_gone(shm_mq *mq,
160  BackgroundWorkerHandle *handle);
161 static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
162  BackgroundWorkerHandle *handle);
163 static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
164 static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
165 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
166 
167 /* Minimum queue size is enough for header and at least one chunk of data. */
169 MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
170 
171 #define MQH_INITIAL_BUFSIZE 8192
172 
173 /*
174  * Initialize a new shared message queue.
175  */
176 shm_mq *
177 shm_mq_create(void *address, Size size)
178 {
179  shm_mq *mq = address;
180  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
181 
182  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
184 
185  /* Queue size must be large enough to hold some data. */
186  Assert(size > data_offset);
187 
188  /* Initialize queue header. */
189  SpinLockInit(&mq->mq_mutex);
190  mq->mq_receiver = NULL;
191  mq->mq_sender = NULL;
194  mq->mq_ring_size = size - data_offset;
195  mq->mq_detached = false;
196  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
197 
198  return mq;
199 }
200 
201 /*
202  * Set the identity of the process that will receive from a shared message
203  * queue.
204  */
205 void
207 {
208  PGPROC *sender;
209 
211  Assert(mq->mq_receiver == NULL);
212  mq->mq_receiver = proc;
213  sender = mq->mq_sender;
215 
216  if (sender != NULL)
217  SetLatch(&sender->procLatch);
218 }
219 
220 /*
221  * Set the identity of the process that will send to a shared message queue.
222  */
223 void
225 {
226  PGPROC *receiver;
227 
229  Assert(mq->mq_sender == NULL);
230  mq->mq_sender = proc;
231  receiver = mq->mq_receiver;
233 
234  if (receiver != NULL)
235  SetLatch(&receiver->procLatch);
236 }
237 
238 /*
239  * Get the configured receiver.
240  */
241 PGPROC *
243 {
244  PGPROC *receiver;
245 
247  receiver = mq->mq_receiver;
249 
250  return receiver;
251 }
252 
253 /*
254  * Get the configured sender.
255  */
256 PGPROC *
258 {
259  PGPROC *sender;
260 
262  sender = mq->mq_sender;
264 
265  return sender;
266 }
267 
268 /*
269  * Attach to a shared message queue so we can send or receive messages.
270  *
271  * The memory context in effect at the time this function is called should
272  * be one which will last for at least as long as the message queue itself.
273  * We'll allocate the handle in that context, and future allocations that
274  * are needed to buffer incoming data will happen in that context as well.
275  *
276  * If seg != NULL, the queue will be automatically detached when that dynamic
277  * shared memory segment is detached.
278  *
279  * If handle != NULL, the queue can be read or written even before the
280  * other process has attached. We'll wait for it to do so if needed. The
281  * handle must be for a background worker initialized with bgw_notify_pid
282  * equal to our PID.
283  *
284  * shm_mq_detach() should be called when done. This will free the
285  * shm_mq_handle and mark the queue itself as detached, so that our
286  * counterpart won't get stuck waiting for us to fill or drain the queue
287  * after we've already lost interest.
288  */
291 {
292  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
293 
294  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
295  mqh->mqh_queue = mq;
296  mqh->mqh_segment = seg;
297  mqh->mqh_handle = handle;
298  mqh->mqh_buffer = NULL;
299  mqh->mqh_buflen = 0;
300  mqh->mqh_consume_pending = 0;
301  mqh->mqh_send_pending = 0;
302  mqh->mqh_partial_bytes = 0;
303  mqh->mqh_expected_bytes = 0;
304  mqh->mqh_length_word_complete = false;
305  mqh->mqh_counterparty_attached = false;
307 
308  if (seg != NULL)
310 
311  return mqh;
312 }
313 
314 /*
315  * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
316  * been passed to shm_mq_attach.
317  */
318 void
320 {
321  Assert(mqh->mqh_handle == NULL);
322  mqh->mqh_handle = handle;
323 }
324 
325 /*
326  * Write a message into a shared message queue.
327  */
329 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
330  bool force_flush)
331 {
332  shm_mq_iovec iov;
333 
334  iov.data = data;
335  iov.len = nbytes;
336 
337  return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
338 }
339 
340 /*
341  * Write a message into a shared message queue, gathered from multiple
342  * addresses.
343  *
344  * When nowait = false, we'll wait on our process latch when the ring buffer
345  * fills up, and then continue writing once the receiver has drained some data.
346  * The process latch is reset after each wait.
347  *
348  * When nowait = true, we do not manipulate the state of the process latch;
349  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
350  * this case, the caller should call this function again, with the same
351  * arguments, each time the process latch is set. (Once begun, the sending
352  * of a message cannot be aborted except by detaching from the queue; changing
353  * the length or payload will corrupt the queue.)
354  *
355  * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
356  * and notify the receiver (if it is already attached). Otherwise, we don't
357  * update it until we have written an amount of data greater than 1/4th of the
358  * ring size.
359  */
361 shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
362  bool force_flush)
363 {
365  shm_mq *mq = mqh->mqh_queue;
366  PGPROC *receiver;
367  Size nbytes = 0;
368  Size bytes_written;
369  int i;
370  int which_iov = 0;
371  Size offset;
372 
373  Assert(mq->mq_sender == MyProc);
374 
375  /* Compute total size of write. */
376  for (i = 0; i < iovcnt; ++i)
377  nbytes += iov[i].len;
378 
379  /* Prevent writing messages overwhelming the receiver. */
380  if (nbytes > MaxAllocSize)
381  ereport(ERROR,
382  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
383  errmsg("cannot send a message of size %zu via shared memory queue",
384  nbytes)));
385 
386  /* Try to write, or finish writing, the length word into the buffer. */
387  while (!mqh->mqh_length_word_complete)
388  {
389  Assert(mqh->mqh_partial_bytes < sizeof(Size));
390  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
391  ((char *) &nbytes) + mqh->mqh_partial_bytes,
392  nowait, &bytes_written);
393 
394  if (res == SHM_MQ_DETACHED)
395  {
396  /* Reset state in case caller tries to send another message. */
397  mqh->mqh_partial_bytes = 0;
398  mqh->mqh_length_word_complete = false;
399  return res;
400  }
401  mqh->mqh_partial_bytes += bytes_written;
402 
403  if (mqh->mqh_partial_bytes >= sizeof(Size))
404  {
405  Assert(mqh->mqh_partial_bytes == sizeof(Size));
406 
407  mqh->mqh_partial_bytes = 0;
408  mqh->mqh_length_word_complete = true;
409  }
410 
411  if (res != SHM_MQ_SUCCESS)
412  return res;
413 
414  /* Length word can't be split unless bigger than required alignment. */
415  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
416  }
417 
418  /* Write the actual data bytes into the buffer. */
419  Assert(mqh->mqh_partial_bytes <= nbytes);
420  offset = mqh->mqh_partial_bytes;
421  do
422  {
423  Size chunksize;
424 
425  /* Figure out which bytes need to be sent next. */
426  if (offset >= iov[which_iov].len)
427  {
428  offset -= iov[which_iov].len;
429  ++which_iov;
430  if (which_iov >= iovcnt)
431  break;
432  continue;
433  }
434 
435  /*
436  * We want to avoid copying the data if at all possible, but every
437  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
438  * the last. Thus, if a chunk other than the last one ends on a
439  * non-MAXALIGN'd boundary, we have to combine the tail end of its
440  * data with data from one or more following chunks until we either
441  * reach the last chunk or accumulate a number of bytes which is
442  * MAXALIGN'd.
443  */
444  if (which_iov + 1 < iovcnt &&
445  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
446  {
447  char tmpbuf[MAXIMUM_ALIGNOF];
448  int j = 0;
449 
450  for (;;)
451  {
452  if (offset < iov[which_iov].len)
453  {
454  tmpbuf[j] = iov[which_iov].data[offset];
455  j++;
456  offset++;
457  if (j == MAXIMUM_ALIGNOF)
458  break;
459  }
460  else
461  {
462  offset -= iov[which_iov].len;
463  which_iov++;
464  if (which_iov >= iovcnt)
465  break;
466  }
467  }
468 
469  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
470 
471  if (res == SHM_MQ_DETACHED)
472  {
473  /* Reset state in case caller tries to send another message. */
474  mqh->mqh_partial_bytes = 0;
475  mqh->mqh_length_word_complete = false;
476  return res;
477  }
478 
479  mqh->mqh_partial_bytes += bytes_written;
480  if (res != SHM_MQ_SUCCESS)
481  return res;
482  continue;
483  }
484 
485  /*
486  * If this is the last chunk, we can write all the data, even if it
487  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
488  * MAXALIGN_DOWN the write size.
489  */
490  chunksize = iov[which_iov].len - offset;
491  if (which_iov + 1 < iovcnt)
492  chunksize = MAXALIGN_DOWN(chunksize);
493  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
494  nowait, &bytes_written);
495 
496  if (res == SHM_MQ_DETACHED)
497  {
498  /* Reset state in case caller tries to send another message. */
499  mqh->mqh_length_word_complete = false;
500  mqh->mqh_partial_bytes = 0;
501  return res;
502  }
503 
504  mqh->mqh_partial_bytes += bytes_written;
505  offset += bytes_written;
506  if (res != SHM_MQ_SUCCESS)
507  return res;
508  } while (mqh->mqh_partial_bytes < nbytes);
509 
510  /* Reset for next message. */
511  mqh->mqh_partial_bytes = 0;
512  mqh->mqh_length_word_complete = false;
513 
514  /* If queue has been detached, let caller know. */
515  if (mq->mq_detached)
516  return SHM_MQ_DETACHED;
517 
518  /*
519  * If the counterparty is known to have attached, we can read mq_receiver
520  * without acquiring the spinlock. Otherwise, more caution is needed.
521  */
522  if (mqh->mqh_counterparty_attached)
523  receiver = mq->mq_receiver;
524  else
525  {
527  receiver = mq->mq_receiver;
529  if (receiver != NULL)
530  mqh->mqh_counterparty_attached = true;
531  }
532 
533  /*
534  * If the caller has requested force flush or we have written more than
535  * 1/4 of the ring size, mark it as written in shared memory and notify
536  * the receiver.
537  */
538  if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
539  {
541  if (receiver != NULL)
542  SetLatch(&receiver->procLatch);
543  mqh->mqh_send_pending = 0;
544  }
545 
546  return SHM_MQ_SUCCESS;
547 }
548 
549 /*
550  * Receive a message from a shared message queue.
551  *
552  * We set *nbytes to the message length and *data to point to the message
553  * payload. If the entire message exists in the queue as a single,
554  * contiguous chunk, *data will point directly into shared memory; otherwise,
555  * it will point to a temporary buffer. This mostly avoids data copying in
556  * the hoped-for case where messages are short compared to the buffer size,
557  * while still allowing longer messages. In either case, the return value
558  * remains valid until the next receive operation is performed on the queue.
559  *
560  * When nowait = false, we'll wait on our process latch when the ring buffer
561  * is empty and we have not yet received a full message. The sender will
562  * set our process latch after more data has been written, and we'll resume
563  * processing. Each call will therefore return a complete message
564  * (unless the sender detaches the queue).
565  *
566  * When nowait = true, we do not manipulate the state of the process latch;
567  * instead, whenever the buffer is empty and we need to read from it, we
568  * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
569  * function again after the process latch has been set.
570  */
572 shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
573 {
574  shm_mq *mq = mqh->mqh_queue;
576  Size rb = 0;
577  Size nbytes;
578  void *rawdata;
579 
580  Assert(mq->mq_receiver == MyProc);
581 
582  /* We can't receive data until the sender has attached. */
583  if (!mqh->mqh_counterparty_attached)
584  {
585  if (nowait)
586  {
587  int counterparty_gone;
588 
589  /*
590  * We shouldn't return at this point at all unless the sender
591  * hasn't attached yet. However, the correct return value depends
592  * on whether the sender is still attached. If we first test
593  * whether the sender has ever attached and then test whether the
594  * sender has detached, there's a race condition: a sender that
595  * attaches and detaches very quickly might fool us into thinking
596  * the sender never attached at all. So, test whether our
597  * counterparty is definitively gone first, and only afterwards
598  * check whether the sender ever attached in the first place.
599  */
600  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
601  if (shm_mq_get_sender(mq) == NULL)
602  {
603  if (counterparty_gone)
604  return SHM_MQ_DETACHED;
605  else
606  return SHM_MQ_WOULD_BLOCK;
607  }
608  }
609  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
610  && shm_mq_get_sender(mq) == NULL)
611  {
612  mq->mq_detached = true;
613  return SHM_MQ_DETACHED;
614  }
615  mqh->mqh_counterparty_attached = true;
616  }
617 
618  /*
619  * If we've consumed an amount of data greater than 1/4th of the ring
620  * size, mark it consumed in shared memory. We try to avoid doing this
621  * unnecessarily when only a small amount of data has been consumed,
622  * because SetLatch() is fairly expensive and we don't want to do it too
623  * often.
624  */
625  if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
626  {
628  mqh->mqh_consume_pending = 0;
629  }
630 
631  /* Try to read, or finish reading, the length word from the buffer. */
632  while (!mqh->mqh_length_word_complete)
633  {
634  /* Try to receive the message length word. */
635  Assert(mqh->mqh_partial_bytes < sizeof(Size));
636  res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
637  nowait, &rb, &rawdata);
638  if (res != SHM_MQ_SUCCESS)
639  return res;
640 
641  /*
642  * Hopefully, we'll receive the entire message length word at once.
643  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
644  * multiple reads.
645  */
646  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
647  {
648  Size needed;
649 
650  nbytes = *(Size *) rawdata;
651 
652  /* If we've already got the whole message, we're done. */
653  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
654  if (rb >= needed)
655  {
656  mqh->mqh_consume_pending += needed;
657  *nbytesp = nbytes;
658  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
659  return SHM_MQ_SUCCESS;
660  }
661 
662  /*
663  * We don't have the whole message, but we at least have the whole
664  * length word.
665  */
666  mqh->mqh_expected_bytes = nbytes;
667  mqh->mqh_length_word_complete = true;
668  mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
669  rb -= MAXALIGN(sizeof(Size));
670  }
671  else
672  {
673  Size lengthbytes;
674 
675  /* Can't be split unless bigger than required alignment. */
676  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
677 
678  /* Length word is split; need buffer to reassemble. */
679  if (mqh->mqh_buffer == NULL)
680  {
684  }
685  Assert(mqh->mqh_buflen >= sizeof(Size));
686 
687  /* Copy partial length word; remember to consume it. */
688  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
689  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
690  else
691  lengthbytes = rb;
692  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
693  lengthbytes);
694  mqh->mqh_partial_bytes += lengthbytes;
695  mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
696  rb -= lengthbytes;
697 
698  /* If we now have the whole word, we're ready to read payload. */
699  if (mqh->mqh_partial_bytes >= sizeof(Size))
700  {
701  Assert(mqh->mqh_partial_bytes == sizeof(Size));
702  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
703  mqh->mqh_length_word_complete = true;
704  mqh->mqh_partial_bytes = 0;
705  }
706  }
707  }
708  nbytes = mqh->mqh_expected_bytes;
709 
710  /*
711  * Should be disallowed on the sending side already, but better check and
712  * error out on the receiver side as well rather than trying to read a
713  * prohibitively large message.
714  */
715  if (nbytes > MaxAllocSize)
716  ereport(ERROR,
717  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
718  errmsg("invalid message size %zu in shared memory queue",
719  nbytes)));
720 
721  if (mqh->mqh_partial_bytes == 0)
722  {
723  /*
724  * Try to obtain the whole message in a single chunk. If this works,
725  * we need not copy the data and can return a pointer directly into
726  * shared memory.
727  */
728  res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
729  if (res != SHM_MQ_SUCCESS)
730  return res;
731  if (rb >= nbytes)
732  {
733  mqh->mqh_length_word_complete = false;
734  mqh->mqh_consume_pending += MAXALIGN(nbytes);
735  *nbytesp = nbytes;
736  *datap = rawdata;
737  return SHM_MQ_SUCCESS;
738  }
739 
740  /*
741  * The message has wrapped the buffer. We'll need to copy it in order
742  * to return it to the client in one chunk. First, make sure we have
743  * a large enough buffer available.
744  */
745  if (mqh->mqh_buflen < nbytes)
746  {
747  Size newbuflen;
748 
749  /*
750  * Increase size to the next power of 2 that's >= nbytes, but
751  * limit to MaxAllocSize.
752  */
753  newbuflen = pg_nextpower2_size_t(nbytes);
754  newbuflen = Min(newbuflen, MaxAllocSize);
755 
756  if (mqh->mqh_buffer != NULL)
757  {
758  pfree(mqh->mqh_buffer);
759  mqh->mqh_buffer = NULL;
760  mqh->mqh_buflen = 0;
761  }
762  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
763  mqh->mqh_buflen = newbuflen;
764  }
765  }
766 
767  /* Loop until we've copied the entire message. */
768  for (;;)
769  {
770  Size still_needed;
771 
772  /* Copy as much as we can. */
773  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
774  if (rb > 0)
775  {
776  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
777  mqh->mqh_partial_bytes += rb;
778  }
779 
780  /*
781  * Update count of bytes that can be consumed, accounting for
782  * alignment padding. Note that this will never actually insert any
783  * padding except at the end of a message, because the buffer size is
784  * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
785  */
786  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
787  mqh->mqh_consume_pending += MAXALIGN(rb);
788 
789  /* If we got all the data, exit the loop. */
790  if (mqh->mqh_partial_bytes >= nbytes)
791  break;
792 
793  /* Wait for some more data. */
794  still_needed = nbytes - mqh->mqh_partial_bytes;
795  res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
796  if (res != SHM_MQ_SUCCESS)
797  return res;
798  if (rb > still_needed)
799  rb = still_needed;
800  }
801 
802  /* Return the complete message, and reset for next message. */
803  *nbytesp = nbytes;
804  *datap = mqh->mqh_buffer;
805  mqh->mqh_length_word_complete = false;
806  mqh->mqh_partial_bytes = 0;
807  return SHM_MQ_SUCCESS;
808 }
809 
810 /*
811  * Wait for the other process that's supposed to use this queue to attach
812  * to it.
813  *
814  * The return value is SHM_MQ_DETACHED if the worker has already detached or
815  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
816  * Note that we will only be able to detect that the worker has died before
817  * attaching if a background worker handle was passed to shm_mq_attach().
818  */
821 {
822  shm_mq *mq = mqh->mqh_queue;
823  PGPROC **victim;
824 
825  if (shm_mq_get_receiver(mq) == MyProc)
826  victim = &mq->mq_sender;
827  else
828  {
830  victim = &mq->mq_receiver;
831  }
832 
833  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
834  return SHM_MQ_SUCCESS;
835  else
836  return SHM_MQ_DETACHED;
837 }
838 
839 /*
840  * Detach from a shared message queue, and destroy the shm_mq_handle.
841  */
842 void
844 {
845  /* Before detaching, notify the receiver about any already-written data. */
846  if (mqh->mqh_send_pending > 0)
847  {
849  mqh->mqh_send_pending = 0;
850  }
851 
852  /* Notify counterparty that we're outta here. */
854 
855  /* Cancel on_dsm_detach callback, if any. */
856  if (mqh->mqh_segment)
859  PointerGetDatum(mqh->mqh_queue));
860 
861  /* Release local memory associated with handle. */
862  if (mqh->mqh_buffer != NULL)
863  pfree(mqh->mqh_buffer);
864  pfree(mqh);
865 }
866 
867 /*
868  * Notify counterparty that we're detaching from shared message queue.
869  *
870  * The purpose of this function is to make sure that the process
871  * with which we're communicating doesn't block forever waiting for us to
872  * fill or drain the queue once we've lost interest. When the sender
873  * detaches, the receiver can read any messages remaining in the queue;
874  * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
875  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
876  *
877  * This is separated out from shm_mq_detach() because if the on_dsm_detach
878  * callback fires, we only want to do this much. We do not try to touch
879  * the local shm_mq_handle, as it may have been pfree'd already.
880  */
881 static void
883 {
884  PGPROC *victim;
885 
887  if (mq->mq_sender == MyProc)
888  victim = mq->mq_receiver;
889  else
890  {
891  Assert(mq->mq_receiver == MyProc);
892  victim = mq->mq_sender;
893  }
894  mq->mq_detached = true;
896 
897  if (victim != NULL)
898  SetLatch(&victim->procLatch);
899 }
900 
901 /*
902  * Get the shm_mq from handle.
903  */
904 shm_mq *
906 {
907  return mqh->mqh_queue;
908 }
909 
910 /*
911  * Write bytes into a shared message queue.
912  */
913 static shm_mq_result
914 shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
915  bool nowait, Size *bytes_written)
916 {
917  shm_mq *mq = mqh->mqh_queue;
918  Size sent = 0;
919  uint64 used;
920  Size ringsize = mq->mq_ring_size;
921  Size available;
922 
923  while (sent < nbytes)
924  {
925  uint64 rb;
926  uint64 wb;
927 
928  /* Compute number of ring buffer bytes used and available. */
931  Assert(wb >= rb);
932  used = wb - rb;
933  Assert(used <= ringsize);
934  available = Min(ringsize - used, nbytes - sent);
935 
936  /*
937  * Bail out if the queue has been detached. Note that we would be in
938  * trouble if the compiler decided to cache the value of
939  * mq->mq_detached in a register or on the stack across loop
940  * iterations. It probably shouldn't do that anyway since we'll
941  * always return, call an external function that performs a system
942  * call, or reach a memory barrier at some point later in the loop,
943  * but just to be sure, insert a compiler barrier here.
944  */
946  if (mq->mq_detached)
947  {
948  *bytes_written = sent;
949  return SHM_MQ_DETACHED;
950  }
951 
952  if (available == 0 && !mqh->mqh_counterparty_attached)
953  {
954  /*
955  * The queue is full, so if the receiver isn't yet known to be
956  * attached, we must wait for that to happen.
957  */
958  if (nowait)
959  {
960  if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
961  {
962  *bytes_written = sent;
963  return SHM_MQ_DETACHED;
964  }
965  if (shm_mq_get_receiver(mq) == NULL)
966  {
967  *bytes_written = sent;
968  return SHM_MQ_WOULD_BLOCK;
969  }
970  }
971  else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
972  mqh->mqh_handle))
973  {
974  mq->mq_detached = true;
975  *bytes_written = sent;
976  return SHM_MQ_DETACHED;
977  }
978  mqh->mqh_counterparty_attached = true;
979 
980  /*
981  * The receiver may have read some data after attaching, so we
982  * must not wait without rechecking the queue state.
983  */
984  }
985  else if (available == 0)
986  {
987  /* Update the pending send bytes in the shared memory. */
989 
990  /*
991  * Since mqh->mqh_counterparty_attached is known to be true at this
992  * point, mq_receiver has been set, and it can't change once set.
993  * Therefore, we can read it without acquiring the spinlock.
994  */
997 
998  /*
999  * We have just updated the mqh_send_pending bytes in the shared
1000  * memory so reset it.
1001  */
1002  mqh->mqh_send_pending = 0;
1003 
1004  /* Skip manipulation of our latch if nowait = true. */
1005  if (nowait)
1006  {
1007  *bytes_written = sent;
1008  return SHM_MQ_WOULD_BLOCK;
1009  }
1010 
1011  /*
1012  * Wait for our latch to be set. It might already be set for some
1013  * unrelated reason, but that'll just result in one extra trip
1014  * through the loop. It's worth it to avoid resetting the latch
1015  * at top of loop, because setting an already-set latch is much
1016  * cheaper than setting one that has been reset.
1017  */
1019  WAIT_EVENT_MESSAGE_QUEUE_SEND);
1020 
1021  /* Reset the latch so we don't spin. */
1023 
1024  /* An interrupt may have occurred while we were waiting. */
1026  }
1027  else
1028  {
1029  Size offset;
1030  Size sendnow;
1031 
1032  offset = wb % (uint64) ringsize;
1033  sendnow = Min(available, ringsize - offset);
1034 
1035  /*
1036  * Write as much data as we can via a single memcpy(). Make sure
1037  * these writes happen after the read of mq_bytes_read, above.
1038  * This barrier pairs with the one in shm_mq_inc_bytes_read.
1039  * (Since we're separating the read of mq_bytes_read from a
1040  * subsequent write to mq_ring, we need a full barrier here.)
1041  */
1043  memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1044  (char *) data + sent, sendnow);
1045  sent += sendnow;
1046 
1047  /*
1048  * Update count of bytes written, with alignment padding. Note
1049  * that this will never actually insert any padding except at the
1050  * end of a run of bytes, because the buffer size is a multiple of
1051  * MAXIMUM_ALIGNOF, and each read is as well.
1052  */
1053  Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1054 
1055  /*
1056  * For efficiency, we don't update the bytes written in the shared
1057  * memory and also don't set the reader's latch here. Refer to
1058  * the comments atop the shm_mq_handle structure for more
1059  * information.
1060  */
1061  mqh->mqh_send_pending += MAXALIGN(sendnow);
1062  }
1063  }
1064 
1065  *bytes_written = sent;
1066  return SHM_MQ_SUCCESS;
1067 }
1068 
1069 /*
1070  * Wait until at least *nbytesp bytes are available to be read from the
1071  * shared message queue, or until the buffer wraps around. If the queue is
1072  * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
1073  * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
1074  * to the location at which data bytes can be read, *nbytesp is set to the
1075  * number of bytes which can be read at that address, and the return value
1076  * is SHM_MQ_SUCCESS.
1077  */
1078 static shm_mq_result
1079 shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
1080  Size *nbytesp, void **datap)
1081 {
1082  shm_mq *mq = mqh->mqh_queue;
1083  Size ringsize = mq->mq_ring_size;
1084  uint64 used;
1085  uint64 written;
1086 
1087  for (;;)
1088  {
1089  Size offset;
1090  uint64 read;
1091 
1092  /* Get bytes written, so we can compute what's available to read. */
1093  written = pg_atomic_read_u64(&mq->mq_bytes_written);
1094 
1095  /*
1096  * Get bytes read. Include bytes we could consume but have not yet
1097  * consumed.
1098  */
1100  mqh->mqh_consume_pending;
1101  used = written - read;
1102  Assert(used <= ringsize);
1103  offset = read % (uint64) ringsize;
1104 
1105  /* If we have enough data or buffer has wrapped, we're done. */
1106  if (used >= bytes_needed || offset + used >= ringsize)
1107  {
1108  *nbytesp = Min(used, ringsize - offset);
1109  *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1110 
1111  /*
1112  * Separate the read of mq_bytes_written, above, from caller's
1113  * attempt to read the data itself. Pairs with the barrier in
1114  * shm_mq_inc_bytes_written.
1115  */
1116  pg_read_barrier();
1117  return SHM_MQ_SUCCESS;
1118  }
1119 
1120  /*
1121  * Fall out before waiting if the queue has been detached.
1122  *
1123  * Note that we don't check for this until *after* considering whether
1124  * the data already available is enough, since the receiver can finish
1125  * receiving a message stored in the buffer even after the sender has
1126  * detached.
1127  */
1128  if (mq->mq_detached)
1129  {
1130  /*
1131  * If the writer advanced mq_bytes_written and then set
1132  * mq_detached, we might not have read the final value of
1133  * mq_bytes_written above. Insert a read barrier and then check
1134  * again if mq_bytes_written has advanced.
1135  */
1136  pg_read_barrier();
1137  if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1138  continue;
1139 
1140  return SHM_MQ_DETACHED;
1141  }
1142 
1143  /*
1144  * We didn't get enough data to satisfy the request, so mark any data
1145  * previously-consumed as read to make more buffer space.
1146  */
1147  if (mqh->mqh_consume_pending > 0)
1148  {
1150  mqh->mqh_consume_pending = 0;
1151  }
1152 
1153  /* Skip manipulation of our latch if nowait = true. */
1154  if (nowait)
1155  return SHM_MQ_WOULD_BLOCK;
1156 
1157  /*
1158  * Wait for our latch to be set. It might already be set for some
1159  * unrelated reason, but that'll just result in one extra trip through
1160  * the loop. It's worth it to avoid resetting the latch at top of
1161  * loop, because setting an already-set latch is much cheaper than
1162  * setting one that has been reset.
1163  */
1165  WAIT_EVENT_MESSAGE_QUEUE_RECEIVE);
1166 
1167  /* Reset the latch so we don't spin. */
1169 
1170  /* An interrupt may have occurred while we were waiting. */
1172  }
1173 }
1174 
1175 /*
1176  * Test whether a counterparty who may not even be alive yet is definitely gone.
1177  */
1178 static bool
1180 {
1181  pid_t pid;
1182 
1183  /* If the queue has been detached, counterparty is definitely gone. */
1184  if (mq->mq_detached)
1185  return true;
1186 
1187  /* If there's a handle, check worker status. */
1188  if (handle != NULL)
1189  {
1190  BgwHandleStatus status;
1191 
1192  /* Check for unexpected worker death. */
1193  status = GetBackgroundWorkerPid(handle, &pid);
1194  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1195  {
1196  /* Mark it detached, just to make it official. */
1197  mq->mq_detached = true;
1198  return true;
1199  }
1200  }
1201 
1202  /* Counterparty is not definitively gone. */
1203  return false;
1204 }
1205 
1206 /*
1207  * This is used when a process is waiting for its counterpart to attach to the
1208  * queue. We exit when the other process attaches as expected, or, if
1209  * handle != NULL, when the referenced background process or the postmaster
1210  * dies. Note that if handle == NULL, and the process fails to attach, we'll
1211  * potentially get stuck here forever waiting for a process that may never
1212  * start. We do check for interrupts, though.
1213  *
1214  * ptr is a pointer to the memory address that we're expecting to become
1215  * non-NULL when our counterpart attaches to the queue.
1216  */
1217 static bool
1219 {
1220  bool result = false;
1221 
1222  for (;;)
1223  {
1224  BgwHandleStatus status;
1225  pid_t pid;
1226 
1227  /* Acquire the lock just long enough to check the pointer. */
1228  SpinLockAcquire(&mq->mq_mutex);
1229  result = (*ptr != NULL);
1230  SpinLockRelease(&mq->mq_mutex);
1231 
1232  /* Fail if detached; else succeed if initialized. */
1233  if (mq->mq_detached)
1234  {
1235  result = false;
1236  break;
1237  }
1238  if (result)
1239  break;
1240 
1241  if (handle != NULL)
1242  {
1243  /* Check for unexpected worker death. */
1244  status = GetBackgroundWorkerPid(handle, &pid);
1245  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1246  {
1247  result = false;
1248  break;
1249  }
1250  }
1251 
1252  /* Wait to be signaled. */
1254  WAIT_EVENT_MESSAGE_QUEUE_INTERNAL);
1255 
1256  /* Reset the latch so we don't spin. */
1258 
1259  /* An interrupt may have occurred while we were waiting. */
1261  }
1262 
1263  return result;
1264 }
1265 
1266 /*
1267  * Increment the number of bytes read.
1268  */
1269 static void
1271 {
1272  PGPROC *sender;
1273 
1274  /*
1275  * Separate prior reads of mq_ring from the increment of mq_bytes_read
1276  * which follows. This pairs with the full barrier in
1277  * shm_mq_send_bytes(). We only need a read barrier here because the
1278  * increment of mq_bytes_read is actually a read followed by a dependent
1279  * write.
1280  */
1281  pg_read_barrier();
1282 
1283  /*
1284  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1285  * else can be changing this value. This method should be cheaper.
1286  */
1288  pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1289 
1290  /*
1291  * We shouldn't have any bytes to read without a sender, so we can read
1292  * mq_sender here without a lock. Once it's initialized, it can't change.
1293  */
1294  sender = mq->mq_sender;
1295  Assert(sender != NULL);
1296  SetLatch(&sender->procLatch);
1297 }
1298 
1299 /*
1300  * Increment the number of bytes written.
1301  */
1302 static void
1304 {
1305  /*
1306  * Separate prior reads of mq_ring from the write of mq_bytes_written
1307  * which we're about to do. Pairs with the read barrier found in
1308  * shm_mq_receive_bytes.
1309  */
1310  pg_write_barrier();
1311 
1312  /*
1313  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1314  * else can be changing this value. This method avoids taking the bus
1315  * lock unnecessarily.
1316  */
1319 }
1320 
1321 /* Shim for on_dsm_detach callback. */
1322 static void
1324 {
1325  shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1326 
1328 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:480
#define pg_memory_barrier()
Definition: atomics.h:138
#define pg_read_barrier()
Definition: atomics.h:151
#define pg_write_barrier()
Definition: atomics.h:152
#define pg_compiler_barrier()
Definition: atomics.h:126
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:448
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:462
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1082
BgwHandleStatus
Definition: bgworker.h:104
@ BGWH_STARTED
Definition: bgworker.h:105
@ BGWH_NOT_YET_STARTED
Definition: bgworker.h:106
#define MAXALIGN_DOWN(LEN)
Definition: c.h:810
#define Min(x, y)
Definition: c.h:991
#define MAXALIGN(LEN)
Definition: c.h:798
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:385
unsigned char uint8
Definition: c.h:491
size_t Size
Definition: c.h:592
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1132
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1147
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
struct Latch * MyLatch
Definition: globals.c:60
#define read(a, b, c)
Definition: win32.h:13
int j
Definition: isn.c:74
int i
Definition: isn.c:73
void SetLatch(Latch *latch)
Definition: latch.c:632
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
Definition: mcxt.c:1508
MemoryContext CurrentMemoryContext
Definition: mcxt.c:131
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1168
void * palloc(Size size)
Definition: mcxt.c:1304
#define MaxAllocSize
Definition: memutils.h:40
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
void * arg
#define pg_nextpower2_size_t
Definition: pg_bitutils.h:339
const void size_t len
const void * data
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
int slock_t
Definition: s_lock.h:735
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:290
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:914
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
Definition: shm_mq.c:905
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
Definition: shm_mq.c:1303
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:224
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:319
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:171
shm_mq * shm_mq_create(void *address, Size size)
Definition: shm_mq.c:177
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:843
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1323
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1179
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
Definition: shm_mq.c:361
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1218
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:882
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:242
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:206
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:257
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:572
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:329
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1270
shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh)
Definition: shm_mq.c:820
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:1079
const Size shm_mq_minimum_size
Definition: shm_mq.c:168
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40
static pg_noinline void Size size
Definition: slab.c:607
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
PGPROC * MyProc
Definition: proc.c:66
Definition: proc.h:157
Latch procLatch
Definition: proc.h:165
Size mqh_consume_pending
Definition: shm_mq.c:144
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:141
char * mqh_buffer
Definition: shm_mq.c:142
Size mqh_send_pending
Definition: shm_mq.c:145
Size mqh_expected_bytes
Definition: shm_mq.c:147
bool mqh_counterparty_attached
Definition: shm_mq.c:149
shm_mq * mqh_queue
Definition: shm_mq.c:139
dsm_segment * mqh_segment
Definition: shm_mq.c:140
bool mqh_length_word_complete
Definition: shm_mq.c:148
Size mqh_buflen
Definition: shm_mq.c:143
MemoryContext mqh_context
Definition: shm_mq.c:150
Size mqh_partial_bytes
Definition: shm_mq.c:146
const char * data
Definition: shm_mq.h:31
Size len
Definition: shm_mq.h:32
Definition: shm_mq.c:72
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:77
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:76
bool mq_detached
Definition: shm_mq.c:79
PGPROC * mq_sender
Definition: shm_mq.c:75
uint8 mq_ring_offset
Definition: shm_mq.c:80
slock_t mq_mutex
Definition: shm_mq.c:73
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
Definition: shm_mq.c:81
Size mq_ring_size
Definition: shm_mq.c:78
PGPROC * mq_receiver
Definition: shm_mq.c:74
static StringInfoData tmpbuf
Definition: walsender.c:170