Attach a GroupID to all LightProcs

This commit is contained in:
Nadja Reitzenstein 2022-06-23 17:28:13 +02:00
parent ff727b6d97
commit 7e113bab47
5 changed files with 40 additions and 9 deletions

View File

@ -13,6 +13,7 @@ use crate::worker::{Sleeper, WorkerThread};
use crossbeam_deque::{Injector, Stealer}; use crossbeam_deque::{Injector, Stealer};
use lightproc::lightproc::LightProc; use lightproc::lightproc::LightProc;
use lightproc::recoverable_handle::RecoverableHandle; use lightproc::recoverable_handle::RecoverableHandle;
use lightproc::GroupId;
use std::cell::Cell; use std::cell::Cell;
use std::future::Future; use std::future::Future;
use std::iter::Iterator; use std::iter::Iterator;
@ -112,8 +113,9 @@ impl<'a, 'executor: 'a> Executor<'executor> {
loc.col = location.column(), loc.col = location.column(),
kind = "global", kind = "global",
); );
let cgroup = None;
let (task, handle) = LightProc::recoverable(future, self.schedule(), span); let (task, handle) = LightProc::recoverable(future, self.schedule(), span, cgroup);
tracing::trace!("spawning sendable task"); tracing::trace!("spawning sendable task");
task.schedule(); task.schedule();
handle handle
@ -134,8 +136,9 @@ impl<'a, 'executor: 'a> Executor<'executor> {
loc.col = location.column(), loc.col = location.column(),
kind = "local", kind = "local",
); );
let cgroup = None;
let (task, handle) = LightProc::recoverable(future, schedule_local(), span); let (task, handle) = LightProc::recoverable(future, schedule_local(), span, cgroup);
tracing::trace!("spawning sendable task"); tracing::trace!("spawning sendable task");
task.schedule(); task.schedule();
handle handle

View File

@ -33,6 +33,8 @@ pub mod lightproc;
pub mod proc_handle; pub mod proc_handle;
pub mod recoverable_handle; pub mod recoverable_handle;
pub use proc_data::GroupId;
/// The lightproc prelude. /// The lightproc prelude.
/// ///
/// The prelude re-exports lightproc structs and handles from this crate. /// The prelude re-exports lightproc structs and handles from this crate.

View File

@ -31,6 +31,7 @@ use crate::proc_ext::ProcFutureExt;
use crate::proc_handle::ProcHandle; use crate::proc_handle::ProcHandle;
use crate::raw_proc::RawProc; use crate::raw_proc::RawProc;
use crate::recoverable_handle::RecoverableHandle; use crate::recoverable_handle::RecoverableHandle;
use crate::GroupId;
use std::fmt::{self, Debug, Formatter}; use std::fmt::{self, Debug, Formatter};
use std::future::Future; use std::future::Future;
use std::mem::ManuallyDrop; use std::mem::ManuallyDrop;
@ -81,6 +82,7 @@ impl LightProc {
future: F, future: F,
schedule: S, schedule: S,
span: Span, span: Span,
cgroup: Option<GroupId>,
) -> (Self, RecoverableHandle<R>) ) -> (Self, RecoverableHandle<R>)
where where
F: Future<Output = R> + 'a, F: Future<Output = R> + 'a,
@ -88,7 +90,7 @@ impl LightProc {
S: Fn(LightProc) + 'a, S: Fn(LightProc) + 'a,
{ {
let recovery_future = AssertUnwindSafe(future).catch_unwind(); let recovery_future = AssertUnwindSafe(future).catch_unwind();
let (proc, handle) = Self::build(recovery_future, schedule, span); let (proc, handle) = Self::build(recovery_future, schedule, span, cgroup);
(proc, RecoverableHandle::new(handle)) (proc, RecoverableHandle::new(handle))
} }
@ -122,13 +124,18 @@ impl LightProc {
/// Span::current(), /// Span::current(),
/// ); /// );
/// ``` /// ```
pub fn build<'a, F, R, S>(future: F, schedule: S, span: Span) -> (Self, ProcHandle<R>) pub fn build<'a, F, R, S>(
future: F,
schedule: S,
span: Span,
cgroup: Option<GroupId>,
) -> (Self, ProcHandle<R>)
where where
F: Future<Output = R> + 'a, F: Future<Output = R> + 'a,
R: 'a, R: 'a,
S: Fn(LightProc) + 'a, S: Fn(LightProc) + 'a,
{ {
let raw_proc = RawProc::allocate(future, schedule, span); let raw_proc = RawProc::allocate(future, schedule, span, cgroup);
let proc = LightProc { raw_proc }; let proc = LightProc { raw_proc };
let handle = ProcHandle::new(raw_proc); let handle = ProcHandle::new(raw_proc);
(proc, handle) (proc, handle)

View File

@ -3,10 +3,16 @@ use crate::state::*;
use crossbeam_utils::Backoff; use crossbeam_utils::Backoff;
use std::cell::Cell; use std::cell::Cell;
use std::fmt::{self, Debug, Formatter}; use std::fmt::{self, Debug, Formatter};
use std::num::NonZeroU64;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::task::Waker; use std::task::Waker;
use tracing::Span; use tracing::Span;
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[repr(transparent)]
/// Opaque id of the group this proc belongs to
pub struct GroupId(NonZeroU64);
/// The pdata of a proc. /// The pdata of a proc.
/// ///
/// This pdata is stored right at the beginning of every heap-allocated proc. /// This pdata is stored right at the beginning of every heap-allocated proc.
@ -32,6 +38,11 @@ pub(crate) struct ProcData {
/// A lightproc has a tracing span associated that allow recording occurances of vtable calls /// A lightproc has a tracing span associated that allow recording occurances of vtable calls
/// for this process. /// for this process.
pub(crate) span: Span, pub(crate) span: Span,
/// Control group assigned to this process.
///
/// The control group links this process to its supervision tree
pub(crate) cgroup: Option<GroupId>,
} }
impl ProcData { impl ProcData {

View File

@ -15,6 +15,7 @@ use std::pin::Pin;
use std::ptr::NonNull; use std::ptr::NonNull;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use crate::GroupId;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use tracing::Span; use tracing::Span;
@ -49,7 +50,12 @@ where
/// Allocates a proc with the given `future` and `schedule` function. /// Allocates a proc with the given `future` and `schedule` function.
/// ///
/// It is assumed there are initially only the `LightProc` reference and the `ProcHandle`. /// It is assumed there are initially only the `LightProc` reference and the `ProcHandle`.
pub(crate) fn allocate(future: F, schedule: S, span: Span) -> NonNull<()> { pub(crate) fn allocate(
future: F,
schedule: S,
span: Span,
cgroup: Option<GroupId>,
) -> NonNull<()> {
// Compute the layout of the proc for allocation. Abort if the computation fails. // Compute the layout of the proc for allocation. Abort if the computation fails.
let proc_layout = Self::proc_layout(); let proc_layout = Self::proc_layout();
@ -83,6 +89,7 @@ where
tick: Self::tick, tick: Self::tick,
}, },
span, span,
cgroup,
}); });
// Write the schedule function as the third field of the proc. // Write the schedule function as the third field of the proc.
@ -360,10 +367,11 @@ where
raw.output as *const () raw.output as *const ()
} }
/// Cleans up proc's resources and deallocates it. /// Cleans up the procs resources and deallocates the associated memory.
/// ///
/// If the proc has not been closed, then its future or the output will be dropped. The /// The future or output stored will *not* be dropped, but its memory will be freed. Callers
/// schedule function gets dropped too. /// must ensure that they are correctly dropped beforehand if either of those is still alive to
/// prevent use-after-free.
#[inline] #[inline]
unsafe fn destroy(ptr: *const ()) { unsafe fn destroy(ptr: *const ()) {
let raw = Self::from_ptr(ptr); let raw = Self::from_ptr(ptr);