diff --git a/runtime/executor/src/pool.rs b/runtime/executor/src/pool.rs index 4f288d6..e5038ba 100644 --- a/runtime/executor/src/pool.rs +++ b/runtime/executor/src/pool.rs @@ -13,6 +13,7 @@ use crate::worker::{Sleeper, WorkerThread}; use crossbeam_deque::{Injector, Stealer}; use lightproc::lightproc::LightProc; use lightproc::recoverable_handle::RecoverableHandle; +use lightproc::GroupId; use std::cell::Cell; use std::future::Future; use std::iter::Iterator; @@ -112,8 +113,9 @@ impl<'a, 'executor: 'a> Executor<'executor> { loc.col = location.column(), kind = "global", ); + let cgroup = None; - let (task, handle) = LightProc::recoverable(future, self.schedule(), span); + let (task, handle) = LightProc::recoverable(future, self.schedule(), span, cgroup); tracing::trace!("spawning sendable task"); task.schedule(); handle @@ -134,8 +136,9 @@ impl<'a, 'executor: 'a> Executor<'executor> { loc.col = location.column(), kind = "local", ); + let cgroup = None; - let (task, handle) = LightProc::recoverable(future, schedule_local(), span); + let (task, handle) = LightProc::recoverable(future, schedule_local(), span, cgroup); tracing::trace!("spawning sendable task"); task.schedule(); handle diff --git a/runtime/lightproc/src/lib.rs b/runtime/lightproc/src/lib.rs index 2bcf9db..075e0f4 100644 --- a/runtime/lightproc/src/lib.rs +++ b/runtime/lightproc/src/lib.rs @@ -33,6 +33,8 @@ pub mod lightproc; pub mod proc_handle; pub mod recoverable_handle; +pub use proc_data::GroupId; + /// The lightproc prelude. /// /// The prelude re-exports lightproc structs and handles from this crate. diff --git a/runtime/lightproc/src/lightproc.rs b/runtime/lightproc/src/lightproc.rs index b527c54..c8e2f6c 100644 --- a/runtime/lightproc/src/lightproc.rs +++ b/runtime/lightproc/src/lightproc.rs @@ -31,6 +31,7 @@ use crate::proc_ext::ProcFutureExt; use crate::proc_handle::ProcHandle; use crate::raw_proc::RawProc; use crate::recoverable_handle::RecoverableHandle; +use crate::GroupId; use std::fmt::{self, Debug, Formatter}; use std::future::Future; use std::mem::ManuallyDrop; @@ -81,6 +82,7 @@ impl LightProc { future: F, schedule: S, span: Span, + cgroup: Option, ) -> (Self, RecoverableHandle) where F: Future + 'a, @@ -88,7 +90,7 @@ impl LightProc { S: Fn(LightProc) + 'a, { let recovery_future = AssertUnwindSafe(future).catch_unwind(); - let (proc, handle) = Self::build(recovery_future, schedule, span); + let (proc, handle) = Self::build(recovery_future, schedule, span, cgroup); (proc, RecoverableHandle::new(handle)) } @@ -122,13 +124,18 @@ impl LightProc { /// Span::current(), /// ); /// ``` - pub fn build<'a, F, R, S>(future: F, schedule: S, span: Span) -> (Self, ProcHandle) + pub fn build<'a, F, R, S>( + future: F, + schedule: S, + span: Span, + cgroup: Option, + ) -> (Self, ProcHandle) where F: Future + 'a, R: 'a, S: Fn(LightProc) + 'a, { - let raw_proc = RawProc::allocate(future, schedule, span); + let raw_proc = RawProc::allocate(future, schedule, span, cgroup); let proc = LightProc { raw_proc }; let handle = ProcHandle::new(raw_proc); (proc, handle) diff --git a/runtime/lightproc/src/proc_data.rs b/runtime/lightproc/src/proc_data.rs index 8c215a5..9412459 100644 --- a/runtime/lightproc/src/proc_data.rs +++ b/runtime/lightproc/src/proc_data.rs @@ -3,10 +3,16 @@ use crate::state::*; use crossbeam_utils::Backoff; use std::cell::Cell; use std::fmt::{self, Debug, Formatter}; +use std::num::NonZeroU64; use std::sync::atomic::Ordering; use std::task::Waker; use tracing::Span; +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +#[repr(transparent)] +/// Opaque id of the group this proc belongs to +pub struct GroupId(NonZeroU64); + /// The pdata of a proc. /// /// This pdata is stored right at the beginning of every heap-allocated proc. @@ -32,6 +38,11 @@ pub(crate) struct ProcData { /// A lightproc has a tracing span associated that allow recording occurances of vtable calls /// for this process. pub(crate) span: Span, + + /// Control group assigned to this process. + /// + /// The control group links this process to its supervision tree + pub(crate) cgroup: Option, } impl ProcData { diff --git a/runtime/lightproc/src/raw_proc.rs b/runtime/lightproc/src/raw_proc.rs index 4cb7041..1481fb0 100644 --- a/runtime/lightproc/src/raw_proc.rs +++ b/runtime/lightproc/src/raw_proc.rs @@ -15,6 +15,7 @@ use std::pin::Pin; use std::ptr::NonNull; use std::sync::atomic::Ordering; +use crate::GroupId; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use tracing::Span; @@ -49,7 +50,12 @@ where /// Allocates a proc with the given `future` and `schedule` function. /// /// It is assumed there are initially only the `LightProc` reference and the `ProcHandle`. - pub(crate) fn allocate(future: F, schedule: S, span: Span) -> NonNull<()> { + pub(crate) fn allocate( + future: F, + schedule: S, + span: Span, + cgroup: Option, + ) -> NonNull<()> { // Compute the layout of the proc for allocation. Abort if the computation fails. let proc_layout = Self::proc_layout(); @@ -83,6 +89,7 @@ where tick: Self::tick, }, span, + cgroup, }); // Write the schedule function as the third field of the proc. @@ -360,10 +367,11 @@ where raw.output as *const () } - /// Cleans up proc's resources and deallocates it. + /// Cleans up the procs resources and deallocates the associated memory. /// - /// If the proc has not been closed, then its future or the output will be dropped. The - /// schedule function gets dropped too. + /// The future or output stored will *not* be dropped, but its memory will be freed. Callers + /// must ensure that they are correctly dropped beforehand if either of those is still alive to + /// prevent use-after-free. #[inline] unsafe fn destroy(ptr: *const ()) { let raw = Self::from_ptr(ptr);