Allow tracking cgroups with futures

This commit is contained in:
Nadja Reitzenstein 2022-06-23 21:19:31 +02:00
parent e7358838d5
commit 77e0935945
8 changed files with 261 additions and 6 deletions

2
Cargo.lock generated
View File

@ -1128,6 +1128,8 @@ dependencies = [
"parking_lot", "parking_lot",
"pin-utils", "pin-utils",
"rand", "rand",
"sharded-slab",
"thread_local",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
] ]

View File

@ -3,7 +3,7 @@ use async_net::TcpListener;
use capnp_rpc::rpc_twoparty_capnp::Side; use capnp_rpc::rpc_twoparty_capnp::Side;
use capnp_rpc::twoparty::VatNetwork; use capnp_rpc::twoparty::VatNetwork;
use capnp_rpc::RpcSystem; use capnp_rpc::RpcSystem;
use executor::prelude::Executor; use executor::prelude::{Executor, GroupId, SupervisionRegistry};
use futures_rustls::server::TlsStream; use futures_rustls::server::TlsStream;
use futures_rustls::TlsAcceptor; use futures_rustls::TlsAcceptor;
use futures_util::stream::FuturesUnordered; use futures_util::stream::FuturesUnordered;
@ -167,6 +167,7 @@ impl APIServer {
tracing::error!("Error during RPC handling: {}", e); tracing::error!("Error during RPC handling: {}", e);
} }
}; };
self.executor.spawn_local(f); let cgroup = SupervisionRegistry::with(SupervisionRegistry::new_group);
self.executor.spawn_local_cgroup(f, cgroup);
} }
} }

View File

@ -51,6 +51,10 @@ hdrhistogram = "7.5"
# Stats & Tracing # Stats & Tracing
tracing = "0.1" tracing = "0.1"
# Supervision trees
sharded-slab = "0.1"
thread_local = "1.1"
[dev-dependencies] [dev-dependencies]
async-std = "1.10.0" async-std = "1.10.0"
tracing = { version = "0.1.19", features = ["max_level_trace"]} tracing = { version = "0.1.19", features = ["max_level_trace"]}

View File

@ -40,4 +40,6 @@ mod worker;
/// Prelude of Bastion Executor /// Prelude of Bastion Executor
pub mod prelude { pub mod prelude {
pub use crate::pool::*; pub use crate::pool::*;
pub use crate::supervision::SupervisionRegistry;
pub use lightproc::GroupId;
} }

View File

@ -8,6 +8,7 @@
//! [`Worker`]: crate::run_queue::Worker //! [`Worker`]: crate::run_queue::Worker
use crate::run::block; use crate::run::block;
use crate::supervision::SupervisionRegistry;
use crate::thread_manager::{DynamicRunner, ThreadManager}; use crate::thread_manager::{DynamicRunner, ThreadManager};
use crate::worker::{Sleeper, WorkerThread}; use crate::worker::{Sleeper, WorkerThread};
use crossbeam_deque::{Injector, Stealer}; use crossbeam_deque::{Injector, Stealer};
@ -49,12 +50,19 @@ impl Spooler<'_> {
/// Global executor /// Global executor
pub struct Executor<'a> { pub struct Executor<'a> {
spooler: Arc<Spooler<'a>>, spooler: Arc<Spooler<'a>>,
root_cgroup: GroupId,
} }
impl<'a, 'executor: 'a> Executor<'executor> { impl<'a, 'executor: 'a> Executor<'executor> {
pub fn new() -> Self { pub fn new() -> Self {
let root_cgroup = SupervisionRegistry::with(|registry| {
let cgroup = registry.new_root_group();
registry.set_current(&cgroup);
cgroup
});
Executor { Executor {
spooler: Arc::new(Spooler::new()), spooler: Arc::new(Spooler::new()),
root_cgroup,
} }
} }
@ -105,6 +113,8 @@ impl<'a, 'executor: 'a> Executor<'executor> {
R: Send + 'a, R: Send + 'a,
{ {
let location = std::panic::Location::caller(); let location = std::panic::Location::caller();
let cgroup = SupervisionRegistry::current();
let id = cgroup.as_ref().map(|id| id.into_u64()).unwrap_or(0);
let span = tracing::trace_span!( let span = tracing::trace_span!(
target: "executor::task", target: "executor::task",
"runtime.spawn", "runtime.spawn",
@ -112,8 +122,8 @@ impl<'a, 'executor: 'a> Executor<'executor> {
loc.line = location.line(), loc.line = location.line(),
loc.col = location.column(), loc.col = location.column(),
kind = "global", kind = "global",
cgroup = id,
); );
let cgroup = None;
let (task, handle) = LightProc::recoverable(future, self.schedule(), span, cgroup); let (task, handle) = LightProc::recoverable(future, self.schedule(), span, cgroup);
tracing::trace!("spawning sendable task"); tracing::trace!("spawning sendable task");
@ -128,6 +138,8 @@ impl<'a, 'executor: 'a> Executor<'executor> {
R: Send + 'a, R: Send + 'a,
{ {
let location = std::panic::Location::caller(); let location = std::panic::Location::caller();
let cgroup = SupervisionRegistry::current();
let id = cgroup.as_ref().map(|id| id.into_u64()).unwrap_or(0);
let span = tracing::trace_span!( let span = tracing::trace_span!(
target: "executor::task", target: "executor::task",
"runtime.spawn", "runtime.spawn",
@ -135,8 +147,8 @@ impl<'a, 'executor: 'a> Executor<'executor> {
loc.line = location.line(), loc.line = location.line(),
loc.col = location.column(), loc.col = location.column(),
kind = "local", kind = "local",
cgroup = id,
); );
let cgroup = None;
let (task, handle) = LightProc::recoverable(future, schedule_local(), span, cgroup); let (task, handle) = LightProc::recoverable(future, schedule_local(), span, cgroup);
tracing::trace!("spawning sendable task"); tracing::trace!("spawning sendable task");
@ -144,6 +156,29 @@ impl<'a, 'executor: 'a> Executor<'executor> {
handle handle
} }
#[track_caller]
pub fn spawn_local_cgroup<F, R>(&self, future: F, cgroup: GroupId) -> RecoverableHandle<R>
where
F: Future<Output = R> + 'a,
R: Send + 'a,
{
let location = std::panic::Location::caller();
let span = tracing::trace_span!(
target: "executor::task",
"runtime.spawn",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
kind = "local",
cgroup = cgroup.into_u64(),
);
let (task, handle) = LightProc::recoverable(future, schedule_local(), span, Some(cgroup));
tracing::trace!("spawning sendable task");
task.schedule();
handle
}
/// Block the calling thread until the given future completes. /// Block the calling thread until the given future completes.
/// ///
/// # Example /// # Example

View File

@ -1,3 +1,179 @@
pub(crate) struct SupervisionRegistry {} use lightproc::GroupId;
use once_cell::sync::OnceCell;
use sharded_slab::pool::Ref;
use sharded_slab::{Clear, Pool};
use std::borrow::Borrow;
use std::cell;
use std::cell::RefCell;
use std::sync::atomic::{fence, AtomicUsize, Ordering};
use thread_local::ThreadLocal;
impl SupervisionRegistry {} static REGISTRY: OnceCell<SupervisionRegistry> = OnceCell::new();
fn id_to_idx(id: &GroupId) -> usize {
(id.into_u64() as usize).wrapping_sub(1)
}
fn idx_to_id(idx: usize) -> GroupId {
GroupId::from_u64(idx.wrapping_add(1) as u64)
}
pub struct SupervisionRegistry {
groups: Pool<GroupInner>,
// TODO: would this be better as the full stack?
current: ThreadLocal<RefCell<GroupId>>,
}
impl SupervisionRegistry {
fn new() -> Self {
Self {
groups: Pool::new(),
current: ThreadLocal::new(),
}
}
pub fn with<T>(f: impl FnOnce(&Self) -> T) -> T {
let this = REGISTRY.get_or_init(SupervisionRegistry::new);
f(&this)
}
pub(crate) fn get(&self, id: &GroupId) -> Option<Ref<'_, GroupInner>> {
self.groups.get(id_to_idx(id))
}
#[inline]
pub fn current_ref(&self) -> Option<cell::Ref<GroupId>> {
self.current.get().map(|c| c.borrow())
}
pub fn current() -> Option<GroupId> {
Self::with(|this| this.current_ref().map(|id| this.clone_group(&id)))
}
pub(crate) fn set_current(&self, id: &GroupId) {
self.current.get_or(|| RefCell::new(id.clone()));
}
pub fn new_root_group(&self) -> GroupId {
self.new_group_inner(None)
}
pub fn new_group(&self) -> GroupId {
let parent = self.current_ref().map(|id| self.clone_group(&id));
self.new_group_inner(parent)
}
fn new_group_inner(&self, parent: Option<GroupId>) -> GroupId {
tracing::trace_span!(
target: "executor::supervision",
"new_group"
);
let parent_id = parent.as_ref().map(|id| id.into_non_zero_u64());
let idx = self
.groups
.create_with(|group| {
group.parent = parent;
let ref_cnt = group.ref_count.get_mut();
debug_assert_eq!(0, *ref_cnt);
*ref_cnt = 1;
})
.expect("Failed to allocate a new group");
let id = idx_to_id(idx);
tracing::trace!(
target: "executor::supervision",
parent = parent_id,
id = id.into_non_zero_u64(),
"process group created"
);
id
}
fn clone_group(&self, id: &GroupId) -> GroupId {
tracing::trace!(
target: "executor::supervision",
id = id.into_u64(),
"cloning process group"
);
let group = self
.get(&id)
.unwrap_or_else(|| panic!("tried to clone group {:?}, but no such group exists!", id));
let ref_cnt = group.ref_count.fetch_add(1, Ordering::Relaxed);
assert_ne!(
0, ref_cnt,
"tried cloning group {:?} that was already closed",
id
);
id.clone()
}
/// Try to close the group with the given ID
///
/// If this method returns `true` the Group was closed. Otherwise there are still references
/// left open.
fn try_close(&self, id: GroupId) -> bool {
tracing::trace!(
target: "executor::supervision",
id = id.into_u64(),
"dropping process group"
);
let group = match self.get(&id) {
None if std::thread::panicking() => return false,
None => panic!("tried to drop a ref to {:?}, but no such group exists!", id),
Some(group) => group,
};
// Reference count *decreases* on the other hand must observe strong ordering — when
let remaining = group.ref_count.fetch_sub(1, Ordering::Release);
if !std::thread::panicking() {
assert!(remaining < usize::MAX, "group reference count overflow");
}
if remaining > 1 {
return false;
}
// Generate a compiler fence making sure that all other calls to `try_close` are finished
// before the one that returns `true`.
fence(Ordering::Acquire);
true
}
}
#[derive(Debug)]
pub(crate) struct GroupInner {
parent: Option<GroupId>,
ref_count: AtomicUsize,
}
impl GroupInner {
#[inline]
/// Increment the reference count of this group and return the previous value
fn increment_refcnt(&self) -> usize {
// Reference count increases don't need strong ordering. The increments can be done in
// any order as long as they *do* happen.
self.ref_count.fetch_add(1, Ordering::Relaxed)
}
}
impl Default for GroupInner {
fn default() -> Self {
Self {
parent: None,
ref_count: AtomicUsize::new(0),
}
}
}
impl Clear for GroupInner {
fn clear(&mut self) {
// A group is always alive as long as at least one of its children is alive. So each
// Group holds a reference to its parent if it has one. If a group is being deleted this
// reference must be closed too, i.e. the parent reference count reduced by one.
if let Some(parent) = self.parent.take() {
SupervisionRegistry::with(|reg| reg.try_close(parent));
}
}
}

View File

@ -122,6 +122,7 @@ impl LightProc {
/// future, /// future,
/// schedule_function, /// schedule_function,
/// Span::current(), /// Span::current(),
/// None,
/// ); /// );
/// ``` /// ```
pub fn build<'a, F, R, S>( pub fn build<'a, F, R, S>(

View File

@ -13,6 +13,40 @@ use tracing::Span;
/// Opaque id of the group this proc belongs to /// Opaque id of the group this proc belongs to
pub struct GroupId(NonZeroU64); pub struct GroupId(NonZeroU64);
impl GroupId {
/// Construct an ID from an u64
///
/// # Panics
/// - if the provided `u64` is `0`.
pub fn from_u64(i: u64) -> Self {
Self(NonZeroU64::new(i).expect("group id must be > 0"))
}
#[inline]
/// Construct an ID from a NonZeroU64
///
/// This method can't fail
pub const fn from_non_zero_u64(i: NonZeroU64) -> Self {
Self(i)
}
#[allow(clippy::wrong_self_convention)]
//noinspection RsSelfConvention
#[inline]
/// Convert a GroupId into a u64
pub const fn into_u64(&self) -> u64 {
self.0.get()
}
#[allow(clippy::wrong_self_convention)]
//noinspection RsSelfConvention
#[inline]
/// Convert a GroupId into a NonZeroU64
pub const fn into_non_zero_u64(&self) -> NonZeroU64 {
self.0
}
}
/// The pdata of a proc. /// The pdata of a proc.
/// ///
/// This pdata is stored right at the beginning of every heap-allocated proc. /// This pdata is stored right at the beginning of every heap-allocated proc.