diff --git a/Cargo.lock b/Cargo.lock index e630afb..6c27c8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,6 +424,7 @@ dependencies = [ "futures-signals", "futures-test", "futures-util", + "genawaiter", "glob", "lazy_static", "libc", @@ -742,6 +743,36 @@ dependencies = [ "slab", ] +[[package]] +name = "genawaiter" +version = "0.99.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c86bd0361bcbde39b13475e6e36cb24c329964aa2611be285289d1e4b751c1a0" +dependencies = [ + "genawaiter-macro", + "genawaiter-proc-macro", + "proc-macro-hack", +] + +[[package]] +name = "genawaiter-macro" +version = "0.99.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b32dfe1fdfc0bbde1f22a5da25355514b5e450c33a6af6770884c8750aedfbc" + +[[package]] +name = "genawaiter-proc-macro" +version = "0.99.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "784f84eebc366e15251c4a8c3acee82a6a6f427949776ecb88377362a9621738" +dependencies = [ + "proc-macro-error", + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "getrandom" version = "0.1.15" @@ -1058,6 +1089,32 @@ dependencies = [ "toml", ] +[[package]] +name = "proc-macro-error" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "syn-mid", + "version_check", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -1327,6 +1384,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "syn-mid" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c42823f0ff906a3eb8109610e825221b07fb1456d45c7d01cf18cb581b23ecfb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "take_mut" version = "0.2.2" diff --git a/Cargo.toml b/Cargo.toml index 31d3abf..964cbf3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ rand = "0.7" async-channel = "1.5" easy-parallel = "3.1" +genawaiter = "0.99" [build-dependencies] capnpc = "0.13" diff --git a/src/db/user.rs b/src/db/user.rs index 6962ffd..fd78cb3 100644 --- a/src/db/user.rs +++ b/src/db/user.rs @@ -17,6 +17,12 @@ pub struct User { pub data: UserData, } +impl User { + pub fn new(id: UserId, data: UserData) -> Self { + Self { id, data } + } +} + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] /// Authorization Identity /// @@ -78,6 +84,16 @@ pub struct UserData { kv: HashMap, Box<[u8]>>, } +impl UserData { + pub fn new(roles: Vec, priority: u64) -> Self { + Self { + roles: roles, + priority: priority, + kv: HashMap::new(), + } + } +} + fn is_zero(i: &u64) -> bool { *i == 0 } diff --git a/src/initiator.rs b/src/initiator.rs index 5b64bb2..c4cd16b 100644 --- a/src/initiator.rs +++ b/src/initiator.rs @@ -1,21 +1,87 @@ +use std::pin::Pin; +use std::task::{Poll, Context}; use std::future::Future; -use smol::Task; +use smol::{Task, Timer}; -use futures_signals::signal::Signal; -use crate::machine::Machine; +use futures::FutureExt; +use futures::future::BoxFuture; + +use genawaiter::{sync::{Gen, GenBoxed, Co}, GeneratorState}; + +use futures_signals::signal::{Signal, MutableSignalCloned}; +use crate::machine::{Machine, ReturnToken}; +use crate::db::machine::MachineState; +use crate::db::user::{User, UserId, UserData}; use crate::error::Result; -pub struct Initiator { - machine: Box + Send>, +pub struct Initiator<'a> { + signal: MutableSignalCloned>, + machine: Option, + future: Option, MachineState)>>, + token: Option, + step: bool, } -impl Initiator { - pub fn run(self) -> impl Future { - futures::future::pending() +async fn producer(step: bool) -> (Option, MachineState) { + Timer::after(std::time::Duration::from_secs(1)).await; + if step { + return (None, MachineState::free()); + } else { + let user = User::new( + UserId::new("test".to_string(), None, None), + UserData::new(vec![], 0), + ); + let p = user.data.priority; + let id = user.id.clone(); + return (Some(user), MachineState::used(id, p)); } } -pub fn load(config: &crate::config::Settings) -> Result> { +impl<'a> Initiator<'a> { + pub fn new(signal: MutableSignalCloned>) -> Self { + Self { + signal: signal, + machine: None, + future: None, + token: None, + step: false, + } + } +} + +impl<'a> Future for Initiator<'a> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut this = &mut *self; + + // First of course, see what machine we should work with. + match Signal::poll_change(Pin::new(&mut this.signal), cx) { + Poll::Pending => { } + Poll::Ready(None) => return Poll::Ready(()), + // Keep in mind this is actually an Option + Poll::Ready(Some(machine)) => this.machine = machine, + } + + // Do as much work as we can: + loop { + // If there is a future, poll it + match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) { + None => { + this.future = Some(Box::pin(producer(this.step))); + this.step = !this.step; + }, + Some(Poll::Ready((user, state))) => { + this.future.take(); + this.machine.as_mut().map(|machine| machine.request_state_change(user.as_ref(), state)); + } + Some(Poll::Pending) => return Poll::Pending, + } + } + } +} + +pub fn load<'a>() -> Result> { unimplemented!() } diff --git a/src/machine.rs b/src/machine.rs index 2ed0f0f..1527b49 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -69,6 +69,13 @@ impl Machine { Self::construct(id, desc, MachineState::new()) }).collect()) } + + pub fn request_state_change(&self, who: Option<&User>, new_state: MachineState) + -> Result + { + let mut guard = self.inner.try_lock().unwrap(); + guard.request_state_change(who, new_state) + } } impl Deref for Machine { @@ -131,10 +138,23 @@ impl Inner { /// along it or if the sending end gets dropped. Anybody who holds this token needs to check if /// the receiving end was canceled which indicates that the machine has been taken off their /// hands. - pub async fn request_state_change(&mut self, who: &User, new_state: MachineState) + pub fn request_state_change(&mut self, who: Option<&User>, new_state: MachineState) -> Result { - if self.state.lock_ref().is_higher_priority(who.data.priority) { + if who.is_none() { + if new_state.state == Status::Free { + return self.do_state_change(new_state); + } + } else { + if self.state.lock_ref().is_higher_priority(who.unwrap().data.priority) { + return self.do_state_change(new_state); + } + } + + return Err(Error::Denied); + } + + fn do_state_change(&mut self, new_state: MachineState) -> Result { let (tx, rx) = futures::channel::oneshot::channel(); let old_state = self.state.replace(new_state); self.reset.replace(old_state); @@ -142,9 +162,6 @@ impl Inner { // machine has been taken off their hands. self.rx.replace(rx); return Ok(tx); - } - - return Err(Error::Denied); } pub fn set_state(&mut self, state: Status) { @@ -162,7 +179,7 @@ impl Inner { } } -type ReturnToken = futures::channel::oneshot::Sender<()>; +pub type ReturnToken = futures::channel::oneshot::Sender<()>; impl Future for Inner { type Output = MachineState;