171#define MQH_INITIAL_BUFSIZE 8192
195 mq->mq_detached =
false;
212 mq->mq_receiver = proc;
230 mq->mq_sender = proc;
296 mqh->mqh_segment = seg;
297 mqh->mqh_handle = handle;
300 mqh->mqh_consume_pending = 0;
301 mqh->mqh_send_pending = 0;
302 mqh->mqh_partial_bytes = 0;
303 mqh->mqh_expected_bytes = 0;
304 mqh->mqh_length_word_complete =
false;
305 mqh->mqh_counterparty_attached =
false;
322 mqh->mqh_handle = handle;
383 errmsg(
"cannot send a message of size %zu via shared memory queue",
387 while (!
mqh->mqh_length_word_complete)
391 ((
char *) &nbytes) +
mqh->mqh_partial_bytes,
397 mqh->mqh_partial_bytes = 0;
398 mqh->mqh_length_word_complete =
false;
403 if (
mqh->mqh_partial_bytes >=
sizeof(
Size))
407 mqh->mqh_partial_bytes = 0;
408 mqh->mqh_length_word_complete =
true;
419 Assert(
mqh->mqh_partial_bytes <= nbytes);
420 offset =
mqh->mqh_partial_bytes;
474 mqh->mqh_partial_bytes = 0;
475 mqh->mqh_length_word_complete =
false;
499 mqh->mqh_length_word_complete =
false;
500 mqh->mqh_partial_bytes = 0;
508 }
while (
mqh->mqh_partial_bytes < nbytes);
511 mqh->mqh_partial_bytes = 0;
512 mqh->mqh_length_word_complete =
false;
522 if (
mqh->mqh_counterparty_attached)
530 mqh->mqh_counterparty_attached =
true;
543 mqh->mqh_send_pending = 0;
583 if (!
mqh->mqh_counterparty_attached)
612 mq->mq_detached =
true;
615 mqh->mqh_counterparty_attached =
true;
625 if (
mqh->mqh_consume_pending >
mq->mq_ring_size / 4)
628 mqh->mqh_consume_pending = 0;
632 while (!
mqh->mqh_length_word_complete)
646 if (
mqh->mqh_partial_bytes == 0 &&
rb >=
sizeof(
Size))
666 mqh->mqh_expected_bytes = nbytes;
667 mqh->mqh_length_word_complete =
true;
688 if (
mqh->mqh_partial_bytes +
rb >
sizeof(
Size))
699 if (
mqh->mqh_partial_bytes >=
sizeof(
Size))
702 mqh->mqh_expected_bytes = *(
Size *)
mqh->mqh_buffer;
703 mqh->mqh_length_word_complete =
true;
704 mqh->mqh_partial_bytes = 0;
708 nbytes =
mqh->mqh_expected_bytes;
718 errmsg(
"invalid message size %zu in shared memory queue",
721 if (
mqh->mqh_partial_bytes == 0)
733 mqh->mqh_length_word_complete =
false;
745 if (
mqh->mqh_buflen < nbytes)
777 mqh->mqh_partial_bytes +=
rb;
790 if (
mqh->mqh_partial_bytes >= nbytes)
805 mqh->mqh_length_word_complete =
false;
806 mqh->mqh_partial_bytes = 0;
846 if (
mqh->mqh_send_pending > 0)
849 mqh->mqh_send_pending = 0;
856 if (
mqh->mqh_segment)
894 mq->mq_detached =
true;
907 return mqh->mqh_queue;
923 while (
sent < nbytes)
974 mq->mq_detached =
true;
978 mqh->mqh_counterparty_attached =
true;
1002 mqh->mqh_send_pending = 0;
1043 memcpy(&
mq->mq_ring[
mq->mq_ring_offset + offset],
1100 mqh->mqh_consume_pending;
1109 *
datap = &
mq->mq_ring[
mq->mq_ring_offset + offset];
1128 if (
mq->mq_detached)
1147 if (
mqh->mqh_consume_pending > 0)
1150 mqh->mqh_consume_pending = 0;
1184 if (
mq->mq_detached)
1197 mq->mq_detached =
true;
1220 bool result =
false;
1229 result = (*ptr !=
NULL);
1233 if (
mq->mq_detached)
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
#define pg_memory_barrier()
#define pg_read_barrier()
#define pg_write_barrier()
#define pg_compiler_barrier()
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
#define MAXALIGN_DOWN(LEN)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define palloc_object(type)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
void * MemoryContextAlloc(MemoryContext context, Size size)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
#define CHECK_FOR_INTERRUPTS()
#define pg_nextpower2_size_t
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
PGPROC * shm_mq_get_receiver(shm_mq *mq)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
#define MQH_INITIAL_BUFSIZE
PGPROC * shm_mq_get_sender(shm_mq *mq)
void shm_mq_detach(shm_mq_handle *mqh)
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
static void shm_mq_detach_internal(shm_mq *mq)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
const Size shm_mq_minimum_size
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
BackgroundWorkerHandle * mqh_handle
bool mqh_counterparty_attached
dsm_segment * mqh_segment
bool mqh_length_word_complete
MemoryContext mqh_context
pg_atomic_uint64 mq_bytes_written
pg_atomic_uint64 mq_bytes_read
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
#define WL_EXIT_ON_PM_DEATH
static StringInfoData tmpbuf