This commit is contained in:
Nadja Reitzenstein 2021-11-24 02:44:34 +01:00
parent 55d6609e33
commit ec78aa6fc9
56 changed files with 8706 additions and 585 deletions

View File

@ -0,0 +1,16 @@
[package]
name = "asyncio"
version = "0.1.0"
edition = "2021"
description = "io_uring-first async I/O implementation"
readme = "README.md"
publish = false
[dependencies]
static_assertions = "1.1"
libc = "0.2"
nix = "0.23"
bitflags = "1.3"
ptr_meta = "0.1"
crossbeam-queue = "0.3"

View File

@ -0,0 +1,13 @@
use std::fs::File;
use asyncio::ctypes::IORING_OP;
use asyncio::io_uring::IoUring;
fn main() {
let file = File::open("")
let ring = IoUring::setup(64).unwrap();
let cqes = ring.cqes();
ring.try_prepare(1, |sqes| {
let sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::READ)
}).unwrap();
}

1768
runtime/asyncio/gen.rs Normal file

File diff suppressed because it is too large Load Diff

3040
runtime/asyncio/gen2.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1 @@
#include <linux/io_uring.h>

19
runtime/asyncio/poem Normal file
View File

@ -0,0 +1,19 @@
Dunkel wars, der Mond schien helle,
schneebedeckt die grüne Flur,
als ein Wagen blitzesschnelle,
langsam um die Ecke fuhr.
Drinnen saßen stehend Leute,
schweigend ins Gespräch vertieft,
als ein totgeschossner Hase
auf der Sandbank Schlittschuh lief.
Und ein blondgelockter Jüngling
mit kohlrabenschwarzem Haar
saß auf einer grünen Kiste,
die rot angestrichen war.
Neben ihm ne alte Schrulle,
zählte kaum erst sechzehn Jahr,
in der Hand ne Butterstulle,
die mit Schmalz bestrichen war.

View File

@ -0,0 +1,149 @@
use std::ptr;
use std::any::Any;
use std::ffi::CString;
use ptr_meta::DynMetadata;
/// Cancellation callback to clean up I/O resources
///
/// This allows IO actions to properly cancel and have their resources cleaned up without having to
/// worry about the current state of the io_uring queues.
pub struct Cancellation {
data: *mut (),
metadata: usize,
drop: unsafe fn (*mut (), usize),
}
pub unsafe trait Cancel {
fn into_raw(self) -> (*mut (), usize);
unsafe fn drop_raw(ptr: *mut (), metadata: usize);
}
pub unsafe trait CancelNarrow {
fn into_narrow_raw(self) -> *mut ();
unsafe fn drop_narrow_raw(ptr: *mut ());
}
unsafe impl<T: CancelNarrow> Cancel for T {
fn into_raw(self) -> (*mut (), usize) {
(T::into_narrow_raw(self), 0)
}
unsafe fn drop_raw(ptr: *mut (), _: usize) {
T::drop_narrow_raw(ptr)
}
}
unsafe impl<T> CancelNarrow for Box<T> {
fn into_narrow_raw(self) -> *mut () {
Box::into_raw(self) as *mut ()
}
unsafe fn drop_narrow_raw(ptr: *mut ()) {
drop(Box::from_raw(ptr))
}
}
unsafe impl<T> Cancel for Box<[T]> {
fn into_raw(self) -> (*mut (), usize) {
let len = self.len();
(Box::into_raw(self) as *mut (), len)
}
unsafe fn drop_raw(ptr: *mut (), metadata: usize) {
drop(Vec::from_raw_parts(ptr, metadata, metadata))
}
}
// Cancel impl for panics
unsafe impl Cancel for Box<dyn Any + Send + Sync> {
fn into_raw(self) -> (*mut (), usize) {
let ptr = Box::into_raw(self);
let metadata = ptr_meta::metadata(ptr as *mut dyn Any);
let metadata = unsafe {
// SAFETY: None. I happen to know that metadata is always exactly `usize`-sized for this
// type but only `std` can guarantee it.
std::mem::transmute::<_, usize>(metadata)
};
(ptr as *mut(), metadata)
}
unsafe fn drop_raw(ptr: *mut (), metadata: usize) {
let boxed: Box<dyn Any> = unsafe {
let metadata =
// SAFETY: We did it the other way around so this is safe if the previous step was.
std::mem::transmute::<_, DynMetadata<dyn Any>>(metadata);
// We can then (safely) construct a fat pointer from the metadata and data address
let ptr = ptr_meta::from_raw_parts_mut(ptr, metadata);
// SAFETY: We know the pointer is valid since `Self::into_raw` took ownership and the
// vtable was extracted from this known good reference.
Box::from_raw(ptr)
};
drop(boxed)
}
}
unsafe impl CancelNarrow for CString {
fn into_narrow_raw(self) -> *mut () {
self.into_raw() as *mut ()
}
unsafe fn drop_narrow_raw(ptr: *mut ()) {
drop(CString::from_raw(ptr as *mut libc::c_char));
}
}
unsafe impl CancelNarrow for () {
fn into_narrow_raw(self) -> *mut () {
ptr::null_mut()
}
unsafe fn drop_narrow_raw(_: *mut ()) {}
}
unsafe impl<T, F> Cancel for (T, F)
where T: CancelNarrow,
F: CancelNarrow,
{
fn into_raw(self) -> (*mut (), usize) {
let (t, f) = self;
let (t, _) = t.into_raw();
let (f, _) = f.into_raw();
(t, f as usize)
}
unsafe fn drop_raw(t: *mut (), f: usize) {
T::drop_raw(t, 0);
F::drop_raw(f as *mut (), 0);
}
}
impl Cancellation {
pub fn new<T: Cancel>(cancel: T) -> Self {
let (data, metadata) = cancel.into_raw();
Self { data, metadata, drop: T::drop_raw }
}
}
impl Drop for Cancellation {
fn drop(&mut self) {
unsafe {
(self.drop)(self.data, self.metadata)
}
}
}
impl<T: Cancel> From<T> for Cancellation {
fn from(cancel: T) -> Self {
Cancellation::new(cancel)
}
}
impl<T> From<Option<T>> for Cancellation
where Cancellation: From<T>
{
fn from(option: Option<T>) -> Self {
option.map_or(Cancellation::new(()), Cancellation::from)
}
}

View File

@ -0,0 +1,73 @@
use std::cell::Cell;
use std::io;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::task::Waker;
use crate::cancellation::Cancellation;
pub struct Completion<'cx> {
state: ManuallyDrop<Box<Cell<State>>>,
marker: PhantomData<fn(&'cx ()) -> &'cx ()>,
}
enum State {
Submitted(Waker),
Completed(io::Result<u32>),
Cancelled(Cancellation),
Empty,
}
impl<'cx> Completion<'cx> {
pub fn new(waker: Waker) -> Self {
Self {
state: ManuallyDrop::new(Box::new(Cell::new(State::Submitted(waker)))),
marker: PhantomData,
}
}
pub fn addr(&self) -> u64 {
self.state.as_ptr() as *const _ as usize as u64
}
pub fn check(self, waker: &Waker) -> Result<io::Result<u32>, Self> {
match self.state.replace(State::Empty) {
State::Submitted(old_waker) => {
// If the given waker wakes a different task than the one we were constructed
// with we must replace our waker.
if !old_waker.will_wake(waker) {
self.state.replace(State::Submitted(waker.clone()));
} else {
self.state.replace(State::Submitted(old_waker));
}
Err(self)
},
State::Completed(result) => {
Ok(result)
},
_ => unreachable!(),
}
}
pub fn cancel(self, callback: Cancellation) {
match self.state.replace(State::Cancelled(callback)) {
State::Completed(_) => {
drop(self.state);
},
State::Submitted(_) => {
},
_ => unreachable!(),
}
}
pub fn complete(self, result: io::Result<u32>) {
match self.state.replace(State::Completed(result)) {
State::Submitted(waker) => {
waker.wake();
},
State::Cancelled(callback) => {
drop(callback);
},
_ => unreachable!(),
}
}
}

102
runtime/asyncio/src/cq.rs Normal file
View File

@ -0,0 +1,102 @@
use std::cell::UnsafeCell;
use std::os::unix::prelude::RawFd;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, Ordering};
use nix::sys::mman::munmap;
use crate::cqe::CQE;
use crate::ctypes::{CQOffsets, IORING_CQ};
#[derive(Debug)]
pub struct CQ {
/// Head of the completion queue. Moved by the program to indicate that it has consumed
/// completions.
///
/// While it's important that the kernel sees the same value as the userspace program the
/// main problem that can happen otherwise is that the kernel assumes it lost completions
/// which we already successfully pulled from the queue.
head: &'static AtomicU32,
cached_head: UnsafeCell<u32>,
/// Tail of the completion queue. Moved by the kernel when new completions are stored.
///
/// Since this is modified by the kernel we should use atomic operations to read it, making
/// sure both the kernel and any program have a consistent view of its contents.
tail: &'static AtomicU32,
ring_mask: u32,
num_entries: u32,
flags: &'static AtomicU32,
entries: &'static [CQE],
// cq_ptr is set to `None` if we used a single mmap for both SQ and CQ.
cq_ptr: *mut libc::c_void,
cq_map_size: usize,
}
impl Drop for CQ {
fn drop(&mut self) {
if !self.cq_ptr.is_null() {
unsafe { munmap(self.cq_ptr, self.cq_map_size) };
}
}
}
impl CQ {
pub unsafe fn new(ptr: *mut libc::c_void,
offs: CQOffsets,
cq_entries: u32,
split_mmap: bool,
cq_map_size: usize,
) -> Self {
// Sanity check the pointer and offsets. If these fail we were probably passed an
// offsets from an uninitialized parameter struct.
assert!(!ptr.is_null());
assert_ne!(offs.head, offs.tail);
// Eagerly extract static values. Since they won't ever change again there's no reason to
// not read them now.
let ring_mask = *(ptr.offset(offs.ring_mask as isize).cast());
let num_entries = *(ptr.offset(offs.ring_entries as isize).cast());
let head: &AtomicU32 = &*(ptr.offset(offs.head as isize).cast());
let cached_head = UnsafeCell::new(head.load(Ordering::Acquire));
let tail: &AtomicU32 = &*(ptr.offset(offs.tail as isize).cast());
let flags: &AtomicU32 = &*(ptr.offset(offs.flags as isize).cast());
let entries = std::slice::from_raw_parts(
ptr.offset(offs.cqes as isize).cast(),
cq_entries as usize
);
Self {
head,
cached_head,
tail,
ring_mask,
num_entries,
flags,
entries,
// Only store a pointer if we used a separate mmap() syscall for the CQ
cq_ptr: if split_mmap { ptr } else { std::ptr::null_mut() },
cq_map_size,
}
}
pub fn get_next(&self) -> Option<&CQE> {
let tail = self.tail.load(Ordering::Acquire);
let head = self.head.load(Ordering::Acquire);
if tail == head {
None
} else {
self.head.fetch_add(1, Ordering::Release);
let index = (head & self.ring_mask) as usize;
Some(&self.entries[index])
}
}
pub fn ready(&self) -> u32 {
let tail = self.tail.load(Ordering::Acquire);
let head = self.head.load(Ordering::Acquire);
tail.wrapping_sub(head)
}
}

137
runtime/asyncio/src/cqe.rs Normal file
View File

@ -0,0 +1,137 @@
use std::io;
use std::ptr::NonNull;
use std::sync::atomic::Ordering;
use crate::cq::CQ;
use crate::io_uring::{IoUring};
#[repr(C)]
#[derive(Debug, PartialEq, Eq, Copy, Clone, Default)]
/// Completion Queue Event
pub struct CQE {
pub user_data: u64,
res: i32,
pub flags: IOCQE,
}
impl CQE {
pub fn raw_result(&self) -> i32 {
self.res
}
pub fn result(&self) -> io::Result<i32> {
if self.res < 0 {
let err = io::Error::from_raw_os_error(-self.res);
Err(err)
} else {
Ok(self.res)
}
}
}
pub struct CQEs<'a> {
cq: &'a CQ,
ready: u32,
}
impl<'a> CQEs<'a> {
pub fn new(cq: &'a CQ) -> Self {
Self { cq, ready: 0 }
}
fn get(&mut self) -> Option<CQE> {
self.cq.get_next().map(|cqe| *cqe)
}
fn ready(&mut self) -> u32 {
self.cq.ready()
}
}
impl<'a> Iterator for CQEs<'a> {
type Item = CQE;
fn next(&mut self) -> Option<Self::Item> {
if self.ready == 0 {
self.ready = self.ready();
if self.ready == 0 {
return None;
}
}
self.ready -= 1;
self.get()
}
}
bitflags::bitflags! {
#[derive(Default)]
#[repr(C)]
pub struct IOCQE: u32 {
const F_BUFFER = 1;
const F_MORE = 1 << 1;
}
}
static_assertions::assert_eq_size!(u32, IOCQE);
mod tests {
use super::*;
#[test]
fn test_result_into_std() {
let cqe = CQE { res: 0, .. Default::default() };
assert_eq!(cqe.result().unwrap(), 0);
let cqe = CQE { res: 42567, .. Default::default() };
assert_eq!(cqe.result().unwrap(), 42567);
let cqe = CQE { res: -32, .. Default::default() };
assert_eq!(cqe.result().unwrap_err().kind(), io::ErrorKind::BrokenPipe);
let cqe = CQE { res: -2, .. Default::default() };
assert_eq!(cqe.result().unwrap_err().kind(), io::ErrorKind::NotFound);
}
#[test]
fn test_layout_io_uring_cqe() {
assert_eq!(
::std::mem::size_of::<CQE>(),
16usize,
concat!("Size of: ", stringify!(io_uring_cqe))
);
assert_eq!(
::std::mem::align_of::<CQE>(),
8usize,
concat!("Alignment of ", stringify!(io_uring_cqe))
);
assert_eq!(
unsafe { &(*(::std::ptr::null::<CQE>())).user_data as *const _ as usize },
0usize,
concat!(
"Offset of field: ",
stringify!(io_uring_cqe),
"::",
stringify!(user_data)
)
);
assert_eq!(
unsafe { &(*(::std::ptr::null::<CQE>())).res as *const _ as usize },
8usize,
concat!(
"Offset of field: ",
stringify!(io_uring_cqe),
"::",
stringify!(res)
)
);
assert_eq!(
unsafe { &(*(::std::ptr::null::<CQE>())).flags as *const _ as usize },
12usize,
concat!(
"Offset of field: ",
stringify!(io_uring_cqe),
"::",
stringify!(flags)
)
);
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,127 @@
use std::fmt::{Debug, Formatter};
use std::io;
use std::marker::PhantomData;
use std::mem::{size_of, align_of};
use std::sync::atomic::{AtomicU32, Ordering};
use std::os::unix::prelude::RawFd;
use std::pin::Pin;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};
use crossbeam_queue::SegQueue;
use nix::sys::{mman, mman::{MapFlags, ProtFlags}};
use crate::completion::Completion;
use crate::cq::CQ;
use crate::cqe::{CQE, CQEs};
use crate::ctypes::{CQOffsets, IORING_ENTER, SQOffsets};
use crate::sq::SQ;
use crate::sqe::SQEs;
use super::ctypes::{Params, io_uring_sqe, IORING_CQ, IORING_FEAT,
IORING_OFF_CQ_RING, IORING_OFF_SQ_RING, IORING_OFF_SQES, IORING_SQ};
use super::syscall;
#[derive(Debug)]
pub struct IoUring {
fd: RawFd,
params: Params,
sq: SQ,
cq: CQ,
waiting: SegQueue<(u32, Waker)>,
}
unsafe fn mmap(map_size: usize, fd: RawFd, offset: i64) -> nix::Result<*mut libc::c_void> {
mman::mmap(
std::ptr::null_mut(),
map_size,
ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
MapFlags::MAP_SHARED | MapFlags::MAP_POPULATE,
fd,
offset
)
}
impl IoUring {
pub fn setup(entries: u32) -> io::Result<Self> {
let mut params = Params::default();
let fd = syscall::setup(entries, &mut params)?;
let mut sq_map_size = (params.sq_off.array as usize) +
(params.sq_entries as usize) * size_of::<u32>();
let mut cq_map_size = (params.cq_off.cqes as usize) +
(params.cq_entries as usize) * size_of::<CQE>();
println!("{:?} {}", params.sq_off, sq_map_size);
// If we can use a single mmap() syscall to map sq, cq and cqe the size of the total map
// is the largest of `sq_map_size` and `cq_map_size`.
if params.features.contains(IORING_FEAT::SINGLE_MMAP) {
sq_map_size = sq_map_size.max(cq_map_size);
cq_map_size = sq_map_size;
}
println!("{:?}", params.cq_off);
let sq_ptr = unsafe {
mmap(sq_map_size as usize, fd, IORING_OFF_SQ_RING as i64)?
};
let sqes_map_size = (params.sq_entries as usize) * size_of::<io_uring_sqe>();
let sqes = unsafe {
let ptr = mmap(sqes_map_size, fd, IORING_OFF_SQES as i64)?.cast();
std::slice::from_raw_parts_mut(ptr, params.sq_entries as usize)
};
let sq = unsafe {
SQ::new(sq_ptr,
params.sq_off,
sqes,
sq_map_size,
sqes_map_size
)
};
let cq_ptr = if params.features.contains(IORING_FEAT::SINGLE_MMAP) {
sq_ptr
} else {
unsafe {
mmap(cq_map_size, fd, IORING_OFF_CQ_RING as i64)?
}
};
let cq = unsafe {
CQ::new(cq_ptr,
params.cq_off,
params.cq_entries,
sq_ptr != cq_ptr,
cq_map_size,
)
};
Ok(IoUring {
fd,
params,
sq,
cq,
waiting: SegQueue::new(),
})
}
pub fn try_prepare<'cx>(
&self,
count: u32,
prepare: impl FnOnce(SQEs<'_>)
) -> Option<()> {
// TODO: Lock the required amount of slots on both submission and completion queue, then
// construct the sqes.
if let Some(sqes) = self.sq.try_reserve(count) {
Some(prepare(sqes))
} else {
None
}
}
pub fn submit(&self) -> io::Result<u32> {
self.sq.submit(self.fd)
}
pub fn cqes(&self) -> CQEs {
CQEs::new(&self.cq)
}
}

View File

@ -0,0 +1,24 @@
// Raw typedefs and structs for kernel communication via syscalls
pub mod ctypes;
mod syscall;
pub mod io_uring;
mod sq;
mod sqe;
mod cq;
mod cqe;
mod completion;
mod cancellation;
#[macro_export]
macro_rules! ready {
($e:expr $(,)?) => {
match $e {
std::task::Poll::Ready(t) => t,
std::task::Poll::Pending => return std::task::Poll::Pending,
}
};
}

View File

@ -0,0 +1,5 @@
pub struct CQE {
}

426
runtime/asyncio/src/sq.rs Normal file
View File

@ -0,0 +1,426 @@
use std::cell::UnsafeCell;
use std::fmt::{Debug, Formatter};
use std::io;
use std::mem::ManuallyDrop;
use std::os::unix::prelude::RawFd;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, compiler_fence, fence, Ordering};
use nix::sys::mman::munmap;
use crate::ctypes::{IORING_ENTER, IORING_SQ, io_uring_sqe, SQOffsets};
use crate::sqe::SQEs;
use crate::syscall;
pub struct SQ {
/// Head of the submission queue. This value is set by the kernel when it consumes SQE.
/// Thus we need to use atomic operations when passing information, making sure both the kernel
/// and program have a consistent view of its contents.
array_head: &'static AtomicU32,
/// The head of the sqes buffer. This value is our local cache of `array_head` that's not
/// shared with or modified by the kernel. We use it to index the start of the prepared SQE.
/// This means that this value lags behind after `array_head`.
sqes_head: UnsafeCell<u32>,
/// Tail of the submission queue. While this will be modified by the userspace program only,
/// the kernel uses atomic operations to read it so we want to use atomic operations to write
/// it.
array_tail: &'static AtomicU32,
// non-atomic cache of array_tail
cached_tail: UnsafeCell<u32>,
/// Tail of the sqes buffer. This value serves as our local cache of `array_tail` and, in
/// combination with `sqes_head` allows us to more efficiently submit SQE by skipping already
/// submitted ones.
/// `sqes_tail` marks the end of the prepared SQE.
sqes_tail: UnsafeCell<u32>,
ring_mask: u32,
num_entries: u32,
flags: &'static AtomicU32,
dropped: &'static AtomicU32,
array: &'static [AtomicU32],
sqes: &'static mut [UnsafeCell<io_uring_sqe>],
sq_ptr: NonNull<()>,
sq_map_size: usize,
sqes_map_size: usize,
}
static_assertions::assert_not_impl_any!(SQ: Send, Sync);
impl Drop for SQ {
fn drop(&mut self) {
unsafe {
munmap(self.sq_ptr.as_ptr().cast(), self.sq_map_size);
let sqes_ptr: *mut libc::c_void = self.sqes.as_mut_ptr().cast();
munmap(sqes_ptr, self.sqes_map_size);
}
}
}
impl Debug for SQ {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
unsafe {
// TODO: Complete
f.debug_struct("SQ")
.field("head", self.array_head)
.field("tail", self.array_tail)
.field("ring_mask", &self.ring_mask)
.field("num_entries", &self.num_entries)
.field("flags", self.flags)
.field("dropped", self.dropped)
.field("array", &self.array)
.finish()
}
}
}
impl SQ {
pub unsafe fn new(ptr: *mut libc::c_void,
offs: SQOffsets,
sqes: &'static mut [UnsafeCell<io_uring_sqe>],
sq_map_size: usize,
sqes_map_size: usize,
) -> Self {
// Sanity check the pointer and offsets. If these fail we were probably passed an
// offsets from an uninitialized parameter struct.
assert!(!ptr.is_null());
assert_ne!(offs.head, offs.tail);
// Eagerly extract static values. Since they won't ever change again there's no reason to
// not read them now.
let ring_mask = *(ptr.offset(offs.ring_mask as isize).cast());
let num_entries = *(ptr.offset(offs.ring_entries as isize).cast());
// These are valid Rust references; they are valid for the entire lifetime of self,
// properly initialized by the kernel and well aligned.
let array_head: &AtomicU32 = &*(ptr.offset(offs.head as isize).cast());
let sqes_head = UnsafeCell::new(array_head.load(Ordering::Acquire));
let array_tail: &AtomicU32 = &*ptr.offset(offs.tail as isize).cast();
let sqes_tail = UnsafeCell::new(array_tail.load(Ordering::Acquire));
let cached_tail = UnsafeCell::new(array_tail.load(Ordering::Acquire));
let flags = &*ptr.offset(offs.flags as isize).cast();
let dropped = &*ptr.offset(offs.dropped as isize).cast();
let array = std::slice::from_raw_parts(
ptr.offset(offs.array as isize).cast(),
sqes.len() as usize,
);
let sq_ptr = NonNull::new_unchecked(ptr).cast();
Self {
array_head,
sqes_head,
array_tail,
sqes_tail,
cached_tail,
ring_mask,
num_entries,
flags,
dropped,
array,
sqes,
sq_ptr,
sq_map_size,
sqes_map_size,
}
}
#[inline(always)]
fn sqes_head(&self) -> &mut u32 {
unsafe { &mut *self.sqes_head.get() }
}
#[inline(always)]
fn sqes_tail(&self) -> &mut u32 {
unsafe { &mut *self.sqes_tail.get() }
}
#[inline(always)]
fn cached_tail(&self) -> &mut u32 {
unsafe { &mut *self.cached_tail.get() }
}
#[inline(always)]
fn increment_tail(&self, count: u32) -> u32 {
let tail = self.sqes_tail();
let old = *tail;
*tail = (*tail).wrapping_add(count);
old
}
#[inline(always)]
fn increment_head(&self, count: u32) -> u32{
let head = self.sqes_head();
let old = *head;
*head = (*head).wrapping_add(count);
old
}
#[inline(always)]
fn used(&self) -> u32 {
(*self.sqes_tail()).wrapping_sub(*self.sqes_head())
}
#[inline(always)]
fn available(&self) -> u32 {
self.num_entries - self.used()
}
/// Submit all prepared entries to the kernel. This function will return the number of
/// entries successfully submitted to the kernel.
pub fn submit(&self, fd: RawFd) -> io::Result<u32> {
let retval = syscall::enter(
fd,
self.num_entries,
0,
IORING_ENTER::GETEVENTS,
std::ptr::null(),
0,
)? as u32;
// Return SQE into circulation that we successfully submitted to the kernel.
self.increment_head(retval);
Ok(retval)
}
/// Prepare actions for submission by shuffling them into the correct order.
///
/// Kernelside `array` is used to index into the sqes, more specifically the code behaves
/// like this:
/// ```C
/// u32 mask = ctx->sq_entries - 1;
/// u32 sq_idx = ctx->cached_sq_head++ & mask;
/// u32 head = READ_ONCE(ctx->sq_array[sq_idx]);
/// if (likely(head < ctx->sq_entries))
/// return &ctx->sq_sqes[head];
/// ```
/// Where `ctx->sq_entries` is the number of slots in the ring (i.e. simply a boundary check).
///
/// So we need to make sure that for every new entry since we last submitted we have the
/// correct index set. In our case shuffle will map the next `count` entries in `self.array` to
/// point to `count` entries in `self.sqes` starting at `start`. This allows actions to be
/// submitted to the kernel even when there are still reserved SQE in between that weren't yet
/// filled.
fn prepare(&self, start: u32, count: u32) {
// Load the tail of the array (i.e. where we will start filling)
let tail = self.cached_tail();
let mut head = start;
for _ in 0..count {
let index = (*tail & self.ring_mask) as usize;
// We can allow this store to be an Relaxed operation since updating the shared tail
// is done after a memory barrier.
self.array[index].store(head & self.ring_mask, Ordering::Relaxed);
// Same here. We need to take the overflow into account but don't have to explicitly
// handle it.
head = head.wrapping_add(1);
*tail = (*tail).wrapping_add(1);
}
// Ensure that the writes into the array are not moved after the write of the tail.
// Otherwise kernelside may read completely wrong indices from array.
compiler_fence(Ordering::Release);
self.array_tail.store(*tail, Ordering::Release);
}
pub fn try_reserve(&self, count: u32) -> Option<SQEs<'_>> {
if self.available() >= count {
let start = self.increment_tail(count);
Some(SQEs::new(self.sqes, start, count))
} else {
None
}
}
}
mod tests {
use std::mem::ManuallyDrop;
use std::sync::atomic::Ordering::Relaxed;
use crate::ctypes::{IORING_OP, IOSQE};
use super::*;
fn gen_sq(num_entries: u32, head: u32, tail: u32) -> ManuallyDrop<SQ> {
assert!((0 < num_entries && num_entries <= 4096), "entries must be between 1 and 4096");
assert_eq!(num_entries.count_ones(), 1, "entries must be a power of two");
let array_head = Box::leak(Box::new(AtomicU32::new(head)));
let array_tail = Box::leak(Box::new(AtomicU32::new(tail)));
let flags = Box::leak(Box::new(AtomicU32::new(0)));
let dropped = Box::leak(Box::new(AtomicU32::new(0)));
let array = Box::leak((0..num_entries)
.map(|n| AtomicU32::new(n))
.collect::<Box<[_]>>());
let sqes = Box::leak((0..num_entries)
.map(|_| UnsafeCell::new(io_uring_sqe::default()))
.collect::<Box<[_]>>());
unsafe {
ManuallyDrop::new(SQ {
array_head,
sqes_head: UnsafeCell::new(head),
array_tail,
sqes_tail: UnsafeCell::new(tail),
cached_tail: UnsafeCell::new(0),
ring_mask: num_entries - 1,
num_entries,
flags,
dropped,
array,
sqes,
sq_ptr: NonNull::dangling(),
sq_map_size: 0,
sqes_map_size: 0
})
}
}
#[test]
fn test_head_tail() {
let mut sq = gen_sq(64, 30, 30);
assert_eq!(*sq.sqes_head(), 30);
assert_eq!(*sq.sqes_tail(), 30);
assert_eq!(sq.used(), 0);
assert_eq!(sq.available(), 64);
sq.increment_tail(4);
assert_eq!(*sq.sqes_head(), 30);
assert_eq!(*sq.sqes_tail(), 34);
assert_eq!(sq.used(), 4);
assert_eq!(sq.available(), 60);
sq.increment_head(2);
assert_eq!(*sq.sqes_head(), 32);
assert_eq!(*sq.sqes_tail(), 34);
assert_eq!(sq.used(), 2);
assert_eq!(sq.available(), 62);
}
#[test]
fn test_sq_getter_setter() {
let mut sq = gen_sq(64, 30, 30);
assert_eq!(*sq.sqes_head(), 30);
assert_eq!(*sq.sqes_tail(), 30);
assert_eq!(sq.used(), 0);
assert_eq!(sq.available(), 64);
{
let mut sqes = sq.try_reserve(2).unwrap();
assert_eq!(sq.used(), 2);
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::READV);
sqe.add_flags(IOSQE::IO_HARDLINK);
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::WRITEV);
sqe.set_userdata(823);
}
assert_eq!(sq.used(), 2);
{
let sqes = &mut sq.sqes;
assert_eq!(sqes[30].get_mut().opcode, IORING_OP::READV);
assert_eq!(sqes[30].get_mut().flags, IOSQE::IO_HARDLINK);
assert_eq!(sqes[31].get_mut().opcode, IORING_OP::WRITEV);
assert_eq!(sqes[31].get_mut().user_data, 823);
}
}
#[test]
fn test_sq_full() {
let mut sq = gen_sq(64, 1, 65);
let sqe = sq.try_reserve(1);
assert!(sqe.is_none());
}
#[test]
fn test_out_of_order_submit() {
let mut sq = gen_sq(64, 0, 0);
let start;
{
let mut sqes = sq.try_reserve(4).unwrap();
start = sqes.start();
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::READV);
sqe.add_flags(IOSQE::IO_HARDLINK);
sqe.set_address(1);
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::READV);
sqe.add_flags(IOSQE::IO_HARDLINK);
sqe.set_address(2);
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::READV);
sqe.add_flags(IOSQE::IO_HARDLINK);
sqe.set_address(3);
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::READV);
sqe.set_address(4);
sqe.set_userdata(823);
}
assert_eq!(sq.used(), 4);
let start2;
{
let mut sqes = sq.try_reserve(4).unwrap();
start2 = sqes.start();
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::WRITEV);
sqe.add_flags(IOSQE::IO_LINK);
sqe.set_address(1);
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::WRITEV);
sqe.add_flags(IOSQE::IO_LINK);
sqe.set_address(2);
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::WRITEV);
sqe.add_flags(IOSQE::IO_LINK);
sqe.set_address(3);
let mut sqe = sqes.next().unwrap();
sqe.set_opcode(IORING_OP::WRITEV);
sqe.set_address(4);
sqe.set_userdata(0xDEADBEEF);
}
assert_eq!(sq.used(), 8);
sq.prepare(start2, 4);
sq.prepare(start, 4);
let sqes: Vec<_> = sq.sqes.iter_mut()
.map(|c| c.get_mut().clone())
.collect();
let mut out: Vec<_> = sq.array.iter().map(|n| {
let i = n.load(Relaxed) as usize;
sqes[i]
}).collect();
for (n, s) in out.iter().take(4).enumerate() {
assert_eq!(s.opcode, IORING_OP::WRITEV);
assert_eq!(s.address, n as u64 + 1);
if n == 3 {
assert_eq!(s.user_data, 0xDEADBEEF);
} else {
assert_eq!(s.flags, IOSQE::IO_LINK);
}
}
for (n, s) in out.iter().skip(4).take(4).enumerate() {
assert_eq!(s.opcode, IORING_OP::READV);
assert_eq!(s.address, n as u64 + 1);
if n == 3 {
assert_eq!(s.user_data, 823);
} else {
assert_eq!(s.flags, IOSQE::IO_HARDLINK);
}
}
let mut i = out.iter().skip(8);
while let Some(sqe) = i.next() {
assert_eq!(*sqe, io_uring_sqe::default());
}
}
}

337
runtime/asyncio/src/sqe.rs Normal file
View File

@ -0,0 +1,337 @@
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::slice::IterMut;
use crate::ctypes::{IORING_OP, IOSQE, io_uring_sqe};
#[derive(Debug)]
pub struct SQE<'iou> {
sqe: &'iou mut io_uring_sqe,
}
impl<'iou> SQE<'iou> {
pub fn new(sqe: &'iou mut io_uring_sqe) -> Self {
Self { sqe }
}
#[inline(always)]
pub fn add_flags(&mut self, flags: IOSQE) {
self.sqe.flags |= flags;
}
#[inline(always)]
pub fn set_opcode(&mut self, opcode: IORING_OP) {
self.sqe.opcode = opcode;
}
#[inline(always)]
pub fn set_userdata(&mut self, user_data: u64) {
self.sqe.user_data = user_data;
}
#[inline(always)]
pub fn set_address(&mut self, address: u64) {
self.sqe.address = address;
}
#[inline(always)]
pub fn set_len(&mut self, len: i32) {
self.sqe.len = len;
}
}
pub struct SQEs<'iou> {
slice: &'iou [UnsafeCell<io_uring_sqe>],
mask: u32,
start: u32,
count: u32,
capacity: u32,
}
impl<'iou> SQEs<'iou> {
pub(crate) fn new(slice: &'iou [UnsafeCell<io_uring_sqe>], start: u32, capacity: u32)
-> Self
{
let mask = (slice.len() - 1) as u32;
Self { slice, mask, count: 0, start, capacity }
}
pub fn last(&mut self) -> Option<SQE<'iou>> {
let mut last = None;
while let Some(sqe) = self.consume() { last = Some(sqe) }
last
}
/// An iterator of [`HardLinkedSQE`]s. These will be [`SQE`]s that are hard linked together.
///
/// Hard linked SQEs will occur sequentially. All of them will be completed, even if one of the
/// events resolves to an error.
pub fn hard_linked(&mut self) -> HardLinked<'iou, '_> {
HardLinked { sqes: self }
}
/// An iterator of [`SoftLinkedSQE`]s. These will be [`SQE`]s that are soft linked together.
///
/// Soft linked SQEs will occur sequentially. If one the events errors, all events after it
/// will be cancelled.
pub fn soft_linked(&mut self) -> SoftLinked<'iou, '_> {
SoftLinked { sqes: self }
}
/// Remaining [`SQE`]s that can be modified.
pub fn remaining(&self) -> u32 {
self.capacity - self.count
}
pub fn start(&self) -> u32 {
self.start
}
pub fn capacity(&self) -> u32 {
self.capacity
}
fn consume(&mut self) -> Option<SQE<'iou>> {
if self.count >= self.capacity {
None
} else {
let index = (self.start + self.count) & self.mask;
self.count += 1;
let sqe: &mut io_uring_sqe = unsafe {
&mut *self.slice.get_unchecked(index as usize).get()
};
// Ensure that all SQE passing through here are wiped into NOPs first.
*sqe = io_uring_sqe::default();
sqe.opcode = IORING_OP::NOP;
Some(SQE { sqe })
}
}
/// Exhaust this iterator, thus ensuring all entries are set to NOP
fn exhaust(&mut self) {
while let Some(_) = self.consume() {}
}
}
impl<'iou> Iterator for SQEs<'iou> {
type Item = SQE<'iou>;
fn next(&mut self) -> Option<SQE<'iou>> {
self.consume()
}
}
impl<'iou> Drop for SQEs<'iou> {
fn drop(&mut self) {
if self.count != 0 {
// This iterator is responsible for all of its SQE and must NOP every not used one.
self.exhaust()
}
}
}
/// An Iterator of [`SQE`]s which will be hard linked together.
pub struct HardLinked<'iou, 'a> {
sqes: &'a mut SQEs<'iou>,
}
impl<'iou> HardLinked<'iou, '_> {
pub fn terminate(self) -> Option<SQE<'iou>> {
self.sqes.consume()
}
}
impl<'iou> Iterator for HardLinked<'iou, '_> {
type Item = HardLinkedSQE<'iou>;
fn next(&mut self) -> Option<Self::Item> {
let is_final = self.sqes.remaining() == 1;
self.sqes.consume().map(|sqe| HardLinkedSQE { sqe, is_final })
}
}
pub struct HardLinkedSQE<'iou> {
sqe: SQE<'iou>,
is_final: bool,
}
impl<'iou> Deref for HardLinkedSQE<'iou> {
type Target = SQE<'iou>;
fn deref(&self) -> &SQE<'iou> {
&self.sqe
}
}
impl<'iou> DerefMut for HardLinkedSQE<'iou> {
fn deref_mut(&mut self) -> &mut SQE<'iou> {
&mut self.sqe
}
}
impl<'iou> Drop for HardLinkedSQE<'iou> {
fn drop(&mut self) {
if !self.is_final {
self.sqe.add_flags(IOSQE::IO_HARDLINK);
}
}
}
/// An Iterator of [`SQE`]s which will be soft linked together.
pub struct SoftLinked<'iou, 'a> {
sqes: &'a mut SQEs<'iou>,
}
impl<'iou> SoftLinked<'iou, '_> {
pub fn terminate(self) -> Option<SQE<'iou>> {
self.sqes.consume()
}
}
impl<'iou> Iterator for SoftLinked<'iou, '_> {
type Item = SoftLinkedSQE<'iou>;
fn next(&mut self) -> Option<Self::Item> {
let is_final = self.sqes.remaining() == 1;
self.sqes.consume().map(|sqe| SoftLinkedSQE { sqe, is_final })
}
}
pub struct SoftLinkedSQE<'iou> {
sqe: SQE<'iou>,
is_final: bool,
}
impl<'iou> Deref for SoftLinkedSQE<'iou> {
type Target = SQE<'iou>;
fn deref(&self) -> &SQE<'iou> {
&self.sqe
}
}
impl<'iou> DerefMut for SoftLinkedSQE<'iou> {
fn deref_mut(&mut self) -> &mut SQE<'iou> {
&mut self.sqe
}
}
impl<'iou> Drop for SoftLinkedSQE<'iou> {
fn drop(&mut self) {
if !self.is_final {
self.sqe.add_flags(IOSQE::IO_LINK);
}
}
}
mod tests {
use super::*;
fn gen_buf(num_entries: usize) -> &'static mut [UnsafeCell<io_uring_sqe>]{
Box::leak((0..num_entries)
.map(|_| UnsafeCell::new(io_uring_sqe::default()))
.collect::<Box<[_]>>())
}
#[test]
fn test_wrapping_sqes() {
let mut sqe_buf = gen_buf(64);
{
let mut sqes = SQEs::new(&mut sqe_buf[..], 62, 5);
assert_eq!(sqes.next().map(|i| i.sqe.user_data = 1), Some(()));
assert_eq!(sqes.next().map(|i| i.sqe.user_data = 2), Some(()));
assert_eq!(sqes.next().map(|i| i.sqe.user_data = 3), Some(()));
assert_eq!(sqes.next().map(|i| i.sqe.user_data = 4), Some(()));
assert_eq!(sqes.next().map(|i| i.sqe.user_data = 5), Some(()));
assert_eq!(sqes.next().map(|i| i.sqe.user_data = 6), None);
}
assert_eq!(sqe_buf[61].get_mut().user_data, 0);
assert_eq!(sqe_buf[62].get_mut().user_data, 1);
assert_eq!(sqe_buf[63].get_mut().user_data, 2);
assert_eq!(sqe_buf[0].get_mut().user_data, 3);
assert_eq!(sqe_buf[1].get_mut().user_data, 4);
assert_eq!(sqe_buf[2].get_mut().user_data, 5);
assert_eq!(sqe_buf[3].get_mut().user_data, 0);
}
#[test]
fn test_hard_linked_sqes() {
let mut sqe_buf = gen_buf(64);
{
let mut sqes = SQEs::new(&mut sqe_buf, 62, 5);
let mut linked = sqes.hard_linked();
assert_eq!(linked.next().map(|i| i.sqe.sqe.opcode = IORING_OP::READ), Some(()));
assert_eq!(linked.next().map(|i| i.sqe.sqe.opcode = IORING_OP::TEE), Some(()));
assert_eq!(linked.next().map(|i| i.sqe.sqe.opcode = IORING_OP::ACCEPT), Some(()));
assert_eq!(linked.next().map(|i| i.sqe.sqe.opcode = IORING_OP::CLOSE), Some(()));
assert_eq!(linked.next().map(|i| i.sqe.sqe.opcode = IORING_OP::CONNECT), Some(()));
assert_eq!(linked.next().map(|i| i.sqe.sqe.opcode = IORING_OP::FADVISE), None);
}
assert_eq!(sqe_buf[61].get_mut().opcode, IORING_OP::NOP);
assert_eq!(sqe_buf[61].get_mut().flags, IOSQE::empty());
assert_eq!(sqe_buf[62].get_mut().opcode, IORING_OP::READ);
assert_eq!(sqe_buf[62].get_mut().flags, IOSQE::IO_HARDLINK);
assert_eq!(sqe_buf[63].get_mut().opcode, IORING_OP::TEE);
assert_eq!(sqe_buf[63].get_mut().flags, IOSQE::IO_HARDLINK);
assert_eq!(sqe_buf[0].get_mut().opcode, IORING_OP::ACCEPT);
assert_eq!(sqe_buf[0].get_mut().flags, IOSQE::IO_HARDLINK);
assert_eq!(sqe_buf[1].get_mut().opcode, IORING_OP::CLOSE);
assert_eq!(sqe_buf[1].get_mut().flags, IOSQE::IO_HARDLINK);
assert_eq!(sqe_buf[2].get_mut().opcode, IORING_OP::CONNECT);
assert_eq!(sqe_buf[2].get_mut().flags, IOSQE::empty());
assert_eq!(sqe_buf[3].get_mut().opcode, IORING_OP::NOP);
assert_eq!(sqe_buf[3].get_mut().flags, IOSQE::empty());
}
#[test]
fn test_soft_linked_sqes() {
let mut sqe_buf = gen_buf(64);
{
let mut sqes = SQEs::new(&mut sqe_buf, 62, 5);
let mut linked = sqes.soft_linked();
assert_eq!(linked.next().map(|i| i.sqe.sqe.opcode = IORING_OP::READ), Some(()));
assert_eq!(linked.next().map(|i| i.sqe.sqe.opcode = IORING_OP::TEE), Some(()));
assert_eq!(linked.next().map(|i| i.sqe.sqe.opcode = IORING_OP::ACCEPT), Some(()));
assert_eq!(linked.next().map(|i| i.sqe.sqe.opcode = IORING_OP::CLOSE), Some(()));
assert_eq!(linked.next().map(|i| i.sqe.sqe.opcode = IORING_OP::CONNECT), Some(()));
assert_eq!(linked.next().map(|i| i.sqe.sqe.opcode = IORING_OP::FADVISE), None);
}
assert_eq!(sqe_buf[61].get_mut().opcode, IORING_OP::NOP);
assert_eq!(sqe_buf[61].get_mut().flags, IOSQE::empty());
assert_eq!(sqe_buf[62].get_mut().opcode, IORING_OP::READ);
assert_eq!(sqe_buf[62].get_mut().flags, IOSQE::IO_LINK);
assert_eq!(sqe_buf[63].get_mut().opcode, IORING_OP::TEE);
assert_eq!(sqe_buf[63].get_mut().flags, IOSQE::IO_LINK);
assert_eq!(sqe_buf[0].get_mut().opcode, IORING_OP::ACCEPT);
assert_eq!(sqe_buf[0].get_mut().flags, IOSQE::IO_LINK);
assert_eq!(sqe_buf[1].get_mut().opcode, IORING_OP::CLOSE);
assert_eq!(sqe_buf[1].get_mut().flags, IOSQE::IO_LINK);
assert_eq!(sqe_buf[2].get_mut().opcode, IORING_OP::CONNECT);
assert_eq!(sqe_buf[2].get_mut().flags, IOSQE::empty());
assert_eq!(sqe_buf[3].get_mut().opcode, IORING_OP::NOP);
assert_eq!(sqe_buf[3].get_mut().flags, IOSQE::empty());
}
}

View File

@ -0,0 +1,61 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use iou::{SQE, SQEs};
use super::{Event, Submission};
pub struct Completion<'cx> {
inner: super::Completion,
marker: PhantomData<fn(&'cx ()) -> &'cx ()>,
}
impl<'cx> Completion<'cx> {
pub(crate) fn new(mut sqe: SQE<'_>, _sqes: SQEs<'_>, cx: &mut Context<'cx>) -> Self {
let inner = super::Completion::new(cx.waker().clone());
// Make the userdata for the (final) SQE a pointer to the waker for the task blocking on
// this IO.
unsafe { sqe.set_user_data(inner.addr()) };
Self { inner, marker: PhantomData }
}
#[inline(always)]
pub(crate) fn into_inner(self) -> super::Completion {
self.inner
}
}
pub trait Driver: Clone {
/// Poll to prepare a number of submissions for the submission queue.
///
/// If the driver has space for `count` SQE available it calls `prepare` to have said `SQE`
/// inserted. A driver can assume that prepare will use exactly `count` slots. Using this
/// drivers can implement backpressure by returning `Poll::Pending` if less than `count`
/// slots are available and waking the respective task up if enough slots have become available.
fn poll_prepare<'cx>(
self: Pin<&mut Self>,
ctx: &mut Context<'cx>,
count: u32,
prepare: impl FnOnce(SQEs<'_>, &mut Context<'cx>) -> Completion<'cx>,
) -> Poll<Completion<'cx>>;
/// Suggestion for the driver to submit their queue to the kernel.
///
/// This will be called by tasks after they have finished preparing submissions. Drivers must
/// eventually submit these to the kernel but aren't required to do so right away.
fn poll_submit(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<()>;
/// Completion hint
///
/// This should return `Poll::Ready` if an completion with the given user_data may have been
/// received since the last call to this function. It is safe to always return `Poll::Ready`,
/// even if no actions were completed.
fn poll_complete(self: Pin<&mut Self>, ctx: &mut Context<'_>, user_data: u64) -> Poll<()>;
fn submit<E: Event>(self, event: E) -> Submission<Self, E>
where Self: Sized
{
Submission::new(self, event)
}
}

View File

@ -0,0 +1,27 @@
use std::mem::ManuallyDrop;
use std::os::unix::io::RawFd;
use iou::sqe::{SockFlag, SockAddrStorage};
use iou::registrar::UringFd;
use super::{Event, SQE, SQEs, Cancellation};
pub struct Accept<FD = RawFd> {
pub addr: Option<Box<SockAddrStorage>>,
pub fd: FD,
pub flags: SockFlag,
}
impl<FD: UringFd + Copy> Event for Accept<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_accept(self.fd, self.addr.as_deref_mut(), self.flags);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).addr)
}
}

View File

@ -0,0 +1,19 @@
use std::os::unix::io::RawFd;
use iou::registrar::UringFd;
use super::{Event, SQE, SQEs};
pub struct Close<FD = RawFd> {
pub fd: FD,
}
impl<FD: UringFd + Copy> Event for Close<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_close(self.fd);
sqe
}
}

View File

@ -0,0 +1,26 @@
use std::mem::ManuallyDrop;
use std::os::unix::io::RawFd;
use iou::sqe::SockAddr;
use iou::registrar::UringFd;
use super::{Event, SQE, SQEs, Cancellation};
pub struct Connect<FD = RawFd> {
pub fd: FD,
pub addr: Box<SockAddr>,
}
impl<FD: UringFd + Copy> Event for Connect<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_connect(self.fd, &mut *self.addr);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).addr)
}
}

View File

@ -0,0 +1,27 @@
use std::mem::ManuallyDrop;
use std::os::unix::io::RawFd;
use iou::sqe::{EpollOp, EpollEvent};
use super::{Event, SQE, SQEs, Cancellation};
pub struct EpollCtl {
pub epoll_fd: RawFd,
pub op: EpollOp,
pub fd: RawFd,
pub event: Option<Box<EpollEvent>>,
}
impl Event for EpollCtl {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_epoll_ctl(self.epoll_fd, self.op, self.fd, self.event.as_deref_mut());
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).event)
}
}

View File

@ -0,0 +1,23 @@
use std::os::unix::io::RawFd;
use iou::sqe::PosixFadviseAdvice;
use iou::registrar::UringFd;
use super::{Event, SQE, SQEs};
pub struct Fadvise<FD = RawFd> {
pub fd: FD,
pub offset: u64,
pub size: u64,
pub flags: PosixFadviseAdvice,
}
impl<FD: UringFd + Copy> Event for Fadvise<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_fadvise(self.fd, self.offset, self.size, self.flags);
sqe
}
}

View File

@ -0,0 +1,23 @@
use std::os::unix::io::RawFd;
use iou::registrar::UringFd;
use iou::sqe::FallocateFlags;
use super::{Event, SQE, SQEs};
pub struct Fallocate<FD = RawFd> {
pub fd: FD,
pub offset: u64,
pub size: u64,
pub flags: FallocateFlags,
}
impl<FD: UringFd + Copy> Event for Fallocate<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_fallocate(self.fd, self.offset, self.size, self.flags);
sqe
}
}

View File

@ -0,0 +1,23 @@
use std::mem::ManuallyDrop;
use std::os::unix::io::RawFd;
use super::{Event, SQE, SQEs, Cancellation};
pub struct FilesUpdate {
pub files: Box<[RawFd]>,
pub offset: u32,
}
impl Event for FilesUpdate {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_files_update(&self.files[..], self.offset);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).files)
}
}

View File

@ -0,0 +1,21 @@
use std::os::unix::io::RawFd;
use iou::registrar::UringFd;
use iou::sqe::FsyncFlags;
use super::{Event, SQE, SQEs};
pub struct Fsync<FD = RawFd> {
pub fd: FD,
pub flags: FsyncFlags,
}
impl<FD: UringFd + Copy> Event for Fsync<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_fsync(self.fd, self.flags);
sqe
}
}

View File

@ -0,0 +1,56 @@
mod accept;
mod close;
mod connect;
mod epoll_ctl;
mod fadvise;
mod fallocate;
mod files_update;
mod fsync;
mod openat;
mod provide_buffers;
mod read;
mod readv;
mod recv;
mod send;
mod splice;
mod statx;
mod timeout;
mod write;
mod writev;
use std::mem::ManuallyDrop;
use iou::{SQE, SQEs};
use super::Cancellation;
pub use accept::Accept;
pub use close::Close;
pub use connect::Connect;
pub use epoll_ctl::EpollCtl;
pub use fadvise::Fadvise;
pub use fallocate::Fallocate;
pub use files_update::FilesUpdate;
pub use fsync::Fsync;
pub use openat::OpenAt;
pub use provide_buffers::ProvideBuffers;
pub use read::Read;
pub use readv::ReadVectored;
pub use recv::Recv;
pub use send::Send;
pub use splice::Splice;
pub use statx::Statx;
pub use timeout::Timeout;
pub use write::Write;
pub use writev::WriteVectored;
pub trait Event {
fn sqes_needed() -> u32;
unsafe fn prepare<'a>(&mut self, sqs: &mut SQEs<'a>) -> SQE<'a>;
fn cancel(_: ManuallyDrop<Self>) -> Cancellation
where Self: Sized
{
Cancellation::from(())
}
}

View File

@ -0,0 +1,39 @@
use std::ffi::CString;
use std::mem::ManuallyDrop;
use std::os::unix::ffi::OsStrExt;
use std::os::unix::prelude::RawFd;
use std::path::Path;
use iou::{SQE, SQEs};
use iou::sqe::{Mode, OFlag};
use crate::sys::linux::io_uring::cancellation::Cancellation;
use super::Event;
pub struct OpenAt {
pub path: CString,
pub dir_fd: RawFd,
pub flags: OFlag,
pub mode: Mode,
}
impl OpenAt {
pub fn without_dir(path: impl AsRef<Path>, flags: OFlag, mode: Mode) -> Self {
let path = CString::new(path.as_ref().as_os_str().as_bytes()).unwrap();
Self { path, dir_fd: libc::AT_FDCWD, flags, mode }
}
}
impl Event for OpenAt {
fn sqes_needed() -> u32 {
1
}
unsafe fn prepare<'a>(&mut self, sqs: &mut SQEs<'a>) -> SQE<'a> {
let mut sqe = sqs.single().unwrap();
sqe.prep_openat(self.dir_fd, &*self.path, self.flags, self.mode);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation where Self: Sized {
ManuallyDrop::into_inner(this).path.into()
}
}

View File

@ -0,0 +1,40 @@
use std::mem::ManuallyDrop;
use iou::sqe::BufferGroupId;
use super::{Event, SQE, SQEs, Cancellation};
pub struct ProvideBuffers {
pub bufs: Box<[u8]>,
pub count: u32,
pub group: BufferGroupId,
pub index: u32,
}
impl Event for ProvideBuffers {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_provide_buffers(&mut self.bufs[..], self.count, self.group, self.index);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).bufs)
}
}
pub struct RemoveBuffers {
pub count: u32,
pub group: BufferGroupId,
}
impl Event for RemoveBuffers {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_remove_buffers(self.count, self.group);
sqe
}
}

View File

@ -0,0 +1,47 @@
use std::mem::ManuallyDrop;
use std::os::unix::io::RawFd;
use iou::registrar::{UringFd, RegisteredBuf};
use super::{Event, SQE, SQEs, Cancellation};
/// A basic read event.
pub struct Read<FD = RawFd> {
pub fd: FD,
pub buf: Box<[u8]>,
pub offset: u64,
}
impl<FD: UringFd + Copy> Event for Read<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_read(self.fd, &mut self.buf[..], self.offset);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).buf)
}
}
pub struct ReadFixed<FD = RawFd> {
pub fd: FD,
pub buf: RegisteredBuf,
pub offset: u64,
}
impl<FD: UringFd + Copy> Event for ReadFixed<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_read(self.fd, self.buf.as_mut(), self.offset);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).buf)
}
}

View File

@ -0,0 +1,48 @@
use std::io::IoSliceMut;
use std::mem::ManuallyDrop;
use std::os::unix::io::RawFd;
use iou::registrar::UringFd;
use super::{Event, SQE, SQEs, Cancellation};
/// A `readv` event.
pub struct ReadVectored<FD = RawFd> {
pub fd: FD,
pub bufs: Box<[Box<[u8]>]>,
pub offset: u64,
}
impl<FD> ReadVectored<FD> {
fn as_iovecs(buffers: &mut [Box<[u8]>]) -> &mut [IoSliceMut] {
// Unsafe contract:
// This pointer cast is defined behaviour because Box<[u8]> (wide pointer)
// is currently ABI compatible with libc::iovec.
//
// Then, libc::iovec is guaranteed ABI compatible with IoSliceMut on Unix:
// https://doc.rust-lang.org/beta/std/io/struct.IoSliceMut.html
//
// We are relying on the internals of Box<[u8]>, but this is such a
// foundational part of Rust it's unlikely the data layout would change
// without warning.
//
// Pointer cast expression adapted from the "Turning a &mut T into an &mut U"
// example of: https://doc.rust-lang.org/std/mem/fn.transmute.html#alternatives
unsafe { &mut *(buffers as *mut [Box<[u8]>] as *mut [IoSliceMut]) }
}
}
impl<FD: UringFd + Copy> Event for ReadVectored<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_read_vectored(self.fd, Self::as_iovecs(&mut self.bufs[..]), self.offset);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).bufs)
}
}

View File

@ -0,0 +1,27 @@
use std::mem::ManuallyDrop;
use std::os::unix::io::RawFd;
use iou::sqe::MsgFlags;
use iou::registrar::UringFd;
use super::{Event, SQE, SQEs, Cancellation};
pub struct Recv<FD = RawFd> {
pub fd: FD,
pub buf: Box<[u8]>,
pub flags: MsgFlags,
}
impl<FD: UringFd + Copy> Event for Recv<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_recv(self.fd, &mut self.buf[..], self.flags);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).buf)
}
}

View File

@ -0,0 +1,27 @@
use std::mem::ManuallyDrop;
use std::os::unix::io::RawFd;
use iou::sqe::MsgFlags;
use iou::registrar::UringFd;
use super::{Event, SQE, SQEs, Cancellation};
pub struct Send<FD = RawFd> {
pub fd: FD,
pub buf: Box<[u8]>,
pub flags: MsgFlags,
}
impl<FD: UringFd + Copy> Event for Send<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_send(self.fd, &self.buf[..], self.flags);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).buf)
}
}

View File

@ -0,0 +1,24 @@
use std::os::unix::io::RawFd;
use iou::sqe::SpliceFlags;
use super::{Event, SQE, SQEs};
pub struct Splice {
pub fd_in: RawFd,
pub off_in: i64,
pub fd_out: RawFd,
pub off_out: i64,
pub bytes: u32,
pub flags: SpliceFlags,
}
impl Event for Splice {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_splice(self.fd_in, self.off_in, self.fd_out, self.off_out, self.bytes, self.flags);
sqe
}
}

View File

@ -0,0 +1,53 @@
use std::ffi::CString;
use std::mem::{self, ManuallyDrop};
use std::os::unix::io::RawFd;
use std::os::unix::ffi::OsStrExt;
use std::path::Path;
use iou::sqe::{StatxFlags, StatxMode};
use iou::registrar::UringFd;
use super::{Event, SQE, SQEs, Cancellation};
pub struct Statx<FD = RawFd> {
pub dir_fd: FD,
pub path: CString,
pub flags: StatxFlags,
pub mask: StatxMode,
pub statx: Box<libc::statx>,
}
impl Statx {
pub fn without_dir(path: impl AsRef<Path>, flags: StatxFlags, mask: StatxMode) -> Statx {
let path = CString::new(path.as_ref().as_os_str().as_bytes()).unwrap();
let statx = unsafe { Box::new(mem::zeroed()) };
Statx { path, dir_fd: libc::AT_FDCWD, flags, mask, statx }
}
}
impl<FD: UringFd> Statx<FD> {
pub fn without_path(fd: FD, mut flags: StatxFlags, mask: StatxMode) -> Statx<FD> {
unsafe {
// TODO don't allocate? Use Cow? Use NULL?
let path = CString::new("").unwrap();
let statx = Box::new(mem::zeroed());
flags.insert(StatxFlags::AT_EMPTY_PATH);
Statx { dir_fd: fd, path, flags, mask, statx }
}
}
}
impl<FD: UringFd + Copy> Event for Statx<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_statx(self.dir_fd, self.path.as_c_str(), self.flags, self.mask, &mut *self.statx);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
let this = ManuallyDrop::into_inner(this);
Cancellation::from((this.statx, this.path))
}
}

View File

@ -0,0 +1,67 @@
use std::mem::ManuallyDrop;
use std::time::Duration;
use super::{Event, SQE, SQEs, Cancellation};
use iou::sqe::TimeoutFlags;
pub struct StaticTimeout {
ts: uring_sys::__kernel_timespec,
events: u32,
flags: TimeoutFlags,
}
impl StaticTimeout {
pub const fn new(duration: Duration, events: u32, flags: TimeoutFlags) -> StaticTimeout {
StaticTimeout {
ts: timespec(duration),
events, flags,
}
}
}
impl Event for &'static StaticTimeout {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_timeout(&self.ts, self.events, self.flags);
sqe
}
}
pub struct Timeout {
ts: Box<uring_sys::__kernel_timespec>,
events: u32,
flags: TimeoutFlags,
}
impl Timeout {
pub fn new(duration: Duration, events: u32, flags: TimeoutFlags) -> Timeout {
Timeout {
ts: Box::new(timespec(duration)),
events, flags,
}
}
}
impl Event for Timeout {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_timeout(&*self.ts, self.events, self.flags);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).ts)
}
}
const fn timespec(duration: Duration) -> uring_sys::__kernel_timespec {
uring_sys::__kernel_timespec {
tv_sec: duration.as_secs() as i64,
tv_nsec: duration.subsec_nanos() as _,
}
}

View File

@ -0,0 +1,47 @@
use std::mem::ManuallyDrop;
use std::os::unix::io::RawFd;
use iou::registrar::{UringFd, RegisteredBuf};
use super::{Event, SQE, SQEs, Cancellation};
/// A basic write event.
pub struct Write<FD = RawFd> {
pub fd: FD,
pub buf: Box<[u8]>,
pub offset: u64,
}
impl<FD: UringFd + Copy> Event for Write<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_write(self.fd, &self.buf[..], self.offset);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).buf)
}
}
pub struct WriteFixed<FD = RawFd> {
pub fd: FD,
pub buf: RegisteredBuf,
pub offset: u64,
}
impl<FD: UringFd + Copy> Event for WriteFixed<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_write(self.fd, self.buf.as_ref(), self.offset);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).buf)
}
}

View File

@ -0,0 +1,34 @@
use std::io::IoSlice;
use std::mem::ManuallyDrop;
use std::os::unix::io::RawFd;
use iou::registrar::UringFd;
use super::{Event, SQE, SQEs, Cancellation};
/// A `writev` event.
pub struct WriteVectored<FD = RawFd> {
pub fd: FD,
pub bufs: Box<[Box<[u8]>]>,
pub offset: u64,
}
impl<FD> WriteVectored<FD> {
fn iovecs(&self) -> &[IoSlice] {
unsafe { & *(&self.bufs[..] as *const [Box<[u8]>] as *const [IoSlice]) }
}
}
impl<FD: UringFd + Copy> Event for WriteVectored<FD> {
fn sqes_needed() -> u32 { 1 }
unsafe fn prepare<'sq>(&mut self, sqs: &mut SQEs<'sq>) -> SQE<'sq> {
let mut sqe = sqs.single().unwrap();
sqe.prep_write_vectored(self.fd, self.iovecs(), self.offset);
sqe
}
fn cancel(this: ManuallyDrop<Self>) -> Cancellation {
Cancellation::from(ManuallyDrop::into_inner(this).bufs)
}
}

View File

@ -0,0 +1,187 @@
// Imported here for modules
use std::future::Future;
use std::{fs, io};
use std::mem::ManuallyDrop;
use std::os::unix::prelude::{FromRawFd, RawFd};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use super::{Driver, Ring, Submission, events::*};
use futures_core::ready;
use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead};
use iou::sqe::{Mode, OFlag};
pub struct File<D: Driver> {
ring: Ring<D>,
fd: RawFd,
active: Op,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum Op {
Read,
Write,
Close,
Nothing,
Statx,
Closed,
}
impl<D: Driver> File<D> {
fn from_fd(fd: RawFd, driver: D) -> File<D> {
File {
ring: Ring::new(driver),
fd,
active: Op::Nothing,
}
}
pub fn open<P: AsRef<Path>>(driver: D, path: P) -> impl Future<Output = io::Result<Self>> {
let flags = OFlag::O_CLOEXEC | OFlag::O_RDONLY;
open::Open(driver.submit(OpenAt::without_dir(
path, flags, Mode::from_bits(0o666).unwrap()
)))
}
pub fn create<P: AsRef<Path>>(driver: D, path: P) -> impl Future<Output = io::Result<Self>> {
let flags = OFlag::O_CLOEXEC | OFlag::O_WRONLY | OFlag::O_CREAT | OFlag::O_TRUNC;
create::Create(driver.submit(OpenAt::without_dir(
path, flags, Mode::from_bits(0o666).unwrap()
)))
}
}
mod open;
mod create;
impl<D: Driver> AsyncRead for File<D> {
fn poll_read(mut self: Pin<&mut Self>, ctx: &mut Context<'_>, buf: &mut [u8])
-> Poll<io::Result<usize>>
{
let mut inner = ready!(self.as_mut().poll_fill_buf(ctx))?;
let len = io::Read::read(&mut inner, buf)?;
self.consume(len);
Poll::Ready(Ok(len))
}
}
impl<D: Driver> AsyncBufRead for File<D> {
fn poll_fill_buf(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let fd = self.fd;
let (ring, buf, pos, ..) = self.split_with_buf();
buf.fill_buf(|buf| {
let n = ready!(ring.poll(ctx, 1, |sqs| {
let mut sqe = sqs.single().unwrap();
unsafe {
sqe.prep_read(fd, buf, *pos);
}
sqe
}))?;
*pos += n as u64;
Poll::Ready(Ok(n as u32))
})
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.buf().consume(amt);
}
}
impl<D: Driver> AsyncWrite for File<D> {
fn poll_write(mut self: Pin<&mut Self>, ctx: &mut Context<'_>, slice: &[u8]) -> Poll<io::Result<usize>> {
let fd = self.fd;
let (ring, buf, pos, ..) = self.split_with_buf();
let data = ready!(buf.fill_buf(|mut buf| {
Poll::Ready(Ok(io::Write::write(&mut buf, slice)? as u32))
}))?;
let n = ready!(ring.poll(ctx, 1, |sqs| {
let mut sqe = sqs.single().unwrap();
unsafe {
sqe.prep_write(fd, data, *pos);
}
sqe
}))?;
*pos += n as u64;
buf.clear();
Poll::Ready(Ok(n as usize))
}
fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
ready!(self.poll_write(ctx, &[]))?;
Poll::Ready(Ok(()))
}
fn poll_close(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.as_mut().guard_op(Op::Close);
let fd = self.fd;
ready!(self.as_mut().ring().poll(ctx, 1, |sqs| {
let mut sqe = sqs.single().unwrap();
unsafe {
sqe.prep_close(fd);
}
sqe
}))?;
self.confirm_close();
Poll::Ready(Ok(()))
}
}
impl<D: Driver> AsyncSeek for File<D> {
fn poll_seek(mut self: Pin<&mut Self>, ctx: &mut Context, pos: io::SeekFrom)
-> Poll<io::Result<u64>>
{
let (start, offset) = match pos {
io::SeekFrom::Start(n) => {
*self.as_mut().pos() = n;
return Poll::Ready(Ok(self.pos));
}
io::SeekFrom::Current(n) => (self.pos, n),
io::SeekFrom::End(n) => {
(ready!(self.as_mut().poll_file_size(ctx))?, n)
}
};
let valid_seek = if offset.is_negative() {
match start.checked_sub(offset.abs() as u64) {
Some(valid_seek) => valid_seek,
None => {
let invalid = io::Error::from(io::ErrorKind::InvalidInput);
return Poll::Ready(Err(invalid));
}
}
} else {
match start.checked_add(offset as u64) {
Some(valid_seek) => valid_seek,
None => {
let overflow = io::Error::from_raw_os_error(libc::EOVERFLOW);
return Poll::Ready(Err(overflow));
}
}
};
*self.as_mut().pos() = valid_seek;
Poll::Ready(Ok(self.pos))
}
}
impl<D: Driver> From<File<D>> for fs::File {
fn from(mut file: File<D>) -> fs::File {
file.cancel();
let file = ManuallyDrop::new(file);
unsafe {
fs::File::from_raw_fd(file.fd)
}
}
}
impl<D: Driver> Drop for File<D> {
fn drop(&mut self) {
match self.active {
Op::Closed => { }
Op::Nothing => unsafe { libc::close(self.fd); },
_ => self.cancel(),
}
}
}

View File

@ -0,0 +1,18 @@
use std::future::Future;
use futures_core::ready;
use super::*;
pub(super) struct Create<D: Driver>(pub(super) Submission<D, OpenAt>);
impl<D: Driver> Future for Create<D> {
type Output = io::Result<File<D>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut inner = unsafe {
self.map_unchecked_mut(|this| &mut this.0)
};
let (_, ready) = ready!(inner.as_mut().poll(cx));
let fd = ready? as i32;
Poll::Ready(Ok(File::from_fd(fd, inner.driver().clone())))
}
}

View File

@ -0,0 +1,18 @@
use std::future::Future;
use futures_core::ready;
use super::*;
pub(super) struct Open<D: Driver>(pub(super) Submission<D, OpenAt>);
impl<D: Driver> Future for Open<D> {
type Output = io::Result<File<D>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut inner = unsafe {
self.map_unchecked_mut(|this| &mut this.0)
};
let (_, ready) = ready!(inner.as_mut().poll(cx));
let fd = ready? as i32;
Poll::Ready(Ok(File::from_fd(fd, inner.driver().clone())))
}
}

View File

@ -0,0 +1,20 @@
mod completion;
use completion::Completion;
mod cancellation;
use cancellation::Cancellation;
mod ring;
use ring::Ring;
mod events;
use events::Event;
mod submission;
use submission::Submission;
mod driver;
use driver::Driver;
mod fs;

View File

@ -0,0 +1,176 @@
use std::{io, mem};
use std::pin::Pin;
use std::task::{Context, Poll};
use iou::{SQE, SQEs};
use super::{driver, Driver};
use super::Completion;
use futures_core::ready;
use crate::sys::linux::io_uring::cancellation::Cancellation;
///
pub struct Ring<D: Driver> {
state: State,
driver: D,
}
enum State {
Empty,
Prepared(Completion),
Submitted(Completion),
Cancelled(u64),
Lost,
}
impl<D: Driver> Ring<D> {
pub fn new(driver: D) -> Self {
Self {
state: State::Empty,
driver,
}
}
pub fn driver(&self) -> &D {
&self.driver
}
fn split_pinned(self: Pin<&mut Self>) -> (&mut State, Pin<&mut D>) {
unsafe {
let this = Pin::get_unchecked_mut(self);
(&mut this.state, Pin::new_unchecked(&mut this.driver))
}
}
pub fn poll(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>,
count: u32,
prepare: impl for<'sq> FnOnce(&mut SQEs<'sq>) -> SQE<'sq>,
) -> Poll<io::Result<u32>> {
match self.state {
State::Empty => {
ready!(self.as_mut().poll_prepare_empty(ctx, count, prepare));
ready!(self.as_mut().poll_submit(ctx));
self.poll_complete(ctx)
},
State::Cancelled(previous) => {
ready!(self.as_mut().poll_prepare_canceled(ctx, previous, count, prepare));
ready!(self.as_mut().poll_submit(ctx));
self.poll_complete(ctx)
},
State::Prepared(_) => match self.as_mut().poll_complete(ctx) {
Poll::Pending => {
ready!(self.as_mut().poll_submit(ctx));
self.poll_complete(ctx)
},
ready @ Poll::Ready(_) => ready,
},
State::Submitted(_) => self.poll_complete(ctx),
State::Lost => panic!("Lost events, ring is now in an invalid state"),
}
}
fn poll_prepare_empty(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
count: u32,
prepare: impl for<'sq> FnOnce(&mut SQEs<'sq>) -> SQE<'sq>,
) -> Poll<()> {
let (state, driver) = self.split_pinned();
let completion = ready!(driver.poll_prepare(ctx, count, |mut sqes, ctx| {
*state = State::Lost;
let sqe = prepare(&mut sqes);
let completion = driver::Completion::new(sqe, sqes, ctx);
completion
}));
*state = State::Prepared(completion.into_inner());
Poll::Ready(())
}
fn poll_prepare_canceled(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
previous: u64,
count: u32,
prepare: impl for<'sq> FnOnce(&mut SQEs<'sq>) -> SQE<'sq>,
) -> Poll<()> {
let (mut state, driver) = self.split_pinned();
let completion = ready!(driver.poll_prepare(ctx, count + 1, |mut sqes, ctx| {
*state = State::Lost;
unsafe { sqes.hard_linked().next().unwrap().prep_cancel(previous, 0); }
let sqe = prepare(&mut sqes);
let completion = driver::Completion::new(sqe, sqes, ctx);
completion
}));
*state = State::Prepared(completion.into_inner());
Poll::Ready(())
}
fn poll_submit(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<()> {
let (state, driver) = self.split_pinned();
let _ = ready!(driver.poll_submit(ctx));
if let State::Prepared(completion) | State::Submitted(completion)
= mem::replace(state, State::Lost)
{
*state = State::Submitted(completion);
Poll::Ready(())
} else {
unreachable!();
}
}
fn poll_complete(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<u32>> {
let (state, driver) = self.split_pinned();
match mem::replace(state, State::Lost) {
State::Prepared(completion) => {
ready!(driver.poll_complete(ctx, completion.addr()));
match completion.check(ctx.waker()) {
Ok(result) => {
*state = State::Empty;
Poll::Ready(result)
},
Err(completion) => {
*state = State::Prepared(completion);
Poll::Pending
}
}
},
State::Submitted(completion) => {
ready!(driver.poll_complete(ctx, completion.addr()));
match completion.check(ctx.waker()) {
Ok(result) => {
*state = State::Empty;
Poll::Ready(result)
},
Err(completion) => {
*state = State::Submitted(completion);
Poll::Pending
}
}
},
_ => unreachable!(),
}
}
pub fn cancel_pinned(self: Pin<&mut Self>, cancellation: Cancellation) {
self.split_pinned().0.cancel(cancellation);
}
pub fn cancel(&mut self, cancellation: Cancellation) {
self.state.cancel(cancellation)
}
}
impl State {
fn cancel(&mut self, cancellation: Cancellation) {
match mem::replace(self, State::Lost) {
State::Submitted(completion) | State::Prepared(completion) => {
*self = State::Cancelled(completion.addr());
completion.cancel(cancellation);
},
state=> {
*self = state;
}
}
}
}

View File

@ -0,0 +1,48 @@
use std::future::Future;
use futures_core::ready;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use super::{Ring, Driver, Event};
pub struct Submission<D: Driver, E: Event> {
ring: Ring<D>,
event: Option<E>,
}
impl<D: Driver, E: Event> Submission<D, E> {
pub fn new(driver: D, event: E) -> Self {
Self {
ring: Ring::new(driver),
event: Some(event),
}
}
pub fn driver(&self) -> &D {
self.ring.driver()
}
fn split_pinned(self: Pin<&mut Self>) -> (Pin<&mut Ring<D>>, &mut Option<E>) {
unsafe {
let this = Pin::get_unchecked_mut(self);
(Pin::new_unchecked(&mut this.ring), &mut this.event)
}
}
}
impl<D: Driver, E: Event> Future for Submission<D, E> {
type Output = (E, io::Result<u32>);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (ring, event) = self.split_pinned();
let result = if let Some(event) = event {
let count = E::sqes_needed();
ready!(ring.poll(cx, count, |sqes| unsafe { event.prepare(sqes) }))
} else {
panic!("polled Submission after completion")
};
Poll::Ready((event.take().unwrap(), result))
}
}

View File

@ -0,0 +1,5 @@
#[cfg(feature = "io_uring")]
mod io_uring;
#[cfg(feature = "epoll")]
mod epoll;

View File

@ -0,0 +1,2 @@
#[cfg(target_os = "linux")]
mod linux;

View File

@ -0,0 +1,72 @@
use std::io;
use std::os::unix::prelude::RawFd;
use libc::{c_ulong, c_long};
use crate::ctypes::{IORING_ENTER, IORING_REGISTER_OP};
use super::ctypes::Params;
const ENOMEM: i32 = 12;
const SYS_SETUP: c_long = libc::SYS_io_uring_setup;
const SYS_ENTER: c_long = libc::SYS_io_uring_enter;
const SYS_REGISTER: c_long = libc::SYS_io_uring_register;
/// Syscall io_uring_setup, creating the io_uring ringbuffers
pub fn setup(entries: u32, params: *mut Params) -> io::Result<RawFd> {
assert!((0 < entries && entries <= 4096), "entries must be between 1 and 4096");
assert_eq!(entries.count_ones(), 1, "entries must be a power of two");
let retval = unsafe {
libc::syscall(SYS_SETUP, entries, params)
};
if retval < 0 {
let err = io::Error::last_os_error();
if let Some(ENOMEM) = err.raw_os_error() {
return Err(io::Error::new(
io::ErrorKind::Other,
"Failed to lock enough memory. You may need to increase the memlock limit using \
rlimits"
));
}
return Err(err);
} else {
Ok(retval as RawFd)
}
}
static_assertions::assert_eq_size!(i64, c_long);
/// enter io_uring, returning when at least `min_complete` events have been completed
pub fn enter(fd: RawFd,
to_submit: u32,
min_complete: u32,
flags: IORING_ENTER,
args: *const libc::c_void,
argsz: libc::size_t
) -> io::Result<i64> {
let retval = unsafe {
libc::syscall(SYS_ENTER, fd, to_submit, min_complete, flags.bits(), args, argsz)
};
if retval < 0 {
let err = io::Error::last_os_error();
Err(err)
} else {
Ok(retval)
}
}
/// Register buffers or file descriptors with the kernel for faster usage and not having to use
/// atomics.
pub fn register(fd: RawFd, opcode: IORING_REGISTER_OP, args: *const (), nargs: u32)
-> io::Result<i64>
{
let retval = unsafe {
libc::syscall(SYS_REGISTER, fd, opcode, args, nargs)
};
if retval < 0 {
let err = io::Error::last_os_error();
Err(err)
} else {
Ok(retval)
}
}

View File

@ -1,67 +0,0 @@
#![feature(test)]
extern crate test;
use bastion_executor::blocking;
use lightproc::proc_stack::ProcStack;
use std::thread;
use std::time::Duration;
use test::Bencher;
#[cfg(feature = "tokio-runtime")]
mod tokio_benchs {
use super::*;
#[bench]
fn blocking(b: &mut Bencher) {
tokio_test::block_on(async { _blocking(b) });
}
#[bench]
fn blocking_single(b: &mut Bencher) {
tokio_test::block_on(async {
_blocking_single(b);
});
}
}
#[cfg(not(feature = "tokio-runtime"))]
mod no_tokio_benchs {
use super::*;
#[bench]
fn blocking(b: &mut Bencher) {
_blocking(b);
}
#[bench]
fn blocking_single(b: &mut Bencher) {
_blocking_single(b);
}
}
// Benchmark for a 10K burst task spawn
fn _blocking(b: &mut Bencher) {
b.iter(|| {
(0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
ProcStack::default(),
)
})
.collect::<Vec<_>>()
});
}
// Benchmark for a single blocking task spawn
fn _blocking_single(b: &mut Bencher) {
b.iter(|| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
ProcStack::default(),
)
});
}

View File

@ -1,69 +0,0 @@
#![feature(test)]
extern crate test;
use bastion_executor::blocking;
use bastion_executor::run::run;
use futures::future::join_all;
use lightproc::proc_stack::ProcStack;
use std::thread;
use std::time::Duration;
use test::Bencher;
#[cfg(feature = "tokio-runtime")]
mod tokio_benchs {
use super::*;
#[bench]
fn blocking(b: &mut Bencher) {
tokio_test::block_on(async { _blocking(b) });
}
#[bench]
fn blocking_single(b: &mut Bencher) {
tokio_test::block_on(async {
_blocking_single(b);
});
}
}
#[cfg(not(feature = "tokio-runtime"))]
mod no_tokio_benchs {
use super::*;
#[bench]
fn blocking(b: &mut Bencher) {
_blocking(b);
}
#[bench]
fn blocking_single(b: &mut Bencher) {
_blocking_single(b);
}
}
// Benchmark for a 10K burst task spawn
fn _blocking(b: &mut Bencher) {
b.iter(|| {
(0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
ProcStack::default(),
)
})
.collect::<Vec<_>>()
});
}
// Benchmark for a single blocking task spawn
fn _blocking_single(b: &mut Bencher) {
b.iter(|| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
ProcStack::default(),
)
});
}

View File

@ -1,165 +0,0 @@
//!
//! Pool of threads to run heavy processes
//!
//! We spawn futures onto the pool with [`spawn_blocking`] method of global run queue or
//! with corresponding [`Worker`]'s spawn method.
//!
//! [`Worker`]: crate::run_queue::Worker
use crate::thread_manager::{DynamicPoolManager, DynamicRunner};
use crossbeam_channel::{unbounded, Receiver, Sender};
use lazy_static::lazy_static;
use lightproc::lightproc::LightProc;
use lightproc::recoverable_handle::RecoverableHandle;
use once_cell::sync::{Lazy, OnceCell};
use std::future::Future;
use std::iter::Iterator;
use std::time::Duration;
use std::{env, thread};
use tracing::trace;
/// If low watermark isn't configured this is the default scaler value.
/// This value is used for the heuristics of the scaler
const DEFAULT_LOW_WATERMARK: u64 = 2;
const THREAD_RECV_TIMEOUT: Duration = Duration::from_millis(100);
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
pub fn spawn_blocking<F, R>(future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = LightProc::recoverable(future, schedule);
task.schedule();
handle
}
#[derive(Debug)]
struct BlockingRunner {
// We keep a handle to the tokio runtime here to make sure
// it will never be dropped while the DynamicPoolManager is alive,
// In case we need to spin up some threads.
#[cfg(feature = "tokio-runtime")]
runtime_handle: tokio::runtime::Handle,
}
impl DynamicRunner for BlockingRunner {
fn run_static(&self, park_timeout: Duration) -> ! {
loop {
while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
trace!("static thread: running task");
self.run(task);
}
trace!("static: empty queue, parking with timeout");
thread::park_timeout(park_timeout);
}
}
fn run_dynamic(&self, parker: impl Fn()) -> ! {
loop {
while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
trace!("dynamic thread: running task");
self.run(task);
}
trace!(
"dynamic thread: parking - {:?}",
std::thread::current().id()
);
parker();
}
}
fn run_standalone(&self) {
while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
self.run(task);
}
trace!("standalone thread: quitting.");
}
}
impl BlockingRunner {
fn run(&self, task: LightProc) {
#[cfg(feature = "tokio-runtime")]
{
self.runtime_handle.spawn_blocking(|| task.run());
}
#[cfg(not(feature = "tokio-runtime"))]
{
task.run();
}
}
}
/// Pool interface between the scheduler and thread pool
struct Pool {
sender: Sender<LightProc>,
receiver: Receiver<LightProc>,
}
static DYNAMIC_POOL_MANAGER: OnceCell<DynamicPoolManager<BlockingRunner>> = OnceCell::new();
static POOL: Lazy<Pool> = Lazy::new(|| {
#[cfg(feature = "tokio-runtime")]
{
let runner = BlockingRunner {
// We use current() here instead of try_current()
// because we want bastion to crash as soon as possible
// if there is no available runtime.
runtime_handle: tokio::runtime::Handle::current(),
};
DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
}
#[cfg(not(feature = "tokio-runtime"))]
{
let runner = BlockingRunner {};
DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
}
DYNAMIC_POOL_MANAGER
.get()
.expect("couldn't get static pool manager")
.initialize();
let (sender, receiver) = unbounded();
Pool { sender, receiver }
});
/// Enqueues work, attempting to send to the thread pool in a
/// nonblocking way and spinning up needed amount of threads
/// based on the previous statistics without relying on
/// if there is not a thread ready to accept the work or not.
fn schedule(t: LightProc) {
if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without
// blocking.
POOL.sender.send(err.into_inner()).unwrap();
}
// Add up for every incoming scheduled task
DYNAMIC_POOL_MANAGER.get().unwrap().increment_frequency();
}
///
/// Low watermark value, defines the bare minimum of the pool.
/// Spawns initial thread set.
/// Can be configurable with env var `BASTION_BLOCKING_THREADS` at runtime.
#[inline]
fn low_watermark() -> &'static u64 {
lazy_static! {
static ref LOW_WATERMARK: u64 = {
env::var_os("BASTION_BLOCKING_THREADS")
.map(|x| x.to_str().unwrap().parse::<u64>().unwrap())
.unwrap_or(DEFAULT_LOW_WATERMARK)
};
}
&*LOW_WATERMARK
}

View File

@ -1,5 +0,0 @@
#[derive(Debug)]
pub struct ProcStack {
}

View File

@ -1,67 +0,0 @@
//!
//! Where workers went to parking while no workload is in their worker queue.
//!
//! If a workload received pool will wake them up.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Condvar, Mutex};
/// The place where worker threads go to sleep.
///
/// Similar to how thread parking works, if a notification comes up while no threads are sleeping,
/// the next thread that attempts to go to sleep will pick up the notification immediately.
#[derive(Debug)]
#[allow(clippy::mutex_atomic)]
pub struct Sleepers {
/// How many threads are currently a sleep.
sleep: Mutex<usize>,
/// A condvar for notifying sleeping threads.
wake: Condvar,
/// Set to `true` if a notification came up while nobody was sleeping.
notified: AtomicBool,
}
#[allow(clippy::mutex_atomic)]
impl Default for Sleepers {
/// Creates a new `Sleepers`.
fn default() -> Self {
Self {
sleep: Mutex::new(0),
wake: Condvar::new(),
notified: AtomicBool::new(false),
}
}
}
#[allow(clippy::mutex_atomic)]
impl Sleepers {
/// Creates a new `Sleepers`.
pub fn new() -> Self {
Self::default()
}
/// Puts the current thread to sleep.
pub fn wait(&self) {
let mut sleep = self.sleep.lock().unwrap();
if !self.notified.swap(false, Ordering::SeqCst) {
*sleep += 1;
std::mem::drop(self.wake.wait(sleep).unwrap());
}
}
/// Notifies one thread.
pub fn notify_one(&self) {
if !self.notified.load(Ordering::SeqCst) {
let mut sleep = self.sleep.lock().unwrap();
if *sleep > 0 {
*sleep -= 1;
self.wake.notify_one();
} else {
self.notified.store(true, Ordering::SeqCst);
}
}
}
}

View File

@ -1,93 +0,0 @@
//!
//! SMP parallelism based cache affine worker implementation
//!
//! This worker implementation relies on worker run queue statistics which are hold in the pinned global memory
//! where workload distribution calculated and amended to their own local queues.
use crate::pool;
use lightproc::prelude::*;
use std::cell::Cell;
use std::ptr;
use std::time::Duration;
use crossbeam_deque::{Stealer, Worker};
use crate::proc_stack::ProcStack;
/// The timeout we'll use when parking before an other Steal attempt
pub const THREAD_PARK_TIMEOUT: Duration = Duration::from_millis(1);
thread_local! {
static STACK: Cell<*const ProcStack> = Cell::new(ptr::null_mut());
}
///
/// Set the current process's stack during the run of the future.
pub(crate) fn set_stack<F, R>(stack: *const ProcStack, f: F) -> R
where
F: FnOnce() -> R,
{
struct ResetStack<'a>(&'a Cell<*const ProcStack>);
impl Drop for ResetStack<'_> {
fn drop(&mut self) {
self.0.set(ptr::null());
}
}
STACK.with(|st| {
st.set(stack);
// create a guard to reset STACK even if the future panics. This is important since we
// must not drop the pointed-to ProcStack here in any case.
let _guard = ResetStack(st);
f()
})
}
/*
pub(crate) fn get_proc_stack<F, R>(f: F) -> Option<R>
where
F: FnOnce(&ProcStack) -> R,
{
let res = STACK.try_with(|st| unsafe { st.get().as_ref().map(f) });
match res {
Ok(Some(val)) => Some(val),
Ok(None) | Err(_) => None,
}
}
///
/// Get the stack currently in use for this thread
pub fn current() -> ProcStack {
get_proc_stack(|proc| proc.clone())
.expect("`proc::current()` called outside the context of the proc")
}
*/
pub(crate) fn schedule(proc: LightProc) {
pool::schedule(proc)
}
/// A worker thread running futures locally and stealing work from other workers if it runs empty.
pub struct WorkerThread {
queue: Worker<LightProc>,
}
impl WorkerThread {
pub fn new() -> Self {
Self {
queue: Worker::new_fifo(),
}
}
pub fn stealer(&self) -> Stealer<LightProc> {
self.queue.stealer()
}
pub fn tick(&self) {
if let Some(lightproc) = self.queue.pop() {
lightproc.run()
}
}
}

View File

@ -1,26 +0,0 @@
#[cfg(test)]
mod tests {
use bastion_executor::{placement, pool};
#[test]
fn affinity_replacement() {
let core_ids = placement::get_core_ids().unwrap();
dbg!(core_ids);
}
#[cfg(feature = "tokio-runtime")]
mod tokio_tests {
#[tokio::test]
async fn pool_check() {
super::pool::get();
}
}
#[cfg(not(feature = "tokio-runtime"))]
mod no_tokio_tests {
#[test]
fn pool_check() {
super::pool::get();
}
}
}

View File

@ -1,78 +0,0 @@
use crossbeam::channel::{unbounded, Sender};
use futures_executor as executor;
use lazy_static::lazy_static;
use lightproc::prelude::*;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::thread;
#[derive(Copy, Clone)]
pub struct GlobalState {
pub amount: usize,
}
fn spawn_on_thread<F, R>(future: F, gs: Arc<Mutex<GlobalState>>)
-> RecoverableHandle<Arc<Mutex<GlobalState>>, R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
lazy_static! {
// A channel that holds scheduled procs.
static ref QUEUE: Sender<LightProc> = {
let (sender, receiver) = unbounded::<LightProc>();
// Start the executor thread.
thread::spawn(move || {
for proc in receiver {
proc.run();
}
});
sender
};
}
let stack = ProcStack::build(Box::new(gs))
.initialize(Callback::wrap(|s: &mut Arc<Mutex<GlobalState>>| {
println!("initializing");
s.clone().lock().unwrap().amount += 1;
}))
.completed(Callback::wrap(|s: &mut Arc<Mutex<GlobalState>>| {
println!("completed");
s.clone().lock().unwrap().amount += 2;
}));
let schedule = |t| QUEUE.send(t).unwrap();
let (proc, handle) = LightProc::recoverable(future, schedule, stack);
let handle = handle
.on_panic(|s: &mut Arc<Mutex<GlobalState>>, _e| {
println!("panicked");
s.clone().lock().unwrap().amount += 3;
});
proc.schedule();
handle
}
fn main() {
let gs = Arc::new(Mutex::new(GlobalState { amount: 0 }));
let handle = spawn_on_thread(
async {
panic!("Panic here!");
},
gs.clone(),
);
executor::block_on(handle);
// 0 at the start
// +1 before the start
// +2 after panic occurs and completion triggers
// +3 after panic triggers
let amount = gs.lock().unwrap().amount;
assert_eq!(amount, 6);
println!("Amount: {}", amount);
}

View File

@ -1,15 +0,0 @@
use lightproc::proc_stack::ProcStack;
use lightproc::proc_state::EmptyProcState;
#[test]
fn stack_copy() {
let stack = ProcStack::default()
.with_pid(12)
.with_after_panic(|_s: &mut EmptyProcState| {
println!("After panic!");
});
let stack2 = stack;
assert_eq!(stack2.get_pid(), 12);
}