Warning: Due to various recent migrations, viewing non-HEAD refs may be broken.
/aio/loop.ha (raw)
use errors;
use linux::io_uring;
use io;
// An asynchronous event loop.
export type loop = struct {
// The underlying io_uring for this event loop.
io_uring: io_uring::io_uring,
};
// Creates a new event loop. The user must pass the return value to [[finish]]
// to free associated resources when done using the loop.
//
// The optional "entries" parameter controls how many submission/completion
// queue entries the submission/completion queues could hold. Most applications
// should not need to configure this parameter.
export fn loop_new(entries: u32 = 4096) (loop | error) = {
let params = io_uring::ring_params {
...
};
let ring = io_uring::ring_init(entries, ¶ms)!; // TODO
return loop {
io_uring = ring,
};
};
// Frees resources associated with an event loop. Must only be called once per
// event loop object. Invalidates all buffers and other objects associated with
// the event loop.
export fn loop_finish(loop: *loop) void = {
io_uring::ring_exit(&loop.io_uring);
};
// Submit the queued I/O asynchronously. Returns the number of submissions
// accepted by the kernel.
export fn loop_submit(loop: *loop) (uint | error) = {
match (io_uring::ring_submit(&loop.io_uring)) {
case let n: uint =>
return n;
case let e: errors::error =>
return e;
};
};
// Dispatches the event loop, waiting for new events and calling their callbacks
// as appropriate.
export fn loop_run(loop: *loop) void = { // TODO: return type?
for (true) {
let cqe = match(io_uring::cqe_wait(&loop.io_uring)) {
case errors::interrupted => continue;
case errors::busy => abort("NODROP issues... handle later");
case errors::exists => abort("thread submitting work is invalid but we arent doing threads?");
case let c: *io_uring::cqe => yield c;
};
let ctx = match (io_uring::cqe_get_data(cqe)) {
case null => abort("context must not be null");
case let o: *opaque => yield o;
};
let ctx = ctx: *op_ctx;
match (*ctx) {
case let c: op_read_ctx =>
let res = match (io_uring::cqe_result(cqe)) {
case let e: errors::error => yield e: io::error;
case let n: int =>
yield switch (n) {
case 0 => yield io::EOF;
case => yield n: size;
};
};
c.callback(loop, c.file, res, cqe.flags: completion_flags, c.user);
case void =>
abort("my cat walked through the door");
};
free(ctx);
io_uring::cqe_seen(loop, cqe);
};
};