mirror of
https://gitlab.com/fabinfra/fabaccess/bffh.git
synced 2024-11-23 23:27:57 +01:00
Batch module draft
This commit is contained in:
parent
14402d627c
commit
0abf8278d3
22
Cargo.lock
generated
22
Cargo.lock
generated
@ -235,9 +235,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bitflags"
|
name = "bitflags"
|
||||||
version = "1.3.2"
|
version = "1.2.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
|
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "blake2b_simd"
|
name = "blake2b_simd"
|
||||||
@ -541,6 +541,7 @@ dependencies = [
|
|||||||
"rust-argon2",
|
"rust-argon2",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_dhall",
|
"serde_dhall",
|
||||||
|
"serde_json",
|
||||||
"signal-hook",
|
"signal-hook",
|
||||||
"slog",
|
"slog",
|
||||||
"slog-async",
|
"slog-async",
|
||||||
@ -1057,6 +1058,12 @@ dependencies = [
|
|||||||
"either",
|
"either",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "itoa"
|
||||||
|
version = "0.4.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lazy_static"
|
name = "lazy_static"
|
||||||
version = "1.4.0"
|
version = "1.4.0"
|
||||||
@ -1649,6 +1656,17 @@ dependencies = [
|
|||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_json"
|
||||||
|
version = "1.0.68"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8"
|
||||||
|
dependencies = [
|
||||||
|
"itoa",
|
||||||
|
"ryu",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sha-1"
|
name = "sha-1"
|
||||||
version = "0.8.2"
|
version = "0.8.2"
|
||||||
|
@ -38,6 +38,7 @@ flexbuffers = "2.0.0"
|
|||||||
bincode = "2.0.0-dev"
|
bincode = "2.0.0-dev"
|
||||||
|
|
||||||
serde_dhall = { version = "0.10.1", default-features = false }
|
serde_dhall = { version = "0.10.1", default-features = false }
|
||||||
|
serde_json = "1.0"
|
||||||
|
|
||||||
uuid = { version = "0.8.2", features = ["serde", "v4"] }
|
uuid = { version = "0.8.2", features = ["serde", "v4"] }
|
||||||
|
|
||||||
|
@ -10,3 +10,6 @@ pub use shelly::Shelly;
|
|||||||
|
|
||||||
mod process;
|
mod process;
|
||||||
pub use process::Process;
|
pub use process::Process;
|
||||||
|
|
||||||
|
mod batch;
|
||||||
|
pub use batch::Batch;
|
||||||
|
165
src/modules/batch.rs
Normal file
165
src/modules/batch.rs
Normal file
@ -0,0 +1,165 @@
|
|||||||
|
use std::pin::Pin;
|
||||||
|
use std::cell::RefCell;
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::process::Stdio;
|
||||||
|
use smol::process::{Command, Child};
|
||||||
|
use smol::io::{AsyncWriteExt, AsyncReadExt};
|
||||||
|
|
||||||
|
use futures::future::FutureExt;
|
||||||
|
|
||||||
|
use crate::actor::Actuator;
|
||||||
|
use crate::initiator::Sensor;
|
||||||
|
use crate::db::machine::{MachineState, Status};
|
||||||
|
use crate::db::user::{User, Internal as UserDB};
|
||||||
|
use futures::future::BoxFuture;
|
||||||
|
|
||||||
|
use slog::Logger;
|
||||||
|
|
||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
|
||||||
|
pub struct Batch {
|
||||||
|
userdb: UserDB,
|
||||||
|
name: String,
|
||||||
|
cmd: String,
|
||||||
|
args: Vec<String>,
|
||||||
|
kill: bool,
|
||||||
|
child: Child,
|
||||||
|
stdout: RefCell<Pin<Box<dyn AsyncWrite>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Batch {
|
||||||
|
pub fn new(log: Logger, name: String, params: &HashMap<String, String>, userdb: UserDB)
|
||||||
|
-> Option<Self>
|
||||||
|
{
|
||||||
|
let cmd = params.get("cmd").map(|s| s.to_string())?;
|
||||||
|
let args = params.get("args").map(|argv|
|
||||||
|
argv.split_whitespace()
|
||||||
|
.map(|s| s.to_string())
|
||||||
|
.collect())
|
||||||
|
.unwrap_or_else(Vec::new);
|
||||||
|
|
||||||
|
let kill = params.get("kill_on_exit").and_then(|s|
|
||||||
|
s.parse()
|
||||||
|
.or_else(|| {
|
||||||
|
warn!(log, "Can't parse `kill_on_exit` for {} set as {} as boolean. \
|
||||||
|
Must be either \"True\" or \"False\".", &name, &s);
|
||||||
|
false
|
||||||
|
}));
|
||||||
|
|
||||||
|
info!(log, "Starting {} ({})…", &name, &cmd);
|
||||||
|
let mut child = Self::start(&name, &cmd, &args)
|
||||||
|
.map_err(|err| error!(log, "Failed to spawn {} ({}): {}", &name, &cmd, err))
|
||||||
|
.ok()?;
|
||||||
|
let stdout = Self::get_stdin(&mut child);
|
||||||
|
|
||||||
|
Ok(Self { userdb, name, cmd, args, kill, child, stdout })
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_actor(name: &String, cmd: &String, args: &Vec<String>) -> Result<Child> {
|
||||||
|
let mut command = Command::new(cmd);
|
||||||
|
command
|
||||||
|
.stdin(Stdio::piped())
|
||||||
|
.stdout(Stdio::null())
|
||||||
|
.stderr(Stdio::piped())
|
||||||
|
.args(args.iter())
|
||||||
|
.arg(name);
|
||||||
|
|
||||||
|
command
|
||||||
|
.spawn()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_stdout(child: &mut Child) -> Pin<Box<dyn AsyncWrite>> {
|
||||||
|
let stdout = child.stdout.expect("Actor child has closed stdout");
|
||||||
|
stdout.boxed_writer()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn maybe_restart(&mut self, f: &mut Option<impl Future<Item=()>>) -> bool {
|
||||||
|
let stat = self.child.try_status();
|
||||||
|
if stat.is_err() {
|
||||||
|
error!(self.log, "Can't check process for {} ({}) [{}]: {}",
|
||||||
|
&self.name, &self.cmd, self.child.id(), stat.unwrap_err());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if let Some(status) = stat.unwrap() {
|
||||||
|
warn!(self.log, "Process for {} ({}) exited with code {}",
|
||||||
|
&self.name, &self.cmd, status);
|
||||||
|
let errlog = self.log.new(o!("pid" => self.child.id()));
|
||||||
|
// If we have any stderr try to log it
|
||||||
|
if let Some(stderr) = self.child.stderr.take() {
|
||||||
|
f = Some(async move {
|
||||||
|
match stderr.into_stdio().await {
|
||||||
|
Err(err) => error!(errlog, "Failed to open actor process STDERR: ", err),
|
||||||
|
Ok(err) => if !retv.stderr.is_empty() {
|
||||||
|
let errstr = String::from_utf8_lossy(err);
|
||||||
|
for line in errstr.lines() {
|
||||||
|
warn!(errlog, "{}", line);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
info!(self.log, "Attempting to re-start {}", &self.name);
|
||||||
|
let mut child = Self::start(&self.name, &self.cmd, &self.args)
|
||||||
|
.map_err(|err| error!(log, "Failed to spawn {} ({}): {}", &self.name, &self.cmd, err))
|
||||||
|
.ok();
|
||||||
|
// Nothing else to do with the currect architecture. In reality we should fail here
|
||||||
|
// because we *didn't apply* the change.
|
||||||
|
if child.is_none() {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
self.child = child.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
struct StateChangeObj {
|
||||||
|
name: String,
|
||||||
|
state: MachineState,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StateChangeObj {
|
||||||
|
pub fn new(name: String, state: MachineState) -> Self {
|
||||||
|
Self { name, state }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Actuator for Batch {
|
||||||
|
fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()> {
|
||||||
|
debug!(self.log, "Giving {} ({}) new state: {:?}", &self.name, &self.cmd, &state);
|
||||||
|
|
||||||
|
let mut f = None;
|
||||||
|
if !self.maybe_restart(&mut f) {
|
||||||
|
return Box::pin(futures::future::ready(()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut json = String::new();
|
||||||
|
// Per default compact
|
||||||
|
let ser = serde_json::ser::Serializer::new(&mut json);
|
||||||
|
|
||||||
|
let change = StateChangeObj::new(self.name.clone(), state);
|
||||||
|
change.serialize(&mut ser);
|
||||||
|
|
||||||
|
// Verify that this "line" does not contain any whitespace.
|
||||||
|
debug_assert!(!json.chars().any(|c| c == "\n"));
|
||||||
|
|
||||||
|
let stdin = self.child.stdin.take().expect("Batch actor child has closed stdin?!");
|
||||||
|
|
||||||
|
let errlog = self.log.new(o!("pid" => self.child.id()));
|
||||||
|
let g = async move {
|
||||||
|
if let Some(f) = f {
|
||||||
|
f.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Err(e) = stdin.write(json).await {
|
||||||
|
error!(errlog, "Failed to send statechange to child: {}", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Box::pin(g);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user