From 0abf8278d38c6b5e6036cdfc15dbf6bb4c3e56f3 Mon Sep 17 00:00:00 2001 From: Nadja Reitzenstein Date: Wed, 22 Sep 2021 08:39:50 +0200 Subject: [PATCH] Batch module draft --- Cargo.lock | 22 +++++- Cargo.toml | 1 + src/modules.rs | 3 + src/modules/batch.rs | 165 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 189 insertions(+), 2 deletions(-) create mode 100644 src/modules/batch.rs diff --git a/Cargo.lock b/Cargo.lock index 2fcd4eb..5455972 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -235,9 +235,9 @@ dependencies = [ [[package]] name = "bitflags" -version = "1.3.2" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" [[package]] name = "blake2b_simd" @@ -541,6 +541,7 @@ dependencies = [ "rust-argon2", "serde", "serde_dhall", + "serde_json", "signal-hook", "slog", "slog-async", @@ -1057,6 +1058,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" + [[package]] name = "lazy_static" version = "1.4.0" @@ -1649,6 +1656,17 @@ dependencies = [ "url", ] +[[package]] +name = "serde_json" +version = "1.0.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha-1" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index 860c709..cb43104 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ flexbuffers = "2.0.0" bincode = "2.0.0-dev" serde_dhall = { version = "0.10.1", default-features = false } +serde_json = "1.0" uuid = { version = "0.8.2", features = ["serde", "v4"] } diff --git a/src/modules.rs b/src/modules.rs index 46b6427..d95aa42 100644 --- a/src/modules.rs +++ b/src/modules.rs @@ -10,3 +10,6 @@ pub use shelly::Shelly; mod process; pub use process::Process; + +mod batch; +pub use batch::Batch; diff --git a/src/modules/batch.rs b/src/modules/batch.rs new file mode 100644 index 0000000..e4f062e --- /dev/null +++ b/src/modules/batch.rs @@ -0,0 +1,165 @@ +use std::pin::Pin; +use std::cell::RefCell; + +use std::collections::HashMap; +use std::process::Stdio; +use smol::process::{Command, Child}; +use smol::io::{AsyncWriteExt, AsyncReadExt}; + +use futures::future::FutureExt; + +use crate::actor::Actuator; +use crate::initiator::Sensor; +use crate::db::machine::{MachineState, Status}; +use crate::db::user::{User, Internal as UserDB}; +use futures::future::BoxFuture; + +use slog::Logger; + +use serde::{Serialize, Deserialize}; + +pub struct Batch { + userdb: UserDB, + name: String, + cmd: String, + args: Vec, + kill: bool, + child: Child, + stdout: RefCell>>, +} + +impl Batch { + pub fn new(log: Logger, name: String, params: &HashMap, userdb: UserDB) + -> Option + { + let cmd = params.get("cmd").map(|s| s.to_string())?; + let args = params.get("args").map(|argv| + argv.split_whitespace() + .map(|s| s.to_string()) + .collect()) + .unwrap_or_else(Vec::new); + + let kill = params.get("kill_on_exit").and_then(|s| + s.parse() + .or_else(|| { + warn!(log, "Can't parse `kill_on_exit` for {} set as {} as boolean. \ + Must be either \"True\" or \"False\".", &name, &s); + false + })); + + info!(log, "Starting {} ({})…", &name, &cmd); + let mut child = Self::start(&name, &cmd, &args) + .map_err(|err| error!(log, "Failed to spawn {} ({}): {}", &name, &cmd, err)) + .ok()?; + let stdout = Self::get_stdin(&mut child); + + Ok(Self { userdb, name, cmd, args, kill, child, stdout }) + } + + fn start_actor(name: &String, cmd: &String, args: &Vec) -> Result { + let mut command = Command::new(cmd); + command + .stdin(Stdio::piped()) + .stdout(Stdio::null()) + .stderr(Stdio::piped()) + .args(args.iter()) + .arg(name); + + command + .spawn() + } + + fn get_stdout(child: &mut Child) -> Pin> { + let stdout = child.stdout.expect("Actor child has closed stdout"); + stdout.boxed_writer() + } + + fn maybe_restart(&mut self, f: &mut Option>) -> bool { + let stat = self.child.try_status(); + if stat.is_err() { + error!(self.log, "Can't check process for {} ({}) [{}]: {}", + &self.name, &self.cmd, self.child.id(), stat.unwrap_err()); + return false; + } + if let Some(status) = stat.unwrap() { + warn!(self.log, "Process for {} ({}) exited with code {}", + &self.name, &self.cmd, status); + let errlog = self.log.new(o!("pid" => self.child.id())); + // If we have any stderr try to log it + if let Some(stderr) = self.child.stderr.take() { + f = Some(async move { + match stderr.into_stdio().await { + Err(err) => error!(errlog, "Failed to open actor process STDERR: ", err), + Ok(err) => if !retv.stderr.is_empty() { + let errstr = String::from_utf8_lossy(err); + for line in errstr.lines() { + warn!(errlog, "{}", line); + } + } + _ => {} + } + }); + } + info!(self.log, "Attempting to re-start {}", &self.name); + let mut child = Self::start(&self.name, &self.cmd, &self.args) + .map_err(|err| error!(log, "Failed to spawn {} ({}): {}", &self.name, &self.cmd, err)) + .ok(); + // Nothing else to do with the currect architecture. In reality we should fail here + // because we *didn't apply* the change. + if child.is_none() { + false + } + self.child = child.unwrap(); + } + + true + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct StateChangeObj { + name: String, + state: MachineState, +} + +impl StateChangeObj { + pub fn new(name: String, state: MachineState) -> Self { + Self { name, state } + } +} + +impl Actuator for Batch { + fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()> { + debug!(self.log, "Giving {} ({}) new state: {:?}", &self.name, &self.cmd, &state); + + let mut f = None; + if !self.maybe_restart(&mut f) { + return Box::pin(futures::future::ready(())); + } + + let mut json = String::new(); + // Per default compact + let ser = serde_json::ser::Serializer::new(&mut json); + + let change = StateChangeObj::new(self.name.clone(), state); + change.serialize(&mut ser); + + // Verify that this "line" does not contain any whitespace. + debug_assert!(!json.chars().any(|c| c == "\n")); + + let stdin = self.child.stdin.take().expect("Batch actor child has closed stdin?!"); + + let errlog = self.log.new(o!("pid" => self.child.id())); + let g = async move { + if let Some(f) = f { + f.await; + } + + if let Err(e) = stdin.write(json).await { + error!(errlog, "Failed to send statechange to child: {}", e); + } + }; + + Box::pin(g); + } +}