More console implementation stuff

This commit is contained in:
Nadja Reitzenstein
2022-06-21 16:20:44 +02:00
parent df7bd80d06
commit 8a35818b4f
8 changed files with 715 additions and 6 deletions

View File

@ -7,4 +7,10 @@ edition = "2021"
[dependencies]
console-api = "0.3"
tracing = "0.1"
tonic = { version = "0.7.2", default_features = false, features = [] }
hyper = { version = "0.14", default_features = false, features = ["http2", "server", "stream"] }
thread_local = "1.1"
tracing = "0.1"
tracing-core = "0.1"
tracing-subscriber = { version = "0.3", default_features = false, features = ["registry"] }
crossbeam-channel = "0.5"

View File

@ -0,0 +1,12 @@
use crate::Event;
use crossbeam_channel::Receiver;
pub(crate) struct Aggregator {
events: Receiver<Event>,
}
impl Aggregator {
pub fn new(events: Receiver<Event>) -> Self {
Self { events }
}
}

View File

@ -30,13 +30,13 @@ impl<const MAX_CALLSITES: usize> Callsites<MAX_CALLSITES> {
}
}
pub(crate) fn contains(&self, callsite: &'static Metadata<'static>) -> bool {
pub(crate) fn contains(&self, callsite: &Metadata<'static>) -> bool {
let mut idx = 0;
let mut end = self.len.load(Ordering::Acquire);
while {
for cs in &self.array[idx..end] {
let ptr = cs.load(Ordering::Acquire);
let meta = unsafe { ptr as *const _ as &'static Metadata<'static> };
let meta = unsafe { ptr as *const _ as &Metadata<'static> };
if meta.callsite() == callsite.callsite() {
return true;
}
@ -55,7 +55,7 @@ impl<const MAX_CALLSITES: usize> Callsites<MAX_CALLSITES> {
impl<const MAX_CALLSITES: usize> Default for Callsites<MAX_CALLSITES> {
fn default() -> Self {
const NULLPTR: AtomicPtr<_> = AtomicPtr::new(ptr::null_mut());
const NULLPTR: AtomicPtr<Metadata<'static>> = AtomicPtr::new(ptr::null_mut());
Self {
array: [NULLPTR; MAX_CALLSITES],
len: AtomicUsize::new(0),

View File

@ -0,0 +1,5 @@
use tracing_core::Metadata;
pub(crate) enum Event {
Metadata(&'static Metadata<'static>),
}

View File

@ -1,4 +1,164 @@
use crossbeam_channel::{Sender, TrySendError};
use std::any::TypeId;
use std::cell::RefCell;
use std::net::IpAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use thread_local::ThreadLocal;
use tracing_core::span::{Attributes, Id, Record};
use tracing_core::{Interest, LevelFilter, Metadata, Subscriber};
use tracing_subscriber::filter::Filtered;
use tracing_subscriber::layer::{Context, Filter, Layered};
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
mod aggregate;
mod callsites;
mod event;
mod server;
mod stack;
use crate::aggregate::Aggregator;
use crate::callsites::Callsites;
use event::Event;
pub use server::Server;
use stack::SpanStack;
pub struct ConsoleLayer {
current_spans: ThreadLocal<RefCell<SpanStack>>,
tx: Sender<Event>,
shared: Arc<Shared>,
spawn_callsites: Callsites<8>,
waker_callsites: Callsites<8>,
}
#[derive(Debug)]
pub struct Builder {
/// Network Address the console server will listen on
server_addr: IpAddr,
/// Network Port the console server will listen on
server_port: u16,
/// Number of events that can be buffered before events are dropped.
///
/// A smaller number will reduce the memory footprint but may lead to more events being dropped
/// during activity bursts.
event_buffer_capacity: usize,
}
impl Builder {
pub fn build(self) -> (ConsoleLayer, Server) {
ConsoleLayer::build(self)
}
}
impl Default for Builder {
fn default() -> Self {
Self {
// Listen on `::1` (aka localhost) by default
server_addr: Server::DEFAULT_ADDR,
server_port: Server::DEFAULT_PORT,
event_buffer_capacity: ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY,
}
}
}
#[derive(Debug, Default)]
struct Shared {
dropped_tasks: AtomicUsize,
dropped_resources: AtomicUsize,
}
impl ConsoleLayer {
pub fn new() -> (Self, Server) {
Self::builder().build()
}
pub fn builder() -> Builder {
Builder::default()
}
fn build(config: Builder) -> (Self, Server) {
tracing::debug!(
?config.server_addr,
config.event_buffer_capacity,
"configured console subscriber"
);
let (tx, events) = crossbeam_channel::bounded(config.event_buffer_capacity);
let shared = Arc::new(Shared::default());
let aggregator = Aggregator::new(events);
let server = Server::new(aggregator);
let layer = Self {
current_spans: ThreadLocal::new(),
tx,
shared,
spawn_callsites: Callsites::default(),
waker_callsites: Callsites::default(),
};
(layer, server)
}
}
impl ConsoleLayer {
const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024;
const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024;
fn is_spawn(&self, metadata: &Metadata<'static>) -> bool {
self.spawn_callsites.contains(metadata)
}
fn is_waker(&self, metadata: &Metadata<'static>) -> bool {
self.waker_callsites.contains(metadata)
}
fn send_stats<S>(
&self,
dropped: &AtomicUsize,
mkEvent: impl FnOnce() -> (Event, S),
) -> Option<S> {
if self.tx.is_full() {
dropped.fetch_add(1, Ordering::Release);
return None;
}
let (event, stats) = mkEvent();
match self.tx.try_send(event) {
Ok(()) => Some(stats),
Err(TrySendError::Full(_)) => {
dropped.fetch_add(1, Ordering::Release);
None
}
Err(TrySendError::Disconnected(_)) => None,
}
}
fn send_metadata(&self, dropped: &AtomicUsize, event: Event) -> bool {
self.send_stats(dropped, || (event, ())).is_some()
}
}
impl<S> Layer<S> for ConsoleLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
let dropped = match (metadata.name(), metadata.target()) {
(_, "executor::spawn") => {
self.spawn_callsites.insert(metadata);
&self.shared.dropped_tasks
}
(_, "executor::waker") => {
self.waker_callsites.insert(metadata);
&self.shared.dropped_tasks
}
(_, _) => &self.shared.dropped_tasks,
};
self.send_metadata(dropped, Event::Metadata(metadata));
Interest::always()
}
}
#[cfg(test)]
mod tests {

View File

@ -0,0 +1,95 @@
use crate::Aggregator;
use console_api::instrument::instrument_server::{Instrument, InstrumentServer};
use console_api::instrument::{
InstrumentRequest, PauseRequest, PauseResponse, ResumeRequest, ResumeResponse,
TaskDetailsRequest,
};
use std::error::Error;
use std::net::{IpAddr, Ipv6Addr};
pub struct Server {
aggregator: Aggregator,
client_buffer_size: usize,
}
impl Server {
pub(crate) const DEFAULT_ADDR: IpAddr = IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1));
pub(crate) const DEFAULT_PORT: u16 = 49289;
pub(crate) fn new(aggregator: Aggregator, client_buffer_size: usize) -> Self {
Self {
aggregator,
client_buffer_size,
}
}
pub(crate) async fn serve(
mut self, /*, incoming: I */
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// TODO: Spawn two tasks; the aggregator that's collecting stats, aggregating and
// collating them and the server task doing the tonic gRPC stuff
let svc = InstrumentServer::new(self);
// The gRPC server task; requires a `Stream` of `tokio::AsyncRead + tokio::AsyncWrite`.
// TODO: Pass an async listening socket that implements the tokio versions of Read/Write
let incoming = todo!();
tonic::transport::Server::builder()
.add_service(svc)
.serve_with_incoming(incoming)
.await?;
// TODO: Kill the aggregator task if the serve task has ended.
Ok(())
}
}
#[tonic::async_trait]
impl Instrument for Server {
type WatchUpdatesStream = ();
async fn watch_updates(
&self,
request: tonic::Request<InstrumentRequest>,
) -> Result<tonic::Response<Self::WatchUpdatesStream>, tonic::Status> {
/*
match request.remote_addr() {
Some(addr) => tracing::debug!(client.addr = %addr, "starting a new watch"),
None => tracing::debug!(client.addr = %"<unknown>", "starting a new watch"),
}
let permit = self.subscribe.reserve().await.map_err(|_| {
tonic::Status::internal("cannot start new watch, aggregation task is not running")
})?;
let (tx, rx) = mpsc::channel(self.client_buffer);
permit.send(Command::Instrument(Watch(tx)));
tracing::debug!("watch started");
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok(tonic::Response::new(stream))
*/
todo!()
}
type WatchTaskDetailsStream = ();
async fn watch_task_details(
&self,
request: tonic::Request<TaskDetailsRequest>,
) -> Result<tonic::Response<Self::WatchTaskDetailsStream>, tonic::Status> {
todo!()
}
async fn pause(
&self,
request: tonic::Request<PauseRequest>,
) -> Result<tonic::Response<PauseResponse>, tonic::Status> {
todo!()
}
async fn resume(
&self,
request: tonic::Request<ResumeRequest>,
) -> Result<tonic::Response<ResumeResponse>, tonic::Status> {
todo!()
}
}

View File

@ -0,0 +1,64 @@
use tracing_core::span::Id;
// This has been copied from tracing-subscriber. Once the library adds
// the ability to iterate over entered spans, this code will
// no longer be needed here
//
// https://github.com/tokio-rs/tracing/blob/master/tracing-subscriber/src/registry/stack.rs
#[derive(Debug, Clone)]
pub(crate) struct ContextId {
id: Id,
duplicate: bool,
}
impl ContextId {
pub fn id(&self) -> &Id {
&self.id
}
}
/// `SpanStack` tracks what spans are currently executing on a thread-local basis.
///
/// A "separate current span" for each thread is a semantic choice, as each span
/// can be executing in a different thread.
#[derive(Debug, Default)]
pub(crate) struct SpanStack {
stack: Vec<ContextId>,
}
impl SpanStack {
#[inline]
pub(crate) fn push(&mut self, id: Id) -> bool {
let duplicate = self.stack.iter().any(|i| i.id == id);
self.stack.push(ContextId { id, duplicate });
!duplicate
}
/// Pop a currently entered span.
///
/// Returns `true` if the span was actually exited.
#[inline]
pub(crate) fn pop(&mut self, expected_id: &Id) -> bool {
if let Some((idx, _)) = self
.stack
.iter()
.enumerate()
.rev()
.find(|(_, ctx_id)| ctx_id.id == *expected_id)
{
let ContextId { id: _, duplicate } = self.stack.remove(idx);
return !duplicate;
}
false
}
pub(crate) fn iter(&self) -> impl Iterator<Item = &Id> {
self.stack
.iter()
.filter_map(|ContextId { id, duplicate }| if *duplicate { None } else { Some(id) })
}
pub(crate) fn stack(&self) -> &Vec<ContextId> {
&self.stack
}
}