Current state for tagging

This commit is contained in:
Gregor Reitzenstein 2020-11-17 12:09:45 +01:00
parent 8442a3d29d
commit 3a459fc098
12 changed files with 150 additions and 157 deletions

View File

@ -12,7 +12,7 @@ use capnp_rpc::twoparty::VatNetwork;
use capnp_rpc::rpc_twoparty_capnp::Side;
use capnp::capability::FromServer;
use crate::db::machine::Machines;
use crate::machines::Machines;
use crate::db::user::User;
use uuid::Uuid;
@ -20,11 +20,11 @@ use uuid::Uuid;
pub struct MachinesAPI {
log: Logger,
user: User,
machines: Arc<Machines>,
machines: Machines,
}
impl MachinesAPI {
pub fn new(log: Logger, user: User, machines: Arc<Machines>) -> Self {
pub fn new(log: Logger, user: User, machines: Machines) -> Self {
Self { log, user, machines }
}
}
@ -35,9 +35,6 @@ impl api_capnp::machines::Server for MachinesAPI {
mut results: api_capnp::machines::ListMachinesResults)
-> Promise<(), Error>
{
let l = results.get();
let keys: Vec<api_capnp::machine::Reader> = self.machines.iter().map(|x| x.into()).collect();
l.set_machines(keys);
Promise::ok(())
}

5
src/builtin.rs Normal file
View File

@ -0,0 +1,5 @@
use crate::db::access::Perm;
static BUILTIN_PERMISSIONS: [Perm] = [
Perm::new("")
];

View File

@ -7,6 +7,7 @@ use crate::auth;
use crate::api;
pub use crate::schema::connection_capnp;
use crate::db::Databases;
use capnp_rpc::{twoparty, rpc_twoparty_capnp};
@ -52,6 +53,13 @@ impl connection_capnp::bootstrap::Server for Connection {
Promise::ok(())
}
fn machines(&mut self,
_: Params<machines_params::Owned>,
mut res: Results<machines_results::Owned>
) -> Promise<(), capnp::Error> {
Promise::ok(())
}
}
async fn handshake(log: &Logger, stream: &mut TcpStream) -> Result<()> {

View File

@ -26,6 +26,7 @@ use lmdb::{Transaction, RwTransaction, Cursor};
use smol::channel::{Receiver, Sender};
use futures::{Future, Stream, StreamExt};
use futures_signals::signal::*;
use crate::registries::StatusSignal;
@ -53,17 +54,6 @@ pub enum Status {
Reserved(UserIdentifier),
}
#[derive(Clone)]
pub struct GiveBack {
mdb: Arc<Box<dyn MachineDB>>,
uuid: Uuid,
}
impl GiveBack {
pub fn new(mdb: Arc<Box<dyn MachineDB>>, uuid: Uuid) -> Self {
Self { mdb, uuid }
}
}
fn uuid_from_api(uuid: crate::api::api_capnp::u_u_i_d::Reader) -> Uuid {
let uuid0 = uuid.get_uuid0() as u128;
let uuid1 = uuid.get_uuid1() as u128;
@ -78,88 +68,20 @@ fn api_from_uuid(uuid: Uuid, mut wr: crate::api::api_capnp::u_u_i_d::Builder) {
wr.set_uuid1(uuid1);
}
#[derive(Debug, Serialize, Deserialize)]
/// Internal machine representation
///
/// A machine connects an event from a sensor to an actor activating/deactivating a real-world
/// machine, checking that the user who wants the machine (de)activated has the required
/// permissions.
pub struct Machine {
/// Computer-readable identifier for this machine
// Implicit in database since it's the key.
#[serde(skip)]
id: MachineIdentifier,
/// The human-readable name of the machine. Does not need to be unique
name: String,
/// The required permissions to use this machine.
perm: access::PermIdentifier,
/// The state of the machine as bffh thinks the machine *should* be in.
///
/// This is a Signal generator. Subscribers to this signal will be notified of changes. In the
/// case of an actor it should then make sure that the real world matches up with the set state
state: Mutable<Status>,
}
impl Machine {
pub fn new(id: Uuid, name: String, perm: access::PermIdentifier) -> Machine {
Machine {
id: id,
name: name,
perm: perm,
state: Mutable::new(Status::Free),
}
}
/// Generate a signal from the internal state.
///
/// A signal is a lossy stream of state changes. Lossy in that if changes happen in quick
/// succession intermediary values may be lost. But this isn't really relevant in this case
/// since the only relevant state is the latest one.
pub fn signal(&self) -> StatusSignal {
// dedupe ensures that if state is changed but only changes to the value it had beforehand
// (could for example happen if the machine changes current user but stays activated) no
// update is sent.
Box::pin(self.state.signal_cloned().dedupe_cloned())
}
/// Requests to use a machine. Returns `true` if successful.
///
/// This will update the internal state of the machine, notifying connected actors, if any.
pub fn request_use<P: access::RoleDB>
( &mut self
, pp: &P
, who: &User
) -> Result<bool>
{
if pp.check(who, &self.perm)? {
self.state.set(Status::InUse(who.id.clone()));
return Ok(true);
} else {
return Ok(false);
}
}
pub fn set_state(&mut self, state: Status) {
self.state.set(state)
}
}
#[derive(Debug)]
pub struct Machines {
inner: HashMap<Uuid, Machine>,
}
impl Machines {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
/// The status of the machine
pub struct MachineState {
state: Status,
}
// TODO split up for non-writable Definition Databases
pub trait MachineDB {
fn get_machine(&self, machID: &MachineIdentifier) -> Result<Option<Machine>>;
fn put_machine(&self, machID: &MachineIdentifier, machine: Machine) -> Result<()>;
fn get_status(&self, machID: &MachineIdentifier)
-> impl Future<Output=Result<Option<MachineState>>>;
fn put_status(&self, machID: &MachineIdentifier, machine: MachineState)
-> impl Future<Output=Result<()>>;
fn iter_status(&self) -> impl Stream<Output=Result<MachineState>>;
}
pub fn init(log: Logger, config: &Settings, env: Arc<lmdb::Environment>) -> Result<Internal> {

View File

@ -9,7 +9,12 @@ use uuid::Uuid;
use lmdb::{Environment, Transaction, RwTransaction, Cursor};
use super::{MachineIdentifier, Machine, MachineDB};
use futures::{Future, Stream, StreamExt};
use futures::stream;
use futures::future::Ready;
use futures::stream::Iter;
use super::{MachineIdentifier, MachineState, MachineDB};
use crate::error::Result;
#[derive(Clone, Debug)]
@ -24,8 +29,8 @@ impl Internal {
Self { log, env, db }
}
pub fn _get_machine<T: Transaction>(&self, txn: &T, uuid: &Uuid)
-> Result<Option<Machine>>
pub fn get<T: Transaction>(&self, txn: &T, uuid: &Uuid)
-> Result<Option<MachineState>>
{
match txn.get(self.db, uuid.as_bytes()) {
Ok(bytes) => {
@ -39,10 +44,10 @@ impl Internal {
}
}
pub fn _put_machine( &self, txn: &mut RwTransaction, uuid: &Uuid, machine: Machine)
pub fn put(&self, txn: &mut RwTransaction, uuid: &Uuid, status: MachineStatus)
-> Result<()>
{
let bytes = flexbuffers::to_vec(machine)?;
let bytes = flexbuffers::to_vec(status)?;
txn.put(self.db, uuid.as_bytes(), &bytes, lmdb::WriteFlags::empty())?;
Ok(())
@ -74,7 +79,7 @@ impl Internal {
continue;
}
};
let mach: Machine = match toml::from_str(&s) {
let mach: MachineState = match toml::from_str(&s) {
Ok(r) => r,
Err(e) => {
warn!(self.log, "Failed to parse mach at path {}: {}, skipping!"
@ -83,7 +88,7 @@ impl Internal {
continue;
}
};
self._put_machine(txn, &machID, mach)?;
self.put(txn, &machID, mach)?;
debug!(self.log, "Loaded machine {}", machID);
} else {
warn!(self.log, "Path {} is not a file, skipping!", path.display());
@ -99,7 +104,7 @@ impl Internal {
for buf in mach_cursor.iter_start() {
let (kbuf, vbuf) = buf?;
let machID = uuid::Uuid::from_slice(kbuf).unwrap();
let mach: Machine = flexbuffers::from_slice(vbuf)?;
let mach: MachineState = flexbuffers::from_slice(vbuf)?;
let filename = format!("{}.yml", machID.to_hyphenated().to_string());
path.set_file_name(filename);
let mut fp = std::fs::File::create(&path)?;
@ -109,19 +114,33 @@ impl Internal {
Ok(())
}
pub fn iter<T: Transaction>(&self, txn: &T) -> _ {
let mut cursor = txn.open_ro_cursor(self.db)?;
Ok(cursor.iter_start().map(|buf| {
let (kbuf, vbuf) = buf.unwrap();
let machID = uuid::Uuid::from_slice(kbuf).unwrap();
flexbuffers::from_slice(vbuf).unwrap()
}))
}
}
impl MachineDB for Internal {
fn get_machine(&self, machID: &MachineIdentifier) -> Result<Option<Machine>> {
let txn = self.env.begin_ro_txn()?;
self._get_machine(&txn, machID)
fn get_status(&self, machID: &MachineIdentifier) -> Ready<Result<Option<MachineState>>> {
let txn = self.env.begin_ro_txn().unwrap();
futures::future::ready(self.get(&txn, machID))
}
fn put_machine(&self, machID: &MachineIdentifier, machine: Machine) -> Result<()> {
let mut txn = self.env.begin_rw_txn()?;
self._put_machine(&mut txn, machID, machine)?;
txn.commit()?;
fn put_status(&self, machID: &MachineIdentifier, machine: MachineState) -> Ready<Result<()>> {
let mut txn = self.env.begin_rw_txn().unwrap();
self.put(&mut txn, machID, machine).unwrap();
txn.commit().unwrap();
Ok(())
futures::future::ready(Ok(()))
}
fn iter_status(&self) -> _ {
let txn = self.env.begin_ro_txn().unwrap();
stream::iter(self.iter(&txn))
}
}

View File

@ -11,3 +11,9 @@ pub mod user;
///
/// Stores&Retrieves Machines
pub mod machine;
pub struct Databases {
pub roles: Box<dyn access::RoleDB>,
pub user: Box<dyn user::UserDB>,
pub machine: Box<dyn machine::MachineDB>,
}

76
src/machine.rs Normal file
View File

@ -0,0 +1,76 @@
use futures_signals::signal::Signal;
use futures_signals::signal::SignalExt;
use futures_signals::signal::Mutable;
use crate::error::Result;
use crate::db::user::User;
use crate::db::access;
use crate::db::machine::{MachineIdentifier, Status, MachineState};
#[derive(Debug)]
/// Internal machine representation
///
/// A machine connects an event from a sensor to an actor activating/deactivating a real-world
/// machine, checking that the user who wants the machine (de)activated has the required
/// permissions.
pub struct Machine {
/// Computer-readable identifier for this machine
id: MachineIdentifier,
/// The human-readable name of the machine. Does not need to be unique
name: String,
/// The required permissions to use this machine.
perm: access::PermIdentifier,
/// The state of the machine as bffh thinks the machine *should* be in.
///
/// This is a Signal generator. Subscribers to this signal will be notified of changes. In the
/// case of an actor it should then make sure that the real world matches up with the set state
state: Mutable<MachineState>,
}
impl Machine {
pub fn new(id: Uuid, name: String, perm: access::PermIdentifier) -> Machine {
Machine {
id: id,
name: name,
perm: perm,
state: Mutable::new(MachineState { state: Status::Free}),
}
}
/// Generate a signal from the internal state.
///
/// A signal is a lossy stream of state changes. Lossy in that if changes happen in quick
/// succession intermediary values may be lost. But this isn't really relevant in this case
/// since the only relevant state is the latest one.
pub fn signal(&self) -> impl Signal<Item=MachineState> {
// dedupe ensures that if state is changed but only changes to the value it had beforehand
// (could for example happen if the machine changes current user but stays activated) no
// update is sent.
Box::pin(self.state.signal_cloned().dedupe_cloned())
}
/// Requests to use a machine. Returns `true` if successful.
///
/// This will update the internal state of the machine, notifying connected actors, if any.
pub fn request_use<P: access::RoleDB>
( &mut self
, pp: &P
, who: &User
) -> Result<bool>
{
if pp.check(who, &self.perm)? {
self.state.set(MachineState { state: Status::InUse(who.id.clone()) });
return Ok(true);
} else {
return Ok(false);
}
}
pub fn set_state(&mut self, state: Status) {
self.state.set(MachineState { state })
}
}

2
src/machines.rs Normal file
View File

@ -0,0 +1,2 @@
#[derive(Clone)]
pub struct Machines;

View File

@ -15,13 +15,10 @@ mod config;
mod error;
mod connection;
mod registries;
mod network;
mod schema;
mod db;
// TODO: Remove these and improve module namespacing
use db::access;
pub use db::machine;
mod machine;
mod machines;
use clap::{App, Arg};

View File

@ -3,7 +3,7 @@ use slog::Logger;
use crate::config::Settings;
use crate::registries::{Registries, Actuator, ActBox, StatusSignal};
use crate::error::Result;
use crate::machine::Status;
use crate::db::machine::Status;
use std::pin::Pin;
use futures::prelude::*;

View File

@ -1,39 +0,0 @@
use futures_signals::signal::Signal;
use crate::machine;
use crate::access;
use crate::db::user::UserIdentifier;
struct Network {
}
impl Network {
pub fn new() -> Self {
Self { }
}
/// react to a signal coming in by running a future with $parameter
// TODO: Actually take a parameter.
pub fn react<S: Signal, F: Fn() -> ()>(&mut self, s: S, f: F) {
unimplemented!()
}
/// Filter an incoming signal
///
/// Idea being that bffh builds an event network that filters an incoming event into an
/// the appropiate (sub)set of signal handlers based on pretty dynamic configuration.
pub fn filter<B, S: Signal<Item=B>, F: Fn(&B) -> bool>(&mut self) {
unimplemented!()
}
}
/// The internal bffh event type
///
/// Everything that BFFH considers an event is contained in an instance of this.
#[derive(PartialEq, Eq, Clone, PartialOrd, Ord, Debug)]
enum Event {
/// An user wants to use a machine
// TODO: Define /what/ an user wants to do with said machine?
MachineRequest(machine::MachineIdentifier, UserIdentifier),
}

View File

@ -10,7 +10,7 @@ use futures::channel::mpsc;
use futures::task::{Context, Poll, Spawn};
use futures_signals::signal::Signal;
use crate::machine::Status;
use crate::db::machine::Status;
use std::collections::HashMap;