PostgreSQL Source Code git master
Loading...
Searching...
No Matches
test.c File Reference
#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/proc.h"
#include "utils/wait_event.h"
#include "varatt.h"
#include "test_shm_mq.h"
Include dependency graph for test.c:

Go to the source code of this file.

Functions

 PG_FUNCTION_INFO_V1 (test_shm_mq)
 
 PG_FUNCTION_INFO_V1 (test_shm_mq_pipelined)
 
static void verify_message (Size origlen, char *origdata, Size newlen, char *newdata)
 
Datum test_shm_mq (PG_FUNCTION_ARGS)
 
Datum test_shm_mq_pipelined (PG_FUNCTION_ARGS)
 

Variables

 PG_MODULE_MAGIC
 
static uint32 we_message_queue = 0
 

Function Documentation

◆ PG_FUNCTION_INFO_V1() [1/2]

PG_FUNCTION_INFO_V1 ( test_shm_mq  )

◆ PG_FUNCTION_INFO_V1() [2/2]

PG_FUNCTION_INFO_V1 ( test_shm_mq_pipelined  )

◆ test_shm_mq()

Datum test_shm_mq ( PG_FUNCTION_ARGS  )

Definition at line 45 of file test.c.

46{
47 int64 queue_size = PG_GETARG_INT64(0);
48 text *message = PG_GETARG_TEXT_PP(1);
49 char *message_contents = VARDATA_ANY(message);
50 int message_size = VARSIZE_ANY_EXHDR(message);
51 int32 loop_count = PG_GETARG_INT32(2);
52 int32 nworkers = PG_GETARG_INT32(3);
53 dsm_segment *seg;
54 shm_mq_handle *outqh;
55 shm_mq_handle *inqh;
56 shm_mq_result res;
57 Size len;
58 void *data;
59
60 /* A negative loopcount is nonsensical. */
61 if (loop_count < 0)
62 ereport(ERROR,
63 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
64 errmsg("repeat count size must be an integer value greater than or equal to zero")));
65
66 /*
67 * Since this test sends data using the blocking interfaces, it cannot
68 * send data to itself. Therefore, a minimum of 1 worker is required. Of
69 * course, a negative worker count is nonsensical.
70 */
71 if (nworkers <= 0)
72 ereport(ERROR,
73 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 errmsg("number of workers must be an integer value greater than zero")));
75
76 /* Set up dynamic shared memory segment and background workers. */
77 test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
78
79 /* Send the initial message. */
80 res = shm_mq_send(outqh, message_size, message_contents, false, true);
81 if (res != SHM_MQ_SUCCESS)
82 ereport(ERROR,
83 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
84 errmsg("could not send message")));
85
86 /*
87 * Receive a message and send it back out again. Do this a number of
88 * times equal to the loop count.
89 */
90 for (;;)
91 {
92 /* Receive a message. */
93 res = shm_mq_receive(inqh, &len, &data, false);
94 if (res != SHM_MQ_SUCCESS)
95 ereport(ERROR,
96 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
97 errmsg("could not receive message")));
98
99 /* If this is supposed to be the last iteration, stop here. */
100 if (--loop_count <= 0)
101 break;
102
103 /* Send it back out. */
104 res = shm_mq_send(outqh, len, data, false, true);
105 if (res != SHM_MQ_SUCCESS)
106 ereport(ERROR,
107 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
108 errmsg("could not send message")));
109 }
110
111 /*
112 * Finally, check that we got back the same message from the last
113 * iteration that we originally sent.
114 */
115 verify_message(message_size, message_contents, len, data);
116
117 /* Clean up. */
118 dsm_detach(seg);
119
120 PG_RETURN_VOID();
121}
int64_t int64
Definition c.h:615
int32_t int32
Definition c.h:614
size_t Size
Definition c.h:691
void dsm_detach(dsm_segment *seg)
Definition dsm.c:803
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_GETARG_INT64(n)
Definition fmgr.h:284
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
static char * errmsg
const void size_t len
const void * data
static int fb(int x)
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
Definition setup.c:52
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:574
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition shm_mq.c:331
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
Definition c.h:778
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
Definition test.c:260
static Size VARSIZE_ANY_EXHDR(const void *PTR)
Definition varatt.h:472
static char * VARDATA_ANY(const void *PTR)
Definition varatt.h:486

References data, dsm_detach(), ereport, errcode(), errmsg, ERROR, fb(), len, PG_GETARG_INT32, PG_GETARG_INT64, PG_GETARG_TEXT_PP, PG_RETURN_VOID, shm_mq_receive(), shm_mq_send(), SHM_MQ_SUCCESS, test_shm_mq_setup(), VARDATA_ANY(), VARSIZE_ANY_EXHDR(), and verify_message().

◆ test_shm_mq_pipelined()

Datum test_shm_mq_pipelined ( PG_FUNCTION_ARGS  )

Definition at line 134 of file test.c.

135{
136 int64 queue_size = PG_GETARG_INT64(0);
137 text *message = PG_GETARG_TEXT_PP(1);
138 char *message_contents = VARDATA_ANY(message);
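139 int message_size = VARSIZE_ANY_EXHDR(message);
140 int32 loop_count = PG_GETARG_INT32(2);
141 int32 nworkers = PG_GETARG_INT32(3);
142 bool verify = PG_GETARG_BOOL(4);
143 int32 send_count = 0;
144 int32 receive_count = 0;
145 dsm_segment *seg;
146 shm_mq_handle *outqh;
147 shm_mq_handle *inqh;
148 shm_mq_result res;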
149 Size len;
150 void *data;
151
152 /* A negative loopcount is nonsensical. */
153 if (loop_count < 0)
154 ereport(ERROR,
155 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
156 errmsg("repeat count size must be an integer value greater than or equal to zero")));
157
158 /*
159 * Using the nonblocking interfaces, we can even send data to ourselves,
160 * so the minimum number of workers for this test is zero.
161 */
162 if (nworkers < 0)
163 ereport(ERROR,
164 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
165 errmsg("number of workers must be an integer value greater than or equal to zero")));
166
167 /* Set up dynamic shared memory segment and background workers. */
168 test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
169
170 /* Main loop. */
171 for (;;)
172 {
173 bool wait = true;
174
175 /*
176 * If we haven't yet sent the message the requisite number of times,
177 * try again to send it now. Note that when shm_mq_send() returns
178 * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
179 * same message size and contents; that's not an issue here because
180 * we're sending the same message every time.
181 */
182 if (send_count < loop_count)
183 {
184 res = shm_mq_send(outqh, message_size, message_contents, true,
185 true);
186 if (res == SHM_MQ_SUCCESS)
187 {
188 ++send_count;
189 wait = false;
190 }
191 else if (res == SHM_MQ_DETACHED)
192 ereport(ERROR,
193 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
194 errmsg("could not send message")));
195 }
196
197 /*
198 * If we haven't yet received the message the requisite number of
199 * times, try to receive it again now.
200 */
201 if (receive_count < loop_count)
202 {
203 res = shm_mq_receive(inqh, &len, &data, true);
204 if (res == SHM_MQ_SUCCESS)
205 {
206 ++receive_count;
207 /* Verifying every time is slow, so it's optional. */
208 if (verify)
209 verify_message(message_size, message_contents, len, data);
210 wait = false;
211 }
212 else if (res == SHM_MQ_DETACHED)
213 ereport(ERROR,
214 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
215 errmsg("could not receive message")));
216 }
217 else
218 {
219 /*
220 * Otherwise, we've received the message enough times. This
221 * shouldn't happen unless we've also sent it enough times.
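222 */
223 if (send_count != receive_count)
224 ereport(ERROR,
225 (errcode(ERRCODE_INTERNAL_ERROR),
226 errmsg("message sent %d times, but received %d times",
227 send_count, receive_count)));
228 break;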
229 }
230
231 if (wait)
232 {
233 /* first time, allocate or get the custom wait event */
234 if (we_message_queue == 0)
235 we_message_queue = WaitEventExtensionNew("TestShmMqMessageQueue");
236
237 /*
238 * If we made no progress, wait for one of the other processes to
239 * which we are connected to set our latch, indicating that they
240 * have read or written data and therefore there may now be work
241 * for us to do.
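242 */
243 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
244 we_message_queue);
245 ResetLatch(MyLatch);
246 CHECK_FOR_INTERRUPTS();
247 }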
248 }
249
250 /* Clean up. */
251 dsm_detach(seg);
252
253 PG_RETURN_VOID();
254}
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
struct Latch * MyLatch
Definition globals.c:63
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
static uint32 we_message_queue
Definition test.c:34
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition wait_event.c:163
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET

References CHECK_FOR_INTERRUPTS, data, dsm_detach(), ereport, errcode(), errmsg, ERROR, fb(), len, MyLatch, PG_GETARG_BOOL, PG_GETARG_INT32, PG_GETARG_INT64, PG_GETARG_TEXT_PP, PG_RETURN_VOID, ResetLatch(), SHM_MQ_DETACHED, shm_mq_receive(), shm_mq_send(), SHM_MQ_SUCCESS, test_shm_mq_setup(), VARDATA_ANY(), VARSIZE_ANY_EXHDR(), verify_message(), WaitEventExtensionNew(), WaitLatch(), we_message_queue, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

◆ verify_message()

static void verify_message ( Size  origlen,
char * origdata,
Size  newlen,
char * newdata 
)
static

Definition at line 260 of file test.c.

261{
262 Size i;
263
264 if (origlen != newlen)
265 ereport(ERROR,
266 (errmsg("message corrupted"),
267 errdetail("The original message was %zu bytes but the final message is %zu bytes.",
268 origlen, newlen)));
269
270 for (i = 0; i < origlen; ++i)
271 if (origdata[i] != newdata[i])
272 ereport(ERROR,
273 (errmsg("message corrupted"),
274 errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
275}
int errdetail(const char *fmt,...) pg_attribute_printf(1
int i
Definition isn.c:77

References ereport, errdetail(), errmsg, ERROR, fb(), and i.

Referenced by test_shm_mq(), and test_shm_mq_pipelined().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 25 of file test.c.

◆ we_message_queue

uint32 we_message_queue = 0
static

Definition at line 34 of file test.c.

Referenced by test_shm_mq_pipelined().