#include "postgres.h"

/* included early, for IOMETHOD_IO_URING_ENABLED */
#include "storage/aio.h"

#ifdef IOMETHOD_IO_URING_ENABLED

#include <liburing.h>
#include <sys/mman.h>
#include <unistd.h>

#include "miscadmin.h"
#include "storage/aio_internal.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "utils/wait_event.h"
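/*
 * Maximum number of completions processed in one batch while draining a
 * ring's completion queue; bounds the on-stack CQE array in
 * pgaio_uring_drain_locked().
 */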
#define PGAIO_MAX_LOCAL_COMPLETED_IO 32
static size_t pgaio_uring_shmem_size(void);
static void pgaio_uring_shmem_init(bool first_time);
static void pgaio_uring_init_backend(void);
static int pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
static void pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation);

static void pgaio_uring_sq_from_io(PgAioHandle *ioh,
                                   struct io_uring_sqe *sqe);
const IoMethodOps pgaio_uring_ops = {
    /* io_uring IOs can't safely have their FD closed while in flight */
    .wait_on_fd_before_close = true,

    .shmem_size = pgaio_uring_shmem_size,
    .shmem_init = pgaio_uring_shmem_init,
    .init_backend = pgaio_uring_init_backend,

    .submit = pgaio_uring_submit,
    .wait_one = pgaio_uring_wait_one,
};
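/*
 * Per-backend io_uring state. Aligned to the cache line size so that
 * frequently-modified state of different backends does not share cache
 * lines. The completion_lock serializes draining of the ring's completion
 * queue, since completions may be reaped by backends other than the ring's
 * owner.
 */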
typedef struct pg_attribute_aligned (PG_CACHE_LINE_SIZE)
PgAioUringContext
{
    LWLock      completion_lock;

    struct io_uring io_uring_ring;
} PgAioUringContext;
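/*
 * Capabilities of io_uring, as detected by pgaio_uring_check_capabilities().
 */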
typedef struct PgAioUringCaps
{
    bool        checked;

    /* -1 if io_uring_queue_init_mem() is unsupported */
    ssize_t     mem_init_size;
} PgAioUringCaps;
/* PgAioUringContexts for all backends */
static PgAioUringContext *pgaio_uring_contexts;

/* the current backend's context */
static PgAioUringContext *pgaio_my_uring_context;

static PgAioUringCaps pgaio_uring_caps =
{
    .checked = false,
    .mem_init_size = -1,
};
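/*
 * The number of io_uring instances we need: one per backend-like process.
 */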
static uint32
pgaio_uring_procs(void)
{
    /*
     * We can subtract MAX_IO_WORKERS here, as io workers are never used at
     * the same time as io_method=io_uring.
     */
    return MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
}
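/*
 * Initializes pgaio_uring_caps, unless that's already been done.
 *
 * By default io_uring creates a kernel-managed memory mapping for each ring,
 * and a large number of mappings slows things down, backend exit in
 * particular. Newer kernels and liburing versions allow the caller to
 * provide the memory instead, which lets all rings live in shared memory.
 * Whether that works can only be discovered by probing: we create a test
 * ring with user-supplied memory, which as a side benefit also tells us
 * exactly how much memory each ring needs.
 */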
static void
pgaio_uring_check_capabilities(void)
{
    if (pgaio_uring_caps.checked)
        return;

#if defined(HAVE_LIBURING_QUEUE_INIT_MEM) && defined(IORING_SETUP_NO_MMAP)
    {
        struct io_uring test_ring;
        size_t      ring_size;
        void       *ring_ptr;
        struct io_uring_params p = {0};
        int         ret;

        /* over-estimate the ring size; liburing can't tell us beforehand */
        ring_size = 1024 * 1024;
        ring_size -= ring_size % sysconf(_SC_PAGESIZE);

        ring_ptr = mmap(NULL, ring_size, PROT_READ | PROT_WRITE,
                        MAP_SHARED | MAP_ANONYMOUS, -1, 0);
        if (ring_ptr == MAP_FAILED)
            elog(ERROR,
                 "mmap(%zu) to determine io_uring_queue_init_mem() support failed: %m",
                 ring_size);

        ret = io_uring_queue_init_mem(io_max_concurrency, &test_ring, &p,
                                      ring_ptr, ring_size);
        if (ret > 0)
        {
            pgaio_uring_caps.mem_init_size = ret;
            elog(DEBUG1,
                 "can use combined memory mapping for io_uring, each ring needs %d bytes",
                 ret);
            /* the ring existed just for the probe, tear it down again */
            io_uring_queue_exit(&test_ring);
        }
        else
        {
            errno = -ret;       /* allow %m to work */
            elog(DEBUG1,
                 "cannot use combined memory mapping for io_uring, ring creation failed: %m");
        }

        if (munmap(ring_ptr, ring_size) != 0)
            elog(ERROR, "munmap() failed: %m");
    }
#else
    elog(DEBUG1,
         "can't use combined memory mapping for io_uring, kernel or liburing too old");
#endif

    pgaio_uring_caps.checked = true;
}
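/*
 * Amount of shared memory needed for the PgAioUringContext array itself.
 */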
static size_t
pgaio_uring_context_shmem_size(void)
{
    return mul_size(pgaio_uring_procs(), sizeof(PgAioUringContext));
}
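/*
 * Amount of shared memory needed for the combined ring memory; returns 0
 * when user-provided ring memory is not supported.
 */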
static size_t
pgaio_uring_ring_shmem_size(void)
{
    size_t      sz = 0;

    if (pgaio_uring_caps.mem_init_size > 0)
    {
        /* rings must start at a page boundary, reserve alignment space */
        sz = add_size(sz, sysconf(_SC_PAGESIZE));
        sz = add_size(sz, mul_size(pgaio_uring_procs(),
                                   pgaio_uring_caps.mem_init_size));
    }

    return sz;
}
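/*
 * Total shared memory for this IO method; part of the IoMethodOps interface.
 */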
static size_t
pgaio_uring_shmem_size(void)
{
    size_t      sz;

    /* the capability checks influence how much shared memory is needed */
    pgaio_uring_check_capabilities();

    sz = pgaio_uring_context_shmem_size();
    sz = add_size(sz, pgaio_uring_ring_shmem_size());

    return sz;
}
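/*
 * Allocate the PgAioUringContext array and, if user-provided ring memory is
 * supported, the memory for all rings, in a single ShmemInitStruct(), then
 * create one ring per backend.
 */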
static void
pgaio_uring_shmem_init(bool first_time)
{
    int         TotalProcs = pgaio_uring_procs();
    bool        found;
    char       *shmem;
    size_t      ring_mem_remain = 0;
    char       *ring_mem_next = 0;

    shmem = ShmemInitStruct("AioUringContext", pgaio_uring_shmem_size(),
                            &found);
    if (found)
        return;

    pgaio_uring_contexts = (PgAioUringContext *) shmem;
    shmem += pgaio_uring_context_shmem_size();

    /* if supported, handle memory alignment / sizing for io_uring memory */
    if (pgaio_uring_caps.mem_init_size > 0)
    {
        ring_mem_remain = pgaio_uring_ring_shmem_size();
        ring_mem_next = shmem;

        /* align to page boundary, see also pgaio_uring_ring_shmem_size() */
        ring_mem_next = (char *) TYPEALIGN(sysconf(_SC_PAGESIZE),
                                           ring_mem_next);

        /* account for alignment padding */
        ring_mem_remain -= ring_mem_next - shmem;
        shmem += ring_mem_next - shmem;

        shmem += ring_mem_remain;
    }
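    /*
     * Note that TotalProcs rings means TotalProcs file descriptors in the
     * postmaster; RLIMIT_NOFILE has to be large enough for that, plus the
     * file descriptors backends need on top.
     */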
    for (int contextno = 0; contextno < TotalProcs; contextno++)
    {
        PgAioUringContext *context = &pgaio_uring_contexts[contextno];
        int         ret;
        /* create ring in shared memory if supported, else fall back */
#if defined(HAVE_LIBURING_QUEUE_INIT_MEM) && defined(IORING_SETUP_NO_MMAP)
        if (pgaio_uring_caps.mem_init_size > 0)
        {
            struct io_uring_params p = {0};

            ret = io_uring_queue_init_mem(io_max_concurrency,
                                          &context->io_uring_ring, &p,
                                          ring_mem_next, ring_mem_remain);
            ring_mem_remain -= ret;
            ring_mem_next += ret;
        }
        else
#endif
        {
            ret = io_uring_queue_init(io_max_concurrency,
                                      &context->io_uring_ring, 0);
        }
        if (ret < 0)
        {
            char       *hint = NULL;
            int         err = ERRCODE_INTERNAL_ERROR;

            /* add hints for common failure modes */
            if (-ret == EPERM)
            {
                err = ERRCODE_INSUFFICIENT_PRIVILEGE;
                hint = _("Check if io_uring is disabled via /proc/sys/kernel/io_uring_disabled.");
            }
            else if (-ret == EMFILE)
            {
                err = ERRCODE_INSUFFICIENT_RESOURCES;
                hint = psprintf(_("Consider increasing \"ulimit -n\" to at least %d."),
                                TotalProcs + max_files_per_process);
            }
            else if (-ret == ENOSYS)
            {
                err = ERRCODE_FEATURE_NOT_SUPPORTED;
                hint = _("The kernel does not support io_uring.");
            }

            errno = -ret;       /* allow %m to work */

            ereport(ERROR,
                    errcode(err),
                    errmsg("could not setup io_uring queue: %m"),
                    hint != NULL ? errhint("%s", hint) : 0);
        }

        LWLockInitialize(&context->completion_lock,
                         LWTRANCHE_AIO_URING_COMPLETION);
    }
}
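/*
 * Called in each backend during startup: look up this backend's context.
 */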
static void
pgaio_uring_init_backend(void)
{
    Assert(MyProcNumber < pgaio_uring_procs());

    pgaio_my_uring_context = &pgaio_uring_contexts[MyProcNumber];
}
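/*
 * Convert all staged IOs into submission queue entries and submit them with
 * one io_uring_submit() call. Returns the number of IOs submitted.
 */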
static int
pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
    struct io_uring *uring_instance = &pgaio_my_uring_context->io_uring_ring;
    int         in_flight_before = dclist_count(&pgaio_my_backend->in_flight_ios);

    Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);

    for (int i = 0; i < num_staged_ios; i++)
    {
        PgAioHandle *ioh = staged_ios[i];
        struct io_uring_sqe *sqe;

        sqe = io_uring_get_sqe(uring_instance);

        if (!sqe)
            elog(ERROR, "io_uring submission queue is unexpectedly full");

        pgaio_io_prepare_submit(ioh);
        pgaio_uring_sq_from_io(ioh, sqe);
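        /*
         * io_uring executes IO in process context as far as possible, which
         * is good for latency but means that, for buffered IO, copying
         * between page cache and userspace happens in the foreground. Once
         * enough IO is in flight, tell io_uring to hand buffered IO off to
         * its worker threads instead; the first few IOs stay in the
         * foreground so that e.g. a single commit-record write keeps its
         * low latency.
         */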
        /* heuristic threshold, see comment above */
        if (in_flight_before > 4 && (ioh->flags & PGAIO_HF_BUFFERED))
            io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);

        in_flight_before++;
    }

    while (true)
    {
        int         ret;

        pgstat_report_wait_start(WAIT_EVENT_AIO_IO_URING_SUBMIT);
        ret = io_uring_submit(uring_instance);
        pgstat_report_wait_end();
458 "aio method uring: submit EINTR, nios: %d",
483 elog(
PANIC,
"io_uring submit failed: %m");
485 else if (ret != num_staged_ios)
488 elog(
PANIC,
"io_uring submit submitted only %d of %d",
489 ret, num_staged_ios);
494 "aio method uring: submitted %d IOs",
500 return num_staged_ios;
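/*
 * Error context callback, active while completing IOs: if the IO being
 * completed belongs to another backend, say so.
 */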
static void
pgaio_uring_completion_error_callback(void *arg)
{
    PgAioHandle *ioh = arg;
    PGPROC     *owner_proc;
    int32       owner_pid;

    /* no context needed if the backend is completing its own IO */
    if (!ioh || ioh->owner_procno == MyProcNumber)
        return;

    owner_proc = GetPGProcByNumber(ioh->owner_procno);
    owner_pid = owner_proc->pid;

    errcontext("completing I/O on behalf of process %d", owner_pid);
}
static void
pgaio_uring_drain_locked(PgAioUringContext *context)
{
    int         ready;
    int         orig_ready;
    ErrorContextCallback errcallback = {0};

    Assert(LWLockHeldByMeInMode(&context->completion_lock, LW_EXCLUSIVE));

    errcallback.callback = pgaio_uring_completion_error_callback;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    orig_ready = ready = io_uring_cq_ready(&context->io_uring_ring);

    while (ready > 0)
    {
        struct io_uring_cqe *cqes[PGAIO_MAX_LOCAL_COMPLETED_IO];
        uint32      ncqes;

        START_CRIT_SECTION();
        ncqes =
            io_uring_peek_batch_cqe(&context->io_uring_ring,
                                    cqes,
                                    Min(PGAIO_MAX_LOCAL_COMPLETED_IO, ready));
        Assert(ncqes <= ready);
        ready -= ncqes;

        for (int i = 0; i < ncqes; i++)
        {
            struct io_uring_cqe *cqe = cqes[i];
            PgAioHandle *ioh;

            ioh = io_uring_cqe_get_data(cqe);
            errcallback.arg = ioh;
            io_uring_cqe_seen(&context->io_uring_ring, cqe);
            pgaio_io_process_completion(ioh, cqe->res);
            errcallback.arg = NULL;
        }

        END_CRIT_SECTION();

        pgaio_debug(DEBUG3,
                    "drained %d/%d, now expecting %d",
                    ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring));
    }

    error_context_stack = errcallback.previous;
}
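/*
 * Wait for the IO referenced by ioh/ref_generation to complete, draining
 * the owning backend's ring while doing so. Completions may be processed
 * on behalf of another backend, hence the completion_lock.
 */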
static void
pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation)
{
    PgAioHandleState state;
    ProcNumber  owner_procno = ioh->owner_procno;
    PgAioUringContext *owner_context = &pgaio_uring_contexts[owner_procno];
    bool        expect_cqe;
    int         waited = 0;

    LWLockAcquire(&owner_context->completion_lock, LW_EXCLUSIVE);

    while (true)
    {
        pgaio_debug_io(DEBUG3, ioh,
                       "wait_one io_gen: %" PRIu64 ", ref_gen: %" PRIu64 ", cycle %d",
                       ioh->generation, ref_generation, waited);

        if (pgaio_io_was_recycled(ioh, ref_generation, &state) ||
            state != PGAIO_HS_SUBMITTED)
        {
            /* the IO was completed by another backend */
            break;
        }
        else if (io_uring_cq_ready(&owner_context->io_uring_ring))
        {
            /* completions are ready, no need to wait in the kernel */
            expect_cqe = true;
        }
        else
        {
            int         ret;
            struct io_uring_cqe *cqes;

            pgstat_report_wait_start(WAIT_EVENT_AIO_IO_URING_EXECUTION);
            ret = io_uring_wait_cqes(&owner_context->io_uring_ring, &cqes,
                                     1, NULL, NULL);
            pgstat_report_wait_end();

            if (ret == -EINTR)
                continue;
            else if (ret != 0)
            {
                errno = -ret;   /* allow %m to work */
                elog(PANIC, "io_uring wait failed: %m");
            }
            else
            {
                Assert(cqes != NULL);
                expect_cqe = true;
                waited++;
            }
        }

        if (expect_cqe)
            pgaio_uring_drain_locked(owner_context);
    }

    LWLockRelease(&owner_context->completion_lock);

    pgaio_debug(DEBUG3,
                "wait_one with %d sleeps",
                waited);
}
static void
pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe)
{
    struct iovec *iov;

    switch (ioh->op)
    {
        case PGAIO_OP_READV:
            iov = &pgaio_ctl->iovecs[ioh->iovec_off];
            if (ioh->op_data.read.iov_length == 1)
                io_uring_prep_read(sqe, ioh->op_data.read.fd,
                                   iov->iov_base, iov->iov_len,
                                   ioh->op_data.read.offset);
            else
                io_uring_prep_readv(sqe, ioh->op_data.read.fd,
                                    iov, ioh->op_data.read.iov_length,
                                    ioh->op_data.read.offset);
            break;

        case PGAIO_OP_WRITEV:
            iov = &pgaio_ctl->iovecs[ioh->iovec_off];
            if (ioh->op_data.write.iov_length == 1)
                io_uring_prep_write(sqe, ioh->op_data.write.fd,
                                    iov->iov_base, iov->iov_len,
                                    ioh->op_data.write.offset);
            else
                io_uring_prep_writev(sqe, ioh->op_data.write.fd,
                                     iov, ioh->op_data.write.iov_length,
                                     ioh->op_data.write.offset);
            break;

        case PGAIO_OP_INVALID:
            elog(ERROR, "trying to prepare invalid IO operation for execution");
            break;
    }

    /* the handle travels with the CQE, so the completer can find it */
    io_uring_sqe_set_data(sqe, ioh);
}

#endif                          /* IOMETHOD_IO_URING_ENABLED */