61 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
62 errmsg(
"repeat count size must be an integer value greater than or equal to zero")));
71 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
72 errmsg(
"number of workers must be an integer value greater than zero")));
78 res =
shm_mq_send(outqh, message_size, message_contents,
false,
true);
81 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
82 errmsg(
"could not send message")));
94 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
95 errmsg(
"could not receive message")));
98 if (--loop_count <= 0)
105 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
106 errmsg(
"could not send message")));
141 int32 send_count = 0;
142 int32 receive_count = 0;
153 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
154 errmsg(
"repeat count size must be an integer value greater than or equal to zero")));
162 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
163 errmsg(
"number of workers must be an integer value greater than or equal to zero")));
180 if (send_count < loop_count)
191 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
192 errmsg(
"could not send message")));
199 if (receive_count < loop_count)
212 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
213 errmsg(
"could not receive message")));
221 if (send_count != receive_count)
223 (
errcode(ERRCODE_INTERNAL_ERROR),
224 errmsg(
"message sent %d times, but received %d times",
225 send_count, receive_count)));
262 if (origlen != newlen)
264 (
errmsg(
"message corrupted"),
265 errdetail(
"The original message was %zu bytes but the final message is %zu bytes.",
268 for (
i = 0;
i < origlen; ++
i)
269 if (origdata[
i] != newdata[
i])
271 (
errmsg(
"message corrupted"),
272 errdetail(
"The new and original messages differ at byte %zu of %zu.",
i, origlen)));
static void PGresult * res
void dsm_detach(dsm_segment *seg)
int errdetail(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define PG_GETARG_TEXT_PP(n)
#define PG_GETARG_INT64(n)
#define PG_GETARG_INT32(n)
#define PG_GETARG_BOOL(n)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
#define CHECK_FOR_INTERRUPTS()
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Datum test_shm_mq_pipelined(PG_FUNCTION_ARGS)
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
PG_FUNCTION_INFO_V1(test_shm_mq)
Datum test_shm_mq(PG_FUNCTION_ARGS)
static uint32 we_message_queue
#define VARSIZE_ANY_EXHDR(PTR)
uint32 WaitEventExtensionNew(const char *wait_event_name)