/* PostgreSQL source code (Doxygen "git master" rendering): shm_mq.c */
1 /*-------------------------------------------------------------------------
2  *
3  * shm_mq.c
4  * single-reader, single-writer shared memory message queue
5  *
6  * Both the sender and the receiver must have a PGPROC; their respective
7  * process latches are used for synchronization. Only the sender may send,
8  * and only the receiver may receive. This is intended to allow a user
9  * backend to communicate with worker backends that it has registered.
10  *
11  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  * src/backend/storage/ipc/shm_mq.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "postmaster/bgworker.h"
24 #include "storage/procsignal.h"
25 #include "storage/shm_mq.h"
26 #include "storage/spin.h"
27 
28 /*
29  * This structure represents the actual queue, stored in shared memory.
30  *
31  * Some notes on synchronization:
32  *
33  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
34  * mq_sender and mq_bytes_written can only be changed by the sender. However,
35  * because most of these fields are 8 bytes and we don't assume that 8 byte
36  * reads and writes are atomic, the spinlock must be taken whenever the field
37  * is updated, and whenever it is read by a process other than the one allowed
38  * to modify it. But the process that is allowed to modify it is also allowed
39  * to read it without the lock. On architectures where 8-byte writes are
40  * atomic, we could replace these spinlocks with memory barriers, but
41  * testing found no performance benefit, so it seems best to keep things
42  * simple for now.
43  *
44  * mq_detached can be set by either the sender or the receiver, so the mutex
45  * must be held to read or write it. Memory barriers could be used here as
46  * well, if needed.
47  *
48  * mq_ring_size and mq_ring_offset never change after initialization, and
49  * can therefore be read without the lock.
50  *
51  * Importantly, mq_ring can be safely read and written without a lock. Were
52  * this not the case, we'd have to hold the spinlock for much longer
53  * intervals, and performance might suffer. Fortunately, that's not
54  * necessary. At any given time, the difference between mq_bytes_read and
55  * mq_bytes_written defines the number of bytes within mq_ring that contain
56  * unread data, and mq_bytes_read defines the position where those bytes
57  * begin. The sender can increase the number of unread bytes at any time,
58  * but only the receiver can give license to overwrite those bytes, by
59  * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
60  * the unread bytes it knows to be present without the lock. Conversely,
61  * the sender can write to the unused portion of the ring buffer without
62  * the lock, because nobody else can be reading or writing those bytes. The
63  * receiver could be making more bytes unused by incrementing mq_bytes_read,
64  * but that's OK. Note that it would be unsafe for the receiver to read any
65  * data it's already marked as read, or to write any data; and it would be
66  * unsafe for the sender to reread any data after incrementing
67  * mq_bytes_written, but fortunately there's no need for any of that.
68  */
69 struct shm_mq
70 {
/*
 * NOTE(review): most members of this struct are elided in this rendering
 * (original lines 71-73 and 75-78).  shm_mq_create() below initializes
 * mq_mutex, mq_receiver, mq_sender, mq_bytes_written, mq_ring_size,
 * mq_detached and mq_ring_offset in addition to the two members visible
 * here — confirm exact declarations against the real source.
 */
74  uint64 mq_bytes_read;
79  char mq_ring[FLEXIBLE_ARRAY_MEMBER];
80 };
81 
82 /*
83  * This structure is a backend-private handle for access to a queue.
84  *
85  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
86  * an optional pointer to the dynamic shared memory segment that contains it.
87  * (If mqh_segment is provided, we register an on_dsm_detach callback to
88  * make sure we detach from the queue before detaching from DSM.)
89  *
90  * If this queue is intended to connect the current process with a background
91  * worker that started it, the user can pass a pointer to the worker handle
92  * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
93  * is to allow us to begin sending to or receiving from that queue before the
94  * process we'll be communicating with has even been started. If it fails
95  * to start, the handle will allow us to notice that and fail cleanly, rather
96  * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
97  * simple cases - e.g. where there are just 2 processes communicating; in
98  * more complex scenarios, every process may not have a BackgroundWorkerHandle
99  * available, or may need to watch for the failure of more than one other
100  * process at a time.
101  *
102  * When a message exists as a contiguous chunk of bytes in the queue - that is,
103  * it is smaller than the size of the ring buffer and does not wrap around
104  * the end - we return the message to the caller as a pointer into the buffer.
105  * For messages that are larger or happen to wrap, we reassemble the message
106  * locally by copying the chunks into a backend-local buffer. mqh_buffer is
107  * the buffer, and mqh_buflen is the number of bytes allocated for it.
108  *
109  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
110  * are used to track the state of non-blocking operations. When the caller
111  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
112  * are expected to retry the call at a later time with the same argument;
113  * we need to retain enough state to pick up where we left off.
114  * mqh_length_word_complete tracks whether we are done sending or receiving
115  * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
116  * the number of bytes read or written for either the length word or the
117  * message itself, and mqh_expected_bytes - which is used only for reads -
118  * tracks the expected total size of the payload.
119  *
120  * mqh_counterparty_attached tracks whether we know the counterparty to have
121  * attached to the queue at some previous point. This lets us avoid some
122  * mutex acquisitions.
123  *
124  * mqh_context is the memory context in effect at the time we attached to
125  * the shm_mq. The shm_mq_handle itself is allocated in this context, and
126  * we make sure any other allocations we do happen in this context as well,
127  * to avoid nasty surprises.
128  */
/*
 * NOTE(review): the struct tag line (original line 129, presumably
 * "struct shm_mq_handle" per the palloc(sizeof(shm_mq_handle)) in
 * shm_mq_attach) and all members except mqh_buffer are elided in this
 * rendering.  shm_mq_attach() initializes mqh_queue, mqh_segment,
 * mqh_handle, mqh_buflen, mqh_consume_pending, mqh_partial_bytes,
 * mqh_expected_bytes, mqh_length_word_complete and
 * mqh_counterparty_attached; the big comment above also documents
 * mqh_context.  Confirm against the real source.
 */
130 {
134  char *mqh_buffer;
142 };
143 
/* Forward declarations for the static helpers defined later in this file. */
144 static void shm_mq_detach_internal(shm_mq *mq);
/*
 * NOTE(review): the first line of the shm_mq_send_bytes prototype (original
 * line 145) is elided here; per the definition below it reads
 * "static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,".
 */
146  const void *data, bool nowait, Size *bytes_written);
147 static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
148  bool nowait, Size *nbytesp, void **datap);
149 static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
150  BackgroundWorkerHandle *handle);
151 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
152  BackgroundWorkerHandle *handle);
153 static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
154 static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
155 static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
156 static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
157 static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
158 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
159 
160 /* Minimum queue size is enough for header and at least one chunk of data. */
/*
 * NOTE(review): original line 161 is elided; it presumably declares
 * "const Size shm_mq_minimum_size =" (continued on the next line) — confirm.
 */
162 MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
163 
/* Initial size of the backend-local reassembly buffer (doubled as needed). */
164 #define MQH_INITIAL_BUFSIZE 8192
165 
166 /*
167  * Initialize a new shared message queue.
168  */
169 shm_mq *
170 shm_mq_create(void *address, Size size)
171 {
172  shm_mq *mq = address;
173  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
174 
175  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
176  size = MAXALIGN_DOWN(size);
177 
178  /* Queue size must be large enough to hold some data. */
179  Assert(size > data_offset);
180 
181  /* Initialize queue header. */
182  SpinLockInit(&mq->mq_mutex);
183  mq->mq_receiver = NULL;
184  mq->mq_sender = NULL;
185  mq->mq_bytes_read = 0;
186  mq->mq_bytes_written = 0;
187  mq->mq_ring_size = size - data_offset;
188  mq->mq_detached = false;
189  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
190 
191  return mq;
192 }
193 
194 /*
195  * Set the identity of the process that will receive from a shared message
196  * queue.
197  */
/*
 * NOTE(review): the signature line (original line 199) is elided in this
 * rendering; it presumably reads "shm_mq_set_receiver(shm_mq *mq, PGPROC
 * *proc)" — confirm.  Original lines 204/208 are also elided; per the
 * synchronization comment at the top of the file, updates to mq_receiver
 * must hold mq_mutex, so those are presumably
 * SpinLockAcquire(&mq->mq_mutex) / SpinLockRelease(&mq->mq_mutex).
 */
198 void
200 {
201  volatile shm_mq *vmq = mq;
202  PGPROC *sender;
203 
205  Assert(vmq->mq_receiver == NULL);
206  vmq->mq_receiver = proc;
/* Snapshot the sender while still under the mutex, so we can wake it below. */
207  sender = vmq->mq_sender;
209 
210  if (sender != NULL)
211  SetLatch(&sender->procLatch);
212 }
213 
214 /*
215  * Set the identity of the process that will send to a shared message queue.
216  */
/*
 * NOTE(review): the signature line (original line 218) is elided; it
 * presumably reads "shm_mq_set_sender(shm_mq *mq, PGPROC *proc)" — confirm.
 * Original lines 223/227 are also elided; per the file-header protocol for
 * mq_sender they are presumably SpinLockAcquire(&mq->mq_mutex) /
 * SpinLockRelease(&mq->mq_mutex).
 */
217 void
219 {
220  volatile shm_mq *vmq = mq;
221  PGPROC *receiver;
222 
224  Assert(vmq->mq_sender == NULL);
225  vmq->mq_sender = proc;
/* Snapshot the receiver under the mutex, so we can wake it below. */
226  receiver = vmq->mq_receiver;
228 
229  if (receiver != NULL)
230  SetLatch(&receiver->procLatch);
231 }
232 
233 /*
234  * Get the configured receiver.
235  */
/*
 * NOTE(review): the signature line (original line 237) is elided; callers
 * such as shm_mq_wait_for_attach invoke this as shm_mq_get_receiver(mq),
 * so it presumably reads "shm_mq_get_receiver(shm_mq *mq)".  Original
 * lines 242/244 (spinlock acquire/release around the read, required by
 * the file-header protocol for reading mq_receiver from another process)
 * are elided too — confirm.
 */
236 PGPROC *
238 {
239  volatile shm_mq *vmq = mq;
240  PGPROC *receiver;
241 
243  receiver = vmq->mq_receiver;
245 
246  return receiver;
247 }
248 
249 /*
250  * Get the configured sender.
251  */
/*
 * NOTE(review): the signature line (original line 253) is elided; callers
 * invoke this as shm_mq_get_sender(mq), so it presumably reads
 * "shm_mq_get_sender(shm_mq *mq)".  Original lines 258/260 (spinlock
 * acquire/release around the read) are elided too — confirm.
 */
252 PGPROC *
254 {
255  volatile shm_mq *vmq = mq;
256  PGPROC *sender;
257 
259  sender = vmq->mq_sender;
261 
262  return sender;
263 }
264 
265 /*
266  * Attach to a shared message queue so we can send or receive messages.
267  *
268  * The memory context in effect at the time this function is called should
269  * be one which will last for at least as long as the message queue itself.
270  * We'll allocate the handle in that context, and future allocations that
271  * are needed to buffer incoming data will happen in that context as well.
272  *
273  * If seg != NULL, the queue will be automatically detached when that dynamic
274  * shared memory segment is detached.
275  *
276  * If handle != NULL, the queue can be read or written even before the
277  * other process has attached. We'll wait for it to do so if needed. The
278  * handle must be for a background worker initialized with bgw_notify_pid
279  * equal to our PID.
280  *
281  * shm_mq_detach() should be called when done. This will free the
282  * shm_mq_handle and mark the queue itself as detached, so that our
283  * counterpart won't get stuck waiting for us to fill or drain the queue
284  * after we've already lost interest.
285  */
/*
 * NOTE(review): the return-type/signature lines (original lines 286-287)
 * are elided; per the comment above, this presumably reads
 * "shm_mq_handle * / shm_mq_attach(shm_mq *mq, dsm_segment *seg,
 * BackgroundWorkerHandle *handle)" — confirm.
 */
288 {
289  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
290 
/* Only a process already registered as sender or receiver may attach. */
291  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
292  mqh->mqh_queue = mq;
293  mqh->mqh_segment = seg;
294  mqh->mqh_handle = handle;
295  mqh->mqh_buffer = NULL;
296  mqh->mqh_buflen = 0;
297  mqh->mqh_consume_pending = 0;
298  mqh->mqh_partial_bytes = 0;
299  mqh->mqh_expected_bytes = 0;
300  mqh->mqh_length_word_complete = false;
301  mqh->mqh_counterparty_attached = false;
/*
 * NOTE(review): original line 302 is elided; per the struct documentation
 * (mqh_context is "the memory context in effect at the time we attached"),
 * it presumably sets mqh->mqh_context = CurrentMemoryContext — confirm.
 */
303 
304  if (seg != NULL)
/*
 * NOTE(review): original line 305 is elided; per the comment above about
 * registering an on_dsm_detach callback, it presumably calls
 * on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq)) — confirm.
 */
306 
307  return mqh;
308 }
309 
310 /*
311  * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
312  * been passed to shm_mq_attach.
313  */
/*
 * NOTE(review): the signature line (original line 315) is elided; from the
 * body it presumably reads
 * "shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)".
 */
314 void
316 {
/* May only be called once: a previously-set handle must not be replaced. */
317  Assert(mqh->mqh_handle == NULL);
318  mqh->mqh_handle = handle;
319 }
320 
321 /*
322  * Write a message into a shared message queue.
323  */
325 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
326 {
327  shm_mq_iovec iov;
328 
329  iov.data = data;
330  iov.len = nbytes;
331 
332  return shm_mq_sendv(mqh, &iov, 1, nowait);
333 }
334 
335 /*
336  * Write a message into a shared message queue, gathered from multiple
337  * addresses.
338  *
339  * When nowait = false, we'll wait on our process latch when the ring buffer
340  * fills up, and then continue writing once the receiver has drained some data.
341  * The process latch is reset after each wait.
342  *
343  * When nowait = true, we do not manipulate the state of the process latch;
344  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
345  * this case, the caller should call this function again, with the same
346  * arguments, each time the process latch is set. (Once begun, the sending
347  * of a message cannot be aborted except by detaching from the queue; changing
348  * the length or payload will corrupt the queue.)
349  */
/*
 * NOTE(review): the return-type line (original line 350) is elided in this
 * rendering; the function returns SHM_MQ_SUCCESS / SHM_MQ_DETACHED /
 * SHM_MQ_WOULD_BLOCK and "res" is a shm_mq_result, so the elided line is
 * presumably "shm_mq_result" — confirm.
 */
351 shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
352 {
353  shm_mq_result res;
354  shm_mq *mq = mqh->mqh_queue;
355  Size nbytes = 0;
356  Size bytes_written;
357  int i;
358  int which_iov = 0;
359  Size offset;
360 
/* Only the registered sender may write to the queue. */
361  Assert(mq->mq_sender == MyProc);
362 
363  /* Compute total size of write. */
364  for (i = 0; i < iovcnt; ++i)
365  nbytes += iov[i].len;
366 
367  /* Try to write, or finish writing, the length word into the buffer. */
368  while (!mqh->mqh_length_word_complete)
369  {
370  Assert(mqh->mqh_partial_bytes < sizeof(Size));
371  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
372  ((char *) &nbytes) + mqh->mqh_partial_bytes,
373  nowait, &bytes_written);
374 
375  if (res == SHM_MQ_DETACHED)
376  {
377  /* Reset state in case caller tries to send another message. */
378  mqh->mqh_partial_bytes = 0;
379  mqh->mqh_length_word_complete = false;
380  return res;
381  }
382  mqh->mqh_partial_bytes += bytes_written;
383 
384  if (mqh->mqh_partial_bytes >= sizeof(Size))
385  {
386  Assert(mqh->mqh_partial_bytes == sizeof(Size));
387 
/* Length word fully written; mqh_partial_bytes now counts payload bytes. */
388  mqh->mqh_partial_bytes = 0;
389  mqh->mqh_length_word_complete = true;
390  }
391 
392  if (res != SHM_MQ_SUCCESS)
393  return res;
394 
395  /* Length word can't be split unless bigger than required alignment. */
396  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
397  }
398 
399  /* Write the actual data bytes into the buffer. */
400  Assert(mqh->mqh_partial_bytes <= nbytes);
/* On retry after SHM_MQ_WOULD_BLOCK, skip bytes already sent previously. */
401  offset = mqh->mqh_partial_bytes;
402  do
403  {
404  Size chunksize;
405 
406  /* Figure out which bytes need to be sent next. */
407  if (offset >= iov[which_iov].len)
408  {
409  offset -= iov[which_iov].len;
410  ++which_iov;
411  if (which_iov >= iovcnt)
412  break;
413  continue;
414  }
415 
416  /*
417  * We want to avoid copying the data if at all possible, but every
418  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
419  * the last. Thus, if a chunk other than the last one ends on a
420  * non-MAXALIGN'd boundary, we have to combine the tail end of its
421  * data with data from one or more following chunks until we either
422  * reach the last chunk or accumulate a number of bytes which is
423  * MAXALIGN'd.
424  */
425  if (which_iov + 1 < iovcnt &&
426  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
427  {
428  char tmpbuf[MAXIMUM_ALIGNOF];
429  int j = 0;
430 
431  for (;;)
432  {
433  if (offset < iov[which_iov].len)
434  {
435  tmpbuf[j] = iov[which_iov].data[offset];
436  j++;
437  offset++;
438  if (j == MAXIMUM_ALIGNOF)
439  break;
440  }
441  else
442  {
443  offset -= iov[which_iov].len;
444  which_iov++;
445  if (which_iov >= iovcnt)
446  break;
447  }
448  }
449 
450  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
451 
452  if (res == SHM_MQ_DETACHED)
453  {
454  /* Reset state in case caller tries to send another message. */
455  mqh->mqh_partial_bytes = 0;
456  mqh->mqh_length_word_complete = false;
457  return res;
458  }
459 
460  mqh->mqh_partial_bytes += bytes_written;
461  if (res != SHM_MQ_SUCCESS)
462  return res;
463  continue;
464  }
465 
466  /*
467  * If this is the last chunk, we can write all the data, even if it
468  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
469  * MAXALIGN_DOWN the write size.
470  */
471  chunksize = iov[which_iov].len - offset;
472  if (which_iov + 1 < iovcnt)
473  chunksize = MAXALIGN_DOWN(chunksize);
474  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
475  nowait, &bytes_written);
476 
477  if (res == SHM_MQ_DETACHED)
478  {
479  /* Reset state in case caller tries to send another message. */
480  mqh->mqh_length_word_complete = false;
481  mqh->mqh_partial_bytes = 0;
482  return res;
483  }
484 
485  mqh->mqh_partial_bytes += bytes_written;
486  offset += bytes_written;
487  if (res != SHM_MQ_SUCCESS)
488  return res;
489  } while (mqh->mqh_partial_bytes < nbytes);
490 
491  /* Reset for next message. */
492  mqh->mqh_partial_bytes = 0;
493  mqh->mqh_length_word_complete = false;
494 
495  /* Notify receiver of the newly-written data, and return. */
496  return shm_mq_notify_receiver(mq);
497 }
498 
499 /*
500  * Receive a message from a shared message queue.
501  *
502  * We set *nbytes to the message length and *data to point to the message
503  * payload. If the entire message exists in the queue as a single,
504  * contiguous chunk, *data will point directly into shared memory; otherwise,
505  * it will point to a temporary buffer. This mostly avoids data copying in
506  * the hoped-for case where messages are short compared to the buffer size,
507  * while still allowing longer messages. In either case, the return value
508  * remains valid until the next receive operation is performed on the queue.
509  *
510  * When nowait = false, we'll wait on our process latch when the ring buffer
511  * is empty and we have not yet received a full message. The sender will
512  * set our process latch after more data has been written, and we'll resume
513  * processing. Each call will therefore return a complete message
514  * (unless the sender detaches the queue).
515  *
516  * When nowait = true, we do not manipulate the state of the process latch;
517  * instead, whenever the buffer is empty and we need to read from it, we
518  * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
519  * function again after the process latch has been set.
520  */
/*
 * NOTE(review): the return-type line (original line 521) is elided; the
 * function returns SHM_MQ_* codes, so it is presumably "shm_mq_result".
 */
522 shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
523 {
524  shm_mq *mq = mqh->mqh_queue;
525  shm_mq_result res;
526  Size rb = 0;
527  Size nbytes;
528  void *rawdata;
529 
/* Only the registered receiver may read from the queue. */
530  Assert(mq->mq_receiver == MyProc);
531 
532  /* We can't receive data until the sender has attached. */
533  if (!mqh->mqh_counterparty_attached)
534  {
535  if (nowait)
536  {
537  int counterparty_gone;
538 
539  /*
540  * We shouldn't return at this point at all unless the sender
541  * hasn't attached yet. However, the correct return value depends
542  * on whether the sender is still attached. If we first test
543  * whether the sender has ever attached and then test whether the
544  * sender has detached, there's a race condition: a sender that
545  * attaches and detaches very quickly might fool us into thinking
546  * the sender never attached at all. So, test whether our
547  * counterparty is definitively gone first, and only afterwards
548  * check whether the sender ever attached in the first place.
549  */
550  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
551  if (shm_mq_get_sender(mq) == NULL)
552  {
553  if (counterparty_gone)
554  return SHM_MQ_DETACHED;
555  else
556  return SHM_MQ_WOULD_BLOCK;
557  }
558  }
559  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
560  && shm_mq_get_sender(mq) == NULL)
561  {
562  mq->mq_detached = true;
563  return SHM_MQ_DETACHED;
564  }
565  mqh->mqh_counterparty_attached = true;
566  }
567 
568  /* Consume any zero-copy data from previous receive operation. */
569  if (mqh->mqh_consume_pending > 0)
570  {
/*
 * NOTE(review): original line 571 is elided; per the comment above it
 * presumably calls shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending)
 * — confirm.
 */
572  mqh->mqh_consume_pending = 0;
573  }
574 
575  /* Try to read, or finish reading, the length word from the buffer. */
576  while (!mqh->mqh_length_word_complete)
577  {
578  /* Try to receive the message length word. */
579  Assert(mqh->mqh_partial_bytes < sizeof(Size));
580  res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
581  nowait, &rb, &rawdata);
582  if (res != SHM_MQ_SUCCESS)
583  return res;
584 
585  /*
586  * Hopefully, we'll receive the entire message length word at once.
587  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
588  * multiple reads.
589  */
590  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
591  {
592  Size needed;
593 
594  nbytes = *(Size *) rawdata;
595 
596  /* If we've already got the whole message, we're done. */
597  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
598  if (rb >= needed)
599  {
600  /*
601  * Technically, we could consume the message length
602  * information at this point, but the extra write to shared
603  * memory wouldn't be free and in most cases we would reap no
604  * benefit.
605  */
606  mqh->mqh_consume_pending = needed;
607  *nbytesp = nbytes;
608  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
609  return SHM_MQ_SUCCESS;
610  }
611 
612  /*
613  * We don't have the whole message, but we at least have the whole
614  * length word.
615  */
616  mqh->mqh_expected_bytes = nbytes;
617  mqh->mqh_length_word_complete = true;
618  shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
619  rb -= MAXALIGN(sizeof(Size));
620  }
621  else
622  {
623  Size lengthbytes;
624 
625  /* Can't be split unless bigger than required alignment. */
626  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
627 
628  /* Message word is split; need buffer to reassemble. */
629  if (mqh->mqh_buffer == NULL)
630  {
/*
 * NOTE(review): original lines 631-633 are elided; given the Assert just
 * below and the struct documentation, they presumably allocate
 * mqh->mqh_buffer (MemoryContextAlloc in mqh_context, size
 * MQH_INITIAL_BUFSIZE) and set mqh->mqh_buflen accordingly — confirm.
 */
634  }
635  Assert(mqh->mqh_buflen >= sizeof(Size));
636 
637  /* Copy and consume partial length word. */
638  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
639  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
640  else
641  lengthbytes = rb;
642  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
643  lengthbytes);
644  mqh->mqh_partial_bytes += lengthbytes;
645  shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
646  rb -= lengthbytes;
647 
648  /* If we now have the whole word, we're ready to read payload. */
649  if (mqh->mqh_partial_bytes >= sizeof(Size))
650  {
651  Assert(mqh->mqh_partial_bytes == sizeof(Size));
652  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
653  mqh->mqh_length_word_complete = true;
654  mqh->mqh_partial_bytes = 0;
655  }
656  }
657  }
658  nbytes = mqh->mqh_expected_bytes;
659 
660  if (mqh->mqh_partial_bytes == 0)
661  {
662  /*
663  * Try to obtain the whole message in a single chunk. If this works,
664  * we need not copy the data and can return a pointer directly into
665  * shared memory.
666  */
667  res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
668  if (res != SHM_MQ_SUCCESS)
669  return res;
670  if (rb >= nbytes)
671  {
672  mqh->mqh_length_word_complete = false;
673  mqh->mqh_consume_pending = MAXALIGN(nbytes);
674  *nbytesp = nbytes;
675  *datap = rawdata;
676  return SHM_MQ_SUCCESS;
677  }
678 
679  /*
680  * The message has wrapped the buffer. We'll need to copy it in order
681  * to return it to the client in one chunk. First, make sure we have
682  * a large enough buffer available.
683  */
684  if (mqh->mqh_buflen < nbytes)
685  {
686  Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
687 
/* Grow geometrically so repeated large messages don't realloc every time. */
688  while (newbuflen < nbytes)
689  newbuflen *= 2;
690 
691  if (mqh->mqh_buffer != NULL)
692  {
693  pfree(mqh->mqh_buffer);
694  mqh->mqh_buffer = NULL;
695  mqh->mqh_buflen = 0;
696  }
697  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
698  mqh->mqh_buflen = newbuflen;
699  }
700  }
701 
702  /* Loop until we've copied the entire message. */
703  for (;;)
704  {
705  Size still_needed;
706 
707  /* Copy as much as we can. */
708  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
709  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
710  mqh->mqh_partial_bytes += rb;
711 
712  /*
713  * Update count of bytes read, with alignment padding. Note that this
714  * will never actually insert any padding except at the end of a
715  * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
716  * and each read and write is as well.
717  */
718  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
/*
 * NOTE(review): original line 719 is elided; per the comment above it
 * presumably calls shm_mq_inc_bytes_read(mq, MAXALIGN(rb)) — confirm.
 */
720 
721  /* If we got all the data, exit the loop. */
722  if (mqh->mqh_partial_bytes >= nbytes)
723  break;
724 
725  /* Wait for some more data. */
726  still_needed = nbytes - mqh->mqh_partial_bytes;
727  res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
728  if (res != SHM_MQ_SUCCESS)
729  return res;
730  if (rb > still_needed)
731  rb = still_needed;
732  }
733 
734  /* Return the complete message, and reset for next message. */
735  *nbytesp = nbytes;
736  *datap = mqh->mqh_buffer;
737  mqh->mqh_length_word_complete = false;
738  mqh->mqh_partial_bytes = 0;
739  return SHM_MQ_SUCCESS;
740 }
741 
742 /*
743  * Wait for the other process that's supposed to use this queue to attach
744  * to it.
745  *
746  * The return value is SHM_MQ_DETACHED if the worker has already detached or
747  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
748  * Note that we will only be able to detect that the worker has died before
749  * attaching if a background worker handle was passed to shm_mq_attach().
750  */
/*
 * NOTE(review): the return-type/signature lines (original lines 751-752)
 * are elided; per the comment above, this presumably reads
 * "shm_mq_result / shm_mq_wait_for_attach(shm_mq_handle *mqh)" — confirm.
 */
753 {
754  shm_mq *mq = mqh->mqh_queue;
755  PGPROC **victim;
756 
/* Wait for whichever peer slot we are NOT occupying to become non-NULL. */
757  if (shm_mq_get_receiver(mq) == MyProc)
758  victim = &mq->mq_sender;
759  else
760  {
/*
 * NOTE(review): original line 761 is elided; it is presumably an
 * Assert that this process is the sender — confirm.
 */
762  victim = &mq->mq_receiver;
763  }
764 
765  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
766  return SHM_MQ_SUCCESS;
767  else
768  return SHM_MQ_DETACHED;
769 }
770 
771 /*
772  * Detach from a shared message queue, and destroy the shm_mq_handle.
773  */
/*
 * NOTE(review): the signature line (original line 775) is elided; it
 * presumably reads "shm_mq_detach(shm_mq_handle *mqh)" — confirm.
 */
774 void
776 {
777  /* Notify counterparty that we're outta here. */
/*
 * NOTE(review): original line 778 is elided; per the comment above and the
 * helper below, it presumably calls shm_mq_detach_internal(mqh->mqh_queue).
 */
779 
780  /* Cancel on_dsm_detach callback, if any. */
781  if (mqh->mqh_segment)
/*
 * NOTE(review): original lines 782-783 are elided; they presumably call
 * cancel_on_dsm_detach(mqh->mqh_segment, shm_mq_detach_callback, — the
 * visible next line supplies the Datum argument — confirm.
 */
784  PointerGetDatum(mqh->mqh_queue));
785 
786  /* Release local memory associated with handle. */
787  if (mqh->mqh_buffer != NULL)
788  pfree(mqh->mqh_buffer);
789  pfree(mqh);
790 }
791 
792 /*
793  * Notify counterparty that we're detaching from shared message queue.
794  *
795  * The purpose of this function is to make sure that the process
796  * with which we're communicating doesn't block forever waiting for us to
797  * fill or drain the queue once we've lost interest. When the sender
798  * detaches, the receiver can read any messages remaining in the queue;
799  * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
800  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
801  *
802  * This is separated out from shm_mq_detach() because if the on_dsm_detach
803  * callback fires, we only want to do this much. We do not try to touch
804  * the local shm_mq_handle, as it may have been pfree'd already.
805  */
/*
 * NOTE(review): the signature line (original line 807) is elided; per the
 * forward declaration above it reads "shm_mq_detach_internal(shm_mq *mq)".
 * Original lines 812/821 are elided too; since mq_detached requires the
 * mutex per the file-header comment, they are presumably
 * SpinLockAcquire(&mq->mq_mutex) / SpinLockRelease(&mq->mq_mutex).
 */
806 static void
808 {
809  volatile shm_mq *vmq = mq;
810  PGPROC *victim;
811 
/* Identify the counterparty (the process we must wake after detaching). */
813  if (vmq->mq_sender == MyProc)
814  victim = vmq->mq_receiver;
815  else
816  {
817  Assert(vmq->mq_receiver == MyProc);
818  victim = vmq->mq_sender;
819  }
820  vmq->mq_detached = true;
822 
823  if (victim != NULL)
824  SetLatch(&victim->procLatch);
825 }
826 
827 /*
828  * Get the shm_mq from handle.
829  */
/*
 * NOTE(review): the signature line (original line 831) is elided; from the
 * body it presumably reads "shm_mq_get_queue(shm_mq_handle *mqh)".
 */
830 shm_mq *
832 {
833  return mqh->mqh_queue;
834 }
835 
836 /*
837  * Write bytes into a shared message queue.
838  */
/*
 * Write up to "nbytes" bytes from "data" into the ring, reporting progress
 * through *bytes_written; returns SHM_MQ_SUCCESS, SHM_MQ_DETACHED, or (when
 * nowait) SHM_MQ_WOULD_BLOCK.
 */
839 static shm_mq_result
840 shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
841  bool nowait, Size *bytes_written)
842 {
843  shm_mq *mq = mqh->mqh_queue;
844  Size sent = 0;
845  uint64 used;
846  Size ringsize = mq->mq_ring_size;
847  Size available;
848 
849  while (sent < nbytes)
850  {
851  bool detached;
852  uint64 rb;
853 
854  /* Compute number of ring buffer bytes used and available. */
855  rb = shm_mq_get_bytes_read(mq, &detached);
856  Assert(mq->mq_bytes_written >= rb);
857  used = mq->mq_bytes_written - rb;
858  Assert(used <= ringsize);
859  available = Min(ringsize - used, nbytes - sent);
860 
861  /* Bail out if the queue has been detached. */
862  if (detached)
863  {
864  *bytes_written = sent;
865  return SHM_MQ_DETACHED;
866  }
867 
868  if (available == 0 && !mqh->mqh_counterparty_attached)
869  {
870  /*
871  * The queue is full, so if the receiver isn't yet known to be
872  * attached, we must wait for that to happen.
873  */
874  if (nowait)
875  {
876  if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
877  {
878  *bytes_written = sent;
879  return SHM_MQ_DETACHED;
880  }
881  if (shm_mq_get_receiver(mq) == NULL)
882  {
883  *bytes_written = sent;
884  return SHM_MQ_WOULD_BLOCK;
885  }
886  }
887  else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
888  mqh->mqh_handle))
889  {
890  mq->mq_detached = true;
891  *bytes_written = sent;
892  return SHM_MQ_DETACHED;
893  }
894  mqh->mqh_counterparty_attached = true;
895 
896  /*
897  * The receiver may have read some data after attaching, so we
898  * must not wait without rechecking the queue state.
899  */
900  }
901  else if (available == 0)
902  {
903  shm_mq_result res;
904 
905  /* Let the receiver know that we need them to read some data. */
906  res = shm_mq_notify_receiver(mq);
907  if (res != SHM_MQ_SUCCESS)
908  {
909  *bytes_written = sent;
910  return res;
911  }
912 
913  /* Skip manipulation of our latch if nowait = true. */
914  if (nowait)
915  {
916  *bytes_written = sent;
917  return SHM_MQ_WOULD_BLOCK;
918  }
919 
920  /*
921  * Wait for our latch to be set. It might already be set for some
922  * unrelated reason, but that'll just result in one extra trip
923  * through the loop. It's worth it to avoid resetting the latch
924  * at top of loop, because setting an already-set latch is much
925  * cheaper than setting one that has been reset.
926  */
/*
 * NOTE(review): original lines 927, 930 and 933 are elided in this
 * rendering; per the three comments surrounding them, they presumably are
 * the latch wait, a ResetLatch call, and CHECK_FOR_INTERRUPTS() — confirm
 * against the real source.
 */
928 
929  /* Reset the latch so we don't spin. */
931 
932  /* An interrupt may have occurred while we were waiting. */
934  }
935  else
936  {
937  Size offset = mq->mq_bytes_written % (uint64) ringsize;
938  Size sendnow = Min(available, ringsize - offset);
939 
940  /* Write as much data as we can via a single memcpy(). */
941  memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
942  (char *) data + sent, sendnow);
943  sent += sendnow;
944 
945  /*
946  * Update count of bytes written, with alignment padding. Note
947  * that this will never actually insert any padding except at the
948  * end of a run of bytes, because the buffer size is a multiple of
949  * MAXIMUM_ALIGNOF, and each read is as well.
950  */
951  Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
952  shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
953 
954  /*
955  * For efficiency, we don't set the reader's latch here. We'll do
956  * that only when the buffer fills up or after writing an entire
957  * message.
958  */
959  }
960  }
961 
962  *bytes_written = sent;
963  return SHM_MQ_SUCCESS;
964 }
965 
966 /*
967  * Wait until at least *nbytesp bytes are available to be read from the
968  * shared message queue, or until the buffer wraps around. If the queue is
969  * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
970  * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
971  * to the location at which data bytes can be read, *nbytesp is set to the
972  * number of bytes which can be read at that address, and the return value
973  * is SHM_MQ_SUCCESS.
974  */
975 static shm_mq_result
976 shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
977  Size *nbytesp, void **datap)
978 {
979  Size ringsize = mq->mq_ring_size;
980  uint64 used;
981  uint64 written;
982 
983  for (;;)
984  {
985  Size offset;
986  bool detached;
987 
988  /* Get bytes written, so we can compute what's available to read. */
989  written = shm_mq_get_bytes_written(mq, &detached);
990  used = written - mq->mq_bytes_read;
991  Assert(used <= ringsize);
992  offset = mq->mq_bytes_read % (uint64) ringsize;
993 
994  /* If we have enough data or buffer has wrapped, we're done. */
995  if (used >= bytes_needed || offset + used >= ringsize)
996  {
/* Report only the contiguous run up to the end of the ring. */
997  *nbytesp = Min(used, ringsize - offset);
998  *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
999  return SHM_MQ_SUCCESS;
1000  }
1001 
1002  /*
1003  * Fall out before waiting if the queue has been detached.
1004  *
1005  * Note that we don't check for this until *after* considering whether
1006  * the data already available is enough, since the receiver can finish
1007  * receiving a message stored in the buffer even after the sender has
1008  * detached.
1009  */
1010  if (detached)
1011  return SHM_MQ_DETACHED;
1012 
1013  /* Skip manipulation of our latch if nowait = true. */
1014  if (nowait)
1015  return SHM_MQ_WOULD_BLOCK;
1016 
1017  /*
1018  * Wait for our latch to be set. It might already be set for some
1019  * unrelated reason, but that'll just result in one extra trip through
1020  * the loop. It's worth it to avoid resetting the latch at top of
1021  * loop, because setting an already-set latch is much cheaper than
1022  * setting one that has been reset.
1023  */
/*
 * NOTE(review): original lines 1024, 1027 and 1030 are elided in this
 * rendering; per the surrounding comments they are presumably the latch
 * wait, a ResetLatch call, and CHECK_FOR_INTERRUPTS() — confirm against
 * the real source.
 */
1025 
1026  /* Reset the latch so we don't spin. */
1028 
1029  /* An interrupt may have occurred while we were waiting. */
1031  }
1032 }
1033 
1034 /*
1035  * Test whether a counterparty who may not even be alive yet is definitely gone.
1036  */
1037 static bool
1039 {
1040  bool detached;
1041  pid_t pid;
1042 
1043  /* Acquire the lock just long enough to check the pointer. */
1044  SpinLockAcquire(&mq->mq_mutex);
1045  detached = mq->mq_detached;
1046  SpinLockRelease(&mq->mq_mutex);
1047 
1048  /* If the queue has been detached, counterparty is definitely gone. */
1049  if (detached)
1050  return true;
1051 
1052  /* If there's a handle, check worker status. */
1053  if (handle != NULL)
1054  {
1056 
1057  /* Check for unexpected worker death. */
1058  status = GetBackgroundWorkerPid(handle, &pid);
1059  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1060  {
1061  /* Mark it detached, just to make it official. */
1062  SpinLockAcquire(&mq->mq_mutex);
1063  mq->mq_detached = true;
1064  SpinLockRelease(&mq->mq_mutex);
1065  return true;
1066  }
1067  }
1068 
1069  /* Counterparty is not definitively gone. */
1070  return false;
1071 }
1072 
1073 /*
1074  * This is used when a process is waiting for its counterpart to attach to the
1075  * queue. We exit when the other process attaches as expected, or, if
1076  * handle != NULL, when the referenced background process or the postmaster
1077  * dies. Note that if handle == NULL, and the process fails to attach, we'll
1078  * potentially get stuck here forever waiting for a process that may never
1079  * start. We do check for interrupts, though.
1080  *
1081  * ptr is a pointer to the memory address that we're expecting to become
1082  * non-NULL when our counterpart attaches to the queue.
1083  */
1084 static bool
1085 shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
1086  BackgroundWorkerHandle *handle)
1087 {
1088  bool result = false;
1089 
1090  for (;;)
1091  {
1093  pid_t pid;
1094  bool detached;
1095 
1096  /* Acquire the lock just long enough to check the pointer. */
1097  SpinLockAcquire(&mq->mq_mutex);
1098  detached = mq->mq_detached;
1099  result = (*ptr != NULL);
1100  SpinLockRelease(&mq->mq_mutex);
1101 
1102  /* Fail if detached; else succeed if initialized. */
1103  if (detached)
1104  {
1105  result = false;
1106  break;
1107  }
1108  if (result)
1109  break;
1110 
1111  if (handle != NULL)
1112  {
1113  /* Check for unexpected worker death. */
1114  status = GetBackgroundWorkerPid(handle, &pid);
1115  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1116  {
1117  result = false;
1118  break;
1119  }
1120  }
1121 
1122  /* Wait to be signalled. */
1124 
1125  /* Reset the latch so we don't spin. */
1127 
1128  /* An interrupt may have occurred while we were waiting. */
1130  }
1131 
1132  return result;
1133 }
1134 
1135 /*
1136  * Get the number of bytes read. The receiver need not use this to access
1137  * the count of bytes read, but the sender must.
1138  */
1139 static uint64
1140 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
1141 {
1142  uint64 v;
1143 
1144  SpinLockAcquire(&mq->mq_mutex);
1145  v = mq->mq_bytes_read;
1146  *detached = mq->mq_detached;
1147  SpinLockRelease(&mq->mq_mutex);
1148 
1149  return v;
1150 }
1151 
1152 /*
1153  * Increment the number of bytes read.
1154  */
1155 static void
1157 {
1158  PGPROC *sender;
1159 
1160  SpinLockAcquire(&mq->mq_mutex);
1161  mq->mq_bytes_read += n;
1162  sender = mq->mq_sender;
1163  SpinLockRelease(&mq->mq_mutex);
1164 
1165  /* We shouldn't have any bytes to read without a sender. */
1166  Assert(sender != NULL);
1167  SetLatch(&sender->procLatch);
1168 }
1169 
1170 /*
1171  * Get the number of bytes written. The sender need not use this to access
1172  * the count of bytes written, but the receiver must.
1173  */
1174 static uint64
1175 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
1176 {
1177  uint64 v;
1178 
1179  SpinLockAcquire(&mq->mq_mutex);
1180  v = mq->mq_bytes_written;
1181  *detached = mq->mq_detached;
1182  SpinLockRelease(&mq->mq_mutex);
1183 
1184  return v;
1185 }
1186 
1187 /*
1188  * Increment the number of bytes written.
1189  */
1190 static void
1192 {
1193  SpinLockAcquire(&mq->mq_mutex);
1194  mq->mq_bytes_written += n;
1195  SpinLockRelease(&mq->mq_mutex);
1196 }
1197 
1198 /*
1199  * Set receiver's latch, unless queue is detached.
1200  */
1201 static shm_mq_result
1203 {
1204  PGPROC *receiver;
1205  bool detached;
1206 
1207  SpinLockAcquire(&mq->mq_mutex);
1208  detached = mq->mq_detached;
1209  receiver = mq->mq_receiver;
1210  SpinLockRelease(&mq->mq_mutex);
1211 
1212  if (detached)
1213  return SHM_MQ_DETACHED;
1214  if (receiver)
1215  SetLatch(&receiver->procLatch);
1216  return SHM_MQ_SUCCESS;
1217 }
1218 
1219 /* Shim for on_dsm_callback. */
1220 static void
1222 {
1223  shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1224 
1226 }
int slock_t
Definition: s_lock.h:888
Size mqh_partial_bytes
Definition: shm_mq.c:137
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:775
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:562
#define SpinLockInit(lock)
Definition: spin.h:60
#define Min(x, y)
Definition: c.h:812
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:253
unsigned char uint8
Definition: c.h:256
PGPROC * mq_receiver
Definition: shm_mq.c:72
static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
Definition: shm_mq.c:1175
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:237
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:133
Size mqh_consume_pending
Definition: shm_mq.c:136
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
bool mqh_counterparty_attached
Definition: shm_mq.c:140
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1037
static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq)
Definition: shm_mq.c:1202
static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:976
const Size shm_mq_minimum_size
Definition: shm_mq.c:161
Latch procLatch
Definition: proc.h:104
char * mqh_buffer
Definition: shm_mq.c:134
Size len
Definition: shm_mq.h:32
#define SpinLockAcquire(lock)
Definition: spin.h:62
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1085
uint64 mq_bytes_written
Definition: shm_mq.c:75
bool mqh_length_word_complete
Definition: shm_mq.c:139
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:336
void pfree(void *pointer)
Definition: mcxt.c:949
const char * data
Definition: shm_mq.h:31
shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh)
Definition: shm_mq.c:752
static bool shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1038
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:164
slock_t mq_mutex
Definition: shm_mq.c:71
shm_mq * shm_mq_create(void *address, Size size)
Definition: shm_mq.c:170
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:840
static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
Definition: shm_mq.c:1191
Size mqh_buflen
Definition: shm_mq.c:135
dsm_segment * mqh_segment
Definition: shm_mq.c:132
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
Size mqh_expected_bytes
Definition: shm_mq.c:138
BgwHandleStatus
Definition: bgworker.h:102
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:218
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:807
#define SpinLockRelease(lock)
Definition: spin.h:64
uintptr_t Datum
Definition: postgres.h:372
shm_mq * mqh_queue
Definition: shm_mq.c:131
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:315
bool mq_detached
Definition: shm_mq.c:77
shm_mq_result
Definition: shm_mq.h:36
#define Max(x, y)
Definition: c.h:806
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
#define Assert(condition)
Definition: c.h:681
static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
Definition: shm_mq.c:1156
size_t Size
Definition: c.h:350
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
Definition: shm_mq.c:325
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
Definition: shm_mq.c:351
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
Definition: shm_mq.c:831
#define MAXALIGN(LEN)
Definition: c.h:576
Size mq_ring_size
Definition: shm_mq.c:76
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
Definition: shm_mq.c:79
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:287
#define DatumGetPointer(X)
Definition: postgres.h:555
static StringInfoData tmpbuf
Definition: walsender.c:162
void * palloc(Size size)
Definition: mcxt.c:848
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:706
int i
Definition: shm_mq.c:69
void * arg
struct Latch * MyLatch
Definition: globals.c:52
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
uint8 mq_ring_offset
Definition: shm_mq.c:78
MemoryContext mqh_context
Definition: shm_mq.c:141
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:199
uint64 mq_bytes_read
Definition: shm_mq.c:74
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:522
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1052
Definition: proc.h:95
#define WL_LATCH_SET
Definition: latch.h:124
#define offsetof(type, field)
Definition: c.h:549
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1047
static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
Definition: shm_mq.c:1140
#define MAXALIGN_DOWN(LEN)
Definition: c.h:588
PGPROC * mq_sender
Definition: shm_mq.c:73