From 026aa40019c90d8efe150489633d946787e3f0a1 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Fri, 18 Sep 2020 12:34:18 +0200 Subject: [PATCH] Look ma, an event network! --- src/machine.rs | 10 ++++++-- src/main.rs | 40 +++++++++++++++++++++++------- src/modules.rs | 2 +- src/modules/shelly.rs | 49 +++++++++++++++++++++++++++++++------ src/registries/actuators.rs | 25 ++++++++----------- 5 files changed, 92 insertions(+), 34 deletions(-) diff --git a/src/machine.rs b/src/machine.rs index c60e369..9229e76 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -23,6 +23,8 @@ use smol::channel::{Receiver, Sender}; use futures_signals::signal::*; +use crate::registries::StatusSignal; + pub type ID = Uuid; /// Status of a Machine @@ -138,8 +140,8 @@ impl Machine { /// dedupe ensures that if state is changed but only changes to the value it had beforehand /// (could for example happen if the machine changes current user but stays activated) no /// update is sent. - pub fn signal(&self) -> impl Signal { - self.state.signal().dedupe() + pub fn signal(&self) -> StatusSignal { + Box::pin(self.state.signal().dedupe()) } /// Requests to use a machine. Returns `true` if successful. @@ -159,6 +161,10 @@ impl Machine { return Ok(false); } } + + pub fn set_state(&mut self, state: Status) { + self.state.set(state) + } } pub struct MachineDB { diff --git a/src/main.rs b/src/main.rs index a923881..7e8739e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -197,16 +197,16 @@ fn main() -> Result<(), Error> { } }).collect(); - let (mach, auth) = exec.run_until(async { - // Rull all futures to completion in parallel. - // This will block until all three are done starting up. - join!(machinedb_f, authentication_f) - }); + //let (mach, auth) = exec.run_until(async { + // // Rull all futures to completion in parallel. + // // This will block until all three are done starting up. + // join!(machinedb_f, authentication_f) + //}); // Error out if any of the subsystems failed to start. - let mach = mach?; + //let mach = mach?; let pdb = pdb?; - let auth = auth?; + //let auth = auth?; // Since the below closures will happen at a much later time we need to make sure all pointers // are still valid. Thus, Arc. @@ -228,8 +228,8 @@ fn main() -> Result<(), Error> { // FIXME: implement notification so the modules can shut down cleanly instead of being killed // without warning. let modlog = log.clone(); - let regs = Registries::new(); - match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs)) { + let mut regs = Registries::new(); + match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs.clone())) { Ok(()) => {} Err(e) => { error!(modlog, "Module startup failed: {}", e); @@ -237,6 +237,28 @@ fn main() -> Result<(), Error> { } } + use uuid::Uuid; + use machine::{Status, Machine}; + let mut machine = Machine::new(Uuid::new_v4(), "Testmachine".to_string(), 0); + let f = regs.actuators.subscribe("shelly".to_string(), machine.signal()); + exec.run_until(f); + + let postlog = log.clone(); + use std::thread; + use std::time::Duration; + thread::spawn(move || { + info!(postlog, "Zzz"); + thread::sleep(Duration::from_millis(1000)); + machine.set_state(Status::Occupied); + info!(postlog, "Beep"); + thread::sleep(Duration::from_millis(2000)); + machine.set_state(Status::Blocked); + info!(postlog, "Bap"); + thread::sleep(Duration::from_millis(3000)); + machine.set_state(Status::Free); + info!(postlog, "Boop"); + }); + // Closure inefficiencies. Lucky cloning an Arc is pretty cheap. let inner_log = log.clone(); let loop_log = log.clone(); diff --git a/src/modules.rs b/src/modules.rs index e200129..9f8a21e 100644 --- a/src/modules.rs +++ b/src/modules.rs @@ -18,7 +18,7 @@ use crate::registries::Registries; // spawner is a type that allows 'tasks' to be spawned on it, running them to completion. pub async fn init(log: Logger, config: Settings, spawner: S, registries: Registries) -> Result<()> { - shelly::run(log.clone(), config.clone(), registries.clone()).await; + shelly::run(log.clone(), config.clone(), registries.clone(), spawner.clone()).await; Ok(()) } diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 9abf94a..91ebee6 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -7,8 +7,9 @@ use crate::machine::Status; use std::pin::Pin; use futures::prelude::*; +use futures::channel::mpsc; use futures::ready; -use futures::task::{Poll, Context, Waker, Spawn}; +use futures::task::{Poll, Context, Waker, Spawn, FutureObj}; use futures::StreamExt; use futures_signals::signal::Signal; @@ -17,10 +18,15 @@ use paho_mqtt as mqtt; // TODO: Late config parsing. Right now the config is validated at the very startup in its // entirety. This works reasonably enough for this static modules here but if we do dynamic loading // via dlopen(), lua API, python API etc it will not. -pub async fn run(log: Logger, config: Settings, registries: Registries) { - let shelly = Shelly::new(config).await; +pub async fn run(log: Logger, config: Settings, registries: Registries, spawner: S) { + let (tx, rx) = mpsc::channel(1); + let mut shelly = Shelly::new(log, config, rx).await; + + let r = registries.actuators.register("shelly".to_string(), tx).await; + + let f = shelly.for_each(|f| f); + spawner.spawn_obj(FutureObj::from(Box::pin(f))); - let r = registries.actuators.register("shelly".to_string(), Box::new(shelly)).await; } /// An actuator for all Shellies connected listening on one MQTT broker @@ -28,6 +34,8 @@ pub async fn run(log: Logger, config: Settings, registries: Registries) { /// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If /// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator. struct Shelly { + log: Logger, + sigchan: mpsc::Receiver, signal: Option, waker: Option, name: String, @@ -36,16 +44,17 @@ struct Shelly { impl Shelly { // Can't use Error, it's not Send. fabinfra/fabaccess/bffh#7 - pub async fn new(config: Settings) -> Self { + pub async fn new(log: Logger, config: Settings, sigchan: mpsc::Receiver) -> Self { let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap(); - client.connect(mqtt::ConnectOptions::new()).await.unwrap(); + let o = client.connect(mqtt::ConnectOptions::new()).await.unwrap(); + println!("{:?}", o); let name = "test".to_string(); let signal: Option = None; let waker = None; - Shelly { signal, waker, name, client } + Shelly { log, sigchan, signal, waker, name, client } } } @@ -64,8 +73,33 @@ impl Stream for Shelly { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let unpin = Pin::into_inner(self); + + info!(unpin.log, "tick {}", unpin.signal.is_some()); + + if let Poll::Ready(v) = Stream::poll_next(Pin::new(&mut unpin.sigchan), cx) { + if let Some(s) = v { + // We have received a new signal to use + unpin.signal.replace(s); + // We use `if let` instead of .and_then because we want the waker to be dropped + // afterwards. It's only there to ensure the future is called when a signal is + // installed the first time + // TODO probably don't need that here because we're polling it either way directly + // afterwards, eh? + if let Some(waker) = unpin.waker.take() { + waker.wake(); + } + } else { + info!(unpin.log, "bye"); + // This means that the sending end was dropped, so we shut down + unpin.signal.take(); + unpin.waker.take(); + return Poll::Ready(None); + } + } + if let Some(ref mut s) = unpin.signal { if let Some(status) = ready!(Signal::poll_change(Pin::new(s), cx)) { + info!(unpin.log, "Machine Status changed: {:?}", status); let topic = format!("shellies/{}/relay/0/command", unpin.name); let pl = match status { Status::Free | Status::Blocked => "off", @@ -77,6 +111,7 @@ impl Stream for Shelly { return Poll::Ready(Some(Box::pin(f))); } } else { + info!(unpin.log, "I ain't got no signal son"); unpin.waker.replace(cx.waker().clone()); } diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index 1183b16..2ffa81f 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -6,7 +6,8 @@ use smol::lock::RwLock; use std::pin::Pin; use futures::ready; use futures::prelude::*; -use futures::task::{Context, Poll}; +use futures::channel::mpsc; +use futures::task::{Context, Poll, Spawn}; use futures_signals::signal::Signal; use crate::machine::Status; @@ -18,9 +19,9 @@ pub struct Actuators { inner: Arc>, } -pub type ActBox = Box; +pub type ActBox = Box; -type Inner = HashMap; +type Inner = HashMap>; impl Actuators { pub fn new() -> Self { @@ -29,23 +30,16 @@ impl Actuators { } } - pub async fn register(&self, name: String, act: ActBox) { + pub async fn register(&self, name: String, tx: mpsc::Sender) { let mut wlock = self.inner.write().await; // TODO: Log an error or something if that name was already taken - wlock.insert(name, act); - } - - pub async fn run(&mut self) { - let mut wlock = self.inner.write().await; - for (_name, act) in wlock.into_iter() { - - } + wlock.insert(name, tx); } pub async fn subscribe(&mut self, name: String, signal: StatusSignal) { let mut wlock = self.inner.write().await; - if let Some(act) = wlock.get_mut(&name) { - act.subscribe(signal); + if let Some(tx) = wlock.get_mut(&name) { + tx.send(signal).await; } } } @@ -60,7 +54,8 @@ pub trait Actuator: Stream> { // systems with halting problems. struct Dummy { log: Logger, - signal: Option + sigchan: mpsc::Receiver, + signal: Option, } impl Actuator for Dummy {