172#define MQH_INITIAL_BUFSIZE 8192
196 mq->mq_detached =
false;
213 mq->mq_receiver = proc;
231 mq->mq_sender = proc;
297 mqh->mqh_segment = seg;
298 mqh->mqh_handle = handle;
301 mqh->mqh_consume_pending = 0;
302 mqh->mqh_send_pending = 0;
303 mqh->mqh_partial_bytes = 0;
304 mqh->mqh_expected_bytes = 0;
305 mqh->mqh_length_word_complete =
false;
306 mqh->mqh_counterparty_attached =
false;
323 mqh->mqh_handle = handle;
384 errmsg(
"cannot send a message of size %zu via shared memory queue",
388 while (!
mqh->mqh_length_word_complete)
392 ((
char *) &nbytes) +
mqh->mqh_partial_bytes,
398 mqh->mqh_partial_bytes = 0;
399 mqh->mqh_length_word_complete =
false;
404 if (
mqh->mqh_partial_bytes >=
sizeof(
Size))
408 mqh->mqh_partial_bytes = 0;
409 mqh->mqh_length_word_complete =
true;
420 Assert(
mqh->mqh_partial_bytes <= nbytes);
421 offset =
mqh->mqh_partial_bytes;
475 mqh->mqh_partial_bytes = 0;
476 mqh->mqh_length_word_complete =
false;
500 mqh->mqh_length_word_complete =
false;
501 mqh->mqh_partial_bytes = 0;
509 }
while (
mqh->mqh_partial_bytes < nbytes);
512 mqh->mqh_partial_bytes = 0;
513 mqh->mqh_length_word_complete =
false;
523 if (
mqh->mqh_counterparty_attached)
531 mqh->mqh_counterparty_attached =
true;
544 mqh->mqh_send_pending = 0;
584 if (!
mqh->mqh_counterparty_attached)
613 mq->mq_detached =
true;
616 mqh->mqh_counterparty_attached =
true;
626 if (
mqh->mqh_consume_pending >
mq->mq_ring_size / 4)
629 mqh->mqh_consume_pending = 0;
633 while (!
mqh->mqh_length_word_complete)
647 if (
mqh->mqh_partial_bytes == 0 &&
rb >=
sizeof(
Size))
667 mqh->mqh_expected_bytes = nbytes;
668 mqh->mqh_length_word_complete =
true;
689 if (
mqh->mqh_partial_bytes +
rb >
sizeof(
Size))
700 if (
mqh->mqh_partial_bytes >=
sizeof(
Size))
703 mqh->mqh_expected_bytes = *(
Size *)
mqh->mqh_buffer;
704 mqh->mqh_length_word_complete =
true;
705 mqh->mqh_partial_bytes = 0;
709 nbytes =
mqh->mqh_expected_bytes;
719 errmsg(
"invalid message size %zu in shared memory queue",
722 if (
mqh->mqh_partial_bytes == 0)
734 mqh->mqh_length_word_complete =
false;
746 if (
mqh->mqh_buflen < nbytes)
778 mqh->mqh_partial_bytes +=
rb;
791 if (
mqh->mqh_partial_bytes >= nbytes)
806 mqh->mqh_length_word_complete =
false;
807 mqh->mqh_partial_bytes = 0;
847 if (
mqh->mqh_send_pending > 0)
850 mqh->mqh_send_pending = 0;
857 if (
mqh->mqh_segment)
895 mq->mq_detached =
true;
908 return mqh->mqh_queue;
924 while (
sent < nbytes)
975 mq->mq_detached =
true;
979 mqh->mqh_counterparty_attached =
true;
1003 mqh->mqh_send_pending = 0;
1044 memcpy(&
mq->mq_ring[
mq->mq_ring_offset + offset],
1101 mqh->mqh_consume_pending;
1110 *
datap = &
mq->mq_ring[
mq->mq_ring_offset + offset];
1129 if (
mq->mq_detached)
1148 if (
mqh->mqh_consume_pending > 0)
1151 mqh->mqh_consume_pending = 0;
1185 if (
mq->mq_detached)
1198 mq->mq_detached =
true;
1221 bool result =
false;
1230 result = (*ptr !=
NULL);
1234 if (
mq->mq_detached)
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
#define pg_memory_barrier()
#define pg_read_barrier()
#define pg_write_barrier()
#define pg_compiler_barrier()
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
#define MAXALIGN_DOWN(LEN)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define palloc_object(type)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
void * MemoryContextAlloc(MemoryContext context, Size size)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
#define CHECK_FOR_INTERRUPTS()
#define pg_nextpower2_size_t
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, Size *bytes_written)
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
PGPROC * shm_mq_get_receiver(shm_mq *mq)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
#define MQH_INITIAL_BUFSIZE
PGPROC * shm_mq_get_sender(shm_mq *mq)
void shm_mq_detach(shm_mq_handle *mqh)
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg)
static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
static void shm_mq_detach_internal(shm_mq *mq)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n)
shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, Size *nbytesp, void **datap)
const Size shm_mq_minimum_size
static void SpinLockRelease(volatile slock_t *lock)
static void SpinLockAcquire(volatile slock_t *lock)
static void SpinLockInit(volatile slock_t *lock)
BackgroundWorkerHandle * mqh_handle
bool mqh_counterparty_attached
dsm_segment * mqh_segment
bool mqh_length_word_complete
MemoryContext mqh_context
pg_atomic_uint64 mq_bytes_written
pg_atomic_uint64 mq_bytes_read
char mq_ring[FLEXIBLE_ARRAY_MEMBER]
#define WL_EXIT_ON_PM_DEATH
static StringInfoData tmpbuf