PostgreSQL Source Code  git master
test.c
Go to the documentation of this file.
1 /*--------------------------------------------------------------------------
2  *
3  * test.c
4  * Test harness code for shared memory message queues.
5  *
6  * Copyright (c) 2013-2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/test/modules/test_shm_mq/test.c
10  *
11  * -------------------------------------------------------------------------
12  */
13 
14 #include "postgres.h"
15 
16 #include "fmgr.h"
17 #include "miscadmin.h"
18 #include "pgstat.h"
19 #include "varatt.h"
20 
21 #include "test_shm_mq.h"
22 
24 
27 
28 static void verify_message(Size origlen, char *origdata, Size newlen,
29  char *newdata);
30 
31 /* value cached, fetched from shared memory */
33 
34 /*
35  * Simple test of the shared memory message queue infrastructure.
36  *
37  * We set up a ring of message queues passing through 1 or more background
38  * processes and eventually looping back to ourselves. We then send a message
39  * through the ring a number of times indicated by the loop count. At the end,
40  * we check whether the final message matches the one we started with.
41  */
42 Datum
44 {
45  int64 queue_size = PG_GETARG_INT64(0);
46  text *message = PG_GETARG_TEXT_PP(1);
47  char *message_contents = VARDATA_ANY(message);
48  int message_size = VARSIZE_ANY_EXHDR(message);
49  int32 loop_count = PG_GETARG_INT32(2);
50  int32 nworkers = PG_GETARG_INT32(3);
51  dsm_segment *seg;
52  shm_mq_handle *outqh;
53  shm_mq_handle *inqh;
55  Size len;
56  void *data;
57 
58  /* A negative loopcount is nonsensical. */
59  if (loop_count < 0)
60  ereport(ERROR,
61  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
62  errmsg("repeat count size must be an integer value greater than or equal to zero")));
63 
64  /*
65  * Since this test sends data using the blocking interfaces, it cannot
66  * send data to itself. Therefore, a minimum of 1 worker is required. Of
67  * course, a negative worker count is nonsensical.
68  */
69  if (nworkers <= 0)
70  ereport(ERROR,
71  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
72  errmsg("number of workers must be an integer value greater than zero")));
73 
74  /* Set up dynamic shared memory segment and background workers. */
75  test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
76 
77  /* Send the initial message. */
78  res = shm_mq_send(outqh, message_size, message_contents, false, true);
79  if (res != SHM_MQ_SUCCESS)
80  ereport(ERROR,
81  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
82  errmsg("could not send message")));
83 
84  /*
85  * Receive a message and send it back out again. Do this a number of
86  * times equal to the loop count.
87  */
88  for (;;)
89  {
90  /* Receive a message. */
91  res = shm_mq_receive(inqh, &len, &data, false);
92  if (res != SHM_MQ_SUCCESS)
93  ereport(ERROR,
94  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
95  errmsg("could not receive message")));
96 
97  /* If this is supposed to be the last iteration, stop here. */
98  if (--loop_count <= 0)
99  break;
100 
101  /* Send it back out. */
102  res = shm_mq_send(outqh, len, data, false, true);
103  if (res != SHM_MQ_SUCCESS)
104  ereport(ERROR,
105  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
106  errmsg("could not send message")));
107  }
108 
109  /*
110  * Finally, check that we got back the same message from the last
111  * iteration that we originally sent.
112  */
113  verify_message(message_size, message_contents, len, data);
114 
115  /* Clean up. */
116  dsm_detach(seg);
117 
118  PG_RETURN_VOID();
119 }
120 
121 /*
122  * Pipelined test of the shared memory message queue infrastructure.
123  *
124  * As in the basic test, we set up a ring of message queues passing through
125  * 1 or more background processes and eventually looping back to ourselves.
126  * Then, we send N copies of the user-specified message through the ring and
127  * receive them all back. Since this might fill up all message queues in the
128  * ring and then stall, we must be prepared to begin receiving the messages
129  * back before we've finished sending them.
130  */
131 Datum
133 {
134  int64 queue_size = PG_GETARG_INT64(0);
135  text *message = PG_GETARG_TEXT_PP(1);
136  char *message_contents = VARDATA_ANY(message);
137  int message_size = VARSIZE_ANY_EXHDR(message);
138  int32 loop_count = PG_GETARG_INT32(2);
139  int32 nworkers = PG_GETARG_INT32(3);
140  bool verify = PG_GETARG_BOOL(4);
141  int32 send_count = 0;
142  int32 receive_count = 0;
143  dsm_segment *seg;
144  shm_mq_handle *outqh;
145  shm_mq_handle *inqh;
147  Size len;
148  void *data;
149 
150  /* A negative loopcount is nonsensical. */
151  if (loop_count < 0)
152  ereport(ERROR,
153  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
154  errmsg("repeat count size must be an integer value greater than or equal to zero")));
155 
156  /*
157  * Using the nonblocking interfaces, we can even send data to ourselves,
158  * so the minimum number of workers for this test is zero.
159  */
160  if (nworkers < 0)
161  ereport(ERROR,
162  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
163  errmsg("number of workers must be an integer value greater than or equal to zero")));
164 
165  /* Set up dynamic shared memory segment and background workers. */
166  test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
167 
168  /* Main loop. */
169  for (;;)
170  {
171  bool wait = true;
172 
173  /*
174  * If we haven't yet sent the message the requisite number of times,
175  * try again to send it now. Note that when shm_mq_send() returns
176  * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
177  * same message size and contents; that's not an issue here because
178  * we're sending the same message every time.
179  */
180  if (send_count < loop_count)
181  {
182  res = shm_mq_send(outqh, message_size, message_contents, true,
183  true);
184  if (res == SHM_MQ_SUCCESS)
185  {
186  ++send_count;
187  wait = false;
188  }
189  else if (res == SHM_MQ_DETACHED)
190  ereport(ERROR,
191  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
192  errmsg("could not send message")));
193  }
194 
195  /*
196  * If we haven't yet received the message the requisite number of
197  * times, try to receive it again now.
198  */
199  if (receive_count < loop_count)
200  {
201  res = shm_mq_receive(inqh, &len, &data, true);
202  if (res == SHM_MQ_SUCCESS)
203  {
204  ++receive_count;
205  /* Verifying every time is slow, so it's optional. */
206  if (verify)
207  verify_message(message_size, message_contents, len, data);
208  wait = false;
209  }
210  else if (res == SHM_MQ_DETACHED)
211  ereport(ERROR,
212  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
213  errmsg("could not receive message")));
214  }
215  else
216  {
217  /*
218  * Otherwise, we've received the message enough times. This
219  * shouldn't happen unless we've also sent it enough times.
220  */
221  if (send_count != receive_count)
222  ereport(ERROR,
223  (errcode(ERRCODE_INTERNAL_ERROR),
224  errmsg("message sent %d times, but received %d times",
225  send_count, receive_count)));
226  break;
227  }
228 
229  if (wait)
230  {
231  /* first time, allocate or get the custom wait event */
232  if (we_message_queue == 0)
233  we_message_queue = WaitEventExtensionNew("TestShmMqMessageQueue");
234 
235  /*
236  * If we made no progress, wait for one of the other processes to
237  * which we are connected to set our latch, indicating that they
238  * have read or written data and therefore there may now be work
239  * for us to do.
240  */
245  }
246  }
247 
248  /* Clean up. */
249  dsm_detach(seg);
250 
251  PG_RETURN_VOID();
252 }
253 
254 /*
255  * Verify that two messages are the same.
256  */
257 static void
258 verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
259 {
260  Size i;
261 
262  if (origlen != newlen)
263  ereport(ERROR,
264  (errmsg("message corrupted"),
265  errdetail("The original message was %zu bytes but the final message is %zu bytes.",
266  origlen, newlen)));
267 
268  for (i = 0; i < origlen; ++i)
269  if (origdata[i] != newdata[i])
270  ereport(ERROR,
271  (errmsg("message corrupted"),
272  errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
273 }
unsigned int uint32
Definition: c.h:492
signed int int32
Definition: c.h:482
size_t Size
Definition: c.h:584
void dsm_detach(dsm_segment *seg)
Definition: dsm.c:803
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_GETARG_INT64(n)
Definition: fmgr.h:283
#define PG_GETARG_INT32(n)
Definition: fmgr.h:269
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
struct Latch * MyLatch
Definition: globals.c:62
int i
Definition: isn.c:72
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const void size_t len
const void * data
uintptr_t Datum
Definition: postgres.h:64
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
Definition: setup.c:50
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:572
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:329
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40
Definition: c.h:666
Datum test_shm_mq_pipelined(PG_FUNCTION_ARGS)
Definition: test.c:132
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
Definition: test.c:258
PG_MODULE_MAGIC
Definition: test.c:23
PG_FUNCTION_INFO_V1(test_shm_mq)
Datum test_shm_mq(PG_FUNCTION_ARGS)
Definition: test.c:43
static uint32 we_message_queue
Definition: test.c:32
#define VARDATA_ANY(PTR)
Definition: varatt.h:324
#define VARSIZE_ANY_EXHDR(PTR)
Definition: varatt.h:317
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition: wait_event.c:163