Compile with new DB system

This commit is contained in:
Nadja Reitzenstein 2022-03-16 18:10:59 +01:00
parent 1156174d7a
commit 2b7044d498
20 changed files with 348 additions and 446 deletions

View File

@ -1,7 +1,9 @@
use std::collections::HashMap; use std::collections::HashMap;
use futures_util::future; use futures_util::future;
use futures_util::future::BoxFuture; use futures_util::future::BoxFuture;
use rkyv::Archived;
use crate::actors::Actor; use crate::actors::Actor;
use crate::db::ArchivedValue;
use crate::resources::state::State; use crate::resources::state::State;
pub struct Dummy { pub struct Dummy {
@ -16,7 +18,7 @@ impl Dummy {
} }
impl Actor for Dummy { impl Actor for Dummy {
fn apply(&mut self, state: State) -> BoxFuture<'static, ()> { fn apply(&mut self, state: ArchivedValue<State>) -> BoxFuture<'static, ()> {
tracing::info!(name=%self.name, params=?self.params, ?state, "dummy actor updating state"); tracing::info!(name=%self.name, params=?self.params, ?state, "dummy actor updating state");
Box::pin(future::ready(())) Box::pin(future::ready(()))
} }

View File

@ -20,17 +20,14 @@ use rustls::{RootCertStore};
use url::Url; use url::Url;
use crate::actors::dummy::Dummy; use crate::actors::dummy::Dummy;
use crate::actors::process::Process; use crate::actors::process::Process;
use crate::db::ArchivedValue;
mod shelly; mod shelly;
mod process; mod process;
mod dummy; mod dummy;
pub trait Actor { pub trait Actor {
fn apply(&mut self, state: State) -> BoxFuture<'static, ()>; fn apply(&mut self, state: ArchivedValue<State>) -> BoxFuture<'static, ()>;
}
fn loader<S: Signal<Item = State>>(cell: &Cell<Option<S>>) -> Option<S> {
cell.take()
} }
pub struct ActorDriver<S: 'static> { pub struct ActorDriver<S: 'static> {
@ -40,7 +37,7 @@ pub struct ActorDriver<S: 'static> {
future: Option<BoxFuture<'static, ()>>, future: Option<BoxFuture<'static, ()>>,
} }
impl<S: Signal<Item = State>> ActorDriver<S> { impl<S: Signal<Item = ArchivedValue<State>>> ActorDriver<S> {
pub fn new(signal: S, actor: Box<dyn Actor + Send + Sync>) -> Self { pub fn new(signal: S, actor: Box<dyn Actor + Send + Sync>) -> Self {
Self { Self {
signal, signal,
@ -52,7 +49,7 @@ impl<S: Signal<Item = State>> ActorDriver<S> {
impl<S> Future for ActorDriver<S> impl<S> Future for ActorDriver<S>
where where
S: Signal<Item = State> + Unpin + Send, S: Signal<Item = ArchivedValue<State>> + Unpin + Send,
{ {
type Output = (); type Output = ();

View File

@ -1,8 +1,10 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
use futures_util::future::BoxFuture; use futures_util::future::BoxFuture;
use rkyv::Archived;
use crate::actors::Actor; use crate::actors::Actor;
use crate::resources::modules::fabaccess::Status; use crate::db::ArchivedValue;
use crate::resources::modules::fabaccess::ArchivedStatus;
use crate::resources::state::State; use crate::resources::state::State;
pub struct Process { pub struct Process {
@ -29,7 +31,7 @@ impl Process {
} }
impl Actor for Process { impl Actor for Process {
fn apply(&mut self, state: State) -> BoxFuture<'static, ()> { fn apply(&mut self, state: ArchivedValue<State>) -> BoxFuture<'static, ()> {
tracing::debug!(name=%self.name, cmd=%self.cmd, ?state, tracing::debug!(name=%self.name, cmd=%self.cmd, ?state,
"Process actor updating state"); "Process actor updating state");
let mut command = Command::new(&self.cmd); let mut command = Command::new(&self.cmd);
@ -38,25 +40,25 @@ impl Actor for Process {
.args(self.args.iter()) .args(self.args.iter())
.arg(&self.name); .arg(&self.name);
match state.inner.state { match &state.as_ref().inner.state {
Status::Free => { ArchivedStatus::Free => {
command.arg("free"); command.arg("free");
} }
Status::InUse(ref by) => { ArchivedStatus::InUse(by) => {
command.arg("inuse").arg(format!("{}", by.get_username())); command.arg("inuse").arg(by.id.as_str());
} }
Status::ToCheck(ref by) => { ArchivedStatus::ToCheck(by) => {
command.arg("tocheck") command.arg("tocheck")
.arg(format!("{}", by.get_username())); .arg(by.id.as_str());
} }
Status::Blocked(ref by) => { ArchivedStatus::Blocked(by) => {
command.arg("blocked") command.arg("blocked")
.arg(format!("{}", by.get_username())); .arg(by.id.as_str());
} }
Status::Disabled => { command.arg("disabled"); }, ArchivedStatus::Disabled => { command.arg("disabled"); },
Status::Reserved(ref by) => { ArchivedStatus::Reserved(by) => {
command.arg("reserved") command.arg("reserved")
.arg(format!("{}", by.get_username())); .arg(by.id.as_str());
} }
} }

View File

@ -1,8 +1,10 @@
use std::collections::HashMap; use std::collections::HashMap;
use futures_util::future::BoxFuture; use futures_util::future::BoxFuture;
use rkyv::Archived;
use rumqttc::{AsyncClient, QoS}; use rumqttc::{AsyncClient, QoS};
use crate::actors::Actor; use crate::actors::Actor;
use crate::resources::modules::fabaccess::Status; use crate::db::ArchivedValue;
use crate::resources::modules::fabaccess::ArchivedStatus;
use crate::resources::state::State; use crate::resources::state::State;
/// An actuator for a Shellie connected listening on one MQTT broker /// An actuator for a Shellie connected listening on one MQTT broker
@ -38,12 +40,12 @@ impl Shelly {
impl Actor for Shelly { impl Actor for Shelly {
fn apply(&mut self, state: State) -> BoxFuture<'static, ()> { fn apply(&mut self, state: ArchivedValue<State>) -> BoxFuture<'static, ()> {
tracing::debug!(?state, name=%self.name, tracing::debug!(?state, name=%self.name,
"Shelly changing state" "Shelly changing state"
); );
let pl = match state.inner.state { let pl = match state.as_ref().inner.state {
Status::InUse(_) => "on", ArchivedStatus::InUse(_) => "on",
_ => "off", _ => "off",
}; };

View File

@ -1,57 +1,7 @@
use std::marker::PhantomData;
pub use lmdb::{
Environment,
DatabaseFlags,
WriteFlags,
EnvironmentFlags,
Transaction,
RoTransaction,
RwTransaction,
};
use rkyv::{Fallible, Serialize, ser::serializers::AllocSerializer, AlignedVec};
mod raw; mod raw;
pub use raw::RawDB; pub use raw::RawDB;
use lmdb::Error; mod typed;
use rkyv::Deserialize; pub use typed::{DB, ArchivedValue, Adapter, AlignedAdapter};
use rkyv::ser::serializers::AlignedSerializer;
pub type Error = lmdb::Error;
use crate::users::db::{User};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use rkyv::Infallible;
use crate::resources::state::{State};
use std::iter::FromIterator;
use std::ops::Deref;
use crate::resources::search::ResourcesHandle;
use crate::Users;
#[derive(Debug, serde::Serialize)]
pub struct Dump {
users: HashMap<String, User>,
states: HashMap<String, State>,
}
impl Dump {
pub fn new(userdb: Users, resources: ResourcesHandle) -> Result<Self> {
let users = HashMap::from_iter(userdb.into_inner().get_all()?.into_iter());
let mut states = HashMap::new();
for resource in resources.list_all().into_iter() {
if let Some(output) = resource.get_raw_state() {
let output: State = Deserialize::<State, _>::deserialize(output.deref(), &mut Infallible).unwrap();
let old = states.insert(resource.get_id().to_string(), output);
assert!(old.is_none());
}
}
Ok(Self { users, states })
}
}

View File

@ -38,7 +38,7 @@ impl RawDB {
txn.put(self.db, key, value, flags) txn.put(self.db, key, value, flags)
} }
pub fn reserve<'txn, K>(&self, txn: &'txn mut RwTransaction, key: &K, size: usize, flags: WriteFlags) pub fn reserve<'txn, K>(&self, txn: &'txn mut RwTransaction, key: &K, size: usize, flags: WriteFlags)
-> lmdb::Result<&'txn mut [u8]> -> lmdb::Result<&'txn mut [u8]>
where K: AsRef<[u8]> where K: AsRef<[u8]>
{ {

152
bffhd/db/typed.rs Normal file
View File

@ -0,0 +1,152 @@
use crate::db::RawDB;
use lmdb::{Cursor, RwTransaction, Transaction, WriteFlags};
use rkyv::{AlignedVec, Archive, Archived, Serialize};
use std::fmt;
use std::fmt::{Debug, Display, Formatter};
use std::marker::PhantomData;
use std::pin::Pin;
use crate::db;
#[derive(Clone)]
/// Packed, sendable resource state
pub struct ArchivedValue<T> {
/// State is encoded using rkyv making it trivially serializable
data: AlignedVec,
_marker: PhantomData<T>,
}
impl<T> ArchivedValue<T> {
pub fn new(data: AlignedVec) -> Self {
Self {
data,
_marker: PhantomData,
}
}
pub fn build(data: &[u8]) -> Self {
let mut v = AlignedVec::with_capacity(data.len());
v.extend_from_slice(data);
Self::new(v)
}
pub fn as_mut(&mut self) -> &mut AlignedVec {
&mut self.data
}
pub fn as_slice(&self) -> &[u8] {
self.data.as_slice()
}
pub fn as_mut_slice(&mut self) -> &mut [u8] {
self.data.as_mut_slice()
}
}
impl<T: Archive> AsRef<Archived<T>> for ArchivedValue<T> {
fn as_ref(&self) -> &Archived<T> {
unsafe { rkyv::archived_root::<T>(self.as_slice()) }
}
}
//
// Debug implementation shows wrapping SendState
//
impl<T: Archive> Debug for ArchivedValue<T>
where
<T as Archive>::Archived: Debug,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_tuple("SendState").field(self.as_ref()).finish()
}
}
//
// Display implementation hides wrapping SendState
//
impl<T: Archive> Display for ArchivedValue<T>
where
<T as Archive>::Archived: Display,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Display::fmt(self.as_ref(), f)
}
}
/// Adapter trait handling de-/serialization
///
/// Values must be read from raw, unaligned byte buffers provided by LMDB.
pub trait Adapter {
type Item;
/// Decode data from a short-lived byte buffer into a durable format
fn decode(data: &[u8]) -> Self::Item;
fn encoded_len(item: &Self::Item) -> usize;
fn encode_into(item: &Self::Item, buf: &mut [u8]);
}
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct AlignedAdapter<V>(PhantomData<V>);
impl<V> Adapter for AlignedAdapter<V> {
type Item = ArchivedValue<V>;
fn decode(data: &[u8]) -> Self::Item {
ArchivedValue::build(data)
}
fn encoded_len(item: &Self::Item) -> usize {
item.as_slice().len()
}
fn encode_into(item: &Self::Item, buf: &mut [u8]) {
buf.copy_from_slice(item.as_slice())
}
}
#[derive(Debug, Clone)]
#[repr(transparent)]
/// `Typed` database, allowing storing a typed value
///
/// Values must be serialized into and deserialized from raw byte buffers.
/// This is handled by a stateless [Adapter] given by the type parameter `A`
pub struct DB<A> {
db: RawDB,
_marker: PhantomData<A>,
}
impl<A> DB<A> {
pub fn new(db: RawDB) -> Self {
Self {
db,
_marker: PhantomData,
}
}
}
impl<A: Adapter> DB<A> {
pub fn get<T: Transaction>(&self, txn: &T, key: &impl AsRef<[u8]>) -> Result<Option<A::Item>, db::Error> {
Ok(self.db.get(txn, key)?.map(A::decode))
}
pub fn put(
&self,
txn: &mut RwTransaction,
key: &impl AsRef<[u8]>,
value: &A::Item,
flags: WriteFlags,
) -> Result<(), db::Error>
{
let len = A::encoded_len(value);
let buf = self.db.reserve(txn, key, len, flags)?;
assert_eq!(buf.len(), len, "Reserved buffer is not of requested size!");
A::encode_into(value, buf);
Ok(())
}
pub fn del(&self, txn: &mut RwTransaction, key: &impl AsRef<[u8]>) -> Result<(), db::Error> {
self.db.del::<_, &[u8]>(txn, key, None)
}
pub fn get_all<'txn, T: Transaction>(&self, txn: &'txn T) -> Result<impl IntoIterator<Item=(&'txn [u8], A::Item)>, db::Error> {
let mut cursor = self.db.open_ro_cursor(txn)?;
let it = cursor.iter_start();
Ok(it.filter_map(|buf| buf.ok().map(|(kbuf,vbuf)| {
(kbuf, A::decode(vbuf))
})))
}
}

View File

@ -1,7 +1,9 @@
use std::io; use std::io;
use std::fmt; use std::fmt;
use rsasl::error::SessionError; use rsasl::error::SessionError;
use crate::db::DBError; use crate::db;
type DBError = db::Error;
#[derive(Debug)] #[derive(Debug)]
/// Shared error type /// Shared error type

View File

@ -5,7 +5,6 @@ use async_channel as channel;
use async_oneshot as oneshot; use async_oneshot as oneshot;
use futures_signals::signal::Signal; use futures_signals::signal::Signal;
use futures_util::future::BoxFuture; use futures_util::future::BoxFuture;
use crate::resources::driver::{Error, Update};
use crate::resources::claim::{ResourceID, UserID}; use crate::resources::claim::{ResourceID, UserID};
use crate::resources::state::State; use crate::resources::state::State;

View File

@ -25,8 +25,6 @@ pub mod resources;
pub mod actors; pub mod actors;
pub mod initiators;
pub mod sensors; pub mod sensors;
pub mod capnp; pub mod capnp;
@ -71,7 +69,7 @@ pub const RELEASE_STRING: &'static str = env!("BFFHD_RELEASE_STRING");
pub struct Diflouroborane { pub struct Diflouroborane {
config: Config, config: Config,
executor: Executor<'static>, executor: Executor<'static>,
pub statedb: Arc<StateDB>, pub statedb: StateDB,
pub users: Users, pub users: Users,
pub roles: Roles, pub roles: Roles,
pub resources: ResourcesHandle, pub resources: ResourcesHandle,
@ -90,8 +88,8 @@ impl Diflouroborane {
let executor = Executor::new(); let executor = Executor::new();
let env = StateDB::open_env(&config.db_path)?; let env = StateDB::open_env(&config.db_path)?;
let statedb = Arc::new(StateDB::create_with_env(env.clone()) let statedb = StateDB::create_with_env(env.clone())
.context("Failed to open state DB file")?); .context("Failed to open state DB file")?;
let users = Users::new(env.clone()).context("Failed to open users DB file")?; let users = Users::new(env.clone()).context("Failed to open users DB file")?;
let roles = Roles::new(config.roles.clone()); let roles = Roles::new(config.roles.clone());

View File

@ -1,7 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use async_channel::Sender; use async_channel::Sender;
use lmdb::Environment; use lmdb::Environment;
use crate::resources::driver::Update;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
/// Database of currently valid claims, interests and notify, as far as applicable /// Database of currently valid claims, interests and notify, as far as applicable

View File

@ -1,141 +0,0 @@
use std::fmt::Debug;
use async_trait::async_trait;
use futures_signals::signal::Mutable;
use async_oneshot::Sender;
use async_channel::Receiver;
use crate::resources::state::db::StateDB;
use super::state::State;
/// A resources in BFFH has to contain several different parts;
/// - Currently set state
/// - Execution state of attached actors (⇒ BFFH's job)
/// - Output of interal logic of a resources
/// ⇒ Resource logic gets read access to set state and write access to output state.
/// ⇒ state `update` happens via resources logic. This logic should do access control. If the update
/// succeeds then BFFH stores those input parameters ("set" state) and results / output state.
/// Storing input parameters is relevant so that BFFH can know that an "update" is a no-op
/// without having to run the module code.
/// ⇒ in fact actors only really care about the output state, and shouldn't (need to) see "set"
/// state.
/// ⇒ example reserving:
/// - Claimant sends 'update' message with a new state
/// - Doesn't set the state until `update` has returned Ok.
/// - This runs the `update` function with that new state and the claimants user context returning
/// either an Ok or an Error.
/// - Error is returned to Claimant to show user, stop.
/// - On ok:
/// - Commit new "set" state, storing it and making it visible to all other claimants
/// - Commit new output state, storing it and notifying all connected actors / Notify
/// ⇒ BFFHs job in this whole ordeal is:
/// - Message passing primitives so that update message are queued
/// - As reliable as possible storage system for input and output state
/// - Again message passing so that updates are broadcasted to all Notify and Actors.
/// ⇒ Resource module's job is:
/// - Validating updates semantically i.e. are the types correct
/// - Check authorization of updates i.e. is this user allowed to do that
#[async_trait]
pub trait ResourceModel: Debug {
/// Run whatever internal logic this resources has for the given State update, and return the
/// new output state that this update produces.
async fn on_update(&mut self, input: &State) -> Result<State, Error>;
async fn shutdown(&mut self);
}
#[derive(Debug)]
pub struct Passthrough;
#[async_trait]
impl ResourceModel for Passthrough {
async fn on_update(&mut self, input: &State) -> Result<State, Error> {
Ok(input.clone())
}
async fn shutdown(&mut self) {}
}
/// Error type a resources implementation can produce
#[derive(Debug)]
pub enum Error {
Internal(Box<dyn std::error::Error + Send>),
Denied,
}
// TODO: more message context
#[derive(Debug)]
pub struct Update {
pub state: State,
pub errchan: Sender<Error>,
}
#[derive(Debug)]
pub struct ResourceDriver {
// putput
res: Box<dyn ResourceModel>,
// input
rx: Receiver<Update>,
// output
db: StateDB,
key: String,
signal: Mutable<State>,
}
impl ResourceDriver {
pub async fn drive_to_end(&mut self) {
while let Ok(update) = self.rx.recv().await {
let state = update.state;
let mut errchan = update.errchan;
match self.res.on_update(&state).await {
Ok(outstate) => {
// FIXME: Send any error here to some global error collector. A failed write to
// the DB is not necessarily fatal, but it means that BFFH is now in an
// inconsistent state until a future update succeeds with writing to the DB.
// Not applying the new state isn't correct either since we don't know what the
// internal logic of the resources has done to make this happen.
// Another half right solution is to unwrap and recreate everything.
// "Best" solution would be to tell the resources to rollback their interal
// changes on a fatal failure and then notify the Claimant, while simply trying
// again for temporary failures.
let _ = self.db.update(self.key.as_bytes(), &state, &outstate);
self.signal.set(outstate);
},
Err(e) => {
let _ = errchan.send(e);
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::pin::Pin;
use std::task::Poll;
use std::future::Future;
use super::*;
#[futures_test::test]
async fn test_passthrough_is_id() {
let inp = state::tests::gen_random();
let mut res = Passthrough;
let out = res.on_update(&inp).await.unwrap();
assert_eq!(inp, out);
}
#[test]
fn test_passthrough_is_always_ready() {
let inp = State::build().finish();
let mut res = Passthrough;
let mut cx = futures_test::task::panic_context();
if let Poll::Ready(_) = Pin::new(&mut res.on_update(&inp)).poll(&mut cx) {
return;
}
panic!("Passthrough returned Poll::Pending")
}
}

View File

@ -1,19 +1,21 @@
use std::convert::Infallible;
use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use futures_signals::signal::{Mutable, Signal, SignalExt}; use futures_signals::signal::{Mutable, Signal, SignalExt};
use lmdb::RoTransaction; use lmdb::RoTransaction;
use rkyv::Archived; use rkyv::{Archived, Deserialize};
use rkyv::ser::Serializer;
use rkyv::ser::serializers::AllocSerializer;
use crate::authorization::permissions::PrivilegesBuf; use crate::authorization::permissions::PrivilegesBuf;
use crate::config::MachineDescription; use crate::config::MachineDescription;
use crate::resources::modules::fabaccess::{MachineState, Status}; use crate::db::ArchivedValue;
use crate::resources::modules::fabaccess::{MachineState, Status, ArchivedStatus};
use crate::resources::state::db::StateDB; use crate::resources::state::db::StateDB;
use crate::resources::state::State; use crate::resources::state::State;
use crate::session::SessionHandle; use crate::session::SessionHandle;
use crate::users::UserRef; use crate::users::UserRef;
pub mod claim;
pub mod db; pub mod db;
pub mod driver;
pub mod search; pub mod search;
pub mod state; pub mod state;
@ -23,50 +25,56 @@ pub struct PermissionDenied;
pub(crate) struct Inner { pub(crate) struct Inner {
id: String, id: String,
db: Arc<StateDB>, db: StateDB,
signal: Mutable<State>, signal: Mutable<ArchivedValue<State>>,
desc: MachineDescription, desc: MachineDescription,
} }
impl Inner { impl Inner {
pub fn new(id: String, db: Arc<StateDB>, desc: MachineDescription) -> Self { pub fn new(id: String, db: StateDB, desc: MachineDescription) -> Self {
let state = if let Some(previous) = db.get_output(id.as_bytes()).unwrap() { let state = if let Some(previous) = db.get(id.as_bytes()).unwrap() {
let state = MachineState::from(&previous); tracing::info!(%id, ?previous, "Found previous state");
tracing::info!(%id, ?state, "Found previous state"); previous
state
} else { } else {
tracing::info!(%id, "No previous state, defaulting to `free`"); tracing::info!(%id, "No previous state, defaulting to `free`");
let state = MachineState::used(UserRef::new("test".to_string()), Some(UserRef::new let state = MachineState::used(UserRef::new("test".to_string()), Some(UserRef::new("prev".to_string())));
("prev".to_string())));
let update = state.to_state(); let update = state.to_state();
db.update(id.as_bytes(), &update, &update).unwrap();
state let mut serializer = AllocSerializer::<1024>::default();
serializer.serialize_value(&update).expect("failed to serialize new default state");
let val = ArchivedValue::new(serializer.into_serializer().into_inner());
db.put(&id.as_bytes(), &val).unwrap();
val
}; };
let signal = Mutable::new(state.to_state()); let signal = Mutable::new(state);
Self { id, db, signal, desc } Self { id, db, signal, desc }
} }
pub fn signal(&self) -> impl Signal<Item=State> { pub fn signal(&self) -> impl Signal<Item=ArchivedValue<State>> {
Box::pin(self.signal.signal_cloned().dedupe_cloned()) Box::pin(self.signal.signal_cloned())
} }
fn get_state(&self) -> MachineState { fn get_state(&self) -> ArchivedValue<State> {
MachineState::from(&self.db.get_output(self.id.as_bytes()).unwrap().unwrap()) self.db.get(self.id.as_bytes())
.expect("lmdb error")
.expect("state should never be None")
} }
fn get_raw_state(&self) -> Option<LMDBorrow<RoTransaction, Archived<State>>> { fn get_ref(&self) -> impl Deref<Target=ArchivedValue<State>> + '_ {
self.db.get_output(self.id.as_bytes()).unwrap() self.signal.lock_ref()
} }
fn set_state(&self, state: MachineState) { fn set_state(&self, state: ArchivedValue<State>) {
let span = tracing::debug_span!("set", id = %self.id, ?state, "Updating state"); let span = tracing::debug_span!("set_state", id = %self.id, ?state);
let _guard = span.enter(); let _guard = span.enter();
tracing::debug!("Updating state"); tracing::debug!("Updating state");
tracing::trace!("Updating DB"); tracing::trace!("Updating DB");
let update = state.to_state(); self.db.put(&self.id.as_bytes(), &state).unwrap();
self.db.update(self.id.as_bytes(), &update, &update).unwrap();
tracing::trace!("Updated DB, sending update signal"); tracing::trace!("Updated DB, sending update signal");
self.signal.set(update);
self.signal.set(state);
tracing::trace!("Sent update signal"); tracing::trace!("Sent update signal");
} }
} }
@ -81,11 +89,7 @@ impl Resource {
Self { inner } Self { inner }
} }
pub fn get_raw_state(&self) -> Option<LMDBorrow<RoTransaction, Archived<State>>> { pub fn get_state(&self) -> ArchivedValue<State> {
self.inner.get_raw_state()
}
pub fn get_state(&self) -> MachineState {
self.inner.get_state() self.inner.get_state()
} }
@ -93,7 +97,7 @@ impl Resource {
&self.inner.id &self.inner.id
} }
pub fn get_signal(&self) -> impl Signal<Item=State> { pub fn get_signal(&self) -> impl Signal<Item=ArchivedValue<State>> {
self.inner.signal() self.inner.signal()
} }
@ -102,46 +106,54 @@ impl Resource {
} }
fn set_state(&self, state: MachineState) { fn set_state(&self, state: MachineState) {
self.inner.set_state(state) let mut serializer = AllocSerializer::<1024>::default();
serializer.serialize_value(&state);
let archived = ArchivedValue::new(serializer.into_serializer().into_inner());
self.inner.set_state(archived)
} }
fn set_status(&self, state: Status) { fn set_status(&self, state: Status) {
let old = self.inner.get_state(); let old = self.inner.get_state();
let new = MachineState { state, .. old }; let oldref: &Archived<State> = old.as_ref();
let previous: &Archived<Option<UserRef>> = &oldref.inner.previous;
let previous = Deserialize::<Option<UserRef>, _>::deserialize(previous, &mut rkyv::Infallible)
.expect("Infallible deserializer failed");
let new = MachineState { state, previous };
self.set_state(new); self.set_state(new);
} }
pub async fn try_update(&self, session: SessionHandle, new: Status) { pub async fn try_update(&self, session: SessionHandle, new: Status) {
let old = self.get_state(); let old = self.get_state();
let old: &Archived<State> = old.as_ref();
let user = session.get_user(); let user = session.get_user();
if session.has_manage(self) // Default allow for managers if session.has_manage(self) // Default allow for managers
|| (session.has_write(self) // Decision tree for writers || (session.has_write(self) // Decision tree for writers
&& match (&old.state, &new) { && match (&old.inner.state, &new) {
// Going from available to used by the person requesting is okay. // Going from available to used by the person requesting is okay.
(Status::Free, Status::InUse(who)) (ArchivedStatus::Free, Status::InUse(who))
// Check that the person requesting does not request for somebody else. // Check that the person requesting does not request for somebody else.
// *That* is manage privilege. // *That* is manage privilege.
if who == &user => true, if who == &user => true,
// Reserving things for ourself is okay. // Reserving things for ourself is okay.
(Status::Free, Status::Reserved(whom)) (ArchivedStatus::Free, Status::Reserved(whom))
if &user == whom => true, if &user == whom => true,
// Returning things we've been using is okay. This includes both if // Returning things we've been using is okay. This includes both if
// they're being freed or marked as to be checked. // they're being freed or marked as to be checked.
(Status::InUse(who), Status::Free | Status::ToCheck(_)) (ArchivedStatus::InUse(who), Status::Free | Status::ToCheck(_))
if who == &user => true, if who == &user => true,
// Un-reserving things we reserved is okay // Un-reserving things we reserved is okay
(Status::Reserved(whom), Status::Free) (ArchivedStatus::Reserved(whom), Status::Free)
if whom == &user => true, if whom == &user => true,
// Using things that we've reserved is okay. But the person requesting // Using things that we've reserved is okay. But the person requesting
// that has to be the person that reserved the machine. Otherwise // that has to be the person that reserved the machine. Otherwise
// somebody could make a machine reserved by a different user as used by // somebody could make a machine reserved by a different user as used by
// that different user but use it themself. // that different user but use it themself.
(Status::Reserved(whom), Status::InUse(who)) (ArchivedStatus::Reserved(whom), Status::InUse(who))
if whom == &user && who == whom => true, if whom == &user && who == whom => true,
// Default is deny. // Default is deny.
@ -149,13 +161,13 @@ impl Resource {
}) })
// Default permissions everybody has // Default permissions everybody has
|| match (&old.state, &new) { || match (&old.inner.state, &new) {
// Returning things we've been using is okay. This includes both if // Returning things we've been using is okay. This includes both if
// they're being freed or marked as to be checked. // they're being freed or marked as to be checked.
(Status::InUse(who), Status::Free | Status::ToCheck(_)) if who == &user => true, (ArchivedStatus::InUse(who), Status::Free | Status::ToCheck(_)) if who == &user => true,
// Un-reserving things we reserved is okay // Un-reserving things we reserved is okay
(Status::Reserved(whom), Status::Free) if whom == &user => true, (ArchivedStatus::Reserved(whom), Status::Free) if whom == &user => true,
// Default is deny. // Default is deny.
_ => false, _ => false,
@ -166,11 +178,15 @@ impl Resource {
} }
pub async fn give_back(&self, session: SessionHandle) { pub async fn give_back(&self, session: SessionHandle) {
if let Status::InUse(user) = self.get_state().state { let state = self.get_state();
let s: &Archived<State> = state.as_ref();
let i: &Archived<MachineState> = &s.inner;
unimplemented!();
/*if let Status::InUse(user) = self.get_state().state {
if user == session.get_user() { if user == session.get_user() {
self.set_state(MachineState::free(Some(user))); self.set_state(MachineState::free(Some(user)));
} }
} }*/
} }
pub async fn force_set(&self, new: Status) { pub async fn force_set(&self, new: Status) {
@ -182,13 +198,13 @@ impl Resource {
} }
pub fn is_owned_by(&self, owner: UserRef) -> bool { pub fn is_owned_by(&self, owner: UserRef) -> bool {
match self.get_state().state { match &self.get_state().as_ref().inner.state {
Status::Free | Status::Disabled => false, ArchivedStatus::Free | ArchivedStatus::Disabled => false,
Status::InUse(user) ArchivedStatus::InUse(user)
| Status::ToCheck(user) | ArchivedStatus::ToCheck(user)
| Status::Blocked(user) | ArchivedStatus::Blocked(user)
| Status::Reserved(user) => user == owner, | ArchivedStatus::Reserved(user) => user == &owner,
} }
} }
} }

View File

@ -1,74 +1,52 @@
use std::{ use crate::db;
sync::Arc, use crate::db::{ArchivedValue, RawDB, DB, AlignedAdapter};
path::Path, use lmdb::{
}; DatabaseFlags, Environment, EnvironmentFlags, RoTransaction, RwTransaction, Transaction,
use rkyv::Archived;
use crate::db::{
DB,
Environment,
EnvironmentFlags,
DatabaseFlags,
WriteFlags, WriteFlags,
Adapter,
AllocAdapter,
DBError,
Transaction,
RoTransaction,
RwTransaction,
}; };
use std::{path::Path, sync::Arc};
use crate::resources::state::State; use crate::resources::state::State;
type StateAdapter = AllocAdapter<State>; #[derive(Debug, Clone)]
/// State Database containing the currently set state
#[derive(Debug)]
pub struct StateDB { pub struct StateDB {
/// The environment for all the databases below
env: Arc<Environment>, env: Arc<Environment>,
db: DB<AlignedAdapter<State>>,
input: DB<StateAdapter>,
output: DB<StateAdapter>,
// TODO: Index resources name/id/uuid -> u64
} }
impl StateDB { impl StateDB {
pub fn open_env<P: AsRef<Path>>(path: P) -> lmdb::Result<Arc<Environment>> { pub fn open_env<P: AsRef<Path>>(path: P) -> lmdb::Result<Arc<Environment>> {
Environment::new() Environment::new()
.set_flags( EnvironmentFlags::WRITE_MAP .set_flags(
| EnvironmentFlags::NO_SUB_DIR EnvironmentFlags::WRITE_MAP
| EnvironmentFlags::NO_TLS | EnvironmentFlags::NO_SUB_DIR
| EnvironmentFlags::NO_READAHEAD) | EnvironmentFlags::NO_TLS
.set_max_dbs(4) | EnvironmentFlags::NO_READAHEAD,
)
.open(path.as_ref()) .open(path.as_ref())
.map(Arc::new) .map(Arc::new)
} }
fn new(env: Arc<Environment>, input: DB<StateAdapter>, output: DB<StateAdapter>) -> Self { fn new(env: Arc<Environment>, db: RawDB) -> Self {
Self { env: env, input, output } let db = DB::new(db);
Self { env, db }
}
pub fn open_with_env(env: Arc<Environment>) -> lmdb::Result<Self> {
let db = unsafe { RawDB::open(&env, Some("state"))? };
Ok(Self::new(env, db))
} }
pub fn open<P: AsRef<Path>>(path: P) -> lmdb::Result<Self> { pub fn open<P: AsRef<Path>>(path: P) -> lmdb::Result<Self> {
let env = Self::open_env(path)?; let env = Self::open_env(path)?;
let input = unsafe { DB::open(&env, Some("input"))? }; Self::open_with_env(env)
let output = unsafe { DB::open(&env, Some("output"))? };
Ok(Self::new(env, input, output))
} }
pub fn create_with_env(env: Arc<Environment>) -> lmdb::Result<Self> { pub fn create_with_env(env: Arc<Environment>) -> lmdb::Result<Self> {
let flags = DatabaseFlags::empty(); let flags = DatabaseFlags::empty();
let input = unsafe { DB::create(&env, Some("input"), flags)? }; let db = unsafe { RawDB::create(&env, Some("state"), flags)? };
let output = unsafe { DB::create(&env, Some("output"), flags)? };
Ok(Self::new(env, input, output)) Ok(Self::new(env, db))
} }
pub fn create<P: AsRef<Path>>(path: P) -> lmdb::Result<Self> { pub fn create<P: AsRef<Path>>(path: P) -> lmdb::Result<Self> {
@ -76,46 +54,28 @@ impl StateDB {
Self::create_with_env(env) Self::create_with_env(env)
} }
fn update_txn(&self, txn: &mut RwTransaction, key: impl AsRef<[u8]>, input: &State, output: &State) pub fn begin_ro_txn(&self) -> Result<impl Transaction + '_, db::Error> {
-> Result<(), DBError> self.env.begin_ro_txn()
{ }
pub fn get(&self, key: impl AsRef<[u8]>) -> Result<Option<ArchivedValue<State>>, db::Error> {
let txn = self.env.begin_ro_txn()?;
self.db.get(&txn, &key.as_ref())
}
pub fn get_all<'txn, T: Transaction>(
&self,
txn: &'txn T,
) -> Result<impl IntoIterator<Item = (&'txn [u8], ArchivedValue<State>)>, db::Error, > {
self.db.get_all(txn)
}
pub fn put(&self, key: &impl AsRef<[u8]>, val: &ArchivedValue<State>) -> Result<(), db::Error> {
let mut txn = self.env.begin_rw_txn()?;
let flags = WriteFlags::empty(); let flags = WriteFlags::empty();
let k = key.as_ref(); self.db.put(&mut txn, key, val, flags)?;
self.input.put(txn, &k, input, flags)?; txn.commit()
self.output.put(txn, &k, output, flags)?;
Ok(())
} }
pub fn update(&self, key: impl AsRef<[u8]>, input: &State, output: &State)
-> Result<(), DBError>
{
let mut txn = self.env.begin_rw_txn().map_err(StateAdapter::from_db_err)?;
self.update_txn(&mut txn, key, input, output)?;
txn.commit().map_err(StateAdapter::from_db_err)
}
fn get(&self, db: &DB<StateAdapter>, key: impl AsRef<[u8]>)
-> Result<Option<LMDBorrow<RoTransaction, Archived<State>>>, DBError>
{
let txn = self.env.begin_ro_txn().map_err(StateAdapter::from_db_err)?;
if let Some(state) = db.get(&txn, &key.as_ref())? {
let ptr = state.into();
Ok(Some(unsafe { LMDBorrow::new(ptr, txn) }))
} else {
Ok(None)
}
}
#[inline(always)]
pub fn get_input(&self, key: impl AsRef<[u8]>)
-> Result<Option<LMDBorrow<RoTransaction, Archived<State>>>, DBError>
{ self.get(&self.input, key) }
#[inline(always)]
pub fn get_output(&self, key: impl AsRef<[u8]>)
-> Result<Option<LMDBorrow<RoTransaction, Archived<State>>>, DBError>
{ self.get(&self.output, key) }
} }
#[cfg(test)] #[cfg(test)]
@ -123,7 +83,7 @@ mod tests {
use super::*; use super::*;
use crate::resource::state::value::Vec3u8; use crate::resource::state::value::Vec3u8;
use crate::resource::state::value::{OID_COLOUR, OID_POWERED, OID_INTENSITY}; use crate::resource::state::value::{OID_COLOUR, OID_INTENSITY, OID_POWERED};
use std::ops::Deref; use std::ops::Deref;
#[test] #[test]
@ -133,14 +93,14 @@ mod tests {
tmppath.push("db"); tmppath.push("db");
let db = StateDB::create(tmppath).unwrap(); let db = StateDB::create(tmppath).unwrap();
let b = State::build() let b = State::build()
.add(OID_COLOUR.clone(), Box::new(Vec3u8 { a: 1, b: 2, c: 3})) .add(OID_COLOUR.clone(), Box::new(Vec3u8 { a: 1, b: 2, c: 3 }))
.add(OID_POWERED.clone(), Box::new(true)) .add(OID_POWERED.clone(), Box::new(true))
.add(OID_INTENSITY.clone(), Box::new(1023)) .add(OID_INTENSITY.clone(), Box::new(1023))
.finish(); .finish();
println!("({}) {:?}", b.hash(), b); println!("({}) {:?}", b.hash(), b);
let c = State::build() let c = State::build()
.add(OID_COLOUR.clone(), Box::new(Vec3u8 { a: 1, b: 2, c: 3})) .add(OID_COLOUR.clone(), Box::new(Vec3u8 { a: 1, b: 2, c: 3 }))
.add(OID_POWERED.clone(), Box::new(true)) .add(OID_POWERED.clone(), Box::new(true))
.add(OID_INTENSITY.clone(), Box::new(1023)) .add(OID_INTENSITY.clone(), Box::new(1023))
.finish(); .finish();

View File

@ -4,15 +4,12 @@ use std::{
Hasher Hasher
}, },
}; };
use std::fmt::Formatter; use std::fmt::{Debug, Display, Formatter};
use std::marker::PhantomData;
use std::ops::Deref; use std::ops::Deref;
use std::sync::Arc;
use rkyv::{ use rkyv::{AlignedVec, Archive, Archived, Deserialize, out_field, Serialize};
Archive,
Deserialize,
out_field,
Serialize,
};
use serde::de::{Error, MapAccess, Unexpected}; use serde::de::{Error, MapAccess, Unexpected};
use serde::Deserializer; use serde::Deserializer;
use serde::ser::SerializeMap; use serde::ser::SerializeMap;

View File

@ -1,37 +0,0 @@
use std::sync::Arc;
use lmdb::{DatabaseFlags, Environment};
use rkyv::{Archive, Serialize, Deserialize};
use crate::db::{AllocAdapter, DB, RawDB};
use crate::users::UserRef;
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Archive, Serialize, Deserialize)]
pub struct Session {
userid: UserRef,
}
type Adapter = AllocAdapter<Session>;
pub struct SessionCache {
env: Arc<Environment>,
db: DB<Adapter>,
}
impl SessionCache {
pub unsafe fn new(env: Arc<Environment>, db: RawDB) -> Self {
let db = DB::new_unchecked(db);
Self { env, db }
}
pub unsafe fn open(env: Arc<Environment>) -> lmdb::Result<Self> {
let db = RawDB::open(&env, Some("sessions"))?;
Ok(Self::new(env, db))
}
pub unsafe fn create(env: Arc<Environment>) -> lmdb::Result<Self> {
let flags = DatabaseFlags::empty();
let db = RawDB::create(&env, Some("sessions"), flags)?;
Ok(Self::new(env, db))
}
}

View File

@ -5,13 +5,9 @@
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use crate::authorization::roles::{Roles}; use crate::authorization::roles::{Roles};
use crate::resources::Resource; use crate::resources::Resource;
use crate::session::db::SessionCache;
use crate::Users; use crate::Users;
use crate::users::UserRef; use crate::users::UserRef;
mod db;
static SESSION_CACHE: OnceCell<SessionCache> = OnceCell::new();
#[derive(Clone)] #[derive(Clone)]
pub struct SessionManager { pub struct SessionManager {

View File

@ -1,12 +1,15 @@
use crate::db::{AllocAdapter, Environment, RawDB, Result, DB}; use lmdb::{DatabaseFlags, Environment, Transaction, WriteFlags};
use crate::db::{DatabaseFlags, RoTransaction, WriteFlags};
use lmdb::{Transaction};
use std::collections::{HashMap}; use std::collections::{HashMap};
use rkyv::Infallible;
use std::sync::Arc; use std::sync::Arc;
use anyhow::Context; use anyhow::Context;
use rkyv::{Archived, Deserialize}; use rkyv::{Archived, Deserialize};
use rkyv::ser::Serializer;
use rkyv::ser::serializers::AllocSerializer;
use crate::db;
use crate::db::{AlignedAdapter, ArchivedValue, DB, RawDB};
#[derive( #[derive(
Clone, Clone,
@ -72,58 +75,55 @@ impl UserData {
} }
} }
type Adapter = AllocAdapter<User>;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct UserDB { pub struct UserDB {
env: Arc<Environment>, env: Arc<Environment>,
db: DB<Adapter>, db: DB<AlignedAdapter<User>>,
} }
impl UserDB { impl UserDB {
pub unsafe fn new(env: Arc<Environment>, db: RawDB) -> Self { pub unsafe fn new(env: Arc<Environment>, db: RawDB) -> Self {
let db = DB::new_unchecked(db); let db = DB::new(db);
Self { env, db } Self { env, db }
} }
pub unsafe fn open(env: Arc<Environment>) -> Result<Self> { pub unsafe fn open(env: Arc<Environment>) -> Result<Self, db::Error> {
let db = RawDB::open(&env, Some("user"))?; let db = RawDB::open(&env, Some("user"))?;
Ok(Self::new(env, db)) Ok(Self::new(env, db))
} }
pub unsafe fn create(env: Arc<Environment>) -> Result<Self> { pub unsafe fn create(env: Arc<Environment>) -> Result<Self, db::Error> {
let flags = DatabaseFlags::empty(); let flags = DatabaseFlags::empty();
let db = RawDB::create(&env, Some("user"), flags)?; let db = RawDB::create(&env, Some("user"), flags)?;
Ok(Self::new(env, db)) Ok(Self::new(env, db))
} }
pub fn get(&self, uid: &str) -> Result<Option<LMDBorrow<RoTransaction, Archived<User>>>> { pub fn get(&self, uid: &str) -> Result<Option<ArchivedValue<User>>, db::Error> {
let txn = self.env.begin_ro_txn()?; let txn = self.env.begin_ro_txn()?;
if let Some(state) = self.db.get(&txn, &uid.as_bytes())? { self.db.get(&txn, &uid.as_bytes())
let ptr = state.into();
Ok(Some(unsafe { LMDBorrow::new(ptr, txn) }))
} else {
Ok(None)
}
} }
pub fn put(&self, uid: &str, user: &User) -> Result<()> { pub fn put(&self, uid: &str, user: &User) -> Result<(), db::Error> {
let mut serializer = AllocSerializer::<1024>::default();
let pos = serializer.serialize_value(user).expect("rkyv error");
assert_eq!(pos, 0);
let v = serializer.into_serializer().into_inner();
let value = ArchivedValue::new(v);
let mut txn = self.env.begin_rw_txn()?; let mut txn = self.env.begin_rw_txn()?;
let flags = WriteFlags::empty(); let flags = WriteFlags::empty();
self.db.put(&mut txn, &uid.as_bytes(), user, flags)?; self.db.put(&mut txn, &uid.as_bytes(), &value, flags)?;
txn.commit()?; txn.commit()?;
Ok(()) Ok(())
} }
pub fn get_all(&self) -> Result<Vec<(String, User)>> { pub fn get_all(&self) -> Result<Vec<(String, User)>, db::Error> {
let txn = self.env.begin_ro_txn()?; let txn = self.env.begin_ro_txn()?;
let mut cursor = self.db.open_ro_cursor(&txn)?; let mut iter = self.db.get_all(&txn)?;
let iter = cursor.iter_start();
let mut out = Vec::new(); let mut out = Vec::new();
let mut deserializer = rkyv::Infallible; for (uid, user) in iter {
for user in iter {
let (uid, user) = user?;
let uid = unsafe { std::str::from_utf8_unchecked(uid).to_string() }; let uid = unsafe { std::str::from_utf8_unchecked(uid).to_string() };
let user: User = user.deserialize(&mut deserializer).unwrap(); let user: User = Deserialize::<User, _>::deserialize(user.as_ref(), &mut Infallible).unwrap();
out.push((uid, user)); out.push((uid, user));
} }

View File

@ -43,7 +43,18 @@ use crate::UserDB;
)] )]
#[archive_attr(derive(Debug, PartialEq))] #[archive_attr(derive(Debug, PartialEq))]
pub struct UserRef { pub struct UserRef {
id: String, pub id: String,
}
impl PartialEq<ArchivedUserRef> for UserRef {
fn eq(&self, other: &ArchivedUserRef) -> bool {
self.id == other.id
}
}
impl PartialEq<UserRef> for ArchivedUserRef {
fn eq(&self, other: &UserRef) -> bool {
self.id == other.id
}
} }
impl UserRef { impl UserRef {
@ -88,7 +99,7 @@ impl Users {
self.userdb self.userdb
.get(uid) .get(uid)
.unwrap() .unwrap()
.map(|user| Deserialize::<db::User, _>::deserialize(user.deref(), &mut Infallible).unwrap()) .map(|user| Deserialize::<db::User, _>::deserialize(user.as_ref(), &mut Infallible).unwrap())
} }
pub fn load_file<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> { pub fn load_file<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {

View File

@ -1,5 +1,4 @@
use clap::{Arg, Command}; use clap::{Arg, Command};
use diflouroborane::db::Dump;
use diflouroborane::{config, Diflouroborane}; use diflouroborane::{config, Diflouroborane};
@ -112,9 +111,7 @@ fn main() -> anyhow::Result<()> {
let mut config = config::read(&PathBuf::from_str(configpath).unwrap()).unwrap(); let mut config = config::read(&PathBuf::from_str(configpath).unwrap()).unwrap();
if matches.is_present("dump") { if matches.is_present("dump") {
let bffh = Diflouroborane::new(config)?; unimplemented!()
let dump = Dump::new(bffh.users, bffh.resources)?;
println!("{:?}", dump);
} else if matches.is_present("load") { } else if matches.is_present("load") {
let bffh = Diflouroborane::new(config)?; let bffh = Diflouroborane::new(config)?;
bffh.users.load_file(matches.value_of("load").unwrap()); bffh.users.load_file(matches.value_of("load").unwrap());