diff --git a/src/api/machine.rs b/src/api/machine.rs index 983abe3..2199a9d 100644 --- a/src/api/machine.rs +++ b/src/api/machine.rs @@ -4,6 +4,8 @@ use std::ops::Deref; use capnp::capability::Promise; use capnp::Error; +use futures::FutureExt; + use crate::schema::api_capnp::State; use crate::schema::api_capnp::machine::*; use crate::connection::Session; @@ -85,13 +87,16 @@ impl write::Server for Write { { let uid = self.0.session.user.as_ref().map(|u| u.id.clone()); let new_state = MachineState::used(uid.clone()); - if let Ok(tok) = self.0.machine.request_state_change(self.0.session.user.as_ref(), new_state) { - info!(self.0.session.log, "yay"); - } else { - info!(self.0.session.log, "nay"); - } + let this = self.0.clone(); + let f = this.machine.request_state_change(this.session.user.as_ref(), new_state) + .map(|res_token| match res_token { + Ok(tok) => { + return Ok(()); + }, + Err(e) => Err(capnp::Error::failed("State change request returned an err".to_string())), + }); - Promise::ok(()) + Promise::from_future(f) } fn reserve(&mut self, diff --git a/src/initiator.rs b/src/initiator.rs index 1e6264d..729d5ae 100644 --- a/src/initiator.rs +++ b/src/initiator.rs @@ -34,6 +34,8 @@ pub struct Initiator { signal: MutableSignalCloned>, machine: Option, future: Option, MachineState)>>, + // TODO: Prepare the init for async state change requests. + state_change_fut: Option>>, token: Option, sensor: BoxSensor, } @@ -44,6 +46,7 @@ impl Initiator { signal: signal, machine: None, future: None, + state_change_fut: None, token: None, sensor: sensor, } @@ -73,14 +76,43 @@ impl Future for Initiator { // Do as much work as we can: loop { + // Always poll the state change future first + if let Some(ref mut f) = this.state_change_fut { + print!("Polling state change fut ..."); + match Future::poll(Pin::new(f), cx) { + // If there is a state change future and it would block we return early + Poll::Pending => { + println!(" blocked"); + return Poll::Pending; + }, + Poll::Ready(Ok(tok)) => { + println!(" returned ok"); + // Explicity drop the future + let _ = this.state_change_fut.take(); + + // Store the given return token for future use + this.token.replace(tok); + } + Poll::Ready(Err(e)) => { + println!(" returned err: {:?}", e); + // Explicity drop the future + let _ = this.state_change_fut.take(); + } + } + } + // If there is a future, poll it match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) { None => { this.future = Some(this.sensor.run_sensor()); }, Some(Poll::Ready((user, state))) => { + println!("New sensor fut"); this.future.take(); - this.machine.as_mut().map(|machine| machine.request_state_change(user.as_ref(), state).unwrap()); + let f = this.machine.as_mut().map(|machine| { + machine.request_state_change(user.as_ref(), state) + }); + this.state_change_fut = f; } Some(Poll::Pending) => return Poll::Pending, } diff --git a/src/machine.rs b/src/machine.rs index a65d208..e0a3534 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -12,6 +12,8 @@ use std::fs; use serde::{Serialize, Deserialize}; +use futures::future::BoxFuture; + use futures_signals::signal::Signal; use futures_signals::signal::SignalExt; use futures_signals::signal::{Mutable, ReadOnlyMutable}; @@ -22,7 +24,7 @@ use crate::error::{Result, Error}; use crate::db::access; use crate::db::machine::{MachineIdentifier, Status, MachineState}; -use crate::db::user::User; +use crate::db::user::{User, UserData}; use crate::network::MachineMap; @@ -49,39 +51,68 @@ impl Index { #[derive(Debug, Clone)] pub struct Machine { - inner: Arc> + inner: Arc>, + access: Arc, } impl Machine { - pub fn new(inner: Inner) -> Self { - Self { inner: Arc::new(Mutex::new(inner)) } + pub fn new(inner: Inner, access: Arc) -> Self { + Self { inner: Arc::new(Mutex::new(inner)), access: access } } pub fn construct ( id: MachineIdentifier , desc: MachineDescription , state: MachineState + , access: Arc ) -> Machine { - Self::new(Inner::new(id, desc, state)) + Self::new(Inner::new(id, desc, state), access) } - pub fn from_file>(path: P) -> Result> { + pub fn from_file>(path: P, access: Arc) + -> Result> + { let mut map: HashMap = MachineDescription::load_file(path)?; Ok(map.drain().map(|(id, desc)| { - Self::construct(id, desc, MachineState::new()) + Self::construct(id, desc, MachineState::new(), access.clone()) }).collect()) } + /// Requests to use a machine. Returns a return token if successful. + /// + /// This will update the internal state of the machine, notifying connected actors, if any. + /// The return token is a channel that considers the machine 'returned' if anything is sent + /// along it or if the sending end gets dropped. Anybody who holds this token needs to check if + /// the receiving end was canceled which indicates that the machine has been taken off their + /// hands. pub fn request_state_change(&self, who: Option<&User>, new_state: MachineState) - -> Result + -> BoxFuture<'static, Result> { - let mut guard = self.inner.try_lock().unwrap(); - guard.request_state_change(who, new_state) + let this = self.clone(); + let udata: Option = who.map(|u| u.data.clone()); + + let f = async move { + if let Some(udata) = udata { + let mut guard = this.inner.try_lock().unwrap(); + if this.access.check(&udata, &guard.desc.privs.write).await? { + return guard.do_state_change(new_state); + } + } else { + if new_state == MachineState::free() { + let mut guard = this.inner.try_lock().unwrap(); + return guard.do_state_change(new_state); + } + } + + return Err(Error::Denied); + }; + + Box::pin(f) } pub fn signal(&self) -> impl Signal { - let mut guard = self.inner.try_lock().unwrap(); + let guard = self.inner.try_lock().unwrap(); guard.signal() } } @@ -117,7 +148,11 @@ pub struct Inner { } impl Inner { - pub fn new(id: MachineIdentifier, desc: MachineDescription, state: MachineState) -> Inner { + pub fn new ( id: MachineIdentifier + , desc: MachineDescription + , state: MachineState + ) -> Inner + { Inner { id: id, desc: desc, @@ -139,29 +174,7 @@ impl Inner { Box::pin(self.state.signal_cloned().dedupe_cloned()) } - /// Requests to use a machine. Returns a return token if successful. - /// - /// This will update the internal state of the machine, notifying connected actors, if any. - /// The return token is a channel that considers the machine 'returned' if anything is sent - /// along it or if the sending end gets dropped. Anybody who holds this token needs to check if - /// the receiving end was canceled which indicates that the machine has been taken off their - /// hands. - pub fn request_state_change(&mut self, who: Option<&User>, new_state: MachineState) - -> Result - { - if who.is_none() { - if new_state.state == Status::Free { - return self.do_state_change(new_state); - } - } else { - // TODO: Correctly check permissions here - return self.do_state_change(new_state); - } - - return Err(Error::Denied); - } - - fn do_state_change(&mut self, new_state: MachineState) -> Result { + pub fn do_state_change(&mut self, new_state: MachineState) -> Result { let (tx, rx) = futures::channel::oneshot::channel(); let old_state = self.state.replace(new_state); self.reset.replace(old_state); @@ -234,13 +247,15 @@ impl MachineDescription { } } -pub fn load(config: &crate::config::Settings) -> Result { +pub fn load(config: &crate::config::Settings, access: Arc) + -> Result +{ let mut map = config.machines.clone(); let it = map.drain() .map(|(k,v)| { // TODO: Read state from the state db - (v.name.clone(), Machine::construct(k, v, MachineState::new())) + (v.name.clone(), Machine::construct(k, v, MachineState::new(), access.clone())) }); Ok(HashMap::from_iter(it)) } diff --git a/src/main.rs b/src/main.rs index 15f4991..5773601 100644 --- a/src/main.rs +++ b/src/main.rs @@ -153,13 +153,14 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { Ok(()) } else { let ex = Executor::new(); + let db = db::Databases::new(&log, &config)?; let mqtt = AsyncClient::new(config.mqtt_url.clone())?; let tok = mqtt.connect(paho_mqtt::ConnectOptions::new()); smol::block_on(tok)?; - let machines = machine::load(&config)?; + let machines = machine::load(&config, db.access.clone())?; let (mut actor_map, actors) = actor::load(&log, &mqtt, &config)?; let (mut init_map, initiators) = initiator::load(&log, &mqtt, &config)?; @@ -189,7 +190,6 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { let (_, r) = easy_parallel::Parallel::new() .each(0..4, |_| smol::block_on(ex.run(shutdown.recv()))) .finish(|| { - let db = db::Databases::new(&log, &config)?; // TODO: Spawn api connections on their own (non-main) thread, use the main thread to // handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads // when bffh should exit