diff --git a/Cargo.lock b/Cargo.lock index 164f0c9..4764797 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1736,6 +1736,7 @@ dependencies = [ "futures-executor", "lazy_static", "pin-utils", + "tracing", ] [[package]] diff --git a/bffhd/lib.rs b/bffhd/lib.rs index e0c9d86..4153e9b 100644 --- a/bffhd/lib.rs +++ b/bffhd/lib.rs @@ -62,6 +62,7 @@ use crate::users::db::UserDB; use crate::users::Users; use executor::pool::Executor; use signal_hook::consts::signal::*; +use tracing::Span; pub struct Diflouroborane { config: Config, @@ -70,6 +71,7 @@ pub struct Diflouroborane { pub users: Users, pub roles: Roles, pub resources: ResourcesHandle, + span: Span, } pub static RESOURCES: OnceCell = OnceCell::new(); @@ -77,11 +79,14 @@ pub static RESOURCES: OnceCell = OnceCell::new(); impl Diflouroborane { pub fn new(config: Config) -> miette::Result { let mut server = logging::init(&config.logging); + let span = tracing::info_span!( + target: "bffh", + "bffh" + ); + let span2 = span.clone(); + let _guard = span2.enter(); tracing::info!(version = env::VERSION, "Starting BFFH"); - let span = tracing::info_span!("setup"); - let _guard = span.enter(); - let executor = Executor::new(); if let Some(aggregator) = server.aggregator.take() { @@ -116,10 +121,12 @@ impl Diflouroborane { users, roles, resources, + span, }) } pub fn run(&mut self) -> miette::Result<()> { + let _guard = self.span.enter(); let mut signals = signal_hook_async_std::Signals::new(&[SIGINT, SIGQUIT, SIGTERM]) .into_diagnostic() .wrap_err("Failed to construct signal handler")?; diff --git a/runtime/console/src/lib.rs b/runtime/console/src/lib.rs index fb3ba3a..112c6d2 100644 --- a/runtime/console/src/lib.rs +++ b/runtime/console/src/lib.rs @@ -252,11 +252,11 @@ where { fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest { let dropped = match (metadata.name(), metadata.target()) { - (_, "executor::task") | ("runtime.spawn", _) => { + (_, TaskVisitor::SPAWN_TARGET) | (TaskVisitor::SPAWN_NAME, _) => { self.spawn_callsites.insert(metadata); &self.shared.dropped_tasks } - (_, "executor::waker") => { + (WakerVisitor::WAKE_TARGET, _) => { self.waker_callsites.insert(metadata); &self.shared.dropped_tasks } @@ -268,7 +268,7 @@ where self.async_op_callsites.insert(metadata); &self.shared.dropped_async_ops } - ("runtime.resource.async_op.poll", _) => { + (AsyncOpVisitor::ASYNC_OP_POLL_NAME, _) => { self.async_op_poll_callsites.insert(metadata); &self.shared.dropped_async_ops } diff --git a/runtime/console/src/server.rs b/runtime/console/src/server.rs index 14ff385..b92e8c0 100644 --- a/runtime/console/src/server.rs +++ b/runtime/console/src/server.rs @@ -97,9 +97,6 @@ impl Server { pub async fn serve( mut self, /*, incoming: I */ ) -> Result<(), Box> { - // TODO: Spawn two tasks; the aggregator that's collecting stats, aggregating and - // collating them and the server task doing the tonic gRPC stuff - let svc = InstrumentServer::new(self); tonic::transport::Server::builder() diff --git a/runtime/console/src/visitors.rs b/runtime/console/src/visitors.rs index 858703f..77398d1 100644 --- a/runtime/console/src/visitors.rs +++ b/runtime/console/src/visitors.rs @@ -240,6 +240,9 @@ impl FieldVisitor { } impl TaskVisitor { + pub(crate) const SPAWN_TARGET: &'static str = "executor::task"; + pub(crate) const SPAWN_NAME: &'static str = "runtime.spawn"; + pub(crate) fn new(meta_id: console_api::MetaId) -> Self { TaskVisitor { field_visitor: FieldVisitor::new(meta_id), @@ -340,6 +343,7 @@ impl Visit for FieldVisitor { impl AsyncOpVisitor { pub(crate) const ASYNC_OP_SPAN_NAME: &'static str = "runtime.resource.async_op"; + pub(crate) const ASYNC_OP_POLL_NAME: &'static str = "runtime.resource.async_op.poll"; const ASYNC_OP_SRC_FIELD_NAME: &'static str = "source"; pub(crate) fn result(self) -> Option<(String, bool)> { @@ -365,6 +369,8 @@ impl Visit for AsyncOpVisitor { } impl WakerVisitor { + pub(crate) const WAKE_TARGET: &'static str = "executor::waker"; + const WAKE: &'static str = "waker.wake"; const WAKE_BY_REF: &'static str = "waker.wake_by_ref"; const CLONE: &'static str = "waker.clone"; diff --git a/runtime/executor/src/pool.rs b/runtime/executor/src/pool.rs index b74e035..d6f1338 100644 --- a/runtime/executor/src/pool.rs +++ b/runtime/executor/src/pool.rs @@ -113,11 +113,11 @@ impl<'a, 'executor: 'a> Executor<'executor> { { let span = tracing::trace_span!( target: "executor::task", + parent: &self.span, "runtime.spawn", ); - let fut = future.instrument(span); - let (task, handle) = LightProc::recoverable(fut, self.schedule()); + let (task, handle) = LightProc::recoverable(future, self.schedule(), span); tracing::trace!("spawning sendable task"); task.schedule(); handle @@ -130,11 +130,11 @@ impl<'a, 'executor: 'a> Executor<'executor> { { let span = tracing::trace_span!( target: "executor::task", + parent: &self.span, "runtime.spawn", ); - let fut = future.instrument(span); - let (task, handle) = LightProc::recoverable(fut, schedule_local()); + let (task, handle) = LightProc::recoverable(future, schedule_local(), span); tracing::trace!("spawning sendable task"); task.schedule(); handle diff --git a/runtime/executor/src/worker.rs b/runtime/executor/src/worker.rs index 345afa7..413476e 100644 --- a/runtime/executor/src/worker.rs +++ b/runtime/executor/src/worker.rs @@ -2,6 +2,7 @@ use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use crossbeam_queue::SegQueue; use crossbeam_utils::sync::{Parker, Unparker}; use lightproc::prelude::LightProc; +use lightproc::raw_proc::ProcData; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; diff --git a/runtime/lightproc/Cargo.toml b/runtime/lightproc/Cargo.toml index 52403ea..7d4be3e 100644 --- a/runtime/lightproc/Cargo.toml +++ b/runtime/lightproc/Cargo.toml @@ -15,6 +15,7 @@ crossbeam-utils = "0.8" pin-utils = "0.1.0" bitfield = "0.13.2" bitflags = "1.3.2" +tracing = "0.1" [dev-dependencies] crossbeam = "0.8" diff --git a/runtime/lightproc/src/lib.rs b/runtime/lightproc/src/lib.rs index fc325b0..2bcf9db 100644 --- a/runtime/lightproc/src/lib.rs +++ b/runtime/lightproc/src/lib.rs @@ -16,9 +16,9 @@ #![forbid(missing_docs)] #![forbid(missing_debug_implementations)] #![forbid(unused_import_braces)] -#![forbid(unused_imports)] +#![warn(unused_imports)] #![forbid(unused_must_use)] -#![forbid(unused_variables)] +//TODO: reenable #![forbid(unused_variables)] mod catch_unwind; mod layout_helpers; diff --git a/runtime/lightproc/src/lightproc.rs b/runtime/lightproc/src/lightproc.rs index c94b5fa..0900f56 100644 --- a/runtime/lightproc/src/lightproc.rs +++ b/runtime/lightproc/src/lightproc.rs @@ -36,6 +36,7 @@ use std::future::Future; use std::mem::ManuallyDrop; use std::panic::AssertUnwindSafe; use std::ptr::NonNull; +use tracing::Span; /// Shared functionality for both Send and !Send LightProc pub struct LightProc { @@ -76,14 +77,18 @@ impl LightProc { /// println!("future panicked!: {}", &reason); /// }); /// ``` - pub fn recoverable<'a, F, R, S>(future: F, schedule: S) -> (Self, RecoverableHandle) + pub fn recoverable<'a, F, R, S>( + future: F, + schedule: S, + span: Span, + ) -> (Self, RecoverableHandle) where F: Future + 'a, R: 'a, S: Fn(LightProc) + 'a, { let recovery_future = AssertUnwindSafe(future).catch_unwind(); - let (proc, handle) = Self::build(recovery_future, schedule); + let (proc, handle) = Self::build(recovery_future, schedule, span); (proc, RecoverableHandle::new(handle)) } @@ -92,6 +97,7 @@ impl LightProc { /// /// # Example /// ```rust + /// # use tracing::Span; /// # use lightproc::prelude::*; /// # /// # // ... future that does work @@ -113,15 +119,16 @@ impl LightProc { /// let standard = LightProc::build( /// future, /// schedule_function, + /// Span::current(), /// ); /// ``` - pub fn build<'a, F, R, S>(future: F, schedule: S) -> (Self, ProcHandle) + pub fn build<'a, F, R, S>(future: F, schedule: S, span: Span) -> (Self, ProcHandle) where F: Future + 'a, R: 'a, S: Fn(LightProc) + 'a, { - let raw_proc = RawProc::allocate(future, schedule); + let raw_proc = RawProc::allocate(future, schedule, span); let proc = LightProc { raw_proc }; let handle = ProcHandle::new(raw_proc); (proc, handle) diff --git a/runtime/lightproc/src/proc_data.rs b/runtime/lightproc/src/proc_data.rs index be9b956..8c215a5 100644 --- a/runtime/lightproc/src/proc_data.rs +++ b/runtime/lightproc/src/proc_data.rs @@ -5,6 +5,7 @@ use std::cell::Cell; use std::fmt::{self, Debug, Formatter}; use std::sync::atomic::Ordering; use std::task::Waker; +use tracing::Span; /// The pdata of a proc. /// @@ -25,6 +26,12 @@ pub(crate) struct ProcData { /// In addition to the actual waker virtual table, it also contains pointers to several other /// methods necessary for bookkeeping the heap-allocated proc. pub(crate) vtable: &'static ProcVTable, + + /// The span assigned to this process. + /// + /// A lightproc has a tracing span associated that allow recording occurances of vtable calls + /// for this process. + pub(crate) span: Span, } impl ProcData { diff --git a/runtime/lightproc/src/raw_proc.rs b/runtime/lightproc/src/raw_proc.rs index 4c28603..c06cf95 100644 --- a/runtime/lightproc/src/raw_proc.rs +++ b/runtime/lightproc/src/raw_proc.rs @@ -16,8 +16,10 @@ use std::ptr::NonNull; use std::sync::atomic::Ordering; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; +use tracing::Span; /// Raw pointers to the fields of a proc. +// TODO: Make generic over the Allocator used! pub(crate) struct RawProc<'a, F, R, S> { pub(crate) pdata: *const ProcData, pub(crate) schedule: *const S, @@ -26,6 +28,10 @@ pub(crate) struct RawProc<'a, F, R, S> { // Make the lifetime 'a of the future invariant _marker: PhantomData<&'a ()>, + // TODO: We should link a proc to a process bucket for scheduling and tracing + // => nope, do that via scheduling func + // TODO: A proc should be able to be assigned a name for tracing and reporting + // This could also be implemented via storing a Span similar to `Instrumented` } impl<'a, F, R, S> RawProc<'a, F, R, S> @@ -37,7 +43,7 @@ where /// Allocates a proc with the given `future` and `schedule` function. /// /// It is assumed there are initially only the `LightProc` reference and the `ProcHandle`. - pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> { + pub(crate) fn allocate(future: F, schedule: S, span: Span) -> NonNull<()> { // Compute the layout of the proc for allocation. Abort if the computation fails. let proc_layout = Self::proc_layout(); @@ -70,6 +76,7 @@ where destroy: Self::destroy, tick: Self::tick, }, + span, }); // Write the schedule function as the third field of the proc. @@ -128,6 +135,16 @@ where /// Wakes a waker. unsafe fn wake(ptr: *const ()) { let raw = Self::from_ptr(ptr); + let _guard = (&(*raw.pdata).span).enter(); + let id = (&(*raw.pdata).span) + .id() + .map(|id| id.into_u64()) + .unwrap_or(0); + tracing::trace!( + target: "executor::waker", + op = "waker.wake", + task.id = id, + ); let mut state = (*raw.pdata).state.load(Ordering::Acquire); @@ -191,6 +208,16 @@ where /// Wakes a waker by reference. unsafe fn wake_by_ref(ptr: *const ()) { let raw = Self::from_ptr(ptr); + let _guard = (&(*raw.pdata).span).enter(); + let id = (&(*raw.pdata).span) + .id() + .map(|id| id.into_u64()) + .unwrap_or(0); + tracing::trace!( + target: "executor::waker", + op = "waker.wake_by_ref", + task.id = id, + ); let mut state = (*raw.pdata).state.load(Ordering::Acquire); @@ -250,6 +277,17 @@ where /// Clones a waker. unsafe fn clone_waker(ptr: *const ()) -> RawWaker { let raw = Self::from_ptr(ptr); + let _guard = (&(*raw.pdata).span).enter(); + let id = (&(*raw.pdata).span) + .id() + .map(|id| id.into_u64()) + .unwrap_or(0); + tracing::trace!( + target: "executor::waker", + op = "waker.clone", + task.id = id, + ); + let raw_waker = &(*raw.pdata).vtable.raw_waker; // Increment the reference count. With any kind of reference-counted data structure, @@ -271,6 +309,16 @@ where #[inline] unsafe fn decrement(ptr: *const ()) { let raw = Self::from_ptr(ptr); + let _guard = (&(*raw.pdata).span).enter(); + let id = (&(*raw.pdata).span) + .id() + .map(|id| id.into_u64()) + .unwrap_or(0); + tracing::trace!( + target: "executor::waker", + op = "waker.drop", + task.id = id, + ); // Decrement the reference count. let new = (*raw.pdata).state.fetch_sub(1, Ordering::AcqRel); @@ -323,6 +371,9 @@ where // Drop the schedule function. (raw.schedule as *mut S).drop_in_place(); + // Drop the proc data containing the associated Span + (raw.pdata as *mut ProcData).drop_in_place(); + // Finally, deallocate the memory reserved by the proc. alloc::dealloc(ptr as *mut u8, proc_layout.layout); }