use crate::id_map::{IdMap, ToProto}; use crate::server::{Watch, WatchRequest}; use crate::stats::{TimeAnchor, Unsent}; use crate::{server, stats}; use crate::{Event, Shared}; use console_api::{async_ops, instrument, resources, tasks}; use crossbeam_channel::{Receiver, TryRecvError}; use futures_util::{FutureExt, StreamExt}; use std::collections::HashMap; use std::num::NonZeroU64; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use tracing::span; use tracing_core::Metadata; #[derive(Debug)] struct Resource { id: span::Id, is_dirty: AtomicBool, parent_id: Option, metadata: &'static Metadata<'static>, concrete_type: String, kind: resources::resource::Kind, location: Option, is_internal: bool, } /// Represents static data for tasks #[derive(Debug)] struct Task { id: span::Id, is_dirty: AtomicBool, metadata: &'static Metadata<'static>, fields: Vec, location: Option, } #[derive(Debug)] struct AsyncOp { id: span::Id, is_dirty: AtomicBool, parent_id: Option, resource_id: span::Id, metadata: &'static Metadata<'static>, source: String, } impl ToProto for Task { type Output = tasks::Task; fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output { tasks::Task { id: Some(self.id.clone().into()), // TODO: more kinds of tasks... kind: tasks::task::Kind::Spawn as i32, metadata: Some(self.metadata.into()), parents: Vec::new(), // TODO: implement parents nicely fields: self.fields.clone(), location: self.location.clone(), } } } impl Unsent for Task { fn take_unsent(&self) -> bool { self.is_dirty.swap(false, Ordering::AcqRel) } fn is_unsent(&self) -> bool { self.is_dirty.load(Ordering::Acquire) } } impl ToProto for Resource { type Output = resources::Resource; fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output { resources::Resource { id: Some(self.id.clone().into()), parent_resource_id: self.parent_id.clone().map(Into::into), kind: Some(self.kind.clone()), metadata: Some(self.metadata.into()), concrete_type: self.concrete_type.clone(), location: self.location.clone(), is_internal: self.is_internal, } } } impl Unsent for Resource { fn take_unsent(&self) -> bool { self.is_dirty.swap(false, Ordering::AcqRel) } fn is_unsent(&self) -> bool { self.is_dirty.load(Ordering::Acquire) } } impl ToProto for AsyncOp { type Output = async_ops::AsyncOp; fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output { async_ops::AsyncOp { id: Some(self.id.clone().into()), metadata: Some(self.metadata.into()), resource_id: Some(self.resource_id.clone().into()), source: self.source.clone(), parent_async_op_id: self.parent_id.clone().map(Into::into), } } } impl Unsent for AsyncOp { fn take_unsent(&self) -> bool { self.is_dirty.swap(false, Ordering::AcqRel) } fn is_unsent(&self) -> bool { self.is_dirty.load(Ordering::Acquire) } } #[derive(Copy, Clone, Eq, PartialEq)] pub(crate) enum Include { All, UpdatedOnly, } #[derive(Debug)] pub struct Aggregator { shared: Arc, events: Receiver, rpcs: async_channel::Receiver, watchers: Vec>, details_watchers: HashMap>>, all_metadata: Vec, new_metadata: Vec, running: bool, publish_interval: Duration, base_time: TimeAnchor, tasks: IdMap, task_stats: IdMap>, resources: IdMap, resource_stats: IdMap>, async_ops: IdMap, async_op_stats: IdMap>, poll_ops: Vec, } impl Aggregator { pub(crate) fn new( shared: Arc, events: Receiver, rpcs: async_channel::Receiver, ) -> Self { Self { shared, events, rpcs, watchers: Vec::new(), details_watchers: HashMap::default(), running: true, publish_interval: Duration::from_secs(1), all_metadata: Vec::new(), new_metadata: Vec::new(), base_time: TimeAnchor::new(), tasks: IdMap::default(), task_stats: IdMap::default(), resources: IdMap::default(), resource_stats: IdMap::default(), async_ops: IdMap::default(), async_op_stats: IdMap::default(), poll_ops: Vec::new(), } } fn add_instrument_subscription(&mut self, subscription: Watch) { tracing::debug!("new instrument subscription"); let task_update = Some(self.task_update(Include::All)); let resource_update = Some(self.resource_update(Include::All)); let async_op_update = Some(self.async_op_update(Include::All)); let now = Instant::now(); let update = &instrument::Update { task_update, resource_update, async_op_update, now: Some(self.base_time.to_timestamp(now)), new_metadata: Some(console_api::RegisterMetadata { metadata: (self.all_metadata).clone(), }), }; // Send the initial state --- if this fails, the subscription is already dead if subscription.update(update) { self.watchers.push(subscription) } } /// Add the task details subscription to the watchers after sending the first update, /// if the task is found. fn add_task_detail_subscription( &mut self, watch_request: WatchRequest, ) { let WatchRequest { id, mut stream_sender, buffer, } = watch_request; tracing::debug!(id = ?id, "new task details subscription"); if let Some(stats) = self.task_stats.get(&id) { let (tx, rx) = async_channel::bounded(buffer); let subscription = Watch(tx); let now = Some(self.base_time.to_timestamp(Instant::now())); // Send back the stream receiver. // Then send the initial state --- if this fails, the subscription is already dead. if stream_sender.send(rx).is_ok() && subscription.update(&console_api::tasks::TaskDetails { task_id: Some(id.clone().into()), now, poll_times_histogram: Some(stats.poll_duration_histogram()), }) { self.details_watchers .entry(id.clone()) .or_insert_with(Vec::new) .push(subscription); } } // If the task is not found, drop `stream_sender` which will result in a not found error } fn task_update(&mut self, include: Include) -> tasks::TaskUpdate { tasks::TaskUpdate { new_tasks: self.tasks.as_proto_list(include, &self.base_time), stats_update: self.task_stats.as_proto(include, &self.base_time), dropped_events: self.shared.dropped_tasks.swap(0, Ordering::AcqRel) as u64, } } fn resource_update(&mut self, include: Include) -> resources::ResourceUpdate { let new_poll_ops = match include { Include::All => self.poll_ops.clone(), Include::UpdatedOnly => std::mem::take(&mut self.poll_ops), }; resources::ResourceUpdate { new_resources: self.resources.as_proto_list(include, &self.base_time), stats_update: self.resource_stats.as_proto(include, &self.base_time), new_poll_ops, dropped_events: self.shared.dropped_resources.swap(0, Ordering::AcqRel) as u64, } } fn async_op_update(&mut self, include: Include) -> async_ops::AsyncOpUpdate { async_ops::AsyncOpUpdate { new_async_ops: self.async_ops.as_proto_list(include, &self.base_time), stats_update: self.async_op_stats.as_proto(include, &self.base_time), dropped_events: self.shared.dropped_async_ops.swap(0, Ordering::AcqRel) as u64, } } pub async fn run(mut self) { let mut timer = StreamExt::fuse(async_io::Timer::interval(self.publish_interval)); loop { let mut recv = self.rpcs.recv().fuse(); let should_send: bool = futures_util::select! { _ = timer.next() => self.running, cmd = recv => { match cmd { Ok(server::Command::Instrument(subscription)) => { self.add_instrument_subscription(subscription); } Ok(server::Command::WatchTaskDetail(request)) => { } Ok(server::Command::Pause) => { self.running = false; } Ok(server::Command::Resume) => { self.running = true; } Err(_) => { tracing::debug!("rpc channel closed, exiting"); return } } false }, }; // drain and aggregate buffered events. // // Note: we *don't* want to actually await the call to `recv` --- we // don't want the aggregator task to be woken on every event, // because it will then be woken when its own `poll` calls are // exited. that would result in a busy-loop. instead, we only want // to be woken when the flush interval has elapsed, or when the // channel is almost full. while let Ok(event) = self.events.try_recv() { self.update_state(event); } if let Err(TryRecvError::Disconnected) = self.events.try_recv() { tracing::debug!("event channel closed; terminating"); return; } // flush data to clients, if there are any currently subscribed // watchers and we should send a new update. if !self.watchers.is_empty() && should_send { self.publish(); } } } fn publish(&mut self) { let new_metadata = if !self.new_metadata.is_empty() { Some(console_api::RegisterMetadata { metadata: std::mem::take(&mut self.new_metadata), }) } else { None }; let task_update = Some(self.task_update(Include::UpdatedOnly)); let resource_update = Some(self.resource_update(Include::UpdatedOnly)); let async_op_update = Some(self.async_op_update(Include::UpdatedOnly)); let update = instrument::Update { now: Some(self.base_time.to_timestamp(Instant::now())), new_metadata, task_update, resource_update, async_op_update, }; self.watchers .retain(|watch: &Watch| watch.update(&update)); let stats = &self.task_stats; // Assuming there are much fewer task details subscribers than there are // stats updates, iterate over `details_watchers` and compact the map. self.details_watchers.retain(|id, watchers| { if let Some(task_stats) = stats.get(id) { let details = tasks::TaskDetails { task_id: Some(id.clone().into()), now: Some(self.base_time.to_timestamp(Instant::now())), poll_times_histogram: Some(task_stats.poll_duration_histogram()), }; watchers.retain(|watch| watch.update(&details)); !watchers.is_empty() } else { false } }); } /// Update the current state with data from a single event. fn update_state(&mut self, event: Event) { // do state update match event { Event::Metadata(meta) => { self.all_metadata.push(meta.into()); self.new_metadata.push(meta.into()); } Event::Spawn { id, metadata, stats, fields, location, } => { self.tasks.insert( id.clone(), Task { id: id.clone(), is_dirty: AtomicBool::new(true), metadata, fields, location, // TODO: parents }, ); self.task_stats.insert(id, stats); } Event::Resource { id, parent_id, metadata, kind, concrete_type, location, is_internal, stats, } => { self.resources.insert( id.clone(), Resource { id: id.clone(), is_dirty: AtomicBool::new(true), parent_id, kind, metadata, concrete_type, location, is_internal, }, ); self.resource_stats.insert(id, stats); } Event::PollOp { metadata, resource_id, op_name, async_op_id, task_id, is_ready, } => { let poll_op = resources::PollOp { metadata: Some(metadata.into()), resource_id: Some(resource_id.into()), name: op_name, task_id: Some(task_id.into()), async_op_id: Some(async_op_id.into()), is_ready, }; self.poll_ops.push(poll_op); } Event::AsyncResourceOp { id, source, resource_id, metadata, parent_id, stats, } => { self.async_ops.insert( id.clone(), AsyncOp { id: id.clone(), is_dirty: AtomicBool::new(true), resource_id, metadata, source, parent_id, }, ); self.async_op_stats.insert(id, stats); } } } }