Lindenii Project Forge
Login

hare-aio

Asynchronous I/O event loops for Hare

Warning: Due to various recent migrations, viewing non-HEAD refs may be broken.

/linux/io_uring/sqe.ha (raw)

// SPDX-License-Identifier: MPL-2.0
// (c) 2021 Alexey Yerin <yyp@disroot.org>
// (c) 2021 Drew DeVault <sir@cmpwn.com>
// (c) 2022 Eyal Sawady <ecs@d2evs.net>
// (c) 2025 Runxi Yu <me@runxiyu.org>
use endian;
use rt;
use types;
use types::c;

// Resets an [[sqe]] for a new operation: clears every byte of the entry, stores
// the opcode, and ORs each of the supplied flags into the flags field.
fn prep(sq: *sqe, op: sqe_op, flags: sqe_flags...) void = {
	rt::memset(sq, 0, size(sqe));
	sq.opcode = op;
	for (let flag .. flags) {
		sq.flags |= flag;
	};
};

// Shared setup for operations which use the fd/addr/length/off fields: resets
// the SQE through [[prep]] and then fills in those four fields.
fn preprw(
	sqe: *sqe,
	op: sqe_op,
	fd: int,
	addr: nullable *opaque,
	length: uint,
	offs: u64,
	flags: sqe_flags...
) void = {
	// prep zeroes the whole entry, so it has to run before any field is
	// written.
	prep(sqe, op, flags...);
	sqe.off = offs;
	sqe.length = length;
	sqe.addr = addr;
	sqe.fd = fd;
};

// Stores a caller-chosen pointer in the user data field of an [[sqe]]. The
// value is copied to the matching [[cqe]], which lets a completion event be
// correlated with the SQE that produced it.
export fn sqe_set_data(sqe: *sqe, user_data: *opaque) void = {
	// The pointer is carried in a u64 field, so it has to fit in one.
	static assert(size(uintptr) <= size(u64));
	const tag = user_data: uintptr;
	sqe.user_data = tag: u64;
};

// Selects a buffer group for an [[sqe]] and turns on the BUFFER_SELECT flag.
// Buffer groups are configured with [[op_provide_buffers]]; the buffer that
// was actually used can be read back from the corresponding [[cqe]] with
// [[cqe_get_buffer_id]].
export fn sqe_set_buffer_select(sqe: *sqe, group: u16) void = {
	sqe.buf_group = group;
	sqe.flags |= sqe_flags::BUFFER_SELECT;
};

// Prepares an [[sqe]] for a no-op: the entry is reset and only the opcode and
// the given flags are set.
export fn op_nop(
	sqe: *sqe,
	flags: sqe_flags...
) void = {
	prep(sqe, sqe_op::NOP, flags...);
};

// Prepares a vectored read operation for an [[sqe]]. "iov" lists the buffers
// to fill and "offs" is the offset to read from.
//
// Aborts if len(iov) does not fit in the 32-bit length field of the SQE.
export fn op_readv(
	sqe: *sqe,
	fd: int,
	iov: []rt::iovec,
	offs: size,
	flags: sqe_flags...
) void = {
	// The vector count is narrowed to uint below; refuse to truncate it
	// silently, like op_read does for its byte count.
	assert(len(iov) <= types::U32_MAX);
	preprw(sqe, sqe_op::READV, fd,
		iov: *[*]rt::iovec, len(iov): uint, offs, flags...);
};

// Prepares a vectored write operation for an [[sqe]]. "iov" lists the buffers
// to write out and "offs" is the offset to write at.
//
// Aborts if len(iov) does not fit in the 32-bit length field of the SQE.
export fn op_writev(
	sqe: *sqe,
	fd: int,
	iov: []rt::iovec,
	offs: size,
	flags: sqe_flags...
) void = {
	// The vector count is narrowed to uint below; refuse to truncate it
	// silently, like op_write does for its byte count.
	assert(len(iov) <= types::U32_MAX);
	preprw(sqe, sqe_op::WRITEV, fd,
		iov: *[*]rt::iovec, len(iov): uint, offs, flags...);
};

// Prepares an [[sqe]] to read up to "count" bytes from "fd" at offset "offs"
// into "buf". Aborts if "count" does not fit in 32 bits.
export fn op_read(
	sqe: *sqe,
	fd: int,
	buf: *opaque,
	count: size,
	offs: u64,
	flags: sqe_flags...
) void = {
	assert(count <= types::U32_MAX);
	const nbytes = count: u32;
	preprw(sqe, sqe_op::READ, fd, buf, nbytes, offs, flags...);
};

// Prepares an [[sqe]] to write up to "count" bytes from "buf" to "fd" at
// offset "offs". Aborts if "count" does not fit in 32 bits.
export fn op_write(
	sqe: *sqe,
	fd: int,
	buf: *opaque,
	count: size,
	offs: u64,
	flags: sqe_flags...
) void = {
	assert(count <= types::U32_MAX);
	const nbytes = count: u32;
	preprw(sqe, sqe_op::WRITE, fd, buf, nbytes, offs, flags...);
};

// Prepares a read into a fixed buffer which was registered earlier with
// [[ring_register_buffers]]. "index" names the registered buffer, and the
// region described by "buf" and "count" must lie inside it. The offset field
// of the SQE is always set to zero. Aborts if "count" does not fit in 32 bits.
export fn op_read_fixed(
	sqe: *sqe,
	fd: int,
	buf: *opaque,
	count: size,
	index: u16,
	flags: sqe_flags...
) void = {
	assert(count <= types::U32_MAX);
	const nbytes = count: u32;
	preprw(sqe, sqe_op::READ_FIXED, fd, buf, nbytes, 0, flags...);
	// Must follow preprw, which resets the whole SQE.
	sqe.buf_index = index;
};

// Prepares a write from a fixed buffer which was registered earlier with
// [[ring_register_buffers]]. "index" names the registered buffer, and the
// region described by "buf" and "count" must lie inside it. The offset field
// of the SQE is always set to zero. Aborts if "count" does not fit in 32 bits.
export fn op_write_fixed(
	sqe: *sqe,
	fd: int,
	buf: *opaque,
	count: size,
	index: u16,
	flags: sqe_flags...
) void = {
	assert(count <= types::U32_MAX);
	const nbytes = count: u32;
	preprw(sqe, sqe_op::WRITE_FIXED, fd, buf, nbytes, 0, flags...);
	// Must follow preprw, which resets the whole SQE.
	sqe.buf_index = index;
};

// Prepares an fsync operation for an [[sqe]]. Note that operations are
// executed in parallel and are not completed in submission order, so an fsync
// submitted after a write may not cause the write to be accounted for by the
// fsync unless [[sqe_flags::IO_LINK]] is used.
export fn op_fsync(
	sqe: *sqe,
	fd: int,
	fsync_flags: op_fsync_flags,
	flags: sqe_flags...
) void = {
	// Only the file descriptor is meaningful here; address, length and
	// offset are explicitly left at zero.
	prep(sqe, sqe_op::FSYNC, flags...);
	sqe.fd = fd;
	sqe.addr = null;
	sqe.length = 0;
	sqe.off = 0;
	sqe.fsync_flags = fsync_flags;
};

// Prepares a request to poll "fd" for the events in "poll_mask". The request
// fires only once; to poll the file descriptor again, submit a new SQE. In
// order to cancel the request later with [[op_poll_remove]], the caller must
// attach a user data value with [[sqe_set_data]].
export fn op_poll_add(
	sqe: *sqe,
	fd: int,
	poll_mask: uint,
	flags: sqe_flags...
) void = {
	preprw(sqe, sqe_op::POLL_ADD, fd, null, 0, 0, flags...);
	// Only little-endian hosts are handled for now.
	assert(endian::host == &endian::little); // TODO?
	const events = poll_mask: u32;
	sqe.poll32_events = events;
};

// Prepares a multishot poll operation for an [[sqe]]. The entry is set up
// exactly as [[op_poll_add]] does, and IORING_POLL_ADD_MULTI is then ORed into
// its length field.
export fn op_poll_add_multishot(
	sqe: *sqe,
	fd: int,
	poll_mask: uint,
	flags: sqe_flags...
) void = {
	op_poll_add(sqe, fd, poll_mask, flags...);
	// op_poll_add resets the SQE, so the multishot bit is applied last.
	sqe.length |= rt::IORING_POLL_ADD_MULTI;
};

// Removes an existing poll request. "user_data" must be the value which was
// attached to the original poll SQE with [[sqe_set_data]]; it is stored in the
// addr field of this SQE, which is where the kernel looks for the user data of
// the request to remove.
//
// For backwards compatibility, the user data of this SQE itself is also set to
// "user_data". Call [[sqe_set_data]] afterwards to give the completion of the
// removal a distinct value.
export fn op_poll_remove(
	sqe: *sqe,
	user_data: *opaque,
	flags: sqe_flags...
) void = {
	// The target request is identified through addr, in the same way
	// op_timeout_remove identifies its target.
	preprw(sqe, sqe_op::POLL_REMOVE, -1, user_data, 0, 0, flags...);
	sqe_set_data(sqe, user_data);
};

// Prepares an [[sqe]] for a sendmsg operation, the equivalent of the
// sendmsg(2) system call. "msghdr" describes the message and "sendmsg_flags"
// is stored as the message flags.
export fn op_sendmsg(
	sqe: *sqe,
	fd: int,
	msghdr: *rt::msghdr,
	sendmsg_flags: int,
	flags: sqe_flags...
) void = {
	preprw(
		sqe, sqe_op::SENDMSG, fd,
		msghdr, 0, 0,
		flags...
	);
	sqe.msg_flags = sendmsg_flags;
};

// Prepares an [[sqe]] for a recvmsg operation, the equivalent of the
// recvmsg(2) system call. "msghdr" describes where the message is received
// and "recvmsg_flags" is stored as the message flags.
export fn op_recvmsg(
	sqe: *sqe,
	fd: int,
	msghdr: *rt::msghdr,
	recvmsg_flags: int,
	flags: sqe_flags...
) void = {
	preprw(
		sqe, sqe_op::RECVMSG, fd,
		msghdr, 0, 0,
		flags...
	);
	sqe.msg_flags = recvmsg_flags;
};

// Prepares a multishot recvmsg operation for an [[sqe]]. The entry is set up
// as [[op_recvmsg]] does, with IORING_RECV_MULTISHOT ORed into its ioprio
// field. "recvmsg_flags" must not contain MSG_WAITALL.
//
// Buffer selection is required.
export fn op_recvmsg_multishot(
	sqe: *sqe,
	fd: int,
	msghdr: *rt::msghdr,
	recvmsg_flags: int,
	flags: sqe_flags...
) void = {
	op_recvmsg(sqe, fd, msghdr, recvmsg_flags, flags...);
	// MSG_WAITALL is rejected for the multishot variant.
	assert(recvmsg_flags & rt::MSG_WAITALL == 0);
	sqe.ioprio |= rt::IORING_RECV_MULTISHOT;
};

// Prepares an [[sqe]] for a send operation, the equivalent of the send(2)
// system call: up to "count" bytes from "buf" are sent on "fd" with
// "send_flags". Aborts if "count" does not fit in 32 bits.
export fn op_send(
	sqe: *sqe,
	fd: int,
	buf: *opaque,
	count: size,
	send_flags: int,
	flags: sqe_flags...
) void = {
	assert(count <= types::U32_MAX);
	const nbytes = count: u32;
	preprw(sqe, sqe_op::SEND, fd, buf, nbytes, 0, flags...);
	sqe.msg_flags = send_flags;
};

// Prepares an [[sqe]] for a recv operation, the equivalent of the recv(2)
// system call: up to "count" bytes are received from "fd" into "buf" with
// "recv_flags". Aborts if "count" does not fit in 32 bits.
export fn op_recv(
	sqe: *sqe,
	fd: int,
	buf: *opaque,
	count: size,
	recv_flags: int,
	flags: sqe_flags...
) void = {
	assert(count <= types::U32_MAX);
	const nbytes = count: u32;
	preprw(sqe, sqe_op::RECV, fd, buf, nbytes, 0, flags...);
	sqe.msg_flags = recv_flags;
};

// Prepares a multishot recv operation for an [[sqe]]. No buffer is passed in:
// the address and length fields are left empty, "count" must be zero, and
// "recv_flags" must not contain MSG_WAITALL.
//
// Buffer selection is required.
export fn op_recv_multishot(
	sqe: *sqe,
	fd: int,
	count: size,
	recv_flags: int,
	flags: sqe_flags...
) void = {
	assert(count == 0);
	assert(recv_flags & rt::MSG_WAITALL == 0);
	preprw(sqe, sqe_op::RECV, fd, null, 0, 0, flags...);
	// Both stores must follow preprw, which resets the whole SQE.
	sqe.ioprio |= rt::IORING_RECV_MULTISHOT;
	sqe.msg_flags = recv_flags;
};

// Prepares a timeout operation for an [[sqe]]. "ts" is a timespec describing
// the desired timeout. "events" may optionally give a number of completion
// events to wake after; pass zero to wake only once the timeout expires. In
// order to cancel the timeout later with [[op_timeout_remove]], the caller
// must attach a user data value with [[sqe_set_data]].
export fn op_timeout(
	sqe: *sqe,
	ts: *rt::timespec,
	events: uint,
	to_flags: op_timeout_flags,
	flags: sqe_flags...
) void = {
	// The event count is carried in the offset field of the SQE.
	const wake_after: u64 = events;
	preprw(sqe, sqe_op::TIMEOUT, 0, ts, 1, wake_after, flags...);
	sqe.timeout_flags = to_flags;
};

// Prepares a multishot timeout operation for an [[sqe]]. The entry is set up
// as [[op_timeout]] does, with [[op_timeout_flags::MULTISHOT]] added to its
// timeout flags.
export fn op_timeout_multishot(
	sqe: *sqe,
	ts: *rt::timespec,
	events: uint,
	to_flags: op_timeout_flags,
	flags: sqe_flags...
) void = {
	op_timeout(sqe, ts, events, to_flags, flags...);
	// op_timeout resets the SQE, so the multishot bit is applied last.
	sqe.timeout_flags |= op_timeout_flags::MULTISHOT;
};

// Removes an existing timeout request. "user_data" is the value which was
// attached to the timeout's SQE with [[sqe_set_data]]; it is stored in the
// addr field of this SQE to identify the request to remove.
export fn op_timeout_remove(
	sqe: *sqe,
	user_data: *opaque,
	to_flags: op_timeout_flags,
	flags: sqe_flags...
) void = {
	preprw(
		sqe, sqe_op::TIMEOUT_REMOVE, 0,
		user_data, 0, 0,
		flags...
	);
	sqe.timeout_flags = to_flags;
};

// Updates an existing timeout request. "user_data" is the value which was
// attached to the timeout's SQE with [[sqe_set_data]], and "ts" is the new
// timespec. The request is submitted with the TIMEOUT_REMOVE opcode and
// [[op_timeout_flags::UPDATE]] added to "to_flags".
export fn op_timeout_update(
	sqe: *sqe,
	user_data: *opaque,
	ts: *rt::timespec,
	events: uint,
	to_flags: op_timeout_flags,
	flags: sqe_flags...
) void = {
	const update_flags = to_flags | op_timeout_flags::UPDATE;
	preprw(sqe, sqe_op::TIMEOUT_REMOVE, 0, user_data, 0, events, flags...);
	sqe.timeout_flags = update_flags;
	// NOTE(review): if addr2 shares storage with off in the sqe
	// definition, this store replaces the "events" value written by
	// preprw above. Confirm against the struct declaration.
	sqe.addr2 = ts;
};

// Prepares a timeout operation for an [[sqe]] which is linked to the previous
// SQE, effectively putting an upper limit on how long that SQE may take to
// complete. "ts" is a timespec describing the desired timeout. In order to
// cancel the timeout later with [[op_timeout_remove]], the caller must attach
// a user data value with [[sqe_set_data]].
export fn op_link_timeout(
	sqe: *sqe,
	ts: *rt::timespec,
	to_flags: op_timeout_flags,
	flags: sqe_flags...
) void = {
	prep(sqe, sqe_op::LINK_TIMEOUT, flags...);
	sqe.fd = 0;
	sqe.addr = ts;
	// A single timespec is passed.
	sqe.length = 1;
	sqe.off = 0;
	sqe.timeout_flags = to_flags;
};

// Prepares a socket accept operation for an [[sqe]], the equivalent of
// accept4(2). "addr" and "addrlen" may be null; "aflags" is stored as the
// accept flags.
export fn op_accept(
	sqe: *sqe,
	fd: int,
	addr: nullable *rt::sockaddr,
	addrlen: nullable *uint,
	aflags: uint,
	flags: sqe_flags...
) void = {
	preprw(sqe, sqe_op::ACCEPT, fd, addr, 0, 0, flags...);
	// The address length pointer is carried in the addr2 field.
	sqe.addr2 = addrlen;
	sqe.accept_flags = aflags;
};

// Prepares a multishot accept operation for an [[sqe]]. The entry is set up
// as [[op_accept]] does, with IORING_ACCEPT_MULTISHOT ORed into its ioprio
// field.
export fn op_accept_multishot(
	sqe: *sqe,
	fd: int,
	addr: nullable *rt::sockaddr,
	addrlen: nullable *uint,
	aflags: uint,
	flags: sqe_flags...
) void = {
	op_accept(sqe, fd, addr, addrlen, aflags, flags...);
	// op_accept resets the SQE, so the multishot bit is applied last.
	sqe.ioprio |= rt::IORING_ACCEPT_MULTISHOT;
};

// Prepares an [[sqe]] operation which opens a file relative to "dirfd". The
// path must be a C string, i.e. NUL terminated, such as one produced by
// [[types::c::fromstr]]. "mode" is carried in the length field of the SQE and
// "oflags" is stored as the open flags.
export fn op_openat(
	sqe: *sqe,
	dirfd: int,
	path: *const c::char,
	oflags: int,
	mode: uint,
	flags: sqe_flags...
) void = {
	const open_flags = oflags: u32;
	preprw(sqe, sqe_op::OPENAT, dirfd, path, mode, 0, flags...);
	sqe.open_flags = open_flags;
};

// Prepares an [[sqe]] operation which closes the file descriptor "fd".
export fn op_close(
	sqe: *sqe,
	fd: int,
	flags: sqe_flags...
) void = {
	// Only the file descriptor is meaningful here; address, length and
	// offset are explicitly left at zero.
	prep(sqe, sqe_op::CLOSE, flags...);
	sqe.fd = fd;
	sqe.addr = null;
	sqe.length = 0;
	sqe.off = 0;
};

// Prepares an [[sqe]] operation which provides a buffer pool to the kernel.
// "pool" holds "nbuf" buffers of "bufsz" bytes each, so len(pool) must be
// equal to nbuf * bufsz. "bufid" is the ID given to the first buffer and
// "group" is the buffer group the buffers are added to. See
// [[sqe_set_buffer_select]] to use the buffer pool for a subsequent request.
//
// Aborts if "nbuf" does not fit in a signed 32-bit integer, if "bufsz" does
// not fit in 32 bits, or if the pool length does not match.
export fn op_provide_buffers(
	sqe: *sqe,
	group: u16,
	pool: []u8,
	nbuf: size,
	bufsz: size,
	bufid: u16,
	flags: sqe_flags...
) void = {
	// Both values are narrowed below; refuse to truncate them silently.
	assert(nbuf <= (types::I32_MAX: size));
	assert(bufsz <= types::U32_MAX);
	assert(len(pool) == nbuf * bufsz);
	// The buffer count travels in the fd field, the buffer size in the
	// length field and the starting buffer ID in the offset field.
	preprw(sqe, sqe_op::PROVIDE_BUFFERS, nbuf: int, pool: *[*]u8,
		bufsz: uint, bufid: uint, flags...);
	sqe.buf_group = group;
};

// Removes "nbuf" buffers previously registered with [[op_provide_buffers]]
// from the buffer group "group".
//
// Aborts if "nbuf" does not fit in a signed 32-bit integer.
export fn op_remove_buffers(
	sqe: *sqe,
	nbuf: size,
	group: u16,
	flags: sqe_flags...
) void = {
	// The buffer count travels in the fd field and is narrowed to int;
	// refuse to truncate it silently.
	assert(nbuf <= (types::I32_MAX: size));
	preprw(sqe, sqe_op::REMOVE_BUFFERS, nbuf: int, null, 0, 0, flags...);
	sqe.buf_group = group;
};