From 3a459fc0984444fe67f6d93ab756f53e42755868 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Tue, 17 Nov 2020 12:09:45 +0100 Subject: [PATCH] Current state for tagging --- src/api.rs | 9 ++-- src/builtin.rs | 5 ++ src/connection.rs | 8 +++ src/db/machine.rs | 100 ++++-------------------------------- src/db/machine/internal.rs | 51 ++++++++++++------ src/db/mod.rs | 6 +++ src/machine.rs | 76 +++++++++++++++++++++++++++ src/machines.rs | 2 + src/main.rs | 7 +-- src/modules/shelly.rs | 2 +- src/network.rs | 39 -------------- src/registries/actuators.rs | 2 +- 12 files changed, 150 insertions(+), 157 deletions(-) create mode 100644 src/builtin.rs create mode 100644 src/machine.rs create mode 100644 src/machines.rs delete mode 100644 src/network.rs diff --git a/src/api.rs b/src/api.rs index 5d33593..5b17596 100644 --- a/src/api.rs +++ b/src/api.rs @@ -12,7 +12,7 @@ use capnp_rpc::twoparty::VatNetwork; use capnp_rpc::rpc_twoparty_capnp::Side; use capnp::capability::FromServer; -use crate::db::machine::Machines; +use crate::machines::Machines; use crate::db::user::User; use uuid::Uuid; @@ -20,11 +20,11 @@ use uuid::Uuid; pub struct MachinesAPI { log: Logger, user: User, - machines: Arc, + machines: Machines, } impl MachinesAPI { - pub fn new(log: Logger, user: User, machines: Arc) -> Self { + pub fn new(log: Logger, user: User, machines: Machines) -> Self { Self { log, user, machines } } } @@ -35,9 +35,6 @@ impl api_capnp::machines::Server for MachinesAPI { mut results: api_capnp::machines::ListMachinesResults) -> Promise<(), Error> { - let l = results.get(); - let keys: Vec = self.machines.iter().map(|x| x.into()).collect(); - l.set_machines(keys); Promise::ok(()) } diff --git a/src/builtin.rs b/src/builtin.rs new file mode 100644 index 0000000..c3a3c65 --- /dev/null +++ b/src/builtin.rs @@ -0,0 +1,5 @@ +use crate::db::access::Perm; + +static BUILTIN_PERMISSIONS: [Perm] = [ + Perm::new("") +]; diff --git a/src/connection.rs b/src/connection.rs index faaaca5..d130e59 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -7,6 +7,7 @@ use crate::auth; use crate::api; pub use crate::schema::connection_capnp; +use crate::db::Databases; use capnp_rpc::{twoparty, rpc_twoparty_capnp}; @@ -52,6 +53,13 @@ impl connection_capnp::bootstrap::Server for Connection { Promise::ok(()) } + + fn machines(&mut self, + _: Params, + mut res: Results + ) -> Promise<(), capnp::Error> { + Promise::ok(()) + } } async fn handshake(log: &Logger, stream: &mut TcpStream) -> Result<()> { diff --git a/src/db/machine.rs b/src/db/machine.rs index b2f9daa..4025c06 100644 --- a/src/db/machine.rs +++ b/src/db/machine.rs @@ -26,6 +26,7 @@ use lmdb::{Transaction, RwTransaction, Cursor}; use smol::channel::{Receiver, Sender}; +use futures::{Future, Stream, StreamExt}; use futures_signals::signal::*; use crate::registries::StatusSignal; @@ -53,17 +54,6 @@ pub enum Status { Reserved(UserIdentifier), } -#[derive(Clone)] -pub struct GiveBack { - mdb: Arc>, - uuid: Uuid, -} -impl GiveBack { - pub fn new(mdb: Arc>, uuid: Uuid) -> Self { - Self { mdb, uuid } - } -} - fn uuid_from_api(uuid: crate::api::api_capnp::u_u_i_d::Reader) -> Uuid { let uuid0 = uuid.get_uuid0() as u128; let uuid1 = uuid.get_uuid1() as u128; @@ -78,88 +68,20 @@ fn api_from_uuid(uuid: Uuid, mut wr: crate::api::api_capnp::u_u_i_d::Builder) { wr.set_uuid1(uuid1); } -#[derive(Debug, Serialize, Deserialize)] -/// Internal machine representation -/// -/// A machine connects an event from a sensor to an actor activating/deactivating a real-world -/// machine, checking that the user who wants the machine (de)activated has the required -/// permissions. -pub struct Machine { - /// Computer-readable identifier for this machine - // Implicit in database since it's the key. - #[serde(skip)] - id: MachineIdentifier, - - /// The human-readable name of the machine. Does not need to be unique - name: String, - - /// The required permissions to use this machine. - perm: access::PermIdentifier, - - /// The state of the machine as bffh thinks the machine *should* be in. - /// - /// This is a Signal generator. Subscribers to this signal will be notified of changes. In the - /// case of an actor it should then make sure that the real world matches up with the set state - state: Mutable, -} - -impl Machine { - pub fn new(id: Uuid, name: String, perm: access::PermIdentifier) -> Machine { - Machine { - id: id, - name: name, - perm: perm, - state: Mutable::new(Status::Free), - } - } - - /// Generate a signal from the internal state. - /// - /// A signal is a lossy stream of state changes. Lossy in that if changes happen in quick - /// succession intermediary values may be lost. But this isn't really relevant in this case - /// since the only relevant state is the latest one. - pub fn signal(&self) -> StatusSignal { - // dedupe ensures that if state is changed but only changes to the value it had beforehand - // (could for example happen if the machine changes current user but stays activated) no - // update is sent. - Box::pin(self.state.signal_cloned().dedupe_cloned()) - } - - /// Requests to use a machine. Returns `true` if successful. - /// - /// This will update the internal state of the machine, notifying connected actors, if any. - pub fn request_use - ( &mut self - , pp: &P - , who: &User - ) -> Result - { - if pp.check(who, &self.perm)? { - self.state.set(Status::InUse(who.id.clone())); - return Ok(true); - } else { - return Ok(false); - } - } - - pub fn set_state(&mut self, state: Status) { - self.state.set(state) - } -} - -#[derive(Debug)] -pub struct Machines { - inner: HashMap, -} - -impl Machines { - +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +/// The status of the machine +pub struct MachineState { + state: Status, } // TODO split up for non-writable Definition Databases pub trait MachineDB { - fn get_machine(&self, machID: &MachineIdentifier) -> Result>; - fn put_machine(&self, machID: &MachineIdentifier, machine: Machine) -> Result<()>; + fn get_status(&self, machID: &MachineIdentifier) + -> impl Future>>; + fn put_status(&self, machID: &MachineIdentifier, machine: MachineState) + -> impl Future>; + + fn iter_status(&self) -> impl Stream>; } pub fn init(log: Logger, config: &Settings, env: Arc) -> Result { diff --git a/src/db/machine/internal.rs b/src/db/machine/internal.rs index 1334787..11a625e 100644 --- a/src/db/machine/internal.rs +++ b/src/db/machine/internal.rs @@ -9,7 +9,12 @@ use uuid::Uuid; use lmdb::{Environment, Transaction, RwTransaction, Cursor}; -use super::{MachineIdentifier, Machine, MachineDB}; +use futures::{Future, Stream, StreamExt}; +use futures::stream; +use futures::future::Ready; +use futures::stream::Iter; + +use super::{MachineIdentifier, MachineState, MachineDB}; use crate::error::Result; #[derive(Clone, Debug)] @@ -24,8 +29,8 @@ impl Internal { Self { log, env, db } } - pub fn _get_machine(&self, txn: &T, uuid: &Uuid) - -> Result> + pub fn get(&self, txn: &T, uuid: &Uuid) + -> Result> { match txn.get(self.db, uuid.as_bytes()) { Ok(bytes) => { @@ -39,10 +44,10 @@ impl Internal { } } - pub fn _put_machine( &self, txn: &mut RwTransaction, uuid: &Uuid, machine: Machine) + pub fn put(&self, txn: &mut RwTransaction, uuid: &Uuid, status: MachineStatus) -> Result<()> { - let bytes = flexbuffers::to_vec(machine)?; + let bytes = flexbuffers::to_vec(status)?; txn.put(self.db, uuid.as_bytes(), &bytes, lmdb::WriteFlags::empty())?; Ok(()) @@ -74,7 +79,7 @@ impl Internal { continue; } }; - let mach: Machine = match toml::from_str(&s) { + let mach: MachineState = match toml::from_str(&s) { Ok(r) => r, Err(e) => { warn!(self.log, "Failed to parse mach at path {}: {}, skipping!" @@ -83,7 +88,7 @@ impl Internal { continue; } }; - self._put_machine(txn, &machID, mach)?; + self.put(txn, &machID, mach)?; debug!(self.log, "Loaded machine {}", machID); } else { warn!(self.log, "Path {} is not a file, skipping!", path.display()); @@ -99,7 +104,7 @@ impl Internal { for buf in mach_cursor.iter_start() { let (kbuf, vbuf) = buf?; let machID = uuid::Uuid::from_slice(kbuf).unwrap(); - let mach: Machine = flexbuffers::from_slice(vbuf)?; + let mach: MachineState = flexbuffers::from_slice(vbuf)?; let filename = format!("{}.yml", machID.to_hyphenated().to_string()); path.set_file_name(filename); let mut fp = std::fs::File::create(&path)?; @@ -109,19 +114,33 @@ impl Internal { Ok(()) } + + pub fn iter(&self, txn: &T) -> _ { + let mut cursor = txn.open_ro_cursor(self.db)?; + Ok(cursor.iter_start().map(|buf| { + let (kbuf, vbuf) = buf.unwrap(); + let machID = uuid::Uuid::from_slice(kbuf).unwrap(); + flexbuffers::from_slice(vbuf).unwrap() + })) + } } impl MachineDB for Internal { - fn get_machine(&self, machID: &MachineIdentifier) -> Result> { - let txn = self.env.begin_ro_txn()?; - self._get_machine(&txn, machID) + fn get_status(&self, machID: &MachineIdentifier) -> Ready>> { + let txn = self.env.begin_ro_txn().unwrap(); + futures::future::ready(self.get(&txn, machID)) } - fn put_machine(&self, machID: &MachineIdentifier, machine: Machine) -> Result<()> { - let mut txn = self.env.begin_rw_txn()?; - self._put_machine(&mut txn, machID, machine)?; - txn.commit()?; + fn put_status(&self, machID: &MachineIdentifier, machine: MachineState) -> Ready> { + let mut txn = self.env.begin_rw_txn().unwrap(); + self.put(&mut txn, machID, machine).unwrap(); + txn.commit().unwrap(); - Ok(()) + futures::future::ready(Ok(())) + } + + fn iter_status(&self) -> _ { + let txn = self.env.begin_ro_txn().unwrap(); + stream::iter(self.iter(&txn)) } } diff --git a/src/db/mod.rs b/src/db/mod.rs index 9786d0f..8fd035e 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -11,3 +11,9 @@ pub mod user; /// /// Stores&Retrieves Machines pub mod machine; + +pub struct Databases { + pub roles: Box, + pub user: Box, + pub machine: Box, +} diff --git a/src/machine.rs b/src/machine.rs new file mode 100644 index 0000000..287aa45 --- /dev/null +++ b/src/machine.rs @@ -0,0 +1,76 @@ +use futures_signals::signal::Signal; +use futures_signals::signal::SignalExt; +use futures_signals::signal::Mutable; + +use crate::error::Result; + +use crate::db::user::User; +use crate::db::access; +use crate::db::machine::{MachineIdentifier, Status, MachineState}; + +#[derive(Debug)] +/// Internal machine representation +/// +/// A machine connects an event from a sensor to an actor activating/deactivating a real-world +/// machine, checking that the user who wants the machine (de)activated has the required +/// permissions. +pub struct Machine { + /// Computer-readable identifier for this machine + id: MachineIdentifier, + + /// The human-readable name of the machine. Does not need to be unique + name: String, + + /// The required permissions to use this machine. + perm: access::PermIdentifier, + + /// The state of the machine as bffh thinks the machine *should* be in. + /// + /// This is a Signal generator. Subscribers to this signal will be notified of changes. In the + /// case of an actor it should then make sure that the real world matches up with the set state + state: Mutable, +} + +impl Machine { + pub fn new(id: Uuid, name: String, perm: access::PermIdentifier) -> Machine { + Machine { + id: id, + name: name, + perm: perm, + state: Mutable::new(MachineState { state: Status::Free}), + } + } + + /// Generate a signal from the internal state. + /// + /// A signal is a lossy stream of state changes. Lossy in that if changes happen in quick + /// succession intermediary values may be lost. But this isn't really relevant in this case + /// since the only relevant state is the latest one. + pub fn signal(&self) -> impl Signal { + // dedupe ensures that if state is changed but only changes to the value it had beforehand + // (could for example happen if the machine changes current user but stays activated) no + // update is sent. + Box::pin(self.state.signal_cloned().dedupe_cloned()) + } + + /// Requests to use a machine. Returns `true` if successful. + /// + /// This will update the internal state of the machine, notifying connected actors, if any. + pub fn request_use + ( &mut self + , pp: &P + , who: &User + ) -> Result + { + if pp.check(who, &self.perm)? { + self.state.set(MachineState { state: Status::InUse(who.id.clone()) }); + return Ok(true); + } else { + return Ok(false); + } + } + + pub fn set_state(&mut self, state: Status) { + self.state.set(MachineState { state }) + } +} diff --git a/src/machines.rs b/src/machines.rs new file mode 100644 index 0000000..774f802 --- /dev/null +++ b/src/machines.rs @@ -0,0 +1,2 @@ +#[derive(Clone)] +pub struct Machines; diff --git a/src/main.rs b/src/main.rs index 5387e2b..b2e5b46 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,13 +15,10 @@ mod config; mod error; mod connection; mod registries; -mod network; mod schema; mod db; - -// TODO: Remove these and improve module namespacing -use db::access; -pub use db::machine; +mod machine; +mod machines; use clap::{App, Arg}; diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index dcbea34..b8037c0 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -3,7 +3,7 @@ use slog::Logger; use crate::config::Settings; use crate::registries::{Registries, Actuator, ActBox, StatusSignal}; use crate::error::Result; -use crate::machine::Status; +use crate::db::machine::Status; use std::pin::Pin; use futures::prelude::*; diff --git a/src/network.rs b/src/network.rs deleted file mode 100644 index 99057f2..0000000 --- a/src/network.rs +++ /dev/null @@ -1,39 +0,0 @@ -use futures_signals::signal::Signal; - -use crate::machine; -use crate::access; -use crate::db::user::UserIdentifier; - -struct Network { - -} - -impl Network { - pub fn new() -> Self { - Self { } - } - - /// react to a signal coming in by running a future with $parameter - // TODO: Actually take a parameter. - pub fn react ()>(&mut self, s: S, f: F) { - unimplemented!() - } - - /// Filter an incoming signal - /// - /// Idea being that bffh builds an event network that filters an incoming event into an - /// the appropiate (sub)set of signal handlers based on pretty dynamic configuration. - pub fn filter, F: Fn(&B) -> bool>(&mut self) { - unimplemented!() - } -} - -/// The internal bffh event type -/// -/// Everything that BFFH considers an event is contained in an instance of this. -#[derive(PartialEq, Eq, Clone, PartialOrd, Ord, Debug)] -enum Event { - /// An user wants to use a machine - // TODO: Define /what/ an user wants to do with said machine? - MachineRequest(machine::MachineIdentifier, UserIdentifier), -} diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index 2ffa81f..5094f3c 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -10,7 +10,7 @@ use futures::channel::mpsc; use futures::task::{Context, Poll, Spawn}; use futures_signals::signal::Signal; -use crate::machine::Status; +use crate::db::machine::Status; use std::collections::HashMap;