LightProc fork working

This commit is contained in:
Nadja Reitzenstein 2021-11-14 17:50:59 +01:00
parent 3231b51f89
commit 24be65b3d9
18 changed files with 2096 additions and 0 deletions

View File

@ -0,0 +1,23 @@
[package]
name = "lightproc"
version = "0.3.0"
publish = false
description = "Lightweight process abstraction for Rust"
authors = []
keywords = ["fault-tolerant", "runtime", "actor", "system", "lightweight-process"]
categories = ["concurrency", "asynchronous"]
readme = "README.md"
license = "Apache-2.0/MIT"
edition = "2021"
[dependencies]
crossbeam-utils = "0.8"
pin-utils = "0.1.0"
bitfield = "0.13.2"
bitflags = "1.3.2"
[dev-dependencies]
crossbeam = "0.8"
futures-executor = "0.3"
lazy_static = "1.4.0"
async-std = "1.5"

View File

@ -0,0 +1,55 @@
# LightProc
<table align=left style='float: left; margin: 4px 10px 0px 0px; border: 1px solid #000000;'>
<tr>
<td>Latest Release</td>
<td>
<a href="https://crates.io/crates/lightproc">
<img alt="Crates.io" src="https://img.shields.io/crates/v/lightproc.svg?style=popout-square">
</a>
</td>
</tr>
<tr>
<td></td>
</tr>
<tr>
<td>License</td>
<td>
<a href="https://github.com/bastion-rs/bastion/blob/master/LICENSE">
<img alt="Crates.io" src="https://img.shields.io/crates/l/bastion.svg?style=popout-square">
</a>
</td>
</tr>
<tr>
<td>Build Status</td>
<td>
<a href="https://actions-badge.atrox.dev/bastion-rs/bastion/goto">
<img alt="Build Status" src="https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fbastion-rs%2Fbastion%2Fbadge&style=flat" />
</a>
</td>
</tr>
<tr>
<td>Downloads</td>
<td>
<a href="https://crates.io/crates/lightproc">
<img alt="Crates.io" src="https://img.shields.io/crates/d/lightproc.svg?style=popout-square">
</a>
</td>
</tr>
<tr>
<td>Discord</td>
<td>
<a href="https://discord.gg/DqRqtRT">
<img src="https://img.shields.io/discord/628383521450360842.svg?logo=discord" />
</a>
</td>
</tr>
</table>
LightProc is Lightweight Process abstraction for Rust.
Beneath the implementation:
* It uses futures with lifecycle callbacks to implement Erlang like processes.
* Contains basic pid(process id) to identify processes.
* All panics inside futures are propagated to upper layers.

View File

@ -0,0 +1,61 @@
use std::any::Any;
use std::fmt::Debug;
use std::ops::Deref;
use crossbeam::channel::{unbounded, Sender};
use futures_executor as executor;
use lazy_static::lazy_static;
use lightproc::prelude::*;
use std::future::Future;
use std::thread;
fn spawn_on_thread<F, R>(future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Debug + Send + 'static,
{
lazy_static! {
// A channel that holds scheduled procs.
static ref QUEUE: Sender<LightProc> = {
let (sender, receiver) = unbounded::<LightProc>();
// Start the executor thread.
thread::spawn(move || {
for proc in receiver {
proc.run();
}
});
sender
};
}
let schedule = |t| (QUEUE.deref()).send(t).unwrap();
let (proc, handle) = LightProc::recoverable(
future,
schedule
);
let handle = handle
.on_panic(|err: Box<dyn Any + Send>| {
match err.downcast::<&'static str>() {
Ok(reason) => println!("Future panicked: {}", &reason),
Err(err) =>
println!("Future panicked with a non-text reason of typeid {:?}",
err.type_id()),
}
});
proc.schedule();
handle
}
fn main() {
let handle = spawn_on_thread(async {
panic!("Panic here!");
});
executor::block_on(handle);
println!("But see, despite the inner future panicking we can continue executing as normal.");
}

View File

@ -0,0 +1,46 @@
use crossbeam::channel;
use futures_executor as executor;
use lightproc::prelude::*;
use std::future::Future;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
fn spawn_on_thread<F, R>(fut: F) -> ProcHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (sender, receiver) = channel::unbounded();
let sender = Arc::new(sender);
let s = Arc::downgrade(&sender);
let future = async move {
let _ = sender;
fut.await
};
let schedule = move |t| s.upgrade().unwrap().send(t).unwrap();
let (proc, handle) = LightProc::build(
future,
schedule,
);
proc.schedule();
thread::spawn(move || {
for proc in receiver {
proc.run();
}
});
handle
}
fn main() {
executor::block_on(spawn_on_thread(async {
println!("Sleeping!");
async_std::task::sleep(Duration::from_secs(1)).await;
println!("Done sleeping");
}));
}

View File

@ -0,0 +1,78 @@
use crossbeam::channel::{unbounded, Sender};
use futures_executor as executor;
use lazy_static::lazy_static;
use lightproc::prelude::*;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::thread;
#[derive(Copy, Clone)]
pub struct GlobalState {
pub amount: usize,
}
fn spawn_on_thread<F, R>(future: F, gs: Arc<Mutex<GlobalState>>)
-> RecoverableHandle<Arc<Mutex<GlobalState>>, R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
lazy_static! {
// A channel that holds scheduled procs.
static ref QUEUE: Sender<LightProc> = {
let (sender, receiver) = unbounded::<LightProc>();
// Start the executor thread.
thread::spawn(move || {
for proc in receiver {
proc.run();
}
});
sender
};
}
let stack = ProcStack::build(Box::new(gs))
.initialize(Callback::wrap(|s: &mut Arc<Mutex<GlobalState>>| {
println!("initializing");
s.clone().lock().unwrap().amount += 1;
}))
.completed(Callback::wrap(|s: &mut Arc<Mutex<GlobalState>>| {
println!("completed");
s.clone().lock().unwrap().amount += 2;
}));
let schedule = |t| QUEUE.send(t).unwrap();
let (proc, handle) = LightProc::recoverable(future, schedule, stack);
let handle = handle
.on_panic(|s: &mut Arc<Mutex<GlobalState>>, _e| {
println!("panicked");
s.clone().lock().unwrap().amount += 3;
});
proc.schedule();
handle
}
fn main() {
let gs = Arc::new(Mutex::new(GlobalState { amount: 0 }));
let handle = spawn_on_thread(
async {
panic!("Panic here!");
},
gs.clone(),
);
executor::block_on(handle);
// 0 at the start
// +1 before the start
// +2 after panic occurs and completion triggers
// +3 after panic triggers
let amount = gs.lock().unwrap().amount;
assert_eq!(amount, 6);
println!("Amount: {}", amount);
}

View File

@ -0,0 +1,36 @@
use pin_utils::unsafe_pinned;
use std::any::Any;
use std::future::Future;
use std::panic::{catch_unwind, AssertUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::task::{Context, Poll};
#[derive(Debug)]
pub(crate) struct CatchUnwind<F>
where
F: Future,
{
future: F,
}
impl<F> CatchUnwind<F>
where
F: Future + UnwindSafe,
{
unsafe_pinned!(future: F);
pub(crate) fn new(future: F) -> CatchUnwind<F> {
CatchUnwind { future }
}
}
impl<F> Future for CatchUnwind<F>
where
F: Future + UnwindSafe,
{
type Output = Result<F::Output, Box<dyn Any + Send>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok)
}
}

View File

@ -0,0 +1,28 @@
use std::alloc::Layout;
use std::io::{Error, ErrorKind};
#[inline]
pub(crate) fn extend(layout: Layout, next: Layout) -> (Layout, usize) {
let new_align = std::cmp::max(layout.align(), next.align());
let pad = padding_needed_for(layout, next.align());
let offset = layout
.size()
.checked_add(pad)
.ok_or_else(|| Error::new(ErrorKind::Other, "Padding overflow check failed"))
.unwrap();
let new_size = offset
.checked_add(next.size())
.ok_or_else(|| Error::new(ErrorKind::Other, "New size can't be computed"))
.unwrap();
let layout = Layout::from_size_align(new_size, new_align).unwrap();
(layout, offset)
}
#[inline]
pub(crate) fn padding_needed_for(layout: Layout, align: usize) -> usize {
let len = layout.size();
let len_rounded_up = len.wrapping_add(align).wrapping_sub(1) & !align.wrapping_sub(1);
len_rounded_up.wrapping_sub(len)
}

View File

@ -0,0 +1,43 @@
//!
//!
//! LightProc is Lightweight Process abstraction for Rust.
//!
//! Beneath the implementation:
//! * It uses futures with lifecycle callbacks to implement Erlang like processes.
//! * Contains basic pid(process id) to identify processes.
//! * All panics inside futures are propagated to upper layers.
//!
//! The naming convention of this crate comes from [Erlang's Lightweight Processes].
//!
//! [Erlang's Lightweight Processes]: https://en.wikipedia.org/wiki/Light-weight_process
//!
// Force missing implementations
#![forbid(missing_docs)]
#![forbid(missing_debug_implementations)]
#![forbid(unused_import_braces)]
#![forbid(unused_imports)]
#![forbid(unused_must_use)]
#![forbid(unused_variables)]
mod catch_unwind;
mod layout_helpers;
mod proc_data;
mod proc_ext;
mod proc_layout;
mod proc_vtable;
mod raw_proc;
mod state;
pub mod lightproc;
pub mod proc_handle;
pub mod recoverable_handle;
/// The lightproc prelude.
///
/// The prelude re-exports lightproc structs and handles from this crate.
pub mod prelude {
pub use crate::lightproc::*;
pub use crate::proc_handle::*;
pub use crate::recoverable_handle::*;
}

View File

@ -0,0 +1,192 @@
//!
//! Lightweight process implementation which enables users
//! to create either panic recoverable process or
//! ordinary process.
//!
//! Lightweight processes needs a stack to use their lifecycle
//! operations like `before_start`, `after_complete` and more...
//!
//! # Example Usage
//!
//! ```rust
//! use lightproc::prelude::*;
//!
//! // ... future that does work
//! let future = async {
//! println!("Doing some work");
//! };
//!
//! // ... basic schedule function with no waker logic
//! fn schedule_function(proc: LightProc) {;}
//!
//! // ... creating a recoverable process
//! let panic_recoverable = LightProc::recoverable(
//! future,
//! schedule_function,
//! );
//! ```
use crate::proc_data::ProcData;
use crate::proc_ext::ProcFutureExt;
use crate::proc_handle::ProcHandle;
use crate::raw_proc::RawProc;
use crate::recoverable_handle::RecoverableHandle;
use std::fmt::{self, Debug, Formatter};
use std::future::Future;
use std::mem;
use std::panic::AssertUnwindSafe;
use std::ptr::NonNull;
/// Shared functionality for both Send and !Send LightProc
pub struct LightProc {
/// A pointer to the heap-allocated proc.
pub(crate) raw_proc: NonNull<()>,
}
// LightProc is both Sync and Send because it explicitly handles synchronization internally:
// The state of a `LightProc` is only modified atomically guaranteeing a consistent view from all
// threads. Existing handles are atomically reference counted so the proc itself will not be dropped
// until all pointers to it are themselves dropped.
// However, if the future or result inside the LightProc is !Send the executor must ensure that
// the `schedule` function does not move the LightProc to a different thread.
unsafe impl Send for LightProc {}
unsafe impl Sync for LightProc {}
impl LightProc {
/// Creates a recoverable process which will catch panics in the given future.
///
/// # Example
/// ```rust
/// # use std::any::Any;
/// # use lightproc::prelude::*;
/// #
/// # // ... basic schedule function with no waker logic
/// # fn schedule_function(proc: LightProc) {;}
/// #
/// let future = async {
/// panic!("oh no!");
/// };
/// // ... creating a recoverable process
/// let (proc, handle) = LightProc::recoverable(
/// future,
/// schedule_function,
/// );
/// let handle = handle.on_panic(|s: &mut EmptyProcState, e: Box<dyn Any + Send>| {
/// let reason = e.downcast::<String>();
/// println!("future panicked!: {}", &reason);
/// });
/// ```
pub fn recoverable<F, R, S>(future: F, schedule: S) -> (Self, RecoverableHandle<R>)
where F: Future<Output=R> + 'static,
R: 'static,
S: Fn(LightProc) + 'static,
{
let recovery_future = AssertUnwindSafe(future).catch_unwind();
let (proc, handle) = Self::build(recovery_future, schedule);
(proc, RecoverableHandle::new(handle))
}
///
/// Creates a process which will stop its execution on occurrence of panic.
///
/// # Example
/// ```rust
/// # use lightproc::prelude::*;
/// #
/// # // ... future that does work
/// # let future = async {
/// # println!("Doing some work");
/// # };
/// #
/// # // ... basic schedule function with no waker logic
/// # fn schedule_function(proc: LightProc) {;}
/// #
/// # // ... process stack with a lifecycle callback
/// # let proc_stack =
/// # ProcStack::default()
/// # .with_after_panic(|s: &mut EmptyProcState| {
/// # println!("After panic started!");
/// # });
/// #
/// // ... creating a standard process
/// let standard = LightProc::build(
/// future,
/// schedule_function,
/// );
/// ```
pub fn build<F, R, S>(future: F, schedule: S) -> (Self, ProcHandle<R>)
where F: Future<Output=R> + 'static,
R: 'static,
S: Fn(LightProc) + 'static,
{
let raw_proc = RawProc::allocate(future, schedule);
let proc = LightProc { raw_proc };
let handle = ProcHandle::new(raw_proc);
(proc, handle)
}
///
/// Schedule the lightweight process with passed `schedule` function at the build time.
pub fn schedule(self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
mem::forget(self);
unsafe {
((*pdata).vtable.schedule)(ptr);
}
}
/// Run this LightProc.
///
/// "Running" a lightproc means ticking it once and if it doesn't complete
/// immediately re-scheduling it as soon as it's Waker wakes it back up.
pub fn run(self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
mem::forget(self);
unsafe {
((*pdata).vtable.tick)(ptr);
}
}
/// Cancel polling the lightproc's inner future, thus cancelling the proc itself.
pub fn cancel(&self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
unsafe {
(*pdata).cancel();
}
}
}
impl Debug for LightProc {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
fmt.debug_struct("LightProc")
.field("pdata", unsafe { &(*pdata) })
.finish()
}
}
impl Drop for LightProc {
fn drop(&mut self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
unsafe {
// Cancel the proc.
(*pdata).cancel();
// Drop the future.
((*pdata).vtable.drop_future)(ptr);
// Drop the proc reference.
((*pdata).vtable.decrement)(ptr);
}
}
}

View File

@ -0,0 +1,148 @@
use crate::proc_vtable::ProcVTable;
use crate::state::*;
use crossbeam_utils::Backoff;
use std::cell::Cell;
use std::fmt::{self, Debug, Formatter};
use std::sync::atomic::Ordering;
use std::task::Waker;
/// The pdata of a proc.
///
/// This pdata is stored right at the beginning of every heap-allocated proc.
pub(crate) struct ProcData {
/// Current state of the proc.
///
/// Contains flags representing the current state and the reference count.
pub(crate) state: AtomicState,
/// The proc that is blocked on the `ProcHandle`.
///
/// This waker needs to be woken once the proc completes or is closed.
pub(crate) awaiter: Cell<Option<Waker>>,
/// The virtual table.
///
/// In addition to the actual waker virtual table, it also contains pointers to several other
/// methods necessary for bookkeeping the heap-allocated proc.
pub(crate) vtable: &'static ProcVTable,
}
impl ProcData {
/// Cancels the proc.
///
/// This method will only mark the proc as closed and will notify the awaiter, but it won't
/// reschedule the proc if it's not completed.
pub(crate) fn cancel(&self) {
let mut state = self.state.load(Ordering::Acquire);
loop {
// If the proc has been completed or closed, it can't be cancelled.
if state.intersects(COMPLETED | CLOSED) {
break;
}
// Mark the proc as closed.
match self.state.compare_exchange_weak(
state.into(),
(state | CLOSED).into(),
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Notify the awaiter that the proc has been closed.
if state.is_awaiter() {
self.notify();
}
break;
}
Err(s) => state = s,
}
}
}
/// Notifies the proc blocked on the proc.
///
/// If there is a registered waker, it will be removed from the pdata and woken.
#[inline]
pub(crate) fn notify(&self) {
if let Some(waker) = self.swap_awaiter(None) {
// We need a safeguard against panics because waking can panic.
waker.wake();
}
}
/// Notifies the proc blocked on the proc unless its waker matches `current`.
///
/// If there is a registered waker, it will be removed from the pdata.
#[inline]
pub(crate) fn notify_unless(&self, current: &Waker) {
if let Some(waker) = self.swap_awaiter(None) {
if !waker.will_wake(current) {
// We need a safeguard against panics because waking can panic.
waker.wake();
}
}
}
/// Swaps the awaiter and returns the previous value.
#[inline]
pub(crate) fn swap_awaiter(&self, new: Option<Waker>) -> Option<Waker> {
let new_is_none = new.is_none();
// We're about to try acquiring the lock in a loop. If it's already being held by another
// thread, we'll have to spin for a while so it's best to employ a backoff strategy.
let backoff = Backoff::new();
loop {
// Acquire the lock. If we're storing an awaiter, then also set the awaiter flag.
let state = if new_is_none {
self.state.fetch_or(LOCKED.into(), Ordering::Acquire)
} else {
self.state.fetch_or((LOCKED | AWAITER).into(), Ordering::Acquire)
};
// If the lock was acquired, break from the loop.
if state.is_locked() {
break;
}
// Snooze for a little while because the lock is held by another thread.
backoff.snooze();
}
// Replace the awaiter.
let old = self.awaiter.replace(new);
// Release the lock. If we've cleared the awaiter, then also unset the awaiter flag.
if new_is_none {
self.state.fetch_and((!LOCKED & !AWAITER).into(), Ordering::Release);
} else {
self.state.fetch_and((!LOCKED).into(), Ordering::Release);
}
old
}
}
impl Debug for ProcData {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
let state = self.state.load(Ordering::SeqCst);
if fmt.alternate() {
fmt.debug_struct("ProcData")
.field("scheduled", &state.is_scheduled())
.field("running", &state.is_running())
.field("completed", &state.is_completed())
.field("closed", &state.is_closed())
.field("handle", &state.is_handle())
.field("awaiter", &state.is_awaiter())
.field("locked", &state.is_locked())
.field("ref_count", &state.get_refcount())
.finish()
} else {
fmt.debug_struct("ProcData")
.field("state", &state)
.finish()
}
}
}

View File

@ -0,0 +1,14 @@
use crate::catch_unwind::CatchUnwind;
use std::future::Future;
use std::panic::UnwindSafe;
pub(crate) trait ProcFutureExt: Future {
fn catch_unwind(self) -> CatchUnwind<Self>
where
Self: Sized + UnwindSafe,
{
CatchUnwind::new(self)
}
}
impl<T: ?Sized> ProcFutureExt for T where T: Future {}

View File

@ -0,0 +1,258 @@
//!
//! Handle for tasks which don't need to unwind panics inside
//! the given futures.
use crate::proc_data::ProcData;
use crate::state::*;
use std::fmt::{self, Debug, Formatter};
use std::future::Future;
use std::marker::{PhantomData, Unpin};
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll};
/// A handle that awaits the result of a proc.
///
/// This type is a future that resolves to an `Option<R>` where:
///
/// * `None` indicates the proc has panicked or was cancelled
/// * `Some(res)` indicates the proc has completed with `res`
pub struct ProcHandle<R> {
/// A raw proc pointer.
pub(crate) raw_proc: NonNull<()>,
/// A marker capturing the generic type `R`.
pub(crate) result: PhantomData<R>,
}
unsafe impl<R: Send> Send for ProcHandle<R> {}
unsafe impl<R: Sync> Sync for ProcHandle<R> {}
impl<R> Unpin for ProcHandle<R> {}
impl<R> ProcHandle<R> {
pub(crate) fn new(raw_proc: NonNull<()>) -> Self {
Self {
raw_proc,
result: PhantomData,
}
}
/// Cancels the proc.
///
/// If the proc has already completed, calling this method will have no effect.
///
/// When a proc is cancelled, its future cannot be polled again and will be dropped instead.
pub fn cancel(&self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
unsafe {
let mut state = (*pdata).state.load(Ordering::Acquire);
loop {
// If the proc has been completed or closed, it can't be cancelled.
if state.intersects(COMPLETED | CLOSED) {
break;
}
// If the proc is not scheduled nor running, we'll need to schedule it.
let new = if state.intersects(SCHEDULED | RUNNING) {
(state | SCHEDULED | CLOSED) + 1
} else {
state | CLOSED
};
// Mark the proc as closed.
match (*pdata).state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// If the proc is not scheduled nor running, schedule it so that its future
// gets dropped by the executor.
if !state.intersects(SCHEDULED | RUNNING) {
((*pdata).vtable.schedule)(ptr);
}
// Notify the awaiter that the proc has been closed.
if state.is_awaiter() {
(*pdata).notify();
}
break;
}
Err(s) => state = s,
}
}
}
}
/// Returns current state of the handle.
pub fn state(&self) -> State {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
unsafe { (*pdata).state.load(Ordering::SeqCst) }
}
}
impl<R> Future for ProcHandle<R> {
type Output = Option<R>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
unsafe {
let mut state = (*pdata).state.load(Ordering::Acquire);
loop {
// If the proc has been closed, notify the awaiter and return `None`.
if state.is_closed() {
// Even though the awaiter is most likely the current proc, it could also be
// another proc.
(*pdata).notify_unless(cx.waker());
return Poll::Ready(None);
}
// If the proc is not completed, register the current proc.
if !state.is_completed() {
// Replace the waker with one associated with the current proc. We need a
// safeguard against panics because dropping the previous waker can panic.
(*pdata).swap_awaiter(Some(cx.waker().clone()));
// Reload the state after registering. It is possible that the proc became
// completed or closed just before registration so we need to check for that.
state = (*pdata).state.load(Ordering::Acquire);
// If the proc has been closed, notify the awaiter and return `None`.
if state.is_closed() {
// Even though the awaiter is most likely the current proc, it could also
// be another proc.
(*pdata).notify_unless(cx.waker());
return Poll::Ready(None);
}
// If the proc is still not completed, we're blocked on it.
if !state.is_completed() {
return Poll::Pending;
}
}
// Since the proc is now completed, mark it as closed in order to grab its output.
match (*pdata).state.compare_exchange(
state,
state | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Notify the awaiter. Even though the awaiter is most likely the current
// proc, it could also be another proc.
if state.is_awaiter() {
(*pdata).notify_unless(cx.waker());
}
// Take the output from the proc.
let output = ((*pdata).vtable.get_output)(ptr) as *mut R;
return Poll::Ready(Some(output.read()));
}
Err(s) => state = s,
}
}
}
}
}
impl<R> Debug for ProcHandle<R> {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
fmt.debug_struct("ProcHandle")
.field("pdata", unsafe { &(*pdata) })
.finish_non_exhaustive()
}
}
impl<R> Drop for ProcHandle<R> {
fn drop(&mut self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
// A place where the output will be stored in case it needs to be dropped.
let mut output = None;
unsafe {
// Optimistically assume the `ProcHandle` is being dropped just after creating the
// proc. This is a common case so if the handle is not used, the overhead of it is only
// one compare-exchange operation.
if let Err(mut state) = (*pdata).state.compare_exchange_weak(
SCHEDULED | HANDLE | REFERENCE,
SCHEDULED | REFERENCE,
Ordering::AcqRel,
Ordering::Acquire,
) {
loop {
// If the proc has been completed but not yet closed, that means its output
// must be dropped.
if state.is_completed() && !state.is_closed() {
// Mark the proc as closed in order to grab its output.
match (*pdata).state.compare_exchange_weak(
state,
state | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Read the output.
output = Some((((*pdata).vtable.get_output)(ptr) as *mut R).read());
// Update the state variable because we're continuing the loop.
state |= CLOSED;
}
Err(s) => state = s,
}
} else {
// If this is the last reference to the proc and it's not closed, then
// close it and schedule one more time so that its future gets dropped by
// the executor.
let new = if state.get_refcount() == 0 && !state.is_closed() {
SCHEDULED | CLOSED | REFERENCE
} else {
state & !HANDLE
};
// Unset the handle flag.
match (*pdata).state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// If this is the last reference to the proc, we need to either
// schedule dropping its future or destroy it.
if state.get_refcount() == 0 {
if !state.is_closed() {
((*pdata).vtable.schedule)(ptr);
} else {
((*pdata).vtable.destroy)(ptr);
}
}
break;
}
Err(s) => state = s,
}
}
}
}
}
// Drop the output if it was taken out of the proc.
drop(output);
}
}

View File

@ -0,0 +1,16 @@
use std::alloc::Layout;
#[derive(Clone, Copy)]
pub(crate) struct ProcLayout {
/// Memory layout of the whole proc.
pub(crate) layout: Layout,
/// Offset into the proc at which the schedule function is stored.
pub(crate) offset_schedule: usize,
/// Offset into the proc at which the future is stored.
pub(crate) offset_future: usize,
/// Offset into the proc at which the output is stored.
pub(crate) offset_output: usize,
}

View File

@ -0,0 +1,25 @@
use std::task::RawWakerVTable;
/// The vtable for a proc.
pub(crate) struct ProcVTable {
/// The raw waker vtable.
pub(crate) raw_waker: RawWakerVTable,
/// Schedules the proc.
pub(crate) schedule: unsafe fn(*const ()),
/// Drops the future inside the proc.
pub(crate) drop_future: unsafe fn(*const ()),
/// Returns a pointer to the output stored after completion.
pub(crate) get_output: unsafe fn(*const ()) -> *const (),
/// Drops a waker or a proc.
pub(crate) decrement: unsafe fn(ptr: *const ()),
/// Destroys the proc.
pub(crate) destroy: unsafe fn(*const ()),
/// Ticks the proc.
pub(crate) tick: unsafe fn(*const ()),
}

View File

@ -0,0 +1,549 @@
use crate::catch_unwind::CatchUnwind;
use crate::layout_helpers::extend;
use crate::lightproc::LightProc;
use crate::proc_data::ProcData;
use crate::proc_layout::ProcLayout;
use crate::proc_vtable::ProcVTable;
use crate::state::*;
use std::alloc::{self, Layout};
use std::cell::Cell;
use std::future::Future;
use std::mem::{self, ManuallyDrop};
use std::panic::AssertUnwindSafe;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
/// Raw pointers to the fields of a proc.
pub(crate) struct RawProc<F, R, S> {
pub(crate) pdata: *const ProcData,
pub(crate) schedule: *const S,
pub(crate) future: *mut F,
pub(crate) output: *mut R,
}
impl<F, R, S> RawProc<F, R, S>
where
F: Future<Output = R> + 'static,
R: 'static,
S: Fn(LightProc) + 'static,
{
/// Allocates a proc with the given `future` and `schedule` function.
///
/// It is assumed there are initially only the `LightProc` reference and the `ProcHandle`.
pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> {
// Compute the layout of the proc for allocation. Abort if the computation fails.
let proc_layout = Self::proc_layout();
unsafe {
// Allocate enough space for the entire proc.
let raw_proc = match NonNull::new(alloc::alloc(proc_layout.layout) as *mut ()) {
None => std::process::abort(),
Some(p) => p,
};
let raw = Self::from_ptr(raw_proc.as_ptr());
// Write the pdata as the first field of the proc.
(raw.pdata as *mut ProcData).write(ProcData {
state: AtomicState::new(SCHEDULED | HANDLE | REFERENCE),
awaiter: Cell::new(None),
vtable: &ProcVTable {
raw_waker: RawWakerVTable::new(
Self::clone_waker,
Self::wake,
Self::wake_by_ref,
Self::decrement,
),
schedule: Self::schedule,
drop_future: Self::drop_future,
get_output: Self::get_output,
decrement: Self::decrement,
destroy: Self::destroy,
tick: Self::tick,
},
});
// Write the schedule function as the third field of the proc.
(raw.schedule as *mut S).write(schedule);
// Write the future as the fourth field of the proc.
raw.future.write(future);
raw_proc
}
}
/// Returns the memory layout for a proc.
#[inline(always)]
fn proc_layout() -> ProcLayout {
let layout_pdata = Layout::new::<ProcData>();
let layout_schedule = Layout::new::<S>();
let layout_future = Layout::new::<CatchUnwind<AssertUnwindSafe<F>>>();
let layout_output = Layout::new::<R>();
let size_union = layout_future.size().max(layout_output.size());
let align_union = layout_future.align().max(layout_output.align());
let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };
let layout = layout_pdata;
let (layout, offset_schedule) = extend(layout, layout_schedule);
let (layout, offset_union) = extend(layout, layout_union);
let offset_future = offset_union;
let offset_output = offset_union;
ProcLayout {
layout,
offset_schedule,
offset_future,
offset_output,
}
}
/// Creates a `RawProc` from a raw proc pointer.
#[inline]
pub(crate) fn from_ptr(ptr: *const ()) -> Self {
let proc_layout = Self::proc_layout();
let p = ptr as *const u8;
unsafe {
Self {
pdata: p as *const ProcData,
schedule: p.add(proc_layout.offset_schedule) as *const S,
future: p.add(proc_layout.offset_future) as *mut F,
output: p.add(proc_layout.offset_output) as *mut R,
}
}
}
/// Wakes a waker.
unsafe fn wake(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
let mut state = (*raw.pdata).state.load(Ordering::Acquire);
loop {
// If the proc is completed or closed, it can't be woken.
if state.intersects(COMPLETED | CLOSED) {
// Drop the waker.
Self::decrement(ptr);
break;
}
// If the proc is already scheduled, we just need to synchronize with the thread that
// will run the proc by "publishing" our current view of the memory.
if state.is_scheduled() {
// Update the state without actually modifying it.
match (*raw.pdata).state.compare_exchange_weak(
state.into(),
state.into(),
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Drop the waker.
Self::decrement(ptr);
break;
}
Err(s) => state = s,
}
} else {
// Mark the proc as scheduled.
match (*raw.pdata).state.compare_exchange_weak(
state,
state | SCHEDULED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// If the proc was not yet scheduled and isn't currently running, now is the
// time to schedule it.
if !state.is_running() {
// Schedule the proc.
let proc = LightProc {
raw_proc: NonNull::new_unchecked(ptr as *mut ()),
};
(*raw.schedule)(proc);
} else {
// Drop the waker.
Self::decrement(ptr);
}
break;
}
Err(s) => state = s,
}
}
}
}
/// Wakes a waker by reference.
unsafe fn wake_by_ref(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
let mut state = (*raw.pdata).state.load(Ordering::Acquire);
loop {
// If the proc is completed or closed, it can't be woken.
if state.intersects(COMPLETED | CLOSED) {
break;
}
// If the proc is already scheduled, we just need to synchronize with the thread that
// will run the proc by "publishing" our current view of the memory.
if state.is_scheduled() {
// Update the state without actually modifying it.
match (*raw.pdata).state.compare_exchange_weak(
state,
state,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(s) => state = s,
}
} else {
// If the proc is not scheduled nor running, we'll need to schedule after waking.
let new = if !state.intersects(SCHEDULED | RUNNING) {
(state | SCHEDULED) + 1
} else {
state | SCHEDULED
};
// Mark the proc as scheduled.
match (*raw.pdata).state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// If the proc is not scheduled nor running, now is the time to schedule.
if !state.intersects(SCHEDULED | RUNNING) {
// Schedule the proc.
let proc = LightProc {
raw_proc: NonNull::new_unchecked(ptr as *mut ()),
};
(*raw.schedule)(proc);
}
break;
}
Err(s) => state = s,
}
}
}
}
/// Clones a waker.
unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
let raw = Self::from_ptr(ptr);
let raw_waker = &(*raw.pdata).vtable.raw_waker;
// Increment the reference count. With any kind of reference-counted data structure,
// relaxed ordering is fine when the reference is being cloned.
let state = (*raw.pdata).state.fetch_add(1, Ordering::Relaxed);
// If the reference count overflowed, abort.
if state.bits() > i64::MAX as u64 {
std::process::abort();
}
RawWaker::new(ptr, raw_waker)
}
/// Drops a waker or a proc.
///
/// This function will decrement the reference count. If it drops down to zero and the
/// associated join handle has been dropped too, then the proc gets destroyed.
#[inline]
unsafe fn decrement(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
// Decrement the reference count.
let mut new = (*raw.pdata)
.state
.fetch_sub(1, Ordering::AcqRel);
new.set_refcount(new.get_refcount().saturating_sub(1));
// If this was the last reference to the proc and the `ProcHandle` has been dropped as
// well, then destroy the proc.
if new.get_refcount() == 0 && !new.is_handle() {
Self::destroy(ptr);
}
}
/// Schedules a proc for running.
///
/// This function doesn't modify the state of the proc. It only passes the proc reference to
/// its schedule function.
unsafe fn schedule(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
(*raw.schedule)(LightProc {
raw_proc: NonNull::new_unchecked(ptr as *mut ()),
});
}
/// Drops the future inside a proc.
#[inline]
unsafe fn drop_future(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
// We need a safeguard against panics because the destructor can panic.
raw.future.drop_in_place();
}
/// Returns a pointer to the output inside a proc.
unsafe fn get_output(ptr: *const ()) -> *const () {
let raw = Self::from_ptr(ptr);
raw.output as *const ()
}
/// Cleans up proc's resources and deallocates it.
///
/// If the proc has not been closed, then its future or the output will be dropped. The
/// schedule function gets dropped too.
#[inline]
unsafe fn destroy(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
let proc_layout = Self::proc_layout();
// We need a safeguard against panics because destructors can panic.
// Drop the schedule function.
(raw.schedule as *mut S).drop_in_place();
// Finally, deallocate the memory reserved by the proc.
alloc::dealloc(ptr as *mut u8, proc_layout.layout);
}
/// Ticks a proc.
///
/// Ticking will call `poll` once and re-schedule the task if it returns `Poll::Pending`. If
/// polling its future panics, the proc will be closed and the panic propagated into the caller.
unsafe fn tick(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
// Create a context from the raw proc pointer and the vtable inside the its pdata.
let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
ptr,
&(*raw.pdata).vtable.raw_waker,
)));
let cx = &mut Context::from_waker(&waker);
let mut state = (*raw.pdata).state.load(Ordering::Acquire);
// Update the proc's state before polling its future.
loop {
// If the proc has been closed, drop the proc reference and return.
if state.is_closed() {
// Notify the awaiter that the proc has been closed.
if state.is_awaiter() {
(*raw.pdata).notify();
}
// Drop the future.
Self::drop_future(ptr);
// Drop the proc reference.
Self::decrement(ptr);
return;
}
// Mark the proc as unscheduled and running.
match (*raw.pdata).state.compare_exchange_weak(
state,
(state & !SCHEDULED) | RUNNING,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Update the state because we're continuing with polling the future.
state = (state & !SCHEDULED) | RUNNING;
break;
}
Err(s) => state = s,
}
}
// If we get here the lightproc is not closed and marked as running and unscheduled.
// Poll the inner future, but surround it with a guard that closes the proc in case polling
// panics.
let guard = Guard(raw);
let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
mem::forget(guard);
match poll {
Poll::Ready(out) => {
// Replace the future with its output.
Self::drop_future(ptr);
raw.output.write(out);
// A place where the output will be stored in case it needs to be dropped.
let mut output = None;
// The proc is now completed.
loop {
// If the handle is dropped, we'll need to close it and drop the output.
let new = if !state.is_handle() {
(state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
} else {
(state & !RUNNING & !SCHEDULED) | COMPLETED
};
// Mark the proc as not running and completed.
match (*raw.pdata).state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// If the handle is dropped or if the proc was closed while running,
// now it's time to drop the output.
if !state.is_handle() || state.is_closed() {
// Read the output.
output = Some(raw.output.read());
}
// Notify the awaiter that the proc has been completed.
if state.is_awaiter() {
(*raw.pdata).notify();
}
// Drop the proc reference.
Self::decrement(ptr);
break;
}
Err(s) => state = s,
}
}
// Drop the output if it was taken out of the proc.
drop(output);
}
Poll::Pending => {
// The proc is still not completed.
loop {
// If the proc was closed while running, we'll need to unschedule in case it
// was woken and then clean up its resources.
let new = if state.is_closed() {
state & !( RUNNING | SCHEDULED )
} else {
state & !RUNNING
};
// Mark the proc as not running.
match (*raw.pdata).state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(state) => {
// If the proc was closed while running, we need to drop its future.
// If the proc was woken while running, we need to schedule it.
// Otherwise, we just drop the proc reference.
if state.is_closed() {
// The thread that closed the proc didn't drop the future because
// it was running so now it's our responsibility to do so.
Self::drop_future(ptr);
// Drop the proc reference.
Self::decrement(ptr);
} else if state.is_scheduled() {
// The thread that has woken the proc didn't reschedule it because
// it was running so now it's our responsibility to do so.
Self::schedule(ptr);
} else {
// Drop the proc reference.
Self::decrement(ptr);
}
break;
}
Err(s) => state = s,
}
}
}
}
}
}
impl<F, R, S> Clone for RawProc<F, R, S> {
fn clone(&self) -> Self {
Self {
pdata: self.pdata,
schedule: self.schedule,
future: self.future,
output: self.output,
}
}
}
impl<F, R, S> Copy for RawProc<F, R, S> {}
/// A guard that closes the proc if polling its future panics.
struct Guard<F, R, S>(RawProc<F, R, S>)
where
F: Future<Output = R> + 'static,
R: 'static,
S: Fn(LightProc) + 'static;
impl<F, R, S> Drop for Guard<F, R, S>
where
F: Future<Output = R> + 'static,
R: 'static,
S: Fn(LightProc) + 'static,
{
fn drop(&mut self) {
let raw = self.0;
let ptr = raw.pdata as *const ();
unsafe {
let mut state = (*raw.pdata).state.load(Ordering::Acquire);
loop {
// If the proc was closed while running, then unschedule it, drop its
// future, and drop the proc reference.
if state.is_closed() {
// We still need to unschedule the proc because it is possible it was
// woken while running.
(*raw.pdata).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
// The thread that closed the proc didn't drop the future because it
// was running so now it's our responsibility to do so.
RawProc::<F, R, S>::drop_future(ptr);
// Drop the proc reference.
RawProc::<F, R, S>::decrement(ptr);
break;
}
// Mark the proc as not running, not scheduled, and closed.
match (*raw.pdata).state.compare_exchange_weak(
state,
(state & !RUNNING & !SCHEDULED) | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(state) => {
// Drop the future because the proc is now closed.
RawProc::<F, R, S>::drop_future(ptr);
// Notify the awaiter that the proc has been closed.
if state.is_awaiter() {
(*raw.pdata).notify();
}
// Drop the proc reference.
RawProc::<F, R, S>::decrement(ptr);
break;
}
Err(s) => state = s,
}
}
}
}
}

View File

@ -0,0 +1,119 @@
//!
//! Handle for recoverable process
use std::any::Any;
use crate::proc_data::ProcData;
use crate::proc_handle::ProcHandle;
use crate::state::State;
use std::fmt::{self, Debug, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
/// Recoverable handle which encapsulates a standard Proc Handle and contain all panics inside.
///
/// Execution of `after_panic` will be immediate on polling the [RecoverableHandle]'s future.
pub struct RecoverableHandle<R> {
inner: ProcHandle<thread::Result<R>>,
/// Panic callback
///
/// This callback will be called if the interior future panics. It is passed the panic
// reason i.e. the `Err` of [`std::thread::Result`]
panicked: Option<Box<dyn FnOnce(Box<dyn Any + Send>) + Send + Sync>>,
}
impl<R> RecoverableHandle<R> {
pub(crate) fn new(inner: ProcHandle<thread::Result<R>>) -> Self {
RecoverableHandle {
inner,
panicked: None,
}
}
/// Cancels the proc.
///
/// If the proc has already completed, calling this method will have no effect.
///
/// When a proc is cancelled, its future cannot be polled again and will be dropped instead.
pub fn cancel(&self) {
self.inner.cancel()
}
/// Returns a state of the ProcHandle.
pub fn state(&self) -> State {
self.inner.state()
}
/// Adds a callback that will be executed should the inner future `panic!`s
///
/// ```rust
/// # use std::any::Any;
/// use lightproc::proc_stack::ProcStack;
/// use lightproc::proc_state::EmptyProcState;
/// # use lightproc::prelude::*;
/// #
/// # // ... future that does work
/// # let future = async {
/// # println!("Doing some work");
/// # };
/// #
/// # // ... basic schedule function with no waker logic
/// # fn schedule_function(proc: LightProc) {;}
/// #
/// # // ... process stack with a lifecycle callback
/// # let proc_stack =
/// # ProcStack::default()
/// # .with_after_panic(|s: &mut EmptyProcState| {
/// # println!("After panic started!");
/// # });
/// #
/// // ... creating a recoverable process
/// let (proc, recoverable) = LightProc::recoverable(
/// future,
/// schedule_function,
/// );
///
/// recoverable
/// .on_return(|_e: Box<dyn Any + Send>| {
/// println!("Inner future panicked");
/// });
/// ```
pub fn on_panic<F>(mut self, callback: F) -> Self
where F: FnOnce(Box<dyn Any + Send>) + Send + Sync + 'static,
{
self.panicked = Some(Box::new(callback));
self
}
}
impl<R> Future for RecoverableHandle<R> {
type Output = Option<R>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Pin::new(&mut self.inner).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(val))) => Poll::Ready(Some(val)),
Poll::Ready(Some(Err(e))) => {
if let Some(callback) = self.panicked.take() {
callback(e);
}
Poll::Ready(None)
},
}
}
}
impl<R> Debug for RecoverableHandle<R> {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
let ptr = self.inner.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
fmt.debug_struct("ProcHandle")
.field("pdata", unsafe { &(*pdata) })
.finish_non_exhaustive()
}
}

View File

@ -0,0 +1,390 @@
use std::sync::atomic::{AtomicU64, Ordering};
/// Set if the proc is scheduled for running.
///
/// A proc is considered to be scheduled whenever its `LightProc` reference exists. It is in scheduled
/// state at the moment of creation and when it gets unpaused either by its `ProcHandle` or woken
/// by a `Waker`.
///
/// This flag can't be set when the proc is completed. However, it can be set while the proc is
/// running, in which case it will be rescheduled as soon as polling finishes.
pub(crate) const SCHEDULED: State = State::SCHEDULED;
/// Set if the proc is running.
///
/// A proc is running state while its future is being polled.
///
/// This flag can't be set when the proc is completed. However, it can be in scheduled state while
/// it is running, in which case it will be rescheduled when it stops being polled.
pub(crate) const RUNNING: State = State::RUNNING;
/// Set if the proc has been completed.
///
/// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored
/// inside the proc until it becomes stopped. In fact, `ProcHandle` picks the output up by marking
/// the proc as stopped.
///
/// This flag can't be set when the proc is scheduled or completed.
pub(crate) const COMPLETED: State = State::COMPLETED;
/// Set if the proc is closed.
///
/// If a proc is closed, that means its either cancelled or its output has been consumed by the
/// `ProcHandle`. A proc becomes closed when:
///
/// 1. It gets cancelled by `LightProc::cancel()` or `ProcHandle::cancel()`.
/// 2. Its output is awaited by the `ProcHandle`.
/// 3. It panics while polling the future.
/// 4. It is completed and the `ProcHandle` is dropped.
pub(crate) const CLOSED: State = State::CLOSED;
/// Set if the `ProcHandle` still exists.
///
/// The `ProcHandle` is a special case in that it is only tracked by this flag, while all other
/// proc references (`LightProc` and `Waker`s) are tracked by the reference count.
pub(crate) const HANDLE: State = State::HANDLE;
/// Set if the `ProcHandle` is awaiting the output.
///
/// This flag is set while there is a registered awaiter of type `Waker` inside the proc. When the
/// proc gets closed or completed, we need to wake the awaiter. This flag can be used as a fast
/// check that tells us if we need to wake anyone without acquiring the lock inside the proc.
pub(crate) const AWAITER: State = State::AWAITER;
/// Set if the awaiter is locked.
///
/// This lock is acquired before a new awaiter is registered or the existing one is woken.
pub(crate) const LOCKED: State = State::LOCKED;
/// A single reference.
///
/// The lower bits in the state contain various flags representing the proc state, while the upper
/// bits contain the reference count. The value of `REFERENCE` represents a single reference in the
/// total reference count.
///
/// Note that the reference counter only tracks the `LightProc` and `Waker`s. The `ProcHandle` is
/// tracked separately by the `HANDLE` flag.
pub(crate) const REFERENCE: State = State::REFERENCE;
bitflags::bitflags! {
#[derive(Default)]
pub struct State: u64 {
const SCHEDULED = 1 << 0;
const RUNNING = 1 << 1;
const COMPLETED = 1 << 2;
const CLOSED = 1 << 3;
const HANDLE = 1 << 4;
const AWAITER = 1 << 5;
const LOCKED = 1 << 6;
const REFERENCE = 1 << 7;
}
}
impl State {
#[inline(always)]
const fn new(bits: u64) -> Self {
unsafe { Self::from_bits_unchecked(bits) }
}
/// Returns `true` if the future is in the pending.
#[inline(always)]
pub fn is_pending(&self) -> bool {
!self.is_completed()
}
bitfield::bitfield_fields! {
u64;
#[inline(always)]
/// A proc is considered to be scheduled whenever its `LightProc` reference exists. It is in scheduled
/// state at the moment of creation and when it gets unpaused either by its `ProcHandle` or woken
/// by a `Waker`.
///
/// This flag can't be set when the proc is completed. However, it can be set while the proc is
/// running, in which case it will be rescheduled as soon as polling finishes.
pub is_scheduled, set_scheduled: 0;
#[inline(always)]
/// A proc is running state while its future is being polled.
///
/// This flag can't be set when the proc is completed. However, it can be in scheduled state while
/// it is running, in which case it will be rescheduled when it stops being polled.
pub is_running, set_running: 1;
#[inline(always)]
/// Set if the proc has been completed.
///
/// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored
/// inside the proc until it becomes stopped. In fact, `ProcHandle` picks the output up by marking
/// the proc as stopped.
///
/// This flag can't be set when the proc is scheduled or completed.
pub is_completed, set_completed: 2;
#[inline(always)]
/// Set if the proc is closed.
///
/// If a proc is closed, that means its either cancelled or its output has been consumed by the
/// `ProcHandle`. A proc becomes closed when:
///
/// 1. It gets cancelled by `LightProc::cancel()` or `ProcHandle::cancel()`.
/// 2. Its output is awaited by the `ProcHandle`.
/// 3. It panics while polling the future.
/// 4. It is completed and the `ProcHandle` is dropped.
pub is_closed, set_closed: 3;
#[inline(always)]
/// Set if the `ProcHandle` still exists.
///
/// The `ProcHandle` is a special case in that it is only tracked by this flag, while all other
/// proc references (`LightProc` and `Waker`s) are tracked by the reference count.
pub is_handle, set_handle: 4;
#[inline(always)]
/// Set if the `ProcHandle` is awaiting the output.
///
/// This flag is set while there is a registered awaiter of type `Waker` inside the proc. When the
/// proc gets closed or completed, we need to wake the awaiter. This flag can be used as a fast
/// check that tells us if we need to wake anyone without acquiring the lock inside the proc.
pub is_awaiter, set_awaiter: 5;
#[inline(always)]
/// Set if the awaiter is locked.
///
/// This lock is acquired before a new awaiter is registered or the existing one is woken.
pub is_locked, set_locked: 6;
#[inline(always)]
/// The lower bits in the state contain various flags representing the proc state, while the upper
/// bits contain the reference count.
/// Note that the reference counter only tracks the `LightProc` and `Waker`s. The `ProcHandle` is
/// tracked separately by the `HANDLE` flag.
pub get_refcount, set_refcount: 63, 7;
}
}
impl std::ops::Add<u64> for State {
type Output = State;
fn add(mut self, rhs: u64) -> Self::Output {
self.set_refcount(self.get_refcount() + rhs);
self
}
}
impl std::ops::Sub<u64> for State {
type Output = State;
fn sub(mut self, rhs: u64) -> Self::Output {
self.set_refcount(self.get_refcount() - rhs);
self
}
}
impl<T> bitfield::BitRange<T> for State
where u64: bitfield::BitRange<T>
{
fn bit_range(&self, msb: usize, lsb: usize) -> T {
self.bits.bit_range(msb, lsb)
}
fn set_bit_range(&mut self, msb: usize, lsb: usize, value: T) {
self.bits.set_bit_range(msb, lsb, value)
}
}
impl Into<usize> for State {
fn into(self) -> usize {
self.bits as usize
}
}
#[repr(transparent)]
pub struct AtomicState {
inner: AtomicU64,
}
impl AtomicState {
#[inline(always)]
pub const fn new(v: State) -> Self {
let inner = AtomicU64::new(v.bits);
Self { inner }
}
#[inline(always)]
pub fn load(&self, order: Ordering) -> State {
State::new(self.inner.load(order))
}
#[inline(always)]
#[allow(dead_code)]
pub fn store(&self, val: State, order: Ordering) {
self.inner.store(val.bits, order)
}
pub fn compare_exchange(
&self,
current: State,
new: State,
success: Ordering,
failure: Ordering
) -> Result<State, State>
{
self.inner.compare_exchange(current.bits, new.bits, success, failure)
.map(|u| State::new(u))
.map_err(|u| State::new(u))
}
pub fn compare_exchange_weak(
&self,
current: State,
new: State,
success: Ordering,
failure: Ordering
) -> Result<State, State>
{
self.inner.compare_exchange_weak(current.bits, new.bits, success, failure)
.map(|u| State::new(u))
.map_err(|u| State::new(u))
}
pub fn fetch_or(&self, val: State, order: Ordering) -> State {
State::new(self.inner.fetch_or(val.bits, order))
}
pub fn fetch_and(&self, val: State, order: Ordering) -> State {
State::new(self.inner.fetch_and(val.bits, order))
}
// FIXME: Do this properly
pub fn fetch_add(&self, val: u64, order: Ordering) -> State {
State::new(self.inner.fetch_add(val << 7, order))
}
// FIXME: Do this properly
pub fn fetch_sub(&self, val: u64, order: Ordering) -> State {
State::new(self.inner.fetch_sub(val << 7, order))
}
}
#[cfg(test)]
mod tests {
use crate::state::*;
#[test]
fn test_state_has_debug() {
let state = SCHEDULED | AWAITER;
println!("{:?}", state);
}
#[test]
fn test_is_scheduled_returns_true() {
let state = SCHEDULED;
assert_eq!(state.is_scheduled(), true);
let mut state2 = State::default();
state2.set_scheduled(true);
assert_eq!(state, state2)
}
#[test]
fn test_is_scheduled_returns_false() {
let state = State::default();
assert_eq!(state.is_scheduled(), false);
}
#[test]
fn test_is_running_returns_true() {
let state = RUNNING;
assert_eq!(state.is_running(), true);
}
#[test]
fn test_is_running_returns_false() {
let state = State::default();
assert_eq!(state.is_running(), false);
}
#[test]
fn test_is_completed_returns_true() {
let state = COMPLETED;
assert_eq!(state.is_completed(), true);
}
#[test]
fn test_is_completed_returns_false() {
let state = State::default();
assert_eq!(state.is_completed(), false);
}
#[test]
fn test_is_closed_returns_true() {
let state = CLOSED;
assert_eq!(state.is_closed(), true);
}
#[test]
fn test_is_closed_returns_false() {
let state = State::default();
assert_eq!(state.is_closed(), false);
}
#[test]
fn test_is_handle_returns_true() {
let state = HANDLE;
assert_eq!(state.is_handle(), true);
}
#[test]
fn test_is_handle_returns_false() {
let state = State::default();
assert_eq!(state.is_handle(), false);
}
#[test]
fn test_is_awaiter_returns_true() {
let state = AWAITER;
assert_eq!(state.is_awaiter(), true);
}
#[test]
fn test_is_awaiter_returns_false() {
let state = State::default();
assert_eq!(state.is_awaiter(), false);
}
#[test]
fn test_is_locked_returns_true() {
let state = LOCKED;
assert_eq!(state.is_locked(), true);
}
#[test]
fn test_is_locked_returns_false() {
let state = State::default();
assert_eq!(state.is_locked(), false);
}
#[test]
fn test_is_pending_returns_true() {
let state = State::default();
assert_eq!(state.is_pending(), true);
}
#[test]
fn test_is_pending_returns_false() {
let state = COMPLETED;
assert_eq!(state.is_pending(), false);
}
#[test]
fn test_add_sub_refcount() {
let state = State::default();
assert_eq!(state.get_refcount(), 0);
let state = state + 5;
assert_eq!(state.get_refcount(), 5);
let mut state = state - 2;
assert_eq!(state.get_refcount(), 3);
state.set_refcount(1);
assert_eq!(state.get_refcount(), 1);
}
}

View File

@ -0,0 +1,15 @@
use lightproc::proc_stack::ProcStack;
use lightproc::proc_state::EmptyProcState;
#[test]
fn stack_copy() {
let stack = ProcStack::default()
.with_pid(12)
.with_after_panic(|_s: &mut EmptyProcState| {
println!("After panic!");
});
let stack2 = stack;
assert_eq!(stack2.get_pid(), 12);
}