mirror of
https://gitlab.com/fabinfra/fabaccess/bffh.git
synced 2024-11-22 06:47:56 +01:00
Console is attached and compiles
This commit is contained in:
parent
35c9f45f6d
commit
287ca9806d
538
Cargo.lock
generated
538
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -49,6 +49,7 @@ dirs = "4.0.0"
|
||||
|
||||
# Runtime
|
||||
executor = { path = "runtime/executor" }
|
||||
console = { path = "runtime/console" }
|
||||
|
||||
# Catch&Handle POSIX process signals
|
||||
signal-hook = "0.3.13"
|
||||
@ -72,9 +73,9 @@ rust-argon2 = "0.8.3"
|
||||
rand = "0.8.4"
|
||||
|
||||
# Async aware logging and tracing
|
||||
tracing = "0.1.28"
|
||||
tracing-subscriber = { version = "0.2.25", features = ["env-filter"] }
|
||||
tracing-futures = { version = "0.2.5", features = ["futures-03"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "registry", "std"] }
|
||||
tracing-futures = { version = "0.2", features = ["futures-03"] }
|
||||
|
||||
# API
|
||||
api = { path = "api" }
|
||||
|
@ -76,7 +76,7 @@ pub static RESOURCES: OnceCell<ResourcesHandle> = OnceCell::new();
|
||||
|
||||
impl Diflouroborane {
|
||||
pub fn new(config: Config) -> miette::Result<Self> {
|
||||
logging::init(&config.logging);
|
||||
let mut server = logging::init(&config.logging);
|
||||
tracing::info!(version = env::VERSION, "Starting BFFH");
|
||||
|
||||
let span = tracing::info_span!("setup");
|
||||
@ -84,6 +84,11 @@ impl Diflouroborane {
|
||||
|
||||
let executor = Executor::new();
|
||||
|
||||
if let Some(aggregator) = server.aggregator.take() {
|
||||
executor.spawn(aggregator.run());
|
||||
}
|
||||
executor.spawn(server.serve());
|
||||
|
||||
let env = StateDB::open_env(&config.db_path)?;
|
||||
|
||||
let statedb = StateDB::create_with_env(env.clone())?;
|
||||
|
@ -1,6 +1,7 @@
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LogConfig {
|
||||
@ -24,21 +25,25 @@ impl Default for LogConfig {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(config: &LogConfig) {
|
||||
pub fn init(config: &LogConfig) -> console::Server {
|
||||
let (console, server) = console::ConsoleLayer::new();
|
||||
|
||||
let filter = if let Some(ref filter) = config.filter {
|
||||
EnvFilter::new(filter.as_str())
|
||||
} else {
|
||||
EnvFilter::from_env("BFFH_LOG")
|
||||
};
|
||||
|
||||
let builder = tracing_subscriber::fmt().with_env_filter(filter);
|
||||
let format = &config.format;
|
||||
// TODO: Restore output format settings being settable
|
||||
let fmt_layer = tracing_subscriber::fmt::layer().with_filter(filter);
|
||||
|
||||
let format = config.format.to_lowercase();
|
||||
match format.as_str() {
|
||||
"compact" => builder.compact().init(),
|
||||
"pretty" => builder.pretty().init(),
|
||||
"full" => builder.init(),
|
||||
_ => builder.init(),
|
||||
}
|
||||
tracing::info!(format = format.as_str(), "Logging initialized")
|
||||
tracing_subscriber::registry()
|
||||
.with(fmt_layer)
|
||||
.with(console)
|
||||
.init();
|
||||
|
||||
tracing::info!(format = format.as_str(), "Logging initialized");
|
||||
|
||||
server
|
||||
}
|
||||
|
@ -25,3 +25,6 @@ tokio-util = "0.7"
|
||||
futures-util = "0.3"
|
||||
tokio = { version = "1.19", default_features = false, features = []}
|
||||
hdrhistogram = "7.5"
|
||||
|
||||
[dev-dependencies]
|
||||
tracing-subscriber = "0.3"
|
||||
|
@ -1,35 +1,24 @@
|
||||
use crate::id_map::{IdMap, ToProto};
|
||||
use crate::server::{Watch, WatchRequest};
|
||||
use crate::stats::TimeAnchor;
|
||||
use crate::Event;
|
||||
use crate::stats::{TimeAnchor, Unsent};
|
||||
use crate::{server, stats};
|
||||
use crate::{Event, Shared};
|
||||
use console_api::{async_ops, instrument, resources, tasks};
|
||||
use crossbeam_channel::{Receiver, TryRecvError};
|
||||
use futures_util::{FutureExt, StreamExt};
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroU64;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::span;
|
||||
use tracing_core::Metadata;
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
|
||||
pub(crate) struct Id(NonZeroU64);
|
||||
|
||||
impl Id {
|
||||
pub fn from_non_zero_u64(u: NonZeroU64) -> Self {
|
||||
Self(u)
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<console_api::Id> for Id {
|
||||
fn into(self) -> console_api::Id {
|
||||
console_api::Id { id: self.0.into() }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Resource {
|
||||
id: Id,
|
||||
id: span::Id,
|
||||
is_dirty: AtomicBool,
|
||||
parent_id: Option<Id>,
|
||||
parent_id: Option<span::Id>,
|
||||
metadata: &'static Metadata<'static>,
|
||||
concrete_type: String,
|
||||
kind: resources::resource::Kind,
|
||||
@ -38,36 +27,114 @@ struct Resource {
|
||||
}
|
||||
|
||||
/// Represents static data for tasks
|
||||
#[derive(Debug)]
|
||||
struct Task {
|
||||
id: Id,
|
||||
id: span::Id,
|
||||
is_dirty: AtomicBool,
|
||||
metadata: &'static Metadata<'static>,
|
||||
fields: Vec<console_api::Field>,
|
||||
location: Option<console_api::Location>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AsyncOp {
|
||||
id: Id,
|
||||
id: span::Id,
|
||||
is_dirty: AtomicBool,
|
||||
parent_id: Option<Id>,
|
||||
resource_id: Id,
|
||||
parent_id: Option<span::Id>,
|
||||
resource_id: span::Id,
|
||||
metadata: &'static Metadata<'static>,
|
||||
source: String,
|
||||
}
|
||||
|
||||
impl ToProto for Task {
|
||||
type Output = tasks::Task;
|
||||
|
||||
fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
|
||||
tasks::Task {
|
||||
id: Some(self.id.clone().into()),
|
||||
// TODO: more kinds of tasks...
|
||||
kind: tasks::task::Kind::Spawn as i32,
|
||||
metadata: Some(self.metadata.into()),
|
||||
parents: Vec::new(), // TODO: implement parents nicely
|
||||
fields: self.fields.clone(),
|
||||
location: self.location.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Unsent for Task {
|
||||
fn take_unsent(&self) -> bool {
|
||||
self.is_dirty.swap(false, Ordering::AcqRel)
|
||||
}
|
||||
|
||||
fn is_unsent(&self) -> bool {
|
||||
self.is_dirty.load(Ordering::Acquire)
|
||||
}
|
||||
}
|
||||
|
||||
impl ToProto for Resource {
|
||||
type Output = resources::Resource;
|
||||
|
||||
fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
|
||||
resources::Resource {
|
||||
id: Some(self.id.clone().into()),
|
||||
parent_resource_id: self.parent_id.clone().map(Into::into),
|
||||
kind: Some(self.kind.clone()),
|
||||
metadata: Some(self.metadata.into()),
|
||||
concrete_type: self.concrete_type.clone(),
|
||||
location: self.location.clone(),
|
||||
is_internal: self.is_internal,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Unsent for Resource {
|
||||
fn take_unsent(&self) -> bool {
|
||||
self.is_dirty.swap(false, Ordering::AcqRel)
|
||||
}
|
||||
|
||||
fn is_unsent(&self) -> bool {
|
||||
self.is_dirty.load(Ordering::Acquire)
|
||||
}
|
||||
}
|
||||
|
||||
impl ToProto for AsyncOp {
|
||||
type Output = async_ops::AsyncOp;
|
||||
|
||||
fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
|
||||
async_ops::AsyncOp {
|
||||
id: Some(self.id.clone().into()),
|
||||
metadata: Some(self.metadata.into()),
|
||||
resource_id: Some(self.resource_id.clone().into()),
|
||||
source: self.source.clone(),
|
||||
parent_async_op_id: self.parent_id.clone().map(Into::into),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Unsent for AsyncOp {
|
||||
fn take_unsent(&self) -> bool {
|
||||
self.is_dirty.swap(false, Ordering::AcqRel)
|
||||
}
|
||||
|
||||
fn is_unsent(&self) -> bool {
|
||||
self.is_dirty.load(Ordering::Acquire)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Eq, PartialEq)]
|
||||
pub(crate) enum Include {
|
||||
All,
|
||||
UpdatedOnly,
|
||||
}
|
||||
|
||||
type IdMap<T> = std::collections::HashMap<Id, T>;
|
||||
|
||||
pub(crate) struct Aggregator {
|
||||
#[derive(Debug)]
|
||||
pub struct Aggregator {
|
||||
shared: Arc<Shared>,
|
||||
events: Receiver<Event>,
|
||||
rpcs: async_channel::Receiver<server::Command>,
|
||||
watchers: Vec<Watch<instrument::Update>>,
|
||||
details_watchers: IdMap<Vec<Watch<tasks::TaskDetails>>>,
|
||||
details_watchers: HashMap<span::Id, Vec<Watch<tasks::TaskDetails>>>,
|
||||
all_metadata: Vec<console_api::register_metadata::NewMetadata>,
|
||||
new_metadata: Vec<console_api::register_metadata::NewMetadata>,
|
||||
running: bool,
|
||||
@ -83,23 +150,28 @@ pub(crate) struct Aggregator {
|
||||
}
|
||||
|
||||
impl Aggregator {
|
||||
pub fn new(events: Receiver<Event>, rpcs: async_channel::Receiver<server::Command>) -> Self {
|
||||
pub(crate) fn new(
|
||||
shared: Arc<Shared>,
|
||||
events: Receiver<Event>,
|
||||
rpcs: async_channel::Receiver<server::Command>,
|
||||
) -> Self {
|
||||
Self {
|
||||
shared,
|
||||
events,
|
||||
rpcs,
|
||||
watchers: Vec::new(),
|
||||
details_watchers: IdMap::new(),
|
||||
details_watchers: HashMap::default(),
|
||||
running: true,
|
||||
publish_interval: Duration::from_secs(1),
|
||||
all_metadata: Vec::new(),
|
||||
new_metadata: Vec::new(),
|
||||
base_time: TimeAnchor::new(),
|
||||
tasks: IdMap::new(),
|
||||
task_stats: IdMap::new(),
|
||||
resources: IdMap::new(),
|
||||
resource_stats: IdMap::new(),
|
||||
async_ops: IdMap::new(),
|
||||
async_op_stats: IdMap::new(),
|
||||
tasks: IdMap::default(),
|
||||
task_stats: IdMap::default(),
|
||||
resources: IdMap::default(),
|
||||
resource_stats: IdMap::default(),
|
||||
async_ops: IdMap::default(),
|
||||
async_op_stats: IdMap::default(),
|
||||
poll_ops: Vec::new(),
|
||||
}
|
||||
}
|
||||
@ -163,15 +235,32 @@ impl Aggregator {
|
||||
}
|
||||
|
||||
fn task_update(&mut self, include: Include) -> tasks::TaskUpdate {
|
||||
todo!()
|
||||
tasks::TaskUpdate {
|
||||
new_tasks: self.tasks.as_proto_list(include, &self.base_time),
|
||||
stats_update: self.task_stats.as_proto(include, &self.base_time),
|
||||
dropped_events: self.shared.dropped_tasks.swap(0, Ordering::AcqRel) as u64,
|
||||
}
|
||||
}
|
||||
|
||||
fn resource_update(&mut self, include: Include) -> resources::ResourceUpdate {
|
||||
todo!()
|
||||
let new_poll_ops = match include {
|
||||
Include::All => self.poll_ops.clone(),
|
||||
Include::UpdatedOnly => std::mem::take(&mut self.poll_ops),
|
||||
};
|
||||
resources::ResourceUpdate {
|
||||
new_resources: self.resources.as_proto_list(include, &self.base_time),
|
||||
stats_update: self.resource_stats.as_proto(include, &self.base_time),
|
||||
new_poll_ops,
|
||||
dropped_events: self.shared.dropped_resources.swap(0, Ordering::AcqRel) as u64,
|
||||
}
|
||||
}
|
||||
|
||||
fn async_op_update(&mut self, include: Include) -> async_ops::AsyncOpUpdate {
|
||||
todo!()
|
||||
async_ops::AsyncOpUpdate {
|
||||
new_async_ops: self.async_ops.as_proto_list(include, &self.base_time),
|
||||
stats_update: self.async_op_stats.as_proto(include, &self.base_time),
|
||||
dropped_events: self.shared.dropped_async_ops.swap(0, Ordering::AcqRel) as u64,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(mut self) {
|
||||
@ -210,7 +299,6 @@ impl Aggregator {
|
||||
// exited. that would result in a busy-loop. instead, we only want
|
||||
// to be woken when the flush interval has elapsed, or when the
|
||||
// channel is almost full.
|
||||
let mut drained = false;
|
||||
while let Ok(event) = self.events.try_recv() {
|
||||
self.update_state(event);
|
||||
}
|
||||
|
@ -1,11 +1,94 @@
|
||||
use crate::aggregate::Id;
|
||||
use std::collections::HashMap;
|
||||
use tracing_core::span::Id;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct Attributes {
|
||||
attributes: HashMap<FieldKey, console_api::Attribute>,
|
||||
}
|
||||
|
||||
impl Attributes {
|
||||
pub(crate) fn values(&self) -> impl Iterator<Item = &console_api::Attribute> {
|
||||
self.attributes.values()
|
||||
}
|
||||
|
||||
pub(crate) fn update(&mut self, id: &Id, update: &Update) {
|
||||
let field_name = match update.field.name.as_ref() {
|
||||
Some(name) => name.clone(),
|
||||
None => {
|
||||
tracing::warn!(?update.field, "field missing name, skipping...");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let update_id = id.clone();
|
||||
let key = FieldKey {
|
||||
update_id,
|
||||
field_name,
|
||||
};
|
||||
|
||||
self.attributes
|
||||
.entry(key)
|
||||
.and_modify(|attr| update_attribute(attr, update))
|
||||
.or_insert_with(|| update.clone().into());
|
||||
}
|
||||
}
|
||||
|
||||
fn update_attribute(attribute: &mut console_api::Attribute, update: &Update) {
|
||||
use console_api::field::Value::*;
|
||||
let attribute_val = attribute.field.as_mut().and_then(|a| a.value.as_mut());
|
||||
let update_val = update.field.value.clone();
|
||||
let update_name = update.field.name.clone();
|
||||
match (attribute_val, update_val) {
|
||||
(Some(BoolVal(v)), Some(BoolVal(upd))) => *v = upd,
|
||||
|
||||
(Some(StrVal(v)), Some(StrVal(upd))) => *v = upd,
|
||||
|
||||
(Some(DebugVal(v)), Some(DebugVal(upd))) => *v = upd,
|
||||
|
||||
(Some(U64Val(v)), Some(U64Val(upd))) => match update.op {
|
||||
Some(UpdateOp::Add) => *v = v.saturating_add(upd),
|
||||
|
||||
Some(UpdateOp::Sub) => *v = v.saturating_sub(upd),
|
||||
|
||||
Some(UpdateOp::Override) => *v = upd,
|
||||
|
||||
None => tracing::warn!(
|
||||
"numeric attribute update {:?} needs to have an op field",
|
||||
update_name
|
||||
),
|
||||
},
|
||||
|
||||
(Some(I64Val(v)), Some(I64Val(upd))) => match update.op {
|
||||
Some(UpdateOp::Add) => *v = v.saturating_add(upd),
|
||||
|
||||
Some(UpdateOp::Sub) => *v = v.saturating_sub(upd),
|
||||
|
||||
Some(UpdateOp::Override) => *v = upd,
|
||||
|
||||
None => tracing::warn!(
|
||||
"numeric attribute update {:?} needs to have an op field",
|
||||
update_name
|
||||
),
|
||||
},
|
||||
|
||||
(val, update) => {
|
||||
tracing::warn!(
|
||||
"attribute {:?} cannot be updated by update {:?}",
|
||||
val,
|
||||
update
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Update> for console_api::Attribute {
|
||||
fn from(upd: Update) -> Self {
|
||||
console_api::Attribute {
|
||||
field: Some(upd.field),
|
||||
unit: upd.unit,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct Update {
|
||||
pub(crate) field: console_api::Field,
|
||||
|
@ -1,21 +1,21 @@
|
||||
use crate::aggregate::Id;
|
||||
use crate::stats;
|
||||
use console_api::resources;
|
||||
use std::sync::Arc;
|
||||
use tracing::span;
|
||||
use tracing_core::Metadata;
|
||||
|
||||
pub(crate) enum Event {
|
||||
Metadata(&'static Metadata<'static>),
|
||||
Spawn {
|
||||
id: Id,
|
||||
id: span::Id,
|
||||
metadata: &'static Metadata<'static>,
|
||||
stats: Arc<stats::TaskStats>,
|
||||
fields: Vec<console_api::Field>,
|
||||
location: Option<console_api::Location>,
|
||||
},
|
||||
Resource {
|
||||
id: Id,
|
||||
parent_id: Option<Id>,
|
||||
id: span::Id,
|
||||
parent_id: Option<span::Id>,
|
||||
metadata: &'static Metadata<'static>,
|
||||
concrete_type: String,
|
||||
kind: resources::resource::Kind,
|
||||
@ -25,19 +25,42 @@ pub(crate) enum Event {
|
||||
},
|
||||
PollOp {
|
||||
metadata: &'static Metadata<'static>,
|
||||
resource_id: Id,
|
||||
resource_id: span::Id,
|
||||
op_name: String,
|
||||
async_op_id: Id,
|
||||
task_id: Id,
|
||||
async_op_id: span::Id,
|
||||
task_id: span::Id,
|
||||
is_ready: bool,
|
||||
},
|
||||
AsyncResourceOp {
|
||||
id: Id,
|
||||
parent_id: Option<Id>,
|
||||
resource_id: Id,
|
||||
id: span::Id,
|
||||
parent_id: Option<span::Id>,
|
||||
resource_id: span::Id,
|
||||
metadata: &'static Metadata<'static>,
|
||||
source: String,
|
||||
|
||||
stats: Arc<stats::AsyncOpStats>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Copy)]
|
||||
pub(crate) enum WakeOp {
|
||||
Wake { self_wake: bool },
|
||||
WakeByRef { self_wake: bool },
|
||||
Clone,
|
||||
Drop,
|
||||
}
|
||||
|
||||
impl WakeOp {
|
||||
/// Returns `true` if `self` is a `Wake` or `WakeByRef` event.
|
||||
pub(crate) fn is_wake(self) -> bool {
|
||||
matches!(self, Self::Wake { .. } | Self::WakeByRef { .. })
|
||||
}
|
||||
|
||||
pub(crate) fn self_wake(self, self_wake: bool) -> Self {
|
||||
match self {
|
||||
Self::Wake { .. } => Self::Wake { self_wake },
|
||||
Self::WakeByRef { .. } => Self::WakeByRef { self_wake },
|
||||
x => x,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
126
runtime/console/src/id_map.rs
Normal file
126
runtime/console/src/id_map.rs
Normal file
@ -0,0 +1,126 @@
|
||||
use crate::aggregate::Include;
|
||||
use crate::stats::{DroppedAt, TimeAnchor, Unsent};
|
||||
use std::collections::HashMap;
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing_core::span::Id;
|
||||
|
||||
pub(crate) trait ToProto {
|
||||
type Output;
|
||||
fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct IdMap<T> {
|
||||
data: HashMap<Id, T>,
|
||||
}
|
||||
|
||||
impl<T> Default for IdMap<T> {
|
||||
fn default() -> Self {
|
||||
IdMap {
|
||||
data: HashMap::<Id, T>::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Unsent> IdMap<T> {
|
||||
pub(crate) fn insert(&mut self, id: Id, data: T) {
|
||||
self.data.insert(id, data);
|
||||
}
|
||||
|
||||
pub(crate) fn since_last_update(&mut self) -> impl Iterator<Item = (&Id, &mut T)> {
|
||||
self.data.iter_mut().filter_map(|(id, data)| {
|
||||
if data.take_unsent() {
|
||||
Some((id, data))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn all(&self) -> impl Iterator<Item = (&Id, &T)> {
|
||||
self.data.iter()
|
||||
}
|
||||
|
||||
pub(crate) fn get(&self, id: &Id) -> Option<&T> {
|
||||
self.data.get(id)
|
||||
}
|
||||
|
||||
pub(crate) fn as_proto_list(
|
||||
&mut self,
|
||||
include: Include,
|
||||
base_time: &TimeAnchor,
|
||||
) -> Vec<T::Output>
|
||||
where
|
||||
T: ToProto,
|
||||
{
|
||||
match include {
|
||||
Include::UpdatedOnly => self
|
||||
.since_last_update()
|
||||
.map(|(_, d)| d.to_proto(base_time))
|
||||
.collect(),
|
||||
Include::All => self.all().map(|(_, d)| d.to_proto(base_time)).collect(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn as_proto(
|
||||
&mut self,
|
||||
include: Include,
|
||||
base_time: &TimeAnchor,
|
||||
) -> HashMap<u64, T::Output>
|
||||
where
|
||||
T: ToProto,
|
||||
{
|
||||
match include {
|
||||
Include::UpdatedOnly => self
|
||||
.since_last_update()
|
||||
.map(|(id, d)| (id.into_u64(), d.to_proto(base_time)))
|
||||
.collect(),
|
||||
Include::All => self
|
||||
.all()
|
||||
.map(|(id, d)| (id.into_u64(), d.to_proto(base_time)))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn drop_closed<R: DroppedAt + Unsent>(
|
||||
&mut self,
|
||||
stats: &mut IdMap<R>,
|
||||
now: Instant,
|
||||
retention: Duration,
|
||||
has_watchers: bool,
|
||||
) {
|
||||
let _span = tracing::debug_span!(
|
||||
"drop_closed",
|
||||
entity = %std::any::type_name::<T>(),
|
||||
stats = %std::any::type_name::<R>(),
|
||||
)
|
||||
.entered();
|
||||
|
||||
// drop closed entities
|
||||
tracing::trace!(?retention, has_watchers, "dropping closed");
|
||||
|
||||
stats.data.retain(|id, stats| {
|
||||
if let Some(dropped_at) = stats.dropped_at() {
|
||||
let dropped_for = now.checked_duration_since(dropped_at).unwrap_or_default();
|
||||
let dirty = stats.is_unsent();
|
||||
let should_drop =
|
||||
// if there are any clients watching, retain all dirty tasks regardless of age
|
||||
(dirty && has_watchers)
|
||||
|| dropped_for > retention;
|
||||
tracing::trace!(
|
||||
stats.id = ?id,
|
||||
stats.dropped_at = ?dropped_at,
|
||||
stats.dropped_for = ?dropped_for,
|
||||
stats.dirty = dirty,
|
||||
should_drop,
|
||||
);
|
||||
return !should_drop;
|
||||
}
|
||||
|
||||
true
|
||||
});
|
||||
|
||||
// drop closed entities which no longer have stats.
|
||||
self.data.retain(|id, _| stats.data.contains_key(id));
|
||||
}
|
||||
}
|
@ -1,31 +1,39 @@
|
||||
use crossbeam_channel::{Sender, TrySendError};
|
||||
use std::any::TypeId;
|
||||
use std::borrow::Borrow;
|
||||
use std::cell::RefCell;
|
||||
use std::net::IpAddr;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use thread_local::ThreadLocal;
|
||||
use tracing_core::span::{Attributes, Id, Record};
|
||||
use tracing_core::{Interest, LevelFilter, Metadata, Subscriber};
|
||||
use tracing_subscriber::filter::Filtered;
|
||||
use tracing_subscriber::layer::{Context, Filter, Layered};
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
use tracing::span;
|
||||
use tracing_core::span::Attributes;
|
||||
use tracing_core::{Interest, Metadata, Subscriber};
|
||||
use tracing_subscriber::layer::{Context, Filter};
|
||||
use tracing_subscriber::registry::{LookupSpan, SpanRef};
|
||||
use tracing_subscriber::Layer;
|
||||
|
||||
mod aggregate;
|
||||
mod attribute;
|
||||
mod callsites;
|
||||
mod event;
|
||||
mod id_map;
|
||||
mod server;
|
||||
mod stack;
|
||||
mod stats;
|
||||
mod visitors;
|
||||
|
||||
use crate::aggregate::Aggregator;
|
||||
use crate::callsites::Callsites;
|
||||
use crate::visitors::{
|
||||
AsyncOpVisitor, PollOpVisitor, ResourceVisitor, ResourceVisitorResult, StateUpdateVisitor,
|
||||
TaskVisitor, WakerVisitor,
|
||||
};
|
||||
use event::Event;
|
||||
pub use server::Server;
|
||||
use stack::SpanStack;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConsoleLayer {
|
||||
current_spans: ThreadLocal<RefCell<SpanStack>>,
|
||||
|
||||
@ -34,6 +42,34 @@ pub struct ConsoleLayer {
|
||||
|
||||
spawn_callsites: Callsites<8>,
|
||||
waker_callsites: Callsites<8>,
|
||||
resource_callsites: Callsites<8>,
|
||||
|
||||
/// Set of callsites for spans representing async operations on resources
|
||||
///
|
||||
/// TODO: Take some time to determine more reasonable numbers
|
||||
async_op_callsites: Callsites<32>,
|
||||
|
||||
/// Set of callsites for spans representing async op poll operations
|
||||
///
|
||||
/// TODO: Take some time to determine more reasonable numbers
|
||||
async_op_poll_callsites: Callsites<32>,
|
||||
|
||||
/// Set of callsites for events representing poll operation invocations on resources
|
||||
///
|
||||
/// TODO: Take some time to determine more reasonable numbers
|
||||
poll_op_callsites: Callsites<32>,
|
||||
|
||||
/// Set of callsites for events representing state attribute state updates on resources
|
||||
///
|
||||
/// TODO: Take some time to determine more reasonable numbers
|
||||
resource_state_update_callsites: Callsites<32>,
|
||||
|
||||
/// Set of callsites for events representing state attribute state updates on async resource ops
|
||||
///
|
||||
/// TODO: Take some time to determine more reasonable numbers
|
||||
async_op_state_update_callsites: Callsites<32>,
|
||||
|
||||
max_poll_duration_nanos: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -50,6 +86,8 @@ pub struct Builder {
|
||||
event_buffer_capacity: usize,
|
||||
|
||||
client_buffer_capacity: usize,
|
||||
|
||||
poll_duration_max: Duration,
|
||||
}
|
||||
impl Builder {
|
||||
pub fn build(self) -> (ConsoleLayer, Server) {
|
||||
@ -64,6 +102,7 @@ impl Default for Builder {
|
||||
server_port: Server::DEFAULT_PORT,
|
||||
event_buffer_capacity: ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY,
|
||||
client_buffer_capacity: 1024,
|
||||
poll_duration_max: ConsoleLayer::DEFAULT_POLL_DURATION_MAX,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -72,6 +111,7 @@ impl Default for Builder {
|
||||
struct Shared {
|
||||
dropped_tasks: AtomicUsize,
|
||||
dropped_resources: AtomicUsize,
|
||||
dropped_async_ops: AtomicUsize,
|
||||
}
|
||||
|
||||
impl ConsoleLayer {
|
||||
@ -91,7 +131,7 @@ impl ConsoleLayer {
|
||||
let (tx, events) = crossbeam_channel::bounded(config.event_buffer_capacity);
|
||||
let shared = Arc::new(Shared::default());
|
||||
let (subscribe, rpcs) = async_channel::bounded(config.client_buffer_capacity);
|
||||
let aggregator = Aggregator::new(events, rpcs);
|
||||
let aggregator = Aggregator::new(shared.clone(), events, rpcs);
|
||||
let server = Server::new(aggregator, config.client_buffer_capacity, subscribe);
|
||||
let layer = Self {
|
||||
current_spans: ThreadLocal::new(),
|
||||
@ -99,6 +139,13 @@ impl ConsoleLayer {
|
||||
shared,
|
||||
spawn_callsites: Callsites::default(),
|
||||
waker_callsites: Callsites::default(),
|
||||
resource_callsites: Callsites::default(),
|
||||
async_op_callsites: Callsites::default(),
|
||||
async_op_poll_callsites: Callsites::default(),
|
||||
poll_op_callsites: Callsites::default(),
|
||||
resource_state_update_callsites: Callsites::default(),
|
||||
async_op_state_update_callsites: Callsites::default(),
|
||||
max_poll_duration_nanos: config.poll_duration_max.as_nanos() as u64,
|
||||
};
|
||||
|
||||
(layer, server)
|
||||
@ -109,6 +156,14 @@ impl ConsoleLayer {
|
||||
const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024;
|
||||
const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024;
|
||||
|
||||
/// The default maximum value for task poll duration histograms.
|
||||
///
|
||||
/// Any poll duration exceeding this will be clamped to this value. By
|
||||
/// default, the maximum poll duration is one second.
|
||||
///
|
||||
/// See also [`Builder::poll_duration_histogram_max`].
|
||||
pub const DEFAULT_POLL_DURATION_MAX: Duration = Duration::from_secs(1);
|
||||
|
||||
fn is_spawn(&self, metadata: &Metadata<'static>) -> bool {
|
||||
self.spawn_callsites.contains(metadata)
|
||||
}
|
||||
@ -117,6 +172,54 @@ impl ConsoleLayer {
|
||||
self.waker_callsites.contains(metadata)
|
||||
}
|
||||
|
||||
fn is_resource(&self, meta: &'static Metadata<'static>) -> bool {
|
||||
self.resource_callsites.contains(meta)
|
||||
}
|
||||
|
||||
fn is_async_op(&self, meta: &'static Metadata<'static>) -> bool {
|
||||
self.async_op_callsites.contains(meta)
|
||||
}
|
||||
|
||||
fn is_id_spawned<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
cx.span(id)
|
||||
.map(|span| self.is_spawn(span.metadata()))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn is_id_resource<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
cx.span(id)
|
||||
.map(|span| self.is_resource(span.metadata()))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn is_id_async_op<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
cx.span(id)
|
||||
.map(|span| self.is_async_op(span.metadata()))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn first_entered<P>(&self, stack: &SpanStack, p: P) -> Option<span::Id>
|
||||
where
|
||||
P: Fn(&span::Id) -> bool,
|
||||
{
|
||||
stack
|
||||
.stack()
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|id| p(id.id()))
|
||||
.map(|id| id.id())
|
||||
.cloned()
|
||||
}
|
||||
|
||||
fn send_stats<S>(
|
||||
&self,
|
||||
dropped: &AtomicUsize,
|
||||
@ -157,6 +260,30 @@ where
|
||||
self.waker_callsites.insert(metadata);
|
||||
&self.shared.dropped_tasks
|
||||
}
|
||||
(ResourceVisitor::RES_SPAN_NAME, _) => {
|
||||
self.resource_callsites.insert(metadata);
|
||||
&self.shared.dropped_resources
|
||||
}
|
||||
(AsyncOpVisitor::ASYNC_OP_SPAN_NAME, _) => {
|
||||
self.async_op_callsites.insert(metadata);
|
||||
&self.shared.dropped_async_ops
|
||||
}
|
||||
("runtime.resource.async_op.poll", _) => {
|
||||
self.async_op_poll_callsites.insert(metadata);
|
||||
&self.shared.dropped_async_ops
|
||||
}
|
||||
(_, PollOpVisitor::POLL_OP_EVENT_TARGET) => {
|
||||
self.poll_op_callsites.insert(metadata);
|
||||
&self.shared.dropped_async_ops
|
||||
}
|
||||
(_, StateUpdateVisitor::RE_STATE_UPDATE_EVENT_TARGET) => {
|
||||
self.resource_state_update_callsites.insert(metadata);
|
||||
&self.shared.dropped_resources
|
||||
}
|
||||
(_, StateUpdateVisitor::AO_STATE_UPDATE_EVENT_TARGET) => {
|
||||
self.async_op_state_update_callsites.insert(metadata);
|
||||
&self.shared.dropped_async_ops
|
||||
}
|
||||
(_, _) => &self.shared.dropped_tasks,
|
||||
};
|
||||
|
||||
@ -164,13 +291,215 @@ where
|
||||
|
||||
Interest::always()
|
||||
}
|
||||
|
||||
fn on_new_span(&self, attrs: &Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
|
||||
let metadata = attrs.metadata();
|
||||
if self.is_spawn(metadata) {
|
||||
let at = Instant::now();
|
||||
let mut task_visitor = TaskVisitor::new(metadata.into());
|
||||
attrs.record(&mut task_visitor);
|
||||
let (fields, location) = task_visitor.result();
|
||||
if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || {
|
||||
let stats = Arc::new(stats::TaskStats::new(self.max_poll_duration_nanos, at));
|
||||
let event = Event::Spawn {
|
||||
id: id.clone(),
|
||||
stats: stats.clone(),
|
||||
metadata,
|
||||
fields,
|
||||
location,
|
||||
};
|
||||
(event, stats)
|
||||
}) {
|
||||
ctx.span(id)
|
||||
.expect("`on_new_span` called with nonexistent span. This is a tracing bug.");
|
||||
}
|
||||
} else if self.is_resource(metadata) {
|
||||
let at = Instant::now();
|
||||
let mut resource_visitor = ResourceVisitor::default();
|
||||
attrs.record(&mut resource_visitor);
|
||||
if let Some(result) = resource_visitor.result() {
|
||||
let ResourceVisitorResult {
|
||||
concrete_type,
|
||||
kind,
|
||||
location,
|
||||
is_internal,
|
||||
inherit_child_attrs,
|
||||
} = result;
|
||||
let parent_id = self.current_spans.get().and_then(|stack| {
|
||||
self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
|
||||
});
|
||||
if let Some(stats) = self.send_stats(&self.shared.dropped_resources, move || {
|
||||
let stats = Arc::new(stats::ResourceStats::new(
|
||||
at,
|
||||
inherit_child_attrs,
|
||||
parent_id.clone(),
|
||||
));
|
||||
let event = Event::Resource {
|
||||
id: id.clone(),
|
||||
parent_id,
|
||||
metadata,
|
||||
concrete_type,
|
||||
kind,
|
||||
location,
|
||||
is_internal,
|
||||
stats: stats.clone(),
|
||||
};
|
||||
(event, stats)
|
||||
}) {
|
||||
ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
|
||||
}
|
||||
}
|
||||
} else if self.is_async_op(metadata) {
|
||||
let at = Instant::now();
|
||||
let mut async_op_visitor = AsyncOpVisitor::default();
|
||||
attrs.record(&mut async_op_visitor);
|
||||
if let Some((source, inherit_child_attrs)) = async_op_visitor.result() {
|
||||
let resource_id = self.current_spans.get().and_then(|stack| {
|
||||
self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
|
||||
});
|
||||
|
||||
let parent_id = self.current_spans.get().and_then(|stack| {
|
||||
self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
|
||||
});
|
||||
|
||||
if let Some(resource_id) = resource_id {
|
||||
if let Some(stats) =
|
||||
self.send_stats(&self.shared.dropped_async_ops, move || {
|
||||
let stats = Arc::new(stats::AsyncOpStats::new(
|
||||
at,
|
||||
inherit_child_attrs,
|
||||
parent_id.clone(),
|
||||
));
|
||||
let event = Event::AsyncResourceOp {
|
||||
id: id.clone(),
|
||||
parent_id,
|
||||
resource_id,
|
||||
metadata,
|
||||
source,
|
||||
stats: stats.clone(),
|
||||
};
|
||||
(event, stats)
|
||||
})
|
||||
{
|
||||
ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
fn it_works() {
|
||||
let result = 2 + 2;
|
||||
assert_eq!(result, 4);
|
||||
fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
|
||||
let metadata = event.metadata();
|
||||
if self.waker_callsites.contains(metadata) {
|
||||
let at = Instant::now();
|
||||
let mut visitor = WakerVisitor::default();
|
||||
event.record(&mut visitor);
|
||||
if let Some((id, mut op)) = visitor.result() {
|
||||
if let Some(span) = ctx.span(&id) {
|
||||
let exts = span.extensions();
|
||||
if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
|
||||
if op.is_wake() {
|
||||
let self_wake = self
|
||||
.current_spans
|
||||
.get()
|
||||
.map(|spans| spans.borrow().iter().any(|span| span == &id))
|
||||
.unwrap_or(false);
|
||||
op = op.self_wake(self_wake);
|
||||
}
|
||||
|
||||
stats.record_wake_op(op, at);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) {
|
||||
fn update<S: Subscriber + for<'a> LookupSpan<'a>>(
|
||||
span: &SpanRef<S>,
|
||||
at: Option<Instant>,
|
||||
) -> Option<Instant> {
|
||||
let exts = span.extensions();
|
||||
// if the span we are entering is a task or async op, record the
|
||||
// poll stats.
|
||||
if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
|
||||
let at = at.unwrap_or_else(Instant::now);
|
||||
stats.start_poll(at);
|
||||
Some(at)
|
||||
} else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
|
||||
let at = at.unwrap_or_else(Instant::now);
|
||||
stats.start_poll(at);
|
||||
Some(at)
|
||||
// otherwise, is the span a resource? in that case, we also want
|
||||
// to enter it, although we don't care about recording poll
|
||||
// stats.
|
||||
} else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
|
||||
Some(at.unwrap_or_else(Instant::now))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(span) = cx.span(id) {
|
||||
if let Some(now) = update(&span, None) {
|
||||
if let Some(parent) = span.parent() {
|
||||
update(&parent, Some(now));
|
||||
}
|
||||
self.current_spans
|
||||
.get_or_default()
|
||||
.borrow_mut()
|
||||
.push(id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_exit(&self, id: &span::Id, cx: Context<'_, S>) {
|
||||
fn update<S: Subscriber + for<'a> LookupSpan<'a>>(
|
||||
span: &SpanRef<S>,
|
||||
at: Option<Instant>,
|
||||
) -> Option<Instant> {
|
||||
let exts = span.extensions();
|
||||
// if the span we are entering is a task or async op, record the
|
||||
// poll stats.
|
||||
if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
|
||||
let at = at.unwrap_or_else(Instant::now);
|
||||
stats.end_poll(at);
|
||||
Some(at)
|
||||
} else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
|
||||
let at = at.unwrap_or_else(Instant::now);
|
||||
stats.end_poll(at);
|
||||
Some(at)
|
||||
// otherwise, is the span a resource? in that case, we also want
|
||||
// to enter it, although we don't care about recording poll
|
||||
// stats.
|
||||
} else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
|
||||
Some(at.unwrap_or_else(Instant::now))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(span) = cx.span(id) {
|
||||
if let Some(now) = update(&span, None) {
|
||||
if let Some(parent) = span.parent() {
|
||||
update(&parent, Some(now));
|
||||
}
|
||||
self.current_spans.get_or_default().borrow_mut().pop(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_close(&self, id: span::Id, cx: Context<'_, S>) {
|
||||
if let Some(span) = cx.span(&id) {
|
||||
let now = Instant::now();
|
||||
let exts = span.extensions();
|
||||
if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
|
||||
stats.drop_task(now);
|
||||
} else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
|
||||
stats.drop_async_op(now);
|
||||
} else if let Some(stats) = exts.get::<Arc<stats::ResourceStats>>() {
|
||||
stats.drop_resource(now);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,19 +1,21 @@
|
||||
use crate::aggregate::Id;
|
||||
use crate::Aggregator;
|
||||
use async_channel::{Receiver, Sender};
|
||||
use async_compat::CompatExt;
|
||||
use console_api::instrument;
|
||||
use console_api::instrument::instrument_server::{Instrument, InstrumentServer};
|
||||
use console_api::tasks;
|
||||
use futures_util::TryStreamExt;
|
||||
use std::error::Error;
|
||||
use std::future::Future;
|
||||
use std::io::IoSlice;
|
||||
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio::io::AsyncRead as TokioAsyncRead;
|
||||
use tokio::io::{AsyncWrite as TokioAsyncWrite, ReadBuf};
|
||||
use tonic::transport::server::Connected;
|
||||
use tonic::Status;
|
||||
use tracing_core::span::Id;
|
||||
|
||||
struct StreamWrapper<T>(T);
|
||||
impl<T> Connected for StreamWrapper<T> {
|
||||
@ -68,14 +70,16 @@ impl<T: TokioAsyncRead + Unpin> TokioAsyncRead for StreamWrapper<T> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Server {
|
||||
aggregator: Aggregator,
|
||||
pub aggregator: Option<Aggregator>,
|
||||
client_buffer_size: usize,
|
||||
subscribe: Sender<Command>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub(crate) const DEFAULT_ADDR: IpAddr = IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1));
|
||||
//pub(crate) const DEFAULT_ADDR: IpAddr = IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1));
|
||||
pub(crate) const DEFAULT_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
|
||||
pub(crate) const DEFAULT_PORT: u16 = 49289;
|
||||
|
||||
pub(crate) fn new(
|
||||
@ -83,15 +87,14 @@ impl Server {
|
||||
client_buffer_size: usize,
|
||||
subscribe: Sender<Command>,
|
||||
) -> Self {
|
||||
let subscribe = todo!();
|
||||
Self {
|
||||
aggregator,
|
||||
aggregator: Some(aggregator),
|
||||
client_buffer_size,
|
||||
subscribe,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn serve(
|
||||
pub async fn serve(
|
||||
mut self, /*, incoming: I */
|
||||
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
|
||||
// TODO: Spawn two tasks; the aggregator that's collecting stats, aggregating and
|
||||
@ -99,16 +102,10 @@ impl Server {
|
||||
|
||||
let svc = InstrumentServer::new(self);
|
||||
|
||||
// The gRPC server task; requires a `Stream` of `tokio::AsyncRead + tokio::AsyncWrite`.
|
||||
let listener =
|
||||
async_net::TcpListener::bind(SocketAddr::new(Self::DEFAULT_ADDR, Self::DEFAULT_PORT))
|
||||
.await?;
|
||||
let incoming = listener
|
||||
.incoming()
|
||||
.map_ok(|stream| StreamWrapper(async_compat::Compat::new(stream)));
|
||||
tonic::transport::Server::builder()
|
||||
.add_service(svc)
|
||||
.serve_with_incoming(incoming)
|
||||
.serve(SocketAddr::new(Self::DEFAULT_ADDR, Self::DEFAULT_PORT))
|
||||
.compat()
|
||||
.await?;
|
||||
|
||||
// TODO: Kill the aggregator task if the serve task has ended.
|
||||
@ -117,6 +114,7 @@ impl Server {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Watch<T>(pub(crate) Sender<Result<T, tonic::Status>>);
|
||||
impl<T: Clone> Watch<T> {
|
||||
pub fn update(&self, update: &T) -> bool {
|
||||
@ -124,12 +122,14 @@ impl<T: Clone> Watch<T> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct WatchRequest<T> {
|
||||
pub id: Id,
|
||||
pub stream_sender: async_oneshot::Sender<Receiver<Result<T, tonic::Status>>>,
|
||||
pub buffer: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum Command {
|
||||
Instrument(Watch<instrument::Update>),
|
||||
WatchTaskDetail(WatchRequest<tasks::TaskDetails>),
|
||||
|
@ -1,10 +1,62 @@
|
||||
use crate::aggregate::Id;
|
||||
use crate::attribute;
|
||||
use crate::id_map::ToProto;
|
||||
use crate::{attribute, event};
|
||||
use crossbeam_utils::atomic::AtomicCell;
|
||||
use hdrhistogram::serialization::{Serializer, V2Serializer};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize};
|
||||
use std::sync::Mutex;
|
||||
use std::cmp;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use tracing::span;
|
||||
|
||||
/// A type which records whether it has unsent updates.
|
||||
///
|
||||
/// If something implementing this trait has been changed since the last time
|
||||
/// data was sent to a client, it will indicate that it is "dirty". If it has
|
||||
/// not been changed, it does not have to be included in the current update.
|
||||
pub(crate) trait Unsent {
|
||||
/// Returns `true` if this type has unsent updates, and if it does, clears
|
||||
/// the flag indicating there are unsent updates.
|
||||
///
|
||||
/// This is called when filtering which stats need to be included in the
|
||||
/// current update. If this returns `true`, it will be included, so it
|
||||
/// becomes no longer dirty.
|
||||
fn take_unsent(&self) -> bool;
|
||||
|
||||
/// Returns `true` if this type has unsent updates, without changing the
|
||||
/// flag.
|
||||
fn is_unsent(&self) -> bool;
|
||||
}
|
||||
|
||||
// An entity (e.g Task, Resource) that at some point in
|
||||
// time can be dropped. This generally refers to spans that
|
||||
// have been closed indicating that a task, async op or a
|
||||
// resource is not in use anymore
|
||||
pub(crate) trait DroppedAt {
|
||||
fn dropped_at(&self) -> Option<Instant>;
|
||||
}
|
||||
|
||||
impl<T: DroppedAt> DroppedAt for Arc<T> {
|
||||
fn dropped_at(&self) -> Option<Instant> {
|
||||
T::dropped_at(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Unsent> Unsent for Arc<T> {
|
||||
fn take_unsent(&self) -> bool {
|
||||
T::take_unsent(self)
|
||||
}
|
||||
|
||||
fn is_unsent(&self) -> bool {
|
||||
T::is_unsent(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ToProto> ToProto for Arc<T> {
|
||||
type Output = T::Output;
|
||||
fn to_proto(&self, base_time: &TimeAnchor) -> T::Output {
|
||||
T::to_proto(self, base_time)
|
||||
}
|
||||
}
|
||||
|
||||
/// Anchors an `Instant` with a `SystemTime` timestamp to allow converting
|
||||
/// monotonic `Instant`s into timestamps that can be sent over the wire.
|
||||
@ -34,6 +86,10 @@ impl TimeAnchor {
|
||||
}
|
||||
}
|
||||
|
||||
trait RecordPoll {
|
||||
fn record_poll_duration(&mut self, duration: Duration);
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct PollStats<H> {
|
||||
/// The number of polls in progress
|
||||
@ -43,6 +99,78 @@ struct PollStats<H> {
|
||||
timestamps: Mutex<PollTimestamps<H>>,
|
||||
}
|
||||
|
||||
impl<H: RecordPoll> PollStats<H> {
|
||||
fn start_poll(&self, at: Instant) {
|
||||
if self.current_polls.fetch_add(1, Ordering::AcqRel) == 0 {
|
||||
// We are starting the first poll
|
||||
let mut timestamps = self.timestamps.lock().unwrap();
|
||||
if timestamps.first_poll.is_none() {
|
||||
timestamps.first_poll = Some(at);
|
||||
}
|
||||
|
||||
timestamps.last_poll_started = Some(at);
|
||||
|
||||
self.polls.fetch_add(1, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
fn end_poll(&self, at: Instant) {
|
||||
// Are we ending the last current poll?
|
||||
if self.current_polls.fetch_sub(1, Ordering::AcqRel) > 1 {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut timestamps = self.timestamps.lock().unwrap();
|
||||
let started = match timestamps.last_poll_started {
|
||||
Some(last_poll) => last_poll,
|
||||
None => {
|
||||
eprintln!(
|
||||
"a poll ended, but start timestamp was recorded. \
|
||||
this is probably a `console-subscriber` bug"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
timestamps.last_poll_ended = Some(at);
|
||||
let elapsed = match at.checked_duration_since(started) {
|
||||
Some(elapsed) => elapsed,
|
||||
None => {
|
||||
eprintln!(
|
||||
"possible Instant clock skew detected: a poll's end timestamp \
|
||||
was before its start timestamp\nstart = {:?}\n end = {:?}",
|
||||
started, at
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// if we have a poll time histogram, add the timestamp
|
||||
timestamps.histogram.record_poll_duration(elapsed);
|
||||
|
||||
timestamps.busy_time += elapsed;
|
||||
}
|
||||
}
|
||||
|
||||
impl<H> ToProto for PollStats<H> {
|
||||
type Output = console_api::PollStats;
|
||||
|
||||
fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
|
||||
let timestamps = self.timestamps.lock().unwrap();
|
||||
console_api::PollStats {
|
||||
polls: self.polls.load(Ordering::Acquire) as u64,
|
||||
first_poll: timestamps.first_poll.map(|at| base_time.to_timestamp(at)),
|
||||
last_poll_started: timestamps
|
||||
.last_poll_started
|
||||
.map(|at| base_time.to_timestamp(at)),
|
||||
last_poll_ended: timestamps
|
||||
.last_poll_ended
|
||||
.map(|at| base_time.to_timestamp(at)),
|
||||
busy_time: Some(timestamps.busy_time.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stats associated with a task.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TaskStats {
|
||||
@ -63,6 +191,30 @@ pub(crate) struct TaskStats {
|
||||
}
|
||||
|
||||
impl TaskStats {
|
||||
pub(crate) fn new(poll_duration_max: u64, created_at: Instant) -> Self {
|
||||
Self {
|
||||
is_dirty: AtomicBool::new(true),
|
||||
is_dropped: AtomicBool::new(false),
|
||||
created_at,
|
||||
timestamps: Mutex::new(TaskTimestamps::default()),
|
||||
poll_stats: PollStats {
|
||||
timestamps: Mutex::new(PollTimestamps {
|
||||
histogram: Histogram::new(poll_duration_max),
|
||||
first_poll: None,
|
||||
last_poll_started: None,
|
||||
last_poll_ended: None,
|
||||
busy_time: Duration::new(0, 0),
|
||||
}),
|
||||
current_polls: AtomicUsize::new(0),
|
||||
polls: AtomicUsize::new(0),
|
||||
},
|
||||
wakes: AtomicUsize::new(0),
|
||||
waker_clones: AtomicUsize::new(0),
|
||||
waker_drops: AtomicUsize::new(0),
|
||||
self_wakes: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn poll_duration_histogram(
|
||||
&self,
|
||||
) -> console_api::tasks::task_details::PollTimesHistogram {
|
||||
@ -75,6 +227,104 @@ impl TaskStats {
|
||||
.to_proto();
|
||||
console_api::tasks::task_details::PollTimesHistogram::Histogram(hist)
|
||||
}
|
||||
|
||||
pub(crate) fn record_wake_op(&self, op: event::WakeOp, at: Instant) {
|
||||
use event::WakeOp;
|
||||
match op {
|
||||
WakeOp::Wake { self_wake } => {
|
||||
self.waker_drops.fetch_add(1, Ordering::Release);
|
||||
self.wake(at, self_wake);
|
||||
}
|
||||
WakeOp::WakeByRef { self_wake } => {
|
||||
self.wake(at, self_wake);
|
||||
}
|
||||
WakeOp::Clone => {
|
||||
self.waker_clones.fetch_add(1, Ordering::Release);
|
||||
}
|
||||
WakeOp::Drop => {
|
||||
self.waker_drops.fetch_add(1, Ordering::Release);
|
||||
}
|
||||
}
|
||||
self.make_dirty();
|
||||
}
|
||||
|
||||
fn wake(&self, at: Instant, self_wake: bool) {
|
||||
let mut timestamps = self.timestamps.lock().unwrap();
|
||||
timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at));
|
||||
if self_wake {
|
||||
self.wakes.fetch_add(1, Ordering::Release);
|
||||
}
|
||||
self.wakes.fetch_add(1, Ordering::Release);
|
||||
}
|
||||
|
||||
pub(crate) fn start_poll(&self, at: Instant) {
|
||||
self.poll_stats.start_poll(at);
|
||||
self.make_dirty();
|
||||
}
|
||||
|
||||
pub(crate) fn end_poll(&self, at: Instant) {
|
||||
self.poll_stats.end_poll(at);
|
||||
self.make_dirty();
|
||||
}
|
||||
|
||||
pub(crate) fn drop_task(&self, dropped_at: Instant) {
|
||||
if self.is_dropped.swap(true, Ordering::AcqRel) {
|
||||
// The task was already dropped.
|
||||
// TODO(eliza): this could maybe panic in debug mode...
|
||||
return;
|
||||
}
|
||||
|
||||
let mut timestamps = self.timestamps.lock().unwrap();
|
||||
let _prev = timestamps.dropped_at.replace(dropped_at);
|
||||
debug_assert_eq!(_prev, None, "tried to drop a task twice; this is a bug!");
|
||||
self.make_dirty();
|
||||
}
|
||||
|
||||
fn make_dirty(&self) {
|
||||
self.is_dirty.swap(true, Ordering::AcqRel);
|
||||
}
|
||||
}
|
||||
|
||||
impl ToProto for TaskStats {
|
||||
type Output = console_api::tasks::Stats;
|
||||
|
||||
fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
|
||||
let poll_stats = Some(self.poll_stats.to_proto(base_time));
|
||||
let timestamps = self.timestamps.lock().unwrap();
|
||||
console_api::tasks::Stats {
|
||||
poll_stats,
|
||||
created_at: Some(base_time.to_timestamp(self.created_at)),
|
||||
dropped_at: timestamps.dropped_at.map(|at| base_time.to_timestamp(at)),
|
||||
wakes: self.wakes.load(Ordering::Acquire) as u64,
|
||||
waker_clones: self.waker_clones.load(Ordering::Acquire) as u64,
|
||||
self_wakes: self.self_wakes.load(Ordering::Acquire) as u64,
|
||||
waker_drops: self.waker_drops.load(Ordering::Acquire) as u64,
|
||||
last_wake: timestamps.last_wake.map(|at| base_time.to_timestamp(at)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Unsent for TaskStats {
|
||||
#[inline]
|
||||
fn take_unsent(&self) -> bool {
|
||||
self.is_dirty.swap(false, Ordering::AcqRel)
|
||||
}
|
||||
|
||||
fn is_unsent(&self) -> bool {
|
||||
self.is_dirty.load(Ordering::Acquire)
|
||||
}
|
||||
}
|
||||
|
||||
impl DroppedAt for TaskStats {
|
||||
fn dropped_at(&self) -> Option<Instant> {
|
||||
// avoid acquiring the lock if we know we haven't tried to drop this
|
||||
// thing yet
|
||||
if self.is_dropped.load(Ordering::Acquire) {
|
||||
return self.timestamps.lock().unwrap().dropped_at;
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Stats associated with an async operation.
|
||||
@ -97,6 +347,98 @@ pub(crate) struct AsyncOpStats {
|
||||
poll_stats: PollStats<()>,
|
||||
}
|
||||
|
||||
impl AsyncOpStats {
|
||||
pub(crate) fn new(
|
||||
created_at: Instant,
|
||||
inherit_child_attributes: bool,
|
||||
parent_id: Option<span::Id>,
|
||||
) -> Self {
|
||||
Self {
|
||||
task_id: AtomicCell::new(0),
|
||||
stats: ResourceStats::new(created_at, inherit_child_attributes, parent_id),
|
||||
poll_stats: PollStats::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn task_id(&self) -> Option<u64> {
|
||||
let id = self.task_id.load();
|
||||
if id > 0 {
|
||||
Some(id as u64)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_task_id(&self, id: &tracing::span::Id) {
|
||||
self.task_id.store(id.into_u64());
|
||||
self.make_dirty();
|
||||
}
|
||||
|
||||
pub(crate) fn drop_async_op(&self, dropped_at: Instant) {
|
||||
self.stats.drop_resource(dropped_at)
|
||||
}
|
||||
|
||||
pub(crate) fn start_poll(&self, at: Instant) {
|
||||
self.poll_stats.start_poll(at);
|
||||
self.make_dirty();
|
||||
}
|
||||
|
||||
pub(crate) fn end_poll(&self, at: Instant) {
|
||||
self.poll_stats.end_poll(at);
|
||||
self.make_dirty();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn make_dirty(&self) {
|
||||
self.stats.make_dirty()
|
||||
}
|
||||
}
|
||||
|
||||
impl Unsent for AsyncOpStats {
|
||||
#[inline]
|
||||
fn take_unsent(&self) -> bool {
|
||||
self.stats.take_unsent()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn is_unsent(&self) -> bool {
|
||||
self.stats.is_unsent()
|
||||
}
|
||||
}
|
||||
|
||||
impl DroppedAt for AsyncOpStats {
|
||||
fn dropped_at(&self) -> Option<Instant> {
|
||||
self.stats.dropped_at()
|
||||
}
|
||||
}
|
||||
|
||||
impl ToProto for AsyncOpStats {
|
||||
type Output = console_api::async_ops::Stats;
|
||||
|
||||
fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
|
||||
let attributes = self
|
||||
.stats
|
||||
.attributes
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.cloned()
|
||||
.collect();
|
||||
console_api::async_ops::Stats {
|
||||
poll_stats: Some(self.poll_stats.to_proto(base_time)),
|
||||
created_at: Some(base_time.to_timestamp(self.stats.created_at)),
|
||||
dropped_at: self
|
||||
.stats
|
||||
.dropped_at
|
||||
.lock()
|
||||
.unwrap()
|
||||
.map(|at| base_time.to_timestamp(at)),
|
||||
task_id: self.task_id().map(Into::into),
|
||||
attributes,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stats associated with a resource.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ResourceStats {
|
||||
@ -106,7 +448,92 @@ pub(crate) struct ResourceStats {
|
||||
dropped_at: Mutex<Option<Instant>>,
|
||||
attributes: Mutex<attribute::Attributes>,
|
||||
pub(crate) inherit_child_attributes: bool,
|
||||
pub(crate) parent_id: Option<Id>,
|
||||
pub(crate) parent_id: Option<span::Id>,
|
||||
}
|
||||
|
||||
impl ResourceStats {
|
||||
pub(crate) fn new(
|
||||
created_at: Instant,
|
||||
inherit_child_attributes: bool,
|
||||
parent_id: Option<span::Id>,
|
||||
) -> Self {
|
||||
Self {
|
||||
is_dirty: AtomicBool::new(true),
|
||||
is_dropped: AtomicBool::new(false),
|
||||
created_at,
|
||||
dropped_at: Mutex::new(None),
|
||||
attributes: Default::default(),
|
||||
inherit_child_attributes,
|
||||
parent_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn update_attribute(&self, id: &span::Id, update: &attribute::Update) {
|
||||
self.attributes.lock().unwrap().update(id, update);
|
||||
self.make_dirty();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn drop_resource(&self, dropped_at: Instant) {
|
||||
if self.is_dropped.swap(true, Ordering::AcqRel) {
|
||||
// The task was already dropped.
|
||||
// TODO(eliza): this could maybe panic in debug mode...
|
||||
return;
|
||||
}
|
||||
|
||||
let mut timestamp = self.dropped_at.lock().unwrap();
|
||||
let _prev = timestamp.replace(dropped_at);
|
||||
debug_assert_eq!(
|
||||
_prev, None,
|
||||
"tried to drop a resource/async op twice; this is a bug!"
|
||||
);
|
||||
self.make_dirty();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn make_dirty(&self) {
|
||||
self.is_dirty.swap(true, Ordering::AcqRel);
|
||||
}
|
||||
}
|
||||
|
||||
impl Unsent for ResourceStats {
|
||||
#[inline]
|
||||
fn take_unsent(&self) -> bool {
|
||||
self.is_dirty.swap(false, Ordering::AcqRel)
|
||||
}
|
||||
|
||||
fn is_unsent(&self) -> bool {
|
||||
self.is_dirty.load(Ordering::Acquire)
|
||||
}
|
||||
}
|
||||
|
||||
impl DroppedAt for ResourceStats {
|
||||
fn dropped_at(&self) -> Option<Instant> {
|
||||
// avoid acquiring the lock if we know we haven't tried to drop this
|
||||
// thing yet
|
||||
if self.is_dropped.load(Ordering::Acquire) {
|
||||
return *self.dropped_at.lock().unwrap();
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl ToProto for ResourceStats {
|
||||
type Output = console_api::resources::Stats;
|
||||
|
||||
fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
|
||||
let attributes = self.attributes.lock().unwrap().values().cloned().collect();
|
||||
console_api::resources::Stats {
|
||||
created_at: Some(base_time.to_timestamp(self.created_at)),
|
||||
dropped_at: self
|
||||
.dropped_at
|
||||
.lock()
|
||||
.unwrap()
|
||||
.map(|at| base_time.to_timestamp(at)),
|
||||
attributes,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@ -159,3 +586,26 @@ impl Histogram {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordPoll for Histogram {
|
||||
fn record_poll_duration(&mut self, duration: Duration) {
|
||||
let mut duration_ns = duration.as_nanos() as u64;
|
||||
|
||||
// clamp the duration to the histogram's max value
|
||||
if duration_ns > self.max {
|
||||
self.outliers += 1;
|
||||
self.max_outlier = cmp::max(self.max_outlier, Some(duration_ns));
|
||||
duration_ns = self.max;
|
||||
}
|
||||
|
||||
self.histogram
|
||||
.record(duration_ns)
|
||||
.expect("duration has already been clamped to histogram max value")
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordPoll for () {
|
||||
fn record_poll_duration(&mut self, _: Duration) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
529
runtime/console/src/visitors.rs
Normal file
529
runtime/console/src/visitors.rs
Normal file
@ -0,0 +1,529 @@
|
||||
use crate::{attribute, event};
|
||||
use console_api::resources::resource;
|
||||
use tracing::{field, span};
|
||||
use tracing_core::field::Visit;
|
||||
|
||||
const LOCATION_FILE: &str = "loc.file";
|
||||
const LOCATION_LINE: &str = "loc.line";
|
||||
const LOCATION_COLUMN: &str = "loc.col";
|
||||
const INHERIT_FIELD_NAME: &str = "inherits_child_attrs";
|
||||
|
||||
/// Used to extract the fields needed to construct
|
||||
/// an Event::Resource from the metadata of a tracing span
|
||||
/// that has the following shape:
|
||||
///
|
||||
/// tracing::trace_span!(
|
||||
/// "runtime.resource",
|
||||
/// concrete_type = "Sleep",
|
||||
/// kind = "timer",
|
||||
/// is_internal = true,
|
||||
/// inherits_child_attrs = true,
|
||||
/// );
|
||||
///
|
||||
/// Fields:
|
||||
/// concrete_type - indicates the concrete rust type for this resource
|
||||
/// kind - indicates the type of resource (i.e. timer, sync, io )
|
||||
/// is_internal - whether this is a resource type that is not exposed publicly (i.e. BatchSemaphore)
|
||||
/// inherits_child_attrs - whether this resource should inherit the state attributes of its children
|
||||
#[derive(Default)]
|
||||
pub(crate) struct ResourceVisitor {
|
||||
concrete_type: Option<String>,
|
||||
kind: Option<resource::Kind>,
|
||||
is_internal: bool,
|
||||
inherit_child_attrs: bool,
|
||||
line: Option<u32>,
|
||||
file: Option<String>,
|
||||
column: Option<u32>,
|
||||
}
|
||||
|
||||
pub(crate) struct ResourceVisitorResult {
|
||||
pub(crate) concrete_type: String,
|
||||
pub(crate) kind: resource::Kind,
|
||||
pub(crate) location: Option<console_api::Location>,
|
||||
pub(crate) is_internal: bool,
|
||||
pub(crate) inherit_child_attrs: bool,
|
||||
}
|
||||
|
||||
/// Used to extract all fields from the metadata
|
||||
/// of a tracing span
|
||||
pub(crate) struct FieldVisitor {
|
||||
fields: Vec<console_api::Field>,
|
||||
meta_id: console_api::MetaId,
|
||||
}
|
||||
|
||||
/// Used to extract the fields needed to construct
|
||||
/// an `Event::Spawn` from the metadata of a tracing span
|
||||
/// that has the following shape:
|
||||
///
|
||||
/// ```
|
||||
/// tracing::trace_span!(
|
||||
/// target: "tokio::task",
|
||||
/// "runtime.spawn",
|
||||
/// kind = "local",
|
||||
/// task.name = "some_name",
|
||||
/// loc.file = "some_file.rs",
|
||||
/// loc.line = 555,
|
||||
/// loc.col = 5,
|
||||
/// );
|
||||
/// ```
|
||||
///
|
||||
/// # Fields
|
||||
///
|
||||
/// This visitor has special behavior for `loc.line`, `loc.file`, and `loc.col`
|
||||
/// fields, which are interpreted as a Rust source code location where the task
|
||||
/// was spawned, if they are present. Other fields are recorded as arbitrary
|
||||
/// key-value pairs.
|
||||
pub(crate) struct TaskVisitor {
|
||||
field_visitor: FieldVisitor,
|
||||
line: Option<u32>,
|
||||
file: Option<String>,
|
||||
column: Option<u32>,
|
||||
}
|
||||
|
||||
/// Used to extract the fields needed to construct
|
||||
/// an Event::AsyncOp from the metadata of a tracing span
|
||||
/// that has the following shape:
|
||||
///
|
||||
/// tracing::trace_span!(
|
||||
/// "runtime.resource.async_op",
|
||||
/// source = "Sleep::new_timeout",
|
||||
/// );
|
||||
///
|
||||
/// Fields:
|
||||
/// source - the method which has created an instance of this async operation
|
||||
#[derive(Default)]
|
||||
pub(crate) struct AsyncOpVisitor {
|
||||
source: Option<String>,
|
||||
inherit_child_attrs: bool,
|
||||
}
|
||||
|
||||
/// Used to extract the fields needed to construct
|
||||
/// an Event::Waker from the metadata of a tracing span
|
||||
/// that has the following shape:
|
||||
///
|
||||
/// tracing::trace!(
|
||||
/// target: "tokio::task::waker",
|
||||
/// op = "waker.clone",
|
||||
/// task.id = id.into_u64(),
|
||||
/// );
|
||||
///
|
||||
/// Fields:
|
||||
/// task.id - the id of the task this waker will wake
|
||||
/// op - the operation associated with this waker event
|
||||
#[derive(Default)]
|
||||
pub(crate) struct WakerVisitor {
|
||||
id: Option<span::Id>,
|
||||
op: Option<event::WakeOp>,
|
||||
}
|
||||
|
||||
/// Used to extract the fields needed to construct
|
||||
/// an Event::PollOp from the metadata of a tracing event
|
||||
/// that has the following shape:
|
||||
///
|
||||
/// tracing::trace!(
|
||||
/// target: "runtime::resource::poll_op",
|
||||
/// op_name = "poll_elapsed",
|
||||
/// readiness = "pending"
|
||||
/// );
|
||||
///
|
||||
/// Fields:
|
||||
/// op_name - the name of this resource poll operation
|
||||
/// readiness - the result of invoking this poll op, describing its readiness
|
||||
#[derive(Default)]
|
||||
pub(crate) struct PollOpVisitor {
|
||||
op_name: Option<String>,
|
||||
is_ready: Option<bool>,
|
||||
}
|
||||
|
||||
/// Used to extract the fields needed to construct
|
||||
/// an Event::StateUpdate from the metadata of a tracing event
|
||||
/// that has the following shape:
|
||||
///
|
||||
/// tracing::trace!(
|
||||
/// target: "runtime::resource::state_update",
|
||||
/// duration = duration,
|
||||
/// duration.unit = "ms",
|
||||
/// duration.op = "override",
|
||||
/// );
|
||||
///
|
||||
/// Fields:
|
||||
/// attribute_name - a field value for a field that has the name of the resource attribute being updated
|
||||
/// value - the value for this update
|
||||
/// unit - the unit for the value being updated (e.g. ms, s, bytes)
|
||||
/// op - the operation that this update performs to the value of the resource attribute (one of: ovr, sub, add)
|
||||
pub(crate) struct StateUpdateVisitor {
|
||||
meta_id: console_api::MetaId,
|
||||
field: Option<console_api::Field>,
|
||||
unit: Option<String>,
|
||||
op: Option<attribute::UpdateOp>,
|
||||
}
|
||||
|
||||
impl ResourceVisitor {
|
||||
pub(crate) const RES_SPAN_NAME: &'static str = "runtime.resource";
|
||||
const RES_CONCRETE_TYPE_FIELD_NAME: &'static str = "concrete_type";
|
||||
const RES_VIZ_FIELD_NAME: &'static str = "is_internal";
|
||||
const RES_KIND_FIELD_NAME: &'static str = "kind";
|
||||
const RES_KIND_TIMER: &'static str = "timer";
|
||||
|
||||
pub(crate) fn result(self) -> Option<ResourceVisitorResult> {
|
||||
let concrete_type = self.concrete_type?;
|
||||
let kind = self.kind?;
|
||||
|
||||
let location = if self.file.is_some() && self.line.is_some() && self.column.is_some() {
|
||||
Some(console_api::Location {
|
||||
file: self.file,
|
||||
line: self.line,
|
||||
column: self.column,
|
||||
..Default::default()
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Some(ResourceVisitorResult {
|
||||
concrete_type,
|
||||
kind,
|
||||
location,
|
||||
is_internal: self.is_internal,
|
||||
inherit_child_attrs: self.inherit_child_attrs,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Visit for ResourceVisitor {
|
||||
fn record_debug(&mut self, _: &field::Field, _: &dyn std::fmt::Debug) {}
|
||||
|
||||
fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
|
||||
match field.name() {
|
||||
Self::RES_CONCRETE_TYPE_FIELD_NAME => self.concrete_type = Some(value.to_string()),
|
||||
Self::RES_KIND_FIELD_NAME => {
|
||||
let kind = Some(match value {
|
||||
Self::RES_KIND_TIMER => {
|
||||
resource::kind::Kind::Known(resource::kind::Known::Timer as i32)
|
||||
}
|
||||
other => resource::kind::Kind::Other(other.to_string()),
|
||||
});
|
||||
self.kind = Some(resource::Kind { kind });
|
||||
}
|
||||
LOCATION_FILE => self.file = Some(value.to_string()),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
|
||||
match field.name() {
|
||||
Self::RES_VIZ_FIELD_NAME => self.is_internal = value,
|
||||
INHERIT_FIELD_NAME => self.inherit_child_attrs = value,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
|
||||
match field.name() {
|
||||
LOCATION_LINE => self.line = Some(value as u32),
|
||||
LOCATION_COLUMN => self.column = Some(value as u32),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FieldVisitor {
|
||||
pub(crate) fn new(meta_id: console_api::MetaId) -> Self {
|
||||
FieldVisitor {
|
||||
fields: Vec::default(),
|
||||
meta_id,
|
||||
}
|
||||
}
|
||||
pub(crate) fn result(self) -> Vec<console_api::Field> {
|
||||
self.fields
|
||||
}
|
||||
}
|
||||
|
||||
impl TaskVisitor {
|
||||
pub(crate) fn new(meta_id: console_api::MetaId) -> Self {
|
||||
TaskVisitor {
|
||||
field_visitor: FieldVisitor::new(meta_id),
|
||||
line: None,
|
||||
file: None,
|
||||
column: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn result(self) -> (Vec<console_api::Field>, Option<console_api::Location>) {
|
||||
let fields = self.field_visitor.result();
|
||||
let location = if self.file.is_some() && self.line.is_some() && self.column.is_some() {
|
||||
Some(console_api::Location {
|
||||
file: self.file,
|
||||
line: self.line,
|
||||
column: self.column,
|
||||
..Default::default()
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
(fields, location)
|
||||
}
|
||||
}
|
||||
|
||||
impl Visit for TaskVisitor {
|
||||
fn record_debug(&mut self, field: &field::Field, value: &dyn std::fmt::Debug) {
|
||||
self.field_visitor.record_debug(field, value);
|
||||
}
|
||||
|
||||
fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
|
||||
self.field_visitor.record_i64(field, value);
|
||||
}
|
||||
|
||||
fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
|
||||
match field.name() {
|
||||
LOCATION_LINE => self.line = Some(value as u32),
|
||||
LOCATION_COLUMN => self.column = Some(value as u32),
|
||||
_ => self.field_visitor.record_u64(field, value),
|
||||
}
|
||||
}
|
||||
|
||||
fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
|
||||
self.field_visitor.record_bool(field, value);
|
||||
}
|
||||
|
||||
fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
|
||||
if field.name() == LOCATION_FILE {
|
||||
self.file = Some(value.to_string());
|
||||
} else {
|
||||
self.field_visitor.record_str(field, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Visit for FieldVisitor {
|
||||
fn record_debug(&mut self, field: &field::Field, value: &dyn std::fmt::Debug) {
|
||||
self.fields.push(console_api::Field {
|
||||
name: Some(field.name().into()),
|
||||
value: Some(value.into()),
|
||||
metadata_id: Some(self.meta_id.clone()),
|
||||
});
|
||||
}
|
||||
|
||||
fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
|
||||
self.fields.push(console_api::Field {
|
||||
name: Some(field.name().into()),
|
||||
value: Some(value.into()),
|
||||
metadata_id: Some(self.meta_id.clone()),
|
||||
});
|
||||
}
|
||||
|
||||
fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
|
||||
self.fields.push(console_api::Field {
|
||||
name: Some(field.name().into()),
|
||||
value: Some(value.into()),
|
||||
metadata_id: Some(self.meta_id.clone()),
|
||||
});
|
||||
}
|
||||
|
||||
fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
|
||||
self.fields.push(console_api::Field {
|
||||
name: Some(field.name().into()),
|
||||
value: Some(value.into()),
|
||||
metadata_id: Some(self.meta_id.clone()),
|
||||
});
|
||||
}
|
||||
|
||||
fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
|
||||
self.fields.push(console_api::Field {
|
||||
name: Some(field.name().into()),
|
||||
value: Some(value.into()),
|
||||
metadata_id: Some(self.meta_id.clone()),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncOpVisitor {
|
||||
pub(crate) const ASYNC_OP_SPAN_NAME: &'static str = "runtime.resource.async_op";
|
||||
const ASYNC_OP_SRC_FIELD_NAME: &'static str = "source";
|
||||
|
||||
pub(crate) fn result(self) -> Option<(String, bool)> {
|
||||
let inherit = self.inherit_child_attrs;
|
||||
self.source.map(|s| (s, inherit))
|
||||
}
|
||||
}
|
||||
|
||||
impl Visit for AsyncOpVisitor {
|
||||
fn record_debug(&mut self, _: &field::Field, _: &dyn std::fmt::Debug) {}
|
||||
|
||||
fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
|
||||
if field.name() == Self::ASYNC_OP_SRC_FIELD_NAME {
|
||||
self.source = Some(value.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
|
||||
if field.name() == INHERIT_FIELD_NAME {
|
||||
self.inherit_child_attrs = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl WakerVisitor {
|
||||
const WAKE: &'static str = "waker.wake";
|
||||
const WAKE_BY_REF: &'static str = "waker.wake_by_ref";
|
||||
const CLONE: &'static str = "waker.clone";
|
||||
const DROP: &'static str = "waker.drop";
|
||||
const TASK_ID_FIELD_NAME: &'static str = "task.id";
|
||||
|
||||
pub(crate) fn result(self) -> Option<(span::Id, event::WakeOp)> {
|
||||
self.id.zip(self.op)
|
||||
}
|
||||
}
|
||||
|
||||
impl Visit for WakerVisitor {
|
||||
fn record_debug(&mut self, _: &field::Field, _: &dyn std::fmt::Debug) {
|
||||
// don't care (yet?)
|
||||
}
|
||||
|
||||
fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
|
||||
if field.name() == Self::TASK_ID_FIELD_NAME {
|
||||
self.id = Some(span::Id::from_u64(value));
|
||||
}
|
||||
}
|
||||
|
||||
fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
|
||||
use crate::event::WakeOp;
|
||||
if field.name() == "op" {
|
||||
self.op = Some(match value {
|
||||
Self::WAKE => WakeOp::Wake { self_wake: false },
|
||||
Self::WAKE_BY_REF => WakeOp::WakeByRef { self_wake: false },
|
||||
Self::CLONE => WakeOp::Clone,
|
||||
Self::DROP => WakeOp::Drop,
|
||||
_ => return,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PollOpVisitor {
|
||||
pub(crate) const POLL_OP_EVENT_TARGET: &'static str = "runtime::resource::poll_op";
|
||||
const OP_NAME_FIELD_NAME: &'static str = "op_name";
|
||||
const OP_READINESS_FIELD_NAME: &'static str = "is_ready";
|
||||
|
||||
pub(crate) fn result(self) -> Option<(String, bool)> {
|
||||
let op_name = self.op_name?;
|
||||
let is_ready = self.is_ready?;
|
||||
Some((op_name, is_ready))
|
||||
}
|
||||
}
|
||||
|
||||
impl Visit for PollOpVisitor {
|
||||
fn record_debug(&mut self, _: &field::Field, _: &dyn std::fmt::Debug) {}
|
||||
|
||||
fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
|
||||
if field.name() == Self::OP_READINESS_FIELD_NAME {
|
||||
self.is_ready = Some(value)
|
||||
}
|
||||
}
|
||||
|
||||
fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
|
||||
if field.name() == Self::OP_NAME_FIELD_NAME {
|
||||
self.op_name = Some(value.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StateUpdateVisitor {
|
||||
pub(crate) const RE_STATE_UPDATE_EVENT_TARGET: &'static str = "runtime::resource::state_update";
|
||||
pub(crate) const AO_STATE_UPDATE_EVENT_TARGET: &'static str =
|
||||
"runtime::resource::async_op::state_update";
|
||||
|
||||
const STATE_OP_SUFFIX: &'static str = ".op";
|
||||
const STATE_UNIT_SUFFIX: &'static str = ".unit";
|
||||
|
||||
const OP_ADD: &'static str = "add";
|
||||
const OP_SUB: &'static str = "sub";
|
||||
const OP_OVERRIDE: &'static str = "override";
|
||||
|
||||
pub(crate) fn new(meta_id: console_api::MetaId) -> Self {
|
||||
StateUpdateVisitor {
|
||||
meta_id,
|
||||
field: None,
|
||||
unit: None,
|
||||
op: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn result(self) -> Option<attribute::Update> {
|
||||
Some(attribute::Update {
|
||||
field: self.field?,
|
||||
op: self.op,
|
||||
unit: self.unit,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Visit for StateUpdateVisitor {
|
||||
fn record_debug(&mut self, field: &field::Field, value: &dyn std::fmt::Debug) {
|
||||
if !field.name().ends_with(Self::STATE_OP_SUFFIX)
|
||||
&& !field.name().ends_with(Self::STATE_UNIT_SUFFIX)
|
||||
{
|
||||
self.field = Some(console_api::Field {
|
||||
name: Some(field.name().into()),
|
||||
value: Some(value.into()),
|
||||
metadata_id: Some(self.meta_id.clone()),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn record_i64(&mut self, field: &field::Field, value: i64) {
|
||||
if !field.name().ends_with(Self::STATE_OP_SUFFIX)
|
||||
&& !field.name().ends_with(Self::STATE_UNIT_SUFFIX)
|
||||
{
|
||||
self.field = Some(console_api::Field {
|
||||
name: Some(field.name().into()),
|
||||
value: Some(value.into()),
|
||||
metadata_id: Some(self.meta_id.clone()),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn record_u64(&mut self, field: &field::Field, value: u64) {
|
||||
if !field.name().ends_with(Self::STATE_OP_SUFFIX)
|
||||
&& !field.name().ends_with(Self::STATE_UNIT_SUFFIX)
|
||||
{
|
||||
self.field = Some(console_api::Field {
|
||||
name: Some(field.name().into()),
|
||||
value: Some(value.into()),
|
||||
metadata_id: Some(self.meta_id.clone()),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn record_bool(&mut self, field: &field::Field, value: bool) {
|
||||
if !field.name().ends_with(Self::STATE_OP_SUFFIX)
|
||||
&& !field.name().ends_with(Self::STATE_UNIT_SUFFIX)
|
||||
{
|
||||
self.field = Some(console_api::Field {
|
||||
name: Some(field.name().into()),
|
||||
value: Some(value.into()),
|
||||
metadata_id: Some(self.meta_id.clone()),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn record_str(&mut self, field: &field::Field, value: &str) {
|
||||
if field.name().ends_with(Self::STATE_OP_SUFFIX) {
|
||||
match value {
|
||||
Self::OP_ADD => self.op = Some(attribute::UpdateOp::Add),
|
||||
Self::OP_SUB => self.op = Some(attribute::UpdateOp::Sub),
|
||||
Self::OP_OVERRIDE => self.op = Some(attribute::UpdateOp::Override),
|
||||
_ => {}
|
||||
};
|
||||
} else if field.name().ends_with(Self::STATE_UNIT_SUFFIX) {
|
||||
self.unit = Some(value.to_string());
|
||||
} else {
|
||||
self.field = Some(console_api::Field {
|
||||
name: Some(field.name().into()),
|
||||
value: Some(value.into()),
|
||||
metadata_id: Some(self.meta_id.clone()),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user