Lindenii Project Forge
Login

hare-aio

Asynchronous I/O event loops for Hare

Hi… I am well aware that this diff view is very suboptimal. It will be fixed when the refactored server comes along!

Commit info
ID
0072ed8224c9fa4d1353abf138c6ef6241c9bcd7
Author
Drew DeVault <sir@cmpwn.com>
Author date
Mon, 17 May 2021 16:06:31 -0400
Committer
Drew DeVault <sir@cmpwn.com>
Committer date
Mon, 17 May 2021 16:06:31 -0400
Actions
linux::io_uring: implement io_uring_register
use errors;
use rt;
use types;

// Registers a set of fixed buffers with an [[io_uring]]. Note that you must
// call [[unregister_buffers]] before registering a new set of buffers (even if
// some of them have similar addresses to the old set).
export fn register_buffers(ring: *io_uring, iov: []rt::iovec) (void | error) = {
	assert(len(iov) <= types::UINT_MAX);
	return match (rt::io_uring_register(ring.fd, regop::REGISTER_BUFFERS,
			iov: *[*]rt::iovec, len(iov): uint)) {
		err: rt::errno => errors::errno(err),
		int => void,
	};
};

// Unregisters all fixed buffers associated with an [[io_uring]].
export fn unregister_buffers(ring: *io_uring) (void | error) = {
	return match (rt::io_uring_register(ring.fd,
			regop::UNREGISTER_BUFFERS, null, 0)) {
		err: rt::errno => errors::errno(err),
		int => void,
	};
};

// Registers a set of file descriptors with an [[io_uring]]. The set of files
// may be sparse, meaning that some are set to -1, to be updated later using
// [[register_files_update]].
export fn register_files(ring: *io_uring, files: []int) (void | error) = {
	assert(len(files) <= types::UINT_MAX);
	return match (rt::io_uring_register(ring.fd, regop::REGISTER_FILES,
			files: *[*]int, len(files): uint)) {
		err: rt::errno => errors::errno(err),
		int => void,
	};
};

// Applies a set of [[files_update]]s to the set of files registered with an
// [[io_uring]].
export fn register_files_update(
	ring: *io_uring,
	updates: []files_update,
) (void | error) = {
	assert(len(updates) <= types::UINT_MAX);
	return match (rt::io_uring_register(ring.fd, regop::REGISTER_FILES_UPDATE,
			updates: *[*]files_update, len(updates): uint)) {
		err: rt::errno => errors::errno(err),
		int => void,
	};
};

// Unregisters all files associated with an [[io_uring]].
export fn unregister_files(ring: *io_uring) (void | error) = {
	return match (rt::io_uring_register(ring.fd,
			regop::UNREGISTER_FILES, null, 0)) {
		err: rt::errno => errors::errno(err),
		int => void,
	};
};

// Registers an eventfd(2) with this [[io_uring]] to be notified of completion
// events.
export fn register_eventfd(ring: *io_uring, fd: int) (void | error) = {
	return match (rt::io_uring_register(ring.fd,
			regop::REGISTER_EVENTFD, &fd, 1)) {
		err: rt::errno => errors::errno(err),
		int => void,
	};
};

// Similar to [[register_eventfd]], but only notifies of events which complet
// asyncronously.
export fn register_eventfd_async(ring: *io_uring, fd: int) (void | error) = {
	return match (rt::io_uring_register(ring.fd,
			regop::REGISTER_EVENTFD_ASYNC, &fd, 1)) {
		err: rt::errno => errors::errno(err),
		int => void,
	};
};

// Unregisters the eventfd(2) associated with this [[io_uring]].
export fn unregister_eventfd(ring: *io_uring) (void | error) = {
	return match (rt::io_uring_register(ring.fd,
			regop::UNREGISTER_EVENTFD, null, 0)) {
		err: rt::errno => errors::errno(err),
		int => void,
	};
};

// XXX: This interface is pretty bad. It would be nice to improve on it
// a bit before making it part of the API.
//export fn register_probe(ring: *io_uring, out: *probe, nops: size) void = {
//	assert(nops * size(probe_op) <= types::UINT_MAX);
//	match (rt::io_uring_register(ring.fd, regop::REGISTER_PROBE,
//			out, (nops * size(probe)): uint)) {
//		rt::errno => abort("Unexpected io_uring REGISTER_PROBE error"),
//		void => void,
//	};
//};

// Registers the current process's credentials as a personality with an
// [[io_uring]], returning an ID. Set the personality field of an [[sqe]] to use
// that personality for an I/O submission.
export fn register_personality(ring: *io_uring) int = {
	return match (rt::io_uring_register(ring.fd,
			regop::REGISTER_PERSONALITY, null, 0)) {
		rt::errno => abort("Unexpected io_uring REGISTER_PERSONALITY error"),
		i: int => i,
	};
};

// Unregisters a personality previously configured with
// [[register_personality]].
export fn unregister_personality(ring: *io_uring, id: int) (void | error) = {
	return match (rt::io_uring_register(ring.fd,
			regop::UNREGISTER_PERSONALITY, null, id: uint)) {
		err: rt::errno => errors::errno(err),
		int => void,
	};
};

// Enables submissions for an [[io_uring]] which was started in the disabled
// state via [[setup_flags::R_DISABLED]]. Future access to this io_uring is
// subject to any configured restrictions.
export fn register_enable_rings(ring: *io_uring) (void | error) = {
	return match (rt::io_uring_register(ring.fd,
			regop::REGISTER_ENABLE_RINGS, null, 0)) {
		err: rt::errno => errors::errno(err),
		int => void,
	};
};

// Registers a restriction for this [[io_uring]], limiting the kinds of future
// registrations and I/O submissions which are permitted for it. This is only
// accepted if the [[io_uring]] was set up in a disabled state via
// [[setup_flags::R_DISABLED]].
export fn register_restrictions(ring: *io_uring, res: []restriction) (void | error) = {
	assert(len(res) < types::UINT_MAX);
	return match (rt::io_uring_register(ring.fd, regop::REGISTER_RESTRICTIONS,
			res: *[*]restriction, len(res): uint)) {
		err: rt::errno => errors::errno(err),
		int => void,
	};
};
// TODO:
// - Interface for buffer registration
use rt;
use types;

fn prep(sq: *sqe, op: op, flags: sqe_flags...) void = {
	// XXX: Is this compatible with the spec?
	*sq = sqe { opcode = op, ... };
	for (let i = 0z; i < len(flags); i += 1) {
		sq.flags |= flags[i];
	};
};

fn preprw(
	sqe: *sqe,
	op: op,
	fd: int,
	addr: nullable *void,
	length: uint,
	offs: u64,
	flags: sqe_flags...
) void = {
	prep(sqe, op, flags...);
	sqe.fd = fd;
	sqe.addr = addr;
	sqe.length = length;
	sqe.off = offs;
};

// Prepares a no-op "operation" for an [[sqe]].
export fn nop(sqe: *sqe, flags: sqe_flags...) void = {
	prep(sqe, op::NOP, flags...);
};

// Prepares a vectored read operation for an [[sqe]].
export fn readv(
	sqe: *sqe,
	fd: int,
	iov: []rt::iovec,
	offs: size,
) void = {
	preprw(sqe, op::READV, fd, iov: *[*]rt::iovec, len(iov): uint, offs);
};

// Prepares a vectored write operation for an [[sqe]].
export fn writev(
	sqe: *sqe,
	fd: int,
	iov: []rt::iovec,
	offs: size,
) void = {
	preprw(sqe, op::WRITEV, fd, iov: *[*]rt::iovec, len(iov): uint, offs);
};

// Prepares a read operation for an [[sqe]].
export fn read(
	sqe: *sqe,
	fd: int,
	buf: *void,
	count: size,
	flags: sqe_flags...,
) void = {
	assert(count <= types::U32_MAX);
	preprw(sqe, op::READ, fd, buf, count: u32, 0, flags...);
};

// Prepares a write operation for an [[sqe]].
export fn write(
	sqe: *sqe,
	fd: int,
	buf: *void,
	count: size,
	flags: sqe_flags...,
) void = {
	assert(count <= types::U32_MAX);
	preprw(sqe, op::WRITE, fd, buf, count: u32, 0, flags...);
};
use errors;

// All errors which may be returned by this module.
export type error = errors::opaque;

// Converts an [[error]] into a human-readable string.
export fn strerror(err: error) const str = {
	return errors::strerror(err);
};

def CQE_BUFFER_SHIFT: uint = 16;
def OFF_SQ_RING: u64 = 0;
def OFF_CQ_RING: u64 = 0x8000000;
def OFF_SQES: u64 = 0x10000000;

// An io_uring [[sqe]] operation.
export type op = enum u8 {
	NOP,
	READV,
	WRITEV,
	FSYNC,
	READ_FIXED,
	WRITE_FIXED,
	POLL_ADD,
	POLL_REMOVE,
	SYNC_FILE_RANGE,
	SENDMSG,
	RECVMSG,
	TIMEOUT,
	TIMEOUT_REMOVE,
	ACCEPT,
	ASYNC_CANCEL,
	LINK_TIMEOUT,
	CONNECT,
	FALLOCATE,
	OPENAT,
	CLOSE,
	FILES_UPDATE,
	STATX,
	READ,
	WRITE,
	FADVISE,
	MADVISE,
	SEND,
	RECV,
	OPENAT2,
	EPOLL_CTL,
	SPLICE,
	PROVIDE_BUFFERS,
	REMOVE_BUFFERS,
	TEE,
};

// Flags for an [[sqe]].
export type sqe_flags = enum u8 {
	// Use fixed fileset
	FIXED_FILE = 1 << 0,
	// Issue after inflight IO
	IO_DRAIN = 1 << 1,
	// Links next sqe
	IO_LINK = 1 << 2,
	// Like LINK, but stronger
	IO_HARDLINK = 1 << 3,
	// Always go async
	ASYNC = 1 << 4,
	// Select buffer from sqe.buf_group
	BUFFER_SELECT = 1 << 5,
};

// Flags for an fsync operation.
export type fsync_flags = enum u32 {
	DATASYNC = 1 << 0,
};

// Flags for a timeout operation.
export type timeout_flags = enum u32 {
	ABS = 1 << 0,
};

// Flags for a splice operation.
export type splice_flags = enum u32 {
	F_FD_IN_FIXED = 1 << 31,
};

// Flags for a [[cqe]].
export type cqe_flags = enum u32 {
	F_BUFFER = 1 << 0,
};

// A submission queue entry.
export type sqe = struct {
	opcode: op,
	flags: sqe_flags,
	ioprio: u16,
	fd: i32,
	union {
		off: u64,
		addr2: nullable *void,
	},
	union {
		addr: nullable *void,
		splice_off_in: u64,
	},
	length: u32,
	union {
		rw_flags: int,
		fsync_flags: fsync_flags,
		poll_events: u16,
		poll32_events: u32,
		sync_range_flags: u32,
		msg_flags: u32,
		timeout_flags: timeout_flags,
		accept_flags: u32,
		cancel_flags: u32,
		open_flags: u32,
		statx_flags: u32,
		fadvise_advice: u32,
		splice_flags: splice_flags,
	},
	user_data: u64,
	union {
		struct {
			union {
				buf_index: u16,
				buf_group: u16,
			},
			personality: u16,
			splice_fd_in: i32,
		},
		pad2: [3]u64,
	},
};

// A completion queue entry.
export type cqe = struct {
	user_data: u64,
	res: i32,
	flags: cqe_flags,
};

// Filled with the offset for mmap(2)
export type sqring_offsets = struct {
	head: u32,
	tail: u32,
	ring_mask: u32,
	ring_entries: u32,
	flags: u32,
	dropped: u32,
	array: u32,
	resv1: u32,
	resv2: u64,
};

// Flags for the sq ring.
export type sqring_flags = enum u32 {
	// Needs io_uring_enter wakeup
	NEED_WAKEUP = 1 << 0,
	// CQ ring is overflown
	CQ_OVERFLOW = 1 << 1,
};

// Filled with the offset for mmap(2)
export type cqring_offsets = struct {
	head: u32,
	tail: u32,
	ring_mask: u32,
	ring_entries: u32,
	overflow: u32,
	cqes: u32,
	flags: u32,
	resv1: u32,
	resv2: u64,
};

// Flags for the cq ring.
export type cqring_flags = enum u32 {
	EVENTFD_DISABLED = 1 << 0,
};

// Flags for [[setup]].
export type setup_flags = enum u32 {
	// io_context is polled
	IOPOLL = 1 << 0,
	// SQ poll thread
	SQPOLL = 1 << 1,
	// sq_thread_cpu is valid
	SQ_AFF = 1 << 2,
	// App defines CQ size
	CQSIZE = 1 << 3,
	// Clamp SQ/CQ ring sizes
	CLAMP = 1 << 4,
	// Attach to existing wq
	ATTACH_WQ = 1 << 5,
	// Start with ring disabled
	R_DISABLED = 1 << 6,
};

// Parameters for [[setup]]. Partially completed by the kernel.
export type params = struct {
	sq_entries: u32,
	cq_entries: u32,
	flags: setup_flags,
	sq_thread_cpu: u32,
	sq_thread_idle: u32,
	features: features,
	wq_fd: u32,
	resv: [3]u32,
	sq_off: sqring_offsets,
	cq_off: cqring_offsets,
};

// Features supported by the kernel.
export type features = enum u32 {
	SINGLE_MMAP = 1 << 0,
	NODROP = 1 << 1,
	SUBMIT_STABLE = 1 << 2,
	RW_CUR_POS = 1 << 3,
	CUR_PERSONALITY = 1 << 4,
	FAST_POLL = 1 << 5,
	POLL_32BITS = 1 << 6,
};

// Flags for [[enter]].
export type enter_flags = enum uint {
	GETEVENTS = 1 << 0,
	SQ_WAKEUP = 1 << 1,
	SQ_WAIT = 1 << 2,
};

// Operations for [[register]].
export type regop = enum uint {
	REGISTER_BUFFERS,
	UNREGISTER_BUFFERS,
	REGISTER_FILES,
	UNREGISTER_FILES,
	REGISTER_EVENTFD,
	UNREGISTER_EVENTFD,
	REGISTER_FILES_UPDATE,
	REGISTER_EVENTFD_ASYNC,
	REGISTER_PROBE,
	REGISTER_PERSONALITY,
	UNREGISTER_PERSONALITY,
	REGISTER_RESTRICTIONS,
	REGISTER_ENABLE_RINGS,
};

// Information for a REGISTER_FILES_UPDATE operation.
export type files_update = struct {
	offs: u32,
	resv: u32,
	// XXX: i32 but aligned to u64
	fds: u64,
};

// Flags for a probe operation.
export type op_flags = enum u16 {
	SUPPORTED = 1 << 0,
};

// REGISTER_PROBE operation details.
export type probe_op = struct {
	op: u8,
	resv: u8,
	flags: op_flags,
	resv2: u32,
};

// Summary of REGISTER_PROBE results.
export type probe = struct {
	last_op: u8,
	ops_len: u8,
	resv: u16,
	resv2: [3]u32,
	ops: [*]probe_op,
};

// Details for a REGISTER_RESTRICTIONS operation.
export type restriction = struct {
	opcode: restriction_op,
	opcode: resop,
	union {
		register_op: u8,
		sqe_op: u8,
		sqe_flags: u8,
		register_op: regop,
		sqe_op: op,
		sqe_flags: sqe_flags,
	},
	resv: u8,
	resv2: [3]u32,
};

// Opcode for a [[restriction]].
export type restriction_op = enum u16 {
export type resop = enum u16 {
	// Allow an io_uring_register(2) opcode
	REGISTER_OP = 0,
	// Allow an sqe opcode
	SQE_OP = 1,
	// Allow sqe flags
	SQE_FLAGS_ALLOWED = 2,
	// Require sqe flags (these flags must be set on each submission)
	SQE_FLAGS_REQUIRED = 3,
};

// State for an io_uring.
export type io_uring = struct {
	sq: sq,
	cq: cq,
	fd: int,
	flags: setup_flags,
	features: features,
};

// Submission queue state.
export type sq = struct {
	khead: *uint,
	ktail: *uint,
	kring_mask: *uint,
	kring_entries: *uint,
	kflags: *sqring_flags,
	kdropped: *uint,
	array: *[*]uint,
	sqes: *[*]sqe,
	sqe_head: uint,
	sqe_tail: uint,
	ring_sz: size,
	ring_ptr: *void,
};

// Completion queue state.
export type cq = struct {
	khead: *uint,
	ktail: *uint,
	kring_mask: *uint,
	kring_entries: *uint,
	kflags: *cqring_flags,
	koverflow: *uint,
	cqes: *[*]cqe,
	ring_sz: size,
	ring_ptr: *void,
};