PostgreSQL Source Code  git master
test.c
Go to the documentation of this file.
1 /*--------------------------------------------------------------------------
2  *
3  * test.c
4  * Test harness code for shared memory message queues.
5  *
6  * Copyright (c) 2013-2023, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/test/modules/test_shm_mq/test.c
10  *
11  * -------------------------------------------------------------------------
12  */
13 
14 #include "postgres.h"
15 
16 #include "fmgr.h"
17 #include "miscadmin.h"
18 #include "pgstat.h"
19 #include "varatt.h"
20 
21 #include "test_shm_mq.h"
22 
24 
27 
28 static void verify_message(Size origlen, char *origdata, Size newlen,
29  char *newdata);
30 
31 /*
32  * Simple test of the shared memory message queue infrastructure.
33  *
34  * We set up a ring of message queues passing through 1 or more background
35  * processes and eventually looping back to ourselves. We then send a message
36  * through the ring a number of times indicated by the loop count. At the end,
37  * we check whether the final message matches the one we started with.
38  */
39 Datum
41 {
42  int64 queue_size = PG_GETARG_INT64(0);
43  text *message = PG_GETARG_TEXT_PP(1);
44  char *message_contents = VARDATA_ANY(message);
45  int message_size = VARSIZE_ANY_EXHDR(message);
46  int32 loop_count = PG_GETARG_INT32(2);
47  int32 nworkers = PG_GETARG_INT32(3);
48  dsm_segment *seg;
49  shm_mq_handle *outqh;
50  shm_mq_handle *inqh;
52  Size len;
53  void *data;
54 
55  /* A negative loopcount is nonsensical. */
56  if (loop_count < 0)
57  ereport(ERROR,
58  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
59  errmsg("repeat count size must be an integer value greater than or equal to zero")));
60 
61  /*
62  * Since this test sends data using the blocking interfaces, it cannot
63  * send data to itself. Therefore, a minimum of 1 worker is required. Of
64  * course, a negative worker count is nonsensical.
65  */
66  if (nworkers <= 0)
67  ereport(ERROR,
68  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
69  errmsg("number of workers must be an integer value greater than zero")));
70 
71  /* Set up dynamic shared memory segment and background workers. */
72  test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
73 
74  /* Send the initial message. */
75  res = shm_mq_send(outqh, message_size, message_contents, false, true);
76  if (res != SHM_MQ_SUCCESS)
77  ereport(ERROR,
78  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
79  errmsg("could not send message")));
80 
81  /*
82  * Receive a message and send it back out again. Do this a number of
83  * times equal to the loop count.
84  */
85  for (;;)
86  {
87  /* Receive a message. */
88  res = shm_mq_receive(inqh, &len, &data, false);
89  if (res != SHM_MQ_SUCCESS)
90  ereport(ERROR,
91  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
92  errmsg("could not receive message")));
93 
94  /* If this is supposed to be the last iteration, stop here. */
95  if (--loop_count <= 0)
96  break;
97 
98  /* Send it back out. */
99  res = shm_mq_send(outqh, len, data, false, true);
100  if (res != SHM_MQ_SUCCESS)
101  ereport(ERROR,
102  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
103  errmsg("could not send message")));
104  }
105 
106  /*
107  * Finally, check that we got back the same message from the last
108  * iteration that we originally sent.
109  */
110  verify_message(message_size, message_contents, len, data);
111 
112  /* Clean up. */
113  dsm_detach(seg);
114 
115  PG_RETURN_VOID();
116 }
117 
118 /*
119  * Pipelined test of the shared memory message queue infrastructure.
120  *
121  * As in the basic test, we set up a ring of message queues passing through
122  * 1 or more background processes and eventually looping back to ourselves.
123  * Then, we send N copies of the user-specified message through the ring and
124  * receive them all back. Since this might fill up all message queues in the
125  * ring and then stall, we must be prepared to begin receiving the messages
126  * back before we've finished sending them.
127  */
128 Datum
130 {
131  int64 queue_size = PG_GETARG_INT64(0);
132  text *message = PG_GETARG_TEXT_PP(1);
133  char *message_contents = VARDATA_ANY(message);
134  int message_size = VARSIZE_ANY_EXHDR(message);
135  int32 loop_count = PG_GETARG_INT32(2);
136  int32 nworkers = PG_GETARG_INT32(3);
137  bool verify = PG_GETARG_BOOL(4);
138  int32 send_count = 0;
139  int32 receive_count = 0;
140  dsm_segment *seg;
141  shm_mq_handle *outqh;
142  shm_mq_handle *inqh;
144  Size len;
145  void *data;
146 
147  /* A negative loopcount is nonsensical. */
148  if (loop_count < 0)
149  ereport(ERROR,
150  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
151  errmsg("repeat count size must be an integer value greater than or equal to zero")));
152 
153  /*
154  * Using the nonblocking interfaces, we can even send data to ourselves,
155  * so the minimum number of workers for this test is zero.
156  */
157  if (nworkers < 0)
158  ereport(ERROR,
159  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
160  errmsg("number of workers must be an integer value greater than or equal to zero")));
161 
162  /* Set up dynamic shared memory segment and background workers. */
163  test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
164 
165  /* Main loop. */
166  for (;;)
167  {
168  bool wait = true;
169 
170  /*
171  * If we haven't yet sent the message the requisite number of times,
172  * try again to send it now. Note that when shm_mq_send() returns
173  * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
174  * same message size and contents; that's not an issue here because
175  * we're sending the same message every time.
176  */
177  if (send_count < loop_count)
178  {
179  res = shm_mq_send(outqh, message_size, message_contents, true,
180  true);
181  if (res == SHM_MQ_SUCCESS)
182  {
183  ++send_count;
184  wait = false;
185  }
186  else if (res == SHM_MQ_DETACHED)
187  ereport(ERROR,
188  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
189  errmsg("could not send message")));
190  }
191 
192  /*
193  * If we haven't yet received the message the requisite number of
194  * times, try to receive it again now.
195  */
196  if (receive_count < loop_count)
197  {
198  res = shm_mq_receive(inqh, &len, &data, true);
199  if (res == SHM_MQ_SUCCESS)
200  {
201  ++receive_count;
202  /* Verifying every time is slow, so it's optional. */
203  if (verify)
204  verify_message(message_size, message_contents, len, data);
205  wait = false;
206  }
207  else if (res == SHM_MQ_DETACHED)
208  ereport(ERROR,
209  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
210  errmsg("could not receive message")));
211  }
212  else
213  {
214  /*
215  * Otherwise, we've received the message enough times. This
216  * shouldn't happen unless we've also sent it enough times.
217  */
218  if (send_count != receive_count)
219  ereport(ERROR,
220  (errcode(ERRCODE_INTERNAL_ERROR),
221  errmsg("message sent %d times, but received %d times",
222  send_count, receive_count)));
223  break;
224  }
225 
226  if (wait)
227  {
228  /*
229  * If we made no progress, wait for one of the other processes to
230  * which we are connected to set our latch, indicating that they
231  * have read or written data and therefore there may now be work
232  * for us to do.
233  */
238  }
239  }
240 
241  /* Clean up. */
242  dsm_detach(seg);
243 
244  PG_RETURN_VOID();
245 }
246 
247 /*
248  * Verify that two messages are the same.
249  */
250 static void
251 verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
252 {
253  Size i;
254 
255  if (origlen != newlen)
256  ereport(ERROR,
257  (errmsg("message corrupted"),
258  errdetail("The original message was %zu bytes but the final message is %zu bytes.",
259  origlen, newlen)));
260 
261  for (i = 0; i < origlen; ++i)
262  if (origdata[i] != newdata[i])
263  ereport(ERROR,
264  (errmsg("message corrupted"),
265  errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
266 }
signed int int32
Definition: c.h:478
size_t Size
Definition: c.h:589
void dsm_detach(dsm_segment *seg)
Definition: dsm.c:776
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_GETARG_INT64(n)
Definition: fmgr.h:283
#define PG_GETARG_INT32(n)
Definition: fmgr.h:269
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
struct Latch * MyLatch
Definition: globals.c:58
int i
Definition: isn.c:73
void ResetLatch(Latch *latch)
Definition: latch.c:699
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:492
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
const void size_t len
const void * data
uintptr_t Datum
Definition: postgres.h:64
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
Definition: setup.c:48
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:573
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:330
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40
Definition: c.h:671
Datum test_shm_mq_pipelined(PG_FUNCTION_ARGS)
Definition: test.c:129
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
Definition: test.c:251
PG_MODULE_MAGIC
Definition: test.c:23
PG_FUNCTION_INFO_V1(test_shm_mq)
Datum test_shm_mq(PG_FUNCTION_ARGS)
Definition: test.c:40
#define VARDATA_ANY(PTR)
Definition: varatt.h:324
#define VARSIZE_ANY_EXHDR(PTR)
Definition: varatt.h:317
#define PG_WAIT_EXTENSION
Definition: wait_event.h:23