63 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
64 errmsg(
"repeat count size must be an integer value greater than or equal to zero")));
73 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 errmsg(
"number of workers must be an integer value greater than zero")));
80 res =
shm_mq_send(outqh, message_size, message_contents,
false,
true);
83 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
84 errmsg(
"could not send message")));
96 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
97 errmsg(
"could not receive message")));
100 if (--loop_count <= 0)
107 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
108 errmsg(
"could not send message")));
143 int32 send_count = 0;
144 int32 receive_count = 0;
155 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
156 errmsg(
"repeat count size must be an integer value greater than or equal to zero")));
164 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
165 errmsg(
"number of workers must be an integer value greater than or equal to zero")));
182 if (send_count < loop_count)
184 res =
shm_mq_send(outqh, message_size, message_contents,
true,
193 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
194 errmsg(
"could not send message")));
201 if (receive_count < loop_count)
214 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
215 errmsg(
"could not receive message")));
223 if (send_count != receive_count)
225 (
errcode(ERRCODE_INTERNAL_ERROR),
226 errmsg(
"message sent %d times, but received %d times",
227 send_count, receive_count)));
264 if (origlen != newlen)
266 (
errmsg(
"message corrupted"),
267 errdetail(
"The original message was %zu bytes but the final message is %zu bytes.",
270 for (
i = 0;
i < origlen; ++
i)
271 if (origdata[
i] != newdata[
i])
273 (
errmsg(
"message corrupted"),
274 errdetail(
"The new and original messages differ at byte %zu of %zu.",
i, origlen)));
void dsm_detach(dsm_segment *seg)
int errcode(int sqlerrcode)
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define ereport(elevel,...)
#define PG_GETARG_TEXT_PP(n)
#define PG_GETARG_INT64(n)
#define PG_FUNCTION_INFO_V1(funcname)
#define PG_GETARG_INT32(n)
#define PG_GETARG_BOOL(n)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define CHECK_FOR_INTERRUPTS()
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Datum test_shm_mq_pipelined(PG_FUNCTION_ARGS)
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
Datum test_shm_mq(PG_FUNCTION_ARGS)
static uint32 we_message_queue
static Size VARSIZE_ANY_EXHDR(const void *PTR)
static char * VARDATA_ANY(const void *PTR)
uint32 WaitEventExtensionNew(const char *wait_event_name)
#define WL_EXIT_ON_PM_DEATH