Process initiator working

This commit is contained in:
Nadja Reitzenstein 2022-06-18 16:52:22 +02:00
parent 6d8d1384d9
commit a66303566a
8 changed files with 275 additions and 2 deletions

18
Cargo.lock generated
View File

@ -226,6 +226,23 @@ dependencies = [
"futures-micro", "futures-micro",
] ]
[[package]]
name = "async-process"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf2c06e30a24e8c78a3987d07f0930edf76ef35e027e7bdb063fccafdad1f60c"
dependencies = [
"async-io",
"blocking",
"cfg-if",
"event-listener",
"futures-lite",
"libc",
"once_cell",
"signal-hook",
"winapi",
]
[[package]] [[package]]
name = "async-std" name = "async-std"
version = "1.10.0" version = "1.10.0"
@ -839,6 +856,7 @@ dependencies = [
"async-io", "async-io",
"async-net", "async-net",
"async-oneshot", "async-oneshot",
"async-process",
"async-trait", "async-trait",
"backtrace", "backtrace",
"capnp", "capnp",

View File

@ -38,6 +38,7 @@ futures-util = "0.3"
futures-lite = "1.12.0" futures-lite = "1.12.0"
async-net = "1.6.1" async-net = "1.6.1"
async-io = "1.7.0" async-io = "1.7.0"
async-process = "1.4.0"
backtrace = "0.3.65" backtrace = "0.3.65"
miette = { version = "4.7.1", features = ["fancy"] } miette = { version = "4.7.1", features = ["fancy"] }
thiserror = "1.0.31" thiserror = "1.0.31"

View File

@ -1,4 +1,5 @@
use crate::initiators::dummy::Dummy; use crate::initiators::dummy::Dummy;
use crate::initiators::process::Process;
use crate::resources::modules::fabaccess::Status; use crate::resources::modules::fabaccess::Status;
use crate::session::SessionHandle; use crate::session::SessionHandle;
use crate::{ use crate::{
@ -19,6 +20,7 @@ use std::time::Duration;
use url::Url; use url::Url;
mod dummy; mod dummy;
mod process;
pub trait Initiator: Future<Output = ()> { pub trait Initiator: Future<Output = ()> {
fn new(params: &HashMap<String, String>, callbacks: InitiatorCallbacks) -> miette::Result<Self> fn new(params: &HashMap<String, String>, callbacks: InitiatorCallbacks) -> miette::Result<Self>
@ -78,7 +80,7 @@ impl Future for InitiatorDriver {
ready!(Pin::new(&mut self.initiator).poll(cx)); ready!(Pin::new(&mut self.initiator).poll(cx));
tracing::warn!(initiator=%self.name, "an initiator module ran to completion!"); tracing::warn!(initiator=%self.name, "initiator module ran to completion!");
Poll::Ready(()) Poll::Ready(())
} }
@ -139,6 +141,12 @@ fn load_single(
resource, resource,
sessions.clone(), sessions.clone(),
)), )),
"Process" => Some(InitiatorDriver::new::<Process>(
name.clone(),
params,
resource,
sessions.clone(),
)),
_ => None, _ => None,
}; };

166
bffhd/initiators/process.rs Normal file
View File

@ -0,0 +1,166 @@
use super::Initiator;
use super::InitiatorCallbacks;
use crate::resources::state::State;
use crate::utils::linebuffer::LineBuffer;
use async_process::{Child, ChildStdout, Command, Stdio};
use futures_lite::{ready, AsyncRead};
use miette::{miette, IntoDiagnostic};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
#[derive(Debug, Serialize, Deserialize)]
pub enum InputMessage {
#[serde(rename = "state")]
SetState(State),
}
#[derive(Serialize, Deserialize)]
pub struct OutputLine {}
pub struct Process {
pub cmd: String,
pub args: Vec<String>,
state: Option<ProcessState>,
buffer: LineBuffer,
callbacks: InitiatorCallbacks,
}
impl Process {
fn spawn(&mut self) -> io::Result<()> {
let mut child = Command::new(&self.cmd)
.args(&self.args)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
self.state = Some(ProcessState::new(
child
.stdout
.take()
.expect("Child just spawned with piped stdout has no stdout"),
child,
));
Ok(())
}
}
struct ProcessState {
pub child: Child,
pub stdout: ChildStdout,
}
impl ProcessState {
pub fn new(stdout: ChildStdout, child: Child) -> Self {
Self { stdout, child }
}
fn try_process(&mut self, buffer: &[u8], callbacks: &mut InitiatorCallbacks) -> usize {
tracing::trace!("trying to process current buffer");
let mut end = 0;
while let Some(idx) = buffer[end..].iter().position(|b| *b == b'\n') {
if idx == 0 {
end += 1;
continue;
}
let line = &buffer[end..(end + idx)];
self.process_line(line, callbacks);
end = idx;
}
end
}
fn process_line(&mut self, line: &[u8], callbacks: &mut InitiatorCallbacks) {
match serde_json::from_slice::<InputMessage>(line) {
Ok(state) => {
tracing::trace!(?state, "got new state for process initiator");
}
Err(error) => tracing::warn!(%error, "process initiator did not send a valid line"),
}
}
}
impl Future for Process {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Process {
state: Some(state),
buffer,
callbacks,
..
} = self.get_mut()
{
match state.child.try_status() {
Err(error) => {
tracing::error!(%error, "checking child exit code returned an error");
return Poll::Ready(());
}
Ok(Some(exitcode)) => {
tracing::warn!(%exitcode, "child process exited");
return Poll::Ready(());
}
Ok(None) => {
tracing::trace!("process initiator checking on process");
let stdout = &mut state.stdout;
loop {
let buf = buffer.get_mut_write(512);
match AsyncRead::poll_read(Pin::new(stdout), cx, buf) {
Poll::Pending => break,
Poll::Ready(Ok(read)) => {
buffer.advance_valid(read);
continue;
}
Poll::Ready(Err(error)) => {
tracing::warn!(%error, "reading from child stdout errored");
return Poll::Ready(());
}
}
}
let processed = state.try_process(buffer, callbacks);
buffer.consume(processed);
return Poll::Pending;
}
}
} else {
tracing::warn!("process initiator has no process attached!");
}
Poll::Ready(())
}
}
impl Initiator for Process {
fn new(params: &HashMap<String, String>, callbacks: InitiatorCallbacks) -> miette::Result<Self>
where
Self: Sized,
{
let cmd = params
.get("cmd")
.ok_or(miette!("Process initiator requires a `cmd` parameter."))?
.clone();
let args = params
.get("args")
.map(|argv| argv.split_whitespace().map(|s| s.to_string()).collect())
.unwrap_or_else(Vec::new);
let mut this = Self {
cmd,
args,
state: None,
buffer: LineBuffer::new(),
callbacks,
};
this.spawn().into_diagnostic()?;
Ok(this)
}
}

View File

@ -1,9 +1,9 @@
use crate::config::deser_option;
use crate::utils::oid::ObjectIdentifier; use crate::utils::oid::ObjectIdentifier;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use rkyv::{Archive, Archived, Deserialize, Infallible}; use rkyv::{Archive, Archived, Deserialize, Infallible};
use std::fmt; use std::fmt;
use std::fmt::Write; use std::fmt::Write;
use std::str::FromStr; use std::str::FromStr;
//use crate::oidvalue; //use crate::oidvalue;
@ -54,6 +54,11 @@ pub enum Status {
/// The status of the machine /// The status of the machine
pub struct MachineState { pub struct MachineState {
pub state: Status, pub state: Status,
#[serde(
default,
skip_serializing_if = "Option::is_none",
deserialize_with = "deser_option"
)]
pub previous: Option<UserRef>, pub previous: Option<UserRef>,
} }

60
bffhd/utils/linebuffer.rs Normal file
View File

@ -0,0 +1,60 @@
use std::ops::{Deref, DerefMut};
pub struct LineBuffer {
buffer: Vec<u8>,
valid: usize,
}
impl LineBuffer {
pub fn new() -> Self {
Self {
buffer: Vec::new(),
valid: 0,
}
}
/// Resize the internal Vec so that buffer.len() == buffer.capacity()
fn resize(&mut self) {
// SAFETY: Whatever is in memory is always valid as u8.
unsafe { self.buffer.set_len(self.buffer.capacity()) }
}
/// Get an (initialized but empty) writeable buffer of at least `atleast` bytes
pub fn get_mut_write(&mut self, atleast: usize) -> &mut [u8] {
let avail = self.buffer.len() - self.valid;
if avail < atleast {
self.buffer.reserve(atleast - avail);
self.resize()
}
&mut self.buffer[self.valid..]
}
pub fn advance_valid(&mut self, amount: usize) {
self.valid += amount
}
/// Mark `amount` bytes as 'consumed'
///
/// This will move any remaining data to the start of the buffer for future processing
pub fn consume(&mut self, amount: usize) {
assert!(amount <= self.valid);
if amount < self.valid {
self.buffer.copy_within(amount..self.valid, 0);
}
self.valid -= amount;
}
}
impl Deref for LineBuffer {
type Target = [u8];
fn deref(&self) -> &Self::Target {
&self.buffer[0..self.valid]
}
}
impl DerefMut for LineBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.buffer[0..self.valid]
}
}

View File

@ -8,3 +8,5 @@ pub mod varint;
pub mod l10nstring; pub mod l10nstring;
pub mod uuid; pub mod uuid;
pub mod linebuffer;

13
examples/init.py Executable file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env python
import sys
import time
while True:
print('{ "state": { "1.3.6.1.4.1.48398.612.2.4": { "state": "Free" } } }')
sys.stdout.flush()
time.sleep(2)
print('{ "state": { "1.3.6.1.4.1.48398.612.2.4": { "state": { "InUse": { "id": "Testuser" } } } } }')
sys.stdout.flush()
time.sleep(2)