diff --git a/runtime/asyncio/Cargo.toml b/runtime/asyncio/Cargo.toml index 524e614..55a9cac 100644 --- a/runtime/asyncio/Cargo.toml +++ b/runtime/asyncio/Cargo.toml @@ -13,4 +13,13 @@ nix = "0.23" bitflags = "1.3" ptr_meta = "0.1" -crossbeam-queue = "0.3" \ No newline at end of file +# SegQueue for task waiting on CQE or SQE being available again. +crossbeam-queue = "0.3" + +# AsyncRead, AsyncWrite, AsyncSeek and related traits +futures-io = "0.3" + +[dev-dependencies] +# As Mr. Torgue would put it: THE MOST EXTREME F*CKING ASYNC FUNCTION RUNNNER! EXPLOSIONS! +extreme = "666.666.666666" +futures-lite = "1.12" \ No newline at end of file diff --git a/runtime/asyncio/examples/future.rs b/runtime/asyncio/examples/future.rs new file mode 100644 index 0000000..94d3e98 --- /dev/null +++ b/runtime/asyncio/examples/future.rs @@ -0,0 +1,63 @@ +use std::fs::File; +use std::future::Future; +use std::io; +use std::os::unix::prelude::AsRawFd; +use std::sync::Arc; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; +use asyncio::ctypes::IORING_OP; +use asyncio::io_uring::IoUring; + +use futures_lite::io::AsyncReadExt; + +pub fn drive(iouring: &IoUring, mut f: impl Future>) -> io::Result { + static VTABLE: RawWakerVTable = RawWakerVTable::new( + |clone_me| unsafe { + let arc = Arc::from_raw(clone_me); + std::mem::forget(arc.clone()); + RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE) + }, + |wake_me| unsafe { Arc::from_raw(wake_me); }, + |wake_by_ref_me| unsafe {}, + |drop_me| unsafe { drop(Arc::from_raw(drop_me)) }, + ); + + let mut f = unsafe { std::pin::Pin::new_unchecked(&mut f) }; + let park = Arc::new(()); + let sender = Arc::into_raw(park.clone()); + let raw_waker = RawWaker::new(sender as *const _, &VTABLE); + let waker = unsafe { Waker::from_raw(raw_waker) }; + let mut cx = Context::from_waker(&waker); + + loop { + match f.as_mut().poll(&mut cx) { + Poll::Ready(t) => return t, + Poll::Pending => { + iouring.handle_completions(); + match iouring.submit_wait() { + Ok(_) => {} + Err(e) => return Err(e), + } + } + } + } +} + +fn main() { + let file = File::open("/tmp/poem").unwrap(); + let fd = file.as_raw_fd(); + + let mut ring: &'static IoUring = Box::leak(Box::new(IoUring::setup(4).unwrap())); + + let mut async_file = asyncio::fs::File::new(fd, ring); + + let mut buf = Box::new([0u8; 4096]); + + let f = async move { + let len = async_file.read(&mut buf[..]).await?; + println!("Read {} bytes:", len); + let str = unsafe { std::str::from_utf8_unchecked(&buf[..len]) }; + println!("{}", str); + Ok(()) + }; + drive(ring, f); +} \ No newline at end of file diff --git a/runtime/asyncio/examples/raw.rs b/runtime/asyncio/examples/raw.rs index c2a8993..0845fe7 100644 --- a/runtime/asyncio/examples/raw.rs +++ b/runtime/asyncio/examples/raw.rs @@ -1,13 +1,54 @@ use std::fs::File; +use std::os::unix::prelude::AsRawFd; use asyncio::ctypes::IORING_OP; use asyncio::io_uring::IoUring; + fn main() { - let file = File::open("") - let ring = IoUring::setup(64).unwrap(); - let cqes = ring.cqes(); - ring.try_prepare(1, |sqes| { - let sqe = sqes.next().unwrap(); - sqe.set_opcode(IORING_OP::READ) + let file = File::open("/tmp/poem").unwrap(); + let fd = file.as_raw_fd(); + + let ring = IoUring::setup(4).unwrap(); + let mut cqes = ring.cqes(); + + let buf = Box::new([0u8; 4096]); + ring.try_prepare(3, |mut sqes| { + let mut sqe = sqes.next().unwrap(); + sqe.set_opcode(IORING_OP::READ); + sqe.set_address(buf.as_ptr() as u64); + sqe.set_fd(fd); + sqe.set_len(4096); + + let mut sqe = 
sqes.next().unwrap(); + sqe.set_opcode(IORING_OP::NOP); + sqe.set_userdata(0xCAFEBABE); + + let mut sqe = sqes.next().unwrap(); + sqe.set_opcode(IORING_OP::NOP); + sqe.set_userdata(0xDEADBEEF); }).unwrap(); + let mut amt = 0; + while amt == 0 { + amt = ring.submit().unwrap(); + } + println!("{}", amt); + + for _ in 0..3 { + let mut cqe = None; + while cqe.is_none() { + cqe = cqes.next(); + } + let cqe = cqe.unwrap(); + println!("{:?}", cqe); + if cqe.user_data == 0xCAFEBABE { + println!("cafebabe"); + } else if cqe.user_data == 0xDEADBEEF { + println!("deadbeef"); + } + + if let Ok(len) = cqe.result() { + let out = unsafe { std::str::from_utf8_unchecked(&buf[0..len as usize]) }; + println!("{}", out); + } + } } \ No newline at end of file diff --git a/runtime/asyncio/poem b/runtime/asyncio/poem deleted file mode 100644 index de37932..0000000 --- a/runtime/asyncio/poem +++ /dev/null @@ -1,19 +0,0 @@ -Dunkel war’s, der Mond schien helle, -schneebedeckt die grüne Flur, -als ein Wagen blitzesschnelle, -langsam um die Ecke fuhr. - -Drinnen saßen stehend Leute, -schweigend ins Gespräch vertieft, -als ein totgeschoss’ner Hase -auf der Sandbank Schlittschuh lief. - -Und ein blondgelockter Jüngling -mit kohlrabenschwarzem Haar -saß auf einer grünen Kiste, -die rot angestrichen war. - -Neben ihm ’ne alte Schrulle, -zählte kaum erst sechzehn Jahr, -in der Hand ’ne Butterstulle, -die mit Schmalz bestrichen war. diff --git a/runtime/asyncio/src/completion.rs b/runtime/asyncio/src/completion.rs index 5fda033..637b3f7 100644 --- a/runtime/asyncio/src/completion.rs +++ b/runtime/asyncio/src/completion.rs @@ -5,23 +5,32 @@ use std::mem::ManuallyDrop; use std::task::Waker; use crate::cancellation::Cancellation; -pub struct Completion<'cx> { +// TODO: Completions for linked requests? How would you handle having multiple results? In one +// Completion struct or using multiple? If the latter, prepare needs to set user_data +// for all intermediary SQE explicitly. 
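The round trip that the new `from_raw` constructor enables is worth spelling out: `Completion::new` boxes a cell holding the submitting task's `Waker`, `addr` exposes that box's address so it can be stored in the SQE's `user_data`, and when the matching CQE comes back `handle_completions` rebuilds the box from that same integer and calls `complete`. A minimal sketch of the flow, written as it might appear in a unit test inside `completion.rs` (the no-op waker and the test are illustrative only, not part of this patch, and it assumes `check` reports a stored result as `Ok`):

```rust
#[cfg(test)]
mod userdata_roundtrip {
    use std::task::{RawWaker, RawWakerVTable, Waker};
    use super::*;

    // A waker that does nothing; a real one would reschedule the submitting task.
    unsafe fn vt_clone(_: *const ()) -> RawWaker { RawWaker::new(std::ptr::null(), &VTABLE) }
    unsafe fn vt_noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(vt_clone, vt_noop, vt_noop, vt_noop);

    fn noop_waker() -> Waker {
        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
    }

    #[test]
    fn completion_roundtrip() {
        let waker = noop_waker();
        let completion = Completion::new(waker.clone());

        // This is the integer an SQE would carry in its `user_data` field.
        let user_data = completion.addr();

        // Once the matching CQE arrives, `IoUring::handle_completions` reconstructs
        // the Completion from `user_data` and stores the result, waking the task.
        unsafe { Completion::from_raw(user_data) }.complete(Ok(0));

        // The woken task then finds the stored result through `check`.
        assert!(completion.check(&waker).is_ok());
    }
}
```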
+pub struct Completion { state: ManuallyDrop>>, - marker: PhantomData &'cx ()>, } enum State { Submitted(Waker), - Completed(io::Result), + Completed(io::Result), Cancelled(Cancellation), Empty, } -impl<'cx> Completion<'cx> { +impl Completion { pub fn new(waker: Waker) -> Self { Self { state: ManuallyDrop::new(Box::new(Cell::new(State::Submitted(waker)))), - marker: PhantomData, + } + } + + pub(crate) unsafe fn from_raw(ptr: u64) -> Self { + let ptr = ptr as usize as *mut Cell; + let state = ManuallyDrop::new(Box::from_raw(ptr)); + Self { + state, } } @@ -29,7 +38,7 @@ impl<'cx> Completion<'cx> { self.state.as_ptr() as *const _ as usize as u64 } - pub fn check(self, waker: &Waker) -> Result, Self> { + pub fn check(self, waker: &Waker) -> Result, Self> { match self.state.replace(State::Empty) { State::Submitted(old_waker) => { // If the given waker wakes a different task than the one we were constructed @@ -59,7 +68,7 @@ impl<'cx> Completion<'cx> { } } - pub fn complete(self, result: io::Result) { + pub fn complete(self, result: io::Result) { match self.state.replace(State::Completed(result)) { State::Submitted(waker) => { waker.wake(); diff --git a/runtime/asyncio/src/cq.rs b/runtime/asyncio/src/cq.rs index 49a776f..1092b3c 100644 --- a/runtime/asyncio/src/cq.rs +++ b/runtime/asyncio/src/cq.rs @@ -1,8 +1,12 @@ use std::cell::UnsafeCell; use std::os::unix::prelude::RawFd; +use std::pin::Pin; use std::ptr::NonNull; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::atomic::{AtomicU32, compiler_fence, Ordering}; +use std::task::{Context, Poll, Waker}; +use crossbeam_queue::SegQueue; use nix::sys::mman::munmap; +use crate::completion::Completion; use crate::cqe::CQE; use crate::ctypes::{CQOffsets, IORING_CQ}; @@ -15,18 +19,24 @@ pub struct CQ { /// main problem that can happen otherwise is that the kernel assumes it lost completions /// which we already successfully pulled from the queue. head: &'static AtomicU32, - cached_head: UnsafeCell, /// Tail of the completion queue. Moved by the kernel when new completions are stored. /// /// Since this is modified by the kernel we should use atomic operations to read it, making /// sure both the kernel and any program have a consistent view of its contents. tail: &'static AtomicU32, + + /// A cached version of `tail` which additionally counts reserved slots for future + /// completions, i.e. slots that the kernel will fill in the future. + predicted_tail: UnsafeCell, + ring_mask: u32, num_entries: u32, flags: &'static AtomicU32, entries: &'static [CQE], + waiters: SegQueue, + // cq_ptr is set to `None` if we used a single mmap for both SQ and CQ. 
cq_ptr: *mut libc::c_void, cq_map_size: usize, @@ -58,8 +68,8 @@ impl CQ { let num_entries = *(ptr.offset(offs.ring_entries as isize).cast()); let head: &AtomicU32 = &*(ptr.offset(offs.head as isize).cast()); - let cached_head = UnsafeCell::new(head.load(Ordering::Acquire)); let tail: &AtomicU32 = &*(ptr.offset(offs.tail as isize).cast()); + let predicted_tail = UnsafeCell::new(head.load(Ordering::Acquire)); let flags: &AtomicU32 = &*(ptr.offset(offs.flags as isize).cast()); let entries = std::slice::from_raw_parts( ptr.offset(offs.cqes as isize).cast(), @@ -68,7 +78,7 @@ impl CQ { Self { head, - cached_head, + predicted_tail, tail, ring_mask, num_entries, @@ -76,18 +86,61 @@ impl CQ { entries, + waiters: SegQueue::new(), + // Only store a pointer if we used a separate mmap() syscall for the CQ cq_ptr: if split_mmap { ptr } else { std::ptr::null_mut() }, cq_map_size, } } + #[inline(always)] + fn predicted_tail(&self) -> &mut u32 { + unsafe { &mut *self.predicted_tail.get() } + } + + #[inline(always)] + /// Currently used + reserved slots + pub fn used(&self) -> u32 { + let tail = *self.predicted_tail(); + let head = self.head.load(Ordering::Relaxed); + compiler_fence(Ordering::Acquire); + tail.wrapping_sub(head) + } + + #[inline(always)] + /// Amount of available slots taking reservations into account. + pub fn available(&self) -> u32 { + self.num_entries - self.used() + } + + /// Try to reserve a number of CQ slots to make sure that + pub fn try_reserve(&self, count: u32) -> bool { + if self.available() >= count { + let tail = self.predicted_tail(); + *tail = (*tail).wrapping_add(count); + true + } else { + false + } + } + + pub fn poll_reserve(self: Pin<&mut Self>, ctx: &mut Context<'_>, count: u32) -> Poll<()> { + if self.available() >= count { + Poll::Ready(()) + } else { + self.waiters.push(ctx.waker().clone()); + Poll::Pending + } + } + pub fn get_next(&self) -> Option<&CQE> { - let tail = self.tail.load(Ordering::Acquire); - let head = self.head.load(Ordering::Acquire); + let tail = self.tail.load(Ordering::Relaxed); + let head = self.head.load(Ordering::Relaxed); if tail == head { None } else { + compiler_fence(Ordering::Acquire); self.head.fetch_add(1, Ordering::Release); let index = (head & self.ring_mask) as usize; Some(&self.entries[index]) @@ -95,8 +148,153 @@ impl CQ { } pub fn ready(&self) -> u32 { - let tail = self.tail.load(Ordering::Acquire); - let head = self.head.load(Ordering::Acquire); + let tail = self.tail.load(Ordering::Relaxed); + let head = self.head.load(Ordering::Relaxed); + compiler_fence(Ordering::Acquire); tail.wrapping_sub(head) } + + pub fn handle(&self, handler: impl Fn(&CQE)) { + let tail = self.tail.load(Ordering::Relaxed); + let head = self.head.load(Ordering::Relaxed); + + for i in head..tail { + let index = (i & self.ring_mask) as usize; + let cqe = &self.entries[index]; + handler(cqe); + } + + compiler_fence(Ordering::Acquire); + self.head.store(tail, Ordering::Release); + } + + #[cfg(test)] + fn test_insert_cqe(&self, cqe: impl Iterator) { + let head = self.head.load(Ordering::Relaxed); + let mut tail = self.tail.load(Ordering::Acquire); + unsafe { + for entry in cqe { + let index = (tail & self.ring_mask) as usize; + // Yes, this is absolutely not safe or defined behaviour in the first place. This + // function must *never* be used outside simple testing setups. 
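+                // What this helper emulates is the kernel side of the ring: entries are
+                // written into the shared slice at `tail & ring_mask` and only become
+                // visible to consumers once `tail` is published after the loop, which is
+                // exactly what `get_next` and `handle` expect to observe.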
+ let ptr = &self.entries[index] as *const _ as *mut CQE; + ptr.write(entry); + tail += 1; + + // If we would overflow, crash instead + assert!((tail - head) <= self.num_entries, "test_insert_cqe overflowed the buffer"); + } + } + self.tail.store(tail, Ordering::Release); + } +} + +mod tests { + use std::sync::atomic::AtomicU64; + use super::*; + + fn gen_cq(num_entries: u32) -> CQ { + let head = Box::leak(Box::new(AtomicU32::new(0))); + let tail = Box::leak(Box::new(AtomicU32::new(0))); + let flags = Box::leak(Box::new(AtomicU32::new(0))); + let entries = Box::leak((0..num_entries).map(|_| CQE::default()).collect()); + + CQ { + head, + tail, + predicted_tail: UnsafeCell::new(0), + ring_mask: num_entries - 1, + num_entries, + flags, + entries, + cq_ptr: std::ptr::null_mut(), + cq_map_size: 0, + waiters: SegQueue::new(), + } + } + + #[test] + fn test_test_insert_cqe() { + let cq = gen_cq(4); + cq.test_insert_cqe([ + CQE { + user_data: 1, + .. Default::default() + }, + CQE { + user_data: 2, + .. Default::default() + }, + CQE { + user_data: 3, + .. Default::default() + }, + CQE { + user_data: 4, + .. Default::default() + }, + ].into_iter()); + println!("{:?}", cq.entries); + for i in 0..4 { + assert_eq!(cq.entries[i].user_data, (i+1) as u64); + } + } + + #[test] + #[should_panic] + fn test_test_insert_cqe_overflow() { + let cq = gen_cq(2); + cq.test_insert_cqe([ + CQE { + user_data: 1, + .. Default::default() + }, + CQE { + user_data: 2, + .. Default::default() + }, + CQE { + user_data: 3, + .. Default::default() + }, + CQE { + user_data: 4, + .. Default::default() + }, + ].into_iter()); + println!("{:?}", cq.entries); + } + + #[test] + fn test_cq_reserve_insert() { + let cq = gen_cq(4); + assert_eq!(cq.tail.load(Ordering::Relaxed), 0); + assert_eq!(cq.head.load(Ordering::Relaxed), 0); + assert_eq!(*cq.predicted_tail(), 0); + + cq.try_reserve(2); + assert_eq!(cq.tail.load(Ordering::Relaxed), 0); + assert_eq!(*cq.predicted_tail(), 2); + + cq.test_insert_cqe([ + CQE { + user_data: 1, + .. Default::default() + }, + CQE { + user_data: 2, + .. 
Default::default() + }, + ].into_iter()); + + assert_eq!(cq.head.load(Ordering::Relaxed), 0); + assert_eq!(cq.tail.load(Ordering::Relaxed), 2); + assert_eq!(*cq.predicted_tail(), 2); + + let mut o = AtomicU64::new(1); + cq.handle(|cqe| { + assert_eq!(cqe.user_data, o.fetch_add(1, Ordering::Relaxed)) + }); + assert_eq!(o.load(Ordering::Relaxed), 3); + } } \ No newline at end of file diff --git a/runtime/asyncio/src/cqe.rs b/runtime/asyncio/src/cqe.rs index a0810f4..f5c3e70 100644 --- a/runtime/asyncio/src/cqe.rs +++ b/runtime/asyncio/src/cqe.rs @@ -9,7 +9,7 @@ use crate::io_uring::{IoUring}; /// Completion Queue Event pub struct CQE { pub user_data: u64, - res: i32, + pub(crate) res: i32, pub flags: IOCQE, } diff --git a/runtime/asyncio/src/fs.rs b/runtime/asyncio/src/fs.rs new file mode 100644 index 0000000..758d2e4 --- /dev/null +++ b/runtime/asyncio/src/fs.rs @@ -0,0 +1,47 @@ +use std::cell::Cell; +use std::io::IoSliceMut; +use std::os::unix::prelude::RawFd; +use std::pin::Pin; +use std::task::{Context, Poll}; +use futures_io::AsyncRead; +use crate::completion::Completion; +use crate::ctypes::IORING_OP; +use crate::io_uring::IoUring; +use crate::sqe::{SQE, SQEs}; +use crate::submission::Submission; + +pub struct File { + fd: RawFd, + submission: Submission, +} + +impl File { + pub fn new(fd: RawFd, io_uring: &'static IoUring) -> Self { + Self { fd, submission: Submission::new(io_uring) } + } + + fn prepare_read<'sq>( + fd: RawFd, + buf: &mut [u8], + sqes: &mut SQEs<'sq>, + ) -> SQE<'sq> + { + let mut sqe = sqes.next().expect("prepare_read requires at least one SQE"); + sqe.set_opcode(IORING_OP::READ); + sqe.set_address(buf.as_ptr() as u64); + sqe.set_fd(fd); + sqe.set_len(buf.len() as i32); + sqe + } +} + +impl AsyncRead for File { + fn poll_read(mut self: Pin<&mut Self>, ctx: &mut Context<'_>, buf: &mut [u8]) + -> Poll> + { + let fd = self.fd; + Pin::new(&mut self.submission).poll(ctx, 1, |sqes| { + Self::prepare_read(fd, buf, sqes) + }).map(|res| res.map(|val| val as usize)) + } +} \ No newline at end of file diff --git a/runtime/asyncio/src/io_uring.rs b/runtime/asyncio/src/io_uring.rs index 051224b..9a37ce0 100644 --- a/runtime/asyncio/src/io_uring.rs +++ b/runtime/asyncio/src/io_uring.rs @@ -1,20 +1,22 @@ use std::fmt::{Debug, Formatter}; +use std::future::Future; use std::io; use std::marker::PhantomData; use std::mem::{size_of, align_of}; +use std::ops::Deref; use std::sync::atomic::{AtomicU32, Ordering}; use std::os::unix::prelude::RawFd; use std::pin::Pin; use std::ptr::NonNull; -use std::task::{Context, Poll, Waker}; -use crossbeam_queue::SegQueue; +use std::sync::Arc; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use nix::sys::{mman, mman::{MapFlags, ProtFlags}}; use crate::completion::Completion; use crate::cq::CQ; use crate::cqe::{CQE, CQEs}; use crate::ctypes::{CQOffsets, IORING_ENTER, SQOffsets}; use crate::sq::SQ; -use crate::sqe::SQEs; +use crate::sqe::{SQE, SQEs}; use super::ctypes::{Params, io_uring_sqe, IORING_CQ, IORING_FEAT, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING, IORING_OFF_SQES, IORING_SQ}; use super::syscall; @@ -25,8 +27,6 @@ pub struct IoUring { params: Params, sq: SQ, cq: CQ, - - waiting: SegQueue<(u32, Waker)>, } unsafe fn mmap(map_size: usize, fd: RawFd, offset: i64) -> nix::Result<*mut libc::c_void> { @@ -49,7 +49,6 @@ impl IoUring { (params.sq_entries as usize) * size_of::(); let mut cq_map_size = (params.cq_off.cqes as usize) + (params.cq_entries as usize) * size_of::(); - println!("{:?} {}", params.sq_off, sq_map_size); // If we can 
use a single mmap() syscall to map sq, cq and cqe the size of the total map // is the largest of `sq_map_size` and `cq_map_size`. @@ -58,7 +57,6 @@ impl IoUring { cq_map_size = sq_map_size; } - println!("{:?}", params.cq_off); let sq_ptr = unsafe { mmap(sq_map_size as usize, fd, IORING_OFF_SQ_RING as i64)? }; @@ -99,29 +97,72 @@ impl IoUring { params, sq, cq, - waiting: SegQueue::new(), }) } - pub fn try_prepare<'cx>( + pub fn try_prepare( &self, count: u32, prepare: impl FnOnce(SQEs<'_>) ) -> Option<()> { - // TODO: Lock the required amount of slots on both submission and completion queue, then - // construct the sqes. + self.handle_completions(); + if !self.cq.try_reserve(count) { + return None; + } + if let Some(sqes) = self.sq.try_reserve(count) { - Some(prepare(sqes)) + let start = sqes.start(); + prepare(sqes); + self.sq.prepare(start, count); + Some(()) } else { None } } - pub fn submit(&self) -> io::Result { - self.sq.submit(self.fd) + pub fn poll_prepare<'cx>( + mut self: Pin<&mut Self>, + ctx: &mut Context<'cx>, + count: u32, + prepare: impl for<'sq> FnOnce(SQEs<'sq>, &mut Context<'cx>) -> Completion + ) -> Poll { + Pin::new(&mut self.sq).poll_prepare(ctx, count, prepare) } - pub fn cqes(&self) -> CQEs { - CQEs::new(&self.cq) + pub fn poll_submit<'cx>( + mut self: Pin<&mut Self>, + ctx: &mut Context<'cx>, + head: u32, + ) -> Poll<()> { + let fd = self.fd; + Pin::new(&mut self.sq).poll_submit(ctx, fd, head) + } + + pub fn submit_wait(&self) -> io::Result { + self.sq.submit_wait(self.fd) + } + + pub fn handle_completions(&self) { + self.cq.handle(|cqe| { + let udata = cqe.user_data; + if udata != 0 { + let completion = unsafe { + Completion::from_raw(udata) + }; + completion.complete(cqe.result()) + } + }); + } +} + +impl Future for &IoUring { + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.handle_completions(); + match self.sq.submit(self.fd, Some(cx.waker())) { + Ok(_) => Poll::Pending, + Err(e) => Poll::Ready(Err(e)), + } } } \ No newline at end of file diff --git a/runtime/asyncio/src/lib.rs b/runtime/asyncio/src/lib.rs index 04624dc..2cc161d 100644 --- a/runtime/asyncio/src/lib.rs +++ b/runtime/asyncio/src/lib.rs @@ -9,9 +9,11 @@ mod sqe; mod cq; mod cqe; +mod submission; mod completion; mod cancellation; +pub mod fs; #[macro_export] macro_rules! 
ready { diff --git a/runtime/asyncio/src/sq.rs b/runtime/asyncio/src/sq.rs index 82969d2..27c2474 100644 --- a/runtime/asyncio/src/sq.rs +++ b/runtime/asyncio/src/sq.rs @@ -1,13 +1,17 @@ -use std::cell::UnsafeCell; +use std::cell::{Cell, UnsafeCell}; use std::fmt::{Debug, Formatter}; use std::io; use std::mem::ManuallyDrop; use std::os::unix::prelude::RawFd; +use std::pin::Pin; use std::ptr::NonNull; use std::sync::atomic::{AtomicU32, compiler_fence, fence, Ordering}; +use std::task::{Context, Poll, Waker}; +use crossbeam_queue::SegQueue; use nix::sys::mman::munmap; +use crate::completion::Completion; use crate::ctypes::{IORING_ENTER, IORING_SQ, io_uring_sqe, SQOffsets}; -use crate::sqe::SQEs; +use crate::sqe::{SQE, SQEs}; use crate::syscall; pub struct SQ { @@ -46,6 +50,10 @@ pub struct SQ { sq_ptr: NonNull<()>, sq_map_size: usize, sqes_map_size: usize, + + /// Queue of tasks waiting for a submission, either because they need free slots or because + waiters: SegQueue, + submitter: Cell>, } static_assertions::assert_not_impl_any!(SQ: Send, Sync); @@ -125,6 +133,8 @@ impl SQ { sq_ptr, sq_map_size, sqes_map_size, + waiters: SegQueue::new(), + submitter: Cell::new(None), } } @@ -169,9 +179,50 @@ impl SQ { self.num_entries - self.used() } + #[inline(always)] + fn to_submit(&self) -> u32 { + let shared_tail = self.array_tail.load(Ordering::Relaxed); + let cached_tail = *self.cached_tail(); + cached_tail.wrapping_sub(shared_tail) + } + + pub fn submit_wait(&self, fd: RawFd) -> io::Result { + // Ensure that the writes into the array are not moved after the write of the tail. + // Otherwise kernelside may read completely wrong indices from array. + compiler_fence(Ordering::Release); + self.array_tail.store(*self.cached_tail(), Ordering::Release); + + let retval = syscall::enter( + fd, + self.num_entries, + 1, + IORING_ENTER::GETEVENTS, + std::ptr::null(), + 0, + )? as u32; + // Return SQE into circulation that we successfully submitted to the kernel. + self.increment_head(retval); + self.notify(); + Ok(retval) + } + /// Submit all prepared entries to the kernel. This function will return the number of /// entries successfully submitted to the kernel. - pub fn submit(&self, fd: RawFd) -> io::Result { + pub fn submit(&self, fd: RawFd, waker: Option<&Waker>) -> io::Result { + if let Some(waker) = waker { + let new = if let Some(old) = self.submitter.take() { + if old.will_wake(waker) { old } else { waker.clone() } + } else { + waker.clone() + }; + self.submitter.set(Some(new)); + } + + // Ensure that the writes into the array are not moved after the write of the tail. + // Otherwise kernelside may read completely wrong indices from array. + compiler_fence(Ordering::Release); + self.array_tail.store(*self.cached_tail(), Ordering::Release); + let retval = syscall::enter( fd, self.num_entries, @@ -182,6 +233,7 @@ impl SQ { )? as u32; // Return SQE into circulation that we successfully submitted to the kernel. self.increment_head(retval); + self.notify(); Ok(retval) } @@ -204,7 +256,7 @@ impl SQ { /// point to `count` entries in `self.sqes` starting at `start`. This allows actions to be /// submitted to the kernel even when there are still reserved SQE in between that weren't yet /// filled. - fn prepare(&self, start: u32, count: u32) { + pub fn prepare(&self, start: u32, count: u32) { // Load the tail of the array (i.e. 
where we will start filling) let tail = self.cached_tail(); let mut head = start; @@ -221,10 +273,59 @@ impl SQ { head = head.wrapping_add(1); *tail = (*tail).wrapping_add(1); } - // Ensure that the writes into the array are not moved after the write of the tail. - // Otherwise kernelside may read completely wrong indices from array. - compiler_fence(Ordering::Release); - self.array_tail.store(*tail, Ordering::Release); + + // FIXME: This should really be done by epoll + if let Some(waker) = self.submitter.take() { + waker.wake_by_ref(); + self.submitter.set(Some(waker)); + } + } + + pub fn poll_prepare<'cx>( + self: Pin<&mut Self>, + ctx: &mut Context<'cx>, + count: u32, + prepare: impl for<'sq> FnOnce(SQEs<'sq>, &mut Context<'cx>) -> Completion + ) -> Poll { + if let Some(sqes) = self.try_reserve(count) { + let start = sqes.start(); + let completion = prepare(sqes, ctx); + self.prepare(start, count); + Poll::Ready(completion) + } else { + self.waiters.push(ctx.waker().clone()); + Poll::Pending + } + } + + /// Suggest to submit pending events to the kernel. Returns `Ready` when the relevant event + /// was submitted to the kernel, i.e. when kernelside `head` >= the given `head`. + pub fn poll_submit(self: Pin<&mut Self>, ctx: &mut Context<'_>, fd: RawFd, head: u32) + -> Poll<()> + { + let shared_tail = self.array_tail.load(Ordering::Relaxed); + let cached_tail = *self.cached_tail(); + let to_submit = cached_tail.wrapping_sub(shared_tail); + + // TODO: Do some smart cookie thinking here and batch submissions in a sensible way + if to_submit > 4 { + self.submit(fd, None); + } + + if *self.sqes_head() < head { + self.waiters.push(ctx.waker().clone()); + Poll::Pending + } else { + Poll::Ready(()) + } + } + + pub fn notify(&self) { + if self.waiters.len() > 0 && self.available() > 0 { + while let Some(waker) = self.waiters.pop() { + waker.wake() + } + } } pub fn try_reserve(&self, count: u32) -> Option> { @@ -273,7 +374,9 @@ mod tests { sqes, sq_ptr: NonNull::dangling(), sq_map_size: 0, - sqes_map_size: 0 + sqes_map_size: 0, + waiters: SegQueue::new(), + submitter: Cell::new(None), }) } } diff --git a/runtime/asyncio/src/sqe.rs b/runtime/asyncio/src/sqe.rs index 42518f0..af35993 100644 --- a/runtime/asyncio/src/sqe.rs +++ b/runtime/asyncio/src/sqe.rs @@ -1,7 +1,8 @@ use std::cell::UnsafeCell; use std::ops::{Deref, DerefMut}; +use std::os::unix::prelude::RawFd; use std::slice::IterMut; -use crate::ctypes::{IORING_OP, IOSQE, io_uring_sqe}; +use crate::ctypes::{IORING_OP, IOSQE, io_uring_sqe, SQEOpFlags}; #[derive(Debug)] pub struct SQE<'iou> { @@ -37,6 +38,26 @@ impl<'iou> SQE<'iou> { pub fn set_len(&mut self, len: i32) { self.sqe.len = len; } + + #[inline(always)] + pub fn set_fd(&mut self, fd: RawFd) { + self.sqe.fd = fd; + } + + #[inline(always)] + pub fn set_offset(&mut self, offset: u64) { + self.sqe.offset = offset; + } + + #[inline(always)] + pub fn set_op_flags(&mut self, op_flags: SQEOpFlags) { + self.sqe.op_flags = op_flags; + } + + pub fn prepare_cancel(&mut self, user_data: u64) { + self.set_opcode(IORING_OP::ASYNC_CANCEL); + self.set_address(user_data); + } } pub struct SQEs<'iou> { @@ -90,6 +111,10 @@ impl<'iou> SQEs<'iou> { self.capacity } + pub fn used(&self) -> u32 { + self.count + } + fn consume(&mut self) -> Option> { if self.count >= self.capacity { None diff --git a/runtime/asyncio/src/submission.rs b/runtime/asyncio/src/submission.rs new file mode 100644 index 0000000..76fad24 --- /dev/null +++ b/runtime/asyncio/src/submission.rs @@ -0,0 +1,136 @@ +use std::io; +use 
std::pin::Pin; +use std::task::{Context, Poll}; +use crate::cancellation::Cancellation; +use crate::completion::Completion; +use crate::io_uring::IoUring; +use crate::sq::SQ; +use crate::sqe::{SQE, SQEs}; + +pub struct Submission { + iouring: &'static IoUring, + state: State, +} + +enum State { + Inert, + Prepared(u32, Completion), + Submitted(Completion), + Cancelled(u64), + Lost, +} + +impl Submission { + pub fn new(iouring: &'static IoUring) -> Self { + Self { iouring, state: State::Inert } + } + + fn split_pinned(self: Pin<&mut Self>) -> (Pin<&mut IoUring>, &mut State) { + unsafe { + let this = Pin::get_unchecked_mut(self); + let iouring = &mut *(this.iouring as *const _ as *mut _); + (Pin::new_unchecked(iouring), &mut this.state) + } + } + + pub fn poll( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + count: u32, + prepare: impl for<'sq> FnOnce(&mut SQEs<'sq>) -> SQE<'sq> + ) -> Poll> { + match self.state { + State::Inert | State::Cancelled(_) => { + let head = crate::ready!(self.as_mut().poll_prepare(ctx, count, prepare)); + crate::ready!(self.as_mut().poll_submit(ctx, head)); + self.poll_complete(ctx) + }, + State::Prepared(head, _) => { + crate::ready!(self.as_mut().poll_submit(ctx, head)); + self.poll_complete(ctx) + }, + State::Submitted(_) => self.poll_complete(ctx), + State::Lost => { + panic!("Ring in invalid state") + }, + } + } + + pub fn poll_prepare( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + count: u32, + prepare: impl for<'sq> FnOnce(&mut SQEs<'sq>) -> SQE<'sq> + ) -> Poll { + let (sq, state) = self.split_pinned(); + let mut head = 0u32; + let completion = match *state { + State::Inert => { + crate::ready!(sq.poll_prepare(ctx, count, |mut sqes, ctx| { + *state = State::Lost; + + let mut sqe = prepare(&mut sqes); + let completion = Completion::new(ctx.waker().clone()); + sqe.set_userdata(completion.addr()); + + head = sqes.used(); + completion + })) + }, + State::Cancelled(prev) => { + crate::ready!(sq.poll_prepare(ctx, count + 1, |mut sqes, ctx| { + *state = State::Lost; + + sqes.soft_linked().next().unwrap().prepare_cancel(prev); + + let mut sqe = prepare(&mut sqes); + let completion = Completion::new(ctx.waker().clone()); + sqe.set_userdata(completion.addr()); + + head = sqes.used(); + completion + })) + }, + _ => unreachable!(), + }; + *state = State::Prepared(head, completion); + Poll::Ready(head) + } + + pub fn poll_submit( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + head: u32, + ) -> Poll<()> { + let (iouring, state) = self.split_pinned(); + match iouring.poll_submit(ctx, head) { + Poll::Ready(()) => { + match std::mem::replace(state, State::Lost) { + State::Prepared(_, completion) => { + *state = State::Submitted(completion); + }, + _ => unreachable!(), + } + Poll::Ready(()) + }, + Poll::Pending => Poll::Pending, + } + } + + pub fn poll_complete( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + let (_, state) = self.split_pinned(); + if let State::Submitted(completion) = std::mem::replace(state, State::Inert) { + match completion.check(ctx.waker()) { + Ok(result) => return Poll::Ready(result), + Err(completion) => { + *state = State::Submitted(completion) + } + } + } + + Poll::Pending + } +} \ No newline at end of file
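To make the intended call pattern for the new `Submission` type concrete, here is a sketch of the smallest possible leaf future built on it: one that submits a single `NOP` and resolves once its CQE has been reaped. It is modelled on `fs::File::poll_read`; the `Nop` type and its module are hypothetical (not part of this patch) and would have to live inside the crate, since `submission` is a private module:

```rust
// Hypothetical sketch, e.g. runtime/asyncio/src/nop.rs -- not included in this patch.
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::ctypes::IORING_OP;
use crate::io_uring::IoUring;
use crate::submission::Submission;

/// Resolves after the kernel has consumed one NOP SQE and its CQE was handled.
pub struct Nop {
    submission: Submission,
}

impl Nop {
    pub fn new(io_uring: &'static IoUring) -> Self {
        Self { submission: Submission::new(io_uring) }
    }
}

impl Future for Nop {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Submission::poll` reserves one SQE, runs the prepare closure, submits, and
        // then waits on the Completion, walking Inert -> Prepared -> Submitted internally.
        Pin::new(&mut self.submission)
            .poll(ctx, 1, |sqes| {
                let mut sqe = sqes.next().expect("reserved one SQE");
                sqe.set_opcode(IORING_OP::NOP);
                sqe
            })
            .map(|res| res.map(|_| ()))
    }
}
```

Awaiting such a future from the `drive` loop in `examples/future.rs` would then exercise the whole prepare/submit/complete path without touching the filesystem.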