diff --git a/runtime/lightproc/Cargo.toml b/runtime/lightproc/Cargo.toml new file mode 100644 index 0000000..52403ea --- /dev/null +++ b/runtime/lightproc/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "lightproc" +version = "0.3.0" +publish = false +description = "Lightweight process abstraction for Rust" +authors = [] +keywords = ["fault-tolerant", "runtime", "actor", "system", "lightweight-process"] +categories = ["concurrency", "asynchronous"] +readme = "README.md" +license = "Apache-2.0/MIT" +edition = "2021" + +[dependencies] +crossbeam-utils = "0.8" +pin-utils = "0.1.0" +bitfield = "0.13.2" +bitflags = "1.3.2" + +[dev-dependencies] +crossbeam = "0.8" +futures-executor = "0.3" +lazy_static = "1.4.0" +async-std = "1.5" \ No newline at end of file diff --git a/runtime/lightproc/README.md b/runtime/lightproc/README.md new file mode 100644 index 0000000..aa02590 --- /dev/null +++ b/runtime/lightproc/README.md @@ -0,0 +1,55 @@ +# LightProc + + + + + + + + + + + + + + + + + + + + + + + + + + +
Latest Release + + Crates.io + +
License + + Crates.io + +
Build Status + + Build Status + +
Downloads + + Crates.io + +
Discord + + + +
+ +LightProc is Lightweight Process abstraction for Rust. + +Beneath the implementation: +* It uses futures with lifecycle callbacks to implement Erlang like processes. +* Contains basic pid(process id) to identify processes. +* All panics inside futures are propagated to upper layers. diff --git a/runtime/lightproc/examples/proc_panic.rs b/runtime/lightproc/examples/proc_panic.rs new file mode 100644 index 0000000..e3313a1 --- /dev/null +++ b/runtime/lightproc/examples/proc_panic.rs @@ -0,0 +1,61 @@ +use std::any::Any; +use std::fmt::Debug; +use std::ops::Deref; +use crossbeam::channel::{unbounded, Sender}; +use futures_executor as executor; +use lazy_static::lazy_static; +use lightproc::prelude::*; +use std::future::Future; +use std::thread; + +fn spawn_on_thread(future: F) -> RecoverableHandle +where + F: Future + Send + 'static, + R: Debug + Send + 'static, +{ + lazy_static! { + // A channel that holds scheduled procs. + static ref QUEUE: Sender = { + let (sender, receiver) = unbounded::(); + + // Start the executor thread. + thread::spawn(move || { + for proc in receiver { + proc.run(); + } + }); + + sender + }; + } + + let schedule = |t| (QUEUE.deref()).send(t).unwrap(); + let (proc, handle) = LightProc::recoverable( + future, + schedule + ); + + let handle = handle + .on_panic(|err: Box| { + match err.downcast::<&'static str>() { + Ok(reason) => println!("Future panicked: {}", &reason), + Err(err) => + println!("Future panicked with a non-text reason of typeid {:?}", + err.type_id()), + } + }); + + proc.schedule(); + + handle +} + +fn main() { + let handle = spawn_on_thread(async { + panic!("Panic here!"); + }); + + executor::block_on(handle); + + println!("But see, despite the inner future panicking we can continue executing as normal."); +} diff --git a/runtime/lightproc/examples/proc_run.rs b/runtime/lightproc/examples/proc_run.rs new file mode 100644 index 0000000..e8fb87f --- /dev/null +++ b/runtime/lightproc/examples/proc_run.rs @@ -0,0 +1,46 @@ +use crossbeam::channel; +use futures_executor as executor; +use lightproc::prelude::*; +use std::future::Future; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +fn spawn_on_thread(fut: F) -> ProcHandle +where + F: Future + Send + 'static, + R: Send + 'static, +{ + let (sender, receiver) = channel::unbounded(); + let sender = Arc::new(sender); + let s = Arc::downgrade(&sender); + + let future = async move { + let _ = sender; + fut.await + }; + + let schedule = move |t| s.upgrade().unwrap().send(t).unwrap(); + let (proc, handle) = LightProc::build( + future, + schedule, + ); + + proc.schedule(); + + thread::spawn(move || { + for proc in receiver { + proc.run(); + } + }); + + handle +} + +fn main() { + executor::block_on(spawn_on_thread(async { + println!("Sleeping!"); + async_std::task::sleep(Duration::from_secs(1)).await; + println!("Done sleeping"); + })); +} diff --git a/runtime/lightproc/examples/state_change.rs b/runtime/lightproc/examples/state_change.rs new file mode 100644 index 0000000..b27ccfc --- /dev/null +++ b/runtime/lightproc/examples/state_change.rs @@ -0,0 +1,78 @@ +use crossbeam::channel::{unbounded, Sender}; +use futures_executor as executor; +use lazy_static::lazy_static; +use lightproc::prelude::*; + +use std::future::Future; +use std::sync::{Arc, Mutex}; +use std::thread; + +#[derive(Copy, Clone)] +pub struct GlobalState { + pub amount: usize, +} + +fn spawn_on_thread(future: F, gs: Arc>) + -> RecoverableHandle>, R> +where + F: Future + Send + 'static, + R: Send + 'static, +{ + lazy_static! { + // A channel that holds scheduled procs. + static ref QUEUE: Sender = { + let (sender, receiver) = unbounded::(); + + // Start the executor thread. + thread::spawn(move || { + for proc in receiver { + proc.run(); + } + }); + + sender + }; + } + + let stack = ProcStack::build(Box::new(gs)) + .initialize(Callback::wrap(|s: &mut Arc>| { + println!("initializing"); + s.clone().lock().unwrap().amount += 1; + })) + .completed(Callback::wrap(|s: &mut Arc>| { + println!("completed"); + s.clone().lock().unwrap().amount += 2; + })); + + let schedule = |t| QUEUE.send(t).unwrap(); + let (proc, handle) = LightProc::recoverable(future, schedule, stack); + let handle = handle + .on_panic(|s: &mut Arc>, _e| { + println!("panicked"); + s.clone().lock().unwrap().amount += 3; + }); + + proc.schedule(); + + handle +} + +fn main() { + let gs = Arc::new(Mutex::new(GlobalState { amount: 0 })); + let handle = spawn_on_thread( + async { + panic!("Panic here!"); + }, + gs.clone(), + ); + + executor::block_on(handle); + + // 0 at the start + // +1 before the start + // +2 after panic occurs and completion triggers + // +3 after panic triggers + let amount = gs.lock().unwrap().amount; + assert_eq!(amount, 6); + println!("Amount: {}", amount); +} diff --git a/runtime/lightproc/src/catch_unwind.rs b/runtime/lightproc/src/catch_unwind.rs new file mode 100644 index 0000000..c7259b8 --- /dev/null +++ b/runtime/lightproc/src/catch_unwind.rs @@ -0,0 +1,36 @@ +use pin_utils::unsafe_pinned; +use std::any::Any; +use std::future::Future; +use std::panic::{catch_unwind, AssertUnwindSafe, UnwindSafe}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[derive(Debug)] +pub(crate) struct CatchUnwind +where + F: Future, +{ + future: F, +} + +impl CatchUnwind +where + F: Future + UnwindSafe, +{ + unsafe_pinned!(future: F); + + pub(crate) fn new(future: F) -> CatchUnwind { + CatchUnwind { future } + } +} + +impl Future for CatchUnwind +where + F: Future + UnwindSafe, +{ + type Output = Result>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok) + } +} diff --git a/runtime/lightproc/src/layout_helpers.rs b/runtime/lightproc/src/layout_helpers.rs new file mode 100644 index 0000000..2af15db --- /dev/null +++ b/runtime/lightproc/src/layout_helpers.rs @@ -0,0 +1,28 @@ +use std::alloc::Layout; +use std::io::{Error, ErrorKind}; + +#[inline] +pub(crate) fn extend(layout: Layout, next: Layout) -> (Layout, usize) { + let new_align = std::cmp::max(layout.align(), next.align()); + let pad = padding_needed_for(layout, next.align()); + + let offset = layout + .size() + .checked_add(pad) + .ok_or_else(|| Error::new(ErrorKind::Other, "Padding overflow check failed")) + .unwrap(); + let new_size = offset + .checked_add(next.size()) + .ok_or_else(|| Error::new(ErrorKind::Other, "New size can't be computed")) + .unwrap(); + + let layout = Layout::from_size_align(new_size, new_align).unwrap(); + (layout, offset) +} + +#[inline] +pub(crate) fn padding_needed_for(layout: Layout, align: usize) -> usize { + let len = layout.size(); + let len_rounded_up = len.wrapping_add(align).wrapping_sub(1) & !align.wrapping_sub(1); + len_rounded_up.wrapping_sub(len) +} diff --git a/runtime/lightproc/src/lib.rs b/runtime/lightproc/src/lib.rs new file mode 100644 index 0000000..fc325b0 --- /dev/null +++ b/runtime/lightproc/src/lib.rs @@ -0,0 +1,43 @@ +//! +//! +//! LightProc is Lightweight Process abstraction for Rust. +//! +//! Beneath the implementation: +//! * It uses futures with lifecycle callbacks to implement Erlang like processes. +//! * Contains basic pid(process id) to identify processes. +//! * All panics inside futures are propagated to upper layers. +//! +//! The naming convention of this crate comes from [Erlang's Lightweight Processes]. +//! +//! [Erlang's Lightweight Processes]: https://en.wikipedia.org/wiki/Light-weight_process +//! + +// Force missing implementations +#![forbid(missing_docs)] +#![forbid(missing_debug_implementations)] +#![forbid(unused_import_braces)] +#![forbid(unused_imports)] +#![forbid(unused_must_use)] +#![forbid(unused_variables)] + +mod catch_unwind; +mod layout_helpers; +mod proc_data; +mod proc_ext; +mod proc_layout; +mod proc_vtable; +mod raw_proc; +mod state; + +pub mod lightproc; +pub mod proc_handle; +pub mod recoverable_handle; + +/// The lightproc prelude. +/// +/// The prelude re-exports lightproc structs and handles from this crate. +pub mod prelude { + pub use crate::lightproc::*; + pub use crate::proc_handle::*; + pub use crate::recoverable_handle::*; +} diff --git a/runtime/lightproc/src/lightproc.rs b/runtime/lightproc/src/lightproc.rs new file mode 100644 index 0000000..e271a7a --- /dev/null +++ b/runtime/lightproc/src/lightproc.rs @@ -0,0 +1,192 @@ +//! +//! Lightweight process implementation which enables users +//! to create either panic recoverable process or +//! ordinary process. +//! +//! Lightweight processes needs a stack to use their lifecycle +//! operations like `before_start`, `after_complete` and more... +//! +//! # Example Usage +//! +//! ```rust +//! use lightproc::prelude::*; +//! +//! // ... future that does work +//! let future = async { +//! println!("Doing some work"); +//! }; +//! +//! // ... basic schedule function with no waker logic +//! fn schedule_function(proc: LightProc) {;} +//! +//! // ... creating a recoverable process +//! let panic_recoverable = LightProc::recoverable( +//! future, +//! schedule_function, +//! ); +//! ``` + +use crate::proc_data::ProcData; +use crate::proc_ext::ProcFutureExt; +use crate::proc_handle::ProcHandle; +use crate::raw_proc::RawProc; +use crate::recoverable_handle::RecoverableHandle; +use std::fmt::{self, Debug, Formatter}; +use std::future::Future; +use std::mem; +use std::panic::AssertUnwindSafe; +use std::ptr::NonNull; + +/// Shared functionality for both Send and !Send LightProc +pub struct LightProc { + /// A pointer to the heap-allocated proc. + pub(crate) raw_proc: NonNull<()>, +} + +// LightProc is both Sync and Send because it explicitly handles synchronization internally: +// The state of a `LightProc` is only modified atomically guaranteeing a consistent view from all +// threads. Existing handles are atomically reference counted so the proc itself will not be dropped +// until all pointers to it are themselves dropped. +// However, if the future or result inside the LightProc is !Send the executor must ensure that +// the `schedule` function does not move the LightProc to a different thread. +unsafe impl Send for LightProc {} +unsafe impl Sync for LightProc {} + +impl LightProc { + /// Creates a recoverable process which will catch panics in the given future. + /// + /// # Example + /// ```rust + /// # use std::any::Any; + /// # use lightproc::prelude::*; + /// # + /// # // ... basic schedule function with no waker logic + /// # fn schedule_function(proc: LightProc) {;} + /// # + /// let future = async { + /// panic!("oh no!"); + /// }; + /// // ... creating a recoverable process + /// let (proc, handle) = LightProc::recoverable( + /// future, + /// schedule_function, + /// ); + /// let handle = handle.on_panic(|s: &mut EmptyProcState, e: Box| { + /// let reason = e.downcast::(); + /// println!("future panicked!: {}", &reason); + /// }); + /// ``` + pub fn recoverable(future: F, schedule: S) -> (Self, RecoverableHandle) + where F: Future + 'static, + R: 'static, + S: Fn(LightProc) + 'static, + { + let recovery_future = AssertUnwindSafe(future).catch_unwind(); + let (proc, handle) = Self::build(recovery_future, schedule); + (proc, RecoverableHandle::new(handle)) + } + + /// + /// Creates a process which will stop its execution on occurrence of panic. + /// + /// # Example + /// ```rust + /// # use lightproc::prelude::*; + /// # + /// # // ... future that does work + /// # let future = async { + /// # println!("Doing some work"); + /// # }; + /// # + /// # // ... basic schedule function with no waker logic + /// # fn schedule_function(proc: LightProc) {;} + /// # + /// # // ... process stack with a lifecycle callback + /// # let proc_stack = + /// # ProcStack::default() + /// # .with_after_panic(|s: &mut EmptyProcState| { + /// # println!("After panic started!"); + /// # }); + /// # + /// // ... creating a standard process + /// let standard = LightProc::build( + /// future, + /// schedule_function, + /// ); + /// ``` + pub fn build(future: F, schedule: S) -> (Self, ProcHandle) + where F: Future + 'static, + R: 'static, + S: Fn(LightProc) + 'static, + { + let raw_proc = RawProc::allocate(future, schedule); + let proc = LightProc { raw_proc }; + let handle = ProcHandle::new(raw_proc); + (proc, handle) + } + + /// + /// Schedule the lightweight process with passed `schedule` function at the build time. + pub fn schedule(self) { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + mem::forget(self); + + unsafe { + ((*pdata).vtable.schedule)(ptr); + } + } + + /// Run this LightProc. + /// + /// "Running" a lightproc means ticking it once and if it doesn't complete + /// immediately re-scheduling it as soon as it's Waker wakes it back up. + pub fn run(self) { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + mem::forget(self); + + unsafe { + ((*pdata).vtable.tick)(ptr); + } + } + + /// Cancel polling the lightproc's inner future, thus cancelling the proc itself. + pub fn cancel(&self) { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + + unsafe { + (*pdata).cancel(); + } + } +} + +impl Debug for LightProc { + fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + + fmt.debug_struct("LightProc") + .field("pdata", unsafe { &(*pdata) }) + .finish() + } +} + +impl Drop for LightProc { + fn drop(&mut self) { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + + unsafe { + // Cancel the proc. + (*pdata).cancel(); + + // Drop the future. + ((*pdata).vtable.drop_future)(ptr); + + // Drop the proc reference. + ((*pdata).vtable.decrement)(ptr); + } + } +} diff --git a/runtime/lightproc/src/proc_data.rs b/runtime/lightproc/src/proc_data.rs new file mode 100644 index 0000000..9c9b7c6 --- /dev/null +++ b/runtime/lightproc/src/proc_data.rs @@ -0,0 +1,148 @@ +use crate::proc_vtable::ProcVTable; +use crate::state::*; +use crossbeam_utils::Backoff; +use std::cell::Cell; +use std::fmt::{self, Debug, Formatter}; +use std::sync::atomic::Ordering; +use std::task::Waker; + +/// The pdata of a proc. +/// +/// This pdata is stored right at the beginning of every heap-allocated proc. +pub(crate) struct ProcData { + /// Current state of the proc. + /// + /// Contains flags representing the current state and the reference count. + pub(crate) state: AtomicState, + + /// The proc that is blocked on the `ProcHandle`. + /// + /// This waker needs to be woken once the proc completes or is closed. + pub(crate) awaiter: Cell>, + + /// The virtual table. + /// + /// In addition to the actual waker virtual table, it also contains pointers to several other + /// methods necessary for bookkeeping the heap-allocated proc. + pub(crate) vtable: &'static ProcVTable, +} + +impl ProcData { + /// Cancels the proc. + /// + /// This method will only mark the proc as closed and will notify the awaiter, but it won't + /// reschedule the proc if it's not completed. + pub(crate) fn cancel(&self) { + let mut state = self.state.load(Ordering::Acquire); + + loop { + // If the proc has been completed or closed, it can't be cancelled. + if state.intersects(COMPLETED | CLOSED) { + break; + } + + // Mark the proc as closed. + match self.state.compare_exchange_weak( + state.into(), + (state | CLOSED).into(), + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Notify the awaiter that the proc has been closed. + if state.is_awaiter() { + self.notify(); + } + + break; + } + Err(s) => state = s, + } + } + } + + /// Notifies the proc blocked on the proc. + /// + /// If there is a registered waker, it will be removed from the pdata and woken. + #[inline] + pub(crate) fn notify(&self) { + if let Some(waker) = self.swap_awaiter(None) { + // We need a safeguard against panics because waking can panic. + waker.wake(); + } + } + + /// Notifies the proc blocked on the proc unless its waker matches `current`. + /// + /// If there is a registered waker, it will be removed from the pdata. + #[inline] + pub(crate) fn notify_unless(&self, current: &Waker) { + if let Some(waker) = self.swap_awaiter(None) { + if !waker.will_wake(current) { + // We need a safeguard against panics because waking can panic. + waker.wake(); + } + } + } + + /// Swaps the awaiter and returns the previous value. + #[inline] + pub(crate) fn swap_awaiter(&self, new: Option) -> Option { + let new_is_none = new.is_none(); + + // We're about to try acquiring the lock in a loop. If it's already being held by another + // thread, we'll have to spin for a while so it's best to employ a backoff strategy. + let backoff = Backoff::new(); + loop { + // Acquire the lock. If we're storing an awaiter, then also set the awaiter flag. + let state = if new_is_none { + self.state.fetch_or(LOCKED.into(), Ordering::Acquire) + } else { + self.state.fetch_or((LOCKED | AWAITER).into(), Ordering::Acquire) + }; + + // If the lock was acquired, break from the loop. + if state.is_locked() { + break; + } + + // Snooze for a little while because the lock is held by another thread. + backoff.snooze(); + } + + // Replace the awaiter. + let old = self.awaiter.replace(new); + + // Release the lock. If we've cleared the awaiter, then also unset the awaiter flag. + if new_is_none { + self.state.fetch_and((!LOCKED & !AWAITER).into(), Ordering::Release); + } else { + self.state.fetch_and((!LOCKED).into(), Ordering::Release); + } + + old + } +} + +impl Debug for ProcData { + fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { + let state = self.state.load(Ordering::SeqCst); + + if fmt.alternate() { + fmt.debug_struct("ProcData") + .field("scheduled", &state.is_scheduled()) + .field("running", &state.is_running()) + .field("completed", &state.is_completed()) + .field("closed", &state.is_closed()) + .field("handle", &state.is_handle()) + .field("awaiter", &state.is_awaiter()) + .field("locked", &state.is_locked()) + .field("ref_count", &state.get_refcount()) + .finish() + } else { + fmt.debug_struct("ProcData") + .field("state", &state) + .finish() + } + } +} diff --git a/runtime/lightproc/src/proc_ext.rs b/runtime/lightproc/src/proc_ext.rs new file mode 100644 index 0000000..a3baa12 --- /dev/null +++ b/runtime/lightproc/src/proc_ext.rs @@ -0,0 +1,14 @@ +use crate::catch_unwind::CatchUnwind; +use std::future::Future; +use std::panic::UnwindSafe; + +pub(crate) trait ProcFutureExt: Future { + fn catch_unwind(self) -> CatchUnwind + where + Self: Sized + UnwindSafe, + { + CatchUnwind::new(self) + } +} + +impl ProcFutureExt for T where T: Future {} diff --git a/runtime/lightproc/src/proc_handle.rs b/runtime/lightproc/src/proc_handle.rs new file mode 100644 index 0000000..46c006f --- /dev/null +++ b/runtime/lightproc/src/proc_handle.rs @@ -0,0 +1,258 @@ +//! +//! Handle for tasks which don't need to unwind panics inside +//! the given futures. +use crate::proc_data::ProcData; +use crate::state::*; +use std::fmt::{self, Debug, Formatter}; +use std::future::Future; +use std::marker::{PhantomData, Unpin}; +use std::pin::Pin; +use std::ptr::NonNull; +use std::sync::atomic::Ordering; +use std::task::{Context, Poll}; + +/// A handle that awaits the result of a proc. +/// +/// This type is a future that resolves to an `Option` where: +/// +/// * `None` indicates the proc has panicked or was cancelled +/// * `Some(res)` indicates the proc has completed with `res` +pub struct ProcHandle { + /// A raw proc pointer. + pub(crate) raw_proc: NonNull<()>, + + /// A marker capturing the generic type `R`. + pub(crate) result: PhantomData, +} + +unsafe impl Send for ProcHandle {} +unsafe impl Sync for ProcHandle {} + +impl Unpin for ProcHandle {} + +impl ProcHandle { + pub(crate) fn new(raw_proc: NonNull<()>) -> Self { + Self { + raw_proc, + result: PhantomData, + } + } + + /// Cancels the proc. + /// + /// If the proc has already completed, calling this method will have no effect. + /// + /// When a proc is cancelled, its future cannot be polled again and will be dropped instead. + pub fn cancel(&self) { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + + unsafe { + let mut state = (*pdata).state.load(Ordering::Acquire); + + loop { + // If the proc has been completed or closed, it can't be cancelled. + if state.intersects(COMPLETED | CLOSED) { + break; + } + + // If the proc is not scheduled nor running, we'll need to schedule it. + let new = if state.intersects(SCHEDULED | RUNNING) { + (state | SCHEDULED | CLOSED) + 1 + } else { + state | CLOSED + }; + + // Mark the proc as closed. + match (*pdata).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the proc is not scheduled nor running, schedule it so that its future + // gets dropped by the executor. + if !state.intersects(SCHEDULED | RUNNING) { + ((*pdata).vtable.schedule)(ptr); + } + + // Notify the awaiter that the proc has been closed. + if state.is_awaiter() { + (*pdata).notify(); + } + + break; + } + Err(s) => state = s, + } + } + } + } + + /// Returns current state of the handle. + pub fn state(&self) -> State { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + unsafe { (*pdata).state.load(Ordering::SeqCst) } + } +} + +impl Future for ProcHandle { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + + unsafe { + let mut state = (*pdata).state.load(Ordering::Acquire); + + loop { + // If the proc has been closed, notify the awaiter and return `None`. + if state.is_closed() { + // Even though the awaiter is most likely the current proc, it could also be + // another proc. + (*pdata).notify_unless(cx.waker()); + return Poll::Ready(None); + } + + // If the proc is not completed, register the current proc. + if !state.is_completed() { + // Replace the waker with one associated with the current proc. We need a + // safeguard against panics because dropping the previous waker can panic. + (*pdata).swap_awaiter(Some(cx.waker().clone())); + + // Reload the state after registering. It is possible that the proc became + // completed or closed just before registration so we need to check for that. + state = (*pdata).state.load(Ordering::Acquire); + + // If the proc has been closed, notify the awaiter and return `None`. + if state.is_closed() { + // Even though the awaiter is most likely the current proc, it could also + // be another proc. + (*pdata).notify_unless(cx.waker()); + return Poll::Ready(None); + } + + // If the proc is still not completed, we're blocked on it. + if !state.is_completed() { + return Poll::Pending; + } + } + + // Since the proc is now completed, mark it as closed in order to grab its output. + match (*pdata).state.compare_exchange( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Notify the awaiter. Even though the awaiter is most likely the current + // proc, it could also be another proc. + if state.is_awaiter() { + (*pdata).notify_unless(cx.waker()); + } + + // Take the output from the proc. + let output = ((*pdata).vtable.get_output)(ptr) as *mut R; + return Poll::Ready(Some(output.read())); + } + Err(s) => state = s, + } + } + } + } +} + +impl Debug for ProcHandle { + fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + + fmt.debug_struct("ProcHandle") + .field("pdata", unsafe { &(*pdata) }) + .finish_non_exhaustive() + } +} + +impl Drop for ProcHandle { + fn drop(&mut self) { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + + // A place where the output will be stored in case it needs to be dropped. + let mut output = None; + + unsafe { + // Optimistically assume the `ProcHandle` is being dropped just after creating the + // proc. This is a common case so if the handle is not used, the overhead of it is only + // one compare-exchange operation. + if let Err(mut state) = (*pdata).state.compare_exchange_weak( + SCHEDULED | HANDLE | REFERENCE, + SCHEDULED | REFERENCE, + Ordering::AcqRel, + Ordering::Acquire, + ) { + loop { + // If the proc has been completed but not yet closed, that means its output + // must be dropped. + if state.is_completed() && !state.is_closed() { + // Mark the proc as closed in order to grab its output. + match (*pdata).state.compare_exchange_weak( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Read the output. + output = Some((((*pdata).vtable.get_output)(ptr) as *mut R).read()); + + // Update the state variable because we're continuing the loop. + state |= CLOSED; + } + Err(s) => state = s, + } + } else { + // If this is the last reference to the proc and it's not closed, then + // close it and schedule one more time so that its future gets dropped by + // the executor. + let new = if state.get_refcount() == 0 && !state.is_closed() { + SCHEDULED | CLOSED | REFERENCE + } else { + state & !HANDLE + }; + + // Unset the handle flag. + match (*pdata).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If this is the last reference to the proc, we need to either + // schedule dropping its future or destroy it. + if state.get_refcount() == 0 { + if !state.is_closed() { + ((*pdata).vtable.schedule)(ptr); + } else { + ((*pdata).vtable.destroy)(ptr); + } + } + + break; + } + Err(s) => state = s, + } + } + } + } + } + + // Drop the output if it was taken out of the proc. + drop(output); + } +} diff --git a/runtime/lightproc/src/proc_layout.rs b/runtime/lightproc/src/proc_layout.rs new file mode 100644 index 0000000..cf1ee22 --- /dev/null +++ b/runtime/lightproc/src/proc_layout.rs @@ -0,0 +1,16 @@ +use std::alloc::Layout; + +#[derive(Clone, Copy)] +pub(crate) struct ProcLayout { + /// Memory layout of the whole proc. + pub(crate) layout: Layout, + + /// Offset into the proc at which the schedule function is stored. + pub(crate) offset_schedule: usize, + + /// Offset into the proc at which the future is stored. + pub(crate) offset_future: usize, + + /// Offset into the proc at which the output is stored. + pub(crate) offset_output: usize, +} diff --git a/runtime/lightproc/src/proc_vtable.rs b/runtime/lightproc/src/proc_vtable.rs new file mode 100644 index 0000000..baa5874 --- /dev/null +++ b/runtime/lightproc/src/proc_vtable.rs @@ -0,0 +1,25 @@ +use std::task::RawWakerVTable; + +/// The vtable for a proc. +pub(crate) struct ProcVTable { + /// The raw waker vtable. + pub(crate) raw_waker: RawWakerVTable, + + /// Schedules the proc. + pub(crate) schedule: unsafe fn(*const ()), + + /// Drops the future inside the proc. + pub(crate) drop_future: unsafe fn(*const ()), + + /// Returns a pointer to the output stored after completion. + pub(crate) get_output: unsafe fn(*const ()) -> *const (), + + /// Drops a waker or a proc. + pub(crate) decrement: unsafe fn(ptr: *const ()), + + /// Destroys the proc. + pub(crate) destroy: unsafe fn(*const ()), + + /// Ticks the proc. + pub(crate) tick: unsafe fn(*const ()), +} diff --git a/runtime/lightproc/src/raw_proc.rs b/runtime/lightproc/src/raw_proc.rs new file mode 100644 index 0000000..8334bca --- /dev/null +++ b/runtime/lightproc/src/raw_proc.rs @@ -0,0 +1,549 @@ +use crate::catch_unwind::CatchUnwind; +use crate::layout_helpers::extend; +use crate::lightproc::LightProc; +use crate::proc_data::ProcData; +use crate::proc_layout::ProcLayout; +use crate::proc_vtable::ProcVTable; +use crate::state::*; +use std::alloc::{self, Layout}; +use std::cell::Cell; +use std::future::Future; +use std::mem::{self, ManuallyDrop}; +use std::panic::AssertUnwindSafe; +use std::pin::Pin; +use std::ptr::NonNull; +use std::sync::atomic::Ordering; + +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +/// Raw pointers to the fields of a proc. +pub(crate) struct RawProc { + pub(crate) pdata: *const ProcData, + pub(crate) schedule: *const S, + pub(crate) future: *mut F, + pub(crate) output: *mut R, +} + +impl RawProc +where + F: Future + 'static, + R: 'static, + S: Fn(LightProc) + 'static, +{ + /// Allocates a proc with the given `future` and `schedule` function. + /// + /// It is assumed there are initially only the `LightProc` reference and the `ProcHandle`. + pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> { + // Compute the layout of the proc for allocation. Abort if the computation fails. + let proc_layout = Self::proc_layout(); + + unsafe { + // Allocate enough space for the entire proc. + let raw_proc = match NonNull::new(alloc::alloc(proc_layout.layout) as *mut ()) { + None => std::process::abort(), + Some(p) => p, + }; + + let raw = Self::from_ptr(raw_proc.as_ptr()); + + + // Write the pdata as the first field of the proc. + (raw.pdata as *mut ProcData).write(ProcData { + state: AtomicState::new(SCHEDULED | HANDLE | REFERENCE), + awaiter: Cell::new(None), + vtable: &ProcVTable { + raw_waker: RawWakerVTable::new( + Self::clone_waker, + Self::wake, + Self::wake_by_ref, + Self::decrement, + ), + schedule: Self::schedule, + drop_future: Self::drop_future, + get_output: Self::get_output, + decrement: Self::decrement, + destroy: Self::destroy, + tick: Self::tick, + }, + }); + + // Write the schedule function as the third field of the proc. + (raw.schedule as *mut S).write(schedule); + + // Write the future as the fourth field of the proc. + raw.future.write(future); + + raw_proc + } + } + + /// Returns the memory layout for a proc. + #[inline(always)] + fn proc_layout() -> ProcLayout { + let layout_pdata = Layout::new::(); + let layout_schedule = Layout::new::(); + let layout_future = Layout::new::>>(); + let layout_output = Layout::new::(); + + let size_union = layout_future.size().max(layout_output.size()); + let align_union = layout_future.align().max(layout_output.align()); + let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) }; + + let layout = layout_pdata; + let (layout, offset_schedule) = extend(layout, layout_schedule); + let (layout, offset_union) = extend(layout, layout_union); + let offset_future = offset_union; + let offset_output = offset_union; + + ProcLayout { + layout, + offset_schedule, + offset_future, + offset_output, + } + } + + /// Creates a `RawProc` from a raw proc pointer. + #[inline] + pub(crate) fn from_ptr(ptr: *const ()) -> Self { + let proc_layout = Self::proc_layout(); + let p = ptr as *const u8; + + unsafe { + Self { + pdata: p as *const ProcData, + schedule: p.add(proc_layout.offset_schedule) as *const S, + future: p.add(proc_layout.offset_future) as *mut F, + output: p.add(proc_layout.offset_output) as *mut R, + } + } + } + + /// Wakes a waker. + unsafe fn wake(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + let mut state = (*raw.pdata).state.load(Ordering::Acquire); + + loop { + // If the proc is completed or closed, it can't be woken. + if state.intersects(COMPLETED | CLOSED) { + // Drop the waker. + Self::decrement(ptr); + break; + } + + // If the proc is already scheduled, we just need to synchronize with the thread that + // will run the proc by "publishing" our current view of the memory. + if state.is_scheduled() { + // Update the state without actually modifying it. + match (*raw.pdata).state.compare_exchange_weak( + state.into(), + state.into(), + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Drop the waker. + Self::decrement(ptr); + break; + } + Err(s) => state = s, + } + } else { + // Mark the proc as scheduled. + match (*raw.pdata).state.compare_exchange_weak( + state, + state | SCHEDULED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the proc was not yet scheduled and isn't currently running, now is the + // time to schedule it. + if !state.is_running() { + // Schedule the proc. + let proc = LightProc { + raw_proc: NonNull::new_unchecked(ptr as *mut ()), + }; + (*raw.schedule)(proc); + } else { + // Drop the waker. + Self::decrement(ptr); + } + + break; + } + Err(s) => state = s, + } + } + } + } + + /// Wakes a waker by reference. + unsafe fn wake_by_ref(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + let mut state = (*raw.pdata).state.load(Ordering::Acquire); + + loop { + // If the proc is completed or closed, it can't be woken. + if state.intersects(COMPLETED | CLOSED) { + break; + } + + // If the proc is already scheduled, we just need to synchronize with the thread that + // will run the proc by "publishing" our current view of the memory. + if state.is_scheduled() { + // Update the state without actually modifying it. + match (*raw.pdata).state.compare_exchange_weak( + state, + state, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(s) => state = s, + } + } else { + // If the proc is not scheduled nor running, we'll need to schedule after waking. + let new = if !state.intersects(SCHEDULED | RUNNING) { + (state | SCHEDULED) + 1 + } else { + state | SCHEDULED + }; + + // Mark the proc as scheduled. + match (*raw.pdata).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the proc is not scheduled nor running, now is the time to schedule. + if !state.intersects(SCHEDULED | RUNNING) { + // Schedule the proc. + let proc = LightProc { + raw_proc: NonNull::new_unchecked(ptr as *mut ()), + }; + (*raw.schedule)(proc); + } + + break; + } + Err(s) => state = s, + } + } + } + } + + /// Clones a waker. + unsafe fn clone_waker(ptr: *const ()) -> RawWaker { + let raw = Self::from_ptr(ptr); + let raw_waker = &(*raw.pdata).vtable.raw_waker; + + // Increment the reference count. With any kind of reference-counted data structure, + // relaxed ordering is fine when the reference is being cloned. + let state = (*raw.pdata).state.fetch_add(1, Ordering::Relaxed); + + // If the reference count overflowed, abort. + if state.bits() > i64::MAX as u64 { + std::process::abort(); + } + + RawWaker::new(ptr, raw_waker) + } + + /// Drops a waker or a proc. + /// + /// This function will decrement the reference count. If it drops down to zero and the + /// associated join handle has been dropped too, then the proc gets destroyed. + #[inline] + unsafe fn decrement(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // Decrement the reference count. + let mut new = (*raw.pdata) + .state + .fetch_sub(1, Ordering::AcqRel); + new.set_refcount(new.get_refcount().saturating_sub(1)); + + // If this was the last reference to the proc and the `ProcHandle` has been dropped as + // well, then destroy the proc. + if new.get_refcount() == 0 && !new.is_handle() { + Self::destroy(ptr); + } + } + + /// Schedules a proc for running. + /// + /// This function doesn't modify the state of the proc. It only passes the proc reference to + /// its schedule function. + unsafe fn schedule(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + (*raw.schedule)(LightProc { + raw_proc: NonNull::new_unchecked(ptr as *mut ()), + }); + } + + /// Drops the future inside a proc. + #[inline] + unsafe fn drop_future(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // We need a safeguard against panics because the destructor can panic. + raw.future.drop_in_place(); + } + + /// Returns a pointer to the output inside a proc. + unsafe fn get_output(ptr: *const ()) -> *const () { + let raw = Self::from_ptr(ptr); + raw.output as *const () + } + + /// Cleans up proc's resources and deallocates it. + /// + /// If the proc has not been closed, then its future or the output will be dropped. The + /// schedule function gets dropped too. + #[inline] + unsafe fn destroy(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + let proc_layout = Self::proc_layout(); + + // We need a safeguard against panics because destructors can panic. + // Drop the schedule function. + (raw.schedule as *mut S).drop_in_place(); + + // Finally, deallocate the memory reserved by the proc. + alloc::dealloc(ptr as *mut u8, proc_layout.layout); + } + + /// Ticks a proc. + /// + /// Ticking will call `poll` once and re-schedule the task if it returns `Poll::Pending`. If + /// polling its future panics, the proc will be closed and the panic propagated into the caller. + unsafe fn tick(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // Create a context from the raw proc pointer and the vtable inside the its pdata. + let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new( + ptr, + &(*raw.pdata).vtable.raw_waker, + ))); + let cx = &mut Context::from_waker(&waker); + + let mut state = (*raw.pdata).state.load(Ordering::Acquire); + + // Update the proc's state before polling its future. + loop { + // If the proc has been closed, drop the proc reference and return. + if state.is_closed() { + // Notify the awaiter that the proc has been closed. + if state.is_awaiter() { + (*raw.pdata).notify(); + } + + // Drop the future. + Self::drop_future(ptr); + + // Drop the proc reference. + Self::decrement(ptr); + return; + } + + // Mark the proc as unscheduled and running. + match (*raw.pdata).state.compare_exchange_weak( + state, + (state & !SCHEDULED) | RUNNING, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Update the state because we're continuing with polling the future. + state = (state & !SCHEDULED) | RUNNING; + break; + } + Err(s) => state = s, + } + } + // If we get here the lightproc is not closed and marked as running and unscheduled. + + // Poll the inner future, but surround it with a guard that closes the proc in case polling + // panics. + let guard = Guard(raw); + let poll = ::poll(Pin::new_unchecked(&mut *raw.future), cx); + mem::forget(guard); + + match poll { + Poll::Ready(out) => { + // Replace the future with its output. + Self::drop_future(ptr); + raw.output.write(out); + + // A place where the output will be stored in case it needs to be dropped. + let mut output = None; + + // The proc is now completed. + loop { + // If the handle is dropped, we'll need to close it and drop the output. + let new = if !state.is_handle() { + (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED + } else { + (state & !RUNNING & !SCHEDULED) | COMPLETED + }; + + // Mark the proc as not running and completed. + match (*raw.pdata).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the handle is dropped or if the proc was closed while running, + // now it's time to drop the output. + if !state.is_handle() || state.is_closed() { + // Read the output. + output = Some(raw.output.read()); + } + + // Notify the awaiter that the proc has been completed. + if state.is_awaiter() { + (*raw.pdata).notify(); + } + + // Drop the proc reference. + Self::decrement(ptr); + break; + } + Err(s) => state = s, + } + } + + // Drop the output if it was taken out of the proc. + drop(output); + } + Poll::Pending => { + // The proc is still not completed. + loop { + // If the proc was closed while running, we'll need to unschedule in case it + // was woken and then clean up its resources. + let new = if state.is_closed() { + state & !( RUNNING | SCHEDULED ) + } else { + state & !RUNNING + }; + + // Mark the proc as not running. + match (*raw.pdata).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(state) => { + // If the proc was closed while running, we need to drop its future. + // If the proc was woken while running, we need to schedule it. + // Otherwise, we just drop the proc reference. + if state.is_closed() { + // The thread that closed the proc didn't drop the future because + // it was running so now it's our responsibility to do so. + Self::drop_future(ptr); + + // Drop the proc reference. + Self::decrement(ptr); + } else if state.is_scheduled() { + // The thread that has woken the proc didn't reschedule it because + // it was running so now it's our responsibility to do so. + Self::schedule(ptr); + } else { + // Drop the proc reference. + Self::decrement(ptr); + } + break; + } + Err(s) => state = s, + } + } + } + } + } +} + +impl Clone for RawProc { + fn clone(&self) -> Self { + Self { + pdata: self.pdata, + schedule: self.schedule, + future: self.future, + output: self.output, + } + } +} +impl Copy for RawProc {} + +/// A guard that closes the proc if polling its future panics. +struct Guard(RawProc) + where + F: Future + 'static, + R: 'static, + S: Fn(LightProc) + 'static; + +impl Drop for Guard +where + F: Future + 'static, + R: 'static, + S: Fn(LightProc) + 'static, +{ + fn drop(&mut self) { + let raw = self.0; + let ptr = raw.pdata as *const (); + + unsafe { + let mut state = (*raw.pdata).state.load(Ordering::Acquire); + + loop { + // If the proc was closed while running, then unschedule it, drop its + // future, and drop the proc reference. + if state.is_closed() { + // We still need to unschedule the proc because it is possible it was + // woken while running. + (*raw.pdata).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + + // The thread that closed the proc didn't drop the future because it + // was running so now it's our responsibility to do so. + RawProc::::drop_future(ptr); + + // Drop the proc reference. + RawProc::::decrement(ptr); + break; + } + + // Mark the proc as not running, not scheduled, and closed. + match (*raw.pdata).state.compare_exchange_weak( + state, + (state & !RUNNING & !SCHEDULED) | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(state) => { + // Drop the future because the proc is now closed. + RawProc::::drop_future(ptr); + + // Notify the awaiter that the proc has been closed. + if state.is_awaiter() { + (*raw.pdata).notify(); + } + + // Drop the proc reference. + RawProc::::decrement(ptr); + break; + } + Err(s) => state = s, + } + } + } + } +} diff --git a/runtime/lightproc/src/recoverable_handle.rs b/runtime/lightproc/src/recoverable_handle.rs new file mode 100644 index 0000000..5d35067 --- /dev/null +++ b/runtime/lightproc/src/recoverable_handle.rs @@ -0,0 +1,119 @@ +//! +//! Handle for recoverable process +use std::any::Any; +use crate::proc_data::ProcData; +use crate::proc_handle::ProcHandle; +use crate::state::State; +use std::fmt::{self, Debug, Formatter}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::thread; + +/// Recoverable handle which encapsulates a standard Proc Handle and contain all panics inside. +/// +/// Execution of `after_panic` will be immediate on polling the [RecoverableHandle]'s future. +pub struct RecoverableHandle { + inner: ProcHandle>, + + /// Panic callback + /// + /// This callback will be called if the interior future panics. It is passed the panic + // reason i.e. the `Err` of [`std::thread::Result`] + panicked: Option) + Send + Sync>>, +} + +impl RecoverableHandle { + pub(crate) fn new(inner: ProcHandle>) -> Self { + RecoverableHandle { + inner, + panicked: None, + } + } + + /// Cancels the proc. + /// + /// If the proc has already completed, calling this method will have no effect. + /// + /// When a proc is cancelled, its future cannot be polled again and will be dropped instead. + pub fn cancel(&self) { + self.inner.cancel() + } + + /// Returns a state of the ProcHandle. + pub fn state(&self) -> State { + self.inner.state() + } + + /// Adds a callback that will be executed should the inner future `panic!`s + /// + /// ```rust + /// # use std::any::Any; + /// use lightproc::proc_stack::ProcStack; + /// use lightproc::proc_state::EmptyProcState; + /// # use lightproc::prelude::*; + /// # + /// # // ... future that does work + /// # let future = async { + /// # println!("Doing some work"); + /// # }; + /// # + /// # // ... basic schedule function with no waker logic + /// # fn schedule_function(proc: LightProc) {;} + /// # + /// # // ... process stack with a lifecycle callback + /// # let proc_stack = + /// # ProcStack::default() + /// # .with_after_panic(|s: &mut EmptyProcState| { + /// # println!("After panic started!"); + /// # }); + /// # + /// // ... creating a recoverable process + /// let (proc, recoverable) = LightProc::recoverable( + /// future, + /// schedule_function, + /// ); + /// + /// recoverable + /// .on_return(|_e: Box| { + /// println!("Inner future panicked"); + /// }); + /// ``` + pub fn on_panic(mut self, callback: F) -> Self + where F: FnOnce(Box) + Send + Sync + 'static, + { + self.panicked = Some(Box::new(callback)); + self + } + +} + +impl Future for RecoverableHandle { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match Pin::new(&mut self.inner).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(Ok(val))) => Poll::Ready(Some(val)), + Poll::Ready(Some(Err(e))) => { + if let Some(callback) = self.panicked.take() { + callback(e); + } + + Poll::Ready(None) + }, + } + } +} + +impl Debug for RecoverableHandle { + fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { + let ptr = self.inner.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + + fmt.debug_struct("ProcHandle") + .field("pdata", unsafe { &(*pdata) }) + .finish_non_exhaustive() + } +} diff --git a/runtime/lightproc/src/state.rs b/runtime/lightproc/src/state.rs new file mode 100644 index 0000000..d729900 --- /dev/null +++ b/runtime/lightproc/src/state.rs @@ -0,0 +1,390 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +/// Set if the proc is scheduled for running. +/// +/// A proc is considered to be scheduled whenever its `LightProc` reference exists. It is in scheduled +/// state at the moment of creation and when it gets unpaused either by its `ProcHandle` or woken +/// by a `Waker`. +/// +/// This flag can't be set when the proc is completed. However, it can be set while the proc is +/// running, in which case it will be rescheduled as soon as polling finishes. +pub(crate) const SCHEDULED: State = State::SCHEDULED; + +/// Set if the proc is running. +/// +/// A proc is running state while its future is being polled. +/// +/// This flag can't be set when the proc is completed. However, it can be in scheduled state while +/// it is running, in which case it will be rescheduled when it stops being polled. +pub(crate) const RUNNING: State = State::RUNNING; + +/// Set if the proc has been completed. +/// +/// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored +/// inside the proc until it becomes stopped. In fact, `ProcHandle` picks the output up by marking +/// the proc as stopped. +/// +/// This flag can't be set when the proc is scheduled or completed. +pub(crate) const COMPLETED: State = State::COMPLETED; + +/// Set if the proc is closed. +/// +/// If a proc is closed, that means its either cancelled or its output has been consumed by the +/// `ProcHandle`. A proc becomes closed when: +/// +/// 1. It gets cancelled by `LightProc::cancel()` or `ProcHandle::cancel()`. +/// 2. Its output is awaited by the `ProcHandle`. +/// 3. It panics while polling the future. +/// 4. It is completed and the `ProcHandle` is dropped. +pub(crate) const CLOSED: State = State::CLOSED; + +/// Set if the `ProcHandle` still exists. +/// +/// The `ProcHandle` is a special case in that it is only tracked by this flag, while all other +/// proc references (`LightProc` and `Waker`s) are tracked by the reference count. +pub(crate) const HANDLE: State = State::HANDLE; + +/// Set if the `ProcHandle` is awaiting the output. +/// +/// This flag is set while there is a registered awaiter of type `Waker` inside the proc. When the +/// proc gets closed or completed, we need to wake the awaiter. This flag can be used as a fast +/// check that tells us if we need to wake anyone without acquiring the lock inside the proc. +pub(crate) const AWAITER: State = State::AWAITER; + +/// Set if the awaiter is locked. +/// +/// This lock is acquired before a new awaiter is registered or the existing one is woken. +pub(crate) const LOCKED: State = State::LOCKED; + +/// A single reference. +/// +/// The lower bits in the state contain various flags representing the proc state, while the upper +/// bits contain the reference count. The value of `REFERENCE` represents a single reference in the +/// total reference count. +/// +/// Note that the reference counter only tracks the `LightProc` and `Waker`s. The `ProcHandle` is +/// tracked separately by the `HANDLE` flag. +pub(crate) const REFERENCE: State = State::REFERENCE; + +bitflags::bitflags! { + #[derive(Default)] + pub struct State: u64 { + const SCHEDULED = 1 << 0; + const RUNNING = 1 << 1; + const COMPLETED = 1 << 2; + const CLOSED = 1 << 3; + const HANDLE = 1 << 4; + const AWAITER = 1 << 5; + const LOCKED = 1 << 6; + const REFERENCE = 1 << 7; + } +} + +impl State { + #[inline(always)] + const fn new(bits: u64) -> Self { + unsafe { Self::from_bits_unchecked(bits) } + } + + /// Returns `true` if the future is in the pending. + #[inline(always)] + pub fn is_pending(&self) -> bool { + !self.is_completed() + } + + bitfield::bitfield_fields! { + u64; + #[inline(always)] + /// A proc is considered to be scheduled whenever its `LightProc` reference exists. It is in scheduled + /// state at the moment of creation and when it gets unpaused either by its `ProcHandle` or woken + /// by a `Waker`. + /// + /// This flag can't be set when the proc is completed. However, it can be set while the proc is + /// running, in which case it will be rescheduled as soon as polling finishes. + pub is_scheduled, set_scheduled: 0; + + #[inline(always)] + /// A proc is running state while its future is being polled. + /// + /// This flag can't be set when the proc is completed. However, it can be in scheduled state while + /// it is running, in which case it will be rescheduled when it stops being polled. + pub is_running, set_running: 1; + + #[inline(always)] + /// Set if the proc has been completed. + /// + /// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored + /// inside the proc until it becomes stopped. In fact, `ProcHandle` picks the output up by marking + /// the proc as stopped. + /// + /// This flag can't be set when the proc is scheduled or completed. + pub is_completed, set_completed: 2; + + #[inline(always)] + /// Set if the proc is closed. + /// + /// If a proc is closed, that means its either cancelled or its output has been consumed by the + /// `ProcHandle`. A proc becomes closed when: + /// + /// 1. It gets cancelled by `LightProc::cancel()` or `ProcHandle::cancel()`. + /// 2. Its output is awaited by the `ProcHandle`. + /// 3. It panics while polling the future. + /// 4. It is completed and the `ProcHandle` is dropped. + pub is_closed, set_closed: 3; + + #[inline(always)] + /// Set if the `ProcHandle` still exists. + /// + /// The `ProcHandle` is a special case in that it is only tracked by this flag, while all other + /// proc references (`LightProc` and `Waker`s) are tracked by the reference count. + pub is_handle, set_handle: 4; + + #[inline(always)] + /// Set if the `ProcHandle` is awaiting the output. + /// + /// This flag is set while there is a registered awaiter of type `Waker` inside the proc. When the + /// proc gets closed or completed, we need to wake the awaiter. This flag can be used as a fast + /// check that tells us if we need to wake anyone without acquiring the lock inside the proc. + pub is_awaiter, set_awaiter: 5; + + #[inline(always)] + /// Set if the awaiter is locked. + /// + /// This lock is acquired before a new awaiter is registered or the existing one is woken. + pub is_locked, set_locked: 6; + + #[inline(always)] + /// The lower bits in the state contain various flags representing the proc state, while the upper + /// bits contain the reference count. + /// Note that the reference counter only tracks the `LightProc` and `Waker`s. The `ProcHandle` is + /// tracked separately by the `HANDLE` flag. + pub get_refcount, set_refcount: 63, 7; + } +} + +impl std::ops::Add for State { + type Output = State; + + fn add(mut self, rhs: u64) -> Self::Output { + self.set_refcount(self.get_refcount() + rhs); + self + } +} + +impl std::ops::Sub for State { + type Output = State; + + fn sub(mut self, rhs: u64) -> Self::Output { + self.set_refcount(self.get_refcount() - rhs); + self + } +} + +impl bitfield::BitRange for State + where u64: bitfield::BitRange +{ + fn bit_range(&self, msb: usize, lsb: usize) -> T { + self.bits.bit_range(msb, lsb) + } + + fn set_bit_range(&mut self, msb: usize, lsb: usize, value: T) { + self.bits.set_bit_range(msb, lsb, value) + } +} + +impl Into for State { + fn into(self) -> usize { + self.bits as usize + } +} + +#[repr(transparent)] +pub struct AtomicState { + inner: AtomicU64, +} + +impl AtomicState { + #[inline(always)] + pub const fn new(v: State) -> Self { + let inner = AtomicU64::new(v.bits); + Self { inner } + } + + #[inline(always)] + pub fn load(&self, order: Ordering) -> State { + State::new(self.inner.load(order)) + } + + #[inline(always)] + #[allow(dead_code)] + pub fn store(&self, val: State, order: Ordering) { + self.inner.store(val.bits, order) + } + + pub fn compare_exchange( + &self, + current: State, + new: State, + success: Ordering, + failure: Ordering + ) -> Result + { + self.inner.compare_exchange(current.bits, new.bits, success, failure) + .map(|u| State::new(u)) + .map_err(|u| State::new(u)) + } + + pub fn compare_exchange_weak( + &self, + current: State, + new: State, + success: Ordering, + failure: Ordering + ) -> Result + { + self.inner.compare_exchange_weak(current.bits, new.bits, success, failure) + .map(|u| State::new(u)) + .map_err(|u| State::new(u)) + } + + pub fn fetch_or(&self, val: State, order: Ordering) -> State { + State::new(self.inner.fetch_or(val.bits, order)) + } + + pub fn fetch_and(&self, val: State, order: Ordering) -> State { + State::new(self.inner.fetch_and(val.bits, order)) + } + + // FIXME: Do this properly + pub fn fetch_add(&self, val: u64, order: Ordering) -> State { + State::new(self.inner.fetch_add(val << 7, order)) + } + + // FIXME: Do this properly + pub fn fetch_sub(&self, val: u64, order: Ordering) -> State { + State::new(self.inner.fetch_sub(val << 7, order)) + } +} + +#[cfg(test)] +mod tests { + use crate::state::*; + + #[test] + fn test_state_has_debug() { + let state = SCHEDULED | AWAITER; + println!("{:?}", state); + } + + #[test] + fn test_is_scheduled_returns_true() { + let state = SCHEDULED; + assert_eq!(state.is_scheduled(), true); + + let mut state2 = State::default(); + state2.set_scheduled(true); + assert_eq!(state, state2) + } + + #[test] + fn test_is_scheduled_returns_false() { + let state = State::default(); + assert_eq!(state.is_scheduled(), false); + } + + #[test] + fn test_is_running_returns_true() { + let state = RUNNING; + assert_eq!(state.is_running(), true); + } + + #[test] + fn test_is_running_returns_false() { + let state = State::default(); + assert_eq!(state.is_running(), false); + } + + #[test] + fn test_is_completed_returns_true() { + let state = COMPLETED; + assert_eq!(state.is_completed(), true); + } + + #[test] + fn test_is_completed_returns_false() { + let state = State::default(); + assert_eq!(state.is_completed(), false); + } + + #[test] + fn test_is_closed_returns_true() { + let state = CLOSED; + assert_eq!(state.is_closed(), true); + } + + #[test] + fn test_is_closed_returns_false() { + let state = State::default(); + assert_eq!(state.is_closed(), false); + } + + #[test] + fn test_is_handle_returns_true() { + let state = HANDLE; + assert_eq!(state.is_handle(), true); + } + + #[test] + fn test_is_handle_returns_false() { + let state = State::default(); + assert_eq!(state.is_handle(), false); + } + + #[test] + fn test_is_awaiter_returns_true() { + let state = AWAITER; + assert_eq!(state.is_awaiter(), true); + } + + #[test] + fn test_is_awaiter_returns_false() { + let state = State::default(); + assert_eq!(state.is_awaiter(), false); + } + + #[test] + fn test_is_locked_returns_true() { + let state = LOCKED; + assert_eq!(state.is_locked(), true); + } + + #[test] + fn test_is_locked_returns_false() { + let state = State::default(); + assert_eq!(state.is_locked(), false); + } + + #[test] + fn test_is_pending_returns_true() { + let state = State::default(); + assert_eq!(state.is_pending(), true); + } + + #[test] + fn test_is_pending_returns_false() { + let state = COMPLETED; + assert_eq!(state.is_pending(), false); + } + + #[test] + fn test_add_sub_refcount() { + let state = State::default(); + assert_eq!(state.get_refcount(), 0); + let state = state + 5; + assert_eq!(state.get_refcount(), 5); + let mut state = state - 2; + assert_eq!(state.get_refcount(), 3); + state.set_refcount(1); + assert_eq!(state.get_refcount(), 1); + } +} diff --git a/runtime/lightproc/tests/stack.rs b/runtime/lightproc/tests/stack.rs new file mode 100644 index 0000000..6618d44 --- /dev/null +++ b/runtime/lightproc/tests/stack.rs @@ -0,0 +1,15 @@ +use lightproc::proc_stack::ProcStack; +use lightproc::proc_state::EmptyProcState; + +#[test] +fn stack_copy() { + let stack = ProcStack::default() + .with_pid(12) + .with_after_panic(|_s: &mut EmptyProcState| { + println!("After panic!"); + }); + + let stack2 = stack; + + assert_eq!(stack2.get_pid(), 12); +}