Update process initiator to make shelly-timeout doable

This commit is contained in:
Nadja Reitzenstein 2022-08-22 19:05:57 +02:00
parent f3278fcf05
commit ec1cac9443
3 changed files with 77 additions and 12 deletions

View File

@ -51,6 +51,10 @@ impl InitiatorCallbacks {
self.resource.try_update(session, status).await self.resource.try_update(session, status).await
} }
pub fn set_status(&mut self, status: Status) {
self.resource.set_status(status)
}
pub fn open_session(&self, uid: &str) -> Option<SessionHandle> { pub fn open_session(&self, uid: &str) -> Option<SessionHandle> {
self.sessions.open(&self.span, uid) self.sessions.open(&self.span, uid)
} }

View File

@ -2,7 +2,7 @@ use super::Initiator;
use super::InitiatorCallbacks; use super::InitiatorCallbacks;
use crate::resources::state::State; use crate::resources::state::State;
use crate::utils::linebuffer::LineBuffer; use crate::utils::linebuffer::LineBuffer;
use async_process::{Child, ChildStdout, Command, Stdio}; use async_process::{Child, ChildStderr, ChildStdout, Command, Stdio};
use futures_lite::{ready, AsyncRead}; use futures_lite::{ready, AsyncRead};
use miette::{miette, IntoDiagnostic}; use miette::{miette, IntoDiagnostic};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -11,11 +11,12 @@ use std::future::Future;
use std::io; use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use crate::resources::modules::fabaccess::Status;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum InputMessage { pub enum InputMessage {
#[serde(rename = "state")] #[serde(rename = "state")]
SetState(State), SetState(Status),
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@ -26,6 +27,7 @@ pub struct Process {
pub args: Vec<String>, pub args: Vec<String>,
state: Option<ProcessState>, state: Option<ProcessState>,
buffer: LineBuffer, buffer: LineBuffer,
err_buffer: LineBuffer,
callbacks: InitiatorCallbacks, callbacks: InitiatorCallbacks,
} }
@ -42,6 +44,10 @@ impl Process {
.stdout .stdout
.take() .take()
.expect("Child just spawned with piped stdout has no stdout"), .expect("Child just spawned with piped stdout has no stdout"),
child
.stderr
.take()
.expect("Child just spawned with piped stderr has no stderr"),
child, child,
)); ));
Ok(()) Ok(())
@ -49,13 +55,15 @@ impl Process {
} }
struct ProcessState { struct ProcessState {
pub child: Child,
pub stdout: ChildStdout, pub stdout: ChildStdout,
pub stderr: ChildStderr,
pub stderr_closed: bool,
pub child: Child,
} }
impl ProcessState { impl ProcessState {
pub fn new(stdout: ChildStdout, child: Child) -> Self { pub fn new(stdout: ChildStdout, stderr: ChildStderr, child: Child) -> Self {
Self { stdout, child } Self { stdout, stderr, stderr_closed: false, child }
} }
fn try_process(&mut self, buffer: &[u8], callbacks: &mut InitiatorCallbacks) -> usize { fn try_process(&mut self, buffer: &[u8], callbacks: &mut InitiatorCallbacks) -> usize {
@ -77,14 +85,27 @@ impl ProcessState {
} }
fn process_line(&mut self, line: &[u8], callbacks: &mut InitiatorCallbacks) { fn process_line(&mut self, line: &[u8], callbacks: &mut InitiatorCallbacks) {
match serde_json::from_slice::<InputMessage>(line) { if !line.is_empty() {
let res = std::str::from_utf8(line);
if let Err(error) = &res {
tracing::warn!(%error, "Initiator sent line with invalid UTF-8");
return;
}
let string = res.unwrap().trim();
// Ignore whitespace-only lines
if !string.is_empty() {
match serde_json::from_str::<InputMessage>(res.unwrap()) {
Ok(state) => { Ok(state) => {
tracing::trace!(?state, "got new state for process initiator"); tracing::trace!(?state, "got new state for process initiator");
let InputMessage::SetState(status) = state;
callbacks.set_status(status);
} }
Err(error) => tracing::warn!(%error, "process initiator did not send a valid line"), Err(error) => tracing::warn!(%error, "process initiator did not send a valid line"),
} }
} }
} }
}
}
impl Future for Process { impl Future for Process {
type Output = (); type Output = ();
@ -93,6 +114,7 @@ impl Future for Process {
if let Process { if let Process {
state: Some(state), state: Some(state),
buffer, buffer,
err_buffer,
callbacks, callbacks,
.. ..
} = self.get_mut() } = self.get_mut()
@ -129,6 +151,44 @@ impl Future for Process {
let processed = state.try_process(buffer, callbacks); let processed = state.try_process(buffer, callbacks);
buffer.consume(processed); buffer.consume(processed);
if !state.stderr_closed {
let stderr = &mut state.stderr;
loop {
let buf = err_buffer.get_mut_write(512);
match AsyncRead::poll_read(Pin::new(stderr), cx, buf) {
Poll::Pending => break,
Poll::Ready(Ok(read)) => {
err_buffer.advance_valid(read);
continue;
}
Poll::Ready(Err(error)) => {
tracing::warn!(%error, "reading from child stderr errored");
state.stderr_closed = true;
break;
}
}
}
}
{
let mut consumed = 0;
while let Some(idx) = buffer[consumed..].iter().position(|b| *b == b'\n') {
if idx == 0 {
consumed += 1;
continue;
}
let line = &buffer[consumed..(consumed + idx)];
match std::str::from_utf8(line) {
Ok(line) => tracing::debug!(line, "initiator STDERR"),
Err(error) => tracing::debug!(%error,
"invalid UTF-8 on initiator STDERR"),
}
consumed = idx;
}
err_buffer.consume(consumed);
}
return Poll::Pending; return Poll::Pending;
} }
} }
@ -158,6 +218,7 @@ impl Initiator for Process {
args, args,
state: None, state: None,
buffer: LineBuffer::new(), buffer: LineBuffer::new(),
err_buffer: LineBuffer::new(),
callbacks, callbacks,
}; };
this.spawn().into_diagnostic()?; this.spawn().into_diagnostic()?;

View File

@ -166,7 +166,7 @@ impl Resource {
self.inner.set_state(archived) self.inner.set_state(archived)
} }
fn set_status(&self, state: Status) { pub fn set_status(&self, state: Status) {
let old = self.inner.get_state(); let old = self.inner.get_state();
let oldref: &Archived<State> = old.as_ref(); let oldref: &Archived<State> = old.as_ref();
let previous: &Archived<Option<UserRef>> = &oldref.inner.previous; let previous: &Archived<Option<UserRef>> = &oldref.inner.previous;