From a5c1b1f6be2b4f1b23c1eb3c0637091ecffda97a Mon Sep 17 00:00:00 2001 From: Drew DeVault Date: Sat, 15 May 2021 21:00:54 -0400 Subject: [PATCH] linux::io_uring: re-home from linux::uring Signed-off-by: Drew DeVault --- io_uring/queue.ha | 137 +++++++++++++++++++++++++++++++++++++++++++++++++++++ io_uring/setup.ha | 91 +++++++++++++++++++++++++++++++++++++++++++++++++++++ io_uring/sqe.ha | 39 +++++++++++++++++++++++++++++++++++++++ io_uring/uring.ha | 337 +++++++++++++++++++++++++++++++++++++++++++++++++++++ diff --git a/io_uring/queue.ha b/io_uring/queue.ha new file mode 100644 index 0000000000000000000000000000000000000000..bb1989f5f377e71549abe1aebfecb671834ae4b3 --- /dev/null +++ b/io_uring/queue.ha @@ -0,0 +1,137 @@ +use errors; +use rt; + +// TODO: Atomics + +// Returns the next available [[sqe]] for this [[io_uring]], or null if the +// queue is full. +export fn get_sqe(ring: *io_uring) nullable *sqe = { + const sq = &ring.sq; + const head = *sq.khead, next = sq.sqe_tail + 1; + if (next - head <= *sq.kring_entries) { + let sqe = &sq.sqes[sq.sqe_tail & *sq.kring_mask]; + sq.sqe_tail = next; + return sqe; + }; + return null; +}; + +fn needs_enter(ring: *io_uring, flags: *enter_flags) bool = { + if (ring.flags & setup_flags::IOPOLL == setup_flags::IOPOLL) { + return true; + }; + + if (*ring.sq.kflags & sqring_flags::NEED_WAKEUP == sqring_flags::NEED_WAKEUP) { + *flags |= enter_flags::SQ_WAKEUP; + return true; + }; + + return false; +}; + +fn needs_flush(ring: *io_uring) bool = + *ring.sq.kflags & sqring_flags::CQ_OVERFLOW == sqring_flags::CQ_OVERFLOW; + +// Submits queued I/O asynchronously. Returns the number of submissions accepted +// by the kernel. +export fn submit(ring: *io_uring) (uint | errors::opaque) = + do_submit(ring, flush_sq(ring), 0u); + +// Submits queued I/O asynchronously and blocks until at least "wait" events are +// complete. If setup_flags::IOPOLL was configured for this ring, the meaning of +// the "wait" parameter is different: a non-zero value will block until at least +// one event is completed. +// +// Returns the number of submissions accepted by the kernel. +export fn submit_wait(ring: *io_uring, wait: uint) (uint | errors::opaque) = + do_submit(ring, flush_sq(ring), wait); + +fn flush_sq(ring: *io_uring) uint = { + let sq = &ring.sq; + let ktail = *sq.ktail; + const mask = *sq.kring_mask; + + if (sq.sqe_head == sq.sqe_tail) { + return ktail - *sq.khead; + }; + + for (let n = sq.sqe_tail - sq.sqe_head; n > 0; n -= 1u) { + sq.array[ktail & mask] = sq.sqe_head & mask; + ktail += 1u; + sq.sqe_head += 1u; + }; + + *sq.ktail = ktail; + return ktail - *sq.khead; +}; + +fn do_submit( + ring: *io_uring, + submitted: uint, + wait: uint, +) (uint | errors::opaque) = { + let flags: enter_flags = enter_flags::GETEVENTS; + if (needs_enter(ring, &flags) || wait != 0) { + return match (rt::io_uring_enter(ring.fd, + submitted, wait, flags, null)) { + err: rt::errno => errors::errno(err), + n: uint => n, + }; + } else { + return submitted; + }; +}; + +fn peek_cqe(ring: *io_uring) (nullable *cqe, uint) = { + let head = *ring.cq.khead; + let tail = *ring.cq.ktail; + let mask = *ring.cq.kring_mask; + let avail = tail - head; + if (avail == 0) { + return (null, 0); + }; + return (&ring.cq.cqes[head & mask], avail); +}; + +export fn get_cqe( + ring: *io_uring, + submit: uint, + wait: uint, +) (nullable *cqe | errors::opaque) = { + let cq: nullable *cqe = null; + for (cq == null) { + let enter = false, overflow = false; + let flags: enter_flags = 0; + + // TODO: tuple destructuring + let tup = peek_cqe(ring); + let avail = tup.1; + cq = tup.0; + + if (cq == null && wait == 0 && submit == 0) { + if (!needs_flush(ring)) { + // TODO: EAGAIN + abort(); + }; + overflow = true; + }; + if (wait > avail || overflow) { + flags |= enter_flags::GETEVENTS; + enter = true; + }; + if (submit > 0) { + needs_enter(ring, &flags); + enter = true; + }; + if (!enter) { + break; + }; + + match (rt::io_uring_enter(ring.fd, + submit, wait, flags: uint, null)) { + err: rt::errno => return errors::errno(err), + n: uint => submit -= n, + }; + }; + return cq; +}; diff --git a/io_uring/setup.ha b/io_uring/setup.ha new file mode 100644 index 0000000000000000000000000000000000000000..99084afef2c343e3b7db6bde9461705bd63518ee --- /dev/null +++ b/io_uring/setup.ha @@ -0,0 +1,91 @@ +use errors; +use rt; + +// Sets up an io_uring. The params parameter must be initialized with the +// desired flags, sq_thread_cpu, and sq_thread_idle parameters; the remaining +// fields are initialized by the kernel. +export fn setup(entries: u32, params: *params) (io_uring | error) = { + const fd = match (rt::io_uring_setup(entries, params)) { + err: rt::errno => return errors::errno(err), + fd: int => fd, + }; + + let uring = io_uring { + sq = sq { ... }, + cq = cq { ... }, + fd = fd, + flags = params.flags, + features = params.features, + }; + let sq = &uring.sq, cq = &uring.cq; + + sq.ring_sz = params.sq_off.array + params.sq_entries * size(uint); + cq.ring_sz = params.cq_off.cqes + params.cq_entries * size(cqe); + + if (uring.features & features::SINGLE_MMAP == features::SINGLE_MMAP) { + if (cq.ring_sz > sq.ring_sz) { + sq.ring_sz = cq.ring_sz; + }; + cq.ring_sz = sq.ring_sz; + }; + + sq.ring_ptr = match (rt::mmap(null, + params.sq_off.array + entries * size(u32), + rt::PROT_READ | rt::PROT_WRITE, + rt::MAP_SHARED | rt::MAP_POPULATE, + fd, OFF_SQ_RING)) { + err: rt::errno => return errors::errno(err), + ptr: *void => ptr, + }; + + cq.ring_ptr = if (uring.features & features::SINGLE_MMAP == features::SINGLE_MMAP) { + sq.ring_ptr; + } else match (rt::mmap(null, cq.ring_sz, + rt::PROT_READ | rt::PROT_WRITE, + rt::MAP_SHARED | rt::MAP_POPULATE, + fd, OFF_CQ_RING)) { + err: rt::errno => return errors::errno(err), + ptr: *void => ptr, + }; + + const ring_ptr = sq.ring_ptr: uintptr; + sq.khead = (ring_ptr + params.sq_off.head: uintptr): *uint; + sq.ktail = (ring_ptr + params.sq_off.tail: uintptr): *uint; + sq.kring_mask = (ring_ptr + params.sq_off.ring_mask: uintptr): *uint; + sq.kring_entries = (ring_ptr + params.sq_off.ring_entries: uintptr): *uint; + sq.kflags = (ring_ptr + params.sq_off.flags: uintptr): *sqring_flags; + sq.kdropped = (ring_ptr + params.sq_off.dropped: uintptr): *uint; + sq.array = (ring_ptr + params.sq_off.array: uintptr): *[*]uint; + sq.sqes = match (rt::mmap(null, + params.sq_entries * size(sqe), + rt::PROT_READ | rt::PROT_WRITE, + rt::MAP_SHARED | rt::MAP_POPULATE, + fd, OFF_SQES)) { + err: rt::errno => return errors::errno(err), + ptr: *void => ptr: *[*]sqe, + }; + + const ring_ptr = cq.ring_ptr: uintptr; + cq.khead = (ring_ptr + params.cq_off.head: uintptr): *uint; + cq.ktail = (ring_ptr + params.cq_off.tail: uintptr): *uint; + cq.kring_mask = (ring_ptr + params.cq_off.ring_mask: uintptr): *uint; + cq.kring_entries = (ring_ptr + params.cq_off.ring_entries: uintptr): *uint; + cq.koverflow = (ring_ptr + params.cq_off.overflow: uintptr): *uint; + cq.cqes = (ring_ptr + params.cq_off.cqes: uintptr): *[*]cqe; + + if (params.cq_off.flags != 0) { + cq.kflags = (ring_ptr + params.cq_off.flags: uintptr): *cqring_flags; + }; + + return uring; +}; + +// Frees state associated with an [[io_uring]]. +export fn finish(ring: *io_uring) void = { + let sq = &ring.sq, cq = &ring.cq; + rt::munmap(sq.ring_ptr, sq.ring_sz): void; + if (cq.ring_ptr != null: *void && cq.ring_ptr != sq.ring_ptr) { + rt::munmap(cq.ring_ptr, cq.ring_sz): void; + }; + rt::close(ring.fd): void; +}; diff --git a/io_uring/sqe.ha b/io_uring/sqe.ha new file mode 100644 index 0000000000000000000000000000000000000000..924efaaa6025cab727bbbf8994458e3c4da9aa58 --- /dev/null +++ b/io_uring/sqe.ha @@ -0,0 +1,39 @@ +use types; + +fn prep(sq: *sqe, op: op, flags: sqe_flags...) void = { + // XXX: Is this compatible with the spec? + *sq = sqe { opcode = op, ... }; + for (let i = 0z; i < len(flags); i += 1) { + sq.flags |= flags[i]; + }; +}; + +// Prepares a read operation for an [[sqe]]. +export fn read( + sqe: *sqe, + fd: int, + buf: *void, + count: size, + flags: sqe_flags..., +) void = { + prep(sqe, op::READ, flags...); + sqe.fd = fd; + sqe.addr = buf: uintptr: u64; + assert(count <= types::U32_MAX); + sqe.length = count: u32; +}; + +// Prepares a write operation for an [[sqe]]. +export fn write( + sqe: *sqe, + fd: int, + buf: *void, + count: size, + flags: sqe_flags..., +) void = { + prep(sqe, op::WRITE, flags...); + sqe.fd = fd; + sqe.addr = buf: uintptr: u64; + assert(count <= types::U32_MAX); + sqe.length = count: u32; +}; diff --git a/io_uring/uring.ha b/io_uring/uring.ha new file mode 100644 index 0000000000000000000000000000000000000000..4513159be88e1fa01c40e734ac676cef016570df --- /dev/null +++ b/io_uring/uring.ha @@ -0,0 +1,337 @@ +use errors; + +// All errors which may be returned by this module. +export type error = errors::opaque; + +// Converts an [[error]] into a human-readable string. +export fn strerror(err: error) const str = { + return errors::strerror(err); +}; + +def CQE_BUFFER_SHIFT: uint = 16; +def OFF_SQ_RING: u64 = 0; +def OFF_CQ_RING: u64 = 0x8000000; +def OFF_SQES: u64 = 0x10000000; + +// An io_uring [[sqe]] operation. +export type op = enum u8 { + NOP, + READV, + WRITEV, + FSYNC, + READ_FIXED, + WRITE_FIXED, + POLL_ADD, + POLL_REMOVE, + SYNC_FILE_RANGE, + SENDMSG, + RECVMSG, + TIMEOUT, + TIMEOUT_REMOVE, + ACCEPT, + ASYNC_CANCEL, + LINK_TIMEOUT, + CONNECT, + FALLOCATE, + OPENAT, + CLOSE, + FILES_UPDATE, + STATX, + READ, + WRITE, + FADVISE, + MADVISE, + SEND, + RECV, + OPENAT2, + EPOLL_CTL, + SPLICE, + PROVIDE_BUFFERS, + REMOVE_BUFFERS, + TEE, +}; + +// Flags for an [[sqe]]. +export type sqe_flags = enum u8 { + // Use fixed fileset + FIXED_FILE = 1 << 0, + // Issue after inflight IO + IO_DRAIN = 1 << 1, + // Links next sqe + IO_LINK = 1 << 2, + // Like LINK, but stronger + IO_HARDLINK = 1 << 3, + // Always go async + ASYNC = 1 << 4, + // Select buffer from sqe.buf_group + BUFFER_SELECT = 1 << 5, +}; + +// Flags for an fsync operation. +export type fsync_flags = enum u32 { + DATASYNC = 1 << 0, +}; + +// Flags for a timeout operation. +export type timeout_flags = enum u32 { + ABS = 1 << 0, +}; + +// Flags for a splice operation. +export type splice_flags = enum u32 { + F_FD_IN_FIXED = 1 << 31, +}; + +// Flags for a [[cqe]]. +export type cqe_flags = enum u32 { + F_BUFFER = 1 << 0, +}; + +// A submission queue entry. +export type sqe = struct { + opcode: op, + flags: sqe_flags, + ioprio: u16, + fd: i32, + union { + off: u64, + addr2: u64, + }, + union { + addr: u64, + splice_off_in: u64, + }, + length: u32, + union { + rw_flags: int, + fsync_flags: fsync_flags, + poll_events: u16, + poll32_events: u32, + sync_range_flags: u32, + msg_flags: u32, + timeout_flags: timeout_flags, + accept_flags: u32, + cancel_flags: u32, + open_flags: u32, + statx_flags: u32, + fadvise_advice: u32, + splice_flags: splice_flags, + }, + user_data: u64, + union { + struct { + union { + buf_index: u16, + buf_group: u16, + }, + personality: u16, + splice_fd_in: i32, + }, + pad2: [3]u64, + }, +}; + +// A completion queue entry. +export type cqe = struct { + user_data: u64, + res: i32, + flags: cqe_flags, +}; + +// Filled with the offset for mmap(2) +export type sqring_offsets = struct { + head: u32, + tail: u32, + ring_mask: u32, + ring_entries: u32, + flags: u32, + dropped: u32, + array: u32, + resv1: u32, + resv2: u64, +}; + +// Flags for the sq ring. +export type sqring_flags = enum u32 { + // Needs io_uring_enter wakeup + NEED_WAKEUP = 1 << 0, + // CQ ring is overflown + CQ_OVERFLOW = 1 << 1, +}; + +// Filled with the offset for mmap(2) +export type cqring_offsets = struct { + head: u32, + tail: u32, + ring_mask: u32, + ring_entries: u32, + overflow: u32, + cqes: u32, + flags: u32, + resv1: u32, + resv2: u64, +}; + +// Flags for the cq ring. +export type cqring_flags = enum u32 { + EVENTFD_DISABLED = 1 << 0, +}; + +// Flags for [[setup]]. +export type setup_flags = enum u32 { + // io_context is polled + IOPOLL = 1 << 0, + // SQ poll thread + SQPOLL = 1 << 1, + // sq_thread_cpu is valid + SQ_AFF = 1 << 2, + // App defines CQ size + CQSIZE = 1 << 3, + // Clamp SQ/CQ ring sizes + CLAMP = 1 << 4, + // Attach to existing wq + ATTACH_WQ = 1 << 5, + // Start with ring disabled + R_DISABLED = 1 << 6, +}; + +// Parameters for [[setup]]. Partially completed by the kernel. +export type params = struct { + sq_entries: u32, + cq_entries: u32, + flags: setup_flags, + sq_thread_cpu: u32, + sq_thread_idle: u32, + features: features, + wq_fd: u32, + resv: [3]u32, + sq_off: sqring_offsets, + cq_off: cqring_offsets, +}; + +// Features supported by the kernel. +export type features = enum u32 { + SINGLE_MMAP = 1 << 0, + NODROP = 1 << 1, + SUBMIT_STABLE = 1 << 2, + RW_CUR_POS = 1 << 3, + CUR_PERSONALITY = 1 << 4, + FAST_POLL = 1 << 5, + POLL_32BITS = 1 << 6, +}; + +// Flags for [[enter]]. +export type enter_flags = enum uint { + GETEVENTS = 1 << 0, + SQ_WAKEUP = 1 << 1, + SQ_WAIT = 1 << 2, +}; + +// Operations for [[register]]. +export type regop = enum uint { + REGISTER_BUFFERS, + UNREGISTER_BUFFERS, + REGISTER_FILES, + UNREGISTER_FILES, + REGISTER_EVENTFD, + UNREGISTER_EVENTFD, + REGISTER_FILES_UPDATE, + REGISTER_EVENTFD_ASYNC, + REGISTER_PROBE, + REGISTER_PERSONALITY, + UNREGISTER_PERSONALITY, + REGISTER_RESTRICTIONS, + REGISTER_ENABLE_RINGS, +}; + +// Information for a REGISTER_FILES_UPDATE operation. +export type files_update = struct { + offs: u32, + resv: u32, + // XXX: i32 but aligned to u64 + fds: u64, +}; + +// Flags for a probe operation. +export type op_flags = enum u16 { + SUPPORTED = 1 << 0, +}; + +// REGISTER_PROBE operation details. +export type probe_op = struct { + op: u8, + resv: u8, + flags: op_flags, + resv2: u32, +}; + +// Summary of REGISTER_PROBE results. +export type probe = struct { + last_op: u8, + ops_len: u8, + resv: u16, + resv2: [3]u32, + ops: [*]probe_op, +}; + +// Details for a REGISTER_RESTRICTIONS operation. +export type restriction = struct { + opcode: restriction_op, + union { + register_op: u8, + sqe_op: u8, + sqe_flags: u8, + }, + resv: u8, + resv2: [3]u32, +}; + +// Opcode for a [[restriction]]. +export type restriction_op = enum u16 { + // Allow an io_uring_register(2) opcode + REGISTER_OP = 0, + // Allow an sqe opcode + SQE_OP = 1, + // Allow sqe flags + SQE_FLAGS_ALLOWED = 2, + // Require sqe flags (these flags must be set on each submission) + SQE_FLAGS_REQUIRED = 3, +}; + +// State for an io_uring. +export type io_uring = struct { + sq: sq, + cq: cq, + fd: int, + flags: setup_flags, + features: features, +}; + +// Submission queue state. +export type sq = struct { + khead: *uint, + ktail: *uint, + kring_mask: *uint, + kring_entries: *uint, + kflags: *sqring_flags, + kdropped: *uint, + array: *[*]uint, + sqes: *[*]sqe, + sqe_head: uint, + sqe_tail: uint, + ring_sz: size, + ring_ptr: *void, +}; + +// Completion queue state. +export type cq = struct { + khead: *uint, + ktail: *uint, + kring_mask: *uint, + kring_entries: *uint, + kflags: *cqring_flags, + koverflow: *uint, + cqes: *[*]cqe, + ring_sz: size, + ring_ptr: *void, +}; -- 2.48.1