From 2b7044d498fc2e5b3cf4afe8051d88c162550098 Mon Sep 17 00:00:00 2001 From: Nadja Reitzenstein Date: Wed, 16 Mar 2022 18:10:59 +0100 Subject: [PATCH] Compile with new DB system --- bffhd/actors/dummy.rs | 4 +- bffhd/actors/mod.rs | 11 +-- bffhd/actors/process.rs | 28 ++++--- bffhd/actors/shelly.rs | 10 ++- bffhd/db/mod.rs | 56 +------------ bffhd/db/raw.rs | 2 +- bffhd/db/typed.rs | 152 +++++++++++++++++++++++++++++++++++ bffhd/error.rs | 4 +- bffhd/initiators/mod.rs | 1 - bffhd/lib.rs | 8 +- bffhd/resources/claim.rs | 1 - bffhd/resources/driver.rs | 141 -------------------------------- bffhd/resources/mod.rs | 122 ++++++++++++++++------------ bffhd/resources/state/db.rs | 134 +++++++++++------------------- bffhd/resources/state/mod.rs | 11 +-- bffhd/session/db.rs | 37 --------- bffhd/session/mod.rs | 4 - bffhd/users/db.rs | 48 +++++------ bffhd/users/mod.rs | 15 +++- bin/bffhd/main.rs | 5 +- 20 files changed, 348 insertions(+), 446 deletions(-) create mode 100644 bffhd/db/typed.rs delete mode 100644 bffhd/resources/driver.rs delete mode 100644 bffhd/session/db.rs diff --git a/bffhd/actors/dummy.rs b/bffhd/actors/dummy.rs index 20e100d..4cf19e2 100644 --- a/bffhd/actors/dummy.rs +++ b/bffhd/actors/dummy.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; use futures_util::future; use futures_util::future::BoxFuture; +use rkyv::Archived; use crate::actors::Actor; +use crate::db::ArchivedValue; use crate::resources::state::State; pub struct Dummy { @@ -16,7 +18,7 @@ impl Dummy { } impl Actor for Dummy { - fn apply(&mut self, state: State) -> BoxFuture<'static, ()> { + fn apply(&mut self, state: ArchivedValue) -> BoxFuture<'static, ()> { tracing::info!(name=%self.name, params=?self.params, ?state, "dummy actor updating state"); Box::pin(future::ready(())) } diff --git a/bffhd/actors/mod.rs b/bffhd/actors/mod.rs index 791b882..c52c2e7 100644 --- a/bffhd/actors/mod.rs +++ b/bffhd/actors/mod.rs @@ -20,17 +20,14 @@ use rustls::{RootCertStore}; use url::Url; use crate::actors::dummy::Dummy; use crate::actors::process::Process; +use crate::db::ArchivedValue; mod shelly; mod process; mod dummy; pub trait Actor { - fn apply(&mut self, state: State) -> BoxFuture<'static, ()>; -} - -fn loader>(cell: &Cell>) -> Option { - cell.take() + fn apply(&mut self, state: ArchivedValue) -> BoxFuture<'static, ()>; } pub struct ActorDriver { @@ -40,7 +37,7 @@ pub struct ActorDriver { future: Option>, } -impl> ActorDriver { +impl>> ActorDriver { pub fn new(signal: S, actor: Box) -> Self { Self { signal, @@ -52,7 +49,7 @@ impl> ActorDriver { impl Future for ActorDriver where - S: Signal + Unpin + Send, + S: Signal> + Unpin + Send, { type Output = (); diff --git a/bffhd/actors/process.rs b/bffhd/actors/process.rs index 066d3d4..839c355 100644 --- a/bffhd/actors/process.rs +++ b/bffhd/actors/process.rs @@ -1,8 +1,10 @@ use std::collections::HashMap; use std::process::{Command, Stdio}; use futures_util::future::BoxFuture; +use rkyv::Archived; use crate::actors::Actor; -use crate::resources::modules::fabaccess::Status; +use crate::db::ArchivedValue; +use crate::resources::modules::fabaccess::ArchivedStatus; use crate::resources::state::State; pub struct Process { @@ -29,7 +31,7 @@ impl Process { } impl Actor for Process { - fn apply(&mut self, state: State) -> BoxFuture<'static, ()> { + fn apply(&mut self, state: ArchivedValue) -> BoxFuture<'static, ()> { tracing::debug!(name=%self.name, cmd=%self.cmd, ?state, "Process actor updating state"); let mut command = Command::new(&self.cmd); @@ -38,25 +40,25 @@ impl Actor for Process { .args(self.args.iter()) .arg(&self.name); - match state.inner.state { - Status::Free => { + match &state.as_ref().inner.state { + ArchivedStatus::Free => { command.arg("free"); } - Status::InUse(ref by) => { - command.arg("inuse").arg(format!("{}", by.get_username())); + ArchivedStatus::InUse(by) => { + command.arg("inuse").arg(by.id.as_str()); } - Status::ToCheck(ref by) => { + ArchivedStatus::ToCheck(by) => { command.arg("tocheck") - .arg(format!("{}", by.get_username())); + .arg(by.id.as_str()); } - Status::Blocked(ref by) => { + ArchivedStatus::Blocked(by) => { command.arg("blocked") - .arg(format!("{}", by.get_username())); + .arg(by.id.as_str()); } - Status::Disabled => { command.arg("disabled"); }, - Status::Reserved(ref by) => { + ArchivedStatus::Disabled => { command.arg("disabled"); }, + ArchivedStatus::Reserved(by) => { command.arg("reserved") - .arg(format!("{}", by.get_username())); + .arg(by.id.as_str()); } } diff --git a/bffhd/actors/shelly.rs b/bffhd/actors/shelly.rs index 218fcd6..3a52175 100644 --- a/bffhd/actors/shelly.rs +++ b/bffhd/actors/shelly.rs @@ -1,8 +1,10 @@ use std::collections::HashMap; use futures_util::future::BoxFuture; +use rkyv::Archived; use rumqttc::{AsyncClient, QoS}; use crate::actors::Actor; -use crate::resources::modules::fabaccess::Status; +use crate::db::ArchivedValue; +use crate::resources::modules::fabaccess::ArchivedStatus; use crate::resources::state::State; /// An actuator for a Shellie connected listening on one MQTT broker @@ -38,12 +40,12 @@ impl Shelly { impl Actor for Shelly { - fn apply(&mut self, state: State) -> BoxFuture<'static, ()> { + fn apply(&mut self, state: ArchivedValue) -> BoxFuture<'static, ()> { tracing::debug!(?state, name=%self.name, "Shelly changing state" ); - let pl = match state.inner.state { - Status::InUse(_) => "on", + let pl = match state.as_ref().inner.state { + ArchivedStatus::InUse(_) => "on", _ => "off", }; diff --git a/bffhd/db/mod.rs b/bffhd/db/mod.rs index 4177614..c8bc6b2 100644 --- a/bffhd/db/mod.rs +++ b/bffhd/db/mod.rs @@ -1,57 +1,7 @@ -use std::marker::PhantomData; - -pub use lmdb::{ - Environment, - - DatabaseFlags, - WriteFlags, - EnvironmentFlags, - - Transaction, - RoTransaction, - RwTransaction, -}; - -use rkyv::{Fallible, Serialize, ser::serializers::AllocSerializer, AlignedVec}; - mod raw; pub use raw::RawDB; -use lmdb::Error; -use rkyv::Deserialize; -use rkyv::ser::serializers::AlignedSerializer; +mod typed; +pub use typed::{DB, ArchivedValue, Adapter, AlignedAdapter}; - -use crate::users::db::{User}; -use std::collections::HashMap; -use std::fmt::{Display, Formatter}; -use rkyv::Infallible; -use crate::resources::state::{State}; -use std::iter::FromIterator; -use std::ops::Deref; -use crate::resources::search::ResourcesHandle; - - -use crate::Users; - -#[derive(Debug, serde::Serialize)] -pub struct Dump { - users: HashMap, - states: HashMap, -} - -impl Dump { - pub fn new(userdb: Users, resources: ResourcesHandle) -> Result { - let users = HashMap::from_iter(userdb.into_inner().get_all()?.into_iter()); - let mut states = HashMap::new(); - for resource in resources.list_all().into_iter() { - if let Some(output) = resource.get_raw_state() { - let output: State = Deserialize::::deserialize(output.deref(), &mut Infallible).unwrap(); - let old = states.insert(resource.get_id().to_string(), output); - assert!(old.is_none()); - } - } - - Ok(Self { users, states }) - } -} \ No newline at end of file +pub type Error = lmdb::Error; \ No newline at end of file diff --git a/bffhd/db/raw.rs b/bffhd/db/raw.rs index 64d8105..6302ed1 100644 --- a/bffhd/db/raw.rs +++ b/bffhd/db/raw.rs @@ -38,7 +38,7 @@ impl RawDB { txn.put(self.db, key, value, flags) } - pub fn reserve<'txn, K>(&self, txn: &'txn mut RwTransaction, key: &K, size: usize, flags: WriteFlags) + pub fn reserve<'txn, K>(&self, txn: &'txn mut RwTransaction, key: &K, size: usize, flags: WriteFlags) -> lmdb::Result<&'txn mut [u8]> where K: AsRef<[u8]> { diff --git a/bffhd/db/typed.rs b/bffhd/db/typed.rs new file mode 100644 index 0000000..984a031 --- /dev/null +++ b/bffhd/db/typed.rs @@ -0,0 +1,152 @@ +use crate::db::RawDB; +use lmdb::{Cursor, RwTransaction, Transaction, WriteFlags}; +use rkyv::{AlignedVec, Archive, Archived, Serialize}; +use std::fmt; +use std::fmt::{Debug, Display, Formatter}; +use std::marker::PhantomData; +use std::pin::Pin; +use crate::db; + +#[derive(Clone)] +/// Packed, sendable resource state +pub struct ArchivedValue { + /// State is encoded using rkyv making it trivially serializable + data: AlignedVec, + _marker: PhantomData, +} +impl ArchivedValue { + pub fn new(data: AlignedVec) -> Self { + Self { + data, + _marker: PhantomData, + } + } + pub fn build(data: &[u8]) -> Self { + let mut v = AlignedVec::with_capacity(data.len()); + v.extend_from_slice(data); + Self::new(v) + } + + pub fn as_mut(&mut self) -> &mut AlignedVec { + &mut self.data + } + + pub fn as_slice(&self) -> &[u8] { + self.data.as_slice() + } + pub fn as_mut_slice(&mut self) -> &mut [u8] { + self.data.as_mut_slice() + } +} +impl AsRef> for ArchivedValue { + fn as_ref(&self) -> &Archived { + unsafe { rkyv::archived_root::(self.as_slice()) } + } +} + +// +// Debug implementation shows wrapping SendState +// +impl Debug for ArchivedValue +where + ::Archived: Debug, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_tuple("SendState").field(self.as_ref()).finish() + } +} + +// +// Display implementation hides wrapping SendState +// +impl Display for ArchivedValue +where + ::Archived: Display, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + Display::fmt(self.as_ref(), f) + } +} + +/// Adapter trait handling de-/serialization +/// +/// Values must be read from raw, unaligned byte buffers provided by LMDB. +pub trait Adapter { + type Item; + + /// Decode data from a short-lived byte buffer into a durable format + fn decode(data: &[u8]) -> Self::Item; + + fn encoded_len(item: &Self::Item) -> usize; + fn encode_into(item: &Self::Item, buf: &mut [u8]); +} + +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)] +pub struct AlignedAdapter(PhantomData); +impl Adapter for AlignedAdapter { + type Item = ArchivedValue; + + fn decode(data: &[u8]) -> Self::Item { + ArchivedValue::build(data) + } + + fn encoded_len(item: &Self::Item) -> usize { + item.as_slice().len() + } + + fn encode_into(item: &Self::Item, buf: &mut [u8]) { + buf.copy_from_slice(item.as_slice()) + } +} + +#[derive(Debug, Clone)] +#[repr(transparent)] +/// `Typed` database, allowing storing a typed value +/// +/// Values must be serialized into and deserialized from raw byte buffers. +/// This is handled by a stateless [Adapter] given by the type parameter `A` +pub struct DB { + db: RawDB, + _marker: PhantomData, +} +impl DB { + pub fn new(db: RawDB) -> Self { + Self { + db, + _marker: PhantomData, + } + } +} + +impl DB { + pub fn get(&self, txn: &T, key: &impl AsRef<[u8]>) -> Result, db::Error> { + Ok(self.db.get(txn, key)?.map(A::decode)) + } + + pub fn put( + &self, + txn: &mut RwTransaction, + key: &impl AsRef<[u8]>, + value: &A::Item, + flags: WriteFlags, + ) -> Result<(), db::Error> + { + let len = A::encoded_len(value); + let buf = self.db.reserve(txn, key, len, flags)?; + assert_eq!(buf.len(), len, "Reserved buffer is not of requested size!"); + A::encode_into(value, buf); + Ok(()) + } + + pub fn del(&self, txn: &mut RwTransaction, key: &impl AsRef<[u8]>) -> Result<(), db::Error> { + self.db.del::<_, &[u8]>(txn, key, None) + } + + pub fn get_all<'txn, T: Transaction>(&self, txn: &'txn T) -> Result, db::Error> { + let mut cursor = self.db.open_ro_cursor(txn)?; + let it = cursor.iter_start(); + Ok(it.filter_map(|buf| buf.ok().map(|(kbuf,vbuf)| { + (kbuf, A::decode(vbuf)) + }))) + } +} \ No newline at end of file diff --git a/bffhd/error.rs b/bffhd/error.rs index a7a2ae8..ba6a5ff 100644 --- a/bffhd/error.rs +++ b/bffhd/error.rs @@ -1,7 +1,9 @@ use std::io; use std::fmt; use rsasl::error::SessionError; -use crate::db::DBError; +use crate::db; + +type DBError = db::Error; #[derive(Debug)] /// Shared error type diff --git a/bffhd/initiators/mod.rs b/bffhd/initiators/mod.rs index 269e8ac..59d58e7 100644 --- a/bffhd/initiators/mod.rs +++ b/bffhd/initiators/mod.rs @@ -5,7 +5,6 @@ use async_channel as channel; use async_oneshot as oneshot; use futures_signals::signal::Signal; use futures_util::future::BoxFuture; -use crate::resources::driver::{Error, Update}; use crate::resources::claim::{ResourceID, UserID}; use crate::resources::state::State; diff --git a/bffhd/lib.rs b/bffhd/lib.rs index 05c0136..ce2dd2e 100644 --- a/bffhd/lib.rs +++ b/bffhd/lib.rs @@ -25,8 +25,6 @@ pub mod resources; pub mod actors; -pub mod initiators; - pub mod sensors; pub mod capnp; @@ -71,7 +69,7 @@ pub const RELEASE_STRING: &'static str = env!("BFFHD_RELEASE_STRING"); pub struct Diflouroborane { config: Config, executor: Executor<'static>, - pub statedb: Arc, + pub statedb: StateDB, pub users: Users, pub roles: Roles, pub resources: ResourcesHandle, @@ -90,8 +88,8 @@ impl Diflouroborane { let executor = Executor::new(); let env = StateDB::open_env(&config.db_path)?; - let statedb = Arc::new(StateDB::create_with_env(env.clone()) - .context("Failed to open state DB file")?); + let statedb = StateDB::create_with_env(env.clone()) + .context("Failed to open state DB file")?; let users = Users::new(env.clone()).context("Failed to open users DB file")?; let roles = Roles::new(config.roles.clone()); diff --git a/bffhd/resources/claim.rs b/bffhd/resources/claim.rs index 4fe699f..20b9fd9 100644 --- a/bffhd/resources/claim.rs +++ b/bffhd/resources/claim.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use async_channel::Sender; use lmdb::Environment; -use crate::resources::driver::Update; #[derive(Clone, Debug)] /// Database of currently valid claims, interests and notify, as far as applicable diff --git a/bffhd/resources/driver.rs b/bffhd/resources/driver.rs deleted file mode 100644 index 3e188b7..0000000 --- a/bffhd/resources/driver.rs +++ /dev/null @@ -1,141 +0,0 @@ -use std::fmt::Debug; -use async_trait::async_trait; - -use futures_signals::signal::Mutable; -use async_oneshot::Sender; -use async_channel::Receiver; -use crate::resources::state::db::StateDB; - -use super::state::State; - -/// A resources in BFFH has to contain several different parts; -/// - Currently set state -/// - Execution state of attached actors (⇒ BFFH's job) -/// - Output of interal logic of a resources -/// ⇒ Resource logic gets read access to set state and write access to output state. -/// ⇒ state `update` happens via resources logic. This logic should do access control. If the update -/// succeeds then BFFH stores those input parameters ("set" state) and results / output state. -/// Storing input parameters is relevant so that BFFH can know that an "update" is a no-op -/// without having to run the module code. -/// ⇒ in fact actors only really care about the output state, and shouldn't (need to) see "set" -/// state. -/// ⇒ example reserving: -/// - Claimant sends 'update' message with a new state -/// - Doesn't set the state until `update` has returned Ok. -/// - This runs the `update` function with that new state and the claimants user context returning -/// either an Ok or an Error. -/// - Error is returned to Claimant to show user, stop. -/// - On ok: -/// - Commit new "set" state, storing it and making it visible to all other claimants -/// - Commit new output state, storing it and notifying all connected actors / Notify -/// ⇒ BFFHs job in this whole ordeal is: -/// - Message passing primitives so that update message are queued -/// - As reliable as possible storage system for input and output state -/// - Again message passing so that updates are broadcasted to all Notify and Actors. -/// ⇒ Resource module's job is: -/// - Validating updates semantically i.e. are the types correct -/// - Check authorization of updates i.e. is this user allowed to do that -#[async_trait] -pub trait ResourceModel: Debug { - /// Run whatever internal logic this resources has for the given State update, and return the - /// new output state that this update produces. - async fn on_update(&mut self, input: &State) -> Result; - async fn shutdown(&mut self); -} - -#[derive(Debug)] -pub struct Passthrough; -#[async_trait] -impl ResourceModel for Passthrough { - async fn on_update(&mut self, input: &State) -> Result { - Ok(input.clone()) - } - - async fn shutdown(&mut self) {} -} - -/// Error type a resources implementation can produce -#[derive(Debug)] -pub enum Error { - Internal(Box), - Denied, -} - -// TODO: more message context -#[derive(Debug)] -pub struct Update { - pub state: State, - pub errchan: Sender, -} - -#[derive(Debug)] -pub struct ResourceDriver { - // putput - res: Box, - - // input - rx: Receiver, - - // output - db: StateDB, - key: String, - - signal: Mutable, -} - -impl ResourceDriver { - pub async fn drive_to_end(&mut self) { - while let Ok(update) = self.rx.recv().await { - let state = update.state; - let mut errchan = update.errchan; - - match self.res.on_update(&state).await { - Ok(outstate) => { - // FIXME: Send any error here to some global error collector. A failed write to - // the DB is not necessarily fatal, but it means that BFFH is now in an - // inconsistent state until a future update succeeds with writing to the DB. - // Not applying the new state isn't correct either since we don't know what the - // internal logic of the resources has done to make this happen. - // Another half right solution is to unwrap and recreate everything. - // "Best" solution would be to tell the resources to rollback their interal - // changes on a fatal failure and then notify the Claimant, while simply trying - // again for temporary failures. - let _ = self.db.update(self.key.as_bytes(), &state, &outstate); - self.signal.set(outstate); - }, - Err(e) => { - let _ = errchan.send(e); - } - } - } - } -} - -#[cfg(test)] -mod tests { - use std::pin::Pin; - use std::task::Poll; - use std::future::Future; - use super::*; - - #[futures_test::test] - async fn test_passthrough_is_id() { - let inp = state::tests::gen_random(); - - let mut res = Passthrough; - let out = res.on_update(&inp).await.unwrap(); - assert_eq!(inp, out); - } - - #[test] - fn test_passthrough_is_always_ready() { - let inp = State::build().finish(); - - let mut res = Passthrough; - let mut cx = futures_test::task::panic_context(); - if let Poll::Ready(_) = Pin::new(&mut res.on_update(&inp)).poll(&mut cx) { - return; - } - panic!("Passthrough returned Poll::Pending") - } -} diff --git a/bffhd/resources/mod.rs b/bffhd/resources/mod.rs index 987a537..8784fd1 100644 --- a/bffhd/resources/mod.rs +++ b/bffhd/resources/mod.rs @@ -1,19 +1,21 @@ - +use std::convert::Infallible; +use std::ops::Deref; use std::sync::Arc; use futures_signals::signal::{Mutable, Signal, SignalExt}; use lmdb::RoTransaction; -use rkyv::Archived; +use rkyv::{Archived, Deserialize}; +use rkyv::ser::Serializer; +use rkyv::ser::serializers::AllocSerializer; use crate::authorization::permissions::PrivilegesBuf; use crate::config::MachineDescription; -use crate::resources::modules::fabaccess::{MachineState, Status}; +use crate::db::ArchivedValue; +use crate::resources::modules::fabaccess::{MachineState, Status, ArchivedStatus}; use crate::resources::state::db::StateDB; use crate::resources::state::State; use crate::session::SessionHandle; use crate::users::UserRef; -pub mod claim; pub mod db; -pub mod driver; pub mod search; pub mod state; @@ -23,50 +25,56 @@ pub struct PermissionDenied; pub(crate) struct Inner { id: String, - db: Arc, - signal: Mutable, + db: StateDB, + signal: Mutable>, desc: MachineDescription, } impl Inner { - pub fn new(id: String, db: Arc, desc: MachineDescription) -> Self { - let state = if let Some(previous) = db.get_output(id.as_bytes()).unwrap() { - let state = MachineState::from(&previous); - tracing::info!(%id, ?state, "Found previous state"); - state + pub fn new(id: String, db: StateDB, desc: MachineDescription) -> Self { + let state = if let Some(previous) = db.get(id.as_bytes()).unwrap() { + tracing::info!(%id, ?previous, "Found previous state"); + previous } else { tracing::info!(%id, "No previous state, defaulting to `free`"); - let state = MachineState::used(UserRef::new("test".to_string()), Some(UserRef::new - ("prev".to_string()))); + let state = MachineState::used(UserRef::new("test".to_string()), Some(UserRef::new("prev".to_string()))); + let update = state.to_state(); - db.update(id.as_bytes(), &update, &update).unwrap(); - state + + let mut serializer = AllocSerializer::<1024>::default(); + serializer.serialize_value(&update).expect("failed to serialize new default state"); + let val = ArchivedValue::new(serializer.into_serializer().into_inner()); + db.put(&id.as_bytes(), &val).unwrap(); + val }; - let signal = Mutable::new(state.to_state()); + let signal = Mutable::new(state); Self { id, db, signal, desc } } - pub fn signal(&self) -> impl Signal { - Box::pin(self.signal.signal_cloned().dedupe_cloned()) + pub fn signal(&self) -> impl Signal> { + Box::pin(self.signal.signal_cloned()) } - fn get_state(&self) -> MachineState { - MachineState::from(&self.db.get_output(self.id.as_bytes()).unwrap().unwrap()) + fn get_state(&self) -> ArchivedValue { + self.db.get(self.id.as_bytes()) + .expect("lmdb error") + .expect("state should never be None") } - fn get_raw_state(&self) -> Option>> { - self.db.get_output(self.id.as_bytes()).unwrap() + fn get_ref(&self) -> impl Deref> + '_ { + self.signal.lock_ref() } - fn set_state(&self, state: MachineState) { - let span = tracing::debug_span!("set", id = %self.id, ?state, "Updating state"); + fn set_state(&self, state: ArchivedValue) { + let span = tracing::debug_span!("set_state", id = %self.id, ?state); let _guard = span.enter(); tracing::debug!("Updating state"); + tracing::trace!("Updating DB"); - let update = state.to_state(); - self.db.update(self.id.as_bytes(), &update, &update).unwrap(); + self.db.put(&self.id.as_bytes(), &state).unwrap(); tracing::trace!("Updated DB, sending update signal"); - self.signal.set(update); + + self.signal.set(state); tracing::trace!("Sent update signal"); } } @@ -81,11 +89,7 @@ impl Resource { Self { inner } } - pub fn get_raw_state(&self) -> Option>> { - self.inner.get_raw_state() - } - - pub fn get_state(&self) -> MachineState { + pub fn get_state(&self) -> ArchivedValue { self.inner.get_state() } @@ -93,7 +97,7 @@ impl Resource { &self.inner.id } - pub fn get_signal(&self) -> impl Signal { + pub fn get_signal(&self) -> impl Signal> { self.inner.signal() } @@ -102,46 +106,54 @@ impl Resource { } fn set_state(&self, state: MachineState) { - self.inner.set_state(state) + let mut serializer = AllocSerializer::<1024>::default(); + serializer.serialize_value(&state); + let archived = ArchivedValue::new(serializer.into_serializer().into_inner()); + self.inner.set_state(archived) } fn set_status(&self, state: Status) { let old = self.inner.get_state(); - let new = MachineState { state, .. old }; + let oldref: &Archived = old.as_ref(); + let previous: &Archived> = &oldref.inner.previous; + let previous = Deserialize::, _>::deserialize(previous, &mut rkyv::Infallible) + .expect("Infallible deserializer failed"); + let new = MachineState { state, previous }; self.set_state(new); } pub async fn try_update(&self, session: SessionHandle, new: Status) { let old = self.get_state(); + let old: &Archived = old.as_ref(); let user = session.get_user(); if session.has_manage(self) // Default allow for managers || (session.has_write(self) // Decision tree for writers - && match (&old.state, &new) { + && match (&old.inner.state, &new) { // Going from available to used by the person requesting is okay. - (Status::Free, Status::InUse(who)) + (ArchivedStatus::Free, Status::InUse(who)) // Check that the person requesting does not request for somebody else. // *That* is manage privilege. if who == &user => true, // Reserving things for ourself is okay. - (Status::Free, Status::Reserved(whom)) + (ArchivedStatus::Free, Status::Reserved(whom)) if &user == whom => true, // Returning things we've been using is okay. This includes both if // they're being freed or marked as to be checked. - (Status::InUse(who), Status::Free | Status::ToCheck(_)) + (ArchivedStatus::InUse(who), Status::Free | Status::ToCheck(_)) if who == &user => true, // Un-reserving things we reserved is okay - (Status::Reserved(whom), Status::Free) + (ArchivedStatus::Reserved(whom), Status::Free) if whom == &user => true, // Using things that we've reserved is okay. But the person requesting // that has to be the person that reserved the machine. Otherwise // somebody could make a machine reserved by a different user as used by // that different user but use it themself. - (Status::Reserved(whom), Status::InUse(who)) + (ArchivedStatus::Reserved(whom), Status::InUse(who)) if whom == &user && who == whom => true, // Default is deny. @@ -149,13 +161,13 @@ impl Resource { }) // Default permissions everybody has - || match (&old.state, &new) { + || match (&old.inner.state, &new) { // Returning things we've been using is okay. This includes both if // they're being freed or marked as to be checked. - (Status::InUse(who), Status::Free | Status::ToCheck(_)) if who == &user => true, + (ArchivedStatus::InUse(who), Status::Free | Status::ToCheck(_)) if who == &user => true, // Un-reserving things we reserved is okay - (Status::Reserved(whom), Status::Free) if whom == &user => true, + (ArchivedStatus::Reserved(whom), Status::Free) if whom == &user => true, // Default is deny. _ => false, @@ -166,11 +178,15 @@ impl Resource { } pub async fn give_back(&self, session: SessionHandle) { - if let Status::InUse(user) = self.get_state().state { + let state = self.get_state(); + let s: &Archived = state.as_ref(); + let i: &Archived = &s.inner; + unimplemented!(); + /*if let Status::InUse(user) = self.get_state().state { if user == session.get_user() { self.set_state(MachineState::free(Some(user))); } - } + }*/ } pub async fn force_set(&self, new: Status) { @@ -182,13 +198,13 @@ impl Resource { } pub fn is_owned_by(&self, owner: UserRef) -> bool { - match self.get_state().state { - Status::Free | Status::Disabled => false, + match &self.get_state().as_ref().inner.state { + ArchivedStatus::Free | ArchivedStatus::Disabled => false, - Status::InUse(user) - | Status::ToCheck(user) - | Status::Blocked(user) - | Status::Reserved(user) => user == owner, + ArchivedStatus::InUse(user) + | ArchivedStatus::ToCheck(user) + | ArchivedStatus::Blocked(user) + | ArchivedStatus::Reserved(user) => user == &owner, } } } diff --git a/bffhd/resources/state/db.rs b/bffhd/resources/state/db.rs index b35cfc7..7f39394 100644 --- a/bffhd/resources/state/db.rs +++ b/bffhd/resources/state/db.rs @@ -1,74 +1,52 @@ -use std::{ - sync::Arc, - path::Path, -}; - -use rkyv::Archived; - -use crate::db::{ - DB, - Environment, - - EnvironmentFlags, - DatabaseFlags, +use crate::db; +use crate::db::{ArchivedValue, RawDB, DB, AlignedAdapter}; +use lmdb::{ + DatabaseFlags, Environment, EnvironmentFlags, RoTransaction, RwTransaction, Transaction, WriteFlags, - - Adapter, - AllocAdapter, - DBError, - - Transaction, - RoTransaction, - RwTransaction, - }; +use std::{path::Path, sync::Arc}; use crate::resources::state::State; -type StateAdapter = AllocAdapter; - -/// State Database containing the currently set state -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct StateDB { - /// The environment for all the databases below env: Arc, - - input: DB, - output: DB, - - // TODO: Index resources name/id/uuid -> u64 + db: DB>, } impl StateDB { pub fn open_env>(path: P) -> lmdb::Result> { Environment::new() - .set_flags( EnvironmentFlags::WRITE_MAP - | EnvironmentFlags::NO_SUB_DIR - | EnvironmentFlags::NO_TLS - | EnvironmentFlags::NO_READAHEAD) - .set_max_dbs(4) + .set_flags( + EnvironmentFlags::WRITE_MAP + | EnvironmentFlags::NO_SUB_DIR + | EnvironmentFlags::NO_TLS + | EnvironmentFlags::NO_READAHEAD, + ) .open(path.as_ref()) .map(Arc::new) } - fn new(env: Arc, input: DB, output: DB) -> Self { - Self { env: env, input, output } + fn new(env: Arc, db: RawDB) -> Self { + let db = DB::new(db); + Self { env, db } + } + + pub fn open_with_env(env: Arc) -> lmdb::Result { + let db = unsafe { RawDB::open(&env, Some("state"))? }; + Ok(Self::new(env, db)) } pub fn open>(path: P) -> lmdb::Result { let env = Self::open_env(path)?; - let input = unsafe { DB::open(&env, Some("input"))? }; - let output = unsafe { DB::open(&env, Some("output"))? }; - - Ok(Self::new(env, input, output)) + Self::open_with_env(env) } pub fn create_with_env(env: Arc) -> lmdb::Result { let flags = DatabaseFlags::empty(); - let input = unsafe { DB::create(&env, Some("input"), flags)? }; - let output = unsafe { DB::create(&env, Some("output"), flags)? }; + let db = unsafe { RawDB::create(&env, Some("state"), flags)? }; - Ok(Self::new(env, input, output)) + Ok(Self::new(env, db)) } pub fn create>(path: P) -> lmdb::Result { @@ -76,46 +54,28 @@ impl StateDB { Self::create_with_env(env) } - fn update_txn(&self, txn: &mut RwTransaction, key: impl AsRef<[u8]>, input: &State, output: &State) - -> Result<(), DBError> - { + pub fn begin_ro_txn(&self) -> Result { + self.env.begin_ro_txn() + } + + pub fn get(&self, key: impl AsRef<[u8]>) -> Result>, db::Error> { + let txn = self.env.begin_ro_txn()?; + self.db.get(&txn, &key.as_ref()) + } + + pub fn get_all<'txn, T: Transaction>( + &self, + txn: &'txn T, + ) -> Result)>, db::Error, > { + self.db.get_all(txn) + } + + pub fn put(&self, key: &impl AsRef<[u8]>, val: &ArchivedValue) -> Result<(), db::Error> { + let mut txn = self.env.begin_rw_txn()?; let flags = WriteFlags::empty(); - let k = key.as_ref(); - self.input.put(txn, &k, input, flags)?; - self.output.put(txn, &k, output, flags)?; - Ok(()) + self.db.put(&mut txn, key, val, flags)?; + txn.commit() } - - pub fn update(&self, key: impl AsRef<[u8]>, input: &State, output: &State) - -> Result<(), DBError> - { - let mut txn = self.env.begin_rw_txn().map_err(StateAdapter::from_db_err)?; - self.update_txn(&mut txn, key, input, output)?; - - txn.commit().map_err(StateAdapter::from_db_err) - } - - fn get(&self, db: &DB, key: impl AsRef<[u8]>) - -> Result>>, DBError> - { - let txn = self.env.begin_ro_txn().map_err(StateAdapter::from_db_err)?; - if let Some(state) = db.get(&txn, &key.as_ref())? { - let ptr = state.into(); - Ok(Some(unsafe { LMDBorrow::new(ptr, txn) })) - } else { - Ok(None) - } - } - - #[inline(always)] - pub fn get_input(&self, key: impl AsRef<[u8]>) - -> Result>>, DBError> - { self.get(&self.input, key) } - - #[inline(always)] - pub fn get_output(&self, key: impl AsRef<[u8]>) - -> Result>>, DBError> - { self.get(&self.output, key) } } #[cfg(test)] @@ -123,7 +83,7 @@ mod tests { use super::*; use crate::resource::state::value::Vec3u8; - use crate::resource::state::value::{OID_COLOUR, OID_POWERED, OID_INTENSITY}; + use crate::resource::state::value::{OID_COLOUR, OID_INTENSITY, OID_POWERED}; use std::ops::Deref; #[test] @@ -133,14 +93,14 @@ mod tests { tmppath.push("db"); let db = StateDB::create(tmppath).unwrap(); let b = State::build() - .add(OID_COLOUR.clone(), Box::new(Vec3u8 { a: 1, b: 2, c: 3})) + .add(OID_COLOUR.clone(), Box::new(Vec3u8 { a: 1, b: 2, c: 3 })) .add(OID_POWERED.clone(), Box::new(true)) .add(OID_INTENSITY.clone(), Box::new(1023)) .finish(); println!("({}) {:?}", b.hash(), b); let c = State::build() - .add(OID_COLOUR.clone(), Box::new(Vec3u8 { a: 1, b: 2, c: 3})) + .add(OID_COLOUR.clone(), Box::new(Vec3u8 { a: 1, b: 2, c: 3 })) .add(OID_POWERED.clone(), Box::new(true)) .add(OID_INTENSITY.clone(), Box::new(1023)) .finish(); diff --git a/bffhd/resources/state/mod.rs b/bffhd/resources/state/mod.rs index 8859169..5d244e1 100644 --- a/bffhd/resources/state/mod.rs +++ b/bffhd/resources/state/mod.rs @@ -4,15 +4,12 @@ use std::{ Hasher }, }; -use std::fmt::Formatter; +use std::fmt::{Debug, Display, Formatter}; +use std::marker::PhantomData; use std::ops::Deref; +use std::sync::Arc; -use rkyv::{ - Archive, - Deserialize, - out_field, - Serialize, -}; +use rkyv::{AlignedVec, Archive, Archived, Deserialize, out_field, Serialize}; use serde::de::{Error, MapAccess, Unexpected}; use serde::Deserializer; use serde::ser::SerializeMap; diff --git a/bffhd/session/db.rs b/bffhd/session/db.rs deleted file mode 100644 index 37e0eb3..0000000 --- a/bffhd/session/db.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::sync::Arc; -use lmdb::{DatabaseFlags, Environment}; - -use rkyv::{Archive, Serialize, Deserialize}; - -use crate::db::{AllocAdapter, DB, RawDB}; -use crate::users::UserRef; - -#[derive(Clone, Debug, PartialEq, Eq)] -#[derive(Archive, Serialize, Deserialize)] -pub struct Session { - userid: UserRef, -} - -type Adapter = AllocAdapter; -pub struct SessionCache { - env: Arc, - db: DB, -} - -impl SessionCache { - pub unsafe fn new(env: Arc, db: RawDB) -> Self { - let db = DB::new_unchecked(db); - Self { env, db } - } - - pub unsafe fn open(env: Arc) -> lmdb::Result { - let db = RawDB::open(&env, Some("sessions"))?; - Ok(Self::new(env, db)) - } - - pub unsafe fn create(env: Arc) -> lmdb::Result { - let flags = DatabaseFlags::empty(); - let db = RawDB::create(&env, Some("sessions"), flags)?; - Ok(Self::new(env, db)) - } -} \ No newline at end of file diff --git a/bffhd/session/mod.rs b/bffhd/session/mod.rs index 84635df..577a0bc 100644 --- a/bffhd/session/mod.rs +++ b/bffhd/session/mod.rs @@ -5,13 +5,9 @@ use once_cell::sync::OnceCell; use crate::authorization::roles::{Roles}; use crate::resources::Resource; -use crate::session::db::SessionCache; use crate::Users; use crate::users::UserRef; -mod db; - -static SESSION_CACHE: OnceCell = OnceCell::new(); #[derive(Clone)] pub struct SessionManager { diff --git a/bffhd/users/db.rs b/bffhd/users/db.rs index f0da9ba..1af3b8c 100644 --- a/bffhd/users/db.rs +++ b/bffhd/users/db.rs @@ -1,12 +1,15 @@ -use crate::db::{AllocAdapter, Environment, RawDB, Result, DB}; -use crate::db::{DatabaseFlags, RoTransaction, WriteFlags}; -use lmdb::{Transaction}; +use lmdb::{DatabaseFlags, Environment, Transaction, WriteFlags}; use std::collections::{HashMap}; +use rkyv::Infallible; use std::sync::Arc; use anyhow::Context; use rkyv::{Archived, Deserialize}; +use rkyv::ser::Serializer; +use rkyv::ser::serializers::AllocSerializer; +use crate::db; +use crate::db::{AlignedAdapter, ArchivedValue, DB, RawDB}; #[derive( Clone, @@ -72,58 +75,55 @@ impl UserData { } } -type Adapter = AllocAdapter; #[derive(Clone, Debug)] pub struct UserDB { env: Arc, - db: DB, + db: DB>, } impl UserDB { pub unsafe fn new(env: Arc, db: RawDB) -> Self { - let db = DB::new_unchecked(db); + let db = DB::new(db); Self { env, db } } - pub unsafe fn open(env: Arc) -> Result { + pub unsafe fn open(env: Arc) -> Result { let db = RawDB::open(&env, Some("user"))?; Ok(Self::new(env, db)) } - pub unsafe fn create(env: Arc) -> Result { + pub unsafe fn create(env: Arc) -> Result { let flags = DatabaseFlags::empty(); let db = RawDB::create(&env, Some("user"), flags)?; Ok(Self::new(env, db)) } - pub fn get(&self, uid: &str) -> Result>>> { + pub fn get(&self, uid: &str) -> Result>, db::Error> { let txn = self.env.begin_ro_txn()?; - if let Some(state) = self.db.get(&txn, &uid.as_bytes())? { - let ptr = state.into(); - Ok(Some(unsafe { LMDBorrow::new(ptr, txn) })) - } else { - Ok(None) - } + self.db.get(&txn, &uid.as_bytes()) } - pub fn put(&self, uid: &str, user: &User) -> Result<()> { + pub fn put(&self, uid: &str, user: &User) -> Result<(), db::Error> { + let mut serializer = AllocSerializer::<1024>::default(); + let pos = serializer.serialize_value(user).expect("rkyv error"); + assert_eq!(pos, 0); + let v = serializer.into_serializer().into_inner(); + let value = ArchivedValue::new(v); + let mut txn = self.env.begin_rw_txn()?; let flags = WriteFlags::empty(); - self.db.put(&mut txn, &uid.as_bytes(), user, flags)?; + self.db.put(&mut txn, &uid.as_bytes(), &value, flags)?; txn.commit()?; Ok(()) } - pub fn get_all(&self) -> Result> { + pub fn get_all(&self) -> Result, db::Error> { let txn = self.env.begin_ro_txn()?; - let mut cursor = self.db.open_ro_cursor(&txn)?; - let iter = cursor.iter_start(); + let mut iter = self.db.get_all(&txn)?; let mut out = Vec::new(); - let mut deserializer = rkyv::Infallible; - for user in iter { - let (uid, user) = user?; + for (uid, user) in iter { let uid = unsafe { std::str::from_utf8_unchecked(uid).to_string() }; - let user: User = user.deserialize(&mut deserializer).unwrap(); + let user: User = Deserialize::::deserialize(user.as_ref(), &mut Infallible).unwrap(); out.push((uid, user)); } diff --git a/bffhd/users/mod.rs b/bffhd/users/mod.rs index efc3bbb..eb1668c 100644 --- a/bffhd/users/mod.rs +++ b/bffhd/users/mod.rs @@ -43,7 +43,18 @@ use crate::UserDB; )] #[archive_attr(derive(Debug, PartialEq))] pub struct UserRef { - id: String, + pub id: String, +} + +impl PartialEq for UserRef { + fn eq(&self, other: &ArchivedUserRef) -> bool { + self.id == other.id + } +} +impl PartialEq for ArchivedUserRef { + fn eq(&self, other: &UserRef) -> bool { + self.id == other.id + } } impl UserRef { @@ -88,7 +99,7 @@ impl Users { self.userdb .get(uid) .unwrap() - .map(|user| Deserialize::::deserialize(user.deref(), &mut Infallible).unwrap()) + .map(|user| Deserialize::::deserialize(user.as_ref(), &mut Infallible).unwrap()) } pub fn load_file>(&self, path: P) -> anyhow::Result<()> { diff --git a/bin/bffhd/main.rs b/bin/bffhd/main.rs index b33856c..f07f9f8 100644 --- a/bin/bffhd/main.rs +++ b/bin/bffhd/main.rs @@ -1,5 +1,4 @@ use clap::{Arg, Command}; -use diflouroborane::db::Dump; use diflouroborane::{config, Diflouroborane}; @@ -112,9 +111,7 @@ fn main() -> anyhow::Result<()> { let mut config = config::read(&PathBuf::from_str(configpath).unwrap()).unwrap(); if matches.is_present("dump") { - let bffh = Diflouroborane::new(config)?; - let dump = Dump::new(bffh.users, bffh.resources)?; - println!("{:?}", dump); + unimplemented!() } else if matches.is_present("load") { let bffh = Diflouroborane::new(config)?; bffh.users.load_file(matches.value_of("load").unwrap());