more asyncio foo

This commit is contained in:
Nadja Reitzenstein 2021-11-24 21:39:11 +01:00
parent ec78aa6fc9
commit ad5c4061de
13 changed files with 723 additions and 68 deletions

View File

@ -13,4 +13,13 @@ nix = "0.23"
bitflags = "1.3"
ptr_meta = "0.1"
# SegQueue for task waiting on CQE or SQE being available again.
crossbeam-queue = "0.3"
# AsyncRead, AsyncWrite, AsyncSeek and related traits
futures-io = "0.3"
[dev-dependencies]
# As Mr. Torgue would put it: THE MOST EXTREME F*CKING ASYNC FUNCTION RUNNNER! EXPLOSIONS!
extreme = "666.666.666666"
futures-lite = "1.12"

View File

@ -0,0 +1,63 @@
use std::fs::File;
use std::future::Future;
use std::io;
use std::os::unix::prelude::AsRawFd;
use std::sync::Arc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use asyncio::ctypes::IORING_OP;
use asyncio::io_uring::IoUring;
use futures_lite::io::AsyncReadExt;
pub fn drive<T>(iouring: &IoUring, mut f: impl Future<Output = io::Result<T>>) -> io::Result<T> {
static VTABLE: RawWakerVTable = RawWakerVTable::new(
|clone_me| unsafe {
let arc = Arc::from_raw(clone_me);
std::mem::forget(arc.clone());
RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
},
|wake_me| unsafe { Arc::from_raw(wake_me); },
|wake_by_ref_me| unsafe {},
|drop_me| unsafe { drop(Arc::from_raw(drop_me)) },
);
let mut f = unsafe { std::pin::Pin::new_unchecked(&mut f) };
let park = Arc::new(());
let sender = Arc::into_raw(park.clone());
let raw_waker = RawWaker::new(sender as *const _, &VTABLE);
let waker = unsafe { Waker::from_raw(raw_waker) };
let mut cx = Context::from_waker(&waker);
loop {
match f.as_mut().poll(&mut cx) {
Poll::Ready(t) => return t,
Poll::Pending => {
iouring.handle_completions();
match iouring.submit_wait() {
Ok(_) => {}
Err(e) => return Err(e),
}
}
}
}
}
fn main() {
let file = File::open("/tmp/poem").unwrap();
let fd = file.as_raw_fd();
let mut ring: &'static IoUring = Box::leak(Box::new(IoUring::setup(4).unwrap()));
let mut async_file = asyncio::fs::File::new(fd, ring);
let mut buf = Box::new([0u8; 4096]);
let f = async move {
let len = async_file.read(&mut buf[..]).await?;
println!("Read {} bytes:", len);
let str = unsafe { std::str::from_utf8_unchecked(&buf[..len]) };
println!("{}", str);
Ok(())
};
drive(ring, f);
}

View File

@ -1,13 +1,54 @@
use std::fs::File;
use std::os::unix::prelude::AsRawFd;
use asyncio::ctypes::IORING_OP;
use asyncio::io_uring::IoUring;
fn main() {
let file = File::open("")
let ring = IoUring::setup(64).unwrap();
let cqes = ring.cqes();
ring.try_prepare(1, |sqes| {
let sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::READ)
let file = File::open("/tmp/poem").unwrap();
let fd = file.as_raw_fd();
let ring = IoUring::setup(4).unwrap();
let mut cqes = ring.cqes();
let buf = Box::new([0u8; 4096]);
ring.try_prepare(3, |mut sqes| {
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::READ);
sqe.set_address(buf.as_ptr() as u64);
sqe.set_fd(fd);
sqe.set_len(4096);
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::NOP);
sqe.set_userdata(0xCAFEBABE);
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::NOP);
sqe.set_userdata(0xDEADBEEF);
}).unwrap();
let mut amt = 0;
while amt == 0 {
amt = ring.submit().unwrap();
}
println!("{}", amt);
for _ in 0..3 {
let mut cqe = None;
while cqe.is_none() {
cqe = cqes.next();
}
let cqe = cqe.unwrap();
println!("{:?}", cqe);
if cqe.user_data == 0xCAFEBABE {
println!("cafebabe");
} else if cqe.user_data == 0xDEADBEEF {
println!("deadbeef");
}
if let Ok(len) = cqe.result() {
let out = unsafe { std::str::from_utf8_unchecked(&buf[0..len as usize]) };
println!("{}", out);
}
}
}

View File

@ -1,19 +0,0 @@
Dunkel wars, der Mond schien helle,
schneebedeckt die grüne Flur,
als ein Wagen blitzesschnelle,
langsam um die Ecke fuhr.
Drinnen saßen stehend Leute,
schweigend ins Gespräch vertieft,
als ein totgeschossner Hase
auf der Sandbank Schlittschuh lief.
Und ein blondgelockter Jüngling
mit kohlrabenschwarzem Haar
saß auf einer grünen Kiste,
die rot angestrichen war.
Neben ihm ne alte Schrulle,
zählte kaum erst sechzehn Jahr,
in der Hand ne Butterstulle,
die mit Schmalz bestrichen war.

View File

@ -5,23 +5,32 @@ use std::mem::ManuallyDrop;
use std::task::Waker;
use crate::cancellation::Cancellation;
pub struct Completion<'cx> {
// TODO: Completions for linked requests? How would you handle having multiple results? In one
// Completion struct or using multiple? If the latter, prepare needs to set user_data
// for all intermediary SQE explicitly.
pub struct Completion {
state: ManuallyDrop<Box<Cell<State>>>,
marker: PhantomData<fn(&'cx ()) -> &'cx ()>,
}
enum State {
Submitted(Waker),
Completed(io::Result<u32>),
Completed(io::Result<i32>),
Cancelled(Cancellation),
Empty,
}
impl<'cx> Completion<'cx> {
impl Completion {
pub fn new(waker: Waker) -> Self {
Self {
state: ManuallyDrop::new(Box::new(Cell::new(State::Submitted(waker)))),
marker: PhantomData,
}
}
pub(crate) unsafe fn from_raw(ptr: u64) -> Self {
let ptr = ptr as usize as *mut Cell<State>;
let state = ManuallyDrop::new(Box::from_raw(ptr));
Self {
state,
}
}
@ -29,7 +38,7 @@ impl<'cx> Completion<'cx> {
self.state.as_ptr() as *const _ as usize as u64
}
pub fn check(self, waker: &Waker) -> Result<io::Result<u32>, Self> {
pub fn check(self, waker: &Waker) -> Result<io::Result<i32>, Self> {
match self.state.replace(State::Empty) {
State::Submitted(old_waker) => {
// If the given waker wakes a different task than the one we were constructed
@ -59,7 +68,7 @@ impl<'cx> Completion<'cx> {
}
}
pub fn complete(self, result: io::Result<u32>) {
pub fn complete(self, result: io::Result<i32>) {
match self.state.replace(State::Completed(result)) {
State::Submitted(waker) => {
waker.wake();

View File

@ -1,8 +1,12 @@
use std::cell::UnsafeCell;
use std::os::unix::prelude::RawFd;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicU32, compiler_fence, Ordering};
use std::task::{Context, Poll, Waker};
use crossbeam_queue::SegQueue;
use nix::sys::mman::munmap;
use crate::completion::Completion;
use crate::cqe::CQE;
use crate::ctypes::{CQOffsets, IORING_CQ};
@ -15,18 +19,24 @@ pub struct CQ {
/// main problem that can happen otherwise is that the kernel assumes it lost completions
/// which we already successfully pulled from the queue.
head: &'static AtomicU32,
cached_head: UnsafeCell<u32>,
/// Tail of the completion queue. Moved by the kernel when new completions are stored.
///
/// Since this is modified by the kernel we should use atomic operations to read it, making
/// sure both the kernel and any program have a consistent view of its contents.
tail: &'static AtomicU32,
/// A cached version of `tail` which additionally counts reserved slots for future
/// completions, i.e. slots that the kernel will fill in the future.
predicted_tail: UnsafeCell<u32>,
ring_mask: u32,
num_entries: u32,
flags: &'static AtomicU32,
entries: &'static [CQE],
waiters: SegQueue<Waker>,
// cq_ptr is set to `None` if we used a single mmap for both SQ and CQ.
cq_ptr: *mut libc::c_void,
cq_map_size: usize,
@ -58,8 +68,8 @@ impl CQ {
let num_entries = *(ptr.offset(offs.ring_entries as isize).cast());
let head: &AtomicU32 = &*(ptr.offset(offs.head as isize).cast());
let cached_head = UnsafeCell::new(head.load(Ordering::Acquire));
let tail: &AtomicU32 = &*(ptr.offset(offs.tail as isize).cast());
let predicted_tail = UnsafeCell::new(head.load(Ordering::Acquire));
let flags: &AtomicU32 = &*(ptr.offset(offs.flags as isize).cast());
let entries = std::slice::from_raw_parts(
ptr.offset(offs.cqes as isize).cast(),
@ -68,7 +78,7 @@ impl CQ {
Self {
head,
cached_head,
predicted_tail,
tail,
ring_mask,
num_entries,
@ -76,18 +86,61 @@ impl CQ {
entries,
waiters: SegQueue::new(),
// Only store a pointer if we used a separate mmap() syscall for the CQ
cq_ptr: if split_mmap { ptr } else { std::ptr::null_mut() },
cq_map_size,
}
}
#[inline(always)]
fn predicted_tail(&self) -> &mut u32 {
unsafe { &mut *self.predicted_tail.get() }
}
#[inline(always)]
/// Currently used + reserved slots
pub fn used(&self) -> u32 {
let tail = *self.predicted_tail();
let head = self.head.load(Ordering::Relaxed);
compiler_fence(Ordering::Acquire);
tail.wrapping_sub(head)
}
#[inline(always)]
/// Amount of available slots taking reservations into account.
pub fn available(&self) -> u32 {
self.num_entries - self.used()
}
/// Try to reserve a number of CQ slots to make sure that
pub fn try_reserve(&self, count: u32) -> bool {
if self.available() >= count {
let tail = self.predicted_tail();
*tail = (*tail).wrapping_add(count);
true
} else {
false
}
}
pub fn poll_reserve(self: Pin<&mut Self>, ctx: &mut Context<'_>, count: u32) -> Poll<()> {
if self.available() >= count {
Poll::Ready(())
} else {
self.waiters.push(ctx.waker().clone());
Poll::Pending
}
}
pub fn get_next(&self) -> Option<&CQE> {
let tail = self.tail.load(Ordering::Acquire);
let head = self.head.load(Ordering::Acquire);
let tail = self.tail.load(Ordering::Relaxed);
let head = self.head.load(Ordering::Relaxed);
if tail == head {
None
} else {
compiler_fence(Ordering::Acquire);
self.head.fetch_add(1, Ordering::Release);
let index = (head & self.ring_mask) as usize;
Some(&self.entries[index])
@ -95,8 +148,153 @@ impl CQ {
}
pub fn ready(&self) -> u32 {
let tail = self.tail.load(Ordering::Acquire);
let head = self.head.load(Ordering::Acquire);
let tail = self.tail.load(Ordering::Relaxed);
let head = self.head.load(Ordering::Relaxed);
compiler_fence(Ordering::Acquire);
tail.wrapping_sub(head)
}
pub fn handle(&self, handler: impl Fn(&CQE)) {
let tail = self.tail.load(Ordering::Relaxed);
let head = self.head.load(Ordering::Relaxed);
for i in head..tail {
let index = (i & self.ring_mask) as usize;
let cqe = &self.entries[index];
handler(cqe);
}
compiler_fence(Ordering::Acquire);
self.head.store(tail, Ordering::Release);
}
#[cfg(test)]
fn test_insert_cqe(&self, cqe: impl Iterator<Item=CQE>) {
let head = self.head.load(Ordering::Relaxed);
let mut tail = self.tail.load(Ordering::Acquire);
unsafe {
for entry in cqe {
let index = (tail & self.ring_mask) as usize;
// Yes, this is absolutely not safe or defined behaviour in the first place. This
// function must *never* be used outside simple testing setups.
let ptr = &self.entries[index] as *const _ as *mut CQE;
ptr.write(entry);
tail += 1;
// If we would overflow, crash instead
assert!((tail - head) <= self.num_entries, "test_insert_cqe overflowed the buffer");
}
}
self.tail.store(tail, Ordering::Release);
}
}
mod tests {
use std::sync::atomic::AtomicU64;
use super::*;
fn gen_cq(num_entries: u32) -> CQ {
let head = Box::leak(Box::new(AtomicU32::new(0)));
let tail = Box::leak(Box::new(AtomicU32::new(0)));
let flags = Box::leak(Box::new(AtomicU32::new(0)));
let entries = Box::leak((0..num_entries).map(|_| CQE::default()).collect());
CQ {
head,
tail,
predicted_tail: UnsafeCell::new(0),
ring_mask: num_entries - 1,
num_entries,
flags,
entries,
cq_ptr: std::ptr::null_mut(),
cq_map_size: 0,
waiters: SegQueue::new(),
}
}
#[test]
fn test_test_insert_cqe() {
let cq = gen_cq(4);
cq.test_insert_cqe([
CQE {
user_data: 1,
.. Default::default()
},
CQE {
user_data: 2,
.. Default::default()
},
CQE {
user_data: 3,
.. Default::default()
},
CQE {
user_data: 4,
.. Default::default()
},
].into_iter());
println!("{:?}", cq.entries);
for i in 0..4 {
assert_eq!(cq.entries[i].user_data, (i+1) as u64);
}
}
#[test]
#[should_panic]
fn test_test_insert_cqe_overflow() {
let cq = gen_cq(2);
cq.test_insert_cqe([
CQE {
user_data: 1,
.. Default::default()
},
CQE {
user_data: 2,
.. Default::default()
},
CQE {
user_data: 3,
.. Default::default()
},
CQE {
user_data: 4,
.. Default::default()
},
].into_iter());
println!("{:?}", cq.entries);
}
#[test]
fn test_cq_reserve_insert() {
let cq = gen_cq(4);
assert_eq!(cq.tail.load(Ordering::Relaxed), 0);
assert_eq!(cq.head.load(Ordering::Relaxed), 0);
assert_eq!(*cq.predicted_tail(), 0);
cq.try_reserve(2);
assert_eq!(cq.tail.load(Ordering::Relaxed), 0);
assert_eq!(*cq.predicted_tail(), 2);
cq.test_insert_cqe([
CQE {
user_data: 1,
.. Default::default()
},
CQE {
user_data: 2,
.. Default::default()
},
].into_iter());
assert_eq!(cq.head.load(Ordering::Relaxed), 0);
assert_eq!(cq.tail.load(Ordering::Relaxed), 2);
assert_eq!(*cq.predicted_tail(), 2);
let mut o = AtomicU64::new(1);
cq.handle(|cqe| {
assert_eq!(cqe.user_data, o.fetch_add(1, Ordering::Relaxed))
});
assert_eq!(o.load(Ordering::Relaxed), 3);
}
}

View File

@ -9,7 +9,7 @@ use crate::io_uring::{IoUring};
/// Completion Queue Event
pub struct CQE {
pub user_data: u64,
res: i32,
pub(crate) res: i32,
pub flags: IOCQE,
}

47
runtime/asyncio/src/fs.rs Normal file
View File

@ -0,0 +1,47 @@
use std::cell::Cell;
use std::io::IoSliceMut;
use std::os::unix::prelude::RawFd;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_io::AsyncRead;
use crate::completion::Completion;
use crate::ctypes::IORING_OP;
use crate::io_uring::IoUring;
use crate::sqe::{SQE, SQEs};
use crate::submission::Submission;
pub struct File {
fd: RawFd,
submission: Submission,
}
impl File {
pub fn new(fd: RawFd, io_uring: &'static IoUring) -> Self {
Self { fd, submission: Submission::new(io_uring) }
}
fn prepare_read<'sq>(
fd: RawFd,
buf: &mut [u8],
sqes: &mut SQEs<'sq>,
) -> SQE<'sq>
{
let mut sqe = sqes.next().expect("prepare_read requires at least one SQE");
sqe.set_opcode(IORING_OP::READ);
sqe.set_address(buf.as_ptr() as u64);
sqe.set_fd(fd);
sqe.set_len(buf.len() as i32);
sqe
}
}
impl AsyncRead for File {
fn poll_read(mut self: Pin<&mut Self>, ctx: &mut Context<'_>, buf: &mut [u8])
-> Poll<std::io::Result<usize>>
{
let fd = self.fd;
Pin::new(&mut self.submission).poll(ctx, 1, |sqes| {
Self::prepare_read(fd, buf, sqes)
}).map(|res| res.map(|val| val as usize))
}
}

View File

@ -1,20 +1,22 @@
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::mem::{size_of, align_of};
use std::ops::Deref;
use std::sync::atomic::{AtomicU32, Ordering};
use std::os::unix::prelude::RawFd;
use std::pin::Pin;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};
use crossbeam_queue::SegQueue;
use std::sync::Arc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use nix::sys::{mman, mman::{MapFlags, ProtFlags}};
use crate::completion::Completion;
use crate::cq::CQ;
use crate::cqe::{CQE, CQEs};
use crate::ctypes::{CQOffsets, IORING_ENTER, SQOffsets};
use crate::sq::SQ;
use crate::sqe::SQEs;
use crate::sqe::{SQE, SQEs};
use super::ctypes::{Params, io_uring_sqe, IORING_CQ, IORING_FEAT,
IORING_OFF_CQ_RING, IORING_OFF_SQ_RING, IORING_OFF_SQES, IORING_SQ};
use super::syscall;
@ -25,8 +27,6 @@ pub struct IoUring {
params: Params,
sq: SQ,
cq: CQ,
waiting: SegQueue<(u32, Waker)>,
}
unsafe fn mmap(map_size: usize, fd: RawFd, offset: i64) -> nix::Result<*mut libc::c_void> {
@ -49,7 +49,6 @@ impl IoUring {
(params.sq_entries as usize) * size_of::<u32>();
let mut cq_map_size = (params.cq_off.cqes as usize) +
(params.cq_entries as usize) * size_of::<CQE>();
println!("{:?} {}", params.sq_off, sq_map_size);
// If we can use a single mmap() syscall to map sq, cq and cqe the size of the total map
// is the largest of `sq_map_size` and `cq_map_size`.
@ -58,7 +57,6 @@ impl IoUring {
cq_map_size = sq_map_size;
}
println!("{:?}", params.cq_off);
let sq_ptr = unsafe {
mmap(sq_map_size as usize, fd, IORING_OFF_SQ_RING as i64)?
};
@ -99,29 +97,72 @@ impl IoUring {
params,
sq,
cq,
waiting: SegQueue::new(),
})
}
pub fn try_prepare<'cx>(
pub fn try_prepare(
&self,
count: u32,
prepare: impl FnOnce(SQEs<'_>)
) -> Option<()> {
// TODO: Lock the required amount of slots on both submission and completion queue, then
// construct the sqes.
self.handle_completions();
if !self.cq.try_reserve(count) {
return None;
}
if let Some(sqes) = self.sq.try_reserve(count) {
Some(prepare(sqes))
let start = sqes.start();
prepare(sqes);
self.sq.prepare(start, count);
Some(())
} else {
None
}
}
pub fn submit(&self) -> io::Result<u32> {
self.sq.submit(self.fd)
pub fn poll_prepare<'cx>(
mut self: Pin<&mut Self>,
ctx: &mut Context<'cx>,
count: u32,
prepare: impl for<'sq> FnOnce(SQEs<'sq>, &mut Context<'cx>) -> Completion
) -> Poll<Completion> {
Pin::new(&mut self.sq).poll_prepare(ctx, count, prepare)
}
pub fn cqes(&self) -> CQEs {
CQEs::new(&self.cq)
pub fn poll_submit<'cx>(
mut self: Pin<&mut Self>,
ctx: &mut Context<'cx>,
head: u32,
) -> Poll<()> {
let fd = self.fd;
Pin::new(&mut self.sq).poll_submit(ctx, fd, head)
}
pub fn submit_wait(&self) -> io::Result<u32> {
self.sq.submit_wait(self.fd)
}
pub fn handle_completions(&self) {
self.cq.handle(|cqe| {
let udata = cqe.user_data;
if udata != 0 {
let completion = unsafe {
Completion::from_raw(udata)
};
completion.complete(cqe.result())
}
});
}
}
impl Future for &IoUring {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.handle_completions();
match self.sq.submit(self.fd, Some(cx.waker())) {
Ok(_) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
}
}

View File

@ -9,9 +9,11 @@ mod sqe;
mod cq;
mod cqe;
mod submission;
mod completion;
mod cancellation;
pub mod fs;
#[macro_export]
macro_rules! ready {

View File

@ -1,13 +1,17 @@
use std::cell::UnsafeCell;
use std::cell::{Cell, UnsafeCell};
use std::fmt::{Debug, Formatter};
use std::io;
use std::mem::ManuallyDrop;
use std::os::unix::prelude::RawFd;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, compiler_fence, fence, Ordering};
use std::task::{Context, Poll, Waker};
use crossbeam_queue::SegQueue;
use nix::sys::mman::munmap;
use crate::completion::Completion;
use crate::ctypes::{IORING_ENTER, IORING_SQ, io_uring_sqe, SQOffsets};
use crate::sqe::SQEs;
use crate::sqe::{SQE, SQEs};
use crate::syscall;
pub struct SQ {
@ -46,6 +50,10 @@ pub struct SQ {
sq_ptr: NonNull<()>,
sq_map_size: usize,
sqes_map_size: usize,
/// Queue of tasks waiting for a submission, either because they need free slots or because
waiters: SegQueue<Waker>,
submitter: Cell<Option<Waker>>,
}
static_assertions::assert_not_impl_any!(SQ: Send, Sync);
@ -125,6 +133,8 @@ impl SQ {
sq_ptr,
sq_map_size,
sqes_map_size,
waiters: SegQueue::new(),
submitter: Cell::new(None),
}
}
@ -169,9 +179,50 @@ impl SQ {
self.num_entries - self.used()
}
#[inline(always)]
fn to_submit(&self) -> u32 {
let shared_tail = self.array_tail.load(Ordering::Relaxed);
let cached_tail = *self.cached_tail();
cached_tail.wrapping_sub(shared_tail)
}
pub fn submit_wait(&self, fd: RawFd) -> io::Result<u32> {
// Ensure that the writes into the array are not moved after the write of the tail.
// Otherwise kernelside may read completely wrong indices from array.
compiler_fence(Ordering::Release);
self.array_tail.store(*self.cached_tail(), Ordering::Release);
let retval = syscall::enter(
fd,
self.num_entries,
1,
IORING_ENTER::GETEVENTS,
std::ptr::null(),
0,
)? as u32;
// Return SQE into circulation that we successfully submitted to the kernel.
self.increment_head(retval);
self.notify();
Ok(retval)
}
/// Submit all prepared entries to the kernel. This function will return the number of
/// entries successfully submitted to the kernel.
pub fn submit(&self, fd: RawFd) -> io::Result<u32> {
pub fn submit(&self, fd: RawFd, waker: Option<&Waker>) -> io::Result<u32> {
if let Some(waker) = waker {
let new = if let Some(old) = self.submitter.take() {
if old.will_wake(waker) { old } else { waker.clone() }
} else {
waker.clone()
};
self.submitter.set(Some(new));
}
// Ensure that the writes into the array are not moved after the write of the tail.
// Otherwise kernelside may read completely wrong indices from array.
compiler_fence(Ordering::Release);
self.array_tail.store(*self.cached_tail(), Ordering::Release);
let retval = syscall::enter(
fd,
self.num_entries,
@ -182,6 +233,7 @@ impl SQ {
)? as u32;
// Return SQE into circulation that we successfully submitted to the kernel.
self.increment_head(retval);
self.notify();
Ok(retval)
}
@ -204,7 +256,7 @@ impl SQ {
/// point to `count` entries in `self.sqes` starting at `start`. This allows actions to be
/// submitted to the kernel even when there are still reserved SQE in between that weren't yet
/// filled.
fn prepare(&self, start: u32, count: u32) {
pub fn prepare(&self, start: u32, count: u32) {
// Load the tail of the array (i.e. where we will start filling)
let tail = self.cached_tail();
let mut head = start;
@ -221,10 +273,59 @@ impl SQ {
head = head.wrapping_add(1);
*tail = (*tail).wrapping_add(1);
}
// Ensure that the writes into the array are not moved after the write of the tail.
// Otherwise kernelside may read completely wrong indices from array.
compiler_fence(Ordering::Release);
self.array_tail.store(*tail, Ordering::Release);
// FIXME: This should really be done by epoll
if let Some(waker) = self.submitter.take() {
waker.wake_by_ref();
self.submitter.set(Some(waker));
}
}
pub fn poll_prepare<'cx>(
self: Pin<&mut Self>,
ctx: &mut Context<'cx>,
count: u32,
prepare: impl for<'sq> FnOnce(SQEs<'sq>, &mut Context<'cx>) -> Completion
) -> Poll<Completion> {
if let Some(sqes) = self.try_reserve(count) {
let start = sqes.start();
let completion = prepare(sqes, ctx);
self.prepare(start, count);
Poll::Ready(completion)
} else {
self.waiters.push(ctx.waker().clone());
Poll::Pending
}
}
/// Suggest to submit pending events to the kernel. Returns `Ready` when the relevant event
/// was submitted to the kernel, i.e. when kernelside `head` >= the given `head`.
pub fn poll_submit(self: Pin<&mut Self>, ctx: &mut Context<'_>, fd: RawFd, head: u32)
-> Poll<()>
{
let shared_tail = self.array_tail.load(Ordering::Relaxed);
let cached_tail = *self.cached_tail();
let to_submit = cached_tail.wrapping_sub(shared_tail);
// TODO: Do some smart cookie thinking here and batch submissions in a sensible way
if to_submit > 4 {
self.submit(fd, None);
}
if *self.sqes_head() < head {
self.waiters.push(ctx.waker().clone());
Poll::Pending
} else {
Poll::Ready(())
}
}
pub fn notify(&self) {
if self.waiters.len() > 0 && self.available() > 0 {
while let Some(waker) = self.waiters.pop() {
waker.wake()
}
}
}
pub fn try_reserve(&self, count: u32) -> Option<SQEs<'_>> {
@ -273,7 +374,9 @@ mod tests {
sqes,
sq_ptr: NonNull::dangling(),
sq_map_size: 0,
sqes_map_size: 0
sqes_map_size: 0,
waiters: SegQueue::new(),
submitter: Cell::new(None),
})
}
}

View File

@ -1,7 +1,8 @@
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::os::unix::prelude::RawFd;
use std::slice::IterMut;
use crate::ctypes::{IORING_OP, IOSQE, io_uring_sqe};
use crate::ctypes::{IORING_OP, IOSQE, io_uring_sqe, SQEOpFlags};
#[derive(Debug)]
pub struct SQE<'iou> {
@ -37,6 +38,26 @@ impl<'iou> SQE<'iou> {
pub fn set_len(&mut self, len: i32) {
self.sqe.len = len;
}
#[inline(always)]
pub fn set_fd(&mut self, fd: RawFd) {
self.sqe.fd = fd;
}
#[inline(always)]
pub fn set_offset(&mut self, offset: u64) {
self.sqe.offset = offset;
}
#[inline(always)]
pub fn set_op_flags(&mut self, op_flags: SQEOpFlags) {
self.sqe.op_flags = op_flags;
}
pub fn prepare_cancel(&mut self, user_data: u64) {
self.set_opcode(IORING_OP::ASYNC_CANCEL);
self.set_address(user_data);
}
}
pub struct SQEs<'iou> {
@ -90,6 +111,10 @@ impl<'iou> SQEs<'iou> {
self.capacity
}
pub fn used(&self) -> u32 {
self.count
}
fn consume(&mut self) -> Option<SQE<'iou>> {
if self.count >= self.capacity {
None

View File

@ -0,0 +1,136 @@
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::cancellation::Cancellation;
use crate::completion::Completion;
use crate::io_uring::IoUring;
use crate::sq::SQ;
use crate::sqe::{SQE, SQEs};
pub struct Submission {
iouring: &'static IoUring,
state: State,
}
enum State {
Inert,
Prepared(u32, Completion),
Submitted(Completion),
Cancelled(u64),
Lost,
}
impl Submission {
pub fn new(iouring: &'static IoUring) -> Self {
Self { iouring, state: State::Inert }
}
fn split_pinned(self: Pin<&mut Self>) -> (Pin<&mut IoUring>, &mut State) {
unsafe {
let this = Pin::get_unchecked_mut(self);
let iouring = &mut *(this.iouring as *const _ as *mut _);
(Pin::new_unchecked(iouring), &mut this.state)
}
}
pub fn poll(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>,
count: u32,
prepare: impl for<'sq> FnOnce(&mut SQEs<'sq>) -> SQE<'sq>
) -> Poll<io::Result<i32>> {
match self.state {
State::Inert | State::Cancelled(_) => {
let head = crate::ready!(self.as_mut().poll_prepare(ctx, count, prepare));
crate::ready!(self.as_mut().poll_submit(ctx, head));
self.poll_complete(ctx)
},
State::Prepared(head, _) => {
crate::ready!(self.as_mut().poll_submit(ctx, head));
self.poll_complete(ctx)
},
State::Submitted(_) => self.poll_complete(ctx),
State::Lost => {
panic!("Ring in invalid state")
},
}
}
pub fn poll_prepare(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
count: u32,
prepare: impl for<'sq> FnOnce(&mut SQEs<'sq>) -> SQE<'sq>
) -> Poll<u32> {
let (sq, state) = self.split_pinned();
let mut head = 0u32;
let completion = match *state {
State::Inert => {
crate::ready!(sq.poll_prepare(ctx, count, |mut sqes, ctx| {
*state = State::Lost;
let mut sqe = prepare(&mut sqes);
let completion = Completion::new(ctx.waker().clone());
sqe.set_userdata(completion.addr());
head = sqes.used();
completion
}))
},
State::Cancelled(prev) => {
crate::ready!(sq.poll_prepare(ctx, count + 1, |mut sqes, ctx| {
*state = State::Lost;
sqes.soft_linked().next().unwrap().prepare_cancel(prev);
let mut sqe = prepare(&mut sqes);
let completion = Completion::new(ctx.waker().clone());
sqe.set_userdata(completion.addr());
head = sqes.used();
completion
}))
},
_ => unreachable!(),
};
*state = State::Prepared(head, completion);
Poll::Ready(head)
}
pub fn poll_submit(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
head: u32,
) -> Poll<()> {
let (iouring, state) = self.split_pinned();
match iouring.poll_submit(ctx, head) {
Poll::Ready(()) => {
match std::mem::replace(state, State::Lost) {
State::Prepared(_, completion) => {
*state = State::Submitted(completion);
},
_ => unreachable!(),
}
Poll::Ready(())
},
Poll::Pending => Poll::Pending,
}
}
pub fn poll_complete(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<io::Result<i32>> {
let (_, state) = self.split_pinned();
if let State::Submitted(completion) = std::mem::replace(state, State::Inert) {
match completion.check(ctx.waker()) {
Ok(result) => return Poll::Ready(result),
Err(completion) => {
*state = State::Submitted(completion)
}
}
}
Poll::Pending
}
}