Lindenii Project Forge
Login

hare-aio

Asynchronous I/O event loops for Hare

Hi… I am well aware that this diff view is very suboptimal. It will be fixed when the refactored server comes along!

Commit info
ID
b324128783498660b663b12678ba9c34b2f72674
Author
Drew DeVault <sir@cmpwn.com>
Author date
Thu, 21 Oct 2021 10:33:48 +0200
Committer
Drew DeVault <sir@cmpwn.com>
Committer date
Fri, 22 Oct 2021 08:35:21 +0200
Actions
iobus: buffer pool management

Signed-off-by: Drew DeVault <sir@cmpwn.com>
use errors;
use rt;

// Marks n completion queue entries as consumed by adding n to the queue's head
// index.
export fn cq_advance(ring: *io_uring, n: uint) void = {
	const head = ring.cq.khead;
	*head = *head + n;
};

// Releases a [[cqe]] once the application has finished processing it. The
// entry goes back to the pool and must not be used by the application
// afterwards. Equivalent to advancing the completion queue by one entry.
export fn cqe_seen(ring: *io_uring, cqe: *cqe) void = {
	cq_advance(ring, 1);
};

// Blocks until a CQE is available and returns it. Once the caller is done with
// the returned CQE it must be handed to [[cqe_seen]] so the queue can advance.
export fn wait(ring: *io_uring) (*cqe | error) = {
	const entry = match (get_cqe(ring, 0, 1)) {
	case err: error =>
		return err;
	case cq: nullable *cqe =>
		yield cq;
	};
	assert(entry != null); // XXX: Correct?
	return entry: *cqe;
};

// Returns the next pending CQE without waiting, or null when the queue has
// nothing pending. A returned CQE must be passed to [[cqe_seen]] to advance
// the queue.
export fn peek(ring: *io_uring) (nullable *cqe | error) = {
	return get_cqe(ring, 0, 0);
};

// Extracts the result of a [[cqe]]. A non-negative "res" is returned as-is; a
// negative "res" is negated and converted to an error.
export fn result(cqe: *cqe) (int | error) = {
	if (cqe.res >= 0) {
		return cqe.res;
	};
	return errors::errno(rt::wrap_errno(-cqe.res));
};

// Returns the user data field of a [[cqe]] as a pointer. [[set_user]] is the
// matching function on the SQE side.
export fn get_user(cqe: *cqe) nullable *void = {
	const addr = cqe.user_data: uintptr;
	return addr: nullable *void;
};

// Returns the ID of the buffer that was selected for this [[cqe]] when the
// request was prepared with [[set_buffer_select]]. The program aborts if the
// CQE does not carry the F_BUFFER flag.
export fn get_buffer_id(cqe: *cqe) u16 = {
	// TODO: Handle ENOBUFS
	const has_buffer = cqe.flags & cqe_flags::F_BUFFER > 0;
	assert(has_buffer, "get_buffer_id called for CQE without buffer");
	// The buffer ID is stored above CQE_BUFFER_SHIFT in the flags word.
	const raw = cqe.flags: u32;
	return (raw >> CQE_BUFFER_SHIFT): u16;
};

// Looks at the completion ring without consuming anything. Returns the entry
// at the current head together with the number of entries between head and
// tail, or (null, 0) when head and tail are equal.
fn peek_cqe(ring: *io_uring) (nullable *cqe, uint) = {
	const head = *ring.cq.khead;
	const pending = *ring.cq.ktail - head;
	if (pending == 0) {
		return (null, 0);
	};
	const slot = head & *ring.cq.kring_mask;
	return (&ring.cq.cqes[slot], pending);
};

// Shared implementation behind [[wait]] and [[peek]]. Repeatedly peeks the
// completion ring and, when required, calls rt::io_uring_enter with "submit"
// as the number of entries to submit and "wait" as the number of completions
// to wait for. Returns null when no CQE is available, both "submit" and "wait"
// are zero, and needs_flush(ring) is false. Errors from io_uring_enter are
// converted with errors::errno and returned.
fn get_cqe(
	ring: *io_uring,
	submit: uint,
	wait: uint,
) (nullable *cqe | error) = {
	let cq: nullable *cqe = null;
	for (cq == null) {
		// enter: whether io_uring_enter must be called this iteration.
		// overflow: set when needs_flush asks for a kernel entry even
		// though nothing was requested.
		let enter = false, overflow = false;
		let flags = enter_flags::NONE;

		// TODO: tuple destructuring
		let tup = peek_cqe(ring);
		let avail = tup.1;
		cq = tup.0;

		// Nothing is available and nothing was requested: give up
		// unless needs_flush reports that a kernel entry is needed.
		if (cq == null && wait == 0 && submit == 0) {
			if (!needs_flush(ring)) {
				return null;
			};
			overflow = true;
		};
		// Ask the kernel for events when fewer are available than the
		// caller wants, or when a flush is needed.
		if (wait > avail || overflow) {
			flags |= enter_flags::GETEVENTS;
			enter = true;
		};
		if (submit > 0) {
			// needs_enter is defined elsewhere; it receives a
			// pointer to flags, presumably so it can add flags
			// for the submission side (TODO confirm).
			needs_enter(ring, &flags);
			enter = true;
		};
		// No kernel entry required; return whatever was peeked.
		if (!enter) {
			break;
		};

		match (rt::io_uring_enter(ring.fd,
			submit, wait, flags: uint, null)) {
		case err: rt::errno =>
			return errors::errno(err);
		case n: uint =>
			// Presumably n is the number of entries the kernel
			// consumed (TODO confirm); subtract it so they are
			// not submitted again on the next iteration.
			submit -= n;
		};
	};
	return cq;
};
use endian;
use rt;
use types;
use fmt;

// Resets an SQE to all zeroes, stores the opcode, and ORs every provided flag
// into the SQE's flags field.
fn prep(sq: *sqe, op: op, flags: flags...) void = {
	rt::memset(sq, 0, size(sqe));
	sq.opcode = op;
	const nflags = len(flags);
	for (let idx = 0z; idx < nflags; idx += 1) {
		sq.flags = sq.flags | flags[idx];
	};
};

// Common preparation step used by the operation helpers: runs [[prep]] and
// then fills in the fd, addr, length and off fields of the SQE.
fn preprw(
	sqe: *sqe,
	op: op,
	fd: int,
	addr: nullable *void,
	length: uint,
	offs: u64,
	flags: flags...
) void = {
	prep(sqe, op, flags...);
	sqe.off = offs;
	sqe.length = length;
	sqe.addr = addr;
	sqe.fd = fd;
};

// Stores a pointer in the user data field of an [[sqe]]. The value is copied
// to the [[cqe]], which lets the caller match a completion event to the SQE
// that produced it.
export fn set_user(sqe: *sqe, user_data: *void) void = {
	// The pointer must fit in the 64-bit user_data field.
	static assert(size(uintptr) <= size(u64));
	const token = user_data: uintptr;
	sqe.user_data = token: u64;
};

// Sets the BUFFER_SELECT flag and sets the desired buffer group. See
// [[provide_buffers]] for configuring buffer groups, and [[get_buffer_id]] to
// retrieve the buffer used from the corresponding [[cqe]].
//
// Call this after preparing the operation: the preparation functions in this
// module zero the whole SQE first, which would discard the flag and group.
export fn set_buffer_select(sqe: *sqe, group: u16) void = {
	sqe.flags |= flags::BUFFER_SELECT;
	// buf_group lives in the named "extras.buffers" union of [[sqe]].
	sqe.extras.buffers.buf_group = group;
};

// Prepares an [[sqe]] that performs no operation: only the opcode and the
// given flags are set.
export fn nop(sqe: *sqe, flags: flags...) void = prep(sqe, op::NOP, flags...);

// Prepares a vectored read for an [[sqe]]. The SQE's address refers to the
// first element of "iov" and its length is the number of iovecs.
export fn readv(
	sqe: *sqe,
	fd: int,
	iov: []rt::iovec,
	offs: size,
	flags: flags...
) void = {
	const vecs = iov: *[*]rt::iovec;
	const nvecs = len(iov): uint;
	preprw(sqe, op::READV, fd, vecs, nvecs, offs, flags...);
};

// Prepares a vectored write for an [[sqe]]. The SQE's address refers to the
// first element of "iov" and its length is the number of iovecs.
export fn writev(
	sqe: *sqe,
	fd: int,
	iov: []rt::iovec,
	offs: size,
	flags: flags...
) void = {
	const vecs = iov: *[*]rt::iovec;
	const nvecs = len(iov): uint;
	preprw(sqe, op::WRITEV, fd, vecs, nvecs, offs, flags...);
};

// Prepares a read of up to "count" bytes into "buf" at file offset "offs" for
// an [[sqe]]. Aborts if "count" does not fit in 32 bits.
export fn read(
	sqe: *sqe,
	fd: int,
	buf: *void,
	count: size,
	offs: u64,
	flags: flags...
) void = {
	assert(count <= types::U32_MAX);
	const nbyte = count: u32;
	preprw(sqe, op::READ, fd, buf, nbyte, offs, flags...);
};

// Prepares a write of "count" bytes from "buf" at file offset "offs" for an
// [[sqe]]. Aborts if "count" does not fit in 32 bits.
export fn write(
	sqe: *sqe,
	fd: int,
	buf: *void,
	count: size,
	offs: u64,
	flags: flags...
) void = {
	assert(count <= types::U32_MAX);
	const nbyte = count: u32;
	preprw(sqe, op::WRITE, fd, buf, nbyte, offs, flags...);
};

// Prepares a read for a fixed buffer previously registered with
// [[register_buffers]]. The buf and count parameters must refer to an address
// which falls within the buffer referenced by the index parameter. Aborts if
// "count" does not fit in 32 bits. The file offset is always set to zero.
export fn read_fixed(
	sqe: *sqe,
	fd: int,
	buf: *void,
	count: size,
	index: u16,
	flags: flags...
) void = {
	assert(count <= types::U32_MAX);
	preprw(sqe, op::READ_FIXED, fd, buf, count: u32, 0, flags...);
	// buf_index lives in the named "extras.buffers" union of [[sqe]].
	sqe.extras.buffers.buf_index = index;
};

// Prepares a write for a fixed buffer previously registered with
// [[register_buffers]]. The buf and count parameters must refer to an address
// which falls within the buffer referenced by the index parameter. Aborts if
// "count" does not fit in 32 bits. The file offset is always set to zero.
export fn write_fixed(
	sqe: *sqe,
	fd: int,
	buf: *void,
	count: size,
	index: u16,
	flags: flags...
) void = {
	assert(count <= types::U32_MAX);
	preprw(sqe, op::WRITE_FIXED, fd, buf, count: u32, 0, flags...);
	// buf_index lives in the named "extras.buffers" union of [[sqe]].
	sqe.extras.buffers.buf_index = index;
};

// Prepares an fsync operation for an [[sqe]]. Note that operations are executed
// in parallel and are not completed in submission order, so an fsync submitted
// after a write may not cause the write to be accounted for by the fsync unless
// [[flags::IO_LINK]] is used.
export fn fsync(
	sqe: *sqe,
	fd: int,
	fsync_flags: fsync_flags,
	flags: flags...
) void = {
	// No address, length or offset is used; only the fd and fsync flags.
	preprw(sqe, op::FSYNC, fd, null, 0, 0, flags...);
	sqe.fsync_flags = fsync_flags;
};

// Adds a request to poll a file descriptor for the given set of poll events.
// This will only happen once; the poll request must be submitted with a new SQE
// to re-poll the file descriptor later. The caller must call [[set_user]] to
// provide a user data field in order to use [[poll_remove]] to remove this poll
// request later.
export fn poll_add(
	sqe: *sqe,
	fd: int,
	poll_mask: uint,
	flags: flags...
) void = {
	preprw(sqe, op::POLL_ADD, fd, null, 0, 0, flags...);
	// The mask is stored through the 32-bit poll32_events member; only
	// little-endian hosts are accepted for now.
	assert(endian::host == &endian::little); // TODO?
	sqe.poll32_events = poll_mask: u32;
};

// Removes an existing poll request. "user_data" must be the value which was
// assigned to the original poll request with [[set_user]]; it is passed to the
// kernel in the SQE's addr field, in the same way [[timeout_remove]] passes
// its target.
export fn poll_remove(sqe: *sqe, user_data: *void, flags: flags...) void = {
	preprw(sqe, op::POLL_REMOVE, -1, user_data, 0, 0, flags...);
	// Kept for backwards compatibility: this SQE's own user data is also
	// set to user_data, so the completion of the removal carries the same
	// value as before.
	set_user(sqe, user_data);
};

// Prepares a sendmsg operation for an [[sqe]], equivalent to the sendmsg(2)
// system call. The message header is passed as the SQE's address and
// "sendmsg_flags" is stored in the SQE's msg_flags field.
export fn sendmsg(
	sqe: *sqe,
	fd: int,
	msghdr: *rt::msghdr,
	sendmsg_flags: int,
	flags: flags...
) void = {
	preprw(sqe, op::SENDMSG, fd, msghdr, 0, 0, flags...);
	sqe.msg_flags = sendmsg_flags;
};

// Prepares a recvmsg operation for an [[sqe]], equivalent to the recvmsg(2)
// system call. The message header is passed as the SQE's address and
// "recvmsg_flags" is stored in the SQE's msg_flags field.
export fn recvmsg(
	sqe: *sqe,
	fd: int,
	msghdr: *rt::msghdr,
	recvmsg_flags: int,
	flags: flags...
) void = {
	preprw(sqe, op::RECVMSG, fd, msghdr, 0, 0, flags...);
	sqe.msg_flags = recvmsg_flags;
};

// Prepares a send operation for an [[sqe]], equivalent to the send(2) system
// call. Aborts if "count" does not fit in 32 bits.
export fn send(
	sqe: *sqe,
	fd: int,
	buf: *void,
	count: size,
	send_flags: int,
	flags: flags...
) void = {
	assert(count <= types::U32_MAX);
	const nbyte = count: u32;
	preprw(sqe, op::SEND, fd, buf, nbyte, 0, flags...);
	sqe.msg_flags = send_flags;
};

// Prepares a recv operation for an [[sqe]], equivalent to the recv(2) system
// call. Aborts if "count" does not fit in 32 bits.
export fn recv(
	sqe: *sqe,
	fd: int,
	buf: *void,
	count: size,
	recv_flags: int,
	flags: flags...
) void = {
	assert(count <= types::U32_MAX);
	const nbyte = count: u32;
	preprw(sqe, op::RECV, fd, buf, nbyte, 0, flags...);
	sqe.msg_flags = recv_flags;
};

// Prepares a timeout operation for an [[sqe]]. "ts" should be a timespec
// describing the desired timeout, and "events" may optionally be used to define
// a number of completion events to wake after (or zero to wake only after the
// timeout expires). The caller must call [[set_user]] to provide a user data
// field in order to use [[timeout_remove]] to cancel this timeout later.
export fn timeout(
	sqe: *sqe,
	ts: *rt::timespec,
	events: uint,
	to_flags: timeout_flags,
	flags: flags...
) void = {
	// The timespec is passed as the address with a length of 1, and the
	// event count is carried in the offset field.
	preprw(sqe, op::TIMEOUT, 0, ts, 1, events, flags...);
	sqe.timeout_flags = to_flags;
};

// Removes an existing timeout request by matching the SQE's user_data field.
// See [[set_user]].
export fn timeout_remove(
	sqe: *sqe,
	user_data: *void,
	to_flags: timeout_flags,
	flags: flags...
) void = {
	// The user data of the timeout to remove is passed as the address.
	preprw(sqe, op::TIMEOUT_REMOVE, 0, user_data, 0, 0, flags...);
	sqe.timeout_flags = to_flags;
};

// Updates an existing timeout request by matching the SQE's user_data field.
// See [[set_user]]. [[timeout_flags::UPDATE]] is always added to "to_flags".
export fn timeout_update(
	sqe: *sqe,
	user_data: *void,
	ts: *rt::timespec,
	events: uint,
	to_flags: timeout_flags,
	flags: flags...
) void = {
	// The user data of the timeout to update is passed as the address.
	preprw(sqe, op::TIMEOUT_REMOVE, 0, user_data, 0, events, flags...);
	sqe.timeout_flags = to_flags | timeout_flags::UPDATE;
	// NOTE(review): addr2 shares a union with off in [[sqe]], so this
	// store overwrites the "events" value preprw wrote into off. Confirm
	// that "events" is meant to be unused here.
	sqe.addr2 = ts;
};

// Prepares a timeout operation for an [[sqe]] which is linked to the previous
// SQE, effectively setting an upper limit on how long that SQE can take to
// complete. "ts" should be a timespec describing the desired timeout. The
// caller must call [[set_user]] to provide a user data field in order to use
// [[timeout_remove]] to cancel this timeout later.
export fn link_timeout(
	sqe: *sqe,
	ts: *rt::timespec,
	to_flags: timeout_flags,
	flags: flags...
) void = {
	// The timespec is passed as the address with a length of 1.
	preprw(sqe, op::LINK_TIMEOUT, 0, ts, 1, 0, flags...);
	sqe.timeout_flags = to_flags;
};

// Prepares an [[sqe]] that accepts a connection on a socket, equivalent to
// accept4(2). "addr" and "addrlen" may be null; "aflags" is stored in the
// SQE's accept_flags field.
export fn accept(
	sqe: *sqe,
	fd: int,
	addr: nullable *rt::sockaddr,
	addrlen: nullable *uint,
	aflags: uint,
	flags: flags...
) void = {
	preprw(sqe, op::ACCEPT, fd, addr, 0, 0, flags...);
	// The address length pointer travels in addr2.
	sqe.addr2 = addrlen;
	sqe.accept_flags = aflags;
};

// Prepares an [[sqe]] operation which opens a file. The path must be a C
// string, i.e. NUL terminated; see [[strings::to_c]].
export fn openat(
	sqe: *sqe,
	dirfd: int,
	path: *const char,
	oflags: int,
	mode: uint,
	flags: flags...
) void = {
	// The path is passed as the address and the mode is carried in the
	// SQE's length field.
	preprw(sqe, op::OPENAT, dirfd, path, mode, 0, flags...);
	sqe.open_flags = oflags: u32;
};

// Prepares an [[sqe]] that closes the file descriptor "fd". No address, length
// or offset is used.
export fn close(sqe: *sqe, fd: int, flags: flags...) void =
	preprw(sqe, op::CLOSE, fd, null, 0, 0, flags...);

// Prepares an [[sqe]] operation which provides a buffer pool to the kernel.
// len(pool) must be equal to nbuf * bufsz; the program aborts otherwise. See
// [[set_buffer_select]] to use the buffer pool for a subsequent request.
//
// "group" is the buffer group the pool is added to and "bufid" is passed in
// the SQE's offset field.
export fn provide_buffers(
	sqe: *sqe,
	group: u16,
	pool: []u8,
	nbuf: size,
	bufsz: size,
	bufid: u16,
	flags: flags...
) void = {
	assert(len(pool) == nbuf * bufsz);
	// The buffer count travels in the fd field, the pool as the address,
	// and the per-buffer size as the length.
	preprw(sqe, op::PROVIDE_BUFFERS, nbuf: int, pool: *[*]u8,
		bufsz: uint, bufid: uint, flags...);
	// buf_group lives in the named "extras.buffers" union of [[sqe]].
	sqe.extras.buffers.buf_group = group;
};

// Removes buffers previously registered with [[provide_buffers]]. "nbuf" is
// the number of buffers to remove from the buffer group "group".
export fn remove_buffers(
	sqe: *sqe,
	nbuf: size,
	group: u16,
	flags: flags...
) void = {
	// The buffer count travels in the fd field.
	preprw(sqe, op::REMOVE_BUFFERS, nbuf: int, null, 0, 0, flags...);
	// buf_group lives in the named "extras.buffers" union of [[sqe]].
	sqe.extras.buffers.buf_group = group;
};
use errors;

// Returned when buffer pool use was configured for an [[sqe]], but there are no
// buffers available. Carries no payload.
export type nobuffers = !void;

// All errors which may be returned by this module: either a generic
// [[errors::error]] or [[nobuffers]]. Use [[strerror]] to describe one.
export type error = !(errors::error | nobuffers);

// Produces a human-readable description of an [[error]].
export fn strerror(err: error) const str = {
	return match (err) {
	case nobuffers =>
		yield "Buffer pool exhausted";
	case e: errors::error =>
		yield errors::strerror(e);
	};
};

// The maximum value for the first parameter of [[setup]].
export def MAX_ENTRIES: uint = 4096;

// Number of bits [[get_buffer_id]] shifts a CQE's flags right by to obtain the
// buffer ID.
def CQE_BUFFER_SHIFT: u32 = 16;
// NOTE(review): presumably the mmap(2) offsets selecting the SQ ring, the CQ
// ring and the SQE array; they are not used in this part of the module, so
// confirm against the setup code.
def OFF_SQ_RING: u64 = 0;
def OFF_CQ_RING: u64 = 0x8000000;
def OFF_SQES: u64 = 0x10000000;

// An io_uring [[sqe]] operation. Values are assigned implicitly in declaration
// order starting at zero, so members must not be reordered or removed.
// NOTE(review): presumably mirrors the kernel's opcode numbering; confirm
// against the kernel headers before adding members.
export type op = enum u8 {
	NOP,
	READV,
	WRITEV,
	FSYNC,
	READ_FIXED,
	WRITE_FIXED,
	POLL_ADD,
	POLL_REMOVE,
	SYNC_FILE_RANGE,
	SENDMSG,
	RECVMSG,
	TIMEOUT,
	TIMEOUT_REMOVE,
	ACCEPT,
	ASYNC_CANCEL,
	LINK_TIMEOUT,
	CONNECT,
	FALLOCATE,
	OPENAT,
	CLOSE,
	FILES_UPDATE,
	STATX,
	READ,
	WRITE,
	FADVISE,
	MADVISE,
	SEND,
	RECV,
	OPENAT2,
	EPOLL_CTL,
	SPLICE,
	PROVIDE_BUFFERS,
	REMOVE_BUFFERS,
	TEE,
};

// Flags for an [[sqe]]. Each member is a single bit, so several may be
// combined; the preparation functions accept any number of them and OR them
// together.
export type flags = enum u8 {
	NONE = 0,
	// Use fixed fileset
	FIXED_FILE = 1 << 0,
	// Issue after inflight IO
	IO_DRAIN = 1 << 1,
	// Links next sqe
	IO_LINK = 1 << 2,
	// Like LINK, but stronger
	IO_HARDLINK = 1 << 3,
	// Always go async
	ASYNC = 1 << 4,
	// Select buffer from sqe.buf_group; see [[set_buffer_select]]
	BUFFER_SELECT = 1 << 5,
};

// Flags for an fsync operation; see [[fsync]].
export type fsync_flags = enum u32 {
	NONE = 0,
	DATASYNC = 1 << 0,
};

// Flags for a timeout operation; see [[timeout]], [[timeout_remove]] and
// [[timeout_update]].
export type timeout_flags = enum u32 {
	NONE = 0,
	// If set, the timeout will be "absolute", waiting until CLOCK_MONOTONIC
	// reaches the time defined by the timespec. If unset, it will be
	// interpreted as a duration relative to the I/O submission.
	ABS = 1 << 0,
	// When combined with [[op::TIMEOUT_REMOVE]], causes the submission to
	// update the timer rather than remove it.
	UPDATE = 1 << 1,
};

// Flags for a splice operation, stored in the splice_flags member of an
// [[sqe]].
export type splice_flags = enum u32 {
	NONE = 0,
	F_FD_IN_FIXED = 1 << 31,
};

// Flags for a [[cqe]].
export type cqe_flags = enum u32 {
	NONE = 0,
	// A buffer was selected for this completion; [[get_buffer_id]] requires
	// this flag and reads the buffer ID from the upper bits of the flags.
	F_BUFFER = 1 << 0,
	F_MORE = 1 << 1,
};

// A submission queue entry. The anonymous unions overlay fields which are
// interpreted differently depending on the operation: "off" and "addr2" share
// storage, as do "addr" and "splice_off_in", and all of the per-operation flag
// fields. The field order and sizes are shared with the kernel and must not be
// changed.
export type sqe = struct {
	opcode: op,
	flags: flags,
	ioprio: u16,
	fd: i32,
	union {
		off: u64,
		addr2: nullable *void,
	},
	union {
		addr: nullable *void,
		splice_off_in: u64,
	},
	length: u32,
	union {
		rw_flags: int,
		fsync_flags: fsync_flags,
		poll_events: u16,
		poll32_events: u32,
		sync_range_flags: u32,
		msg_flags: int,
		timeout_flags: timeout_flags,
		accept_flags: u32,
		cancel_flags: u32,
		open_flags: u32,
		statx_flags: u32,
		fadvise_advice: u32,
		splice_flags: splice_flags,
	},
	user_data: u64,
	// TODO: Remove the names on these embedded types
	// See https://todo.sr.ht/~sircmpwn/hare/493
	union {
		// Accessed as sqe.extras.buffers.buf_index and
		// sqe.extras.buffers.buf_group by the preparation functions.
		extras: struct {
			buffers: union {
				buf_index: u16,
				buf_group: u16,
			},
			personality: u16,
			splice_fd_in: i32,
		},
		pad2: [3]u64,
	},
};

// A completion queue entry.
export type cqe = struct {
	// Copied from the SQE; see [[set_user]] and [[get_user]].
	user_data: u64,
	// Operation result; negative values are treated as errors by
	// [[result]].
	res: i32,
	// See [[cqe_flags]]; also carries the buffer ID read by
	// [[get_buffer_id]].
	flags: cqe_flags,
};

// Filled with the offset for mmap(2). Embedded in [[params]] as "sq_off".
// NOTE(review): presumably each member is the byte offset of the
// correspondingly named submission ring field; confirm against the setup code.
export type sqring_offsets = struct {
	head: u32,
	tail: u32,
	ring_mask: u32,
	ring_entries: u32,
	flags: u32,
	dropped: u32,
	array: u32,
	resv1: u32,
	resv2: u64,
};

// Flags for the sq ring, read through the "kflags" pointer of [[sq]].
export type sqring_flags = enum u32 {
	NONE = 0,
	// Needs io_uring_enter wakeup
	NEED_WAKEUP = 1 << 0,
	// CQ ring is overflown
	CQ_OVERFLOW = 1 << 1,
};

// Filled with the offset for mmap(2). Embedded in [[params]] as "cq_off".
// NOTE(review): presumably each member is the byte offset of the
// correspondingly named completion ring field; confirm against the setup code.
export type cqring_offsets = struct {
	head: u32,
	tail: u32,
	ring_mask: u32,
	ring_entries: u32,
	overflow: u32,
	cqes: u32,
	flags: u32,
	resv1: u32,
	resv2: u64,
};

// Flags for the cq ring, read through the "kflags" pointer of [[cq]].
export type cqring_flags = enum u32 {
	NONE = 0,
	EVENTFD_DISABLED = 1 << 0,
};

// Flags for [[setup]]. Each member is a single bit; they are stored in
// [[params]] and [[io_uring]].
export type setup_flags = enum u32 {
	NONE = 0,
	// io_context is polled
	IOPOLL = 1 << 0,
	// SQ poll thread
	SQPOLL = 1 << 1,
	// sq_thread_cpu is valid
	SQ_AFF = 1 << 2,
	// App defines CQ size
	CQSIZE = 1 << 3,
	// Clamp SQ/CQ ring sizes
	CLAMP = 1 << 4,
	// Attach to existing wq
	ATTACH_WQ = 1 << 5,
	// Start with ring disabled
	R_DISABLED = 1 << 6,
};

// Parameters for [[setup]]. Partially completed by the kernel.
// NOTE(review): which members are inputs and which are filled in by the kernel
// is not visible here; confirm against io_uring_setup(2).
export type params = struct {
	sq_entries: u32,
	cq_entries: u32,
	flags: setup_flags,
	sq_thread_cpu: u32,
	sq_thread_idle: u32,
	features: features,
	wq_fd: u32,
	resv: [3]u32,
	sq_off: sqring_offsets,
	cq_off: cqring_offsets,
};

// Features supported by the kernel. Each member is a single bit; the set is
// stored in [[params]] and [[io_uring]].
export type features = enum u32 {
	NONE = 0,
	SINGLE_MMAP = 1 << 0,
	NODROP = 1 << 1,
	SUBMIT_STABLE = 1 << 2,
	RW_CUR_POS = 1 << 3,
	CUR_PERSONALITY = 1 << 4,
	FAST_POLL = 1 << 5,
	POLL_32BITS = 1 << 6,
};

// Flags for [[enter]]. These are cast to uint and passed to
// rt::io_uring_enter.
export type enter_flags = enum uint {
	NONE = 0,
	// Set when completions are being waited for or need flushing.
	GETEVENTS = 1 << 0,
	SQ_WAKEUP = 1 << 1,
	SQ_WAIT = 1 << 2,
};

// Operations for [[register]]. Values are assigned implicitly in declaration
// order starting at zero, so members must not be reordered or removed.
export type regop = enum uint {
	REGISTER_BUFFERS,
	UNREGISTER_BUFFERS,
	REGISTER_FILES,
	UNREGISTER_FILES,
	REGISTER_EVENTFD,
	UNREGISTER_EVENTFD,
	REGISTER_FILES_UPDATE,
	REGISTER_EVENTFD_ASYNC,
	REGISTER_PROBE,
	REGISTER_PERSONALITY,
	UNREGISTER_PERSONALITY,
	REGISTER_RESTRICTIONS,
	REGISTER_ENABLE_RINGS,
};

// Information for a REGISTER_FILES_UPDATE operation.
// NOTE(review): presumably "offs" is the index of the first registered file to
// replace and "fds" points to the replacement descriptors; confirm against
// io_uring_register(2).
export type files_update = struct {
	offs: u32,
	resv: u32,
	fds: *int,
};

// Flags for a probe operation, stored in [[probe_op]].
export type op_flags = enum u16 {
	NONE = 0,
	SUPPORTED = 1 << 0,
};

// REGISTER_PROBE operation details. One entry of the "ops" array in [[probe]].
export type probe_op = struct {
	op: u8,
	resv: u8,
	flags: op_flags,
	resv2: u32,
};

// Summary of REGISTER_PROBE results.
// NOTE(review): "ops" is an unbounded array; presumably "ops_len" gives the
// number of valid entries. Confirm against io_uring_register(2).
export type probe = struct {
	last_op: u8,
	ops_len: u8,
	resv: u16,
	resv2: [3]u32,
	ops: [*]probe_op,
};

// Details for a REGISTER_RESTRICTIONS operation. "opcode" selects which member
// of the union is meaningful.
// NOTE(review): [[regop]] is declared as an enum of uint while [[op]] and
// [[flags]] are enums of u8, so "register_op" makes this union wider than its
// other members. The kernel's struct io_uring_restriction should be checked to
// confirm that this layout matches.
export type restriction = struct {
	opcode: resop,
	union {
		register_op: regop,
		sqe_op: op,
		flags: flags,
	},
	resv: u8,
	resv2: [3]u32,
};

// Opcode for a [[restriction]]. Note that NONE and REGISTER_OP share the value
// zero, so the two cannot be told apart at runtime.
export type resop = enum u16 {
	NONE = 0,
	// Allow an io_uring_register(2) opcode
	REGISTER_OP = 0,
	// Allow an sqe opcode
	SQE_OP = 1,
	// Allow sqe flags
	SQE_FLAGS_ALLOWED = 2,
	// Require sqe flags (these flags must be set on each submission)
	SQE_FLAGS_REQUIRED = 3,
};

// State for an io_uring.
export type io_uring = struct {
	// Submission queue state.
	sq: sq,
	// Completion queue state.
	cq: cq,
	// File descriptor passed to rt::io_uring_enter.
	fd: int,
	flags: setup_flags,
	features: features,
};

// Submission queue state.
// NOTE(review): the "k"-prefixed members are pointers, presumably into the
// ring memory shared with the kernel and described by [[sqring_offsets]];
// confirm against the setup code.
export type sq = struct {
	khead: *uint,
	ktail: *uint,
	kring_mask: *uint,
	kring_entries: *uint,
	kflags: *sqring_flags,
	kdropped: *uint,
	array: *[*]uint,
	sqes: *[*]sqe,
	sqe_head: uint,
	sqe_tail: uint,
	ring_sz: size,
	ring_ptr: *void,
};

// Completion queue state.
// NOTE(review): the "k"-prefixed members are pointers, presumably into the
// ring memory shared with the kernel and described by [[cqring_offsets]];
// confirm against the setup code.
export type cq = struct {
	// Index of the next entry to consume; advanced by [[cq_advance]].
	khead: *uint,
	// Compared with khead to compute the number of pending entries.
	ktail: *uint,
	// Applied to the head index to locate an entry within "cqes".
	kring_mask: *uint,
	kring_entries: *uint,
	kflags: *cqring_flags,
	koverflow: *uint,
	cqes: *[*]cqe,
	ring_sz: size,
	ring_ptr: *void,
};