PostgreSQL Source Code git master
Loading...
Searching...
No Matches
test.c
Go to the documentation of this file.
1/*--------------------------------------------------------------------------
2 *
3 * test.c
4 * Test harness code for shared memory message queues.
5 *
6 * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/test/modules/test_shm_mq/test.c
10 *
11 * -------------------------------------------------------------------------
12 */
13
14#include "postgres.h"
15
16#include "fmgr.h"
17#include "miscadmin.h"
18#include "pgstat.h"
19#include "storage/proc.h"
20#include "utils/wait_event.h"
21#include "varatt.h"
22
23#include "test_shm_mq.h"
24
26
29
30static void verify_message(Size origlen, char *origdata, Size newlen,
31 char *newdata);
32
33/* value cached, fetched from shared memory */
35
36/*
37 * Simple test of the shared memory message queue infrastructure.
38 *
39 * We set up a ring of message queues passing through 1 or more background
40 * processes and eventually looping back to ourselves. We then send a message
41 * through the ring a number of times indicated by the loop count. At the end,
42 * we check whether the final message matches the one we started with.
 *
 * Arguments read below: 0 = queue size (int64), 1 = message (text),
 * 3 = number of workers (int32).
 *
 * NOTE(review): loop_count, outqh and inqh are used below, but their
 * declarations are not visible in this listing; loop_count is presumably
 * argument 2 -- confirm against the original file.
43 */
46{
47 int64 queue_size = PG_GETARG_INT64(0);
48 text *message = PG_GETARG_TEXT_PP(1);
49 char *message_contents = VARDATA_ANY(message);
50 int message_size = VARSIZE_ANY_EXHDR(message);
52 int32 nworkers = PG_GETARG_INT32(3);
53 dsm_segment *seg;
56 shm_mq_result res;
57 Size len;
58 void *data;
59
60 /* A negative loop_count is nonsensical. */
61 if (loop_count < 0)
64 errmsg("repeat count size must be an integer value greater than or equal to zero")));
65
66 /*
67 * Since this test sends data using the blocking interfaces, it cannot
68 * send data to itself. Therefore, a minimum of 1 worker is required. Of
69 * course, a negative worker count is nonsensical.
70 */
71 if (nworkers <= 0)
74 errmsg("number of workers must be an integer value greater than zero")));
75
76 /* Set up dynamic shared memory segment and background workers. */
77 test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
78
79 /* Send the initial message. */
    /* The fourth argument (nowait) is false, so this is a blocking send; the fifth (force_flush) is true. */
80 res = shm_mq_send(outqh, message_size, message_contents, false, true);
81 if (res != SHM_MQ_SUCCESS)
84 errmsg("could not send message")));
85
86 /*
87 * Receive a message and send it back out again. Do this a number of
88 * times equal to the loop count.
89 */
90 for (;;)
91 {
92 /* Receive a message. */
    /* nowait is false here as well, so the receive blocks. */
93 res = shm_mq_receive(inqh, &len, &data, false);
94 if (res != SHM_MQ_SUCCESS)
97 errmsg("could not receive message")));
98
99 /* If this is supposed to be the last iteration, stop here. */
    /*
     * loop_count is decremented before the comparison, so a loop_count of
     * 0 or 1 both result in exactly one receive and no resend.
     */
100 if (--loop_count <= 0)
101 break;
102
103 /* Send it back out. */
104 res = shm_mq_send(outqh, len, data, false, true);
105 if (res != SHM_MQ_SUCCESS)
108 errmsg("could not send message")));
109 }
110
111 /*
112 * Finally, check that we got back the same message from the last
113 * iteration that we originally sent.
114 */
    /* len and data still hold the output of the last shm_mq_receive() call above. */
115 verify_message(message_size, message_contents, len, data);
116
117 /* Clean up. */
118 dsm_detach(seg);
119
    /* NOTE(review): the return statement (presumably PG_RETURN_VOID()) is not visible in this listing. */
121}
122
123/*
124 * Pipelined test of the shared memory message queue infrastructure.
125 *
126 * As in the basic test, we set up a ring of message queues passing through
127 * 1 or more background processes and eventually looping back to ourselves.
128 * Then, we send N copies of the user-specified message through the ring and
129 * receive them all back. Since this might fill up all message queues in the
130 * ring and then stall, we must be prepared to begin receiving the messages
131 * back before we've finished sending them.
 *
 * Arguments read below: 0 = queue size (int64), 1 = message (text),
 * 3 = number of workers (int32), 4 = verify flag (bool).
 *
 * NOTE(review): loop_count, outqh, inqh and the receive counter are used or
 * referred to below, but their declarations are not visible in this listing;
 * loop_count is presumably argument 2 -- confirm against the original file.
132 */
133Datum
135{
136 int64 queue_size = PG_GETARG_INT64(0);
137 text *message = PG_GETARG_TEXT_PP(1);
138 char *message_contents = VARDATA_ANY(message);
139 int message_size = VARSIZE_ANY_EXHDR(message);
141 int32 nworkers = PG_GETARG_INT32(3);
142 bool verify = PG_GETARG_BOOL(4);
143 int32 send_count = 0;
145 dsm_segment *seg;
148 shm_mq_result res;
149 Size len;
150 void *data;
151
152 /* A negative loop_count is nonsensical. */
153 if (loop_count < 0)
156 errmsg("repeat count size must be an integer value greater than or equal to zero")));
157
158 /*
159 * Using the nonblocking interfaces, we can even send data to ourselves,
160 * so the minimum number of workers for this test is zero.
161 */
162 if (nworkers < 0)
165 errmsg("number of workers must be an integer value greater than or equal to zero")));
166
167 /* Set up dynamic shared memory segment and background workers. */
168 test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
169
170 /* Main loop. */
171 for (;;)
172 {
    /* Progress flag: reset to true every iteration and cleared below when a send or a receive succeeds. */
173 bool wait = true;
174
175 /*
176 * If we haven't yet sent the message the requisite number of times,
177 * try again to send it now. Note that when shm_mq_send() returns
178 * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
179 * same message size and contents; that's not an issue here because
180 * we're sending the same message every time.
181 */
    /*
     * NOTE(review): the condition guarding this block is not visible in this
     * listing; per the comment above it presumably compares send_count with
     * loop_count.
     */
183 {
    /* nowait is true here, so the call does not block. */
184 res = shm_mq_send(outqh, message_size, message_contents, true,
185 true);
186 if (res == SHM_MQ_SUCCESS)
187 {
188 ++send_count;
189 wait = false;
190 }
191 else if (res == SHM_MQ_DETACHED)
194 errmsg("could not send message")));
    /* Any other result falls through with wait left unchanged. */
195 }
196
197 /*
198 * If we haven't yet received the message the requisite number of
199 * times, try to receive it again now.
200 */
    /* NOTE(review): the condition guarding this block is not visible in this listing. */
202 {
203 res = shm_mq_receive(inqh, &len, &data, true);
204 if (res == SHM_MQ_SUCCESS)
205 {
    /* NOTE(review): one line is missing here in this listing, presumably the receive counter increment. */
207 /* Verifying every time is slow, so it's optional. */
208 if (verify)
209 verify_message(message_size, message_contents, len, data);
210 wait = false;
211 }
212 else if (res == SHM_MQ_DETACHED)
215 errmsg("could not receive message")));
216 }
217 else
218 {
219 /*
220 * Otherwise, we've received the message enough times. This
221 * shouldn't happen unless we've also sent it enough times.
222 */
    /*
     * NOTE(review): the check that raises the error below, and the arguments
     * of its format string, are not visible in this listing. The break is
     * what terminates the main loop.
     */
226 errmsg("message sent %d times, but received %d times",
228 break;
229 }
230
231 if (wait)
232 {
233 /* first time, allocate or get the custom wait event */
    /* we_message_queue caches the value: WaitEventExtensionNew() is only called while it is still 0. */
234 if (we_message_queue == 0)
235 we_message_queue = WaitEventExtensionNew("TestShmMqMessageQueue");
236
237 /*
238 * If we made no progress, wait for one of the other processes to
239 * which we are connected to set our latch, indicating that they
240 * have read or written data and therefore there may now be work
241 * for us to do.
242 */
    /*
     * NOTE(review): the statements that perform the wait are not visible in
     * this listing (presumably WaitLatch(), ResetLatch() and
     * CHECK_FOR_INTERRUPTS()) -- confirm against the original file.
     */
247 }
248 }
249
250 /* Clean up. */
251 dsm_detach(seg);
252
    /* NOTE(review): the return statement (presumably PG_RETURN_VOID()) is not visible in this listing. */
254}
255
256/*
257 * Verify that two messages are the same.
 *
 * The lengths are compared first; if they match, the contents are compared
 * byte by byte in increasing offset order. Either kind of mismatch is
 * reported with the message "message corrupted" plus a detail line giving
 * the two lengths or the zero-based offset of the differing byte.
 *
 * NOTE(review): the line carrying the parameter list is not visible in this
 * listing; the body uses origlen, origdata, newlen and newdata. The heads of
 * the two reporting calls are not visible either (presumably
 * ereport(ERROR, ...)) -- confirm against the original file.
258 */
259static void
261{
262 Size i;
263
    /* Check the lengths before touching the data, so the byte loop below only runs on equal-length messages. */
264 if (origlen != newlen)
266 (errmsg("message corrupted"),
267 errdetail("The original message was %zu bytes but the final message is %zu bytes.",
268 origlen, newlen)));
269
270 for (i = 0; i < origlen; ++i)
271 if (origdata[i] != newdata[i])
273 (errmsg("message corrupted"),
274 errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
275}
int64_t int64
Definition c.h:615
int32_t int32
Definition c.h:614
uint32_t uint32
Definition c.h:618
size_t Size
Definition c.h:691
void dsm_detach(dsm_segment *seg)
Definition dsm.c:803
int errcode(int sqlerrcode)
Definition elog.c:874
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_GETARG_INT64(n)
Definition fmgr.h:284
#define PG_FUNCTION_INFO_V1(funcname)
Definition fmgr.h:417
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
struct Latch * MyLatch
Definition globals.c:63
int i
Definition isn.c:77
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
static char * errmsg
const void size_t len
const void * data
uint64_t Datum
Definition postgres.h:70
static int fb(int x)
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
Definition setup.c:52
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:574
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition shm_mq.c:331
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
Definition c.h:778
Datum test_shm_mq_pipelined(PG_FUNCTION_ARGS)
Definition test.c:134
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
Definition test.c:260
PG_MODULE_MAGIC
Definition test.c:25
Datum test_shm_mq(PG_FUNCTION_ARGS)
Definition test.c:45
static uint32 we_message_queue
Definition test.c:34
static Size VARSIZE_ANY_EXHDR(const void *PTR)
Definition varatt.h:472
static char * VARDATA_ANY(const void *PTR)
Definition varatt.h:486
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition wait_event.c:163
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET