From 131546e1501659547bc0804169c9eaec8face4d6 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Sun, 14 Feb 2021 23:05:03 +0000 Subject: [PATCH 1/2] Start adding a Future ReturnToken --- src/api/machine.rs | 5 +++-- src/machine.rs | 49 +++++++++++++++++++++------------------------- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/src/api/machine.rs b/src/api/machine.rs index 6c13820..9894a29 100644 --- a/src/api/machine.rs +++ b/src/api/machine.rs @@ -141,9 +141,10 @@ impl write::give_back::Server for GiveBack { _results: write::give_back::RetResults) -> Promise<(), Error> { + println!("I'm doing my part!"); if let Some(chan) = self.0.take() { - chan.send(()) - .expect("Other end of GiveBack token was dropped?!"); + // Err here just means machine was taken from us + let _ = chan.send(()); } Promise::ok(()) diff --git a/src/machine.rs b/src/machine.rs index 3e37787..da9ccf9 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -12,7 +12,9 @@ use std::fs; use serde::{Serialize, Deserialize}; +use futures::Stream; use futures::future::BoxFuture; +use futures::channel::{mpsc, oneshot}; use futures_signals::signal::Signal; use futures_signals::signal::SignalExt; @@ -99,7 +101,8 @@ impl Machine { } else { if new_state == MachineState::free() { let mut guard = this.inner.try_lock().unwrap(); - return guard.do_state_change(new_state); + guard.do_state_change(new_state); + return Ok(ReturnToken(self.inner.clone())); } } @@ -123,6 +126,7 @@ impl Deref for Machine { } } + #[derive(Debug)] /// Internal machine representation /// @@ -142,7 +146,6 @@ pub struct Inner { /// case of an actor it should then make sure that the real world matches up with the set state state: Mutable, reset: Option, - rx: Option>, } impl Inner { @@ -156,7 +159,6 @@ impl Inner { desc: desc, state: Mutable::new(state), reset: None, - rx: None, } } @@ -172,14 +174,9 @@ impl Inner { Box::pin(self.state.signal_cloned().dedupe_cloned()) } - pub fn do_state_change(&mut self, new_state: MachineState) -> Result { - let (tx, rx) = futures::channel::oneshot::channel(); + pub fn do_state_change(&mut self, new_state: MachineState) { let old_state = self.state.replace(new_state); self.reset.replace(old_state); - // Also this drops the old receiver, which will signal to the initiator that the - // machine has been taken off their hands. - self.rx.replace(rx); - return Ok(tx); } pub fn read_state(&self) -> ReadOnlyMutable { @@ -197,28 +194,24 @@ impl Inner { } } -pub type ReturnToken = futures::channel::oneshot::Sender<()>; +//pub type ReturnToken = futures::channel::oneshot::Sender<()>; +pub struct ReturnToken { + inner: Arc>, +} -impl Future for Inner { - type Output = MachineState; +impl ReturnToken { + pub fn new(inner: Arc>) -> Self { + Self { inner } + } +} + +impl Future for ReturnToken { + type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = &mut *self; - // TODO Return this on exit - if false { - return Poll::Ready(self.state.get_cloned()); - } + let mut this = &mut *self; - // Check if the return token was sent/dropped - if let Some(mut rx) = this.rx.take() { - match Future::poll(Pin::new(&mut rx), cx) { - // Regardless if we were canceled or properly returned, reset. - Poll::Ready(_) => self.reset_state(), - Poll::Pending => { this.rx.replace(rx); }, - } - } - - Poll::Pending + this.0 } } @@ -255,6 +248,8 @@ pub fn load(config: &crate::config::Config, access: Arc) // TODO: Read state from the state db (v.name.clone(), Machine::construct(k, v, MachineState::new(), access.clone())) }); + + Ok(HashMap::from_iter(it)) } From 1a241f1f7da2cdadf33efd4a27232a0081d1d2c2 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Mon, 22 Feb 2021 17:25:04 +0100 Subject: [PATCH 2/2] Adds working GiveBack --- src/api/machine.rs | 9 ++++----- src/machine.rs | 25 +++++++++++++++++++------ 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/api/machine.rs b/src/api/machine.rs index 9894a29..ab4f013 100644 --- a/src/api/machine.rs +++ b/src/api/machine.rs @@ -141,13 +141,12 @@ impl write::give_back::Server for GiveBack { _results: write::give_back::RetResults) -> Promise<(), Error> { - println!("I'm doing my part!"); - if let Some(chan) = self.0.take() { + if let Some(rt) = self.0.take() { // Err here just means machine was taken from us - let _ = chan.send(()); + Promise::from_future(rt.map(|()| Ok(()))) + } else { + Promise::ok(()) } - - Promise::ok(()) } } diff --git a/src/machine.rs b/src/machine.rs index da9ccf9..291a097 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -96,13 +96,14 @@ impl Machine { if let Some(udata) = udata { let mut guard = this.inner.try_lock().unwrap(); if this.access.check(&udata, &guard.desc.privs.write).await? { - return guard.do_state_change(new_state); + guard.do_state_change(new_state); + return Ok(ReturnToken::new(this.inner.clone())) } } else { if new_state == MachineState::free() { let mut guard = this.inner.try_lock().unwrap(); guard.do_state_change(new_state); - return Ok(ReturnToken(self.inner.clone())); + return Ok(ReturnToken::new(this.inner.clone())); } } @@ -196,22 +197,34 @@ impl Inner { //pub type ReturnToken = futures::channel::oneshot::Sender<()>; pub struct ReturnToken { - inner: Arc>, + f: Option>, } impl ReturnToken { pub fn new(inner: Arc>) -> Self { - Self { inner } + let f = async move { + let mut guard = inner.lock().await; + guard.reset_state(); + }; + + Self { f: Some(Box::pin(f)) } } } impl Future for ReturnToken { - type Output = (); + type Output = (); // FIXME: This should probably be a Result<(), Error> fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let mut this = &mut *self; - this.0 + match this.f.as_mut().map(|f| Future::poll(Pin::new(f), cx)) { + None => Poll::Ready(()), // TODO: Is it saner to return Pending here? This can only happen after the future completed + Some(Poll::Pending) => Poll::Pending, + Some(Poll::Ready(())) => { + let _ = this.f.take(); // Remove the future to not poll after completion + Poll::Ready(()) + } + } } }