2020-12-01 10:21:39 +01:00
|
|
|
use std::pin::Pin;
|
|
|
|
use std::task::{Poll, Context};
|
|
|
|
use std::sync::Arc;
|
2020-12-01 09:44:18 +01:00
|
|
|
use std::future::Future;
|
|
|
|
|
2020-12-01 10:21:39 +01:00
|
|
|
use smol::Executor;
|
|
|
|
|
|
|
|
use futures::{future::BoxFuture, Stream, StreamExt};
|
2020-12-01 16:06:39 +01:00
|
|
|
use futures::channel::mpsc;
|
|
|
|
use futures_signals::signal::{Signal, MutableSignalCloned, MutableSignal, Mutable};
|
2020-12-01 09:44:18 +01:00
|
|
|
|
|
|
|
use crate::db::machine::MachineState;
|
2020-12-02 11:31:17 +01:00
|
|
|
use crate::registries::actuators::Actuator;
|
2020-12-01 09:44:18 +01:00
|
|
|
use crate::config::Settings;
|
|
|
|
use crate::error::Result;
|
|
|
|
|
2020-12-01 16:06:39 +01:00
|
|
|
pub struct Actor<S: Signal> {
|
|
|
|
// FIXME: This should really be a Signal.
|
|
|
|
// But, alas, MutableSignalCloned is itself not `Clone`. For good reason as keeping track of
|
|
|
|
// the changes itself happens in a way that Clone won't work (well).
|
|
|
|
// So, you can't clone it, you can't copy it and you can't get at the variable inside outside
|
|
|
|
// of a task context. In short, using Mutable isn't possible and we would have to write our own
|
|
|
|
// implementation of MutableSignal*'s . Preferably with the correct optimizations for our case
|
|
|
|
// where there is only one consumer. So a mpsc channel that drops all but the last input.
|
2020-12-02 11:31:17 +01:00
|
|
|
rx: mpsc::Receiver<Option<S>>,
|
|
|
|
inner: Option<S>,
|
2020-12-01 10:21:39 +01:00
|
|
|
|
2020-12-02 11:31:17 +01:00
|
|
|
actuator: Box<dyn Actuator + Send + Sync>,
|
2020-12-02 11:46:46 +01:00
|
|
|
future: Option<BoxFuture<'static, ()>>,
|
2020-12-01 10:21:39 +01:00
|
|
|
}
|
|
|
|
|
2020-12-02 11:31:17 +01:00
|
|
|
impl<S: Signal + Unpin> Actor<S> {
|
|
|
|
pub fn new(rx: mpsc::Receiver<Option<S>>, actuator: Box<dyn Actuator + Send + Sync>) -> Self {
|
|
|
|
Self {
|
|
|
|
rx: rx,
|
|
|
|
inner: None,
|
|
|
|
actuator: actuator,
|
|
|
|
future: None,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn wrap(actuator: Box<dyn Actuator + Send + Sync>) -> (mpsc::Sender<Option<S>>, Self) {
|
|
|
|
let (tx, rx) = mpsc::channel(1);
|
|
|
|
(tx, Self::new(rx, actuator))
|
|
|
|
}
|
2020-12-01 16:06:39 +01:00
|
|
|
}
|
2020-12-01 10:21:39 +01:00
|
|
|
|
2020-12-02 11:31:17 +01:00
|
|
|
impl<S: Signal<Item=MachineState> + Unpin> Future for Actor<S> {
|
|
|
|
type Output = ();
|
2020-12-01 16:06:39 +01:00
|
|
|
|
2020-12-02 11:31:17 +01:00
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
2020-12-01 10:21:39 +01:00
|
|
|
let mut this = &mut *self;
|
2020-12-02 11:31:17 +01:00
|
|
|
let mut done = false; // Is the channel with new state-signals exhausted?
|
2020-12-01 10:21:39 +01:00
|
|
|
|
2020-12-02 11:31:17 +01:00
|
|
|
// Update the signal we're polling from, if there is an update that is.
|
|
|
|
match Stream::poll_next(Pin::new(&mut this.rx), cx) {
|
|
|
|
Poll::Ready(None) => done = true,
|
|
|
|
Poll::Ready(Some(new_signal)) => this.inner = new_signal,
|
|
|
|
Poll::Pending => { },
|
|
|
|
}
|
2020-12-01 16:06:39 +01:00
|
|
|
|
2020-12-02 11:31:17 +01:00
|
|
|
// Poll the `apply` future.
|
|
|
|
match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) {
|
|
|
|
None => { }
|
|
|
|
Some(Poll::Ready(_)) => this.future = None,
|
|
|
|
Some(Poll::Pending) => return Poll::Pending,
|
|
|
|
}
|
|
|
|
|
|
|
|
// Poll the signal and apply all changes that happen to the inner Actuator
|
2020-12-01 16:06:39 +01:00
|
|
|
match this.inner.as_mut().map(|inner| Signal::poll_change(Pin::new(inner), cx)) {
|
2020-12-02 11:31:17 +01:00
|
|
|
None => Poll::Pending,
|
|
|
|
Some(Poll::Pending) => Poll::Pending,
|
2020-12-01 16:06:39 +01:00
|
|
|
Some(Poll::Ready(None)) => {
|
|
|
|
this.inner = None;
|
2020-12-01 09:44:18 +01:00
|
|
|
|
2020-12-02 11:31:17 +01:00
|
|
|
if done {
|
|
|
|
Poll::Ready(())
|
|
|
|
} else {
|
|
|
|
Poll::Pending
|
|
|
|
}
|
|
|
|
},
|
|
|
|
Some(Poll::Ready(Some(state))) => {
|
2020-12-02 11:46:46 +01:00
|
|
|
this.future.replace(this.actuator.apply(state));
|
2020-12-02 11:31:17 +01:00
|
|
|
Poll::Pending
|
|
|
|
}
|
2020-12-01 10:21:39 +01:00
|
|
|
}
|
2020-12-01 09:44:18 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-02 11:31:17 +01:00
|
|
|
pub fn load<S: Signal<Item=MachineState> + Unpin>() -> Result<(mpsc::Sender<Option<S>>, Actor<S>)> {
|
|
|
|
let d = Box::new(crate::registries::actuators::Dummy);
|
|
|
|
let a = Actor::wrap(d);
|
2020-12-01 16:06:39 +01:00
|
|
|
|
2020-12-02 11:31:17 +01:00
|
|
|
Ok(a)
|
2020-12-01 09:44:18 +01:00
|
|
|
}
|