Warning: Due to various recent migrations, viewing non-HEAD refs may be broken.
/aio/loop.ha (raw)
use errors;
use linux::io_uring;
use io;
// An asynchronous event loop. Create one with [[loop_new]] and release it with
// [[loop_finish]].
export type loop = struct {
	// The underlying io_uring for this event loop.
	io_uring: io_uring::io_uring,
};
// Creates a new event loop backed by a freshly initialized io_uring. The user
// must pass the return value to [[loop_finish]] to free associated resources
// when done using the loop.
//
// The optional "entries" parameter controls how many entries the submission
// and completion queues can hold. Most applications should not need to
// configure this parameter.
//
// The result of ring initialization is currently error-asserted, so a failure
// to set up the ring aborts the program instead of being returned.
export fn loop_new(entries: u32 = 4096) (loop | error) = {
	// All ring parameters are left at their zero defaults.
	let params = io_uring::ring_params {
		...
	};
	// TODO: propagate initialization failures to the caller instead of
	// asserting with "!".
	let ring = io_uring::ring_init(entries, &params)!;
	return loop {
		io_uring = ring,
	};
};
// Releases the resources held by an event loop. Call this exactly once per
// event loop object; afterwards, all buffers and other objects associated with
// the event loop are invalid.
export fn loop_finish(loop: *loop) void = {
	let ring = &loop.io_uring;
	io_uring::ring_exit(ring);
};
// Submits all queued I/O to the kernel asynchronously. On success, returns the
// number of submissions the kernel accepted; otherwise returns the error
// reported by the ring.
export fn loop_submit(loop: *loop) (uint | error) = {
	let accepted = match (io_uring::ring_submit(&loop.io_uring)) {
	case let err: errors::error =>
		return err;
	case let count: uint =>
		yield count;
	};
	return accepted;
};
// Dispatches the event loop, waiting for new events and calling their callbacks
// as appropriate.
//
// The user data attached to each completion must be a non-null pointer to a
// heap-allocated op_ctx; it is freed here once the operation's callback has
// returned. This function loops forever and aborts on conditions it does not
// yet handle.
export fn loop_run(loop: *loop) void = { // TODO: return type?
	for (true) {
		// Block until a completion is available, retrying the wait if
		// it was interrupted.
		let cqe = match (io_uring::cqe_wait(&loop.io_uring)) {
		case errors::interrupted => continue;
		case errors::busy => abort("NODROP issues... handle later");
		case errors::exists => abort("thread submitting work is invalid but we arent doing threads?");
		case let c: *io_uring::cqe => yield c;
		};

		// Recover the per-operation context stored in the completion's
		// user data.
		let data = match (io_uring::cqe_get_data(cqe)) {
		case null => abort("context must not be null");
		case let o: *opaque => yield o;
		};
		let ctx = data: *op_ctx;

		match (*ctx) {
		case let c: op_read_ctx =>
			// Translate the raw completion result for the read
			// callback: errors become io::error, a zero count
			// becomes io::EOF, and any other count is passed on as
			// a size.
			let res = match (io_uring::cqe_result(cqe)) {
			case let e: errors::error => yield e: io::error;
			case let n: int =>
				yield switch (n) {
				case 0 => yield io::EOF;
				case => yield n: size;
				};
			};
			c.callback(loop, c.file, res, cqe.flags: completion_flags, c.user);
		case void =>
			abort("my cat walked through the door");
		};

		free(ctx);
		// Mark the completion as consumed on the underlying ring, the
		// same object every other io_uring call here operates on.
		io_uring::cqe_seen(&loop.io_uring, cqe);
	};
};