diff --git a/src/api/machine.rs b/src/api/machine.rs index da6485d..f0bc8f9 100644 --- a/src/api/machine.rs +++ b/src/api/machine.rs @@ -40,17 +40,17 @@ impl Machine { if let Some(state) = self.db.machine.get_state(&self.id) { match state.state { Status::Free => builder.set_state(State::Free), - Status::InUse(_u) => { + Status::InUse(_u, _p) => { builder.set_state(State::InUse); } - Status::ToCheck(_u) => { + Status::ToCheck(_u, _p) => { builder.set_state(State::ToCheck); } - Status::Blocked(_u) => { + Status::Blocked(_u, _p) => { builder.set_state(State::Blocked); } Status::Disabled => builder.set_state(State::Disabled), - Status::Reserved(_u) => { + Status::Reserved(_u, _p) => { builder.set_state(State::Reserved); } } diff --git a/src/db/machine.rs b/src/db/machine.rs index 28a2e37..c331de1 100644 --- a/src/db/machine.rs +++ b/src/db/machine.rs @@ -37,6 +37,7 @@ pub mod internal; use internal::Internal; pub type MachineIdentifier = Uuid; +pub type Priority = u64; /// Status of a Machine #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] @@ -44,15 +45,15 @@ pub enum Status { /// Not currently used by anybody Free, /// Used by somebody - InUse(UserId), + InUse(UserId, Priority), /// Was used by somebody and now needs to be checked for cleanliness - ToCheck(UserId), + ToCheck(UserId, Priority), /// Not used by anybody but also can not be used. E.g. down for maintenance - Blocked(UserId), + Blocked(UserId, Priority), /// Disabled for some other reason Disabled, /// Reserved - Reserved(UserId), + Reserved(UserId, Priority), } pub fn uuid_from_api(uuid: crate::schema::api_capnp::u_u_i_d::Reader) -> Uuid { @@ -75,6 +76,24 @@ pub struct MachineState { pub state: Status, } +impl MachineState { + /// Check if the given priority is higher than one's own. + /// + /// If `self` does not have a priority then this function always returns `true` + pub fn is_higher_priority(&self, priority: u64) -> bool { + match self.state { + Status::Disabled | Status::Free => { true }, + Status::Blocked(_, self_prio) | + Status::InUse(_, self_prio) | + Status::ToCheck(_, self_prio) | + Status::Reserved(_, self_prio) => + { + priority > self_prio + } + } + } +} + pub fn init(log: Logger, config: &Settings, env: Arc) -> Result { let mut machine_descriptions = MachineDescription::load_file(&config.machines)?; let mut flags = lmdb::DatabaseFlags::empty(); diff --git a/src/db/user.rs b/src/db/user.rs index 5f213d5..41519c5 100644 --- a/src/db/user.rs +++ b/src/db/user.rs @@ -66,11 +66,25 @@ pub struct UserData { /// Persons are only ever given roles, not permissions directly pub roles: Vec, + #[serde(skip_serializing_if = "is_zero")] + #[serde(default = "default_priority")] + /// A priority number, defaulting to 0. + /// + /// The higher, the higher the priority. Higher priority users overwrite lower priority ones. + pub priority: u64, + /// Additional data storage #[serde(flatten)] kv: HashMap, Box<[u8]>>, } +fn is_zero(i: &u64) -> bool { + *i == 0 +} +const fn default_priority() -> u64 { + 0 +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/error.rs b/src/error.rs index c497e72..93d7fd9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -24,7 +24,8 @@ pub enum Error { MQTT(mqtt::Error), Config(config::ConfigError), BadVersion((u32,u32)), - Argon2(argon2::Error) + Argon2(argon2::Error), + Denied, } impl fmt::Display for Error { @@ -72,6 +73,9 @@ impl fmt::Display for Error { Error::BadVersion((major,minor)) => { write!(f, "Peer uses API version {}.{} which is incompatible!", major, minor) } + Error::Denied => { + write!(f, "You do not have the permission required to do that.") + } } } } diff --git a/src/machine.rs b/src/machine.rs index 85f03b9..b7f4d3e 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -1,4 +1,8 @@ use std::path::Path; +use std::task::{Poll, Context}; +use std::pin::Pin; +use std::future::Future; + use std::collections::HashMap; use std::fs; @@ -10,7 +14,7 @@ use futures_signals::signal::Mutable; use uuid::Uuid; -use crate::error::Result; +use crate::error::{Result, Error}; use crate::db::access; use crate::db::machine::{MachineIdentifier, Status, MachineState}; @@ -34,14 +38,21 @@ pub struct Machine { /// This is a Signal generator. Subscribers to this signal will be notified of changes. In the /// case of an actor it should then make sure that the real world matches up with the set state state: Mutable, + reset: Option, + rx: Option>, + + access: access::AccessControl, } impl Machine { - pub fn new(id: MachineIdentifier, desc: MachineDescription, perm: access::PermIdentifier) -> Machine { + pub fn new(id: MachineIdentifier, desc: MachineDescription, access: access::AccessControl, state: MachineState) -> Machine { Machine { id: id, desc: desc, - state: Mutable::new(MachineState { state: Status::Free}), + state: Mutable::new(state), + reset: None, + rx: None, + access: access, } } @@ -57,27 +68,68 @@ impl Machine { Box::pin(self.state.signal_cloned().dedupe_cloned()) } - /// Requests to use a machine. Returns `true` if successful. + /// Requests to use a machine. Returns a return token if successful. /// /// This will update the internal state of the machine, notifying connected actors, if any. - pub async fn request_use - ( &mut self - , access: access::AccessControl - , who: &User - ) -> Result + /// The return token is a channel that considers the machine 'returned' if anything is sent + /// along it or if the sending end gets dropped. Anybody who holds this token needs to check if + /// the receiving end was canceled which indicates that the machine has been taken off their + /// hands. + pub async fn request_state_change(&mut self, who: &User, new_state: MachineState) + -> Result { - // TODO: Check different levels - if access.check(&who.data, &self.desc.privs.write).await? { - self.state.set(MachineState { state: Status::InUse(who.id.clone()) }); - return Ok(true); - } else { - return Ok(false); + if self.access.check(&who.data, &self.desc.privs.write).await? { + if self.state.lock_ref().is_higher_priority(who.data.priority) { + let (tx, rx) = futures::channel::oneshot::channel(); + let old_state = self.state.replace(new_state); + self.reset.replace(old_state); + // Also this drops the old receiver, which will signal to the initiator that the + // machine has been taken off their hands. + self.rx.replace(rx); + return Ok(tx); + } } + + return Err(Error::Denied); } pub fn set_state(&mut self, state: Status) { self.state.set(MachineState { state }) } + + pub fn get_signal(&self) -> impl Signal { + self.state.signal_cloned().dedupe_cloned() + } + + pub fn reset_state(&mut self) { + if let Some(state) = self.reset.take() { + self.state.replace(state); + } + } +} + +type ReturnToken = futures::channel::oneshot::Sender<()>; + +impl Future for Machine { + type Output = MachineState; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut this = &mut *self; + // TODO Return this on exit + if false { + return Poll::Ready(self.state.get_cloned()); + } + + if let Some(mut rx) = this.rx.take() { + match Future::poll(Pin::new(&mut rx), cx) { + // Regardless if we were canceled or properly returned, reset. + Poll::Ready(_) => self.reset_state(), + Poll::Pending => { this.rx.replace(rx); }, + } + } + + Poll::Pending + } } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/src/main.rs b/src/main.rs index 4823d98..368f548 100644 --- a/src/main.rs +++ b/src/main.rs @@ -217,11 +217,12 @@ fn main() -> Result<(), Error> { let pdb = pdb?; let mut ac = db::access::AccessControl::new(); ac.add_source_unchecked("Internal".to_string(), Box::new(pdb)); + let machdb = Arc::new(machdb); let passdb = db::pass::PassDB::init(log.new(o!("system" => "passwords")), env.clone()).unwrap(); let db = db::Databases { access: Arc::new(db::access::AccessControl::new()), - machine: Arc::new(machdb), + machine: machdb.clone(), passdb: Arc::new(passdb), }; @@ -245,7 +246,7 @@ fn main() -> Result<(), Error> { // FIXME: implement notification so the modules can shut down cleanly instead of being killed // without warning. let modlog = log.clone(); - let mut regs = Registries::new(); + let mut regs = Registries::new(machdb.clone()); match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs.clone())) { Ok(()) => {} Err(e) => { diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 6b40967..47d0224 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -100,7 +100,7 @@ impl Stream for Shelly { info!(unpin.log, "Machine Status changed: {:?}", status); let topic = format!("shellies/{}/relay/0/command", unpin.name); let pl = match status { - Status::InUse(_) => "on", + Status::InUse(_, _) => "on", _ => "off", }; let msg = mqtt::Message::new(topic, pl, 0); diff --git a/src/registries.rs b/src/registries.rs index 0975bab..89ea123 100644 --- a/src/registries.rs +++ b/src/registries.rs @@ -1,8 +1,11 @@ +use std::sync::Arc; + +use crate::db::machine::MachineDB; + mod actuators; mod sensors; pub use actuators::{Actuator, ActBox, StatusSignal}; -pub use sensors::{Sensor, SensBox}; #[derive(Clone)] /// BFFH registries @@ -15,10 +18,10 @@ pub struct Registries { } impl Registries { - pub fn new() -> Self { + pub fn new(db: Arc) -> Self { Registries { actuators: actuators::Actuators::new(), - sensors: sensors::Sensors::new(), + sensors: sensors::Sensors::new(db), } } } diff --git a/src/registries/sensors.rs b/src/registries/sensors.rs index 2b7819e..42fe64d 100644 --- a/src/registries/sensors.rs +++ b/src/registries/sensors.rs @@ -2,6 +2,9 @@ use std::pin::Pin; use futures::task::{Context, Poll}; use futures::{Future, Stream}; use futures::future::BoxFuture; +use futures_signals::signal::Signal; +use crate::db::user::UserId; +use crate::db::machine::MachineDB; use std::sync::Arc; use smol::lock::RwLock; @@ -10,64 +13,19 @@ use std::collections::HashMap; #[derive(Clone)] pub struct Sensors { inner: Arc>, + db: Arc, } impl Sensors { - pub fn new() -> Self { + pub fn new(db: Arc) -> Self { Sensors { inner: Arc::new(RwLock::new(Inner::new())), + db: db, } } } -pub type SensBox = Box; +pub type SensBox = Box + Send + Sync>; type Inner = HashMap; -// Implementing Sensors. -// -// Given the coroutine/task split stays as it is - Sensor input to machine update being one, -// machine update signal to actor doing thing being another, a Sensor implementation would send a -// Stream of futures - each future being an atomic Machine update. -#[async_trait] -/// BFFH Sensor -/// -/// A sensor is anything that can forward an intent of an user to do something to bffh. -/// This may be a card reader connected to a machine, a website allowing users to select a machine -/// they want to use or something like QRHello -pub trait Sensor: Stream> { - /// Setup the Sensor. - /// - /// After this async function completes the Stream implementation should be able to generate - /// futures when polled. - /// Implementations can rely on this function being polled to completeion before the stream - /// is polled. - // TODO Is this sensible vs just having module-specific setup fns? - async fn setup(&mut self); - - /// Shutdown the sensor gracefully - /// - /// Implementations can rely on that the stream will not be polled after this function has been - /// called. - async fn shutdown(&mut self); -} - -struct Dummy; -#[async_trait] -impl Sensor for Dummy { - async fn setup(&mut self) { - return; - } - - async fn shutdown(&mut self) { - return; - } -} - -impl Stream for Dummy { - type Item = BoxFuture<'static, ()>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Poll::Ready(Some(Box::pin(futures::future::ready(())))) - } -}