Status commit

This commit is contained in:
Gregor Reitzenstein 2020-09-17 21:12:30 +02:00
parent e0c1ce868d
commit 7a876a538d
4 changed files with 22 additions and 8 deletions

View File

@ -229,7 +229,7 @@ fn main() -> Result<(), Error> {
// without warning. // without warning.
let modlog = log.clone(); let modlog = log.clone();
let regs = Registries::new(); let regs = Registries::new();
match modules::init(modlog.new(o!("system" => "modules")), &config, &pool, regs) { match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs)) {
Ok(()) => {} Ok(()) => {}
Err(e) => { Err(e) => {
error!(modlog, "Module startup failed: {}", e); error!(modlog, "Module startup failed: {}", e);

View File

@ -17,9 +17,8 @@ use crate::error::Result;
use crate::registries::Registries; use crate::registries::Registries;
// spawner is a type that allows 'tasks' to be spawned on it, running them to completion. // spawner is a type that allows 'tasks' to be spawned on it, running them to completion.
pub fn init<S: Spawn>(log: Logger, config: &Settings, spawner: &S, registries: Registries) -> Result<()> { pub async fn init<S: Spawn + Clone + Send>(log: Logger, config: Settings, spawner: S, registries: Registries) -> Result<()> {
let f = Box::new(shelly::run(log.clone(), config.clone(), registries.clone())); shelly::run(log.clone(), config.clone(), registries.clone()).await;
spawner.spawn_obj(f.into())?;
Ok(()) Ok(())
} }

View File

@ -8,7 +8,8 @@ use crate::machine::Status;
use std::pin::Pin; use std::pin::Pin;
use futures::prelude::*; use futures::prelude::*;
use futures::ready; use futures::ready;
use futures::task::{Poll, Context, Waker}; use futures::task::{Poll, Context, Waker, Spawn};
use futures::StreamExt;
use futures_signals::signal::Signal; use futures_signals::signal::Signal;
use paho_mqtt as mqtt; use paho_mqtt as mqtt;
@ -19,7 +20,7 @@ use paho_mqtt as mqtt;
pub async fn run(log: Logger, config: Settings, registries: Registries) { pub async fn run(log: Logger, config: Settings, registries: Registries) {
let shelly = Shelly::new(config).await; let shelly = Shelly::new(config).await;
let r = registries.actuators.register("shelly".to_string(), shelly).await; let r = registries.actuators.register("shelly".to_string(), Box::new(shelly)).await;
} }
/// An actuator for all Shellies connected listening on one MQTT broker /// An actuator for all Shellies connected listening on one MQTT broker
@ -35,7 +36,7 @@ struct Shelly {
impl Shelly { impl Shelly {
// Can't use Error, it's not Send. fabinfra/fabaccess/bffh#7 // Can't use Error, it's not Send. fabinfra/fabaccess/bffh#7
pub async fn new(config: Settings) -> ActBox { pub async fn new(config: Settings) -> Self {
let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap(); let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap();
client.connect(mqtt::ConnectOptions::new()).await.unwrap(); client.connect(mqtt::ConnectOptions::new()).await.unwrap();
@ -44,7 +45,7 @@ impl Shelly {
let signal: Option<StatusSignal> = None; let signal: Option<StatusSignal> = None;
let waker = None; let waker = None;
Box::new(Shelly { signal, waker, name, client }) Shelly { signal, waker, name, client }
} }
} }

View File

@ -34,6 +34,20 @@ impl Actuators {
// TODO: Log an error or something if that name was already taken // TODO: Log an error or something if that name was already taken
wlock.insert(name, act); wlock.insert(name, act);
} }
pub async fn run<S: Spawn>(&mut self) {
let mut wlock = self.inner.write().await;
for (_name, act) in wlock.into_iter() {
}
}
pub async fn subscribe(&mut self, name: String, signal: StatusSignal) {
let mut wlock = self.inner.write().await;
if let Some(act) = wlock.get_mut(&name) {
act.subscribe(signal);
}
}
} }
pub type StatusSignal = Pin<Box<dyn Signal<Item = Status> + Send + Sync>>; pub type StatusSignal = Pin<Box<dyn Signal<Item = Status> + Send + Sync>>;