Lindenii Project Forge

hare-aio

Asynchronous I/O event loops for Hare


Commit info
ID: 1446fc1748db7c1640eac40ef136a2775747b433
Author: Drew DeVault <sir@cmpwn.com>
Author date: Thu, 16 Sep 2021 17:55:09 +0200
Committer: Drew DeVault <sir@cmpwn.com>
Committer date: Sat, 25 Sep 2021 09:16:47 +0200
all: overhaul switch/match syntax

This changes the syntax of switch and match expressions following
similar changes to harec et al.

match (x) {
	case type =>
		do_work();
		yield 10;
	case x: type =>
		process(x);
		yield 20;
	case =>
		abort();
};
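
For contrast, a sketch of the same kind of dispatch under the old
inline-arm syntax this commit removes (reconstructed from the old-style
arms still visible in the diff below):

match (x) {
	type => do_work(),
	x: type => process(x),
};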

Signed-off-by: Drew DeVault <sir@cmpwn.com>
Signed-off-by: Alexey Yerin <yyp@disroot.org>
Co-authored-by: Alexey Yerin <yyp@disroot.org>
use errors;
use rt;

// Advances the completion queue by N items.
export fn cq_advance(ring: *io_uring, n: uint) void = {
	*ring.cq.khead = *ring.cq.khead + n;
};

// Call after processing a [[cqe]]. The cqe is returned to the pool and cannot
// be used by the application again.
export fn cqe_seen(ring: *io_uring, cqe: *cqe) void = cq_advance(ring, 1);

// Waits until a CQE is available, then returns it. The caller must pass the
// returned CQE to [[cqe_seen]] to advance the queue.
export fn wait(ring: *io_uring) (*cqe | error) = {
	match (get_cqe(ring, 0, 1)) {
	case err: error =>
		return err;
	case cq: nullable *cqe =>
		assert(cq != null); // XXX: Correct?
		return cq: *cqe;
	};
};

// Peeks the next CQE from the queue and returns it, or null if none are
// pending. The caller must pass the returned CQE to [[cqe_seen]] to advance the
// queue.
export fn peek(ring: *io_uring) (nullable *cqe | error) = get_cqe(ring, 0, 0);

// Returns the result of a [[cqe]], or an error if unsuccessful.
export fn result(cqe: *cqe) (int | error) = 
	if (cqe.res < 0) errors::errno(rt::wrap_errno(-cqe.res))
	else cqe.res;

// Gets the user data field of a [[cqe]]. See [[set_user]] for the corresponding
// SQE function.
export fn get_user(cqe: *cqe) nullable *void =
	cqe.user_data: uintptr: nullable *void;
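
// Illustrative sketch (not part of the original file): a typical reaping
// loop built from [[peek]], [[result]], and [[cqe_seen]]. The result (and
// any user data via [[get_user]]) must be read before the CQE is returned
// to the pool.
fn drain(ring: *io_uring) (void | error) = {
	for (true) {
		const cq: *cqe = match (peek(ring)?) {
		case null =>
			return; // no completions pending
		case c: *cqe =>
			yield c;
		};
		const res = result(cq);
		cqe_seen(ring, cq); // cq must not be used past this point
		match (res) {
		case err: error =>
			return err;
		case int => void;
		};
	};
};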

fn peek_cqe(ring: *io_uring) (nullable *cqe, uint) = {
	let head = *ring.cq.khead;
	let tail = *ring.cq.ktail;
	let mask = *ring.cq.kring_mask;
	let avail = tail - head;
	if (avail == 0) {
		return (null, 0);
	};
	return (&ring.cq.cqes[head & mask], avail);
};

fn get_cqe(
	ring: *io_uring,
	submit: uint,
	wait: uint,
) (nullable *cqe | error) = {
	let cq: nullable *cqe = null;
	for (cq == null) {
		let enter = false, overflow = false;
		let flags: enter_flags = 0;

		// TODO: tuple destructuring
		let tup = peek_cqe(ring);
		let avail = tup.1;
		cq = tup.0;

		if (cq == null && wait == 0 && submit == 0) {
			if (!needs_flush(ring)) {
				return null;
			};
			overflow = true;
		};
		if (wait > avail || overflow) {
			flags |= enter_flags::GETEVENTS;
			enter = true;
		};
		if (submit > 0) {
			needs_enter(ring, &flags);
			enter = true;
		};
		if (!enter) {
			break;
		};

		match (rt::io_uring_enter(ring.fd,
				submit, wait, flags: uint, null)) {
		case err: rt::errno =>
			return errors::errno(err);
		case n: uint =>
			submit -= n;
		};
	};
	return cq;
};
use errors;
use rt;

// TODO: Atomics

// Returns the next available [[sqe]] for this [[io_uring]], or null if the
// queue is full.
export fn get_sqe(ring: *io_uring) nullable *sqe = {
	const sq = &ring.sq;
	const head = *sq.khead, next = sq.sqe_tail + 1;
	if (next - head <= *sq.kring_entries) {
		let sqe = &sq.sqes[sq.sqe_tail & *sq.kring_mask];
		sq.sqe_tail = next;
		return sqe;
	};
	return null;
};

// Returns the next available [[sqe]] for this [[io_uring]], or aborts the
// program if the queue is full.
export fn must_get_sqe(ring: *io_uring) *sqe = {
	match (get_sqe(ring)) {
	case null =>
		abort("I/O queue full");
	case sq: *sqe =>
		return sq;
	};
};
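
// Illustrative sketch (not part of the original file): reserve an SQE,
// fill it in, and submit. Preparing the SQE (opcode, fd, buffers) depends
// on helpers defined elsewhere in this module and is elided here.
fn queue_one(ring: *io_uring) (uint | error) = {
	const sq = must_get_sqe(ring);
	// ... prepare *sq here (opcode, fd, buffers) ...
	return submit(ring);
};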

fn needs_enter(ring: *io_uring, flags: *enter_flags) bool = {
	if (ring.flags & setup_flags::IOPOLL != setup_flags::IOPOLL) {
		return true;
	};

	if (*ring.sq.kflags & sqring_flags::NEED_WAKEUP == sqring_flags::NEED_WAKEUP) {
		*flags |= enter_flags::SQ_WAKEUP;
		return true;
	};

	return false;
};

fn needs_flush(ring: *io_uring) bool =
	*ring.sq.kflags & sqring_flags::CQ_OVERFLOW == sqring_flags::CQ_OVERFLOW;

// Submits queued I/O asynchronously. Returns the number of submissions accepted
// by the kernel.
export fn submit(ring: *io_uring) (uint | error) =
	do_submit(ring, flush_sq(ring), 0u);

// Submits queued I/O asynchronously and blocks until at least "wait" events are
// complete. If setup_flags::IOPOLL was configured for this ring, the meaning of
// the "wait" parameter is different: a non-zero value will block until at least
// one event is completed.
//
// Returns the number of submissions accepted by the kernel.
export fn submit_wait(ring: *io_uring, wait: uint) (uint | error) =
	do_submit(ring, flush_sq(ring), wait);
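
// Illustrative sketch: flush the queue and block for at least one
// completion in a single syscall, then reap it.
fn submit_and_reap(ring: *io_uring) (uint | error) = {
	const n = submit_wait(ring, 1)?; // submissions accepted by the kernel
	const cq = wait(ring)?;
	cqe_seen(ring, cq);
	return n;
};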

fn flush_sq(ring: *io_uring) uint = {
	let sq = &ring.sq;
	let ktail = *sq.ktail;
	const mask = *sq.kring_mask;

	if (sq.sqe_head == sq.sqe_tail) {
		return ktail - *sq.khead;
	};

	for (let n = sq.sqe_tail - sq.sqe_head; n > 0; n -= 1u) {
		sq.array[ktail & mask] = sq.sqe_head & mask;
		ktail += 1u;
		sq.sqe_head += 1u;
	};

	*sq.ktail = ktail;
	return ktail - *sq.khead;
};

fn do_submit(
	ring: *io_uring,
	submitted: uint,
	wait: uint,
) (uint | error) = {
	let flags: enter_flags = enter_flags::GETEVENTS;
	if (needs_enter(ring, &flags) || wait != 0) {
		match (rt::io_uring_enter(ring.fd,
				submitted, wait, flags, null)) {
		case err: rt::errno =>
			return errors::errno(err);
		case n: uint =>
			return n;
		};
	} else {
		return submitted;
	};
};
use errors;
use rt;
use types;

// Registers a set of fixed buffers with an [[io_uring]]. Note that you must
// call [[unregister_buffers]] before registering a new set of buffers (even if
// some of them have similar addresses to the old set). The buffers must be
// anonymous, non-file-backed memory (e.g. the kind returned by alloc or
// rt::mmap).
export fn register_buffers(ring: *io_uring, iov: []rt::iovec) (void | error) = {
	assert(len(iov) <= types::UINT_MAX);
	match (rt::io_uring_register(ring.fd, regop::REGISTER_BUFFERS,
			iov: *[*]rt::iovec, len(iov): uint)) {
	case err: rt::errno =>
		return errors::errno(err);
	case int => void;
	};
};
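
// Illustrative sketch: registering a single fixed buffer. The rt::iovec
// field names are assumed here to mirror the C struct.
fn register_one(ring: *io_uring, buf: []u8) (void | error) = {
	let iov = [rt::iovec {
		iov_base = buf: *[*]u8: *void,
		iov_len = len(buf),
	}];
	return register_buffers(ring, iov[..]);
};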

// Unregisters all fixed buffers associated with an [[io_uring]].
export fn unregister_buffers(ring: *io_uring) (void | error) = {
	match (rt::io_uring_register(ring.fd,
			regop::UNREGISTER_BUFFERS, null, 0)) {
	case err: rt::errno =>
		return errors::errno(err);
	case int => void;
	};
};

// Registers a set of file descriptors with an [[io_uring]]. The set of files
// may be sparse, meaning that some are set to -1, to be updated later using
// [[register_files_update]].
export fn register_files(ring: *io_uring, files: []int) (void | error) = {
	assert(len(files) <= types::UINT_MAX);
	match (rt::io_uring_register(ring.fd, regop::REGISTER_FILES,
			files: *[*]int, len(files): uint)) {
	case err: rt::errno =>
		return errors::errno(err);
	case int => void;
	};
};
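
// Illustrative sketch: registering a sparse table of two file slots; the
// slots can be populated later with [[register_files_update]].
fn register_sparse(ring: *io_uring) (void | error) = {
	let files = [-1, -1];
	return register_files(ring, files[..]);
};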

// Applies a set of [[files_update]]s to the set of files registered with an
// [[io_uring]].
export fn register_files_update(
	ring: *io_uring,
	updates: []files_update,
) (void | error) = {
	assert(len(updates) <= types::UINT_MAX);
	match (rt::io_uring_register(ring.fd, regop::REGISTER_FILES_UPDATE,
			updates: *[*]files_update, len(updates): uint)) {
	case err: rt::errno =>
		return errors::errno(err);
	case int => void;
	};
};

// Unregisters all files associated with an [[io_uring]].
export fn unregister_files(ring: *io_uring) (void | error) = {
	match (rt::io_uring_register(ring.fd,
			regop::UNREGISTER_FILES, null, 0)) {
	case err: rt::errno =>
		return errors::errno(err);
	case int => void;
	};
};

// Registers an eventfd(2) with this [[io_uring]] to be notified of completion
// events.
export fn register_eventfd(ring: *io_uring, fd: int) (void | error) = {
	match (rt::io_uring_register(ring.fd,
			regop::REGISTER_EVENTFD, &fd, 1)) {
	case err: rt::errno =>
		return errors::errno(err);
	case int => void;
	};
};

// Similar to [[register_eventfd]], but only notifies of events which complete
// asynchronously.
export fn register_eventfd_async(ring: *io_uring, fd: int) (void | error) = {
	match (rt::io_uring_register(ring.fd,
			regop::REGISTER_EVENTFD_ASYNC, &fd, 1)) {
	case err: rt::errno =>
		return errors::errno(err);
	case int => void;
	};
};

// Unregisters the eventfd(2) associated with this [[io_uring]].
export fn unregister_eventfd(ring: *io_uring) (void | error) = {
	match (rt::io_uring_register(ring.fd,
			regop::UNREGISTER_EVENTFD, null, 0)) {
	case err: rt::errno =>
		return errors::errno(err);
	case int => void;
	};
};

// XXX: This interface is pretty bad. It would be nice to improve on it
// a bit before making it part of the API.
//export fn register_probe(ring: *io_uring, out: *probe, nops: size) void = {
//	assert(nops * size(probe_op) <= types::UINT_MAX);
//	match (rt::io_uring_register(ring.fd, regop::REGISTER_PROBE,
//			out, (nops * size(probe)): uint)) {
//		rt::errno => abort("Unexpected io_uring REGISTER_PROBE error"),
//		void => void,
//	};
//};

// Registers the current process's credentials as a personality with an
// [[io_uring]], returning an ID. Set the personality field of an [[sqe]] to use
// that personality for an I/O submission.
export fn register_personality(ring: *io_uring) int = {
	match (rt::io_uring_register(ring.fd,
			regop::REGISTER_PERSONALITY, null, 0)) {
	case rt::errno =>
		abort("Unexpected io_uring REGISTER_PERSONALITY error");
	case i: int =>
		return i;
	};
};
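
// Illustrative sketch: registering the current credentials and applying
// them to one submission. The personality field and its u16 type are
// assumed from the doc comment above and the C ABI.
fn submit_as_self(ring: *io_uring) void = {
	const id = register_personality(ring);
	const sq = must_get_sqe(ring);
	sq.personality = id: u16;
};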

// Unregisters a personality previously configured with
// [[register_personality]].
export fn unregister_personality(ring: *io_uring, id: int) (void | error) = {
	match (rt::io_uring_register(ring.fd,
			regop::UNREGISTER_PERSONALITY, null, id: uint)) {
	case err: rt::errno =>
		return errors::errno(err);
	case int => void;
	};
};

// Enables submissions for an [[io_uring]] which was started in the disabled
// state via [[setup_flags::R_DISABLED]]. Future access to this io_uring is
// subject to any configured restrictions.
export fn register_enable_rings(ring: *io_uring) (void | error) = {
	match (rt::io_uring_register(ring.fd,
			regop::REGISTER_ENABLE_RINGS, null, 0)) {
	case err: rt::errno =>
		return errors::errno(err);
	case int => void;
	};
};

// Registers a restriction for this [[io_uring]], limiting the kinds of future
// registrations and I/O submissions which are permitted for it. This is only
// accepted if the [[io_uring]] was set up in a disabled state via
// [[setup_flags::R_DISABLED]].
export fn register_restrictions(ring: *io_uring, res: []restriction) (void | error) = {
	assert(len(res) < types::UINT_MAX);
	match (rt::io_uring_register(ring.fd, regop::REGISTER_RESTRICTIONS,
			res: *[*]restriction, len(res): uint)) {
	case err: rt::errno =>
		return errors::errno(err);
	case int => void;
	};
};
use errors;
use rt;

// Sets up an io_uring. The params parameter must be initialized with the
// desired flags, sq_thread_cpu, and sq_thread_idle parameters; the remaining
// fields are initialized by the kernel.
export fn setup(entries: u32, params: *params) (io_uring | error) = {
	const fd = match (rt::io_uring_setup(entries, params)) {
	case err: rt::errno =>
		return errors::errno(err);
	case fd: int =>
		yield fd;
	};

	let uring = io_uring {
		sq = sq { ... },
		cq = cq { ... },
		fd = fd,
		flags = params.flags,
		features = params.features,
	};
	let sq = &uring.sq, cq = &uring.cq;

	sq.ring_sz = params.sq_off.array + params.sq_entries * size(uint);
	cq.ring_sz = params.cq_off.cqes + params.cq_entries * size(cqe);

	if (uring.features & features::SINGLE_MMAP == features::SINGLE_MMAP) {
		if (cq.ring_sz > sq.ring_sz) {
			sq.ring_sz = cq.ring_sz;
		};
		cq.ring_sz = sq.ring_sz;
	};

	sq.ring_ptr = match (rt::mmap(null,
			params.sq_off.array + entries * size(u32),
			rt::PROT_READ | rt::PROT_WRITE,
			rt::MAP_SHARED | rt::MAP_POPULATE,
			fd, OFF_SQ_RING)) {
	case err: rt::errno =>
		return errors::errno(err);
	case ptr: *void =>
		yield ptr;
	};

	cq.ring_ptr = if (uring.features & features::SINGLE_MMAP == features::SINGLE_MMAP) {
		yield sq.ring_ptr;
	} else match (rt::mmap(null, cq.ring_sz,
			rt::PROT_READ | rt::PROT_WRITE,
			rt::MAP_SHARED | rt::MAP_POPULATE,
			fd, OFF_CQ_RING)) {
	case err: rt::errno =>
		return errors::errno(err);
	case ptr: *void =>
		yield ptr;
	};

	const ring_ptr = sq.ring_ptr: uintptr;
	sq.khead = (ring_ptr + params.sq_off.head: uintptr): *uint;
	sq.ktail = (ring_ptr + params.sq_off.tail: uintptr): *uint;
	sq.kring_mask = (ring_ptr + params.sq_off.ring_mask: uintptr): *uint;
	sq.kring_entries = (ring_ptr + params.sq_off.ring_entries: uintptr): *uint;
	sq.kflags = (ring_ptr + params.sq_off.flags: uintptr): *sqring_flags;
	sq.kdropped = (ring_ptr + params.sq_off.dropped: uintptr): *uint;
	sq.array = (ring_ptr + params.sq_off.array: uintptr): *[*]uint;
	sq.sqes = match (rt::mmap(null,
			params.sq_entries * size(sqe),
			rt::PROT_READ | rt::PROT_WRITE,
			rt::MAP_SHARED | rt::MAP_POPULATE,
			fd, OFF_SQES)) {
	case err: rt::errno =>
		return errors::errno(err);
	case ptr: *void =>
		yield ptr: *[*]sqe;
	};

	const ring_ptr = cq.ring_ptr: uintptr;
	cq.khead = (ring_ptr + params.cq_off.head: uintptr): *uint;
	cq.ktail = (ring_ptr + params.cq_off.tail: uintptr): *uint;
	cq.kring_mask = (ring_ptr + params.cq_off.ring_mask: uintptr): *uint;
	cq.kring_entries = (ring_ptr + params.cq_off.ring_entries: uintptr): *uint;
	cq.koverflow = (ring_ptr + params.cq_off.overflow: uintptr): *uint;
	cq.cqes = (ring_ptr + params.cq_off.cqes: uintptr): *[*]cqe;

	if (params.cq_off.flags != 0) {
		cq.kflags = (ring_ptr + params.cq_off.flags: uintptr): *cqring_flags;
	};

	return uring;
};

// Frees state associated with an [[io_uring]].
export fn finish(ring: *io_uring) void = {
	let sq = &ring.sq, cq = &ring.cq;
	rt::munmap(sq.ring_ptr, sq.ring_sz): void;
	if (cq.ring_ptr != null: *void && cq.ring_ptr != sq.ring_ptr) {
		rt::munmap(cq.ring_ptr, cq.ring_sz): void;
	};
	rt::close(ring.fd): void;
};