PostgreSQL Source Code  git master
shm_mq.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "storage/procsignal.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
#include "utils/memutils.h"
Include dependency graph for shm_mq.c:

Go to the source code of this file.

Data Structures

struct  shm_mq
 
struct  shm_mq_handle
 

Macros

#define MQH_INITIAL_BUFSIZE   8192
 

Functions

static void shm_mq_detach_internal (shm_mq *mq)
 
static shm_mq_result shm_mq_send_bytes (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
 
static shm_mq_result shm_mq_receive_bytes (shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
 
static bool shm_mq_counterparty_gone (shm_mq *mq, BackgroundWorkerHandle *handle)
 
static bool shm_mq_wait_internal (shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
 
static void shm_mq_inc_bytes_read (shm_mq *mq, Size n)
 
static void shm_mq_inc_bytes_written (shm_mq *mq, Size n)
 
static void shm_mq_detach_callback (dsm_segment *seg, Datum arg)
 
shm_mq * shm_mq_create (void *address, Size size)
 
void shm_mq_set_receiver (shm_mq *mq, PGPROC *proc)
 
void shm_mq_set_sender (shm_mq *mq, PGPROC *proc)
 
PGPROC * shm_mq_get_receiver (shm_mq *mq)
 
PGPROC * shm_mq_get_sender (shm_mq *mq)
 
shm_mq_handle * shm_mq_attach (shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 
void shm_mq_set_handle (shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
 
shm_mq_result shm_mq_send (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
 
shm_mq_result shm_mq_sendv (shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 
shm_mq_result shm_mq_receive (shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
shm_mq_result shm_mq_wait_for_attach (shm_mq_handle *mqh)
 
void shm_mq_detach (shm_mq_handle *mqh)
 
shm_mq * shm_mq_get_queue (shm_mq_handle *mqh)
 

Variables

const Size shm_mq_minimum_size
 

Macro Definition Documentation

◆ MQH_INITIAL_BUFSIZE

#define MQH_INITIAL_BUFSIZE   8192

Definition at line 164 of file shm_mq.c.

Referenced by shm_mq_receive().

Function Documentation

◆ shm_mq_attach()

shm_mq_handle* shm_mq_attach ( shm_mq *mq,
dsm_segment *seg,
BackgroundWorkerHandle *handle 
)

Definition at line 283 of file shm_mq.c.

References Assert, CurrentMemoryContext, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, MyProc, on_dsm_detach(), palloc(), PointerGetDatum, and shm_mq_detach_callback().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ParallelWorkerMain(), ReinitializeParallelDSM(), and test_shm_mq_setup().

284 {
285  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
286 
287  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
288  mqh->mqh_queue = mq;
289  mqh->mqh_segment = seg;
290  mqh->mqh_handle = handle;
291  mqh->mqh_buffer = NULL;
292  mqh->mqh_buflen = 0;
293  mqh->mqh_consume_pending = 0;
294  mqh->mqh_partial_bytes = 0;
295  mqh->mqh_expected_bytes = 0;
296  mqh->mqh_length_word_complete = false;
297  mqh->mqh_counterparty_attached = false;
299 
300  if (seg != NULL)
302 
303  return mqh;
304 }
Size mqh_partial_bytes
Definition: shm_mq.c:139
PGPROC * MyProc
Definition: proc.c:67
#define PointerGetDatum(X)
Definition: postgres.h:556
PGPROC * mq_receiver
Definition: shm_mq.c:74
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:135
Size mqh_consume_pending
Definition: shm_mq.c:138
bool mqh_counterparty_attached
Definition: shm_mq.c:142
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1091
char * mqh_buffer
Definition: shm_mq.c:136
bool mqh_length_word_complete
Definition: shm_mq.c:141
Size mqh_buflen
Definition: shm_mq.c:137
dsm_segment * mqh_segment
Definition: shm_mq.c:134
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
Size mqh_expected_bytes
Definition: shm_mq.c:140
shm_mq * mqh_queue
Definition: shm_mq.c:133
#define Assert(condition)
Definition: c.h:800
void * palloc(Size size)
Definition: mcxt.c:950
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1280
MemoryContext mqh_context
Definition: shm_mq.c:143
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_counterparty_gone()

static bool shm_mq_counterparty_gone ( shm_mq mq,
BackgroundWorkerHandle handle 
)
static

Definition at line 1136 of file shm_mq.c.

References BGWH_NOT_YET_STARTED, BGWH_STARTED, GetBackgroundWorkerPid(), shm_mq::mq_detached, and status().

Referenced by shm_mq_receive(), and shm_mq_send_bytes().

1137 {
1138  pid_t pid;
1139 
1140  /* If the queue has been detached, counterparty is definitely gone. */
1141  if (mq->mq_detached)
1142  return true;
1143 
1144  /* If there's a handle, check worker status. */
1145  if (handle != NULL)
1146  {
1148 
1149  /* Check for unexpected worker death. */
1150  status = GetBackgroundWorkerPid(handle, &pid);
1151  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1152  {
1153  /* Mark it detached, just to make it official. */
1154  mq->mq_detached = true;
1155  return true;
1156  }
1157  }
1158 
1159  /* Counterparty is not definitively gone. */
1160  return false;
1161 }
BgwHandleStatus
Definition: bgworker.h:102
bool mq_detached
Definition: shm_mq.c:79
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1030

◆ shm_mq_create()

shm_mq* shm_mq_create ( void *  address,
Size  size 
)

Definition at line 170 of file shm_mq.c.

References Assert, MAXALIGN, MAXALIGN_DOWN, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq::mq_sender, offsetof, pg_atomic_init_u64(), and SpinLockInit.

Referenced by ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

171 {
172  shm_mq *mq = address;
173  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
174 
175  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
176  size = MAXALIGN_DOWN(size);
177 
178  /* Queue size must be large enough to hold some data. */
179  Assert(size > data_offset);
180 
181  /* Initialize queue header. */
182  SpinLockInit(&mq->mq_mutex);
183  mq->mq_receiver = NULL;
184  mq->mq_sender = NULL;
187  mq->mq_ring_size = size - data_offset;
188  mq->mq_detached = false;
189  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
190 
191  return mq;
192 }
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * mq_receiver
Definition: shm_mq.c:74
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:415
slock_t mq_mutex
Definition: shm_mq.c:73
bool mq_detached
Definition: shm_mq.c:79
#define Assert(condition)
Definition: c.h:800
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:76
size_t Size
Definition: c.h:528
#define MAXALIGN(LEN)
Definition: c.h:753
Size mq_ring_size
Definition: shm_mq.c:78
Definition: shm_mq.c:71
uint8 mq_ring_offset
Definition: shm_mq.c:80
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:77
#define offsetof(type, field)
Definition: c.h:723
#define MAXALIGN_DOWN(LEN)
Definition: c.h:765
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_detach()

void shm_mq_detach ( shm_mq_handle *mqh)

Definition at line 817 of file shm_mq.c.

References cancel_on_dsm_detach(), shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, pfree(), PointerGetDatum, shm_mq_detach_callback(), and shm_mq_detach_internal().

Referenced by DestroyParallelContext(), ExecParallelFinish(), HandleParallelMessage(), LaunchParallelWorkers(), mq_putmessage(), tqueueDestroyReceiver(), and tqueueShutdownReceiver().

818 {
819  /* Notify counterparty that we're outta here. */
821 
822  /* Cancel on_dsm_detach callback, if any. */
823  if (mqh->mqh_segment)
826  PointerGetDatum(mqh->mqh_queue));
827 
828  /* Release local memory associated with handle. */
829  if (mqh->mqh_buffer != NULL)
830  pfree(mqh->mqh_buffer);
831  pfree(mqh);
832 }
#define PointerGetDatum(X)
Definition: postgres.h:556
char * mqh_buffer
Definition: shm_mq.c:136
void pfree(void *pointer)
Definition: mcxt.c:1057
dsm_segment * mqh_segment
Definition: shm_mq.c:134
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:849
shm_mq * mqh_queue
Definition: shm_mq.c:133
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1280
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1106

◆ shm_mq_detach_callback()

static void shm_mq_detach_callback ( dsm_segment seg,
Datum  arg 
)
static

Definition at line 1280 of file shm_mq.c.

References DatumGetPointer, and shm_mq_detach_internal().

Referenced by shm_mq_attach(), and shm_mq_detach().

1281 {
1282  shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1283 
1285 }
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:849
#define DatumGetPointer(X)
Definition: postgres.h:549
Definition: shm_mq.c:71
void * arg

◆ shm_mq_detach_internal()

static void shm_mq_detach_internal ( shm_mq mq)
static

Definition at line 849 of file shm_mq.c.

References Assert, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, MyProc, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_detach(), and shm_mq_detach_callback().

850 {
851  PGPROC *victim;
852 
854  if (mq->mq_sender == MyProc)
855  victim = mq->mq_receiver;
856  else
857  {
858  Assert(mq->mq_receiver == MyProc);
859  victim = mq->mq_sender;
860  }
861  mq->mq_detached = true;
863 
864  if (victim != NULL)
865  SetLatch(&victim->procLatch);
866 }
PGPROC * MyProc
Definition: proc.c:67
PGPROC * mq_receiver
Definition: shm_mq.c:74
void SetLatch(Latch *latch)
Definition: latch.c:505
Latch procLatch
Definition: proc.h:129
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:73
#define SpinLockRelease(lock)
Definition: spin.h:64
bool mq_detached
Definition: shm_mq.c:79
#define Assert(condition)
Definition: c.h:800
Definition: proc.h:120
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_get_queue()

shm_mq* shm_mq_get_queue ( shm_mq_handle mqh)

Definition at line 872 of file shm_mq.c.

References shm_mq_handle::mqh_queue.

Referenced by WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

873 {
874  return mqh->mqh_queue;
875 }
shm_mq * mqh_queue
Definition: shm_mq.c:133

◆ shm_mq_get_receiver()

PGPROC* shm_mq_get_receiver ( shm_mq mq)

Definition at line 235 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_receiver, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_send_bytes(), and shm_mq_wait_for_attach().

236 {
237  PGPROC *receiver;
238 
240  receiver = mq->mq_receiver;
242 
243  return receiver;
244 }
PGPROC * mq_receiver
Definition: shm_mq.c:74
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:73
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: proc.h:120

◆ shm_mq_get_sender()

PGPROC* shm_mq_get_sender ( shm_mq mq)

Definition at line 250 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_sender, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_receive(), shm_mq_wait_for_attach(), WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

251 {
252  PGPROC *sender;
253 
255  sender = mq->mq_sender;
257 
258  return sender;
259 }
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:73
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: proc.h:120
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_inc_bytes_read()

static void shm_mq_inc_bytes_read ( shm_mq mq,
Size  n 
)
static

Definition at line 1227 of file shm_mq.c.

References Assert, shm_mq::mq_bytes_read, shm_mq::mq_sender, pg_atomic_read_u64(), pg_atomic_write_u64(), pg_read_barrier, PGPROC::procLatch, and SetLatch().

Referenced by shm_mq_receive(), and shm_mq_receive_bytes().

1228 {
1229  PGPROC *sender;
1230 
1231  /*
1232  * Separate prior reads of mq_ring from the increment of mq_bytes_read
1233  * which follows. This pairs with the full barrier in
1234  * shm_mq_send_bytes(). We only need a read barrier here because the
1235  * increment of mq_bytes_read is actually a read followed by a dependent
1236  * write.
1237  */
1238  pg_read_barrier();
1239 
1240  /*
1241  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1242  * else can be changing this value. This method should be cheaper.
1243  */
1245  pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1246 
1247  /*
1248  * We shouldn't have any bytes to read without a sender, so we can read
1249  * mq_sender here without a lock. Once it's initialized, it can't change.
1250  */
1251  sender = mq->mq_sender;
1252  Assert(sender != NULL);
1253  SetLatch(&sender->procLatch);
1254 }
void SetLatch(Latch *latch)
Definition: latch.c:505
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
Latch procLatch
Definition: proc.h:129
#define Assert(condition)
Definition: c.h:800
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:76
#define pg_read_barrier()
Definition: atomics.h:158
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
Definition: proc.h:120
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_inc_bytes_written()

static void shm_mq_inc_bytes_written ( shm_mq mq,
Size  n 
)
static

Definition at line 1260 of file shm_mq.c.

References shm_mq::mq_bytes_written, pg_atomic_read_u64(), pg_atomic_write_u64(), and pg_write_barrier.

Referenced by shm_mq_send_bytes().

1261 {
1262  /*
1263  * Separate prior reads of mq_ring from the write of mq_bytes_written
1264  * which we're about to do. Pairs with the read barrier found in
1265  * shm_mq_receive_bytes.
1266  */
1267  pg_write_barrier();
1268 
1269  /*
1270  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1271  * else can be changing this value. This method avoids taking the bus
1272  * lock unnecessarily.
1273  */
1276 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
#define pg_write_barrier()
Definition: atomics.h:159
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:77

◆ shm_mq_receive()

shm_mq_result shm_mq_receive ( shm_mq_handle mqh,
Size nbytesp,
void **  datap,
bool  nowait 
)

Definition at line 548 of file shm_mq.c.

References Assert, ereport, errcode(), errmsg(), ERROR, Max, MAXALIGN, MaxAllocSize, MemoryContextAlloc(), Min, shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_ring_size, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, MQH_INITIAL_BUFSIZE, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, pfree(), shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_sender(), shm_mq_inc_bytes_read(), shm_mq_receive_bytes(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), and SHM_MQ_WOULD_BLOCK.

Referenced by copy_messages(), HandleParallelMessages(), test_shm_mq(), test_shm_mq_pipelined(), and TupleQueueReaderNext().

549 {
550  shm_mq *mq = mqh->mqh_queue;
551  shm_mq_result res;
552  Size rb = 0;
553  Size nbytes;
554  void *rawdata;
555 
556  Assert(mq->mq_receiver == MyProc);
557 
558  /* We can't receive data until the sender has attached. */
559  if (!mqh->mqh_counterparty_attached)
560  {
561  if (nowait)
562  {
563  int counterparty_gone;
564 
565  /*
566  * We shouldn't return at this point at all unless the sender
567  * hasn't attached yet. However, the correct return value depends
568  * on whether the sender is still attached. If we first test
569  * whether the sender has ever attached and then test whether the
570  * sender has detached, there's a race condition: a sender that
571  * attaches and detaches very quickly might fool us into thinking
572  * the sender never attached at all. So, test whether our
573  * counterparty is definitively gone first, and only afterwards
574  * check whether the sender ever attached in the first place.
575  */
576  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
577  if (shm_mq_get_sender(mq) == NULL)
578  {
579  if (counterparty_gone)
580  return SHM_MQ_DETACHED;
581  else
582  return SHM_MQ_WOULD_BLOCK;
583  }
584  }
585  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
586  && shm_mq_get_sender(mq) == NULL)
587  {
588  mq->mq_detached = true;
589  return SHM_MQ_DETACHED;
590  }
591  mqh->mqh_counterparty_attached = true;
592  }
593 
594  /*
595  * If we've consumed an amount of data greater than 1/4th of the ring
596  * size, mark it consumed in shared memory. We try to avoid doing this
597  * unnecessarily when only a small amount of data has been consumed,
598  * because SetLatch() is fairly expensive and we don't want to do it too
599  * often.
600  */
601  if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
602  {
604  mqh->mqh_consume_pending = 0;
605  }
606 
607  /* Try to read, or finish reading, the length word from the buffer. */
608  while (!mqh->mqh_length_word_complete)
609  {
610  /* Try to receive the message length word. */
611  Assert(mqh->mqh_partial_bytes < sizeof(Size));
612  res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
613  nowait, &rb, &rawdata);
614  if (res != SHM_MQ_SUCCESS)
615  return res;
616 
617  /*
618  * Hopefully, we'll receive the entire message length word at once.
619  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
620  * multiple reads.
621  */
622  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
623  {
624  Size needed;
625 
626  nbytes = *(Size *) rawdata;
627 
628  /* If we've already got the whole message, we're done. */
629  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
630  if (rb >= needed)
631  {
632  mqh->mqh_consume_pending += needed;
633  *nbytesp = nbytes;
634  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
635  return SHM_MQ_SUCCESS;
636  }
637 
638  /*
639  * We don't have the whole message, but we at least have the whole
640  * length word.
641  */
642  mqh->mqh_expected_bytes = nbytes;
643  mqh->mqh_length_word_complete = true;
644  mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
645  rb -= MAXALIGN(sizeof(Size));
646  }
647  else
648  {
649  Size lengthbytes;
650 
651  /* Can't be split unless bigger than required alignment. */
652  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
653 
654  /* Message word is split; need buffer to reassemble. */
655  if (mqh->mqh_buffer == NULL)
656  {
660  }
661  Assert(mqh->mqh_buflen >= sizeof(Size));
662 
663  /* Copy partial length word; remember to consume it. */
664  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
665  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
666  else
667  lengthbytes = rb;
668  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
669  lengthbytes);
670  mqh->mqh_partial_bytes += lengthbytes;
671  mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
672  rb -= lengthbytes;
673 
674  /* If we now have the whole word, we're ready to read payload. */
675  if (mqh->mqh_partial_bytes >= sizeof(Size))
676  {
677  Assert(mqh->mqh_partial_bytes == sizeof(Size));
678  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
679  mqh->mqh_length_word_complete = true;
680  mqh->mqh_partial_bytes = 0;
681  }
682  }
683  }
684  nbytes = mqh->mqh_expected_bytes;
685 
686  /*
687  * Should be disallowed on the sending side already, but better check and
688  * error out on the receiver side as well rather than trying to read a
689  * prohibitively large message.
690  */
691  if (nbytes > MaxAllocSize)
692  ereport(ERROR,
693  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
694  errmsg("invalid message size %zu in shared memory queue",
695  nbytes)));
696 
697  if (mqh->mqh_partial_bytes == 0)
698  {
699  /*
700  * Try to obtain the whole message in a single chunk. If this works,
701  * we need not copy the data and can return a pointer directly into
702  * shared memory.
703  */
704  res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
705  if (res != SHM_MQ_SUCCESS)
706  return res;
707  if (rb >= nbytes)
708  {
709  mqh->mqh_length_word_complete = false;
710  mqh->mqh_consume_pending += MAXALIGN(nbytes);
711  *nbytesp = nbytes;
712  *datap = rawdata;
713  return SHM_MQ_SUCCESS;
714  }
715 
716  /*
717  * The message has wrapped the buffer. We'll need to copy it in order
718  * to return it to the client in one chunk. First, make sure we have
719  * a large enough buffer available.
720  */
721  if (mqh->mqh_buflen < nbytes)
722  {
723  Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
724 
725  /*
726  * Double the buffer size until the payload fits, but limit to
727  * MaxAllocSize.
728  */
729  while (newbuflen < nbytes)
730  newbuflen *= 2;
731  newbuflen = Min(newbuflen, MaxAllocSize);
732 
733  if (mqh->mqh_buffer != NULL)
734  {
735  pfree(mqh->mqh_buffer);
736  mqh->mqh_buffer = NULL;
737  mqh->mqh_buflen = 0;
738  }
739  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
740  mqh->mqh_buflen = newbuflen;
741  }
742  }
743 
744  /* Loop until we've copied the entire message. */
745  for (;;)
746  {
747  Size still_needed;
748 
749  /* Copy as much as we can. */
750  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
751  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
752  mqh->mqh_partial_bytes += rb;
753 
754  /*
755  * Update count of bytes that can be consumed, accounting for
756  * alignment padding. Note that this will never actually insert any
757  * padding except at the end of a message, because the buffer size is
758  * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
759  */
760  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
761  mqh->mqh_consume_pending += MAXALIGN(rb);
762 
763  /* If we got all the data, exit the loop. */
764  if (mqh->mqh_partial_bytes >= nbytes)
765  break;
766 
767  /* Wait for some more data. */
768  still_needed = nbytes - mqh->mqh_partial_bytes;
769  res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
770  if (res != SHM_MQ_SUCCESS)
771  return res;
772  if (rb > still_needed)
773  rb = still_needed;
774  }
775 
776  /* Return the complete message, and reset for next message. */
777  *nbytesp = nbytes;
778  *datap = mqh->mqh_buffer;
779  mqh->mqh_length_word_complete = false;
780  mqh->mqh_partial_bytes = 0;
781  return SHM_MQ_SUCCESS;
782 }
Size mqh_partial_bytes
Definition: shm_mq.c:139
PGPROC * MyProc
Definition: proc.c:67
#define Min(x, y)
Definition: c.h:982
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:250
int errcode(int sqlerrcode)
Definition: elog.c:691
PGPROC * mq_receiver
Definition: shm_mq.c:74
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:135
Size mqh_consume_pending
Definition: shm_mq.c:138
bool mqh_counterparty_attached
Definition: shm_mq.c:142
char * mqh_buffer
Definition: shm_mq.c:136
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1175
bool mqh_length_word_complete
Definition: shm_mq.c:141
void pfree(void *pointer)
Definition: mcxt.c:1057
#define ERROR
Definition: elog.h:43
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:164
Size mqh_buflen
Definition: shm_mq.c:137
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:1036
Size mqh_expected_bytes
Definition: shm_mq.c:140
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1227
#define MaxAllocSize
Definition: memutils.h:40
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1136
shm_mq * mqh_queue
Definition: shm_mq.c:133
bool mq_detached
Definition: shm_mq.c:79
#define ereport(elevel,...)
Definition: elog.h:155
shm_mq_result
Definition: shm_mq.h:36
#define Max(x, y)
Definition: c.h:976
#define Assert(condition)
Definition: c.h:800
size_t Size
Definition: c.h:528
#define MAXALIGN(LEN)
Definition: c.h:753
Size mq_ring_size
Definition: shm_mq.c:78
int errmsg(const char *fmt,...)
Definition: elog.c:902
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
Definition: shm_mq.c:71
MemoryContext mqh_context
Definition: shm_mq.c:143
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_receive_bytes()

static shm_mq_result shm_mq_receive_bytes ( shm_mq_handle mqh,
Size  bytes_needed,
bool  nowait,
Size nbytesp,
void **  datap 
)
static

Definition at line 1036 of file shm_mq.c.

References Assert, CHECK_FOR_INTERRUPTS, Min, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_queue, MyLatch, pg_atomic_read_u64(), pg_read_barrier, read, ResetLatch(), SHM_MQ_DETACHED, shm_mq_inc_bytes_read(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, WAIT_EVENT_MQ_RECEIVE, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_receive().

1038 {
1039  shm_mq *mq = mqh->mqh_queue;
1040  Size ringsize = mq->mq_ring_size;
1041  uint64 used;
1042  uint64 written;
1043 
1044  for (;;)
1045  {
1046  Size offset;
1047  uint64 read;
1048 
1049  /* Get bytes written, so we can compute what's available to read. */
1050  written = pg_atomic_read_u64(&mq->mq_bytes_written);
1051 
1052  /*
1053  * Get bytes read. Include bytes we could consume but have not yet
1054  * consumed.
1055  */
1056  read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1057  mqh->mqh_consume_pending;
1058  used = written - read;
1059  Assert(used <= ringsize);
1060  offset = read % (uint64) ringsize;
1061 
1062  /* If we have enough data or buffer has wrapped, we're done. */
1063  if (used >= bytes_needed || offset + used >= ringsize)
1064  {
1065  *nbytesp = Min(used, ringsize - offset);
1066  *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1067 
1068  /*
1069  * Separate the read of mq_bytes_written, above, from caller's
1070  * attempt to read the data itself. Pairs with the barrier in
1071  * shm_mq_inc_bytes_written.
1072  */
1073  pg_read_barrier();
1074  return SHM_MQ_SUCCESS;
1075  }
1076 
1077  /*
1078  * Fall out before waiting if the queue has been detached.
1079  *
1080  * Note that we don't check for this until *after* considering whether
1081  * the data already available is enough, since the receiver can finish
1082  * receiving a message stored in the buffer even after the sender has
1083  * detached.
1084  */
1085  if (mq->mq_detached)
1086  {
1087  /*
1088  * If the writer advanced mq_bytes_written and then set
1089  * mq_detached, we might not have read the final value of
1090  * mq_bytes_written above. Insert a read barrier and then check
1091  * again if mq_bytes_written has advanced.
1092  */
1093  pg_read_barrier();
1094  if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1095  continue;
1096 
1097  return SHM_MQ_DETACHED;
1098  }
1099 
1100  /*
1101  * We didn't get enough data to satisfy the request, so mark any data
1102  * previously-consumed as read to make more buffer space.
1103  */
1104  if (mqh->mqh_consume_pending > 0)
1105  {
1107  mqh->mqh_consume_pending = 0;
1108  }
1109 
1110  /* Skip manipulation of our latch if nowait = true. */
1111  if (nowait)
1112  return SHM_MQ_WOULD_BLOCK;
1113 
1114  /*
1115  * Wait for our latch to be set. It might already be set for some
1116  * unrelated reason, but that'll just result in one extra trip through
1117  * the loop. It's worth it to avoid resetting the latch at top of
1118  * loop, because setting an already-set latch is much cheaper than
1119  * setting one that has been reset.
1120  */
1123 
1124  /* Reset the latch so we don't spin. */
1126 
1127  /* An interrupt may have occurred while we were waiting. */
1129  }
1130 }
#define Min(x, y)
Definition: c.h:982
Size mqh_consume_pending
Definition: shm_mq.c:138
void ResetLatch(Latch *latch)
Definition: latch.c:588
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:390
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1227
shm_mq * mqh_queue
Definition: shm_mq.c:133
bool mq_detached
Definition: shm_mq.c:79
#define Assert(condition)
Definition: c.h:800
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:76
#define pg_read_barrier()
Definition: atomics.h:158
size_t Size
Definition: c.h:528
Size mq_ring_size
Definition: shm_mq.c:78
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
Definition: shm_mq.c:81
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
Definition: shm_mq.c:71
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
uint8 mq_ring_offset
Definition: shm_mq.c:80
#define WL_LATCH_SET
Definition: latch.h:124
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:77
#define read(a, b, c)
Definition: win32.h:13
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ shm_mq_send()

shm_mq_result shm_mq_send ( shm_mq_handle mqh,
Size  nbytes,
const void *  data,
bool  nowait 
)

Definition at line 321 of file shm_mq.c.

References shm_mq_iovec::data, shm_mq_iovec::len, and shm_mq_sendv().

Referenced by copy_messages(), test_shm_mq(), test_shm_mq_pipelined(), and tqueueReceiveSlot().

322 {
323  shm_mq_iovec iov;
324 
325  iov.data = data;
326  iov.len = nbytes;
327 
328  return shm_mq_sendv(mqh, &iov, 1, nowait);
329 }
Size len
Definition: shm_mq.h:32
const char * data
Definition: shm_mq.h:31
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
Definition: shm_mq.c:347

◆ shm_mq_send_bytes()

static shm_mq_result shm_mq_send_bytes ( shm_mq_handle mqh,
Size  nbytes,
const void *  data,
bool  nowait,
Size bytes_written 
)
static

Definition at line 881 of file shm_mq.c.

References Assert, CHECK_FOR_INTERRUPTS, MAXALIGN, Min, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, MyLatch, pg_atomic_read_u64(), pg_compiler_barrier, pg_memory_barrier, PGPROC::procLatch, ResetLatch(), SetLatch(), shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_inc_bytes_written(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), SHM_MQ_WOULD_BLOCK, WAIT_EVENT_MQ_SEND, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_sendv().

883 {
884  shm_mq *mq = mqh->mqh_queue;
885  Size sent = 0;
886  uint64 used;
887  Size ringsize = mq->mq_ring_size;
888  Size available;
889 
890  while (sent < nbytes)
891  {
892  uint64 rb;
893  uint64 wb;
894 
895  /* Compute number of ring buffer bytes used and available. */
898  Assert(wb >= rb);
899  used = wb - rb;
900  Assert(used <= ringsize);
901  available = Min(ringsize - used, nbytes - sent);
902 
903  /*
904  * Bail out if the queue has been detached. Note that we would be in
905  * trouble if the compiler decided to cache the value of
906  * mq->mq_detached in a register or on the stack across loop
907  * iterations. It probably shouldn't do that anyway since we'll
908  * always return, call an external function that performs a system
909  * call, or reach a memory barrier at some point later in the loop,
910  * but just to be sure, insert a compiler barrier here.
911  */
913  if (mq->mq_detached)
914  {
915  *bytes_written = sent;
916  return SHM_MQ_DETACHED;
917  }
918 
919  if (available == 0 && !mqh->mqh_counterparty_attached)
920  {
921  /*
922  * The queue is full, so if the receiver isn't yet known to be
923  * attached, we must wait for that to happen.
924  */
925  if (nowait)
926  {
927  if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
928  {
929  *bytes_written = sent;
930  return SHM_MQ_DETACHED;
931  }
932  if (shm_mq_get_receiver(mq) == NULL)
933  {
934  *bytes_written = sent;
935  return SHM_MQ_WOULD_BLOCK;
936  }
937  }
938  else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
939  mqh->mqh_handle))
940  {
941  mq->mq_detached = true;
942  *bytes_written = sent;
943  return SHM_MQ_DETACHED;
944  }
945  mqh->mqh_counterparty_attached = true;
946 
947  /*
948  * The receiver may have read some data after attaching, so we
949  * must not wait without rechecking the queue state.
950  */
951  }
952  else if (available == 0)
953  {
954  /*
955  * Since mq->mqh_counterparty_attached is known to be true at this
956  * point, mq_receiver has been set, and it can't change once set.
957  * Therefore, we can read it without acquiring the spinlock.
958  */
961 
962  /* Skip manipulation of our latch if nowait = true. */
963  if (nowait)
964  {
965  *bytes_written = sent;
966  return SHM_MQ_WOULD_BLOCK;
967  }
968 
969  /*
970  * Wait for our latch to be set. It might already be set for some
971  * unrelated reason, but that'll just result in one extra trip
972  * through the loop. It's worth it to avoid resetting the latch
973  * at top of loop, because setting an already-set latch is much
974  * cheaper than setting one that has been reset.
975  */
978 
979  /* Reset the latch so we don't spin. */
981 
982  /* An interrupt may have occurred while we were waiting. */
984  }
985  else
986  {
987  Size offset;
988  Size sendnow;
989 
990  offset = wb % (uint64) ringsize;
991  sendnow = Min(available, ringsize - offset);
992 
993  /*
994  * Write as much data as we can via a single memcpy(). Make sure
995  * these writes happen after the read of mq_bytes_read, above.
996  * This barrier pairs with the one in shm_mq_inc_bytes_read.
997  * (Since we're separating the read of mq_bytes_read from a
998  * subsequent write to mq_ring, we need a full barrier here.)
999  */
1001  memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1002  (char *) data + sent, sendnow);
1003  sent += sendnow;
1004 
1005  /*
1006  * Update count of bytes written, with alignment padding. Note
1007  * that this will never actually insert any padding except at the
1008  * end of a run of bytes, because the buffer size is a multiple of
1009  * MAXIMUM_ALIGNOF, and each read is as well.
1010  */
1011  Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1012  shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
1013 
1014  /*
1015  * For efficiency, we don't set the reader's latch here. We'll do
1016  * that only when the buffer fills up or after writing an entire
1017  * message.
1018  */
1019  }
1020  }
1021 
1022  *bytes_written = sent;
1023  return SHM_MQ_SUCCESS;
1024 }
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
Definition: shm_mq.c:1260
#define Min(x, y)
Definition: c.h:982
PGPROC * mq_receiver
Definition: shm_mq.c:74
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:235
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:135
void SetLatch(Latch *latch)
Definition: latch.c:505
bool mqh_counterparty_attached
Definition: shm_mq.c:142
void ResetLatch(Latch *latch)
Definition: latch.c:588
Latch procLatch
Definition: proc.h:129
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:390
#define pg_compiler_barrier()
Definition: atomics.h:133
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1175
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1136
shm_mq * mqh_queue
Definition: shm_mq.c:133
bool mq_detached
Definition: shm_mq.c:79
#define pg_memory_barrier()
Definition: atomics.h:145
#define Assert(condition)
Definition: c.h:800
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:76
size_t Size
Definition: c.h:528
#define MAXALIGN(LEN)
Definition: c.h:753
Size mq_ring_size
Definition: shm_mq.c:78
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
Definition: shm_mq.c:81
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
Definition: shm_mq.c:71
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
uint8 mq_ring_offset
Definition: shm_mq.c:80
#define WL_LATCH_SET
Definition: latch.h:124
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:77
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ shm_mq_sendv()

shm_mq_result shm_mq_sendv ( shm_mq_handle mqh,
shm_mq_iovec iov,
int  iovcnt,
bool  nowait 
)

Definition at line 347 of file shm_mq.c.

References Assert, shm_mq_iovec::data, ereport, errcode(), errmsg(), ERROR, i, shm_mq_iovec::len, MAXALIGN_DOWN, MaxAllocSize, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, PGPROC::procLatch, SetLatch(), SHM_MQ_DETACHED, shm_mq_send_bytes(), SHM_MQ_SUCCESS, SpinLockAcquire, SpinLockRelease, and tmpbuf.

Referenced by mq_putmessage(), and shm_mq_send().

348 {
349  shm_mq_result res;
350  shm_mq *mq = mqh->mqh_queue;
351  PGPROC *receiver;
352  Size nbytes = 0;
353  Size bytes_written;
354  int i;
355  int which_iov = 0;
356  Size offset;
357 
358  Assert(mq->mq_sender == MyProc);
359 
360  /* Compute total size of write. */
361  for (i = 0; i < iovcnt; ++i)
362  nbytes += iov[i].len;
363 
364  /* Prevent writing messages overwhelming the receiver. */
365  if (nbytes > MaxAllocSize)
366  ereport(ERROR,
367  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
368  errmsg("cannot send a message of size %zu via shared memory queue",
369  nbytes)));
370 
371  /* Try to write, or finish writing, the length word into the buffer. */
372  while (!mqh->mqh_length_word_complete)
373  {
374  Assert(mqh->mqh_partial_bytes < sizeof(Size));
375  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
376  ((char *) &nbytes) + mqh->mqh_partial_bytes,
377  nowait, &bytes_written);
378 
379  if (res == SHM_MQ_DETACHED)
380  {
381  /* Reset state in case caller tries to send another message. */
382  mqh->mqh_partial_bytes = 0;
383  mqh->mqh_length_word_complete = false;
384  return res;
385  }
386  mqh->mqh_partial_bytes += bytes_written;
387 
388  if (mqh->mqh_partial_bytes >= sizeof(Size))
389  {
390  Assert(mqh->mqh_partial_bytes == sizeof(Size));
391 
392  mqh->mqh_partial_bytes = 0;
393  mqh->mqh_length_word_complete = true;
394  }
395 
396  if (res != SHM_MQ_SUCCESS)
397  return res;
398 
399  /* Length word can't be split unless bigger than required alignment. */
400  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
401  }
402 
403  /* Write the actual data bytes into the buffer. */
404  Assert(mqh->mqh_partial_bytes <= nbytes);
405  offset = mqh->mqh_partial_bytes;
406  do
407  {
408  Size chunksize;
409 
410  /* Figure out which bytes need to be sent next. */
411  if (offset >= iov[which_iov].len)
412  {
413  offset -= iov[which_iov].len;
414  ++which_iov;
415  if (which_iov >= iovcnt)
416  break;
417  continue;
418  }
419 
420  /*
421  * We want to avoid copying the data if at all possible, but every
422  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
423  * the last. Thus, if a chunk other than the last one ends on a
424  * non-MAXALIGN'd boundary, we have to combine the tail end of its
425  * data with data from one or more following chunks until we either
426  * reach the last chunk or accumulate a number of bytes which is
427  * MAXALIGN'd.
428  */
429  if (which_iov + 1 < iovcnt &&
430  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
431  {
432  char tmpbuf[MAXIMUM_ALIGNOF];
433  int j = 0;
434 
435  for (;;)
436  {
437  if (offset < iov[which_iov].len)
438  {
439  tmpbuf[j] = iov[which_iov].data[offset];
440  j++;
441  offset++;
442  if (j == MAXIMUM_ALIGNOF)
443  break;
444  }
445  else
446  {
447  offset -= iov[which_iov].len;
448  which_iov++;
449  if (which_iov >= iovcnt)
450  break;
451  }
452  }
453 
454  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
455 
456  if (res == SHM_MQ_DETACHED)
457  {
458  /* Reset state in case caller tries to send another message. */
459  mqh->mqh_partial_bytes = 0;
460  mqh->mqh_length_word_complete = false;
461  return res;
462  }
463 
464  mqh->mqh_partial_bytes += bytes_written;
465  if (res != SHM_MQ_SUCCESS)
466  return res;
467  continue;
468  }
469 
470  /*
471  * If this is the last chunk, we can write all the data, even if it
472  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
473  * MAXALIGN_DOWN the write size.
474  */
475  chunksize = iov[which_iov].len - offset;
476  if (which_iov + 1 < iovcnt)
477  chunksize = MAXALIGN_DOWN(chunksize);
478  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
479  nowait, &bytes_written);
480 
481  if (res == SHM_MQ_DETACHED)
482  {
483  /* Reset state in case caller tries to send another message. */
484  mqh->mqh_length_word_complete = false;
485  mqh->mqh_partial_bytes = 0;
486  return res;
487  }
488 
489  mqh->mqh_partial_bytes += bytes_written;
490  offset += bytes_written;
491  if (res != SHM_MQ_SUCCESS)
492  return res;
493  } while (mqh->mqh_partial_bytes < nbytes);
494 
495  /* Reset for next message. */
496  mqh->mqh_partial_bytes = 0;
497  mqh->mqh_length_word_complete = false;
498 
499  /* If queue has been detached, let caller know. */
500  if (mq->mq_detached)
501  return SHM_MQ_DETACHED;
502 
503  /*
504  * If the counterparty is known to have attached, we can read mq_receiver
505  * without acquiring the spinlock and assume it isn't NULL. Otherwise,
506  * more caution is needed.
507  */
508  if (mqh->mqh_counterparty_attached)
509  receiver = mq->mq_receiver;
510  else
511  {
513  receiver = mq->mq_receiver;
515  if (receiver == NULL)
516  return SHM_MQ_SUCCESS;
517  mqh->mqh_counterparty_attached = true;
518  }
519 
520  /* Notify receiver of the newly-written data, and return. */
521  SetLatch(&receiver->procLatch);
522  return SHM_MQ_SUCCESS;
523 }
Size mqh_partial_bytes
Definition: shm_mq.c:139
PGPROC * MyProc
Definition: proc.c:67
int errcode(int sqlerrcode)
Definition: elog.c:691
PGPROC * mq_receiver
Definition: shm_mq.c:74
void SetLatch(Latch *latch)
Definition: latch.c:505
bool mqh_counterparty_attached
Definition: shm_mq.c:142
Latch procLatch
Definition: proc.h:129
Size len
Definition: shm_mq.h:32
#define SpinLockAcquire(lock)
Definition: spin.h:62
bool mqh_length_word_complete
Definition: shm_mq.c:141
const char * data
Definition: shm_mq.h:31
#define ERROR
Definition: elog.h:43
slock_t mq_mutex
Definition: shm_mq.c:73
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:881
#define MaxAllocSize
Definition: memutils.h:40
#define SpinLockRelease(lock)
Definition: spin.h:64
shm_mq * mqh_queue
Definition: shm_mq.c:133
bool mq_detached
Definition: shm_mq.c:79
#define ereport(elevel,...)
Definition: elog.h:155
shm_mq_result
Definition: shm_mq.h:36
#define Assert(condition)
Definition: c.h:800
size_t Size
Definition: c.h:528
static StringInfoData tmpbuf
Definition: walsender.c:159
int errmsg(const char *fmt,...)
Definition: elog.c:902
int i
Definition: shm_mq.c:71
Definition: proc.h:120
#define MAXALIGN_DOWN(LEN)
Definition: c.h:765
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_set_handle()

void shm_mq_set_handle ( shm_mq_handle mqh,
BackgroundWorkerHandle handle 
)

Definition at line 311 of file shm_mq.c.

References Assert, and shm_mq_handle::mqh_handle.

Referenced by ExecParallelCreateReaders(), and LaunchParallelWorkers().

312 {
313  Assert(mqh->mqh_handle == NULL);
314  mqh->mqh_handle = handle;
315 }
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:135
#define Assert(condition)
Definition: c.h:800

◆ shm_mq_set_receiver()

void shm_mq_set_receiver ( shm_mq mq,
PGPROC proc 
)

Definition at line 199 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

200 {
201  PGPROC *sender;
202 
204  Assert(mq->mq_receiver == NULL);
205  mq->mq_receiver = proc;
206  sender = mq->mq_sender;
208 
209  if (sender != NULL)
210  SetLatch(&sender->procLatch);
211 }
PGPROC * mq_receiver
Definition: shm_mq.c:74
void SetLatch(Latch *latch)
Definition: latch.c:505
Latch procLatch
Definition: proc.h:129
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:73
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:800
Definition: proc.h:120
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_set_sender()

void shm_mq_set_sender ( shm_mq mq,
PGPROC proc 
)

Definition at line 217 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ParallelWorkerMain(), and setup_dynamic_shared_memory().

218 {
219  PGPROC *receiver;
220 
222  Assert(mq->mq_sender == NULL);
223  mq->mq_sender = proc;
224  receiver = mq->mq_receiver;
226 
227  if (receiver != NULL)
228  SetLatch(&receiver->procLatch);
229 }
PGPROC * mq_receiver
Definition: shm_mq.c:74
void SetLatch(Latch *latch)
Definition: latch.c:505
Latch procLatch
Definition: proc.h:129
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:73
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:800
Definition: proc.h:120
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_wait_for_attach()

shm_mq_result shm_mq_wait_for_attach ( shm_mq_handle mqh)

Definition at line 794 of file shm_mq.c.

References Assert, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, MyProc, SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_get_sender(), SHM_MQ_SUCCESS, and shm_mq_wait_internal().

795 {
796  shm_mq *mq = mqh->mqh_queue;
797  PGPROC **victim;
798 
799  if (shm_mq_get_receiver(mq) == MyProc)
800  victim = &mq->mq_sender;
801  else
802  {
804  victim = &mq->mq_receiver;
805  }
806 
807  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
808  return SHM_MQ_SUCCESS;
809  else
810  return SHM_MQ_DETACHED;
811 }
PGPROC * MyProc
Definition: proc.c:67
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:250
PGPROC * mq_receiver
Definition: shm_mq.c:74
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:235
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:135
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1175
shm_mq * mqh_queue
Definition: shm_mq.c:133
#define Assert(condition)
Definition: c.h:800
Definition: shm_mq.c:71
Definition: proc.h:120
PGPROC * mq_sender
Definition: shm_mq.c:75

◆ shm_mq_wait_internal()

static bool shm_mq_wait_internal ( shm_mq mq,
PGPROC **  ptr,
BackgroundWorkerHandle handle 
)
static

Definition at line 1175 of file shm_mq.c.

References BGWH_NOT_YET_STARTED, BGWH_STARTED, CHECK_FOR_INTERRUPTS, GetBackgroundWorkerPid(), shm_mq::mq_detached, shm_mq::mq_mutex, MyLatch, ResetLatch(), SpinLockAcquire, SpinLockRelease, status(), WAIT_EVENT_MQ_INTERNAL, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_receive(), shm_mq_send_bytes(), and shm_mq_wait_for_attach().

1176 {
1177  bool result = false;
1178 
1179  for (;;)
1180  {
1182  pid_t pid;
1183 
1184  /* Acquire the lock just long enough to check the pointer. */
1185  SpinLockAcquire(&mq->mq_mutex);
1186  result = (*ptr != NULL);
1187  SpinLockRelease(&mq->mq_mutex);
1188 
1189  /* Fail if detached; else succeed if initialized. */
1190  if (mq->mq_detached)
1191  {
1192  result = false;
1193  break;
1194  }
1195  if (result)
1196  break;
1197 
1198  if (handle != NULL)
1199  {
1200  /* Check for unexpected worker death. */
1201  status = GetBackgroundWorkerPid(handle, &pid);
1202  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1203  {
1204  result = false;
1205  break;
1206  }
1207  }
1208 
1209  /* Wait to be signaled. */
1212 
1213  /* Reset the latch so we don't spin. */
1215 
1216  /* An interrupt may have occurred while we were waiting. */
1218  }
1219 
1220  return result;
1221 }
void ResetLatch(Latch *latch)
Definition: latch.c:588
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:390
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:73
BgwHandleStatus
Definition: bgworker.h:102
#define SpinLockRelease(lock)
Definition: spin.h:64
bool mq_detached
Definition: shm_mq.c:79
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
#define WL_LATCH_SET
Definition: latch.h:124
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1030
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

Variable Documentation

◆ shm_mq_minimum_size

const Size shm_mq_minimum_size
Initial value:
=
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF
#define MAXALIGN(LEN)
Definition: c.h:753
Definition: shm_mq.c:71
#define offsetof(type, field)
Definition: c.h:723

Definition at line 161 of file shm_mq.c.

Referenced by setup_dynamic_shared_memory().