method_io_uring.c
/*-------------------------------------------------------------------------
 *
 * method_io_uring.c
 *    AIO - perform AIO using Linux' io_uring
 *
 * For now we create one io_uring instance for each backend. These io_uring
 * instances have to be created in postmaster, during startup, to allow other
 * backends to process IO completions, if the issuing backend is currently
 * busy doing other things. Other backends may not use another backend's
 * io_uring instance to submit IO; that would require additional locking that
 * would likely be harmful for performance.
 *
 * We likely will want to introduce a backend-local io_uring instance in the
 * future, e.g. for FE/BE network IO.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/storage/aio/method_io_uring.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

/* included early, for IOMETHOD_IO_URING_ENABLED */
#include "storage/aio.h"

#ifdef IOMETHOD_IO_URING_ENABLED

#include <liburing.h>

#include "miscadmin.h"
#include "storage/aio_internal.h"
#include "storage/fd.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/lwlock.h"
#include "storage/procnumber.h"
#include "utils/wait_event.h"


/* number of completions processed at once */
#define PGAIO_MAX_LOCAL_COMPLETED_IO 32


/* Entry points for IoMethodOps. */
static size_t pgaio_uring_shmem_size(void);
static void pgaio_uring_shmem_init(bool first_time);
static void pgaio_uring_init_backend(void);
static int pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
static void pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation);

/* helper functions */
static void pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe);


const IoMethodOps pgaio_uring_ops = {
    /*
     * While io_uring mostly is OK with FDs getting closed while the IO is in
     * flight, that is not true for IOs submitted with IOSQE_ASYNC.
     *
     * See
     * https://postgr.es/m/5ons2rtmwarqqhhexb3dnqulw5rjgwgoct57vpdau4rujlrffj%403fls6d2mkiwc
     */
    .wait_on_fd_before_close = true,

    .shmem_size = pgaio_uring_shmem_size,
    .shmem_init = pgaio_uring_shmem_init,
    .init_backend = pgaio_uring_init_backend,

    .submit = pgaio_uring_submit,
    .wait_one = pgaio_uring_wait_one,
};

/*
 * Per-backend state when using io_method=io_uring
 *
 * Align the whole struct to a cacheline boundary, to prevent false sharing
 * between completion_lock and prior backend's io_uring_ring.
 */
typedef struct pg_attribute_aligned (PG_CACHE_LINE_SIZE)
PgAioUringContext
{
    /*
     * Multiple backends can process completions for this backend's io_uring
     * instance (e.g. when the backend issuing IO is busy doing something
     * else). To make that safe we have to ensure that only a single backend
     * gets io completions from the io_uring instance at a time.
     */
    LWLock      completion_lock;

    struct io_uring io_uring_ring;
} PgAioUringContext;

/* PgAioUringContexts for all backends */
static PgAioUringContext *pgaio_uring_contexts;

/* the current backend's context */
static PgAioUringContext *pgaio_my_uring_context;


static uint32
pgaio_uring_procs(void)
{
    /*
     * We can subtract MAX_IO_WORKERS here as io workers are never used at the
     * same time as io_method=io_uring.
     */
    return MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
}

static Size
pgaio_uring_context_shmem_size(void)
{
    return mul_size(pgaio_uring_procs(), sizeof(PgAioUringContext));
}

static size_t
pgaio_uring_shmem_size(void)
{
    return pgaio_uring_context_shmem_size();
}

static void
pgaio_uring_shmem_init(bool first_time)
{
    int         TotalProcs = pgaio_uring_procs();
    bool        found;

    pgaio_uring_contexts = (PgAioUringContext *)
        ShmemInitStruct("AioUring", pgaio_uring_shmem_size(), &found);

    if (found)
        return;

    for (int contextno = 0; contextno < TotalProcs; contextno++)
    {
        PgAioUringContext *context = &pgaio_uring_contexts[contextno];
        int         ret;

        /*
         * Right now a high TotalProcs will cause problems in two ways:
         *
         * - RLIMIT_NOFILE needs to be big enough to allow all
         * io_uring_queue_init() calls to succeed.
         *
         * - RLIMIT_NOFILE needs to be big enough to still have enough file
         * descriptors left over to satisfy set_max_safe_fds(). Or, even
         * better, have max_files_per_process FDs left over.
         *
         * We probably should adjust the soft RLIMIT_NOFILE to ensure that
         * (see the sketch after this function).
         *
         * XXX: Newer versions of io_uring support sharing the workers that
         * execute some asynchronous IOs between io_uring instances. It might
         * be worth using that, but we would also need to evaluate whether it
         * causes noticeable additional contention.
         */
        ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0);
        if (ret < 0)
        {
            char       *hint = NULL;
            int         err = ERRCODE_INTERNAL_ERROR;

            /* add hints for some failures that errno explains sufficiently */
            if (-ret == EPERM)
            {
                err = ERRCODE_INSUFFICIENT_PRIVILEGE;
                hint = _("Check if io_uring is disabled via /proc/sys/kernel/io_uring_disabled.");
            }
            else if (-ret == EMFILE)
            {
                err = ERRCODE_INSUFFICIENT_RESOURCES;
                hint = psprintf(_("Consider increasing \"ulimit -n\" to at least %d."),
                                TotalProcs + max_files_per_process);
            }
            else if (-ret == ENOSYS)
            {
                err = ERRCODE_FEATURE_NOT_SUPPORTED;
                hint = _("Kernel does not support io_uring.");
            }

            /* update errno to allow %m to work */
            errno = -ret;

            ereport(ERROR,
                    errcode(err),
                    errmsg("could not setup io_uring queue: %m"),
                    hint != NULL ? errhint("%s", hint) : 0);
        }

        LWLockInitialize(&context->completion_lock, LWTRANCHE_AIO_URING_COMPLETION);
    }
}
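
/*
 * Illustrative sketch only, not part of this file: the comment in
 * pgaio_uring_shmem_init() above notes that the soft RLIMIT_NOFILE probably
 * should be adjusted so that every io_uring_queue_init() call can get a file
 * descriptor. Something along the following lines could do that; the helper
 * name and the desired_fds value (e.g. TotalProcs + max_files_per_process)
 * are hypothetical.
 */
#if 0
#include <sys/resource.h>

static void
example_raise_soft_nofile_limit(rlim_t desired_fds)
{
    struct rlimit rlim;

    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
        elog(ERROR, "getrlimit() failed: %m");

    /* raise only the soft limit, and never beyond the hard limit */
    if (rlim.rlim_cur < desired_fds && desired_fds <= rlim.rlim_max)
    {
        rlim.rlim_cur = desired_fds;
        if (setrlimit(RLIMIT_NOFILE, &rlim) != 0)
            elog(ERROR, "setrlimit() failed: %m");
    }
}
#endif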

static void
pgaio_uring_init_backend(void)
{
    Assert(MyProcNumber < pgaio_uring_procs());

    pgaio_my_uring_context = &pgaio_uring_contexts[MyProcNumber];
}

static int
pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
    struct io_uring *uring_instance = &pgaio_my_uring_context->io_uring_ring;
    int         in_flight_before = dclist_count(&pgaio_my_backend->in_flight_ios);

    Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);

    for (int i = 0; i < num_staged_ios; i++)
    {
        PgAioHandle *ioh = staged_ios[i];
        struct io_uring_sqe *sqe;

        sqe = io_uring_get_sqe(uring_instance);

        if (!sqe)
            elog(ERROR, "io_uring submission queue is unexpectedly full");

        pgaio_io_prepare_submit(ioh);
        pgaio_uring_sq_from_io(ioh, sqe);

        /*
         * io_uring executes IO in process context if possible. That's
         * generally good, as it reduces context switching. When performing a
         * lot of buffered IO it means that copying between page cache and
         * userspace memory happens in the foreground, as it can't be
         * offloaded to DMA hardware the way it can be with direct IO. That
         * makes io_uring slower than worker mode for heavy buffered IO, as
         * worker mode parallelizes the copying. io_uring can be told to
         * offload work to worker threads instead.
         *
         * If an IO is buffered IO and we already have IOs in flight or
         * multiple IOs are being submitted, we thus tell io_uring to execute
         * the IO in the background. We don't do so for the first few IOs
         * being submitted, as executing them in this process' context has
         * lower latency (see the sketch after this function).
         */
        if (in_flight_before > 4 && (ioh->flags & PGAIO_HF_BUFFERED))
            io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);

        in_flight_before++;
    }

    while (true)
    {
        int         ret;

        pgstat_report_wait_start(WAIT_EVENT_AIO_IO_URING_SUBMIT);
        ret = io_uring_submit(uring_instance);
        pgstat_report_wait_end();

        if (ret == -EINTR)
        {
            pgaio_debug(DEBUG3,
                        "aio method uring: submit EINTR, nios: %d",
                        num_staged_ios);
        }
        else if (ret < 0)
        {
            /*
             * The io_uring_enter() manpage suggests that the appropriate
             * reaction to EAGAIN is:
             *
             * "The application should wait for some completions and try
             * again"
             *
             * However, it seems unlikely that that would help in our case, as
             * we apply a low limit to the number of outstanding IOs and thus
             * also outstanding completions, making it unlikely that we'd get
             * EAGAIN while the OS is in good working order.
             *
             * Additionally, it would be problematic to just wait here: our
             * caller might hold critical locks. It'd possibly lead to
             * delaying the crash-restart that seems likely to occur when the
             * kernel is under such heavy memory pressure.
             *
             * Update errno to allow %m to work.
             */
            errno = -ret;
            elog(PANIC, "io_uring submit failed: %m");
        }
        else if (ret != num_staged_ios)
        {
            /* likely unreachable, but if it is, we would need to re-submit */
            elog(PANIC, "io_uring submit submitted only %d of %d",
                 ret, num_staged_ios);
        }
        else
        {
            pgaio_debug(DEBUG4,
                        "aio method uring: submitted %d IOs",
                        num_staged_ios);
            break;
        }
    }

    return num_staged_ios;
}
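
/*
 * Illustrative sketch only, not part of this file: a minimal standalone
 * liburing program showing the submission pattern used above - grab an SQE,
 * prepare a read, optionally set IOSQE_ASYNC so the kernel runs the IO in an
 * io_uring worker thread instead of this process' context, then submit and
 * wait for the completion. The file path is just an example.
 */
#if 0
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <liburing.h>

int
main(void)
{
    struct io_uring ring;
    struct io_uring_sqe *sqe;
    struct io_uring_cqe *cqe;
    char        buf[8192];
    int         fd;

    if (io_uring_queue_init(8, &ring, 0) < 0)
        exit(1);

    fd = open("/etc/hostname", O_RDONLY);
    if (fd < 0)
        exit(1);

    sqe = io_uring_get_sqe(&ring);
    io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0);

    /* comparable to the PGAIO_HF_BUFFERED offloading decision above */
    io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);

    if (io_uring_submit(&ring) < 0)
        exit(1);

    if (io_uring_wait_cqe(&ring, &cqe) == 0)
    {
        printf("read returned %d\n", cqe->res);
        io_uring_cqe_seen(&ring, cqe);
    }

    io_uring_queue_exit(&ring);
    return 0;
}
#endif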

static void
pgaio_uring_completion_error_callback(void *arg)
{
    ProcNumber  owner;
    PGPROC     *owner_proc;
    int32       owner_pid;
    PgAioHandle *ioh = arg;

    if (!ioh)
        return;

    /* No need for context if a backend is completing the IO for itself */
    if (ioh->owner_procno == MyProcNumber)
        return;

    owner = ioh->owner_procno;
    owner_proc = GetPGProcByNumber(owner);
    owner_pid = owner_proc->pid;

    errcontext("completing I/O on behalf of process %d", owner_pid);
}

static void
pgaio_uring_drain_locked(PgAioUringContext *context)
{
    int         ready;
    int         orig_ready;
    ErrorContextCallback errcallback = {0};

    Assert(LWLockHeldByMeInMode(&context->completion_lock, LW_EXCLUSIVE));

    errcallback.callback = pgaio_uring_completion_error_callback;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /*
     * Don't drain more events than available right now. Otherwise it's
     * plausible that one backend could get stuck, for a while, receiving CQEs
     * without actually processing them.
     */
    orig_ready = ready = io_uring_cq_ready(&context->io_uring_ring);

    while (ready > 0)
    {
        struct io_uring_cqe *cqes[PGAIO_MAX_LOCAL_COMPLETED_IO];
        uint32      ncqes;

        START_CRIT_SECTION();
        ncqes =
            io_uring_peek_batch_cqe(&context->io_uring_ring,
                                    cqes,
                                    Min(PGAIO_MAX_LOCAL_COMPLETED_IO, ready));
        Assert(ncqes <= ready);

        ready -= ncqes;

        for (int i = 0; i < ncqes; i++)
        {
            struct io_uring_cqe *cqe = cqes[i];
            PgAioHandle *ioh;

            ioh = io_uring_cqe_get_data(cqe);
            errcallback.arg = ioh;
            io_uring_cqe_seen(&context->io_uring_ring, cqe);

            pgaio_io_process_completion(ioh, cqe->res);
            errcallback.arg = NULL;
        }

        END_CRIT_SECTION();

        pgaio_debug(DEBUG3,
                    "drained %d/%d, now expecting %d",
                    ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring));
    }

    error_context_stack = errcallback.previous;
}

static void
pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation)
{
    PgAioHandleState state;
    ProcNumber  owner_procno = ioh->owner_procno;
    PgAioUringContext *owner_context = &pgaio_uring_contexts[owner_procno];
    bool        expect_cqe;
    int         waited = 0;

    /*
     * XXX: It would be nice to have a smarter locking scheme; nearly all the
     * time the backend owning the ring will consume the completions, making
     * the locking unnecessarily expensive.
     */
    LWLockAcquire(&owner_context->completion_lock, LW_EXCLUSIVE);

    while (true)
    {
        pgaio_debug_io(DEBUG3, ioh,
                       "wait_one io_gen: %llu, ref_gen: %llu, cycle %d",
                       (long long unsigned) ioh->generation,
                       (long long unsigned) ref_generation,
                       waited);

        if (pgaio_io_was_recycled(ioh, ref_generation, &state) ||
            state != PGAIO_HS_SUBMITTED)
        {
            /* the IO was completed by another backend */
            break;
        }
        else if (io_uring_cq_ready(&owner_context->io_uring_ring))
        {
            /* no need to wait in the kernel, io_uring has a completion */
            expect_cqe = true;
        }
        else
        {
            int         ret;
            struct io_uring_cqe *cqes;

            /* need to wait in the kernel */
            pgstat_report_wait_start(WAIT_EVENT_AIO_IO_URING_EXECUTION);
            ret = io_uring_wait_cqes(&owner_context->io_uring_ring, &cqes, 1, NULL, NULL);
            pgstat_report_wait_end();

            if (ret == -EINTR)
            {
                continue;
            }
            else if (ret != 0)
            {
                /* see comment after io_uring_submit() */
                errno = -ret;
                elog(PANIC, "io_uring wait failed: %m");
            }
            else
            {
                Assert(cqes != NULL);
                expect_cqe = true;
                waited++;
            }
        }

        if (expect_cqe)
        {
            pgaio_uring_drain_locked(owner_context);
        }
    }

    LWLockRelease(&owner_context->completion_lock);

    pgaio_debug(DEBUG3,
                "wait_one with %d sleeps",
                waited);
}

static void
pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe)
{
    struct iovec *iov;

    switch (ioh->op)
    {
        case PGAIO_OP_READV:
            iov = &pgaio_ctl->iovecs[ioh->iovec_off];
            if (ioh->op_data.read.iov_length == 1)
            {
                io_uring_prep_read(sqe,
                                   ioh->op_data.read.fd,
                                   iov->iov_base,
                                   iov->iov_len,
                                   ioh->op_data.read.offset);
            }
            else
            {
                io_uring_prep_readv(sqe,
                                    ioh->op_data.read.fd,
                                    iov,
                                    ioh->op_data.read.iov_length,
                                    ioh->op_data.read.offset);
            }
            break;

        case PGAIO_OP_WRITEV:
            iov = &pgaio_ctl->iovecs[ioh->iovec_off];
            if (ioh->op_data.write.iov_length == 1)
            {
                io_uring_prep_write(sqe,
                                    ioh->op_data.write.fd,
                                    iov->iov_base,
                                    iov->iov_len,
                                    ioh->op_data.write.offset);
            }
            else
            {
                io_uring_prep_writev(sqe,
                                     ioh->op_data.write.fd,
                                     iov,
                                     ioh->op_data.write.iov_length,
                                     ioh->op_data.write.offset);
            }
            break;

        case PGAIO_OP_INVALID:
            elog(ERROR, "trying to prepare invalid IO operation for execution");
    }

    io_uring_sqe_set_data(sqe, ioh);
}
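
/*
 * Illustrative sketch only, not part of this file: how an iovec array maps
 * onto io_uring_prep_readv(), as in the multi-iovec branch of
 * pgaio_uring_sq_from_io() above. Buffer sizes and the file path are
 * arbitrary example values.
 */
#if 0
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/uio.h>
#include <liburing.h>

int
main(void)
{
    struct io_uring ring;
    struct io_uring_sqe *sqe;
    struct io_uring_cqe *cqe;
    char        buf1[4096];
    char        buf2[4096];
    struct iovec iov[2];
    int         fd;

    if (io_uring_queue_init(4, &ring, 0) < 0)
        exit(1);

    fd = open("/etc/services", O_RDONLY);
    if (fd < 0)
        exit(1);

    /* one vectored read filling two buffers, i.e. iov_length == 2 */
    iov[0].iov_base = buf1;
    iov[0].iov_len = sizeof(buf1);
    iov[1].iov_base = buf2;
    iov[1].iov_len = sizeof(buf2);

    sqe = io_uring_get_sqe(&ring);
    io_uring_prep_readv(sqe, fd, iov, 2, 0);

    if (io_uring_submit(&ring) != 1)
        exit(1);

    if (io_uring_wait_cqe(&ring, &cqe) == 0)
    {
        printf("readv returned %d\n", cqe->res);
        io_uring_cqe_seen(&ring, cqe);
    }

    io_uring_queue_exit(&ring);
    return 0;
}
#endif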

#endif                          /* IOMETHOD_IO_URING_ENABLED */