PostgreSQL Source Code git master
test-oauth-curl.c
Go to the documentation of this file.
/*
 * test-oauth-curl.c
 *
 * A unit test driver for libpq-oauth. This file #includes oauth-curl.c
 * directly, which lets the tests reference its static functions and other
 * internals.
 *
 * USE_ASSERT_CHECKING is required, to make it easy for tests to wrap
 * must-succeed code as part of test setup.
 *
 * Copyright (c) 2025, PostgreSQL Global Development Group
 */
12
13#include "oauth-curl.c"
14
15#include <fcntl.h>
16
17#ifdef USE_ASSERT_CHECKING
18
19/*
20 * TAP Helpers
21 */
22
/* Number of TAP results emitted so far; also supplies each test's number. */
static int	num_tests = 0;

/*
 * Emits a single TAP result ("ok"/"not ok") on stdout. The checked expression
 * is stringified, and the call site recorded, for use in failure diagnostics.
 */
#define ok(OK, TEST) \
	ok_impl(OK, TEST, #OK, __FILE__, __LINE__)

/*
 * Implementation of ok(). Prints the numbered result line for "test" and, on
 * failure, two "#" diagnostic lines naming the location (file:line) and the
 * text of the failed expression (teststr). Returns the value of "ok" so that
 * callers can chain further diagnostics.
 */
static bool
ok_impl(bool ok, const char *test, const char *teststr, const char *file, int line)
{
	const char *prefix = ok ? "" : "not ";
	const int	testnum = ++num_tests;

	printf("%sok %d - %s\n", prefix, testnum, test);

	if (ok)
		return true;

	printf("# at %s:%d:\n", file, line);
	printf("# expression is false: %s\n", teststr);

	return false;
}
44
/*
 * Integer equality check: behaves like ok(THIS == THAT), but additionally
 * prints both values when they differ.
 *
 * Only ints are supported, which is all these tests need. Both operands are
 * copied into temporaries first so that each macro argument is evaluated
 * exactly once; the shorter spelling
 *
 *     is_diag(ok(THIS == THAT, TEST), THIS, #THIS, THAT, #THAT)
 *
 * would evaluate them twice.
 */
#define is(THIS, THAT, TEST) \
	do { \
		int this_ = (THIS); \
		int that_ = (THAT); \
		bool same_ = ok_impl(this_ == that_, TEST, #THIS " == " #THAT, \
							 __FILE__, __LINE__); \
		is_diag(same_, this_, #THIS, that_, #THAT); \
	} while (0)

/*
 * Failure diagnostics for is(): when "ok" is false, prints the source text
 * and runtime value of both operands as a "#" line. Returns "ok" unchanged.
 */
static bool
is_diag(bool ok, int this, const char *thisstr, int that, const char *thatstr)
{
	if (ok)
		return true;

	printf("# %s = %d; %s = %d\n", thisstr, this, thatstr, that);

	return false;
}
73
74/*
75 * Utilities
76 */
77
78/*
79 * Creates a partially-initialized async_ctx for the purposes of testing. Free
80 * with free_test_actx().
81 */
82static struct async_ctx *
83init_test_actx(void)
84{
85 struct async_ctx *actx;
86
87 actx = calloc(1, sizeof(*actx));
88 Assert(actx);
89
90 actx->mux = PGINVALID_SOCKET;
91 actx->timerfd = -1;
92 actx->debugging = true;
93
94 initPQExpBuffer(&actx->errbuf);
95
97
98 return actx;
99}
100
101static void
102free_test_actx(struct async_ctx *actx)
103{
104 termPQExpBuffer(&actx->errbuf);
105
106 if (actx->mux != PGINVALID_SOCKET)
107 close(actx->mux);
108 if (actx->timerfd >= 0)
109 close(actx->timerfd);
110
111 free(actx);
112}
113
static char dummy_buf[4 * 1024];	/* for fill_pipe/drain_pipe */

/*
 * Writes to the write side of a pipe until it won't take any more data.
 *
 * Returns the number of bytes written, or -1 (after reporting through
 * perror()) if write() fails for a reason other than the pipe being full.
 *
 * The descriptor is switched to nonblocking mode for the duration of the
 * call, and its original file status flags are restored before returning.
 */
static ssize_t
fill_pipe(int fd)
{
	int			mode;
	ssize_t		written = 0;

	/* Don't block. */
	Assert((mode = fcntl(fd, F_GETFL)) != -1);
	Assert(fcntl(fd, F_SETFL, mode | O_NONBLOCK) == 0);

	while (true)
	{
		ssize_t		w;

		w = write(fd, dummy_buf, sizeof(dummy_buf));
		if (w < 0)
		{
			/* EAGAIN/EWOULDBLOCK just means the pipe is full: we're done. */
			if (errno != EAGAIN && errno != EWOULDBLOCK)
			{
				perror("write to pipe");
				written = -1;
			}
			break;
		}

		written += w;
	}

	/*
	 * Restore the file status flags. They were fetched with F_GETFL, so they
	 * must be written back with F_SETFL. F_SETFD would instead overwrite the
	 * descriptor flags and leave O_NONBLOCK switched on for later callers.
	 */
	Assert(fcntl(fd, F_SETFL, mode) == 0);

	return written;
}
153
154/*
155 * Drains the requested amount of data from the read side of a pipe.
156 */
157static bool
158drain_pipe(int fd, ssize_t n)
159{
160 Assert(n > 0);
161
162 while (n)
163 {
164 size_t to_read = (n <= sizeof(dummy_buf)) ? n : sizeof(dummy_buf);
165 ssize_t drained;
166
167 drained = read(fd, dummy_buf, to_read);
168 if (drained < 0)
169 {
170 perror("read from pipe");
171 return false;
172 }
173
174 n -= drained;
175 }
176
177 return true;
178}
179
/*
 * Checks that the multiplexer becomes read-ready before DEADLINE, reporting
 * the outcome as a TAP result. Implemented as a macro so that the file/line
 * shown on failure belongs to the caller. A poll error trips an assertion.
 *
 * NB: our current multiplexer implementations (epoll/kqueue) are *readable*
 * when the underlying libcurl sockets are *writable*. That behavior is pinned
 * here on purpose: PGRES_POLLING_READING is hardcoded throughout the flow, and
 * it would have to change if a new multiplexer behaved differently.
 */
#define mux_is_ready(MUX, DEADLINE, TEST) \
	do { \
		int res_; \
		res_ = PQsocketPoll(MUX, 1, 0, DEADLINE); \
		Assert(res_ != -1); \
		ok(res_ > 0, "multiplexer is ready " TEST); \
	} while (0)
196
/*
 * The opposite of mux_is_ready(): polls the multiplexer for readability with
 * an end time of zero and reports, as a TAP result, that nothing is pending.
 * A poll error trips an assertion.
 */
#define mux_is_not_ready(MUX, TEST) \
	do { \
		int res_; \
		res_ = PQsocketPoll(MUX, 1, 0, 0); \
		Assert(res_ != -1); \
		is(res_, 0, "multiplexer is not ready " TEST); \
	} while (0)
206
207/*
208 * Test Suites
209 */
210
/*
 * Per-suite timeout, in microseconds. Defaults to 180 seconds. Set via the
 * PG_TEST_TIMEOUT_DEFAULT envvar (see main()).
 */
static pg_usec_time_t timeout_us = 180 * 1000 * 1000;

/*
 * Exercises set_timer(), timer_expired(), and drain_timer_events() on a fresh
 * test context, checking after each step whether the multiplexer reports
 * itself as ready.
 *
 * Setup calls that must succeed are wrapped in Assert(); the behavior under
 * test is reported through ok()/is() as TAP results.
 */
static void
test_set_timer(void)
{
	struct async_ctx *actx = init_test_actx();
	/* Absolute point in time after which "ready" polls give up. */
	const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;

	printf("# test_set_timer\n");

	/* A zero-duration timer should result in a near-immediate ready signal. */
	Assert(set_timer(actx, 0));
	mux_is_ready(actx->mux, deadline, "when timer expires");
	is(timer_expired(actx), 1, "timer_expired() returns 1 when timer expires");

	/* Resetting the timer far in the future should unset the ready signal. */
	Assert(set_timer(actx, INT_MAX));
	mux_is_not_ready(actx->mux, "when timer is reset to the future");
	is(timer_expired(actx), 0, "timer_expired() returns 0 with unexpired timer");

	/* Setting another zero-duration timer should override the previous one. */
	Assert(set_timer(actx, 0));
	mux_is_ready(actx->mux, deadline, "when timer is re-expired");
	is(timer_expired(actx), 1, "timer_expired() returns 1 when timer is re-expired");

	/* And disabling that timer should once again unset the ready signal. */
	Assert(set_timer(actx, -1));
	mux_is_not_ready(actx->mux, "when timer is unset");
	is(timer_expired(actx), 0, "timer_expired() returns 0 when timer is unset");

	{
		/* Out-parameter of drain_timer_events(); checked with is() below. */
		bool		expired;

		/* Make sure drain_timer_events() functions correctly as well. */
		Assert(set_timer(actx, 0));
		mux_is_ready(actx->mux, deadline, "when timer is re-expired (drain_timer_events)");

		Assert(drain_timer_events(actx, &expired));
		mux_is_not_ready(actx->mux, "when timer is drained after expiring");
		is(expired, 1, "drain_timer_events() reports expiration");
		is(timer_expired(actx), 0, "timer_expired() returns 0 after timer is drained");

		/* A second drain should do nothing. */
		Assert(drain_timer_events(actx, &expired));
		mux_is_not_ready(actx->mux, "when timer is drained a second time");
		is(expired, 0, "drain_timer_events() reports no expiration");
		is(timer_expired(actx), 0, "timer_expired() still returns 0");
	}

	free_test_actx(actx);
}
263
264static void
265test_register_socket(void)
266{
267 struct async_ctx *actx = init_test_actx();
268 int pipefd[2];
269 int rfd,
270 wfd;
271 bool bidirectional;
272
273 /* Create a local pipe for communication. */
274 Assert(pipe(pipefd) == 0);
275 rfd = pipefd[0];
276 wfd = pipefd[1];
277
278 /*
279 * Some platforms (FreeBSD) implement bidirectional pipes, affecting the
280 * behavior of some of these tests. Store that knowledge for later.
281 */
282 bidirectional = PQsocketPoll(rfd /* read */ , 0, 1 /* write */ , 0) > 0;
283
284 /*
285 * This suite runs twice -- once using CURL_POLL_IN/CURL_POLL_OUT for
286 * read/write operations, respectively, and once using CURL_POLL_INOUT for
287 * both sides.
288 */
289 for (int inout = 0; inout < 2; inout++)
290 {
291 const int in_event = inout ? CURL_POLL_INOUT : CURL_POLL_IN;
292 const int out_event = inout ? CURL_POLL_INOUT : CURL_POLL_OUT;
293 const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
294 size_t bidi_pipe_size = 0; /* silence compiler warnings */
295
296 printf("# test_register_socket %s\n", inout ? "(INOUT)" : "");
297
298 /*
299 * At the start of the test, the read side should be blocked and the
300 * write side should be open. (There's a mistake at the end of this
301 * loop otherwise.)
302 */
303 Assert(PQsocketPoll(rfd, 1, 0, 0) == 0);
304 Assert(PQsocketPoll(wfd, 0, 1, 0) > 0);
305
306 /*
307 * For bidirectional systems, emulate unidirectional behavior here by
308 * filling up the "read side" of the pipe.
309 */
310 if (bidirectional)
311 Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
312
313 /* Listen on the read side. The multiplexer shouldn't be ready yet. */
314 Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
315 mux_is_not_ready(actx->mux, "when fd is not readable");
316
317 /* Writing to the pipe should result in a read-ready multiplexer. */
318 Assert(write(wfd, "x", 1) == 1);
319 mux_is_ready(actx->mux, deadline, "when fd is readable");
320
321 /*
322 * Update the registration to wait on write events instead. The
323 * multiplexer should be unset.
324 */
325 Assert(register_socket(NULL, rfd, CURL_POLL_OUT, actx, NULL) == 0);
326 mux_is_not_ready(actx->mux, "when waiting for writes on readable fd");
327
328 /* Re-register for read events. */
329 Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
330 mux_is_ready(actx->mux, deadline, "when waiting for reads again");
331
332 /* Stop listening. The multiplexer should be unset. */
333 Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
334 mux_is_not_ready(actx->mux, "when readable fd is removed");
335
336 /* Listen again. */
337 Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
338 mux_is_ready(actx->mux, deadline, "when readable fd is re-added");
339
340 /*
341 * Draining the pipe should unset the multiplexer again, once the old
342 * event is cleared.
343 */
344 Assert(drain_pipe(rfd, 1));
346 mux_is_not_ready(actx->mux, "when fd is drained");
347
348 /* Undo any unidirectional emulation. */
349 if (bidirectional)
350 Assert(drain_pipe(wfd, bidi_pipe_size));
351
352 /* Listen on the write side. An empty buffer should be writable. */
353 Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
354 Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
355 mux_is_ready(actx->mux, deadline, "when fd is writable");
356
357 /* As above, wait on read events instead. */
358 Assert(register_socket(NULL, wfd, CURL_POLL_IN, actx, NULL) == 0);
359 mux_is_not_ready(actx->mux, "when waiting for reads on writable fd");
360
361 /* Re-register for write events. */
362 Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
363 mux_is_ready(actx->mux, deadline, "when waiting for writes again");
364
365 {
366 ssize_t written;
367
368 /*
369 * Fill the pipe. Once the old writable event is cleared, the mux
370 * should not be ready.
371 */
372 Assert((written = fill_pipe(wfd)) > 0);
373 printf("# pipe buffer is full at %zd bytes\n", written);
374
376 mux_is_not_ready(actx->mux, "when fd buffer is full");
377
378 /* Drain the pipe again. */
379 Assert(drain_pipe(rfd, written));
380 mux_is_ready(actx->mux, deadline, "when fd buffer is drained");
381 }
382
383 /* Stop listening. */
384 Assert(register_socket(NULL, wfd, CURL_POLL_REMOVE, actx, NULL) == 0);
385 mux_is_not_ready(actx->mux, "when fd is removed");
386
387 /* Make sure an expired timer doesn't interfere with event draining. */
388 {
389 bool expired;
390
391 /* Make the rfd appear unidirectional if necessary. */
392 if (bidirectional)
393 Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
394
395 /* Set the timer and wait for it to expire. */
396 Assert(set_timer(actx, 0));
397 Assert(PQsocketPoll(actx->timerfd, 1, 0, deadline) > 0);
398 is(timer_expired(actx), 1, "timer is expired");
399
400 /* Register for read events and make the fd readable. */
401 Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
402 Assert(write(wfd, "x", 1) == 1);
403 mux_is_ready(actx->mux, deadline, "when fd is readable and timer expired");
404
405 /*
406 * Draining the pipe should unset the multiplexer again, once the
407 * old event is drained and the timer is reset.
408 *
409 * Order matters, since comb_multiplexer() doesn't have to remove
410 * stale events when active events exist. Follow the call sequence
411 * used in the code: drain the timer expiration, drain the pipe,
412 * then clear the stale events.
413 */
414 Assert(drain_timer_events(actx, &expired));
415 Assert(drain_pipe(rfd, 1));
417
418 is(expired, 1, "drain_timer_events() reports expiration");
419 is(timer_expired(actx), 0, "timer is no longer expired");
420 mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
421
422 /* Stop listening. */
423 Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
424
425 /* Undo any unidirectional emulation. */
426 if (bidirectional)
427 Assert(drain_pipe(wfd, bidi_pipe_size));
428 }
429
430 /* Ensure comb_multiplexer() can handle multiple stale events. */
431 {
432 int rfd2,
433 wfd2;
434
435 /* Create a second local pipe. */
436 Assert(pipe(pipefd) == 0);
437 rfd2 = pipefd[0];
438 wfd2 = pipefd[1];
439
440 /* Make both rfds appear unidirectional if necessary. */
441 if (bidirectional)
442 {
443 Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
444 Assert(fill_pipe(rfd2) == bidi_pipe_size);
445 }
446
447 /* Register for read events on both fds, and make them readable. */
448 Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
449 Assert(register_socket(NULL, rfd2, in_event, actx, NULL) == 0);
450
451 Assert(write(wfd, "x", 1) == 1);
452 Assert(write(wfd2, "x", 1) == 1);
453
454 mux_is_ready(actx->mux, deadline, "when two fds are readable");
455
456 /*
457 * Drain both fds. comb_multiplexer() should then ensure that the
458 * mux is no longer readable.
459 */
460 Assert(drain_pipe(rfd, 1));
461 Assert(drain_pipe(rfd2, 1));
463 mux_is_not_ready(actx->mux, "when two fds are drained");
464
465 /* Stop listening. */
466 Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
467 Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, actx, NULL) == 0);
468
469 /* Undo any unidirectional emulation. */
470 if (bidirectional)
471 {
472 Assert(drain_pipe(wfd, bidi_pipe_size));
473 Assert(drain_pipe(wfd2, bidi_pipe_size));
474 }
475
476 close(rfd2);
477 close(wfd2);
478 }
479 }
480
481 close(rfd);
482 close(wfd);
483 free_test_actx(actx);
484}
485
486int
487main(int argc, char *argv[])
488{
489 const char *timeout;
490
491 /* Grab the default timeout. */
492 timeout = getenv("PG_TEST_TIMEOUT_DEFAULT");
493 if (timeout)
494 {
495 int timeout_s = atoi(timeout);
496
497 if (timeout_s > 0)
498 timeout_us = timeout_s * 1000 * 1000;
499 }
500
501 /*
502 * Set up line buffering for our output, to let stderr interleave in the
503 * log files.
504 */
505 setvbuf(stdout, NULL, PG_IOLBF, 0);
506
507 test_set_timer();
508 test_register_socket();
509
510 printf("1..%d\n", num_tests);
511 return 0;
512}
513
514#else /* !USE_ASSERT_CHECKING */
515
/*
 * Fallback entry point for builds without assertions. The suites rely on
 * Assert() for their setup steps, so nothing is run: a TAP plan that skips
 * the whole file is printed instead.
 */
int
main(int argc, char *argv[])
{
	fputs("1..0 # skip: cassert is not enabled\n", stdout);

	return 0;
}
526
527#endif /* USE_ASSERT_CHECKING */
int PQsocketPoll(int sock, int forRead, int forWrite, pg_usec_time_t end_time)
Definition: fe-misc.c:1141
pg_usec_time_t PQgetCurrentTimeUSec(void)
Definition: fe-misc.c:1235
Assert(PointerIsAligned(start, uint64))
#define calloc(a, b)
Definition: header.h:55
#define free(a)
Definition: header.h:65
#define close(a)
Definition: win32.h:12
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
int64_t pg_usec_time_t
Definition: libpq-fe.h:238
static bool drain_timer_events(struct async_ctx *actx, bool *was_expired)
Definition: oauth-curl.c:1590
static bool setup_multiplexer(struct async_ctx *actx)
Definition: oauth-curl.c:1174
static int register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx, void *socketp)
Definition: oauth-curl.c:1233
static bool set_timer(struct async_ctx *actx, long timeout)
Definition: oauth-curl.c:1447
static int timer_expired(struct async_ctx *actx)
Definition: oauth-curl.c:1544
static bool comb_multiplexer(struct async_ctx *actx)
Definition: oauth-curl.c:1399
static PgChecksumMode mode
Definition: pg_checksums.c:56
#define PG_IOLBF
Definition: port.h:410
#define PGINVALID_SOCKET
Definition: port.h:31
#define printf(...)
Definition: port.h:266
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:90
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:129
static void test(void)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket mux
Definition: oauth-curl.c:233
PQExpBufferData errbuf
Definition: oauth-curl.c:267
int timerfd
Definition: oauth-curl.c:232
bool debugging
Definition: oauth-curl.c:280
int main(int argc, char *argv[])
#define EWOULDBLOCK
Definition: win32_port.h:370
#define EAGAIN
Definition: win32_port.h:362