diff --git a/src/api/machine.rs b/src/api/machine.rs index 6c13820..ab4f013 100644 --- a/src/api/machine.rs +++ b/src/api/machine.rs @@ -141,12 +141,12 @@ impl write::give_back::Server for GiveBack { _results: write::give_back::RetResults) -> Promise<(), Error> { - if let Some(chan) = self.0.take() { - chan.send(()) - .expect("Other end of GiveBack token was dropped?!"); + if let Some(rt) = self.0.take() { + // Err here just means machine was taken from us + Promise::from_future(rt.map(|()| Ok(()))) + } else { + Promise::ok(()) } - - Promise::ok(()) } } diff --git a/src/machine.rs b/src/machine.rs index 3e37787..291a097 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -12,7 +12,9 @@ use std::fs; use serde::{Serialize, Deserialize}; +use futures::Stream; use futures::future::BoxFuture; +use futures::channel::{mpsc, oneshot}; use futures_signals::signal::Signal; use futures_signals::signal::SignalExt; @@ -94,12 +96,14 @@ impl Machine { if let Some(udata) = udata { let mut guard = this.inner.try_lock().unwrap(); if this.access.check(&udata, &guard.desc.privs.write).await? { - return guard.do_state_change(new_state); + guard.do_state_change(new_state); + return Ok(ReturnToken::new(this.inner.clone())) } } else { if new_state == MachineState::free() { let mut guard = this.inner.try_lock().unwrap(); - return guard.do_state_change(new_state); + guard.do_state_change(new_state); + return Ok(ReturnToken::new(this.inner.clone())); } } @@ -123,6 +127,7 @@ impl Deref for Machine { } } + #[derive(Debug)] /// Internal machine representation /// @@ -142,7 +147,6 @@ pub struct Inner { /// case of an actor it should then make sure that the real world matches up with the set state state: Mutable, reset: Option, - rx: Option>, } impl Inner { @@ -156,7 +160,6 @@ impl Inner { desc: desc, state: Mutable::new(state), reset: None, - rx: None, } } @@ -172,14 +175,9 @@ impl Inner { Box::pin(self.state.signal_cloned().dedupe_cloned()) } - pub fn do_state_change(&mut self, new_state: MachineState) -> Result { - let (tx, rx) = futures::channel::oneshot::channel(); + pub fn do_state_change(&mut self, new_state: MachineState) { let old_state = self.state.replace(new_state); self.reset.replace(old_state); - // Also this drops the old receiver, which will signal to the initiator that the - // machine has been taken off their hands. - self.rx.replace(rx); - return Ok(tx); } pub fn read_state(&self) -> ReadOnlyMutable { @@ -197,28 +195,36 @@ impl Inner { } } -pub type ReturnToken = futures::channel::oneshot::Sender<()>; +//pub type ReturnToken = futures::channel::oneshot::Sender<()>; +pub struct ReturnToken { + f: Option>, +} -impl Future for Inner { - type Output = MachineState; +impl ReturnToken { + pub fn new(inner: Arc>) -> Self { + let f = async move { + let mut guard = inner.lock().await; + guard.reset_state(); + }; + + Self { f: Some(Box::pin(f)) } + } +} + +impl Future for ReturnToken { + type Output = (); // FIXME: This should probably be a Result<(), Error> fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = &mut *self; - // TODO Return this on exit - if false { - return Poll::Ready(self.state.get_cloned()); - } + let mut this = &mut *self; - // Check if the return token was sent/dropped - if let Some(mut rx) = this.rx.take() { - match Future::poll(Pin::new(&mut rx), cx) { - // Regardless if we were canceled or properly returned, reset. - Poll::Ready(_) => self.reset_state(), - Poll::Pending => { this.rx.replace(rx); }, + match this.f.as_mut().map(|f| Future::poll(Pin::new(f), cx)) { + None => Poll::Ready(()), // TODO: Is it saner to return Pending here? This can only happen after the future completed + Some(Poll::Pending) => Poll::Pending, + Some(Poll::Ready(())) => { + let _ = this.f.take(); // Remove the future to not poll after completion + Poll::Ready(()) } } - - Poll::Pending } } @@ -255,6 +261,8 @@ pub fn load(config: &crate::config::Config, access: Arc) // TODO: Read state from the state db (v.name.clone(), Machine::construct(k, v, MachineState::new(), access.clone())) }); + + Ok(HashMap::from_iter(it)) }