fabaccess-bffh/src/actor.rs

157 lines
4.8 KiB
Rust
Raw Normal View History

2020-12-01 10:21:39 +01:00
use std::pin::Pin;
use std::task::{Poll, Context};
use std::sync::Arc;
use std::collections::HashMap;
2020-12-01 09:44:18 +01:00
use std::future::Future;
2020-12-01 10:21:39 +01:00
use smol::Executor;
use futures::{future::BoxFuture, Stream, StreamExt};
2020-12-01 16:06:39 +01:00
use futures::channel::mpsc;
use futures_signals::signal::{Signal, MutableSignalCloned, MutableSignal, Mutable};
2020-12-01 09:44:18 +01:00
use crate::db::machine::MachineState;
2020-12-14 11:02:46 +01:00
use crate::config::Config;
2020-12-01 09:44:18 +01:00
use crate::error::Result;
use crate::network::ActorMap;
2020-12-14 11:02:46 +01:00
use paho_mqtt::AsyncClient;
use slog::Logger;
2020-12-09 10:49:34 +01:00
/// Something that can be driven into a given [`MachineState`].
///
/// Implementors receive every new state through [`Actuator::apply`] and hand back a future that
/// performs the actual work.
pub trait Actuator {
    /// Start applying `state` to this actuator.
    ///
    /// Returns a boxed `'static` future resolving to `()`; the work described by the future only
    /// makes progress while that future is being polled.
    fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()>;
}
/// A boxed, type-erased signal that yields [`MachineState`] values and is `Unpin + Send`.
pub type ActorSignal = Box<dyn Signal<Item=MachineState> + Unpin + Send>;
pub struct Actor {
2020-12-01 16:06:39 +01:00
// FIXME: This should really be a Signal.
// But, alas, MutableSignalCloned is itself not `Clone`. For good reason as keeping track of
// the changes itself happens in a way that Clone won't work (well).
// So, you can't clone it, you can't copy it and you can't get at the variable inside outside
// of a task context. In short, using Mutable isn't possible and we would have to write our own
// implementation of MutableSignal*'s . Preferably with the correct optimizations for our case
// where there is only one consumer. So a mpsc channel that drops all but the last input.
rx: mpsc::Receiver<Option<ActorSignal>>,
inner: Option<ActorSignal>,
2020-12-01 10:21:39 +01:00
2020-12-02 11:31:17 +01:00
actuator: Box<dyn Actuator + Send + Sync>,
2020-12-02 11:46:46 +01:00
future: Option<BoxFuture<'static, ()>>,
2020-12-01 10:21:39 +01:00
}
impl Actor {
    /// Build an actor that reads signal updates from `rx` and drives `actuator`.
    ///
    /// The actor starts out with no signal and no `apply` future in flight.
    pub fn new(rx: mpsc::Receiver<Option<ActorSignal>>, actuator: Box<dyn Actuator + Send + Sync>) -> Self {
        Self {
            rx,
            inner: None,
            actuator,
            future: None,
        }
    }

    /// Create a fresh channel (buffer size 1) and an actor listening on its receiving end.
    ///
    /// Returns the sending half together with the new actor.
    pub fn wrap(actuator: Box<dyn Actuator + Send + Sync>) -> (mpsc::Sender<Option<ActorSignal>>, Self) {
        let buffer = 1;
        let (sender, receiver) = mpsc::channel(buffer);
        let actor = Self::new(receiver, actuator);
        (sender, actor)
    }
}
2020-12-01 10:21:39 +01:00
impl Future for Actor {
2020-12-02 11:31:17 +01:00
type Output = ();
2020-12-01 16:06:39 +01:00
2020-12-02 11:31:17 +01:00
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
2020-12-01 10:21:39 +01:00
let mut this = &mut *self;
2020-12-02 11:31:17 +01:00
let mut done = false; // Is the channel with new state-signals exhausted?
2020-12-01 10:21:39 +01:00
2020-12-02 11:31:17 +01:00
// Update the signal we're polling from, if there is an update that is.
match Stream::poll_next(Pin::new(&mut this.rx), cx) {
Poll::Ready(None) => done = true,
Poll::Ready(Some(new_signal)) => this.inner = new_signal,
Poll::Pending => { },
}
2020-12-01 16:06:39 +01:00
2020-12-02 11:31:17 +01:00
// Poll the `apply` future.
match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) {
None => { }
Some(Poll::Ready(_)) => this.future = None,
Some(Poll::Pending) => return Poll::Pending,
}
// Poll the signal and apply all changes that happen to the inner Actuator
2020-12-01 16:06:39 +01:00
match this.inner.as_mut().map(|inner| Signal::poll_change(Pin::new(inner), cx)) {
2020-12-02 11:31:17 +01:00
None => Poll::Pending,
Some(Poll::Pending) => Poll::Pending,
2020-12-01 16:06:39 +01:00
Some(Poll::Ready(None)) => {
this.inner = None;
2020-12-01 09:44:18 +01:00
2020-12-02 11:31:17 +01:00
if done {
Poll::Ready(())
} else {
Poll::Pending
}
},
Some(Poll::Ready(Some(state))) => {
2020-12-02 11:46:46 +01:00
this.future.replace(this.actuator.apply(state));
2020-12-02 11:31:17 +01:00
Poll::Pending
}
2020-12-01 10:21:39 +01:00
}
2020-12-01 09:44:18 +01:00
}
}
2020-12-09 10:49:34 +01:00
/// Stateless unit-struct actuator; its `Actuator` implementation only prints the state it is given.
pub struct Dummy;
impl Actuator for Dummy {
    /// Print the new state to stdout and return a future that is already complete.
    fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()> {
        println!("New state for dummy actuator: {:?}", state);
        let finished = smol::future::ready(());
        Box::pin(finished)
    }
}
2020-12-14 11:02:46 +01:00
pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(ActorMap, Vec<Actor>)> {
let mut map = HashMap::new();
2020-12-01 16:06:39 +01:00
2020-12-14 11:02:46 +01:00
let actuators = config.actors.iter()
2020-12-14 12:39:01 +01:00
.map(|(k,v)| (k, load_single(log, client, k, &v.module, &v.params)))
2020-12-14 11:02:46 +01:00
.filter_map(|(k, n)| match n {
None => None,
Some(a) => Some((k, a))
});
let mut v = Vec::new();
for (name, actuator) in actuators {
let (tx, a) = Actor::wrap(actuator);
map.insert(name.clone(), tx);
v.push(a);
}
Ok(( map, v ))
}
2020-12-14 12:39:01 +01:00
fn load_single(
log: &Logger,
client: &AsyncClient,
name: &String,
module_name: &String,
params: &HashMap<String, String>
) -> Option<Box<dyn Actuator + Sync + Send>>
{
2020-12-14 11:02:46 +01:00
use crate::modules::*;
match module_name.as_ref() {
"Shelly" => {
2020-12-14 12:39:01 +01:00
if !params.is_empty() {
warn!(log, "\"{}\" module expects no parameters. Configured as \"{}\".",
module_name, name);
}
2020-12-14 11:02:46 +01:00
Some(Box::new(Shelly::new(log, name.clone(), client.clone())))
2020-12-14 12:39:01 +01:00
},
"Dummy" => {
Some(Box::new(Dummy))
2020-12-14 11:02:46 +01:00
}
2020-12-14 12:39:01 +01:00
_ => {
error!(log, "No actor found with name \"{}\", configured as \"{}\".", module_name, name);
None
},
2020-12-14 11:02:46 +01:00
}
2020-12-01 09:44:18 +01:00
}