From 5ad8541d347b92d64b73ab4bcb35ff6939f3d76a Mon Sep 17 00:00:00 2001 From: Runxi Yu Date: Thu, 02 Oct 2025 04:42:10 +0800 Subject: [PATCH] Initialize aio:: --- aio/cqe.ha | 4 ++++ aio/errors.ha | 12 ++++++++++++ aio/loop.ha | 83 +++++++++++++++++++++++++++++++++++++++++++++++++++++ aio/op.ha | 9 +++++++++ aio/op_read.ha | 38 ++++++++++++++++++++++++++++++++++++++ aio/sqe.ha | 7 +++++++ cmd/read_file/main.ha | 42 ++++++++++++++++++++++++++++++++++++++++++ diff --git a/aio/cqe.ha b/aio/cqe.ha new file mode 100644 index 0000000000000000000000000000000000000000..d5fd545a81250decf0c2cd9aa187fa5470d1d9d8 --- /dev/null +++ b/aio/cqe.ha @@ -0,0 +1,4 @@ +use linux::io_uring; + +// Flags for when an I/O operation is completed. +export type completion_flags = io_uring::cqe_flags; diff --git a/aio/errors.ha b/aio/errors.ha new file mode 100644 index 0000000000000000000000000000000000000000..c2d01c3b881535b12e344fa013ce7f3826f8af34 --- /dev/null +++ b/aio/errors.ha @@ -0,0 +1,12 @@ +use errors; + +// There are no available submission queue entries on the submission queue. +// You should probably create a larger loop. +export type full = !void; + +// Buffer pool use was configured for a [[submission]], but there are no buffers +// available. +export type nobuffers = !void; + +// All errors which may be returned by this module. +export type error = !(errors::error | nobuffers | full); diff --git a/aio/loop.ha b/aio/loop.ha new file mode 100644 index 0000000000000000000000000000000000000000..194c6efe5233ff41e48cfff16d587042c122c388 --- /dev/null +++ b/aio/loop.ha @@ -0,0 +1,83 @@ +use errors; +use linux::io_uring; +use io; + +// An asynchronous event loop. +export type loop = struct { + // The underlying io_uring for this event loop. + io_uring: io_uring::io_uring, +}; + +// Creates a new event loop. The user must pass the return value to [[finish]] +// to free associated resources when done using the loop. +// +// The optional "entries" parameter controls how many submission/completion +// queue entries the submission/completion queues could hold. Most applications +// should not need to configure this parameter. +export fn loop_new(entries: u32 = 4096) (loop | error) = { + let params = io_uring::ring_params { + ... + }; + + let ring = io_uring::ring_init(entries, ¶ms)!; // TODO + + return loop { + io_uring = ring, + }; +}; + +// Frees resources associated with an event loop. Must only be called once per +// event loop object. Invalidates all buffers and other objects associated with +// the event loop. +export fn loop_finish(loop: *loop) void = { + io_uring::ring_exit(&loop.io_uring); +}; + +// Submit the queued I/O asynchronously. Returns the number of submissions +// accepted by the kernel. +export fn loop_submit(loop: *loop) (uint | error) = { + match (io_uring::ring_submit(&loop.io_uring)) { + case let n: uint => + return n; + case let e: errors::error => + return e; + }; +}; + +// Dispatches the event loop, waiting for new events and calling their callbacks +// as appropriate. +export fn loop_run(loop: *loop) void = { // TODO: return type? + for (true) { + let cqe = match(io_uring::cqe_wait(&loop.io_uring)) { + case errors::interrupted => continue; + case errors::busy => abort("NODROP issues... handle later"); + case errors::exists => abort("thread submitting work is invalid but we arent doing threads?"); + case let c: *io_uring::cqe => yield c; + }; + + let ctx = match (io_uring::cqe_get_data(cqe)) { + case null => abort("context must not be null"); + case let o: *opaque => yield o; + }; + + let ctx = ctx: *op_ctx; + + match (*ctx) { + case let c: op_read_ctx => + let res = match (io_uring::cqe_result(cqe)) { + case let e: errors::error => yield e: io::error; + case let n: int => + yield switch (n) { + case 0 => yield io::EOF; + case => yield n: size; + }; + }; + c.callback(loop, c.file, res, cqe.flags: completion_flags, c.user); + case void => + abort("my cat walked through the door"); + }; + + free(ctx); + io_uring::cqe_seen(loop, cqe); + }; +}; diff --git a/aio/op.ha b/aio/op.ha new file mode 100644 index 0000000000000000000000000000000000000000..5d6495ff39902f313abdba1ca498b8552dc795c7 --- /dev/null +++ b/aio/op.ha @@ -0,0 +1,9 @@ +// An operation context. +// +// Because we need to retreive the file descriptor, user pointer, callbacks, and +// other information related to each SQE when they come back as CQEs, we must +// encapsulate these in our own "context" structs. But each operation's context +// is different yet could not be identified without looking into user data +// itself, and we also need a way to distinguish between different types +// of events; so here's a tagged union thereof. +type op_ctx = (op_read_ctx | void); // TODO: replace void with actual other ctx's diff --git a/aio/op_read.ha b/aio/op_read.ha new file mode 100644 index 0000000000000000000000000000000000000000..ababfd173354638a1f1a5dc5f84aa4a813f68957 --- /dev/null +++ b/aio/op_read.ha @@ -0,0 +1,38 @@ +use io; +use linux::io_uring; + +// The callback type for [[op_read]]. +export type op_read_cb = fn(loop: *loop, file: io::file, r: (size | io::EOF | io::error), flags: completion_flags, user: nullable *opaque) void; + +// An I/O request context for [[op_read]]. +type op_read_ctx = struct { + callback: *op_read_cb, + user: nullable *opaque, + file: io::file, +}; + +// Prepares an I/O read request. +export fn op_read( + loop: *loop, + file: io::file, + buf: []u8, + offs: u64, + callback: *op_read_cb, + user: nullable *opaque, + flags: submission_flags +) (*submission | full | nomem) = { + let sqe = match (io_uring::ring_get_sqe(&loop.io_uring)) { + case null => + return full; + case let s: *io_uring::sqe => + yield s; + }; + io_uring::op_read(sqe, file: int, *(&buf: **opaque), len(buf), offs, flags: io_uring::sqe_flags); + let ctx: op_ctx = op_read_ctx { + callback = callback, + user = user, + file = file, + }; + io_uring::sqe_set_data(sqe, alloc(ctx)?); + return sqe: *submission; +}; diff --git a/aio/sqe.ha b/aio/sqe.ha new file mode 100644 index 0000000000000000000000000000000000000000..5f0e63c238132ac25986f97b3fc78742da8028ee --- /dev/null +++ b/aio/sqe.ha @@ -0,0 +1,7 @@ +use linux::io_uring; + +// An SQE. +export type submission = io_uring::sqe; + +// An SQE's lfags +export type submission_flags = io_uring::sqe_flags; diff --git a/cmd/read_file/main.ha b/cmd/read_file/main.ha new file mode 100644 index 0000000000000000000000000000000000000000..c8b15ae29c217868ff1b04bf94ee1e96745c0422 --- /dev/null +++ b/cmd/read_file/main.ha @@ -0,0 +1,42 @@ +use aio; +use fmt; +use io; +use os; + +let buf32: [32]u8 = [0...]; + +let buf: []u8 = buf32[..]; + +let ofs: size = 0; + +export fn main() void = { + let loop = aio::loop_new()!; + + let file = os::open("main.ha")!; + + aio::op_read(&loop, file, buf, ofs, &on_read, null, aio::submission_flags::NONE)!; + aio::loop_submit(&loop)!; + + aio::loop_run(&loop); + + // loop_finish(&loop); +}; + +fn on_read( + loop: *aio::loop, + file: io::file, + r: (size | io::EOF | io::error), + flags: aio::completion_flags, + user: nullable *opaque, +) void = { + match (r) { + case io::EOF => abort("EOF"); + case io::error => abort(io::strerror(r as io::error)); + case size => void; + }; + + io::write(os::stdout, buf[.. r as size])!; + ofs += r as size; + aio::op_read(loop, file, buf, ofs, &on_read, null, aio::submission_flags::NONE)!; // TODO: use multishot instead? or does that not work for files + aio::loop_submit(loop)!; +}; -- 2.48.1