Machine state change with access control

This commit is contained in:
Gregor Reitzenstein 2021-01-22 15:25:26 +00:00
parent 5295611563
commit f387f55c06
4 changed files with 98 additions and 46 deletions

View File

@ -4,6 +4,8 @@ use std::ops::Deref;
use capnp::capability::Promise;
use capnp::Error;
use futures::FutureExt;
use crate::schema::api_capnp::State;
use crate::schema::api_capnp::machine::*;
use crate::connection::Session;
@ -85,13 +87,16 @@ impl write::Server for Write {
{
let uid = self.0.session.user.as_ref().map(|u| u.id.clone());
let new_state = MachineState::used(uid.clone());
if let Ok(tok) = self.0.machine.request_state_change(self.0.session.user.as_ref(), new_state) {
info!(self.0.session.log, "yay");
} else {
info!(self.0.session.log, "nay");
}
let this = self.0.clone();
let f = this.machine.request_state_change(this.session.user.as_ref(), new_state)
.map(|res_token| match res_token {
Ok(tok) => {
return Ok(());
},
Err(e) => Err(capnp::Error::failed("State change request returned an err".to_string())),
});
Promise::ok(())
Promise::from_future(f)
}
fn reserve(&mut self,

View File

@ -34,6 +34,8 @@ pub struct Initiator {
signal: MutableSignalCloned<Option<Machine>>,
machine: Option<Machine>,
future: Option<BoxFuture<'static, (Option<User>, MachineState)>>,
// TODO: Prepare the init for async state change requests.
state_change_fut: Option<BoxFuture<'static, Result<ReturnToken>>>,
token: Option<ReturnToken>,
sensor: BoxSensor,
}
@ -44,6 +46,7 @@ impl Initiator {
signal: signal,
machine: None,
future: None,
state_change_fut: None,
token: None,
sensor: sensor,
}
@ -73,14 +76,43 @@ impl Future for Initiator {
// Do as much work as we can:
loop {
// Always poll the state change future first
if let Some(ref mut f) = this.state_change_fut {
print!("Polling state change fut ...");
match Future::poll(Pin::new(f), cx) {
// If there is a state change future and it would block we return early
Poll::Pending => {
println!(" blocked");
return Poll::Pending;
},
Poll::Ready(Ok(tok)) => {
println!(" returned ok");
// Explicity drop the future
let _ = this.state_change_fut.take();
// Store the given return token for future use
this.token.replace(tok);
}
Poll::Ready(Err(e)) => {
println!(" returned err: {:?}", e);
// Explicity drop the future
let _ = this.state_change_fut.take();
}
}
}
// If there is a future, poll it
match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) {
None => {
this.future = Some(this.sensor.run_sensor());
},
Some(Poll::Ready((user, state))) => {
println!("New sensor fut");
this.future.take();
this.machine.as_mut().map(|machine| machine.request_state_change(user.as_ref(), state).unwrap());
let f = this.machine.as_mut().map(|machine| {
machine.request_state_change(user.as_ref(), state)
});
this.state_change_fut = f;
}
Some(Poll::Pending) => return Poll::Pending,
}

View File

@ -12,6 +12,8 @@ use std::fs;
use serde::{Serialize, Deserialize};
use futures::future::BoxFuture;
use futures_signals::signal::Signal;
use futures_signals::signal::SignalExt;
use futures_signals::signal::{Mutable, ReadOnlyMutable};
@ -22,7 +24,7 @@ use crate::error::{Result, Error};
use crate::db::access;
use crate::db::machine::{MachineIdentifier, Status, MachineState};
use crate::db::user::User;
use crate::db::user::{User, UserData};
use crate::network::MachineMap;
@ -49,39 +51,68 @@ impl Index {
#[derive(Debug, Clone)]
pub struct Machine {
inner: Arc<Mutex<Inner>>
inner: Arc<Mutex<Inner>>,
access: Arc<access::AccessControl>,
}
impl Machine {
pub fn new(inner: Inner) -> Self {
Self { inner: Arc::new(Mutex::new(inner)) }
pub fn new(inner: Inner, access: Arc<access::AccessControl>) -> Self {
Self { inner: Arc::new(Mutex::new(inner)), access: access }
}
pub fn construct
( id: MachineIdentifier
, desc: MachineDescription
, state: MachineState
, access: Arc<access::AccessControl>
) -> Machine
{
Self::new(Inner::new(id, desc, state))
Self::new(Inner::new(id, desc, state), access)
}
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Vec<Machine>> {
pub fn from_file<P: AsRef<Path>>(path: P, access: Arc<access::AccessControl>)
-> Result<Vec<Machine>>
{
let mut map: HashMap<MachineIdentifier, MachineDescription> = MachineDescription::load_file(path)?;
Ok(map.drain().map(|(id, desc)| {
Self::construct(id, desc, MachineState::new())
Self::construct(id, desc, MachineState::new(), access.clone())
}).collect())
}
/// Requests to use a machine. Returns a return token if successful.
///
/// This will update the internal state of the machine, notifying connected actors, if any.
/// The return token is a channel that considers the machine 'returned' if anything is sent
/// along it or if the sending end gets dropped. Anybody who holds this token needs to check if
/// the receiving end was canceled which indicates that the machine has been taken off their
/// hands.
pub fn request_state_change(&self, who: Option<&User>, new_state: MachineState)
-> Result<ReturnToken>
-> BoxFuture<'static, Result<ReturnToken>>
{
let mut guard = self.inner.try_lock().unwrap();
guard.request_state_change(who, new_state)
let this = self.clone();
let udata: Option<UserData> = who.map(|u| u.data.clone());
let f = async move {
if let Some(udata) = udata {
let mut guard = this.inner.try_lock().unwrap();
if this.access.check(&udata, &guard.desc.privs.write).await? {
return guard.do_state_change(new_state);
}
} else {
if new_state == MachineState::free() {
let mut guard = this.inner.try_lock().unwrap();
return guard.do_state_change(new_state);
}
}
return Err(Error::Denied);
};
Box::pin(f)
}
pub fn signal(&self) -> impl Signal<Item=MachineState> {
let mut guard = self.inner.try_lock().unwrap();
let guard = self.inner.try_lock().unwrap();
guard.signal()
}
}
@ -117,7 +148,11 @@ pub struct Inner {
}
impl Inner {
pub fn new(id: MachineIdentifier, desc: MachineDescription, state: MachineState) -> Inner {
pub fn new ( id: MachineIdentifier
, desc: MachineDescription
, state: MachineState
) -> Inner
{
Inner {
id: id,
desc: desc,
@ -139,29 +174,7 @@ impl Inner {
Box::pin(self.state.signal_cloned().dedupe_cloned())
}
/// Requests to use a machine. Returns a return token if successful.
///
/// This will update the internal state of the machine, notifying connected actors, if any.
/// The return token is a channel that considers the machine 'returned' if anything is sent
/// along it or if the sending end gets dropped. Anybody who holds this token needs to check if
/// the receiving end was canceled which indicates that the machine has been taken off their
/// hands.
pub fn request_state_change(&mut self, who: Option<&User>, new_state: MachineState)
-> Result<ReturnToken>
{
if who.is_none() {
if new_state.state == Status::Free {
return self.do_state_change(new_state);
}
} else {
// TODO: Correctly check permissions here
return self.do_state_change(new_state);
}
return Err(Error::Denied);
}
fn do_state_change(&mut self, new_state: MachineState) -> Result<ReturnToken> {
pub fn do_state_change(&mut self, new_state: MachineState) -> Result<ReturnToken> {
let (tx, rx) = futures::channel::oneshot::channel();
let old_state = self.state.replace(new_state);
self.reset.replace(old_state);
@ -234,13 +247,15 @@ impl MachineDescription {
}
}
pub fn load(config: &crate::config::Settings) -> Result<MachineMap> {
pub fn load(config: &crate::config::Settings, access: Arc<access::AccessControl>)
-> Result<MachineMap>
{
let mut map = config.machines.clone();
let it = map.drain()
.map(|(k,v)| {
// TODO: Read state from the state db
(v.name.clone(), Machine::construct(k, v, MachineState::new()))
(v.name.clone(), Machine::construct(k, v, MachineState::new(), access.clone()))
});
Ok(HashMap::from_iter(it))
}

View File

@ -153,13 +153,14 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
Ok(())
} else {
let ex = Executor::new();
let db = db::Databases::new(&log, &config)?;
let mqtt = AsyncClient::new(config.mqtt_url.clone())?;
let tok = mqtt.connect(paho_mqtt::ConnectOptions::new());
smol::block_on(tok)?;
let machines = machine::load(&config)?;
let machines = machine::load(&config, db.access.clone())?;
let (mut actor_map, actors) = actor::load(&log, &mqtt, &config)?;
let (mut init_map, initiators) = initiator::load(&log, &mqtt, &config)?;
@ -189,7 +190,6 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
let (_, r) = easy_parallel::Parallel::new()
.each(0..4, |_| smol::block_on(ex.run(shutdown.recv())))
.finish(|| {
let db = db::Databases::new(&log, &config)?;
// TODO: Spawn api connections on their own (non-main) thread, use the main thread to
// handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads
// when bffh should exit