diff --git a/bffhd/initiators/mod.rs b/bffhd/initiators/mod.rs index 3e24f75..f20ee9d 100644 --- a/bffhd/initiators/mod.rs +++ b/bffhd/initiators/mod.rs @@ -51,6 +51,10 @@ impl InitiatorCallbacks { self.resource.try_update(session, status).await } + pub fn set_status(&mut self, status: Status) { + self.resource.set_status(status) + } + pub fn open_session(&self, uid: &str) -> Option { self.sessions.open(&self.span, uid) } diff --git a/bffhd/initiators/process.rs b/bffhd/initiators/process.rs index 74de4f5..d714ec3 100644 --- a/bffhd/initiators/process.rs +++ b/bffhd/initiators/process.rs @@ -2,7 +2,7 @@ use super::Initiator; use super::InitiatorCallbacks; use crate::resources::state::State; use crate::utils::linebuffer::LineBuffer; -use async_process::{Child, ChildStdout, Command, Stdio}; +use async_process::{Child, ChildStderr, ChildStdout, Command, Stdio}; use futures_lite::{ready, AsyncRead}; use miette::{miette, IntoDiagnostic}; use serde::{Deserialize, Serialize}; @@ -11,11 +11,12 @@ use std::future::Future; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; +use crate::resources::modules::fabaccess::Status; #[derive(Debug, Serialize, Deserialize)] pub enum InputMessage { #[serde(rename = "state")] - SetState(State), + SetState(Status), } #[derive(Serialize, Deserialize)] @@ -26,6 +27,7 @@ pub struct Process { pub args: Vec, state: Option, buffer: LineBuffer, + err_buffer: LineBuffer, callbacks: InitiatorCallbacks, } @@ -42,6 +44,10 @@ impl Process { .stdout .take() .expect("Child just spawned with piped stdout has no stdout"), + child + .stderr + .take() + .expect("Child just spawned with piped stderr has no stderr"), child, )); Ok(()) @@ -49,13 +55,15 @@ impl Process { } struct ProcessState { - pub child: Child, pub stdout: ChildStdout, + pub stderr: ChildStderr, + pub stderr_closed: bool, + pub child: Child, } impl ProcessState { - pub fn new(stdout: ChildStdout, child: Child) -> Self { - Self { stdout, child } + pub fn new(stdout: ChildStdout, stderr: ChildStderr, child: Child) -> Self { + Self { stdout, stderr, stderr_closed: false, child } } fn try_process(&mut self, buffer: &[u8], callbacks: &mut InitiatorCallbacks) -> usize { @@ -77,11 +85,24 @@ impl ProcessState { } fn process_line(&mut self, line: &[u8], callbacks: &mut InitiatorCallbacks) { - match serde_json::from_slice::(line) { - Ok(state) => { - tracing::trace!(?state, "got new state for process initiator"); + if !line.is_empty() { + let res = std::str::from_utf8(line); + if let Err(error) = &res { + tracing::warn!(%error, "Initiator sent line with invalid UTF-8"); + return; + } + let string = res.unwrap().trim(); + // Ignore whitespace-only lines + if !string.is_empty() { + match serde_json::from_str::(res.unwrap()) { + Ok(state) => { + tracing::trace!(?state, "got new state for process initiator"); + let InputMessage::SetState(status) = state; + callbacks.set_status(status); + } + Err(error) => tracing::warn!(%error, "process initiator did not send a valid line"), + } } - Err(error) => tracing::warn!(%error, "process initiator did not send a valid line"), } } } @@ -93,6 +114,7 @@ impl Future for Process { if let Process { state: Some(state), buffer, + err_buffer, callbacks, .. } = self.get_mut() @@ -129,6 +151,44 @@ impl Future for Process { let processed = state.try_process(buffer, callbacks); buffer.consume(processed); + if !state.stderr_closed { + let stderr = &mut state.stderr; + loop { + let buf = err_buffer.get_mut_write(512); + match AsyncRead::poll_read(Pin::new(stderr), cx, buf) { + Poll::Pending => break, + Poll::Ready(Ok(read)) => { + err_buffer.advance_valid(read); + continue; + } + Poll::Ready(Err(error)) => { + tracing::warn!(%error, "reading from child stderr errored"); + state.stderr_closed = true; + break; + } + } + } + } + + { + let mut consumed = 0; + + while let Some(idx) = buffer[consumed..].iter().position(|b| *b == b'\n') { + if idx == 0 { + consumed += 1; + continue; + } + let line = &buffer[consumed..(consumed + idx)]; + match std::str::from_utf8(line) { + Ok(line) => tracing::debug!(line, "initiator STDERR"), + Err(error) => tracing::debug!(%error, + "invalid UTF-8 on initiator STDERR"), + } + consumed = idx; + } + err_buffer.consume(consumed); + } + return Poll::Pending; } } @@ -142,8 +202,8 @@ impl Future for Process { impl Initiator for Process { fn new(params: &HashMap, callbacks: InitiatorCallbacks) -> miette::Result - where - Self: Sized, + where + Self: Sized, { let cmd = params .get("cmd") @@ -158,6 +218,7 @@ impl Initiator for Process { args, state: None, buffer: LineBuffer::new(), + err_buffer: LineBuffer::new(), callbacks, }; this.spawn().into_diagnostic()?; diff --git a/bffhd/resources/mod.rs b/bffhd/resources/mod.rs index c222467..369efe3 100644 --- a/bffhd/resources/mod.rs +++ b/bffhd/resources/mod.rs @@ -166,7 +166,7 @@ impl Resource { self.inner.set_state(archived) } - fn set_status(&self, state: Status) { + pub fn set_status(&self, state: Status) { let old = self.inner.get_state(); let oldref: &Archived = old.as_ref(); let previous: &Archived> = &oldref.inner.previous;