shm_mq.c
1 /*-------------------------------------------------------------------------
2  *
3  * shm_mq.c
4  * single-reader, single-writer shared memory message queue
5  *
6  * Both the sender and the receiver must have a PGPROC; their respective
7  * process latches are used for synchronization. Only the sender may send,
8  * and only the receiver may receive. This is intended to allow a user
9  * backend to communicate with worker backends that it has registered.
10  *
11  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  * src/backend/storage/ipc/shm_mq.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "port/pg_bitutils.h"
24 #include "postmaster/bgworker.h"
25 #include "storage/procsignal.h"
26 #include "storage/shm_mq.h"
27 #include "storage/spin.h"
28 #include "utils/memutils.h"
29 
30 /*
31  * This structure represents the actual queue, stored in shared memory.
32  *
33  * Some notes on synchronization:
34  *
35  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
36  * mq_sender and mq_bytes_written can only be changed by the sender.
37  * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
38  * they cannot change once set, and thus may be read without a lock once this
39  * is known to be the case.
40  *
41  * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
42  * they are written atomically using 8 byte loads and stores. Memory barriers
43  * must be carefully used to synchronize reads and writes of these values with
44  * reads and writes of the actual data in mq_ring.
45  *
46  * mq_detached needs no locking. It can be set by either the sender or the
47  * receiver, but only ever from false to true, so redundant writes don't
48  * matter. It is important that if we set mq_detached and then set the
49  * counterparty's latch, the counterparty must be certain to see the change
50  * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
51  * ends with one, this should be OK.
52  *
53  * mq_ring_size and mq_ring_offset never change after initialization, and
54  * can therefore be read without the lock.
55  *
56  * Importantly, mq_ring can be safely read and written without a lock.
57  * At any given time, the difference between mq_bytes_read and
58  * mq_bytes_written defines the number of bytes within mq_ring that contain
59  * unread data, and mq_bytes_read defines the position where those bytes
60  * begin. The sender can increase the number of unread bytes at any time,
61  * but only the receiver can give license to overwrite those bytes, by
62  * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
63  * the unread bytes it knows to be present without the lock. Conversely,
64  * the sender can write to the unused portion of the ring buffer without
65  * the lock, because nobody else can be reading or writing those bytes. The
66  * receiver could be making more bytes unused by incrementing mq_bytes_read,
67  * but that's OK. Note that it would be unsafe for the receiver to read any
68  * data it's already marked as read, or to write any data; and it would be
69  * unsafe for the sender to reread any data after incrementing
70  * mq_bytes_written, but fortunately there's no need for any of that.
71  */
72 struct shm_mq
73 {
74  slock_t mq_mutex;
75  PGPROC *mq_receiver;
76  PGPROC *mq_sender;
77  pg_atomic_uint64 mq_bytes_read;
78  pg_atomic_uint64 mq_bytes_written;
79  Size mq_ring_size;
80  bool mq_detached;
81  uint8 mq_ring_offset;
82  char mq_ring[FLEXIBLE_ARRAY_MEMBER];
83 };
84 
85 /*
86  * This structure is a backend-private handle for access to a queue.
87  *
88  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
89  * an optional pointer to the dynamic shared memory segment that contains it.
90  * (If mqh_segment is provided, we register an on_dsm_detach callback to
91  * make sure we detach from the queue before detaching from DSM.)
92  *
93  * If this queue is intended to connect the current process with a background
94  * worker that started it, the user can pass a pointer to the worker handle
95  * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
96  * is to allow us to begin sending to or receiving from that queue before the
97  * process we'll be communicating with has even been started. If it fails
98  * to start, the handle will allow us to notice that and fail cleanly, rather
99  * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
100  * simple cases - e.g. where there are just 2 processes communicating; in
101  * more complex scenarios, every process may not have a BackgroundWorkerHandle
102  * available, or may need to watch for the failure of more than one other
103  * process at a time.
104  *
105  * When a message exists as a contiguous chunk of bytes in the queue - that is,
106  * it is smaller than the size of the ring buffer and does not wrap around
107  * the end - we return the message to the caller as a pointer into the buffer.
108  * For messages that are larger or happen to wrap, we reassemble the message
109  * locally by copying the chunks into a backend-local buffer. mqh_buffer is
110  * the buffer, and mqh_buflen is the number of bytes allocated for it.
111  *
112  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
113  * are used to track the state of non-blocking operations. When the caller
114  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
115  * are expected to retry the call at a later time with the same argument;
116  * we need to retain enough state to pick up where we left off.
117  * mqh_length_word_complete tracks whether we are done sending or receiving
118  * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
119  * the number of bytes read or written for either the length word or the
120  * message itself, and mqh_expected_bytes - which is used only for reads -
121  * tracks the expected total size of the payload.
122  *
123  * mqh_counterparty_attached tracks whether we know the counterparty to have
124  * attached to the queue at some previous point. This lets us avoid some
125  * mutex acquisitions.
126  *
127  * mqh_context is the memory context in effect at the time we attached to
128  * the shm_mq. The shm_mq_handle itself is allocated in this context, and
129  * we make sure any other allocations we do happen in this context as well,
130  * to avoid nasty surprises.
131  */
132 struct shm_mq_handle
133 {
134  shm_mq *mqh_queue;
135  dsm_segment *mqh_segment;
136  BackgroundWorkerHandle *mqh_handle;
137  char *mqh_buffer;
138  Size mqh_buflen;
139  Size mqh_consume_pending;
140  Size mqh_partial_bytes;
141  Size mqh_expected_bytes;
142  bool mqh_length_word_complete;
143  bool mqh_counterparty_attached;
144  MemoryContext mqh_context;
145 };
146 
147 static void shm_mq_detach_internal(shm_mq *mq);
148 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
149  const void *data, bool nowait, Size *bytes_written);
150 static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
151  Size bytes_needed, bool nowait, Size *nbytesp,
152  void **datap);
153 static bool shm_mq_counterparty_gone(shm_mq *mq,
154  BackgroundWorkerHandle *handle);
155 static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
156  BackgroundWorkerHandle *handle);
157 static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
158 static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
159 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
160 
161 /* Minimum queue size is enough for header and at least one chunk of data. */
162 const Size shm_mq_minimum_size =
163 MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
164 
165 #define MQH_INITIAL_BUFSIZE 8192
166 
167 /*
168  * Initialize a new shared message queue.
169  */
170 shm_mq *
171 shm_mq_create(void *address, Size size)
172 {
173  shm_mq *mq = address;
174  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
175 
176  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
177  size = MAXALIGN_DOWN(size);
178 
179  /* Queue size must be large enough to hold some data. */
180  Assert(size > data_offset);
181 
182  /* Initialize queue header. */
183  SpinLockInit(&mq->mq_mutex);
184  mq->mq_receiver = NULL;
185  mq->mq_sender = NULL;
186  pg_atomic_init_u64(&mq->mq_bytes_read, 0);
187  pg_atomic_init_u64(&mq->mq_bytes_written, 0);
188  mq->mq_ring_size = size - data_offset;
189  mq->mq_detached = false;
190  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
191 
192  return mq;
193 }
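/*
 * Editorial example (not part of shm_mq.c): a minimal sketch of how a leader
 * backend might carve a queue out of a freshly created DSM segment.  The
 * function name and the sizing policy are hypothetical; only the
 * shm_mq_create() and shm_mq_set_receiver() calls are the real API.
 */
static shm_mq *
example_create_queue_in_segment(dsm_segment *seg, Size queue_size)
{
	shm_mq	   *mq;

	/* The queue must be at least shm_mq_minimum_size bytes. */
	Assert(queue_size >= shm_mq_minimum_size);

	/* Lay the queue out at the segment's mapped address. */
	mq = shm_mq_create(dsm_segment_address(seg), queue_size);

	/* This process will read; a worker will later be set as sender. */
	shm_mq_set_receiver(mq, MyProc);

	return mq;
}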
194 
195 /*
196  * Set the identity of the process that will receive from a shared message
197  * queue.
198  */
199 void
200 shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
201 {
202  PGPROC *sender;
203 
204  SpinLockAcquire(&mq->mq_mutex);
205  Assert(mq->mq_receiver == NULL);
206  mq->mq_receiver = proc;
207  sender = mq->mq_sender;
208  SpinLockRelease(&mq->mq_mutex);
209 
210  if (sender != NULL)
211  SetLatch(&sender->procLatch);
212 }
213 
214 /*
215  * Set the identity of the process that will send to a shared message queue.
216  */
217 void
218 shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
219 {
220  PGPROC *receiver;
221 
222  SpinLockAcquire(&mq->mq_mutex);
223  Assert(mq->mq_sender == NULL);
224  mq->mq_sender = proc;
225  receiver = mq->mq_receiver;
226  SpinLockRelease(&mq->mq_mutex);
227 
228  if (receiver != NULL)
229  SetLatch(&receiver->procLatch);
230 }
231 
232 /*
233  * Get the configured receiver.
234  */
235 PGPROC *
236 shm_mq_get_receiver(shm_mq *mq)
237 {
238  PGPROC *receiver;
239 
240  SpinLockAcquire(&mq->mq_mutex);
241  receiver = mq->mq_receiver;
242  SpinLockRelease(&mq->mq_mutex);
243 
244  return receiver;
245 }
246 
247 /*
248  * Get the configured sender.
249  */
250 PGPROC *
251 shm_mq_get_sender(shm_mq *mq)
252 {
253  PGPROC *sender;
254 
255  SpinLockAcquire(&mq->mq_mutex);
256  sender = mq->mq_sender;
257  SpinLockRelease(&mq->mq_mutex);
258 
259  return sender;
260 }
261 
262 /*
263  * Attach to a shared message queue so we can send or receive messages.
264  *
265  * The memory context in effect at the time this function is called should
266  * be one which will last for at least as long as the message queue itself.
267  * We'll allocate the handle in that context, and future allocations that
268  * are needed to buffer incoming data will happen in that context as well.
269  *
270  * If seg != NULL, the queue will be automatically detached when that dynamic
271  * shared memory segment is detached.
272  *
273  * If handle != NULL, the queue can be read or written even before the
274  * other process has attached. We'll wait for it to do so if needed. The
275  * handle must be for a background worker initialized with bgw_notify_pid
276  * equal to our PID.
277  *
278  * shm_mq_detach() should be called when done. This will free the
279  * shm_mq_handle and mark the queue itself as detached, so that our
280  * counterpart won't get stuck waiting for us to fill or drain the queue
281  * after we've already lost interest.
282  */
283 shm_mq_handle *
284 shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
285 {
286  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
287 
288  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
289  mqh->mqh_queue = mq;
290  mqh->mqh_segment = seg;
291  mqh->mqh_handle = handle;
292  mqh->mqh_buffer = NULL;
293  mqh->mqh_buflen = 0;
294  mqh->mqh_consume_pending = 0;
295  mqh->mqh_partial_bytes = 0;
296  mqh->mqh_expected_bytes = 0;
297  mqh->mqh_length_word_complete = false;
298  mqh->mqh_counterparty_attached = false;
299  mqh->mqh_context = CurrentMemoryContext;
300 
301  if (seg != NULL)
302  on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
303 
304  return mqh;
305 }
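/*
 * Editorial example (not part of shm_mq.c): the worker side of a typical
 * setup, assuming the worker has already mapped the leader's DSM segment and
 * located the shm_mq within it (how it does so is up to the caller).  Passing
 * the segment here arranges automatic detach via the on_dsm_detach callback.
 * The function name is hypothetical.
 */
static shm_mq_handle *
example_worker_attach_as_sender(shm_mq *mq, dsm_segment *seg)
{
	/* Mark ourselves as the writer, then attach to get a handle. */
	shm_mq_set_sender(mq, MyProc);
	return shm_mq_attach(mq, seg, NULL);
}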
306 
307 /*
308  * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
309  * been passed to shm_mq_attach.
310  */
311 void
312 shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
313 {
314  Assert(mqh->mqh_handle == NULL);
315  mqh->mqh_handle = handle;
316 }
317 
318 /*
319  * Write a message into a shared message queue.
320  */
321 shm_mq_result
322 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
323 {
324  shm_mq_iovec iov;
325 
326  iov.data = data;
327  iov.len = nbytes;
328 
329  return shm_mq_sendv(mqh, &iov, 1, nowait);
330 }
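/*
 * Editorial example (not part of shm_mq.c): sending one message with blocking
 * semantics.  With nowait = false the only possible results are
 * SHM_MQ_SUCCESS and SHM_MQ_DETACHED, so the caller just needs to notice a
 * vanished receiver.  The function name is hypothetical.
 */
static bool
example_send_one_message(shm_mq_handle *outq, const void *msg, Size len)
{
	shm_mq_result res = shm_mq_send(outq, len, msg, false);

	return (res == SHM_MQ_SUCCESS); /* false means the receiver detached */
}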
331 
332 /*
333  * Write a message into a shared message queue, gathered from multiple
334  * addresses.
335  *
336  * When nowait = false, we'll wait on our process latch when the ring buffer
337  * fills up, and then continue writing once the receiver has drained some data.
338  * The process latch is reset after each wait.
339  *
340  * When nowait = true, we do not manipulate the state of the process latch;
341  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
342  * this case, the caller should call this function again, with the same
343  * arguments, each time the process latch is set. (Once begun, the sending
344  * of a message cannot be aborted except by detaching from the queue; changing
345  * the length or payload will corrupt the queue.)
346  */
347 shm_mq_result
348 shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
349 {
350  shm_mq_result res;
351  shm_mq *mq = mqh->mqh_queue;
352  PGPROC *receiver;
353  Size nbytes = 0;
354  Size bytes_written;
355  int i;
356  int which_iov = 0;
357  Size offset;
358 
359  Assert(mq->mq_sender == MyProc);
360 
361  /* Compute total size of write. */
362  for (i = 0; i < iovcnt; ++i)
363  nbytes += iov[i].len;
364 
365  /* Prevent writing messages that would overwhelm the receiver. */
366  if (nbytes > MaxAllocSize)
367  ereport(ERROR,
368  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
369  errmsg("cannot send a message of size %zu via shared memory queue",
370  nbytes)));
371 
372  /* Try to write, or finish writing, the length word into the buffer. */
373  while (!mqh->mqh_length_word_complete)
374  {
375  Assert(mqh->mqh_partial_bytes < sizeof(Size));
376  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
377  ((char *) &nbytes) + mqh->mqh_partial_bytes,
378  nowait, &bytes_written);
379 
380  if (res == SHM_MQ_DETACHED)
381  {
382  /* Reset state in case caller tries to send another message. */
383  mqh->mqh_partial_bytes = 0;
384  mqh->mqh_length_word_complete = false;
385  return res;
386  }
387  mqh->mqh_partial_bytes += bytes_written;
388 
389  if (mqh->mqh_partial_bytes >= sizeof(Size))
390  {
391  Assert(mqh->mqh_partial_bytes == sizeof(Size));
392 
393  mqh->mqh_partial_bytes = 0;
394  mqh->mqh_length_word_complete = true;
395  }
396 
397  if (res != SHM_MQ_SUCCESS)
398  return res;
399 
400  /* Length word can't be split unless bigger than required alignment. */
401  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
402  }
403 
404  /* Write the actual data bytes into the buffer. */
405  Assert(mqh->mqh_partial_bytes <= nbytes);
406  offset = mqh->mqh_partial_bytes;
407  do
408  {
409  Size chunksize;
410 
411  /* Figure out which bytes need to be sent next. */
412  if (offset >= iov[which_iov].len)
413  {
414  offset -= iov[which_iov].len;
415  ++which_iov;
416  if (which_iov >= iovcnt)
417  break;
418  continue;
419  }
420 
421  /*
422  * We want to avoid copying the data if at all possible, but every
423  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
424  * the last. Thus, if a chunk other than the last one ends on a
425  * non-MAXALIGN'd boundary, we have to combine the tail end of its
426  * data with data from one or more following chunks until we either
427  * reach the last chunk or accumulate a number of bytes which is
428  * MAXALIGN'd.
429  */
430  if (which_iov + 1 < iovcnt &&
431  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
432  {
433  char tmpbuf[MAXIMUM_ALIGNOF];
434  int j = 0;
435 
436  for (;;)
437  {
438  if (offset < iov[which_iov].len)
439  {
440  tmpbuf[j] = iov[which_iov].data[offset];
441  j++;
442  offset++;
443  if (j == MAXIMUM_ALIGNOF)
444  break;
445  }
446  else
447  {
448  offset -= iov[which_iov].len;
449  which_iov++;
450  if (which_iov >= iovcnt)
451  break;
452  }
453  }
454 
455  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
456 
457  if (res == SHM_MQ_DETACHED)
458  {
459  /* Reset state in case caller tries to send another message. */
460  mqh->mqh_partial_bytes = 0;
461  mqh->mqh_length_word_complete = false;
462  return res;
463  }
464 
465  mqh->mqh_partial_bytes += bytes_written;
466  if (res != SHM_MQ_SUCCESS)
467  return res;
468  continue;
469  }
470 
471  /*
472  * If this is the last chunk, we can write all the data, even if it
473  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
474  * MAXALIGN_DOWN the write size.
475  */
476  chunksize = iov[which_iov].len - offset;
477  if (which_iov + 1 < iovcnt)
478  chunksize = MAXALIGN_DOWN(chunksize);
479  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
480  nowait, &bytes_written);
481 
482  if (res == SHM_MQ_DETACHED)
483  {
484  /* Reset state in case caller tries to send another message. */
485  mqh->mqh_length_word_complete = false;
486  mqh->mqh_partial_bytes = 0;
487  return res;
488  }
489 
490  mqh->mqh_partial_bytes += bytes_written;
491  offset += bytes_written;
492  if (res != SHM_MQ_SUCCESS)
493  return res;
494  } while (mqh->mqh_partial_bytes < nbytes);
495 
496  /* Reset for next message. */
497  mqh->mqh_partial_bytes = 0;
498  mqh->mqh_length_word_complete = false;
499 
500  /* If queue has been detached, let caller know. */
501  if (mq->mq_detached)
502  return SHM_MQ_DETACHED;
503 
504  /*
505  * If the counterparty is known to have attached, we can read mq_receiver
506  * without acquiring the spinlock and assume it isn't NULL. Otherwise,
507  * more caution is needed.
508  */
509  if (mqh->mqh_counterparty_attached)
510  receiver = mq->mq_receiver;
511  else
512  {
513  SpinLockAcquire(&mq->mq_mutex);
514  receiver = mq->mq_receiver;
515  SpinLockRelease(&mq->mq_mutex);
516  if (receiver == NULL)
517  return SHM_MQ_SUCCESS;
518  mqh->mqh_counterparty_attached = true;
519  }
520 
521  /* Notify receiver of the newly-written data, and return. */
522  SetLatch(&receiver->procLatch);
523  return SHM_MQ_SUCCESS;
524 }
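/*
 * Editorial example (not part of shm_mq.c): using shm_mq_sendv() to gather a
 * header and a payload from separate buffers into one logical message,
 * avoiding an intermediate copy to make them contiguous.  Names are
 * hypothetical.
 */
static shm_mq_result
example_send_header_and_payload(shm_mq_handle *outq,
								const char *hdr, Size hdrlen,
								const char *payload, Size payloadlen)
{
	shm_mq_iovec iov[2];

	iov[0].data = hdr;
	iov[0].len = hdrlen;
	iov[1].data = payload;
	iov[1].len = payloadlen;

	/* Both chunks are delivered to the receiver as one message. */
	return shm_mq_sendv(outq, iov, 2, false);
}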
525 
526 /*
527  * Receive a message from a shared message queue.
528  *
529  * We set *nbytes to the message length and *data to point to the message
530  * payload. If the entire message exists in the queue as a single,
531  * contiguous chunk, *data will point directly into shared memory; otherwise,
532  * it will point to a temporary buffer. This mostly avoids data copying in
533  * the hoped-for case where messages are short compared to the buffer size,
534  * while still allowing longer messages. In either case, the return value
535  * remains valid until the next receive operation is performed on the queue.
536  *
537  * When nowait = false, we'll wait on our process latch when the ring buffer
538  * is empty and we have not yet received a full message. The sender will
539  * set our process latch after more data has been written, and we'll resume
540  * processing. Each call will therefore return a complete message
541  * (unless the sender detaches the queue).
542  *
543  * When nowait = true, we do not manipulate the state of the process latch;
544  * instead, whenever the buffer is empty and we need to read from it, we
545  * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
546  * function again after the process latch has been set.
547  */
548 shm_mq_result
549 shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
550 {
551  shm_mq *mq = mqh->mqh_queue;
552  shm_mq_result res;
553  Size rb = 0;
554  Size nbytes;
555  void *rawdata;
556 
557  Assert(mq->mq_receiver == MyProc);
558 
559  /* We can't receive data until the sender has attached. */
560  if (!mqh->mqh_counterparty_attached)
561  {
562  if (nowait)
563  {
564  int counterparty_gone;
565 
566  /*
567  * We shouldn't return at this point at all unless the sender
568  * hasn't attached yet. However, the correct return value depends
569  * on whether the sender is still attached. If we first test
570  * whether the sender has ever attached and then test whether the
571  * sender has detached, there's a race condition: a sender that
572  * attaches and detaches very quickly might fool us into thinking
573  * the sender never attached at all. So, test whether our
574  * counterparty is definitively gone first, and only afterwards
575  * check whether the sender ever attached in the first place.
576  */
577  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
578  if (shm_mq_get_sender(mq) == NULL)
579  {
580  if (counterparty_gone)
581  return SHM_MQ_DETACHED;
582  else
583  return SHM_MQ_WOULD_BLOCK;
584  }
585  }
586  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
587  && shm_mq_get_sender(mq) == NULL)
588  {
589  mq->mq_detached = true;
590  return SHM_MQ_DETACHED;
591  }
592  mqh->mqh_counterparty_attached = true;
593  }
594 
595  /*
596  * If we've consumed an amount of data greater than 1/4th of the ring
597  * size, mark it consumed in shared memory. We try to avoid doing this
598  * unnecessarily when only a small amount of data has been consumed,
599  * because SetLatch() is fairly expensive and we don't want to do it too
600  * often.
601  */
602  if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
603  {
604  shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
605  mqh->mqh_consume_pending = 0;
606  }
607 
608  /* Try to read, or finish reading, the length word from the buffer. */
609  while (!mqh->mqh_length_word_complete)
610  {
611  /* Try to receive the message length word. */
612  Assert(mqh->mqh_partial_bytes < sizeof(Size));
613  res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
614  nowait, &rb, &rawdata);
615  if (res != SHM_MQ_SUCCESS)
616  return res;
617 
618  /*
619  * Hopefully, we'll receive the entire message length word at once.
620  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
621  * multiple reads.
622  */
623  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
624  {
625  Size needed;
626 
627  nbytes = *(Size *) rawdata;
628 
629  /* If we've already got the whole message, we're done. */
630  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
631  if (rb >= needed)
632  {
633  mqh->mqh_consume_pending += needed;
634  *nbytesp = nbytes;
635  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
636  return SHM_MQ_SUCCESS;
637  }
638 
639  /*
640  * We don't have the whole message, but we at least have the whole
641  * length word.
642  */
643  mqh->mqh_expected_bytes = nbytes;
644  mqh->mqh_length_word_complete = true;
645  mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
646  rb -= MAXALIGN(sizeof(Size));
647  }
648  else
649  {
650  Size lengthbytes;
651 
652  /* Can't be split unless bigger than required alignment. */
653  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
654 
655  /* Message word is split; need buffer to reassemble. */
656  if (mqh->mqh_buffer == NULL)
657  {
658  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
659  MQH_INITIAL_BUFSIZE);
660  mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
661  }
662  Assert(mqh->mqh_buflen >= sizeof(Size));
663 
664  /* Copy partial length word; remember to consume it. */
665  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
666  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
667  else
668  lengthbytes = rb;
669  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
670  lengthbytes);
671  mqh->mqh_partial_bytes += lengthbytes;
672  mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
673  rb -= lengthbytes;
674 
675  /* If we now have the whole word, we're ready to read payload. */
676  if (mqh->mqh_partial_bytes >= sizeof(Size))
677  {
678  Assert(mqh->mqh_partial_bytes == sizeof(Size));
679  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
680  mqh->mqh_length_word_complete = true;
681  mqh->mqh_partial_bytes = 0;
682  }
683  }
684  }
685  nbytes = mqh->mqh_expected_bytes;
686 
687  /*
688  * Should be disallowed on the sending side already, but better check and
689  * error out on the receiver side as well rather than trying to read a
690  * prohibitively large message.
691  */
692  if (nbytes > MaxAllocSize)
693  ereport(ERROR,
694  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
695  errmsg("invalid message size %zu in shared memory queue",
696  nbytes)));
697 
698  if (mqh->mqh_partial_bytes == 0)
699  {
700  /*
701  * Try to obtain the whole message in a single chunk. If this works,
702  * we need not copy the data and can return a pointer directly into
703  * shared memory.
704  */
705  res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
706  if (res != SHM_MQ_SUCCESS)
707  return res;
708  if (rb >= nbytes)
709  {
710  mqh->mqh_length_word_complete = false;
711  mqh->mqh_consume_pending += MAXALIGN(nbytes);
712  *nbytesp = nbytes;
713  *datap = rawdata;
714  return SHM_MQ_SUCCESS;
715  }
716 
717  /*
718  * The message has wrapped the buffer. We'll need to copy it in order
719  * to return it to the client in one chunk. First, make sure we have
720  * a large enough buffer available.
721  */
722  if (mqh->mqh_buflen < nbytes)
723  {
724  Size newbuflen;
725 
726  /*
727  * Increase size to the next power of 2 that's >= nbytes, but
728  * limit to MaxAllocSize.
729  */
730  newbuflen = pg_nextpower2_size_t(nbytes);
731  newbuflen = Min(newbuflen, MaxAllocSize);
732 
733  if (mqh->mqh_buffer != NULL)
734  {
735  pfree(mqh->mqh_buffer);
736  mqh->mqh_buffer = NULL;
737  mqh->mqh_buflen = 0;
738  }
739  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
740  mqh->mqh_buflen = newbuflen;
741  }
742  }
743 
744  /* Loop until we've copied the entire message. */
745  for (;;)
746  {
747  Size still_needed;
748 
749  /* Copy as much as we can. */
750  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
751  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
752  mqh->mqh_partial_bytes += rb;
753 
754  /*
755  * Update count of bytes that can be consumed, accounting for
756  * alignment padding. Note that this will never actually insert any
757  * padding except at the end of a message, because the buffer size is
758  * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
759  */
760  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
761  mqh->mqh_consume_pending += MAXALIGN(rb);
762 
763  /* If we got all the data, exit the loop. */
764  if (mqh->mqh_partial_bytes >= nbytes)
765  break;
766 
767  /* Wait for some more data. */
768  still_needed = nbytes - mqh->mqh_partial_bytes;
769  res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
770  if (res != SHM_MQ_SUCCESS)
771  return res;
772  if (rb > still_needed)
773  rb = still_needed;
774  }
775 
776  /* Return the complete message, and reset for next message. */
777  *nbytesp = nbytes;
778  *datap = mqh->mqh_buffer;
779  mqh->mqh_length_word_complete = false;
780  mqh->mqh_partial_bytes = 0;
781  return SHM_MQ_SUCCESS;
782 }
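/*
 * Editorial example (not part of shm_mq.c): a non-blocking receive loop.
 * SHM_MQ_WOULD_BLOCK means "wait on your latch and retry"; the data pointer
 * returned on success is only valid until the next receive on this queue, so
 * it must be processed (or copied) before looping.  The function name and the
 * handling of the message body are hypothetical.
 */
static void
example_drain_queue(shm_mq_handle *inq)
{
	for (;;)
	{
		Size		nbytes;
		void	   *data;
		shm_mq_result res;

		res = shm_mq_receive(inq, &nbytes, &data, true);
		if (res == SHM_MQ_SUCCESS)
		{
			/* ... handle the nbytes bytes at data here ... */
			continue;
		}
		if (res == SHM_MQ_DETACHED)
			break;				/* sender gone; no further messages */

		/* SHM_MQ_WOULD_BLOCK: sleep until the sender sets our latch. */
		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
						 WAIT_EVENT_MQ_RECEIVE);
		ResetLatch(MyLatch);
		CHECK_FOR_INTERRUPTS();
	}
}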
783 
784 /*
785  * Wait for the other process that's supposed to use this queue to attach
786  * to it.
787  *
788  * The return value is SHM_MQ_DETACHED if the worker has already detached or
789  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
790  * Note that we will only be able to detect that the worker has died before
791  * attaching if a background worker handle was passed to shm_mq_attach().
792  */
793 shm_mq_result
794 shm_mq_wait_for_attach(shm_mq_handle *mqh)
795 {
796  shm_mq *mq = mqh->mqh_queue;
797  PGPROC **victim;
798 
799  if (shm_mq_get_receiver(mq) == MyProc)
800  victim = &mq->mq_sender;
801  else
802  {
803  Assert(shm_mq_get_sender(mq) == MyProc);
804  victim = &mq->mq_receiver;
805  }
806 
807  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
808  return SHM_MQ_SUCCESS;
809  else
810  return SHM_MQ_DETACHED;
811 }
812 
813 /*
814  * Detach from a shared message queue, and destroy the shm_mq_handle.
815  */
816 void
817 shm_mq_detach(shm_mq_handle *mqh)
818 {
819  /* Notify counterparty that we're outta here. */
820  shm_mq_detach_internal(mqh->mqh_queue);
821 
822  /* Cancel on_dsm_detach callback, if any. */
823  if (mqh->mqh_segment)
824  cancel_on_dsm_detach(mqh->mqh_segment,
825  shm_mq_detach_callback,
826  PointerGetDatum(mqh->mqh_queue));
827 
828  /* Release local memory associated with handle. */
829  if (mqh->mqh_buffer != NULL)
830  pfree(mqh->mqh_buffer);
831  pfree(mqh);
832 }
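/*
 * Editorial example (not part of shm_mq.c): orderly teardown.  Detaching the
 * queue first wakes the counterparty and cancels the on_dsm_detach callback,
 * so the subsequent dsm_detach() does not re-detach the queue.  The function
 * name is hypothetical.
 */
static void
example_shutdown_queue(shm_mq_handle *mqh, dsm_segment *seg)
{
	shm_mq_detach(mqh);
	dsm_detach(seg);
}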
833 
834 /*
835  * Notify counterparty that we're detaching from shared message queue.
836  *
837  * The purpose of this function is to make sure that the process
838  * with which we're communicating doesn't block forever waiting for us to
839  * fill or drain the queue once we've lost interest. When the sender
840  * detaches, the receiver can read any messages remaining in the queue;
841  * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
842  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
843  *
844  * This is separated out from shm_mq_detach() because if the on_dsm_detach
845  * callback fires, we only want to do this much. We do not try to touch
846  * the local shm_mq_handle, as it may have been pfree'd already.
847  */
848 static void
849 shm_mq_detach_internal(shm_mq *mq)
850 {
851  PGPROC *victim;
852 
853  SpinLockAcquire(&mq->mq_mutex);
854  if (mq->mq_sender == MyProc)
855  victim = mq->mq_receiver;
856  else
857  {
858  Assert(mq->mq_receiver == MyProc);
859  victim = mq->mq_sender;
860  }
861  mq->mq_detached = true;
862  SpinLockRelease(&mq->mq_mutex);
863 
864  if (victim != NULL)
865  SetLatch(&victim->procLatch);
866 }
867 
868 /*
869  * Get the shm_mq from handle.
870  */
871 shm_mq *
872 shm_mq_get_queue(shm_mq_handle *mqh)
873 {
874  return mqh->mqh_queue;
875 }
876 
877 /*
878  * Write bytes into a shared message queue.
879  */
880 static shm_mq_result
881 shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
882  bool nowait, Size *bytes_written)
883 {
884  shm_mq *mq = mqh->mqh_queue;
885  Size sent = 0;
886  uint64 used;
887  Size ringsize = mq->mq_ring_size;
888  Size available;
889 
890  while (sent < nbytes)
891  {
892  uint64 rb;
893  uint64 wb;
894 
895  /* Compute number of ring buffer bytes used and available. */
896  rb = pg_atomic_read_u64(&mq->mq_bytes_read);
897  wb = pg_atomic_read_u64(&mq->mq_bytes_written);
898  Assert(wb >= rb);
899  used = wb - rb;
900  Assert(used <= ringsize);
901  available = Min(ringsize - used, nbytes - sent);
902 
903  /*
904  * Bail out if the queue has been detached. Note that we would be in
905  * trouble if the compiler decided to cache the value of
906  * mq->mq_detached in a register or on the stack across loop
907  * iterations. It probably shouldn't do that anyway since we'll
908  * always return, call an external function that performs a system
909  * call, or reach a memory barrier at some point later in the loop,
910  * but just to be sure, insert a compiler barrier here.
911  */
912  pg_compiler_barrier();
913  if (mq->mq_detached)
914  {
915  *bytes_written = sent;
916  return SHM_MQ_DETACHED;
917  }
918 
919  if (available == 0 && !mqh->mqh_counterparty_attached)
920  {
921  /*
922  * The queue is full, so if the receiver isn't yet known to be
923  * attached, we must wait for that to happen.
924  */
925  if (nowait)
926  {
927  if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
928  {
929  *bytes_written = sent;
930  return SHM_MQ_DETACHED;
931  }
932  if (shm_mq_get_receiver(mq) == NULL)
933  {
934  *bytes_written = sent;
935  return SHM_MQ_WOULD_BLOCK;
936  }
937  }
938  else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
939  mqh->mqh_handle))
940  {
941  mq->mq_detached = true;
942  *bytes_written = sent;
943  return SHM_MQ_DETACHED;
944  }
945  mqh->mqh_counterparty_attached = true;
946 
947  /*
948  * The receiver may have read some data after attaching, so we
949  * must not wait without rechecking the queue state.
950  */
951  }
952  else if (available == 0)
953  {
954  /*
955  * Since mqh->mqh_counterparty_attached is known to be true at this
956  * point, mq_receiver has been set, and it can't change once set.
957  * Therefore, we can read it without acquiring the spinlock.
958  */
959  Assert(mqh->mqh_counterparty_attached);
960  SetLatch(&mq->mq_receiver->procLatch);
961 
962  /* Skip manipulation of our latch if nowait = true. */
963  if (nowait)
964  {
965  *bytes_written = sent;
966  return SHM_MQ_WOULD_BLOCK;
967  }
968 
969  /*
970  * Wait for our latch to be set. It might already be set for some
971  * unrelated reason, but that'll just result in one extra trip
972  * through the loop. It's worth it to avoid resetting the latch
973  * at top of loop, because setting an already-set latch is much
974  * cheaper than setting one that has been reset.
975  */
976  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
977  WAIT_EVENT_MQ_SEND);
978 
979  /* Reset the latch so we don't spin. */
980  ResetLatch(MyLatch);
981 
982  /* An interrupt may have occurred while we were waiting. */
983  CHECK_FOR_INTERRUPTS();
984  }
985  else
986  {
987  Size offset;
988  Size sendnow;
989 
990  offset = wb % (uint64) ringsize;
991  sendnow = Min(available, ringsize - offset);
992 
993  /*
994  * Write as much data as we can via a single memcpy(). Make sure
995  * these writes happen after the read of mq_bytes_read, above.
996  * This barrier pairs with the one in shm_mq_inc_bytes_read.
997  * (Since we're separating the read of mq_bytes_read from a
998  * subsequent write to mq_ring, we need a full barrier here.)
999  */
1000  pg_memory_barrier();
1001  memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1002  (char *) data + sent, sendnow);
1003  sent += sendnow;
1004 
1005  /*
1006  * Update count of bytes written, with alignment padding. Note
1007  * that this will never actually insert any padding except at the
1008  * end of a run of bytes, because the buffer size is a multiple of
1009  * MAXIMUM_ALIGNOF, and each read is as well.
1010  */
1011  Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1012  shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
1013 
1014  /*
1015  * For efficiency, we don't set the reader's latch here. We'll do
1016  * that only when the buffer fills up or after writing an entire
1017  * message.
1018  */
1019  }
1020  }
1021 
1022  *bytes_written = sent;
1023  return SHM_MQ_SUCCESS;
1024 }
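/*
 * Editorial worked example for the ring arithmetic above (not part of
 * shm_mq.c): with mq_ring_size = 1024, mq_bytes_read = 900 and
 * mq_bytes_written = 1500, used = 1500 - 900 = 600, so at most
 * 1024 - 600 = 424 new bytes fit; the write offset is 1500 % 1024 = 476, and
 * because 1024 - 476 = 548 >= 424, those 424 bytes go out in a single
 * memcpy without wrapping around the end of mq_ring.
 */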
1025 
1026 /*
1027  * Wait until at least *nbytesp bytes are available to be read from the
1028  * shared message queue, or until the buffer wraps around. If the queue is
1029  * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
1030  * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
1031  * to the location at which data bytes can be read, *nbytesp is set to the
1032  * number of bytes which can be read at that address, and the return value
1033  * is SHM_MQ_SUCCESS.
1034  */
1035 static shm_mq_result
1036 shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
1037  Size *nbytesp, void **datap)
1038 {
1039  shm_mq *mq = mqh->mqh_queue;
1040  Size ringsize = mq->mq_ring_size;
1041  uint64 used;
1042  uint64 written;
1043 
1044  for (;;)
1045  {
1046  Size offset;
1047  uint64 read;
1048 
1049  /* Get bytes written, so we can compute what's available to read. */
1050  written = pg_atomic_read_u64(&mq->mq_bytes_written);
1051 
1052  /*
1053  * Get bytes read. Include bytes we could consume but have not yet
1054  * consumed.
1055  */
1056  read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1057  mqh->mqh_consume_pending;
1058  used = written - read;
1059  Assert(used <= ringsize);
1060  offset = read % (uint64) ringsize;
1061 
1062  /* If we have enough data or buffer has wrapped, we're done. */
1063  if (used >= bytes_needed || offset + used >= ringsize)
1064  {
1065  *nbytesp = Min(used, ringsize - offset);
1066  *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1067 
1068  /*
1069  * Separate the read of mq_bytes_written, above, from caller's
1070  * attempt to read the data itself. Pairs with the barrier in
1071  * shm_mq_inc_bytes_written.
1072  */
1073  pg_read_barrier();
1074  return SHM_MQ_SUCCESS;
1075  }
1076 
1077  /*
1078  * Fall out before waiting if the queue has been detached.
1079  *
1080  * Note that we don't check for this until *after* considering whether
1081  * the data already available is enough, since the receiver can finish
1082  * receiving a message stored in the buffer even after the sender has
1083  * detached.
1084  */
1085  if (mq->mq_detached)
1086  {
1087  /*
1088  * If the writer advanced mq_bytes_written and then set
1089  * mq_detached, we might not have read the final value of
1090  * mq_bytes_written above. Insert a read barrier and then check
1091  * again if mq_bytes_written has advanced.
1092  */
1093  pg_read_barrier();
1094  if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1095  continue;
1096 
1097  return SHM_MQ_DETACHED;
1098  }
1099 
1100  /*
1101  * We didn't get enough data to satisfy the request, so mark any data
1102  * previously-consumed as read to make more buffer space.
1103  */
1104  if (mqh->mqh_consume_pending > 0)
1105  {
1106  shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
1107  mqh->mqh_consume_pending = 0;
1108  }
1109 
1110  /* Skip manipulation of our latch if nowait = true. */
1111  if (nowait)
1112  return SHM_MQ_WOULD_BLOCK;
1113 
1114  /*
1115  * Wait for our latch to be set. It might already be set for some
1116  * unrelated reason, but that'll just result in one extra trip through
1117  * the loop. It's worth it to avoid resetting the latch at top of
1118  * loop, because setting an already-set latch is much cheaper than
1119  * setting one that has been reset.
1120  */
1121  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1122  WAIT_EVENT_MQ_RECEIVE);
1123 
1124  /* Reset the latch so we don't spin. */
1125  ResetLatch(MyLatch);
1126 
1127  /* An interrupt may have occurred while we were waiting. */
1128  CHECK_FOR_INTERRUPTS();
1129  }
1130 }
1131 
1132 /*
1133  * Test whether a counterparty who may not even be alive yet is definitely gone.
1134  */
1135 static bool
1136 shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
1137 {
1138  pid_t pid;
1139 
1140  /* If the queue has been detached, counterparty is definitely gone. */
1141  if (mq->mq_detached)
1142  return true;
1143 
1144  /* If there's a handle, check worker status. */
1145  if (handle != NULL)
1146  {
1147  BgwHandleStatus status;
1148 
1149  /* Check for unexpected worker death. */
1150  status = GetBackgroundWorkerPid(handle, &pid);
1151  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1152  {
1153  /* Mark it detached, just to make it official. */
1154  mq->mq_detached = true;
1155  return true;
1156  }
1157  }
1158 
1159  /* Counterparty is not definitively gone. */
1160  return false;
1161 }
1162 
1163 /*
1164  * This is used when a process is waiting for its counterpart to attach to the
1165  * queue. We exit when the other process attaches as expected, or, if
1166  * handle != NULL, when the referenced background process or the postmaster
1167  * dies. Note that if handle == NULL, and the process fails to attach, we'll
1168  * potentially get stuck here forever waiting for a process that may never
1169  * start. We do check for interrupts, though.
1170  *
1171  * ptr is a pointer to the memory address that we're expecting to become
1172  * non-NULL when our counterpart attaches to the queue.
1173  */
1174 static bool
1175 shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
1176 {
1177  bool result = false;
1178 
1179  for (;;)
1180  {
1181  BgwHandleStatus status;
1182  pid_t pid;
1183 
1184  /* Acquire the lock just long enough to check the pointer. */
1185  SpinLockAcquire(&mq->mq_mutex);
1186  result = (*ptr != NULL);
1187  SpinLockRelease(&mq->mq_mutex);
1188 
1189  /* Fail if detached; else succeed if initialized. */
1190  if (mq->mq_detached)
1191  {
1192  result = false;
1193  break;
1194  }
1195  if (result)
1196  break;
1197 
1198  if (handle != NULL)
1199  {
1200  /* Check for unexpected worker death. */
1201  status = GetBackgroundWorkerPid(handle, &pid);
1202  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1203  {
1204  result = false;
1205  break;
1206  }
1207  }
1208 
1209  /* Wait to be signaled. */
1210  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1211  WAIT_EVENT_MQ_INTERNAL);
1212 
1213  /* Reset the latch so we don't spin. */
1214  ResetLatch(MyLatch);
1215 
1216  /* An interrupt may have occurred while we were waiting. */
1217  CHECK_FOR_INTERRUPTS();
1218  }
1219 
1220  return result;
1221 }
1222 
1223 /*
1224  * Increment the number of bytes read.
1225  */
1226 static void
1227 shm_mq_inc_bytes_read(shm_mq *mq, Size n)
1228 {
1229  PGPROC *sender;
1230 
1231  /*
1232  * Separate prior reads of mq_ring from the increment of mq_bytes_read
1233  * which follows. This pairs with the full barrier in
1234  * shm_mq_send_bytes(). We only need a read barrier here because the
1235  * increment of mq_bytes_read is actually a read followed by a dependent
1236  * write.
1237  */
1238  pg_read_barrier();
1239 
1240  /*
1241  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1242  * else can be changing this value. This method should be cheaper.
1243  */
1244  pg_atomic_write_u64(&mq->mq_bytes_read,
1245  pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1246 
1247  /*
1248  * We shouldn't have any bytes to read without a sender, so we can read
1249  * mq_sender here without a lock. Once it's initialized, it can't change.
1250  */
1251  sender = mq->mq_sender;
1252  Assert(sender != NULL);
1253  SetLatch(&sender->procLatch);
1254 }
1255 
1256 /*
1257  * Increment the number of bytes written.
1258  */
1259 static void
1260 shm_mq_inc_bytes_written(shm_mq *mq, Size n)
1261 {
1262  /*
1263  * Separate prior reads of mq_ring from the write of mq_bytes_written
1264  * which we're about to do. Pairs with the read barrier found in
1265  * shm_mq_receive_bytes.
1266  */
1267  pg_write_barrier();
1268 
1269  /*
1270  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1271  * else can be changing this value. This method avoids taking the bus
1272  * lock unnecessarily.
1273  */
1274  pg_atomic_write_u64(&mq->mq_bytes_written,
1275  pg_atomic_read_u64(&mq->mq_bytes_written) + n);
1276 }
1277 
1278 /* Shim for on_dsm_detach callback. */
1279 static void
1280 shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1281 {
1282  shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1283 
1284  shm_mq_detach_internal(mq);
1285 }