linux/io_uring/cqe.ha

// SPDX-License-Identifier: MPL-2.0
// (c) 2021 Alexey Yerin <yyp@disroot.org>
// (c) 2021 Drew DeVault <sir@cmpwn.com>
// (c) 2021 Eyal Sawady <ecs@d2evs.net>
// (c) 2025 Runxi Yu <me@runxiyu.org>

use errors;
use rt;

// Advances the completion queue by N items.
export fn cq_advance(ring: *io_uring, n: uint) void = {
*ring.cq.khead = *ring.cq.khead + n;
};

// Call after processing a [[cqe]]. The cqe is returned to the pool and cannot
// be used by the application again.
export fn cqe_seen(ring: *io_uring, cqe: *cqe) void = cq_advance(ring, 1);

// Waits until a CQE is available, then returns it. The caller must pass the
// returned CQE to [[cqe_seen]] to advance the queue.
export fn cqe_wait(ring: *io_uring) (*cqe | error) = {
match (get_cqe(ring, 0, 1)) {
case let err: error =>
return err;
case let cq: nullable *cqe =>
		return cq as *cqe; // non-null: with wait > 0, get_cqe loops until a CQE arrives or an error occurs
};
};
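
// Typical usage, as a sketch: wait for a completion, inspect its result, and
// return it to the queue. Assumes a ring set up elsewhere with submissions
// already queued.
//
//	let cqe = cqe_wait(&ring)?;
//	defer cqe_seen(&ring, cqe);
//	let res = cqe_result(cqe)?;
//	// res holds the operation's return value, e.g. bytes transferred
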
// Peeks the next CQE from the queue and returns it, or null if none are
// pending. The caller must pass the returned CQE to [[cqe_seen]] to advance the
// queue.
export fn cqe_peek(ring: *io_uring) (nullable *cqe | error) = get_cqe(ring, 0, 0);
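
// A non-blocking sketch: drain whatever completions are already pending
// without blocking for new ones.
//
//	for (true) {
//		match (cqe_peek(&ring)?) {
//		case null =>
//			break;
//		case let c: *cqe =>
//			// process cqe_result(c), cqe_get_data(c), etc.
//			cqe_seen(&ring, c);
//		};
//	};
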
// Returns the result of a [[cqe]], or an error if unsuccessful.
export fn cqe_result(cqe: *cqe) (int | error) = {
if (cqe.res >= 0) {
return cqe.res;
};
switch (-cqe.res) {
case rt::ENOBUFS =>
return nobuffers;
case =>
return errors::errno(-cqe.res: rt::errno);
};
};

// Gets the user data field of a [[cqe]]. See [[sqe_set_data]] for the corresponding
// SQE function.
export fn cqe_get_data(cqe: *cqe) nullable *opaque =
cqe.user_data: uintptr: nullable *opaque;
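
// A sketch of the round trip, assuming [[sqe_set_data]] takes the pointer to
// associate with the submission:
//
//	sqe_set_data(sqe, &state);	// when preparing the SQE
//	// ...once the matching completion arrives:
//	match (cqe_get_data(cqe)) {
//	case let p: *opaque =>
//		void;	// p is the &state passed above
//	case null =>
//		void;	// no user data was attached
//	};
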
// Returns the buffer ID used for this [[cqe]] in combination with
// [[sqe_set_buffer_select]]. Aborts the program if this CQE was not configured to
// use a buffer pool.
export fn cqe_get_buffer_id(cqe: *cqe) u16 = {
// TODO: Handle ENOBUFS
assert(cqe.flags & cqe_flags::F_BUFFER > 0,
"get_buffer_id called for CQE without buffer");
return (cqe.flags: u32 >> CQE_BUFFER_SHIFT): u16;
};
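
// A sketch for provided buffers: consult the buffer ID only when the kernel
// actually selected one for this completion.
//
//	if (cqe.flags & cqe_flags::F_BUFFER > 0) {
//		let id = cqe_get_buffer_id(cqe);
//		// the data was placed in buffer `id` of the registered pool
//	};

// Returns a pointer to the CQE at the head of the completion queue along with
// the number of completions available, without consuming anything. Returns
// (null, 0) when the queue is empty.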
fn peek_cqe_(ring: *io_uring) (nullable *cqe, uint) = {
let head = *ring.cq.khead;
let tail = *ring.cq.ktail;
let mask = *ring.cq.kring_mask;
let avail = tail - head;
if (avail == 0) {
return (null, 0);
};
return (&ring.cq.cqes[head & mask], avail);
};
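
// Submits up to "submit" entries and returns a pointer to the next available
// CQE. Enters the kernel when there are entries to submit, when fewer than
// "wait" completions are available, or when an overflown completion queue
// must be flushed; returns null if no CQE is available and neither waiting
// nor submission was requested.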
fn get_cqe(
ring: *io_uring,
submit: uint,
wait: uint,
) (nullable *cqe | error) = {
let cq: nullable *cqe = null;
for (cq == null) {
let enter = false, overflow = false;
let flags = enter_flags::NONE;
// TODO: tuple destructuring
let tup = peek_cqe_(ring);
let avail = tup.1;
cq = tup.0;
		if (cq == null && wait == 0 && submit == 0) {
			// Nothing to return, wait for, or submit; only enter
			// the kernel if the completion queue has overflowed
			// and needs to be flushed.
			if (!needs_flush(ring)) {
				return null;
			};
			overflow = true;
		};
if (wait > avail || overflow) {
flags |= enter_flags::GETEVENTS;
enter = true;
};
if (submit > 0) {
needs_enter(ring, &flags);
enter = true;
};
if (!enter) {
break;
};
match (rt::io_uring_enter(ring.fd,
submit, wait, flags: uint, null)) {
case let err: rt::errno =>
return errors::errno(err);
case let n: uint =>
submit -= n;
};
};
return cq;
};