Even more console shenanigans

This commit is contained in:
Nadja Reitzenstein 2022-06-22 19:01:51 +02:00
parent 2d8d6f9938
commit 18d69063fd
6 changed files with 46 additions and 15 deletions

View File

@ -92,7 +92,22 @@ impl Diflouroborane {
if let Some(aggregator) = server.aggregator.take() {
executor.spawn(aggregator.run());
}
executor.spawn(server.serve());
tracing::info!("Server is being spawned");
let handle = executor.spawn(server.serve());
std::thread::spawn(move || {
let result = async_io::block_on(handle);
match result {
Some(Ok(())) => {
tracing::info!("console server finished without error");
}
Some(Err(error)) => {
tracing::info!(%error, "console server finished with error");
}
None => {
tracing::info!("console server finished with panic");
}
}
});
let env = StateDB::open_env(&config.db_path)?;

View File

@ -335,13 +335,13 @@ impl Aggregator {
async_op_update,
};
//self.watchers.retain_and_shrink(|watch: &Watch<instrument::Update>| watch.update
// (&update));
self.watchers
.retain(|watch: &Watch<instrument::Update>| watch.update(&update));
let stats = &self.task_stats;
// Assuming there are much fewer task details subscribers than there are
// stats updates, iterate over `details_watchers` and compact the map.
/*self.details_watchers.retain_and_shrink(|id, watchers| {
self.details_watchers.retain(|id, watchers| {
if let Some(task_stats) = stats.get(id) {
let details = tasks::TaskDetails {
task_id: Some(id.clone().into()),
@ -354,8 +354,6 @@ impl Aggregator {
false
}
});
*/
}
/// Update the current state with data from a single event.

View File

@ -256,7 +256,7 @@ where
self.spawn_callsites.insert(metadata);
&self.shared.dropped_tasks
}
(WakerVisitor::WAKE_TARGET, _) => {
(_, WakerVisitor::WAKE_TARGET) => {
self.waker_callsites.insert(metadata);
&self.shared.dropped_tasks
}
@ -311,7 +311,9 @@ where
(event, stats)
}) {
ctx.span(id)
.expect("`on_new_span` called with nonexistent span. This is a tracing bug.");
.expect("`on_new_span` called with nonexistent span. This is a tracing bug.")
.extensions_mut()
.insert(stats);
}
} else if self.is_resource(metadata) {
let at = Instant::now();
@ -346,7 +348,10 @@ where
};
(event, stats)
}) {
ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
ctx.span(id)
.expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!")
.extensions_mut()
.insert(stats);
}
}
} else if self.is_async_op(metadata) {
@ -381,7 +386,10 @@ where
(event, stats)
})
{
ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
ctx.span(id)
.expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!")
.extensions_mut()
.insert(stats);
}
}
}

View File

@ -209,7 +209,7 @@ impl TaskStats {
polls: AtomicUsize::new(0),
},
wakes: AtomicUsize::new(0),
waker_clones: AtomicUsize::new(0),
waker_clones: AtomicUsize::new(1),
waker_drops: AtomicUsize::new(0),
self_wakes: AtomicUsize::new(0),
}
@ -232,7 +232,6 @@ impl TaskStats {
use event::WakeOp;
match op {
WakeOp::Wake { self_wake } => {
self.waker_drops.fetch_add(1, Ordering::Release);
self.wake(at, self_wake);
}
WakeOp::WakeByRef { self_wake } => {

View File

@ -106,15 +106,21 @@ impl<'a, 'executor: 'a> Executor<'executor> {
/// );
/// # }
/// ```
#[track_caller]
pub fn spawn<F, R>(&self, future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + Send + 'a,
R: Send + 'a,
{
let location = std::panic::Location::caller();
let span = tracing::trace_span!(
target: "executor::task",
parent: &self.span,
parent: Span::current(),
"runtime.spawn",
kind = "global",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);
let (task, handle) = LightProc::recoverable(future, self.schedule(), span);
@ -123,15 +129,21 @@ impl<'a, 'executor: 'a> Executor<'executor> {
handle
}
#[track_caller]
pub fn spawn_local<F, R>(&self, future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + 'a,
R: Send + 'a,
{
let location = std::panic::Location::caller();
let span = tracing::trace_span!(
target: "executor::task",
parent: &self.span,
parent: Span::current(),
"runtime.spawn",
kind = "local",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);
let (task, handle) = LightProc::recoverable(future, schedule_local(), span);

View File

@ -2,7 +2,6 @@ use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use crossbeam_queue::SegQueue;
use crossbeam_utils::sync::{Parker, Unparker};
use lightproc::prelude::LightProc;
use lightproc::raw_proc::ProcData;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;