/* Forward declarations of the internal send/receive primitives. */
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
                                       const void *data, bool nowait,
                                       Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
                                          Size bytes_needed, bool nowait,
                                          Size *nbytesp, void **datap);

/* Initial size of the per-handle buffer used to reassemble messages. */
#define MQH_INITIAL_BUFSIZE 8192
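MQH_INITIAL_BUFSIZE only sizes the handle's private reassembly buffer; when a message is larger, or arrives split across the ring boundary, the receive path has to enlarge that buffer before copying the pieces into it. The sketch below illustrates a growth policy of that kind, rounding up to the next power of two and capping at MaxAllocSize; the helper name grow_reassembly_buffer and its parameters are invented for illustration and are not part of shm_mq.c.

#include "postgres.h"
#include "port/pg_bitutils.h"       /* pg_nextpower2_size_t() */
#include "utils/memutils.h"         /* MaxAllocSize */

/*
 * Hypothetical helper: make sure *buf can hold at least nbytes, growing it
 * to the next power of two but never beyond MaxAllocSize.
 */
static void
grow_reassembly_buffer(MemoryContext cxt, char **buf, Size *buflen, Size nbytes)
{
    Size        newbuflen;

    if (*buflen >= nbytes)
        return;                     /* already large enough */

    newbuflen = Min(pg_nextpower2_size_t(nbytes), MaxAllocSize);

    if (*buf != NULL)
        pfree(*buf);                /* old contents need not be preserved */
    *buf = MemoryContextAlloc(cxt, newbuflen);
    *buflen = newbuflen;
}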
    /* Wake the counterparty's latch once it is known to have attached. */
    if (receiver != NULL)
        SetLatch(&receiver->procLatch);
    /* In shm_mq_sendv(): total up the message length across all iovecs. */
    for (i = 0; i < iovcnt; ++i)
        nbytes += iov[i].len;

    /* Reject messages too large to ever be received. */
    if (nbytes > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("cannot send a message of size %zu via shared memory queue",
                        nbytes)));

    /* Push the Size length word into the ring ahead of the payload. */
    res = shm_mq_send_bytes(mqh, sizeof(Size), ((char *) &nbytes),
                            nowait, &bytes_written);
    /* Write the payload into the ring, one MAXALIGN'd chunk at a time. */
    do
    {
        /* Skip over iovec entries that have already been sent completely. */
        if (offset >= iov[which_iov].len)
        {
            offset -= iov[which_iov].len;
            if (++which_iov >= iovcnt)
                break;
            continue;
        }

        /* Chunks other than the last must stay MAXALIGN'd; if this one would
         * not, glue its tail onto the next chunk(s) via a staging buffer. */
        if (which_iov + 1 < iovcnt &&
            offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
        {
            char    tmpbuf[MAXIMUM_ALIGNOF];
            int     j = 0;

            for (;;)
            {
                if (offset < iov[which_iov].len)
                {
                    tmpbuf[j++] = iov[which_iov].data[offset++];
                    if (j == MAXIMUM_ALIGNOF)
                        break;
                }
                else
                {
                    offset -= iov[which_iov].len;
                    if (++which_iov >= iovcnt)
                        break;
                }
            }
            /* ... send tmpbuf with shm_mq_send_bytes(), then loop again ... */
        }

        /* Otherwise send straight from the iovec, MAXALIGN_DOWN'd unless last. */
        chunksize = iov[which_iov].len - offset;
        if (which_iov + 1 < iovcnt)
            chunksize = MAXALIGN_DOWN(chunksize);
        res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
                                nowait, &bytes_written);
        /* ... */
        offset += bytes_written;
    } while (mqh->mqh_partial_bytes < nbytes);
    /* Notify the receiver, if it has attached, that new data is available. */
    if (receiver != NULL)
        SetLatch(&receiver->procLatch);
    /* In shm_mq_receive(): under nowait, first check whether the sender is gone. */
    int     counterparty_gone;
    ...
    if (counterparty_gone)
        return SHM_MQ_DETACHED;

    /* Read the Size length word that precedes every message. */
    res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                               nowait, &rb, &rawdata);
    ...
    nbytes = *(Size *) rawdata;
    /* Oversized lengths should already be impossible, but check here too. */
    if (nbytes > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("invalid message size %zu in shared memory queue",
                        nbytes)));

    /* While copying the body, never consume more than the message still needs. */
    if (rb > still_needed)
        rb = still_needed;
/* The workhorse: copy nbytes from data into the ring, waiting as needed. */
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
                  bool nowait, Size *bytes_written)
{
    ...
    while (sent < nbytes)
    {
        ...
        available = Min(ringsize - used, nbytes - sent);

        /* Every early exit reports how much actually reached the ring. */
        if (mq->mq_detached)
        {
            *bytes_written = sent;
            return SHM_MQ_DETACHED;
        }
        ...
        else if (available == 0)
        {
            ...
            /* Ring full: sleep until the receiver makes room. */
            (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                             WAIT_EVENT_MESSAGE_QUEUE_SEND);
            ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
        }
        /* Compute the wrapped write position and the largest contiguous copy. */
        offset = wb % (uint64) ringsize;
        sendnow = Min(available, ringsize - offset);

        /* Copy the data into the ring with a single memcpy(). */
        memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
               (char *) data + sent, sendnow);
        sent += sendnow;
        ...
    }

    *bytes_written = sent;
    return SHM_MQ_SUCCESS;
}
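The arithmetic above works because mq_bytes_read and mq_bytes_written only ever grow: the position inside the array is simply the counter modulo the ring size, and clamping each copy to the distance from the end of the array means a single memcpy() never has to wrap. A self-contained toy version of that bookkeeping (plain C, names invented for illustration, no locking or latches):

#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Toy single-producer/single-consumer ring using shm_mq-style counters. */
typedef struct ToyRing
{
    uint64_t    bytes_written;  /* advanced only by the producer */
    uint64_t    bytes_read;     /* advanced only by the consumer */
    size_t      size;           /* capacity of ring[] in bytes */
    char       *ring;
} ToyRing;

/* Queue up to nbytes from data; returns how much was actually copied. */
static size_t
toy_ring_put(ToyRing *rb, const char *data, size_t nbytes)
{
    uint64_t    used = rb->bytes_written - rb->bytes_read;
    size_t      available = rb->size - (size_t) used;
    size_t      offset = (size_t) (rb->bytes_written % (uint64_t) rb->size);
    size_t      sendnow;

    assert(used <= rb->size);
    if (nbytes > available)
        nbytes = available;

    /* Largest chunk that does not run past the end of the array. */
    sendnow = nbytes < rb->size - offset ? nbytes : rb->size - offset;
    memcpy(rb->ring + offset, data, sendnow);
    rb->bytes_written += sendnow;

    return sendnow;             /* caller loops until everything is queued */
}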
/* Hand back a pointer to the next contiguous chunk of unread ring data. */
static shm_mq_result
shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
                     Size *nbytesp, void **datap)
{
    ...
    for (;;)
    {
        ...
        used = written - read;
        Assert(used <= ringsize);
        offset = read % (uint64) ringsize;

        /* If we have enough data, or the data wraps, hand some back now. */
        if (used >= bytes_needed || offset + used >= ringsize)
        {
            *nbytesp = Min(used, ringsize - offset);
            *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
            pg_read_barrier();      /* pairs with shm_mq_inc_bytes_written() */
            return SHM_MQ_SUCCESS;
        }
        ...
        /* Nothing readable yet: sleep until the sender adds data. */
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                         WAIT_EVENT_MESSAGE_QUEUE_RECEIVE);
    }
}
    /* In shm_mq_wait_internal(): poll the protected pointer under the spinlock. */
    bool        result = false;
    ...
    SpinLockAcquire(&mq->mq_mutex);
    result = (*ptr != NULL);
    SpinLockRelease(&mq->mq_mutex);
    ...
    (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                     WAIT_EVENT_MESSAGE_QUEUE_INTERNAL);
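All three waits excerpted above (MESSAGE_QUEUE_SEND, MESSAGE_QUEUE_RECEIVE, MESSAGE_QUEUE_INTERNAL) follow the standard PostgreSQL latch protocol: re-check the shared condition, sleep on the process latch, reset it, and let CHECK_FOR_INTERRUPTS() keep the wait cancellable. A hedged sketch of that loop follows; queue_has_room() is a stand-in for re-reading the shared counters, not a real function, and the wait event is picked arbitrarily from the ones used here. The declarations these excerpts rely on are listed after the sketch.

#include "postgres.h"
#include "miscadmin.h"              /* CHECK_FOR_INTERRUPTS() */
#include "storage/latch.h"          /* MyLatch, WaitLatch(), ResetLatch() */
#include "utils/wait_event.h"       /* WAIT_EVENT_MESSAGE_QUEUE_SEND */

/* Hypothetical predicate standing in for a re-read of the shared state. */
extern bool queue_has_room(void);

static void
wait_until_queue_has_room(void)
{
    while (!queue_has_room())
    {
        /* Sleep until the counterparty sets our latch (or postmaster dies). */
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                         WAIT_EVENT_MESSAGE_QUEUE_SEND);

        /* Reset before re-checking so a concurrent SetLatch() is not lost. */
        ResetLatch(MyLatch);

        /* The wakeup may have come from a signal; stay cancellable. */
        CHECK_FOR_INTERRUPTS();
    }
}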
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
#define pg_memory_barrier()
#define pg_read_barrier()
#define pg_write_barrier()
#define pg_compiler_barrier()
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
#define MAXALIGN_DOWN(LEN)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void * MemoryContextAlloc(MemoryContext context, Size size)
#define CHECK_FOR_INTERRUPTS()
#define pg_nextpower2_size_t
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
#define MQH_INITIAL_BUFSIZE
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_detach(shm_mq_handle *mqh)
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
static void shm_mq_detach_internal(shm_mq *mq)
PGPROC * shm_mq_get_receiver(shm_mq *mq)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
PGPROC * shm_mq_get_sender(shm_mq *mq)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh)
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
const Size shm_mq_minimum_size
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
BackgroundWorkerHandle * mqh_handle
bool mqh_counterparty_attached
dsm_segment * mqh_segment
bool mqh_length_word_complete
MemoryContext mqh_context
pg_atomic_uint64 mq_bytes_written
pg_atomic_uint64 mq_bytes_read
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
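Taken together, the shm_mq declarations above are enough to reconstruct the usual setup sequence: carve a queue out of shared memory with shm_mq_create(), register the two endpoints, attach to obtain a handle, then exchange messages. The sketch below compresses that into one function purely for illustration; in real use the sender and receiver are different processes that locate the same queue inside the DSM segment, and every result code needs handling. The function name and its parameters are invented.

#include "postgres.h"
#include "storage/dsm.h"
#include "storage/proc.h"           /* MyProc */
#include "storage/shm_mq.h"

/*
 * Illustration only: create a queue in caller-provided shared memory and
 * push one message through it from within a single process.
 */
static void
shm_mq_usage_sketch(dsm_segment *seg, void *queue_space, Size queue_size)
{
    shm_mq     *mq;
    shm_mq_handle *mqh;
    shm_mq_result res;
    Size        len;
    void       *data;

    /* Lay out the ring buffer and its bookkeeping in shared memory. */
    mq = shm_mq_create(queue_space, queue_size);

    /* Each endpoint must be registered before messages can flow. */
    shm_mq_set_receiver(mq, MyProc);
    shm_mq_set_sender(mq, MyProc);  /* normally done by the other process */

    /* Attach; detach is triggered automatically if seg is destroyed. */
    mqh = shm_mq_attach(mq, seg, NULL);

    /* Send one message (blocking, flushed), then read it back. */
    res = shm_mq_send(mqh, sizeof("hello"), "hello", false, true);
    if (res == SHM_MQ_SUCCESS)
        res = shm_mq_receive(mqh, &len, &data, false);

    shm_mq_detach(mqh);
}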