use std::cell::Cell; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use futures_signals::signal::{MutableSignalRef, ReadOnlyMutable, Signal}; use futures_util::future::BoxFuture; use crate::resource::state::State; pub trait Actor { fn apply(&mut self, state: State) -> BoxFuture<'static, ()>; } fn loader>(cell: &Cell>) -> Option { cell.take() } pub struct ActorDriver { rx: MutableSignalRef>, &'static dyn Fn(&Cell>) -> Option>, signal: Option, actor: Box, future: Option>, } impl> ActorDriver { pub fn new(rx: &ReadOnlyMutable>>, actor: Box) -> Self { let f: &'static dyn Fn(&Cell>) -> Option = &loader; let rx = rx.signal_ref(f); Self { rx, signal: None, actor, future: None, } } } impl Future for ActorDriver where S: Signal + Unpin + Send, { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let mut done = false; // Is the channel with new state-signals exhausted? match Pin::new(&mut self.rx).poll_change(cx) { Poll::Ready(Some(new_signal)) => { self.signal = new_signal; }, Poll::Ready(None) => done = true, Poll::Pending => {}, } // Work until there is no more work to do. loop { // Poll the `apply` future. And ensure it's completed before the next one is started match self.future.as_mut() .map(|future| Future::poll(Pin::new(future), cx)) { // Skip and poll for a new future to do None => { } // This apply future is done, get a new one Some(Poll::Ready(_)) => self.future = None, // This future would block so we return to continue work another time Some(Poll::Pending) => return Poll::Pending, } // Poll the signal and apply any change that happen to the inner Actuator match self.signal.as_mut() .map(|inner| S::poll_change(Pin::new(inner), cx)) { // No signal to poll None => return Poll::Pending, Some(Poll::Pending) => return Poll::Pending, Some(Poll::Ready(None)) => { self.signal = None; if done { return Poll::Ready(()); } else { return Poll::Pending; } }, Some(Poll::Ready(Some(state))) => { // This future MUST be polled before we exit from the Actor::poll because if we // do not do that it will not register the dependency and thus NOT BE POLLED. let f = self.actor.apply(state); self.future.replace(f); } } } } }