PostgreSQL Source Code (git master)
shm_mq.c
1 /*-------------------------------------------------------------------------
2  *
3  * shm_mq.c
4  * single-reader, single-writer shared memory message queue
5  *
6  * Both the sender and the receiver must have a PGPROC; their respective
7  * process latches are used for synchronization. Only the sender may send,
8  * and only the receiver may receive. This is intended to allow a user
9  * backend to communicate with worker backends that it has registered.
10  *
11  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  * src/backend/storage/ipc/shm_mq.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "postmaster/bgworker.h"
24 #include "storage/procsignal.h"
25 #include "storage/shm_mq.h"
26 #include "storage/spin.h"
27 
28 /*
29  * This structure represents the actual queue, stored in shared memory.
30  *
31  * Some notes on synchronization:
32  *
33  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
34  * mq_sender and mq_bytes_written can only be changed by the sender. However,
35  * because most of these fields are 8 bytes and we don't assume that 8 byte
36  * reads and writes are atomic, the spinlock must be taken whenever the field
37  * is updated, and whenever it is read by a process other than the one allowed
38  * to modify it. But the process that is allowed to modify it is also allowed
39  * to read it without the lock. On architectures where 8-byte writes are
40  * atomic, we could replace these spinlocks with memory barriers, but
41  * testing found no performance benefit, so it seems best to keep things
42  * simple for now.
43  *
44  * mq_detached can be set by either the sender or the receiver, so the mutex
45  * must be held to read or write it. Memory barriers could be used here as
46  * well, if needed.
47  *
48  * mq_ring_size and mq_ring_offset never change after initialization, and
49  * can therefore be read without the lock.
50  *
51  * Importantly, mq_ring can be safely read and written without a lock. Were
52  * this not the case, we'd have to hold the spinlock for much longer
53  * intervals, and performance might suffer. Fortunately, that's not
54  * necessary. At any given time, the difference between mq_bytes_read and
55  * mq_bytes_written defines the number of bytes within mq_ring that contain
56  * unread data, and mq_bytes_read defines the position where those bytes
57  * begin. The sender can increase the number of unread bytes at any time,
58  * but only the receiver can give license to overwrite those bytes, by
59  * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
60  * the unread bytes it knows to be present without the lock. Conversely,
61  * the sender can write to the unused portion of the ring buffer without
62  * the lock, because nobody else can be reading or writing those bytes. The
63  * receiver could be making more bytes unused by incrementing mq_bytes_read,
64  * but that's OK. Note that it would be unsafe for the receiver to read any
65  * data it's already marked as read, or to write any data; and it would be
66  * unsafe for the sender to reread any data after incrementing
67  * mq_bytes_written, but fortunately there's no need for any of that.
68  */
69 struct shm_mq
70 {
71  slock_t mq_mutex;
72  PGPROC *mq_receiver;
73  PGPROC *mq_sender;
74  uint64 mq_bytes_read;
75  uint64 mq_bytes_written;
76  Size mq_ring_size;
77  bool mq_detached;
78  uint8 mq_ring_offset;
79  char mq_ring[FLEXIBLE_ARRAY_MEMBER];
80 };
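
To make the arithmetic concrete, here is an editorial sketch (not part of shm_mq.c) of how the two monotonically increasing counters map into mq_ring; it mirrors the computations done in shm_mq_send_bytes and shm_mq_receive_bytes below:

/* Sketch only: the 64-bit counters never wrap in practice, so only their
 * difference and their remainder modulo the ring size matter. */
static inline uint64
mq_unread_bytes(shm_mq *mq)
{
    /* Bytes written but not yet read; never exceeds mq_ring_size. */
    return mq->mq_bytes_written - mq->mq_bytes_read;
}

static inline Size
mq_read_offset(shm_mq *mq)
{
    /* Position within mq_ring where the unread bytes begin. */
    return mq->mq_bytes_read % (uint64) mq->mq_ring_size;
}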
81 
82 /*
83  * This structure is a backend-private handle for access to a queue.
84  *
85  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
86  * a pointer to the dynamic shared memory segment that contains it.
87  *
88  * If this queue is intended to connect the current process with a background
89  * worker that started it, the user can pass a pointer to the worker handle
90  * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
91  * is to allow us to begin sending to or receiving from that queue before the
92  * process we'll be communicating with has even been started. If it fails
93  * to start, the handle will allow us to notice that and fail cleanly, rather
94  * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
95  * simple cases - e.g. where there are just 2 processes communicating; in
96  * more complex scenarios, every process may not have a BackgroundWorkerHandle
97  * available, or may need to watch for the failure of more than one other
98  * process at a time.
99  *
100  * When a message exists as a contiguous chunk of bytes in the queue - that is,
101  * it is smaller than the size of the ring buffer and does not wrap around
102  * the end - we return the message to the caller as a pointer into the buffer.
103  * For messages that are larger or happen to wrap, we reassemble the message
104  * locally by copying the chunks into a backend-local buffer. mqh_buffer is
105  * the buffer, and mqh_buflen is the number of bytes allocated for it.
106  *
107  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
108  * are used to track the state of non-blocking operations. When the caller
109  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
110  * are expected to retry the call at a later time with the same argument;
111  * we need to retain enough state to pick up where we left off.
112  * mqh_length_word_complete tracks whether we are done sending or receiving
113  * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
114  * the number of bytes read or written for either the length word or the
115  * message itself, and mqh_expected_bytes - which is used only for reads -
116  * tracks the expected total size of the payload.
117  *
118  * mqh_counterparty_attached tracks whether we know the counterparty to have
119  * attached to the queue at some previous point. This lets us avoid some
120  * mutex acquisitions.
121  *
122  * mqh_context is the memory context in effect at the time we attached to
123  * the shm_mq. The shm_mq_handle itself is allocated in this context, and
124  * we make sure any other allocations we do happen in this context as well,
125  * to avoid nasty surprises.
126  */
127 struct shm_mq_handle
128 {
129  shm_mq *mqh_queue;
130  dsm_segment *mqh_segment;
131  BackgroundWorkerHandle *mqh_handle;
132  char *mqh_buffer;
133  Size mqh_buflen;
134  Size mqh_consume_pending;
135  Size mqh_partial_bytes;
136  Size mqh_expected_bytes;
137  bool mqh_length_word_complete;
138  bool mqh_counterparty_attached;
139  MemoryContext mqh_context;
140 };
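
Because this resumption state lives in the handle, a caller that gets SHM_MQ_WOULD_BLOCK simply repeats the call with the same arguments. A hedged sketch of the retry loop the comment above implies (res, iov, and iovcnt are hypothetical caller variables; WAIT_EVENT_MQ_SEND is the wait-event tag this file's own send path uses):

/* Sketch: resume a nowait send; the retry must pass the same iov/iovcnt,
 * since partial progress is tracked in mqh_partial_bytes. */
while ((res = shm_mq_sendv(mqh, iov, iovcnt, true)) == SHM_MQ_WOULD_BLOCK)
{
    WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
    ResetLatch(MyLatch);
    CHECK_FOR_INTERRUPTS();
}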
141 
142 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
143  const void *data, bool nowait, Size *bytes_written);
144 static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
145  bool nowait, Size *nbytesp, void **datap);
146 static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
147  BackgroundWorkerHandle *handle);
148 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
149  BackgroundWorkerHandle *handle);
150 static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
151 static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
152 static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
153 static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
154 static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
155 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
156 
157 /* Minimum queue size is enough for header and at least one chunk of data. */
158 const Size shm_mq_minimum_size =
159 MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
160 
161 #define MQH_INITIAL_BUFSIZE 8192
162 
163 /*
164  * Initialize a new shared message queue.
165  */
166 shm_mq *
167 shm_mq_create(void *address, Size size)
168 {
169  shm_mq *mq = address;
170  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
171 
172  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
173  size = MAXALIGN_DOWN(size);
174 
175  /* Queue size must be large enough to hold some data. */
176  Assert(size > data_offset);
177 
178  /* Initialize queue header. */
179  SpinLockInit(&mq->mq_mutex);
180  mq->mq_receiver = NULL;
181  mq->mq_sender = NULL;
182  mq->mq_bytes_read = 0;
183  mq->mq_bytes_written = 0;
184  mq->mq_ring_size = size - data_offset;
185  mq->mq_detached = false;
186  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
187 
188  return mq;
189 }
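
As a usage sketch (illustrative, not from this file; queue_size is a hypothetical constant, at least shm_mq_minimum_size), the creating process typically places the queue in a dynamic shared memory segment and then declares which end it will hold:

dsm_segment *seg = dsm_create(queue_size, 0);
shm_mq    *mq = shm_mq_create(dsm_segment_address(seg), queue_size);

shm_mq_set_receiver(mq, MyProc);    /* this backend will read */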
190 
191 /*
192  * Set the identity of the process that will receive from a shared message
193  * queue.
194  */
195 void
196 shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
197 {
198  volatile shm_mq *vmq = mq;
199  PGPROC *sender;
200 
201  SpinLockAcquire(&mq->mq_mutex);
202  Assert(vmq->mq_receiver == NULL);
203  vmq->mq_receiver = proc;
204  sender = vmq->mq_sender;
205  SpinLockRelease(&mq->mq_mutex);
206 
207  if (sender != NULL)
208  SetLatch(&sender->procLatch);
209 }
210 
211 /*
212  * Set the identity of the process that will send to a shared message queue.
213  */
214 void
215 shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
216 {
217  volatile shm_mq *vmq = mq;
218  PGPROC *receiver;
219 
220  SpinLockAcquire(&mq->mq_mutex);
221  Assert(vmq->mq_sender == NULL);
222  vmq->mq_sender = proc;
223  receiver = vmq->mq_receiver;
224  SpinLockRelease(&mq->mq_mutex);
225 
226  if (receiver != NULL)
227  SetLatch(&receiver->procLatch);
228 }
229 
230 /*
231  * Get the configured receiver.
232  */
233 PGPROC *
234 shm_mq_get_receiver(shm_mq *mq)
235 {
236  volatile shm_mq *vmq = mq;
237  PGPROC *receiver;
238 
239  SpinLockAcquire(&mq->mq_mutex);
240  receiver = vmq->mq_receiver;
241  SpinLockRelease(&mq->mq_mutex);
242 
243  return receiver;
244 }
245 
246 /*
247  * Get the configured sender.
248  */
249 PGPROC *
250 shm_mq_get_sender(shm_mq *mq)
251 {
252  volatile shm_mq *vmq = mq;
253  PGPROC *sender;
254 
255  SpinLockAcquire(&mq->mq_mutex);
256  sender = vmq->mq_sender;
257  SpinLockRelease(&mq->mq_mutex);
258 
259  return sender;
260 }
261 
262 /*
263  * Attach to a shared message queue so we can send or receive messages.
264  *
265  * The memory context in effect at the time this function is called should
266  * be one which will last for at least as long as the message queue itself.
267  * We'll allocate the handle in that context, and future allocations that
268  * are needed to buffer incoming data will happen in that context as well.
269  *
270  * If seg != NULL, the queue will be automatically detached when that dynamic
271  * shared memory segment is detached.
272  *
273  * If handle != NULL, the queue can be read or written even before the
274  * other process has attached. We'll wait for it to do so if needed. The
275  * handle must be for a background worker initialized with bgw_notify_pid
276  * equal to our PID.
277  *
278  * shm_mq_detach() should be called when done. This will free the
279  * shm_mq_handle and mark the queue itself as detached, so that our
280  * counterpart won't get stuck waiting for us to fill or drain the queue
281  * after we've already lost interest.
282  */
283 shm_mq_handle *
284 shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
285 {
286  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
287 
288  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
289  mqh->mqh_queue = mq;
290  mqh->mqh_segment = seg;
291  mqh->mqh_buffer = NULL;
292  mqh->mqh_handle = handle;
293  mqh->mqh_buflen = 0;
294  mqh->mqh_consume_pending = 0;
295  mqh->mqh_context = CurrentMemoryContext;
296  mqh->mqh_partial_bytes = 0;
297  mqh->mqh_length_word_complete = false;
298  mqh->mqh_counterparty_attached = false;
299 
300  if (seg != NULL)
301  on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
302 
303  return mqh;
304 }
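
A sketch of both sides attaching (illustrative; worker_handle is whatever RegisterDynamicBackgroundWorker returned, and the worker is assumed to have mapped the same segment with dsm_attach):

/* In the launching backend, which set itself as receiver above: */
shm_mq_handle *mqh = shm_mq_attach(mq, seg, worker_handle);

/* In the worker process: */
shm_mq_set_sender(mq, MyProc);
shm_mq_handle *mqh = shm_mq_attach(mq, seg, NULL);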
305 
306 /*
307  * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
308  * been passed to shm_mq_attach.
309  */
310 void
311 shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
312 {
313  Assert(mqh->mqh_handle == NULL);
314  mqh->mqh_handle = handle;
315 }
316 
317 /*
318  * Write a message into a shared message queue.
319  */
320 shm_mq_result
321 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
322 {
323  shm_mq_iovec iov;
324 
325  iov.data = data;
326  iov.len = nbytes;
327 
328  return shm_mq_sendv(mqh, &iov, 1, nowait);
329 }
330 
331 /*
332  * Write a message into a shared message queue, gathered from multiple
333  * addresses.
334  *
335  * When nowait = false, we'll wait on our process latch when the ring buffer
336  * fills up, and then continue writing once the receiver has drained some data.
337  * The process latch is reset after each wait.
338  *
339  * When nowait = true, we do not manipulate the state of the process latch;
340  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
341  * this case, the caller should call this function again, with the same
342  * arguments, each time the process latch is set. (Once begun, the sending
343  * of a message cannot be aborted except by detaching from the queue; changing
344  * the length or payload will corrupt the queue.)
345  */
346 shm_mq_result
347 shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
348 {
349  shm_mq_result res;
350  shm_mq *mq = mqh->mqh_queue;
351  Size nbytes = 0;
352  Size bytes_written;
353  int i;
354  int which_iov = 0;
355  Size offset;
356 
357  Assert(mq->mq_sender == MyProc);
358 
359  /* Compute total size of write. */
360  for (i = 0; i < iovcnt; ++i)
361  nbytes += iov[i].len;
362 
363  /* Try to write, or finish writing, the length word into the buffer. */
364  while (!mqh->mqh_length_word_complete)
365  {
366  Assert(mqh->mqh_partial_bytes < sizeof(Size));
367  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
368  ((char *) &nbytes) + mqh->mqh_partial_bytes,
369  nowait, &bytes_written);
370 
371  if (res == SHM_MQ_DETACHED)
372  {
373  /* Reset state in case caller tries to send another message. */
374  mqh->mqh_partial_bytes = 0;
375  mqh->mqh_length_word_complete = false;
376  return res;
377  }
378  mqh->mqh_partial_bytes += bytes_written;
379 
380  if (mqh->mqh_partial_bytes >= sizeof(Size))
381  {
382  Assert(mqh->mqh_partial_bytes == sizeof(Size));
383 
384  mqh->mqh_partial_bytes = 0;
385  mqh->mqh_length_word_complete = true;
386  }
387 
388  if (res != SHM_MQ_SUCCESS)
389  return res;
390 
391  /* Length word can't be split unless bigger than required alignment. */
392  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
393  }
394 
395  /* Write the actual data bytes into the buffer. */
396  Assert(mqh->mqh_partial_bytes <= nbytes);
397  offset = mqh->mqh_partial_bytes;
398  do
399  {
400  Size chunksize;
401 
402  /* Figure out which bytes need to be sent next. */
403  if (offset >= iov[which_iov].len)
404  {
405  offset -= iov[which_iov].len;
406  ++which_iov;
407  if (which_iov >= iovcnt)
408  break;
409  continue;
410  }
411 
412  /*
413  * We want to avoid copying the data if at all possible, but every
414  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
415  * the last. Thus, if a chunk other than the last one ends on a
416  * non-MAXALIGN'd boundary, we have to combine the tail end of its
417  * data with data from one or more following chunks until we either
418  * reach the last chunk or accumulate a number of bytes which is
419  * MAXALIGN'd.
420  */
421  if (which_iov + 1 < iovcnt &&
422  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
423  {
424  char tmpbuf[MAXIMUM_ALIGNOF];
425  int j = 0;
426 
427  for (;;)
428  {
429  if (offset < iov[which_iov].len)
430  {
431  tmpbuf[j] = iov[which_iov].data[offset];
432  j++;
433  offset++;
434  if (j == MAXIMUM_ALIGNOF)
435  break;
436  }
437  else
438  {
439  offset -= iov[which_iov].len;
440  which_iov++;
441  if (which_iov >= iovcnt)
442  break;
443  }
444  }
445 
446  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
447 
448  if (res == SHM_MQ_DETACHED)
449  {
450  /* Reset state in case caller tries to send another message. */
451  mqh->mqh_partial_bytes = 0;
452  mqh->mqh_length_word_complete = false;
453  return res;
454  }
455 
456  mqh->mqh_partial_bytes += bytes_written;
457  if (res != SHM_MQ_SUCCESS)
458  return res;
459  continue;
460  }
461 
462  /*
463  * If this is the last chunk, we can write all the data, even if it
464  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
465  * MAXALIGN_DOWN the write size.
466  */
467  chunksize = iov[which_iov].len - offset;
468  if (which_iov + 1 < iovcnt)
469  chunksize = MAXALIGN_DOWN(chunksize);
470  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
471  nowait, &bytes_written);
472 
473  if (res == SHM_MQ_DETACHED)
474  {
475  /* Reset state in case caller tries to send another message. */
476  mqh->mqh_length_word_complete = false;
477  mqh->mqh_partial_bytes = 0;
478  return res;
479  }
480 
481  mqh->mqh_partial_bytes += bytes_written;
482  offset += bytes_written;
483  if (res != SHM_MQ_SUCCESS)
484  return res;
485  } while (mqh->mqh_partial_bytes < nbytes);
486 
487  /* Reset for next message. */
488  mqh->mqh_partial_bytes = 0;
489  mqh->mqh_length_word_complete = false;
490 
491  /* Notify receiver of the newly-written data, and return. */
492  return shm_mq_notify_receiver(mq);
493 }
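
For illustration, a hedged sketch of gathering one message from two separate buffers (hdr, payload, and payload_len are hypothetical caller variables, as is the error handling):

shm_mq_iovec iov[2];
shm_mq_result res;

iov[0].data = (const char *) &hdr;    /* fixed-size header */
iov[0].len = sizeof(hdr);
iov[1].data = payload;                /* variable-length body */
iov[1].len = payload_len;

res = shm_mq_sendv(mqh, iov, 2, false);    /* blocks until fully queued */
if (res == SHM_MQ_DETACHED)
    elog(ERROR, "receiver has detached");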
494 
495 /*
496  * Receive a message from a shared message queue.
497  *
498  * We set *nbytes to the message length and *data to point to the message
499  * payload. If the entire message exists in the queue as a single,
500  * contiguous chunk, *data will point directly into shared memory; otherwise,
501  * it will point to a temporary buffer. This mostly avoids data copying in
502  * the hoped-for case where messages are short compared to the buffer size,
503  * while still allowing longer messages. In either case, the return value
504  * remains valid until the next receive operation is performed on the queue.
505  *
506  * When nowait = false, we'll wait on our process latch when the ring buffer
507  * is empty and we have not yet received a full message. The sender will
508  * set our process latch after more data has been written, and we'll resume
509  * processing. Each call will therefore return a complete message
510  * (unless the sender detaches the queue).
511  *
512  * When nowait = true, we do not manipulate the state of the process latch;
513  * instead, whenever the buffer is empty and we need to read from it, we
514  * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
515  * function again after the process latch has been set.
516  */
517 shm_mq_result
518 shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
519 {
520  shm_mq *mq = mqh->mqh_queue;
521  shm_mq_result res;
522  Size rb = 0;
523  Size nbytes;
524  void *rawdata;
525 
526  Assert(mq->mq_receiver == MyProc);
527 
528  /* We can't receive data until the sender has attached. */
529  if (!mqh->mqh_counterparty_attached)
530  {
531  if (nowait)
532  {
533  int counterparty_gone;
534 
535  /*
536  * We shouldn't return at this point at all unless the sender
537  * hasn't attached yet. However, the correct return value depends
538  * on whether the sender is still attached. If we first test
539  * whether the sender has ever attached and then test whether the
540  * sender has detached, there's a race condition: a sender that
541  * attaches and detaches very quickly might fool us into thinking
542  * the sender never attached at all. So, test whether our
543  * counterparty is definitively gone first, and only afterwards
544  * check whether the sender ever attached in the first place.
545  */
546  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
547  if (shm_mq_get_sender(mq) == NULL)
548  {
549  if (counterparty_gone)
550  return SHM_MQ_DETACHED;
551  else
552  return SHM_MQ_WOULD_BLOCK;
553  }
554  }
555  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
556  && shm_mq_get_sender(mq) == NULL)
557  {
558  mq->mq_detached = true;
559  return SHM_MQ_DETACHED;
560  }
561  mqh->mqh_counterparty_attached = true;
562  }
563 
564  /* Consume any zero-copy data from previous receive operation. */
565  if (mqh->mqh_consume_pending > 0)
566  {
567  shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
568  mqh->mqh_consume_pending = 0;
569  }
570 
571  /* Try to read, or finish reading, the length word from the buffer. */
572  while (!mqh->mqh_length_word_complete)
573  {
574  /* Try to receive the message length word. */
575  Assert(mqh->mqh_partial_bytes < sizeof(Size));
576  res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
577  nowait, &rb, &rawdata);
578  if (res != SHM_MQ_SUCCESS)
579  return res;
580 
581  /*
582  * Hopefully, we'll receive the entire message length word at once.
583  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
584  * multiple reads.
585  */
586  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
587  {
588  Size needed;
589 
590  nbytes = *(Size *) rawdata;
591 
592  /* If we've already got the whole message, we're done. */
593  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
594  if (rb >= needed)
595  {
596  /*
597  * Technically, we could consume the message length
598  * information at this point, but the extra write to shared
599  * memory wouldn't be free and in most cases we would reap no
600  * benefit.
601  */
602  mqh->mqh_consume_pending = needed;
603  *nbytesp = nbytes;
604  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
605  return SHM_MQ_SUCCESS;
606  }
607 
608  /*
609  * We don't have the whole message, but we at least have the whole
610  * length word.
611  */
612  mqh->mqh_expected_bytes = nbytes;
613  mqh->mqh_length_word_complete = true;
614  shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
615  rb -= MAXALIGN(sizeof(Size));
616  }
617  else
618  {
619  Size lengthbytes;
620 
621  /* Can't be split unless bigger than required alignment. */
622  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
623 
624  /* Message word is split; need buffer to reassemble. */
625  if (mqh->mqh_buffer == NULL)
626  {
627  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
628  MQH_INITIAL_BUFSIZE);
629  mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
630  }
631  Assert(mqh->mqh_buflen >= sizeof(Size));
632 
633  /* Copy and consume partial length word. */
634  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
635  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
636  else
637  lengthbytes = rb;
638  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
639  lengthbytes);
640  mqh->mqh_partial_bytes += lengthbytes;
641  shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
642  rb -= lengthbytes;
643 
644  /* If we now have the whole word, we're ready to read payload. */
645  if (mqh->mqh_partial_bytes >= sizeof(Size))
646  {
647  Assert(mqh->mqh_partial_bytes == sizeof(Size));
648  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
649  mqh->mqh_length_word_complete = true;
650  mqh->mqh_partial_bytes = 0;
651  }
652  }
653  }
654  nbytes = mqh->mqh_expected_bytes;
655 
656  if (mqh->mqh_partial_bytes == 0)
657  {
658  /*
659  * Try to obtain the whole message in a single chunk. If this works,
660  * we need not copy the data and can return a pointer directly into
661  * shared memory.
662  */
663  res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
664  if (res != SHM_MQ_SUCCESS)
665  return res;
666  if (rb >= nbytes)
667  {
668  mqh->mqh_length_word_complete = false;
669  mqh->mqh_consume_pending = MAXALIGN(nbytes);
670  *nbytesp = nbytes;
671  *datap = rawdata;
672  return SHM_MQ_SUCCESS;
673  }
674 
675  /*
676  * The message has wrapped the buffer. We'll need to copy it in order
677  * to return it to the client in one chunk. First, make sure we have
678  * a large enough buffer available.
679  */
680  if (mqh->mqh_buflen < nbytes)
681  {
682  Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
683 
684  while (newbuflen < nbytes)
685  newbuflen *= 2;
686 
687  if (mqh->mqh_buffer != NULL)
688  {
689  pfree(mqh->mqh_buffer);
690  mqh->mqh_buffer = NULL;
691  mqh->mqh_buflen = 0;
692  }
693  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
694  mqh->mqh_buflen = newbuflen;
695  }
696  }
697 
698  /* Loop until we've copied the entire message. */
699  for (;;)
700  {
701  Size still_needed;
702 
703  /* Copy as much as we can. */
704  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
705  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
706  mqh->mqh_partial_bytes += rb;
707 
708  /*
709  * Update count of bytes read, with alignment padding. Note that this
710  * will never actually insert any padding except at the end of a
711  * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
712  * and each read and write is as well.
713  */
714  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
715  shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
716 
717  /* If we got all the data, exit the loop. */
718  if (mqh->mqh_partial_bytes >= nbytes)
719  break;
720 
721  /* Wait for some more data. */
722  still_needed = nbytes - mqh->mqh_partial_bytes;
723  res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
724  if (res != SHM_MQ_SUCCESS)
725  return res;
726  if (rb > still_needed)
727  rb = still_needed;
728  }
729 
730  /* Return the complete message, and reset for next message. */
731  *nbytesp = nbytes;
732  *datap = mqh->mqh_buffer;
733  mqh->mqh_length_word_complete = false;
734  mqh->mqh_partial_bytes = 0;
735  return SHM_MQ_SUCCESS;
736 }
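
A sketch of the nowait receive loop this contract implies (illustrative; WAIT_EVENT_MQ_RECEIVE is assumed to be the appropriate wait-event tag, matching the one used internally by shm_mq_receive_bytes):

for (;;)
{
    Size    nbytes;
    void   *data;
    shm_mq_result res = shm_mq_receive(mqh, &nbytes, &data, true);

    if (res == SHM_MQ_SUCCESS)
    {
        /* consume the message; data stays valid until the next receive */
    }
    else if (res == SHM_MQ_DETACHED)
        break;                      /* sender is gone */
    else                            /* SHM_MQ_WOULD_BLOCK */
    {
        WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_RECEIVE);
        ResetLatch(MyLatch);
        CHECK_FOR_INTERRUPTS();
    }
}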
737 
738 /*
739  * Wait for the other process that's supposed to use this queue to attach
740  * to it.
741  *
742  * The return value is SHM_MQ_DETACHED if the worker has already detached or
743  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
744  * Note that we will only be able to detect that the worker has died before
745  * attaching if a background worker handle was passed to shm_mq_attach().
746  */
747 shm_mq_result
748 shm_mq_wait_for_attach(shm_mq_handle *mqh)
749 {
750  shm_mq *mq = mqh->mqh_queue;
751  PGPROC **victim;
752 
753  if (shm_mq_get_receiver(mq) == MyProc)
754  victim = &mq->mq_sender;
755  else
756  {
757  Assert(shm_mq_get_sender(mq) == MyProc);
758  victim = &mq->mq_receiver;
759  }
760 
761  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
762  return SHM_MQ_SUCCESS;
763  else
764  return SHM_MQ_DETACHED;
765 }
766 
767 /*
768  * Detach a shared message queue.
769  *
770  * The purpose of this function is to make sure that the process
771  * with which we're communicating doesn't block forever waiting for us to
772  * fill or drain the queue once we've lost interest. When the sender
773  * detaches, the receiver can read any messages remaining in the queue;
774  * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
775  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
776  */
777 void
778 shm_mq_detach(shm_mq *mq)
779 {
780  volatile shm_mq *vmq = mq;
781  PGPROC *victim;
782 
783  SpinLockAcquire(&mq->mq_mutex);
784  if (vmq->mq_sender == MyProc)
785  victim = vmq->mq_receiver;
786  else
787  {
788  Assert(vmq->mq_receiver == MyProc);
789  victim = vmq->mq_sender;
790  }
791  vmq->mq_detached = true;
792  SpinLockRelease(&mq->mq_mutex);
793 
794  if (victim != NULL)
795  SetLatch(&victim->procLatch);
796 }
797 
798 /*
799  * Get the shm_mq from handle.
800  */
801 shm_mq *
802 shm_mq_get_queue(shm_mq_handle *mqh)
803 {
804  return mqh->mqh_queue;
805 }
806 
807 /*
808  * Write bytes into a shared message queue.
809  */
810 static shm_mq_result
811 shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
812  bool nowait, Size *bytes_written)
813 {
814  shm_mq *mq = mqh->mqh_queue;
815  Size sent = 0;
816  uint64 used;
817  Size ringsize = mq->mq_ring_size;
818  Size available;
819 
820  while (sent < nbytes)
821  {
822  bool detached;
823  uint64 rb;
824 
825  /* Compute number of ring buffer bytes used and available. */
826  rb = shm_mq_get_bytes_read(mq, &detached);
827  Assert(mq->mq_bytes_written >= rb);
828  used = mq->mq_bytes_written - rb;
829  Assert(used <= ringsize);
830  available = Min(ringsize - used, nbytes - sent);
831 
832  /* Bail out if the queue has been detached. */
833  if (detached)
834  {
835  *bytes_written = sent;
836  return SHM_MQ_DETACHED;
837  }
838 
839  if (available == 0 && !mqh->mqh_counterparty_attached)
840  {
841  /*
842  * The queue is full, so if the receiver isn't yet known to be
843  * attached, we must wait for that to happen.
844  */
845  if (nowait)
846  {
847  if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
848  {
849  *bytes_written = sent;
850  return SHM_MQ_DETACHED;
851  }
852  if (shm_mq_get_receiver(mq) == NULL)
853  {
854  *bytes_written = sent;
855  return SHM_MQ_WOULD_BLOCK;
856  }
857  }
858  else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
859  mqh->mqh_handle))
860  {
861  mq->mq_detached = true;
862  *bytes_written = sent;
863  return SHM_MQ_DETACHED;
864  }
865  mqh->mqh_counterparty_attached = true;
866 
867  /*
868  * The receiver may have read some data after attaching, so we
869  * must not wait without rechecking the queue state.
870  */
871  }
872  else if (available == 0)
873  {
874  shm_mq_result res;
875 
876  /* Let the receiver know that we need them to read some data. */
877  res = shm_mq_notify_receiver(mq);
878  if (res != SHM_MQ_SUCCESS)
879  {
880  *bytes_written = sent;
881  return res;
882  }
883 
884  /* Skip manipulation of our latch if nowait = true. */
885  if (nowait)
886  {
887  *bytes_written = sent;
888  return SHM_MQ_WOULD_BLOCK;
889  }
890 
891  /*
892  * Wait for our latch to be set. It might already be set for some
893  * unrelated reason, but that'll just result in one extra trip
894  * through the loop. It's worth it to avoid resetting the latch
895  * at top of loop, because setting an already-set latch is much
896  * cheaper than setting one that has been reset.
897  */
898  WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
899 
900  /* Reset the latch so we don't spin. */
901  ResetLatch(MyLatch);
902 
903  /* An interrupt may have occurred while we were waiting. */
904  CHECK_FOR_INTERRUPTS();
905  }
906  else
907  {
908  Size offset = mq->mq_bytes_written % (uint64) ringsize;
909  Size sendnow = Min(available, ringsize - offset);
910 
911  /* Write as much data as we can via a single memcpy(). */
912  memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
913  (char *) data + sent, sendnow);
914  sent += sendnow;
915 
916  /*
917  * Update count of bytes written, with alignment padding. Note
918  * that this will never actually insert any padding except at the
919  * end of a run of bytes, because the buffer size is a multiple of
920  * MAXIMUM_ALIGNOF, and each read is as well.
921  */
922  Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
923  shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
924 
925  /*
926  * For efficiency, we don't set the reader's latch here. We'll do
927  * that only when the buffer fills up or after writing an entire
928  * message.
929  */
930  }
931  }
932 
933  *bytes_written = sent;
934  return SHM_MQ_SUCCESS;
935 }
936 
937 /*
938  * Wait until at least *nbytesp bytes are available to be read from the
939  * shared message queue, or until the buffer wraps around. If the queue is
940  * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
941  * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
942  * to the location at which data bytes can be read, *nbytesp is set to the
943  * number of bytes which can be read at that address, and the return value
944  * is SHM_MQ_SUCCESS.
945  */
946 static shm_mq_result
947 shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
948  Size *nbytesp, void **datap)
949 {
950  Size ringsize = mq->mq_ring_size;
951  uint64 used;
952  uint64 written;
953 
954  for (;;)
955  {
956  Size offset;
957  bool detached;
958 
959  /* Get bytes written, so we can compute what's available to read. */
960  written = shm_mq_get_bytes_written(mq, &detached);
961  used = written - mq->mq_bytes_read;
962  Assert(used <= ringsize);
963  offset = mq->mq_bytes_read % (uint64) ringsize;
964 
965  /* If we have enough data or buffer has wrapped, we're done. */
966  if (used >= bytes_needed || offset + used >= ringsize)
967  {
968  *nbytesp = Min(used, ringsize - offset);
969  *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
970  return SHM_MQ_SUCCESS;
971  }
972 
973  /*
974  * Fall out before waiting if the queue has been detached.
975  *
976  * Note that we don't check for this until *after* considering whether
977  * the data already available is enough, since the receiver can finish
978  * receiving a message stored in the buffer even after the sender has
979  * detached.
980  */
981  if (detached)
982  return SHM_MQ_DETACHED;
983 
984  /* Skip manipulation of our latch if nowait = true. */
985  if (nowait)
986  return SHM_MQ_WOULD_BLOCK;
987 
988  /*
989  * Wait for our latch to be set. It might already be set for some
990  * unrelated reason, but that'll just result in one extra trip through
991  * the loop. It's worth it to avoid resetting the latch at top of
992  * loop, because setting an already-set latch is much cheaper than
993  * setting one that has been reset.
994  */
995  WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_RECEIVE);
996 
997  /* Reset the latch so we don't spin. */
998  ResetLatch(MyLatch);
999 
1000  /* An interrupt may have occurred while we were waiting. */
1001  CHECK_FOR_INTERRUPTS();
1002  }
1003 }
1004 
1005 /*
1006  * Test whether a counterparty who may not even be alive yet is definitely gone.
1007  */
1008 static bool
1009 shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
1010 {
1011  bool detached;
1012  pid_t pid;
1013 
1014  /* Acquire the lock just long enough to check the pointer. */
1015  SpinLockAcquire(&mq->mq_mutex);
1016  detached = mq->mq_detached;
1017  SpinLockRelease(&mq->mq_mutex);
1018 
1019  /* If the queue has been detached, counterparty is definitely gone. */
1020  if (detached)
1021  return true;
1022 
1023  /* If there's a handle, check worker status. */
1024  if (handle != NULL)
1025  {
1026  BgwHandleStatus status;
1027 
1028  /* Check for unexpected worker death. */
1029  status = GetBackgroundWorkerPid(handle, &pid);
1030  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1031  {
1032  /* Mark it detached, just to make it official. */
1033  SpinLockAcquire(&mq->mq_mutex);
1034  mq->mq_detached = true;
1035  SpinLockRelease(&mq->mq_mutex);
1036  return true;
1037  }
1038  }
1039 
1040  /* Counterparty is not definitively gone. */
1041  return false;
1042 }
1043 
1044 /*
1045  * This is used when a process is waiting for its counterpart to attach to the
1046  * queue. We exit when the other process attaches as expected, or, if
1047  * handle != NULL, when the referenced background process or the postmaster
1048  * dies. Note that if handle == NULL, and the process fails to attach, we'll
1049  * potentially get stuck here forever waiting for a process that may never
1050  * start. We do check for interrupts, though.
1051  *
1052  * ptr is a pointer to the memory address that we're expecting to become
1053  * non-NULL when our counterpart attaches to the queue.
1054  */
1055 static bool
1056 shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
1057  BackgroundWorkerHandle *handle)
1058 {
1059  bool result = false;
1060 
1061  for (;;)
1062  {
1063  BgwHandleStatus status;
1064  pid_t pid;
1065  bool detached;
1066 
1067  /* Acquire the lock just long enough to check the pointer. */
1068  SpinLockAcquire(&mq->mq_mutex);
1069  detached = mq->mq_detached;
1070  result = (*ptr != NULL);
1071  SpinLockRelease(&mq->mq_mutex);
1072 
1073  /* Fail if detached; else succeed if initialized. */
1074  if (detached)
1075  {
1076  result = false;
1077  break;
1078  }
1079  if (result)
1080  break;
1081 
1082  if (handle != NULL)
1083  {
1084  /* Check for unexpected worker death. */
1085  status = GetBackgroundWorkerPid(handle, &pid);
1086  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1087  {
1088  result = false;
1089  break;
1090  }
1091  }
1092 
1093  /* Wait to be signalled. */
1094  WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_INTERNAL);
1095 
1096  /* Reset the latch so we don't spin. */
1097  ResetLatch(MyLatch);
1098 
1099  /* An interrupt may have occurred while we were waiting. */
1100  CHECK_FOR_INTERRUPTS();
1101  }
1102 
1103  return result;
1104 }
1105 
1106 /*
1107  * Get the number of bytes read. The receiver need not use this to access
1108  * the count of bytes read, but the sender must.
1109  */
1110 static uint64
1111 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
1112 {
1113  uint64 v;
1114 
1115  SpinLockAcquire(&mq->mq_mutex);
1116  v = mq->mq_bytes_read;
1117  *detached = mq->mq_detached;
1118  SpinLockRelease(&mq->mq_mutex);
1119 
1120  return v;
1121 }
1122 
1123 /*
1124  * Increment the number of bytes read.
1125  */
1126 static void
1127 shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
1128 {
1129  PGPROC *sender;
1130 
1131  SpinLockAcquire(&mq->mq_mutex);
1132  mq->mq_bytes_read += n;
1133  sender = mq->mq_sender;
1134  SpinLockRelease(&mq->mq_mutex);
1135 
1136  /* We shouldn't have any bytes to read without a sender. */
1137  Assert(sender != NULL);
1138  SetLatch(&sender->procLatch);
1139 }
1140 
1141 /*
1142  * Get the number of bytes written. The sender need not use this to access
1143  * the count of bytes written, but the receiver must.
1144  */
1145 static uint64
1146 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
1147 {
1148  uint64 v;
1149 
1150  SpinLockAcquire(&mq->mq_mutex);
1151  v = mq->mq_bytes_written;
1152  *detached = mq->mq_detached;
1153  SpinLockRelease(&mq->mq_mutex);
1154 
1155  return v;
1156 }
1157 
1158 /*
1159  * Increment the number of bytes written.
1160  */
1161 static void
1162 shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
1163 {
1164  SpinLockAcquire(&mq->mq_mutex);
1165  mq->mq_bytes_written += n;
1166  SpinLockRelease(&mq->mq_mutex);
1167 }
1168 
1169 /*
1170  * Set receiver's latch, unless queue is detached.
1171  */
1172 static shm_mq_result
1173 shm_mq_notify_receiver(volatile shm_mq *mq)
1174 {
1175  PGPROC *receiver;
1176  bool detached;
1177 
1178  SpinLockAcquire(&mq->mq_mutex);
1179  detached = mq->mq_detached;
1180  receiver = mq->mq_receiver;
1181  SpinLockRelease(&mq->mq_mutex);
1182 
1183  if (detached)
1184  return SHM_MQ_DETACHED;
1185  if (receiver)
1186  SetLatch(&receiver->procLatch);
1187  return SHM_MQ_SUCCESS;
1188 }
1189 
1190 /* Shim for use as an on_dsm_detach callback. */
1191 static void
1192 shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1193 {
1194  shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1195 
1196  shm_mq_detach(mq);
1197 }
Definition: shm_mq.c:73