PostgreSQL Source Code  git master
shm_mq.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/pg_bitutils.h"
#include "postmaster/bgworker.h"
#include "storage/procsignal.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
#include "utils/memutils.h"
Include dependency graph for shm_mq.c:

Go to the source code of this file.

Data Structures

struct  shm_mq
 
struct  shm_mq_handle
 

Macros

#define MQH_INITIAL_BUFSIZE   8192
 

Functions

static void shm_mq_detach_internal (shm_mq *mq)
 
static shm_mq_result shm_mq_send_bytes (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
 
static shm_mq_result shm_mq_receive_bytes (shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
 
static bool shm_mq_counterparty_gone (shm_mq *mq, BackgroundWorkerHandle *handle)
 
static bool shm_mq_wait_internal (shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
 
static void shm_mq_inc_bytes_read (shm_mq *mq, Size n)
 
static void shm_mq_inc_bytes_written (shm_mq *mq, Size n)
 
static void shm_mq_detach_callback (dsm_segment *seg, Datum arg)
 
shm_mq * shm_mq_create (void *address, Size size)
 
void shm_mq_set_receiver (shm_mq *mq, PGPROC *proc)
 
void shm_mq_set_sender (shm_mq *mq, PGPROC *proc)
 
PGPROC * shm_mq_get_receiver (shm_mq *mq)
 
PGPROC * shm_mq_get_sender (shm_mq *mq)
 
shm_mq_handle * shm_mq_attach (shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 
void shm_mq_set_handle (shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
 
shm_mq_result shm_mq_send (shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
 
shm_mq_result shm_mq_sendv (shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
 
shm_mq_result shm_mq_receive (shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
shm_mq_result shm_mq_wait_for_attach (shm_mq_handle *mqh)
 
void shm_mq_detach (shm_mq_handle *mqh)
 
shm_mq * shm_mq_get_queue (shm_mq_handle *mqh)
 

Variables

const Size shm_mq_minimum_size
 

Macro Definition Documentation

◆ MQH_INITIAL_BUFSIZE

#define MQH_INITIAL_BUFSIZE   8192

Definition at line 172 of file shm_mq.c.

Referenced by shm_mq_receive().

Function Documentation

◆ shm_mq_attach()

shm_mq_handle* shm_mq_attach ( shm_mq mq,
dsm_segment seg,
BackgroundWorkerHandle handle 
)

Definition at line 291 of file shm_mq.c.

References Assert, CurrentMemoryContext, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, shm_mq_handle::mqh_send_pending, MyProc, on_dsm_detach(), palloc(), PointerGetDatum, and shm_mq_detach_callback().

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ParallelWorkerMain(), ReinitializeParallelDSM(), and test_shm_mq_setup().

292 {
293  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
294 
295  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
296  mqh->mqh_queue = mq;
297  mqh->mqh_segment = seg;
298  mqh->mqh_handle = handle;
299  mqh->mqh_buffer = NULL;
300  mqh->mqh_buflen = 0;
301  mqh->mqh_consume_pending = 0;
302  mqh->mqh_send_pending = 0;
303  mqh->mqh_partial_bytes = 0;
304  mqh->mqh_expected_bytes = 0;
305  mqh->mqh_length_word_complete = false;
306  mqh->mqh_counterparty_attached = false;
308 
309  if (seg != NULL)
311 
312  return mqh;
313 }
Size mqh_partial_bytes
Definition: shm_mq.c:147
PGPROC * MyProc
Definition: proc.c:68
#define PointerGetDatum(X)
Definition: postgres.h:600
PGPROC * mq_receiver
Definition: shm_mq.c:75
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:142
Size mqh_consume_pending
Definition: shm_mq.c:145
bool mqh_counterparty_attached
Definition: shm_mq.c:150
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1096
char * mqh_buffer
Definition: shm_mq.c:143
bool mqh_length_word_complete
Definition: shm_mq.c:149
Size mqh_buflen
Definition: shm_mq.c:144
dsm_segment * mqh_segment
Definition: shm_mq.c:141
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
Size mqh_expected_bytes
Definition: shm_mq.c:148
shm_mq * mqh_queue
Definition: shm_mq.c:140
Size mqh_send_pending
Definition: shm_mq.c:146
#define Assert(condition)
Definition: c.h:804
void * palloc(Size size)
Definition: mcxt.c:1062
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1322
MemoryContext mqh_context
Definition: shm_mq.c:151
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_counterparty_gone()

static bool shm_mq_counterparty_gone ( shm_mq mq,
BackgroundWorkerHandle handle 
)
static

Definition at line 1178 of file shm_mq.c.

References BGWH_NOT_YET_STARTED, BGWH_STARTED, GetBackgroundWorkerPid(), shm_mq::mq_detached, and status().

Referenced by shm_mq_receive(), and shm_mq_send_bytes().

1179 {
1180  pid_t pid;
1181 
1182  /* If the queue has been detached, counterparty is definitely gone. */
1183  if (mq->mq_detached)
1184  return true;
1185 
1186  /* If there's a handle, check worker status. */
1187  if (handle != NULL)
1188  {
1190 
1191  /* Check for unexpected worker death. */
1192  status = GetBackgroundWorkerPid(handle, &pid);
1193  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1194  {
1195  /* Mark it detached, just to make it official. */
1196  mq->mq_detached = true;
1197  return true;
1198  }
1199  }
1200 
1201  /* Counterparty is not definitively gone. */
1202  return false;
1203 }
BgwHandleStatus
Definition: bgworker.h:103
bool mq_detached
Definition: shm_mq.c:80
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:229
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1068

◆ shm_mq_create()

shm_mq* shm_mq_create ( void *  address,
Size  size 
)

Definition at line 178 of file shm_mq.c.

References Assert, MAXALIGN, MAXALIGN_DOWN, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq::mq_sender, offsetof, pg_atomic_init_u64(), and SpinLockInit.

Referenced by ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

179 {
180  shm_mq *mq = address;
181  Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
182 
183  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
184  size = MAXALIGN_DOWN(size);
185 
186  /* Queue size must be large enough to hold some data. */
187  Assert(size > data_offset);
188 
189  /* Initialize queue header. */
190  SpinLockInit(&mq->mq_mutex);
191  mq->mq_receiver = NULL;
192  mq->mq_sender = NULL;
195  mq->mq_ring_size = size - data_offset;
196  mq->mq_detached = false;
197  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
198 
199  return mq;
200 }
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * mq_receiver
Definition: shm_mq.c:75
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:415
slock_t mq_mutex
Definition: shm_mq.c:74
bool mq_detached
Definition: shm_mq.c:80
#define Assert(condition)
Definition: c.h:804
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:77
size_t Size
Definition: c.h:540
#define MAXALIGN(LEN)
Definition: c.h:757
Size mq_ring_size
Definition: shm_mq.c:79
Definition: shm_mq.c:72
uint8 mq_ring_offset
Definition: shm_mq.c:81
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:78
#define offsetof(type, field)
Definition: c.h:727
#define MAXALIGN_DOWN(LEN)
Definition: c.h:769
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_detach()

void shm_mq_detach ( shm_mq_handle mqh)

Definition at line 842 of file shm_mq.c.

References cancel_on_dsm_detach(), shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_segment, shm_mq_handle::mqh_send_pending, pfree(), PointerGetDatum, shm_mq_detach_callback(), shm_mq_detach_internal(), and shm_mq_inc_bytes_written().

Referenced by DestroyParallelContext(), ExecParallelFinish(), HandleParallelMessage(), LaunchParallelWorkers(), mq_putmessage(), tqueueDestroyReceiver(), and tqueueShutdownReceiver().

843 {
844  /* Before detaching, notify the receiver about any already-written data. */
845  if (mqh->mqh_send_pending > 0)
846  {
848  mqh->mqh_send_pending = 0;
849  }
850 
851  /* Notify counterparty that we're outta here. */
853 
854  /* Cancel on_dsm_detach callback, if any. */
855  if (mqh->mqh_segment)
858  PointerGetDatum(mqh->mqh_queue));
859 
860  /* Release local memory associated with handle. */
861  if (mqh->mqh_buffer != NULL)
862  pfree(mqh->mqh_buffer);
863  pfree(mqh);
864 }
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
Definition: shm_mq.c:1302
#define PointerGetDatum(X)
Definition: postgres.h:600
char * mqh_buffer
Definition: shm_mq.c:143
void pfree(void *pointer)
Definition: mcxt.c:1169
dsm_segment * mqh_segment
Definition: shm_mq.c:141
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:881
shm_mq * mqh_queue
Definition: shm_mq.c:140
Size mqh_send_pending
Definition: shm_mq.c:146
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
Definition: shm_mq.c:1322
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1111

◆ shm_mq_detach_callback()

static void shm_mq_detach_callback ( dsm_segment seg,
Datum  arg 
)
static

Definition at line 1322 of file shm_mq.c.

References DatumGetPointer, and shm_mq_detach_internal().

Referenced by shm_mq_attach(), and shm_mq_detach().

1323 {
1324  shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1325 
1327 }
static void shm_mq_detach_internal(shm_mq *mq)
Definition: shm_mq.c:881
#define DatumGetPointer(X)
Definition: postgres.h:593
Definition: shm_mq.c:72
void * arg

◆ shm_mq_detach_internal()

static void shm_mq_detach_internal ( shm_mq mq)
static

Definition at line 881 of file shm_mq.c.

References Assert, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, MyProc, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_detach(), and shm_mq_detach_callback().

882 {
883  PGPROC *victim;
884 
886  if (mq->mq_sender == MyProc)
887  victim = mq->mq_receiver;
888  else
889  {
890  Assert(mq->mq_receiver == MyProc);
891  victim = mq->mq_sender;
892  }
893  mq->mq_detached = true;
895 
896  if (victim != NULL)
897  SetLatch(&victim->procLatch);
898 }
PGPROC * MyProc
Definition: proc.c:68
PGPROC * mq_receiver
Definition: shm_mq.c:75
void SetLatch(Latch *latch)
Definition: latch.c:567
Latch procLatch
Definition: proc.h:130
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
#define SpinLockRelease(lock)
Definition: spin.h:64
bool mq_detached
Definition: shm_mq.c:80
#define Assert(condition)
Definition: c.h:804
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_get_queue()

shm_mq* shm_mq_get_queue ( shm_mq_handle mqh)

Definition at line 904 of file shm_mq.c.

References shm_mq_handle::mqh_queue.

Referenced by WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

905 {
906  return mqh->mqh_queue;
907 }
shm_mq * mqh_queue
Definition: shm_mq.c:140

◆ shm_mq_get_receiver()

PGPROC* shm_mq_get_receiver ( shm_mq mq)

Definition at line 243 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_receiver, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_send_bytes(), and shm_mq_wait_for_attach().

244 {
245  PGPROC *receiver;
246 
248  receiver = mq->mq_receiver;
250 
251  return receiver;
252 }
PGPROC * mq_receiver
Definition: shm_mq.c:75
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: proc.h:121

◆ shm_mq_get_sender()

PGPROC* shm_mq_get_sender ( shm_mq mq)

Definition at line 258 of file shm_mq.c.

References shm_mq::mq_mutex, shm_mq::mq_sender, SpinLockAcquire, and SpinLockRelease.

Referenced by shm_mq_receive(), shm_mq_wait_for_attach(), WaitForParallelWorkersToAttach(), and WaitForParallelWorkersToFinish().

259 {
260  PGPROC *sender;
261 
263  sender = mq->mq_sender;
265 
266  return sender;
267 }
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_inc_bytes_read()

static void shm_mq_inc_bytes_read ( shm_mq mq,
Size  n 
)
static

Definition at line 1269 of file shm_mq.c.

References Assert, shm_mq::mq_bytes_read, shm_mq::mq_sender, pg_atomic_read_u64(), pg_atomic_write_u64(), pg_read_barrier, PGPROC::procLatch, and SetLatch().

Referenced by shm_mq_receive(), and shm_mq_receive_bytes().

1270 {
1271  PGPROC *sender;
1272 
1273  /*
1274  * Separate prior reads of mq_ring from the increment of mq_bytes_read
1275  * which follows. This pairs with the full barrier in
1276  * shm_mq_send_bytes(). We only need a read barrier here because the
1277  * increment of mq_bytes_read is actually a read followed by a dependent
1278  * write.
1279  */
1280  pg_read_barrier();
1281 
1282  /*
1283  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1284  * else can be changing this value. This method should be cheaper.
1285  */
1287  pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1288 
1289  /*
1290  * We shouldn't have any bytes to read without a sender, so we can read
1291  * mq_sender here without a lock. Once it's initialized, it can't change.
1292  */
1293  sender = mq->mq_sender;
1294  Assert(sender != NULL);
1295  SetLatch(&sender->procLatch);
1296 }
void SetLatch(Latch *latch)
Definition: latch.c:567
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
Latch procLatch
Definition: proc.h:130
#define Assert(condition)
Definition: c.h:804
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:77
#define pg_read_barrier()
Definition: atomics.h:158
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_inc_bytes_written()

static void shm_mq_inc_bytes_written ( shm_mq mq,
Size  n 
)
static

Definition at line 1302 of file shm_mq.c.

References shm_mq::mq_bytes_written, pg_atomic_read_u64(), pg_atomic_write_u64(), and pg_write_barrier.

Referenced by shm_mq_detach(), shm_mq_send_bytes(), and shm_mq_sendv().

1303 {
1304  /*
1305  * Separate prior reads of mq_ring from the write of mq_bytes_written
1306  * which we're about to do. Pairs with the read barrier found in
1307  * shm_mq_receive_bytes.
1308  */
1309  pg_write_barrier();
1310 
1311  /*
1312  * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1313  * else can be changing this value. This method avoids taking the bus
1314  * lock unnecessarily.
1315  */
1318 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
#define pg_write_barrier()
Definition: atomics.h:159
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:78

◆ shm_mq_receive()

shm_mq_result shm_mq_receive ( shm_mq_handle mqh,
Size nbytesp,
void **  datap,
bool  nowait 
)

Definition at line 574 of file shm_mq.c.

References Assert, ereport, errcode(), errmsg(), ERROR, MAXALIGN, MaxAllocSize, MemoryContextAlloc(), Min, shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_ring_size, shm_mq::mq_sender, shm_mq_handle::mqh_buffer, shm_mq_handle::mqh_buflen, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_context, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_expected_bytes, shm_mq_handle::mqh_handle, MQH_INITIAL_BUFSIZE, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, MyProc, pfree(), pg_nextpower2_size_t, shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_sender(), shm_mq_inc_bytes_read(), shm_mq_receive_bytes(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), and SHM_MQ_WOULD_BLOCK.

Referenced by copy_messages(), HandleParallelMessages(), test_shm_mq(), test_shm_mq_pipelined(), and TupleQueueReaderNext().

575 {
576  shm_mq *mq = mqh->mqh_queue;
577  shm_mq_result res;
578  Size rb = 0;
579  Size nbytes;
580  void *rawdata;
581 
582  Assert(mq->mq_receiver == MyProc);
583 
584  /* We can't receive data until the sender has attached. */
585  if (!mqh->mqh_counterparty_attached)
586  {
587  if (nowait)
588  {
589  int counterparty_gone;
590 
591  /*
592  * We shouldn't return at this point at all unless the sender
593  * hasn't attached yet. However, the correct return value depends
594  * on whether the sender is still attached. If we first test
595  * whether the sender has ever attached and then test whether the
596  * sender has detached, there's a race condition: a sender that
597  * attaches and detaches very quickly might fool us into thinking
598  * the sender never attached at all. So, test whether our
599  * counterparty is definitively gone first, and only afterwards
600  * check whether the sender ever attached in the first place.
601  */
602  counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
603  if (shm_mq_get_sender(mq) == NULL)
604  {
605  if (counterparty_gone)
606  return SHM_MQ_DETACHED;
607  else
608  return SHM_MQ_WOULD_BLOCK;
609  }
610  }
611  else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
612  && shm_mq_get_sender(mq) == NULL)
613  {
614  mq->mq_detached = true;
615  return SHM_MQ_DETACHED;
616  }
617  mqh->mqh_counterparty_attached = true;
618  }
619 
620  /*
621  * If we've consumed an amount of data greater than 1/4th of the ring
622  * size, mark it consumed in shared memory. We try to avoid doing this
623  * unnecessarily when only a small amount of data has been consumed,
624  * because SetLatch() is fairly expensive and we don't want to do it too
625  * often.
626  */
627  if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
628  {
630  mqh->mqh_consume_pending = 0;
631  }
632 
633  /* Try to read, or finish reading, the length word from the buffer. */
634  while (!mqh->mqh_length_word_complete)
635  {
636  /* Try to receive the message length word. */
637  Assert(mqh->mqh_partial_bytes < sizeof(Size));
638  res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
639  nowait, &rb, &rawdata);
640  if (res != SHM_MQ_SUCCESS)
641  return res;
642 
643  /*
644  * Hopefully, we'll receive the entire message length word at once.
645  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
646  * multiple reads.
647  */
648  if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
649  {
650  Size needed;
651 
652  nbytes = *(Size *) rawdata;
653 
654  /* If we've already got the whole message, we're done. */
655  needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
656  if (rb >= needed)
657  {
658  mqh->mqh_consume_pending += needed;
659  *nbytesp = nbytes;
660  *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
661  return SHM_MQ_SUCCESS;
662  }
663 
664  /*
665  * We don't have the whole message, but we at least have the whole
666  * length word.
667  */
668  mqh->mqh_expected_bytes = nbytes;
669  mqh->mqh_length_word_complete = true;
670  mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
671  rb -= MAXALIGN(sizeof(Size));
672  }
673  else
674  {
675  Size lengthbytes;
676 
677  /* Can't be split unless bigger than required alignment. */
678  Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
679 
680  /* Message word is split; need buffer to reassemble. */
681  if (mqh->mqh_buffer == NULL)
682  {
686  }
687  Assert(mqh->mqh_buflen >= sizeof(Size));
688 
689  /* Copy partial length word; remember to consume it. */
690  if (mqh->mqh_partial_bytes + rb > sizeof(Size))
691  lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
692  else
693  lengthbytes = rb;
694  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
695  lengthbytes);
696  mqh->mqh_partial_bytes += lengthbytes;
697  mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
698  rb -= lengthbytes;
699 
700  /* If we now have the whole word, we're ready to read payload. */
701  if (mqh->mqh_partial_bytes >= sizeof(Size))
702  {
703  Assert(mqh->mqh_partial_bytes == sizeof(Size));
704  mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
705  mqh->mqh_length_word_complete = true;
706  mqh->mqh_partial_bytes = 0;
707  }
708  }
709  }
710  nbytes = mqh->mqh_expected_bytes;
711 
712  /*
713  * Should be disallowed on the sending side already, but better check and
714  * error out on the receiver side as well rather than trying to read a
715  * prohibitively large message.
716  */
717  if (nbytes > MaxAllocSize)
718  ereport(ERROR,
719  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
720  errmsg("invalid message size %zu in shared memory queue",
721  nbytes)));
722 
723  if (mqh->mqh_partial_bytes == 0)
724  {
725  /*
726  * Try to obtain the whole message in a single chunk. If this works,
727  * we need not copy the data and can return a pointer directly into
728  * shared memory.
729  */
730  res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
731  if (res != SHM_MQ_SUCCESS)
732  return res;
733  if (rb >= nbytes)
734  {
735  mqh->mqh_length_word_complete = false;
736  mqh->mqh_consume_pending += MAXALIGN(nbytes);
737  *nbytesp = nbytes;
738  *datap = rawdata;
739  return SHM_MQ_SUCCESS;
740  }
741 
742  /*
743  * The message has wrapped the buffer. We'll need to copy it in order
744  * to return it to the client in one chunk. First, make sure we have
745  * a large enough buffer available.
746  */
747  if (mqh->mqh_buflen < nbytes)
748  {
749  Size newbuflen;
750 
751  /*
752  * Increase size to the next power of 2 that's >= nbytes, but
753  * limit to MaxAllocSize.
754  */
755  newbuflen = pg_nextpower2_size_t(nbytes);
756  newbuflen = Min(newbuflen, MaxAllocSize);
757 
758  if (mqh->mqh_buffer != NULL)
759  {
760  pfree(mqh->mqh_buffer);
761  mqh->mqh_buffer = NULL;
762  mqh->mqh_buflen = 0;
763  }
764  mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
765  mqh->mqh_buflen = newbuflen;
766  }
767  }
768 
769  /* Loop until we've copied the entire message. */
770  for (;;)
771  {
772  Size still_needed;
773 
774  /* Copy as much as we can. */
775  Assert(mqh->mqh_partial_bytes + rb <= nbytes);
776  memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
777  mqh->mqh_partial_bytes += rb;
778 
779  /*
780  * Update count of bytes that can be consumed, accounting for
781  * alignment padding. Note that this will never actually insert any
782  * padding except at the end of a message, because the buffer size is
783  * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
784  */
785  Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
786  mqh->mqh_consume_pending += MAXALIGN(rb);
787 
788  /* If we got all the data, exit the loop. */
789  if (mqh->mqh_partial_bytes >= nbytes)
790  break;
791 
792  /* Wait for some more data. */
793  still_needed = nbytes - mqh->mqh_partial_bytes;
794  res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
795  if (res != SHM_MQ_SUCCESS)
796  return res;
797  if (rb > still_needed)
798  rb = still_needed;
799  }
800 
801  /* Return the complete message, and reset for next message. */
802  *nbytesp = nbytes;
803  *datap = mqh->mqh_buffer;
804  mqh->mqh_length_word_complete = false;
805  mqh->mqh_partial_bytes = 0;
806  return SHM_MQ_SUCCESS;
807 }
Size mqh_partial_bytes
Definition: shm_mq.c:147
#define pg_nextpower2_size_t(num)
Definition: pg_bitutils.h:191
PGPROC * MyProc
Definition: proc.c:68
#define Min(x, y)
Definition: c.h:986
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:258
int errcode(int sqlerrcode)
Definition: elog.c:698
PGPROC * mq_receiver
Definition: shm_mq.c:75
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:142
Size mqh_consume_pending
Definition: shm_mq.c:145
bool mqh_counterparty_attached
Definition: shm_mq.c:150
char * mqh_buffer
Definition: shm_mq.c:143
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1217
bool mqh_length_word_complete
Definition: shm_mq.c:149
void pfree(void *pointer)
Definition: mcxt.c:1169
#define ERROR
Definition: elog.h:46
#define MQH_INITIAL_BUFSIZE
Definition: shm_mq.c:172
Size mqh_buflen
Definition: shm_mq.c:144
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
Definition: shm_mq.c:1078
Size mqh_expected_bytes
Definition: shm_mq.c:148
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1269
#define MaxAllocSize
Definition: memutils.h:40
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1178
shm_mq * mqh_queue
Definition: shm_mq.c:140
bool mq_detached
Definition: shm_mq.c:80
#define ereport(elevel,...)
Definition: elog.h:157
shm_mq_result
Definition: shm_mq.h:36
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
#define MAXALIGN(LEN)
Definition: c.h:757
Size mq_ring_size
Definition: shm_mq.c:79
int errmsg(const char *fmt,...)
Definition: elog.c:909
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
Definition: shm_mq.c:72
MemoryContext mqh_context
Definition: shm_mq.c:151
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_receive_bytes()

static shm_mq_result shm_mq_receive_bytes ( shm_mq_handle mqh,
Size  bytes_needed,
bool  nowait,
Size nbytesp,
void **  datap 
)
static

Definition at line 1078 of file shm_mq.c.

References Assert, CHECK_FOR_INTERRUPTS, Min, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq_handle::mqh_consume_pending, shm_mq_handle::mqh_queue, MyLatch, pg_atomic_read_u64(), pg_read_barrier, read, ResetLatch(), SHM_MQ_DETACHED, shm_mq_inc_bytes_read(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, WAIT_EVENT_MQ_RECEIVE, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_receive().

1080 {
1081  shm_mq *mq = mqh->mqh_queue;
1082  Size ringsize = mq->mq_ring_size;
1083  uint64 used;
1084  uint64 written;
1085 
1086  for (;;)
1087  {
1088  Size offset;
1089  uint64 read;
1090 
1091  /* Get bytes written, so we can compute what's available to read. */
1092  written = pg_atomic_read_u64(&mq->mq_bytes_written);
1093 
1094  /*
1095  * Get bytes read. Include bytes we could consume but have not yet
1096  * consumed.
1097  */
1098  read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1099  mqh->mqh_consume_pending;
1100  used = written - read;
1101  Assert(used <= ringsize);
1102  offset = read % (uint64) ringsize;
1103 
1104  /* If we have enough data or buffer has wrapped, we're done. */
1105  if (used >= bytes_needed || offset + used >= ringsize)
1106  {
1107  *nbytesp = Min(used, ringsize - offset);
1108  *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1109 
1110  /*
1111  * Separate the read of mq_bytes_written, above, from caller's
1112  * attempt to read the data itself. Pairs with the barrier in
1113  * shm_mq_inc_bytes_written.
1114  */
1115  pg_read_barrier();
1116  return SHM_MQ_SUCCESS;
1117  }
1118 
1119  /*
1120  * Fall out before waiting if the queue has been detached.
1121  *
1122  * Note that we don't check for this until *after* considering whether
1123  * the data already available is enough, since the receiver can finish
1124  * receiving a message stored in the buffer even after the sender has
1125  * detached.
1126  */
1127  if (mq->mq_detached)
1128  {
1129  /*
1130  * If the writer advanced mq_bytes_written and then set
1131  * mq_detached, we might not have read the final value of
1132  * mq_bytes_written above. Insert a read barrier and then check
1133  * again if mq_bytes_written has advanced.
1134  */
1135  pg_read_barrier();
1136  if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1137  continue;
1138 
1139  return SHM_MQ_DETACHED;
1140  }
1141 
1142  /*
1143  * We didn't get enough data to satisfy the request, so mark any data
1144  * previously-consumed as read to make more buffer space.
1145  */
1146  if (mqh->mqh_consume_pending > 0)
1147  {
1149  mqh->mqh_consume_pending = 0;
1150  }
1151 
1152  /* Skip manipulation of our latch if nowait = true. */
1153  if (nowait)
1154  return SHM_MQ_WOULD_BLOCK;
1155 
1156  /*
1157  * Wait for our latch to be set. It might already be set for some
1158  * unrelated reason, but that'll just result in one extra trip through
1159  * the loop. It's worth it to avoid resetting the latch at top of
1160  * loop, because setting an already-set latch is much cheaper than
1161  * setting one that has been reset.
1162  */
1165 
1166  /* Reset the latch so we don't spin. */
1168 
1169  /* An interrupt may have occurred while we were waiting. */
1171  }
1172 }
#define Min(x, y)
Definition: c.h:986
Size mqh_consume_pending
Definition: shm_mq.c:145
void ResetLatch(Latch *latch)
Definition: latch.c:660
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:452
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
Definition: shm_mq.c:1269
shm_mq * mqh_queue
Definition: shm_mq.c:140
bool mq_detached
Definition: shm_mq.c:80
#define Assert(condition)
Definition: c.h:804
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:77
#define pg_read_barrier()
Definition: atomics.h:158
size_t Size
Definition: c.h:540
Size mq_ring_size
Definition: shm_mq.c:79
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
Definition: shm_mq.c:82
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
Definition: shm_mq.c:72
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
uint8 mq_ring_offset
Definition: shm_mq.c:81
#define WL_LATCH_SET
Definition: latch.h:125
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:78
#define read(a, b, c)
Definition: win32.h:13
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130

◆ shm_mq_send()

shm_mq_result shm_mq_send ( shm_mq_handle mqh,
Size  nbytes,
const void *  data,
bool  nowait,
bool  force_flush 
)

Definition at line 330 of file shm_mq.c.

References shm_mq_iovec::data, shm_mq_iovec::len, and shm_mq_sendv().

Referenced by copy_messages(), test_shm_mq(), test_shm_mq_pipelined(), and tqueueReceiveSlot().

332 {
333  shm_mq_iovec iov;
334 
335  iov.data = data;
336  iov.len = nbytes;
337 
338  return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
339 }
Size len
Definition: shm_mq.h:32
const char * data
Definition: shm_mq.h:31
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
Definition: shm_mq.c:362

◆ shm_mq_send_bytes()

static shm_mq_result shm_mq_send_bytes ( shm_mq_handle mqh,
Size  nbytes,
const void *  data,
bool  nowait,
Size bytes_written 
)
static

Definition at line 913 of file shm_mq.c.

References Assert, CHECK_FOR_INTERRUPTS, MAXALIGN, Min, shm_mq::mq_bytes_read, shm_mq::mq_bytes_written, shm_mq::mq_detached, shm_mq::mq_receiver, shm_mq::mq_ring, shm_mq::mq_ring_offset, shm_mq::mq_ring_size, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_send_pending, MyLatch, pg_atomic_read_u64(), pg_compiler_barrier, pg_memory_barrier, PGPROC::procLatch, ResetLatch(), SetLatch(), shm_mq_counterparty_gone(), SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_inc_bytes_written(), SHM_MQ_SUCCESS, shm_mq_wait_internal(), SHM_MQ_WOULD_BLOCK, WAIT_EVENT_MQ_SEND, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_sendv().

915 {
916  shm_mq *mq = mqh->mqh_queue;
917  Size sent = 0;
918  uint64 used;
919  Size ringsize = mq->mq_ring_size;
920  Size available;
921 
922  while (sent < nbytes)
923  {
924  uint64 rb;
925  uint64 wb;
926 
927  /* Compute number of ring buffer bytes used and available. */
930  Assert(wb >= rb);
931  used = wb - rb;
932  Assert(used <= ringsize);
933  available = Min(ringsize - used, nbytes - sent);
934 
935  /*
936  * Bail out if the queue has been detached. Note that we would be in
937  * trouble if the compiler decided to cache the value of
938  * mq->mq_detached in a register or on the stack across loop
939  * iterations. It probably shouldn't do that anyway since we'll
940  * always return, call an external function that performs a system
941  * call, or reach a memory barrier at some point later in the loop,
942  * but just to be sure, insert a compiler barrier here.
943  */
945  if (mq->mq_detached)
946  {
947  *bytes_written = sent;
948  return SHM_MQ_DETACHED;
949  }
950 
951  if (available == 0 && !mqh->mqh_counterparty_attached)
952  {
953  /*
954  * The queue is full, so if the receiver isn't yet known to be
955  * attached, we must wait for that to happen.
956  */
957  if (nowait)
958  {
959  if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
960  {
961  *bytes_written = sent;
962  return SHM_MQ_DETACHED;
963  }
964  if (shm_mq_get_receiver(mq) == NULL)
965  {
966  *bytes_written = sent;
967  return SHM_MQ_WOULD_BLOCK;
968  }
969  }
970  else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
971  mqh->mqh_handle))
972  {
973  mq->mq_detached = true;
974  *bytes_written = sent;
975  return SHM_MQ_DETACHED;
976  }
977  mqh->mqh_counterparty_attached = true;
978 
979  /*
980  * The receiver may have read some data after attaching, so we
981  * must not wait without rechecking the queue state.
982  */
983  }
984  else if (available == 0)
985  {
986  /* Update the pending send bytes in the shared memory. */
988 
989  /*
990  * Since mq->mqh_counterparty_attached is known to be true at this
991  * point, mq_receiver has been set, and it can't change once set.
992  * Therefore, we can read it without acquiring the spinlock.
993  */
996 
997  /*
998  * We have just updated the mqh_send_pending bytes in the shared
999  * memory so reset it.
1000  */
1001  mqh->mqh_send_pending = 0;
1002 
1003  /* Skip manipulation of our latch if nowait = true. */
1004  if (nowait)
1005  {
1006  *bytes_written = sent;
1007  return SHM_MQ_WOULD_BLOCK;
1008  }
1009 
1010  /*
1011  * Wait for our latch to be set. It might already be set for some
1012  * unrelated reason, but that'll just result in one extra trip
1013  * through the loop. It's worth it to avoid resetting the latch
1014  * at top of loop, because setting an already-set latch is much
1015  * cheaper than setting one that has been reset.
1016  */
1019 
1020  /* Reset the latch so we don't spin. */
1022 
1023  /* An interrupt may have occurred while we were waiting. */
1025  }
1026  else
1027  {
1028  Size offset;
1029  Size sendnow;
1030 
1031  offset = wb % (uint64) ringsize;
1032  sendnow = Min(available, ringsize - offset);
1033 
1034  /*
1035  * Write as much data as we can via a single memcpy(). Make sure
1036  * these writes happen after the read of mq_bytes_read, above.
1037  * This barrier pairs with the one in shm_mq_inc_bytes_read.
1038  * (Since we're separating the read of mq_bytes_read from a
1039  * subsequent write to mq_ring, we need a full barrier here.)
1040  */
1042  memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1043  (char *) data + sent, sendnow);
1044  sent += sendnow;
1045 
1046  /*
1047  * Update count of bytes written, with alignment padding. Note
1048  * that this will never actually insert any padding except at the
1049  * end of a run of bytes, because the buffer size is a multiple of
1050  * MAXIMUM_ALIGNOF, and each read is as well.
1051  */
1052  Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1053 
1054  /*
1055  * For efficiency, we don't update the bytes written in the shared
1056  * memory and also don't set the reader's latch here. Refer to
1057  * the comments atop the shm_mq_handle structure for more
1058  * information.
1059  */
1060  mqh->mqh_send_pending += MAXALIGN(sendnow);
1061  }
1062  }
1063 
1064  *bytes_written = sent;
1065  return SHM_MQ_SUCCESS;
1066 }
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
Definition: shm_mq.c:1302
#define Min(x, y)
Definition: c.h:986
PGPROC * mq_receiver
Definition: shm_mq.c:75
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:243
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:142
void SetLatch(Latch *latch)
Definition: latch.c:567
bool mqh_counterparty_attached
Definition: shm_mq.c:150
void ResetLatch(Latch *latch)
Definition: latch.c:660
Latch procLatch
Definition: proc.h:130
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:452
#define pg_compiler_barrier()
Definition: atomics.h:133
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1217
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1178
shm_mq * mqh_queue
Definition: shm_mq.c:140
Size mqh_send_pending
Definition: shm_mq.c:146
bool mq_detached
Definition: shm_mq.c:80
#define pg_memory_barrier()
Definition: atomics.h:145
#define Assert(condition)
Definition: c.h:804
pg_atomic_uint64 mq_bytes_read
Definition: shm_mq.c:77
size_t Size
Definition: c.h:540
#define MAXALIGN(LEN)
Definition: c.h:757
Size mq_ring_size
Definition: shm_mq.c:79
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
Definition: shm_mq.c:82
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
Definition: shm_mq.c:72
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
uint8 mq_ring_offset
Definition: shm_mq.c:81
#define WL_LATCH_SET
Definition: latch.h:125
pg_atomic_uint64 mq_bytes_written
Definition: shm_mq.c:78
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130

◆ shm_mq_sendv()

shm_mq_result shm_mq_sendv ( shm_mq_handle mqh,
shm_mq_iovec iov,
int  iovcnt,
bool  nowait,
bool  force_flush 
)

Definition at line 362 of file shm_mq.c.

References Assert, shm_mq_iovec::data, ereport, errcode(), errmsg(), ERROR, i, shm_mq_iovec::len, MAXALIGN_DOWN, MaxAllocSize, shm_mq::mq_detached, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_ring_size, shm_mq::mq_sender, shm_mq_handle::mqh_counterparty_attached, shm_mq_handle::mqh_length_word_complete, shm_mq_handle::mqh_partial_bytes, shm_mq_handle::mqh_queue, shm_mq_handle::mqh_send_pending, MyProc, PGPROC::procLatch, SetLatch(), SHM_MQ_DETACHED, shm_mq_inc_bytes_written(), shm_mq_send_bytes(), SHM_MQ_SUCCESS, SpinLockAcquire, SpinLockRelease, and tmpbuf.

Referenced by mq_putmessage(), and shm_mq_send().

364 {
365  shm_mq_result res;
366  shm_mq *mq = mqh->mqh_queue;
367  PGPROC *receiver;
368  Size nbytes = 0;
369  Size bytes_written;
370  int i;
371  int which_iov = 0;
372  Size offset;
373 
374  Assert(mq->mq_sender == MyProc);
375 
376  /* Compute total size of write. */
377  for (i = 0; i < iovcnt; ++i)
378  nbytes += iov[i].len;
379 
380  /* Prevent writing messages overwhelming the receiver. */
381  if (nbytes > MaxAllocSize)
382  ereport(ERROR,
383  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
384  errmsg("cannot send a message of size %zu via shared memory queue",
385  nbytes)));
386 
387  /* Try to write, or finish writing, the length word into the buffer. */
388  while (!mqh->mqh_length_word_complete)
389  {
390  Assert(mqh->mqh_partial_bytes < sizeof(Size));
391  res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
392  ((char *) &nbytes) + mqh->mqh_partial_bytes,
393  nowait, &bytes_written);
394 
395  if (res == SHM_MQ_DETACHED)
396  {
397  /* Reset state in case caller tries to send another message. */
398  mqh->mqh_partial_bytes = 0;
399  mqh->mqh_length_word_complete = false;
400  return res;
401  }
402  mqh->mqh_partial_bytes += bytes_written;
403 
404  if (mqh->mqh_partial_bytes >= sizeof(Size))
405  {
406  Assert(mqh->mqh_partial_bytes == sizeof(Size));
407 
408  mqh->mqh_partial_bytes = 0;
409  mqh->mqh_length_word_complete = true;
410  }
411 
412  if (res != SHM_MQ_SUCCESS)
413  return res;
414 
415  /* Length word can't be split unless bigger than required alignment. */
416  Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
417  }
418 
419  /* Write the actual data bytes into the buffer. */
420  Assert(mqh->mqh_partial_bytes <= nbytes);
421  offset = mqh->mqh_partial_bytes;
422  do
423  {
424  Size chunksize;
425 
426  /* Figure out which bytes need to be sent next. */
427  if (offset >= iov[which_iov].len)
428  {
429  offset -= iov[which_iov].len;
430  ++which_iov;
431  if (which_iov >= iovcnt)
432  break;
433  continue;
434  }
435 
436  /*
437  * We want to avoid copying the data if at all possible, but every
438  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
439  * the last. Thus, if a chunk other than the last one ends on a
440  * non-MAXALIGN'd boundary, we have to combine the tail end of its
441  * data with data from one or more following chunks until we either
442  * reach the last chunk or accumulate a number of bytes which is
443  * MAXALIGN'd.
444  */
445  if (which_iov + 1 < iovcnt &&
446  offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
447  {
448  char tmpbuf[MAXIMUM_ALIGNOF];
449  int j = 0;
450 
451  for (;;)
452  {
453  if (offset < iov[which_iov].len)
454  {
455  tmpbuf[j] = iov[which_iov].data[offset];
456  j++;
457  offset++;
458  if (j == MAXIMUM_ALIGNOF)
459  break;
460  }
461  else
462  {
463  offset -= iov[which_iov].len;
464  which_iov++;
465  if (which_iov >= iovcnt)
466  break;
467  }
468  }
469 
470  res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
471 
472  if (res == SHM_MQ_DETACHED)
473  {
474  /* Reset state in case caller tries to send another message. */
475  mqh->mqh_partial_bytes = 0;
476  mqh->mqh_length_word_complete = false;
477  return res;
478  }
479 
480  mqh->mqh_partial_bytes += bytes_written;
481  if (res != SHM_MQ_SUCCESS)
482  return res;
483  continue;
484  }
485 
486  /*
487  * If this is the last chunk, we can write all the data, even if it
488  * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
489  * MAXALIGN_DOWN the write size.
490  */
491  chunksize = iov[which_iov].len - offset;
492  if (which_iov + 1 < iovcnt)
493  chunksize = MAXALIGN_DOWN(chunksize);
494  res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
495  nowait, &bytes_written);
496 
497  if (res == SHM_MQ_DETACHED)
498  {
499  /* Reset state in case caller tries to send another message. */
500  mqh->mqh_length_word_complete = false;
501  mqh->mqh_partial_bytes = 0;
502  return res;
503  }
504 
505  mqh->mqh_partial_bytes += bytes_written;
506  offset += bytes_written;
507  if (res != SHM_MQ_SUCCESS)
508  return res;
509  } while (mqh->mqh_partial_bytes < nbytes);
510 
511  /* Reset for next message. */
512  mqh->mqh_partial_bytes = 0;
513  mqh->mqh_length_word_complete = false;
514 
515  /* If queue has been detached, let caller know. */
516  if (mq->mq_detached)
517  return SHM_MQ_DETACHED;
518 
519  /*
520  * If the counterparty is known to have attached, we can read mq_receiver
521  * without acquiring the spinlock and assume it isn't NULL. Otherwise,
522  * more caution is needed.
523  */
524  if (mqh->mqh_counterparty_attached)
525  receiver = mq->mq_receiver;
526  else
527  {
529  receiver = mq->mq_receiver;
531  if (receiver == NULL)
532  return SHM_MQ_SUCCESS;
533  mqh->mqh_counterparty_attached = true;
534  }
535 
536  /*
537  * If the caller has requested force flush or we have written more than 1/4
538  * of the ring size, mark it as written in shared memory and notify the
539  * receiver.
540  */
541  if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
542  {
544  SetLatch(&receiver->procLatch);
545  mqh->mqh_send_pending = 0;
546  }
547 
548  return SHM_MQ_SUCCESS;
549 }
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
Definition: shm_mq.c:1302
Size mqh_partial_bytes
Definition: shm_mq.c:147
PGPROC * MyProc
Definition: proc.c:68
int errcode(int sqlerrcode)
Definition: elog.c:698
PGPROC * mq_receiver
Definition: shm_mq.c:75
void SetLatch(Latch *latch)
Definition: latch.c:567
bool mqh_counterparty_attached
Definition: shm_mq.c:150
Latch procLatch
Definition: proc.h:130
Size len
Definition: shm_mq.h:32
#define SpinLockAcquire(lock)
Definition: spin.h:62
bool mqh_length_word_complete
Definition: shm_mq.c:149
const char * data
Definition: shm_mq.h:31
#define ERROR
Definition: elog.h:46
slock_t mq_mutex
Definition: shm_mq.c:74
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
Definition: shm_mq.c:913
#define MaxAllocSize
Definition: memutils.h:40
#define SpinLockRelease(lock)
Definition: spin.h:64
shm_mq * mqh_queue
Definition: shm_mq.c:140
Size mqh_send_pending
Definition: shm_mq.c:146
bool mq_detached
Definition: shm_mq.c:80
#define ereport(elevel,...)
Definition: elog.h:157
shm_mq_result
Definition: shm_mq.h:36
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
Size mq_ring_size
Definition: shm_mq.c:79
static StringInfoData tmpbuf
Definition: walsender.c:159
int errmsg(const char *fmt,...)
Definition: elog.c:909
int i
Definition: shm_mq.c:72
Definition: proc.h:121
#define MAXALIGN_DOWN(LEN)
Definition: c.h:769
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_set_handle()

void shm_mq_set_handle ( shm_mq_handle mqh,
BackgroundWorkerHandle handle 
)

Definition at line 320 of file shm_mq.c.

References Assert, and shm_mq_handle::mqh_handle.

Referenced by ExecParallelCreateReaders(), and LaunchParallelWorkers().

321 {
322  Assert(mqh->mqh_handle == NULL);
323  mqh->mqh_handle = handle;
324 }
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:142
#define Assert(condition)
Definition: c.h:804

◆ shm_mq_set_receiver()

void shm_mq_set_receiver ( shm_mq mq,
PGPROC proc 
)

Definition at line 207 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelSetupTupleQueues(), InitializeParallelDSM(), ReinitializeParallelDSM(), and setup_dynamic_shared_memory().

208 {
209  PGPROC *sender;
210 
212  Assert(mq->mq_receiver == NULL);
213  mq->mq_receiver = proc;
214  sender = mq->mq_sender;
216 
217  if (sender != NULL)
218  SetLatch(&sender->procLatch);
219 }
PGPROC * mq_receiver
Definition: shm_mq.c:75
void SetLatch(Latch *latch)
Definition: latch.c:567
Latch procLatch
Definition: proc.h:130
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:804
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_set_sender()

void shm_mq_set_sender ( shm_mq mq,
PGPROC proc 
)

Definition at line 225 of file shm_mq.c.

References Assert, shm_mq::mq_mutex, shm_mq::mq_receiver, shm_mq::mq_sender, PGPROC::procLatch, SetLatch(), SpinLockAcquire, and SpinLockRelease.

Referenced by attach_to_queues(), ExecParallelGetReceiver(), ParallelWorkerMain(), and setup_dynamic_shared_memory().

226 {
227  PGPROC *receiver;
228 
230  Assert(mq->mq_sender == NULL);
231  mq->mq_sender = proc;
232  receiver = mq->mq_receiver;
234 
235  if (receiver != NULL)
236  SetLatch(&receiver->procLatch);
237 }
PGPROC * mq_receiver
Definition: shm_mq.c:75
void SetLatch(Latch *latch)
Definition: latch.c:567
Latch procLatch
Definition: proc.h:130
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:804
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_wait_for_attach()

shm_mq_result shm_mq_wait_for_attach ( shm_mq_handle mqh)

Definition at line 819 of file shm_mq.c.

References Assert, shm_mq::mq_receiver, shm_mq::mq_sender, shm_mq_handle::mqh_handle, shm_mq_handle::mqh_queue, MyProc, SHM_MQ_DETACHED, shm_mq_get_receiver(), shm_mq_get_sender(), SHM_MQ_SUCCESS, and shm_mq_wait_internal().

820 {
821  shm_mq *mq = mqh->mqh_queue;
822  PGPROC **victim;
823 
824  if (shm_mq_get_receiver(mq) == MyProc)
825  victim = &mq->mq_sender;
826  else
827  {
829  victim = &mq->mq_receiver;
830  }
831 
832  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
833  return SHM_MQ_SUCCESS;
834  else
835  return SHM_MQ_DETACHED;
836 }
PGPROC * MyProc
Definition: proc.c:68
PGPROC * shm_mq_get_sender(shm_mq *mq)
Definition: shm_mq.c:258
PGPROC * mq_receiver
Definition: shm_mq.c:75
PGPROC * shm_mq_get_receiver(shm_mq *mq)
Definition: shm_mq.c:243
BackgroundWorkerHandle * mqh_handle
Definition: shm_mq.c:142
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:1217
shm_mq * mqh_queue
Definition: shm_mq.c:140
#define Assert(condition)
Definition: c.h:804
Definition: shm_mq.c:72
Definition: proc.h:121
PGPROC * mq_sender
Definition: shm_mq.c:76

◆ shm_mq_wait_internal()

static bool shm_mq_wait_internal ( shm_mq mq,
PGPROC **  ptr,
BackgroundWorkerHandle handle 
)
static

Definition at line 1217 of file shm_mq.c.

References BGWH_NOT_YET_STARTED, BGWH_STARTED, CHECK_FOR_INTERRUPTS, GetBackgroundWorkerPid(), shm_mq::mq_detached, shm_mq::mq_mutex, MyLatch, ResetLatch(), SpinLockAcquire, SpinLockRelease, status(), WAIT_EVENT_MQ_INTERNAL, WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by shm_mq_receive(), shm_mq_send_bytes(), and shm_mq_wait_for_attach().

1218 {
1219  bool result = false;
1220 
1221  for (;;)
1222  {
1224  pid_t pid;
1225 
1226  /* Acquire the lock just long enough to check the pointer. */
1227  SpinLockAcquire(&mq->mq_mutex);
1228  result = (*ptr != NULL);
1229  SpinLockRelease(&mq->mq_mutex);
1230 
1231  /* Fail if detached; else succeed if initialized. */
1232  if (mq->mq_detached)
1233  {
1234  result = false;
1235  break;
1236  }
1237  if (result)
1238  break;
1239 
1240  if (handle != NULL)
1241  {
1242  /* Check for unexpected worker death. */
1243  status = GetBackgroundWorkerPid(handle, &pid);
1244  if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1245  {
1246  result = false;
1247  break;
1248  }
1249  }
1250 
1251  /* Wait to be signaled. */
1254 
1255  /* Reset the latch so we don't spin. */
1257 
1258  /* An interrupt may have occurred while we were waiting. */
1260  }
1261 
1262  return result;
1263 }
void ResetLatch(Latch *latch)
Definition: latch.c:660
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:452
#define SpinLockAcquire(lock)
Definition: spin.h:62
slock_t mq_mutex
Definition: shm_mq.c:74
BgwHandleStatus
Definition: bgworker.h:103
#define SpinLockRelease(lock)
Definition: spin.h:64
bool mq_detached
Definition: shm_mq.c:80
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:229
#define WL_LATCH_SET
Definition: latch.h:125
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1068
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130

Variable Documentation

◆ shm_mq_minimum_size

const Size shm_mq_minimum_size
Initial value:
=
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF
#define MAXALIGN(LEN)
Definition: c.h:757
Definition: shm_mq.c:72
#define offsetof(type, field)
Definition: c.h:727

Definition at line 169 of file shm_mq.c.

Referenced by setup_dynamic_shared_memory().