From 1dc8dc47102003c573d8286e931f07699523b627 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Mon, 14 Dec 2020 12:39:01 +0100 Subject: [PATCH] Even more improved init --- src/actor.rs | 23 ++++++++++++++++++--- src/config.rs | 31 +++++++++++++++++++++++----- src/initiator.rs | 48 +++++++++++++++++++++++++++++++++++++------ src/machine.rs | 2 +- src/main.rs | 18 ++++++++++++---- src/modules/shelly.rs | 1 + 6 files changed, 104 insertions(+), 19 deletions(-) diff --git a/src/actor.rs b/src/actor.rs index 3f2fc82..9e9a684 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -110,7 +110,7 @@ pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(Acto let mut map = HashMap::new(); let actuators = config.actors.iter() - .map(|(k,v)| (k, load_single(log, client, k, &v.name, &v.params))) + .map(|(k,v)| (k, load_single(log, client, k, &v.module, &v.params))) .filter_map(|(k, n)| match n { None => None, Some(a) => Some((k, a)) @@ -127,13 +127,30 @@ pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(Acto Ok(( map, v )) } -fn load_single(log: &Logger, client: &AsyncClient, name: &String, module_name: &String, params: &HashMap) -> Option> { +fn load_single( + log: &Logger, + client: &AsyncClient, + name: &String, + module_name: &String, + params: &HashMap + ) -> Option> +{ use crate::modules::*; match module_name.as_ref() { "Shelly" => { + if !params.is_empty() { + warn!(log, "\"{}\" module expects no parameters. Configured as \"{}\".", + module_name, name); + } Some(Box::new(Shelly::new(log, name.clone(), client.clone()))) + }, + "Dummy" => { + Some(Box::new(Dummy)) } - _ => None, + _ => { + error!(log, "No actor found with name \"{}\", configured as \"{}\".", module_name, name); + None + }, } } diff --git a/src/config.rs b/src/config.rs index 3f3dafc..a18b7d6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -10,6 +10,7 @@ use serde::{Serialize, Deserialize}; use crate::error::Result; use crate::machine::MachineDescription; use crate::db::machine::MachineIdentifier; +use crate::db::access::*; pub fn read(path: &Path) -> Result { serde_dhall::from_file(path) @@ -38,6 +39,9 @@ pub struct Config { pub initiators: HashMap, pub mqtt_url: String, + + pub actor_connections: Box<[(String, String)]>, + pub init_connections: Box<[(String, String)]>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -48,8 +52,7 @@ pub struct Listen { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ModuleConfig { - pub name: String, - #[serde(skip_serializing_if = "HashMap::is_empty")] + pub module: String, pub params: HashMap } @@ -57,16 +60,28 @@ impl Default for Config { fn default() -> Self { let mut actors: HashMap:: = HashMap::new(); let mut initiators: HashMap:: = HashMap::new(); + let mut machines = HashMap::new(); actors.insert("Actor".to_string(), ModuleConfig { - name: "Shelly".to_string(), + module: "Shelly".to_string(), params: HashMap::new(), }); initiators.insert("Initiator".to_string(), ModuleConfig { - name: "TCP-Listen".to_string(), + module: "TCP-Listen".to_string(), params: HashMap::new(), }); + machines.insert("Testmachine".to_string(), MachineDescription { + name: "Testmachine".to_string(), + description: Some("A test machine".to_string()), + privs: PrivilegesBuf { + disclose: PermissionBuf::from_string("lab.test.read".to_string()), + read: PermissionBuf::from_string("lab.test.read".to_string()), + write: PermissionBuf::from_string("lab.test.write".to_string()), + manage: PermissionBuf::from_string("lab.test.admin".to_string()), + }, + }); + Config { listens: Box::new([ Listen { @@ -74,10 +89,16 @@ impl Default for Config { port: Some(DEFAULT_PORT), } ]), - machines: HashMap::new(), + machines: machines, actors: actors, initiators: initiators, mqtt_url: "tcp://localhost:1883".to_string(), + actor_connections: Box::new([ + ("Testmachine".to_string(), "Actor".to_string()), + ]), + init_connections: Box::new([ + ("Initiator".to_string(), "Testmachine".to_string()), + ]), } } } diff --git a/src/initiator.rs b/src/initiator.rs index 4f3f1bd..580787f 100644 --- a/src/initiator.rs +++ b/src/initiator.rs @@ -2,8 +2,13 @@ use std::pin::Pin; use std::task::{Poll, Context}; use std::future::Future; use std::collections::HashMap; + use smol::{Task, Timer}; +use slog::Logger; + +use paho_mqtt::AsyncClient; + use futures::FutureExt; use futures::future::BoxFuture; @@ -17,6 +22,7 @@ use crate::db::user::{User, UserId, UserData}; use crate::network::InitMap; use crate::error::Result; +use crate::config::Config; pub trait Sensor { fn run_sensor(&mut self) -> BoxFuture<'static, (Option, MachineState)>; @@ -82,14 +88,44 @@ impl Future for Initiator { } } -pub fn load() -> Result<(InitMap, Vec)> { - let d = Box::new(Dummy::new()); - let (m, i) = Initiator::wrap(d); - +pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(InitMap, Vec)> { let mut map = HashMap::new(); - map.insert("Dummy".to_string(), m); - Ok((map, vec![i])) + let initiators = config.initiators.iter() + .map(|(k,v)| (k, load_single(log, client, k, &v.module, &v.params))) + .filter_map(|(k,n)| match n { + None => None, + Some(i) => Some((k, i)), + }); + + let mut v = Vec::new(); + for (name, initiator) in initiators { + let (m, i) = Initiator::wrap(initiator); + map.insert(name.clone(), m); + v.push(i); + } + + Ok((map, v)) +} + +fn load_single( + log: &Logger, + client: &AsyncClient, + name: &String, + module_name: &String, + params: &HashMap + ) -> Option +{ + match module_name.as_ref() { + "Dummy" => { + Some(Box::new(Dummy::new())) + }, + _ => { + error!(log, "No initiator found with name \"{}\", configured as \"{}\"", + module_name, name); + None + } + } } pub struct Dummy { diff --git a/src/machine.rs b/src/machine.rs index c21c2d1..9e6a872 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -224,7 +224,7 @@ pub struct MachineDescription { /// The permission required #[serde(flatten)] - privs: access::PrivilegesBuf, + pub privs: access::PrivilegesBuf, } impl MachineDescription { diff --git a/src/main.rs b/src/main.rs index e64665a..926e1be 100644 --- a/src/main.rs +++ b/src/main.rs @@ -139,16 +139,26 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { let mqtt = AsyncClient::new(config.mqtt_url.clone())?; let tok = mqtt.connect(paho_mqtt::ConnectOptions::new()); - smol::block_on(tok); + smol::block_on(tok)?; let machines = machine::load(&config)?; let (mut actor_map, actors) = actor::load(&log, &mqtt, &config)?; - let (mut init_map, initiators) = initiator::load()?; + let (mut init_map, initiators) = initiator::load(&log, &mqtt, &config)?; - let network = network::Network::new(machines, actor_map, init_map); + // TODO: restore connections between initiators, machines, actors + let mut network = network::Network::new(machines, actor_map, init_map); + for (a,b) in config.actor_connections.iter() { + if let Err(e) = network.connect_actor(a,b) { + error!(log, "{}", e); + } + } - // TODO HERE: restore connections between initiators, machines, actors + for (a,b) in config.init_connections.iter() { + if let Err(e) = network.connect_init(a,b) { + error!(log, "{}", e); + } + } let actor_tasks = actors.into_iter().map(|actor| ex.spawn(actor)); let init_tasks = initiators.into_iter().map(|init| ex.spawn(init)); diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 574ad9c..c4c6cdc 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -32,6 +32,7 @@ pub struct Shelly { impl Shelly { pub fn new(log_view: &Logger, name: String, client: mqtt::AsyncClient) -> Self { let log = log_view.new(o!("shelly_name" => name.clone())); + debug!(log, "Starting shelly module for {}", &name); Shelly { log, name, client, } }