30#ifdef IOMETHOD_IO_URING_ENABLED
45#define PGAIO_MAX_LOCAL_COMPLETED_IO 32
49static size_t pgaio_uring_shmem_size(
void);
50static void pgaio_uring_shmem_init(
bool first_time);
51static void pgaio_uring_init_backend(
void);
56static void pgaio_uring_sq_from_io(
PgAioHandle *ioh,
struct io_uring_sqe *sqe);
69 .shmem_size = pgaio_uring_shmem_size,
70 .shmem_init = pgaio_uring_shmem_init,
71 .init_backend = pgaio_uring_init_backend,
73 .submit = pgaio_uring_submit,
74 .wait_one = pgaio_uring_wait_one,
83typedef struct pg_attribute_aligned (PG_CACHE_LINE_SIZE)
94 struct io_uring io_uring_ring;
98static PgAioUringContext *pgaio_uring_contexts;
101static PgAioUringContext *pgaio_my_uring_context;
105pgaio_uring_procs(
void)
115pgaio_uring_context_shmem_size(
void)
117 return mul_size(pgaio_uring_procs(),
sizeof(PgAioUringContext));
121pgaio_uring_shmem_size(
void)
123 return pgaio_uring_context_shmem_size();
127pgaio_uring_shmem_init(
bool first_time)
132 pgaio_uring_contexts = (PgAioUringContext *)
138 for (
int contextno = 0; contextno < TotalProcs; contextno++)
140 PgAioUringContext *context = &pgaio_uring_contexts[contextno];
165 int err = ERRCODE_INTERNAL_ERROR;
170 err = ERRCODE_INSUFFICIENT_PRIVILEGE;
171 hint =
_(
"Check if io_uring is disabled via /proc/sys/kernel/io_uring_disabled.");
173 else if (-ret == EMFILE)
175 err = ERRCODE_INSUFFICIENT_RESOURCES;
176 hint =
psprintf(
_(
"Consider increasing \"ulimit -n\" to at least %d."),
179 else if (-ret == ENOSYS)
181 err = ERRCODE_FEATURE_NOT_SUPPORTED;
182 hint =
_(
"Kernel does not support io_uring.");
190 errmsg(
"could not setup io_uring queue: %m"),
191 hint != NULL ?
errhint(
"%s", hint) : 0);
199pgaio_uring_init_backend(
void)
203 pgaio_my_uring_context = &pgaio_uring_contexts[
MyProcNumber];
209 struct io_uring *uring_instance = &pgaio_my_uring_context->io_uring_ring;
214 for (
int i = 0;
i < num_staged_ios;
i++)
217 struct io_uring_sqe *sqe;
219 sqe = io_uring_get_sqe(uring_instance);
222 elog(
ERROR,
"io_uring submission queue is unexpectedly full");
225 pgaio_uring_sq_from_io(ioh, sqe);
244 io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
254 ret = io_uring_submit(uring_instance);
260 "aio method uring: submit EINTR, nios: %d",
285 elog(
PANIC,
"io_uring submit failed: %m");
287 else if (ret != num_staged_ios)
290 elog(
PANIC,
"io_uring submit submitted only %d of %d",
291 ret, num_staged_ios);
296 "aio method uring: submitted %d IOs",
302 return num_staged_ios;
306pgaio_uring_completion_error_callback(
void *
arg)
322 owner_pid = owner_proc->
pid;
324 errcontext(
"completing I/O on behalf of process %d", owner_pid);
328pgaio_uring_drain_locked(PgAioUringContext *context)
336 errcallback.
callback = pgaio_uring_completion_error_callback;
345 orig_ready = ready = io_uring_cq_ready(&context->io_uring_ring);
349 struct io_uring_cqe *cqes[PGAIO_MAX_LOCAL_COMPLETED_IO];
354 io_uring_peek_batch_cqe(&context->io_uring_ring,
356 Min(PGAIO_MAX_LOCAL_COMPLETED_IO, ready));
361 for (
int i = 0;
i < ncqes;
i++)
363 struct io_uring_cqe *cqe = cqes[
i];
366 ioh = io_uring_cqe_get_data(cqe);
367 errcallback.
arg = ioh;
368 io_uring_cqe_seen(&context->io_uring_ring, cqe);
371 errcallback.
arg = NULL;
377 "drained %d/%d, now expecting %d",
378 ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring));
389 PgAioUringContext *owner_context = &pgaio_uring_contexts[owner_procno];
403 "wait_one io_gen: %llu, ref_gen: %llu, cycle %d",
405 (
long long unsigned) ref_generation,
414 else if (io_uring_cq_ready(&owner_context->io_uring_ring))
422 struct io_uring_cqe *cqes;
426 ret = io_uring_wait_cqes(&owner_context->io_uring_ring, &cqes, 1, NULL, NULL);
449 pgaio_uring_drain_locked(owner_context);
456 "wait_one with %d sleeps",
461pgaio_uring_sq_from_io(
PgAioHandle *ioh,
struct io_uring_sqe *sqe)
471 io_uring_prep_read(sqe,
479 io_uring_prep_readv(sqe,
492 io_uring_prep_write(sqe,
500 io_uring_prep_writev(sqe,
509 elog(
ERROR,
"trying to prepare invalid IO operation for execution");
512 io_uring_sqe_set_data(sqe, ioh);
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
PgAioBackend * pgaio_my_backend
bool pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state)
void pgaio_io_prepare_submit(PgAioHandle *ioh)
#define pgaio_debug(elevel, msg,...)
#define pgaio_debug_io(elevel, ioh, msg,...)
#define PGAIO_SUBMIT_BATCH_SIZE
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
int max_files_per_process
Assert(PointerIsAligned(start, uint64))
static uint32 dclist_count(const dclist_head *head)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockInitialize(LWLock *lock, int tranche_id)
@ LWTRANCHE_AIO_URING_COMPLETION
#define START_CRIT_SECTION()
#define END_CRIT_SECTION()
#define NUM_AUXILIARY_PROCS
#define GetPGProcByNumber(n)
char * psprintf(const char *fmt,...)
Size mul_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
struct ErrorContextCallback * previous
void(* callback)(void *arg)
bool wait_on_fd_before_close
dclist_head in_flight_ios
struct PgAioOpData::@123 write
struct PgAioOpData::@122 read
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)