From f4148d398f87732f15ce3c6fff7cf18bb428a1c4 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Wed, 2 Dec 2020 11:31:17 +0100 Subject: [PATCH] Sync Actor works now --- src/actor.rs | 120 ++++++++++++++++-------------------- src/db/machine.rs | 2 - src/main.rs | 27 ++++---- src/modules/shelly.rs | 97 +---------------------------- src/registries.rs | 8 +-- src/registries/actuators.rs | 70 ++------------------- 6 files changed, 78 insertions(+), 246 deletions(-) diff --git a/src/actor.rs b/src/actor.rs index 4d22ded..8a042e8 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -10,7 +10,7 @@ use futures::channel::mpsc; use futures_signals::signal::{Signal, MutableSignalCloned, MutableSignal, Mutable}; use crate::db::machine::MachineState; -use crate::registries::Actuator; +use crate::registries::actuators::Actuator; use crate::config::Settings; use crate::error::Result; @@ -22,88 +22,74 @@ pub struct Actor { // of a task context. In short, using Mutable isn't possible and we would have to write our own // implementation of MutableSignal*'s . Preferably with the correct optimizations for our case // where there is only one consumer. So a mpsc channel that drops all but the last input. - rx: mpsc::Receiver> - inner: S, + rx: mpsc::Receiver>, + inner: Option, + + actuator: Box, + future: Option + Unpin + Send>>, } -pub fn load() { - let s = Mutable::new(MachineState::new()); +impl Actor { + pub fn new(rx: mpsc::Receiver>, actuator: Box) -> Self { + Self { + rx: rx, + inner: None, + actuator: actuator, + future: None, + } + } - Ok(()) + pub fn wrap(actuator: Box) -> (mpsc::Sender>, Self) { + let (tx, rx) = mpsc::channel(1); + (tx, Self::new(rx, actuator)) + } } -#[must_use = "Signals do nothing unless polled"] -pub struct MaybeFlatten { - signal: Option, - inner: Option, -} +impl + Unpin> Future for Actor { + type Output = (); -// Poll parent => Has inner => Poll inner => Output -// -------------------------------------------------------- -// Some(Some(inner)) => => Some(value) => Some(value) -// Some(Some(inner)) => => => Pending -// Some(None) => => => Pending -// None => Some(inner) => Some(value) => Some(value) -// None => Some(inner) => None => None -// None => Some(inner) => Pending => Pending -// None => None => => None -// Pending => Some(inner) => Some(value) => Some(value) -// Pending => Some(inner) => None => Pending -// Pending => Some(inner) => Pending => Pending -// Pending => None => => Pending -impl Signal for MaybeFlatten - where A: Signal> + Unpin, - B: Signal + Unpin, -{ - type Item = B::Item; - - #[inline] - fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let mut this = &mut *self; + let mut done = false; // Is the channel with new state-signals exhausted? - let done = match this.signal.as_mut().map(|signal| Signal::poll_change(Pin::new(signal), cx)) { - None => true, - Some(Poll::Ready(None)) => { - this.signal = None; - true - }, - Some(Poll::Ready(Some(new_inner))) => { - this.inner = new_inner; - false - }, - Some(Poll::Pending) => false, - }; + // Update the signal we're polling from, if there is an update that is. + match Stream::poll_next(Pin::new(&mut this.rx), cx) { + Poll::Ready(None) => done = true, + Poll::Ready(Some(new_signal)) => this.inner = new_signal, + Poll::Pending => { }, + } + // Poll the `apply` future. + match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) { + None => { } + Some(Poll::Ready(_)) => this.future = None, + Some(Poll::Pending) => return Poll::Pending, + } + + // Poll the signal and apply all changes that happen to the inner Actuator match this.inner.as_mut().map(|inner| Signal::poll_change(Pin::new(inner), cx)) { + None => Poll::Pending, + Some(Poll::Pending) => Poll::Pending, Some(Poll::Ready(None)) => { this.inner = None; - }, - Some(poll) => { - return poll; - }, - None => {}, - } - if done { - Poll::Ready(None) - - } else { - Poll::Pending + if done { + Poll::Ready(()) + } else { + Poll::Pending + } + }, + Some(Poll::Ready(Some(state))) => { + this.actuator.apply(state); + Poll::Pending + } } } } -#[cfg(test)] -mod tests { - use super::*; - use futures_test::*; - use futures_signals::signal::Signal; +pub fn load + Unpin>() -> Result<(mpsc::Sender>, Actor)> { + let d = Box::new(crate::registries::actuators::Dummy); + let a = Actor::wrap(d); - #[test] - fn load_test() { - let (a, s, m) = super::load().unwrap(); - - let cx = task::panic_context(); - a.signal.poll_change(&mut cx); - } + Ok(a) } diff --git a/src/db/machine.rs b/src/db/machine.rs index 61aabbb..4c47e54 100644 --- a/src/db/machine.rs +++ b/src/db/machine.rs @@ -27,8 +27,6 @@ use smol::channel::{Receiver, Sender}; use futures::{Future, Stream, StreamExt}; use futures_signals::signal::*; -use crate::registries::StatusSignal; - use crate::machine::MachineDescription; use crate::db::user::UserId; diff --git a/src/main.rs b/src/main.rs index e2ea63b..68e5c8d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -135,27 +135,32 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { error!(log, "Loading is currently not implemented"); Ok(()) } else { - let db = db::Databases::new(&log, &config)?; - - // TODO: Spawn api connections on their own (non-main) thread, use the main thread to - // handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads - // when bffh should exit - let machines = machine::load(&config)?; - let actors = actor::load()?; - let initiators = initiator::load(&config)?; + //let machines = machine::load(&config)?; + //let initiators = initiator::load(&config)?; let ex = Executor::new(); - for i in initiators.into_iter() { - ex.spawn(i.run()); - } + let m = futures_signals::signal::Mutable::new(crate::db::machine::MachineState::new()); + let (mut tx, actor) = actor::load()?; // TODO HERE: restore connections between initiators, machines, actors + // Like so + tx.try_send(Some(m.signal_cloned())).unwrap(); + + // TODO HERE: Spawn all actors & inits + + // Like so + ex.spawn(actor); + let (signal, shutdown) = async_channel::bounded::<()>(1); easy_parallel::Parallel::new() .each(0..4, |_| smol::block_on(ex.run(shutdown.recv()))); + let db = db::Databases::new(&log, &config)?; + // TODO: Spawn api connections on their own (non-main) thread, use the main thread to + // handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads + // when bffh should exit server::serve_api_connections(log.clone(), config, db) // Signal is dropped here, stopping all executor threads as well. } diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 47d0224..c20cf1c 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -1,9 +1,9 @@ use slog::Logger; use crate::config::Settings; -use crate::registries::{Registries, Actuator, ActBox, StatusSignal}; use crate::error::Result; use crate::db::machine::Status; +use crate::registries::Registries; use std::pin::Pin; use futures::prelude::*; @@ -19,100 +19,5 @@ use paho_mqtt as mqtt; // entirety. This works reasonably enough for this static modules here but if we do dynamic loading // via dlopen(), lua API, python API etc it will not. pub async fn run(log: Logger, config: Settings, registries: Registries, spawner: S) { - let rx = registries.actuators.register("shelly".to_string()).await; - let mut shelly = Shelly::new(log, config, rx).await; - - let f = shelly.for_each(|f| f); - spawner.spawn_obj(FutureObj::from(Box::pin(f))); - } -/// An actuator for all Shellies connected listening on one MQTT broker -/// -/// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If -/// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator. -struct Shelly { - log: Logger, - sigchan: mpsc::Receiver, - signal: Option, - waker: Option, - name: String, - client: mqtt::AsyncClient, -} - -impl Shelly { - // Can't use Error, it's not Send. fabinfra/fabaccess/bffh#7 - pub async fn new(log: Logger, config: Settings, sigchan: mpsc::Receiver) -> Self { - let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap(); - - let o = client.connect(mqtt::ConnectOptions::new()).await.unwrap(); - println!("{:?}", o); - - let name = "test".to_string(); - let signal: Option = None; - let waker = None; - - Shelly { log, sigchan, signal, waker, name, client } - } -} - - -impl Actuator for Shelly { - fn subscribe(&mut self, signal: StatusSignal) { - self.signal.replace(signal); - if let Some(waker) = self.waker.take() { - waker.wake(); - } - } -} - -impl Stream for Shelly { - type Item = future::BoxFuture<'static, ()>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let unpin = Pin::into_inner(self); - - info!(unpin.log, "tick {}", unpin.signal.is_some()); - - if let Poll::Ready(v) = Stream::poll_next(Pin::new(&mut unpin.sigchan), cx) { - if let Some(s) = v { - // We have received a new signal to use - unpin.signal.replace(s); - // We use `if let` instead of .and_then because we want the waker to be dropped - // afterwards. It's only there to ensure the future is called when a signal is - // installed the first time - // TODO probably don't need that here because we're polling it either way directly - // afterwards, eh? - if let Some(waker) = unpin.waker.take() { - waker.wake(); - } - } else { - info!(unpin.log, "bye"); - // This means that the sending end was dropped, so we shut down - unpin.signal.take(); - unpin.waker.take(); - return Poll::Ready(None); - } - } - - if let Some(ref mut s) = unpin.signal { - if let Some(status) = ready!(Signal::poll_change(Pin::new(s), cx)) { - info!(unpin.log, "Machine Status changed: {:?}", status); - let topic = format!("shellies/{}/relay/0/command", unpin.name); - let pl = match status { - Status::InUse(_, _) => "on", - _ => "off", - }; - let msg = mqtt::Message::new(topic, pl, 0); - let f = unpin.client.publish(msg).map(|_| ()); - - return Poll::Ready(Some(Box::pin(f))); - } - } else { - info!(unpin.log, "I ain't got no signal son"); - unpin.waker.replace(cx.waker().clone()); - } - - Poll::Pending - } -} diff --git a/src/registries.rs b/src/registries.rs index 6b89c3b..ad82913 100644 --- a/src/registries.rs +++ b/src/registries.rs @@ -1,9 +1,7 @@ use std::sync::Arc; -mod actuators; -mod sensors; - -pub use actuators::{Actuator, ActBox, StatusSignal}; +pub mod actuators; +pub mod sensors; #[derive(Clone)] /// BFFH registries @@ -11,14 +9,12 @@ pub use actuators::{Actuator, ActBox, StatusSignal}; /// This struct is only a reference to the underlying registries - cloning it will generate a new /// reference, not clone the registries pub struct Registries { - pub actuators: actuators::Actuators, pub sensors: sensors::Sensors, } impl Registries { pub fn new() -> Self { Registries { - actuators: actuators::Actuators::new(), sensors: sensors::Sensors::new(), } } diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index b23049f..fc84f0f 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -10,76 +10,18 @@ use futures::channel::mpsc; use futures::task::{Context, Poll, Spawn}; use futures_signals::signal::Signal; -use crate::db::machine::Status; +use crate::db::machine::MachineState; use std::collections::HashMap; -#[derive(Clone)] -pub struct Actuators { - inner: Arc>, +pub trait Actuator { + fn apply(&mut self, state: MachineState); } -pub type ActBox = Box; - -type Inner = HashMap>; - -impl Actuators { - pub fn new() -> Self { - Actuators { - inner: Arc::new(RwLock::new(Inner::new())) - } - } - - pub async fn register(&self, name: String) -> mpsc::Receiver { - let (tx, rx) = mpsc::channel(1); - let mut wlock = self.inner.write().await; - // TODO: Log an error or something if that name was already taken - wlock.insert(name, tx); - - return rx; - } - - pub async fn subscribe(&mut self, name: String, signal: StatusSignal) { - let mut wlock = self.inner.write().await; - if let Some(tx) = wlock.get_mut(&name) { - tx.send(signal).await; - } - } -} - -pub type StatusSignal = Pin + Send + Sync>>; - -pub trait Actuator: Stream> { - fn subscribe(&mut self, signal: StatusSignal); -} - -// This is merely a proof that Actuator *can* be implemented on a finite, known type. Yay for type -// systems with halting problems. -struct Dummy { - log: Logger, - sigchan: mpsc::Receiver, - signal: Option, -} +pub struct Dummy; impl Actuator for Dummy { - fn subscribe(&mut self, signal: StatusSignal) { - self.signal.replace(signal); - } -} - -impl Stream for Dummy { - type Item = future::BoxFuture<'static, ()>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let unpin = Pin::into_inner(self); - if let Some(ref mut s) = unpin.signal { - let status = ready!(Signal::poll_change(Pin::new(s), cx)); - - info!(unpin.log, "Dummy actuator would set status to {:?}, but is a Dummy", status); - - Poll::Ready(Some(Box::pin(futures::future::ready(())))) - } else { - Poll::Pending - } + fn apply(&mut self, state: MachineState) { + println!("New state for dummy actuator: {:?}", state); } }