Executor compiles

This commit is contained in:
Nadja Reitzenstein 2021-11-14 17:51:48 +01:00
parent 24be65b3d9
commit 55d6609e33
22 changed files with 2505 additions and 0 deletions

View File

@ -0,0 +1,34 @@
[package]
name = "executor"
version = "0.3.0"
publish = false
description = "Executor"
authors = []
keywords = []
categories = []
readme = "README.md"
license = "Apache-2.0/MIT"
edition = "2021"
exclude = [
"scripts/*",
]
[dependencies]
lightproc = { path = "../lightproc" }
crossbeam-utils = "0.8"
crossbeam-channel = "0.5"
crossbeam-epoch = "0.9"
crossbeam-deque = "0.8.1"
lazy_static = "1.4"
libc = "0.2"
num_cpus = "1.13"
pin-utils = "0.1.0"
# Allocator
arrayvec = { version = "0.7.0" }
futures-timer = "3.0.2"
once_cell = "1.4.0"
lever = "0.1"
tracing = "0.1.19"
crossbeam-queue = "0.3.0"

View File

@ -0,0 +1,94 @@
# Bastion Executor
<table align=left style='float: left; margin: 4px 10px 0px 0px; border: 1px solid #000000;'>
<tr>
<td>Latest Release</td>
<td>
<a href="https://crates.io/crates/bastion">
<img alt="Crates.io" src="https://img.shields.io/crates/v/bastion-executor.svg?style=popout-square">
</a>
</td>
</tr>
<tr>
<td></td>
</tr>
<tr>
<td>License</td>
<td>
<a href="https://github.com/bastion-rs/bastion/blob/master/LICENSE">
<img alt="Crates.io" src="https://img.shields.io/crates/l/bastion.svg?style=popout-square">
</a>
</td>
</tr>
<tr>
<td>Build Status</td>
<td>
<a href="https://actions-badge.atrox.dev/bastion-rs/bastion/goto">
<img alt="Build Status" src="https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fbastion-rs%2Fbastion%2Fbadge&style=flat" />
</a>
</td>
</tr>
<tr>
<td>Downloads</td>
<td>
<a href="https://crates.io/crates/bastion-executor">
<img alt="Crates.io" src="https://img.shields.io/crates/d/bastion-executor.svg?style=popout-square">
</a>
</td>
</tr>
<tr>
<td>Discord</td>
<td>
<a href="https://discord.gg/DqRqtRT">
<img src="https://img.shields.io/discord/628383521450360842.svg?logo=discord" />
</a>
</td>
</tr>
</table>
Bastion Executor is a NUMA-aware, SMP-based, fault-tolerant executor.

Bastion Executor is a highly-available, fault-tolerant, async-communication-oriented executor. Bastion's main idea is supplying a fully async runtime with fault tolerance to work under heavy loads.

The main differences from other executors are:
* It uses an SMP-based execution scheme to exploit cache affinity on multiple cores, and execution is distributed evenly over the system's resources, which means utilizing the whole system.
* It uses NUMA-aware allocation for the scheduler's queues and exploits locality on server workloads.
* It is tailored for creating middleware and for working with actor-model-like concurrency and distributed communication.

**NOTE:** Bastion Executor is independent of its framework implementation.
It uses [lightproc](https://docs.rs/lightproc) to encapsulate and provide fault tolerance to your future-based workloads.
You can use your futures with [lightproc](https://docs.rs/lightproc) to run your workloads on Bastion Executor without needing the framework.
## Example Usage
```rust
use executor::prelude::*;

fn main() {
let handle = spawn(
async {
panic!("test");
},
);

run(
async {
handle.await;
},
ProcStack {},
);
}
```

View File

@ -0,0 +1,67 @@
#![feature(test)]
extern crate test;
use executor::blocking;
use std::thread;
use std::time::Duration;
use test::Bencher;
#[cfg(feature = "tokio-runtime")]
mod tokio_benchs {
use super::*;
#[bench]
fn blocking(b: &mut Bencher) {
tokio_test::block_on(async { _blocking(b) });
}
#[bench]
fn blocking_single(b: &mut Bencher) {
tokio_test::block_on(async {
_blocking_single(b);
});
}
}
#[cfg(not(feature = "tokio-runtime"))]
mod no_tokio_benchs {
use super::*;
#[bench]
fn blocking(b: &mut Bencher) {
_blocking(b);
}
#[bench]
fn blocking_single(b: &mut Bencher) {
_blocking_single(b);
}
}
// Benchmark for a 10K burst task spawn
fn _blocking(b: &mut Bencher) {
b.iter(|| {
(0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
)
})
.collect::<Vec<_>>()
});
}
// Benchmark for a single blocking task spawn
fn _blocking_single(b: &mut Bencher) {
b.iter(|| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
)
});
}

View File

@ -0,0 +1,25 @@
#![feature(test)]
extern crate test;
use executor::prelude::*;
use test::{black_box, Bencher};
#[bench]
fn increment(b: &mut Bencher) {
let mut sum = 0;
b.iter(|| {
run(
async {
(0..10_000_000).for_each(|_| {
sum += 1;
});
},
ProcStack::default(),
);
});
black_box(sum);
}

View File

@ -0,0 +1,69 @@
#![feature(test)]
extern crate test;
use executor::blocking;
use std::thread;
use std::time::Duration;
use test::Bencher;
#[cfg(feature = "tokio-runtime")]
mod tokio_benchs {
use super::*;
#[bench]
fn blocking(b: &mut Bencher) {
tokio_test::block_on(async { _blocking(b) });
}
#[bench]
fn blocking_single(b: &mut Bencher) {
tokio_test::block_on(async {
_blocking_single(b);
});
}
}
#[cfg(not(feature = "tokio-runtime"))]
mod no_tokio_benchs {
use super::*;
#[bench]
fn blocking(b: &mut Bencher) {
_blocking(b);
}
#[bench]
fn blocking_single(b: &mut Bencher) {
_blocking_single(b);
}
}
// Benchmark for a 10K burst task spawn
fn _blocking(b: &mut Bencher) {
b.iter(|| {
(0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
)
})
.collect::<Vec<_>>()
});
}
// Benchmark for a single blocking task spawn
fn _blocking_single(b: &mut Bencher) {
b.iter(|| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
)
});
}

View File

@ -0,0 +1,70 @@
#![feature(test)]
extern crate test;
use executor::prelude::spawn;
use futures_timer::Delay;
use std::time::Duration;
use test::Bencher;
#[cfg(feature = "tokio-runtime")]
mod tokio_benchs {
use super::*;
#[bench]
fn spawn_lot(b: &mut Bencher) {
tokio_test::block_on(async { _spawn_lot(b) });
}
#[bench]
fn spawn_single(b: &mut Bencher) {
tokio_test::block_on(async {
_spawn_single(b);
});
}
}
#[cfg(not(feature = "tokio-runtime"))]
mod no_tokio_benchs {
use super::*;
#[bench]
fn spawn_lot(b: &mut Bencher) {
_spawn_lot(b);
}
#[bench]
fn spawn_single(b: &mut Bencher) {
_spawn_single(b);
}
}
// Benchmark for a 10K burst task spawn
fn _spawn_lot(b: &mut Bencher) {
b.iter(|| {
let _ = (0..10_000)
.map(|_| {
spawn(
async {
let duration = Duration::from_millis(1);
Delay::new(duration).await;
},
)
})
.collect::<Vec<_>>();
});
}
// Benchmark for a single task spawn
fn _spawn_single(b: &mut Bencher) {
b.iter(|| {
spawn(
async {
let duration = Duration::from_millis(1);
Delay::new(duration).await;
},
);
});
}

View File

@ -0,0 +1,71 @@
#![feature(test)]
extern crate test;
use executor::load_balancer::{core_count, get_cores, stats, SmpStats};
use executor::placement;
use std::thread;
use test::Bencher;
fn stress_stats<S: SmpStats + Sync + Send>(stats: &'static S) {
let mut handles = Vec::with_capacity(*core_count());
for core in get_cores() {
let handle = thread::spawn(move || {
placement::set_for_current(*core);
for i in 0..100 {
stats.store_load(core.id, 10);
if i % 3 == 0 {
let _sorted_load = stats.get_sorted_load();
}
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
// previous lock based stats benchmark 1,352,791 ns/iter (+/- 2,682,013)
// 158,278 ns/iter (+/- 117,103)
#[bench]
fn lockless_stats_bench(b: &mut Bencher) {
b.iter(|| {
stress_stats(stats());
});
}
#[bench]
fn lockless_stats_bad_load(b: &mut Bencher) {
let stats = stats();
const MAX_CORE: usize = 256;
for i in 0..MAX_CORE {
// Generating the worst possible mergesort scenario
// [0,2,4,6,8,10,1,3,5,7,9]...
if i <= MAX_CORE / 2 {
stats.store_load(i, i * 2);
} else {
stats.store_load(i, i - 1 - MAX_CORE / 2);
}
}
b.iter(|| {
let _sorted_load = stats.get_sorted_load();
});
}
#[bench]
fn lockless_stats_good_load(b: &mut Bencher) {
let stats = stats();
const MAX_CORE: usize = 256;
for i in 0..MAX_CORE {
// Generating the best possible mergesort scenario
// [0,1,2,3,4,5,6,7,8,9]...
stats.store_load(i, i);
}
b.iter(|| {
let _sorted_load = stats.get_sorted_load();
});
}

View File

@ -0,0 +1,42 @@
use std::io::Write;
use std::time::Duration;
use executor::pool;
use executor::prelude::*;
fn main() {
std::panic::set_hook(Box::new(|info| {
let tid = std::thread::current().id();
println!("Panicking ThreadId: {:?}", tid);
std::io::stdout().flush().unwrap();
println!("panic hook: {:?}", info);
}));
let tid = std::thread::current().id();
println!("Main ThreadId: {:?}", tid);
let handle = spawn(
async {
panic!("test");
},
);
run(
async {
handle.await;
},
ProcStack {},
);
let pool = pool::get();
let manager = pool::get_manager().unwrap();
println!("After panic: {:?}", pool);
println!("{:#?}", manager);
let _h = std::thread::spawn(|| {
panic!("This is a test");
});
std::thread::sleep(Duration::from_secs(30));
println!("After panic");
}

View File

@ -0,0 +1,5 @@
#!/bin/zsh
cargo test longhauling_task_join -- --ignored --exact --nocapture
cargo test slow_join_interrupted -- --ignored --exact --nocapture
cargo test slow_join -- --ignored --exact --nocapture

View File

@ -0,0 +1,165 @@
//!
//! Pool of threads to run heavy processes
//!
//! We spawn futures onto the pool with the [`spawn_blocking`] method of the global run queue or
//! with the corresponding [`Worker`]'s spawn method.
//!
//! [`Worker`]: crate::run_queue::Worker
use crate::thread_manager::{DynamicPoolManager, DynamicRunner};
use crossbeam_channel::{unbounded, Receiver, Sender};
use lazy_static::lazy_static;
use lightproc::lightproc::LightProc;
use lightproc::recoverable_handle::RecoverableHandle;
use once_cell::sync::{Lazy, OnceCell};
use std::future::Future;
use std::iter::Iterator;
use std::time::Duration;
use std::{env, thread};
use tracing::trace;
/// If the low watermark isn't configured, this is the default scaler value.
/// This value is used by the scaler's heuristics.
const DEFAULT_LOW_WATERMARK: u64 = 2;
const THREAD_RECV_TIMEOUT: Duration = Duration::from_millis(100);
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
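///
/// A minimal usage sketch:
///
/// ```no_run
/// let handle = executor::blocking::spawn_blocking(async {
///     std::thread::sleep(std::time::Duration::from_millis(1));
///     42
/// });
/// ```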
pub fn spawn_blocking<F, R>(future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = LightProc::recoverable(future, schedule);
task.schedule();
handle
}
#[derive(Debug)]
struct BlockingRunner {
// We keep a handle to the tokio runtime here to make sure
// it will never be dropped while the DynamicPoolManager is alive,
// In case we need to spin up some threads.
#[cfg(feature = "tokio-runtime")]
runtime_handle: tokio::runtime::Handle,
}
impl DynamicRunner for BlockingRunner {
fn run_static(&self, park_timeout: Duration) -> ! {
loop {
while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
trace!("static thread: running task");
self.run(task);
}
trace!("static: empty queue, parking with timeout");
thread::park_timeout(park_timeout);
}
}
fn run_dynamic(&self, parker: impl Fn()) -> ! {
loop {
while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
trace!("dynamic thread: running task");
self.run(task);
}
trace!(
"dynamic thread: parking - {:?}",
std::thread::current().id()
);
parker();
}
}
fn run_standalone(&self) {
while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
self.run(task);
}
trace!("standalone thread: quitting.");
}
}
impl BlockingRunner {
fn run(&self, task: LightProc) {
#[cfg(feature = "tokio-runtime")]
{
self.runtime_handle.spawn_blocking(|| task.run());
}
#[cfg(not(feature = "tokio-runtime"))]
{
task.run();
}
}
}
/// Pool interface between the scheduler and thread pool
struct Pool {
sender: Sender<LightProc>,
receiver: Receiver<LightProc>,
}
static DYNAMIC_POOL_MANAGER: OnceCell<DynamicPoolManager<BlockingRunner>> = OnceCell::new();
static POOL: Lazy<Pool> = Lazy::new(|| {
#[cfg(feature = "tokio-runtime")]
{
let runner = BlockingRunner {
// We use current() here instead of try_current()
// because we want bastion to crash as soon as possible
// if there is no available runtime.
runtime_handle: tokio::runtime::Handle::current(),
};
DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
}
#[cfg(not(feature = "tokio-runtime"))]
{
let runner = BlockingRunner {};
DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
}
DYNAMIC_POOL_MANAGER
.get()
.expect("couldn't get static pool manager")
.initialize();
let (sender, receiver) = unbounded();
Pool { sender, receiver }
});
/// Enqueues work, attempting to send to the thread pool in a
/// nonblocking way and spinning up the needed number of threads
/// based on previous statistics, without relying on whether
/// there is a thread ready to accept the work or not.
fn schedule(t: LightProc) {
if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without
// blocking.
POOL.sender.send(err.into_inner()).unwrap();
}
// Count every incoming scheduled task for the scaler's frequency statistics
DYNAMIC_POOL_MANAGER.get().unwrap().increment_frequency();
}
///
/// Low watermark value; defines the bare minimum of the pool.
/// Spawns the initial thread set.
/// Configurable at runtime with the `BASTION_BLOCKING_THREADS` env var.
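/// e.g. `BASTION_BLOCKING_THREADS=8 cargo run` starts the blocking pool with 8 static threads.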
#[inline]
fn low_watermark() -> &'static u64 {
lazy_static! {
static ref LOW_WATERMARK: u64 = {
env::var_os("BASTION_BLOCKING_THREADS")
.map(|x| x.to_str().unwrap().parse::<u64>().unwrap())
.unwrap_or(DEFAULT_LOW_WATERMARK)
};
}
&*LOW_WATERMARK
}

View File

@ -0,0 +1,50 @@
//!
//! Bastion Executor is a NUMA-aware, SMP-based, fault-tolerant executor.
//!
//! Bastion Executor is a highly-available, fault-tolerant, async-communication-oriented
//! executor. Bastion's main idea is supplying a fully async runtime with fault tolerance
//! to work under heavy loads.
//!
//! The main differences from other executors are:
//! * It uses an SMP-based execution scheme to exploit cache affinity on multiple cores,
//! and execution is distributed evenly over the system's resources, which means
//! utilizing the whole system.
//! * It uses NUMA-aware allocation for the scheduler's queues and exploits locality
//! on server workloads.
//! * It is tailored for creating middleware and for working with actor-model-like
//! concurrency and distributed communication.
//!
//! **NOTE:** Bastion Executor is independent of its framework implementation.
//! It uses [lightproc] to encapsulate and provide fault tolerance to your future-based
//! workloads. You can use your futures with [lightproc] to run your workloads on
//! Bastion Executor without needing the framework.
//!
//! [lightproc]: https://docs.rs/lightproc
//!
#![doc(
html_logo_url = "https://raw.githubusercontent.com/bastion-rs/bastion/master/img/bastion-logo.png"
)]
// Force missing implementations
#![warn(missing_docs)]
#![warn(missing_debug_implementations)]
#![warn(unused_imports)]
#![forbid(unused_must_use)]
#![forbid(unused_import_braces)]
pub mod blocking;
pub mod load_balancer;
pub mod placement;
pub mod pool;
pub mod run;
pub mod sleepers;
mod thread_manager;
pub mod worker;
mod proc_stack;
///
/// Prelude of Bastion Executor
pub mod prelude {
pub use crate::blocking::*;
pub use crate::pool::*;
pub use crate::run::*;
pub use crate::proc_stack::*;
}

View File

@ -0,0 +1,234 @@
//!
//! Module for gathering statistics about the run queues of the runtime
//!
//! The load balancer calculates a sampled mean to provide the average process execution
//! amount to the whole runtime.
//!
//!
use crate::load_balancer;
use crate::placement;
use arrayvec::ArrayVec;
use fmt::{Debug, Formatter};
use lazy_static::*;
use once_cell::sync::Lazy;
use placement::CoreId;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::RwLock;
use std::time::{Duration, Instant};
use std::{fmt, usize};
use tracing::{debug, error};
const MEAN_UPDATE_THRESHOLD: Duration = Duration::from_millis(200);
/// Stats of all the SMP queues.
pub trait SmpStats {
/// Stores the load of the given queue.
fn store_load(&self, affinity: usize, load: usize);
/// Returns a tuple of queue id and load, ordered from highest to lowest load.
fn get_sorted_load(&self) -> ArrayVec<(usize, usize), MAX_CORE>;
/// Mean of all the SMP queue loads.
fn mean(&self) -> usize;
/// Updates the SMP mean.
fn update_mean(&self);
}
static LOAD_BALANCER: Lazy<LoadBalancer> = Lazy::new(|| {
let lb = LoadBalancer::new(placement::get_core_ids().unwrap());
debug!("Instantiated load_balancer: {:?}", lb);
lb
});
/// Load-balancer struct which allows us to update the mean load
pub struct LoadBalancer {
/// The number of cores
/// available for this program
pub num_cores: usize,
/// The core Ids available for this program
/// This doesn't take affinity into account
pub cores: Vec<CoreId>,
mean_last_updated_at: RwLock<Instant>,
}
impl LoadBalancer {
/// Creates a new LoadBalancer.
/// If you're looking for `num_cores` and `cores`,
/// have a look at `load_balancer::core_count()`
/// and `load_balancer::get_cores()` respectively.
pub fn new(cores: Vec<CoreId>) -> Self {
Self {
num_cores: cores.len(),
cores,
mean_last_updated_at: RwLock::new(Instant::now()),
}
}
}
impl Debug for LoadBalancer {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
fmt.debug_struct("LoadBalancer")
.field("num_cores", &self.num_cores)
.field("cores", &self.cores)
.field("mean_last_updated_at", &self.mean_last_updated_at)
.finish()
}
}
impl LoadBalancer {
/// Iterates the statistics to get the mean load across the cores
pub fn update_load_mean(&self) {
// Check if update should occur
if !self.should_update() {
return;
}
self.mean_last_updated_at
.write()
.map(|mut last_updated_at| {
*last_updated_at = Instant::now();
})
.unwrap_or_else(|e| error!("couldn't update mean timestamp - {}", e));
load_balancer::stats().update_mean();
}
fn should_update(&self) -> bool {
// If we couldn't acquire a lock on the mean's last_updated_at,
// there is probably someone else updating it already
self.mean_last_updated_at
.try_read()
.map(|last_updated_at| last_updated_at.elapsed() > MEAN_UPDATE_THRESHOLD)
.unwrap_or(false)
}
}
/// Update the mean load on the singleton
pub fn update() {
LOAD_BALANCER.update_load_mean()
}
/// Maximum number of cores supported by modern computers.
const MAX_CORE: usize = 256;
///
/// Holding all statistics related to the run queue
///
/// Contains:
/// * Mean level of processes in the run queues
/// * SMP queue distributions
pub struct Stats {
smp_load: [AtomicUsize; MAX_CORE],
mean_level: AtomicUsize,
updating_mean: AtomicBool,
}
impl fmt::Debug for Stats {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Stats")
.field("smp_load", &&self.smp_load[..])
.field("mean_level", &self.mean_level)
.field("updating_mean", &self.updating_mean)
.finish()
}
}
impl Stats {
/// Creates new lockless stats with one load slot per core.
pub fn new(num_cores: usize) -> Stats {
let smp_load: [AtomicUsize; MAX_CORE] = {
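// Standard array-init pattern: create an uninitialized array of MaybeUninit
// slots (safe, since MaybeUninit needs no initialization), write every slot
// exactly once below, then transmute to the fully initialized array type.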
let mut data: [MaybeUninit<AtomicUsize>; MAX_CORE] =
unsafe { MaybeUninit::uninit().assume_init() };
for core_data in data.iter_mut().take(num_cores) {
unsafe {
std::ptr::write(core_data.as_mut_ptr(), AtomicUsize::new(0));
}
}
for core_data in data.iter_mut().take(MAX_CORE).skip(num_cores) {
unsafe {
std::ptr::write(core_data.as_mut_ptr(), AtomicUsize::new(usize::MAX));
}
}
unsafe { std::mem::transmute::<_, [AtomicUsize; MAX_CORE]>(data) }
};
Stats {
smp_load,
mean_level: AtomicUsize::new(0),
updating_mean: AtomicBool::new(false),
}
}
}
unsafe impl Sync for Stats {}
unsafe impl Send for Stats {}
impl SmpStats for Stats {
fn store_load(&self, affinity: usize, load: usize) {
self.smp_load[affinity].store(load, Ordering::SeqCst);
}
fn get_sorted_load(&self) -> ArrayVec<(usize, usize), MAX_CORE> {
let mut sorted_load = ArrayVec::new();
for (core, load) in self.smp_load.iter().enumerate() {
let load = load.load(Ordering::SeqCst);
// Slots beyond the active cores hold `usize::MAX`; stop there.
if load == usize::MAX {
break;
}
// unsafe is ok here because self.smp_load.len() is MAX_CORE
unsafe { sorted_load.push_unchecked((core, load)) };
}
sorted_load.sort_by(|x, y| y.1.cmp(&x.1));
sorted_load
}
fn mean(&self) -> usize {
self.mean_level.load(Ordering::Acquire)
}
fn update_mean(&self) {
// Don't update if another thread is already updating.
// compare_exchange makes the check-and-set atomic, so two
// threads can't both enter the update section at once.
if self
.updating_mean
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return;
}
let mut sum: usize = 0;
let num_cores = LOAD_BALANCER.num_cores;
for item in self.smp_load.iter().take(num_cores) {
if let Some(tmp) = sum.checked_add(item.load(Ordering::Acquire)) {
sum = tmp;
}
}
self.mean_level
.store(sum.wrapping_div(num_cores), Ordering::Release);
self.updating_mean.store(false, Ordering::Release);
}
}
///
/// Static access to runtime statistics
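///
/// A short usage sketch:
///
/// ```
/// use executor::load_balancer::{stats, SmpStats};
///
/// stats().store_load(0, 10);
/// let sorted = stats().get_sorted_load();
/// assert_eq!(sorted[0], (0, 10));
/// ```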
#[inline]
pub fn stats() -> &'static Stats {
lazy_static! {
static ref LOCKLESS_STATS: Stats = Stats::new(*core_count());
}
&*LOCKLESS_STATS
}
///
/// Retrieve core count for the runtime scheduling purposes
#[inline]
pub fn core_count() -> &'static usize {
&LOAD_BALANCER.num_cores
}
///
/// Retrieve cores for the runtime scheduling purposes
#[inline]
pub fn get_cores() -> &'static [CoreId] {
&*LOAD_BALANCER.cores
}

View File

@ -0,0 +1,414 @@
//! Core placement configuration and management
//!
//! Placement module enables thread placement onto the cores.
//! CPU level affinity assignment is done here.
/// This function tries to retrieve information
/// on all the "cores" active on this system.
pub fn get_core_ids() -> Option<Vec<CoreId>> {
get_core_ids_helper()
}
/// This function tries to retrieve
/// the number of active "cores" on the system.
pub fn get_num_cores() -> Option<usize> {
get_core_ids().map(|ids| ids.len())
}
///
/// Sets the current thread's affinity.
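///
/// A usage sketch, pinning the current thread to the first core:
///
/// ```no_run
/// use executor::placement;
///
/// if let Some(ids) = placement::get_core_ids() {
///     placement::set_for_current(ids[0]);
/// }
/// ```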
pub fn set_for_current(core_id: CoreId) {
tracing::trace!("Executor: placement: set affinity on core {}", core_id.id);
set_for_current_helper(core_id);
}
///
/// CoreID implementation to identify system cores.
#[derive(Copy, Clone, Debug)]
pub struct CoreId {
/// Used core ID
pub id: usize,
}
// Linux Section
#[cfg(target_os = "linux")]
#[inline]
fn get_core_ids_helper() -> Option<Vec<CoreId>> {
linux::get_core_ids()
}
#[cfg(target_os = "linux")]
#[inline]
fn set_for_current_helper(core_id: CoreId) {
linux::set_for_current(core_id);
}
#[cfg(target_os = "linux")]
mod linux {
use std::mem;
use libc::{cpu_set_t, sched_getaffinity, sched_setaffinity, CPU_ISSET, CPU_SET, CPU_SETSIZE};
use super::CoreId;
pub fn get_core_ids() -> Option<Vec<CoreId>> {
if let Some(full_set) = get_affinity_mask() {
let mut core_ids: Vec<CoreId> = Vec::new();
for i in 0..CPU_SETSIZE as usize {
if unsafe { CPU_ISSET(i, &full_set) } {
core_ids.push(CoreId { id: i });
}
}
Some(core_ids)
} else {
None
}
}
pub fn set_for_current(core_id: CoreId) {
// Turn `core_id` into a `libc::cpu_set_t` with only
// one core active.
let mut set = new_cpu_set();
unsafe { CPU_SET(core_id.id, &mut set) };
// Set the current thread's core affinity.
unsafe {
sched_setaffinity(
0, // Defaults to current thread
mem::size_of::<cpu_set_t>(),
&set,
);
}
}
fn get_affinity_mask() -> Option<cpu_set_t> {
let mut set = new_cpu_set();
// Try to get current core affinity mask.
let result = unsafe {
sched_getaffinity(
0, // Defaults to current thread
mem::size_of::<cpu_set_t>(),
&mut set,
)
};
if result == 0 {
Some(set)
} else {
None
}
}
fn new_cpu_set() -> cpu_set_t {
unsafe { mem::zeroed::<cpu_set_t>() }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_linux_get_affinity_mask() {
match get_affinity_mask() {
Some(_) => {}
None => {
panic!();
}
}
}
#[test]
fn test_linux_get_core_ids() {
match get_core_ids() {
Some(set) => {
assert_eq!(set.len(), num_cpus::get());
}
None => {
panic!();
}
}
}
#[test]
fn test_linux_set_for_current() {
let ids = get_core_ids().unwrap();
assert!(!ids.is_empty());
set_for_current(ids[0]);
// Ensure that the system pinned the current thread
// to the specified core.
let mut core_mask = new_cpu_set();
unsafe { CPU_SET(ids[0].id, &mut core_mask) };
let new_mask = get_affinity_mask().unwrap();
let mut is_equal = true;
for i in 0..CPU_SETSIZE as usize {
let is_set1 = unsafe { CPU_ISSET(i, &core_mask) };
let is_set2 = unsafe { CPU_ISSET(i, &new_mask) };
if is_set1 != is_set2 {
is_equal = false;
}
}
assert!(is_equal);
}
}
}
// Windows Section
#[cfg(target_os = "windows")]
#[inline]
fn get_core_ids_helper() -> Option<Vec<CoreId>> {
windows::get_core_ids()
}
#[cfg(target_os = "windows")]
#[inline]
fn set_for_current_helper(core_id: CoreId) {
windows::set_for_current(core_id);
}
#[cfg(target_os = "windows")]
extern crate winapi;
#[cfg(target_os = "windows")]
mod windows {
#[allow(unused_imports)]
use winapi::shared::basetsd::{DWORD_PTR, PDWORD_PTR};
use winapi::um::processthreadsapi::{GetCurrentProcess, GetCurrentThread};
use winapi::um::winbase::{GetProcessAffinityMask, SetThreadAffinityMask};
use super::CoreId;
pub fn get_core_ids() -> Option<Vec<CoreId>> {
if let Some(mask) = get_affinity_mask() {
// Find all active cores in the bitmask.
let mut core_ids: Vec<CoreId> = Vec::new();
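// `usize::MIN.count_zeros()` is just the number of bits in `usize`,
// i.e. the width of the affinity mask.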
for i in 0..usize::MIN.count_zeros() as usize {
let test_mask = 1 << i;
if (mask & test_mask) == test_mask {
core_ids.push(CoreId { id: i });
}
}
Some(core_ids)
} else {
None
}
}
pub fn set_for_current(core_id: CoreId) {
// Convert `CoreId` back into mask.
let mask: DWORD_PTR = 1 << core_id.id;
// Set core affinity for current thread.
unsafe {
SetThreadAffinityMask(GetCurrentThread(), mask);
}
}
fn get_affinity_mask() -> Option<usize> {
let mut process_mask: usize = 0;
let mut system_mask: usize = 0;
let res = unsafe {
GetProcessAffinityMask(
GetCurrentProcess(),
&mut process_mask as PDWORD_PTR,
&mut system_mask as PDWORD_PTR,
)
};
// Successfully retrieved affinity mask
if res != 0 {
Some(process_mask)
}
// Failed to retrieve affinity mask
else {
None
}
}
#[cfg(test)]
mod tests {
use num_cpus;
use super::*;
#[test]
fn test_windows_get_core_ids() {
match get_core_ids() {
Some(set) => {
assert_eq!(set.len(), num_cpus::get());
}
None => {
panic!();
}
}
}
#[test]
fn test_windows_set_for_current() {
let ids = get_core_ids().unwrap();
assert!(!ids.is_empty());
set_for_current(ids[0]);
}
}
}
// MacOS Section
#[cfg(target_os = "macos")]
#[inline]
fn get_core_ids_helper() -> Option<Vec<CoreId>> {
macos::get_core_ids()
}
#[cfg(target_os = "macos")]
#[inline]
fn set_for_current_helper(core_id: CoreId) {
macos::set_for_current(core_id);
}
#[cfg(target_os = "macos")]
mod macos {
use std::mem;
use libc::{c_int, c_uint, pthread_self};
use super::CoreId;
type KernReturnT = c_int;
type IntegerT = c_int;
type NaturalT = c_uint;
type ThreadT = c_uint;
type ThreadPolicyFlavorT = NaturalT;
type MachMsgTypeNumberT = NaturalT;
#[repr(C)]
struct ThreadAffinityPolicyDataT {
affinity_tag: IntegerT,
}
type ThreadPolicyT = *mut ThreadAffinityPolicyDataT;
const THREAD_AFFINITY_POLICY: ThreadPolicyFlavorT = 4;
#[link(name = "System", kind = "framework")]
extern "C" {
fn thread_policy_set(
thread: ThreadT,
flavor: ThreadPolicyFlavorT,
policy_info: ThreadPolicyT,
count: MachMsgTypeNumberT,
) -> KernReturnT;
}
pub fn get_core_ids() -> Option<Vec<CoreId>> {
Some(
(0..(num_cpus::get()))
.map(|n| CoreId { id: n as usize })
.collect::<Vec<_>>(),
)
}
pub fn set_for_current(core_id: CoreId) {
let thread_affinity_policy_count: MachMsgTypeNumberT =
mem::size_of::<ThreadAffinityPolicyDataT>() as MachMsgTypeNumberT
/ mem::size_of::<IntegerT>() as MachMsgTypeNumberT;
let mut info = ThreadAffinityPolicyDataT {
affinity_tag: core_id.id as IntegerT,
};
unsafe {
thread_policy_set(
pthread_self() as ThreadT,
THREAD_AFFINITY_POLICY,
&mut info as ThreadPolicyT,
thread_affinity_policy_count,
);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_macos_get_core_ids() {
match get_core_ids() {
Some(set) => {
assert_eq!(set.len(), num_cpus::get());
}
None => {
panic!();
}
}
}
#[test]
fn test_macos_set_for_current() {
let ids = get_core_ids().unwrap();
assert!(!ids.is_empty());
set_for_current(ids[0]);
}
}
}
// Stub Section
#[cfg(not(any(target_os = "linux", target_os = "windows", target_os = "macos")))]
#[inline]
fn get_core_ids_helper() -> Option<Vec<CoreId>> {
None
}
#[cfg(not(any(target_os = "linux", target_os = "windows", target_os = "macos")))]
#[inline]
fn set_for_current_helper(_core_id: CoreId) {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_core_ids() {
match get_core_ids() {
Some(set) => {
assert_eq!(set.len(), num_cpus::get());
}
None => {
panic!();
}
}
}
#[test]
fn test_set_for_current() {
let ids = get_core_ids().unwrap();
assert!(!ids.is_empty());
set_for_current(ids[0]);
}
}

View File

@ -0,0 +1,223 @@
//!
//! Pool of threads to run lightweight processes
//!
//! We spawn futures onto the pool with the [`spawn`] method of the global run queue or
//! with the corresponding [`Worker`]'s spawn method.
//!
//! [`spawn`]: crate::pool::spawn
//! [`Worker`]: crate::run_queue::Worker
use crate::thread_manager::{DynamicPoolManager, DynamicRunner};
use crate::worker;
use crossbeam_channel::{unbounded, Receiver, Sender};
use lazy_static::lazy_static;
use lightproc::lightproc::LightProc;
use lightproc::recoverable_handle::RecoverableHandle;
use once_cell::sync::{Lazy, OnceCell};
use std::future::Future;
use std::iter::Iterator;
use std::time::Duration;
use std::{env, thread};
use tracing::trace;
///
/// Spawn a process (which contains future + process stack) onto the executor from the global level.
///
/// # Example
/// ```rust
/// use executor::prelude::*;
///
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # start();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # start();
/// # }
/// #
/// # fn start() {
///
/// let handle = spawn(
/// async {
/// panic!("test");
/// },
/// );
///
/// run(
/// async {
/// handle.await;
/// },
/// ProcStack { },
/// );
/// # }
/// ```
pub fn spawn<F, R>(future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = LightProc::recoverable(future, worker::schedule);
task.schedule();
handle
}
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
pub fn spawn_blocking<F, R>(future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = LightProc::recoverable(future, schedule);
task.schedule();
handle
}
///
/// Acquire the static Pool reference
#[inline]
pub fn get() -> &'static Pool {
&*POOL
}
///
/// Acquire a reference to the dynamic pool manager, if the pool has been initialized.
pub fn get_manager() -> Option<&'static DynamicPoolManager<AsyncRunner>> {
DYNAMIC_POOL_MANAGER.get()
}
impl Pool {
///
/// Spawn a process (which contains future + process stack) onto the executor via [Pool] interface.
pub fn spawn<F, R>(&self, future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = LightProc::recoverable(future, worker::schedule);
task.schedule();
handle
}
}
/// Enqueues work, attempting to send to the thread pool in a
/// nonblocking way and spinning up the needed number of threads
/// based on previous statistics, without relying on whether
/// there is a thread ready to accept the work or not.
pub(crate) fn schedule(t: LightProc) {
if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without
// blocking.
POOL.sender.send(err.into_inner()).unwrap();
}
// Count every incoming scheduled task for the scaler's frequency statistics
DYNAMIC_POOL_MANAGER.get().unwrap().increment_frequency();
}
///
/// Low watermark value; defines the bare minimum of the pool.
/// Spawns the initial thread set.
/// Configurable at runtime with the `BASTION_BLOCKING_THREADS` env var.
#[inline]
fn low_watermark() -> &'static u64 {
lazy_static! {
static ref LOW_WATERMARK: u64 = {
env::var_os("BASTION_BLOCKING_THREADS")
.map(|x| x.to_str().unwrap().parse::<u64>().unwrap())
.unwrap_or(DEFAULT_LOW_WATERMARK)
};
}
&*LOW_WATERMARK
}
/// If the low watermark isn't configured, this is the default scaler value.
/// This value is used by the scaler's heuristics.
const DEFAULT_LOW_WATERMARK: u64 = 2;
/// Pool interface between the scheduler and thread pool
#[derive(Debug)]
pub struct Pool {
sender: Sender<LightProc>,
receiver: Receiver<LightProc>,
}
/// Runner routines for the async thread pool's worker threads.
#[derive(Debug)]
pub struct AsyncRunner {
// We keep a handle to the tokio runtime here to make sure
// it will never be dropped while the DynamicPoolManager is alive,
// in case we need to spin up some threads.
#[cfg(feature = "tokio-runtime")]
runtime_handle: tokio::runtime::Handle,
}
impl DynamicRunner for AsyncRunner {
fn run_static(&self, park_timeout: Duration) -> ! {
loop {
for task in &POOL.receiver {
trace!("static: running task");
self.run(task);
}
trace!("static: empty queue, parking with timeout");
thread::park_timeout(park_timeout);
}
}
fn run_dynamic(&self, parker: impl Fn()) -> ! {
loop {
while let Ok(task) = POOL.receiver.try_recv() {
trace!("dynamic thread: running task");
self.run(task);
}
trace!(
"dynamic thread: parking - {:?}",
std::thread::current().id()
);
parker();
}
}
fn run_standalone(&self) {
while let Ok(task) = POOL.receiver.try_recv() {
self.run(task);
}
trace!("standalone thread: quitting.");
}
}
impl AsyncRunner {
fn run(&self, task: LightProc) {
task.run();
}
}
static DYNAMIC_POOL_MANAGER: OnceCell<DynamicPoolManager<AsyncRunner>> = OnceCell::new();
static POOL: Lazy<Pool> = Lazy::new(|| {
#[cfg(feature = "tokio-runtime")]
{
let runner = AsyncRunner {
// We use current() here instead of try_current()
// because we want bastion to crash as soon as possible
// if there is no available runtime.
runtime_handle: tokio::runtime::Handle::current(),
};
DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
}
#[cfg(not(feature = "tokio-runtime"))]
{
let runner = AsyncRunner {};
DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
}
DYNAMIC_POOL_MANAGER
.get()
.expect("couldn't get static pool manager")
.initialize();
let (sender, receiver) = unbounded();
Pool { sender, receiver }
});

View File

@ -0,0 +1,5 @@
/// Empty process stack placeholder, passed to e.g. [`crate::run::run`].
#[derive(Debug, Default)]
pub struct ProcStack {}

runtime/executor/src/run.rs
View File

@ -0,0 +1,154 @@
//!
//! Blocking run of the async processes
//!
use crate::worker;
use crossbeam_utils::sync::{Parker, Unparker};
use std::cell::Cell;
use std::future::Future;
use std::mem;
use std::mem::{ManuallyDrop, MaybeUninit};
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use crate::proc_stack::ProcStack;
///
/// This method blocks the current thread until passed future is resolved with an output.
///
/// It is called `block_on` or `blocking` in some executors.
///
/// # Example
/// ```rust
/// use executor::prelude::*;
/// use lightproc::prelude::*;
/// let mut sum = 0;
///
/// run(
/// async {
/// (0..10_000_000).for_each(|_| {
/// sum += 1;
/// });
/// },
/// ProcStack::default(),
/// );
/// ```
pub fn run<F, T>(future: F, stack: ProcStack) -> T
where
F: Future<Output = T>,
{
unsafe {
// An explicitly uninitialized `T`. Until `assume_init` is called this will not call any
// drop code for T
let mut out = MaybeUninit::uninit();
// Wrap the future into one that stores the result into `out`.
let future = {
let out = out.as_mut_ptr();
async move {
// write() avoids dropping the uninitialized previous contents of `out`.
out.write(future.await);
}
};
// Pin the future onto the stack.
pin_utils::pin_mut!(future);
// Extend the lifetime of the future to 'static.
let future = mem::transmute::<
Pin<&'_ mut dyn Future<Output = ()>>,
Pin<&'static mut dyn Future<Output = ()>>,
>(future);
// Block on the future and wait for it to complete.
worker::set_stack(&stack, || block(future));
// Assume that if the future completed and didn't panic it fully initialized its output
out.assume_init()
}
}
fn block<F, T>(f: F) -> T
where
F: Future<Output = T>,
{
thread_local! {
// May hold a pre-allocated parker that can be reused for efficiency.
//
// Note that each invocation of `block` needs its own parker. In particular, if `block`
// recursively calls itself, we must make sure that each recursive call uses a distinct
// parker instance.
static CACHE: Cell<Option<Parker>> = Cell::new(None);
}
pin_utils::pin_mut!(f);
CACHE.with(|cache| {
// Reuse a cached parker or create a new one for this invocation of `block`.
let parker: Parker = cache.take().unwrap_or_else(|| Parker::new());
let ptr = Unparker::into_raw(parker.unparker().clone());
let vt = vtable();
// Waker must not be dropped until it's no longer required. We also happen to know that a
// Parker contains at least one reference to `Unparker` so the relevant `Unparker` will not
// be dropped at least until the `Parker` is.
let waker = unsafe { Waker::from_raw(RawWaker::new(ptr, vt)) };
let cx = &mut Context::from_waker(&waker);
loop {
if let Poll::Ready(t) = f.as_mut().poll(cx) {
// Save the parker for the next invocation of `block`.
cache.set(Some(parker));
return t;
}
parker.park();
}
})
}
fn vtable() -> &'static RawWakerVTable {
/// This function will be called when the RawWaker gets cloned, e.g. when the Waker in which
/// the RawWaker is stored gets cloned.
///
/// The implementation of this function must retain all resources that are required for this
/// additional instance of a RawWaker and associated task. Calling wake on the resulting
/// RawWaker should result in a wakeup of the same task that would have been awoken by the
/// original RawWaker.
unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
// [`Unparker`] implements `Clone` and upholds the contract stated above. The current
// Implementation is simply an Arc over the actual inner values.
let unparker = Unparker::from_raw(ptr).clone();
RawWaker::new(Unparker::into_raw(unparker), vtable())
}
/// This function will be called when wake is called on the Waker. It must wake up the task
/// associated with this RawWaker.
///
/// The implementation of this function must make sure to release any resources that are
/// associated with this instance of a RawWaker and associated task.
unsafe fn wake_raw(ptr: *const ()) {
// We reconstruct the Unparker from the pointer here thus ensuring it is dropped at the
// end of this function call.
Unparker::from_raw(ptr).unpark();
}
/// This function will be called when wake_by_ref is called on the Waker. It must wake up the
/// task associated with this RawWaker.
///
/// This function is similar to wake, but must not consume the provided data pointer.
unsafe fn wake_by_ref_raw(ptr: *const ()) {
// We **must not** drop the resulting Unparker so we wrap it in `ManuallyDrop`.
let unparker = ManuallyDrop::new(Unparker::from_raw(ptr));
unparker.unpark();
}
/// This function gets called when a RawWaker gets dropped.
///
/// The implementation of this function must make sure to release any resources that are
/// associated with this instance of a RawWaker and associated task.
unsafe fn drop_raw(ptr: *const ()) {
drop(Unparker::from_raw(ptr))
}
&RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw)
}

View File

@ -0,0 +1,67 @@
//!
//! Where workers go to park when there is no workload in their worker queue.
//!
//! If a workload is received, the pool will wake them up.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Condvar, Mutex};
/// The place where worker threads go to sleep.
///
/// Similar to how thread parking works, if a notification comes up while no threads are sleeping,
/// the next thread that attempts to go to sleep will pick up the notification immediately.
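///
/// A small illustrative example of that flag behaviour:
///
/// ```
/// use executor::sleepers::Sleepers;
///
/// let sleepers = Sleepers::new();
/// sleepers.notify_one(); // nobody is sleeping: the notification is flagged
/// sleepers.wait();       // consumes the pending notification, returns immediately
/// ```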
#[derive(Debug)]
#[allow(clippy::mutex_atomic)]
pub struct Sleepers {
/// How many threads are currently asleep.
sleep: Mutex<usize>,
/// A condvar for notifying sleeping threads.
wake: Condvar,
/// Set to `true` if a notification came up while nobody was sleeping.
notified: AtomicBool,
}
#[allow(clippy::mutex_atomic)]
impl Default for Sleepers {
/// Creates a new `Sleepers`.
fn default() -> Self {
Self {
sleep: Mutex::new(0),
wake: Condvar::new(),
notified: AtomicBool::new(false),
}
}
}
#[allow(clippy::mutex_atomic)]
impl Sleepers {
/// Creates a new `Sleepers`.
pub fn new() -> Self {
Self::default()
}
/// Puts the current thread to sleep.
pub fn wait(&self) {
let mut sleep = self.sleep.lock().unwrap();
if !self.notified.swap(false, Ordering::SeqCst) {
*sleep += 1;
std::mem::drop(self.wake.wait(sleep).unwrap());
}
}
/// Notifies one thread.
pub fn notify_one(&self) {
if !self.notified.load(Ordering::SeqCst) {
let mut sleep = self.sleep.lock().unwrap();
if *sleep > 0 {
*sleep -= 1;
self.wake.notify_one();
} else {
self.notified.store(true, Ordering::SeqCst);
}
}
}
}

View File

@ -0,0 +1,404 @@
//! A thread manager to predict how many threads should be spawned to handle the upcoming load.
//!
//! The thread manager consists of three elements:
//! * Frequency Detector
//! * Trend Estimator
//! * Predictive Upscaler
//!
//! ## Frequency Detector
//! Detects how many tasks are submitted from the scheduler to the thread pool in a given time frame.
//! The pool manager thread does this sampling every 90 milliseconds.
//! This value is then used in the trend estimation phase.
//!
//! ## Trend Estimator
//! Holds up to a given number of frequencies to create an estimation.
//! The trend estimator holds 10 frequencies at a time.
//! This value is stored as a constant in [FREQUENCY_QUEUE_SIZE](constant.FREQUENCY_QUEUE_SIZE.html).
//! Estimation and prediction use the Exponentially Weighted Moving Average algorithm.
//!
//! This algorithm is adapted from [A Novel Predictive and Self-Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61)
//! and altered to:
//! * utilize thread redundancy instead of a heavy trend calculation; thread redundancy is the sum of the differences between the predicted and observed values.
//! * use exponential rather than linear trend estimation, with the formula (see the worked instance after this list):
//! ```text
//! LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK
//! ```
//! *NOTE:* To tweak this algorithm, increasing [LOW_WATERMARK](constant.LOW_WATERMARK.html) will automatically adapt the additional dynamic thread spawn count.
//! * operate without watermarking by timestamps (which, in the paper, is used to measure the algorithm's own performance during execution).
//! * operate without extensive subsampling, since extensive subsampling congests the pool manager thread.
//! * operate without keeping track of the idle time of threads or a job-out queue, unlike the TEMA and FOPS implementations.
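//!
//! As a worked instance of that formula (illustrative numbers, with `LOW_WATERMARK = 2`):
//! if the estimator predicts a frequency of 3.5 while 2.0 is observed, the upscale amount
//! is `2 * (3.5 - 2.0) + 2 = 5` threads.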
//!
//! ## Predictive Upscaler
//! The upscaler has three cases (also described in the paper):
//! * The rate slightly increases and there are many idle threads.
//! * The number of worker threads tends to be reduced since the workload of the system is descending.
//! * The system has no requests or is stalled (our case here is when the current tasks block further tasks from being processed: throughput hogs).
//!
//! For the first two, EMA calculation and exponential trend estimation give good performance.
//! For the last case, the upscaler selects the upscaling amount by the number of tasks mapped when throughput hogs happen.
//!
//! **Example scenario:** Let's say we have 10_000 tasks and every one of them blocks for 1 second. The scheduler will map plenty of tasks, but they will get rejected.
//! This makes the estimation calculation nearly 0 for both the entering and exiting parts. When this happens and we still see tasks mapped from the scheduler,
//! we start to slowly increase the thread count, linearly with the frequency. A high increase of this value either makes us hit the thread threshold on
//! some OSes or congests the program's other thread utilization, because of context switching.
//!
//! Throughput hogs are determined by a combination of the job-in/job-out frequency and the current scheduler task assignment frequency.
//! The threshold on the EMA difference is compared against machine epsilon to absorb floating point arithmetic errors.
use crate::{load_balancer, placement};
use core::fmt;
use crossbeam_queue::ArrayQueue;
use fmt::{Debug, Formatter};
use lazy_static::lazy_static;
use lever::prelude::TTas;
use placement::CoreId;
use std::collections::{HashMap, VecDeque};
use std::time::Duration;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Mutex,
},
thread::{self, Thread},
};
use std::any::Any;
use std::panic::resume_unwind;
use std::thread::{JoinHandle, ThreadId};
use crossbeam_deque::Worker;
use crossbeam_utils::sync::{Parker, Unparker};
use tracing::{debug, trace};
use lightproc::lightproc::LightProc;
/// The default thread park timeout before checking for new tasks.
const THREAD_PARK_TIMEOUT: Duration = Duration::from_millis(1);
/// Frequency histogram's sliding window size.
/// Defines how many frequencies will be considered for adaptation.
const FREQUENCY_QUEUE_SIZE: usize = 10;
/// If the low watermark isn't configured, this is the default scaler value.
/// This value is used by the scaler's heuristics.
const DEFAULT_LOW_WATERMARK: u64 = 2;
/// Pool scaler interval time (milliseconds).
/// This is the actual interval which makes adaptation calculation.
const SCALER_POLL_INTERVAL: u64 = 90;
/// Exponential moving average smoothing coefficient for limited window.
/// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size.
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);
lazy_static! {
static ref ROUND_ROBIN_PIN: Mutex<CoreId> = Mutex::new(CoreId { id: 0 });
}
/// The `DynamicRunner` is piloted by `DynamicPoolManager`.
/// Upon request it needs to be able to provide runner routines for:
/// * Static threads.
/// * Dynamic threads.
/// * Standalone threads.
///
/// Your implementation of `DynamicRunner`
/// will allow you to define what tasks must be accomplished.
///
/// Run static threads:
///
/// run_static should never return, and park for park_timeout instead.
///
/// Run dynamic threads:
/// run_dynamic should never return, and call `parker()` when it has no more tasks to process.
/// It will be unparked automatically by the `DynamicPoolManager` if needs be.
///
/// Run standalone threads:
/// run_standalone should return once it has no more tasks to process.
/// The `DynamicPoolManager` will spawn other standalone threads if needs be.
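///
/// A sketch of a minimal runner (illustrative; sourcing tasks is up to the implementor,
/// and `run_static` falls back to its default implementation):
///
/// ```ignore
/// struct MyRunner;
///
/// impl DynamicRunner for MyRunner {
///     fn run_dynamic(&self, parker: impl Fn()) -> ! {
///         loop {
///             // drain the task queue here ...
///             parker(); // then park until new work arrives
///         }
///     }
///     fn run_standalone(&self) {
///         // drain the queue once, then return so the thread can die
///     }
/// }
/// ```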
pub trait DynamicRunner {
fn run_static(&self, park_timeout: Duration) -> ! {
let parker = Parker::new();
self.run_dynamic(|| parker.park_timeout(park_timeout));
}
fn run_dynamic(&self, parker: impl Fn()) -> !;
fn run_standalone(&self);
}
/// The `DynamicPoolManager` is responsible for
/// growing and shrinking a pool according to EMA rules.
///
/// It needs to be passed a structure that implements `DynamicRunner`,
/// which will be responsible for actually spawning threads.
///
/// The `DynamicPoolManager` keeps track of the
/// number of threads required to process the load correctly,
/// and depending on the current state it will:
/// - Spawn a lot of threads (we're predicting a load spike, and we need to prepare for it)
/// - Spawn few threads (there's a constant load, and throughput is low because the current resources are busy)
/// - Do nothing (the load is shrinking, threads will automatically stop once they're done).
///
/// Kinds of threads:
///
/// ## Static threads:
/// Defined in the constructor, they will always be available. They park for `THREAD_PARK_TIMEOUT` on idle.
///
/// ## Dynamic threads:
/// Created during `DynamicPoolManager` initialization, they will park on idle.
/// The `DynamicPoolManager` grows the number of Dynamic threads
/// so the total number of Static threads + Dynamic threads
/// is the number of available cores on the machine. (`num_cpus::get()`)
///
/// ## Standalone threads:
/// They are created when there aren't enough static and dynamic threads to process the expected load.
/// They will be destroyed on idle.
///
/// ## Spawn order:
/// In order to handle a growing load, the pool manager will ask to:
/// - Use Static threads
/// - Unpark Dynamic threads
/// - Spawn Standalone threads
///
/// The pool manager is not responsible for the tasks to be performed by the threads, it's handled by the `DynamicRunner`
///
/// If you use tracing, you can have a look at the trace! logs generated by the structure.
///
pub struct DynamicPoolManager<Runner> {
static_threads: usize,
dynamic_threads: usize,
parked_threads: ArrayQueue<Unparker>,
runner: Runner,
last_frequency: AtomicU64,
frequencies: TTas<VecDeque<u64>>,
}
impl<Runner: Debug> Debug for DynamicPoolManager<Runner> {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
fmt.debug_struct("DynamicPoolManager")
.field("static_threads", &self.static_threads)
.field("dynamic_threads", &self.dynamic_threads)
.field("parked_threads", &self.parked_threads.len())
.field("runner", &self.runner)
.field("last_frequency", &self.last_frequency)
.field("frequencies", &self.frequencies.try_lock())
.finish()
}
}
impl<Runner: DynamicRunner + Sync + Send> DynamicPoolManager<Runner> {
pub fn new(static_threads: usize, runner: Runner) -> Self {
let dynamic_threads = 1.max(num_cpus::get().checked_sub(static_threads).unwrap_or(0));
Self {
static_threads,
dynamic_threads,
parked_threads: ArrayQueue::new(dynamic_threads),
runner,
last_frequency: AtomicU64::new(0),
frequencies: TTas::new(VecDeque::with_capacity(
FREQUENCY_QUEUE_SIZE.saturating_add(1),
)),
}
}
pub fn increment_frequency(&self) {
self.last_frequency.fetch_add(1, Ordering::Acquire);
}
/// Initialize the dynamic pool
/// That will be scaled
pub fn initialize(&'static self) {
// Static thread manager that will always be available
trace!("spooling up {} static worker threads", self.static_threads);
(0..self.static_threads).for_each(|n| {
let runner = &self.runner;
thread::Builder::new()
.name(format!("static #{}", n))
.spawn(move || {
Self::affinity_pinner();
runner.run_static(THREAD_PARK_TIMEOUT);
})
.expect("failed to spawn static worker thread");
});
// Dynamic thread manager that will allow us to unpark threads when needed
trace!("spooling up {} dynamic worker threads", self.dynamic_threads);
(0..self.dynamic_threads).for_each(|n| {
let runner = &self.runner;
thread::Builder::new()
.name(format!("dynamic #{}", n))
.spawn(move || {
Self::affinity_pinner();
let parker = Parker::new();
let unparker = parker.unparker();
runner.run_dynamic(|| self.park_thread(&parker, unparker));
})
.expect("failed to spawn dynamic worker thread");
});
// Pool manager to check frequency of task rates
// and take action by scaling the pool accordingly.
thread::Builder::new()
.name("pool manager".to_string())
.spawn(move || {
let poll_interval = Duration::from_millis(SCALER_POLL_INTERVAL);
trace!("setting up the pool manager");
loop {
self.scale_pool();
thread::park_timeout(poll_interval);
}
})
.expect("failed to spawn pool manager thread");
}
/// Provision threads takes a number of threads that need to be made available.
/// It will try to unpark threads from the dynamic pool, and spawn more threads if needs be.
pub fn provision_threads(&'static self, n: usize) {
for i in 0..n {
if !self.unpark_thread() {
let new_threads = n - i;
trace!(
"no more threads to unpark, spawning {} new threads",
new_threads
);
return self.spawn_threads(new_threads);
}
}
}
fn spawn_threads(&'static self, n: usize) {
(0..n).for_each(|_| {
let runner = &self.runner;
thread::Builder::new()
.name("standalone worker".to_string())
.spawn(move || {
Self::affinity_pinner();
runner.run_standalone();
})
.unwrap();
})
}
/// Parks a thread until [`unpark_thread`] unparks it
pub fn park_thread(&self, parker: &Parker, unparker: &Unparker) {
if let Err(unparker) = self.parked_threads
// Unparker is an Arc internally so this is (comparatively) cheap to do.
.push(unparker.clone()) {
panic!("Failed to park with {:?}", unparker);
}
trace!("parking thread {:?}", std::thread::current().id());
parker.park();
}
/// Pops a thread from the parked_threads queue and unparks it.
///
/// Returns true if there were threads to unpark
fn unpark_thread(&self) -> bool {
trace!("parked_threads: len is {}", self.parked_threads.len());
if let Some(unparker) = self.parked_threads.pop() {
debug!("Unparking thread with {:?}", &unparker);
unparker.unpark();
true
} else {
false
}
}
/// Affinity pinner for blocking pool
///
/// Pinning isn't going to be enabled for single core systems.
#[inline]
fn affinity_pinner() {
if 1 != *load_balancer::core_count() {
let mut core = ROUND_ROBIN_PIN.lock().unwrap();
placement::set_for_current(*core);
core.id = (core.id + 1) % *load_balancer::core_count();
}
}
/// Exponentially Weighted Moving Average calculation
///
/// This allows us to find the EMA value.
/// This value represents the trend of tasks mapped onto the thread pool.
/// Calculation is following:
/// ```text
/// +--------+-----------------+----------------------------------+
/// | Symbol | Identifier | Explanation |
/// +--------+-----------------+----------------------------------+
/// | α | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
/// | Yt | freq | frequency sample at time t |
/// | St | acc | EMA at time t |
/// +--------+-----------------+----------------------------------+
/// ```
/// Under these definitions formula is following:
/// ```text
/// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ] + St
/// ```
/// # Arguments
///
/// * `freq_queue` - Sliding window of frequency samples
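///
/// A worked instance (illustrative numbers): `EMA_COEFFICIENT` = 2/11 ≈ 0.18, so for
/// samples `[4, 2]` in iteration order, EMA ≈ 0.18 * (4 + 0.82 * 2) ≈ 1.02.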
#[inline]
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
acc + ((*freq as f64) * ((1_f64 - EMA_COEFFICIENT).powf(i as f64) as f64))
}) * EMA_COEFFICIENT as f64
}
/// Adaptive pool scaling function
///
/// This allows to spawn new threads to make room for incoming task pressure.
/// Works in the background detached from the pool system and scales up the pool based
/// on the request rate.
///
/// It uses frequency based calculation to define work. Utilizing average processing rate.
fn scale_pool(&'static self) {
// Fetch current frequency, it does matter that operations are ordered in this approach.
let current_frequency = self.last_frequency.swap(0, Ordering::SeqCst);
let mut freq_queue = self.frequencies.lock();
// Seed the queue with an initial frequency so the calculations below are well-defined
if freq_queue.is_empty() {
freq_queue.push_back(0);
}
// Calculate message rate for the given time window
let frequency = (current_frequency as f64 / SCALER_POLL_INTERVAL as f64) as u64;
// Calculates current time window's EMA value (including last sample)
let prev_ema_frequency = Self::calculate_ema(&freq_queue);
// Add seen frequency data to the frequency histogram.
freq_queue.push_back(frequency);
if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) {
freq_queue.pop_front();
}
// Calculates current time window's EMA value (including last sample)
let curr_ema_frequency = Self::calculate_ema(&freq_queue);
// Adapts the thread count of pool
//
// Sliding window of frequencies visited by the pool manager.
// Pool manager creates EMA value for previous window and current window.
// Compare them to determine scaling amount based on the trends.
// If current EMA value is bigger, we will scale up.
if curr_ema_frequency > prev_ema_frequency {
// "Scale by" amount can be seen as "how much load is coming".
// "Scale" amount is "how many threads we should spawn".
let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
let scale = num_cpus::get().min(
((DEFAULT_LOW_WATERMARK as f64 * scale_by) + DEFAULT_LOW_WATERMARK as f64) as usize,
);
trace!("unparking {} threads", scale);
// It is time to scale the pool!
self.provision_threads(scale);
} else if (curr_ema_frequency - prev_ema_frequency).abs() < f64::EPSILON
&& current_frequency != 0
{
// Throughput is low. Allocate more threads to unblock the flow.
// If we fall into this case, the scheduler is congested by long-hauling tasks.
// To unblock the flow we should add some threads to the pool, but not so many
// that they stagger the program's operation.
trace!("unparking {} threads", DEFAULT_LOW_WATERMARK);
self.provision_threads(DEFAULT_LOW_WATERMARK as usize);
}
}
}

View File

@ -0,0 +1,93 @@
//!
//! SMP-parallelism-based, cache-affine worker implementation
//!
//! This worker implementation relies on worker run queue statistics which are held in
//! pinned global memory, where the workload distribution is calculated and amended to
//! the workers' own local queues.
use crate::pool;
use lightproc::prelude::*;
use std::cell::Cell;
use std::ptr;
use std::time::Duration;
use crossbeam_deque::{Stealer, Worker};
use crate::proc_stack::ProcStack;
/// The timeout we'll use when parking before another steal attempt
pub const THREAD_PARK_TIMEOUT: Duration = Duration::from_millis(1);
thread_local! {
static STACK: Cell<*const ProcStack> = Cell::new(ptr::null());
}
///
/// Set the current process's stack during the run of the future.
pub(crate) fn set_stack<F, R>(stack: *const ProcStack, f: F) -> R
where
F: FnOnce() -> R,
{
struct ResetStack<'a>(&'a Cell<*const ProcStack>);
impl Drop for ResetStack<'_> {
fn drop(&mut self) {
self.0.set(ptr::null());
}
}
STACK.with(|st| {
st.set(stack);
// create a guard to reset STACK even if the future panics. This is important since we
// must not drop the pointed-to ProcStack here in any case.
let _guard = ResetStack(st);
f()
})
}
/*
pub(crate) fn get_proc_stack<F, R>(f: F) -> Option<R>
where
F: FnOnce(&ProcStack) -> R,
{
let res = STACK.try_with(|st| unsafe { st.get().as_ref().map(f) });
match res {
Ok(Some(val)) => Some(val),
Ok(None) | Err(_) => None,
}
}
///
/// Get the stack currently in use for this thread
pub fn current() -> ProcStack {
get_proc_stack(|proc| proc.clone())
.expect("`proc::current()` called outside the context of the proc")
}
*/
pub(crate) fn schedule(proc: LightProc) {
pool::schedule(proc)
}
/// A worker thread running futures locally and stealing work from other workers if it runs empty.
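///
/// A minimal illustrative sketch:
///
/// ```
/// use executor::worker::WorkerThread;
///
/// let worker = WorkerThread::new();
/// let _stealer = worker.stealer(); // hand this to other workers for work stealing
/// worker.tick(); // runs one queued LightProc, if any
/// ```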
pub struct WorkerThread {
queue: Worker<LightProc>,
}
impl WorkerThread {
/// Creates a new worker with an empty FIFO run queue.
pub fn new() -> Self {
Self {
queue: Worker::new_fifo(),
}
}
/// Returns a stealer handle that other workers can use to steal from this queue.
pub fn stealer(&self) -> Stealer<LightProc> {
self.queue.stealer()
}
/// Runs one task from the local queue, if there is one.
pub fn tick(&self) {
if let Some(lightproc) = self.queue.pop() {
lightproc.run()
}
}
}

View File

@ -0,0 +1,26 @@
#[cfg(test)]
mod tests {
use executor::{placement, pool};
#[test]
fn affinity_replacement() {
let core_ids = placement::get_core_ids().unwrap();
dbg!(core_ids);
}
#[cfg(feature = "tokio-runtime")]
mod tokio_tests {
#[tokio::test]
async fn pool_check() {
super::pool::get();
}
}
#[cfg(not(feature = "tokio-runtime"))]
mod no_tokio_tests {
#[test]
fn pool_check() {
super::pool::get();
}
}
}

View File

@ -0,0 +1,38 @@
use executor::blocking;
use executor::prelude::ProcStack;
use executor::run::run;
use std::thread;
use std::time::Duration;
#[cfg(feature = "tokio-runtime")]
mod tokio_tests {
#[tokio::test]
async fn test_run_blocking() {
super::run_test()
}
}
#[cfg(not(feature = "tokio-runtime"))]
mod no_tokio_tests {
#[test]
fn test_run_blocking() {
super::run_test()
}
}
fn run_test() {
let output = run(
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
42
},
),
ProcStack::default(),
)
.unwrap();
assert_eq!(42, output);
}

View File

@ -0,0 +1,155 @@
use executor::blocking;
use executor::prelude::ProcStack;
use executor::run::run;
use futures::future::join_all;
use lightproc::recoverable_handle::RecoverableHandle;
use std::thread;
use std::time::Duration;
use std::time::Instant;
// Test for slow joins without task bursts during joins.
#[test]
#[ignore]
fn slow_join() {
let thread_join_time_max = 11_000;
let start = Instant::now();
// Send an initial batch of million bursts.
let handles = (0..1_000_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
)
})
.collect::<Vec<RecoverableHandle<()>>>();
run(join_all(handles), ProcStack::default());
// Let them join to see how it behaves under different workloads.
let duration = Duration::from_millis(thread_join_time_max);
thread::sleep(duration);
// Spawn yet another batch of work on top of it
let handles = (0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(100);
thread::sleep(duration);
},
)
})
.collect::<Vec<RecoverableHandle<()>>>();
run(join_all(handles), ProcStack::default());
// Slow joins shouldn't cause internal slow down
let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128;
println!("Slow task join. Monotonic exec time: {:?} ns", elapsed);
// Previous implementation is around this threshold.
}
// Test for slow joins with task burst.
#[test]
#[ignore]
fn slow_join_interrupted() {
let thread_join_time_max = 2_000;
let start = Instant::now();
// Send an initial batch of million bursts.
let handles = (0..1_000_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
)
})
.collect::<Vec<RecoverableHandle<()>>>();
run(join_all(handles), ProcStack::default());
// Let them join to see how it behaves under different workloads.
// This time join under the time window.
let duration = Duration::from_millis(thread_join_time_max);
thread::sleep(duration);
// Spawn yet another batch of work on top of it
let handles = (0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(100);
thread::sleep(duration);
},
)
})
.collect::<Vec<RecoverableHandle<()>>>();
run(join_all(handles), ProcStack::default());
// Slow joins shouldn't cause internal slow down
let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128;
println!("Slow task join. Monotonic exec time: {:?} ns", elapsed);
// Previous implementation is around this threshold.
}
// This test is expensive but it proves that longhauling tasks are working in adaptive thread pool.
// Thread pool which spawns on-demand will panic with this test.
#[test]
#[ignore]
fn longhauling_task_join() {
let thread_join_time_max = 11_000;
let start = Instant::now();
// First batch of overhauling tasks
let _ = (0..100_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1000);
thread::sleep(duration);
},
)
})
.collect::<Vec<RecoverableHandle<()>>>();
// Let them join to see how it behaves under different workloads.
let duration = Duration::from_millis(thread_join_time_max);
thread::sleep(duration);
// Send yet another medium sized batch to see how it scales.
let handles = (0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(100);
thread::sleep(duration);
},
)
})
.collect::<Vec<RecoverableHandle<()>>>();
run(join_all(handles), ProcStack::default());
// Slow joins shouldn't cause internal slow down
let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128;
println!(
"Long-hauling task join. Monotonic exec time: {:?} ms",
elapsed
);
// Previous implementation will panic when this test is running.
}