diff --git a/src/actor.rs b/src/actor.rs index 9e9a684..25ec244 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -62,6 +62,9 @@ impl Future for Actor { let mut this = &mut *self; let mut done = false; // Is the channel with new state-signals exhausted? + // FIXME: This is potentially invalid, and may lead to the situation that the signal is + // replaced *twice* but the second change will not be honoured since this implementation of + // events is *EDGE*-triggered! // Update the signal we're polling from, if there is an update that is. match Stream::poll_next(Pin::new(&mut this.rx), cx) { Poll::Ready(None) => done = true, @@ -69,39 +72,58 @@ impl Future for Actor { Poll::Pending => { }, } - // Poll the `apply` future. - match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) { - None => { } - Some(Poll::Ready(_)) => this.future = None, - Some(Poll::Pending) => return Poll::Pending, - } + // Work until there is no more work to do. + loop { - // Poll the signal and apply all changes that happen to the inner Actuator - match this.inner.as_mut().map(|inner| Signal::poll_change(Pin::new(inner), cx)) { - None => Poll::Pending, - Some(Poll::Pending) => Poll::Pending, - Some(Poll::Ready(None)) => { - this.inner = None; + // Poll the `apply` future. And ensure it's completed before the next one is started + match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) { + // Skip and poll for a new future to do + None => { } - if done { - Poll::Ready(()) - } else { - Poll::Pending + // This apply future is done, get a new one + Some(Poll::Ready(_)) => this.future = None, + + // This future would block so we return to continue work another time + Some(Poll::Pending) => return Poll::Pending, + } + + // Poll the signal and apply any change that happen to the inner Actuator + match this.inner.as_mut().map(|inner| Signal::poll_change(Pin::new(inner), cx)) { + // No signal to poll + None => return Poll::Pending, + Some(Poll::Pending) => return Poll::Pending, + Some(Poll::Ready(None)) => { + this.inner = None; + + if done { + return Poll::Ready(()); + } else { + return Poll::Pending; + } + }, + Some(Poll::Ready(Some(state))) => { + // This future MUST be polled before we exit from the Actor::poll because if we + // do not do that it will not register the dependency and thus NOT BE POLLED. + this.future.replace(this.actuator.apply(state)); } - }, - Some(Poll::Ready(Some(state))) => { - this.future.replace(this.actuator.apply(state)); - Poll::Pending } } } } -pub struct Dummy; +pub struct Dummy { + log: Logger, +} + +impl Dummy { + pub fn new(log: &Logger) -> Self { + Self { log: log.new(o!("module" => "Dummy Actor")) } + } +} impl Actuator for Dummy { fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()> { - println!("New state for dummy actuator: {:?}", state); + info!(self.log, "New state for dummy actuator: {:?}", state); Box::pin(smol::future::ready(())) } } @@ -146,7 +168,7 @@ fn load_single( Some(Box::new(Shelly::new(log, name.clone(), client.clone()))) }, "Dummy" => { - Some(Box::new(Dummy)) + Some(Box::new(Dummy::new(log))) } _ => { error!(log, "No actor found with name \"{}\", configured as \"{}\".", module_name, name); diff --git a/src/initiator.rs b/src/initiator.rs index 580787f..94765f8 100644 --- a/src/initiator.rs +++ b/src/initiator.rs @@ -80,7 +80,7 @@ impl Future for Initiator { }, Some(Poll::Ready((user, state))) => { this.future.take(); - this.machine.as_mut().map(|machine| machine.request_state_change(user.as_ref(), state)); + this.machine.as_mut().map(|machine| machine.request_state_change(user.as_ref(), state).unwrap()); } Some(Poll::Pending) => return Poll::Pending, } @@ -118,7 +118,7 @@ fn load_single( { match module_name.as_ref() { "Dummy" => { - Some(Box::new(Dummy::new())) + Some(Box::new(Dummy::new(log))) }, _ => { error!(log, "No initiator found with name \"{}\", configured as \"{}\"", @@ -129,12 +129,13 @@ fn load_single( } pub struct Dummy { - step: bool + log: Logger, + step: bool, } impl Dummy { - pub fn new() -> Self { - Self { step: false } + pub fn new(log: &Logger) -> Self { + Self { log: log.new(o!("module" => "Dummy Initiator")), step: false } } } @@ -143,7 +144,9 @@ impl Sensor for Dummy { -> BoxFuture<'static, (Option, MachineState)> { let step = self.step; - self.step = !self.step; + self.step = !step; + + info!(self.log, "Kicking off new dummy initiator state change: {}", step); let f = async move { Timer::after(std::time::Duration::from_secs(1)).await; diff --git a/src/machine.rs b/src/machine.rs index 9e6a872..711c1d6 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -172,12 +172,8 @@ impl Inner { return Ok(tx); } - pub fn set_state(&mut self, state: Status) { - self.state.set(MachineState { state }) - } - pub fn get_signal(&self) -> impl Signal { - self.state.signal_cloned().dedupe_cloned() + self.state.signal_cloned() } pub fn reset_state(&mut self) { @@ -199,6 +195,7 @@ impl Future for Inner { return Poll::Ready(self.state.get_cloned()); } + // Check if the return token was sent/dropped if let Some(mut rx) = this.rx.take() { match Future::poll(Pin::new(&mut rx), cx) { // Regardless if we were canceled or properly returned, reset. diff --git a/src/main.rs b/src/main.rs index 926e1be..0bb1961 100644 --- a/src/main.rs +++ b/src/main.rs @@ -160,8 +160,12 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { } } - let actor_tasks = actors.into_iter().map(|actor| ex.spawn(actor)); - let init_tasks = initiators.into_iter().map(|init| ex.spawn(init)); + for actor in actors.into_iter() { + ex.spawn(actor).detach(); + } + for init in initiators.into_iter() { + ex.spawn(init).detach(); + } let (signal, shutdown) = async_channel::bounded::<()>(1); easy_parallel::Parallel::new()