Run rustfmt

Nadja Reitzenstein
2022-05-05 15:50:44 +02:00
parent 475cb9b9b4
commit 2d9f30b55b
74 changed files with 1243 additions and 1053 deletions

View File

@ -1,22 +1,22 @@
use executor::prelude::*;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use executor::prelude::*;
fn increment(b: &mut Criterion) {
let mut sum = 0;
let executor = Executor::new();
b.bench_function("Executor::run", |b| b.iter(|| {
executor.run(
async {
b.bench_function("Executor::run", |b| {
b.iter(|| {
executor.run(async {
(0..10_000_000).for_each(|_| {
sum += 1;
});
},
);
}));
});
})
});
black_box(sum);
}
criterion_group!(perf, increment);
criterion_main!(perf);
criterion_main!(perf);
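
The bulk of this commit is rustfmt breaking chained closure arguments like `|b| b.iter(|| ...)` onto separate lines, as in the hunk above. For reference, a minimal criterion benchmark in that post-rustfmt style (the benchmark name and body are illustrative, not from this repository):

use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn sum_benchmark(c: &mut Criterion) {
    c.bench_function("sum 0..1000", |b| {
        b.iter(|| {
            // `black_box` keeps the compiler from optimising the work away.
            black_box((0..1000u64).sum::<u64>())
        })
    });
}

criterion_group!(benches, sum_benchmark);
criterion_main!(benches);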

View File

@ -1,8 +1,8 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use executor::load_balancer;
use executor::prelude::*;
use futures_timer::Delay;
use std::time::Duration;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
#[cfg(feature = "tokio-runtime")]
mod benches {
@ -27,7 +27,6 @@ mod benches {
pub fn spawn_single(b: &mut Criterion) {
_spawn_single(b);
}
}
criterion_group!(spawn, benches::spawn_lot, benches::spawn_single);
@ -36,29 +35,29 @@ criterion_main!(spawn);
// Benchmark for a 10K burst task spawn
fn _spawn_lot(b: &mut Criterion) {
let executor = Executor::new();
b.bench_function("spawn_lot", |b| b.iter(|| {
let _ = (0..10_000)
.map(|_| {
executor.spawn(
async {
b.bench_function("spawn_lot", |b| {
b.iter(|| {
let _ = (0..10_000)
.map(|_| {
executor.spawn(async {
let duration = Duration::from_millis(1);
Delay::new(duration).await;
},
)
})
.collect::<Vec<_>>();
}));
})
})
.collect::<Vec<_>>();
})
});
}
// Benchmark for a single task spawn
fn _spawn_single(b: &mut Criterion) {
let executor = Executor::new();
b.bench_function("spawn single", |b| b.iter(|| {
executor.spawn(
async {
b.bench_function("spawn single", |b| {
b.iter(|| {
executor.spawn(async {
let duration = Duration::from_millis(1);
Delay::new(duration).await;
},
);
}));
});
})
});
}

View File

@ -1,7 +1,7 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use executor::load_balancer::{core_count, get_cores, stats, SmpStats};
use executor::placement;
use std::thread;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
fn stress_stats<S: SmpStats + Sync + Send>(stats: &'static S) {
let mut handles = Vec::with_capacity(*core_count());
@ -27,9 +27,11 @@ fn stress_stats<S: SmpStats + Sync + Send>(stats: &'static S) {
// 158,278 ns/iter (+/- 117,103)
fn lockless_stats_bench(b: &mut Criterion) {
b.bench_function("stress_stats", |b| b.iter(|| {
stress_stats(stats());
}));
b.bench_function("stress_stats", |b| {
b.iter(|| {
stress_stats(stats());
})
});
}
fn lockless_stats_bad_load(b: &mut Criterion) {
@ -45,9 +47,11 @@ fn lockless_stats_bad_load(b: &mut Criterion) {
}
}
b.bench_function("get_sorted_load", |b| b.iter(|| {
let _sorted_load = stats.get_sorted_load();
}));
b.bench_function("get_sorted_load", |b| {
b.iter(|| {
let _sorted_load = stats.get_sorted_load();
})
});
}
fn lockless_stats_good_load(b: &mut Criterion) {
@ -59,11 +63,17 @@ fn lockless_stats_good_load(b: &mut Criterion) {
stats.store_load(i, i);
}
b.bench_function("get_sorted_load", |b| b.iter(|| {
let _sorted_load = stats.get_sorted_load();
}));
b.bench_function("get_sorted_load", |b| {
b.iter(|| {
let _sorted_load = stats.get_sorted_load();
})
});
}
criterion_group!(stats_bench, lockless_stats_bench, lockless_stats_bad_load,
lockless_stats_good_load);
criterion_group!(
stats_bench,
lockless_stats_bench,
lockless_stats_bad_load,
lockless_stats_good_load
);
criterion_main!(stats_bench);

View File

@ -1,12 +1,12 @@
use executor::pool;
use executor::prelude::*;
use futures_util::{stream::FuturesUnordered, Stream};
use futures_util::{FutureExt, StreamExt};
use lightproc::prelude::RecoverableHandle;
use std::io::Write;
use std::panic::resume_unwind;
use std::rc::Rc;
use std::time::Duration;
use futures_util::{stream::FuturesUnordered, Stream};
use futures_util::{FutureExt, StreamExt};
use executor::pool;
use executor::prelude::*;
use lightproc::prelude::RecoverableHandle;
fn main() {
tracing_subscriber::fmt()
@ -24,9 +24,9 @@ fn main() {
let executor = Executor::new();
let mut handles: FuturesUnordered<RecoverableHandle<usize>> = (0..2000).map(|n| {
executor.spawn(
async move {
let mut handles: FuturesUnordered<RecoverableHandle<usize>> = (0..2000)
.map(|n| {
executor.spawn(async move {
let m: u64 = rand::random::<u64>() % 200;
tracing::debug!("Will sleep {} * 1 ms", m);
// simulate some really heavy load.
@ -34,9 +34,9 @@ fn main() {
async_std::task::sleep(Duration::from_millis(1)).await;
}
return n;
},
)
}).collect();
})
})
.collect();
//let handle = handles.fuse().all(|opt| async move { opt.is_some() });
/* Futures passed to `spawn` need to be `Send` so this won't work:
@ -58,12 +58,12 @@ fn main() {
// However, you can't pass it a future outright; you have to hand it a generator that creates
// the future on the correct thread.
let fut = async {
let local_futs: FuturesUnordered<_> = (0..200).map(|ref n| {
let n = *n;
let exe = executor.clone();
async move {
exe.spawn(
async {
let local_futs: FuturesUnordered<_> = (0..200)
.map(|ref n| {
let n = *n;
let exe = executor.clone();
async move {
exe.spawn(async {
let tid = std::thread::current().id();
tracing::info!("spawn_local({}) is on thread {:?}", n, tid);
exe.spawn_local(async move {
@ -86,10 +86,11 @@ fn main() {
*rc
})
}
).await
}
}).collect();
})
.await
}
})
.collect();
local_futs
};
@ -108,12 +109,10 @@ fn main() {
async_std::task::sleep(Duration::from_secs(20)).await;
tracing::info!("This is taking too long.");
};
executor.run(
async {
let res = futures_util::select! {
_ = a.fuse() => {},
_ = b.fuse() => {},
};
},
);
executor.run(async {
let res = futures_util::select! {
_ = a.fuse() => {},
_ = b.fuse() => {},
};
});
}
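
The commented-out block above captures the key constraint in this example: futures given to `spawn` must be `Send`, and `spawn_local` plus a per-thread generator is the workaround. A hypothetical minimal case of a non-`Send` future, for illustration only:

use std::rc::Rc;

// Holding an `Rc` across an `.await` point makes the whole future
// non-`Send`, so a work-stealing executor may not move it between
// threads; it has to stay on the thread where it was created.
async fn not_send() -> usize {
    let rc = Rc::new(1usize);
    async {}.await; // `rc` is still alive across this await point
    *rc
}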

View File

@ -28,10 +28,10 @@
#![forbid(unused_import_braces)]
pub mod load_balancer;
pub mod manage;
pub mod placement;
pub mod pool;
pub mod run;
pub mod manage;
mod thread_manager;
mod worker;

View File

@ -1,6 +1,2 @@
/// View and manage the current processes of this executor
pub struct Manager {
}
pub struct Manager {}

View File

@ -7,19 +7,19 @@
//! [`spawn`]: crate::pool::spawn
//! [`Worker`]: crate::run_queue::Worker
use std::cell::Cell;
use crate::thread_manager::{ThreadManager, DynamicRunner};
use crate::run::block;
use crate::thread_manager::{DynamicRunner, ThreadManager};
use crate::worker::{Sleeper, WorkerThread};
use crossbeam_deque::{Injector, Stealer};
use lightproc::lightproc::LightProc;
use lightproc::recoverable_handle::RecoverableHandle;
use std::cell::Cell;
use std::future::Future;
use std::iter::Iterator;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::sync::Arc;
use std::time::Duration;
use crossbeam_deque::{Injector, Stealer};
use crate::run::block;
use crate::worker::{Sleeper, WorkerThread};
#[derive(Debug)]
struct Spooler<'a> {
@ -31,10 +31,13 @@ struct Spooler<'a> {
impl Spooler<'_> {
pub fn new() -> Self {
let spool = Arc::new(Injector::new());
let threads = Box::leak(Box::new(
ThreadManager::new(2, AsyncRunner, spool.clone())));
let threads = Box::leak(Box::new(ThreadManager::new(2, AsyncRunner, spool.clone())));
threads.initialize();
Self { spool, threads, _marker: PhantomData }
Self {
spool,
threads,
_marker: PhantomData,
}
}
}
@ -53,9 +56,7 @@ impl<'a, 'executor: 'a> Executor<'executor> {
fn schedule(&self) -> impl Fn(LightProc) + 'a {
let task_queue = self.spooler.spool.clone();
move |lightproc: LightProc| {
task_queue.push(lightproc)
}
move |lightproc: LightProc| task_queue.push(lightproc)
}
///
@ -94,23 +95,21 @@ impl<'a, 'executor: 'a> Executor<'executor> {
/// # }
/// ```
pub fn spawn<F, R>(&self, future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + Send + 'a,
R: Send + 'a,
where
F: Future<Output = R> + Send + 'a,
R: Send + 'a,
{
let (task, handle) =
LightProc::recoverable(future, self.schedule());
let (task, handle) = LightProc::recoverable(future, self.schedule());
task.schedule();
handle
}
pub fn spawn_local<F, R>(&self, future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + 'a,
R: Send + 'a,
where
F: Future<Output = R> + 'a,
R: Send + 'a,
{
let (task, handle) =
LightProc::recoverable(future, schedule_local());
let (task, handle) = LightProc::recoverable(future, schedule_local());
task.schedule();
handle
}
@ -135,8 +134,8 @@ impl<'a, 'executor: 'a> Executor<'executor> {
/// );
/// ```
pub fn run<F, R>(&self, future: F) -> R
where
F: Future<Output = R>,
where
F: Future<Output = R>,
{
unsafe {
// An explicitly uninitialized `R`. Until `assume_init` is called this will not call any
@ -174,17 +173,20 @@ impl DynamicRunner for AsyncRunner {
sleeper
}
fn run_static<'b>(fences: impl Iterator<Item=&'b Stealer<LightProc>>, park_timeout: Duration) -> ! {
fn run_static<'b>(
fences: impl Iterator<Item = &'b Stealer<LightProc>>,
park_timeout: Duration,
) -> ! {
let worker = get_worker();
worker.run_timeout(fences, park_timeout)
}
fn run_dynamic<'b>(fences: impl Iterator<Item=&'b Stealer<LightProc>>) -> ! {
fn run_dynamic<'b>(fences: impl Iterator<Item = &'b Stealer<LightProc>>) -> ! {
let worker = get_worker();
worker.run(fences)
}
fn run_standalone<'b>(fences: impl Iterator<Item=&'b Stealer<LightProc>>) {
fn run_standalone<'b>(fences: impl Iterator<Item = &'b Stealer<LightProc>>) {
let worker = get_worker();
worker.run_once(fences)
}
@ -196,10 +198,9 @@ thread_local! {
fn get_worker() -> &'static WorkerThread<'static, LightProc> {
WORKER.with(|cell| {
let worker = unsafe {
&*cell.as_ptr() as &'static Option<WorkerThread<_>>
};
worker.as_ref()
let worker = unsafe { &*cell.as_ptr() as &'static Option<WorkerThread<_>> };
worker
.as_ref()
.expect("AsyncRunner running outside Executor context")
})
}
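
Taken together, a minimal usage sketch of the reformatted API (assuming, per the `RecoverableHandle` future impl later in this diff, that a handle resolves to `None` if its task panicked):

use executor::prelude::*;

fn main() {
    let executor = Executor::new();
    // `spawn` returns a RecoverableHandle; `run` blocks on any future.
    let handle = executor.spawn(async { 40 + 2 });
    let answer = executor.run(handle);
    assert_eq!(answer, Some(42));
}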

View File

@ -45,13 +45,18 @@
//! Throughput hogs are determined by a combination of job-in/job-out frequency and the current scheduler task assignment frequency.
//! The threshold on the EMA difference is padded by machine epsilon to absorb floating-point arithmetic errors.
use crate::worker::Sleeper;
use crate::{load_balancer, placement};
use core::fmt;
use crossbeam_channel::bounded;
use crossbeam_deque::{Injector, Stealer};
use crossbeam_queue::ArrayQueue;
use fmt::{Debug, Formatter};
use lazy_static::lazy_static;
use lightproc::lightproc::LightProc;
use placement::CoreId;
use std::collections::VecDeque;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::{
sync::{
@ -60,12 +65,7 @@ use std::{
},
thread,
};
use std::sync::{Arc, RwLock};
use crossbeam_channel::bounded;
use crossbeam_deque::{Injector, Stealer};
use tracing::{debug, trace};
use lightproc::lightproc::LightProc;
use crate::worker::Sleeper;
/// The default thread park timeout before checking for new tasks.
const THREAD_PARK_TIMEOUT: Duration = Duration::from_millis(1);
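
The module docs above describe scaling decisions driven by exponential moving averages. The EMA code itself is not part of this diff; a generic sketch of the update rule and the machine-epsilon comparison the docs allude to (coefficient names are illustrative):

// Standard EMA update: recent samples are weighted by `alpha`,
// history by `1 - alpha`.
fn ema_update(prev: f64, sample: f64, alpha: f64) -> f64 {
    alpha * sample + (1.0 - alpha) * prev
}

// Treat two averages as equal when they differ by less than machine
// epsilon, absorbing floating-point rounding error.
fn ema_eq(a: f64, b: f64) -> bool {
    (a - b).abs() < f64::EPSILON
}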
@ -113,10 +113,12 @@ lazy_static! {
pub trait DynamicRunner {
fn setup(task_queue: Arc<Injector<LightProc>>) -> Sleeper<LightProc>;
fn run_static<'b>(fences: impl Iterator<Item=&'b Stealer<LightProc>>,
park_timeout: Duration) -> !;
fn run_dynamic<'b>(fences: impl Iterator<Item=&'b Stealer<LightProc>>) -> !;
fn run_standalone<'b>(fences: impl Iterator<Item=&'b Stealer<LightProc>>);
fn run_static<'b>(
fences: impl Iterator<Item = &'b Stealer<LightProc>>,
park_timeout: Duration,
) -> !;
fn run_dynamic<'b>(fences: impl Iterator<Item = &'b Stealer<LightProc>>) -> !;
fn run_standalone<'b>(fences: impl Iterator<Item = &'b Stealer<LightProc>>);
}
/// The `ThreadManager` creates and destroys worker threads depending on demand according to
@ -183,11 +185,14 @@ impl<Runner: Debug> Debug for ThreadManager<Runner> {
}
fmt.debug_struct("DynamicPoolManager")
.field("thread pool", &ThreadCount(
&self.static_threads,
&self.dynamic_threads,
&self.parked_threads.len(),
))
.field(
"thread pool",
&ThreadCount(
&self.static_threads,
&self.dynamic_threads,
&self.parked_threads.len(),
),
)
.field("runner", &self.runner)
.field("last_frequency", &self.last_frequency)
.finish()
@ -195,7 +200,11 @@ impl<Runner: Debug> Debug for ThreadManager<Runner> {
}
impl<Runner: DynamicRunner + Sync + Send> ThreadManager<Runner> {
pub fn new(static_threads: usize, runner: Runner, task_queue: Arc<Injector<LightProc>>) -> Self {
pub fn new(
static_threads: usize,
runner: Runner,
task_queue: Arc<Injector<LightProc>>,
) -> Self {
let dynamic_threads = 1.max(num_cpus::get().checked_sub(static_threads).unwrap_or(0));
let parked_threads = ArrayQueue::new(1.max(static_threads + dynamic_threads));
let fences = Arc::new(RwLock::new(Vec::new()));
@ -252,7 +261,10 @@ impl<Runner: DynamicRunner + Sync + Send> ThreadManager<Runner> {
});
// Dynamic thread manager that will allow us to unpark threads when needed
debug!("spooling up {} dynamic worker threads", self.dynamic_threads);
debug!(
"spooling up {} dynamic worker threads",
self.dynamic_threads
);
(0..self.dynamic_threads).for_each(|_| {
let tx = tx.clone();
let fencelock = fencelock.clone();
@ -302,10 +314,11 @@ impl<Runner: DynamicRunner + Sync + Send> ThreadManager<Runner> {
/// Provision threads takes the number of threads that need to be made available.
/// It will try to unpark threads from the dynamic pool, and spawn more threads if need be.
pub fn provision_threads(&'static self,
n: usize,
fencelock: &Arc<RwLock<Vec<Stealer<LightProc>>>>)
{
pub fn provision_threads(
&'static self,
n: usize,
fencelock: &Arc<RwLock<Vec<Stealer<LightProc>>>>,
) {
let rem = self.unpark_thread(n);
if rem != 0 {
debug!("no more threads to unpark, spawning {} new threads", rem);
@ -391,7 +404,5 @@ impl<Runner: DynamicRunner + Sync + Send> ThreadManager<Runner> {
/// on the request rate.
///
/// It uses a frequency-based calculation to define work, utilizing the average processing rate.
fn scale_pool(&'static self) {
}
fn scale_pool(&'static self) {}
}
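
The unpark/spawn logic in `provision_threads` sits on top of `crossbeam_utils` parking. Stripped of the pool bookkeeping, the handshake looks roughly like this:

use crossbeam_utils::sync::Parker;
use std::time::Duration;

fn main() {
    let parker = Parker::new();
    let unparker = parker.unparker().clone();

    // Another thread wakes the sleeper, e.g. when new tasks arrive.
    std::thread::spawn(move || {
        unparker.unpark();
    });

    // Sleep until unparked or until the timeout elapses, mirroring
    // THREAD_PARK_TIMEOUT above.
    parker.park_timeout(Duration::from_millis(1));
}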

View File

@ -1,10 +1,10 @@
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use crossbeam_queue::SegQueue;
use crossbeam_utils::sync::{Parker, Unparker};
use lightproc::prelude::LightProc;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
pub trait Runnable {
fn run(self);
@ -61,8 +61,14 @@ impl<'a, T: Runnable + 'a> WorkerThread<'a, T> {
let unparker = parker.unparker().clone();
(
Self { task_queue, tasks, local_tasks, parker, _marker },
Sleeper { stealer, unparker }
Self {
task_queue,
tasks,
local_tasks,
parker,
_marker,
},
Sleeper { stealer, unparker },
)
}
@ -71,10 +77,8 @@ impl<'a, T: Runnable + 'a> WorkerThread<'a, T> {
}
/// Run this worker thread "forever" (i.e. until the thread panics or is otherwise killed)
pub fn run(&self, fences: impl Iterator<Item=&'a Stealer<T>>) -> ! {
let fences: Vec<Stealer<T>> = fences
.map(|stealer| stealer.clone())
.collect();
pub fn run(&self, fences: impl Iterator<Item = &'a Stealer<T>>) -> ! {
let fences: Vec<Stealer<T>> = fences.map(|stealer| stealer.clone()).collect();
loop {
self.run_inner(&fences);
@ -82,10 +86,12 @@ impl<'a, T: Runnable + 'a> WorkerThread<'a, T> {
}
}
pub fn run_timeout(&self, fences: impl Iterator<Item=&'a Stealer<T>>, timeout: Duration) -> ! {
let fences: Vec<Stealer<T>> = fences
.map(|stealer| stealer.clone())
.collect();
pub fn run_timeout(
&self,
fences: impl Iterator<Item = &'a Stealer<T>>,
timeout: Duration,
) -> ! {
let fences: Vec<Stealer<T>> = fences.map(|stealer| stealer.clone()).collect();
loop {
self.run_inner(&fences);
@ -93,10 +99,8 @@ impl<'a, T: Runnable + 'a> WorkerThread<'a, T> {
}
}
pub fn run_once(&self, fences: impl Iterator<Item=&'a Stealer<T>>) {
let fences: Vec<Stealer<T>> = fences
.map(|stealer| stealer.clone())
.collect();
pub fn run_once(&self, fences: impl Iterator<Item = &'a Stealer<T>>) {
let fences: Vec<Stealer<T>> = fences.map(|stealer| stealer.clone()).collect();
self.run_inner(fences);
}
@ -123,17 +127,19 @@ impl<'a, T: Runnable + 'a> WorkerThread<'a, T> {
Steal::Success(task) => {
task.run();
continue 'work;
},
}
// If there is no more work to steal from the global queue, try other
// workers next
Steal::Empty => break,
// If a race condition occurred try again with backoff
Steal::Retry => for _ in 0..(1 << i) {
core::hint::spin_loop();
i += 1;
},
Steal::Retry => {
for _ in 0..(1 << i) {
core::hint::spin_loop();
i += 1;
}
}
}
}
@ -145,7 +151,7 @@ impl<'a, T: Runnable + 'a> WorkerThread<'a, T> {
Steal::Success(task) => {
task.run();
continue 'work;
},
}
// If no other worker has work to do we're done once again.
Steal::Empty => break,
@ -169,6 +175,6 @@ impl<'a, T: Runnable + 'a> WorkerThread<'a, T> {
}
#[inline(always)]
fn select_fence<'a, T>(fences: impl Iterator<Item=&'a Stealer<T>>) -> Option<&'a Stealer<T>> {
fn select_fence<'a, T>(fences: impl Iterator<Item = &'a Stealer<T>>) -> Option<&'a Stealer<T>> {
fences.max_by_key(|fence| fence.len())
}
}
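
The `run_inner` steal loop above follows the canonical `crossbeam-deque` pattern: local queue first, then the global injector, then the other workers' stealers, retrying on contention. For comparison, the self-contained reference version from the `crossbeam_deque` documentation (not code from this commit):

use crossbeam_deque::{Injector, Stealer, Worker};
use std::iter;

fn find_task<T>(
    local: &Worker<T>,
    global: &Injector<T>,
    stealers: &[Stealer<T>],
) -> Option<T> {
    // Pop a task from the local queue, if not empty.
    local.pop().or_else(|| {
        // Otherwise, look for a task elsewhere.
        iter::repeat_with(|| {
            // Try stealing a batch of tasks from the global queue...
            global
                .steal_batch_and_pop(local)
                // ...or a single task from one of the other threads.
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        // Loop while steal operations need to be retried.
        .find(|s| !s.is_retry())
        // Extract the stolen task, if there is one.
        .and_then(|s| s.success())
    })
}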

View File

@ -1,8 +1,8 @@
use std::io::Write;
use executor::prelude::{spawn, ProcStack};
use executor::run::run;
use std::io::Write;
use std::thread;
use std::time::Duration;
use executor::prelude::{ProcStack, spawn};
#[cfg(feature = "tokio-runtime")]
mod tokio_tests {
@ -21,13 +21,11 @@ mod no_tokio_tests {
}
fn run_test() {
let handle = spawn(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
//42
},
);
let handle = spawn(async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
//42
});
let output = run(handle, ProcStack {});

View File

@ -1,11 +1,11 @@
use executor::blocking;
use executor::prelude::ProcStack;
use executor::run::run;
use futures_util::future::join_all;
use lightproc::recoverable_handle::RecoverableHandle;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use executor::prelude::ProcStack;
// Test for slow joins without task bursts during joins.
#[test]
@ -17,12 +17,10 @@ fn slow_join() {
// Send an initial batch of a million bursts.
let handles = (0..1_000_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
)
blocking::spawn_blocking(async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
})
})
.collect::<Vec<RecoverableHandle<()>>>();
@ -35,12 +33,10 @@ fn slow_join() {
// Spawn yet another batch of work on top of it
let handles = (0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(100);
thread::sleep(duration);
},
)
blocking::spawn_blocking(async {
let duration = Duration::from_millis(100);
thread::sleep(duration);
})
})
.collect::<Vec<RecoverableHandle<()>>>();
@ -63,12 +59,10 @@ fn slow_join_interrupted() {
// Send an initial batch of a million bursts.
let handles = (0..1_000_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
)
blocking::spawn_blocking(async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
})
})
.collect::<Vec<RecoverableHandle<()>>>();
@ -82,12 +76,10 @@ fn slow_join_interrupted() {
// Spawn yet another batch of work on top of it
let handles = (0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(100);
thread::sleep(duration);
},
)
blocking::spawn_blocking(async {
let duration = Duration::from_millis(100);
thread::sleep(duration);
})
})
.collect::<Vec<RecoverableHandle<()>>>();
@ -111,12 +103,10 @@ fn longhauling_task_join() {
// First batch of long-hauling tasks
let _ = (0..100_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1000);
thread::sleep(duration);
},
)
blocking::spawn_blocking(async {
let duration = Duration::from_millis(1000);
thread::sleep(duration);
})
})
.collect::<Vec<RecoverableHandle<()>>>();
@ -127,12 +117,10 @@ fn longhauling_task_join() {
// Send yet another medium-sized batch to see how it scales.
let handles = (0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(100);
thread::sleep(duration);
},
)
blocking::spawn_blocking(async {
let duration = Duration::from_millis(100);
thread::sleep(duration);
})
})
.collect::<Vec<RecoverableHandle<()>>>();

View File

@ -1,11 +1,11 @@
use std::any::Any;
use std::fmt::Debug;
use std::ops::Deref;
use crossbeam::channel::{unbounded, Sender};
use futures_executor as executor;
use lazy_static::lazy_static;
use lightproc::prelude::*;
use std::any::Any;
use std::fmt::Debug;
use std::future::Future;
use std::ops::Deref;
use std::thread;
fn spawn_on_thread<F, R>(future: F) -> RecoverableHandle<R>
@ -30,20 +30,17 @@ where
}
let schedule = |t| (QUEUE.deref()).send(t).unwrap();
let (proc, handle) = LightProc::recoverable(
future,
schedule
);
let (proc, handle) = LightProc::recoverable(future, schedule);
let handle = handle
.on_panic(|err: Box<dyn Any + Send>| {
match err.downcast::<&'static str>() {
Ok(reason) => println!("Future panicked: {}", &reason),
Err(err) =>
println!("Future panicked with a non-text reason of typeid {:?}",
err.type_id()),
}
});
let handle = handle.on_panic(
|err: Box<dyn Any + Send>| match err.downcast::<&'static str>() {
Ok(reason) => println!("Future panicked: {}", &reason),
Err(err) => println!(
"Future panicked with a non-text reason of typeid {:?}",
err.type_id()
),
},
);
proc.schedule();

View File

@ -14,15 +14,10 @@ where
{
let (sender, receiver) = channel::unbounded();
let future = async move {
fut.await
};
let future = async move { fut.await };
let schedule = move |t| sender.send(t).unwrap();
let (proc, handle) = LightProc::build(
future,
schedule,
);
let (proc, handle) = LightProc::build(future, schedule);
proc.schedule();

View File

@ -77,9 +77,10 @@ impl LightProc {
/// });
/// ```
pub fn recoverable<'a, F, R, S>(future: F, schedule: S) -> (Self, RecoverableHandle<R>)
where F: Future<Output=R> + 'a,
R: 'a,
S: Fn(LightProc) + 'a,
where
F: Future<Output = R> + 'a,
R: 'a,
S: Fn(LightProc) + 'a,
{
let recovery_future = AssertUnwindSafe(future).catch_unwind();
let (proc, handle) = Self::build(recovery_future, schedule);
@ -115,9 +116,10 @@ impl LightProc {
/// );
/// ```
pub fn build<'a, F, R, S>(future: F, schedule: S) -> (Self, ProcHandle<R>)
where F: Future<Output=R> + 'a,
R: 'a,
S: Fn(LightProc) + 'a,
where
F: Future<Output = R> + 'a,
R: 'a,
S: Fn(LightProc) + 'a,
{
let raw_proc = RawProc::allocate(future, schedule);
let proc = LightProc { raw_proc };
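
`recoverable` differs from `build` only by the `catch_unwind` wrapper shown above. That recovery trick in isolation, driven here by `futures_executor` (a standalone sketch, not code from this commit):

use futures_executor::block_on;
use futures_util::FutureExt;
use std::panic::AssertUnwindSafe;

fn main() {
    // A panicking future becomes an `Err` carrying the panic payload
    // instead of unwinding through the executor.
    let fut = AssertUnwindSafe(async { panic!("boom") }).catch_unwind();
    assert!(block_on(fut).is_err());
}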

View File

@ -44,12 +44,10 @@ impl ProcData {
let (flags, references) = state.parts();
let new = State::new(flags | CLOSED, references);
// Mark the proc as closed.
match self.state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
match self
.state
.compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => {
// Notify the awaiter that the proc has been closed.
if state.is_awaiter() {
@ -117,7 +115,8 @@ impl ProcData {
// Release the lock. If we've cleared the awaiter, then also unset the awaiter flag.
if new_is_none {
self.state.fetch_and((!LOCKED & !AWAITER).into(), Ordering::Release);
self.state
.fetch_and((!LOCKED & !AWAITER).into(), Ordering::Release);
} else {
self.state.fetch_and((!LOCKED).into(), Ordering::Release);
}
@ -142,9 +141,7 @@ impl Debug for ProcData {
.field("ref_count", &state.get_refcount())
.finish()
} else {
fmt.debug_struct("ProcData")
.field("state", &state)
.finish()
fmt.debug_struct("ProcData").field("state", &state).finish()
}
}
}
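
The `compare_exchange_weak` calls in this file all share the same retry shape. A generic standalone version of that state-transition loop, simplified to a plain `AtomicU64`:

use std::sync::atomic::{AtomicU64, Ordering};

// Read the current state, compute the successor, and retry if another
// thread raced us; `compare_exchange_weak` may also fail spuriously,
// which the loop absorbs.
fn set_flag(state: &AtomicU64, flag: u64) -> u64 {
    let mut current = state.load(Ordering::Acquire);
    loop {
        let new = current | flag;
        match state.compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire) {
            Ok(prev) => return prev,
            Err(actual) => current = actual,
        }
    }
}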

View File

@ -273,9 +273,7 @@ where
let raw = Self::from_ptr(ptr);
// Decrement the reference count.
let new = (*raw.pdata)
.state
.fetch_sub(1, Ordering::AcqRel);
let new = (*raw.pdata).state.fetch_sub(1, Ordering::AcqRel);
let new = new.set_refcount(new.get_refcount().saturating_sub(1));
// If this was the last reference to the proc and the `ProcHandle` has been dropped as
@ -444,13 +442,12 @@ where
// was woken and then clean up its resources.
let (flags, references) = state.parts();
let flags = if state.is_closed() {
flags & !( RUNNING | SCHEDULED )
flags & !(RUNNING | SCHEDULED)
} else {
flags & !RUNNING
};
let new = State::new(flags, references);
// Mark the proc as not running.
match (*raw.pdata).state.compare_exchange_weak(
state,
@ -502,10 +499,10 @@ impl<'a, F, R, S> Copy for RawProc<'a, F, R, S> {}
/// A guard that closes the proc if polling its future panics.
struct Guard<'a, F, R, S>(RawProc<'a, F, R, S>)
where
F: Future<Output = R> + 'a,
R: 'a,
S: Fn(LightProc) + 'a;
where
F: Future<Output = R> + 'a,
R: 'a,
S: Fn(LightProc) + 'a;
impl<'a, F, R, S> Drop for Guard<'a, F, R, S>
where

View File

@ -1,9 +1,9 @@
//!
//! Handle for recoverable process
use std::any::Any;
use crate::proc_data::ProcData;
use crate::proc_handle::ProcHandle;
use crate::state::State;
use std::any::Any;
use std::fmt::{self, Debug, Formatter};
use std::future::Future;
use std::pin::Pin;
@ -80,12 +80,12 @@ impl<R> RecoverableHandle<R> {
/// });
/// ```
pub fn on_panic<F>(mut self, callback: F) -> Self
where F: FnOnce(Box<dyn Any + Send>) + Send + Sync + 'static,
where
F: FnOnce(Box<dyn Any + Send>) + Send + Sync + 'static,
{
self.panicked = Some(Box::new(callback));
self
}
}
impl<R> Future for RecoverableHandle<R> {
@ -102,7 +102,7 @@ impl<R> Future for RecoverableHandle<R> {
}
Poll::Ready(None)
},
}
}
}
}

View File

@ -73,25 +73,22 @@ bitflags::bitflags! {
#[repr(packed)]
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct State {
bytes: [u8; 8]
bytes: [u8; 8],
}
impl State {
#[inline(always)]
pub const fn new(flags: StateFlags, references: u32) -> Self {
let [a,b,c,d] = references.to_ne_bytes();
let [e,f,g,h] = flags.bits.to_ne_bytes();
Self::from_bytes([a,b,c,d,e,f,g,h])
let [a, b, c, d] = references.to_ne_bytes();
let [e, f, g, h] = flags.bits.to_ne_bytes();
Self::from_bytes([a, b, c, d, e, f, g, h])
}
#[inline(always)]
pub const fn parts(self: Self) -> (StateFlags, u32) {
let [a,b,c,d,e,f,g,h] = self.bytes;
let refcount = u32::from_ne_bytes([a,b,c,d]);
let state = unsafe {
StateFlags::from_bits_unchecked(u32::from_ne_bytes([e,f,g,h]))
};
let [a, b, c, d, e, f, g, h] = self.bytes;
let refcount = u32::from_ne_bytes([a, b, c, d]);
let state = unsafe { StateFlags::from_bits_unchecked(u32::from_ne_bytes([e, f, g, h])) };
(state, refcount)
}
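
`State` packs a 32-bit reference count and a 32-bit flag word into one 8-byte value so both can be updated by a single atomic operation. A standalone round-trip sketch of the same layout (without the `bitflags` type):

// Low four native-endian bytes: reference count; high four: flag bits.
fn pack(flags: u32, references: u32) -> [u8; 8] {
    let [a, b, c, d] = references.to_ne_bytes();
    let [e, f, g, h] = flags.to_ne_bytes();
    [a, b, c, d, e, f, g, h]
}

fn unpack(bytes: [u8; 8]) -> (u32, u32) {
    let [a, b, c, d, e, f, g, h] = bytes;
    (
        u32::from_ne_bytes([e, f, g, h]), // flags
        u32::from_ne_bytes([a, b, c, d]), // refcount
    )
}

fn main() {
    assert_eq!(unpack(pack(0b1010, 3)), (0b1010, 3));
}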
@ -101,8 +98,8 @@ impl State {
/// Note that the reference counter only tracks the `LightProc` and `Waker`s. The `ProcHandle` is
/// tracked separately by the `HANDLE` flag.
pub const fn get_refcount(self) -> u32 {
let [a,b,c,d,_,_,_,_] = self.bytes;
u32::from_ne_bytes([a,b,c,d])
let [a, b, c, d, _, _, _, _] = self.bytes;
u32::from_ne_bytes([a, b, c, d])
}
#[inline(always)]
@ -116,7 +113,7 @@ impl State {
#[inline(always)]
pub const fn get_flags(self) -> StateFlags {
let [_, _, _, _, e, f, g, h] = self.bytes;
unsafe { StateFlags::from_bits_unchecked(u32::from_ne_bytes([e,f,g,h])) }
unsafe { StateFlags::from_bits_unchecked(u32::from_ne_bytes([e, f, g, h])) }
}
#[inline(always)]
@ -207,10 +204,10 @@ impl AtomicState {
current: State,
new: State,
success: Ordering,
failure: Ordering
) -> Result<State, State>
{
self.inner.compare_exchange(current.into_u64(), new.into_u64(), success, failure)
failure: Ordering,
) -> Result<State, State> {
self.inner
.compare_exchange(current.into_u64(), new.into_u64(), success, failure)
.map(|u| State::from_u64(u))
.map_err(|u| State::from_u64(u))
}
@ -220,37 +217,37 @@ impl AtomicState {
current: State,
new: State,
success: Ordering,
failure: Ordering
) -> Result<State, State>
{
self.inner.compare_exchange_weak(current.into_u64(), new.into_u64(), success, failure)
failure: Ordering,
) -> Result<State, State> {
self.inner
.compare_exchange_weak(current.into_u64(), new.into_u64(), success, failure)
.map(|u| State::from_u64(u))
.map_err(|u| State::from_u64(u))
}
pub fn fetch_or(&self, val: StateFlags, order: Ordering) -> State {
let [a,b,c,d] = val.bits.to_ne_bytes();
let store = u64::from_ne_bytes([0,0,0,0,a,b,c,d]);
let [a, b, c, d] = val.bits.to_ne_bytes();
let store = u64::from_ne_bytes([0, 0, 0, 0, a, b, c, d]);
State::from_u64(self.inner.fetch_or(store, order))
}
pub fn fetch_and(&self, val: StateFlags, order: Ordering) -> State {
let [a,b,c,d] = val.bits.to_ne_bytes();
let store = u64::from_ne_bytes([!0,!0,!0,!0,a,b,c,d]);
let [a, b, c, d] = val.bits.to_ne_bytes();
let store = u64::from_ne_bytes([!0, !0, !0, !0, a, b, c, d]);
State::from_u64(self.inner.fetch_and(store, order))
}
// FIXME: Do this properly
pub fn fetch_add(&self, val: u32, order: Ordering) -> State {
let [a,b,c,d] = val.to_ne_bytes();
let store = u64::from_ne_bytes([a,b,c,d,0,0,0,0]);
let [a, b, c, d] = val.to_ne_bytes();
let store = u64::from_ne_bytes([a, b, c, d, 0, 0, 0, 0]);
State::from_u64(self.inner.fetch_add(store, order))
}
// FIXME: Do this properly
pub fn fetch_sub(&self, val: u32, order: Ordering) -> State {
let [a,b,c,d] = val.to_ne_bytes();
let store = u64::from_ne_bytes([a,b,c,d,0,0,0,0]);
let [a, b, c, d] = val.to_ne_bytes();
let store = u64::from_ne_bytes([a, b, c, d, 0, 0, 0, 0]);
State::from_u64(self.inner.fetch_sub(store, order))
}
}
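
The masks in `fetch_or` and `fetch_and` above are built so the refcount bytes pass through unchanged: OR with zero and AND with `!0` are both identities. A quick standalone check of that invariant:

fn main() {
    let refcount: u32 = 7;
    let flags: u32 = 0b1100;
    let [a, b, c, d] = refcount.to_ne_bytes();
    let [e, f, g, h] = flags.to_ne_bytes();
    let packed = u64::from_ne_bytes([a, b, c, d, e, f, g, h]);

    // Clear all flag bits except one, leaving the refcount untouched.
    let [ma, mb, mc, md] = 0b0100u32.to_ne_bytes();
    let mask = u64::from_ne_bytes([!0, !0, !0, !0, ma, mb, mc, md]);
    let [ra, rb, rc, rd, re, rf, rg, rh] = (packed & mask).to_ne_bytes();

    assert_eq!(u32::from_ne_bytes([ra, rb, rc, rd]), 7); // refcount preserved
    assert_eq!(u32::from_ne_bytes([re, rf, rg, rh]), 0b0100); // flags masked
}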