Sync Actor works now

This commit is contained in:
Gregor Reitzenstein 2020-12-02 11:31:17 +01:00
parent 6cf4b1d078
commit f4148d398f
6 changed files with 78 additions and 246 deletions

View File

@ -10,7 +10,7 @@ use futures::channel::mpsc;
use futures_signals::signal::{Signal, MutableSignalCloned, MutableSignal, Mutable};
use crate::db::machine::MachineState;
use crate::registries::Actuator;
use crate::registries::actuators::Actuator;
use crate::config::Settings;
use crate::error::Result;
@ -22,88 +22,74 @@ pub struct Actor<S: Signal> {
// of a task context. In short, using Mutable isn't possible and we would have to write our own
// implementation of MutableSignal*'s . Preferably with the correct optimizations for our case
// where there is only one consumer. So a mpsc channel that drops all but the last input.
rx: mpsc::Receiver<Option<S>>
inner: S,
rx: mpsc::Receiver<Option<S>>,
inner: Option<S>,
actuator: Box<dyn Actuator + Send + Sync>,
future: Option<Box<dyn Future<Output=()> + Unpin + Send>>,
}
pub fn load() {
let s = Mutable::new(MachineState::new());
impl<S: Signal + Unpin> Actor<S> {
pub fn new(rx: mpsc::Receiver<Option<S>>, actuator: Box<dyn Actuator + Send + Sync>) -> Self {
Self {
rx: rx,
inner: None,
actuator: actuator,
future: None,
}
}
Ok(())
pub fn wrap(actuator: Box<dyn Actuator + Send + Sync>) -> (mpsc::Sender<Option<S>>, Self) {
let (tx, rx) = mpsc::channel(1);
(tx, Self::new(rx, actuator))
}
}
#[must_use = "Signals do nothing unless polled"]
pub struct MaybeFlatten<A: Signal, B: Signal> {
signal: Option<A>,
inner: Option<B>,
}
impl<S: Signal<Item=MachineState> + Unpin> Future for Actor<S> {
type Output = ();
// Poll parent => Has inner => Poll inner => Output
// --------------------------------------------------------
// Some(Some(inner)) => => Some(value) => Some(value)
// Some(Some(inner)) => => => Pending
// Some(None) => => => Pending
// None => Some(inner) => Some(value) => Some(value)
// None => Some(inner) => None => None
// None => Some(inner) => Pending => Pending
// None => None => => None
// Pending => Some(inner) => Some(value) => Some(value)
// Pending => Some(inner) => None => Pending
// Pending => Some(inner) => Pending => Pending
// Pending => None => => Pending
impl<A, B> Signal for MaybeFlatten<A, B>
where A: Signal<Item=Option<B>> + Unpin,
B: Signal + Unpin,
{
type Item = B::Item;
#[inline]
fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut this = &mut *self;
let mut done = false; // Is the channel with new state-signals exhausted?
let done = match this.signal.as_mut().map(|signal| Signal::poll_change(Pin::new(signal), cx)) {
None => true,
Some(Poll::Ready(None)) => {
this.signal = None;
true
},
Some(Poll::Ready(Some(new_inner))) => {
this.inner = new_inner;
false
},
Some(Poll::Pending) => false,
};
// Update the signal we're polling from, if there is an update that is.
match Stream::poll_next(Pin::new(&mut this.rx), cx) {
Poll::Ready(None) => done = true,
Poll::Ready(Some(new_signal)) => this.inner = new_signal,
Poll::Pending => { },
}
// Poll the `apply` future.
match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) {
None => { }
Some(Poll::Ready(_)) => this.future = None,
Some(Poll::Pending) => return Poll::Pending,
}
// Poll the signal and apply all changes that happen to the inner Actuator
match this.inner.as_mut().map(|inner| Signal::poll_change(Pin::new(inner), cx)) {
None => Poll::Pending,
Some(Poll::Pending) => Poll::Pending,
Some(Poll::Ready(None)) => {
this.inner = None;
},
Some(poll) => {
return poll;
},
None => {},
}
if done {
Poll::Ready(None)
} else {
Poll::Pending
if done {
Poll::Ready(())
} else {
Poll::Pending
}
},
Some(Poll::Ready(Some(state))) => {
this.actuator.apply(state);
Poll::Pending
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_test::*;
use futures_signals::signal::Signal;
pub fn load<S: Signal<Item=MachineState> + Unpin>() -> Result<(mpsc::Sender<Option<S>>, Actor<S>)> {
let d = Box::new(crate::registries::actuators::Dummy);
let a = Actor::wrap(d);
#[test]
fn load_test() {
let (a, s, m) = super::load().unwrap();
let cx = task::panic_context();
a.signal.poll_change(&mut cx);
}
Ok(a)
}

View File

@ -27,8 +27,6 @@ use smol::channel::{Receiver, Sender};
use futures::{Future, Stream, StreamExt};
use futures_signals::signal::*;
use crate::registries::StatusSignal;
use crate::machine::MachineDescription;
use crate::db::user::UserId;

View File

@ -135,27 +135,32 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
error!(log, "Loading is currently not implemented");
Ok(())
} else {
let db = db::Databases::new(&log, &config)?;
// TODO: Spawn api connections on their own (non-main) thread, use the main thread to
// handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads
// when bffh should exit
let machines = machine::load(&config)?;
let actors = actor::load()?;
let initiators = initiator::load(&config)?;
//let machines = machine::load(&config)?;
//let initiators = initiator::load(&config)?;
let ex = Executor::new();
for i in initiators.into_iter() {
ex.spawn(i.run());
}
let m = futures_signals::signal::Mutable::new(crate::db::machine::MachineState::new());
let (mut tx, actor) = actor::load()?;
// TODO HERE: restore connections between initiators, machines, actors
// Like so
tx.try_send(Some(m.signal_cloned())).unwrap();
// TODO HERE: Spawn all actors & inits
// Like so
ex.spawn(actor);
let (signal, shutdown) = async_channel::bounded::<()>(1);
easy_parallel::Parallel::new()
.each(0..4, |_| smol::block_on(ex.run(shutdown.recv())));
let db = db::Databases::new(&log, &config)?;
// TODO: Spawn api connections on their own (non-main) thread, use the main thread to
// handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads
// when bffh should exit
server::serve_api_connections(log.clone(), config, db)
// Signal is dropped here, stopping all executor threads as well.
}

View File

@ -1,9 +1,9 @@
use slog::Logger;
use crate::config::Settings;
use crate::registries::{Registries, Actuator, ActBox, StatusSignal};
use crate::error::Result;
use crate::db::machine::Status;
use crate::registries::Registries;
use std::pin::Pin;
use futures::prelude::*;
@ -19,100 +19,5 @@ use paho_mqtt as mqtt;
// entirety. This works reasonably enough for this static modules here but if we do dynamic loading
// via dlopen(), lua API, python API etc it will not.
pub async fn run<S: Spawn>(log: Logger, config: Settings, registries: Registries, spawner: S) {
let rx = registries.actuators.register("shelly".to_string()).await;
let mut shelly = Shelly::new(log, config, rx).await;
let f = shelly.for_each(|f| f);
spawner.spawn_obj(FutureObj::from(Box::pin(f)));
}
/// An actuator for all Shellies connected listening on one MQTT broker
///
/// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If
/// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator.
struct Shelly {
log: Logger,
sigchan: mpsc::Receiver<StatusSignal>,
signal: Option<StatusSignal>,
waker: Option<Waker>,
name: String,
client: mqtt::AsyncClient,
}
impl Shelly {
// Can't use Error, it's not Send. fabinfra/fabaccess/bffh#7
pub async fn new(log: Logger, config: Settings, sigchan: mpsc::Receiver<StatusSignal>) -> Self {
let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap();
let o = client.connect(mqtt::ConnectOptions::new()).await.unwrap();
println!("{:?}", o);
let name = "test".to_string();
let signal: Option<StatusSignal> = None;
let waker = None;
Shelly { log, sigchan, signal, waker, name, client }
}
}
impl Actuator for Shelly {
fn subscribe(&mut self, signal: StatusSignal) {
self.signal.replace(signal);
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
}
impl Stream for Shelly {
type Item = future::BoxFuture<'static, ()>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let unpin = Pin::into_inner(self);
info!(unpin.log, "tick {}", unpin.signal.is_some());
if let Poll::Ready(v) = Stream::poll_next(Pin::new(&mut unpin.sigchan), cx) {
if let Some(s) = v {
// We have received a new signal to use
unpin.signal.replace(s);
// We use `if let` instead of .and_then because we want the waker to be dropped
// afterwards. It's only there to ensure the future is called when a signal is
// installed the first time
// TODO probably don't need that here because we're polling it either way directly
// afterwards, eh?
if let Some(waker) = unpin.waker.take() {
waker.wake();
}
} else {
info!(unpin.log, "bye");
// This means that the sending end was dropped, so we shut down
unpin.signal.take();
unpin.waker.take();
return Poll::Ready(None);
}
}
if let Some(ref mut s) = unpin.signal {
if let Some(status) = ready!(Signal::poll_change(Pin::new(s), cx)) {
info!(unpin.log, "Machine Status changed: {:?}", status);
let topic = format!("shellies/{}/relay/0/command", unpin.name);
let pl = match status {
Status::InUse(_, _) => "on",
_ => "off",
};
let msg = mqtt::Message::new(topic, pl, 0);
let f = unpin.client.publish(msg).map(|_| ());
return Poll::Ready(Some(Box::pin(f)));
}
} else {
info!(unpin.log, "I ain't got no signal son");
unpin.waker.replace(cx.waker().clone());
}
Poll::Pending
}
}

View File

@ -1,9 +1,7 @@
use std::sync::Arc;
mod actuators;
mod sensors;
pub use actuators::{Actuator, ActBox, StatusSignal};
pub mod actuators;
pub mod sensors;
#[derive(Clone)]
/// BFFH registries
@ -11,14 +9,12 @@ pub use actuators::{Actuator, ActBox, StatusSignal};
/// This struct is only a reference to the underlying registries - cloning it will generate a new
/// reference, not clone the registries
pub struct Registries {
pub actuators: actuators::Actuators,
pub sensors: sensors::Sensors,
}
impl Registries {
pub fn new() -> Self {
Registries {
actuators: actuators::Actuators::new(),
sensors: sensors::Sensors::new(),
}
}

View File

@ -10,76 +10,18 @@ use futures::channel::mpsc;
use futures::task::{Context, Poll, Spawn};
use futures_signals::signal::Signal;
use crate::db::machine::Status;
use crate::db::machine::MachineState;
use std::collections::HashMap;
#[derive(Clone)]
pub struct Actuators {
inner: Arc<RwLock<Inner>>,
pub trait Actuator {
fn apply(&mut self, state: MachineState);
}
pub type ActBox = Box<dyn Actuator + Sync + Send + Unpin>;
type Inner = HashMap<String, mpsc::Sender<StatusSignal>>;
impl Actuators {
pub fn new() -> Self {
Actuators {
inner: Arc::new(RwLock::new(Inner::new()))
}
}
pub async fn register(&self, name: String) -> mpsc::Receiver<StatusSignal> {
let (tx, rx) = mpsc::channel(1);
let mut wlock = self.inner.write().await;
// TODO: Log an error or something if that name was already taken
wlock.insert(name, tx);
return rx;
}
pub async fn subscribe(&mut self, name: String, signal: StatusSignal) {
let mut wlock = self.inner.write().await;
if let Some(tx) = wlock.get_mut(&name) {
tx.send(signal).await;
}
}
}
pub type StatusSignal = Pin<Box<dyn Signal<Item = Status> + Send + Sync>>;
pub trait Actuator: Stream<Item = future::BoxFuture<'static, ()>> {
fn subscribe(&mut self, signal: StatusSignal);
}
// This is merely a proof that Actuator *can* be implemented on a finite, known type. Yay for type
// systems with halting problems.
struct Dummy {
log: Logger,
sigchan: mpsc::Receiver<StatusSignal>,
signal: Option<StatusSignal>,
}
pub struct Dummy;
impl Actuator for Dummy {
fn subscribe(&mut self, signal: StatusSignal) {
self.signal.replace(signal);
}
}
impl Stream for Dummy {
type Item = future::BoxFuture<'static, ()>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let unpin = Pin::into_inner(self);
if let Some(ref mut s) = unpin.signal {
let status = ready!(Signal::poll_change(Pin::new(s), cx));
info!(unpin.log, "Dummy actuator would set status to {:?}, but is a Dummy", status);
Poll::Ready(Some(Box::pin(futures::future::ready(()))))
} else {
Poll::Pending
}
fn apply(&mut self, state: MachineState) {
println!("New state for dummy actuator: {:?}", state);
}
}