Hi… I am well aware that this diff view is very suboptimal. It will be fixed when the refactored server comes along!
Add some multishot functions
The io_uring module provides access to Linux's io_uring subsystem. The documentation for this module is somewhat scarce: users are presumed to be familiar with io_uring. Thus, it is recommended that a reading of this module is paired with the Linux documentation, which may be available from your local liburing package under the io_uring_setup, io_uring_enter, and io_uring_register man pages. This module may be considered to be roughly equivalent to liburing's abstraction layer.
Note that CQEs from multishot requests will have [[cqe_flags::F_MORE]] set if the application should expect further CQE entries from the original request. If F_MORE is unset, the request has completed and will generate no further CQEs; it must be resubmitted to receive more events.
// SPDX-License-Identifier: MPL-2.0
// (c) 2021 Alexey Yerin <yyp@disroot.org>
// (c) 2021 Drew DeVault <sir@cmpwn.com>
// (c) 2022 Eyal Sawady <ecs@d2evs.net>
// (c) 2025 Runxi Yu <me@runxiyu.org>
use endian;
use rt;
use types;
use types::c;
// Zeroes the given SQE, then initializes its opcode and OR's together any
// provided submission flags.
fn prep(entry: *sqe, op: sqe_op, flags: sqe_flags...) void = {
	rt::memset(entry, 0, size(sqe));
	entry.opcode = op;
	let fl = sqe_flags::NONE;
	for (let i = 0z; i < len(flags); i += 1) {
		fl |= flags[i];
	};
	entry.flags = fl;
};
// Prepares a read/write-style SQE: zeroes it via [[prep]], then fills in the
// file descriptor, address, length, and offset fields shared by most
// operations.
fn preprw(
	sqe: *sqe,
	op: sqe_op,
	fd: int,
	addr: nullable *opaque,
	length: uint,
	offs: u64,
	flags: sqe_flags...
) void = {
	prep(sqe, op, flags...);
	// The remaining assignments are independent of one another.
	sqe.off = offs;
	sqe.length = length;
	sqe.addr = addr;
	sqe.fd = fd;
};
// Sets the user data field of an [[sqe]]. The kernel copies this value
// verbatim into the resulting [[cqe]], so a completion event can be matched
// with the SQE that produced it.
export fn sqe_set_data(sqe: *sqe, user_data: *opaque) void = {
	// Pointers must fit into the 64-bit user_data field.
	static assert(size(uintptr) <= size(u64));
	const ptr = user_data: uintptr;
	sqe.user_data = ptr: u64;
};
// Enables buffer selection for this SQE by setting
// [[sqe_flags::BUFFER_SELECT]] and recording the buffer group to draw from.
// See [[op_provide_buffers]] for configuring buffer groups, and
// [[cqe_get_buffer_id]] to retrieve the buffer used from the corresponding
// [[cqe]].
export fn sqe_set_buffer_select(sqe: *sqe, group: u16) void = {
	sqe.buf_group = group;
	sqe.flags |= sqe_flags::BUFFER_SELECT;
};
// Prepares a no-op "operation" for an [[sqe]], which completes immediately
// without performing any I/O.
export fn op_nop(sqe: *sqe, flags: sqe_flags...) void =
	prep(sqe, sqe_op::NOP, flags...);
// Prepares a vectored read operation for an [[sqe]], reading into the given
// iovec slice at the given offset.
export fn op_readv(
	sqe: *sqe,
	fd: int,
	iov: []rt::iovec,
	offs: size,
	flags: sqe_flags...
) void = {
	const vecs = iov: *[*]rt::iovec;
	const nvec = len(iov): uint;
	preprw(sqe, sqe_op::READV, fd, vecs, nvec, offs, flags...);
};
// Prepares a vectored write operation for an [[sqe]], writing from the given
// iovec slice at the given offset.
export fn op_writev(
	sqe: *sqe,
	fd: int,
	iov: []rt::iovec,
	offs: size,
	flags: sqe_flags...
) void = {
	const vecs = iov: *[*]rt::iovec;
	const nvec = len(iov): uint;
	preprw(sqe, sqe_op::WRITEV, fd, vecs, nvec, offs, flags...);
};
// Prepares a read operation for an [[sqe]]. The buffer length must not
// exceed [[types::U32_MAX]], since the SQE length field is 32 bits.
export fn op_read(
	sqe: *sqe,
	fd: int,
	buf: *opaque,
	count: size,
	offs: u64,
	flags: sqe_flags...
) void = {
	assert(count <= types::U32_MAX);
	const length = count: u32;
	preprw(sqe, sqe_op::READ, fd, buf, length, offs, flags...);
};
// Prepares a write operation for an [[sqe]]. The buffer length must not
// exceed [[types::U32_MAX]], since the SQE length field is 32 bits.
export fn op_write(
	sqe: *sqe,
	fd: int,
	buf: *opaque,
	count: size,
	offs: u64,
	flags: sqe_flags...
) void = {
	assert(count <= types::U32_MAX);
	const length = count: u32;
	preprw(sqe, sqe_op::WRITE, fd, buf, length, offs, flags...);
};
// Prepares a read into a fixed buffer previously registered with
// [[ring_register_buffers]]. The buf and count parameters must describe a
// region inside the registered buffer selected by the index parameter.
export fn op_read_fixed(
	sqe: *sqe,
	fd: int,
	buf: *opaque,
	count: size,
	index: u16,
	flags: sqe_flags...
) void = {
	assert(count <= types::U32_MAX);
	const length = count: u32;
	preprw(sqe, sqe_op::READ_FIXED, fd, buf, length, 0, flags...);
	// buf_index must be set after preprw, which zeroes the SQE.
	sqe.buf_index = index;
};
// Prepares a write from a fixed buffer previously registered with
// [[ring_register_buffers]]. The buf and count parameters must describe a
// region inside the registered buffer selected by the index parameter.
export fn op_write_fixed(
	sqe: *sqe,
	fd: int,
	buf: *opaque,
	count: size,
	index: u16,
	flags: sqe_flags...
) void = {
	assert(count <= types::U32_MAX);
	const length = count: u32;
	preprw(sqe, sqe_op::WRITE_FIXED, fd, buf, length, 0, flags...);
	// buf_index must be set after preprw, which zeroes the SQE.
	sqe.buf_index = index;
};
// Prepares an fsync operation for an [[sqe]]. Note that operations are
// executed in parallel and are not completed in submission order, so an fsync
// submitted after a write may not cause the write to be accounted for by the
// fsync unless [[sqe_flags::IO_LINK]] is used.
export fn op_fsync(
	sqe: *sqe,
	fd: int,
	fsync_flags: op_fsync_flags,
	flags: sqe_flags...
) void = {
	preprw(sqe, sqe_op::FSYNC, fd, null, 0, 0, flags...);
	sqe.fsync_flags = fsync_flags;
};
// Adds a request to poll a file descriptor for the given set of poll events.
// The poll is one-shot: to poll the descriptor again, a fresh SQE must be
// submitted. The caller must call [[sqe_set_data]] to provide a user data
// field in order to use [[op_poll_remove]] to remove this poll request later.
export fn op_poll_add(
	sqe: *sqe,
	fd: int,
	poll_mask: uint,
	flags: sqe_flags...
) void = {
	preprw(sqe, sqe_op::POLL_ADD, fd, null, 0, 0, flags...);
	// poll32_events is stored as-is; big-endian hosts would need a
	// byte-swap here. TODO?
	assert(endian::host == &endian::little); // TODO?
	sqe.poll32_events = poll_mask: u32;
};
// Prepares a multishot poll operation for an [[sqe]]. Unlike [[op_poll_add]],
// the request keeps generating a CQE each time the poll mask is satisfied,
// until it fails or is cancelled; see [[cqe_flags::F_MORE]].
export fn op_poll_add_multishot(
	sqe: *sqe,
	fd: int,
	poll_mask: uint,
	flags: sqe_flags...
) void = {
	op_poll_add(sqe, fd, poll_mask, flags...);
	// Multishot mode is requested via the (otherwise unused) length field.
	sqe.length |= rt::IORING_POLL_ADD_MULTI;
};
// Removes an existing poll request by matching the SQE's user_data field. See
// [[sqe_set_data]].
export fn op_poll_remove(sqe: *sqe, user_data: *opaque, flags: sqe_flags...) void = {
	// The kernel identifies the poll request to cancel via this SQE's addr
	// field (cf. liburing's io_uring_prep_poll_remove); passing it only via
	// sqe.user_data would make the kernel look for a request whose
	// user_data is 0.
	preprw(sqe, sqe_op::POLL_REMOVE, -1, user_data, 0, 0, flags...);
	// Also tag the removal request itself, preserving the previous
	// observable behavior of this function.
	sqe_set_data(sqe, user_data);
};
// Prepares a sendmsg operation for an [[sqe]], equivalent to the sendmsg(2)
// system call.
export fn op_sendmsg(
	sqe: *sqe,
	fd: int,
	msghdr: *rt::msghdr,
	sendmsg_flags: int,
	flags: sqe_flags...
) void = {
	preprw(sqe, sqe_op::SENDMSG, fd, msghdr, 0, 0, flags...);
	// sendmsg(2)-style flags (MSG_*) live in the msg_flags union member.
	sqe.msg_flags = sendmsg_flags;
};
// Prepares a recvmsg operation for an [[sqe]], equivalent to the recvmsg(2)
// system call.
export fn op_recvmsg(
	sqe: *sqe,
	fd: int,
	msghdr: *rt::msghdr,
	recvmsg_flags: int,
	flags: sqe_flags...
) void = {
	preprw(sqe, sqe_op::RECVMSG, fd, msghdr, 0, 0, flags...);
	// recvmsg(2)-style flags (MSG_*) live in the msg_flags union member.
	sqe.msg_flags = recvmsg_flags;
};
// Prepares a multishot recvmsg operation for an [[sqe]]. The request remains
// active and produces a CQE per received message until it fails or is
// cancelled; see [[cqe_flags::F_MORE]].
//
// Buffer selection is required; see [[sqe_set_buffer_select]].
export fn op_recvmsg_multishot(
	sqe: *sqe,
	fd: int,
	msghdr: *rt::msghdr,
	recvmsg_flags: int,
	flags: sqe_flags...
) void = {
	// MSG_WAITALL is incompatible with multishot receives. Validate before
	// touching the SQE, consistent with [[op_recv_multishot]].
	assert(recvmsg_flags & rt::MSG_WAITALL == 0);
	op_recvmsg(sqe, fd, msghdr, recvmsg_flags, flags...);
	sqe.ioprio |= rt::IORING_RECV_MULTISHOT;
};
// Prepares a send operation for an [[sqe]], equivalent to the send(2) system
// call. The buffer length must not exceed [[types::U32_MAX]].
export fn op_send(
	sqe: *sqe,
	fd: int,
	buf: *opaque,
	count: size,
	send_flags: int,
	flags: sqe_flags...
) void = {
	assert(count <= types::U32_MAX);
	const length = count: u32;
	preprw(sqe, sqe_op::SEND, fd, buf, length, 0, flags...);
	sqe.msg_flags = send_flags;
};
// Prepares a recv operation for an [[sqe]], equivalent to the recv(2) system
// call. The buffer length must not exceed [[types::U32_MAX]].
export fn op_recv(
	sqe: *sqe,
	fd: int,
	buf: *opaque,
	count: size,
	recv_flags: int,
	flags: sqe_flags...
) void = {
	assert(count <= types::U32_MAX);
	const length = count: u32;
	preprw(sqe, sqe_op::RECV, fd, buf, length, 0, flags...);
	sqe.msg_flags = recv_flags;
};
// Prepares a multishot recv operation for an [[sqe]]. The request remains
// active and produces a CQE per received datagram/chunk until it fails or is
// cancelled; see [[cqe_flags::F_MORE]].
//
// Buffer selection is required; see [[sqe_set_buffer_select]]. Accordingly,
// count must be zero: buffers come from the selected group, not the caller.
export fn op_recv_multishot(
	sqe: *sqe,
	fd: int,
	count: size,
	recv_flags: int,
	flags: sqe_flags...
) void = {
	assert(count == 0);
	// MSG_WAITALL is incompatible with multishot receives.
	assert(recv_flags & rt::MSG_WAITALL == 0);
	preprw(sqe, sqe_op::RECV, fd, null, 0, 0, flags...);
	sqe.msg_flags = recv_flags;
	sqe.ioprio |= rt::IORING_RECV_MULTISHOT;
};
// Prepares a timeout operation for an [[sqe]]. "ts" should be a timespec
// describing the desired timeout, and "events" may optionally be used to
// define a number of completion events to wake after (or zero to wake only
// after the timeout expires). The caller must call [[sqe_set_data]] to
// provide a user data field in order to use [[op_timeout_remove]] to cancel
// this timeout later.
export fn op_timeout(
	sqe: *sqe,
	ts: *rt::timespec,
	events: uint,
	to_flags: op_timeout_flags,
	flags: sqe_flags...
) void = {
	// length = 1: one timespec at addr; off carries the event count.
	preprw(sqe, sqe_op::TIMEOUT, 0, ts, 1, events, flags...);
	sqe.timeout_flags = to_flags;
};
// Prepares a multishot timeout operation for an [[sqe]]: the timeout fires
// repeatedly, generating a CQE per expiration, until cancelled. See
// [[op_timeout]] for the meaning of the other parameters.
export fn op_timeout_multishot(
	sqe: *sqe,
	ts: *rt::timespec,
	events: uint,
	to_flags: op_timeout_flags,
	flags: sqe_flags...
) void = {
	op_timeout(sqe, ts, events, to_flags, flags...);
	sqe.timeout_flags |= op_timeout_flags::MULTISHOT;
};
// Removes an existing timeout request by matching the SQE's user_data field.
// See [[sqe_set_data]].
export fn op_timeout_remove(
	sqe: *sqe,
	user_data: *opaque,
	to_flags: op_timeout_flags,
	flags: sqe_flags...
) void = {
	// The user_data of the timeout to cancel travels in the addr field.
	preprw(sqe, sqe_op::TIMEOUT_REMOVE, 0, user_data, 0, 0, flags...);
	sqe.timeout_flags = to_flags;
};
// Updates an existing timeout request by matching the SQE's user_data field.
// The user_data of the timeout to update is passed via the addr field, and
// the new timespec via addr2, using [[sqe_op::TIMEOUT_REMOVE]] with
// [[op_timeout_flags::UPDATE]] set. See [[sqe_set_data]].
//
// NOTE(review): "events" is written into the off field by preprw, but off and
// addr2 share storage in the [[sqe]] union, so the subsequent addr2
// assignment overwrites it — the events parameter appears to have no effect.
// Confirm whether the kernel supports updating the event count at all.
export fn op_timeout_update(
	sqe: *sqe,
	user_data: *opaque,
	ts: *rt::timespec,
	events: uint,
	to_flags: op_timeout_flags,
	flags: sqe_flags...
) void = {
	preprw(sqe, sqe_op::TIMEOUT_REMOVE, 0, user_data, 0, events, flags...);
	sqe.timeout_flags = to_flags | op_timeout_flags::UPDATE;
	sqe.addr2 = ts;
};
// Prepares a timeout operation for an [[sqe]] which is linked to the previous
// SQE, effectively setting an upper limit on how long that SQE can take to
// complete. "ts" should be a timespec describing the desired timeout. The
// caller must call [[sqe_set_data]] to provide a user data field in order to
// use [[op_timeout_remove]] to cancel this timeout later.
export fn op_link_timeout(
	sqe: *sqe,
	ts: *rt::timespec,
	to_flags: op_timeout_flags,
	flags: sqe_flags...
) void = {
	// length = 1: a single timespec at addr; no event count for linked
	// timeouts.
	preprw(sqe, sqe_op::LINK_TIMEOUT, 0, ts, 1, 0, flags...);
	sqe.timeout_flags = to_flags;
};
// Prepares a socket accept operation for an [[sqe]]. Equivalent to accept4(2);
// "aflags" corresponds to accept4's flags argument.
export fn op_accept(
	sqe: *sqe,
	fd: int,
	addr: nullable *rt::sockaddr,
	addrlen: nullable *uint,
	aflags: uint,
	flags: sqe_flags...
) void = {
	preprw(sqe, sqe_op::ACCEPT, fd, addr, 0, 0, flags...);
	// The addrlen pointer travels in the addr2 union member.
	sqe.addr2 = addrlen;
	sqe.accept_flags = aflags;
};
// Prepares a multishot accept operation for an [[sqe]]. The request keeps
// producing a CQE per accepted connection until it fails or is cancelled; see
// [[cqe_flags::F_MORE]].
export fn op_accept_multishot(
	sqe: *sqe,
	fd: int,
	addr: nullable *rt::sockaddr,
	addrlen: nullable *uint,
	aflags: uint,
	flags: sqe_flags...
) void = {
	op_accept(sqe, fd, addr, addrlen, aflags, flags...);
	// Multishot mode is requested via the (otherwise unused) ioprio field.
	sqe.ioprio |= rt::IORING_ACCEPT_MULTISHOT;
};
// Prepares an [[sqe]] operation which opens a file, equivalent to openat(2).
// The path must be a C string, i.e. NUL terminated; see [[strings::to_c]].
export fn op_openat(
	sqe: *sqe,
	dirfd: int,
	path: *const c::char,
	oflags: int,
	mode: uint,
	flags: sqe_flags...
) void = {
	// The mode travels in the length field; open flags have their own
	// union member.
	preprw(sqe, sqe_op::OPENAT, dirfd, path, mode, 0, flags...);
	sqe.open_flags = oflags: u32;
};
// Prepares an [[sqe]] operation which closes a file descriptor.
export fn op_close(sqe: *sqe, fd: int, flags: sqe_flags...) void =
	preprw(sqe, sqe_op::CLOSE, fd, null, 0, 0, flags...);
// Prepares an [[sqe]] operation which provides a buffer pool to the kernel.
// len(pool) must equal nbuf * bufsz. See [[sqe_set_buffer_select]] to use the
// buffer pool for a subsequent request.
export fn op_provide_buffers(
	sqe: *sqe,
	group: u16,
	pool: []u8,
	nbuf: size,
	bufsz: size,
	bufid: u16,
	flags: sqe_flags...
) void = {
	// The pool must hold exactly nbuf buffers of bufsz bytes each.
	assert(len(pool) == nbuf * bufsz);
	// For this opcode the kernel repurposes fd as the buffer count,
	// length as the per-buffer size, and off as the first buffer ID.
	const nbufs = nbuf: int;
	const entsz = bufsz: uint;
	preprw(sqe, sqe_op::PROVIDE_BUFFERS, nbufs, pool: *[*]u8,
		entsz, bufid: uint, flags...);
	sqe.buf_group = group;
};
// Removes nbuf buffers from the given group, previously registered with
// [[op_provide_buffers]].
export fn op_remove_buffers(
	sqe: *sqe,
	nbuf: size,
	group: u16,
	flags: sqe_flags...
) void = {
	// As with PROVIDE_BUFFERS, the buffer count travels in the fd field.
	preprw(sqe, sqe_op::REMOVE_BUFFERS, nbuf: int, null, 0, 0, flags...);
	sqe.buf_group = group;
};
// SPDX-License-Identifier: MPL-2.0
// (c) 2021 Alexey Yerin <yyp@disroot.org>
// (c) 2021 Drew DeVault <sir@cmpwn.com>
// (c) 2021-2022 Eyal Sawady <ecs@d2evs.net>
// (c) 2025 Runxi Yu <me@runxiyu.org>
use errors;
// Returned when buffer pool use was configured for an [[sqe]], but there are no
// buffers available.
export type nobuffers = !void;

// All errors which may be returned by this module.
export type error = !(errors::error | nobuffers);
// Converts an [[error]] into a human-readable string.
export fn strerror(err: error) const str = {
	// The cases are disjoint types, so their order is immaterial.
	match (err) {
	case let e: errors::error =>
		return errors::strerror(e);
	case nobuffers =>
		return "Buffer pool exhausted";
	};
};
// The maximum value for the first parameter of [[ring_init]].
export def MAX_ENTRIES: uint = 4096;

// Bit shift applied to cqe.flags to extract the selected buffer ID when
// [[cqe_flags::F_BUFFER]] is set; presumably consumed by cqe_get_buffer_id.
def CQE_BUFFER_SHIFT: u32 = 16;

// Fixed offsets passed to mmap(2) to map the SQ ring, CQ ring, and SQE array
// from the ring file descriptor; see io_uring_setup(2).
def OFF_SQ_RING: u64 = 0;
def OFF_CQ_RING: u64 = 0x8000000;
def OFF_SQES: u64 = 0x10000000;
// An io_uring [[sqe]] operation.
//
// Variants are positional (NOP = 0, READV = 1, ...); the order must match the
// kernel's IORING_OP_* opcode numbering, so do not rearrange or insert
// entries.
export type sqe_op = enum u8 {
	NOP,
	READV,
	WRITEV,
	FSYNC,
	READ_FIXED,
	WRITE_FIXED,
	POLL_ADD,
	POLL_REMOVE,
	SYNC_FILE_RANGE,
	SENDMSG,
	RECVMSG,
	TIMEOUT,
	TIMEOUT_REMOVE,
	ACCEPT,
	ASYNC_CANCEL,
	LINK_TIMEOUT,
	CONNECT,
	FALLOCATE,
	OPENAT,
	CLOSE,
	FILES_UPDATE,
	STATX,
	READ,
	WRITE,
	FADVISE,
	MADVISE,
	SEND,
	RECV,
	OPENAT2,
	EPOLL_CTL,
	SPLICE,
	PROVIDE_BUFFERS,
	REMOVE_BUFFERS,
	TEE,
};
// Flags for an [[sqe]]. Multiple flags may be OR'd together.
export type sqe_flags = enum u8 {
	NONE = 0,
	// Use fixed fileset
	FIXED_FILE = 1 << 0,
	// Issue after inflight IO
	IO_DRAIN = 1 << 1,
	// Links next sqe
	IO_LINK = 1 << 2,
	// Like LINK, but stronger
	IO_HARDLINK = 1 << 3,
	// Always go async
	ASYNC = 1 << 4,
	// Select buffer from sqe.buf_group; see [[sqe_set_buffer_select]]
	BUFFER_SELECT = 1 << 5,
};
// Flags for an fsync operation; see [[op_fsync]].
export type op_fsync_flags = enum u32 {
	NONE = 0,
	// Sync data only, not metadata (cf. fdatasync(2))
	DATASYNC = 1 << 0,
};
// Flags for a timeout operation; see [[op_timeout]], [[op_timeout_remove]],
// and [[op_timeout_update]].
export type op_timeout_flags = enum u32 {
	NONE = 0,
	// If set, the timeout will be "absolute", waiting until CLOCK_MONOTONIC
	// reaches the time defined by the timespec. If unset, it will be
	// interpreted as a duration relative to the I/O submission.
	ABS = 1 << 0,
	// When combined with [[sqe_op::TIMEOUT_REMOVE]], causes the submission
	// to update the timer rather than remove it.
	UPDATE = 1 << 1,
	// Use CLOCK_BOOTTIME instead of CLOCK_MONOTONIC when ABS is set.
	// Does not make sense when ABS is unset.
	BOOTTIME = 1 << 2,
	// Use CLOCK_REALTIME instead of CLOCK_MONOTONIC when ABS is set.
	// Does not make sense when ABS is unset.
	REALTIME = 1 << 3,
	// Targets a linked timeout when used with [[op_timeout_flags::UPDATE]].
	LINK_TIMEOUT_UPDATE = 1 << 4,
	// Treat -ETIME completions as success.
	ETIME_SUCCESS = 1 << 5,
	// Make the timeout generate multiple completions until cancelled.
	MULTISHOT = 1 << 6,
};
// Flags for a splice operation.
export type op_splice_flags = enum u32 {
	NONE = 0,
	// The input fd refers to a fixed (registered) file rather than a
	// regular descriptor.
	F_FD_IN_FIXED = 1 << 31,
};
// Flags for a [[cqe]].
export type cqe_flags = enum u32 {
	NONE = 0,
	// A buffer from a provided pool was consumed; presumably the buffer ID
	// occupies the bits above CQE_BUFFER_SHIFT — see [[cqe_get_buffer_id]].
	F_BUFFER = 1 << 0,
	// More completions will follow from the originating (multishot)
	// request.
	F_MORE = 1 << 1,
};
// A submission queue entry.
//
// The layout mirrors the kernel's struct io_uring_sqe; the unions below
// overlay fields that different opcodes reuse for different purposes.
export type sqe = struct {
	opcode: sqe_op,
	flags: sqe_flags,
	ioprio: u16,
	fd: i32,
	// File offset for read/write-style ops, or a secondary pointer (e.g.
	// the addrlen pointer for accept, the new timespec for timeout
	// updates).
	union {
		off: u64,
		addr2: nullable *opaque,
	},
	// Primary buffer/address for the operation.
	union {
		addr: nullable *opaque,
		splice_off_in: u64,
	},
	length: u32,
	// Per-opcode flags; which member applies depends on opcode.
	union {
		rw_flags: int,
		fsync_flags: op_fsync_flags,
		poll_events: u16,
		poll32_events: u32,
		sync_range_flags: u32,
		msg_flags: int,
		timeout_flags: op_timeout_flags,
		accept_flags: u32,
		cancel_flags: u32,
		open_flags: u32,
		statx_flags: u32,
		fadvise_advice: u32,
		splice_flags: op_splice_flags,
	},
	// Copied verbatim into the matching [[cqe]]; see [[sqe_set_data]].
	user_data: u64,
	union {
		struct {
			union {
				buf_index: u16,
				buf_group: u16,
			},
			personality: u16,
			splice_fd_in: i32,
		},
		// Pads the struct to the kernel's fixed SQE size.
		pad2: [3]u64,
	},
};
// A completion queue entry.
export type cqe = struct {
	// Matches the user_data of the originating [[sqe]].
	user_data: u64,
	// Raw operation result; negative values indicate errors.
	// Consider using [[cqe_result]] instead.
	res: i32,
	flags: cqe_flags,
};
// Offsets of the submission queue's fields within the mmap(2)'d SQ ring
// region; filled in by the kernel during ring setup.
export type sq_offsets = struct {
	head: u32,
	tail: u32,
	ring_mask: u32,
	ring_entries: u32,
	flags: u32,
	dropped: u32,
	array: u32,
	resv1: u32,
	resv2: u64,
};
// Flags for the sq ring, read from the kernel-shared flags word.
export type sq_flags = enum u32 {
	NONE = 0,
	// Needs io_uring_enter wakeup (SQPOLL thread is idle)
	NEED_WAKEUP = 1 << 0,
	// CQ ring is overflown
	CQ_OVERFLOW = 1 << 1,
};
// Offsets of the completion queue's fields within the mmap(2)'d CQ ring
// region; filled in by the kernel during ring setup.
export type cq_offsets = struct {
	head: u32,
	tail: u32,
	ring_mask: u32,
	ring_entries: u32,
	overflow: u32,
	cqes: u32,
	flags: u32,
	resv1: u32,
	resv2: u64,
};
// Flags for the cq ring.
export type cq_flags = enum u32 {
	NONE = 0,
	// Eventfd notifications are disabled for this ring
	EVENTFD_DISABLED = 1 << 0,
};
// Flags for the ring setup operation; see [[ring_params]].
export type ring_setup_flags = enum u32 {
	NONE = 0,
	// io_context is polled
	IOPOLL = 1 << 0,
	// SQ poll thread
	SQPOLL = 1 << 1,
	// sq_thread_cpu is valid
	SQ_AFF = 1 << 2,
	// App defines CQ size
	CQSIZE = 1 << 3,
	// Clamp SQ/CQ ring sizes
	CLAMP = 1 << 4,
	// Attach to existing wq
	ATTACH_WQ = 1 << 5,
	// Start with ring disabled
	R_DISABLED = 1 << 6,
};
// Parameters for [[ring_init]]. Partially completed by the kernel: the caller
// supplies flags and (optionally) SQPOLL settings; the kernel fills in entry
// counts, features, and the mmap offsets.
export type ring_params = struct {
	sq_entries: u32,
	cq_entries: u32,
	flags: ring_setup_flags,
	sq_thread_cpu: u32,
	sq_thread_idle: u32,
	features: ring_features,
	wq_fd: u32,
	resv: [3]u32,
	sq_off: sq_offsets,
	cq_off: cq_offsets,
};
// Features supported by the kernel, reported through [[ring_params]] after
// setup.
export type ring_features = enum u32 {
	NONE = 0,
	SINGLE_MMAP = 1 << 0,
	NODROP = 1 << 1,
	SUBMIT_STABLE = 1 << 2,
	RW_CUR_POS = 1 << 3,
	CUR_PERSONALITY = 1 << 4,
	FAST_POLL = 1 << 5,
	POLL_32BITS = 1 << 6,
};
// Flags for the enter operation (io_uring_enter(2)).
type enter_flags = enum uint {
	NONE = 0,
	// Wait for completion events before returning
	GETEVENTS = 1 << 0,
	// Wake up the SQPOLL kernel thread
	SQ_WAKEUP = 1 << 1,
	// Wait for SQ ring space before submitting
	SQ_WAIT = 1 << 2,
};
// Register operations for io_uring_register(2).
//
// Variants are positional; the order must match the kernel's
// IORING_REGISTER_*/IORING_UNREGISTER_* numbering.
export type ring_register_op = enum uint {
	REGISTER_BUFFERS,
	UNREGISTER_BUFFERS,
	REGISTER_FILES,
	UNREGISTER_FILES,
	REGISTER_EVENTFD,
	UNREGISTER_EVENTFD,
	REGISTER_FILES_UPDATE,
	REGISTER_EVENTFD_ASYNC,
	REGISTER_PROBE,
	REGISTER_PERSONALITY,
	UNREGISTER_PERSONALITY,
	REGISTER_RESTRICTIONS,
	REGISTER_ENABLE_RINGS,
};
// Information for a REGISTER_FILES_UPDATE operation.
export type ring_files_update = struct {
	// Index into the registered fileset at which to begin updating
	offs: u32,
	resv: u32,
	// Pointer to the new file descriptors
	fds: *int,
};
// // Flags for a probe operation.
// export type op_probe_flags = enum u16 {
// NONE = 0,
// SUPPORTED = 1 << 0,
// };
//
// // REGISTER_PROBE operation details.
// export type probe_op = struct {
// op: u8,
// resv: u8,
// flags: op_probe_flags,
// resv2: u32,
// };
//
// // Summary of REGISTER_PROBE results.
// export type probe = struct {
// last_op: u8,
// ops_len: u8,
// resv: u16,
// resv2: [3]u32,
// ops: [*]probe_op,
// };
// Details for a REGISTER_RESTRICTIONS operation. The union member that
// applies is selected by the opcode field; see
// [[ring_register_restriction_op]].
export type ring_register_restriction_details = struct {
	opcode: ring_register_restriction_op,
	union {
		register_op: ring_register_op,
		sqe_op: sqe_op,
		flags: sqe_flags,
	},
	resv: u8,
	resv2: [3]u32,
};
// Opcode for a [[ring_register_restriction_details]].
//
// Note that NONE and REGISTER_OP intentionally share the value 0.
export type ring_register_restriction_op = enum u16 {
	NONE = 0,
	// Allow an io_uring_register(2) opcode
	REGISTER_OP = 0,
	// Allow an sqe opcode
	SQE_OP = 1,
	// Allow sqe flags
	SQE_FLAGS_ALLOWED = 2,
	// Require sqe flags (these flags must be set on each submission)
	SQE_FLAGS_REQUIRED = 3,
};
// State for an io_uring: the two ring views, the ring file descriptor, and
// the setup flags/features negotiated with the kernel.
export type io_uring = struct {
	sq: sq,
	cq: cq,
	fd: int,
	flags: ring_setup_flags,
	features: ring_features,
};
// Submission queue state. The k-prefixed pointers presumably point into the
// kernel-shared SQ ring mapping (ring_ptr/ring_sz) — confirm against
// ring_init.
export type sq = struct {
	khead: *uint,
	ktail: *uint,
	kring_mask: *uint,
	kring_entries: *uint,
	kflags: *sq_flags,
	kdropped: *uint,
	array: *[*]uint,
	sqes: *[*]sqe,
	// Locally-tracked SQE head/tail, distinct from the shared ring indices
	sqe_head: uint,
	sqe_tail: uint,
	// Size and base of the mmap'd ring region
	ring_sz: size,
	ring_ptr: *opaque,
};
// Completion queue state. The k-prefixed pointers presumably point into the
// kernel-shared CQ ring mapping (ring_ptr/ring_sz) — confirm against
// ring_init.
export type cq = struct {
	khead: *uint,
	ktail: *uint,
	kring_mask: *uint,
	kring_entries: *uint,
	kflags: *cq_flags,
	koverflow: *uint,
	cqes: *[*]cqe,
	// Size and base of the mmap'd ring region
	ring_sz: size,
	ring_ptr: *opaque,
};