59 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
60 errmsg(
"repeat count size must be an integer value greater than or equal to zero")));
69 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
70 errmsg(
"number of workers must be an integer value greater than zero")));
76 res =
shm_mq_send(outqh, message_size, message_contents,
false,
true);
79 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
80 errmsg(
"could not send message")));
92 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
93 errmsg(
"could not receive message")));
96 if (--loop_count <= 0)
103 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
104 errmsg(
"could not send message")));
139 int32 send_count = 0;
140 int32 receive_count = 0;
151 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
152 errmsg(
"repeat count size must be an integer value greater than or equal to zero")));
160 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
161 errmsg(
"number of workers must be an integer value greater than or equal to zero")));
178 if (send_count < loop_count)
189 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190 errmsg(
"could not send message")));
197 if (receive_count < loop_count)
210 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
211 errmsg(
"could not receive message")));
219 if (send_count != receive_count)
221 (
errcode(ERRCODE_INTERNAL_ERROR),
222 errmsg(
"message sent %d times, but received %d times",
223 send_count, receive_count)));
256 if (origlen != newlen)
258 (
errmsg(
"message corrupted"),
259 errdetail(
"The original message was %zu bytes but the final message is %zu bytes.",
262 for (
i = 0;
i < origlen; ++
i)
263 if (origdata[
i] != newdata[
i])
265 (
errmsg(
"message corrupted"),
266 errdetail(
"The new and original messages differ at byte %zu of %zu.",
i, origlen)));
static void PGresult * res
void dsm_detach(dsm_segment *seg)
int errdetail(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define PG_GETARG_TEXT_PP(n)
#define PG_GETARG_INT64(n)
#define PG_GETARG_INT32(n)
#define PG_GETARG_BOOL(n)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
#define CHECK_FOR_INTERRUPTS()
#define VARSIZE_ANY_EXHDR(PTR)
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Datum test_shm_mq_pipelined(PG_FUNCTION_ARGS)
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
PG_FUNCTION_INFO_V1(test_shm_mq)
Datum test_shm_mq(PG_FUNCTION_ARGS)
#define PG_WAIT_EXTENSION