PostgreSQL Source Code  git master
shm_mq.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * shm_mq.c
4  * single-reader, single-writer shared memory message queue
5  *
6  * Both the sender and the receiver must have a PGPROC; their respective
7  * process latches are used for synchronization. Only the sender may send,
8  * and only the receiver may receive. This is intended to allow a user
9  * backend to communicate with worker backends that it has registered.
10  *
11  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  * src/backend/storage/ipc/shm_mq.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "postmaster/bgworker.h"
24 #include "storage/procsignal.h"
25 #include "storage/shm_mq.h"
26 #include "storage/spin.h"
27 
28 /*
29  * This structure represents the actual queue, stored in shared memory.
30  *
31  * Some notes on synchronization:
32  *
33  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
34  * mq_sender and mq_bytes_written can only be changed by the sender.
35  * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
36  * they cannot change once set, and thus may be read without a lock once this
37  * is known to be the case.
38  *
39  * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
40  * they are written atomically using 8 byte loads and stores. Memory barriers
41  * must be carefully used to synchronize reads and writes of these values with
42  * reads and writes of the actual data in mq_ring.
43  *
44  * mq_detached needs no locking. It can be set by either the sender or the
45  * receiver, but only ever from false to true, so redundant writes don't
46  * matter. It is important that if we set mq_detached and then set the
47  * counterparty's latch, the counterparty must be certain to see the change
48  * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
49  * ends with one, this should be OK.
50  *
51  * mq_ring_size and mq_ring_offset never change after initialization, and
52  * can therefore be read without the lock.
53  *
54  * Importantly, mq_ring can be safely read and written without a lock.
55  * At any given time, the difference between mq_bytes_read and
56  * mq_bytes_written defines the number of bytes within mq_ring that contain
57  * unread data, and mq_bytes_read defines the position where those bytes
58  * begin. The sender can increase the number of unread bytes at any time,
59  * but only the receiver can give license to overwrite those bytes, by
60  * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
61  * the unread bytes it knows to be present without the lock. Conversely,
62  * the sender can write to the unused portion of the ring buffer without
63  * the lock, because nobody else can be reading or writing those bytes. The
64  * receiver could be making more bytes unused by incrementing mq_bytes_read,
65  * but that's OK. Note that it would be unsafe for the receiver to read any
66  * data it's already marked as read, or to write any data; and it would be
67  * unsafe for the sender to reread any data after incrementing
68  * mq_bytes_written, but fortunately there's no need for any of that.
69  */
/*
 * NOTE(review): this listing jumps from line 71 to line 80, so the header
 * fields are not visible here. The synchronization notes above refer to
 * mq_mutex, mq_receiver, mq_sender, mq_bytes_read, mq_bytes_written,
 * mq_detached, mq_ring_size and mq_ring_offset; confirm their declarations
 * against the real source.
 */
70 struct shm_mq
71 {
/*
 * Ring buffer storage. The usable data area starts mq_ring_offset bytes in
 * (so that it is MAXALIGN'd) and is mq_ring_size bytes long.
 */
80  char mq_ring[FLEXIBLE_ARRAY_MEMBER];
81 };
82 
83 /*
84  * This structure is a backend-private handle for access to a queue.
85  *
86  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
87  * an optional pointer to the dynamic shared memory segment that contains it.
88  * (If mqh_segment is provided, we register an on_dsm_detach callback to
89  * make sure we detach from the queue before detaching from DSM.)
90  *
91  * If this queue is intended to connect the current process with a background
92  * worker that started it, the user can pass a pointer to the worker handle
93  * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
94  * is to allow us to begin sending to or receiving from that queue before the
95  * process we'll be communicating with has even been started. If it fails
96  * to start, the handle will allow us to notice that and fail cleanly, rather
97  * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
98  * simple cases - e.g. where there are just 2 processes communicating; in
99  * more complex scenarios, every process may not have a BackgroundWorkerHandle
100  * available, or may need to watch for the failure of more than one other
101  * process at a time.
102  *
103  * When a message exists as a contiguous chunk of bytes in the queue - that is,
104  * it is smaller than the size of the ring buffer and does not wrap around
105  * the end - we return the message to the caller as a pointer into the buffer.
106  * For messages that are larger or happen to wrap, we reassemble the message
107  * locally by copying the chunks into a backend-local buffer. mqh_buffer is
108  * the buffer, and mqh_buflen is the number of bytes allocated for it.
109  *
110  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
111  * are used to track the state of non-blocking operations. When the caller
112  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
113  * are expected to retry the call at a later time with the same argument;
114  * we need to retain enough state to pick up where we left off.
115  * mqh_length_word_complete tracks whether we are done sending or receiving
116  * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
117  * the number of bytes read or written for either the length word or the
118  * message itself, and mqh_expected_bytes - which is used only for reads -
119  * tracks the expected total size of the payload.
120  *
121  * mqh_counterparty_attached tracks whether we know the counterparty to have
122  * attached to the queue at some previous point. This lets us avoid some
123  * mutex acquisitions.
124  *
125  * mqh_context is the memory context in effect at the time we attached to
126  * the shm_mq. The shm_mq_handle itself is allocated in this context, and
127  * we make sure any other allocations we do happen in this context as well,
128  * to avoid nasty surprises.
129  */
/*
 * NOTE(review): numbering skips line 130 (the struct tag line) and most of
 * the field lines. shm_mq_attach also assigns mqh_queue, mqh_segment,
 * mqh_handle, mqh_buflen, mqh_consume_pending, mqh_partial_bytes,
 * mqh_expected_bytes, mqh_length_word_complete and
 * mqh_counterparty_attached, and shm_mq_receive uses mqh_context. Those
 * declarations are not visible in this view.
 */
131 {
/* Backend-local reassembly buffer; NULL until a split/wrapped read needs it. */
135  char *mqh_buffer;
143 };
144 
/*
 * Forward declarations of file-local helpers.
 *
 * NOTE(review): lines 146 and 148 are missing from this listing. Judging by
 * the continuation parameters and the definitions further down, they open
 * the shm_mq_send_bytes and shm_mq_receive_bytes prototypes.
 */
145 static void shm_mq_detach_internal(shm_mq *mq);
147  const void *data, bool nowait, Size *bytes_written);
149  Size bytes_needed, bool nowait, Size *nbytesp,
150  void **datap);
151 static bool shm_mq_counterparty_gone(shm_mq *mq,
152  BackgroundWorkerHandle *handle);
153 static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
154  BackgroundWorkerHandle *handle);
155 static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
156 static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
157 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
158 
159 /* Minimum queue size is enough for header and at least one chunk of data. */
/*
 * NOTE(review): line 160, which declares the constant initialized by the
 * expression below, is missing from this listing.
 */
161 MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
162 
/*
 * Starting size of the backend-local reassembly buffer (mqh_buffer);
 * shm_mq_receive grows it by doubling when a larger message arrives.
 */
163 #define MQH_INITIAL_BUFSIZE 8192
164 
165 /*
166  * Initialize a new shared message queue.
 *
 * 'address' points at the memory that will hold the queue (normally shared
 * memory). The header goes at the start and the rest becomes the ring
 * buffer. 'size' is rounded down to a multiple of MAXIMUM_ALIGNOF and must
 * exceed the MAXALIGN'd header size; this is checked only by Assert. The
 * returned pointer is 'address' itself.
167  */
168 shm_mq *
169 shm_mq_create(void *address, Size size)
170 {
171  shm_mq *mq = address;
172  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
173 
174  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
175  size = MAXALIGN_DOWN(size);
176 
177  /* Queue size must be large enough to hold some data. */
178  Assert(size > data_offset);
179 
180  /* Initialize queue header. */
181  SpinLockInit(&mq->mq_mutex);
182  mq->mq_receiver = NULL;
183  mq->mq_sender = NULL;
/*
 * NOTE(review): lines 184-185 are missing from this listing. Nothing else
 * here initializes mq_bytes_read or mq_bytes_written, so those lines
 * presumably reset both counters to zero. Confirm against the real source.
 */
186  mq->mq_ring_size = size - data_offset;
187  mq->mq_detached = false;
/* Distance from mq_ring to the MAXALIGN'd start of the data area. */
188  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
189 
190  return mq;
191 }
192 
193 /*
194  * Set the identity of the process that will receive from a shared message
195  * queue.
 *
 * The receiver may be set only once (Assert). If a sender has already been
 * recorded, its latch is set so that it notices the receiver has arrived.
 *
 * NOTE(review): this listing skips line 198 (the signature, presumably
 * taking 'mq' and 'proc' as used below) and lines 202/206. The struct notes
 * say mq_receiver/mq_sender are protected by mq_mutex, so 202/206 are
 * presumably the spinlock acquire/release. Confirm.
196  */
197 void
199 {
200  PGPROC *sender;
201 
203  Assert(mq->mq_receiver == NULL);
204  mq->mq_receiver = proc;
205  sender = mq->mq_sender;
207 
208  if (sender != NULL)
209  SetLatch(&sender->procLatch);
210 }
211 
212 /*
213  * Set the identity of the process that will send to a shared message queue.
 *
 * The sender may be set only once (Assert). If a receiver has already been
 * recorded, its latch is set so that it notices the sender has arrived.
 *
 * NOTE(review): this listing skips line 216 (the signature, presumably
 * taking 'mq' and 'proc' as used below) and lines 220/224. These are
 * presumably the mq_mutex acquire/release described in the struct notes.
 * Confirm.
214  */
215 void
217 {
218  PGPROC *receiver;
219 
221  Assert(mq->mq_sender == NULL);
222  mq->mq_sender = proc;
223  receiver = mq->mq_receiver;
225 
226  if (receiver != NULL)
227  SetLatch(&receiver->procLatch);
228 }
229 
230 /*
231  * Get the configured receiver.
 *
 * Returns NULL if no receiver has been set yet (shm_mq_create initializes
 * mq_receiver to NULL).
 *
 * NOTE(review): lines 234 (signature), 238 and 240 are missing from this
 * listing. 238/240 presumably take and release mq_mutex around the read.
232  */
233 PGPROC *
235 {
236  PGPROC *receiver;
237 
239  receiver = mq->mq_receiver;
241 
242  return receiver;
243 }
244 
245 /*
246  * Get the configured sender.
 *
 * Returns NULL if no sender has been set yet (shm_mq_create initializes
 * mq_sender to NULL).
 *
 * NOTE(review): lines 249 (signature), 253 and 255 are missing from this
 * listing. 253/255 presumably take and release mq_mutex around the read.
247  */
248 PGPROC *
250 {
251  PGPROC *sender;
252 
254  sender = mq->mq_sender;
256 
257  return sender;
258 }
259 
260 /*
261  * Attach to a shared message queue so we can send or receive messages.
262  *
263  * The memory context in effect at the time this function is called should
264  * be one which will last for at least as long as the message queue itself.
265  * We'll allocate the handle in that context, and future allocations that
266  * are needed to buffer incoming data will happen in that context as well.
267  *
268  * If seg != NULL, the queue will be automatically detached when that dynamic
269  * shared memory segment is detached.
270  *
271  * If handle != NULL, the queue can be read or written even before the
272  * other process has attached. We'll wait for it to do so if needed. The
273  * handle must be for a background worker initialized with bgw_notify_pid
274  * equal to our PID.
275  *
276  * shm_mq_detach() should be called when done. This will free the
277  * shm_mq_handle and mark the queue itself as detached, so that our
278  * counterpart won't get stuck waiting for us to fill or drain the queue
279  * after we've already lost interest.
 *
 * The caller must already be registered as the queue's sender or receiver
 * (Assert below). All per-message state in the new handle starts out empty.
 *
 * NOTE(review): this listing skips lines 281-282 (return type and
 * signature), 297 and 300. Line 297 presumably records the current memory
 * context in mqh_context, as the comment above describes. Line 300 is the
 * body of "if (seg != NULL)" and presumably registers the on_dsm_detach
 * callback. Without it, the if statement as listed would wrongly guard the
 * return statement.
280  */
283 {
284  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
285 
286  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
287  mqh->mqh_queue = mq;
288  mqh->mqh_segment = seg;
289  mqh->mqh_handle = handle;
290  mqh->mqh_buffer = NULL;
291  mqh->mqh_buflen = 0;
292  mqh->mqh_consume_pending = 0;
293  mqh->mqh_partial_bytes = 0;
294  mqh->mqh_expected_bytes = 0;
295  mqh->mqh_length_word_complete = false;
296  mqh->mqh_counterparty_attached = false;
298 
299  if (seg != NULL)
301 
302  return mqh;
303 }
304 
305 /*
306  * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
307  * been passed to shm_mq_attach.
 *
 * Only valid if no worker handle was supplied earlier (Assert).
 *
 * NOTE(review): line 310 (the signature) is missing from this listing.
308  */
309 void
311 {
312  Assert(mqh->mqh_handle == NULL);
313  mqh->mqh_handle = handle;
314 }
315 
316 /*
317  * Write a message into a shared message queue.
318  */
320 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
321 {
322  shm_mq_iovec iov;
323 
324  iov.data = data;
325  iov.len = nbytes;
326 
327  return shm_mq_sendv(mqh, &iov, 1, nowait);
328 }
329 
330 /*
331  * Write a message into a shared message queue, gathered from multiple
332  * addresses.
333  *
334  * When nowait = false, we'll wait on our process latch when the ring buffer
335  * fills up, and then continue writing once the receiver has drained some data.
336  * The process latch is reset after each wait.
337  *
338  * When nowait = true, we do not manipulate the state of the process latch;
339  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
340  * this case, the caller should call this function again, with the same
341  * arguments, each time the process latch is set. (Once begun, the sending
342  * of a message cannot be aborted except by detaching from the queue; changing
343  * the length or payload will corrupt the queue.)
 *
 * The wire format is a Size length word followed by the payload. Every
 * piece handed to shm_mq_send_bytes except the last is a multiple of
 * MAXIMUM_ALIGNOF.
 *
 * Returns SHM_MQ_SUCCESS once the whole message has been written,
 * SHM_MQ_DETACHED if the queue is or becomes detached (per-message state is
 * reset first), and SHM_MQ_WOULD_BLOCK only when nowait = true.
 *
 * NOTE(review): line 345 (presumably the shm_mq_result return type) is
 * missing from this listing.
344  */
346 shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
347 {
348  shm_mq_result res;
349  shm_mq *mq = mqh->mqh_queue;
350  PGPROC *receiver;
351  Size nbytes = 0;
352  Size bytes_written;
353  int i;
354  int which_iov = 0;
355  Size offset;
356 
357  Assert(mq->mq_sender == MyProc);
358 
359  /* Compute total size of write. */
360  for (i = 0; i < iovcnt; ++i)
361  nbytes += iov[i].len;
/*
 * NOTE(review): nbytes has no upper bound. The receiver reassembles wrapped
 * messages in a MemoryContextAlloc'd buffer, so a message larger than that
 * allocator accepts presumably cannot be received. Consider rejecting such
 * sizes here, before any bytes are written.
 */
362 
363  /* Try to write, or finish writing, the length word into the buffer. */
364  while (!mqh->mqh_length_word_complete)
365  {
366  Assert(mqh->mqh_partial_bytes < sizeof(Size));
367  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
368  ((char *) &nbytes) + mqh->mqh_partial_bytes,
369  nowait, &bytes_written);
370 
371  if (res == SHM_MQ_DETACHED)
372  {
373  /* Reset state in case caller tries to send another message. */
374  mqh->mqh_partial_bytes = 0;
375  mqh->mqh_length_word_complete = false;
376  return res;
377  }
378  mqh->mqh_partial_bytes += bytes_written;
379 
380  if (mqh->mqh_partial_bytes >= sizeof(Size))
381  {
382  Assert(mqh->mqh_partial_bytes == sizeof(Size));
383 
384  mqh->mqh_partial_bytes = 0;
385  mqh->mqh_length_word_complete = true;
386  }
387 
388  if (res != SHM_MQ_SUCCESS)
389  return res;
390 
391  /* Length word can't be split unless bigger than required alignment. */
392  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
393  }
394 
395  /* Write the actual data bytes into the buffer. */
396  Assert(mqh->mqh_partial_bytes <= nbytes);
/*
 * On a retry, mqh_partial_bytes is how much payload was already sent. The
 * skip-ahead branch at the top of the loop maps it back to a
 * (which_iov, offset) position.
 */
397  offset = mqh->mqh_partial_bytes;
/*
 * NOTE(review): the do/while body runs at least once and reads iov[0].len
 * before any bound check, so callers must pass iovcnt >= 1 even for an
 * empty message.
 */
398  do
399  {
400  Size chunksize;
401 
402  /* Figure out which bytes need to be sent next. */
403  if (offset >= iov[which_iov].len)
404  {
405  offset -= iov[which_iov].len;
406  ++which_iov;
407  if (which_iov >= iovcnt)
408  break;
409  continue;
410  }
411 
412  /*
413  * We want to avoid copying the data if at all possible, but every
414  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
415  * the last. Thus, if a chunk other than the last one ends on a
416  * non-MAXALIGN'd boundary, we have to combine the tail end of its
417  * data with data from one or more following chunks until we either
418  * reach the last chunk or accumulate a number of bytes which is
419  * MAXALIGN'd.
420  */
421  if (which_iov + 1 < iovcnt &&
422  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
423  {
424  char tmpbuf[MAXIMUM_ALIGNOF];
425  int j = 0;
426 
427  for (;;)
428  {
429  if (offset < iov[which_iov].len)
430  {
431  tmpbuf[j] = iov[which_iov].data[offset];
432  j++;
433  offset++;
434  if (j == MAXIMUM_ALIGNOF)
435  break;
436  }
437  else
438  {
439  offset -= iov[which_iov].len;
440  which_iov++;
441  if (which_iov >= iovcnt)
442  break;
443  }
444  }
445 
446  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
447 
448  if (res == SHM_MQ_DETACHED)
449  {
450  /* Reset state in case caller tries to send another message. */
451  mqh->mqh_partial_bytes = 0;
452  mqh->mqh_length_word_complete = false;
453  return res;
454  }
455 
456  mqh->mqh_partial_bytes += bytes_written;
457  if (res != SHM_MQ_SUCCESS)
458  return res;
459  continue;
460  }
461 
462  /*
463  * If this is the last chunk, we can write all the data, even if it
464  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
465  * MAXALIGN_DOWN the write size.
466  */
467  chunksize = iov[which_iov].len - offset;
468  if (which_iov + 1 < iovcnt)
469  chunksize = MAXALIGN_DOWN(chunksize);
470  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
471  nowait, &bytes_written);
472 
473  if (res == SHM_MQ_DETACHED)
474  {
475  /* Reset state in case caller tries to send another message. */
476  mqh->mqh_length_word_complete = false;
477  mqh->mqh_partial_bytes = 0;
478  return res;
479  }
480 
481  mqh->mqh_partial_bytes += bytes_written;
482  offset += bytes_written;
483  if (res != SHM_MQ_SUCCESS)
484  return res;
485  } while (mqh->mqh_partial_bytes < nbytes);
486 
487  /* Reset for next message. */
488  mqh->mqh_partial_bytes = 0;
489  mqh->mqh_length_word_complete = false;
490 
491  /* If queue has been detached, let caller know. */
492  if (mq->mq_detached)
493  return SHM_MQ_DETACHED;
494 
495  /*
496  * If the counterparty is known to have attached, we can read mq_receiver
497  * without acquiring the spinlock and assume it isn't NULL. Otherwise,
498  * more caution is needed.
499  */
500  if (mqh->mqh_counterparty_attached)
501  receiver = mq->mq_receiver;
502  else
503  {
/*
 * NOTE(review): lines 504 and 506 are missing from this listing. The
 * comment above implies the spinlock is held for this read, so they are
 * presumably the mq_mutex acquire/release.
 */
505  receiver = mq->mq_receiver;
507  if (receiver == NULL)
508  return SHM_MQ_SUCCESS;
509  mqh->mqh_counterparty_attached = true;
510  }
511 
512  /* Notify receiver of the newly-written data, and return. */
513  SetLatch(&receiver->procLatch);
514  return SHM_MQ_SUCCESS;
515 }
516 
517 /*
518  * Receive a message from a shared message queue.
519  *
520  * We set *nbytes to the message length and *data to point to the message
521  * payload. If the entire message exists in the queue as a single,
522  * contiguous chunk, *data will point directly into shared memory; otherwise,
523  * it will point to a temporary buffer. This mostly avoids data copying in
524  * the hoped-for case where messages are short compared to the buffer size,
525  * while still allowing longer messages. In either case, the return value
526  * remains valid until the next receive operation is performed on the queue.
527  *
528  * When nowait = false, we'll wait on our process latch when the ring buffer
529  * is empty and we have not yet received a full message. The sender will
530  * set our process latch after more data has been written, and we'll resume
531  * processing. Each call will therefore return a complete message
532  * (unless the sender detaches the queue).
533  *
534  * When nowait = true, we do not manipulate the state of the process latch;
535  * instead, whenever the buffer is empty and we need to read from it, we
536  * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
537  * function again after the process latch has been set.
 *
 * Consumed bytes are tracked locally in mqh_consume_pending. They are
 * published to shared memory only once they exceed a quarter of the ring,
 * which is why returned pointers stay valid until the next call.
 *
 * NOTE(review): line 539 (presumably the shm_mq_result return type) is
 * missing from this listing.
538  */
540 shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
541 {
542  shm_mq *mq = mqh->mqh_queue;
543  shm_mq_result res;
544  Size rb = 0;
545  Size nbytes;
546  void *rawdata;
547 
548  Assert(mq->mq_receiver == MyProc);
549 
550  /* We can't receive data until the sender has attached. */
551  if (!mqh->mqh_counterparty_attached)
552  {
553  if (nowait)
554  {
555  int counterparty_gone;
556 
557  /*
558  * We shouldn't return at this point at all unless the sender
559  * hasn't attached yet. However, the correct return value depends
560  * on whether the sender is still attached. If we first test
561  * whether the sender has ever attached and then test whether the
562  * sender has detached, there's a race condition: a sender that
563  * attaches and detaches very quickly might fool us into thinking
564  * the sender never attached at all. So, test whether our
565  * counterparty is definitively gone first, and only afterwards
566  * check whether the sender ever attached in the first place.
567  */
568  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
569  if (shm_mq_get_sender(mq) == NULL)
570  {
571  if (counterparty_gone)
572  return SHM_MQ_DETACHED;
573  else
574  return SHM_MQ_WOULD_BLOCK;
575  }
576  }
577  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
578  && shm_mq_get_sender(mq) == NULL)
579  {
580  mq->mq_detached = true;
581  return SHM_MQ_DETACHED;
582  }
583  mqh->mqh_counterparty_attached = true;
584  }
585 
586  /*
587  * If we've consumed an amount of data greater than 1/4th of the ring
588  * size, mark it consumed in shared memory. We try to avoid doing this
589  * unnecessarily when only a small amount of data has been consumed,
590  * because SetLatch() is fairly expensive and we don't want to do it too
591  * often.
592  */
593  if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
594  {
/*
 * NOTE(review): line 595 is missing from this listing. Per the comment
 * above, it publishes mqh_consume_pending to shared memory (presumably via
 * shm_mq_inc_bytes_read). As listed, the pending count would simply be
 * discarded.
 */
596  mqh->mqh_consume_pending = 0;
597  }
598 
599  /* Try to read, or finish reading, the length word from the buffer. */
600  while (!mqh->mqh_length_word_complete)
601  {
602  /* Try to receive the message length word. */
603  Assert(mqh->mqh_partial_bytes < sizeof(Size));
604  res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
605  nowait, &rb, &rawdata);
606  if (res != SHM_MQ_SUCCESS)
607  return res;
608 
609  /*
610  * Hopefully, we'll receive the entire message length word at once.
611  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
612  * multiple reads.
613  */
614  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
615  {
616  Size needed;
617 
618  nbytes = *(Size *) rawdata;
619 
620  /* If we've already got the whole message, we're done. */
621  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
622  if (rb >= needed)
623  {
624  mqh->mqh_consume_pending += needed;
625  *nbytesp = nbytes;
626  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
627  return SHM_MQ_SUCCESS;
628  }
629 
630  /*
631  * We don't have the whole message, but we at least have the whole
632  * length word.
633  */
634  mqh->mqh_expected_bytes = nbytes;
635  mqh->mqh_length_word_complete = true;
636  mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
637  rb -= MAXALIGN(sizeof(Size));
638  }
639  else
640  {
641  Size lengthbytes;
642 
643  /* Can't be split unless bigger than required alignment. */
644  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
645 
646  /* Message word is split; need buffer to reassemble. */
647  if (mqh->mqh_buffer == NULL)
648  {
/*
 * NOTE(review): lines 649-651 are missing from this listing. They
 * presumably allocate mqh_buffer (MQH_INITIAL_BUFSIZE bytes in
 * mqh_context) and set mqh_buflen, which the Assert below requires.
 */
652  }
653  Assert(mqh->mqh_buflen >= sizeof(Size));
654 
655  /* Copy partial length word; remember to consume it. */
656  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
657  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
658  else
659  lengthbytes = rb;
660  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
661  lengthbytes);
662  mqh->mqh_partial_bytes += lengthbytes;
663  mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
664  rb -= lengthbytes;
665 
666  /* If we now have the whole word, we're ready to read payload. */
667  if (mqh->mqh_partial_bytes >= sizeof(Size))
668  {
669  Assert(mqh->mqh_partial_bytes == sizeof(Size));
670  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
671  mqh->mqh_length_word_complete = true;
672  mqh->mqh_partial_bytes = 0;
673  }
674  }
675  }
676  nbytes = mqh->mqh_expected_bytes;
677 
678  if (mqh->mqh_partial_bytes == 0)
679  {
680  /*
681  * Try to obtain the whole message in a single chunk. If this works,
682  * we need not copy the data and can return a pointer directly into
683  * shared memory.
684  */
685  res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
686  if (res != SHM_MQ_SUCCESS)
687  return res;
688  if (rb >= nbytes)
689  {
690  mqh->mqh_length_word_complete = false;
691  mqh->mqh_consume_pending += MAXALIGN(nbytes);
692  *nbytesp = nbytes;
693  *datap = rawdata;
694  return SHM_MQ_SUCCESS;
695  }
696 
697  /*
698  * The message has wrapped the buffer. We'll need to copy it in order
699  * to return it to the client in one chunk. First, make sure we have
700  * a large enough buffer available.
701  */
702  if (mqh->mqh_buflen < nbytes)
703  {
704  Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
705 
/*
 * NOTE(review): nbytes comes straight from the length word in shared memory
 * and is not range-checked. For a length above half the Size range, this
 * unsigned doubling wraps around and the loop may never terminate. Any
 * newbuflen beyond what MemoryContextAlloc accepts will raise an error
 * instead. Consider validating the length when the length word is read and
 * capping the growth.
 */
706  while (newbuflen < nbytes)
707  newbuflen *= 2;
708 
709  if (mqh->mqh_buffer != NULL)
710  {
711  pfree(mqh->mqh_buffer);
712  mqh->mqh_buffer = NULL;
713  mqh->mqh_buflen = 0;
714  }
715  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
716  mqh->mqh_buflen = newbuflen;
717  }
718  }
719 
/*
 * NOTE(review): suppose this call resumes an interrupted copy
 * (mqh_partial_bytes > 0 on entry, so the length-word loop and the block
 * above are skipped). Then rb is still 0 and rawdata was never assigned, so
 * the first memcpy below receives an indeterminate source pointer with
 * length 0. This is harmless in practice but undefined behavior under the C
 * standard's rules for string.h functions. Skipping the copy when rb == 0
 * would avoid it.
 */
720  /* Loop until we've copied the entire message. */
721  for (;;)
722  {
723  Size still_needed;
724 
725  /* Copy as much as we can. */
726  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
727  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
728  mqh->mqh_partial_bytes += rb;
729 
730  /*
731  * Update count of bytes that can be consumed, accounting for
732  * alignment padding. Note that this will never actually insert any
733  * padding except at the end of a message, because the buffer size is
734  * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
735  */
736  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
737  mqh->mqh_consume_pending += MAXALIGN(rb);
738 
739  /* If we got all the data, exit the loop. */
740  if (mqh->mqh_partial_bytes >= nbytes)
741  break;
742 
743  /* Wait for some more data. */
744  still_needed = nbytes - mqh->mqh_partial_bytes;
745  res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
746  if (res != SHM_MQ_SUCCESS)
747  return res;
748  if (rb > still_needed)
749  rb = still_needed;
750  }
751 
752  /* Return the complete message, and reset for next message. */
753  *nbytesp = nbytes;
754  *datap = mqh->mqh_buffer;
755  mqh->mqh_length_word_complete = false;
756  mqh->mqh_partial_bytes = 0;
757  return SHM_MQ_SUCCESS;
758 }
759 
760 /*
761  * Wait for the other process that's supposed to use this queue to attach
762  * to it.
763  *
764  * The return value is SHM_MQ_DETACHED if the worker has already detached or
765  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
766  * Note that we will only be able to detect that the worker has died before
767  * attaching if a background worker handle was passed to shm_mq_attach().
 *
 * 'victim' points at the counterparty's slot in the queue header: the
 * sender slot if we are the receiver, otherwise the receiver slot.
 *
 * NOTE(review): lines 769-770 (return type and signature) and line 779 are
 * missing from this listing. Line 779 presumably asserts that we are the
 * sender, mirroring the Assert in the detach path.
768  */
771 {
772  shm_mq *mq = mqh->mqh_queue;
773  PGPROC **victim;
774 
775  if (shm_mq_get_receiver(mq) == MyProc)
776  victim = &mq->mq_sender;
777  else
778  {
780  victim = &mq->mq_receiver;
781  }
782 
783  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
784  return SHM_MQ_SUCCESS;
785  else
786  return SHM_MQ_DETACHED;
787 }
788 
789 /*
790  * Detach from a shared message queue, and destroy the shm_mq_handle.
 *
 * After this call the handle and any message pointer previously returned
 * into mqh_buffer are no longer valid.
 *
 * NOTE(review): line 793 (the signature) is missing from this listing.
791  */
792 void
794 {
795  /* Notify counterparty that we're outta here. */
/*
 * NOTE(review): line 796, the call that actually notifies the counterparty
 * (presumably shm_mq_detach_internal on mqh->mqh_queue), is missing from
 * this listing.
 */
797 
798  /* Cancel on_dsm_detach callback, if any. */
799  if (mqh->mqh_segment)
/*
 * NOTE(review): lines 800-801 are missing from this listing. Together with
 * line 802 they presumably form the call that cancels the
 * shm_mq_detach_callback registration made in shm_mq_attach.
 */
802  PointerGetDatum(mqh->mqh_queue));
803 
804  /* Release local memory associated with handle. */
805  if (mqh->mqh_buffer != NULL)
806  pfree(mqh->mqh_buffer);
807  pfree(mqh);
808 }
809 
810 /*
811  * Notify counterparty that we're detaching from shared message queue.
812  *
813  * The purpose of this function is to make sure that the process
814  * with which we're communicating doesn't block forever waiting for us to
815  * fill or drain the queue once we've lost interest. When the sender
816  * detaches, the receiver can read any messages remaining in the queue;
817  * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
818  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
819  *
820  * This is separated out from shm_mq_detach() because if the on_dsm_detach
821  * callback fires, we only want to do this much. We do not try to touch
822  * the local shm_mq_handle, as it may have been pfree'd already.
 *
 * NOTE(review): lines 825 (the signature), 829 and 838 are missing from
 * this listing. Given the struct notes on mq_mutex, 829/838 presumably take
 * and release the spinlock around the sender/receiver reads and the
 * mq_detached store. Confirm against the real source.
823  */
824 static void
826 {
827  PGPROC *victim;
828 
830  if (mq->mq_sender == MyProc)
831  victim = mq->mq_receiver;
832  else
833  {
834  Assert(mq->mq_receiver == MyProc);
835  victim = mq->mq_sender;
836  }
837  mq->mq_detached = true;
839 
/* Wake the counterparty (if any) so that it notices mq_detached. */
840  if (victim != NULL)
841  SetLatch(&victim->procLatch);
842 }
843 
844 /*
845  * Get the shm_mq from handle.
 *
 * NOTE(review): line 848 (the signature, presumably taking the handle
 * 'mqh') is missing from this listing.
846  */
847 shm_mq *
849 {
850  return mqh->mqh_queue;
851 }
852 
853 /*
854  * Write bytes into a shared message queue.
 *
 * Copies nbytes bytes from 'data' into the ring, in several memcpy()s if
 * the ring wraps or fills. *bytes_written is always set to the number of
 * bytes actually copied. The shared write counter is advanced by the
 * MAXALIGN'd size of each piece, so only the final piece may be unaligned.
 *
 * Returns SHM_MQ_SUCCESS when all bytes were written and SHM_MQ_DETACHED if
 * the queue is or becomes detached or the receiver is known to be gone.
 * SHM_MQ_WOULD_BLOCK is returned only when nowait = true and no space (or
 * no receiver) is available yet.
855  */
856 static shm_mq_result
857 shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
858  bool nowait, Size *bytes_written)
859 {
860  shm_mq *mq = mqh->mqh_queue;
861  Size sent = 0;
862  uint64 used;
863  Size ringsize = mq->mq_ring_size;
864  Size available;
865 
866  while (sent < nbytes)
867  {
868  uint64 rb;
869  uint64 wb;
870 
871  /* Compute number of ring buffer bytes used and available. */
/*
 * NOTE(review): lines 872-873 are missing from this listing. They
 * presumably load mq_bytes_read into rb and mq_bytes_written into wb, both
 * of which are used below without any other assignment.
 */
874  Assert(wb >= rb);
875  used = wb - rb;
876  Assert(used <= ringsize);
877  available = Min(ringsize - used, nbytes - sent);
878 
879  /*
880  * Bail out if the queue has been detached. Note that we would be in
881  * trouble if the compiler decided to cache the value of
882  * mq->mq_detached in a register or on the stack across loop
883  * iterations. It probably shouldn't do that anyway since we'll
884  * always return, call an external function that performs a system
885  * call, or reach a memory barrier at some point later in the loop,
886  * but just to be sure, insert a compiler barrier here.
887  */
/* NOTE(review): line 888, the compiler barrier promised above, is missing. */
889  if (mq->mq_detached)
890  {
891  *bytes_written = sent;
892  return SHM_MQ_DETACHED;
893  }
894 
895  if (available == 0 && !mqh->mqh_counterparty_attached)
896  {
897  /*
898  * The queue is full, so if the receiver isn't yet known to be
899  * attached, we must wait for that to happen.
900  */
901  if (nowait)
902  {
903  if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
904  {
905  *bytes_written = sent;
906  return SHM_MQ_DETACHED;
907  }
908  if (shm_mq_get_receiver(mq) == NULL)
909  {
910  *bytes_written = sent;
911  return SHM_MQ_WOULD_BLOCK;
912  }
913  }
914  else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
915  mqh->mqh_handle))
916  {
917  mq->mq_detached = true;
918  *bytes_written = sent;
919  return SHM_MQ_DETACHED;
920  }
921  mqh->mqh_counterparty_attached = true;
922 
923  /*
924  * The receiver may have read some data after attaching, so we
925  * must not wait without rechecking the queue state.
926  */
927  }
928  else if (available == 0)
929  {
930  /*
931  * Since mqh->mqh_counterparty_attached is known to be true at this
932  * point, mq_receiver has been set, and it can't change once set.
933  * Therefore, we can read it without acquiring the spinlock.
934  */
/*
 * NOTE(review): lines 935-936 are missing from this listing. Per the
 * comment above and the "only when the buffer fills up" remark further
 * down, they presumably set the receiver's latch so that it drains the
 * ring.
 */
937 
938  /* Skip manipulation of our latch if nowait = true. */
939  if (nowait)
940  {
941  *bytes_written = sent;
942  return SHM_MQ_WOULD_BLOCK;
943  }
944 
945  /*
946  * Wait for our latch to be set. It might already be set for some
947  * unrelated reason, but that'll just result in one extra trip
948  * through the loop. It's worth it to avoid resetting the latch
949  * at top of loop, because setting an already-set latch is much
950  * cheaper than setting one that has been reset.
951  */
/*
 * NOTE(review): lines 952-953 (the latch wait), 956 (the latch reset) and
 * 959 (presumably the interrupt check) are missing from this listing.
 */
954 
955  /* Reset the latch so we don't spin. */
957 
958  /* An interrupt may have occurred while we were waiting. */
960  }
961  else
962  {
963  Size offset;
964  Size sendnow;
965 
966  offset = wb % (uint64) ringsize;
967  sendnow = Min(available, ringsize - offset);
968 
969  /*
970  * Write as much data as we can via a single memcpy(). Make sure
971  * these writes happen after the read of mq_bytes_read, above.
972  * This barrier pairs with the one in shm_mq_inc_bytes_read.
973  * (Since we're separating the read of mq_bytes_read from a
974  * subsequent write to mq_ring, we need a full barrier here.)
975  */
/* NOTE(review): line 976, the full memory barrier described above, is missing. */
977  memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
978  (char *) data + sent, sendnow);
979  sent += sendnow;
980 
981  /*
982  * Update count of bytes written, with alignment padding. Note
983  * that this will never actually insert any padding except at the
984  * end of a run of bytes, because the buffer size is a multiple of
985  * MAXIMUM_ALIGNOF, and each read is as well.
986  */
987  Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
988  shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
989 
990  /*
991  * For efficiency, we don't set the reader's latch here. We'll do
992  * that only when the buffer fills up or after writing an entire
993  * message.
994  */
995  }
996  }
997 
998  *bytes_written = sent;
999  return SHM_MQ_SUCCESS;
1000 }
1001 
1002 /*
1003  * Wait until at least *nbytesp bytes are available to be read from the
1004  * shared message queue, or until the buffer wraps around. If the queue is
1005  * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
1006  * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
1007  * to the location at which data bytes can be read, *nbytesp is set to the
1008  * number of bytes which can be read at that address, and the return value
1009  * is SHM_MQ_SUCCESS.
1010  */
1011 static shm_mq_result
1012 shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
1013  Size *nbytesp, void **datap)
1014 {
1015  shm_mq *mq = mqh->mqh_queue;
1016  Size ringsize = mq->mq_ring_size;
1017  uint64 used;
1018  uint64 written;
1019 
1020  for (;;)
1021  {
1022  Size offset;
1023  uint64 read;
1024 
1025  /* Get bytes written, so we can compute what's available to read. */
1026  written = pg_atomic_read_u64(&mq->mq_bytes_written);
1027 
1028  /*
1029  * Get bytes read. Include bytes we could consume but have not yet
1030  * consumed.
1031  */
1032  read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1033  mqh->mqh_consume_pending;
1034  used = written - read;
1035  Assert(used <= ringsize);
1036  offset = read % (uint64) ringsize;
1037 
1038  /* If we have enough data or buffer has wrapped, we're done. */
1039  if (used >= bytes_needed || offset + used >= ringsize)
1040  {
1041  *nbytesp = Min(used, ringsize - offset);
1042  *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1043 
1044  /*
1045  * Separate the read of mq_bytes_written, above, from caller's
1046  * attempt to read the data itself. Pairs with the barrier in
1047  * shm_mq_inc_bytes_written.
1048  */
1049  pg_read_barrier();
1050  return SHM_MQ_SUCCESS;
1051  }
1052 
1053  /*
1054  * Fall out before waiting if the queue has been detached.
1055  *
1056  * Note that we don't check for this until *after* considering whether
1057  * the data already available is enough, since the receiver can finish
1058  * receiving a message stored in the buffer even after the sender has
1059  * detached.
1060  */
1061  if (mq->mq_detached)
1062  {
1063  /*
1064  * If the writer advanced mq_bytes_written and then set
1065  * mq_detached, we might not have read the final value of
1066  * mq_bytes_written above. Insert a read barrier and then check
1067  * again if mq_bytes_written has advanced.
1068  */
1069  pg_read_barrier();
1070  if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1071  continue;
1072 
1073  return SHM_MQ_DETACHED;
1074  }
1075 
1076  /*
1077  * We didn't get enough data to satisfy the request, so mark any data
1078  * previously-consumed as read to make more buffer space.
1079  */
1080  if (mqh->mqh_consume_pending > 0)
1081  {
1083  mqh->mqh_consume_pending = 0;
1084  }
1085 
1086  /* Skip manipulation of our latch if nowait = true. */
1087  if (nowait)
1088  return SHM_MQ_WOULD_BLOCK;
1089 
1090  /*
1091  * Wait for our latch to be set. It might already be set for some
1092  * unrelated reason, but that'll just result in one extra trip through
1093  * the loop. It's worth it to avoid resetting the latch at top of
1094  * loop, because setting an already-set latch is much cheaper than
1095  * setting one that has been reset.
1096  */
1099 
1100  /* Reset the latch so we don't spin. */
1102 
1103  /* An interrupt may have occurred while we were waiting. */
1105  }
1106 }
1107 
1108 /*
1109  * Test whether a counterparty who may not even be alive yet is definitely gone.
1110  */
1111 static bool
1113 {
1114  pid_t pid;
1115 
1116  /* If the queue has been detached, counterparty is definitely gone. */
1117  if (mq->mq_detached)
1118  return true;
1119 
1120  /* If there's a handle, check worker status. */
1121  if (handle != NULL)
1122  {
1124 
1125  /* Check for unexpected worker death. */
1126  status = GetBackgroundWorkerPid(handle, &pid);
1127  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1128  {
1129  /* Mark it detached, just to make it official. */
1130  mq->mq_detached = true;
1131  return true;
1132  }
1133  }
1134 
1135  /* Counterparty is not definitively gone. */
1136  return false;
1137 }
1138 
1139 /*
1140  * This is used when a process is waiting for its counterpart to attach to the
1141  * queue. We exit when the other process attaches as expected, or, if
1142  * handle != NULL, when the referenced background process or the postmaster
1143  * dies. Note that if handle == NULL, and the process fails to attach, we'll
1144  * potentially get stuck here forever waiting for a process that may never
1145  * start. We do check for interrupts, though.
1146  *
1147  * ptr is a pointer to the memory address that we're expecting to become
1148  * non-NULL when our counterpart attaches to the queue.
1149  */
1150 static bool
1152 {
1153  bool result = false;
1154 
1155  for (;;)
1156  {
1158  pid_t pid;
1159 
1160  /* Acquire the lock just long enough to check the pointer. */
1161  SpinLockAcquire(&mq->mq_mutex);
1162  result = (*ptr != NULL);
1163  SpinLockRelease(&mq->mq_mutex);
1164 
1165  /* Fail if detached; else succeed if initialized. */
1166  if (mq->mq_detached)
1167  {
1168  result = false;
1169  break;
1170  }
1171  if (result)
1172  break;
1173 
1174  if (handle != NULL)
1175  {
1176  /* Check for unexpected worker death. */
1177  status = GetBackgroundWorkerPid(handle, &pid);
1178  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1179  {
1180  result = false;
1181  break;
1182  }
1183  }
1184 
1185  /* Wait to be signalled. */
1188 
1189  /* Reset the latch so we don't spin. */
1191 
1192  /* An interrupt may have occurred while we were waiting. */
1194  }
1195 
1196  return result;
1197 }
1198 
1199 /*
1200  * Increment the number of bytes read.
1201  */
1202 static void
1204 {
1205  PGPROC *sender;
1206 
1207  /*
1208  * Separate prior reads of mq_ring from the increment of mq_bytes_read
1209  * which follows. This pairs with the full barrier in
1210  * shm_mq_send_bytes(). We only need a read barrier here because the
1211  * increment of mq_bytes_read is actually a read followed by a dependent
1212  * write.
1213  */
1214  pg_read_barrier();
1215 
1216  /*
1217  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1218  * else can be changing this value. This method should be cheaper.
1219  */
1221  pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1222 
1223  /*
1224  * We shouldn't have any bytes to read without a sender, so we can read
1225  * mq_sender here without a lock. Once it's initialized, it can't change.
1226  */
1227  sender = mq->mq_sender;
1228  Assert(sender != NULL);
1229  SetLatch(&sender->procLatch);
1230 }
1231 
1232 /*
1233  * Increment the number of bytes written.
1234  */
1235 static void
1237 {
1238  /*
1239  * Separate prior reads of mq_ring from the write of mq_bytes_written
1240  * which we're about to do. Pairs with the read barrier found in
1241  * shm_mq_receive_bytes.
1242  */
1243  pg_write_barrier();
1244 
1245  /*
1246  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1247  * else can be changing this value. This method avoids taking the bus
1248  * lock unnecessarily.
1249  */
1252 }
1253 
1254 /* Shim for on_dsm_detach callback. */
1255 static void
1257 {
1258  shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1259 
1261 }
int slock_t
Definition: s_lock.h:934
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
Definition: shm_mq.c:1236
Size mqh_partial_bytes
Definition: shm_mq.c:138
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:793
PGPROC * MyProc
Definition: proc.c:68
#define PointerGetDatum(X)
Definition: postgres.h:556
#define SpinLockInit(lock)
Definition: spin.h:60
#define Min(x, y)
Definition: c.h:904
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:249
unsigned char uint8
Definition: c.h:356
PGPROC * mq_receiver
Definition: shm_mq.c:73
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:234
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:134
Size mqh_consume_pending
Definition: shm_mq.c:137
void SetLatch(Latch *latch)
Definition: latch.c:436
bool mqh_counterparty_attached
Definition: shm_mq.c:141
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:975
void ResetLatch(Latch *latch)
Definition: latch.c:519
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
const Size shm_mq_minimum_size
Definition: shm_mq.c:160
Latch procLatch
Definition: proc.h:104
char * mqh_buffer
Definition: shm_mq.c:135
Size len
Definition: shm_mq.h:32
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:344
#define pg_compiler_barrier()
Definition: atomics.h:133
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1151
#define SpinLockAcquire(lock)
Definition: spin.h:62
bool mqh_length_word_complete
Definition: shm_mq.c:140
void pfree(void *pointer)
Definition: mcxt.c:1056
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:415
const char * data
Definition: shm_mq.h:31
shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh)
Definition: shm_mq.c:770
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:163
slock_t mq_mutex
Definition: shm_mq.c:72
shm_mq * shm_mq_create(void *address, Size size)
Definition: shm_mq.c:169
Size mqh_buflen
Definition: shm_mq.c:136
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:1012
dsm_segment * mqh_segment
Definition: shm_mq.c:133
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
Size mqh_expected_bytes
Definition: shm_mq.c:139
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1203
BgwHandleStatus
Definition: bgworker.h:102
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:857
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:216
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:825
#define SpinLockRelease(lock)
Definition: spin.h:64
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1112
uintptr_t Datum
Definition: postgres.h:367
shm_mq * mqh_queue
Definition: shm_mq.c:132
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:310
bool mq_detached
Definition: shm_mq.c:78
shm_mq_result
Definition: shm_mq.h:36
#define pg_memory_barrier()
Definition: atomics.h:145
#define Max(x, y)
Definition: c.h:898
#define Assert(condition)
Definition: c.h:732
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:75
#define pg_read_barrier()
Definition: atomics.h:158
size_t Size
Definition: c.h:466
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
Definition: shm_mq.c:320
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
Definition: shm_mq.c:346
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
Definition: shm_mq.c:848
#define MAXALIGN(LEN)
Definition: c.h:685
Size mq_ring_size
Definition: shm_mq.c:77
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
Definition: shm_mq.c:80
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:282
#define DatumGetPointer(X)
Definition: postgres.h:549
static StringInfoData tmpbuf
Definition: walsender.c:154
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
void * palloc(Size size)
Definition: mcxt.c:949
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1256
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
int i
Definition: shm_mq.c:70
#define pg_write_barrier()
Definition: atomics.h:159
void * arg
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
uint8 mq_ring_offset
Definition: shm_mq.c:79
MemoryContext mqh_context
Definition: shm_mq.c:142
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:198
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:540
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:990
Definition: proc.h:95
#define WL_LATCH_SET
Definition: latch.h:124
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:76
#define read(a, b, c)
Definition: win32.h:13
#define offsetof(type, field)
Definition: c.h:655
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1044
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
#define MAXALIGN_DOWN(LEN)
Definition: c.h:697
PGPROC * mq_sender
Definition: shm_mq.c:74