1 /*-------------------------------------------------------------------------
2  *
3  * shm_mq.c
4  * single-reader, single-writer shared memory message queue
5  *
6  * Both the sender and the receiver must have a PGPROC; their respective
7  * process latches are used for synchronization. Only the sender may send,
8  * and only the receiver may receive. This is intended to allow a user
9  * backend to communicate with worker backends that it has registered.
10  *
11  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  * src/backend/storage/ipc/shm_mq.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "postmaster/bgworker.h"
24 #include "storage/procsignal.h"
25 #include "storage/shm_mq.h"
26 #include "storage/spin.h"
27 #include "utils/memutils.h"
28 
29 /*
30  * This structure represents the actual queue, stored in shared memory.
31  *
32  * Some notes on synchronization:
33  *
34  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
35  * mq_sender and mq_bytes_written can only be changed by the sender.
36  * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
37  * they cannot change once set, and thus may be read without a lock once this
38  * is known to be the case.
39  *
40  * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
41  * they are written atomically using 8 byte loads and stores. Memory barriers
42  * must be carefully used to synchronize reads and writes of these values with
43  * reads and writes of the actual data in mq_ring.
44  *
45  * mq_detached needs no locking. It can be set by either the sender or the
46  * receiver, but only ever from false to true, so redundant writes don't
47  * matter. It is important that if we set mq_detached and then set the
48  * counterparty's latch, the counterparty must be certain to see the change
49  * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
50  * ends with one, this should be OK.
51  *
52  * mq_ring_size and mq_ring_offset never change after initialization, and
53  * can therefore be read without the lock.
54  *
55  * Importantly, mq_ring can be safely read and written without a lock.
56  * At any given time, the difference between mq_bytes_read and
57  * mq_bytes_written defines the number of bytes within mq_ring that contain
58  * unread data, and mq_bytes_read defines the position where those bytes
59  * begin. The sender can increase the number of unread bytes at any time,
60  * but only the receiver can give license to overwrite those bytes, by
61  * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
62  * the unread bytes it knows to be present without the lock. Conversely,
63  * the sender can write to the unused portion of the ring buffer without
64  * the lock, because nobody else can be reading or writing those bytes. The
65  * receiver could be making more bytes unused by incrementing mq_bytes_read,
66  * but that's OK. Note that it would be unsafe for the receiver to read any
67  * data it's already marked as read, or to write any data; and it would be
68  * unsafe for the sender to reread any data after incrementing
69  * mq_bytes_written, but fortunately there's no need for any of that.
70  */
71 struct shm_mq
72 {
73  slock_t mq_mutex;
74  PGPROC *mq_receiver;
75  PGPROC *mq_sender;
76  pg_atomic_uint64 mq_bytes_read;
77  pg_atomic_uint64 mq_bytes_written;
78  Size mq_ring_size;
79  bool mq_detached;
80  uint8 mq_ring_offset;
81  char mq_ring[FLEXIBLE_ARRAY_MEMBER];
82 };
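/*
 * Illustrative sketch (not part of shm_mq.c): the accounting described above
 * reduces to the arithmetic below.  Both counters only ever grow; positions
 * within mq_ring are obtained by reducing them modulo mq_ring_size.
 *
 *		uint64	rb = pg_atomic_read_u64(&mq->mq_bytes_read);
 *		uint64	wb = pg_atomic_read_u64(&mq->mq_bytes_written);
 *		Size	used = wb - rb;						(unread bytes in mq_ring)
 *		Size	avail = mq->mq_ring_size - used;	(bytes the sender may fill)
 *		Size	read_pos = rb % (uint64) mq->mq_ring_size;
 *		Size	write_pos = wb % (uint64) mq->mq_ring_size;
 */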
83 
84 /*
85  * This structure is a backend-private handle for access to a queue.
86  *
87  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
88  * an optional pointer to the dynamic shared memory segment that contains it.
89  * (If mqh_segment is provided, we register an on_dsm_detach callback to
90  * make sure we detach from the queue before detaching from DSM.)
91  *
92  * If this queue is intended to connect the current process with a background
93  * worker that started it, the user can pass a pointer to the worker handle
94  * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
95  * is to allow us to begin sending to or receiving from that queue before the
96  * process we'll be communicating with has even been started. If it fails
97  * to start, the handle will allow us to notice that and fail cleanly, rather
98  * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
99  * simple cases - e.g. where there are just 2 processes communicating; in
100  * more complex scenarios, every process may not have a BackgroundWorkerHandle
101  * available, or may need to watch for the failure of more than one other
102  * process at a time.
103  *
104  * When a message exists as a contiguous chunk of bytes in the queue - that is,
105  * it is smaller than the size of the ring buffer and does not wrap around
106  * the end - we return the message to the caller as a pointer into the buffer.
107  * For messages that are larger or happen to wrap, we reassemble the message
108  * locally by copying the chunks into a backend-local buffer. mqh_buffer is
109  * the buffer, and mqh_buflen is the number of bytes allocated for it.
110  *
111  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
112  * are used to track the state of non-blocking operations. When the caller
113  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
114  * are expected to retry the call at a later time with the same argument;
115  * we need to retain enough state to pick up where we left off.
116  * mqh_length_word_complete tracks whether we are done sending or receiving
117  * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
118  * the number of bytes read or written for either the length word or the
119  * message itself, and mqh_expected_bytes - which is used only for reads -
120  * tracks the expected total size of the payload.
121  *
122  * mqh_counterparty_attached tracks whether we know the counterparty to have
123  * attached to the queue at some previous point. This lets us avoid some
124  * mutex acquisitions.
125  *
126  * mqh_context is the memory context in effect at the time we attached to
127  * the shm_mq. The shm_mq_handle itself is allocated in this context, and
128  * we make sure any other allocations we do happen in this context as well,
129  * to avoid nasty surprises.
130  */
131 struct shm_mq_handle
132 {
133  shm_mq *mqh_queue;
134  dsm_segment *mqh_segment;
135  BackgroundWorkerHandle *mqh_handle;
136  char *mqh_buffer;
137  Size mqh_buflen;
138  Size mqh_consume_pending;
139  Size mqh_partial_bytes;
140  Size mqh_expected_bytes;
141  bool mqh_length_word_complete;
142  bool mqh_counterparty_attached;
143  MemoryContext mqh_context;
144 };
145 
146 static void shm_mq_detach_internal(shm_mq *mq);
147 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
148  const void *data, bool nowait, Size *bytes_written);
149 static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
150  Size bytes_needed, bool nowait, Size *nbytesp,
151  void **datap);
152 static bool shm_mq_counterparty_gone(shm_mq *mq,
153  BackgroundWorkerHandle *handle);
154 static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
155  BackgroundWorkerHandle *handle);
156 static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
157 static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
158 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
159 
160 /* Minimum queue size is enough for header and at least one chunk of data. */
161 const Size shm_mq_minimum_size =
162 MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
163 
164 #define MQH_INITIAL_BUFSIZE 8192
165 
166 /*
167  * Initialize a new shared message queue.
168  */
169 shm_mq *
170 shm_mq_create(void *address, Size size)
171 {
172  shm_mq *mq = address;
173  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
174 
175  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
176  size = MAXALIGN_DOWN(size);
177 
178  /* Queue size must be large enough to hold some data. */
179  Assert(size > data_offset);
180 
181  /* Initialize queue header. */
182  SpinLockInit(&mq->mq_mutex);
183  mq->mq_receiver = NULL;
184  mq->mq_sender = NULL;
185  pg_atomic_init_u64(&mq->mq_bytes_read, 0);
186  pg_atomic_init_u64(&mq->mq_bytes_written, 0);
187  mq->mq_ring_size = size - data_offset;
188  mq->mq_detached = false;
189  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
190 
191  return mq;
192 }
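/*
 * Illustrative sketch (not part of shm_mq.c): a leader process would
 * typically create the queue inside a dynamic shared memory segment and
 * declare itself the receiver.  The 64kB size below is an arbitrary example
 * value; anything at least shm_mq_minimum_size works.
 *
 *		dsm_segment *seg = dsm_create(65536, 0);
 *		shm_mq	   *mq = shm_mq_create(dsm_segment_address(seg), 65536);
 *
 *		shm_mq_set_receiver(mq, MyProc);
 *
 * The dsm_segment_handle(seg) value is then passed to the counterparty
 * (for example via bgw_main_arg), which maps the segment with dsm_attach()
 * and calls shm_mq_set_sender() on the same queue.
 */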
193 
194 /*
195  * Set the identity of the process that will receive from a shared message
196  * queue.
197  */
198 void
199 shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
200 {
201  PGPROC *sender;
202 
203  SpinLockAcquire(&mq->mq_mutex);
204  Assert(mq->mq_receiver == NULL);
205  mq->mq_receiver = proc;
206  sender = mq->mq_sender;
207  SpinLockRelease(&mq->mq_mutex);
208 
209  if (sender != NULL)
210  SetLatch(&sender->procLatch);
211 }
212 
213 /*
214  * Set the identity of the process that will send to a shared message queue.
215  */
216 void
217 shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
218 {
219  PGPROC *receiver;
220 
221  SpinLockAcquire(&mq->mq_mutex);
222  Assert(mq->mq_sender == NULL);
223  mq->mq_sender = proc;
224  receiver = mq->mq_receiver;
225  SpinLockRelease(&mq->mq_mutex);
226 
227  if (receiver != NULL)
228  SetLatch(&receiver->procLatch);
229 }
230 
231 /*
232  * Get the configured receiver.
233  */
234 PGPROC *
235 shm_mq_get_receiver(shm_mq *mq)
236 {
237  PGPROC *receiver;
238 
239  SpinLockAcquire(&mq->mq_mutex);
240  receiver = mq->mq_receiver;
241  SpinLockRelease(&mq->mq_mutex);
242 
243  return receiver;
244 }
245 
246 /*
247  * Get the configured sender.
248  */
249 PGPROC *
250 shm_mq_get_sender(shm_mq *mq)
251 {
252  PGPROC *sender;
253 
254  SpinLockAcquire(&mq->mq_mutex);
255  sender = mq->mq_sender;
256  SpinLockRelease(&mq->mq_mutex);
257 
258  return sender;
259 }
260 
261 /*
262  * Attach to a shared message queue so we can send or receive messages.
263  *
264  * The memory context in effect at the time this function is called should
265  * be one which will last for at least as long as the message queue itself.
266  * We'll allocate the handle in that context, and future allocations that
267  * are needed to buffer incoming data will happen in that context as well.
268  *
269  * If seg != NULL, the queue will be automatically detached when that dynamic
270  * shared memory segment is detached.
271  *
272  * If handle != NULL, the queue can be read or written even before the
273  * other process has attached. We'll wait for it to do so if needed. The
274  * handle must be for a background worker initialized with bgw_notify_pid
275  * equal to our PID.
276  *
277  * shm_mq_detach() should be called when done. This will free the
278  * shm_mq_handle and mark the queue itself as detached, so that our
279  * counterpart won't get stuck waiting for us to fill or drain the queue
280  * after we've already lost interest.
281  */
282 shm_mq_handle *
283 shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
284 {
285  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
286 
287  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
288  mqh->mqh_queue = mq;
289  mqh->mqh_segment = seg;
290  mqh->mqh_handle = handle;
291  mqh->mqh_buffer = NULL;
292  mqh->mqh_buflen = 0;
293  mqh->mqh_consume_pending = 0;
294  mqh->mqh_partial_bytes = 0;
295  mqh->mqh_expected_bytes = 0;
296  mqh->mqh_length_word_complete = false;
297  mqh->mqh_counterparty_attached = false;
298  mqh->mqh_context = CurrentMemoryContext;
299 
300  if (seg != NULL)
301  on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
302 
303  return mqh;
304 }
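/*
 * Illustrative sketch (not part of shm_mq.c): attaching from both ends.
 * Here "wh" stands for the BackgroundWorkerHandle returned by
 * RegisterDynamicBackgroundWorker(); passing it lets the leader fail
 * cleanly if the worker never starts.  The worker simply passes NULL.
 *
 *	In the leader, after shm_mq_create() and shm_mq_set_receiver():
 *		shm_mq_handle *mqh = shm_mq_attach(mq, seg, wh);
 *
 *	In the worker, after dsm_attach() and shm_mq_set_sender():
 *		shm_mq_handle *mqh = shm_mq_attach(mq, seg, NULL);
 */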
305 
306 /*
307  * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
308  * been passed to shm_mq_attach.
309  */
310 void
311 shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
312 {
313  Assert(mqh->mqh_handle == NULL);
314  mqh->mqh_handle = handle;
315 }
316 
317 /*
318  * Write a message into a shared message queue.
319  */
320 shm_mq_result
321 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
322 {
323  shm_mq_iovec iov;
324 
325  iov.data = data;
326  iov.len = nbytes;
327 
328  return shm_mq_sendv(mqh, &iov, 1, nowait);
329 }
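/*
 * Illustrative sketch (not part of shm_mq.c): a minimal blocking send of a
 * NUL-terminated string, terminator included.
 *
 *		const char *msg = "hello";
 *		shm_mq_result res;
 *
 *		res = shm_mq_send(mqh, strlen(msg) + 1, msg, false);
 *		if (res != SHM_MQ_SUCCESS)
 *			ereport(ERROR,
 *					(errmsg("could not send message: receiver detached")));
 */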
330 
331 /*
332  * Write a message into a shared message queue, gathered from multiple
333  * addresses.
334  *
335  * When nowait = false, we'll wait on our process latch when the ring buffer
336  * fills up, and then continue writing once the receiver has drained some data.
337  * The process latch is reset after each wait.
338  *
339  * When nowait = true, we do not manipulate the state of the process latch;
340  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
341  * this case, the caller should call this function again, with the same
342  * arguments, each time the process latch is set. (Once begun, the sending
343  * of a message cannot be aborted except by detaching from the queue; changing
344  * the length or payload will corrupt the queue.)
345  */
346 shm_mq_result
347 shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
348 {
349  shm_mq_result res;
350  shm_mq *mq = mqh->mqh_queue;
351  PGPROC *receiver;
352  Size nbytes = 0;
353  Size bytes_written;
354  int i;
355  int which_iov = 0;
356  Size offset;
357 
358  Assert(mq->mq_sender == MyProc);
359 
360  /* Compute total size of write. */
361  for (i = 0; i < iovcnt; ++i)
362  nbytes += iov[i].len;
363 
364  /* Prevent writing messages that would overwhelm the receiver. */
365  if (nbytes > MaxAllocSize)
366  ereport(ERROR,
367  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
368  errmsg("cannot send a message of size %zu via shared memory queue",
369  nbytes)));
370 
371  /* Try to write, or finish writing, the length word into the buffer. */
372  while (!mqh->mqh_length_word_complete)
373  {
374  Assert(mqh->mqh_partial_bytes < sizeof(Size));
375  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
376  ((char *) &nbytes) + mqh->mqh_partial_bytes,
377  nowait, &bytes_written);
378 
379  if (res == SHM_MQ_DETACHED)
380  {
381  /* Reset state in case caller tries to send another message. */
382  mqh->mqh_partial_bytes = 0;
383  mqh->mqh_length_word_complete = false;
384  return res;
385  }
386  mqh->mqh_partial_bytes += bytes_written;
387 
388  if (mqh->mqh_partial_bytes >= sizeof(Size))
389  {
390  Assert(mqh->mqh_partial_bytes == sizeof(Size));
391 
392  mqh->mqh_partial_bytes = 0;
393  mqh->mqh_length_word_complete = true;
394  }
395 
396  if (res != SHM_MQ_SUCCESS)
397  return res;
398 
399  /* Length word can't be split unless bigger than required alignment. */
400  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
401  }
402 
403  /* Write the actual data bytes into the buffer. */
404  Assert(mqh->mqh_partial_bytes <= nbytes);
405  offset = mqh->mqh_partial_bytes;
406  do
407  {
408  Size chunksize;
409 
410  /* Figure out which bytes need to be sent next. */
411  if (offset >= iov[which_iov].len)
412  {
413  offset -= iov[which_iov].len;
414  ++which_iov;
415  if (which_iov >= iovcnt)
416  break;
417  continue;
418  }
419 
420  /*
421  * We want to avoid copying the data if at all possible, but every
422  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
423  * the last. Thus, if a chunk other than the last one ends on a
424  * non-MAXALIGN'd boundary, we have to combine the tail end of its
425  * data with data from one or more following chunks until we either
426  * reach the last chunk or accumulate a number of bytes which is
427  * MAXALIGN'd.
428  */
429  if (which_iov + 1 < iovcnt &&
430  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
431  {
432  char tmpbuf[MAXIMUM_ALIGNOF];
433  int j = 0;
434 
435  for (;;)
436  {
437  if (offset < iov[which_iov].len)
438  {
439  tmpbuf[j] = iov[which_iov].data[offset];
440  j++;
441  offset++;
442  if (j == MAXIMUM_ALIGNOF)
443  break;
444  }
445  else
446  {
447  offset -= iov[which_iov].len;
448  which_iov++;
449  if (which_iov >= iovcnt)
450  break;
451  }
452  }
453 
454  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
455 
456  if (res == SHM_MQ_DETACHED)
457  {
458  /* Reset state in case caller tries to send another message. */
459  mqh->mqh_partial_bytes = 0;
460  mqh->mqh_length_word_complete = false;
461  return res;
462  }
463 
464  mqh->mqh_partial_bytes += bytes_written;
465  if (res != SHM_MQ_SUCCESS)
466  return res;
467  continue;
468  }
469 
470  /*
471  * If this is the last chunk, we can write all the data, even if it
472  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
473  * MAXALIGN_DOWN the write size.
474  */
475  chunksize = iov[which_iov].len - offset;
476  if (which_iov + 1 < iovcnt)
477  chunksize = MAXALIGN_DOWN(chunksize);
478  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
479  nowait, &bytes_written);
480 
481  if (res == SHM_MQ_DETACHED)
482  {
483  /* Reset state in case caller tries to send another message. */
484  mqh->mqh_length_word_complete = false;
485  mqh->mqh_partial_bytes = 0;
486  return res;
487  }
488 
489  mqh->mqh_partial_bytes += bytes_written;
490  offset += bytes_written;
491  if (res != SHM_MQ_SUCCESS)
492  return res;
493  } while (mqh->mqh_partial_bytes < nbytes);
494 
495  /* Reset for next message. */
496  mqh->mqh_partial_bytes = 0;
497  mqh->mqh_length_word_complete = false;
498 
499  /* If queue has been detached, let caller know. */
500  if (mq->mq_detached)
501  return SHM_MQ_DETACHED;
502 
503  /*
504  * If the counterparty is known to have attached, we can read mq_receiver
505  * without acquiring the spinlock and assume it isn't NULL. Otherwise,
506  * more caution is needed.
507  */
508  if (mqh->mqh_counterparty_attached)
509  receiver = mq->mq_receiver;
510  else
511  {
512  SpinLockAcquire(&mq->mq_mutex);
513  receiver = mq->mq_receiver;
514  SpinLockRelease(&mq->mq_mutex);
515  if (receiver == NULL)
516  return SHM_MQ_SUCCESS;
517  mqh->mqh_counterparty_attached = true;
518  }
519 
520  /* Notify receiver of the newly-written data, and return. */
521  SetLatch(&receiver->procLatch);
522  return SHM_MQ_SUCCESS;
523 }
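/*
 * Illustrative sketch (not part of shm_mq.c): gathering one message from a
 * fixed-size header and a separate payload buffer with nowait = true.  The
 * names hdr, payload, and payload_len are placeholders.  As noted above,
 * once a send has begun the caller must keep retrying with the same
 * arguments whenever its latch is set, until the result is no longer
 * SHM_MQ_WOULD_BLOCK.
 *
 *		shm_mq_iovec iov[2];
 *		shm_mq_result res;
 *
 *		iov[0].data = (const char *) &hdr;
 *		iov[0].len = sizeof(hdr);
 *		iov[1].data = payload;
 *		iov[1].len = payload_len;
 *
 *		for (;;)
 *		{
 *			res = shm_mq_sendv(mqh, iov, 2, true);
 *			if (res != SHM_MQ_WOULD_BLOCK)
 *				break;
 *			(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
 *							 WAIT_EVENT_MQ_SEND);
 *			ResetLatch(MyLatch);
 *			CHECK_FOR_INTERRUPTS();
 *		}
 */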
524 
525 /*
526  * Receive a message from a shared message queue.
527  *
528  * We set *nbytes to the message length and *data to point to the message
529  * payload. If the entire message exists in the queue as a single,
530  * contiguous chunk, *data will point directly into shared memory; otherwise,
531  * it will point to a temporary buffer. This mostly avoids data copying in
532  * the hoped-for case where messages are short compared to the buffer size,
533  * while still allowing longer messages. In either case, the return value
534  * remains valid until the next receive operation is performed on the queue.
535  *
536  * When nowait = false, we'll wait on our process latch when the ring buffer
537  * is empty and we have not yet received a full message. The sender will
538  * set our process latch after more data has been written, and we'll resume
539  * processing. Each call will therefore return a complete message
540  * (unless the sender detaches the queue).
541  *
542  * When nowait = true, we do not manipulate the state of the process latch;
543  * instead, whenever the buffer is empty and we need to read from it, we
544  * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
545  * function again after the process latch has been set.
546  */
547 shm_mq_result
548 shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
549 {
550  shm_mq *mq = mqh->mqh_queue;
551  shm_mq_result res;
552  Size rb = 0;
553  Size nbytes;
554  void *rawdata;
555 
556  Assert(mq->mq_receiver == MyProc);
557 
558  /* We can't receive data until the sender has attached. */
559  if (!mqh->mqh_counterparty_attached)
560  {
561  if (nowait)
562  {
563  int counterparty_gone;
564 
565  /*
566  * We shouldn't return at this point at all unless the sender
567  * hasn't attached yet. However, the correct return value depends
568  * on whether the sender is still attached. If we first test
569  * whether the sender has ever attached and then test whether the
570  * sender has detached, there's a race condition: a sender that
571  * attaches and detaches very quickly might fool us into thinking
572  * the sender never attached at all. So, test whether our
573  * counterparty is definitively gone first, and only afterwards
574  * check whether the sender ever attached in the first place.
575  */
576  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
577  if (shm_mq_get_sender(mq) == NULL)
578  {
579  if (counterparty_gone)
580  return SHM_MQ_DETACHED;
581  else
582  return SHM_MQ_WOULD_BLOCK;
583  }
584  }
585  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
586  && shm_mq_get_sender(mq) == NULL)
587  {
588  mq->mq_detached = true;
589  return SHM_MQ_DETACHED;
590  }
591  mqh->mqh_counterparty_attached = true;
592  }
593 
594  /*
595  * If we've consumed an amount of data greater than 1/4th of the ring
596  * size, mark it consumed in shared memory. We try to avoid doing this
597  * unnecessarily when only a small amount of data has been consumed,
598  * because SetLatch() is fairly expensive and we don't want to do it too
599  * often.
600  */
601  if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
602  {
603  shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
604  mqh->mqh_consume_pending = 0;
605  }
606 
607  /* Try to read, or finish reading, the length word from the buffer. */
608  while (!mqh->mqh_length_word_complete)
609  {
610  /* Try to receive the message length word. */
611  Assert(mqh->mqh_partial_bytes < sizeof(Size));
612  res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
613  nowait, &rb, &rawdata);
614  if (res != SHM_MQ_SUCCESS)
615  return res;
616 
617  /*
618  * Hopefully, we'll receive the entire message length word at once.
619  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
620  * multiple reads.
621  */
622  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
623  {
624  Size needed;
625 
626  nbytes = *(Size *) rawdata;
627 
628  /* If we've already got the whole message, we're done. */
629  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
630  if (rb >= needed)
631  {
632  mqh->mqh_consume_pending += needed;
633  *nbytesp = nbytes;
634  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
635  return SHM_MQ_SUCCESS;
636  }
637 
638  /*
639  * We don't have the whole message, but we at least have the whole
640  * length word.
641  */
642  mqh->mqh_expected_bytes = nbytes;
643  mqh->mqh_length_word_complete = true;
644  mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
645  rb -= MAXALIGN(sizeof(Size));
646  }
647  else
648  {
649  Size lengthbytes;
650 
651  /* Can't be split unless bigger than required alignment. */
652  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
653 
654  /* Message word is split; need buffer to reassemble. */
655  if (mqh->mqh_buffer == NULL)
656  {
657  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
658  MQH_INITIAL_BUFSIZE);
659  mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
660  }
661  Assert(mqh->mqh_buflen >= sizeof(Size));
662 
663  /* Copy partial length word; remember to consume it. */
664  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
665  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
666  else
667  lengthbytes = rb;
668  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
669  lengthbytes);
670  mqh->mqh_partial_bytes += lengthbytes;
671  mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
672  rb -= lengthbytes;
673 
674  /* If we now have the whole word, we're ready to read payload. */
675  if (mqh->mqh_partial_bytes >= sizeof(Size))
676  {
677  Assert(mqh->mqh_partial_bytes == sizeof(Size));
678  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
679  mqh->mqh_length_word_complete = true;
680  mqh->mqh_partial_bytes = 0;
681  }
682  }
683  }
684  nbytes = mqh->mqh_expected_bytes;
685 
686  /*
687  * Should be disallowed on the sending side already, but better check and
688  * error out on the receiver side as well rather than trying to read a
689  * prohibitively large message.
690  */
691  if (nbytes > MaxAllocSize)
692  ereport(ERROR,
693  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
694  errmsg("invalid message size %zu in shared memory queue",
695  nbytes)));
696 
697  if (mqh->mqh_partial_bytes == 0)
698  {
699  /*
700  * Try to obtain the whole message in a single chunk. If this works,
701  * we need not copy the data and can return a pointer directly into
702  * shared memory.
703  */
704  res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
705  if (res != SHM_MQ_SUCCESS)
706  return res;
707  if (rb >= nbytes)
708  {
709  mqh->mqh_length_word_complete = false;
710  mqh->mqh_consume_pending += MAXALIGN(nbytes);
711  *nbytesp = nbytes;
712  *datap = rawdata;
713  return SHM_MQ_SUCCESS;
714  }
715 
716  /*
717  * The message has wrapped the buffer. We'll need to copy it in order
718  * to return it to the client in one chunk. First, make sure we have
719  * a large enough buffer available.
720  */
721  if (mqh->mqh_buflen < nbytes)
722  {
723  Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
724 
725  /*
726  * Double the buffer size until the payload fits, but limit to
727  * MaxAllocSize.
728  */
729  while (newbuflen < nbytes)
730  newbuflen *= 2;
731  newbuflen = Min(newbuflen, MaxAllocSize);
732 
733  if (mqh->mqh_buffer != NULL)
734  {
735  pfree(mqh->mqh_buffer);
736  mqh->mqh_buffer = NULL;
737  mqh->mqh_buflen = 0;
738  }
739  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
740  mqh->mqh_buflen = newbuflen;
741  }
742  }
743 
744  /* Loop until we've copied the entire message. */
745  for (;;)
746  {
747  Size still_needed;
748 
749  /* Copy as much as we can. */
750  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
751  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
752  mqh->mqh_partial_bytes += rb;
753 
754  /*
755  * Update count of bytes that can be consumed, accounting for
756  * alignment padding. Note that this will never actually insert any
757  * padding except at the end of a message, because the buffer size is
758  * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
759  */
760  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
761  mqh->mqh_consume_pending += MAXALIGN(rb);
762 
763  /* If we got all the data, exit the loop. */
764  if (mqh->mqh_partial_bytes >= nbytes)
765  break;
766 
767  /* Wait for some more data. */
768  still_needed = nbytes - mqh->mqh_partial_bytes;
769  res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
770  if (res != SHM_MQ_SUCCESS)
771  return res;
772  if (rb > still_needed)
773  rb = still_needed;
774  }
775 
776  /* Return the complete message, and reset for next message. */
777  *nbytesp = nbytes;
778  *datap = mqh->mqh_buffer;
779  mqh->mqh_length_word_complete = false;
780  mqh->mqh_partial_bytes = 0;
781  return SHM_MQ_SUCCESS;
782 }
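/*
 * Illustrative sketch (not part of shm_mq.c): a nowait receive loop.
 * SHM_MQ_WOULD_BLOCK means "retry after the latch is set"; SHM_MQ_DETACHED
 * means the sender is gone for good.  The returned pointer stays valid only
 * until the next receive from this queue.
 *
 *		Size	len;
 *		void   *data;
 *		shm_mq_result res;
 *
 *		for (;;)
 *		{
 *			res = shm_mq_receive(mqh, &len, &data, true);
 *			if (res == SHM_MQ_SUCCESS)
 *				break;
 *			if (res == SHM_MQ_DETACHED)
 *				ereport(ERROR,
 *						(errmsg("lost connection to message queue sender")));
 *			(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
 *							 WAIT_EVENT_MQ_RECEIVE);
 *			ResetLatch(MyLatch);
 *			CHECK_FOR_INTERRUPTS();
 *		}
 */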
783 
784 /*
785  * Wait for the other process that's supposed to use this queue to attach
786  * to it.
787  *
788  * The return value is SHM_MQ_DETACHED if the worker has already detached or
789  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
790  * Note that we will only be able to detect that the worker has died before
791  * attaching if a background worker handle was passed to shm_mq_attach().
792  */
793 shm_mq_result
794 shm_mq_wait_for_attach(shm_mq_handle *mqh)
795 {
796  shm_mq *mq = mqh->mqh_queue;
797  PGPROC **victim;
798 
799  if (shm_mq_get_receiver(mq) == MyProc)
800  victim = &mq->mq_sender;
801  else
802  {
803  Assert(shm_mq_get_sender(mq) == MyProc);
804  victim = &mq->mq_receiver;
805  }
806 
807  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
808  return SHM_MQ_SUCCESS;
809  else
810  return SHM_MQ_DETACHED;
811 }
812 
813 /*
814  * Detach from a shared message queue, and destroy the shm_mq_handle.
815  */
816 void
817 shm_mq_detach(shm_mq_handle *mqh)
818 {
819  /* Notify counterparty that we're outta here. */
820  shm_mq_detach_internal(mqh->mqh_queue);
821 
822  /* Cancel on_dsm_detach callback, if any. */
823  if (mqh->mqh_segment)
824  cancel_on_dsm_detach(mqh->mqh_segment,
825  shm_mq_detach_callback,
826  PointerGetDatum(mqh->mqh_queue));
827 
828  /* Release local memory associated with handle. */
829  if (mqh->mqh_buffer != NULL)
830  pfree(mqh->mqh_buffer);
831  pfree(mqh);
832 }
833 
834 /*
835  * Notify counterparty that we're detaching from shared message queue.
836  *
837  * The purpose of this function is to make sure that the process
838  * with which we're communicating doesn't block forever waiting for us to
839  * fill or drain the queue once we've lost interest. When the sender
840  * detaches, the receiver can read any messages remaining in the queue;
841  * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
842  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
843  *
844  * This is separated out from shm_mq_detach() because if the on_dsm_detach
845  * callback fires, we only want to do this much. We do not try to touch
846  * the local shm_mq_handle, as it may have been pfree'd already.
847  */
848 static void
849 shm_mq_detach_internal(shm_mq *mq)
850 {
851  PGPROC *victim;
852 
853  SpinLockAcquire(&mq->mq_mutex);
854  if (mq->mq_sender == MyProc)
855  victim = mq->mq_receiver;
856  else
857  {
858  Assert(mq->mq_receiver == MyProc);
859  victim = mq->mq_sender;
860  }
861  mq->mq_detached = true;
862  SpinLockRelease(&mq->mq_mutex);
863 
864  if (victim != NULL)
865  SetLatch(&victim->procLatch);
866 }
867 
868 /*
869  * Get the shm_mq from handle.
870  */
871 shm_mq *
872 shm_mq_get_queue(shm_mq_handle *mqh)
873 {
874  return mqh->mqh_queue;
875 }
876 
877 /*
878  * Write bytes into a shared message queue.
879  */
880 static shm_mq_result
881 shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
882  bool nowait, Size *bytes_written)
883 {
884  shm_mq *mq = mqh->mqh_queue;
885  Size sent = 0;
886  uint64 used;
887  Size ringsize = mq->mq_ring_size;
888  Size available;
889 
890  while (sent < nbytes)
891  {
892  uint64 rb;
893  uint64 wb;
894 
895  /* Compute number of ring buffer bytes used and available. */
896  rb = pg_atomic_read_u64(&mq->mq_bytes_read);
897  wb = pg_atomic_read_u64(&mq->mq_bytes_written);
898  Assert(wb >= rb);
899  used = wb - rb;
900  Assert(used <= ringsize);
901  available = Min(ringsize - used, nbytes - sent);
902 
903  /*
904  * Bail out if the queue has been detached. Note that we would be in
905  * trouble if the compiler decided to cache the value of
906  * mq->mq_detached in a register or on the stack across loop
907  * iterations. It probably shouldn't do that anyway since we'll
908  * always return, call an external function that performs a system
909  * call, or reach a memory barrier at some point later in the loop,
910  * but just to be sure, insert a compiler barrier here.
911  */
912  pg_compiler_barrier();
913  if (mq->mq_detached)
914  {
915  *bytes_written = sent;
916  return SHM_MQ_DETACHED;
917  }
918 
919  if (available == 0 && !mqh->mqh_counterparty_attached)
920  {
921  /*
922  * The queue is full, so if the receiver isn't yet known to be
923  * attached, we must wait for that to happen.
924  */
925  if (nowait)
926  {
927  if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
928  {
929  *bytes_written = sent;
930  return SHM_MQ_DETACHED;
931  }
932  if (shm_mq_get_receiver(mq) == NULL)
933  {
934  *bytes_written = sent;
935  return SHM_MQ_WOULD_BLOCK;
936  }
937  }
938  else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
939  mqh->mqh_handle))
940  {
941  mq->mq_detached = true;
942  *bytes_written = sent;
943  return SHM_MQ_DETACHED;
944  }
945  mqh->mqh_counterparty_attached = true;
946 
947  /*
948  * The receiver may have read some data after attaching, so we
949  * must not wait without rechecking the queue state.
950  */
951  }
952  else if (available == 0)
953  {
954  /*
955  * Since mqh->mqh_counterparty_attached is known to be true at this
956  * point, mq_receiver has been set, and it can't change once set.
957  * Therefore, we can read it without acquiring the spinlock.
958  */
959  Assert(mqh->mqh_counterparty_attached);
960  SetLatch(&mq->mq_receiver->procLatch);
961 
962  /* Skip manipulation of our latch if nowait = true. */
963  if (nowait)
964  {
965  *bytes_written = sent;
966  return SHM_MQ_WOULD_BLOCK;
967  }
968 
969  /*
970  * Wait for our latch to be set. It might already be set for some
971  * unrelated reason, but that'll just result in one extra trip
972  * through the loop. It's worth it to avoid resetting the latch
973  * at top of loop, because setting an already-set latch is much
974  * cheaper than setting one that has been reset.
975  */
976  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
977  WAIT_EVENT_MQ_SEND);
978 
979  /* Reset the latch so we don't spin. */
980  ResetLatch(MyLatch);
981 
982  /* An interrupt may have occurred while we were waiting. */
983  CHECK_FOR_INTERRUPTS();
984  }
985  else
986  {
987  Size offset;
988  Size sendnow;
989 
990  offset = wb % (uint64) ringsize;
991  sendnow = Min(available, ringsize - offset);
992 
993  /*
994  * Write as much data as we can via a single memcpy(). Make sure
995  * these writes happen after the read of mq_bytes_read, above.
996  * This barrier pairs with the one in shm_mq_inc_bytes_read.
997  * (Since we're separating the read of mq_bytes_read from a
998  * subsequent write to mq_ring, we need a full barrier here.)
999  */
1000  pg_memory_barrier();
1001  memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1002  (char *) data + sent, sendnow);
1003  sent += sendnow;
1004 
1005  /*
1006  * Update count of bytes written, with alignment padding. Note
1007  * that this will never actually insert any padding except at the
1008  * end of a run of bytes, because the buffer size is a multiple of
1009  * MAXIMUM_ALIGNOF, and each read is as well.
1010  */
1011  Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1012  shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
1013 
1014  /*
1015  * For efficiency, we don't set the reader's latch here. We'll do
1016  * that only when the buffer fills up or after writing an entire
1017  * message.
1018  */
1019  }
1020  }
1021 
1022  *bytes_written = sent;
1023  return SHM_MQ_SUCCESS;
1024 }
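/*
 * Worked example (not part of shm_mq.c), assuming MAXIMUM_ALIGNOF = 8 and
 * sizeof(Size) = 8: sending a 13-byte message advances mq_bytes_written by
 * MAXALIGN(sizeof(Size)) + MAXALIGN(13) = 8 + 16 = 24 bytes, so the next
 * message's length word again begins on a MAXALIGN'd boundary in mq_ring,
 * exactly as the padding comments above and in shm_mq_receive() assume.
 */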
1025 
1026 /*
1027  * Wait until at least *nbytesp bytes are available to be read from the
1028  * shared message queue, or until the buffer wraps around. If the queue is
1029  * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
1030  * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
1031  * to the location at which data bytes can be read, *nbytesp is set to the
1032  * number of bytes which can be read at that address, and the return value
1033  * is SHM_MQ_SUCCESS.
1034  */
1035 static shm_mq_result
1036 shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
1037  Size *nbytesp, void **datap)
1038 {
1039  shm_mq *mq = mqh->mqh_queue;
1040  Size ringsize = mq->mq_ring_size;
1041  uint64 used;
1042  uint64 written;
1043 
1044  for (;;)
1045  {
1046  Size offset;
1047  uint64 read;
1048 
1049  /* Get bytes written, so we can compute what's available to read. */
1050  written = pg_atomic_read_u64(&mq->mq_bytes_written);
1051 
1052  /*
1053  * Get bytes read. Include bytes we could consume but have not yet
1054  * consumed.
1055  */
1056  read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1057  mqh->mqh_consume_pending;
1058  used = written - read;
1059  Assert(used <= ringsize);
1060  offset = read % (uint64) ringsize;
1061 
1062  /* If we have enough data or buffer has wrapped, we're done. */
1063  if (used >= bytes_needed || offset + used >= ringsize)
1064  {
1065  *nbytesp = Min(used, ringsize - offset);
1066  *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1067 
1068  /*
1069  * Separate the read of mq_bytes_written, above, from caller's
1070  * attempt to read the data itself. Pairs with the barrier in
1071  * shm_mq_inc_bytes_written.
1072  */
1073  pg_read_barrier();
1074  return SHM_MQ_SUCCESS;
1075  }
1076 
1077  /*
1078  * Fall out before waiting if the queue has been detached.
1079  *
1080  * Note that we don't check for this until *after* considering whether
1081  * the data already available is enough, since the receiver can finish
1082  * receiving a message stored in the buffer even after the sender has
1083  * detached.
1084  */
1085  if (mq->mq_detached)
1086  {
1087  /*
1088  * If the writer advanced mq_bytes_written and then set
1089  * mq_detached, we might not have read the final value of
1090  * mq_bytes_written above. Insert a read barrier and then check
1091  * again if mq_bytes_written has advanced.
1092  */
1093  pg_read_barrier();
1094  if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1095  continue;
1096 
1097  return SHM_MQ_DETACHED;
1098  }
1099 
1100  /*
1101  * We didn't get enough data to satisfy the request, so mark any data
1102  * previously-consumed as read to make more buffer space.
1103  */
1104  if (mqh->mqh_consume_pending > 0)
1105  {
1106  shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
1107  mqh->mqh_consume_pending = 0;
1108  }
1109 
1110  /* Skip manipulation of our latch if nowait = true. */
1111  if (nowait)
1112  return SHM_MQ_WOULD_BLOCK;
1113 
1114  /*
1115  * Wait for our latch to be set. It might already be set for some
1116  * unrelated reason, but that'll just result in one extra trip through
1117  * the loop. It's worth it to avoid resetting the latch at top of
1118  * loop, because setting an already-set latch is much cheaper than
1119  * setting one that has been reset.
1120  */
1121  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1122  WAIT_EVENT_MQ_RECEIVE);
1123 
1124  /* Reset the latch so we don't spin. */
1125  ResetLatch(MyLatch);
1126 
1127  /* An interrupt may have occurred while we were waiting. */
1128  CHECK_FOR_INTERRUPTS();
1129  }
1130 }
1131 
1132 /*
1133  * Test whether a counterparty who may not even be alive yet is definitely gone.
1134  */
1135 static bool
1136 shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
1137 {
1138  pid_t pid;
1139 
1140  /* If the queue has been detached, counterparty is definitely gone. */
1141  if (mq->mq_detached)
1142  return true;
1143 
1144  /* If there's a handle, check worker status. */
1145  if (handle != NULL)
1146  {
1147  BgwHandleStatus status;
1148 
1149  /* Check for unexpected worker death. */
1150  status = GetBackgroundWorkerPid(handle, &pid);
1151  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1152  {
1153  /* Mark it detached, just to make it official. */
1154  mq->mq_detached = true;
1155  return true;
1156  }
1157  }
1158 
1159  /* Counterparty is not definitively gone. */
1160  return false;
1161 }
1162 
1163 /*
1164  * This is used when a process is waiting for its counterpart to attach to the
1165  * queue. We exit when the other process attaches as expected, or, if
1166  * handle != NULL, when the referenced background process or the postmaster
1167  * dies. Note that if handle == NULL, and the process fails to attach, we'll
1168  * potentially get stuck here forever waiting for a process that may never
1169  * start. We do check for interrupts, though.
1170  *
1171  * ptr is a pointer to the memory address that we're expecting to become
1172  * non-NULL when our counterpart attaches to the queue.
1173  */
1174 static bool
1175 shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
1176 {
1177  bool result = false;
1178 
1179  for (;;)
1180  {
1181  BgwHandleStatus status;
1182  pid_t pid;
1183 
1184  /* Acquire the lock just long enough to check the pointer. */
1185  SpinLockAcquire(&mq->mq_mutex);
1186  result = (*ptr != NULL);
1187  SpinLockRelease(&mq->mq_mutex);
1188 
1189  /* Fail if detached; else succeed if initialized. */
1190  if (mq->mq_detached)
1191  {
1192  result = false;
1193  break;
1194  }
1195  if (result)
1196  break;
1197 
1198  if (handle != NULL)
1199  {
1200  /* Check for unexpected worker death. */
1201  status = GetBackgroundWorkerPid(handle, &pid);
1202  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1203  {
1204  result = false;
1205  break;
1206  }
1207  }
1208 
1209  /* Wait to be signaled. */
1210  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1211  WAIT_EVENT_MQ_INTERNAL);
1212 
1213  /* Reset the latch so we don't spin. */
1214  ResetLatch(MyLatch);
1215 
1216  /* An interrupt may have occurred while we were waiting. */
1217  CHECK_FOR_INTERRUPTS();
1218  }
1219 
1220  return result;
1221 }
1222 
1223 /*
1224  * Increment the number of bytes read.
1225  */
1226 static void
1227 shm_mq_inc_bytes_read(shm_mq *mq, Size n)
1228 {
1229  PGPROC *sender;
1230 
1231  /*
1232  * Separate prior reads of mq_ring from the increment of mq_bytes_read
1233  * which follows. This pairs with the full barrier in
1234  * shm_mq_send_bytes(). We only need a read barrier here because the
1235  * increment of mq_bytes_read is actually a read followed by a dependent
1236  * write.
1237  */
1238  pg_read_barrier();
1239 
1240  /*
1241  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1242  * else can be changing this value. This method should be cheaper.
1243  */
1244  pg_atomic_write_u64(&mq->mq_bytes_read,
1245  pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1246 
1247  /*
1248  * We shouldn't have any bytes to read without a sender, so we can read
1249  * mq_sender here without a lock. Once it's initialized, it can't change.
1250  */
1251  sender = mq->mq_sender;
1252  Assert(sender != NULL);
1253  SetLatch(&sender->procLatch);
1254 }
1255 
1256 /*
1257  * Increment the number of bytes written.
1258  */
1259 static void
1260 shm_mq_inc_bytes_written(shm_mq *mq, Size n)
1261 {
1262  /*
1263  * Separate prior reads of mq_ring from the write of mq_bytes_written
1264  * which we're about to do. Pairs with the read barrier found in
1265  * shm_mq_receive_bytes.
1266  */
1267  pg_write_barrier();
1268 
1269  /*
1270  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1271  * else can be changing this value. This method avoids taking the bus
1272  * lock unnecessarily.
1273  */
1274  pg_atomic_write_u64(&mq->mq_bytes_written,
1275  pg_atomic_read_u64(&mq->mq_bytes_written) + n);
1276 }
1277 
1278 /* Shim for on_dsm_detach callback. */
1279 static void
1280 shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1281 {
1282  shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1283 
1284  shm_mq_detach_internal(mq);
1285 }