From 73c24456d4354e54328291b92208c25a42cd4b3d Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Mon, 14 Sep 2020 10:37:51 +0200 Subject: [PATCH] Registries draft --- Cargo.toml | 12 ++++-- src/error.rs | 15 ++++++- src/main.rs | 12 ++++++ src/modules.rs | 20 +++++++--- src/modules/mqtt.rs | 9 ----- src/modules/shelly.rs | 20 ++++++++++ src/registries.rs | 12 ++++++ src/registries/actuators.rs | 78 +++++++++++++++++++++++++++++++++++++ 8 files changed, 159 insertions(+), 19 deletions(-) delete mode 100644 src/modules/mqtt.rs create mode 100644 src/modules/shelly.rs create mode 100644 src/registries.rs create mode 100644 src/registries/actuators.rs diff --git a/Cargo.toml b/Cargo.toml index 66cc378..5327abe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,9 +8,6 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -# TODO: reduce the feature groups for faster compilation -#tokio = { version = "0.2", features = ["full"] } - futures = { version = "0.3", features = ["thread-pool", "compat"] } futures-util = "0.3" futures-signals = "0.3" @@ -29,6 +26,7 @@ capnp-futures = "0.13" toml = "0.5" serde = { version = "1.0", features = ["derive"] } +flexbuffers = "0.1" uuid = { version = "0.8", features = ["serde", "v4"] } @@ -36,5 +34,13 @@ clap = "2.33" rsasl = "0.1" +# rumqtt needs tokio which I'm trying to get away from +paho-mqtt = "0.7.1" + +mlua = { version = "0.4", features = ["async", "luajit"] } + +libc = "0.2" +lmdb-rkv = "0.14" + [build-dependencies] capnpc = "0.13" diff --git a/src/error.rs b/src/error.rs index e7c0dcd..2c50e68 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,6 +4,9 @@ use toml; use rsasl::SaslError; +// SpawnError is a somewhat ambigous name, `use as` to make it futures::SpawnError instead. +use futures::task as futures; + #[derive(Debug)] pub enum Error { TomlDe(toml::de::Error), @@ -15,6 +18,7 @@ pub enum Error { LMDB(lmdb::Error), FlexbuffersDe(flexbuffers::DeserializationError), FlexbuffersSer(flexbuffers::SerializationError), + FuturesSpawn(futures::SpawnError), } impl fmt::Display for Error { @@ -47,6 +51,9 @@ impl fmt::Display for Error { Error::FlexbuffersSer(e) => { write!(f, "Flexbuffers encoding error: {}", e) }, + Error::FuturesSpawn(e) => { + write!(f, "Future could not be spawned: {}", e) + }, } } } @@ -105,4 +112,10 @@ impl From for Error { } } -pub type Result = std::result::Result; +impl From for Error { + fn from(e: futures::SpawnError) -> Error { + Error::FuturesSpawn(e) + } +} + +pub(crate) type Result = std::result::Result; diff --git a/src/main.rs b/src/main.rs index 56b19a5..1378dfa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ mod config; mod error; mod machine; mod connection; +mod registries; use signal_hook::iterator::Signals; @@ -217,6 +218,17 @@ fn main() -> Result<(), Error> { .create()?; let local_spawn = exec.spawner(); + // Start all modules on the threadpool. The pool will run the modules until it is dropped. + // FIXME: implement notification so the modules can shut down cleanly instead of being killed + // without warning. + let modlog = log.clone(); + match modules::init(modlog.new(o!("system" => "modules")), &config, &pool) { + Ok(()) => {} + Err(e) => { + error!(modlog, "Module startup failed: {}", e); + return Err(e); + } + } // Closure inefficiencies. Lucky cloning an Arc is pretty cheap. let inner_log = log.clone(); diff --git a/src/modules.rs b/src/modules.rs index 09562d9..239efbb 100644 --- a/src/modules.rs +++ b/src/modules.rs @@ -5,12 +5,20 @@ //! Additionally, FFI modules to other languages (Python/Lua/...) make the most sense in here as //! well. -mod mqtt; - use slog::Logger; -pub fn init(log: Logger) { - info!(log, "Initializing submodules"); - mqtt::init(log.new(o!())); - info!(log, "Finished initializing submodules"); +mod shelly; + +use futures::prelude::*; +use futures::task::Spawn; + +use crate::config::Config; +use crate::error::Result; + +// spawner is a type that allows 'tasks' to be spawned on it, running them to completion. +pub fn init(log: Logger, config: &Config, spawner: &S) -> Result<()> { + let f = Box::new(shelly::init(log.clone(), config.clone())); + spawner.spawn_obj(f.into())?; + + Ok(()) } diff --git a/src/modules/mqtt.rs b/src/modules/mqtt.rs deleted file mode 100644 index 17cf8e5..0000000 --- a/src/modules/mqtt.rs +++ /dev/null @@ -1,9 +0,0 @@ -//! Mock impl of MQTT as transport. -//! -//! Specific Protocol implementations (Sonoff/Card2Go/...) would be located here - -use slog::Logger; - -pub fn init(log: Logger) { - info!(log, "MQTT Module initialized.") -} diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs new file mode 100644 index 0000000..6aad0de --- /dev/null +++ b/src/modules/shelly.rs @@ -0,0 +1,20 @@ +use slog::Logger; +use crate::config::Config; + +use crate::registries::Actuator; + +pub async fn init(log: Logger, config: Config) { + info!(log, "This is where shelly support would start up, IF IT WAS IMPLEMENTED"); +} + +/// An actuator for all Shellies connected listening on one MQTT broker +/// +/// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If +/// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator. +struct Shelly { + +} + +impl Actuator for Shelly { + +} diff --git a/src/registries.rs b/src/registries.rs new file mode 100644 index 0000000..6da7754 --- /dev/null +++ b/src/registries.rs @@ -0,0 +1,12 @@ +mod actuators; + +pub use actuators::Actuator; + +#[derive(Clone)] +/// BFFH registries +/// +/// This struct is only a reference to the underlying registries - cloning it will generate a new +/// reference, not clone the registries +pub struct Registries { + actuators: actuators::Actuators, +} diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs new file mode 100644 index 0000000..5febf12 --- /dev/null +++ b/src/registries/actuators.rs @@ -0,0 +1,78 @@ +use std::sync::Arc; +use smol::lock::RwLock; + +use futures::prelude::*; + +use std::collections::HashMap; + +#[derive(Clone)] +pub struct Actuators { + inner: Arc>, +} + +type ActBox = Box + , PowerOffFut = Future + >>; + +type Inner = HashMap; + +impl Actuators { + pub fn new() -> Self { + Actuators { + inner: Arc::new(RwLock::new(Inner::new())) + } + } + + pub async fn register(&self, name: String, act: ActBox) { + let mut wlock = self.inner.write().await; + // TODO: Log an error or something if that name was already taken + wlock.insert(name, act); + } +} + + +pub trait Actuator { + // TODO: Is it smarter to pass a (reference to?) a machine instead of 'name'? Do we need to + // pass basically arbitrary parameters to the Actuator? + type PowerOnFut: Future; + fn power_on(&mut self, name: String) -> Self::PowerOnFut; + + type PowerOffFut: Future; + fn power_off(&mut self, name: String) -> Self::PowerOffFut; +} + +// This is merely a proof that Actuator *can* be implemented on a finite, known type. Yay for type +// systems with halting problems. +struct Dummy; +impl Actuator for Dummy { + type PowerOnFut = DummyPowerOnFut; + type PowerOffFut = DummyPowerOffFut; + + fn power_on(&mut self) -> DummyPowerOnFut { + DummyPowerOnFut + } + fn power_off(&mut self) -> DummyPowerOffFut { + DummyPowerOffFut + } +} + +use std::pin::Pin; +use futures::task::{Poll, Context}; + +struct DummyPowerOnFut; +impl Future for DummyPowerOnFut { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + Poll::Ready(()) + } +} +struct DummyPowerOffFut; +impl Future for DummyPowerOffFut { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + Poll::Ready(()) + } +}