From 35c9f45f6d35711c7902ae4c97fbf9908482e819 Mon Sep 17 00:00:00 2001 From: Nadja Reitzenstein Date: Tue, 21 Jun 2022 19:11:57 +0200 Subject: [PATCH] More console features --- Cargo.lock | 62 +++++- runtime/console/Cargo.toml | 13 +- runtime/console/src/aggregate.rs | 369 ++++++++++++++++++++++++++++++- runtime/console/src/attribute.rs | 30 +++ runtime/console/src/callsites.rs | 2 +- runtime/console/src/event.rs | 38 ++++ runtime/console/src/lib.rs | 10 +- runtime/console/src/server.rs | 191 +++++++++++++--- runtime/console/src/stats.rs | 161 ++++++++++++++ 9 files changed, 838 insertions(+), 38 deletions(-) create mode 100644 runtime/console/src/attribute.rs create mode 100644 runtime/console/src/stats.rs diff --git a/Cargo.lock b/Cargo.lock index d815106..d9e4bbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,7 +8,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47feb9fbcef700639ef28e04ca2a87eab8161a01a075ee227b15c90143805462" dependencies = [ - "nom", + "nom 5.1.2", ] [[package]] @@ -670,10 +670,21 @@ dependencies = [ name = "console" version = "0.1.0" dependencies = [ + "async-channel", + "async-compat", + "async-io", + "async-net", + "async-oneshot", "console-api", "crossbeam-channel", + "crossbeam-utils", + "futures-util", + "hdrhistogram", "hyper", + "prost-types", "thread_local", + "tokio", + "tokio-util", "tonic", "tracing", "tracing-core", @@ -743,6 +754,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "criterion" version = "0.3.5" @@ -1126,6 +1146,16 @@ dependencies = [ "instant", ] +[[package]] +name = "flate2" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1422,6 +1452,20 @@ dependencies = [ "ahash", ] +[[package]] +name = "hdrhistogram" +version = "7.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31672b7011be2c4f7456c4ddbcb40e7e9a4a9fad8efe49a6ebaf5f307d0109c0" +dependencies = [ + "base64", + "byteorder", + "crossbeam-channel", + "flate2", + "nom 7.1.1", + "num-traits", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1832,6 +1876,12 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.5.3" @@ -1888,6 +1938,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "nom" +version = "7.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "ntapi" version = "0.3.7" diff --git a/runtime/console/Cargo.toml b/runtime/console/Cargo.toml index 592b083..fd823e8 100644 --- a/runtime/console/Cargo.toml +++ b/runtime/console/Cargo.toml @@ -7,10 +7,21 @@ edition = "2021" [dependencies] console-api = "0.3" 
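+# prost-types provides the prost well-known `Timestamp` type produced by `stats::TimeAnchor`.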
+prost-types = "0.10" tonic = { version = "0.7.2", default_features = false, features = [] } hyper = { version = "0.14", default_features = false, features = ["http2", "server", "stream"] } thread_local = "1.1" tracing = "0.1" tracing-core = "0.1" tracing-subscriber = { version = "0.3", default_features = false, features = ["registry"] } -crossbeam-channel = "0.5" \ No newline at end of file +crossbeam-utils = "0.8" +crossbeam-channel = "0.5" +async-net = "1.6" +async-compat = "0.2" +async-channel = "1.6" +async-oneshot = "0.5" +async-io = "1.7" +tokio-util = "0.7" +futures-util = "0.3" +tokio = { version = "1.19", default_features = false, features = []} +hdrhistogram = "7.5" \ No newline at end of file diff --git a/runtime/console/src/aggregate.rs b/runtime/console/src/aggregate.rs index 1c0a5a1..25fb192 100644 --- a/runtime/console/src/aggregate.rs +++ b/runtime/console/src/aggregate.rs @@ -1,12 +1,375 @@ +use crate::server::{Watch, WatchRequest}; +use crate::stats::TimeAnchor; use crate::Event; -use crossbeam_channel::Receiver; +use crate::{server, stats}; +use console_api::{async_ops, instrument, resources, tasks}; +use crossbeam_channel::{Receiver, TryRecvError}; +use futures_util::{FutureExt, StreamExt}; +use std::num::NonZeroU64; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tracing_core::Metadata; + +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +pub(crate) struct Id(NonZeroU64); + +impl Id { + pub fn from_non_zero_u64(u: NonZeroU64) -> Self { + Self(u) + } +} + +impl Into for Id { + fn into(self) -> console_api::Id { + console_api::Id { id: self.0.into() } + } +} + +struct Resource { + id: Id, + is_dirty: AtomicBool, + parent_id: Option, + metadata: &'static Metadata<'static>, + concrete_type: String, + kind: resources::resource::Kind, + location: Option, + is_internal: bool, +} + +/// Represents static data for tasks +struct Task { + id: Id, + is_dirty: AtomicBool, + metadata: &'static Metadata<'static>, + fields: Vec, + location: Option, +} + +struct AsyncOp { + id: Id, + is_dirty: AtomicBool, + parent_id: Option, + resource_id: Id, + metadata: &'static Metadata<'static>, + source: String, +} + +#[derive(Copy, Clone, Eq, PartialEq)] +pub(crate) enum Include { + All, + UpdatedOnly, +} + +type IdMap = std::collections::HashMap; pub(crate) struct Aggregator { events: Receiver, + rpcs: async_channel::Receiver, + watchers: Vec>, + details_watchers: IdMap>>, + all_metadata: Vec, + new_metadata: Vec, + running: bool, + publish_interval: Duration, + base_time: TimeAnchor, + tasks: IdMap, + task_stats: IdMap>, + resources: IdMap, + resource_stats: IdMap>, + async_ops: IdMap, + async_op_stats: IdMap>, + poll_ops: Vec, } impl Aggregator { - pub fn new(events: Receiver) -> Self { - Self { events } + pub fn new(events: Receiver, rpcs: async_channel::Receiver) -> Self { + Self { + events, + rpcs, + watchers: Vec::new(), + details_watchers: IdMap::new(), + running: true, + publish_interval: Duration::from_secs(1), + all_metadata: Vec::new(), + new_metadata: Vec::new(), + base_time: TimeAnchor::new(), + tasks: IdMap::new(), + task_stats: IdMap::new(), + resources: IdMap::new(), + resource_stats: IdMap::new(), + async_ops: IdMap::new(), + async_op_stats: IdMap::new(), + poll_ops: Vec::new(), + } + } + + fn add_instrument_subscription(&mut self, subscription: Watch) { + tracing::debug!("new instrument subscription"); + + let task_update = Some(self.task_update(Include::All)); + let resource_update = 
Some(self.resource_update(Include::All)); + let async_op_update = Some(self.async_op_update(Include::All)); + let now = Instant::now(); + + let update = &instrument::Update { + task_update, + resource_update, + async_op_update, + now: Some(self.base_time.to_timestamp(now)), + new_metadata: Some(console_api::RegisterMetadata { + metadata: (self.all_metadata).clone(), + }), + }; + + // Send the initial state --- if this fails, the subscription is already dead + if subscription.update(update) { + self.watchers.push(subscription) + } + } + + /// Add the task details subscription to the watchers after sending the first update, + /// if the task is found. + fn add_task_detail_subscription( + &mut self, + watch_request: WatchRequest, + ) { + let WatchRequest { + id, + mut stream_sender, + buffer, + } = watch_request; + tracing::debug!(id = ?id, "new task details subscription"); + if let Some(stats) = self.task_stats.get(&id) { + let (tx, rx) = async_channel::bounded(buffer); + let subscription = Watch(tx); + let now = Some(self.base_time.to_timestamp(Instant::now())); + // Send back the stream receiver. + // Then send the initial state --- if this fails, the subscription is already dead. + if stream_sender.send(rx).is_ok() + && subscription.update(&console_api::tasks::TaskDetails { + task_id: Some(id.clone().into()), + now, + poll_times_histogram: Some(stats.poll_duration_histogram()), + }) + { + self.details_watchers + .entry(id.clone()) + .or_insert_with(Vec::new) + .push(subscription); + } + } + // If the task is not found, drop `stream_sender` which will result in a not found error + } + + fn task_update(&mut self, include: Include) -> tasks::TaskUpdate { + todo!() + } + + fn resource_update(&mut self, include: Include) -> resources::ResourceUpdate { + todo!() + } + + fn async_op_update(&mut self, include: Include) -> async_ops::AsyncOpUpdate { + todo!() + } + + pub async fn run(mut self) { + let mut timer = StreamExt::fuse(async_io::Timer::interval(self.publish_interval)); + loop { + let mut recv = self.rpcs.recv().fuse(); + let should_send: bool = futures_util::select! { + _ = timer.next() => self.running, + cmd = recv => { + match cmd { + Ok(server::Command::Instrument(subscription)) => { + self.add_instrument_subscription(subscription); + } + Ok(server::Command::WatchTaskDetail(request)) => { + } + Ok(server::Command::Pause) => { + self.running = false; + } + Ok(server::Command::Resume) => { + self.running = true; + } + Err(_) => { + tracing::debug!("rpc channel closed, exiting"); + return + } + } + false + }, + }; + + // drain and aggregate buffered events. + // + // Note: we *don't* want to actually await the call to `recv` --- we + // don't want the aggregator task to be woken on every event, + // because it will then be woken when its own `poll` calls are + // exited. that would result in a busy-loop. instead, we only want + // to be woken when the flush interval has elapsed, or when the + // channel is almost full. + let mut drained = false; + while let Ok(event) = self.events.try_recv() { + self.update_state(event); + } + if let Err(TryRecvError::Disconnected) = self.events.try_recv() { + tracing::debug!("event channel closed; terminating"); + return; + } + + // flush data to clients, if there are any currently subscribed + // watchers and we should send a new update. 
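+            // (`should_send` is only true when the publish timer fires while
+            // `self.running` is set; after handling an RPC command it stays
+            // false, so state changes are batched until the next tick.)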
+ if !self.watchers.is_empty() && should_send { + self.publish(); + } + } + } + + fn publish(&mut self) { + let new_metadata = if !self.new_metadata.is_empty() { + Some(console_api::RegisterMetadata { + metadata: std::mem::take(&mut self.new_metadata), + }) + } else { + None + }; + let task_update = Some(self.task_update(Include::UpdatedOnly)); + let resource_update = Some(self.resource_update(Include::UpdatedOnly)); + let async_op_update = Some(self.async_op_update(Include::UpdatedOnly)); + + let update = instrument::Update { + now: Some(self.base_time.to_timestamp(Instant::now())), + new_metadata, + task_update, + resource_update, + async_op_update, + }; + + //self.watchers.retain_and_shrink(|watch: &Watch| watch.update + // (&update)); + + let stats = &self.task_stats; + // Assuming there are much fewer task details subscribers than there are + // stats updates, iterate over `details_watchers` and compact the map. + /*self.details_watchers.retain_and_shrink(|id, watchers| { + if let Some(task_stats) = stats.get(id) { + let details = tasks::TaskDetails { + task_id: Some(id.clone().into()), + now: Some(self.base_time.to_timestamp(Instant::now())), + poll_times_histogram: Some(task_stats.poll_duration_histogram()), + }; + watchers.retain(|watch| watch.update(&details)); + !watchers.is_empty() + } else { + false + } + }); + + */ + } + + /// Update the current state with data from a single event. + fn update_state(&mut self, event: Event) { + // do state update + match event { + Event::Metadata(meta) => { + self.all_metadata.push(meta.into()); + self.new_metadata.push(meta.into()); + } + + Event::Spawn { + id, + metadata, + stats, + fields, + location, + } => { + self.tasks.insert( + id.clone(), + Task { + id: id.clone(), + is_dirty: AtomicBool::new(true), + metadata, + fields, + location, + // TODO: parents + }, + ); + + self.task_stats.insert(id, stats); + } + + Event::Resource { + id, + parent_id, + metadata, + kind, + concrete_type, + location, + is_internal, + stats, + } => { + self.resources.insert( + id.clone(), + Resource { + id: id.clone(), + is_dirty: AtomicBool::new(true), + parent_id, + kind, + metadata, + concrete_type, + location, + is_internal, + }, + ); + + self.resource_stats.insert(id, stats); + } + + Event::PollOp { + metadata, + resource_id, + op_name, + async_op_id, + task_id, + is_ready, + } => { + let poll_op = resources::PollOp { + metadata: Some(metadata.into()), + resource_id: Some(resource_id.into()), + name: op_name, + task_id: Some(task_id.into()), + async_op_id: Some(async_op_id.into()), + is_ready, + }; + + self.poll_ops.push(poll_op); + } + + Event::AsyncResourceOp { + id, + source, + resource_id, + metadata, + parent_id, + stats, + } => { + self.async_ops.insert( + id.clone(), + AsyncOp { + id: id.clone(), + is_dirty: AtomicBool::new(true), + resource_id, + metadata, + source, + parent_id, + }, + ); + + self.async_op_stats.insert(id, stats); + } + } } } diff --git a/runtime/console/src/attribute.rs b/runtime/console/src/attribute.rs new file mode 100644 index 0000000..55c2eb8 --- /dev/null +++ b/runtime/console/src/attribute.rs @@ -0,0 +1,30 @@ +use crate::aggregate::Id; +use std::collections::HashMap; + +#[derive(Debug, Default)] +pub(crate) struct Attributes { + attributes: HashMap, +} + +#[derive(Debug, Clone)] +pub(crate) struct Update { + pub(crate) field: console_api::Field, + pub(crate) op: Option, + pub(crate) unit: Option, +} + +#[derive(Debug, Clone)] +pub(crate) enum UpdateOp { + Add, + Override, + Sub, +} + +/// Represents a key for a 
`proto::field::Name`. Because the
+/// proto::field::Name might not be unique we also include the
+/// resource id in this key
+#[derive(Debug, Hash, PartialEq, Eq)]
+struct FieldKey {
+    update_id: Id,
+    field_name: console_api::field::Name,
+}
diff --git a/runtime/console/src/callsites.rs b/runtime/console/src/callsites.rs
index 35a0da4..803a86a 100644
--- a/runtime/console/src/callsites.rs
+++ b/runtime/console/src/callsites.rs
@@ -36,7 +36,7 @@ impl Callsites {
         while {
             for cs in &self.array[idx..end] {
                 let ptr = cs.load(Ordering::Acquire);
-                let meta = unsafe { ptr as *const _ as &Metadata<'static> };
+                let meta = unsafe { &*ptr };
                 if meta.callsite() == callsite.callsite() {
                     return true;
                 }
diff --git a/runtime/console/src/event.rs b/runtime/console/src/event.rs
index 65b316f..d681715 100644
--- a/runtime/console/src/event.rs
+++ b/runtime/console/src/event.rs
@@ -1,5 +1,43 @@
+use crate::aggregate::Id;
+use crate::stats;
+use console_api::resources;
+use std::sync::Arc;
 use tracing_core::Metadata;
 
 pub(crate) enum Event {
     Metadata(&'static Metadata<'static>),
+    Spawn {
+        id: Id,
+        metadata: &'static Metadata<'static>,
+        stats: Arc<stats::TaskStats>,
+        fields: Vec<console_api::Field>,
+        location: Option<console_api::Location>,
+    },
+    Resource {
+        id: Id,
+        parent_id: Option<Id>,
+        metadata: &'static Metadata<'static>,
+        concrete_type: String,
+        kind: resources::resource::Kind,
+        location: Option<console_api::Location>,
+        is_internal: bool,
+        stats: Arc<stats::ResourceStats>,
+    },
+    PollOp {
+        metadata: &'static Metadata<'static>,
+        resource_id: Id,
+        op_name: String,
+        async_op_id: Id,
+        task_id: Id,
+        is_ready: bool,
+    },
+    AsyncResourceOp {
+        id: Id,
+        parent_id: Option<Id>,
+        resource_id: Id,
+        metadata: &'static Metadata<'static>,
+        source: String,
+
+        stats: Arc<stats::AsyncOpStats>,
+    },
 }
diff --git a/runtime/console/src/lib.rs b/runtime/console/src/lib.rs
index b0b8688..de46432 100644
--- a/runtime/console/src/lib.rs
+++ b/runtime/console/src/lib.rs
@@ -13,10 +13,12 @@ use tracing_subscriber::registry::LookupSpan;
 use tracing_subscriber::Layer;
 
 mod aggregate;
+mod attribute;
 mod callsites;
 mod event;
 mod server;
 mod stack;
+mod stats;
 
 use crate::aggregate::Aggregator;
 use crate::callsites::Callsites;
@@ -46,6 +48,8 @@ pub struct Builder {
     /// A smaller number will reduce the memory footprint but may lead to more events being dropped
     /// during activity bursts.
event_buffer_capacity: usize, + + client_buffer_capacity: usize, } impl Builder { pub fn build(self) -> (ConsoleLayer, Server) { @@ -59,6 +63,7 @@ impl Default for Builder { server_addr: Server::DEFAULT_ADDR, server_port: Server::DEFAULT_PORT, event_buffer_capacity: ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY, + client_buffer_capacity: 1024, } } } @@ -85,8 +90,9 @@ impl ConsoleLayer { let (tx, events) = crossbeam_channel::bounded(config.event_buffer_capacity); let shared = Arc::new(Shared::default()); - let aggregator = Aggregator::new(events); - let server = Server::new(aggregator); + let (subscribe, rpcs) = async_channel::bounded(config.client_buffer_capacity); + let aggregator = Aggregator::new(events, rpcs); + let server = Server::new(aggregator, config.client_buffer_capacity, subscribe); let layer = Self { current_spans: ThreadLocal::new(), tx, diff --git a/runtime/console/src/server.rs b/runtime/console/src/server.rs index 2bf0ba1..5591206 100644 --- a/runtime/console/src/server.rs +++ b/runtime/console/src/server.rs @@ -1,25 +1,93 @@ +use crate::aggregate::Id; use crate::Aggregator; +use async_channel::{Receiver, Sender}; +use console_api::instrument; use console_api::instrument::instrument_server::{Instrument, InstrumentServer}; -use console_api::instrument::{ - InstrumentRequest, PauseRequest, PauseResponse, ResumeRequest, ResumeResponse, - TaskDetailsRequest, -}; +use console_api::tasks; +use futures_util::TryStreamExt; use std::error::Error; -use std::net::{IpAddr, Ipv6Addr}; +use std::io::IoSlice; +use std::net::{IpAddr, Ipv6Addr, SocketAddr}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::AsyncRead as TokioAsyncRead; +use tokio::io::{AsyncWrite as TokioAsyncWrite, ReadBuf}; +use tonic::transport::server::Connected; +use tonic::Status; + +struct StreamWrapper(T); +impl Connected for StreamWrapper { + type ConnectInfo = (); + + fn connect_info(&self) -> Self::ConnectInfo { + () + } +} +impl TokioAsyncWrite for StreamWrapper { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + TokioAsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + TokioAsyncWrite::poll_flush(Pin::new(&mut self.0), cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + TokioAsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + TokioAsyncWrite::is_write_vectored(&self.0) + } +} +impl TokioAsyncRead for StreamWrapper { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + TokioAsyncRead::poll_read(Pin::new(&mut self.0), cx, buf) + } +} pub struct Server { aggregator: Aggregator, client_buffer_size: usize, + subscribe: Sender, } impl Server { pub(crate) const DEFAULT_ADDR: IpAddr = IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)); pub(crate) const DEFAULT_PORT: u16 = 49289; - pub(crate) fn new(aggregator: Aggregator, client_buffer_size: usize) -> Self { + pub(crate) fn new( + aggregator: Aggregator, + client_buffer_size: usize, + subscribe: Sender, + ) -> Self { + let subscribe = todo!(); Self { aggregator, client_buffer_size, + subscribe, } } @@ -32,8 +100,12 @@ impl Server { let svc = InstrumentServer::new(self); 
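+        // `svc` exposes this `Server` as the tonic-generated `Instrument` service;
+        // the RPC handlers below only forward `Command`s over `self.subscribe` to
+        // the aggregator task, which owns all aggregation state.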
// The gRPC server task; requires a `Stream` of `tokio::AsyncRead + tokio::AsyncWrite`. - // TODO: Pass an async listening socket that implements the tokio versions of Read/Write - let incoming = todo!(); + let listener = + async_net::TcpListener::bind(SocketAddr::new(Self::DEFAULT_ADDR, Self::DEFAULT_PORT)) + .await?; + let incoming = listener + .incoming() + .map_ok(|stream| StreamWrapper(async_compat::Compat::new(stream))); tonic::transport::Server::builder() .add_service(svc) .serve_with_incoming(incoming) @@ -45,51 +117,110 @@ impl Server { } } +pub(crate) struct Watch(pub(crate) Sender>); +impl Watch { + pub fn update(&self, update: &T) -> bool { + self.0.try_send(Ok(update.clone())).is_ok() + } +} + +pub(crate) struct WatchRequest { + pub id: Id, + pub stream_sender: async_oneshot::Sender>>, + pub buffer: usize, +} + +pub(crate) enum Command { + Instrument(Watch), + WatchTaskDetail(WatchRequest), + Pause, + Resume, +} + #[tonic::async_trait] impl Instrument for Server { - type WatchUpdatesStream = (); + type WatchUpdatesStream = async_channel::Receiver>; async fn watch_updates( &self, - request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - /* match request.remote_addr() { Some(addr) => tracing::debug!(client.addr = %addr, "starting a new watch"), None => tracing::debug!(client.addr = %"", "starting a new watch"), } - let permit = self.subscribe.reserve().await.map_err(|_| { - tonic::Status::internal("cannot start new watch, aggregation task is not running") - })?; - let (tx, rx) = mpsc::channel(self.client_buffer); - permit.send(Command::Instrument(Watch(tx))); - tracing::debug!("watch started"); - let stream = tokio_stream::wrappers::ReceiverStream::new(rx); - Ok(tonic::Response::new(stream)) - */ - todo!() + + if !self.subscribe.is_full() { + let (tx, rx) = async_channel::bounded(self.client_buffer_size); + self.subscribe.send(Command::Instrument(Watch(tx))).await; + tracing::debug!("watch started"); + Ok(tonic::Response::new(rx)) + } else { + Err(tonic::Status::internal( + "cannot start new watch, aggregation task is not running", + )) + } } - type WatchTaskDetailsStream = (); + type WatchTaskDetailsStream = async_channel::Receiver>; async fn watch_task_details( &self, - request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - todo!() + let task_id = request + .into_inner() + .id + .ok_or_else(|| tonic::Status::invalid_argument("missing task_id"))? + .id; + + // `tracing` reserves span ID 0 for niche optimization for `Option`. + let id = std::num::NonZeroU64::new(task_id) + .map(Id::from_non_zero_u64) + .ok_or_else(|| tonic::Status::invalid_argument("task_id cannot be 0"))?; + + if !self.subscribe.is_full() { + // Check with the aggregator task to request a stream if the task exists. + let (stream_sender, stream_recv) = async_oneshot::oneshot(); + self.subscribe + .send(Command::WatchTaskDetail(WatchRequest { + id, + stream_sender, + buffer: self.client_buffer_size, + })) + .await; + // If the aggregator drops the sender, the task doesn't exist. 
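+            // (`async_oneshot`'s receiver resolves to `Err` once its sender is
+            // dropped, which the `map_err` below turns into `Status::not_found`.)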
+ let rx = stream_recv.await.map_err(|_| { + tracing::warn!(id = ?task_id, "requested task not found"); + tonic::Status::not_found("task not found") + })?; + + tracing::debug!(id = ?task_id, "task details watch started"); + Ok(tonic::Response::new(rx)) + } else { + Err(tonic::Status::internal( + "cannot start new watch, aggregation task is not running", + )) + } } async fn pause( &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - todo!() + _request: tonic::Request, + ) -> Result, tonic::Status> { + self.subscribe.send(Command::Pause).await.map_err(|_| { + tonic::Status::internal("cannot pause, aggregation task is not running") + })?; + Ok(tonic::Response::new(instrument::PauseResponse {})) } async fn resume( &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - todo!() + _request: tonic::Request, + ) -> Result, tonic::Status> { + self.subscribe.send(Command::Resume).await.map_err(|_| { + tonic::Status::internal("cannot resume, aggregation task is not running") + })?; + Ok(tonic::Response::new(instrument::ResumeResponse {})) } } diff --git a/runtime/console/src/stats.rs b/runtime/console/src/stats.rs new file mode 100644 index 0000000..d2bf2af --- /dev/null +++ b/runtime/console/src/stats.rs @@ -0,0 +1,161 @@ +use crate::aggregate::Id; +use crate::attribute; +use crossbeam_utils::atomic::AtomicCell; +use hdrhistogram::serialization::{Serializer, V2Serializer}; +use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::Mutex; +use std::time::{Duration, Instant, SystemTime}; + +/// Anchors an `Instant` with a `SystemTime` timestamp to allow converting +/// monotonic `Instant`s into timestamps that can be sent over the wire. +#[derive(Debug, Clone)] +pub(crate) struct TimeAnchor { + mono: Instant, + sys: SystemTime, +} + +impl TimeAnchor { + pub(crate) fn new() -> Self { + Self { + mono: Instant::now(), + sys: SystemTime::now(), + } + } + + pub(crate) fn to_system_time(&self, t: Instant) -> SystemTime { + let dur = t + .checked_duration_since(self.mono) + .unwrap_or_else(|| Duration::from_secs(0)); + self.sys + dur + } + + pub(crate) fn to_timestamp(&self, t: Instant) -> prost_types::Timestamp { + self.to_system_time(t).into() + } +} + +#[derive(Debug, Default)] +struct PollStats { + /// The number of polls in progress + current_polls: AtomicUsize, + /// The total number of polls + polls: AtomicUsize, + timestamps: Mutex>, +} + +/// Stats associated with a task. +#[derive(Debug)] +pub(crate) struct TaskStats { + is_dirty: AtomicBool, + is_dropped: AtomicBool, + // task stats + pub(crate) created_at: Instant, + timestamps: Mutex, + + // waker stats + wakes: AtomicUsize, + waker_clones: AtomicUsize, + waker_drops: AtomicUsize, + self_wakes: AtomicUsize, + + /// Poll durations and other stats. + poll_stats: PollStats, +} + +impl TaskStats { + pub(crate) fn poll_duration_histogram( + &self, + ) -> console_api::tasks::task_details::PollTimesHistogram { + let hist = self + .poll_stats + .timestamps + .lock() + .unwrap() + .histogram + .to_proto(); + console_api::tasks::task_details::PollTimesHistogram::Histogram(hist) + } +} + +/// Stats associated with an async operation. +/// +/// This shares all of the same fields as [`ResourceStats]`, with the addition +/// of [`PollStats`] tracking when the async operation is polled, and the task +/// ID of the last task to poll the async op. +#[derive(Debug)] +pub(crate) struct AsyncOpStats { + /// The task ID of the last task to poll this async op. 
+ /// + /// This is set every time the async op is polled, in case a future is + /// passed between tasks. + task_id: AtomicCell, + + /// Fields shared with `ResourceStats`. + pub(crate) stats: ResourceStats, + + /// Poll durations and other stats. + poll_stats: PollStats<()>, +} + +/// Stats associated with a resource. +#[derive(Debug)] +pub(crate) struct ResourceStats { + is_dirty: AtomicBool, + is_dropped: AtomicBool, + created_at: Instant, + dropped_at: Mutex>, + attributes: Mutex, + pub(crate) inherit_child_attributes: bool, + pub(crate) parent_id: Option, +} + +#[derive(Debug, Default)] +struct TaskTimestamps { + dropped_at: Option, + last_wake: Option, +} + +#[derive(Debug, Default)] +struct PollTimestamps { + first_poll: Option, + last_poll_started: Option, + last_poll_ended: Option, + busy_time: Duration, + histogram: H, +} + +#[derive(Debug)] +struct Histogram { + histogram: hdrhistogram::Histogram, + max: u64, + outliers: u64, + max_outlier: Option, +} + +impl Histogram { + fn new(max: u64) -> Self { + // significant figures should be in the [0-5] range and memory usage + // grows exponentially with higher a sigfig + let histogram = hdrhistogram::Histogram::new_with_max(max, 2).unwrap(); + Self { + histogram, + max, + max_outlier: None, + outliers: 0, + } + } + + fn to_proto(&self) -> console_api::tasks::DurationHistogram { + let mut serializer = V2Serializer::new(); + let mut raw_histogram = Vec::new(); + serializer + .serialize(&self.histogram, &mut raw_histogram) + .expect("histogram failed to serialize"); + console_api::tasks::DurationHistogram { + raw_histogram, + max_value: self.max, + high_outliers: self.outliers, + highest_outlier: self.max_outlier, + } + } +}
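For context, a minimal usage sketch of the new plumbing — assuming `Builder::build()` returns the `(ConsoleLayer, Server)` pair declared in lib.rs, that `ConsoleLayer` implements `tracing_subscriber::Layer`, and that `Server::serve()` is an async method taking no arguments; those signatures are not fully visible in this patch, so the names here are illustrative:

    use tracing_subscriber::prelude::*;

    fn main() {
        // Build the layer/server pair: the layer records span/event data and
        // forwards it over the bounded crossbeam channel; the server side
        // aggregates it and exposes the gRPC `Instrument` service.
        let (layer, server) = console::Builder::default().build();

        // Install the layer next to whatever other layers the application uses.
        let subscriber = tracing_subscriber::registry().with(layer);
        tracing::subscriber::set_global_default(subscriber)
            .expect("failed to install tracing subscriber");

        // Drive the console server; it is async-io/async-net based, so any
        // executor (or a dedicated thread with `async_io::block_on`) will do.
        std::thread::spawn(move || {
            async_io::block_on(async {
                if let Err(err) = server.serve().await {
                    eprintln!("console server error: {err}");
                }
            });
        });

        // ... run the instrumented application ...
    }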