From 7a876a538d4ebfada334c2c5be5e64e462f0ca2e Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 21:12:30 +0200 Subject: [PATCH] Status commit --- src/main.rs | 2 +- src/modules.rs | 5 ++--- src/modules/shelly.rs | 9 +++++---- src/registries/actuators.rs | 14 ++++++++++++++ 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/src/main.rs b/src/main.rs index 2cd09e9..a923881 100644 --- a/src/main.rs +++ b/src/main.rs @@ -229,7 +229,7 @@ fn main() -> Result<(), Error> { // without warning. let modlog = log.clone(); let regs = Registries::new(); - match modules::init(modlog.new(o!("system" => "modules")), &config, &pool, regs) { + match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs)) { Ok(()) => {} Err(e) => { error!(modlog, "Module startup failed: {}", e); diff --git a/src/modules.rs b/src/modules.rs index 8bb06bb..e200129 100644 --- a/src/modules.rs +++ b/src/modules.rs @@ -17,9 +17,8 @@ use crate::error::Result; use crate::registries::Registries; // spawner is a type that allows 'tasks' to be spawned on it, running them to completion. -pub fn init(log: Logger, config: &Settings, spawner: &S, registries: Registries) -> Result<()> { - let f = Box::new(shelly::run(log.clone(), config.clone(), registries.clone())); - spawner.spawn_obj(f.into())?; +pub async fn init(log: Logger, config: Settings, spawner: S, registries: Registries) -> Result<()> { + shelly::run(log.clone(), config.clone(), registries.clone()).await; Ok(()) } diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index ec23895..9abf94a 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -8,7 +8,8 @@ use crate::machine::Status; use std::pin::Pin; use futures::prelude::*; use futures::ready; -use futures::task::{Poll, Context, Waker}; +use futures::task::{Poll, Context, Waker, Spawn}; +use futures::StreamExt; use futures_signals::signal::Signal; use paho_mqtt as mqtt; @@ -19,7 +20,7 @@ use paho_mqtt as mqtt; pub async fn run(log: Logger, config: Settings, registries: Registries) { let shelly = Shelly::new(config).await; - let r = registries.actuators.register("shelly".to_string(), shelly).await; + let r = registries.actuators.register("shelly".to_string(), Box::new(shelly)).await; } /// An actuator for all Shellies connected listening on one MQTT broker @@ -35,7 +36,7 @@ struct Shelly { impl Shelly { // Can't use Error, it's not Send. fabinfra/fabaccess/bffh#7 - pub async fn new(config: Settings) -> ActBox { + pub async fn new(config: Settings) -> Self { let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap(); client.connect(mqtt::ConnectOptions::new()).await.unwrap(); @@ -44,7 +45,7 @@ impl Shelly { let signal: Option = None; let waker = None; - Box::new(Shelly { signal, waker, name, client }) + Shelly { signal, waker, name, client } } } diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index 15d64fa..1183b16 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -34,6 +34,20 @@ impl Actuators { // TODO: Log an error or something if that name was already taken wlock.insert(name, act); } + + pub async fn run(&mut self) { + let mut wlock = self.inner.write().await; + for (_name, act) in wlock.into_iter() { + + } + } + + pub async fn subscribe(&mut self, name: String, signal: StatusSignal) { + let mut wlock = self.inner.write().await; + if let Some(act) = wlock.get_mut(&name) { + act.subscribe(signal); + } + } } pub type StatusSignal = Pin + Send + Sync>>;