From 4ee94b260b20116e9060d68a09dcfa7f00b3f605 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Tue, 1 Dec 2020 09:44:18 +0100 Subject: [PATCH] Run the event network --- src/actor.rs | 28 +++++++++++++ src/initiator.rs | 9 +++++ src/machine.rs | 8 +++- src/main.rs | 102 ++++++++++++++++++++++++++++++----------------- 4 files changed, 108 insertions(+), 39 deletions(-) create mode 100644 src/actor.rs create mode 100644 src/initiator.rs diff --git a/src/actor.rs b/src/actor.rs new file mode 100644 index 0000000..78d8392 --- /dev/null +++ b/src/actor.rs @@ -0,0 +1,28 @@ +use std::future::Future; + +use futures_signals::signal::Signal; + +use crate::db::machine::MachineState; +use crate::registries::Actuator; +use crate::config::Settings; +use crate::error::Result; + +pub struct Actor { + inner: Box +} + +impl Actor { + pub fn new(inner: Box) -> Self { + Self { inner } + } + + pub fn run(self, ex: Arc) -> impl Future { + inner.for_each(|fut| { + ex.run(fut); + }) + } +} + +pub fn load(config: &Settings) -> Result> { + unimplemented!() +} diff --git a/src/initiator.rs b/src/initiator.rs new file mode 100644 index 0000000..5314854 --- /dev/null +++ b/src/initiator.rs @@ -0,0 +1,9 @@ +use smol::Task; + +pub struct Initiator { + inner: Task<()>, +} + +pub fn load(config: &crate::config::Settings) -> Result> { + unimplemented!() +} diff --git a/src/machine.rs b/src/machine.rs index 3ff39a7..d5c3deb 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -65,9 +65,9 @@ impl Machine { pub fn from_file>(path: P) -> Result> { let map: HashMap = MachineDescription::load_file(path)?; - map.drain().map(|(id, desc)| { + Ok(map.drain().map(|(id, desc)| { Self::construct(id, desc, MachineState::new()) - }).collect() + }).collect()) } } @@ -209,6 +209,10 @@ impl MachineDescription { } } +pub fn load(config: &crate::config::Settings) -> Result> { + unimplemented!() +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/main.rs b/src/main.rs index c9be230..20a4789 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,6 +19,9 @@ mod db; mod machine; mod builtin; mod server; +mod network; +mod actor; +mod initiator; use clap::{App, Arg}; @@ -38,13 +41,15 @@ use std::sync::Arc; use lmdb::Transaction; use smol::net::TcpListener; +use smol::Executor; + use error::Error; +use slog::Logger; + use registries::Registries; -// Returning a `Result` from `main` allows us to use the `?` shorthand. -// In the case of an Err it will be printed using `fmt::Debug` -fn maybe() -> Result { +fn main() { use clap::{crate_version, crate_description, crate_name}; // Argument parsing @@ -80,60 +85,83 @@ fn maybe() -> Result { // case. if matches.is_present("print default") { let config = config::Settings::default(); - let encoded = toml::to_vec(&config)?; + let encoded = toml::to_vec(&config).unwrap(); // Direct writing to fd 1 is faster but also prevents any print-formatting that could // invalidate the generated TOML let stdout = io::stdout(); let mut handle = stdout.lock(); - handle.write_all(&encoded)?; + handle.write_all(&encoded).unwrap(); // Early return to exit. - return Ok(0) + return Ok(()); } + let retval; + + // Scope to drop everything before exiting. + { + // Initialize the logging subsystem first to be able to better document the progress from now + // on. + // TODO: Now would be a really good time to close stdin/out and move logging to syslog + // Log is in an Arc so we can do very cheap clones in closures. + let log = Arc::new(log::init()); + info!(log, "Starting"); + + match maybe(matches, log.clone()) { + Ok(_) => retval = 0, + Err(e) => { + error!(log, "{}", e); + retval = -1; + } + } + } + + std::process::exit(retval); +} + +// Returning a `Result` from `main` allows us to use the `?` shorthand. +// In the case of an Err it will be printed using `fmt::Debug` +fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { // If no `config` option is given use a preset default. let configpath = matches.value_of("config").unwrap_or("/etc/bffh/config.toml"); let config = config::read(&PathBuf::from_str(configpath).unwrap())?; - // Initialize the logging subsystem first to be able to better document the progress from now - // on. - // TODO: Now would be a really good time to close stdin/out and move logging to syslog - // Log is in an Arc so we can do very cheap clones in closures. - let log = Arc::new(log::init(&config)); - info!(log, "Starting"); if matches.is_present("dump") { error!(log, "Dumping is currently not implemented"); - Ok(-2) + Ok(()) } else if matches.is_present("load") { error!(log, "Loading is currently not implemented"); - Ok(-2) + Ok(()) } else { - let db = match db::Databases::new(&log, &config) { - Err(e) => { - error!(log, "{}", e); - return Ok(-1); - }, - Ok(ok) => ok - }; + let db = db::Databases::new(&log, &config)?; - match server::serve_api_connections(log.clone(), config, db) { - Err(e) => { - error!(log, "{}", e); - Ok(-1) - }, - ok => Ok(0) - } - } -} - -fn main() { - match maybe() { - Ok(i) => std::process::exit(i), - Err(e) => { - println!("{}", e); - std::process::exit(-1); + // TODO: Spawn api connections on their own (non-main) thread, use the main thread to + // handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads + // when bffh should exit + let machines = machine::load(&config)?; + let actors = actor::load(&config)?; + let initiators = load_initiators(&config)?; + + // TODO restore connections between initiators, machines, actors + + let ex = Arc::new(Executor::new()); + + for i in initiators.into_iter() { + ex.spawn(i.run()); } + + for a in actors.into_iter() { + ex.spawn(a.run()); + } + + let (signal, shutdown) = futures::channel::oneshot::channel(); + for i in 0..4 { + std::thread::spawn(|| smol::block_on(ex.run(shutdown.recv()))); + } + + server::serve_api_connections(log.clone(), config, db) + // Signal is dropped here, stopping all executor threads as well. } }