diff --git a/runtime/executor/Cargo.toml b/runtime/executor/Cargo.toml new file mode 100644 index 0000000..5c27a14 --- /dev/null +++ b/runtime/executor/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "executor" +version = "0.3.0" +publish = false +description = "Executor" +authors = [] +keywords = [] +categories = [] +readme = "README.md" +license = "Apache-2.0/MIT" +edition = "2021" +exclude = [ + "scripts/*", +] + +[dependencies] +lightproc = { path = "../lightproc" } + +crossbeam-utils = "0.8" +crossbeam-channel = "0.5" +crossbeam-epoch = "0.9" +crossbeam-deque = "0.8.1" +lazy_static = "1.4" +libc = "0.2" +num_cpus = "1.13" +pin-utils = "0.1.0" + +# Allocator +arrayvec = { version = "0.7.0" } +futures-timer = "3.0.2" +once_cell = "1.4.0" +lever = "0.1" +tracing = "0.1.19" +crossbeam-queue = "0.3.0" diff --git a/runtime/executor/README.md b/runtime/executor/README.md new file mode 100644 index 0000000..b2dc377 --- /dev/null +++ b/runtime/executor/README.md @@ -0,0 +1,94 @@ +# Bastion Executor + + + + + + + + + + + + + + + + + + + + + + + + + +
<!-- badges: Latest Release (crates.io) · License · Build Status · Downloads · Discord -->
+ +Bastion Executor is NUMA-aware SMP based Fault-tolerant Executor + +Bastion Executor is a highly-available, fault-tolerant, async communication +oriented executor. Bastion's main idea is supplying a fully async runtime +with fault-tolerance to work on heavy loads. + +Main differences between other executors are: +* Uses SMP based execution scheme to exploit cache affinity on multiple cores and execution is +equally distributed over the system resources, which means utilizing the all system. +* Uses NUMA-aware allocation for scheduler's queues and exploit locality on server workloads. +* Tailored for creating middleware and working with actor model like concurrency and distributed communication. + +**NOTE:** Bastion Executor is independent of it's framework implementation. +It uses [lightproc](https://docs.rs/lightproc) to encapsulate and provide fault-tolerance to your future based workloads. +You can use your futures with [lightproc](https://docs.rs/lightproc) to run your workloads on Bastion Executor without the need to have framework. + +## Example Usage + +```rust +use bastion_executor::prelude::*; +use lightproc::proc_stack::ProcStack; + +fn main() { + let pid = 1; + let stack = ProcStack::default() + .with_pid(pid) + .with_after_panic(move || println!("after panic {}", pid.clone())); + + let handle = spawn( + async { + panic!("test"); + }, + stack, + ); + + let pid = 2; + let stack = ProcStack::default().with_pid(pid); + + run( + async { + handle.await; + }, + stack.clone(), + ); +} +``` \ No newline at end of file diff --git a/runtime/executor/benches/blocking.rs b/runtime/executor/benches/blocking.rs new file mode 100644 index 0000000..6c5a6ff --- /dev/null +++ b/runtime/executor/benches/blocking.rs @@ -0,0 +1,67 @@ +#![feature(test)] + +extern crate test; + +use bastion_executor::blocking; +use lightproc::proc_stack::ProcStack; +use std::thread; +use std::time::Duration; +use test::Bencher; + +#[cfg(feature = "tokio-runtime")] +mod tokio_benchs { + use super::*; + #[bench] + fn blocking(b: &mut Bencher) { + tokio_test::block_on(async { _blocking(b) }); + } + #[bench] + fn blocking_single(b: &mut Bencher) { + tokio_test::block_on(async { + _blocking_single(b); + }); + } +} + +#[cfg(not(feature = "tokio-runtime"))] +mod no_tokio_benchs { + use super::*; + #[bench] + fn blocking(b: &mut Bencher) { + _blocking(b); + } + #[bench] + fn blocking_single(b: &mut Bencher) { + _blocking_single(b); + } +} + +// Benchmark for a 10K burst task spawn +fn _blocking(b: &mut Bencher) { + b.iter(|| { + (0..10_000) + .map(|_| { + blocking::spawn_blocking( + async { + let duration = Duration::from_millis(1); + thread::sleep(duration); + }, + ProcStack::default(), + ) + }) + .collect::>() + }); +} + +// Benchmark for a single blocking task spawn +fn _blocking_single(b: &mut Bencher) { + b.iter(|| { + blocking::spawn_blocking( + async { + let duration = Duration::from_millis(1); + thread::sleep(duration); + }, + ProcStack::default(), + ) + }); +} diff --git a/runtime/executor/benches/perf.rs b/runtime/executor/benches/perf.rs new file mode 100644 index 0000000..e8a588b --- /dev/null +++ b/runtime/executor/benches/perf.rs @@ -0,0 +1,25 @@ +#![feature(test)] + +extern crate test; + +use bastion_executor::prelude::*; +use lightproc::proc_stack::ProcStack; +use test::{black_box, Bencher}; + +#[bench] +fn increment(b: &mut Bencher) { + let mut sum = 0; + + b.iter(|| { + run( + async { + (0..10_000_000).for_each(|_| { + sum += 1; + }); + }, + ProcStack::default(), + ); + }); + + black_box(sum); +} diff 
--git a/runtime/executor/benches/run_blocking.rs b/runtime/executor/benches/run_blocking.rs new file mode 100644 index 0000000..43de440 --- /dev/null +++ b/runtime/executor/benches/run_blocking.rs @@ -0,0 +1,69 @@ +#![feature(test)] + +extern crate test; + +use bastion_executor::blocking; +use bastion_executor::run::run; +use futures::future::join_all; +use lightproc::proc_stack::ProcStack; +use std::thread; +use std::time::Duration; +use test::Bencher; + +#[cfg(feature = "tokio-runtime")] +mod tokio_benchs { + use super::*; + #[bench] + fn blocking(b: &mut Bencher) { + tokio_test::block_on(async { _blocking(b) }); + } + #[bench] + fn blocking_single(b: &mut Bencher) { + tokio_test::block_on(async { + _blocking_single(b); + }); + } +} + +#[cfg(not(feature = "tokio-runtime"))] +mod no_tokio_benchs { + use super::*; + #[bench] + fn blocking(b: &mut Bencher) { + _blocking(b); + } + #[bench] + fn blocking_single(b: &mut Bencher) { + _blocking_single(b); + } +} + +// Benchmark for a 10K burst task spawn +fn _blocking(b: &mut Bencher) { + b.iter(|| { + (0..10_000) + .map(|_| { + blocking::spawn_blocking( + async { + let duration = Duration::from_millis(1); + thread::sleep(duration); + }, + ProcStack::default(), + ) + }) + .collect::>() + }); +} + +// Benchmark for a single blocking task spawn +fn _blocking_single(b: &mut Bencher) { + b.iter(|| { + blocking::spawn_blocking( + async { + let duration = Duration::from_millis(1); + thread::sleep(duration); + }, + ProcStack::default(), + ) + }); +} diff --git a/runtime/executor/benches/spawn.rs b/runtime/executor/benches/spawn.rs new file mode 100644 index 0000000..02b896b --- /dev/null +++ b/runtime/executor/benches/spawn.rs @@ -0,0 +1,70 @@ +#![feature(test)] + +extern crate test; + +use bastion_executor::load_balancer; +use bastion_executor::prelude::spawn; +use futures_timer::Delay; +use lightproc::proc_stack::ProcStack; +use std::time::Duration; +use test::Bencher; + +#[cfg(feature = "tokio-runtime")] +mod tokio_benchs { + use super::*; + #[bench] + fn spawn_lot(b: &mut Bencher) { + tokio_test::block_on(async { _spawn_lot(b) }); + } + #[bench] + fn spawn_single(b: &mut Bencher) { + tokio_test::block_on(async { + _spawn_single(b); + }); + } +} + +#[cfg(not(feature = "tokio-runtime"))] +mod no_tokio_benchs { + use super::*; + #[bench] + fn spawn_lot(b: &mut Bencher) { + _spawn_lot(b); + } + #[bench] + fn spawn_single(b: &mut Bencher) { + _spawn_single(b); + } +} + +// Benchmark for a 10K burst task spawn +fn _spawn_lot(b: &mut Bencher) { + let proc_stack = ProcStack::default(); + b.iter(|| { + let _ = (0..10_000) + .map(|_| { + spawn( + async { + let duration = Duration::from_millis(1); + Delay::new(duration).await; + }, + proc_stack.clone(), + ) + }) + .collect::>(); + }); +} + +// Benchmark for a single task spawn +fn _spawn_single(b: &mut Bencher) { + let proc_stack = ProcStack::default(); + b.iter(|| { + spawn( + async { + let duration = Duration::from_millis(1); + Delay::new(duration).await; + }, + proc_stack.clone(), + ); + }); +} diff --git a/runtime/executor/benches/stats.rs b/runtime/executor/benches/stats.rs new file mode 100644 index 0000000..684e7cb --- /dev/null +++ b/runtime/executor/benches/stats.rs @@ -0,0 +1,71 @@ +#![feature(test)] + +extern crate test; +use bastion_executor::load_balancer::{core_count, get_cores, stats, SmpStats}; +use bastion_executor::placement; +use std::thread; +use test::Bencher; + +fn stress_stats(stats: &'static S) { + let mut handles = Vec::with_capacity(*core_count()); + for core in get_cores() { + let 
handle = thread::spawn(move || { + placement::set_for_current(*core); + for i in 0..100 { + stats.store_load(core.id, 10); + if i % 3 == 0 { + let _sorted_load = stats.get_sorted_load(); + } + } + }); + handles.push(handle); + } + + for handle in handles { + handle.join().unwrap(); + } +} + +// previous lock based stats benchmark 1,352,791 ns/iter (+/- 2,682,013) + +// 158,278 ns/iter (+/- 117,103) +#[bench] +fn lockless_stats_bench(b: &mut Bencher) { + b.iter(|| { + stress_stats(stats()); + }); +} + +#[bench] +fn lockless_stats_bad_load(b: &mut Bencher) { + let stats = stats(); + const MAX_CORE: usize = 256; + for i in 0..MAX_CORE { + // Generating the worst possible mergesort scenario + // [0,2,4,6,8,10,1,3,5,7,9]... + if i <= MAX_CORE / 2 { + stats.store_load(i, i * 2); + } else { + stats.store_load(i, i - 1 - MAX_CORE / 2); + } + } + + b.iter(|| { + let _sorted_load = stats.get_sorted_load(); + }); +} + +#[bench] +fn lockless_stats_good_load(b: &mut Bencher) { + let stats = stats(); + const MAX_CORE: usize = 256; + for i in 0..MAX_CORE { + // Generating the best possible mergesort scenario + // [0,1,2,3,4,5,6,7,8,9]... + stats.store_load(i, i); + } + + b.iter(|| { + let _sorted_load = stats.get_sorted_load(); + }); +} diff --git a/runtime/executor/examples/spawn_async.rs b/runtime/executor/examples/spawn_async.rs new file mode 100644 index 0000000..250f433 --- /dev/null +++ b/runtime/executor/examples/spawn_async.rs @@ -0,0 +1,42 @@ +use std::io::Write; +use std::panic::resume_unwind; +use std::time::Duration; +use executor::pool; +use executor::prelude::*; + +fn main() { + std::panic::set_hook(Box::new(|info| { + let tid = std::thread::current().id(); + println!("Panicking ThreadId: {:?}", tid); + std::io::stdout().flush(); + println!("panic hook: {:?}", info); + })); + let tid = std::thread::current().id(); + println!("Main ThreadId: {:?}", tid); + + let handle = spawn( + async { + panic!("test"); + }, + ); + + run( + async { + handle.await; + }, + ProcStack {}, + ); + + let pool = pool::get(); + let manager = pool::get_manager().unwrap(); + println!("After panic: {:?}", pool); + println!("{:#?}", manager); + + let h = std::thread::spawn(|| { + panic!("This is a test"); + }); + + std::thread::sleep(Duration::from_secs(30)); + + println!("After panic"); +} diff --git a/runtime/executor/scripts/test_blocking_thread_pool.sh b/runtime/executor/scripts/test_blocking_thread_pool.sh new file mode 100644 index 0000000..f1c28d8 --- /dev/null +++ b/runtime/executor/scripts/test_blocking_thread_pool.sh @@ -0,0 +1,5 @@ +#!/bin/zsh + +cargo test longhauling_task_join -- --ignored --exact --nocapture +cargo test slow_join_interrupted -- --ignored --exact --nocapture +cargo test slow_join -- --ignored --exact --nocapture \ No newline at end of file diff --git a/runtime/executor/src/blocking.rs b/runtime/executor/src/blocking.rs new file mode 100644 index 0000000..29e34ef --- /dev/null +++ b/runtime/executor/src/blocking.rs @@ -0,0 +1,165 @@ +//! +//! Pool of threads to run heavy processes +//! +//! We spawn futures onto the pool with [`spawn_blocking`] method of global run queue or +//! with corresponding [`Worker`]'s spawn method. +//! +//! 
[`Worker`]: crate::run_queue::Worker + +use crate::thread_manager::{DynamicPoolManager, DynamicRunner}; +use crossbeam_channel::{unbounded, Receiver, Sender}; +use lazy_static::lazy_static; +use lightproc::lightproc::LightProc; +use lightproc::recoverable_handle::RecoverableHandle; +use once_cell::sync::{Lazy, OnceCell}; +use std::future::Future; +use std::iter::Iterator; +use std::time::Duration; +use std::{env, thread}; +use tracing::trace; + +/// If low watermark isn't configured this is the default scaler value. +/// This value is used for the heuristics of the scaler +const DEFAULT_LOW_WATERMARK: u64 = 2; + +const THREAD_RECV_TIMEOUT: Duration = Duration::from_millis(100); + +/// Spawns a blocking task. +/// +/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. +pub fn spawn_blocking(future: F) -> RecoverableHandle +where + F: Future + Send + 'static, + R: Send + 'static, +{ + let (task, handle) = LightProc::recoverable(future, schedule); + task.schedule(); + handle +} + +#[derive(Debug)] +struct BlockingRunner { + // We keep a handle to the tokio runtime here to make sure + // it will never be dropped while the DynamicPoolManager is alive, + // In case we need to spin up some threads. + #[cfg(feature = "tokio-runtime")] + runtime_handle: tokio::runtime::Handle, +} + +impl DynamicRunner for BlockingRunner { + fn run_static(&self, park_timeout: Duration) -> ! { + loop { + while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) { + trace!("static thread: running task"); + self.run(task); + } + + trace!("static: empty queue, parking with timeout"); + thread::park_timeout(park_timeout); + } + } + fn run_dynamic(&self, parker: impl Fn()) -> ! { + loop { + while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) { + trace!("dynamic thread: running task"); + self.run(task); + } + trace!( + "dynamic thread: parking - {:?}", + std::thread::current().id() + ); + parker(); + } + } + fn run_standalone(&self) { + while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) { + self.run(task); + } + trace!("standalone thread: quitting."); + } +} + +impl BlockingRunner { + fn run(&self, task: LightProc) { + #[cfg(feature = "tokio-runtime")] + { + self.runtime_handle.spawn_blocking(|| task.run()); + } + #[cfg(not(feature = "tokio-runtime"))] + { + task.run(); + } + } +} + +/// Pool interface between the scheduler and thread pool +struct Pool { + sender: Sender, + receiver: Receiver, +} + +static DYNAMIC_POOL_MANAGER: OnceCell> = OnceCell::new(); + +static POOL: Lazy = Lazy::new(|| { + #[cfg(feature = "tokio-runtime")] + { + let runner = BlockingRunner { + // We use current() here instead of try_current() + // because we want bastion to crash as soon as possible + // if there is no available runtime. 
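+            // Handle::current() panics when called outside of a Tokio runtime context,
+            // which surfaces a missing runtime immediately instead of at first use.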
+ runtime_handle: tokio::runtime::Handle::current(), + }; + + DYNAMIC_POOL_MANAGER + .set(DynamicPoolManager::new(*low_watermark() as usize, runner)) + .expect("couldn't create dynamic pool manager"); + } + #[cfg(not(feature = "tokio-runtime"))] + { + let runner = BlockingRunner {}; + + DYNAMIC_POOL_MANAGER + .set(DynamicPoolManager::new(*low_watermark() as usize, runner)) + .expect("couldn't create dynamic pool manager"); + } + + DYNAMIC_POOL_MANAGER + .get() + .expect("couldn't get static pool manager") + .initialize(); + + let (sender, receiver) = unbounded(); + Pool { sender, receiver } +}); + +/// Enqueues work, attempting to send to the thread pool in a +/// nonblocking way and spinning up needed amount of threads +/// based on the previous statistics without relying on +/// if there is not a thread ready to accept the work or not. +fn schedule(t: LightProc) { + if let Err(err) = POOL.sender.try_send(t) { + // We were not able to send to the channel without + // blocking. + POOL.sender.send(err.into_inner()).unwrap(); + } + + // Add up for every incoming scheduled task + DYNAMIC_POOL_MANAGER.get().unwrap().increment_frequency(); +} + +/// +/// Low watermark value, defines the bare minimum of the pool. +/// Spawns initial thread set. +/// Can be configurable with env var `BASTION_BLOCKING_THREADS` at runtime. +#[inline] +fn low_watermark() -> &'static u64 { + lazy_static! { + static ref LOW_WATERMARK: u64 = { + env::var_os("BASTION_BLOCKING_THREADS") + .map(|x| x.to_str().unwrap().parse::().unwrap()) + .unwrap_or(DEFAULT_LOW_WATERMARK) + }; + } + + &*LOW_WATERMARK +} diff --git a/runtime/executor/src/lib.rs b/runtime/executor/src/lib.rs new file mode 100644 index 0000000..4e0a14e --- /dev/null +++ b/runtime/executor/src/lib.rs @@ -0,0 +1,50 @@ +//! +//! +//! +//! Bastion Executor is NUMA-aware SMP based Fault-tolerant Executor +//! +//! Bastion Executor is a highly-available, fault-tolerant, async communication +//! oriented executor. Bastion's main idea is supplying a fully async runtime +//! with fault-tolerance to work on heavy loads. +//! +//! Main differences between other executors are: +//! * Uses SMP based execution scheme to exploit cache affinity on multiple cores and execution is +//! equally distributed over the system resources, which means utilizing the all system. +//! * Uses NUMA-aware allocation for scheduler's queues and exploit locality on server workloads. +//! * Tailored for creating middleware and working with actor model like concurrency and distributed communication. +//! +//! **NOTE:** Bastion Executor is independent of it's framework implementation. +//! It uses [lightproc] to encapsulate and provide fault-tolerance to your future based workloads. +//! You can use your futures with [lightproc] to run your workloads on Bastion Executor without the need to have framework. +//! +//! [lightproc]: https://docs.rs/lightproc +//! 
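+//! # Example
+//!
+//! A minimal end-to-end sketch (it mirrors the `spawn_async` example shipped with this
+//! crate; the recoverable handle resolves to `None` when the spawned task panics, which
+//! is why the result is an `Option`):
+//!
+//! ```rust
+//! use executor::prelude::*;
+//!
+//! // Spawn a lightweight process onto the pool...
+//! let handle = spawn(async { 40 + 2 });
+//!
+//! // ...then block the current thread until it has completed.
+//! let out = run(async { handle.await }, ProcStack {});
+//! assert_eq!(out, Some(42));
+//! ```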
+ +#![doc( + html_logo_url = "https://raw.githubusercontent.com/bastion-rs/bastion/master/img/bastion-logo.png" +)] +// Force missing implementations +#![warn(missing_docs)] +#![warn(missing_debug_implementations)] +#![warn(unused_imports)] +#![forbid(unused_must_use)] +#![forbid(unused_import_braces)] + +pub mod blocking; +pub mod load_balancer; +pub mod placement; +pub mod pool; +pub mod run; +pub mod sleepers; +mod thread_manager; +pub mod worker; +mod proc_stack; + +/// +/// Prelude of Bastion Executor +pub mod prelude { + pub use crate::blocking::*; + pub use crate::pool::*; + pub use crate::run::*; + pub use crate::proc_stack::*; +} diff --git a/runtime/executor/src/load_balancer.rs b/runtime/executor/src/load_balancer.rs new file mode 100644 index 0000000..9b253f6 --- /dev/null +++ b/runtime/executor/src/load_balancer.rs @@ -0,0 +1,234 @@ +//! +//! Module for gathering statistics about the run queues of the runtime +//! +//! Load balancer calculates sampled mean to provide average process execution amount +//! to all runtime. +//! +use crate::load_balancer; +use crate::placement; +use arrayvec::ArrayVec; +use fmt::{Debug, Formatter}; +use lazy_static::*; +use once_cell::sync::Lazy; +use placement::CoreId; +use std::mem::MaybeUninit; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::RwLock; +use std::time::{Duration, Instant}; +use std::{fmt, usize}; +use tracing::{debug, error}; + +const MEAN_UPDATE_TRESHOLD: Duration = Duration::from_millis(200); + +/// Stats of all the smp queues. +pub trait SmpStats { + /// Stores the load of the given queue. + fn store_load(&self, affinity: usize, load: usize); + /// returns tuple of queue id and load ordered from highest load to lowest. + fn get_sorted_load(&self) -> ArrayVec<(usize, usize), MAX_CORE>; + /// mean of the all smp queue load. + fn mean(&self) -> usize; + /// update the smp mean. + fn update_mean(&self); +} + +static LOAD_BALANCER: Lazy = Lazy::new(|| { + let lb = LoadBalancer::new(placement::get_core_ids().unwrap()); + debug!("Instantiated load_balancer: {:?}", lb); + lb +}); + +/// Load-balancer struct which allows us to update the mean load +pub struct LoadBalancer { + /// The number of cores + /// available for this program + pub num_cores: usize, + /// The core Ids available for this program + /// This doesn't take affinity into account + pub cores: Vec, + mean_last_updated_at: RwLock, +} + +impl LoadBalancer { + /// Creates a new LoadBalancer. + /// if you're looking for `num_cores` and `cores` + /// Have a look at `load_balancer::core_count()` + /// and `load_balancer::get_cores()` respectively. 
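+    ///
+    /// A small construction sketch (this mirrors how the module builds its own
+    /// `LOAD_BALANCER` singleton; `get_core_ids()` may return `None` on unsupported
+    /// platforms, hence the `unwrap`):
+    /// ```ignore
+    /// let lb = LoadBalancer::new(placement::get_core_ids().unwrap());
+    /// assert_eq!(lb.num_cores, lb.cores.len());
+    /// ```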
+ pub fn new(cores: Vec) -> Self { + Self { + num_cores: cores.len(), + cores, + mean_last_updated_at: RwLock::new(Instant::now()), + } + } +} + +impl Debug for LoadBalancer { + fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { + fmt.debug_struct("LoadBalancer") + .field("num_cores", &self.num_cores) + .field("cores", &self.cores) + .field("mean_last_updated_at", &self.mean_last_updated_at) + .finish() + } +} + +impl LoadBalancer { + /// Iterates the statistics to get the mean load across the cores + pub fn update_load_mean(&self) { + // Check if update should occur + if !self.should_update() { + return; + } + self.mean_last_updated_at + .write() + .map(|mut last_updated_at| { + *last_updated_at = Instant::now(); + }) + .unwrap_or_else(|e| error!("couldn't update mean timestamp - {}", e)); + + load_balancer::stats().update_mean(); + } + + fn should_update(&self) -> bool { + // If we couldn't acquire a lock on the mean last_updated_at, + // There is probably someone else updating already + self.mean_last_updated_at + .try_read() + .map(|last_updated_at| last_updated_at.elapsed() > MEAN_UPDATE_TRESHOLD) + .unwrap_or(false) + } +} + +/// Update the mean load on the singleton +pub fn update() { + LOAD_BALANCER.update_load_mean() +} + +/// Maximum number of core supported by modern computers. +const MAX_CORE: usize = 256; + +/// +/// Holding all statistics related to the run queue +/// +/// Contains: +/// * Mean level of processes in the run queues +/// * SMP queue distributions +pub struct Stats { + smp_load: [AtomicUsize; MAX_CORE], + mean_level: AtomicUsize, + updating_mean: AtomicBool, +} + +impl fmt::Debug for Stats { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Stats") + .field("smp_load", &&self.smp_load[..]) + .field("mean_level", &self.mean_level) + .field("updating_mean", &self.updating_mean) + .finish() + } +} + +impl Stats { + /// new returns LockLessStats + pub fn new(num_cores: usize) -> Stats { + let smp_load: [AtomicUsize; MAX_CORE] = { + let mut data: [MaybeUninit; MAX_CORE] = + unsafe { MaybeUninit::uninit().assume_init() }; + + for core_data in data.iter_mut().take(num_cores) { + unsafe { + std::ptr::write(core_data.as_mut_ptr(), AtomicUsize::new(0)); + } + } + for core_data in data.iter_mut().take(MAX_CORE).skip(num_cores) { + unsafe { + std::ptr::write(core_data.as_mut_ptr(), AtomicUsize::new(usize::MAX)); + } + } + + unsafe { std::mem::transmute::<_, [AtomicUsize; MAX_CORE]>(data) } + }; + Stats { + smp_load, + mean_level: AtomicUsize::new(0), + updating_mean: AtomicBool::new(false), + } + } +} + +unsafe impl Sync for Stats {} +unsafe impl Send for Stats {} + +impl SmpStats for Stats { + fn store_load(&self, affinity: usize, load: usize) { + self.smp_load[affinity].store(load, Ordering::SeqCst); + } + + fn get_sorted_load(&self) -> ArrayVec<(usize, usize), MAX_CORE> { + let mut sorted_load = ArrayVec::new(); + + for (core, load) in self.smp_load.iter().enumerate() { + let load = load.load(Ordering::SeqCst); + // load till maximum core. 
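+            // Slots past the detected core count are initialized to `usize::MAX` in
+            // `Stats::new`, so the first MAX value marks the end of the active cores.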
+ if load == usize::MAX { + break; + } + // unsafe is ok here because self.smp_load.len() is MAX_CORE + unsafe { sorted_load.push_unchecked((core, load)) }; + } + sorted_load.sort_by(|x, y| y.1.cmp(&x.1)); + sorted_load + } + + fn mean(&self) -> usize { + self.mean_level.load(Ordering::Acquire) + } + + fn update_mean(&self) { + // Don't update if it's updating already + if self.updating_mean.load(Ordering::Acquire) { + return; + } + + self.updating_mean.store(true, Ordering::Release); + let mut sum: usize = 0; + let num_cores = LOAD_BALANCER.num_cores; + + for item in self.smp_load.iter().take(num_cores) { + if let Some(tmp) = sum.checked_add(item.load(Ordering::Acquire)) { + sum = tmp; + } + } + + self.mean_level + .store(sum.wrapping_div(num_cores), Ordering::Release); + + self.updating_mean.store(false, Ordering::Release); + } +} + +/// +/// Static access to runtime statistics +#[inline] +pub fn stats() -> &'static Stats { + lazy_static! { + static ref LOCKLESS_STATS: Stats = Stats::new(*core_count()); + } + &*LOCKLESS_STATS +} + +/// +/// Retrieve core count for the runtime scheduling purposes +#[inline] +pub fn core_count() -> &'static usize { + &LOAD_BALANCER.num_cores +} + +/// +/// Retrieve cores for the runtime scheduling purposes +#[inline] +pub fn get_cores() -> &'static [CoreId] { + &*LOAD_BALANCER.cores +} diff --git a/runtime/executor/src/placement.rs b/runtime/executor/src/placement.rs new file mode 100644 index 0000000..273fdb1 --- /dev/null +++ b/runtime/executor/src/placement.rs @@ -0,0 +1,414 @@ +//! Core placement configuration and management +//! +//! Placement module enables thread placement onto the cores. +//! CPU level affinity assignment is done here. + +/// This function tries to retrieve information +/// on all the "cores" active on this system. +pub fn get_core_ids() -> Option> { + get_core_ids_helper() +} + +/// This function tries to retrieve +/// the number of active "cores" on the system. +pub fn get_num_cores() -> Option { + get_core_ids().map(|ids| ids.len()) +} +/// +/// Sets the current threads affinity +pub fn set_for_current(core_id: CoreId) { + tracing::trace!("Executor: placement: set affinity on core {}", core_id.id); + set_for_current_helper(core_id); +} + +/// +/// CoreID implementation to identify system cores. +#[derive(Copy, Clone, Debug)] +pub struct CoreId { + /// Used core ID + pub id: usize, +} + +// Linux Section + +#[cfg(target_os = "linux")] +#[inline] +fn get_core_ids_helper() -> Option> { + linux::get_core_ids() +} + +#[cfg(target_os = "linux")] +#[inline] +fn set_for_current_helper(core_id: CoreId) { + linux::set_for_current(core_id); +} + +#[cfg(target_os = "linux")] +mod linux { + use std::mem; + + use libc::{cpu_set_t, sched_getaffinity, sched_setaffinity, CPU_ISSET, CPU_SET, CPU_SETSIZE}; + + use super::CoreId; + + pub fn get_core_ids() -> Option> { + if let Some(full_set) = get_affinity_mask() { + let mut core_ids: Vec = Vec::new(); + + for i in 0..CPU_SETSIZE as usize { + if unsafe { CPU_ISSET(i, &full_set) } { + core_ids.push(CoreId { id: i }); + } + } + + Some(core_ids) + } else { + None + } + } + + pub fn set_for_current(core_id: CoreId) { + // Turn `core_id` into a `libc::cpu_set_t` with only + // one core active. + let mut set = new_cpu_set(); + + unsafe { CPU_SET(core_id.id, &mut set) }; + + // Set the current thread's core affinity. 
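+        // A pid of 0 applies the mask to the calling thread; the return value is
+        // deliberately ignored here, as on the other platform back-ends.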
+ unsafe { + sched_setaffinity( + 0, // Defaults to current thread + mem::size_of::(), + &set, + ); + } + } + + fn get_affinity_mask() -> Option { + let mut set = new_cpu_set(); + + // Try to get current core affinity mask. + let result = unsafe { + sched_getaffinity( + 0, // Defaults to current thread + mem::size_of::(), + &mut set, + ) + }; + + if result == 0 { + Some(set) + } else { + None + } + } + + fn new_cpu_set() -> cpu_set_t { + unsafe { mem::zeroed::() } + } + + #[cfg(test)] + mod tests { + + use super::*; + + #[test] + fn test_linux_get_affinity_mask() { + match get_affinity_mask() { + Some(_) => {} + None => { + panic!(); + } + } + } + + #[test] + fn test_linux_get_core_ids() { + match get_core_ids() { + Some(set) => { + assert_eq!(set.len(), num_cpus::get()); + } + None => { + panic!(); + } + } + } + + #[test] + fn test_linux_set_for_current() { + let ids = get_core_ids().unwrap(); + + assert!(!ids.is_empty()); + + set_for_current(ids[0]); + + // Ensure that the system pinned the current thread + // to the specified core. + let mut core_mask = new_cpu_set(); + unsafe { CPU_SET(ids[0].id, &mut core_mask) }; + + let new_mask = get_affinity_mask().unwrap(); + + let mut is_equal = true; + + for i in 0..CPU_SETSIZE as usize { + let is_set1 = unsafe { CPU_ISSET(i, &core_mask) }; + let is_set2 = unsafe { CPU_ISSET(i, &new_mask) }; + + if is_set1 != is_set2 { + is_equal = false; + } + } + + assert!(is_equal); + } + } +} + +// Windows Section + +#[cfg(target_os = "windows")] +#[inline] +fn get_core_ids_helper() -> Option> { + windows::get_core_ids() +} + +#[cfg(target_os = "windows")] +#[inline] +fn set_for_current_helper(core_id: CoreId) { + windows::set_for_current(core_id); +} + +#[cfg(target_os = "windows")] +extern crate winapi; + +#[cfg(target_os = "windows")] +mod windows { + #[allow(unused_imports)] + use winapi::shared::basetsd::{DWORD_PTR, PDWORD_PTR}; + use winapi::um::processthreadsapi::{GetCurrentProcess, GetCurrentThread}; + use winapi::um::winbase::{GetProcessAffinityMask, SetThreadAffinityMask}; + + use super::CoreId; + + pub fn get_core_ids() -> Option> { + if let Some(mask) = get_affinity_mask() { + // Find all active cores in the bitmask. + let mut core_ids: Vec = Vec::new(); + + for i in 0..usize::MIN.count_zeros() as usize { + let test_mask = 1 << i; + + if (mask & test_mask) == test_mask { + core_ids.push(CoreId { id: i }); + } + } + + Some(core_ids) + } else { + None + } + } + + pub fn set_for_current(core_id: CoreId) { + // Convert `CoreId` back into mask. + let mask: DWORD_PTR = 1 << core_id.id; + + // Set core affinity for current thread. 
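+        // SetThreadAffinityMask returns the previous mask on success and 0 on failure;
+        // as with the Linux back-end, the result is not inspected.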
+ unsafe { + SetThreadAffinityMask(GetCurrentThread(), mask); + } + } + + fn get_affinity_mask() -> Option { + let mut process_mask: usize = 0; + let mut system_mask: usize = 0; + + let res = unsafe { + GetProcessAffinityMask( + GetCurrentProcess(), + &mut process_mask as PDWORD_PTR, + &mut system_mask as PDWORD_PTR, + ) + }; + + // Successfully retrieved affinity mask + if res != 0 { + Some(process_mask) + } + // Failed to retrieve affinity mask + else { + None + } + } + + #[cfg(test)] + mod tests { + use num_cpus; + + use super::*; + + #[test] + fn test_macos_get_core_ids() { + match get_core_ids() { + Some(set) => { + assert_eq!(set.len(), num_cpus::get()); + } + None => { + assert!(false); + } + } + } + + #[test] + fn test_macos_set_for_current() { + let ids = get_core_ids().unwrap(); + + assert!(ids.len() > 0); + + set_for_current(ids[0]); + } + } +} + +// MacOS Section + +#[cfg(target_os = "macos")] +#[inline] +fn get_core_ids_helper() -> Option> { + macos::get_core_ids() +} + +#[cfg(target_os = "macos")] +#[inline] +fn set_for_current_helper(core_id: CoreId) { + macos::set_for_current(core_id); +} + +#[cfg(target_os = "macos")] +mod macos { + use std::mem; + + use libc::{c_int, c_uint, pthread_self}; + + use super::CoreId; + + type KernReturnT = c_int; + type IntegerT = c_int; + type NaturalT = c_uint; + type ThreadT = c_uint; + type ThreadPolicyFlavorT = NaturalT; + type MachMsgTypeNumberT = NaturalT; + + #[repr(C)] + struct ThreadAffinityPolicyDataT { + affinity_tag: IntegerT, + } + + type ThreadPolicyT = *mut ThreadAffinityPolicyDataT; + + const THREAD_AFFINITY_POLICY: ThreadPolicyFlavorT = 4; + + #[link(name = "System", kind = "framework")] + extern "C" { + fn thread_policy_set( + thread: ThreadT, + flavor: ThreadPolicyFlavorT, + policy_info: ThreadPolicyT, + count: MachMsgTypeNumberT, + ) -> KernReturnT; + } + + pub fn get_core_ids() -> Option> { + Some( + (0..(num_cpus::get())) + .map(|n| CoreId { id: n as usize }) + .collect::>(), + ) + } + + pub fn set_for_current(core_id: CoreId) { + let thread_affinity_policy_count: MachMsgTypeNumberT = + mem::size_of::() as MachMsgTypeNumberT + / mem::size_of::() as MachMsgTypeNumberT; + + let mut info = ThreadAffinityPolicyDataT { + affinity_tag: core_id.id as IntegerT, + }; + + unsafe { + thread_policy_set( + pthread_self() as ThreadT, + THREAD_AFFINITY_POLICY, + &mut info as ThreadPolicyT, + thread_affinity_policy_count, + ); + } + } + + #[cfg(test)] + mod tests { + + use super::*; + + #[test] + fn test_windows_get_core_ids() { + match get_core_ids() { + Some(set) => { + assert_eq!(set.len(), num_cpus::get()); + } + None => { + panic!(); + } + } + } + + #[test] + fn test_windows_set_for_current() { + let ids = get_core_ids().unwrap(); + + assert!(ids.len() > 0); + + set_for_current(ids[0]); + } + } +} + +// Stub Section + +#[cfg(not(any(target_os = "linux", target_os = "windows", target_os = "macos")))] +#[inline] +fn get_core_ids_helper() -> Option> { + None +} + +#[cfg(not(any(target_os = "linux", target_os = "windows", target_os = "macos")))] +#[inline] +fn set_for_current_helper(core_id: CoreId) {} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_get_core_ids() { + match get_core_ids() { + Some(set) => { + assert_eq!(set.len(), num_cpus::get()); + } + None => { + panic!(); + } + } + } + + #[test] + fn test_set_for_current() { + let ids = get_core_ids().unwrap(); + + assert!(!ids.is_empty()); + + set_for_current(ids[0]); + } +} diff --git a/runtime/executor/src/pool.rs b/runtime/executor/src/pool.rs new file mode 
100644 index 0000000..98ab569 --- /dev/null +++ b/runtime/executor/src/pool.rs @@ -0,0 +1,223 @@ +//! +//! Pool of threads to run lightweight processes +//! +//! We spawn futures onto the pool with [`spawn`] method of global run queue or +//! with corresponding [`Worker`]'s spawn method. +//! +//! [`spawn`]: crate::pool::spawn +//! [`Worker`]: crate::run_queue::Worker + +use crate::thread_manager::{DynamicPoolManager, DynamicRunner}; +use crate::worker; +use crossbeam_channel::{unbounded, Receiver, Sender}; +use lazy_static::lazy_static; +use lightproc::lightproc::LightProc; +use lightproc::recoverable_handle::RecoverableHandle; +use once_cell::sync::{Lazy, OnceCell}; +use std::future::Future; +use std::iter::Iterator; +use std::time::Duration; +use std::{env, thread}; +use tracing::trace; + +/// +/// Spawn a process (which contains future + process stack) onto the executor from the global level. +/// +/// # Example +/// ```rust +/// use executor::prelude::*; +/// +/// # #[cfg(feature = "tokio-runtime")] +/// # #[tokio::main] +/// # async fn main() { +/// # start(); +/// # } +/// # +/// # #[cfg(not(feature = "tokio-runtime"))] +/// # fn main() { +/// # start(); +/// # } +/// # +/// # fn start() { +/// +/// let handle = spawn( +/// async { +/// panic!("test"); +/// }, +/// ); +/// +/// run( +/// async { +/// handle.await; +/// }, +/// ProcStack { }, +/// ); +/// # } +/// ``` +pub fn spawn(future: F) -> RecoverableHandle +where + F: Future + Send + 'static, + R: Send + 'static, +{ + let (task, handle) = LightProc::recoverable(future, worker::schedule); + task.schedule(); + handle +} + +/// Spawns a blocking task. +/// +/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. +pub fn spawn_blocking(future: F) -> RecoverableHandle +where + F: Future + Send + 'static, + R: Send + 'static, +{ + let (task, handle) = LightProc::recoverable(future, schedule); + task.schedule(); + handle +} + +/// +/// Acquire the static Pool reference +#[inline] +pub fn get() -> &'static Pool { + &*POOL +} + +pub fn get_manager() -> Option<&'static DynamicPoolManager> { + DYNAMIC_POOL_MANAGER.get() +} + +impl Pool { + /// + /// Spawn a process (which contains future + process stack) onto the executor via [Pool] interface. + pub fn spawn(&self, future: F) -> RecoverableHandle + where + F: Future + Send + 'static, + R: Send + 'static, + { + let (task, handle) = LightProc::recoverable(future, worker::schedule); + task.schedule(); + handle + } +} + +/// Enqueues work, attempting to send to the thread pool in a +/// nonblocking way and spinning up needed amount of threads +/// based on the previous statistics without relying on +/// if there is not a thread ready to accept the work or not. +pub(crate) fn schedule(t: LightProc) { + if let Err(err) = POOL.sender.try_send(t) { + // We were not able to send to the channel without + // blocking. + POOL.sender.send(err.into_inner()).unwrap(); + } + // Add up for every incoming scheduled task + DYNAMIC_POOL_MANAGER.get().unwrap().increment_frequency(); +} + +/// +/// Low watermark value, defines the bare minimum of the pool. +/// Spawns initial thread set. +/// Can be configurable with env var `BASTION_BLOCKING_THREADS` at runtime. +#[inline] +fn low_watermark() -> &'static u64 { + lazy_static! 
{ + static ref LOW_WATERMARK: u64 = { + env::var_os("BASTION_BLOCKING_THREADS") + .map(|x| x.to_str().unwrap().parse::().unwrap()) + .unwrap_or(DEFAULT_LOW_WATERMARK) + }; + } + + &*LOW_WATERMARK +} + +/// If low watermark isn't configured this is the default scaler value. +/// This value is used for the heuristics of the scaler +const DEFAULT_LOW_WATERMARK: u64 = 2; + +/// Pool interface between the scheduler and thread pool +#[derive(Debug)] +pub struct Pool { + sender: Sender, + receiver: Receiver, +} + +#[derive(Debug)] +pub struct AsyncRunner { + +} + +impl DynamicRunner for AsyncRunner { + fn run_static(&self, park_timeout: Duration) -> ! { + loop { + for task in &POOL.receiver { + trace!("static: running task"); + self.run(task); + } + + trace!("static: empty queue, parking with timeout"); + thread::park_timeout(park_timeout); + } + } + fn run_dynamic(&self, parker: impl Fn()) -> ! { + loop { + while let Ok(task) = POOL.receiver.try_recv() { + trace!("dynamic thread: running task"); + self.run(task); + } + trace!( + "dynamic thread: parking - {:?}", + std::thread::current().id() + ); + parker(); + } + } + fn run_standalone(&self) { + while let Ok(task) = POOL.receiver.try_recv() { + self.run(task); + } + trace!("standalone thread: quitting."); + } +} + +impl AsyncRunner { + fn run(&self, task: LightProc) { + task.run(); + } +} + +static DYNAMIC_POOL_MANAGER: OnceCell> = OnceCell::new(); + +static POOL: Lazy = Lazy::new(|| { + #[cfg(feature = "tokio-runtime")] + { + let runner = AsyncRunner { + // We use current() here instead of try_current() + // because we want bastion to crash as soon as possible + // if there is no available runtime. + runtime_handle: tokio::runtime::Handle::current(), + }; + + DYNAMIC_POOL_MANAGER + .set(DynamicPoolManager::new(*low_watermark() as usize, runner)) + .expect("couldn't create dynamic pool manager"); + } + #[cfg(not(feature = "tokio-runtime"))] + { + let runner = AsyncRunner {}; + + DYNAMIC_POOL_MANAGER + .set(DynamicPoolManager::new(*low_watermark() as usize, runner)) + .expect("couldn't create dynamic pool manager"); + } + + DYNAMIC_POOL_MANAGER + .get() + .expect("couldn't get static pool manager") + .initialize(); + + let (sender, receiver) = unbounded(); + Pool { sender, receiver } +}); diff --git a/runtime/executor/src/proc_stack.rs b/runtime/executor/src/proc_stack.rs new file mode 100644 index 0000000..05b612c --- /dev/null +++ b/runtime/executor/src/proc_stack.rs @@ -0,0 +1,5 @@ + +#[derive(Debug)] +pub struct ProcStack { + +} diff --git a/runtime/executor/src/run.rs b/runtime/executor/src/run.rs new file mode 100644 index 0000000..0f23bcd --- /dev/null +++ b/runtime/executor/src/run.rs @@ -0,0 +1,154 @@ +//! +//! Blocking run of the async processes +//! +//! +use crate::worker; +use crossbeam_utils::sync::{Parker, Unparker}; +use std::cell::Cell; +use std::future::Future; +use std::mem; +use std::mem::{ManuallyDrop, MaybeUninit}; +use std::pin::Pin; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; +use crate::proc_stack::ProcStack; + +/// +/// This method blocks the current thread until passed future is resolved with an output. +/// +/// It is called `block_on` or `blocking` in some executors. 
+/// +/// # Example +/// ```rust +/// use executor::prelude::*; +/// use lightproc::prelude::*; +/// let mut sum = 0; +/// +/// run( +/// async { +/// (0..10_000_000).for_each(|_| { +/// sum += 1; +/// }); +/// }, +/// ProcStack::default(), +/// ); +/// ``` +pub fn run(future: F, stack: ProcStack) -> T +where + F: Future, +{ + unsafe { + // An explicitly uninitialized `T`. Until `assume_init` is called this will not call any + // drop code for T + let mut out = MaybeUninit::uninit(); + + // Wrap the future into one that stores the result into `out`. + let future = { + let out = out.as_mut_ptr(); + + async move { + *out = future.await; + } + }; + + // Pin the future onto the stack. + pin_utils::pin_mut!(future); + + // Extend the lifetime of the future to 'static. + let future = mem::transmute::< + Pin<&'_ mut dyn Future>, + Pin<&'static mut dyn Future>, + >(future); + + // Block on the future and and wait for it to complete. + worker::set_stack(&stack, || block(future)); + + // Assume that if the future completed and didn't panic it fully initialized its output + out.assume_init() + } +} + +fn block(f: F) -> T +where + F: Future, +{ + thread_local! { + // May hold a pre-allocated parker that can be reused for efficiency. + // + // Note that each invocation of `block` needs its own parker. In particular, if `block` + // recursively calls itself, we must make sure that each recursive call uses a distinct + // parker instance. + static CACHE: Cell> = Cell::new(None); + } + + pin_utils::pin_mut!(f); + + CACHE.with(|cache| { + // Reuse a cached parker or create a new one for this invocation of `block`. + let parker: Parker = cache.take().unwrap_or_else(|| Parker::new()); + + let ptr = Unparker::into_raw(parker.unparker().clone()); + let vt = vtable(); + + // Waker must not be dropped until it's no longer required. We also happen to know that a + // Parker contains at least one reference to `Unparker` so the relevant `Unparker` will not + // be dropped at least until the `Parker` is. + let waker = unsafe { Waker::from_raw(RawWaker::new(ptr, vt)) }; + let cx = &mut Context::from_waker(&waker); + + loop { + if let Poll::Ready(t) = f.as_mut().poll(cx) { + // Save the parker for the next invocation of `block`. + cache.set(Some(parker)); + return t; + } + parker.park(); + } + }) +} + +fn vtable() -> &'static RawWakerVTable { + /// This function will be called when the RawWaker gets cloned, e.g. when the Waker in which + /// the RawWaker is stored gets cloned. + // + /// The implementation of this function must retain all resources that are required for this + /// additional instance of a RawWaker and associated task. Calling wake on the resulting + /// RawWaker should result in a wakeup of the same task that would have been awoken by the + /// original RawWaker. + unsafe fn clone_raw(ptr: *const ()) -> RawWaker { + // [`Unparker`] implements `Clone` and upholds the contract stated above. The current + // Implementation is simply an Arc over the actual inner values. + let unparker = Unparker::from_raw(ptr).clone(); + RawWaker::new(Unparker::into_raw(unparker), vtable()) + } + + /// This function will be called when wake is called on the Waker. It must wake up the task + /// associated with this RawWaker. + /// + /// The implementation of this function must make sure to release any resources that are + /// associated with this instance of a RawWaker and associated task. 
+ unsafe fn wake_raw(ptr: *const ()) { + // We reconstruct the Unparker from the pointer here thus ensuring it is dropped at the + // end of this function call. + Unparker::from_raw(ptr).unpark(); + } + + /// This function will be called when wake_by_ref is called on the Waker. It must wake up the + /// task associated with this RawWaker. + /// + /// This function is similar to wake, but must not consume the provided data pointer. + unsafe fn wake_by_ref_raw(ptr: *const ()) { + // We **must not** drop the resulting Unparker so we wrap it in `ManuallyDrop`. + let unparker = ManuallyDrop::new(Unparker::from_raw(ptr)); + unparker.unpark(); + } + + /// This function gets called when a RawWaker gets dropped. + /// + /// The implementation of this function must make sure to release any resources that are + /// associated with this instance of a RawWaker and associated task. + unsafe fn drop_raw(ptr: *const ()) { + drop(Unparker::from_raw(ptr)) + } + + &RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw) +} diff --git a/runtime/executor/src/sleepers.rs b/runtime/executor/src/sleepers.rs new file mode 100644 index 0000000..04811bb --- /dev/null +++ b/runtime/executor/src/sleepers.rs @@ -0,0 +1,67 @@ +//! +//! Where workers went to parking while no workload is in their worker queue. +//! +//! If a workload received pool will wake them up. +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Condvar, Mutex}; + +/// The place where worker threads go to sleep. +/// +/// Similar to how thread parking works, if a notification comes up while no threads are sleeping, +/// the next thread that attempts to go to sleep will pick up the notification immediately. +#[derive(Debug)] +#[allow(clippy::mutex_atomic)] +pub struct Sleepers { + /// How many threads are currently a sleep. + sleep: Mutex, + + /// A condvar for notifying sleeping threads. + wake: Condvar, + + /// Set to `true` if a notification came up while nobody was sleeping. + notified: AtomicBool, +} + +#[allow(clippy::mutex_atomic)] +impl Default for Sleepers { + /// Creates a new `Sleepers`. + fn default() -> Self { + Self { + sleep: Mutex::new(0), + wake: Condvar::new(), + notified: AtomicBool::new(false), + } + } +} + +#[allow(clippy::mutex_atomic)] +impl Sleepers { + /// Creates a new `Sleepers`. + pub fn new() -> Self { + Self::default() + } + + /// Puts the current thread to sleep. + pub fn wait(&self) { + let mut sleep = self.sleep.lock().unwrap(); + + if !self.notified.swap(false, Ordering::SeqCst) { + *sleep += 1; + std::mem::drop(self.wake.wait(sleep).unwrap()); + } + } + + /// Notifies one thread. + pub fn notify_one(&self) { + if !self.notified.load(Ordering::SeqCst) { + let mut sleep = self.sleep.lock().unwrap(); + + if *sleep > 0 { + *sleep -= 1; + self.wake.notify_one(); + } else { + self.notified.store(true, Ordering::SeqCst); + } + } + } +} diff --git a/runtime/executor/src/thread_manager.rs b/runtime/executor/src/thread_manager.rs new file mode 100644 index 0000000..ba76d2a --- /dev/null +++ b/runtime/executor/src/thread_manager.rs @@ -0,0 +1,404 @@ +//! A thread manager to predict how many threads should be spawned to handle the upcoming load. +//! +//! The thread manager consists of three elements: +//! * Frequency Detector +//! * Trend Estimator +//! * Predictive Upscaler +//! +//! ## Frequency Detector +//! Detects how many tasks are submitted from scheduler to thread pool in a given time frame. +//! Pool manager thread does this sampling every 90 milliseconds. +//! 
This value is going to be used for trend estimation phase. +//! +//! ## Trend Estimator +//! Hold up to the given number of frequencies to create an estimation. +//! Trend estimator holds 10 frequencies at a time. +//! This value is stored as constant in [FREQUENCY_QUEUE_SIZE](constant.FREQUENCY_QUEUE_SIZE.html). +//! Estimation algorithm and prediction uses Exponentially Weighted Moving Average algorithm. +//! +//! This algorithm is adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61) +//! and altered to: +//! * use instead of heavy calculation of trend, utilize thread redundancy which is the sum of the differences between the predicted and observed value. +//! * use instead of linear trend estimation, it uses exponential trend estimation where formula is: +//! ```text +//! LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK +//! ``` +//! *NOTE:* If this algorithm wants to be tweaked increasing [LOW_WATERMARK](constant.LOW_WATERMARK.html) will automatically adapt the additional dynamic thread spawn count +//! * operate without watermarking by timestamps (in paper which is used to measure algorithms own performance during the execution) +//! * operate extensive subsampling. Extensive subsampling congests the pool manager thread. +//! * operate without keeping track of idle time of threads or job out queue like TEMA and FOPS implementations. +//! +//! ## Predictive Upscaler +//! Upscaler has three cases (also can be seen in paper): +//! * The rate slightly increases and there are many idle threads. +//! * The number of worker threads tends to be reduced since the workload of the system is descending. +//! * The system has no request or stalled. (Our case here is when the current tasks block further tasks from being processed – throughput hogs) +//! +//! For the first two EMA calculation and exponential trend estimation gives good performance. +//! For the last case, upscaler selects upscaling amount by amount of tasks mapped when throughput hogs happen. +//! +//! **example scenario:** Let's say we have 10_000 tasks where every one of them is blocking for 1 second. Scheduler will map plenty of tasks but will get rejected. +//! This makes estimation calculation nearly 0 for both entering and exiting parts. When this happens and we still see tasks mapped from scheduler. +//! We start to slowly increase threads by amount of frequency linearly. High increase of this value either make us hit to the thread threshold on +//! some OS or make congestion on the other thread utilizations of the program, because of context switch. +//! +//! Throughput hogs determined by a combination of job in / job out frequency and current scheduler task assignment frequency. +//! Threshold of EMA difference is eluded by machine epsilon for floating point arithmetic errors. + +use crate::{load_balancer, placement}; +use core::fmt; +use crossbeam_queue::ArrayQueue; +use fmt::{Debug, Formatter}; +use lazy_static::lazy_static; +use lever::prelude::TTas; +use placement::CoreId; +use std::collections::{HashMap, VecDeque}; +use std::time::Duration; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Mutex, + }, + thread::{self, Thread}, +}; +use std::any::Any; +use std::panic::resume_unwind; +use std::thread::{JoinHandle, ThreadId}; +use crossbeam_deque::Worker; +use crossbeam_utils::sync::{Parker, Unparker}; +use tracing::{debug, trace}; +use lightproc::lightproc::LightProc; + +/// The default thread park timeout before checking for new tasks. 
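+/// `initialize` hands this value to `run_static`, so static threads wake up at this
+/// interval to re-check their queue when it runs dry.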
+const THREAD_PARK_TIMEOUT: Duration = Duration::from_millis(1); + +/// Frequency histogram's sliding window size. +/// Defines how many frequencies will be considered for adaptation. +const FREQUENCY_QUEUE_SIZE: usize = 10; + +/// If low watermark isn't configured this is the default scaler value. +/// This value is used for the heuristics of the scaler +const DEFAULT_LOW_WATERMARK: u64 = 2; + +/// Pool scaler interval time (milliseconds). +/// This is the actual interval which makes adaptation calculation. +const SCALER_POLL_INTERVAL: u64 = 90; + +/// Exponential moving average smoothing coefficient for limited window. +/// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size. +const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64); + +lazy_static! { + static ref ROUND_ROBIN_PIN: Mutex = Mutex::new(CoreId { id: 0 }); +} + +/// The `DynamicRunner` is piloted by `DynamicPoolManager`. +/// Upon request it needs to be able to provide runner routines for: +/// * Static threads. +/// * Dynamic threads. +/// * Standalone threads. +/// +/// Your implementation of `DynamicRunner` +/// will allow you to define what tasks must be accomplished. +/// +/// Run static threads: +/// +/// run_static should never return, and park for park_timeout instead. +/// +/// Run dynamic threads: +/// run_dynamic should never return, and call `parker()` when it has no more tasks to process. +/// It will be unparked automatically by the `DynamicPoolManager` if needs be. +/// +/// Run standalone threads: +/// run_standalone should return once it has no more tasks to process. +/// The `DynamicPoolManager` will spawn other standalone threads if needs be. +pub trait DynamicRunner { + fn run_static(&self, park_timeout: Duration) -> ! { + let parker = Parker::new(); + self.run_dynamic(|| parker.park_timeout(park_timeout)); + } + fn run_dynamic(&self, parker: impl Fn()) -> !; + fn run_standalone(&self); +} + +/// The `DynamicPoolManager` is responsible for +/// growing and shrinking a pool according to EMA rules. +/// +/// It needs to be passed a structure that implements `DynamicRunner`, +/// That will be responsible for actually spawning threads. +/// +/// The `DynamicPoolManager` keeps track of the number +/// of required number of threads to process load correctly. +/// and depending on the current state it will case it will: +/// - Spawn a lot of threads (we're predicting a load spike, and we need to prepare for it) +/// - Spawn few threads (there's a constant load, and throughput is low because the current resources are busy) +/// - Do nothing (the load is shrinking, threads will automatically stop once they're done). +/// +/// Kinds of threads: +/// +/// ## Static threads: +/// Defined in the constructor, they will always be available. They park for `THREAD_PARK_TIMEOUT` on idle. +/// +/// ## Dynamic threads: +/// Created during `DynamicPoolManager` initialization, they will park on idle. +/// The `DynamicPoolManager` grows the number of Dynamic threads +/// so the total number of Static threads + Dynamic threads +/// is the number of available cores on the machine. (`num_cpus::get()`) +/// +/// ## Standalone threads: +/// They are created when there aren't enough static and dynamic threads to process the expected load. +/// They will be destroyed on idle. 
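+///
+/// A rough wiring sketch (illustrative only: `MY_MANAGER` and `MyRunner` are assumed
+/// names, and `initialize` requires a `'static` reference, which is why this crate keeps
+/// its managers in `OnceCell`s):
+/// ```ignore
+/// static MY_MANAGER: OnceCell<DynamicPoolManager<MyRunner>> = OnceCell::new();
+///
+/// MY_MANAGER
+///     .set(DynamicPoolManager::new(2, MyRunner::default()))
+///     .expect("couldn't create dynamic pool manager");
+/// MY_MANAGER.get().unwrap().initialize();
+/// ```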
+/// +/// ## Spawn order: +/// In order to handle a growing load, the pool manager will ask to: +/// - Use Static threads +/// - Unpark Dynamic threads +/// - Spawn Standalone threads +/// +/// The pool manager is not responsible for the tasks to be performed by the threads, it's handled by the `DynamicRunner` +/// +/// If you use tracing, you can have a look at the trace! logs generated by the structure. +/// +pub struct DynamicPoolManager { + static_threads: usize, + dynamic_threads: usize, + parked_threads: ArrayQueue, + runner: Runner, + last_frequency: AtomicU64, + frequencies: TTas>, +} + +impl Debug for DynamicPoolManager { + fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { + fmt.debug_struct("DynamicPoolManager") + .field("static_threads", &self.static_threads) + .field("dynamic_threads", &self.dynamic_threads) + .field("parked_threads", &self.parked_threads.len()) + .field("runner", &self.runner) + .field("last_frequency", &self.last_frequency) + .field("frequencies", &self.frequencies.try_lock()) + .finish() + } +} + +impl DynamicPoolManager { + pub fn new(static_threads: usize, runner: Runner) -> Self { + let dynamic_threads = 1.max(num_cpus::get().checked_sub(static_threads).unwrap_or(0)); + + Self { + static_threads, + dynamic_threads, + parked_threads: ArrayQueue::new(dynamic_threads), + runner, + last_frequency: AtomicU64::new(0), + frequencies: TTas::new(VecDeque::with_capacity( + FREQUENCY_QUEUE_SIZE.saturating_add(1), + )), + } + } + + pub fn increment_frequency(&self) { + self.last_frequency.fetch_add(1, Ordering::Acquire); + } + + /// Initialize the dynamic pool + /// That will be scaled + pub fn initialize(&'static self) { + // Static thread manager that will always be available + trace!("spooling up {} static worker threads", self.static_threads); + (0..self.static_threads).for_each(|n| { + let runner = &self.runner; + thread::Builder::new() + .name(format!("static #{}", n)) + .spawn(move || { + Self::affinity_pinner(); + runner.run_static(THREAD_PARK_TIMEOUT); + }) + .expect("failed to spawn static worker thread"); + }); + + // Dynamic thread manager that will allow us to unpark threads when needed + trace!("spooling up {} dynamic worker threads", self.dynamic_threads); + (0..self.dynamic_threads).for_each(|n| { + let runner = &self.runner; + thread::Builder::new() + .name(format!("dynamic #{}", n)) + .spawn(move || { + Self::affinity_pinner(); + let parker = Parker::new(); + let unparker = parker.unparker(); + runner.run_dynamic(|| self.park_thread(&parker, unparker)); + }) + .expect("failed to spawn dynamic worker thread"); + }); + + // Pool manager to check frequency of task rates + // and take action by scaling the pool accordingly. + thread::Builder::new() + .name("pool manager".to_string()) + .spawn(move || { + let poll_interval = Duration::from_millis(SCALER_POLL_INTERVAL); + trace!("setting up the pool manager"); + loop { + self.scale_pool(); + thread::park_timeout(poll_interval); + } + }) + .expect("failed to spawn pool manager thread"); + } + + /// Provision threads takes a number of threads that need to be made available. + /// It will try to unpark threads from the dynamic pool, and spawn more threads if needs be. 
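+    /// Standalone threads spawned here run `run_standalone` and exit on their own once
+    /// they find no more work.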
+ pub fn provision_threads(&'static self, n: usize) { + for i in 0..n { + if !self.unpark_thread() { + let new_threads = n - i; + trace!( + "no more threads to unpark, spawning {} new threads", + new_threads + ); + return self.spawn_threads(new_threads); + } + } + } + + fn spawn_threads(&'static self, n: usize) { + (0..n).for_each(|_| { + let runner = &self.runner; + thread::Builder::new() + .name("standalone worker".to_string()) + .spawn(move || { + Self::affinity_pinner(); + runner.run_standalone(); + }) + .unwrap(); + }) + } + + /// Parks a thread until [`unpark_thread`] unparks it + pub fn park_thread(&self, parker: &Parker, unparker: &Unparker) { + if let Err(unparker) = self.parked_threads + // Unparker is an Arc internally so this is (comparatively) cheap to do. + .push(unparker.clone()) { + panic!("Failed to park with {:?}", unparker); + } + + trace!("parking thread {:?}", std::thread::current().id()); + parker.park(); + } + + /// Pops a thread from the parked_threads queue and unparks it. + /// + /// Returns true if there were threads to unpark + fn unpark_thread(&self) -> bool { + trace!("parked_threads: len is {}", self.parked_threads.len()); + if let Some(unparker) = self.parked_threads.pop() { + debug!("Unparking thread with {:?}", &unparker); + unparker.unpark(); + true + } else { + false + } + } + + /// Affinity pinner for blocking pool + /// + /// Pinning isn't going to be enabled for single core systems. + #[inline] + fn affinity_pinner() { + if 1 != *load_balancer::core_count() { + let mut core = ROUND_ROBIN_PIN.lock().unwrap(); + placement::set_for_current(*core); + core.id = (core.id + 1) % *load_balancer::core_count(); + } + } + + /// Exponentially Weighted Moving Average calculation + /// + /// This allows us to find the EMA value. + /// This value represents the trend of tasks mapped onto the thread pool. + /// Calculation is following: + /// ```text + /// +--------+-----------------+----------------------------------+ + /// | Symbol | Identifier | Explanation | + /// +--------+-----------------+----------------------------------+ + /// | α | EMA_COEFFICIENT | smoothing factor between 0 and 1 | + /// | Yt | freq | frequency sample at time t | + /// | St | acc | EMA at time t | + /// +--------+-----------------+----------------------------------+ + /// ``` + /// Under these definitions formula is following: + /// ```text + /// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ] + St + /// ``` + /// # Arguments + /// + /// * `freq_queue` - Sliding window of frequency samples + #[inline] + fn calculate_ema(freq_queue: &VecDeque) -> f64 { + freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| { + acc + ((*freq as f64) * ((1_f64 - EMA_COEFFICIENT).powf(i as f64) as f64)) + }) * EMA_COEFFICIENT as f64 + } + + /// Adaptive pool scaling function + /// + /// This allows to spawn new threads to make room for incoming task pressure. + /// Works in the background detached from the pool system and scales up the pool based + /// on the request rate. + /// + /// It uses frequency based calculation to define work. Utilizing average processing rate. + fn scale_pool(&'static self) { + // Fetch current frequency, it does matter that operations are ordered in this approach. 
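+        // `swap(0, SeqCst)` atomically takes the count accumulated by `increment_frequency`
+        // since the last poll and resets it for the next sampling window.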
+        let current_frequency = self.last_frequency.swap(0, Ordering::SeqCst);
+        let mut freq_queue = self.frequencies.lock();
+
+        // Seed the queue with an initial sample so the calculations below are safe to start.
+        if freq_queue.is_empty() {
+            freq_queue.push_back(0);
+        }
+
+        // Calculate the message rate for the given time window
+        let frequency = (current_frequency as f64 / SCALER_POLL_INTERVAL as f64) as u64;
+
+        // Calculate the EMA for the previous time window (before the new sample is added)
+        let prev_ema_frequency = Self::calculate_ema(&freq_queue);
+
+        // Add the newly seen frequency to the frequency histogram.
+        freq_queue.push_back(frequency);
+        if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) {
+            freq_queue.pop_front();
+        }
+
+        // Calculate the current time window's EMA value (including the last sample)
+        let curr_ema_frequency = Self::calculate_ema(&freq_queue);
+
+        // Adapt the thread count of the pool.
+        //
+        // The pool manager keeps a sliding window of the frequencies it has visited and
+        // computes an EMA value for the previous window and for the current window.
+        // Comparing them determines the scaling amount based on the trend.
+        // If the current EMA value is bigger, we scale up.
+        if curr_ema_frequency > prev_ema_frequency {
+            // The "scale by" amount can be seen as "how much load is coming".
+            // The "scale" amount is "how many threads we should spawn".
+            let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
+            let scale = num_cpus::get().min(
+                ((DEFAULT_LOW_WATERMARK as f64 * scale_by) + DEFAULT_LOW_WATERMARK as f64) as usize,
+            );
+            trace!("unparking {} threads", scale);
+
+            // It is time to scale the pool!
+            self.provision_threads(scale);
+        } else if (curr_ema_frequency - prev_ema_frequency).abs() < f64::EPSILON
+            && current_frequency != 0
+        {
+            // Throughput is low. Allocate more threads to unblock the flow.
+            // If we fall into this case, the scheduler is congested by long-hauling tasks.
+            // To unblock the flow we should add some threads to the pool, but not so many
+            // that they stagger the program's operation.
+            trace!("unparking {} threads", DEFAULT_LOW_WATERMARK);
+            self.provision_threads(DEFAULT_LOW_WATERMARK as usize);
+        }
+    }
+}
diff --git a/runtime/executor/src/worker.rs b/runtime/executor/src/worker.rs
new file mode 100644
index 0000000..b5f24ef
--- /dev/null
+++ b/runtime/executor/src/worker.rs
@@ -0,0 +1,93 @@
+//!
+//! SMP parallelism based cache affine worker implementation
+//!
+//! This worker implementation relies on worker run queue statistics which are held in
+//! pinned global memory, where the workload distribution is calculated and amended to
+//! the workers' own local queues.
+
+use crate::pool;
+
+use lightproc::prelude::*;
+use std::cell::Cell;
+use std::ptr;
+use std::time::Duration;
+use crossbeam_deque::{Stealer, Worker};
+use crate::proc_stack::ProcStack;
+
+/// The timeout we'll use when parking before another steal attempt
+pub const THREAD_PARK_TIMEOUT: Duration = Duration::from_millis(1);
+
+thread_local! {
+    static STACK: Cell<*const ProcStack> = Cell::new(ptr::null_mut());
+}
+
+///
+/// Set the current process's stack during the run of the future.
+pub(crate) fn set_stack<F, R>(stack: *const ProcStack, f: F) -> R
+where
+    F: FnOnce() -> R,
+{
+    struct ResetStack<'a>(&'a Cell<*const ProcStack>);
+
+    impl Drop for ResetStack<'_> {
+        fn drop(&mut self) {
+            self.0.set(ptr::null());
+        }
+    }
+
+    STACK.with(|st| {
+        st.set(stack);
+        // create a guard to reset STACK even if the future panics. This is important since we
+        // must not drop the pointed-to ProcStack here in any case.
+        let _guard = ResetStack(st);
+
+        f()
+    })
+}
+
+/*
+pub(crate) fn get_proc_stack<F, R>(f: F) -> Option<R>
+where
+    F: FnOnce(&ProcStack) -> R,
+{
+    let res = STACK.try_with(|st| unsafe { st.get().as_ref().map(f) });
+
+    match res {
+        Ok(Some(val)) => Some(val),
+        Ok(None) | Err(_) => None,
+    }
+}
+
+///
+/// Get the stack currently in use for this thread
+pub fn current() -> ProcStack {
+    get_proc_stack(|proc| proc.clone())
+        .expect("`proc::current()` called outside the context of the proc")
+}
+ */
+
+pub(crate) fn schedule(proc: LightProc) {
+    pool::schedule(proc)
+}
+
+/// A worker thread running futures locally and stealing work from other workers if it runs empty.
+pub struct WorkerThread {
+    queue: Worker<LightProc>,
+}
+
+impl WorkerThread {
+    pub fn new() -> Self {
+        Self {
+            queue: Worker::new_fifo(),
+        }
+    }
+
+    pub fn stealer(&self) -> Stealer<LightProc> {
+        self.queue.stealer()
+    }
+
+    pub fn tick(&self) {
+        if let Some(lightproc) = self.queue.pop() {
+            lightproc.run()
+        }
+    }
+}
\ No newline at end of file
diff --git a/runtime/executor/tests/lib.rs b/runtime/executor/tests/lib.rs
new file mode 100644
index 0000000..416c571
--- /dev/null
+++ b/runtime/executor/tests/lib.rs
@@ -0,0 +1,26 @@
+#[cfg(test)]
+mod tests {
+    use bastion_executor::{placement, pool};
+
+    #[test]
+    fn affinity_replacement() {
+        let core_ids = placement::get_core_ids().unwrap();
+        dbg!(core_ids);
+    }
+
+    #[cfg(feature = "tokio-runtime")]
+    mod tokio_tests {
+        #[tokio::test]
+        async fn pool_check() {
+            super::pool::get();
+        }
+    }
+
+    #[cfg(not(feature = "tokio-runtime"))]
+    mod no_tokio_tests {
+        #[test]
+        fn pool_check() {
+            super::pool::get();
+        }
+    }
+}
diff --git a/runtime/executor/tests/run_blocking.rs b/runtime/executor/tests/run_blocking.rs
new file mode 100644
index 0000000..6f79295
--- /dev/null
+++ b/runtime/executor/tests/run_blocking.rs
@@ -0,0 +1,38 @@
+use bastion_executor::blocking;
+use bastion_executor::run::run;
+use lightproc::proc_stack::ProcStack;
+use std::thread;
+use std::time::Duration;
+
+#[cfg(feature = "tokio-runtime")]
+mod tokio_tests {
+    #[tokio::test]
+    async fn test_run_blocking() {
+        super::run_test()
+    }
+}
+
+#[cfg(not(feature = "tokio-runtime"))]
+mod no_tokio_tests {
+    #[test]
+    fn test_run_blocking() {
+        super::run_test()
+    }
+}
+
+fn run_test() {
+    let output = run(
+        blocking::spawn_blocking(
+            async {
+                let duration = Duration::from_millis(1);
+                thread::sleep(duration);
+                42
+            },
+            ProcStack::default(),
+        ),
+        ProcStack::default(),
+    )
+    .unwrap();
+
+    assert_eq!(42, output);
+}
diff --git a/runtime/executor/tests/thread_pool.rs b/runtime/executor/tests/thread_pool.rs
new file mode 100644
index 0000000..8673cda
--- /dev/null
+++ b/runtime/executor/tests/thread_pool.rs
@@ -0,0 +1,155 @@
+use bastion_executor::blocking;
+use bastion_executor::run::run;
+use futures::future::join_all;
+use lightproc::proc_stack::ProcStack;
+use lightproc::recoverable_handle::RecoverableHandle;
+use std::thread;
+use std::time::Duration;
+use std::time::Instant;
+
+// Test for slow joins without task bursts during joins.
+#[test]
+#[ignore]
+fn slow_join() {
+    let thread_join_time_max = 11_000;
+    let start = Instant::now();
+
+    // Send an initial batch of a million bursts.
+    let handles = (0..1_000_000)
+        .map(|_| {
+            blocking::spawn_blocking(
+                async {
+                    let duration = Duration::from_millis(1);
+                    thread::sleep(duration);
+                },
+                ProcStack::default(),
+            )
+        })
+        .collect::<Vec<RecoverableHandle<()>>>();
+
+    run(join_all(handles), ProcStack::default());
+
+    // Let them join to see how it behaves under different workloads.
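+    // The sleep below is the deliberate "join window"; it is subtracted from the elapsed
+    // time at the end of the test so that only the scheduler's own overhead is reported.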
+    let duration = Duration::from_millis(thread_join_time_max);
+    thread::sleep(duration);
+
+    // Spawn yet another batch of work on top of it.
+    let handles = (0..10_000)
+        .map(|_| {
+            blocking::spawn_blocking(
+                async {
+                    let duration = Duration::from_millis(100);
+                    thread::sleep(duration);
+                },
+                ProcStack::default(),
+            )
+        })
+        .collect::<Vec<RecoverableHandle<()>>>();
+
+    run(join_all(handles), ProcStack::default());
+
+    // Slow joins shouldn't cause an internal slowdown.
+    let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128;
+    println!("Slow task join. Monotonic exec time: {:?} ms", elapsed);
+
+    // The previous implementation is around this threshold.
+}
+
+// Test for slow joins with task burst.
+#[test]
+#[ignore]
+fn slow_join_interrupted() {
+    let thread_join_time_max = 2_000;
+    let start = Instant::now();
+
+    // Send an initial batch of a million bursts.
+    let handles = (0..1_000_000)
+        .map(|_| {
+            blocking::spawn_blocking(
+                async {
+                    let duration = Duration::from_millis(1);
+                    thread::sleep(duration);
+                },
+                ProcStack::default(),
+            )
+        })
+        .collect::<Vec<RecoverableHandle<()>>>();
+
+    run(join_all(handles), ProcStack::default());
+
+    // Let them join to see how it behaves under different workloads.
+    // This time join within the time window.
+    let duration = Duration::from_millis(thread_join_time_max);
+    thread::sleep(duration);
+
+    // Spawn yet another batch of work on top of it.
+    let handles = (0..10_000)
+        .map(|_| {
+            blocking::spawn_blocking(
+                async {
+                    let duration = Duration::from_millis(100);
+                    thread::sleep(duration);
+                },
+                ProcStack::default(),
+            )
+        })
+        .collect::<Vec<RecoverableHandle<()>>>();
+
+    run(join_all(handles), ProcStack::default());
+
+    // Slow joins shouldn't cause an internal slowdown.
+    let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128;
+    println!("Slow task join. Monotonic exec time: {:?} ms", elapsed);
+
+    // The previous implementation is around this threshold.
+}
+
+// This test is expensive, but it proves that long-hauling tasks are working in the adaptive thread pool.
+// A thread pool which spawns on-demand will panic with this test.
+#[test]
+#[ignore]
+fn longhauling_task_join() {
+    let thread_join_time_max = 11_000;
+    let start = Instant::now();
+
+    // First batch of long-hauling tasks
+    let _ = (0..100_000)
+        .map(|_| {
+            blocking::spawn_blocking(
+                async {
+                    let duration = Duration::from_millis(1000);
+                    thread::sleep(duration);
+                },
+                ProcStack::default(),
+            )
+        })
+        .collect::<Vec<RecoverableHandle<()>>>();
+
+    // Let them join to see how it behaves under different workloads.
+    let duration = Duration::from_millis(thread_join_time_max);
+    thread::sleep(duration);
+
+    // Send yet another medium-sized batch to see how it scales.
+    let handles = (0..10_000)
+        .map(|_| {
+            blocking::spawn_blocking(
+                async {
+                    let duration = Duration::from_millis(100);
+                    thread::sleep(duration);
+                },
+                ProcStack::default(),
+            )
+        })
+        .collect::<Vec<RecoverableHandle<()>>>();
+
+    run(join_all(handles), ProcStack::default());
+
+    // Slow joins shouldn't cause an internal slowdown.
+    let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128;
+    println!(
+        "Long-hauling task join. Monotonic exec time: {:?} ms",
+        elapsed
+    );
+
+    // The previous implementation will panic when this test is run.
+}