More console features

This commit is contained in:
Nadja Reitzenstein 2022-06-21 19:11:57 +02:00
parent ee0593dc6f
commit 35c9f45f6d
9 changed files with 838 additions and 38 deletions

62
Cargo.lock generated
View File

@ -8,7 +8,7 @@ version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47feb9fbcef700639ef28e04ca2a87eab8161a01a075ee227b15c90143805462"
dependencies = [
"nom",
"nom 5.1.2",
]
[[package]]
@ -670,10 +670,21 @@ dependencies = [
name = "console"
version = "0.1.0"
dependencies = [
"async-channel",
"async-compat",
"async-io",
"async-net",
"async-oneshot",
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures-util",
"hdrhistogram",
"hyper",
"prost-types",
"thread_local",
"tokio",
"tokio-util",
"tonic",
"tracing",
"tracing-core",
@ -743,6 +754,15 @@ dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
dependencies = [
"cfg-if",
]
[[package]]
name = "criterion"
version = "0.3.5"
@ -1126,6 +1146,16 @@ dependencies = [
"instant",
]
[[package]]
name = "flate2"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6"
dependencies = [
"crc32fast",
"miniz_oxide",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -1422,6 +1452,20 @@ dependencies = [
"ahash",
]
[[package]]
name = "hdrhistogram"
version = "7.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31672b7011be2c4f7456c4ddbcb40e7e9a4a9fad8efe49a6ebaf5f307d0109c0"
dependencies = [
"base64",
"byteorder",
"crossbeam-channel",
"flate2",
"nom 7.1.1",
"num-traits",
]
[[package]]
name = "hermit-abi"
version = "0.1.19"
@ -1832,6 +1876,12 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.5.3"
@ -1888,6 +1938,16 @@ dependencies = [
"version_check",
]
[[package]]
name = "nom"
version = "7.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "ntapi"
version = "0.3.7"

View File

@ -7,10 +7,21 @@ edition = "2021"
[dependencies]
console-api = "0.3"
prost-types = "0.10"
tonic = { version = "0.7.2", default_features = false, features = [] }
hyper = { version = "0.14", default_features = false, features = ["http2", "server", "stream"] }
thread_local = "1.1"
tracing = "0.1"
tracing-core = "0.1"
tracing-subscriber = { version = "0.3", default_features = false, features = ["registry"] }
crossbeam-channel = "0.5"
crossbeam-utils = "0.8"
crossbeam-channel = "0.5"
async-net = "1.6"
async-compat = "0.2"
async-channel = "1.6"
async-oneshot = "0.5"
async-io = "1.7"
tokio-util = "0.7"
futures-util = "0.3"
tokio = { version = "1.19", default_features = false, features = []}
hdrhistogram = "7.5"

View File

@ -1,12 +1,375 @@
use crate::server::{Watch, WatchRequest};
use crate::stats::TimeAnchor;
use crate::Event;
use crossbeam_channel::Receiver;
use crate::{server, stats};
use console_api::{async_ops, instrument, resources, tasks};
use crossbeam_channel::{Receiver, TryRecvError};
use futures_util::{FutureExt, StreamExt};
use std::num::NonZeroU64;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing_core::Metadata;
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) struct Id(NonZeroU64);
impl Id {
pub fn from_non_zero_u64(u: NonZeroU64) -> Self {
Self(u)
}
}
impl Into<console_api::Id> for Id {
fn into(self) -> console_api::Id {
console_api::Id { id: self.0.into() }
}
}
struct Resource {
id: Id,
is_dirty: AtomicBool,
parent_id: Option<Id>,
metadata: &'static Metadata<'static>,
concrete_type: String,
kind: resources::resource::Kind,
location: Option<console_api::Location>,
is_internal: bool,
}
/// Represents static data for tasks
struct Task {
id: Id,
is_dirty: AtomicBool,
metadata: &'static Metadata<'static>,
fields: Vec<console_api::Field>,
location: Option<console_api::Location>,
}
struct AsyncOp {
id: Id,
is_dirty: AtomicBool,
parent_id: Option<Id>,
resource_id: Id,
metadata: &'static Metadata<'static>,
source: String,
}
#[derive(Copy, Clone, Eq, PartialEq)]
pub(crate) enum Include {
All,
UpdatedOnly,
}
type IdMap<T> = std::collections::HashMap<Id, T>;
pub(crate) struct Aggregator {
events: Receiver<Event>,
rpcs: async_channel::Receiver<server::Command>,
watchers: Vec<Watch<instrument::Update>>,
details_watchers: IdMap<Vec<Watch<tasks::TaskDetails>>>,
all_metadata: Vec<console_api::register_metadata::NewMetadata>,
new_metadata: Vec<console_api::register_metadata::NewMetadata>,
running: bool,
publish_interval: Duration,
base_time: TimeAnchor,
tasks: IdMap<Task>,
task_stats: IdMap<Arc<stats::TaskStats>>,
resources: IdMap<Resource>,
resource_stats: IdMap<Arc<stats::ResourceStats>>,
async_ops: IdMap<AsyncOp>,
async_op_stats: IdMap<Arc<stats::AsyncOpStats>>,
poll_ops: Vec<console_api::resources::PollOp>,
}
impl Aggregator {
pub fn new(events: Receiver<Event>) -> Self {
Self { events }
pub fn new(events: Receiver<Event>, rpcs: async_channel::Receiver<server::Command>) -> Self {
Self {
events,
rpcs,
watchers: Vec::new(),
details_watchers: IdMap::new(),
running: true,
publish_interval: Duration::from_secs(1),
all_metadata: Vec::new(),
new_metadata: Vec::new(),
base_time: TimeAnchor::new(),
tasks: IdMap::new(),
task_stats: IdMap::new(),
resources: IdMap::new(),
resource_stats: IdMap::new(),
async_ops: IdMap::new(),
async_op_stats: IdMap::new(),
poll_ops: Vec::new(),
}
}
fn add_instrument_subscription(&mut self, subscription: Watch<instrument::Update>) {
tracing::debug!("new instrument subscription");
let task_update = Some(self.task_update(Include::All));
let resource_update = Some(self.resource_update(Include::All));
let async_op_update = Some(self.async_op_update(Include::All));
let now = Instant::now();
let update = &instrument::Update {
task_update,
resource_update,
async_op_update,
now: Some(self.base_time.to_timestamp(now)),
new_metadata: Some(console_api::RegisterMetadata {
metadata: (self.all_metadata).clone(),
}),
};
// Send the initial state --- if this fails, the subscription is already dead
if subscription.update(update) {
self.watchers.push(subscription)
}
}
/// Add the task details subscription to the watchers after sending the first update,
/// if the task is found.
fn add_task_detail_subscription(
&mut self,
watch_request: WatchRequest<console_api::tasks::TaskDetails>,
) {
let WatchRequest {
id,
mut stream_sender,
buffer,
} = watch_request;
tracing::debug!(id = ?id, "new task details subscription");
if let Some(stats) = self.task_stats.get(&id) {
let (tx, rx) = async_channel::bounded(buffer);
let subscription = Watch(tx);
let now = Some(self.base_time.to_timestamp(Instant::now()));
// Send back the stream receiver.
// Then send the initial state --- if this fails, the subscription is already dead.
if stream_sender.send(rx).is_ok()
&& subscription.update(&console_api::tasks::TaskDetails {
task_id: Some(id.clone().into()),
now,
poll_times_histogram: Some(stats.poll_duration_histogram()),
})
{
self.details_watchers
.entry(id.clone())
.or_insert_with(Vec::new)
.push(subscription);
}
}
// If the task is not found, drop `stream_sender` which will result in a not found error
}
fn task_update(&mut self, include: Include) -> tasks::TaskUpdate {
todo!()
}
fn resource_update(&mut self, include: Include) -> resources::ResourceUpdate {
todo!()
}
fn async_op_update(&mut self, include: Include) -> async_ops::AsyncOpUpdate {
todo!()
}
pub async fn run(mut self) {
let mut timer = StreamExt::fuse(async_io::Timer::interval(self.publish_interval));
loop {
let mut recv = self.rpcs.recv().fuse();
let should_send: bool = futures_util::select! {
_ = timer.next() => self.running,
cmd = recv => {
match cmd {
Ok(server::Command::Instrument(subscription)) => {
self.add_instrument_subscription(subscription);
}
Ok(server::Command::WatchTaskDetail(request)) => {
}
Ok(server::Command::Pause) => {
self.running = false;
}
Ok(server::Command::Resume) => {
self.running = true;
}
Err(_) => {
tracing::debug!("rpc channel closed, exiting");
return
}
}
false
},
};
// drain and aggregate buffered events.
//
// Note: we *don't* want to actually await the call to `recv` --- we
// don't want the aggregator task to be woken on every event,
// because it will then be woken when its own `poll` calls are
// exited. that would result in a busy-loop. instead, we only want
// to be woken when the flush interval has elapsed, or when the
// channel is almost full.
let mut drained = false;
while let Ok(event) = self.events.try_recv() {
self.update_state(event);
}
if let Err(TryRecvError::Disconnected) = self.events.try_recv() {
tracing::debug!("event channel closed; terminating");
return;
}
// flush data to clients, if there are any currently subscribed
// watchers and we should send a new update.
if !self.watchers.is_empty() && should_send {
self.publish();
}
}
}
fn publish(&mut self) {
let new_metadata = if !self.new_metadata.is_empty() {
Some(console_api::RegisterMetadata {
metadata: std::mem::take(&mut self.new_metadata),
})
} else {
None
};
let task_update = Some(self.task_update(Include::UpdatedOnly));
let resource_update = Some(self.resource_update(Include::UpdatedOnly));
let async_op_update = Some(self.async_op_update(Include::UpdatedOnly));
let update = instrument::Update {
now: Some(self.base_time.to_timestamp(Instant::now())),
new_metadata,
task_update,
resource_update,
async_op_update,
};
//self.watchers.retain_and_shrink(|watch: &Watch<instrument::Update>| watch.update
// (&update));
let stats = &self.task_stats;
// Assuming there are much fewer task details subscribers than there are
// stats updates, iterate over `details_watchers` and compact the map.
/*self.details_watchers.retain_and_shrink(|id, watchers| {
if let Some(task_stats) = stats.get(id) {
let details = tasks::TaskDetails {
task_id: Some(id.clone().into()),
now: Some(self.base_time.to_timestamp(Instant::now())),
poll_times_histogram: Some(task_stats.poll_duration_histogram()),
};
watchers.retain(|watch| watch.update(&details));
!watchers.is_empty()
} else {
false
}
});
*/
}
/// Update the current state with data from a single event.
fn update_state(&mut self, event: Event) {
// do state update
match event {
Event::Metadata(meta) => {
self.all_metadata.push(meta.into());
self.new_metadata.push(meta.into());
}
Event::Spawn {
id,
metadata,
stats,
fields,
location,
} => {
self.tasks.insert(
id.clone(),
Task {
id: id.clone(),
is_dirty: AtomicBool::new(true),
metadata,
fields,
location,
// TODO: parents
},
);
self.task_stats.insert(id, stats);
}
Event::Resource {
id,
parent_id,
metadata,
kind,
concrete_type,
location,
is_internal,
stats,
} => {
self.resources.insert(
id.clone(),
Resource {
id: id.clone(),
is_dirty: AtomicBool::new(true),
parent_id,
kind,
metadata,
concrete_type,
location,
is_internal,
},
);
self.resource_stats.insert(id, stats);
}
Event::PollOp {
metadata,
resource_id,
op_name,
async_op_id,
task_id,
is_ready,
} => {
let poll_op = resources::PollOp {
metadata: Some(metadata.into()),
resource_id: Some(resource_id.into()),
name: op_name,
task_id: Some(task_id.into()),
async_op_id: Some(async_op_id.into()),
is_ready,
};
self.poll_ops.push(poll_op);
}
Event::AsyncResourceOp {
id,
source,
resource_id,
metadata,
parent_id,
stats,
} => {
self.async_ops.insert(
id.clone(),
AsyncOp {
id: id.clone(),
is_dirty: AtomicBool::new(true),
resource_id,
metadata,
source,
parent_id,
},
);
self.async_op_stats.insert(id, stats);
}
}
}
}

View File

@ -0,0 +1,30 @@
use crate::aggregate::Id;
use std::collections::HashMap;
#[derive(Debug, Default)]
pub(crate) struct Attributes {
attributes: HashMap<FieldKey, console_api::Attribute>,
}
#[derive(Debug, Clone)]
pub(crate) struct Update {
pub(crate) field: console_api::Field,
pub(crate) op: Option<UpdateOp>,
pub(crate) unit: Option<String>,
}
#[derive(Debug, Clone)]
pub(crate) enum UpdateOp {
Add,
Override,
Sub,
}
/// Represents a key for a `proto::field::Name`. Because the
/// proto::field::Name might not be unique we also include the
/// resource id in this key
#[derive(Debug, Hash, PartialEq, Eq)]
struct FieldKey {
update_id: Id,
field_name: console_api::field::Name,
}

View File

@ -36,7 +36,7 @@ impl<const MAX_CALLSITES: usize> Callsites<MAX_CALLSITES> {
while {
for cs in &self.array[idx..end] {
let ptr = cs.load(Ordering::Acquire);
let meta = unsafe { ptr as *const _ as &Metadata<'static> };
let meta = unsafe { &*ptr };
if meta.callsite() == callsite.callsite() {
return true;
}

View File

@ -1,5 +1,43 @@
use crate::aggregate::Id;
use crate::stats;
use console_api::resources;
use std::sync::Arc;
use tracing_core::Metadata;
pub(crate) enum Event {
Metadata(&'static Metadata<'static>),
Spawn {
id: Id,
metadata: &'static Metadata<'static>,
stats: Arc<stats::TaskStats>,
fields: Vec<console_api::Field>,
location: Option<console_api::Location>,
},
Resource {
id: Id,
parent_id: Option<Id>,
metadata: &'static Metadata<'static>,
concrete_type: String,
kind: resources::resource::Kind,
location: Option<console_api::Location>,
is_internal: bool,
stats: Arc<stats::ResourceStats>,
},
PollOp {
metadata: &'static Metadata<'static>,
resource_id: Id,
op_name: String,
async_op_id: Id,
task_id: Id,
is_ready: bool,
},
AsyncResourceOp {
id: Id,
parent_id: Option<Id>,
resource_id: Id,
metadata: &'static Metadata<'static>,
source: String,
stats: Arc<stats::AsyncOpStats>,
},
}

View File

@ -13,10 +13,12 @@ use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
mod aggregate;
mod attribute;
mod callsites;
mod event;
mod server;
mod stack;
mod stats;
use crate::aggregate::Aggregator;
use crate::callsites::Callsites;
@ -46,6 +48,8 @@ pub struct Builder {
/// A smaller number will reduce the memory footprint but may lead to more events being dropped
/// during activity bursts.
event_buffer_capacity: usize,
client_buffer_capacity: usize,
}
impl Builder {
pub fn build(self) -> (ConsoleLayer, Server) {
@ -59,6 +63,7 @@ impl Default for Builder {
server_addr: Server::DEFAULT_ADDR,
server_port: Server::DEFAULT_PORT,
event_buffer_capacity: ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY,
client_buffer_capacity: 1024,
}
}
}
@ -85,8 +90,9 @@ impl ConsoleLayer {
let (tx, events) = crossbeam_channel::bounded(config.event_buffer_capacity);
let shared = Arc::new(Shared::default());
let aggregator = Aggregator::new(events);
let server = Server::new(aggregator);
let (subscribe, rpcs) = async_channel::bounded(config.client_buffer_capacity);
let aggregator = Aggregator::new(events, rpcs);
let server = Server::new(aggregator, config.client_buffer_capacity, subscribe);
let layer = Self {
current_spans: ThreadLocal::new(),
tx,

View File

@ -1,25 +1,93 @@
use crate::aggregate::Id;
use crate::Aggregator;
use async_channel::{Receiver, Sender};
use console_api::instrument;
use console_api::instrument::instrument_server::{Instrument, InstrumentServer};
use console_api::instrument::{
InstrumentRequest, PauseRequest, PauseResponse, ResumeRequest, ResumeResponse,
TaskDetailsRequest,
};
use console_api::tasks;
use futures_util::TryStreamExt;
use std::error::Error;
use std::net::{IpAddr, Ipv6Addr};
use std::io::IoSlice;
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead as TokioAsyncRead;
use tokio::io::{AsyncWrite as TokioAsyncWrite, ReadBuf};
use tonic::transport::server::Connected;
use tonic::Status;
struct StreamWrapper<T>(T);
impl<T> Connected for StreamWrapper<T> {
type ConnectInfo = ();
fn connect_info(&self) -> Self::ConnectInfo {
()
}
}
impl<T: TokioAsyncWrite + Unpin> TokioAsyncWrite for StreamWrapper<T> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
TokioAsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf)
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
TokioAsyncWrite::poll_flush(Pin::new(&mut self.0), cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx)
}
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
TokioAsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs)
}
fn is_write_vectored(&self) -> bool {
TokioAsyncWrite::is_write_vectored(&self.0)
}
}
impl<T: TokioAsyncRead + Unpin> TokioAsyncRead for StreamWrapper<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
TokioAsyncRead::poll_read(Pin::new(&mut self.0), cx, buf)
}
}
pub struct Server {
aggregator: Aggregator,
client_buffer_size: usize,
subscribe: Sender<Command>,
}
impl Server {
pub(crate) const DEFAULT_ADDR: IpAddr = IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1));
pub(crate) const DEFAULT_PORT: u16 = 49289;
pub(crate) fn new(aggregator: Aggregator, client_buffer_size: usize) -> Self {
pub(crate) fn new(
aggregator: Aggregator,
client_buffer_size: usize,
subscribe: Sender<Command>,
) -> Self {
let subscribe = todo!();
Self {
aggregator,
client_buffer_size,
subscribe,
}
}
@ -32,8 +100,12 @@ impl Server {
let svc = InstrumentServer::new(self);
// The gRPC server task; requires a `Stream` of `tokio::AsyncRead + tokio::AsyncWrite`.
// TODO: Pass an async listening socket that implements the tokio versions of Read/Write
let incoming = todo!();
let listener =
async_net::TcpListener::bind(SocketAddr::new(Self::DEFAULT_ADDR, Self::DEFAULT_PORT))
.await?;
let incoming = listener
.incoming()
.map_ok(|stream| StreamWrapper(async_compat::Compat::new(stream)));
tonic::transport::Server::builder()
.add_service(svc)
.serve_with_incoming(incoming)
@ -45,51 +117,110 @@ impl Server {
}
}
pub(crate) struct Watch<T>(pub(crate) Sender<Result<T, tonic::Status>>);
impl<T: Clone> Watch<T> {
pub fn update(&self, update: &T) -> bool {
self.0.try_send(Ok(update.clone())).is_ok()
}
}
pub(crate) struct WatchRequest<T> {
pub id: Id,
pub stream_sender: async_oneshot::Sender<Receiver<Result<T, tonic::Status>>>,
pub buffer: usize,
}
pub(crate) enum Command {
Instrument(Watch<instrument::Update>),
WatchTaskDetail(WatchRequest<tasks::TaskDetails>),
Pause,
Resume,
}
#[tonic::async_trait]
impl Instrument for Server {
type WatchUpdatesStream = ();
type WatchUpdatesStream = async_channel::Receiver<Result<instrument::Update, Status>>;
async fn watch_updates(
&self,
request: tonic::Request<InstrumentRequest>,
request: tonic::Request<instrument::InstrumentRequest>,
) -> Result<tonic::Response<Self::WatchUpdatesStream>, tonic::Status> {
/*
match request.remote_addr() {
Some(addr) => tracing::debug!(client.addr = %addr, "starting a new watch"),
None => tracing::debug!(client.addr = %"<unknown>", "starting a new watch"),
}
let permit = self.subscribe.reserve().await.map_err(|_| {
tonic::Status::internal("cannot start new watch, aggregation task is not running")
})?;
let (tx, rx) = mpsc::channel(self.client_buffer);
permit.send(Command::Instrument(Watch(tx)));
tracing::debug!("watch started");
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok(tonic::Response::new(stream))
*/
todo!()
if !self.subscribe.is_full() {
let (tx, rx) = async_channel::bounded(self.client_buffer_size);
self.subscribe.send(Command::Instrument(Watch(tx))).await;
tracing::debug!("watch started");
Ok(tonic::Response::new(rx))
} else {
Err(tonic::Status::internal(
"cannot start new watch, aggregation task is not running",
))
}
}
type WatchTaskDetailsStream = ();
type WatchTaskDetailsStream = async_channel::Receiver<Result<tasks::TaskDetails, Status>>;
async fn watch_task_details(
&self,
request: tonic::Request<TaskDetailsRequest>,
request: tonic::Request<instrument::TaskDetailsRequest>,
) -> Result<tonic::Response<Self::WatchTaskDetailsStream>, tonic::Status> {
todo!()
let task_id = request
.into_inner()
.id
.ok_or_else(|| tonic::Status::invalid_argument("missing task_id"))?
.id;
// `tracing` reserves span ID 0 for niche optimization for `Option<Id>`.
let id = std::num::NonZeroU64::new(task_id)
.map(Id::from_non_zero_u64)
.ok_or_else(|| tonic::Status::invalid_argument("task_id cannot be 0"))?;
if !self.subscribe.is_full() {
// Check with the aggregator task to request a stream if the task exists.
let (stream_sender, stream_recv) = async_oneshot::oneshot();
self.subscribe
.send(Command::WatchTaskDetail(WatchRequest {
id,
stream_sender,
buffer: self.client_buffer_size,
}))
.await;
// If the aggregator drops the sender, the task doesn't exist.
let rx = stream_recv.await.map_err(|_| {
tracing::warn!(id = ?task_id, "requested task not found");
tonic::Status::not_found("task not found")
})?;
tracing::debug!(id = ?task_id, "task details watch started");
Ok(tonic::Response::new(rx))
} else {
Err(tonic::Status::internal(
"cannot start new watch, aggregation task is not running",
))
}
}
async fn pause(
&self,
request: tonic::Request<PauseRequest>,
) -> Result<tonic::Response<PauseResponse>, tonic::Status> {
todo!()
_request: tonic::Request<instrument::PauseRequest>,
) -> Result<tonic::Response<instrument::PauseResponse>, tonic::Status> {
self.subscribe.send(Command::Pause).await.map_err(|_| {
tonic::Status::internal("cannot pause, aggregation task is not running")
})?;
Ok(tonic::Response::new(instrument::PauseResponse {}))
}
async fn resume(
&self,
request: tonic::Request<ResumeRequest>,
) -> Result<tonic::Response<ResumeResponse>, tonic::Status> {
todo!()
_request: tonic::Request<instrument::ResumeRequest>,
) -> Result<tonic::Response<instrument::ResumeResponse>, tonic::Status> {
self.subscribe.send(Command::Resume).await.map_err(|_| {
tonic::Status::internal("cannot resume, aggregation task is not running")
})?;
Ok(tonic::Response::new(instrument::ResumeResponse {}))
}
}

View File

@ -0,0 +1,161 @@
use crate::aggregate::Id;
use crate::attribute;
use crossbeam_utils::atomic::AtomicCell;
use hdrhistogram::serialization::{Serializer, V2Serializer};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Mutex;
use std::time::{Duration, Instant, SystemTime};
/// Anchors an `Instant` with a `SystemTime` timestamp to allow converting
/// monotonic `Instant`s into timestamps that can be sent over the wire.
#[derive(Debug, Clone)]
pub(crate) struct TimeAnchor {
mono: Instant,
sys: SystemTime,
}
impl TimeAnchor {
pub(crate) fn new() -> Self {
Self {
mono: Instant::now(),
sys: SystemTime::now(),
}
}
pub(crate) fn to_system_time(&self, t: Instant) -> SystemTime {
let dur = t
.checked_duration_since(self.mono)
.unwrap_or_else(|| Duration::from_secs(0));
self.sys + dur
}
pub(crate) fn to_timestamp(&self, t: Instant) -> prost_types::Timestamp {
self.to_system_time(t).into()
}
}
#[derive(Debug, Default)]
struct PollStats<H> {
/// The number of polls in progress
current_polls: AtomicUsize,
/// The total number of polls
polls: AtomicUsize,
timestamps: Mutex<PollTimestamps<H>>,
}
/// Stats associated with a task.
#[derive(Debug)]
pub(crate) struct TaskStats {
is_dirty: AtomicBool,
is_dropped: AtomicBool,
// task stats
pub(crate) created_at: Instant,
timestamps: Mutex<TaskTimestamps>,
// waker stats
wakes: AtomicUsize,
waker_clones: AtomicUsize,
waker_drops: AtomicUsize,
self_wakes: AtomicUsize,
/// Poll durations and other stats.
poll_stats: PollStats<Histogram>,
}
impl TaskStats {
pub(crate) fn poll_duration_histogram(
&self,
) -> console_api::tasks::task_details::PollTimesHistogram {
let hist = self
.poll_stats
.timestamps
.lock()
.unwrap()
.histogram
.to_proto();
console_api::tasks::task_details::PollTimesHistogram::Histogram(hist)
}
}
/// Stats associated with an async operation.
///
/// This shares all of the same fields as [`ResourceStats]`, with the addition
/// of [`PollStats`] tracking when the async operation is polled, and the task
/// ID of the last task to poll the async op.
#[derive(Debug)]
pub(crate) struct AsyncOpStats {
/// The task ID of the last task to poll this async op.
///
/// This is set every time the async op is polled, in case a future is
/// passed between tasks.
task_id: AtomicCell<u64>,
/// Fields shared with `ResourceStats`.
pub(crate) stats: ResourceStats,
/// Poll durations and other stats.
poll_stats: PollStats<()>,
}
/// Stats associated with a resource.
#[derive(Debug)]
pub(crate) struct ResourceStats {
is_dirty: AtomicBool,
is_dropped: AtomicBool,
created_at: Instant,
dropped_at: Mutex<Option<Instant>>,
attributes: Mutex<attribute::Attributes>,
pub(crate) inherit_child_attributes: bool,
pub(crate) parent_id: Option<Id>,
}
#[derive(Debug, Default)]
struct TaskTimestamps {
dropped_at: Option<Instant>,
last_wake: Option<Instant>,
}
#[derive(Debug, Default)]
struct PollTimestamps<H> {
first_poll: Option<Instant>,
last_poll_started: Option<Instant>,
last_poll_ended: Option<Instant>,
busy_time: Duration,
histogram: H,
}
#[derive(Debug)]
struct Histogram {
histogram: hdrhistogram::Histogram<u64>,
max: u64,
outliers: u64,
max_outlier: Option<u64>,
}
impl Histogram {
fn new(max: u64) -> Self {
// significant figures should be in the [0-5] range and memory usage
// grows exponentially with higher a sigfig
let histogram = hdrhistogram::Histogram::new_with_max(max, 2).unwrap();
Self {
histogram,
max,
max_outlier: None,
outliers: 0,
}
}
fn to_proto(&self) -> console_api::tasks::DurationHistogram {
let mut serializer = V2Serializer::new();
let mut raw_histogram = Vec::new();
serializer
.serialize(&self.histogram, &mut raw_histogram)
.expect("histogram failed to serialize");
console_api::tasks::DurationHistogram {
raw_histogram,
max_value: self.max,
high_outliers: self.outliers,
highest_outlier: self.max_outlier,
}
}
}