Run the event network

This commit is contained in:
Gregor Reitzenstein 2020-12-01 09:44:18 +01:00
parent 8c1fbfd1a9
commit 4ee94b260b
4 changed files with 108 additions and 39 deletions

28
src/actor.rs Normal file
View File

@ -0,0 +1,28 @@
use std::future::Future;
use futures_signals::signal::Signal;
use crate::db::machine::MachineState;
use crate::registries::Actuator;
use crate::config::Settings;
use crate::error::Result;
pub struct Actor {
inner: Box<dyn Actuator>
}
impl Actor {
pub fn new(inner: Box<dyn Actuator>) -> Self {
Self { inner }
}
pub fn run(self, ex: Arc<Executor>) -> impl Future<Output=()> {
inner.for_each(|fut| {
ex.run(fut);
})
}
}
pub fn load(config: &Settings) -> Result<Vec<Actor>> {
unimplemented!()
}

9
src/initiator.rs Normal file
View File

@ -0,0 +1,9 @@
use smol::Task;
pub struct Initiator {
inner: Task<()>,
}
pub fn load(config: &crate::config::Settings) -> Result<Vec<Initiator>> {
unimplemented!()
}

View File

@ -65,9 +65,9 @@ impl Machine {
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Vec<Machine>> { pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Vec<Machine>> {
let map: HashMap<MachineIdentifier, MachineDescription> = MachineDescription::load_file(path)?; let map: HashMap<MachineIdentifier, MachineDescription> = MachineDescription::load_file(path)?;
map.drain().map(|(id, desc)| { Ok(map.drain().map(|(id, desc)| {
Self::construct(id, desc, MachineState::new()) Self::construct(id, desc, MachineState::new())
}).collect() }).collect())
} }
} }
@ -209,6 +209,10 @@ impl MachineDescription {
} }
} }
pub fn load(config: &crate::config::Settings) -> Result<Vec<Machine>> {
unimplemented!()
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -19,6 +19,9 @@ mod db;
mod machine; mod machine;
mod builtin; mod builtin;
mod server; mod server;
mod network;
mod actor;
mod initiator;
use clap::{App, Arg}; use clap::{App, Arg};
@ -38,13 +41,15 @@ use std::sync::Arc;
use lmdb::Transaction; use lmdb::Transaction;
use smol::net::TcpListener; use smol::net::TcpListener;
use smol::Executor;
use error::Error; use error::Error;
use slog::Logger;
use registries::Registries; use registries::Registries;
// Returning a `Result` from `main` allows us to use the `?` shorthand. fn main() {
// In the case of an Err it will be printed using `fmt::Debug`
fn maybe() -> Result<i32, Error> {
use clap::{crate_version, crate_description, crate_name}; use clap::{crate_version, crate_description, crate_name};
// Argument parsing // Argument parsing
@ -80,60 +85,83 @@ fn maybe() -> Result<i32, Error> {
// case. // case.
if matches.is_present("print default") { if matches.is_present("print default") {
let config = config::Settings::default(); let config = config::Settings::default();
let encoded = toml::to_vec(&config)?; let encoded = toml::to_vec(&config).unwrap();
// Direct writing to fd 1 is faster but also prevents any print-formatting that could // Direct writing to fd 1 is faster but also prevents any print-formatting that could
// invalidate the generated TOML // invalidate the generated TOML
let stdout = io::stdout(); let stdout = io::stdout();
let mut handle = stdout.lock(); let mut handle = stdout.lock();
handle.write_all(&encoded)?; handle.write_all(&encoded).unwrap();
// Early return to exit. // Early return to exit.
return Ok(0) return Ok(());
} }
// If no `config` option is given use a preset default. let retval;
let configpath = matches.value_of("config").unwrap_or("/etc/bffh/config.toml");
let config = config::read(&PathBuf::from_str(configpath).unwrap())?; // Scope to drop everything before exiting.
{
// Initialize the logging subsystem first to be able to better document the progress from now // Initialize the logging subsystem first to be able to better document the progress from now
// on. // on.
// TODO: Now would be a really good time to close stdin/out and move logging to syslog // TODO: Now would be a really good time to close stdin/out and move logging to syslog
// Log is in an Arc so we can do very cheap clones in closures. // Log is in an Arc so we can do very cheap clones in closures.
let log = Arc::new(log::init(&config)); let log = Arc::new(log::init());
info!(log, "Starting"); info!(log, "Starting");
match maybe(matches, log.clone()) {
Ok(_) => retval = 0,
Err(e) => {
error!(log, "{}", e);
retval = -1;
}
}
}
std::process::exit(retval);
}
// Returning a `Result` from `main` allows us to use the `?` shorthand.
// In the case of an Err it will be printed using `fmt::Debug`
fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
// If no `config` option is given use a preset default.
let configpath = matches.value_of("config").unwrap_or("/etc/bffh/config.toml");
let config = config::read(&PathBuf::from_str(configpath).unwrap())?;
if matches.is_present("dump") { if matches.is_present("dump") {
error!(log, "Dumping is currently not implemented"); error!(log, "Dumping is currently not implemented");
Ok(-2) Ok(())
} else if matches.is_present("load") { } else if matches.is_present("load") {
error!(log, "Loading is currently not implemented"); error!(log, "Loading is currently not implemented");
Ok(-2) Ok(())
} else { } else {
let db = match db::Databases::new(&log, &config) { let db = db::Databases::new(&log, &config)?;
Err(e) => {
error!(log, "{}", e);
return Ok(-1);
},
Ok(ok) => ok
};
match server::serve_api_connections(log.clone(), config, db) { // TODO: Spawn api connections on their own (non-main) thread, use the main thread to
Err(e) => { // handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads
error!(log, "{}", e); // when bffh should exit
Ok(-1) let machines = machine::load(&config)?;
}, let actors = actor::load(&config)?;
ok => Ok(0) let initiators = load_initiators(&config)?;
}
} // TODO restore connections between initiators, machines, actors
let ex = Arc::new(Executor::new());
for i in initiators.into_iter() {
ex.spawn(i.run());
} }
fn main() { for a in actors.into_iter() {
match maybe() { ex.spawn(a.run());
Ok(i) => std::process::exit(i), }
Err(e) => {
println!("{}", e); let (signal, shutdown) = futures::channel::oneshot::channel();
std::process::exit(-1); for i in 0..4 {
} std::thread::spawn(|| smol::block_on(ex.run(shutdown.recv())));
}
server::serve_api_connections(log.clone(), config, db)
// Signal is dropped here, stopping all executor threads as well.
} }
} }