156 const void *
data,
bool nowait,
Size *bytes_written);
158 Size bytes_needed,
bool nowait,
Size *nbytesp,
172 #define MQH_INITIAL_BUFSIZE 8192
187 Assert(size > data_offset);
235 if (receiver != NULL)
377 for (
i = 0;
i < iovcnt; ++
i)
378 nbytes += iov[
i].
len;
383 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
384 errmsg(
"cannot send a message of size %zu via shared memory queue",
393 nowait, &bytes_written);
427 if (offset >= iov[which_iov].
len)
429 offset -= iov[which_iov].
len;
431 if (which_iov >= iovcnt)
445 if (which_iov + 1 < iovcnt &&
446 offset + MAXIMUM_ALIGNOF > iov[which_iov].
len)
448 char tmpbuf[MAXIMUM_ALIGNOF];
453 if (offset < iov[which_iov].
len)
458 if (
j == MAXIMUM_ALIGNOF)
463 offset -= iov[which_iov].
len;
465 if (which_iov >= iovcnt)
491 chunksize = iov[which_iov].
len - offset;
492 if (which_iov + 1 < iovcnt)
495 nowait, &bytes_written);
506 offset += bytes_written;
531 if (receiver == NULL)
589 int counterparty_gone;
605 if (counterparty_gone)
639 nowait, &rb, &rawdata);
652 nbytes = *(
Size *) rawdata;
719 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
720 errmsg(
"invalid message size %zu in shared memory queue",
800 if (rb > still_needed)
917 bool nowait,
Size *bytes_written)
925 while (sent < nbytes)
936 available =
Min(ringsize - used, nbytes - sent);
950 *bytes_written = sent;
964 *bytes_written = sent;
969 *bytes_written = sent;
977 *bytes_written = sent;
987 else if (available == 0)
1009 *bytes_written = sent;
1034 offset = wb % (uint64) ringsize;
1035 sendnow =
Min(available, ringsize - offset);
1046 (
char *)
data + sent, sendnow);
1067 *bytes_written = sent;
1082 Size *nbytesp,
void **datap)
1103 used = written -
read;
1104 Assert(used <= ringsize);
1105 offset =
read % (uint64) ringsize;
1108 if (used >= bytes_needed || offset + used >= ringsize)
1110 *nbytesp =
Min(used, ringsize - offset);
1222 bool result =
false;
1231 result = (*ptr != NULL);
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
#define pg_memory_barrier()
#define pg_read_barrier()
#define pg_write_barrier()
#define pg_compiler_barrier()
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
#define MAXALIGN_DOWN(LEN)
#define offsetof(type, field)
#define FLEXIBLE_ARRAY_MEMBER
static void PGresult * res
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void * MemoryContextAlloc(MemoryContext context, Size size)
#define CHECK_FOR_INTERRUPTS()
#define pg_nextpower2_size_t(num)
static void static void status(const char *fmt,...) pg_attribute_printf(1
#define DatumGetPointer(X)
#define PointerGetDatum(X)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
#define MQH_INITIAL_BUFSIZE
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_detach(shm_mq_handle *mqh)
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
static void shm_mq_detach_internal(shm_mq *mq)
PGPROC * shm_mq_get_receiver(shm_mq *mq)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
PGPROC * shm_mq_get_sender(shm_mq *mq)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh)
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
const Size shm_mq_minimum_size
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
BackgroundWorkerHandle * mqh_handle
bool mqh_counterparty_attached
dsm_segment * mqh_segment
bool mqh_length_word_complete
MemoryContext mqh_context
pg_atomic_uint64 mq_bytes_written
pg_atomic_uint64 mq_bytes_read
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
static StringInfoData tmpbuf