diff --git a/Cargo.toml b/Cargo.toml index df2a1a4..9878499 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,8 @@ uuid = { version = "0.8", features = ["serde", "v4"] } clap = "2.33" -#rsasl = "0.1" +# TODO update this if bindgen breaks (again) +rsasl = "0.1.2" # rumqtt needs tokio which I'm trying to get away from paho-mqtt = { git = "https://github.com/dequbed/paho.mqtt.rust.git", branch = "master", features = ["build_bindgen"] } @@ -42,5 +43,7 @@ paho-mqtt = { git = "https://github.com/dequbed/paho.mqtt.rust.git", branch = "m libc = "0.2" lmdb-rkv = "0.14" +async-trait = "0.1" + [build-dependencies] capnpc = "0.13" diff --git a/src/config.rs b/src/config.rs index 467c879..dee4bbd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -24,6 +24,7 @@ pub struct Config { pub passdb: PathBuf, pub(crate) access: Access, pub listen: Box<[Listen]>, + pub mqtt_url: String, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -56,6 +57,7 @@ impl Default for Config { address: "::1".to_string(), port: Some(DEFAULT_PORT) }]), + mqtt_url: "127.0.0.1:1883".to_string(), } } } diff --git a/src/error.rs b/src/error.rs index 2c50e68..8bbeb96 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,6 +7,8 @@ use rsasl::SaslError; // SpawnError is a somewhat ambigous name, `use as` to make it futures::SpawnError instead. use futures::task as futures; +use paho_mqtt::errors as mqtt; + #[derive(Debug)] pub enum Error { TomlDe(toml::de::Error), @@ -19,6 +21,7 @@ pub enum Error { FlexbuffersDe(flexbuffers::DeserializationError), FlexbuffersSer(flexbuffers::SerializationError), FuturesSpawn(futures::SpawnError), + MQTT(mqtt::Error), } impl fmt::Display for Error { @@ -54,6 +57,9 @@ impl fmt::Display for Error { Error::FuturesSpawn(e) => { write!(f, "Future could not be spawned: {}", e) }, + Error::MQTT(e) => { + write!(f, "Paho MQTT encountered an error: {}", e) + }, } } } @@ -118,4 +124,10 @@ impl From for Error { } } +impl From for Error { + fn from(e: mqtt::Error) -> Error { + Error::MQTT(e) + } +} + pub(crate) type Result = std::result::Result; diff --git a/src/main.rs b/src/main.rs index 1378dfa..3939fc0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,9 @@ extern crate slog; #[macro_use] extern crate capnp_rpc; +#[macro_use] +extern crate async_trait; + mod auth; mod access; mod modules; @@ -38,6 +41,8 @@ use lmdb::Transaction; use error::Error; +use registries::Registries; + const LMDB_MAX_DB: u32 = 16; // Returning a `Result` from `main` allows us to use the `?` shorthand. @@ -222,7 +227,8 @@ fn main() -> Result<(), Error> { // FIXME: implement notification so the modules can shut down cleanly instead of being killed // without warning. let modlog = log.clone(); - match modules::init(modlog.new(o!("system" => "modules")), &config, &pool) { + let regs = Registries::new(); + match modules::init(modlog.new(o!("system" => "modules")), &config, &local_spawn, regs) { Ok(()) => {} Err(e) => { error!(modlog, "Module startup failed: {}", e); diff --git a/src/modules.rs b/src/modules.rs index 239efbb..b64f6e0 100644 --- a/src/modules.rs +++ b/src/modules.rs @@ -10,15 +10,16 @@ use slog::Logger; mod shelly; use futures::prelude::*; -use futures::task::Spawn; +use futures::task::LocalSpawn; use crate::config::Config; use crate::error::Result; +use crate::registries::Registries; // spawner is a type that allows 'tasks' to be spawned on it, running them to completion. -pub fn init(log: Logger, config: &Config, spawner: &S) -> Result<()> { - let f = Box::new(shelly::init(log.clone(), config.clone())); - spawner.spawn_obj(f.into())?; +pub fn init(log: Logger, config: &Config, spawner: &S, registries: Registries) -> Result<()> { + let f = Box::new(shelly::run(log.clone(), config.clone(), registries.clone())); + spawner.spawn_local_obj(f.into())?; Ok(()) } diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 6aad0de..c9ba979 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -1,10 +1,30 @@ use slog::Logger; + use crate::config::Config; +use crate::registries::{Registries, Actuator, ActBox}; +use crate::error::Result; -use crate::registries::Actuator; +use std::pin::Pin; +use futures::prelude::*; +use futures::ready; +use futures::task::{Poll, Context}; -pub async fn init(log: Logger, config: Config) { - info!(log, "This is where shelly support would start up, IF IT WAS IMPLEMENTED"); +use paho_mqtt as mqtt; + +// TODO: Late config parsing. Right now the config is validated at the very startup in its +// entirety. This works reasonably enough for this static modules here but if we do dynamic loading +// via dlopen(), lua API, python API etc it will not. +pub async fn run(log: Logger, config: Config, registries: Registries) { + let shelly_r = Shelly::new(config).await; + if let Err(e) = shelly_r { + error!(log, "Shelly module errored: {}", e); + return; + } + + let r = registries.actuators.register( + "shelly".to_string(), + shelly_r.unwrap() + ).await; } /// An actuator for all Shellies connected listening on one MQTT broker @@ -12,9 +32,31 @@ pub async fn init(log: Logger, config: Config) { /// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If /// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator. struct Shelly { - + client: mqtt::AsyncClient, } +impl Shelly { + pub async fn new(config: Config) -> Result { + let client = mqtt::AsyncClient::new(config.mqtt_url)?; + + client.connect(mqtt::ConnectOptions::new()).await?; + + Ok(Box::new(Shelly { client }) as ActBox) + } +} + + +#[async_trait] impl Actuator for Shelly { + async fn power_on(&mut self, name: String) { + let topic = ""; + let msg = mqtt::Message::new(topic, "1", 0); + self.client.publish(msg).map(|_| ()).await + } + async fn power_off(&mut self, name: String) { + let topic = ""; + let msg = mqtt::Message::new(topic, "0", 0); + self.client.publish(msg).map(|_| ()).await + } } diff --git a/src/registries.rs b/src/registries.rs index 6da7754..e5d218c 100644 --- a/src/registries.rs +++ b/src/registries.rs @@ -1,6 +1,6 @@ mod actuators; -pub use actuators::Actuator; +pub use actuators::{Actuator, ActBox}; #[derive(Clone)] /// BFFH registries @@ -8,5 +8,13 @@ pub use actuators::Actuator; /// This struct is only a reference to the underlying registries - cloning it will generate a new /// reference, not clone the registries pub struct Registries { - actuators: actuators::Actuators, + pub actuators: actuators::Actuators, +} + +impl Registries { + pub fn new() -> Self { + Registries { + actuators: actuators::Actuators::new() + } + } } diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index 5febf12..890fc1e 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -10,10 +10,9 @@ pub struct Actuators { inner: Arc>, } -type ActBox = Box - , PowerOffFut = Future - >>; +unsafe impl Send for Actuators { } + +pub type ActBox = Box; type Inner = HashMap; @@ -32,47 +31,23 @@ impl Actuators { } +#[async_trait] pub trait Actuator { // TODO: Is it smarter to pass a (reference to?) a machine instead of 'name'? Do we need to // pass basically arbitrary parameters to the Actuator? - type PowerOnFut: Future; - fn power_on(&mut self, name: String) -> Self::PowerOnFut; - - type PowerOffFut: Future; - fn power_off(&mut self, name: String) -> Self::PowerOffFut; + async fn power_on(&mut self, name: String); + async fn power_off(&mut self, name: String); } // This is merely a proof that Actuator *can* be implemented on a finite, known type. Yay for type // systems with halting problems. struct Dummy; +#[async_trait] impl Actuator for Dummy { - type PowerOnFut = DummyPowerOnFut; - type PowerOffFut = DummyPowerOffFut; - - fn power_on(&mut self) -> DummyPowerOnFut { - DummyPowerOnFut + async fn power_on(&mut self, _name: String) { + return } - fn power_off(&mut self) -> DummyPowerOffFut { - DummyPowerOffFut - } -} - -use std::pin::Pin; -use futures::task::{Poll, Context}; - -struct DummyPowerOnFut; -impl Future for DummyPowerOnFut { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - Poll::Ready(()) - } -} -struct DummyPowerOffFut; -impl Future for DummyPowerOffFut { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - Poll::Ready(()) + async fn power_off(&mut self, _name: String) { + return } }