147 const void *data,
bool nowait,
Size *bytes_written);
149 Size bytes_needed,
bool nowait,
Size *nbytesp,
163 #define MQH_INITIAL_BUFSIZE 8192 178 Assert(size > data_offset);
226 if (receiver != NULL)
360 for (i = 0; i < iovcnt; ++
i)
361 nbytes += iov[i].len;
369 nowait, &bytes_written);
403 if (offset >= iov[which_iov].len)
405 offset -= iov[which_iov].
len;
407 if (which_iov >= iovcnt)
421 if (which_iov + 1 < iovcnt &&
422 offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
424 char tmpbuf[MAXIMUM_ALIGNOF];
429 if (offset < iov[which_iov].len)
431 tmpbuf[j] = iov[which_iov].
data[offset];
434 if (j == MAXIMUM_ALIGNOF)
439 offset -= iov[which_iov].
len;
441 if (which_iov >= iovcnt)
467 chunksize = iov[which_iov].
len - offset;
468 if (which_iov + 1 < iovcnt)
471 nowait, &bytes_written);
482 offset += bytes_written;
507 if (receiver == NULL)
555 int counterparty_gone;
571 if (counterparty_gone)
605 nowait, &rb, &rawdata);
618 nbytes = *(
Size *) rawdata;
706 while (newbuflen < nbytes)
748 if (rb > still_needed)
858 bool nowait,
Size *bytes_written)
866 while (sent < nbytes)
877 available =
Min(ringsize - used, nbytes - sent);
891 *bytes_written = sent;
905 *bytes_written = sent;
910 *bytes_written = sent;
918 *bytes_written = sent;
928 else if (available == 0)
941 *bytes_written = sent;
965 offset = wb % (uint64) ringsize;
966 sendnow =
Min(available, ringsize - offset);
977 (
char *) data + sent, sendnow);
997 *bytes_written = sent;
1012 Size *nbytesp,
void **datap)
1033 used = written -
read;
1034 Assert(used <= ringsize);
1035 offset = read % (uint64) ringsize;
1038 if (used >= bytes_needed || offset + used >= ringsize)
1040 *nbytesp =
Min(used, ringsize - offset);
1151 bool result =
false;
1160 result = (*ptr != NULL);
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
void shm_mq_detach(shm_mq_handle *mqh)
#define PointerGetDatum(X)
#define SpinLockInit(lock)
PGPROC * shm_mq_get_sender(shm_mq *mq)
PGPROC * shm_mq_get_receiver(shm_mq *mq)
BackgroundWorkerHandle * mqh_handle
void ResetLatch(volatile Latch *latch)
bool mqh_counterparty_attached
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
const Size shm_mq_minimum_size
#define pg_compiler_barrier()
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
#define SpinLockAcquire(lock)
bool mqh_length_word_complete
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
void pfree(void *pointer)
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh)
#define MQH_INITIAL_BUFSIZE
shm_mq * shm_mq_create(void *address, Size size)
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
dsm_segment * mqh_segment
MemoryContext CurrentMemoryContext
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
static void shm_mq_detach_internal(shm_mq *mq)
#define SpinLockRelease(lock)
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
#define pg_memory_barrier()
void SetLatch(volatile Latch *latch)
#define Assert(condition)
pg_atomic_uint64 mq_bytes_read
#define pg_read_barrier()
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
#define DatumGetPointer(X)
static StringInfoData tmpbuf
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
void * MemoryContextAlloc(MemoryContext context, Size size)
#define pg_write_barrier()
#define CHECK_FOR_INTERRUPTS()
MemoryContext mqh_context
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
static void static void status(const char *fmt,...) pg_attribute_printf(1
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
pg_atomic_uint64 mq_bytes_written
#define offsetof(type, field)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
#define MAXALIGN_DOWN(LEN)