bffh/runtime/console/src/aggregate.rs
2022-06-22 19:01:58 +02:00

462 lines
15 KiB
Rust

use crate::id_map::{IdMap, ToProto};
use crate::server::{Watch, WatchRequest};
use crate::stats::{TimeAnchor, Unsent};
use crate::{server, stats};
use crate::{Event, Shared};
use console_api::{async_ops, instrument, resources, tasks};
use crossbeam_channel::{Receiver, TryRecvError};
use futures_util::{FutureExt, StreamExt};
use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::span;
use tracing_core::Metadata;
#[derive(Debug)]
struct Resource {
id: span::Id,
is_dirty: AtomicBool,
parent_id: Option<span::Id>,
metadata: &'static Metadata<'static>,
concrete_type: String,
kind: resources::resource::Kind,
location: Option<console_api::Location>,
is_internal: bool,
}
/// Represents static data for tasks
#[derive(Debug)]
struct Task {
id: span::Id,
is_dirty: AtomicBool,
metadata: &'static Metadata<'static>,
fields: Vec<console_api::Field>,
location: Option<console_api::Location>,
}
#[derive(Debug)]
struct AsyncOp {
id: span::Id,
is_dirty: AtomicBool,
parent_id: Option<span::Id>,
resource_id: span::Id,
metadata: &'static Metadata<'static>,
source: String,
}
impl ToProto for Task {
type Output = tasks::Task;
fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
tasks::Task {
id: Some(self.id.clone().into()),
// TODO: more kinds of tasks...
kind: tasks::task::Kind::Spawn as i32,
metadata: Some(self.metadata.into()),
parents: Vec::new(), // TODO: implement parents nicely
fields: self.fields.clone(),
location: self.location.clone(),
}
}
}
impl Unsent for Task {
fn take_unsent(&self) -> bool {
self.is_dirty.swap(false, Ordering::AcqRel)
}
fn is_unsent(&self) -> bool {
self.is_dirty.load(Ordering::Acquire)
}
}
impl ToProto for Resource {
type Output = resources::Resource;
fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
resources::Resource {
id: Some(self.id.clone().into()),
parent_resource_id: self.parent_id.clone().map(Into::into),
kind: Some(self.kind.clone()),
metadata: Some(self.metadata.into()),
concrete_type: self.concrete_type.clone(),
location: self.location.clone(),
is_internal: self.is_internal,
}
}
}
impl Unsent for Resource {
fn take_unsent(&self) -> bool {
self.is_dirty.swap(false, Ordering::AcqRel)
}
fn is_unsent(&self) -> bool {
self.is_dirty.load(Ordering::Acquire)
}
}
impl ToProto for AsyncOp {
type Output = async_ops::AsyncOp;
fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
async_ops::AsyncOp {
id: Some(self.id.clone().into()),
metadata: Some(self.metadata.into()),
resource_id: Some(self.resource_id.clone().into()),
source: self.source.clone(),
parent_async_op_id: self.parent_id.clone().map(Into::into),
}
}
}
impl Unsent for AsyncOp {
fn take_unsent(&self) -> bool {
self.is_dirty.swap(false, Ordering::AcqRel)
}
fn is_unsent(&self) -> bool {
self.is_dirty.load(Ordering::Acquire)
}
}
#[derive(Copy, Clone, Eq, PartialEq)]
pub(crate) enum Include {
All,
UpdatedOnly,
}
#[derive(Debug)]
pub struct Aggregator {
shared: Arc<Shared>,
events: Receiver<Event>,
rpcs: async_channel::Receiver<server::Command>,
watchers: Vec<Watch<instrument::Update>>,
details_watchers: HashMap<span::Id, Vec<Watch<tasks::TaskDetails>>>,
all_metadata: Vec<console_api::register_metadata::NewMetadata>,
new_metadata: Vec<console_api::register_metadata::NewMetadata>,
running: bool,
publish_interval: Duration,
base_time: TimeAnchor,
tasks: IdMap<Task>,
task_stats: IdMap<Arc<stats::TaskStats>>,
resources: IdMap<Resource>,
resource_stats: IdMap<Arc<stats::ResourceStats>>,
async_ops: IdMap<AsyncOp>,
async_op_stats: IdMap<Arc<stats::AsyncOpStats>>,
poll_ops: Vec<console_api::resources::PollOp>,
}
impl Aggregator {
pub(crate) fn new(
shared: Arc<Shared>,
events: Receiver<Event>,
rpcs: async_channel::Receiver<server::Command>,
) -> Self {
Self {
shared,
events,
rpcs,
watchers: Vec::new(),
details_watchers: HashMap::default(),
running: true,
publish_interval: Duration::from_secs(1),
all_metadata: Vec::new(),
new_metadata: Vec::new(),
base_time: TimeAnchor::new(),
tasks: IdMap::default(),
task_stats: IdMap::default(),
resources: IdMap::default(),
resource_stats: IdMap::default(),
async_ops: IdMap::default(),
async_op_stats: IdMap::default(),
poll_ops: Vec::new(),
}
}
fn add_instrument_subscription(&mut self, subscription: Watch<instrument::Update>) {
tracing::debug!("new instrument subscription");
let task_update = Some(self.task_update(Include::All));
let resource_update = Some(self.resource_update(Include::All));
let async_op_update = Some(self.async_op_update(Include::All));
let now = Instant::now();
let update = &instrument::Update {
task_update,
resource_update,
async_op_update,
now: Some(self.base_time.to_timestamp(now)),
new_metadata: Some(console_api::RegisterMetadata {
metadata: (self.all_metadata).clone(),
}),
};
// Send the initial state --- if this fails, the subscription is already dead
if subscription.update(update) {
self.watchers.push(subscription)
}
}
/// Add the task details subscription to the watchers after sending the first update,
/// if the task is found.
fn add_task_detail_subscription(
&mut self,
watch_request: WatchRequest<console_api::tasks::TaskDetails>,
) {
let WatchRequest {
id,
mut stream_sender,
buffer,
} = watch_request;
tracing::debug!(id = ?id, "new task details subscription");
if let Some(stats) = self.task_stats.get(&id) {
let (tx, rx) = async_channel::bounded(buffer);
let subscription = Watch(tx);
let now = Some(self.base_time.to_timestamp(Instant::now()));
// Send back the stream receiver.
// Then send the initial state --- if this fails, the subscription is already dead.
if stream_sender.send(rx).is_ok()
&& subscription.update(&console_api::tasks::TaskDetails {
task_id: Some(id.clone().into()),
now,
poll_times_histogram: Some(stats.poll_duration_histogram()),
})
{
self.details_watchers
.entry(id.clone())
.or_insert_with(Vec::new)
.push(subscription);
}
}
// If the task is not found, drop `stream_sender` which will result in a not found error
}
fn task_update(&mut self, include: Include) -> tasks::TaskUpdate {
tasks::TaskUpdate {
new_tasks: self.tasks.as_proto_list(include, &self.base_time),
stats_update: self.task_stats.as_proto(include, &self.base_time),
dropped_events: self.shared.dropped_tasks.swap(0, Ordering::AcqRel) as u64,
}
}
fn resource_update(&mut self, include: Include) -> resources::ResourceUpdate {
let new_poll_ops = match include {
Include::All => self.poll_ops.clone(),
Include::UpdatedOnly => std::mem::take(&mut self.poll_ops),
};
resources::ResourceUpdate {
new_resources: self.resources.as_proto_list(include, &self.base_time),
stats_update: self.resource_stats.as_proto(include, &self.base_time),
new_poll_ops,
dropped_events: self.shared.dropped_resources.swap(0, Ordering::AcqRel) as u64,
}
}
fn async_op_update(&mut self, include: Include) -> async_ops::AsyncOpUpdate {
async_ops::AsyncOpUpdate {
new_async_ops: self.async_ops.as_proto_list(include, &self.base_time),
stats_update: self.async_op_stats.as_proto(include, &self.base_time),
dropped_events: self.shared.dropped_async_ops.swap(0, Ordering::AcqRel) as u64,
}
}
pub async fn run(mut self) {
let mut timer = StreamExt::fuse(async_io::Timer::interval(self.publish_interval));
loop {
let mut recv = self.rpcs.recv().fuse();
let should_send: bool = futures_util::select! {
_ = timer.next() => self.running,
cmd = recv => {
match cmd {
Ok(server::Command::Instrument(subscription)) => {
self.add_instrument_subscription(subscription);
}
Ok(server::Command::WatchTaskDetail(request)) => {
}
Ok(server::Command::Pause) => {
self.running = false;
}
Ok(server::Command::Resume) => {
self.running = true;
}
Err(_) => {
tracing::debug!("rpc channel closed, exiting");
return
}
}
false
},
};
// drain and aggregate buffered events.
//
// Note: we *don't* want to actually await the call to `recv` --- we
// don't want the aggregator task to be woken on every event,
// because it will then be woken when its own `poll` calls are
// exited. that would result in a busy-loop. instead, we only want
// to be woken when the flush interval has elapsed, or when the
// channel is almost full.
while let Ok(event) = self.events.try_recv() {
self.update_state(event);
}
if let Err(TryRecvError::Disconnected) = self.events.try_recv() {
tracing::debug!("event channel closed; terminating");
return;
}
// flush data to clients, if there are any currently subscribed
// watchers and we should send a new update.
if !self.watchers.is_empty() && should_send {
self.publish();
}
}
}
fn publish(&mut self) {
let new_metadata = if !self.new_metadata.is_empty() {
Some(console_api::RegisterMetadata {
metadata: std::mem::take(&mut self.new_metadata),
})
} else {
None
};
let task_update = Some(self.task_update(Include::UpdatedOnly));
let resource_update = Some(self.resource_update(Include::UpdatedOnly));
let async_op_update = Some(self.async_op_update(Include::UpdatedOnly));
let update = instrument::Update {
now: Some(self.base_time.to_timestamp(Instant::now())),
new_metadata,
task_update,
resource_update,
async_op_update,
};
self.watchers
.retain(|watch: &Watch<instrument::Update>| watch.update(&update));
let stats = &self.task_stats;
// Assuming there are much fewer task details subscribers than there are
// stats updates, iterate over `details_watchers` and compact the map.
self.details_watchers.retain(|id, watchers| {
if let Some(task_stats) = stats.get(id) {
let details = tasks::TaskDetails {
task_id: Some(id.clone().into()),
now: Some(self.base_time.to_timestamp(Instant::now())),
poll_times_histogram: Some(task_stats.poll_duration_histogram()),
};
watchers.retain(|watch| watch.update(&details));
!watchers.is_empty()
} else {
false
}
});
}
/// Update the current state with data from a single event.
fn update_state(&mut self, event: Event) {
// do state update
match event {
Event::Metadata(meta) => {
self.all_metadata.push(meta.into());
self.new_metadata.push(meta.into());
}
Event::Spawn {
id,
metadata,
stats,
fields,
location,
} => {
self.tasks.insert(
id.clone(),
Task {
id: id.clone(),
is_dirty: AtomicBool::new(true),
metadata,
fields,
location,
// TODO: parents
},
);
self.task_stats.insert(id, stats);
}
Event::Resource {
id,
parent_id,
metadata,
kind,
concrete_type,
location,
is_internal,
stats,
} => {
self.resources.insert(
id.clone(),
Resource {
id: id.clone(),
is_dirty: AtomicBool::new(true),
parent_id,
kind,
metadata,
concrete_type,
location,
is_internal,
},
);
self.resource_stats.insert(id, stats);
}
Event::PollOp {
metadata,
resource_id,
op_name,
async_op_id,
task_id,
is_ready,
} => {
let poll_op = resources::PollOp {
metadata: Some(metadata.into()),
resource_id: Some(resource_id.into()),
name: op_name,
task_id: Some(task_id.into()),
async_op_id: Some(async_op_id.into()),
is_ready,
};
self.poll_ops.push(poll_op);
}
Event::AsyncResourceOp {
id,
source,
resource_id,
metadata,
parent_id,
stats,
} => {
self.async_ops.insert(
id.clone(),
AsyncOp {
id: id.clone(),
is_dirty: AtomicBool::new(true),
resource_id,
metadata,
source,
parent_id,
},
);
self.async_op_stats.insert(id, stats);
}
}
}
}