From 0ea9177e1417fe768960ec4279a475bbb32509a6 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 15:34:35 +0200 Subject: [PATCH] Moves actuators to be coroutines --- src/modules/shelly.rs | 29 ++++++++++++++++++++--- src/registries.rs | 2 +- src/registries/actuators.rs | 46 +++++++++++++++++++++++++++++++++++-- 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 2db6be8..7e706f5 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -1,13 +1,14 @@ use slog::Logger; use crate::config::Settings; -use crate::registries::{Registries, Actuator, ActBox}; +use crate::registries::{Registries, Actuator, ActBox, StatusSignal}; use crate::error::Result; use std::pin::Pin; use futures::prelude::*; use futures::ready; use futures::task::{Poll, Context}; +use futures_signals::signal::Signal; use paho_mqtt as mqtt; @@ -24,8 +25,9 @@ pub async fn run(log: Logger, config: Settings, registries: Registries) { /// /// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If /// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator. -#[derive(Clone)] struct Shelly { + signal: Option, + name: String, client: mqtt::AsyncClient, } @@ -36,7 +38,10 @@ impl Shelly { client.connect(mqtt::ConnectOptions::new()).await.unwrap(); - Box::new(Shelly { client }) + let name = "test".to_string(); + let signal: Option = None; + + Box::new(Shelly { signal, name, client }) } } @@ -55,3 +60,21 @@ impl Actuator for Shelly { self.client.publish(msg).map(|_| ()).await } } + +impl Stream for Shelly { + type Item = future::BoxFuture<'static, ()>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let unpin = Pin::into_inner(self); + if let Some(ref mut s) = unpin.signal { + let status = ready!(Signal::poll_change(Pin::new(s), cx)); + let topic = format!("shellies/{}/relay/0/command", unpin.name); + let msg = mqtt::Message::new(topic, "on", 0); + let f = unpin.client.publish(msg).map(|_| ()); + + Poll::Ready(Some(Box::pin(f))) + } else { + Poll::Pending + } + } +} diff --git a/src/registries.rs b/src/registries.rs index 069403a..0975bab 100644 --- a/src/registries.rs +++ b/src/registries.rs @@ -1,7 +1,7 @@ mod actuators; mod sensors; -pub use actuators::{Actuator, ActBox}; +pub use actuators::{Actuator, ActBox, StatusSignal}; pub use sensors::{Sensor, SensBox}; #[derive(Clone)] diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index 3110ada..1e6a6e5 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -1,7 +1,15 @@ +use slog::Logger; + use std::sync::Arc; use smol::lock::RwLock; +use std::pin::Pin; +use futures::ready; use futures::prelude::*; +use futures::task::{Context, Poll}; +use futures_signals::signal::Signal; + +use crate::machine::Status; use std::collections::HashMap; @@ -30,16 +38,26 @@ impl Actuators { #[async_trait] -pub trait Actuator { +pub trait Actuator: Stream> { // TODO: Is it smarter to pass a (reference to?) a machine instead of 'name'? Do we need to // pass basically arbitrary parameters to the Actuator? async fn power_on(&mut self, name: String); async fn power_off(&mut self, name: String); } +pub type StatusSignal = Pin + Send + Sync>>; + +#[async_trait] +pub trait Subscriber { + async fn subscribe(&mut self, signal: StatusSignal); +} + // This is merely a proof that Actuator *can* be implemented on a finite, known type. Yay for type // systems with halting problems. -struct Dummy; +struct Dummy { + log: Logger, + signal: Option +} #[async_trait] impl Actuator for Dummy { async fn power_on(&mut self, _name: String) { @@ -49,3 +67,27 @@ impl Actuator for Dummy { return } } + +#[async_trait] +impl Subscriber for Dummy { + async fn subscribe(&mut self, signal: StatusSignal) { + self.signal.replace(signal); + } +} + +impl Stream for Dummy { + type Item = future::BoxFuture<'static, ()>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let unpin = Pin::into_inner(self); + if let Some(ref mut s) = unpin.signal { + let status = ready!(Signal::poll_change(Pin::new(s), cx)); + + info!(unpin.log, "Dummy actuator would set status to {:?}, but is a Dummy", status); + + Poll::Ready(Some(Box::pin(futures::future::ready(())))) + } else { + Poll::Pending + } + } +}