Start adding a Future ReturnToken

This commit is contained in:
Gregor Reitzenstein 2021-02-14 23:05:03 +00:00
parent 2db596dad0
commit 131546e150
2 changed files with 25 additions and 29 deletions

View File

@ -141,9 +141,10 @@ impl write::give_back::Server for GiveBack {
_results: write::give_back::RetResults) _results: write::give_back::RetResults)
-> Promise<(), Error> -> Promise<(), Error>
{ {
println!("I'm doing my part!");
if let Some(chan) = self.0.take() { if let Some(chan) = self.0.take() {
chan.send(()) // Err here just means machine was taken from us
.expect("Other end of GiveBack token was dropped?!"); let _ = chan.send(());
} }
Promise::ok(()) Promise::ok(())

View File

@ -12,7 +12,9 @@ use std::fs;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use futures::Stream;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::channel::{mpsc, oneshot};
use futures_signals::signal::Signal; use futures_signals::signal::Signal;
use futures_signals::signal::SignalExt; use futures_signals::signal::SignalExt;
@ -99,7 +101,8 @@ impl Machine {
} else { } else {
if new_state == MachineState::free() { if new_state == MachineState::free() {
let mut guard = this.inner.try_lock().unwrap(); let mut guard = this.inner.try_lock().unwrap();
return guard.do_state_change(new_state); guard.do_state_change(new_state);
return Ok(ReturnToken(self.inner.clone()));
} }
} }
@ -123,6 +126,7 @@ impl Deref for Machine {
} }
} }
#[derive(Debug)] #[derive(Debug)]
/// Internal machine representation /// Internal machine representation
/// ///
@ -142,7 +146,6 @@ pub struct Inner {
/// case of an actor it should then make sure that the real world matches up with the set state /// case of an actor it should then make sure that the real world matches up with the set state
state: Mutable<MachineState>, state: Mutable<MachineState>,
reset: Option<MachineState>, reset: Option<MachineState>,
rx: Option<futures::channel::oneshot::Receiver<()>>,
} }
impl Inner { impl Inner {
@ -156,7 +159,6 @@ impl Inner {
desc: desc, desc: desc,
state: Mutable::new(state), state: Mutable::new(state),
reset: None, reset: None,
rx: None,
} }
} }
@ -172,14 +174,9 @@ impl Inner {
Box::pin(self.state.signal_cloned().dedupe_cloned()) Box::pin(self.state.signal_cloned().dedupe_cloned())
} }
pub fn do_state_change(&mut self, new_state: MachineState) -> Result<ReturnToken> { pub fn do_state_change(&mut self, new_state: MachineState) {
let (tx, rx) = futures::channel::oneshot::channel();
let old_state = self.state.replace(new_state); let old_state = self.state.replace(new_state);
self.reset.replace(old_state); self.reset.replace(old_state);
// Also this drops the old receiver, which will signal to the initiator that the
// machine has been taken off their hands.
self.rx.replace(rx);
return Ok(tx);
} }
pub fn read_state(&self) -> ReadOnlyMutable<MachineState> { pub fn read_state(&self) -> ReadOnlyMutable<MachineState> {
@ -197,28 +194,24 @@ impl Inner {
} }
} }
pub type ReturnToken = futures::channel::oneshot::Sender<()>; //pub type ReturnToken = futures::channel::oneshot::Sender<()>;
pub struct ReturnToken {
inner: Arc<Mutex<Inner>>,
}
impl Future for Inner { impl ReturnToken {
type Output = MachineState; pub fn new(inner: Arc<Mutex<Inner>>) -> Self {
Self { inner }
}
}
impl Future for ReturnToken {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = &mut *self; let mut this = &mut *self;
// TODO Return this on exit
if false {
return Poll::Ready(self.state.get_cloned());
}
// Check if the return token was sent/dropped this.0
if let Some(mut rx) = this.rx.take() {
match Future::poll(Pin::new(&mut rx), cx) {
// Regardless if we were canceled or properly returned, reset.
Poll::Ready(_) => self.reset_state(),
Poll::Pending => { this.rx.replace(rx); },
}
}
Poll::Pending
} }
} }
@ -255,6 +248,8 @@ pub fn load(config: &crate::config::Config, access: Arc<access::AccessControl>)
// TODO: Read state from the state db // TODO: Read state from the state db
(v.name.clone(), Machine::construct(k, v, MachineState::new(), access.clone())) (v.name.clone(), Machine::construct(k, v, MachineState::new(), access.clone()))
}); });
Ok(HashMap::from_iter(it)) Ok(HashMap::from_iter(it))
} }