mirror of
https://gitlab.com/fabinfra/fabaccess/bffh.git
synced 2024-11-11 01:53:23 +01:00
120 lines
4.3 KiB
Rust
120 lines
4.3 KiB
Rust
use std::io::Write;
|
|
use std::panic::resume_unwind;
|
|
use std::rc::Rc;
|
|
use std::time::Duration;
|
|
use futures_util::{stream::FuturesUnordered, Stream};
|
|
use futures_util::{FutureExt, StreamExt};
|
|
use executor::pool;
|
|
use executor::prelude::*;
|
|
use lightproc::prelude::RecoverableHandle;
|
|
|
|
fn main() {
|
|
tracing_subscriber::fmt()
|
|
.with_max_level(tracing::Level::DEBUG)
|
|
.init();
|
|
|
|
let hook = std::panic::take_hook();
|
|
std::panic::set_hook(Box::new(move |info| {
|
|
let span = tracing::span!(tracing::Level::ERROR, "panic hook").entered();
|
|
let tid = std::thread::current().id();
|
|
tracing::error!("Panicking ThreadId: {:?}", tid);
|
|
tracing::error!("{}", info);
|
|
span.exit();
|
|
}));
|
|
|
|
let executor = Executor::new();
|
|
|
|
let mut handles: FuturesUnordered<RecoverableHandle<usize>> = (0..2000).map(|n| {
|
|
executor.spawn(
|
|
async move {
|
|
let m: u64 = rand::random::<u64>() % 200;
|
|
tracing::debug!("Will sleep {} * 1 ms", m);
|
|
// simulate some really heavy load.
|
|
for i in 0..m {
|
|
async_std::task::sleep(Duration::from_millis(1)).await;
|
|
}
|
|
return n;
|
|
},
|
|
)
|
|
}).collect();
|
|
//let handle = handles.fuse().all(|opt| async move { opt.is_some() });
|
|
|
|
/* Futures passed to `spawn` need to be `Send` so this won't work:
|
|
* let n = 1;
|
|
* let unsend = spawn(async move {
|
|
* let rc = Rc::new(n);
|
|
* let tid = std::thread::current().id();
|
|
* tracing::info!("!Send fut {} running on thread {:?}", *rc, tid);
|
|
* async_std::task::sleep(Duration::from_millis(20)).await;
|
|
* tracing::info!("!Send fut {} still running on thread {:?}!", *rc, tid);
|
|
* async_std::task::sleep(Duration::from_millis(20)).await;
|
|
* tracing::info!("!Send fut {} still running on thread {:?}!", *rc, tid);
|
|
* async_std::task::sleep(Duration::from_millis(20)).await;
|
|
* *rc
|
|
* });
|
|
*/
|
|
|
|
// But you can use `spawn_local` which will make sure to never Send your task to other threads.
|
|
// However, you can't pass it a future outright but have to hand it a generator creating the
|
|
// future on the correct thread.
|
|
let fut = async {
|
|
let local_futs: FuturesUnordered<_> = (0..200).map(|ref n| {
|
|
let n = *n;
|
|
let exe = executor.clone();
|
|
async move {
|
|
exe.spawn(
|
|
async {
|
|
let tid = std::thread::current().id();
|
|
tracing::info!("spawn_local({}) is on thread {:?}", n, tid);
|
|
exe.spawn_local(async move {
|
|
let rc = Rc::new(n);
|
|
|
|
let tid = std::thread::current().id();
|
|
tracing::info!("!Send fut {} running on thread {:?}", *rc, tid);
|
|
|
|
async_std::task::sleep(Duration::from_millis(20)).await;
|
|
|
|
let tid2 = std::thread::current().id();
|
|
tracing::info!("!Send fut {} still running on thread {:?}!", *rc, tid2);
|
|
assert_eq!(tid, tid2);
|
|
|
|
async_std::task::sleep(Duration::from_millis(20)).await;
|
|
|
|
let tid3 = std::thread::current().id();
|
|
tracing::info!("!Send fut {} still running on thread {:?}!", *rc, tid3);
|
|
assert_eq!(tid2, tid3);
|
|
|
|
*rc
|
|
})
|
|
}
|
|
).await
|
|
}
|
|
}).collect();
|
|
local_futs
|
|
};
|
|
|
|
let a = async move {
|
|
let mut local_futs = fut.await;
|
|
while let Some(fut) = local_futs.next().await {
|
|
assert!(fut.is_some());
|
|
tracing::info!("local fut returned {:?}", fut.unwrap().await)
|
|
}
|
|
while let Some(a) = handles.next().await {
|
|
assert!(a.is_some());
|
|
tracing::info!("shared fut returned {}", a.unwrap())
|
|
}
|
|
};
|
|
let b = async move {
|
|
async_std::task::sleep(Duration::from_secs(20)).await;
|
|
tracing::info!("This is taking too long.");
|
|
};
|
|
executor.run(
|
|
async {
|
|
let res = futures_util::select! {
|
|
_ = a.fuse() => {},
|
|
_ = b.fuse() => {},
|
|
};
|
|
},
|
|
);
|
|
}
|