From a66303566aa964ad3490fedbb7a2b1c3ea3daace Mon Sep 17 00:00:00 2001 From: Nadja Reitzenstein Date: Sat, 18 Jun 2022 16:52:22 +0200 Subject: [PATCH] Process initiator working --- Cargo.lock | 18 +++ Cargo.toml | 1 + bffhd/initiators/mod.rs | 10 +- bffhd/initiators/process.rs | 166 +++++++++++++++++++++++++++ bffhd/resources/modules/fabaccess.rs | 7 +- bffhd/utils/linebuffer.rs | 60 ++++++++++ bffhd/utils/mod.rs | 2 + examples/init.py | 13 +++ 8 files changed, 275 insertions(+), 2 deletions(-) create mode 100644 bffhd/initiators/process.rs create mode 100644 bffhd/utils/linebuffer.rs create mode 100755 examples/init.py diff --git a/Cargo.lock b/Cargo.lock index d8cc667..1a9d6f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -226,6 +226,23 @@ dependencies = [ "futures-micro", ] +[[package]] +name = "async-process" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf2c06e30a24e8c78a3987d07f0930edf76ef35e027e7bdb063fccafdad1f60c" +dependencies = [ + "async-io", + "blocking", + "cfg-if", + "event-listener", + "futures-lite", + "libc", + "once_cell", + "signal-hook", + "winapi", +] + [[package]] name = "async-std" version = "1.10.0" @@ -839,6 +856,7 @@ dependencies = [ "async-io", "async-net", "async-oneshot", + "async-process", "async-trait", "backtrace", "capnp", diff --git a/Cargo.toml b/Cargo.toml index a082ad7..a50d54c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ futures-util = "0.3" futures-lite = "1.12.0" async-net = "1.6.1" async-io = "1.7.0" +async-process = "1.4.0" backtrace = "0.3.65" miette = { version = "4.7.1", features = ["fancy"] } thiserror = "1.0.31" diff --git a/bffhd/initiators/mod.rs b/bffhd/initiators/mod.rs index 7597cd8..e4c9714 100644 --- a/bffhd/initiators/mod.rs +++ b/bffhd/initiators/mod.rs @@ -1,4 +1,5 @@ use crate::initiators::dummy::Dummy; +use crate::initiators::process::Process; use crate::resources::modules::fabaccess::Status; use crate::session::SessionHandle; use crate::{ @@ -19,6 +20,7 @@ use std::time::Duration; use url::Url; mod dummy; +mod process; pub trait Initiator: Future { fn new(params: &HashMap, callbacks: InitiatorCallbacks) -> miette::Result @@ -78,7 +80,7 @@ impl Future for InitiatorDriver { ready!(Pin::new(&mut self.initiator).poll(cx)); - tracing::warn!(initiator=%self.name, "an initiator module ran to completion!"); + tracing::warn!(initiator=%self.name, "initiator module ran to completion!"); Poll::Ready(()) } @@ -139,6 +141,12 @@ fn load_single( resource, sessions.clone(), )), + "Process" => Some(InitiatorDriver::new::( + name.clone(), + params, + resource, + sessions.clone(), + )), _ => None, }; diff --git a/bffhd/initiators/process.rs b/bffhd/initiators/process.rs new file mode 100644 index 0000000..74de4f5 --- /dev/null +++ b/bffhd/initiators/process.rs @@ -0,0 +1,166 @@ +use super::Initiator; +use super::InitiatorCallbacks; +use crate::resources::state::State; +use crate::utils::linebuffer::LineBuffer; +use async_process::{Child, ChildStdout, Command, Stdio}; +use futures_lite::{ready, AsyncRead}; +use miette::{miette, IntoDiagnostic}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[derive(Debug, Serialize, Deserialize)] +pub enum InputMessage { + #[serde(rename = "state")] + SetState(State), +} + +#[derive(Serialize, Deserialize)] +pub struct OutputLine {} + +pub struct Process { + pub cmd: String, + pub args: Vec, + state: Option, + buffer: LineBuffer, + callbacks: InitiatorCallbacks, +} + +impl Process { + fn spawn(&mut self) -> io::Result<()> { + let mut child = Command::new(&self.cmd) + .args(&self.args) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + self.state = Some(ProcessState::new( + child + .stdout + .take() + .expect("Child just spawned with piped stdout has no stdout"), + child, + )); + Ok(()) + } +} + +struct ProcessState { + pub child: Child, + pub stdout: ChildStdout, +} + +impl ProcessState { + pub fn new(stdout: ChildStdout, child: Child) -> Self { + Self { stdout, child } + } + + fn try_process(&mut self, buffer: &[u8], callbacks: &mut InitiatorCallbacks) -> usize { + tracing::trace!("trying to process current buffer"); + + let mut end = 0; + + while let Some(idx) = buffer[end..].iter().position(|b| *b == b'\n') { + if idx == 0 { + end += 1; + continue; + } + let line = &buffer[end..(end + idx)]; + self.process_line(line, callbacks); + end = idx; + } + + end + } + + fn process_line(&mut self, line: &[u8], callbacks: &mut InitiatorCallbacks) { + match serde_json::from_slice::(line) { + Ok(state) => { + tracing::trace!(?state, "got new state for process initiator"); + } + Err(error) => tracing::warn!(%error, "process initiator did not send a valid line"), + } + } +} + +impl Future for Process { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Process { + state: Some(state), + buffer, + callbacks, + .. + } = self.get_mut() + { + match state.child.try_status() { + Err(error) => { + tracing::error!(%error, "checking child exit code returned an error"); + return Poll::Ready(()); + } + Ok(Some(exitcode)) => { + tracing::warn!(%exitcode, "child process exited"); + return Poll::Ready(()); + } + Ok(None) => { + tracing::trace!("process initiator checking on process"); + + let stdout = &mut state.stdout; + + loop { + let buf = buffer.get_mut_write(512); + match AsyncRead::poll_read(Pin::new(stdout), cx, buf) { + Poll::Pending => break, + Poll::Ready(Ok(read)) => { + buffer.advance_valid(read); + continue; + } + Poll::Ready(Err(error)) => { + tracing::warn!(%error, "reading from child stdout errored"); + return Poll::Ready(()); + } + } + } + + let processed = state.try_process(buffer, callbacks); + buffer.consume(processed); + + return Poll::Pending; + } + } + } else { + tracing::warn!("process initiator has no process attached!"); + } + + Poll::Ready(()) + } +} + +impl Initiator for Process { + fn new(params: &HashMap, callbacks: InitiatorCallbacks) -> miette::Result + where + Self: Sized, + { + let cmd = params + .get("cmd") + .ok_or(miette!("Process initiator requires a `cmd` parameter."))? + .clone(); + let args = params + .get("args") + .map(|argv| argv.split_whitespace().map(|s| s.to_string()).collect()) + .unwrap_or_else(Vec::new); + let mut this = Self { + cmd, + args, + state: None, + buffer: LineBuffer::new(), + callbacks, + }; + this.spawn().into_diagnostic()?; + Ok(this) + } +} diff --git a/bffhd/resources/modules/fabaccess.rs b/bffhd/resources/modules/fabaccess.rs index 1863aab..b50e9b6 100644 --- a/bffhd/resources/modules/fabaccess.rs +++ b/bffhd/resources/modules/fabaccess.rs @@ -1,9 +1,9 @@ +use crate::config::deser_option; use crate::utils::oid::ObjectIdentifier; use once_cell::sync::Lazy; use rkyv::{Archive, Archived, Deserialize, Infallible}; use std::fmt; use std::fmt::Write; - use std::str::FromStr; //use crate::oidvalue; @@ -54,6 +54,11 @@ pub enum Status { /// The status of the machine pub struct MachineState { pub state: Status, + #[serde( + default, + skip_serializing_if = "Option::is_none", + deserialize_with = "deser_option" + )] pub previous: Option, } diff --git a/bffhd/utils/linebuffer.rs b/bffhd/utils/linebuffer.rs new file mode 100644 index 0000000..87d6559 --- /dev/null +++ b/bffhd/utils/linebuffer.rs @@ -0,0 +1,60 @@ +use std::ops::{Deref, DerefMut}; + +pub struct LineBuffer { + buffer: Vec, + valid: usize, +} + +impl LineBuffer { + pub fn new() -> Self { + Self { + buffer: Vec::new(), + valid: 0, + } + } + + /// Resize the internal Vec so that buffer.len() == buffer.capacity() + fn resize(&mut self) { + // SAFETY: Whatever is in memory is always valid as u8. + unsafe { self.buffer.set_len(self.buffer.capacity()) } + } + + /// Get an (initialized but empty) writeable buffer of at least `atleast` bytes + pub fn get_mut_write(&mut self, atleast: usize) -> &mut [u8] { + let avail = self.buffer.len() - self.valid; + if avail < atleast { + self.buffer.reserve(atleast - avail); + self.resize() + } + &mut self.buffer[self.valid..] + } + + pub fn advance_valid(&mut self, amount: usize) { + self.valid += amount + } + + /// Mark `amount` bytes as 'consumed' + /// + /// This will move any remaining data to the start of the buffer for future processing + pub fn consume(&mut self, amount: usize) { + assert!(amount <= self.valid); + + if amount < self.valid { + self.buffer.copy_within(amount..self.valid, 0); + } + self.valid -= amount; + } +} + +impl Deref for LineBuffer { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.buffer[0..self.valid] + } +} +impl DerefMut for LineBuffer { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.buffer[0..self.valid] + } +} diff --git a/bffhd/utils/mod.rs b/bffhd/utils/mod.rs index c14ec8f..d65f75c 100644 --- a/bffhd/utils/mod.rs +++ b/bffhd/utils/mod.rs @@ -8,3 +8,5 @@ pub mod varint; pub mod l10nstring; pub mod uuid; + +pub mod linebuffer; diff --git a/examples/init.py b/examples/init.py new file mode 100755 index 0000000..73ed4e2 --- /dev/null +++ b/examples/init.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python + +import sys +import time + +while True: + print('{ "state": { "1.3.6.1.4.1.48398.612.2.4": { "state": "Free" } } }') + sys.stdout.flush() + time.sleep(2) + + print('{ "state": { "1.3.6.1.4.1.48398.612.2.4": { "state": { "InUse": { "id": "Testuser" } } } } }') + sys.stdout.flush() + time.sleep(2)