PostgreSQL Source Code git master
Loading...
Searching...
No Matches
test.c File Reference
#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/proc.h"
#include "varatt.h"
#include "test_shm_mq.h"
Include dependency graph for test.c:

Go to the source code of this file.

Functions

 PG_FUNCTION_INFO_V1 (test_shm_mq)
 
 PG_FUNCTION_INFO_V1 (test_shm_mq_pipelined)
 
static void verify_message (Size origlen, char *origdata, Size newlen, char *newdata)
 
Datum test_shm_mq (PG_FUNCTION_ARGS)
 
Datum test_shm_mq_pipelined (PG_FUNCTION_ARGS)
 

Variables

 PG_MODULE_MAGIC
 
static uint32 we_message_queue = 0
 

Function Documentation

◆ PG_FUNCTION_INFO_V1() [1/2]

PG_FUNCTION_INFO_V1 ( test_shm_mq  )

◆ PG_FUNCTION_INFO_V1() [2/2]

PG_FUNCTION_INFO_V1 ( test_shm_mq_pipelined  )

◆ test_shm_mq()

Datum test_shm_mq ( PG_FUNCTION_ARGS  )

Definition at line 44 of file test.c.

45{
46 int64 queue_size = PG_GETARG_INT64(0);
47 text *message = PG_GETARG_TEXT_PP(1);
48 char *message_contents = VARDATA_ANY(message);
49 int message_size = VARSIZE_ANY_EXHDR(message);
51 int32 nworkers = PG_GETARG_INT32(3);
52 dsm_segment *seg;
55 shm_mq_result res;
56 Size len;
57 void *data;
58
59 /* A negative loopcount is nonsensical. */
60 if (loop_count < 0)
63 errmsg("repeat count size must be an integer value greater than or equal to zero")));
64
65 /*
66 * Since this test sends data using the blocking interfaces, it cannot
67 * send data to itself. Therefore, a minimum of 1 worker is required. Of
68 * course, a negative worker count is nonsensical.
69 */
70 if (nworkers <= 0)
73 errmsg("number of workers must be an integer value greater than zero")));
74
75 /* Set up dynamic shared memory segment and background workers. */
76 test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
77
78 /* Send the initial message. */
79 res = shm_mq_send(outqh, message_size, message_contents, false, true);
80 if (res != SHM_MQ_SUCCESS)
83 errmsg("could not send message")));
84
85 /*
86 * Receive a message and send it back out again. Do this a number of
87 * times equal to the loop count.
88 */
89 for (;;)
90 {
91 /* Receive a message. */
92 res = shm_mq_receive(inqh, &len, &data, false);
93 if (res != SHM_MQ_SUCCESS)
96 errmsg("could not receive message")));
97
98 /* If this is supposed to be the last iteration, stop here. */
99 if (--loop_count <= 0)
100 break;
101
102 /* Send it back out. */
103 res = shm_mq_send(outqh, len, data, false, true);
104 if (res != SHM_MQ_SUCCESS)
107 errmsg("could not send message")));
108 }
109
110 /*
111 * Finally, check that we got back the same message from the last
112 * iteration that we originally sent.
113 */
114 verify_message(message_size, message_contents, len, data);
115
116 /* Clean up. */
117 dsm_detach(seg);
118
120}
int64_t int64
Definition c.h:576
int32_t int32
Definition c.h:575
size_t Size
Definition c.h:652
void dsm_detach(dsm_segment *seg)
Definition dsm.c:803
int errcode(int sqlerrcode)
Definition elog.c:874
int errmsg(const char *fmt,...)
Definition elog.c:1093
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_GETARG_INT64(n)
Definition fmgr.h:284
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
const void size_t len
const void * data
static int fb(int x)
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
Definition setup.c:51
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:573
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition shm_mq.c:330
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
Definition c.h:739
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
Definition test.c:259
static Size VARSIZE_ANY_EXHDR(const void *PTR)
Definition varatt.h:472
static char * VARDATA_ANY(const void *PTR)
Definition varatt.h:486

References data, dsm_detach(), ereport, errcode(), errmsg(), ERROR, fb(), len, PG_GETARG_INT32, PG_GETARG_INT64, PG_GETARG_TEXT_PP, PG_RETURN_VOID, shm_mq_receive(), shm_mq_send(), SHM_MQ_SUCCESS, test_shm_mq_setup(), VARDATA_ANY(), VARSIZE_ANY_EXHDR(), and verify_message().

◆ test_shm_mq_pipelined()

Datum test_shm_mq_pipelined ( PG_FUNCTION_ARGS  )

Definition at line 133 of file test.c.

134{
135 int64 queue_size = PG_GETARG_INT64(0);
136 text *message = PG_GETARG_TEXT_PP(1);
137 char *message_contents = VARDATA_ANY(message);
138 int message_size = VARSIZE_ANY_EXHDR(message);
140 int32 nworkers = PG_GETARG_INT32(3);
141 bool verify = PG_GETARG_BOOL(4);
142 int32 send_count = 0;
144 dsm_segment *seg;
147 shm_mq_result res;
148 Size len;
149 void *data;
150
151 /* A negative loopcount is nonsensical. */
152 if (loop_count < 0)
155 errmsg("repeat count size must be an integer value greater than or equal to zero")));
156
157 /*
158 * Using the nonblocking interfaces, we can even send data to ourselves,
159 * so the minimum number of workers for this test is zero.
160 */
161 if (nworkers < 0)
164 errmsg("number of workers must be an integer value greater than or equal to zero")));
165
166 /* Set up dynamic shared memory segment and background workers. */
167 test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
168
169 /* Main loop. */
170 for (;;)
171 {
172 bool wait = true;
173
174 /*
175 * If we haven't yet sent the message the requisite number of times,
176 * try again to send it now. Note that when shm_mq_send() returns
177 * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
178 * same message size and contents; that's not an issue here because
179 * we're sending the same message every time.
180 */
182 {
183 res = shm_mq_send(outqh, message_size, message_contents, true,
184 true);
185 if (res == SHM_MQ_SUCCESS)
186 {
187 ++send_count;
188 wait = false;
189 }
190 else if (res == SHM_MQ_DETACHED)
193 errmsg("could not send message")));
194 }
195
196 /*
197 * If we haven't yet received the message the requisite number of
198 * times, try to receive it again now.
199 */
201 {
202 res = shm_mq_receive(inqh, &len, &data, true);
203 if (res == SHM_MQ_SUCCESS)
204 {
206 /* Verifying every time is slow, so it's optional. */
207 if (verify)
208 verify_message(message_size, message_contents, len, data);
209 wait = false;
210 }
211 else if (res == SHM_MQ_DETACHED)
214 errmsg("could not receive message")));
215 }
216 else
217 {
218 /*
219 * Otherwise, we've received the message enough times. This
220 * shouldn't happen unless we've also sent it enough times.
221 */
225 errmsg("message sent %d times, but received %d times",
227 break;
228 }
229
230 if (wait)
231 {
232 /* first time, allocate or get the custom wait event */
233 if (we_message_queue == 0)
234 we_message_queue = WaitEventExtensionNew("TestShmMqMessageQueue");
235
236 /*
237 * If we made no progress, wait for one of the other processes to
238 * which we are connected to set our latch, indicating that they
239 * have read or written data and therefore there may now be work
240 * for us to do.
241 */
246 }
247 }
248
249 /* Clean up. */
250 dsm_detach(seg);
251
253}
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
struct Latch * MyLatch
Definition globals.c:63
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
static uint32 we_message_queue
Definition test.c:33
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition wait_event.c:163
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET

References CHECK_FOR_INTERRUPTS, data, dsm_detach(), ereport, errcode(), errmsg(), ERROR, fb(), len, MyLatch, PG_GETARG_BOOL, PG_GETARG_INT32, PG_GETARG_INT64, PG_GETARG_TEXT_PP, PG_RETURN_VOID, ResetLatch(), SHM_MQ_DETACHED, shm_mq_receive(), shm_mq_send(), SHM_MQ_SUCCESS, test_shm_mq_setup(), VARDATA_ANY(), VARSIZE_ANY_EXHDR(), verify_message(), WaitEventExtensionNew(), WaitLatch(), we_message_queue, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

◆ verify_message()

static void verify_message ( Size  origlen,
char origdata,
Size  newlen,
char newdata 
)
static

Definition at line 259 of file test.c.

260{
261 Size i;
262
263 if (origlen != newlen)
265 (errmsg("message corrupted"),
266 errdetail("The original message was %zu bytes but the final message is %zu bytes.",
267 origlen, newlen)));
268
269 for (i = 0; i < origlen; ++i)
270 if (origdata[i] != newdata[i])
272 (errmsg("message corrupted"),
273 errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
274}
int errdetail(const char *fmt,...) pg_attribute_printf(1
int i
Definition isn.c:77

References ereport, errdetail(), errmsg(), ERROR, fb(), and i.

Referenced by test_shm_mq(), and test_shm_mq_pipelined().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 24 of file test.c.

◆ we_message_queue

uint32 we_message_queue = 0
static

Definition at line 33 of file test.c.

Referenced by test_shm_mq_pipelined().