Moves actuators to be coroutines

This commit is contained in:
Gregor Reitzenstein 2020-09-17 15:34:35 +02:00
parent 7e6748ad15
commit 0ea9177e14
3 changed files with 71 additions and 6 deletions

View File

@ -1,13 +1,14 @@
use slog::Logger; use slog::Logger;
use crate::config::Settings; use crate::config::Settings;
use crate::registries::{Registries, Actuator, ActBox}; use crate::registries::{Registries, Actuator, ActBox, StatusSignal};
use crate::error::Result; use crate::error::Result;
use std::pin::Pin; use std::pin::Pin;
use futures::prelude::*; use futures::prelude::*;
use futures::ready; use futures::ready;
use futures::task::{Poll, Context}; use futures::task::{Poll, Context};
use futures_signals::signal::Signal;
use paho_mqtt as mqtt; use paho_mqtt as mqtt;
@ -24,8 +25,9 @@ pub async fn run(log: Logger, config: Settings, registries: Registries) {
/// ///
/// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If /// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If
/// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator. /// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator.
#[derive(Clone)]
struct Shelly { struct Shelly {
signal: Option<StatusSignal>,
name: String,
client: mqtt::AsyncClient, client: mqtt::AsyncClient,
} }
@ -36,7 +38,10 @@ impl Shelly {
client.connect(mqtt::ConnectOptions::new()).await.unwrap(); client.connect(mqtt::ConnectOptions::new()).await.unwrap();
Box::new(Shelly { client }) let name = "test".to_string();
let signal: Option<StatusSignal> = None;
Box::new(Shelly { signal, name, client })
} }
} }
@ -55,3 +60,21 @@ impl Actuator for Shelly {
self.client.publish(msg).map(|_| ()).await self.client.publish(msg).map(|_| ()).await
} }
} }
impl Stream for Shelly {
type Item = future::BoxFuture<'static, ()>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let unpin = Pin::into_inner(self);
if let Some(ref mut s) = unpin.signal {
let status = ready!(Signal::poll_change(Pin::new(s), cx));
let topic = format!("shellies/{}/relay/0/command", unpin.name);
let msg = mqtt::Message::new(topic, "on", 0);
let f = unpin.client.publish(msg).map(|_| ());
Poll::Ready(Some(Box::pin(f)))
} else {
Poll::Pending
}
}
}

View File

@ -1,7 +1,7 @@
mod actuators; mod actuators;
mod sensors; mod sensors;
pub use actuators::{Actuator, ActBox}; pub use actuators::{Actuator, ActBox, StatusSignal};
pub use sensors::{Sensor, SensBox}; pub use sensors::{Sensor, SensBox};
#[derive(Clone)] #[derive(Clone)]

View File

@ -1,7 +1,15 @@
use slog::Logger;
use std::sync::Arc; use std::sync::Arc;
use smol::lock::RwLock; use smol::lock::RwLock;
use std::pin::Pin;
use futures::ready;
use futures::prelude::*; use futures::prelude::*;
use futures::task::{Context, Poll};
use futures_signals::signal::Signal;
use crate::machine::Status;
use std::collections::HashMap; use std::collections::HashMap;
@ -30,16 +38,26 @@ impl Actuators {
#[async_trait] #[async_trait]
pub trait Actuator { pub trait Actuator: Stream<Item = future::BoxFuture<'static, ()>> {
// TODO: Is it smarter to pass a (reference to?) a machine instead of 'name'? Do we need to // TODO: Is it smarter to pass a (reference to?) a machine instead of 'name'? Do we need to
// pass basically arbitrary parameters to the Actuator? // pass basically arbitrary parameters to the Actuator?
async fn power_on(&mut self, name: String); async fn power_on(&mut self, name: String);
async fn power_off(&mut self, name: String); async fn power_off(&mut self, name: String);
} }
pub type StatusSignal = Pin<Box<dyn Signal<Item = Status> + Send + Sync>>;
#[async_trait]
pub trait Subscriber {
async fn subscribe(&mut self, signal: StatusSignal);
}
// This is merely a proof that Actuator *can* be implemented on a finite, known type. Yay for type // This is merely a proof that Actuator *can* be implemented on a finite, known type. Yay for type
// systems with halting problems. // systems with halting problems.
struct Dummy; struct Dummy {
log: Logger,
signal: Option<StatusSignal>
}
#[async_trait] #[async_trait]
impl Actuator for Dummy { impl Actuator for Dummy {
async fn power_on(&mut self, _name: String) { async fn power_on(&mut self, _name: String) {
@ -49,3 +67,27 @@ impl Actuator for Dummy {
return return
} }
} }
#[async_trait]
impl Subscriber for Dummy {
async fn subscribe(&mut self, signal: StatusSignal) {
self.signal.replace(signal);
}
}
impl Stream for Dummy {
type Item = future::BoxFuture<'static, ()>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let unpin = Pin::into_inner(self);
if let Some(ref mut s) = unpin.signal {
let status = ready!(Signal::poll_change(Pin::new(s), cx));
info!(unpin.log, "Dummy actuator would set status to {:?}, but is a Dummy", status);
Poll::Ready(Some(Box::pin(futures::future::ready(()))))
} else {
Poll::Pending
}
}
}