diff --git a/Cargo.lock b/Cargo.lock
index 6def150..f01992f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1128,6 +1128,8 @@ dependencies = [
  "parking_lot",
  "pin-utils",
  "rand",
+ "sharded-slab",
+ "thread_local",
  "tracing",
  "tracing-subscriber",
 ]
diff --git a/bffhd/capnp/mod.rs b/bffhd/capnp/mod.rs
index d19ac28..42ba89f 100644
--- a/bffhd/capnp/mod.rs
+++ b/bffhd/capnp/mod.rs
@@ -3,7 +3,7 @@ use async_net::TcpListener;
 use capnp_rpc::rpc_twoparty_capnp::Side;
 use capnp_rpc::twoparty::VatNetwork;
 use capnp_rpc::RpcSystem;
-use executor::prelude::Executor;
+use executor::prelude::{Executor, GroupId, SupervisionRegistry};
 use futures_rustls::server::TlsStream;
 use futures_rustls::TlsAcceptor;
 use futures_util::stream::FuturesUnordered;
@@ -167,6 +167,7 @@ impl APIServer {
                 tracing::error!("Error during RPC handling: {}", e);
             }
         };
-        self.executor.spawn_local(f);
+        let cgroup = SupervisionRegistry::with(SupervisionRegistry::new_group);
+        self.executor.spawn_local_cgroup(f, cgroup);
     }
 }
diff --git a/runtime/executor/Cargo.toml b/runtime/executor/Cargo.toml
index be7e348..b11121f 100644
--- a/runtime/executor/Cargo.toml
+++ b/runtime/executor/Cargo.toml
@@ -51,6 +51,10 @@ hdrhistogram = "7.5"
 # Stats & Tracing
 tracing = "0.1"
 
+# Supervision trees
+sharded-slab = "0.1"
+thread_local = "1.1"
+
 [dev-dependencies]
 async-std = "1.10.0"
 tracing = { version = "0.1.19", features = ["max_level_trace"]}
diff --git a/runtime/executor/src/lib.rs b/runtime/executor/src/lib.rs
index 96024ea..60fa1f9 100644
--- a/runtime/executor/src/lib.rs
+++ b/runtime/executor/src/lib.rs
@@ -40,4 +40,6 @@ mod worker;
 /// Prelude of Bastion Executor
 pub mod prelude {
     pub use crate::pool::*;
+    pub use crate::supervision::SupervisionRegistry;
+    pub use lightproc::GroupId;
 }
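The bffhd change above is the first consumer of the new supervision API: every incoming RPC connection is spawned into its own process group. A minimal sketch of that calling pattern in isolation (the `handler` future and the `spawn_supervised` wrapper are illustrative, not part of this patch):

```rust
use executor::prelude::{Executor, SupervisionRegistry};

fn spawn_supervised(executor: &Executor<'static>) {
    // Illustrative work item; any future the executor accepts will do.
    let handler = async {
        tracing::info!("handling one RPC connection");
    };

    // Allocate a fresh process group. new_group() parents it to the
    // thread's current group, i.e. the executor's root group unless a
    // task has set something more specific.
    let cgroup = SupervisionRegistry::with(SupervisionRegistry::new_group);

    // The task is now accounted to `cgroup`, and its "runtime.spawn"
    // tracing span carries the group id for correlation.
    executor.spawn_local_cgroup(handler, cgroup);
}
```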
diff --git a/runtime/executor/src/pool.rs b/runtime/executor/src/pool.rs
index e5038ba..9686706 100644
--- a/runtime/executor/src/pool.rs
+++ b/runtime/executor/src/pool.rs
@@ -8,6 +8,7 @@
 //!
 //! [`Worker`]: crate::run_queue::Worker
 use crate::run::block;
+use crate::supervision::SupervisionRegistry;
 use crate::thread_manager::{DynamicRunner, ThreadManager};
 use crate::worker::{Sleeper, WorkerThread};
 use crossbeam_deque::{Injector, Stealer};
@@ -49,12 +50,19 @@ impl Spooler<'_> {
 
 /// Global executor
 pub struct Executor<'a> {
     spooler: Arc<Spooler<'a>>,
+    root_cgroup: GroupId,
 }
 
 impl<'a, 'executor: 'a> Executor<'executor> {
     pub fn new() -> Self {
+        let root_cgroup = SupervisionRegistry::with(|registry| {
+            let cgroup = registry.new_root_group();
+            registry.set_current(&cgroup);
+            cgroup
+        });
         Executor {
             spooler: Arc::new(Spooler::new()),
+            root_cgroup,
         }
     }
@@ -105,6 +113,8 @@ impl<'a, 'executor: 'a> Executor<'executor> {
         R: Send + 'a,
     {
         let location = std::panic::Location::caller();
+        let cgroup = SupervisionRegistry::current();
+        let id = cgroup.as_ref().map(|id| id.into_u64()).unwrap_or(0);
         let span = tracing::trace_span!(
             target: "executor::task",
             "runtime.spawn",
             loc.line = location.line(),
             loc.col = location.column(),
             kind = "global",
+            cgroup = id,
         );
-        let cgroup = None;
 
         let (task, handle) = LightProc::recoverable(future, self.schedule(), span, cgroup);
         tracing::trace!("spawning sendable task");
@@ -128,6 +138,8 @@ impl<'a, 'executor: 'a> Executor<'executor> {
         R: Send + 'a,
     {
         let location = std::panic::Location::caller();
+        let cgroup = SupervisionRegistry::current();
+        let id = cgroup.as_ref().map(|id| id.into_u64()).unwrap_or(0);
         let span = tracing::trace_span!(
             target: "executor::task",
             "runtime.spawn",
             loc.line = location.line(),
             loc.col = location.column(),
             kind = "local",
+            cgroup = id,
         );
-        let cgroup = None;
 
         let (task, handle) = LightProc::recoverable(future, schedule_local(), span, cgroup);
         tracing::trace!("spawning sendable task");
@@ -144,6 +156,29 @@ impl<'a, 'executor: 'a> Executor<'executor> {
         task.schedule();
         handle
     }
 
+    #[track_caller]
+    pub fn spawn_local_cgroup<F, R>(&self, future: F, cgroup: GroupId) -> RecoverableHandle<R>
+    where
+        F: Future<Output = R> + 'a,
+        R: Send + 'a,
+    {
+        let location = std::panic::Location::caller();
+        let span = tracing::trace_span!(
+            target: "executor::task",
+            "runtime.spawn",
+            loc.file = location.file(),
+            loc.line = location.line(),
+            loc.col = location.column(),
+            kind = "local",
+            cgroup = cgroup.into_u64(),
+        );
+
+        let (task, handle) = LightProc::recoverable(future, schedule_local(), span, Some(cgroup));
+        tracing::trace!("spawning local task");
+        task.schedule();
+        handle
+    }
+
     /// Block the calling thread until the given future completes.
     ///
     /// # Example
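In the two existing spawn paths the group is no longer hard-coded to `None`: the registry's thread-local current group is captured and recorded in the span, with `0` serving as the "no group" sentinel since valid ids are non-zero. A rough sketch of the resulting inheritance behaviour, assuming the registry internals introduced below:

```rust
use executor::prelude::{Executor, SupervisionRegistry};

fn demo() {
    // Executor::new() creates the root group and makes it current for
    // this thread, so untargeted spawns inherit the root group.
    let executor = Executor::new();

    // spawn() consults SupervisionRegistry::current(): this task is
    // accounted to the root group, and its "runtime.spawn" span
    // records that group's id.
    let _handle = executor.spawn(async { 40 + 2 });

    // Explicit targeting still works; the new group is itself a child
    // of the current (root) group.
    let child = SupervisionRegistry::with(SupervisionRegistry::new_group);
    let _local = executor.spawn_local_cgroup(async {}, child);
}
```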
diff --git a/runtime/executor/src/supervision.rs b/runtime/executor/src/supervision.rs
index 9694c84..a11cfe0 100644
--- a/runtime/executor/src/supervision.rs
+++ b/runtime/executor/src/supervision.rs
@@ -1,3 +1,179 @@
-pub(crate) struct SupervisionRegistry {}
+use lightproc::GroupId;
+use once_cell::sync::OnceCell;
+use sharded_slab::pool::Ref;
+use sharded_slab::{Clear, Pool};
+use std::borrow::Borrow;
+use std::cell;
+use std::cell::RefCell;
+use std::sync::atomic::{fence, AtomicUsize, Ordering};
+use thread_local::ThreadLocal;
 
-impl SupervisionRegistry {}
+static REGISTRY: OnceCell<SupervisionRegistry> = OnceCell::new();
+
+fn id_to_idx(id: &GroupId) -> usize {
+    (id.into_u64() as usize).wrapping_sub(1)
+}
+
+fn idx_to_id(idx: usize) -> GroupId {
+    GroupId::from_u64(idx.wrapping_add(1) as u64)
+}
+
+pub struct SupervisionRegistry {
+    groups: Pool<GroupInner>,
+    // TODO: would this be better as the full stack?
+    current: ThreadLocal<RefCell<GroupId>>,
+}
+
+impl SupervisionRegistry {
+    fn new() -> Self {
+        Self {
+            groups: Pool::new(),
+            current: ThreadLocal::new(),
+        }
+    }
+
+    pub fn with<T>(f: impl FnOnce(&Self) -> T) -> T {
+        let this = REGISTRY.get_or_init(SupervisionRegistry::new);
+        f(&this)
+    }
+
+    pub(crate) fn get(&self, id: &GroupId) -> Option<Ref<'_, GroupInner>> {
+        self.groups.get(id_to_idx(id))
+    }
+
+    #[inline]
+    pub fn current_ref(&self) -> Option<cell::Ref<'_, GroupId>> {
+        self.current.get().map(|c| c.borrow())
+    }
+
+    pub fn current() -> Option<GroupId> {
+        Self::with(|this| this.current_ref().map(|id| this.clone_group(&id)))
+    }
+
+    pub(crate) fn set_current(&self, id: &GroupId) {
+        // Initialize the thread-local slot on first use, then overwrite it so
+        // repeated calls actually update the thread's current group.
+        let current = self.current.get_or(|| RefCell::new(id.clone()));
+        *current.borrow_mut() = id.clone();
+    }
+
+    pub fn new_root_group(&self) -> GroupId {
+        self.new_group_inner(None)
+    }
+
+    pub fn new_group(&self) -> GroupId {
+        let parent = self.current_ref().map(|id| self.clone_group(&id));
+        self.new_group_inner(parent)
+    }
+
+    fn new_group_inner(&self, parent: Option<GroupId>) -> GroupId {
+        let span = tracing::trace_span!(
+            target: "executor::supervision",
+            "new_group"
+        );
+        let _enter = span.enter();
+        let parent_id = parent.as_ref().map(|id| id.into_non_zero_u64());
+        let idx = self
+            .groups
+            .create_with(|group| {
+                group.parent = parent;
+
+                let ref_cnt = group.ref_count.get_mut();
+                debug_assert_eq!(0, *ref_cnt);
+                *ref_cnt = 1;
+            })
+            .expect("Failed to allocate a new group");
+
+        let id = idx_to_id(idx);
+        tracing::trace!(
+            target: "executor::supervision",
+            parent = parent_id,
+            id = id.into_non_zero_u64(),
+            "process group created"
+        );
+
+        id
+    }
+
+    fn clone_group(&self, id: &GroupId) -> GroupId {
+        tracing::trace!(
+            target: "executor::supervision",
+            id = id.into_u64(),
+            "cloning process group"
+        );
+        let group = self
+            .get(&id)
+            .unwrap_or_else(|| panic!("tried to clone group {:?}, but no such group exists!", id));
+
+        let ref_cnt = group.increment_refcnt();
+        assert_ne!(
+            0, ref_cnt,
+            "tried cloning group {:?} that was already closed",
+            id
+        );
+        id.clone()
+    }
+
+    /// Try to close the group with the given ID
+    ///
+    /// If this method returns `true` the group was closed. Otherwise there are still references
+    /// left open.
+    fn try_close(&self, id: GroupId) -> bool {
+        tracing::trace!(
+            target: "executor::supervision",
+            id = id.into_u64(),
+            "dropping process group"
+        );
+        let group = match self.get(&id) {
+            None if std::thread::panicking() => return false,
+            None => panic!("tried to drop a ref to {:?}, but no such group exists!", id),
+            Some(group) => group,
+        };
+
+        // Reference count *decreases*, on the other hand, must observe strong ordering: the
+        // decrement that drops the count to zero has to see every earlier use of the group,
+        // so each decrement is a `Release` that synchronizes with the `Acquire` fence below.
+        let remaining = group.ref_count.fetch_sub(1, Ordering::Release);
+        if !std::thread::panicking() {
+            assert!(remaining < usize::MAX, "group reference count overflow");
+        }
+        if remaining > 1 {
+            return false;
+        }
+
+        // An atomic fence making sure that all other calls to `try_close` are finished
+        // before the one that returns `true`.
+        fence(Ordering::Acquire);
+        true
+    }
+}
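The close protocol above is the one `Arc` uses: every caller performs a `Release` decrement, and whichever caller observes the count hit zero executes a single `Acquire` fence before tearing the state down. The same invariant in miniature (a standalone illustration, not code from this patch):

```rust
use std::sync::atomic::{fence, AtomicUsize, Ordering};

struct RefCounted {
    ref_count: AtomicUsize,
}

impl RefCounted {
    /// Returns true iff the caller dropped the last reference and is
    /// therefore the one allowed to free the shared state.
    fn release(&self) -> bool {
        // Release publishes this thread's prior writes to the state.
        if self.ref_count.fetch_sub(1, Ordering::Release) != 1 {
            return false;
        }
        // The last decrementer acquires what all the others published,
        // so teardown happens-after every other release().
        fence(Ordering::Acquire);
        true
    }
}
```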
+
+#[derive(Debug)]
+pub(crate) struct GroupInner {
+    parent: Option<GroupId>,
+    ref_count: AtomicUsize,
+}
+
+impl GroupInner {
+    #[inline]
+    /// Increment the reference count of this group and return the previous value
+    fn increment_refcnt(&self) -> usize {
+        // Reference count increases don't need strong ordering. The increments can be done in
+        // any order as long as they *do* happen.
+        self.ref_count.fetch_add(1, Ordering::Relaxed)
+    }
+}
+
+impl Default for GroupInner {
+    fn default() -> Self {
+        Self {
+            parent: None,
+            ref_count: AtomicUsize::new(0),
+        }
+    }
+}
+
+impl Clear for GroupInner {
+    fn clear(&mut self) {
+        // A group is alive as long as at least one of its children is alive, so each group
+        // holds a reference to its parent if it has one. When a group is deleted this
+        // reference must be closed too, i.e. the parent's reference count reduced by one.
+        if let Some(parent) = self.parent.take() {
+            SupervisionRegistry::with(|reg| reg.try_close(parent));
+        }
+    }
+}
diff --git a/runtime/lightproc/src/lightproc.rs b/runtime/lightproc/src/lightproc.rs
index c8e2f6c..63af3cd 100644
--- a/runtime/lightproc/src/lightproc.rs
+++ b/runtime/lightproc/src/lightproc.rs
@@ -122,6 +122,7 @@ impl LightProc {
     ///     future,
     ///     schedule_function,
     ///     Span::current(),
+    ///     None,
     /// );
     /// ```
     pub fn build<'a, F, R, S>(
diff --git a/runtime/lightproc/src/proc_data.rs b/runtime/lightproc/src/proc_data.rs
index 9412459..138fc2c 100644
--- a/runtime/lightproc/src/proc_data.rs
+++ b/runtime/lightproc/src/proc_data.rs
@@ -13,6 +13,40 @@ use tracing::Span;
 /// Opaque id of the group this proc belongs to
 pub struct GroupId(NonZeroU64);
 
+impl GroupId {
+    /// Construct an ID from a u64
+    ///
+    /// # Panics
+    /// - if the provided `u64` is `0`.
+    pub fn from_u64(i: u64) -> Self {
+        Self(NonZeroU64::new(i).expect("group id must be > 0"))
+    }
+
+    #[inline]
+    /// Construct an ID from a NonZeroU64
+    ///
+    /// This method can't fail.
+    pub const fn from_non_zero_u64(i: NonZeroU64) -> Self {
+        Self(i)
+    }
+
+    #[allow(clippy::wrong_self_convention)]
+    //noinspection RsSelfConvention
+    #[inline]
+    /// Convert a GroupId into a u64
+    pub const fn into_u64(&self) -> u64 {
+        self.0.get()
+    }
+
+    #[allow(clippy::wrong_self_convention)]
+    //noinspection RsSelfConvention
+    #[inline]
+    /// Convert a GroupId into a NonZeroU64
+    pub const fn into_non_zero_u64(&self) -> NonZeroU64 {
+        self.0
+    }
+}
+
 /// The pdata of a proc.
 ///
 /// This pdata is stored right at the beginning of every heap-allocated proc.
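The id type stays a thin wrapper over `NonZeroU64`, which is what lets the executor reserve `0` as the "no group" span value and keep the slab mapping a plain off-by-one. A quick usage check of the conversions (illustrative; assumes the `lightproc::GroupId` re-export added to the prelude above):

```rust
use std::num::NonZeroU64;
use lightproc::GroupId;

fn group_id_roundtrip() {
    // from_u64 panics on 0; every other value round-trips unchanged.
    let id = GroupId::from_u64(42);
    assert_eq!(id.into_u64(), 42);

    // The NonZeroU64 constructor is total and const, so it is the one
    // to prefer when the caller already holds a NonZeroU64.
    let nz = NonZeroU64::new(7).unwrap();
    assert_eq!(GroupId::from_non_zero_u64(nz).into_u64(), 7);

    // supervision.rs maps ids to slab slots with a wrapping -1, so the
    // smallest valid id (1) lands in slot 0, and id 42 in slot 41.
    assert_eq!((id.into_u64() as usize).wrapping_sub(1), 41);
}
```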