From 65830af01da8de1292cbbbf288ad68b13b02a30a Mon Sep 17 00:00:00 2001 From: Nadja Reitzenstein Date: Thu, 30 Sep 2021 10:07:42 +0200 Subject: [PATCH] Stuff --- src/db/access.rs | 15 ++++++++++ src/initiator.rs | 38 ++++++++++++------------ src/machine.rs | 70 ++++++++++++++++++++++++++++++++++++++++++-- src/main.rs | 4 ++- src/modules/batch.rs | 50 +++++++++++++++++-------------- src/resource.rs | 25 ++++++++++++++++ 6 files changed, 157 insertions(+), 45 deletions(-) create mode 100644 src/resource.rs diff --git a/src/db/access.rs b/src/db/access.rs index 7ab54c5..8106f68 100644 --- a/src/db/access.rs +++ b/src/db/access.rs @@ -49,6 +49,21 @@ impl AccessControl { } } + pub fn check>(&self, user: &UserData, perm: P) -> Result { + let mut roles = HashMap::new(); + // Check all user roles by.. + Ok(user.roles.iter().any(|role| { + // 1. Getting the whole tree down to a list of Roles applied + self.internal.tally_role(&mut roles, role)?; + + // 2. Checking if any of the roles the user has give any permission granting the + // requested one. + roles.drain().any(|(rid, role)| { + role.permissions.iter().any(|rule| rule.match_perm(perm)) + }) + })) + } + pub fn collect_permrules(&self, user: &UserData) -> Result> { self.internal.collect_permrules(user) } diff --git a/src/initiator.rs b/src/initiator.rs index f9d9bf6..e6bc689 100644 --- a/src/initiator.rs +++ b/src/initiator.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::pin::Pin; use std::task::{Poll, Context}; use std::future::Future; @@ -7,14 +8,13 @@ use smol::Timer; use slog::Logger; -use paho_mqtt::AsyncClient; - use futures::future::BoxFuture; use futures_signals::signal::{Signal, Mutable, MutableSignalCloned}; use crate::machine::{Machine, ReturnToken}; use crate::db::machine::MachineState; -use crate::db::user::{User, UserId, UserData}; +use crate::db::user::{User, UserId, UserData, Internal as UserDB}; +use crate::db::access::AccessControl; use crate::network::InitMap; @@ -22,7 +22,7 @@ use crate::error::Result; use crate::config::Config; pub trait Sensor { - fn run_sensor(&mut self) -> BoxFuture<'static, (Option, MachineState)>; + fn run_sensor(&mut self) -> BoxFuture<'static, (Option, MachineState)>; } type BoxSensor = Box; @@ -36,10 +36,13 @@ pub struct Initiator { state_change_fut: Option>>, token: Option, sensor: BoxSensor, + + userdb: UserDB, + access: AccessControl, } impl Initiator { - pub fn new(log: Logger, sensor: BoxSensor, signal: MutableSignalCloned>) -> Self { + pub fn new(log: Logger, sensor: BoxSensor, signal: MutableSignalCloned>, userdb: UserDB, access: AccessControl) -> Self { Self { log: log, signal: signal, @@ -48,14 +51,16 @@ impl Initiator { state_change_fut: None, token: None, sensor: sensor, + userdb, + access, } } - pub fn wrap(log: Logger, sensor: BoxSensor) -> (Mutable>, Self) { + pub fn wrap(log: Logger, sensor: BoxSensor, userdb: UserDB, access: Arc) -> (Mutable>, Self) { let m = Mutable::new(None); let s = m.signal_cloned(); - (m, Self::new(log, sensor, s)) + (m, Self::new(log, sensor, s, userdb, access)) } } @@ -113,12 +118,11 @@ impl Future for Initiator { None => { this.future = Some(this.sensor.run_sensor()); }, - Some(Poll::Ready((user, state))) => { + Some(Poll::Ready((uid, state))) => { debug!(this.log, "Sensor returned a new state"); this.future.take(); let f = this.machine.as_mut().map(|machine| { - unimplemented!() - //machine.request_state_change(user.as_ref(), state) + machine.request_state_change(state, this.access.clone(), user) }); this.state_change_fut = f; } @@ -128,7 +132,7 @@ impl Future for Initiator { } } -pub fn load(log: &Logger, config: &Config) -> Result<(InitMap, Vec)> { +pub fn load(log: &Logger, config: &Config, userdb: UserDB, access: Arc) -> Result<(InitMap, Vec)> { let mut map = HashMap::new(); let initiators = config.initiators.iter() @@ -140,7 +144,7 @@ pub fn load(log: &Logger, config: &Config) -> Result<(InitMap, Vec)> let mut v = Vec::new(); for (name, initiator) in initiators { - let (m, i) = Initiator::wrap(log.new(o!("name" => name.clone())), initiator); + let (m, i) = Initiator::wrap(log.new(o!("name" => name.clone())), initiator, userdb.clone(), access.clone()); map.insert(name.clone(), m); v.push(i); } @@ -180,7 +184,7 @@ impl Dummy { impl Sensor for Dummy { fn run_sensor(&mut self) - -> BoxFuture<'static, (Option, MachineState)> + -> BoxFuture<'static, (Option, MachineState)> { let step = self.step; self.step = !step; @@ -192,12 +196,8 @@ impl Sensor for Dummy { if step { return (None, MachineState::free()); } else { - let user = User::new( - UserId::new("test".to_string(), None, None), - UserData::new(vec![crate::db::access::RoleIdentifier::local_from_str("lmdb".to_string(), "testrole".to_string())], 0), - ); - let id = user.id.clone(); - return (Some(user), MachineState::used(Some(id))); + let user = UserId::new("test".to_string(), None, None); + return (Some(user), MachineState::used(Some(user))); } }; diff --git a/src/machine.rs b/src/machine.rs index e585e4e..d7fe156 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -22,9 +22,9 @@ use futures_signals::signal::{Mutable, ReadOnlyMutable}; use crate::error::{Result, Error}; -use crate::db::access; +use crate::db::access::{AccessControl, PrivilegesBuf, PermissionBuf}; use crate::db::machine::{MachineIdentifier, MachineState, Status}; -use crate::db::user::{User, UserData}; +use crate::db::user::{User, UserData, UserId}; use crate::network::MachineMap; use crate::space; @@ -82,6 +82,52 @@ impl Machine { Self::new(Inner::new(id, state), desc) } + fn match_perm(&self, status: &Status) -> Option<&PermissionBuf> { + let p = self.desc.privs; + match status { + // If you were allowed to use it you're allowed to give it back + Status::Free + | Status::ToCheck(_) + => None, + + Status::Blocked(_) + | Status::Disabled + | Status::Reserved(_) + => Some(&p.manage), + + Status::InUse(_) => Some(&p.write), + } + } + + pub fn request_state_change(&self, new_state: MachineState, access: AccessControl, user: &User) + -> BoxFuture<'static, Result<()>> + { + let this = self.clone(); + let perm = self.match_perm(&new_state.state); + let grant = perm.map(|p| access.check(&user.data, p).unwrap_or(false)); + + let uid = user.id.clone(); + // is it a return + let is_ret = new_state.state == Status::Free; + // is it a (normal) write /the user is allowed to do/? + let is_wri = new_state.state == Status::InUse(Some(uid)) + && access.check(&user.data, self.desc.privs.write).unwrap_or(false); + + let f = async move { + let mut guard = this.inner.lock().await; + // either e.g. InUse() => Free or I'm allowed to overwrite + if (is_ret && guard.is_self(uid)) + || (is_wri && guard.is_free()) + || grant.unwrap_or(false) + { + guard.do_state_change(new_state); + } + return Ok(()) + }; + + Box::pin(f) + } + pub fn do_state_change(&self, new_state: MachineState) -> BoxFuture<'static, Result<()>> { @@ -126,6 +172,10 @@ impl Deref for Machine { /// A machine connects an event from a sensor to an actor activating/deactivating a real-world /// machine, checking that the user who wants the machine (de)activated has the required /// permissions. +/// +/// Machines have a rather complex state machine since they have to be eventually consistent and +/// can fail at any point in time (e.g. because power cuts out suddenly, a different task on this +/// thread panics, some loaded code produces a segfault, ...) pub struct Inner { /// Globally unique machine readable identifier pub id: MachineIdentifier, @@ -180,6 +230,20 @@ impl Inner { self.state.replace(state); } } + + pub fn is_self(&mut self, uid: UserId) -> bool { + match self.read_state().get_cloned().state { + Status::InUse(u) if u == uid => true, + _ => false, + } + } + + pub fn is_free(&mut self) -> bool { + match self.read_state().get_cloned().state { + Status::Free => true, + _ => false, + } + } } //pub type ReturnToken = futures::channel::oneshot::Sender<()>; @@ -228,7 +292,7 @@ pub struct MachineDescription { /// The permission required #[serde(flatten)] - pub privs: access::PrivilegesBuf, + pub privs: PrivilegesBuf, } impl MachineDescription { diff --git a/src/main.rs b/src/main.rs index 3e7df9b..7fb60fa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,6 +25,8 @@ mod actor; mod initiator; mod space; +mod resource; + use clap::{App, Arg}; use std::io; @@ -169,7 +171,7 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { let machines = machine::load(&config)?; let (actor_map, actors) = actor::load(&log, &config)?; - let (init_map, initiators) = initiator::load(&log, &config)?; + let (init_map, initiators) = initiator::load(&log, &config, db.userdb.clone(), db.access.clone())?; let mut network = network::Network::new(machines, actor_map, init_map); diff --git a/src/modules/batch.rs b/src/modules/batch.rs index e4f062e..db6327e 100644 --- a/src/modules/batch.rs +++ b/src/modules/batch.rs @@ -1,12 +1,13 @@ +use std::io::{Read, Write}; use std::pin::Pin; use std::cell::RefCell; use std::collections::HashMap; use std::process::Stdio; use smol::process::{Command, Child}; -use smol::io::{AsyncWriteExt, AsyncReadExt}; +use smol::io::{AsyncWrite, AsyncWriteExt, AsyncReadExt}; -use futures::future::FutureExt; +use futures::future::{Future, FutureExt}; use crate::actor::Actuator; use crate::initiator::Sensor; @@ -19,13 +20,14 @@ use slog::Logger; use serde::{Serialize, Deserialize}; pub struct Batch { + log: Logger, userdb: UserDB, name: String, cmd: String, args: Vec, kill: bool, child: Child, - stdout: RefCell>>, + stdout: Pin>, } impl Batch { @@ -39,24 +41,28 @@ impl Batch { .collect()) .unwrap_or_else(Vec::new); - let kill = params.get("kill_on_exit").and_then(|s| - s.parse() - .or_else(|| { - warn!(log, "Can't parse `kill_on_exit` for {} set as {} as boolean. \ - Must be either \"True\" or \"False\".", &name, &s); - false - })); + let kill = params + .get("kill_on_exit") + .and_then(|kill| + kill.parse() + .or_else(|_| { + warn!(log, "Can't parse `kill_on_exit` for {} set as {} as boolean. \ + Must be either \"True\" or \"False\".", &name, &s); + Ok(false) + }) + .ok()) + .unwrap_or(false); info!(log, "Starting {} ({})…", &name, &cmd); let mut child = Self::start(&name, &cmd, &args) .map_err(|err| error!(log, "Failed to spawn {} ({}): {}", &name, &cmd, err)) .ok()?; - let stdout = Self::get_stdin(&mut child); + let stdout = Self::get_stdout(&mut child); - Ok(Self { userdb, name, cmd, args, kill, child, stdout }) + Ok(Self { log, userdb, name, cmd, args, kill, child, stdout }) } - fn start_actor(name: &String, cmd: &String, args: &Vec) -> Result { + fn start(name: &String, cmd: &String, args: &Vec) -> std::io::Result { let mut command = Command::new(cmd); command .stdin(Stdio::piped()) @@ -74,7 +80,7 @@ impl Batch { stdout.boxed_writer() } - fn maybe_restart(&mut self, f: &mut Option>) -> bool { + fn maybe_restart(&mut self, f: &mut Option>) -> bool { let stat = self.child.try_status(); if stat.is_err() { error!(self.log, "Can't check process for {} ({}) [{}]: {}", @@ -87,22 +93,22 @@ impl Batch { let errlog = self.log.new(o!("pid" => self.child.id())); // If we have any stderr try to log it if let Some(stderr) = self.child.stderr.take() { - f = Some(async move { - match stderr.into_stdio().await { - Err(err) => error!(errlog, "Failed to open actor process STDERR: ", err), - Ok(err) => if !retv.stderr.is_empty() { - let errstr = String::from_utf8_lossy(err); + *f = Some(Box::pin(async move { + let mut out = String::new(); + match stderr.read_to_string(&mut out).await { + Err(e) => warn!(errlog, "Failed to read child stderr: {}", e), + Ok(n) => if n != 0 { + let errstr = String::from_utf8_lossy(out); for line in errstr.lines() { warn!(errlog, "{}", line); } } - _ => {} } - }); + })); } info!(self.log, "Attempting to re-start {}", &self.name); let mut child = Self::start(&self.name, &self.cmd, &self.args) - .map_err(|err| error!(log, "Failed to spawn {} ({}): {}", &self.name, &self.cmd, err)) + .map_err(|err| error!(self.log, "Failed to spawn {} ({}): {}", &self.name, &self.cmd, err)) .ok(); // Nothing else to do with the currect architecture. In reality we should fail here // because we *didn't apply* the change. diff --git a/src/resource.rs b/src/resource.rs new file mode 100644 index 0000000..d800b4a --- /dev/null +++ b/src/resource.rs @@ -0,0 +1,25 @@ +use core::sync::atomic; + +/// A something BFFH holds internal state of +pub struct Resource { + // claims + strong: atomic::AtomicUsize, + weak: atomic::AtomicUsize, + max_strong: usize, +} + +/// A claim is taken in lieu of an user on a resource. +/// +/// They come in two flavours: Weak, of which an infinite amount can exist, and Strong which may be +/// limited in number. Strong claims represent the right of the user to use this resource +/// "writable". A weak claim indicates co-usage of a resource and are mainly useful for notice and +/// information of the respective other ones. E.g. a space would be strongly claimed by keyholders +/// when they check in and released when they check out and weakly claimed by everybody else. In +/// that case the last strong claim could also fail to be released if there are outstanding weak +/// claims. Alternatively, releasing the last strong claim also releases all weak claims and sets +/// the resource to "Free" again. +/// +/// Most importantly, claims can be released by *both* the claim holder and the resource. +pub struct Claim { + id: u128, +}