From 5a42b34fe3bfcfa8daad63f0ec7d2d73dd8804f6 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Mon, 14 Dec 2020 11:02:46 +0100 Subject: [PATCH] Better initialization --- src/actor.rs | 40 +++++++++++++++++++++++++++++-------- src/config.rs | 46 ++++++++++++++++++++++++++++++++++++------- src/main.rs | 20 +++++++++++-------- src/modules.rs | 1 + src/modules/shelly.rs | 2 +- src/network.rs | 1 + 6 files changed, 86 insertions(+), 24 deletions(-) diff --git a/src/actor.rs b/src/actor.rs index 24cda62..3f2fc82 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -11,11 +11,13 @@ use futures::channel::mpsc; use futures_signals::signal::{Signal, MutableSignalCloned, MutableSignal, Mutable}; use crate::db::machine::MachineState; -use crate::config::Settings; +use crate::config::Config; use crate::error::Result; - use crate::network::ActorMap; +use paho_mqtt::AsyncClient; +use slog::Logger; + pub trait Actuator { fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()>; } @@ -104,12 +106,34 @@ impl Actuator for Dummy { } } -pub fn load() -> Result<(ActorMap, Vec)> { - let d = Box::new(Dummy); - let (tx, a) = Actor::wrap(d); - +pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(ActorMap, Vec)> { let mut map = HashMap::new(); - map.insert("Dummy".to_string(), tx); - Ok(( map, vec![a] )) + let actuators = config.actors.iter() + .map(|(k,v)| (k, load_single(log, client, k, &v.name, &v.params))) + .filter_map(|(k, n)| match n { + None => None, + Some(a) => Some((k, a)) + }); + + let mut v = Vec::new(); + for (name, actuator) in actuators { + let (tx, a) = Actor::wrap(actuator); + map.insert(name.clone(), tx); + v.push(a); + } + + + Ok(( map, v )) +} + +fn load_single(log: &Logger, client: &AsyncClient, name: &String, module_name: &String, params: &HashMap) -> Option> { + use crate::modules::*; + + match module_name.as_ref() { + "Shelly" => { + Some(Box::new(Shelly::new(log, name.clone(), client.clone()))) + } + _ => None, + } } diff --git a/src/config.rs b/src/config.rs index 6a5ce8b..3f3dafc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,7 +12,9 @@ use crate::machine::MachineDescription; use crate::db::machine::MachineIdentifier; pub fn read(path: &Path) -> Result { - serde_dhall::from_file(path).parse().map_err(Into::into) + serde_dhall::from_file(path) + .parse() + .map_err(Into::into) } #[deprecated] @@ -29,8 +31,13 @@ pub struct Config { /// Machine descriptions to load pub machines: HashMap, - /// Modules to load and their configuration options - pub modules: HashMap>, + /// Actors to load and their configuration options + pub actors: HashMap, + + /// Initiators to load and their configuration options + pub initiators: HashMap, + + pub mqtt_url: String, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -39,13 +46,38 @@ pub struct Listen { pub port: Option, } -impl Default for Settings { +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ModuleConfig { + pub name: String, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub params: HashMap +} + +impl Default for Config { fn default() -> Self { - let modules: HashMap::> = HashMap::new(); + let mut actors: HashMap:: = HashMap::new(); + let mut initiators: HashMap:: = HashMap::new(); + + actors.insert("Actor".to_string(), ModuleConfig { + name: "Shelly".to_string(), + params: HashMap::new(), + }); + initiators.insert("Initiator".to_string(), ModuleConfig { + name: "TCP-Listen".to_string(), + params: HashMap::new(), + }); + Config { - listens: Box::new([]), + listens: Box::new([ + Listen { + address: "localhost".to_string(), + port: Some(DEFAULT_PORT), + } + ]), machines: HashMap::new(), - modules: modules, + actors: actors, + initiators: initiators, + mqtt_url: "tcp://localhost:1883".to_string(), } } } diff --git a/src/main.rs b/src/main.rs index ec6091a..e64665a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,6 +46,8 @@ use error::Error; use slog::Logger; +use paho_mqtt::AsyncClient; + fn main() { use clap::{crate_version, crate_description, crate_name}; @@ -81,14 +83,14 @@ fn main() { // Check for the --print-default option first because we don't need to do anything else in that // case. if matches.is_present("print default") { - let config = config::Settings::default(); - let encoded = toml::to_vec(&config).unwrap(); + let config = config::Config::default(); + let encoded = serde_dhall::serialize(&config).to_string().unwrap(); // Direct writing to fd 1 is faster but also prevents any print-formatting that could // invalidate the generated TOML let stdout = io::stdout(); let mut handle = stdout.lock(); - handle.write_all(&encoded).unwrap(); + handle.write_all(&encoded.as_bytes()).unwrap(); // Early return to exit. return; @@ -123,7 +125,7 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { // If no `config` option is given use a preset default. let configpath = matches.value_of("config").unwrap_or("/etc/bffh/config.toml"); let config = config::read(&PathBuf::from_str(configpath).unwrap())?; - + debug!(log, "Loaded Config: {:?}", config); if matches.is_present("dump") { error!(log, "Dumping is currently not implemented"); @@ -134,8 +136,13 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { } else { let ex = Executor::new(); + let mqtt = AsyncClient::new(config.mqtt_url.clone())?; + let tok = mqtt.connect(paho_mqtt::ConnectOptions::new()); + + smol::block_on(tok); + let machines = machine::load(&config)?; - let (mut actor_map, actors) = actor::load()?; + let (mut actor_map, actors) = actor::load(&log, &mqtt, &config)?; let (mut init_map, initiators) = initiator::load()?; let network = network::Network::new(machines, actor_map, init_map); @@ -143,9 +150,6 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { // TODO HERE: restore connections between initiators, machines, actors - // TODO HERE: Spawn all actors & inits - - // Like so let actor_tasks = actors.into_iter().map(|actor| ex.spawn(actor)); let init_tasks = initiators.into_iter().map(|init| ex.spawn(init)); diff --git a/src/modules.rs b/src/modules.rs index f2decc8..51e218c 100644 --- a/src/modules.rs +++ b/src/modules.rs @@ -8,6 +8,7 @@ use slog::Logger; mod shelly; +pub use shelly::Shelly; use futures::prelude::*; use futures::task::Spawn; diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index b673b7b..574ad9c 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -23,7 +23,7 @@ use paho_mqtt as mqtt; /// This actuator will toggle the shellie with the given `name`. /// If you need to toggle shellies on multiple brokers you need multiple instanced of this /// actuator with different clients. -struct Shelly { +pub struct Shelly { log: Logger, name: String, client: mqtt::AsyncClient, diff --git a/src/network.rs b/src/network.rs index d45fac4..3e93bf4 100644 --- a/src/network.rs +++ b/src/network.rs @@ -39,6 +39,7 @@ impl fmt::Display for Error { /// Main signal network /// /// Network as per FRP, not the one with packages and frames +// TODO De/Serialize established connection on startup/shutdown. pub struct Network { inits: InitMap,