From 18d69063fd6fb7b856b91ff58188f213704ef324 Mon Sep 17 00:00:00 2001 From: Nadja Reitzenstein Date: Wed, 22 Jun 2022 19:01:51 +0200 Subject: [PATCH] Even more console shenanigans --- bffhd/lib.rs | 17 ++++++++++++++++- runtime/console/src/aggregate.rs | 8 +++----- runtime/console/src/lib.rs | 16 ++++++++++++---- runtime/console/src/stats.rs | 3 +-- runtime/executor/src/pool.rs | 16 ++++++++++++++-- runtime/executor/src/worker.rs | 1 - 6 files changed, 46 insertions(+), 15 deletions(-) diff --git a/bffhd/lib.rs b/bffhd/lib.rs index 4153e9b..777b81e 100644 --- a/bffhd/lib.rs +++ b/bffhd/lib.rs @@ -92,7 +92,22 @@ impl Diflouroborane { if let Some(aggregator) = server.aggregator.take() { executor.spawn(aggregator.run()); } - executor.spawn(server.serve()); + tracing::info!("Server is being spawned"); + let handle = executor.spawn(server.serve()); + std::thread::spawn(move || { + let result = async_io::block_on(handle); + match result { + Some(Ok(())) => { + tracing::info!("console server finished without error"); + } + Some(Err(error)) => { + tracing::info!(%error, "console server finished with error"); + } + None => { + tracing::info!("console server finished with panic"); + } + } + }); let env = StateDB::open_env(&config.db_path)?; diff --git a/runtime/console/src/aggregate.rs b/runtime/console/src/aggregate.rs index 1acf6d2..4029a1f 100644 --- a/runtime/console/src/aggregate.rs +++ b/runtime/console/src/aggregate.rs @@ -335,13 +335,13 @@ impl Aggregator { async_op_update, }; - //self.watchers.retain_and_shrink(|watch: &Watch| watch.update - // (&update)); + self.watchers + .retain(|watch: &Watch| watch.update(&update)); let stats = &self.task_stats; // Assuming there are much fewer task details subscribers than there are // stats updates, iterate over `details_watchers` and compact the map. - /*self.details_watchers.retain_and_shrink(|id, watchers| { + self.details_watchers.retain(|id, watchers| { if let Some(task_stats) = stats.get(id) { let details = tasks::TaskDetails { task_id: Some(id.clone().into()), @@ -354,8 +354,6 @@ impl Aggregator { false } }); - - */ } /// Update the current state with data from a single event. diff --git a/runtime/console/src/lib.rs b/runtime/console/src/lib.rs index 112c6d2..e5c7cc7 100644 --- a/runtime/console/src/lib.rs +++ b/runtime/console/src/lib.rs @@ -256,7 +256,7 @@ where self.spawn_callsites.insert(metadata); &self.shared.dropped_tasks } - (WakerVisitor::WAKE_TARGET, _) => { + (_, WakerVisitor::WAKE_TARGET) => { self.waker_callsites.insert(metadata); &self.shared.dropped_tasks } @@ -311,7 +311,9 @@ where (event, stats) }) { ctx.span(id) - .expect("`on_new_span` called with nonexistent span. This is a tracing bug."); + .expect("`on_new_span` called with nonexistent span. This is a tracing bug.") + .extensions_mut() + .insert(stats); } } else if self.is_resource(metadata) { let at = Instant::now(); @@ -346,7 +348,10 @@ where }; (event, stats) }) { - ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats); + ctx.span(id) + .expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!") + .extensions_mut() + .insert(stats); } } } else if self.is_async_op(metadata) { @@ -381,7 +386,10 @@ where (event, stats) }) { - ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats); + ctx.span(id) + .expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!") + .extensions_mut() + .insert(stats); } } } diff --git a/runtime/console/src/stats.rs b/runtime/console/src/stats.rs index 9018d8f..021c8da 100644 --- a/runtime/console/src/stats.rs +++ b/runtime/console/src/stats.rs @@ -209,7 +209,7 @@ impl TaskStats { polls: AtomicUsize::new(0), }, wakes: AtomicUsize::new(0), - waker_clones: AtomicUsize::new(0), + waker_clones: AtomicUsize::new(1), waker_drops: AtomicUsize::new(0), self_wakes: AtomicUsize::new(0), } @@ -232,7 +232,6 @@ impl TaskStats { use event::WakeOp; match op { WakeOp::Wake { self_wake } => { - self.waker_drops.fetch_add(1, Ordering::Release); self.wake(at, self_wake); } WakeOp::WakeByRef { self_wake } => { diff --git a/runtime/executor/src/pool.rs b/runtime/executor/src/pool.rs index d6f1338..4c91363 100644 --- a/runtime/executor/src/pool.rs +++ b/runtime/executor/src/pool.rs @@ -106,15 +106,21 @@ impl<'a, 'executor: 'a> Executor<'executor> { /// ); /// # } /// ``` + #[track_caller] pub fn spawn(&self, future: F) -> RecoverableHandle where F: Future + Send + 'a, R: Send + 'a, { + let location = std::panic::Location::caller(); let span = tracing::trace_span!( target: "executor::task", - parent: &self.span, + parent: Span::current(), "runtime.spawn", + kind = "global", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), ); let (task, handle) = LightProc::recoverable(future, self.schedule(), span); @@ -123,15 +129,21 @@ impl<'a, 'executor: 'a> Executor<'executor> { handle } + #[track_caller] pub fn spawn_local(&self, future: F) -> RecoverableHandle where F: Future + 'a, R: Send + 'a, { + let location = std::panic::Location::caller(); let span = tracing::trace_span!( target: "executor::task", - parent: &self.span, + parent: Span::current(), "runtime.spawn", + kind = "local", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), ); let (task, handle) = LightProc::recoverable(future, schedule_local(), span); diff --git a/runtime/executor/src/worker.rs b/runtime/executor/src/worker.rs index 413476e..345afa7 100644 --- a/runtime/executor/src/worker.rs +++ b/runtime/executor/src/worker.rs @@ -2,7 +2,6 @@ use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use crossbeam_queue::SegQueue; use crossbeam_utils::sync::{Parker, Unparker}; use lightproc::prelude::LightProc; -use lightproc::raw_proc::ProcData; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration;