PostgreSQL Source Code  git master
shm_mq.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/pg_bitutils.h"
#include "postmaster/bgworker.h"
#include "storage/procsignal.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
#include "utils/memutils.h"
Include dependency graph for shm_mq.c:

Go to the source code of this file.

Data Structures

struct  shm_mq
 
struct  shm_mq_handle
 

Macros

#define MQH_INITIAL_BUFSIZE   8192
 

Functions

static void shm_mq_detach_internal (shm_mq *mq)
 
static shm_mq_result shm_mq_send_bytes (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
 
static shm_mq_result shm_mq_receive_bytes (shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
 
static bool shm_mq_counterparty_gone (shm_mq *mq, BackgroundWorkerHandle *handle)
 
static bool shm_mq_wait_internal (shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
 
static void shm_mq_inc_bytes_read (shm_mq *mq, Size n)
 
static void shm_mq_inc_bytes_written (shm_mq *mq, Size n)
 
static void shm_mq_detach_callback (dsm_segment *seg, Datum arg)
 
shm_mq * shm_mq_create (void *address, Size size)
 
void shm_mq_set_receiver (shm_mq *mq, PGPROC *proc)
 
void shm_mq_set_sender (shm_mq *mq, PGPROC *proc)
 
PGPROC * shm_mq_get_receiver (shm_mq *mq)
 
PGPROC * shm_mq_get_sender (shm_mq *mq)
 
shm_mq_handle * shm_mq_attach (shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 
void shm_mq_set_handle (shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
 
shm_mq_result shm_mq_send (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
 
shm_mq_result shm_mq_sendv (shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 
shm_mq_result shm_mq_receive (shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
shm_mq_result shm_mq_wait_for_attach (shm_mq_handle *mqh)
 
void shm_mq_detach (shm_mq_handle *mqh)
 
shm_mq * shm_mq_get_queue (shm_mq_handle *mqh)
 

Variables

const Size shm_mq_minimum_size
 

Macro Definition Documentation

◆ MQH_INITIAL_BUFSIZE

#define MQH_INITIAL_BUFSIZE   8192

Definition at line 165 of file shm_mq.c.

Referenced by shm_mq_receive().

Function Documentation

◆ shm_mq_attach()

shm_mq_handle* shm_mq_attach ( shm_mq mq,
dsm_segment seg,
BackgroundWorkerHandle handle 
)

Definition at line 284 of file shm_mq.c.

References Assert, CurrentMemoryContext, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, MyProc, on_dsm_detach(), palloc(), PointerGetDatum, and shm_mq_detach_callback().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ParallelWorkerMain(), ReinitializeParallelDSM(), and test_shm_mq_setup().

285 {
286  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
287 
288  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
289  mqh->mqh_queue = mq;
290  mqh->mqh_segment = seg;
291  mqh->mqh_handle = handle;
292  mqh->mqh_buffer = NULL;
293  mqh->mqh_buflen = 0;
294  mqh->mqh_consume_pending = 0;
295  mqh->mqh_partial_bytes = 0;
296  mqh->mqh_expected_bytes = 0;
297  mqh->mqh_length_word_complete = false;
298  mqh->mqh_counterparty_attached = false;
299  mqh->mqh_context = CurrentMemoryContext;
300 
301  if (seg != NULL)
302  on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
303 
304  return mqh;
305 }
Size mqh_partial_bytes
Definition: shm_mq.c:140
PGPROC * MyProc
Definition: proc.c:68
#define PointerGetDatum(X)
Definition: postgres.h:600
PGPROC * mq_receiver
Definition: shm_mq.c:75
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:136
Size mqh_consume_pending
Definition: shm_mq.c:139
bool mqh_counterparty_attached
Definition: shm_mq.c:143
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1096
char * mqh_buffer
Definition: shm_mq.c:137
bool mqh_length_word_complete
Definition: shm_mq.c:142
Size mqh_buflen
Definition: shm_mq.c:138
dsm_segment * mqh_segment
Definition: shm_mq.c:135
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
Size mqh_expected_bytes
Definition: shm_mq.c:141
shm_mq * mqh_queue
Definition: shm_mq.c:134
#define Assert(condition)
Definition: c.h:804
void * palloc(Size size)
Definition: mcxt.c:1062
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1284
MemoryContext mqh_context
Definition: shm_mq.c:144
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_counterparty_gone()

static bool shm_mq_counterparty_gone ( shm_mq mq,
BackgroundWorkerHandle handle 
)
static

Definition at line 1140 of file shm_mq.c.

References BGWH_NOT_YET_STARTED, BGWH_STARTED, GetBackgroundWorkerPid(), shm_mq::mq_detached, and status().

Referenced by shm_mq_receive(), and shm_mq_send_bytes().

1141 {
1142  pid_t pid;
1143 
1144  /* If the queue has been detached, counterparty is definitely gone. */
1145  if (mq->mq_detached)
1146  return true;
1147 
1148  /* If there's a handle, check worker status. */
1149  if (handle != NULL)
1150  {
1151  BgwHandleStatus status;
1152 
1153  /* Check for unexpected worker death. */
1154  status = GetBackgroundWorkerPid(handle, &pid);
1155  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1156  {
1157  /* Mark it detached, just to make it official. */
1158  mq->mq_detached = true;
1159  return true;
1160  }
1161  }
1162 
1163  /* Counterparty is not definitively gone. */
1164  return false;
1165 }
BgwHandleStatus
Definition: bgworker.h:102
bool mq_detached
Definition: shm_mq.c:80
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1085

◆ shm_mq_create()

shm_mq* shm_mq_create ( void *  address,
Size  size 
)

Definition at line 171 of file shm_mq.c.

References Assert, MAXALIGN, MAXALIGN_DOWN, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq::mq_sender, offsetof, pg_atomic_init_u64(), and SpinLockInit.

Referenced by ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

172 {
173  shm_mq *mq = address;
174  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
175 
176  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
177  size = MAXALIGN_DOWN(size);
178 
179  /* Queue size must be large enough to hold some data. */
180  Assert(size > data_offset);
181 
182  /* Initialize queue header. */
183  SpinLockInit(&mq->mq_mutex);
184  mq->mq_receiver = NULL;
185  mq->mq_sender = NULL;
186  pg_atomic_init_u64(&mq->mq_bytes_read, 0);
187  pg_atomic_init_u64(&mq->mq_bytes_written, 0);
188  mq->mq_ring_size = size - data_offset;
189  mq->mq_detached = false;
190  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
191 
192  return mq;
193 }
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * mq_receiver
Definition: shm_mq.c:75
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:415
slock_t mq_mutex
Definition: shm_mq.c:74
bool mq_detached
Definition: shm_mq.c:80
#define Assert(condition)
Definition: c.h:804
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:77
size_t Size
Definition: c.h:540
#define MAXALIGN(LEN)
Definition: c.h:757
Size mq_ring_size
Definition: shm_mq.c:79
Definition: shm_mq.c:72
uint8 mq_ring_offset
Definition: shm_mq.c:81
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:78
#define offsetof(type, field)
Definition: c.h:727
#define MAXALIGN_DOWN(LEN)
Definition: c.h:769
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_detach()

void shm_mq_detach ( shm_mq_handle mqh)

Definition at line 821 of file shm_mq.c.

References cancel_on_dsm_detach(), shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, pfree(), PointerGetDatum, shm_mq_detach_callback(), and shm_mq_detach_internal().

Referenced by DestroyParallelContext(), ExecParallelFinish(), HandleParallelMessage(), LaunchParallelWorkers(), mq_putmessage(), tqueueDestroyReceiver(), and tqueueShutdownReceiver().

822 {
823  /* Notify counterparty that we're outta here. */
824  shm_mq_detach_internal(mqh->mqh_queue);
825 
826  /* Cancel on_dsm_detach callback, if any. */
827  if (mqh->mqh_segment)
828  cancel_on_dsm_detach(mqh->mqh_segment,
829  shm_mq_detach_callback,
830  PointerGetDatum(mqh->mqh_queue));
831 
832  /* Release local memory associated with handle. */
833  if (mqh->mqh_buffer != NULL)
834  pfree(mqh->mqh_buffer);
835  pfree(mqh);
836 }
#define PointerGetDatum(X)
Definition: postgres.h:600
char * mqh_buffer
Definition: shm_mq.c:137
void pfree(void *pointer)
Definition: mcxt.c:1169
dsm_segment * mqh_segment
Definition: shm_mq.c:135
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:853
shm_mq * mqh_queue
Definition: shm_mq.c:134
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1284
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1111

◆ shm_mq_detach_callback()

static void shm_mq_detach_callback ( dsm_segment seg,
Datum  arg 
)
static

Definition at line 1284 of file shm_mq.c.

References DatumGetPointer, and shm_mq_detach_internal().

Referenced by shm_mq_attach(), and shm_mq_detach().

1285 {
1286  shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1287 
1288  shm_mq_detach_internal(mq);
1289 }
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:853
#define DatumGetPointer(X)
Definition: postgres.h:593
Definition: shm_mq.c:72
void * arg

◆ shm_mq_detach_internal()

static void shm_mq_detach_internal ( shm_mq mq)
static

Definition at line 853 of file shm_mq.c.

References Assert, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, MyProc, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_detach(), and shm_mq_detach_callback().

854 {
855  PGPROC *victim;
856 
857  SpinLockAcquire(&mq->mq_mutex);
858  if (mq->mq_sender == MyProc)
859  victim = mq->mq_receiver;
860  else
861  {
862  Assert(mq->mq_receiver == MyProc);
863  victim = mq->mq_sender;
864  }
865  mq->mq_detached = true;
866  SpinLockRelease(&mq->mq_mutex);
867 
868  if (victim != NULL)
869  SetLatch(&victim->procLatch);
870 }
PGPROC * MyProc
Definition: proc.c:68
PGPROC * mq_receiver
Definition: shm_mq.c:75
void SetLatch(Latch *latch)
Definition: latch.c:567
Latch procLatch
Definition: proc.h:130
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
#define SpinLockRelease(lock)
Definition: spin.h:64
bool mq_detached
Definition: shm_mq.c:80
#define Assert(condition)
Definition: c.h:804
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_get_queue()

shm_mq* shm_mq_get_queue ( shm_mq_handle mqh)

Definition at line 876 of file shm_mq.c.

References shm_mq_handle::mqh_queue.

Referenced by WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

877 {
878  return mqh->mqh_queue;
879 }
shm_mq * mqh_queue
Definition: shm_mq.c:134

◆ shm_mq_get_receiver()

PGPROC* shm_mq_get_receiver ( shm_mq mq)

Definition at line 236 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_receiver, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_send_bytes(), and shm_mq_wait_for_attach().

237 {
238  PGPROC *receiver;
239 
240  SpinLockAcquire(&mq->mq_mutex);
241  receiver = mq->mq_receiver;
242  SpinLockRelease(&mq->mq_mutex);
243 
244  return receiver;
245 }
PGPROC * mq_receiver
Definition: shm_mq.c:75
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: proc.h:121

◆ shm_mq_get_sender()

PGPROC* shm_mq_get_sender ( shm_mq mq)

Definition at line 251 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_sender, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_receive(), shm_mq_wait_for_attach(), WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

252 {
253  PGPROC *sender;
254 
255  SpinLockAcquire(&mq->mq_mutex);
256  sender = mq->mq_sender;
257  SpinLockRelease(&mq->mq_mutex);
258 
259  return sender;
260 }
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_inc_bytes_read()

static void shm_mq_inc_bytes_read ( shm_mq mq,
Size  n 
)
static

Definition at line 1231 of file shm_mq.c.

References Assert, shm_mq::mq_bytes_read, shm_mq::mq_sender, pg_atomic_read_u64(), pg_atomic_write_u64(), pg_read_barrier, PGPROC::procLatch, and SetLatch().

Referenced by shm_mq_receive(), and shm_mq_receive_bytes().

1232 {
1233  PGPROC *sender;
1234 
1235  /*
1236  * Separate prior reads of mq_ring from the increment of mq_bytes_read
1237  * which follows. This pairs with the full barrier in
1238  * shm_mq_send_bytes(). We only need a read barrier here because the
1239  * increment of mq_bytes_read is actually a read followed by a dependent
1240  * write.
1241  */
1242  pg_read_barrier();
1243 
1244  /*
1245  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1246  * else can be changing this value. This method should be cheaper.
1247  */
1248  pg_atomic_write_u64(&mq->mq_bytes_read,
1249  pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1250 
1251  /*
1252  * We shouldn't have any bytes to read without a sender, so we can read
1253  * mq_sender here without a lock. Once it's initialized, it can't change.
1254  */
1255  sender = mq->mq_sender;
1256  Assert(sender != NULL);
1257  SetLatch(&sender->procLatch);
1258 }
void SetLatch(Latch *latch)
Definition: latch.c:567
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
Latch procLatch
Definition: proc.h:130
#define Assert(condition)
Definition: c.h:804
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:77
#define pg_read_barrier()
Definition: atomics.h:158
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_inc_bytes_written()

static void shm_mq_inc_bytes_written ( shm_mq mq,
Size  n 
)
static

Definition at line 1264 of file shm_mq.c.

References shm_mq::mq_bytes_written, pg_atomic_read_u64(), pg_atomic_write_u64(), and pg_write_barrier.

Referenced by shm_mq_send_bytes().

1265 {
1266  /*
1267  * Separate prior reads of mq_ring from the write of mq_bytes_written
1268  * which we're about to do. Pairs with the read barrier found in
1269  * shm_mq_receive_bytes.
1270  */
1271  pg_write_barrier();
1272 
1273  /*
1274  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1275  * else can be changing this value. This method avoids taking the bus
1276  * lock unnecessarily.
1277  */
1278  pg_atomic_write_u64(&mq->mq_bytes_written,
1279  pg_atomic_read_u64(&mq->mq_bytes_written) + n);
1280 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
#define pg_write_barrier()
Definition: atomics.h:159
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:78

◆ shm_mq_receive()

shm_mq_result shm_mq_receive ( shm_mq_handle mqh,
Size nbytesp,
void **  datap,
bool  nowait 
)

Definition at line 549 of file shm_mq.c.

References Assert, ereport, errcode(), errmsg(), ERROR, MAXALIGN, MaxAllocSize, MemoryContextAlloc(), Min, shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_ring_size, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, MQH_INITIAL_BUFSIZE, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, pfree(), pg_nextpower2_32(), pg_nextpower2_64(), shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_sender(), shm_mq_inc_bytes_read(), shm_mq_receive_bytes(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), and SHM_MQ_WOULD_BLOCK.

Referenced by copy_messages(), HandleParallelMessages(), test_shm_mq(), test_shm_mq_pipelined(), and TupleQueueReaderNext().

550 {
551  shm_mq *mq = mqh->mqh_queue;
552  shm_mq_result res;
553  Size rb = 0;
554  Size nbytes;
555  void *rawdata;
556 
557  Assert(mq->mq_receiver == MyProc);
558 
559  /* We can't receive data until the sender has attached. */
560  if (!mqh->mqh_counterparty_attached)
561  {
562  if (nowait)
563  {
564  int counterparty_gone;
565 
566  /*
567  * We shouldn't return at this point at all unless the sender
568  * hasn't attached yet. However, the correct return value depends
569  * on whether the sender is still attached. If we first test
570  * whether the sender has ever attached and then test whether the
571  * sender has detached, there's a race condition: a sender that
572  * attaches and detaches very quickly might fool us into thinking
573  * the sender never attached at all. So, test whether our
574  * counterparty is definitively gone first, and only afterwards
575  * check whether the sender ever attached in the first place.
576  */
577  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
578  if (shm_mq_get_sender(mq) == NULL)
579  {
580  if (counterparty_gone)
581  return SHM_MQ_DETACHED;
582  else
583  return SHM_MQ_WOULD_BLOCK;
584  }
585  }
586  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
587  && shm_mq_get_sender(mq) == NULL)
588  {
589  mq->mq_detached = true;
590  return SHM_MQ_DETACHED;
591  }
592  mqh->mqh_counterparty_attached = true;
593  }
594 
595  /*
596  * If we've consumed an amount of data greater than 1/4th of the ring
597  * size, mark it consumed in shared memory. We try to avoid doing this
598  * unnecessarily when only a small amount of data has been consumed,
599  * because SetLatch() is fairly expensive and we don't want to do it too
600  * often.
601  */
602  if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
603  {
604  shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
605  mqh->mqh_consume_pending = 0;
606  }
607 
608  /* Try to read, or finish reading, the length word from the buffer. */
609  while (!mqh->mqh_length_word_complete)
610  {
611  /* Try to receive the message length word. */
612  Assert(mqh->mqh_partial_bytes < sizeof(Size));
613  res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
614  nowait, &rb, &rawdata);
615  if (res != SHM_MQ_SUCCESS)
616  return res;
617 
618  /*
619  * Hopefully, we'll receive the entire message length word at once.
620  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
621  * multiple reads.
622  */
623  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
624  {
625  Size needed;
626 
627  nbytes = *(Size *) rawdata;
628 
629  /* If we've already got the whole message, we're done. */
630  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
631  if (rb >= needed)
632  {
633  mqh->mqh_consume_pending += needed;
634  *nbytesp = nbytes;
635  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
636  return SHM_MQ_SUCCESS;
637  }
638 
639  /*
640  * We don't have the whole message, but we at least have the whole
641  * length word.
642  */
643  mqh->mqh_expected_bytes = nbytes;
644  mqh->mqh_length_word_complete = true;
645  mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
646  rb -= MAXALIGN(sizeof(Size));
647  }
648  else
649  {
650  Size lengthbytes;
651 
652  /* Can't be split unless bigger than required alignment. */
653  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
654 
655  /* Message word is split; need buffer to reassemble. */
656  if (mqh->mqh_buffer == NULL)
657  {
658  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
659  MQH_INITIAL_BUFSIZE);
660  mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
661  }
662  Assert(mqh->mqh_buflen >= sizeof(Size));
663 
664  /* Copy partial length word; remember to consume it. */
665  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
666  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
667  else
668  lengthbytes = rb;
669  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
670  lengthbytes);
671  mqh->mqh_partial_bytes += lengthbytes;
672  mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
673  rb -= lengthbytes;
674 
675  /* If we now have the whole word, we're ready to read payload. */
676  if (mqh->mqh_partial_bytes >= sizeof(Size))
677  {
678  Assert(mqh->mqh_partial_bytes == sizeof(Size));
679  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
680  mqh->mqh_length_word_complete = true;
681  mqh->mqh_partial_bytes = 0;
682  }
683  }
684  }
685  nbytes = mqh->mqh_expected_bytes;
686 
687  /*
688  * Should be disallowed on the sending side already, but better check and
689  * error out on the receiver side as well rather than trying to read a
690  * prohibitively large message.
691  */
692  if (nbytes > MaxAllocSize)
693  ereport(ERROR,
694  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
695  errmsg("invalid message size %zu in shared memory queue",
696  nbytes)));
697 
698  if (mqh->mqh_partial_bytes == 0)
699  {
700  /*
701  * Try to obtain the whole message in a single chunk. If this works,
702  * we need not copy the data and can return a pointer directly into
703  * shared memory.
704  */
705  res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
706  if (res != SHM_MQ_SUCCESS)
707  return res;
708  if (rb >= nbytes)
709  {
710  mqh->mqh_length_word_complete = false;
711  mqh->mqh_consume_pending += MAXALIGN(nbytes);
712  *nbytesp = nbytes;
713  *datap = rawdata;
714  return SHM_MQ_SUCCESS;
715  }
716 
717  /*
718  * The message has wrapped the buffer. We'll need to copy it in order
719  * to return it to the client in one chunk. First, make sure we have
720  * a large enough buffer available.
721  */
722  if (mqh->mqh_buflen < nbytes)
723  {
724  Size newbuflen;
725 
726  /*
727  * Increase size to the next power of 2 that's >= nbytes, but
728  * limit to MaxAllocSize.
729  */
730 #if SIZEOF_SIZE_T == 4
731  newbuflen = pg_nextpower2_32(nbytes);
732 #else
733  newbuflen = pg_nextpower2_64(nbytes);
734 #endif
735  newbuflen = Min(newbuflen, MaxAllocSize);
736 
737  if (mqh->mqh_buffer != NULL)
738  {
739  pfree(mqh->mqh_buffer);
740  mqh->mqh_buffer = NULL;
741  mqh->mqh_buflen = 0;
742  }
743  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
744  mqh->mqh_buflen = newbuflen;
745  }
746  }
747 
748  /* Loop until we've copied the entire message. */
749  for (;;)
750  {
751  Size still_needed;
752 
753  /* Copy as much as we can. */
754  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
755  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
756  mqh->mqh_partial_bytes += rb;
757 
758  /*
759  * Update count of bytes that can be consumed, accounting for
760  * alignment padding. Note that this will never actually insert any
761  * padding except at the end of a message, because the buffer size is
762  * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
763  */
764  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
765  mqh->mqh_consume_pending += MAXALIGN(rb);
766 
767  /* If we got all the data, exit the loop. */
768  if (mqh->mqh_partial_bytes >= nbytes)
769  break;
770 
771  /* Wait for some more data. */
772  still_needed = nbytes - mqh->mqh_partial_bytes;
773  res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
774  if (res != SHM_MQ_SUCCESS)
775  return res;
776  if (rb > still_needed)
777  rb = still_needed;
778  }
779 
780  /* Return the complete message, and reset for next message. */
781  *nbytesp = nbytes;
782  *datap = mqh->mqh_buffer;
783  mqh->mqh_length_word_complete = false;
784  mqh->mqh_partial_bytes = 0;
785  return SHM_MQ_SUCCESS;
786 }
Size mqh_partial_bytes
Definition: shm_mq.c:140
PGPROC * MyProc
Definition: proc.c:68
#define Min(x, y)
Definition: c.h:986
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:251
int errcode(int sqlerrcode)
Definition: elog.c:698
PGPROC * mq_receiver
Definition: shm_mq.c:75
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:136
Size mqh_consume_pending
Definition: shm_mq.c:139
bool mqh_counterparty_attached
Definition: shm_mq.c:143
char * mqh_buffer
Definition: shm_mq.c:137
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1179
bool mqh_length_word_complete
Definition: shm_mq.c:142
void pfree(void *pointer)
Definition: mcxt.c:1169
#define ERROR
Definition: elog.h:46
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:165
Size mqh_buflen
Definition: shm_mq.c:138
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:1040
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:146
Size mqh_expected_bytes
Definition: shm_mq.c:141
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1231
static uint64 pg_nextpower2_64(uint64 num)
Definition: pg_bitutils.h:169
#define MaxAllocSize
Definition: memutils.h:40
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1140
shm_mq * mqh_queue
Definition: shm_mq.c:134
bool mq_detached
Definition: shm_mq.c:80
#define ereport(elevel,...)
Definition: elog.h:157
shm_mq_result
Definition: shm_mq.h:36
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
#define MAXALIGN(LEN)
Definition: c.h:757
Size mq_ring_size
Definition: shm_mq.c:79
int errmsg(const char *fmt,...)
Definition: elog.c:909
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
Definition: shm_mq.c:72
MemoryContext mqh_context
Definition: shm_mq.c:144
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_receive_bytes()

static shm_mq_result shm_mq_receive_bytes ( shm_mq_handle mqh,
Size  bytes_needed,
bool  nowait,
Size nbytesp,
void **  datap 
)
static

Definition at line 1040 of file shm_mq.c.

References Assert, CHECK_FOR_INTERRUPTS, Min, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_queue, MyLatch, pg_atomic_read_u64(), pg_read_barrier, read, ResetLatch(), SHM_MQ_DETACHED, shm_mq_inc_bytes_read(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, WAIT_EVENT_MQ_RECEIVE, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_receive().

1042 {
1043  shm_mq *mq = mqh->mqh_queue;
1044  Size ringsize = mq->mq_ring_size;
1045  uint64 used;
1046  uint64 written;
1047 
1048  for (;;)
1049  {
1050  Size offset;
1051  uint64 read;
1052 
1053  /* Get bytes written, so we can compute what's available to read. */
1054  written = pg_atomic_read_u64(&mq->mq_bytes_written);
1055 
1056  /*
1057  * Get bytes read. Include bytes we could consume but have not yet
1058  * consumed.
1059  */
1060  read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1061  mqh->mqh_consume_pending;
1062  used = written - read;
1063  Assert(used <= ringsize);
1064  offset = read % (uint64) ringsize;
1065 
1066  /* If we have enough data or buffer has wrapped, we're done. */
1067  if (used >= bytes_needed || offset + used >= ringsize)
1068  {
1069  *nbytesp = Min(used, ringsize - offset);
1070  *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1071 
1072  /*
1073  * Separate the read of mq_bytes_written, above, from caller's
1074  * attempt to read the data itself. Pairs with the barrier in
1075  * shm_mq_inc_bytes_written.
1076  */
1077  pg_read_barrier();
1078  return SHM_MQ_SUCCESS;
1079  }
1080 
1081  /*
1082  * Fall out before waiting if the queue has been detached.
1083  *
1084  * Note that we don't check for this until *after* considering whether
1085  * the data already available is enough, since the receiver can finish
1086  * receiving a message stored in the buffer even after the sender has
1087  * detached.
1088  */
1089  if (mq->mq_detached)
1090  {
1091  /*
1092  * If the writer advanced mq_bytes_written and then set
1093  * mq_detached, we might not have read the final value of
1094  * mq_bytes_written above. Insert a read barrier and then check
1095  * again if mq_bytes_written has advanced.
1096  */
1097  pg_read_barrier();
1098  if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1099  continue;
1100 
1101  return SHM_MQ_DETACHED;
1102  }
1103 
1104  /*
1105  * We didn't get enough data to satisfy the request, so mark any data
1106  * previously-consumed as read to make more buffer space.
1107  */
1108  if (mqh->mqh_consume_pending > 0)
1109  {
1110  shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
1111  mqh->mqh_consume_pending = 0;
1112  }
1113 
1114  /* Skip manipulation of our latch if nowait = true. */
1115  if (nowait)
1116  return SHM_MQ_WOULD_BLOCK;
1117 
1118  /*
1119  * Wait for our latch to be set. It might already be set for some
1120  * unrelated reason, but that'll just result in one extra trip through
1121  * the loop. It's worth it to avoid resetting the latch at top of
1122  * loop, because setting an already-set latch is much cheaper than
1123  * setting one that has been reset.
1124  */
1125  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1126  WAIT_EVENT_MQ_RECEIVE);
1127 
1128  /* Reset the latch so we don't spin. */
1129  ResetLatch(MyLatch);
1130 
1131  /* An interrupt may have occurred while we were waiting. */
1132  CHECK_FOR_INTERRUPTS();
1133  }
1134 }
#define Min(x, y)
Definition: c.h:986
Size mqh_consume_pending
Definition: shm_mq.c:139
void ResetLatch(Latch *latch)
Definition: latch.c:660
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:452
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1231
shm_mq * mqh_queue
Definition: shm_mq.c:134
bool mq_detached
Definition: shm_mq.c:80
#define Assert(condition)
Definition: c.h:804
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:77
#define pg_read_barrier()
Definition: atomics.h:158
size_t Size
Definition: c.h:540
Size mq_ring_size
Definition: shm_mq.c:79
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
Definition: shm_mq.c:82
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
Definition: shm_mq.c:72
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
uint8 mq_ring_offset
Definition: shm_mq.c:81
#define WL_LATCH_SET
Definition: latch.h:125
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:78
#define read(a, b, c)
Definition: win32.h:13
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130

◆ shm_mq_send()

shm_mq_result shm_mq_send ( shm_mq_handle mqh,
Size  nbytes,
const void *  data,
bool  nowait 
)

Definition at line 322 of file shm_mq.c.

References shm_mq_iovec::data, shm_mq_iovec::len, and shm_mq_sendv().

Referenced by copy_messages(), test_shm_mq(), test_shm_mq_pipelined(), and tqueueReceiveSlot().

323 {
324  shm_mq_iovec iov;
325 
326  iov.data = data;
327  iov.len = nbytes;
328 
329  return shm_mq_sendv(mqh, &iov, 1, nowait);
330 }
Size len
Definition: shm_mq.h:32
const char * data
Definition: shm_mq.h:31
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
Definition: shm_mq.c:348

◆ shm_mq_send_bytes()

static shm_mq_result shm_mq_send_bytes ( shm_mq_handle mqh,
Size  nbytes,
const void *  data,
bool  nowait,
Size bytes_written 
)
static

Definition at line 885 of file shm_mq.c.

References Assert, CHECK_FOR_INTERRUPTS, MAXALIGN, Min, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, MyLatch, pg_atomic_read_u64(), pg_compiler_barrier, pg_memory_barrier, PGPROC::procLatch, ResetLatch(), SetLatch(), shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_inc_bytes_written(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), SHM_MQ_WOULD_BLOCK, WAIT_EVENT_MQ_SEND, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_sendv().

887 {
888  shm_mq *mq = mqh->mqh_queue;
889  Size sent = 0;
890  uint64 used;
891  Size ringsize = mq->mq_ring_size;
892  Size available;
893 
894  while (sent < nbytes)
895  {
896  uint64 rb;
897  uint64 wb;
898 
899  /* Compute number of ring buffer bytes used and available. */
900  rb = pg_atomic_read_u64(&mq->mq_bytes_read);
901  wb = pg_atomic_read_u64(&mq->mq_bytes_written);
902  Assert(wb >= rb);
903  used = wb - rb;
904  Assert(used <= ringsize);
905  available = Min(ringsize - used, nbytes - sent);
906 
907  /*
908  * Bail out if the queue has been detached. Note that we would be in
909  * trouble if the compiler decided to cache the value of
910  * mq->mq_detached in a register or on the stack across loop
911  * iterations. It probably shouldn't do that anyway since we'll
912  * always return, call an external function that performs a system
913  * call, or reach a memory barrier at some point later in the loop,
914  * but just to be sure, insert a compiler barrier here.
915  */
916  pg_compiler_barrier();
917  if (mq->mq_detached)
918  {
919  *bytes_written = sent;
920  return SHM_MQ_DETACHED;
921  }
922 
923  if (available == 0 && !mqh->mqh_counterparty_attached)
924  {
925  /*
926  * The queue is full, so if the receiver isn't yet known to be
927  * attached, we must wait for that to happen.
928  */
929  if (nowait)
930  {
931  if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
932  {
933  *bytes_written = sent;
934  return SHM_MQ_DETACHED;
935  }
936  if (shm_mq_get_receiver(mq) == NULL)
937  {
938  *bytes_written = sent;
939  return SHM_MQ_WOULD_BLOCK;
940  }
941  }
942  else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
943  mqh->mqh_handle))
944  {
945  mq->mq_detached = true;
946  *bytes_written = sent;
947  return SHM_MQ_DETACHED;
948  }
949  mqh->mqh_counterparty_attached = true;
950 
951  /*
952  * The receiver may have read some data after attaching, so we
953  * must not wait without rechecking the queue state.
954  */
955  }
956  else if (available == 0)
957  {
958  /*
959  * Since mqh->mqh_counterparty_attached is known to be true at this
960  * point, mq_receiver has been set, and it can't change once set.
961  * Therefore, we can read it without acquiring the spinlock.
962  */
963  Assert(mqh->mqh_counterparty_attached);
964  SetLatch(&mq->mq_receiver->procLatch);
965 
966  /* Skip manipulation of our latch if nowait = true. */
967  if (nowait)
968  {
969  *bytes_written = sent;
970  return SHM_MQ_WOULD_BLOCK;
971  }
972 
973  /*
974  * Wait for our latch to be set. It might already be set for some
975  * unrelated reason, but that'll just result in one extra trip
976  * through the loop. It's worth it to avoid resetting the latch
977  * at top of loop, because setting an already-set latch is much
978  * cheaper than setting one that has been reset.
979  */
980  WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
981  WAIT_EVENT_MQ_SEND);
982 
983  /* Reset the latch so we don't spin. */
984  ResetLatch(MyLatch);
985 
986  /* An interrupt may have occurred while we were waiting. */
987  CHECK_FOR_INTERRUPTS();
988  }
989  else
990  {
991  Size offset;
992  Size sendnow;
993 
994  offset = wb % (uint64) ringsize;
995  sendnow = Min(available, ringsize - offset);
996 
997  /*
998  * Write as much data as we can via a single memcpy(). Make sure
999  * these writes happen after the read of mq_bytes_read, above.
1000  * This barrier pairs with the one in shm_mq_inc_bytes_read.
1001  * (Since we're separating the read of mq_bytes_read from a
1002  * subsequent write to mq_ring, we need a full barrier here.)
1003  */
1004  pg_memory_barrier();
1005  memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1006  (char *) data + sent, sendnow);
1007  sent += sendnow;
1008 
1009  /*
1010  * Update count of bytes written, with alignment padding. Note
1011  * that this will never actually insert any padding except at the
1012  * end of a run of bytes, because the buffer size is a multiple of
1013  * MAXIMUM_ALIGNOF, and each read is as well.
1014  */
1015  Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1016  shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
1017 
1018  /*
1019  * For efficiency, we don't set the reader's latch here. We'll do
1020  * that only when the buffer fills up or after writing an entire
1021  * message.
1022  */
1023  }
1024  }
1025 
1026  *bytes_written = sent;
1027  return SHM_MQ_SUCCESS;
1028 }
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
Definition: shm_mq.c:1264
#define Min(x, y)
Definition: c.h:986
PGPROC * mq_receiver
Definition: shm_mq.c:75
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:236
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:136
void SetLatch(Latch *latch)
Definition: latch.c:567
bool mqh_counterparty_attached
Definition: shm_mq.c:143
void ResetLatch(Latch *latch)
Definition: latch.c:660
Latch procLatch
Definition: proc.h:130
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:452
#define pg_compiler_barrier()
Definition: atomics.h:133
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1179
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1140
shm_mq * mqh_queue
Definition: shm_mq.c:134
bool mq_detached
Definition: shm_mq.c:80
#define pg_memory_barrier()
Definition: atomics.h:145
#define Assert(condition)
Definition: c.h:804
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:77
size_t Size
Definition: c.h:540
#define MAXALIGN(LEN)
Definition: c.h:757
Size mq_ring_size
Definition: shm_mq.c:79
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
Definition: shm_mq.c:82
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
Definition: shm_mq.c:72
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
uint8 mq_ring_offset
Definition: shm_mq.c:81
#define WL_LATCH_SET
Definition: latch.h:125
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:78
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130

◆ shm_mq_sendv()

shm_mq_result shm_mq_sendv ( shm_mq_handle mqh,
shm_mq_iovec iov,
int  iovcnt,
bool  nowait 
)

Definition at line 348 of file shm_mq.c.

References Assert, shm_mq_iovec::data, ereport, errcode(), errmsg(), ERROR, i, shm_mq_iovec::len, MAXALIGN_DOWN, MaxAllocSize, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, PGPROC::procLatch, SetLatch(), SHM_MQ_DETACHED, shm_mq_send_bytes(), SHM_MQ_SUCCESS, SpinLockAcquire, SpinLockRelease, and tmpbuf.

Referenced by mq_putmessage(), and shm_mq_send().

349 {
350  shm_mq_result res;
351  shm_mq *mq = mqh->mqh_queue;
352  PGPROC *receiver;
353  Size nbytes = 0;
354  Size bytes_written;
355  int i;
356  int which_iov = 0;
357  Size offset;
358 
359  Assert(mq->mq_sender == MyProc);
360 
361  /* Compute total size of write. */
362  for (i = 0; i < iovcnt; ++i)
363  nbytes += iov[i].len;
364 
365  /* Prevent writing messages overwhelming the receiver. */
366  if (nbytes > MaxAllocSize)
367  ereport(ERROR,
368  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
369  errmsg("cannot send a message of size %zu via shared memory queue",
370  nbytes)));
371 
372  /* Try to write, or finish writing, the length word into the buffer. */
373  while (!mqh->mqh_length_word_complete)
374  {
375  Assert(mqh->mqh_partial_bytes < sizeof(Size));
376  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
377  ((char *) &nbytes) + mqh->mqh_partial_bytes,
378  nowait, &bytes_written);
379 
380  if (res == SHM_MQ_DETACHED)
381  {
382  /* Reset state in case caller tries to send another message. */
383  mqh->mqh_partial_bytes = 0;
384  mqh->mqh_length_word_complete = false;
385  return res;
386  }
387  mqh->mqh_partial_bytes += bytes_written;
388 
389  if (mqh->mqh_partial_bytes >= sizeof(Size))
390  {
391  Assert(mqh->mqh_partial_bytes == sizeof(Size));
392 
393  mqh->mqh_partial_bytes = 0;
394  mqh->mqh_length_word_complete = true;
395  }
396 
397  if (res != SHM_MQ_SUCCESS)
398  return res;
399 
400  /* Length word can't be split unless bigger than required alignment. */
401  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
402  }
403 
404  /* Write the actual data bytes into the buffer. */
405  Assert(mqh->mqh_partial_bytes <= nbytes);
406  offset = mqh->mqh_partial_bytes;
407  do
408  {
409  Size chunksize;
410 
411  /* Figure out which bytes need to be sent next. */
412  if (offset >= iov[which_iov].len)
413  {
414  offset -= iov[which_iov].len;
415  ++which_iov;
416  if (which_iov >= iovcnt)
417  break;
418  continue;
419  }
420 
421  /*
422  * We want to avoid copying the data if at all possible, but every
423  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
424  * the last. Thus, if a chunk other than the last one ends on a
425  * non-MAXALIGN'd boundary, we have to combine the tail end of its
426  * data with data from one or more following chunks until we either
427  * reach the last chunk or accumulate a number of bytes which is
428  * MAXALIGN'd.
429  */
430  if (which_iov + 1 < iovcnt &&
431  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
432  {
433  char tmpbuf[MAXIMUM_ALIGNOF];
434  int j = 0;
435 
436  for (;;)
437  {
438  if (offset < iov[which_iov].len)
439  {
440  tmpbuf[j] = iov[which_iov].data[offset];
441  j++;
442  offset++;
443  if (j == MAXIMUM_ALIGNOF)
444  break;
445  }
446  else
447  {
448  offset -= iov[which_iov].len;
449  which_iov++;
450  if (which_iov >= iovcnt)
451  break;
452  }
453  }
454 
455  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
456 
457  if (res == SHM_MQ_DETACHED)
458  {
459  /* Reset state in case caller tries to send another message. */
460  mqh->mqh_partial_bytes = 0;
461  mqh->mqh_length_word_complete = false;
462  return res;
463  }
464 
465  mqh->mqh_partial_bytes += bytes_written;
466  if (res != SHM_MQ_SUCCESS)
467  return res;
468  continue;
469  }
470 
471  /*
472  * If this is the last chunk, we can write all the data, even if it
473  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
474  * MAXALIGN_DOWN the write size.
475  */
476  chunksize = iov[which_iov].len - offset;
477  if (which_iov + 1 < iovcnt)
478  chunksize = MAXALIGN_DOWN(chunksize);
479  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
480  nowait, &bytes_written);
481 
482  if (res == SHM_MQ_DETACHED)
483  {
484  /* Reset state in case caller tries to send another message. */
485  mqh->mqh_length_word_complete = false;
486  mqh->mqh_partial_bytes = 0;
487  return res;
488  }
489 
490  mqh->mqh_partial_bytes += bytes_written;
491  offset += bytes_written;
492  if (res != SHM_MQ_SUCCESS)
493  return res;
494  } while (mqh->mqh_partial_bytes < nbytes);
495 
496  /* Reset for next message. */
497  mqh->mqh_partial_bytes = 0;
498  mqh->mqh_length_word_complete = false;
499 
500  /* If queue has been detached, let caller know. */
501  if (mq->mq_detached)
502  return SHM_MQ_DETACHED;
503 
504  /*
505  * If the counterparty is known to have attached, we can read mq_receiver
506  * without acquiring the spinlock and assume it isn't NULL. Otherwise,
507  * more caution is needed.
508  */
509  if (mqh->mqh_counterparty_attached)
510  receiver = mq->mq_receiver;
511  else
512  {
513  SpinLockAcquire(&mq->mq_mutex);
514  receiver = mq->mq_receiver;
515  SpinLockRelease(&mq->mq_mutex);
516  if (receiver == NULL)
517  return SHM_MQ_SUCCESS;
518  mqh->mqh_counterparty_attached = true;
519  }
520 
521  /* Notify receiver of the newly-written data, and return. */
522  SetLatch(&receiver->procLatch);
523  return SHM_MQ_SUCCESS;
524 }
Size mqh_partial_bytes
Definition: shm_mq.c:140
PGPROC * MyProc
Definition: proc.c:68
int errcode(int sqlerrcode)
Definition: elog.c:698
PGPROC * mq_receiver
Definition: shm_mq.c:75
void SetLatch(Latch *latch)
Definition: latch.c:567
bool mqh_counterparty_attached
Definition: shm_mq.c:143
Latch procLatch
Definition: proc.h:130
Size len
Definition: shm_mq.h:32
#define SpinLockAcquire(lock)
Definition: spin.h:62
bool mqh_length_word_complete
Definition: shm_mq.c:142
const char * data
Definition: shm_mq.h:31
#define ERROR
Definition: elog.h:46
slock_t mq_mutex
Definition: shm_mq.c:74
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:885
#define MaxAllocSize
Definition: memutils.h:40
#define SpinLockRelease(lock)
Definition: spin.h:64
shm_mq * mqh_queue
Definition: shm_mq.c:134
bool mq_detached
Definition: shm_mq.c:80
#define ereport(elevel,...)
Definition: elog.h:157
shm_mq_result
Definition: shm_mq.h:36
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
static StringInfoData tmpbuf
Definition: walsender.c:159
int errmsg(const char *fmt,...)
Definition: elog.c:909
int i
Definition: shm_mq.c:72
Definition: proc.h:121
#define MAXALIGN_DOWN(LEN)
Definition: c.h:769
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_set_handle()

void shm_mq_set_handle ( shm_mq_handle mqh,
BackgroundWorkerHandle handle 
)

Definition at line 312 of file shm_mq.c.

References Assert, and shm_mq_handle::mqh_handle.

Referenced by ExecParallelCreateReaders(), and LaunchParallelWorkers().

313 {
314  Assert(mqh->mqh_handle == NULL);
315  mqh->mqh_handle = handle;
316 }
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:136
#define Assert(condition)
Definition: c.h:804

◆ shm_mq_set_receiver()

void shm_mq_set_receiver ( shm_mq mq,
PGPROC proc 
)

Definition at line 200 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

201 {
202  PGPROC *sender;
203 
204  SpinLockAcquire(&mq->mq_mutex);
205  Assert(mq->mq_receiver == NULL);
206  mq->mq_receiver = proc;
207  sender = mq->mq_sender;
208  SpinLockRelease(&mq->mq_mutex);
209 
210  if (sender != NULL)
211  SetLatch(&sender->procLatch);
212 }
PGPROC * mq_receiver
Definition: shm_mq.c:75
void SetLatch(Latch *latch)
Definition: latch.c:567
Latch procLatch
Definition: proc.h:130
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:804
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_set_sender()

void shm_mq_set_sender ( shm_mq mq,
PGPROC proc 
)

Definition at line 218 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ParallelWorkerMain(), and setup_dynamic_shared_memory().

219 {
220  PGPROC *receiver;
221 
222  SpinLockAcquire(&mq->mq_mutex);
223  Assert(mq->mq_sender == NULL);
224  mq->mq_sender = proc;
225  receiver = mq->mq_receiver;
226  SpinLockRelease(&mq->mq_mutex);
227 
228  if (receiver != NULL)
229  SetLatch(&receiver->procLatch);
230 }
PGPROC * mq_receiver
Definition: shm_mq.c:75
void SetLatch(Latch *latch)
Definition: latch.c:567
Latch procLatch
Definition: proc.h:130
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:804
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_wait_for_attach()

shm_mq_result shm_mq_wait_for_attach ( shm_mq_handle mqh)

Definition at line 798 of file shm_mq.c.

References Assert, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, MyProc, SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_get_sender(), SHM_MQ_SUCCESS, and shm_mq_wait_internal().

799 {
800  shm_mq *mq = mqh->mqh_queue;
801  PGPROC **victim;
802 
803  if (shm_mq_get_receiver(mq) == MyProc)
804  victim = &mq->mq_sender;
805  else
806  {
807  Assert(shm_mq_get_sender(mq) == MyProc);
808  victim = &mq->mq_receiver;
809  }
810 
811  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
812  return SHM_MQ_SUCCESS;
813  else
814  return SHM_MQ_DETACHED;
815 }
PGPROC * MyProc
Definition: proc.c:68
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:251
PGPROC * mq_receiver
Definition: shm_mq.c:75
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:236
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:136
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1179
shm_mq * mqh_queue
Definition: shm_mq.c:134
#define Assert(condition)
Definition: c.h:804
Definition: shm_mq.c:72
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_wait_internal()

static bool shm_mq_wait_internal ( shm_mq mq,
PGPROC **  ptr,
BackgroundWorkerHandle handle 
)
static

Definition at line 1179 of file shm_mq.c.

References BGWH_NOT_YET_STARTED, BGWH_STARTED, CHECK_FOR_INTERRUPTS, GetBackgroundWorkerPid(), shm_mq::mq_detached, shm_mq::mq_mutex, MyLatch, ResetLatch(), SpinLockAcquire, SpinLockRelease, status(), WAIT_EVENT_MQ_INTERNAL, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_receive(), shm_mq_send_bytes(), and shm_mq_wait_for_attach().

1180 {
1181  bool result = false;
1182 
1183  for (;;)
1184  {
1185  BgwHandleStatus status;
1186  pid_t pid;
1187 
1188  /* Acquire the lock just long enough to check the pointer. */
1189  SpinLockAcquire(&mq->mq_mutex);
1190  result = (*ptr != NULL);
1191  SpinLockRelease(&mq->mq_mutex);
1192 
1193  /* Fail if detached; else succeed if initialized. */
1194  if (mq->mq_detached)
1195  {
1196  result = false;
1197  break;
1198  }
1199  if (result)
1200  break;
1201 
1202  if (handle != NULL)
1203  {
1204  /* Check for unexpected worker death. */
1205  status = GetBackgroundWorkerPid(handle, &pid);
1206  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1207  {
1208  result = false;
1209  break;
1210  }
1211  }
1212 
1213  /* Wait to be signaled. */
1214  WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1215  WAIT_EVENT_MQ_INTERNAL);
1216 
1217  /* Reset the latch so we don't spin. */
1218  ResetLatch(MyLatch);
1219 
1220  /* An interrupt may have occurred while we were waiting. */
1221  CHECK_FOR_INTERRUPTS();
1222  }
1223 
1224  return result;
1225 }
void ResetLatch(Latch *latch)
Definition: latch.c:660
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:452
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
BgwHandleStatus
Definition: bgworker.h:102
#define SpinLockRelease(lock)
Definition: spin.h:64
bool mq_detached
Definition: shm_mq.c:80
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
#define WL_LATCH_SET
Definition: latch.h:125
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1085
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130

Variable Documentation

◆ shm_mq_minimum_size

const Size shm_mq_minimum_size
Initial value:
=
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF
#define MAXALIGN(LEN)
Definition: c.h:757
Definition: shm_mq.c:72
#define offsetof(type, field)
Definition: c.h:727

Definition at line 162 of file shm_mq.c.

Referenced by setup_dynamic_shared_memory().