173#define MQH_INITIAL_BUFSIZE 8192
197 mq->mq_detached =
false;
214 mq->mq_receiver = proc;
232 mq->mq_sender = proc;
298 mqh->mqh_segment = seg;
299 mqh->mqh_handle = handle;
302 mqh->mqh_consume_pending = 0;
303 mqh->mqh_send_pending = 0;
304 mqh->mqh_partial_bytes = 0;
305 mqh->mqh_expected_bytes = 0;
306 mqh->mqh_length_word_complete =
false;
307 mqh->mqh_counterparty_attached =
false;
324 mqh->mqh_handle = handle;
385 errmsg(
"cannot send a message of size %zu via shared memory queue",
389 while (!
mqh->mqh_length_word_complete)
393 ((
char *) &nbytes) +
mqh->mqh_partial_bytes,
399 mqh->mqh_partial_bytes = 0;
400 mqh->mqh_length_word_complete =
false;
405 if (
mqh->mqh_partial_bytes >=
sizeof(
Size))
409 mqh->mqh_partial_bytes = 0;
410 mqh->mqh_length_word_complete =
true;
421 Assert(
mqh->mqh_partial_bytes <= nbytes);
422 offset =
mqh->mqh_partial_bytes;
476 mqh->mqh_partial_bytes = 0;
477 mqh->mqh_length_word_complete =
false;
501 mqh->mqh_length_word_complete =
false;
502 mqh->mqh_partial_bytes = 0;
510 }
while (
mqh->mqh_partial_bytes < nbytes);
513 mqh->mqh_partial_bytes = 0;
514 mqh->mqh_length_word_complete =
false;
524 if (
mqh->mqh_counterparty_attached)
532 mqh->mqh_counterparty_attached =
true;
545 mqh->mqh_send_pending = 0;
585 if (!
mqh->mqh_counterparty_attached)
614 mq->mq_detached =
true;
617 mqh->mqh_counterparty_attached =
true;
627 if (
mqh->mqh_consume_pending >
mq->mq_ring_size / 4)
630 mqh->mqh_consume_pending = 0;
634 while (!
mqh->mqh_length_word_complete)
648 if (
mqh->mqh_partial_bytes == 0 &&
rb >=
sizeof(
Size))
668 mqh->mqh_expected_bytes = nbytes;
669 mqh->mqh_length_word_complete =
true;
690 if (
mqh->mqh_partial_bytes +
rb >
sizeof(
Size))
701 if (
mqh->mqh_partial_bytes >=
sizeof(
Size))
704 mqh->mqh_expected_bytes = *(
Size *)
mqh->mqh_buffer;
705 mqh->mqh_length_word_complete =
true;
706 mqh->mqh_partial_bytes = 0;
710 nbytes =
mqh->mqh_expected_bytes;
720 errmsg(
"invalid message size %zu in shared memory queue",
723 if (
mqh->mqh_partial_bytes == 0)
735 mqh->mqh_length_word_complete =
false;
747 if (
mqh->mqh_buflen < nbytes)
779 mqh->mqh_partial_bytes +=
rb;
792 if (
mqh->mqh_partial_bytes >= nbytes)
807 mqh->mqh_length_word_complete =
false;
808 mqh->mqh_partial_bytes = 0;
848 if (
mqh->mqh_send_pending > 0)
851 mqh->mqh_send_pending = 0;
858 if (
mqh->mqh_segment)
896 mq->mq_detached =
true;
909 return mqh->mqh_queue;
925 while (
sent < nbytes)
976 mq->mq_detached =
true;
980 mqh->mqh_counterparty_attached =
true;
1004 mqh->mqh_send_pending = 0;
1045 memcpy(&
mq->mq_ring[
mq->mq_ring_offset + offset],
1102 mqh->mqh_consume_pending;
1111 *
datap = &
mq->mq_ring[
mq->mq_ring_offset + offset];
1130 if (
mq->mq_detached)
1149 if (
mqh->mqh_consume_pending > 0)
1152 mqh->mqh_consume_pending = 0;
1186 if (
mq->mq_detached)
1199 mq->mq_detached =
true;
1222 bool result =
false;
1231 result = (*ptr !=
NULL);
1235 if (
mq->mq_detached)
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
#define pg_memory_barrier()
#define pg_read_barrier()
#define pg_write_barrier()
#define pg_compiler_barrier()
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
#define MAXALIGN_DOWN(LEN)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
int errcode(int sqlerrcode)
#define ereport(elevel,...)
#define palloc_object(type)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
void * MemoryContextAlloc(MemoryContext context, Size size)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
#define CHECK_FOR_INTERRUPTS()
#define pg_nextpower2_size_t
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
PGPROC * shm_mq_get_receiver(shm_mq *mq)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
#define MQH_INITIAL_BUFSIZE
PGPROC * shm_mq_get_sender(shm_mq *mq)
void shm_mq_detach(shm_mq_handle *mqh)
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
static void shm_mq_detach_internal(shm_mq *mq)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
const Size shm_mq_minimum_size
static void SpinLockRelease(volatile slock_t *lock)
static void SpinLockAcquire(volatile slock_t *lock)
static void SpinLockInit(volatile slock_t *lock)
BackgroundWorkerHandle * mqh_handle
bool mqh_counterparty_attached
dsm_segment * mqh_segment
bool mqh_length_word_complete
MemoryContext mqh_context
pg_atomic_uint64 mq_bytes_written
pg_atomic_uint64 mq_bytes_read
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
#define WL_EXIT_ON_PM_DEATH
static StringInfoData tmpbuf