Fix dependency check error. Network now works. \o/

This commit is contained in:
Gregor Reitzenstein 2020-12-14 14:45:16 +01:00
parent 1dc8dc4710
commit 6d5802c0a5
4 changed files with 62 additions and 36 deletions

View File

@ -62,6 +62,9 @@ impl Future for Actor {
let mut this = &mut *self; let mut this = &mut *self;
let mut done = false; // Is the channel with new state-signals exhausted? let mut done = false; // Is the channel with new state-signals exhausted?
// FIXME: This is potentially invalid, and may lead to the situation that the signal is
// replaced *twice* but the second change will not be honoured since this implementation of
// events is *EDGE*-triggered!
// Update the signal we're polling from, if there is an update that is. // Update the signal we're polling from, if there is an update that is.
match Stream::poll_next(Pin::new(&mut this.rx), cx) { match Stream::poll_next(Pin::new(&mut this.rx), cx) {
Poll::Ready(None) => done = true, Poll::Ready(None) => done = true,
@ -69,39 +72,58 @@ impl Future for Actor {
Poll::Pending => { }, Poll::Pending => { },
} }
// Poll the `apply` future. // Work until there is no more work to do.
match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) { loop {
None => { }
Some(Poll::Ready(_)) => this.future = None,
Some(Poll::Pending) => return Poll::Pending,
}
// Poll the signal and apply all changes that happen to the inner Actuator // Poll the `apply` future. And ensure it's completed before the next one is started
match this.inner.as_mut().map(|inner| Signal::poll_change(Pin::new(inner), cx)) { match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) {
None => Poll::Pending, // Skip and poll for a new future to do
Some(Poll::Pending) => Poll::Pending, None => { }
Some(Poll::Ready(None)) => {
this.inner = None;
if done { // This apply future is done, get a new one
Poll::Ready(()) Some(Poll::Ready(_)) => this.future = None,
} else {
Poll::Pending // This future would block so we return to continue work another time
Some(Poll::Pending) => return Poll::Pending,
}
// Poll the signal and apply any change that happen to the inner Actuator
match this.inner.as_mut().map(|inner| Signal::poll_change(Pin::new(inner), cx)) {
// No signal to poll
None => return Poll::Pending,
Some(Poll::Pending) => return Poll::Pending,
Some(Poll::Ready(None)) => {
this.inner = None;
if done {
return Poll::Ready(());
} else {
return Poll::Pending;
}
},
Some(Poll::Ready(Some(state))) => {
// This future MUST be polled before we exit from the Actor::poll because if we
// do not do that it will not register the dependency and thus NOT BE POLLED.
this.future.replace(this.actuator.apply(state));
} }
},
Some(Poll::Ready(Some(state))) => {
this.future.replace(this.actuator.apply(state));
Poll::Pending
} }
} }
} }
} }
pub struct Dummy; pub struct Dummy {
log: Logger,
}
impl Dummy {
pub fn new(log: &Logger) -> Self {
Self { log: log.new(o!("module" => "Dummy Actor")) }
}
}
impl Actuator for Dummy { impl Actuator for Dummy {
fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()> { fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()> {
println!("New state for dummy actuator: {:?}", state); info!(self.log, "New state for dummy actuator: {:?}", state);
Box::pin(smol::future::ready(())) Box::pin(smol::future::ready(()))
} }
} }
@ -146,7 +168,7 @@ fn load_single(
Some(Box::new(Shelly::new(log, name.clone(), client.clone()))) Some(Box::new(Shelly::new(log, name.clone(), client.clone())))
}, },
"Dummy" => { "Dummy" => {
Some(Box::new(Dummy)) Some(Box::new(Dummy::new(log)))
} }
_ => { _ => {
error!(log, "No actor found with name \"{}\", configured as \"{}\".", module_name, name); error!(log, "No actor found with name \"{}\", configured as \"{}\".", module_name, name);

View File

@ -80,7 +80,7 @@ impl Future for Initiator {
}, },
Some(Poll::Ready((user, state))) => { Some(Poll::Ready((user, state))) => {
this.future.take(); this.future.take();
this.machine.as_mut().map(|machine| machine.request_state_change(user.as_ref(), state)); this.machine.as_mut().map(|machine| machine.request_state_change(user.as_ref(), state).unwrap());
} }
Some(Poll::Pending) => return Poll::Pending, Some(Poll::Pending) => return Poll::Pending,
} }
@ -118,7 +118,7 @@ fn load_single(
{ {
match module_name.as_ref() { match module_name.as_ref() {
"Dummy" => { "Dummy" => {
Some(Box::new(Dummy::new())) Some(Box::new(Dummy::new(log)))
}, },
_ => { _ => {
error!(log, "No initiator found with name \"{}\", configured as \"{}\"", error!(log, "No initiator found with name \"{}\", configured as \"{}\"",
@ -129,12 +129,13 @@ fn load_single(
} }
pub struct Dummy { pub struct Dummy {
step: bool log: Logger,
step: bool,
} }
impl Dummy { impl Dummy {
pub fn new() -> Self { pub fn new(log: &Logger) -> Self {
Self { step: false } Self { log: log.new(o!("module" => "Dummy Initiator")), step: false }
} }
} }
@ -143,7 +144,9 @@ impl Sensor for Dummy {
-> BoxFuture<'static, (Option<User>, MachineState)> -> BoxFuture<'static, (Option<User>, MachineState)>
{ {
let step = self.step; let step = self.step;
self.step = !self.step; self.step = !step;
info!(self.log, "Kicking off new dummy initiator state change: {}", step);
let f = async move { let f = async move {
Timer::after(std::time::Duration::from_secs(1)).await; Timer::after(std::time::Duration::from_secs(1)).await;

View File

@ -172,12 +172,8 @@ impl Inner {
return Ok(tx); return Ok(tx);
} }
pub fn set_state(&mut self, state: Status) {
self.state.set(MachineState { state })
}
pub fn get_signal(&self) -> impl Signal { pub fn get_signal(&self) -> impl Signal {
self.state.signal_cloned().dedupe_cloned() self.state.signal_cloned()
} }
pub fn reset_state(&mut self) { pub fn reset_state(&mut self) {
@ -199,6 +195,7 @@ impl Future for Inner {
return Poll::Ready(self.state.get_cloned()); return Poll::Ready(self.state.get_cloned());
} }
// Check if the return token was sent/dropped
if let Some(mut rx) = this.rx.take() { if let Some(mut rx) = this.rx.take() {
match Future::poll(Pin::new(&mut rx), cx) { match Future::poll(Pin::new(&mut rx), cx) {
// Regardless if we were canceled or properly returned, reset. // Regardless if we were canceled or properly returned, reset.

View File

@ -160,8 +160,12 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
} }
} }
let actor_tasks = actors.into_iter().map(|actor| ex.spawn(actor)); for actor in actors.into_iter() {
let init_tasks = initiators.into_iter().map(|init| ex.spawn(init)); ex.spawn(actor).detach();
}
for init in initiators.into_iter() {
ex.spawn(init).detach();
}
let (signal, shutdown) = async_channel::bounded::<()>(1); let (signal, shutdown) = async_channel::bounded::<()>(1);
easy_parallel::Parallel::new() easy_parallel::Parallel::new()