Refactor machines somewhat

This commit is contained in:
Gregor Reitzenstein 2020-11-30 14:08:03 +01:00
parent 65841f5046
commit cc40cde831
9 changed files with 130 additions and 79 deletions

View File

@ -40,17 +40,17 @@ impl Machine {
if let Some(state) = self.db.machine.get_state(&self.id) {
match state.state {
Status::Free => builder.set_state(State::Free),
Status::InUse(_u) => {
Status::InUse(_u, _p) => {
builder.set_state(State::InUse);
}
Status::ToCheck(_u) => {
Status::ToCheck(_u, _p) => {
builder.set_state(State::ToCheck);
}
Status::Blocked(_u) => {
Status::Blocked(_u, _p) => {
builder.set_state(State::Blocked);
}
Status::Disabled => builder.set_state(State::Disabled),
Status::Reserved(_u) => {
Status::Reserved(_u, _p) => {
builder.set_state(State::Reserved);
}
}

View File

@ -37,6 +37,7 @@ pub mod internal;
use internal::Internal;
pub type MachineIdentifier = Uuid;
pub type Priority = u64;
/// Status of a Machine
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
@ -44,15 +45,15 @@ pub enum Status {
/// Not currently used by anybody
Free,
/// Used by somebody
InUse(UserId),
InUse(UserId, Priority),
/// Was used by somebody and now needs to be checked for cleanliness
ToCheck(UserId),
ToCheck(UserId, Priority),
/// Not used by anybody but also can not be used. E.g. down for maintenance
Blocked(UserId),
Blocked(UserId, Priority),
/// Disabled for some other reason
Disabled,
/// Reserved
Reserved(UserId),
Reserved(UserId, Priority),
}
pub fn uuid_from_api(uuid: crate::schema::api_capnp::u_u_i_d::Reader) -> Uuid {
@ -75,6 +76,24 @@ pub struct MachineState {
pub state: Status,
}
impl MachineState {
/// Check if the given priority is higher than one's own.
///
/// If `self` does not have a priority then this function always returns `true`
pub fn is_higher_priority(&self, priority: u64) -> bool {
match self.state {
Status::Disabled | Status::Free => { true },
Status::Blocked(_, self_prio) |
Status::InUse(_, self_prio) |
Status::ToCheck(_, self_prio) |
Status::Reserved(_, self_prio) =>
{
priority > self_prio
}
}
}
}
pub fn init(log: Logger, config: &Settings, env: Arc<lmdb::Environment>) -> Result<Internal> {
let mut machine_descriptions = MachineDescription::load_file(&config.machines)?;
let mut flags = lmdb::DatabaseFlags::empty();

View File

@ -66,11 +66,25 @@ pub struct UserData {
/// Persons are only ever given roles, not permissions directly
pub roles: Vec<RoleIdentifier>,
#[serde(skip_serializing_if = "is_zero")]
#[serde(default = "default_priority")]
/// A priority number, defaulting to 0.
///
/// The higher, the higher the priority. Higher priority users overwrite lower priority ones.
pub priority: u64,
/// Additional data storage
#[serde(flatten)]
kv: HashMap<Box<[u8]>, Box<[u8]>>,
}
fn is_zero(i: &u64) -> bool {
*i == 0
}
const fn default_priority() -> u64 {
0
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -24,7 +24,8 @@ pub enum Error {
MQTT(mqtt::Error),
Config(config::ConfigError),
BadVersion((u32,u32)),
Argon2(argon2::Error)
Argon2(argon2::Error),
Denied,
}
impl fmt::Display for Error {
@ -72,6 +73,9 @@ impl fmt::Display for Error {
Error::BadVersion((major,minor)) => {
write!(f, "Peer uses API version {}.{} which is incompatible!", major, minor)
}
Error::Denied => {
write!(f, "You do not have the permission required to do that.")
}
}
}
}

View File

@ -1,4 +1,8 @@
use std::path::Path;
use std::task::{Poll, Context};
use std::pin::Pin;
use std::future::Future;
use std::collections::HashMap;
use std::fs;
@ -10,7 +14,7 @@ use futures_signals::signal::Mutable;
use uuid::Uuid;
use crate::error::Result;
use crate::error::{Result, Error};
use crate::db::access;
use crate::db::machine::{MachineIdentifier, Status, MachineState};
@ -34,14 +38,21 @@ pub struct Machine {
/// This is a Signal generator. Subscribers to this signal will be notified of changes. In the
/// case of an actor it should then make sure that the real world matches up with the set state
state: Mutable<MachineState>,
reset: Option<MachineState>,
rx: Option<futures::channel::oneshot::Receiver<()>>,
access: access::AccessControl,
}
impl Machine {
pub fn new(id: MachineIdentifier, desc: MachineDescription, perm: access::PermIdentifier) -> Machine {
pub fn new(id: MachineIdentifier, desc: MachineDescription, access: access::AccessControl, state: MachineState) -> Machine {
Machine {
id: id,
desc: desc,
state: Mutable::new(MachineState { state: Status::Free}),
state: Mutable::new(state),
reset: None,
rx: None,
access: access,
}
}
@ -57,27 +68,68 @@ impl Machine {
Box::pin(self.state.signal_cloned().dedupe_cloned())
}
/// Requests to use a machine. Returns `true` if successful.
/// Requests to use a machine. Returns a return token if successful.
///
/// This will update the internal state of the machine, notifying connected actors, if any.
pub async fn request_use
( &mut self
, access: access::AccessControl
, who: &User
) -> Result<bool>
/// The return token is a channel that considers the machine 'returned' if anything is sent
/// along it or if the sending end gets dropped. Anybody who holds this token needs to check if
/// the receiving end was canceled which indicates that the machine has been taken off their
/// hands.
pub async fn request_state_change(&mut self, who: &User, new_state: MachineState)
-> Result<ReturnToken>
{
// TODO: Check different levels
if access.check(&who.data, &self.desc.privs.write).await? {
self.state.set(MachineState { state: Status::InUse(who.id.clone()) });
return Ok(true);
} else {
return Ok(false);
if self.access.check(&who.data, &self.desc.privs.write).await? {
if self.state.lock_ref().is_higher_priority(who.data.priority) {
let (tx, rx) = futures::channel::oneshot::channel();
let old_state = self.state.replace(new_state);
self.reset.replace(old_state);
// Also this drops the old receiver, which will signal to the initiator that the
// machine has been taken off their hands.
self.rx.replace(rx);
return Ok(tx);
}
}
return Err(Error::Denied);
}
pub fn set_state(&mut self, state: Status) {
self.state.set(MachineState { state })
}
pub fn get_signal(&self) -> impl Signal {
self.state.signal_cloned().dedupe_cloned()
}
pub fn reset_state(&mut self) {
if let Some(state) = self.reset.take() {
self.state.replace(state);
}
}
}
type ReturnToken = futures::channel::oneshot::Sender<()>;
impl Future for Machine {
type Output = MachineState;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut this = &mut *self;
// TODO Return this on exit
if false {
return Poll::Ready(self.state.get_cloned());
}
if let Some(mut rx) = this.rx.take() {
match Future::poll(Pin::new(&mut rx), cx) {
// Regardless if we were canceled or properly returned, reset.
Poll::Ready(_) => self.reset_state(),
Poll::Pending => { this.rx.replace(rx); },
}
}
Poll::Pending
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]

View File

@ -217,11 +217,12 @@ fn main() -> Result<(), Error> {
let pdb = pdb?;
let mut ac = db::access::AccessControl::new();
ac.add_source_unchecked("Internal".to_string(), Box::new(pdb));
let machdb = Arc::new(machdb);
let passdb = db::pass::PassDB::init(log.new(o!("system" => "passwords")), env.clone()).unwrap();
let db = db::Databases {
access: Arc::new(db::access::AccessControl::new()),
machine: Arc::new(machdb),
machine: machdb.clone(),
passdb: Arc::new(passdb),
};
@ -245,7 +246,7 @@ fn main() -> Result<(), Error> {
// FIXME: implement notification so the modules can shut down cleanly instead of being killed
// without warning.
let modlog = log.clone();
let mut regs = Registries::new();
let mut regs = Registries::new(machdb.clone());
match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs.clone())) {
Ok(()) => {}
Err(e) => {

View File

@ -100,7 +100,7 @@ impl Stream for Shelly {
info!(unpin.log, "Machine Status changed: {:?}", status);
let topic = format!("shellies/{}/relay/0/command", unpin.name);
let pl = match status {
Status::InUse(_) => "on",
Status::InUse(_, _) => "on",
_ => "off",
};
let msg = mqtt::Message::new(topic, pl, 0);

View File

@ -1,8 +1,11 @@
use std::sync::Arc;
use crate::db::machine::MachineDB;
mod actuators;
mod sensors;
pub use actuators::{Actuator, ActBox, StatusSignal};
pub use sensors::{Sensor, SensBox};
#[derive(Clone)]
/// BFFH registries
@ -15,10 +18,10 @@ pub struct Registries {
}
impl Registries {
pub fn new() -> Self {
pub fn new(db: Arc<MachineDB>) -> Self {
Registries {
actuators: actuators::Actuators::new(),
sensors: sensors::Sensors::new(),
sensors: sensors::Sensors::new(db),
}
}
}

View File

@ -2,6 +2,9 @@ use std::pin::Pin;
use futures::task::{Context, Poll};
use futures::{Future, Stream};
use futures::future::BoxFuture;
use futures_signals::signal::Signal;
use crate::db::user::UserId;
use crate::db::machine::MachineDB;
use std::sync::Arc;
use smol::lock::RwLock;
@ -10,64 +13,19 @@ use std::collections::HashMap;
#[derive(Clone)]
pub struct Sensors {
inner: Arc<RwLock<Inner>>,
db: Arc<MachineDB>,
}
impl Sensors {
pub fn new() -> Self {
pub fn new(db: Arc<MachineDB>) -> Self {
Sensors {
inner: Arc::new(RwLock::new(Inner::new())),
db: db,
}
}
}
pub type SensBox = Box<dyn Sensor + Send + Sync>;
pub type SensBox = Box<dyn Signal<Item=UserId> + Send + Sync>;
type Inner = HashMap<String, SensBox>;
// Implementing Sensors.
//
// Given the coroutine/task split stays as it is - Sensor input to machine update being one,
// machine update signal to actor doing thing being another, a Sensor implementation would send a
// Stream of futures - each future being an atomic Machine update.
#[async_trait]
/// BFFH Sensor
///
/// A sensor is anything that can forward an intent of an user to do something to bffh.
/// This may be a card reader connected to a machine, a website allowing users to select a machine
/// they want to use or something like QRHello
pub trait Sensor: Stream<Item = BoxFuture<'static, ()>> {
/// Setup the Sensor.
///
/// After this async function completes the Stream implementation should be able to generate
/// futures when polled.
/// Implementations can rely on this function being polled to completeion before the stream
/// is polled.
// TODO Is this sensible vs just having module-specific setup fns?
async fn setup(&mut self);
/// Shutdown the sensor gracefully
///
/// Implementations can rely on that the stream will not be polled after this function has been
/// called.
async fn shutdown(&mut self);
}
struct Dummy;
#[async_trait]
impl Sensor for Dummy {
async fn setup(&mut self) {
return;
}
async fn shutdown(&mut self) {
return;
}
}
impl Stream for Dummy {
type Item = BoxFuture<'static, ()>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
Poll::Ready(Some(Box::pin(futures::future::ready(()))))
}
}