PostgreSQL Source Code git master
Loading...
Searching...
No Matches
test.c
Go to the documentation of this file.
1/*--------------------------------------------------------------------------
2 *
3 * test.c
4 * Test harness code for shared memory message queues.
5 *
6 * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/test/modules/test_shm_mq/test.c
10 *
11 * -------------------------------------------------------------------------
12 */
13
14#include "postgres.h"
15
16#include "fmgr.h"
17#include "miscadmin.h"
18#include "pgstat.h"
19#include "storage/proc.h"
20#include "varatt.h"
21
22#include "test_shm_mq.h"
23
25
28
29static void verify_message(Size origlen, char *origdata, Size newlen,
30 char *newdata);
31
32/* custom wait event number (we_message_queue); cached here after it is first obtained via WaitEventExtensionNew() */
34
35/*
36 * Simple test of the shared memory message queue infrastructure.
37 *
38 * We set up a ring of message queues passing through 1 or more background
39 * processes and eventually looping back to ourselves. We then send a message
40 * through the ring a number of times indicated by the loop count. At the end,
41 * we check whether the final message matches the one we started with.
 *
 * Arguments as read below: argument 0 is the queue size (int64), argument 1
 * is the message text, and argument 3 is the number of workers (int32).
 *
 * An error is reported if the loop count is negative, if the worker count is
 * not greater than zero, if any send or receive returns something other than
 * SHM_MQ_SUCCESS, or if verify_message() finds a difference.
 *
 * NOTE(review): the function signature and the declarations of loop_count,
 * outqh and inqh are not visible in this listing; loop_count is presumably
 * read from argument 2 -- confirm against the original file.
42 */
45{
46 int64 queue_size = PG_GETARG_INT64(0);
47 text *message = PG_GETARG_TEXT_PP(1);
 /* Payload pointer and length, excluding the varlena header. */
48 char *message_contents = VARDATA_ANY(message);
49 int message_size = VARSIZE_ANY_EXHDR(message);
51 int32 nworkers = PG_GETARG_INT32(3);
52 dsm_segment *seg;
55 shm_mq_result res;
 /* Filled in by shm_mq_receive() with the size and address of the message. */
56 Size len;
57 void *data;
58
59 /* A negative loop count is nonsensical. */
60 if (loop_count < 0)
63 errmsg("repeat count size must be an integer value greater than or equal to zero")));
64
65 /*
66 * Since this test sends data using the blocking interfaces, it cannot
67 * send data to itself. Therefore, a minimum of 1 worker is required. Of
68 * course, a negative worker count is nonsensical.
69 */
70 if (nworkers <= 0)
73 errmsg("number of workers must be an integer value greater than zero")));
74
75 /*
 * Set up dynamic shared memory segment and background workers. On return,
 * seg, outqh and inqh have been filled in through the pointers passed here.
 */
76 test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
77
78 /* Send the initial message, with nowait = false and force_flush = true. */
79 res = shm_mq_send(outqh, message_size, message_contents, false, true);
80 if (res != SHM_MQ_SUCCESS)
83 errmsg("could not send message")));
84
85 /*
86 * Receive a message and send it back out again. Do this a number of
87 * times equal to the loop count.
88 */
89 for (;;)
90 {
91 /* Receive a message (nowait = false). */
92 res = shm_mq_receive(inqh, &len, &data, false);
93 if (res != SHM_MQ_SUCCESS)
96 errmsg("could not receive message")));
97
98 /*
 * If this is supposed to be the last iteration, stop here. The count is
 * decremented before it is tested, so a loop count of 0 and a loop count
 * of 1 both leave the loop after the first receive.
 */
99 if (--loop_count <= 0)
100 break;
101
102 /* Send the received bytes back out, again with nowait = false. */
103 res = shm_mq_send(outqh, len, data, false, true);
104 if (res != SHM_MQ_SUCCESS)
107 errmsg("could not send message")));
108 }
109
110 /*
111 * Finally, check that we got back the same message from the last
112 * iteration that we originally sent.
113 */
114 verify_message(message_size, message_contents, len, data);
115
116 /* Clean up. */
117 dsm_detach(seg);
118
120}
121
122/*
123 * Pipelined test of the shared memory message queue infrastructure.
124 *
125 * As in the basic test, we set up a ring of message queues passing through
126 * 1 or more background processes and eventually looping back to ourselves.
127 * Then, we send N copies of the user-specified message through the ring and
128 * receive them all back. Since this might fill up all message queues in the
129 * ring and then stall, we must be prepared to begin receiving the messages
130 * back before we've finished sending them.
 *
 * Arguments as read below: argument 0 is the queue size (int64), argument 1
 * is the message text, argument 3 is the number of workers (int32), and
 * argument 4 is a boolean selecting whether each received message is passed
 * to verify_message().
 *
 * NOTE(review): several lines are not visible in this listing, including
 * the function name, the declarations of loop_count, the receive counter,
 * outqh and inqh, the conditions guarding the send and receive blocks, and
 * the latch wait at the bottom of the loop. Confirm those against the
 * original file before relying on the notes below.
131 */
132Datum
134{
135 int64 queue_size = PG_GETARG_INT64(0);
136 text *message = PG_GETARG_TEXT_PP(1);
 /* Payload pointer and length, excluding the varlena header. */
137 char *message_contents = VARDATA_ANY(message);
138 int message_size = VARSIZE_ANY_EXHDR(message);
140 int32 nworkers = PG_GETARG_INT32(3);
141 bool verify = PG_GETARG_BOOL(4);
 /* Number of successful sends so far. */
142 int32 send_count = 0;
144 dsm_segment *seg;
147 shm_mq_result res;
148 Size len;
149 void *data;
150
151 /* A negative loop count is nonsensical. */
152 if (loop_count < 0)
155 errmsg("repeat count size must be an integer value greater than or equal to zero")));
156
157 /*
158 * Using the nonblocking interfaces, we can even send data to ourselves,
159 * so the minimum number of workers for this test is zero.
160 */
161 if (nworkers < 0)
164 errmsg("number of workers must be an integer value greater than or equal to zero")));
165
166 /* Set up dynamic shared memory segment and background workers. */
167 test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
168
169 /* Main loop. */
170 for (;;)
171 {
 /*
 * Cleared whenever a send or a receive succeeds during this iteration;
 * if it is still set at the bottom of the loop, no progress was made.
 */
172 bool wait = true;
173
174 /*
175 * If we haven't yet sent the message the requisite number of times,
176 * try again to send it now. Note that when shm_mq_send() returns
177 * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
178 * same message size and contents; that's not an issue here because
179 * we're sending the same message every time.
180 */
182 {
 /* nowait = true, so this call does not block; force_flush = true. */
183 res = shm_mq_send(outqh, message_size, message_contents, true,
184 true);
185 if (res == SHM_MQ_SUCCESS)
186 {
187 ++send_count;
188 wait = false;
189 }
 /*
 * A detached queue is reported as an error. Any other result is
 * left alone here, so the send is attempted again on a later pass.
 */
190 else if (res == SHM_MQ_DETACHED)
193 errmsg("could not send message")));
194 }
195
196 /*
197 * If we haven't yet received the message the requisite number of
198 * times, try to receive it again now.
199 */
201 {
 /* nowait = true, so this call does not block. */
202 res = shm_mq_receive(inqh, &len, &data, true);
203 if (res == SHM_MQ_SUCCESS)
204 {
206 /* Verifying every time is slow, so it's optional. */
207 if (verify)
208 verify_message(message_size, message_contents, len, data);
209 wait = false;
210 }
211 else if (res == SHM_MQ_DETACHED)
214 errmsg("could not receive message")));
215 }
216 else
217 {
218 /*
219 * Otherwise, we've received the message enough times. This
220 * shouldn't happen unless we've also sent it enough times.
221 */
225 errmsg("message sent %d times, but received %d times",
 /* This break is the only exit from the main loop visible in this listing. */
227 break;
228 }
229
230 if (wait)
231 {
232 /* first time, allocate or get the custom wait event */
233 if (we_message_queue == 0)
234 we_message_queue = WaitEventExtensionNew("TestShmMqMessageQueue");
235
236 /*
237 * If we made no progress, wait for one of the other processes to
238 * which we are connected to set our latch, indicating that they
239 * have read or written data and therefore there may now be work
240 * for us to do.
241 */
246 }
247 }
248
249 /* Clean up. */
250 dsm_detach(seg);
251
253}
254
255/*
256 * Verify that two messages are the same.
 *
 * origlen/origdata describe the message that was originally sent, and
 * newlen/newdata describe the message that came back. A "message corrupted"
 * report is produced if the lengths differ, or if any byte differs; the
 * detail text gives either both lengths or the index of the differing byte.
 *
 * NOTE(review): the function signature line and the opening of each
 * reporting call are not visible in this listing -- confirm the report
 * level against the original file.
257 */
258static void
260{
261 Size i;
262
 /* Compare the lengths first; the loop below indexes both buffers up to origlen. */
263 if (origlen != newlen)
265 (errmsg("message corrupted"),
266 errdetail("The original message was %zu bytes but the final message is %zu bytes.",
267 origlen, newlen)));
268
 /* Byte-by-byte comparison of the two buffers. */
269 for (i = 0; i < origlen; ++i)
270 if (origdata[i] != newdata[i])
272 (errmsg("message corrupted"),
273 errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
274}
int64_t int64
Definition c.h:576
int32_t int32
Definition c.h:575
uint32_t uint32
Definition c.h:579
size_t Size
Definition c.h:652
void dsm_detach(dsm_segment *seg)
Definition dsm.c:803
int errcode(int sqlerrcode)
Definition elog.c:874
int errmsg(const char *fmt,...)
Definition elog.c:1093
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_GETARG_INT64(n)
Definition fmgr.h:284
#define PG_FUNCTION_INFO_V1(funcname)
Definition fmgr.h:417
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
struct Latch * MyLatch
Definition globals.c:63
int i
Definition isn.c:77
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
const void size_t len
const void * data
uint64_t Datum
Definition postgres.h:70
static int fb(int x)
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
Definition setup.c:51
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:573
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition shm_mq.c:330
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
Definition c.h:739
Datum test_shm_mq_pipelined(PG_FUNCTION_ARGS)
Definition test.c:133
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
Definition test.c:259
PG_MODULE_MAGIC
Definition test.c:24
Datum test_shm_mq(PG_FUNCTION_ARGS)
Definition test.c:44
static uint32 we_message_queue
Definition test.c:33
static Size VARSIZE_ANY_EXHDR(const void *PTR)
Definition varatt.h:472
static char * VARDATA_ANY(const void *PTR)
Definition varatt.h:486
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition wait_event.c:163
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET