PostgreSQL Source Code  git master
test.c
Go to the documentation of this file.
/*--------------------------------------------------------------------------
 *
 * test.c
 *		Test harness code for shared memory message queues.
 *
 * Copyright (c) 2013-2022, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		src/test/modules/test_shm_mq/test.c
 *
 * -------------------------------------------------------------------------
 */
13 
14 #include "postgres.h"
15 
16 #include "fmgr.h"
17 #include "miscadmin.h"
18 #include "pgstat.h"
19 
20 #include "test_shm_mq.h"
21 
23 
26 
27 static void verify_message(Size origlen, char *origdata, Size newlen,
28  char *newdata);
29 
30 /*
31  * Simple test of the shared memory message queue infrastructure.
32  *
33  * We set up a ring of message queues passing through 1 or more background
34  * processes and eventually looping back to ourselves. We then send a message
35  * through the ring a number of times indicated by the loop count. At the end,
36  * we check whether the final message matches the one we started with.
37  */
38 Datum
40 {
41  int64 queue_size = PG_GETARG_INT64(0);
42  text *message = PG_GETARG_TEXT_PP(1);
43  char *message_contents = VARDATA_ANY(message);
44  int message_size = VARSIZE_ANY_EXHDR(message);
45  int32 loop_count = PG_GETARG_INT32(2);
46  int32 nworkers = PG_GETARG_INT32(3);
47  dsm_segment *seg;
48  shm_mq_handle *outqh;
49  shm_mq_handle *inqh;
51  Size len;
52  void *data;
53 
54  /* A negative loopcount is nonsensical. */
55  if (loop_count < 0)
56  ereport(ERROR,
57  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
58  errmsg("repeat count size must be an integer value greater than or equal to zero")));
59 
60  /*
61  * Since this test sends data using the blocking interfaces, it cannot
62  * send data to itself. Therefore, a minimum of 1 worker is required. Of
63  * course, a negative worker count is nonsensical.
64  */
65  if (nworkers <= 0)
66  ereport(ERROR,
67  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
68  errmsg("number of workers must be an integer value greater than zero")));
69 
70  /* Set up dynamic shared memory segment and background workers. */
71  test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
72 
73  /* Send the initial message. */
74  res = shm_mq_send(outqh, message_size, message_contents, false, true);
75  if (res != SHM_MQ_SUCCESS)
76  ereport(ERROR,
77  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
78  errmsg("could not send message")));
79 
80  /*
81  * Receive a message and send it back out again. Do this a number of
82  * times equal to the loop count.
83  */
84  for (;;)
85  {
86  /* Receive a message. */
87  res = shm_mq_receive(inqh, &len, &data, false);
88  if (res != SHM_MQ_SUCCESS)
89  ereport(ERROR,
90  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
91  errmsg("could not receive message")));
92 
93  /* If this is supposed to be the last iteration, stop here. */
94  if (--loop_count <= 0)
95  break;
96 
97  /* Send it back out. */
98  res = shm_mq_send(outqh, len, data, false, true);
99  if (res != SHM_MQ_SUCCESS)
100  ereport(ERROR,
101  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
102  errmsg("could not send message")));
103  }
104 
105  /*
106  * Finally, check that we got back the same message from the last
107  * iteration that we originally sent.
108  */
109  verify_message(message_size, message_contents, len, data);
110 
111  /* Clean up. */
112  dsm_detach(seg);
113 
114  PG_RETURN_VOID();
115 }
116 
117 /*
118  * Pipelined test of the shared memory message queue infrastructure.
119  *
120  * As in the basic test, we set up a ring of message queues passing through
121  * 1 or more background processes and eventually looping back to ourselves.
122  * Then, we send N copies of the user-specified message through the ring and
123  * receive them all back. Since this might fill up all message queues in the
124  * ring and then stall, we must be prepared to begin receiving the messages
125  * back before we've finished sending them.
126  */
127 Datum
129 {
130  int64 queue_size = PG_GETARG_INT64(0);
131  text *message = PG_GETARG_TEXT_PP(1);
132  char *message_contents = VARDATA_ANY(message);
133  int message_size = VARSIZE_ANY_EXHDR(message);
134  int32 loop_count = PG_GETARG_INT32(2);
135  int32 nworkers = PG_GETARG_INT32(3);
136  bool verify = PG_GETARG_BOOL(4);
137  int32 send_count = 0;
138  int32 receive_count = 0;
139  dsm_segment *seg;
140  shm_mq_handle *outqh;
141  shm_mq_handle *inqh;
143  Size len;
144  void *data;
145 
146  /* A negative loopcount is nonsensical. */
147  if (loop_count < 0)
148  ereport(ERROR,
149  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
150  errmsg("repeat count size must be an integer value greater than or equal to zero")));
151 
152  /*
153  * Using the nonblocking interfaces, we can even send data to ourselves,
154  * so the minimum number of workers for this test is zero.
155  */
156  if (nworkers < 0)
157  ereport(ERROR,
158  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
159  errmsg("number of workers must be an integer value greater than or equal to zero")));
160 
161  /* Set up dynamic shared memory segment and background workers. */
162  test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
163 
164  /* Main loop. */
165  for (;;)
166  {
167  bool wait = true;
168 
169  /*
170  * If we haven't yet sent the message the requisite number of times,
171  * try again to send it now. Note that when shm_mq_send() returns
172  * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
173  * same message size and contents; that's not an issue here because
174  * we're sending the same message every time.
175  */
176  if (send_count < loop_count)
177  {
178  res = shm_mq_send(outqh, message_size, message_contents, true,
179  true);
180  if (res == SHM_MQ_SUCCESS)
181  {
182  ++send_count;
183  wait = false;
184  }
185  else if (res == SHM_MQ_DETACHED)
186  ereport(ERROR,
187  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
188  errmsg("could not send message")));
189  }
190 
191  /*
192  * If we haven't yet received the message the requisite number of
193  * times, try to receive it again now.
194  */
195  if (receive_count < loop_count)
196  {
197  res = shm_mq_receive(inqh, &len, &data, true);
198  if (res == SHM_MQ_SUCCESS)
199  {
200  ++receive_count;
201  /* Verifying every time is slow, so it's optional. */
202  if (verify)
203  verify_message(message_size, message_contents, len, data);
204  wait = false;
205  }
206  else if (res == SHM_MQ_DETACHED)
207  ereport(ERROR,
208  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
209  errmsg("could not receive message")));
210  }
211  else
212  {
213  /*
214  * Otherwise, we've received the message enough times. This
215  * shouldn't happen unless we've also sent it enough times.
216  */
217  if (send_count != receive_count)
218  ereport(ERROR,
219  (errcode(ERRCODE_INTERNAL_ERROR),
220  errmsg("message sent %d times, but received %d times",
221  send_count, receive_count)));
222  break;
223  }
224 
225  if (wait)
226  {
227  /*
228  * If we made no progress, wait for one of the other processes to
229  * which we are connected to set our latch, indicating that they
230  * have read or written data and therefore there may now be work
231  * for us to do.
232  */
237  }
238  }
239 
240  /* Clean up. */
241  dsm_detach(seg);
242 
243  PG_RETURN_VOID();
244 }
245 
246 /*
247  * Verify that two messages are the same.
248  */
249 static void
250 verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
251 {
252  Size i;
253 
254  if (origlen != newlen)
255  ereport(ERROR,
256  (errmsg("message corrupted"),
257  errdetail("The original message was %zu bytes but the final message is %zu bytes.",
258  origlen, newlen)));
259 
260  for (i = 0; i < origlen; ++i)
261  if (origdata[i] != newdata[i])
262  ereport(ERROR,
263  (errmsg("message corrupted"),
264  errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
265 }
signed int int32
Definition: c.h:430
size_t Size
Definition: c.h:541
void dsm_detach(dsm_segment *seg)
Definition: dsm.c:777
int errdetail(const char *fmt,...)
Definition: elog.c:1039
int errcode(int sqlerrcode)
Definition: elog.c:695
int errmsg(const char *fmt,...)
Definition: elog.c:906
#define ERROR
Definition: elog.h:35
#define ereport(elevel,...)
Definition: elog.h:145
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_GETARG_INT64(n)
Definition: fmgr.h:283
#define PG_GETARG_INT32(n)
Definition: fmgr.h:269
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
struct Latch * MyLatch
Definition: globals.c:58
int i
Definition: isn.c:73
void ResetLatch(Latch *latch)
Definition: latch.c:683
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:476
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
const void size_t len
const void * data
uintptr_t Datum
Definition: postgres.h:412
#define VARDATA_ANY(PTR)
Definition: postgres.h:362
#define VARSIZE_ANY_EXHDR(PTR)
Definition: postgres.h:355
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
Definition: setup.c:48
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:573
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:330
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40
Definition: c.h:623
Datum test_shm_mq_pipelined(PG_FUNCTION_ARGS)
Definition: test.c:128
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
Definition: test.c:250
PG_MODULE_MAGIC
Definition: test.c:22
PG_FUNCTION_INFO_V1(test_shm_mq)
Datum test_shm_mq(PG_FUNCTION_ARGS)
Definition: test.c:39
#define PG_WAIT_EXTENSION
Definition: wait_event.h:23