DB up/down/sidegrade

This commit is contained in:
Nadja Reitzenstein 2022-03-13 20:11:37 +01:00
parent df5ee9a0a1
commit cc48dcca17
11 changed files with 180 additions and 349 deletions

View File

@ -1,4 +1,4 @@
use crate::resources::modules::fabaccess::MachineState; use crate::resources::modules::fabaccess::{MachineState, Status};
use crate::resources::Resource; use crate::resources::Resource;
use crate::session::SessionHandle; use crate::session::SessionHandle;
use api::machine_capnp::machine::{ use api::machine_capnp::machine::{
@ -49,7 +49,7 @@ impl UseServer for Machine {
let session = self.session.clone(); let session = self.session.clone();
Promise::from_future(async move { Promise::from_future(async move {
let user = session.get_user(); let user = session.get_user();
resource.try_update(session, MachineState::used(user)).await; resource.try_update(session, Status::InUse(user)).await;
Ok(()) Ok(())
}) })
} }
@ -64,7 +64,7 @@ impl UseServer for Machine {
Promise::from_future(async move { Promise::from_future(async move {
let user = session.get_user(); let user = session.get_user();
resource resource
.try_update(session, MachineState::reserved(user)) .try_update(session, Status::Reserved(user))
.await; .await;
Ok(()) Ok(())
}) })
@ -166,7 +166,7 @@ impl ManageServer for Machine {
let session = self.session.clone(); let session = self.session.clone();
Promise::from_future(async move { Promise::from_future(async move {
resource resource
.force_set(MachineState::used(session.get_user())) .force_set(Status::InUse(session.get_user()))
.await; .await;
Ok(()) Ok(())
}) })
@ -180,7 +180,7 @@ impl ManageServer for Machine {
let resource = self.resource.clone(); let resource = self.resource.clone();
let session = self.session.clone(); let session = self.session.clone();
Promise::from_future(async move { Promise::from_future(async move {
resource.force_set(MachineState::free()).await; resource.force_set(Status::Free).await;
Ok(()) Ok(())
}) })
} }
@ -203,7 +203,7 @@ impl ManageServer for Machine {
let session = self.session.clone(); let session = self.session.clone();
Promise::from_future(async move { Promise::from_future(async move {
resource resource
.force_set(MachineState::blocked(session.get_user())) .force_set(Status::Blocked(session.get_user()))
.await; .await;
Ok(()) Ok(())
}) })
@ -215,7 +215,7 @@ impl ManageServer for Machine {
) -> Promise<(), ::capnp::Error> { ) -> Promise<(), ::capnp::Error> {
let mut resource = self.resource.clone(); let mut resource = self.resource.clone();
Promise::from_future(async move { Promise::from_future(async move {
resource.force_set(MachineState::disabled()).await; resource.force_set(Status::Disabled).await;
Ok(()) Ok(())
}) })
} }
@ -230,12 +230,12 @@ impl AdminServer for Machine {
use api::schema::machine_capnp::machine::MachineState as APIMState; use api::schema::machine_capnp::machine::MachineState as APIMState;
let user = self.session.get_user(); let user = self.session.get_user();
let state = match pry!(pry!(params.get()).get_state()) { let state = match pry!(pry!(params.get()).get_state()) {
APIMState::Free => MachineState::free(), APIMState::Free => Status::Free,
APIMState::Blocked => MachineState::blocked(user), APIMState::Blocked => Status::Blocked(user),
APIMState::Disabled => MachineState::disabled(), APIMState::Disabled => Status::Disabled,
APIMState::InUse => MachineState::used(user), APIMState::InUse => Status::InUse(user),
APIMState::Reserved => MachineState::reserved(user), APIMState::Reserved => Status::Reserved(user),
APIMState::ToCheck => MachineState::check(user), APIMState::ToCheck => Status::ToCheck(user),
APIMState::Totakeover => return Promise::err(::capnp::Error::unimplemented( APIMState::Totakeover => return Promise::err(::capnp::Error::unimplemented(
"totakeover not implemented".to_string(), "totakeover not implemented".to_string(),
)), )),

View File

@ -30,7 +30,6 @@ pub use typed::{
mod hash; mod hash;
pub use hash::{ pub use hash::{
HashDB, HashDB,
Entry,
}; };
mod fix; mod fix;
@ -45,11 +44,12 @@ use std::sync::Arc;
use std::path::Path; use std::path::Path;
use crate::users::db::{User, UserDB}; use crate::users::db::{User, UserDB};
use std::collections::HashMap; use std::collections::HashMap;
use crate::resources::state::{OwnedEntry, State, db::StateDB}; use rkyv::Infallible;
use crate::resources::state::{State, db::StateDB};
use std::iter::FromIterator; use std::iter::FromIterator;
use std::ops::Deref; use std::ops::Deref;
use crate::authentication::db::PassDB; use crate::authentication::db::PassDB;
use crate::resources::db::ResourceDB; use crate::resources::search::ResourcesHandle;
use crate::utils::oid::{ArchivedObjectIdentifier, ObjectIdentifier}; use crate::utils::oid::{ArchivedObjectIdentifier, ObjectIdentifier};
use crate::resources::state::value::SerializeValue; use crate::resources::state::value::SerializeValue;
@ -117,98 +117,24 @@ impl<V: Serialize<AlignedSerializer<AlignedVec>>> Adapter for AlignedAdapter<V>
} }
} }
#[derive(Debug)]
pub struct Databases {
pub userdb: UserDB,
pub passdb: PassDB,
pub resourcedb: ResourceDB,
pub statedb: StateDB,
}
impl Databases {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let env = Arc::new(Environment::new()
.open(&Path::join(path.as_ref(), "internal"))?
);
let userdb = unsafe { UserDB::open(env.clone())? };
let passdb = unsafe { PassDB::open(env.clone())? };
let resourcedb = unsafe { ResourceDB::open(env)? };
let statedb = StateDB::open(&Path::join(path.as_ref(), "state"))?;
Ok(Self { userdb, passdb, resourcedb, statedb })
}
pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
let env = Arc::new(Environment::new()
.set_max_dbs(16)
.open(path.as_ref())?
);
let userdb = unsafe { UserDB::create(env.clone())? };
let passdb = unsafe { PassDB::create(env.clone())? };
let resourcedb = unsafe { ResourceDB::create(env)? };
let statedb = StateDB::create(&Path::join(path.as_ref(), "state"))?;
Ok(Self { userdb, passdb, resourcedb, statedb })
}
}
#[derive(Debug, serde::Serialize)] #[derive(Debug, serde::Serialize)]
pub struct Dump { pub struct Dump {
users: HashMap<String, User>, users: HashMap<String, User>,
passwds: HashMap<String, String>, passwds: HashMap<String, String>,
states: HashMap<String, (State, State)>, states: HashMap<String, State>,
} }
impl Dump { impl Dump {
pub fn new(dbs: &Databases) -> Result<Self> { pub fn new(userdb: UserDB, passdb: PassDB, resources: ResourcesHandle) -> Result<Self> {
let users = HashMap::from_iter(dbs.userdb.get_all()?.into_iter()); let users = HashMap::from_iter(userdb.get_all()?.into_iter());
let passwds = HashMap::from_iter(dbs.passdb.get_all()?.into_iter()); let passwds = HashMap::from_iter(passdb.get_all()?.into_iter());
let mut states = HashMap::new(); let mut states = HashMap::new();
for (name, id) in dbs.resourcedb.get_all()?.into_iter() { for resource in resources.list_all().into_iter() {
let input = dbs.statedb.get_input(id)?.map(|input| { if let Some(output) = resource.get_raw_state() {
let input: &Archived<State> = input.deref(); let output: State = Deserialize::<State, _>::deserialize(output.deref(), &mut Infallible).unwrap();
let hash: u64 = input.hash; let old = states.insert(resource.get_id().to_string(), output);
let inner = input.inner.iter() assert!(old.is_none());
.map(|entry| { }
let oid: &ArchivedObjectIdentifier = &entry.oid;
let bytes: &[u8] = oid.deref();
let mut vec = Vec::with_capacity(bytes.len());
vec.copy_from_slice(bytes);
let oid = ObjectIdentifier::new_unchecked(vec.into_boxed_slice());
let val: Box<dyn SerializeValue> = entry.val
.deserialize(&mut rkyv::Infallible).unwrap();
OwnedEntry { oid, val }
}).collect();
State { hash, inner }
}).unwrap_or(State::build().finish());
let output = dbs.statedb.get_output(id)?.map(|output| {
let output: &Archived<State> = output.deref();
let hash: u64 = output.hash;
let inner = output.inner.iter().map(|entry| {
let oid: &ArchivedObjectIdentifier = &entry.oid;
let bytes: &[u8] = oid.deref();
let mut vec = Vec::with_capacity(bytes.len());
vec.copy_from_slice(bytes);
let oid = ObjectIdentifier::new_unchecked(vec.into_boxed_slice());
let val: Box<dyn SerializeValue> = entry.val
.deserialize(&mut rkyv::Infallible).unwrap();
OwnedEntry { oid, val }
}).collect();
State { hash, inner }
}).unwrap_or(State::build().finish());
let old = states.insert(name, (input, output));
assert!(old.is_none());
} }
Ok(Self { users, passwds, states }) Ok(Self { users, passwds, states })

View File

@ -53,6 +53,7 @@ use executor::pool::Executor;
use crate::authentication::AuthenticationHandle; use crate::authentication::AuthenticationHandle;
use crate::capnp::APIServer; use crate::capnp::APIServer;
use crate::config::{Config, TlsListen}; use crate::config::{Config, TlsListen};
use crate::resources::modules::fabaccess::MachineState;
use crate::session::SessionManager; use crate::session::SessionManager;
use crate::tls::TlsConfig; use crate::tls::TlsConfig;
@ -87,7 +88,9 @@ impl Diflouroborane {
SIGTERM, SIGTERM,
]).context("Failed to construct signal handler")?; ]).context("Failed to construct signal handler")?;
// - Load Machines from config
// - Load states from DB
// - Connect modules to machines
let tlsconfig = TlsConfig::new(config.tlskeylog.as_ref(), !config.is_quiet())?; let tlsconfig = TlsConfig::new(config.tlskeylog.as_ref(), !config.is_quiet())?;
let acceptor = tlsconfig.make_tls_acceptor(&config.tlsconfig)?; let acceptor = tlsconfig.make_tls_acceptor(&config.tlsconfig)?;

View File

@ -6,6 +6,7 @@ use crate::db::RawDB;
use std::sync::Arc; use std::sync::Arc;
use crate::db::{Environment, DatabaseFlags}; use crate::db::{Environment, DatabaseFlags};
use crate::db::Result; use crate::db::Result;
use crate::resources::state::db::StateDB;
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Archive, Serialize, Deserialize)] #[derive(Archive, Serialize, Deserialize)]
@ -16,55 +17,3 @@ pub struct Resource {
name_idx: u64, name_idx: u64,
description_idx: u64, description_idx: u64,
} }
#[derive(Debug, Clone)]
pub struct ResourceDB {
env: Arc<Environment>,
db: DB<AllocAdapter<Resource>>,
id_index: DB<AlignedAdapter<u64>>,
}
impl ResourceDB {
pub unsafe fn new(env: Arc<Environment>, db: RawDB, id_index: RawDB) -> Self {
let db = DB::new_unchecked(db);
let id_index = DB::new_unchecked(id_index);
Self { env, db, id_index }
}
pub unsafe fn open(env: Arc<Environment>) -> Result<Self> {
let db = RawDB::open(&env, Some("resources"))?;
let idx = RawDB::open(&env, Some("resources-idx"))?;
Ok(Self::new(env, db, idx))
}
pub unsafe fn create(env: Arc<Environment>) -> Result<Self> {
let flags = DatabaseFlags::empty();
let db = RawDB::create(&env, Some("resources"), flags)?;
let idx = RawDB::create(&env, Some("resources-idx"), flags)?;
Ok(Self::new(env, db, idx))
}
pub fn lookup_id<S: AsRef<str>>(&self, id: S) -> Result<Option<u64>> {
let txn = self.env.begin_ro_txn()?;
let id = self.id_index.get(&txn, &id.as_ref().as_bytes()).map(|ok| {
ok.map(|num| *num)
})?;
Ok(id)
}
pub fn get_all(&self) -> Result<Vec<(String, u64)>> {
let txn = self.env.begin_ro_txn()?;
let mut cursor = self.id_index.open_ro_cursor(&txn)?;
let iter = cursor.iter_start();
let mut out = Vec::new();
for id in iter {
let (name, id) = id?;
let name = unsafe { std::str::from_utf8_unchecked(name).to_string() };
out.push((name, *id));
}
Ok(out)
}
}

View File

@ -4,9 +4,9 @@ use async_trait::async_trait;
use futures_signals::signal::Mutable; use futures_signals::signal::Mutable;
use async_oneshot::Sender; use async_oneshot::Sender;
use async_channel::Receiver; use async_channel::Receiver;
use crate::resources::state::db::StateDB;
use super::state::State; use super::state::State;
use super::state::db::StateAccessor;
/// A resources in BFFH has to contain several different parts; /// A resources in BFFH has to contain several different parts;
/// - Currently set state /// - Currently set state
@ -77,7 +77,8 @@ pub struct ResourceDriver {
rx: Receiver<Update>, rx: Receiver<Update>,
// output // output
db: StateAccessor, db: StateDB,
key: String,
signal: Mutable<State>, signal: Mutable<State>,
} }
@ -99,7 +100,7 @@ impl ResourceDriver {
// "Best" solution would be to tell the resources to rollback their interal // "Best" solution would be to tell the resources to rollback their interal
// changes on a fatal failure and then notify the Claimant, while simply trying // changes on a fatal failure and then notify the Claimant, while simply trying
// again for temporary failures. // again for temporary failures.
let _ = self.db.set(&state, &outstate); let _ = self.db.update(self.key.as_bytes(), &state, &outstate);
self.signal.set(outstate); self.signal.set(outstate);
}, },
Err(e) => { Err(e) => {

View File

@ -1,4 +1,11 @@
use std::ops::Deref;
use std::sync::Arc;
use futures_signals::signal::{Mutable, Signal, SignalExt};
use lmdb::RoTransaction;
use rkyv::Archived;
use crate::db::LMDBorrow;
use crate::resources::modules::fabaccess::{MachineState, Status}; use crate::resources::modules::fabaccess::{MachineState, Status};
use crate::resources::state::db::StateDB;
use crate::resources::state::State; use crate::resources::state::State;
use crate::session::SessionHandle; use crate::session::SessionHandle;
use crate::users::User; use crate::users::User;
@ -13,15 +20,78 @@ pub mod modules;
pub struct PermissionDenied; pub struct PermissionDenied;
#[derive(Clone)] pub(crate) struct Inner {
pub struct Resource {} id: String,
db: StateDB,
signal: Mutable<MachineState>,
}
impl Inner {
pub fn new(id: String, db: StateDB) -> Self {
let state = if let Some(previous) = db.get_output(id.as_bytes()).unwrap() {
let state = MachineState::from(&previous);
tracing::info!(%id, ?state, "Found previous state");
state
} else {
tracing::info!(%id, "No previous state, defaulting to `free`");
MachineState::free(None)
};
let signal = Mutable::new(state);
impl Resource { Self { id, db, signal }
pub fn get_state(&self) -> MachineState { }
unimplemented!()
pub fn signal(&self) -> impl Signal<Item=MachineState> {
Box::pin(self.signal.signal_cloned().dedupe_cloned())
}
fn get_state(&self) -> MachineState {
MachineState::from(&self.db.get_output(self.id.as_bytes()).unwrap().unwrap())
}
fn get_raw_state(&self) -> Option<LMDBorrow<RoTransaction, Archived<State>>> {
self.db.get_output(self.id.as_bytes()).unwrap()
} }
fn set_state(&self, state: MachineState) { fn set_state(&self, state: MachineState) {
let span = tracing::debug_span!("set", id = %self.id, ?state, "Updating state");
let _guard = span.enter();
tracing::debug!("Updating state");
tracing::trace!("Updating DB");
let update = state.to_state();
self.db.update(self.id.as_bytes(), &update, &update).unwrap();
tracing::trace!("Updated DB, sending update signal");
self.signal.set(state);
tracing::trace!("Sent update signal");
}
}
#[derive(Clone)]
pub struct Resource {
inner: Arc<Inner>
}
impl Resource {
pub(crate) fn new(inner: Arc<Inner>) -> Self {
Self { inner }
}
pub fn get_raw_state(&self) -> Option<LMDBorrow<RoTransaction, Archived<State>>> {
self.inner.get_raw_state()
}
pub fn get_state(&self) -> MachineState {
self.inner.get_state()
}
pub fn get_id(&self) -> &str {
&self.inner.id
}
fn set_state(&self, state: MachineState) {
}
fn set_status(&self, state: Status) {
unimplemented!() unimplemented!()
} }
@ -29,14 +99,14 @@ impl Resource {
unimplemented!() unimplemented!()
} }
pub async fn try_update(&self, session: SessionHandle, new: MachineState) { pub async fn try_update(&self, session: SessionHandle, new: Status) {
let old = self.get_state(); let old = self.get_state();
let user = session.get_user(); let user = session.get_user();
if session.has_manage(self) // Default allow for managers if session.has_manage(self) // Default allow for managers
|| (session.has_write(self) // Decision tree for writers || (session.has_write(self) // Decision tree for writers
&& match (old.state, &new.state) { && match (old.state, &new) {
// Going from available to used by the person requesting is okay. // Going from available to used by the person requesting is okay.
(Status::Free, Status::InUse(who)) (Status::Free, Status::InUse(who))
// Check that the person requesting does not request for somebody else. // Check that the person requesting does not request for somebody else.
@ -67,7 +137,7 @@ impl Resource {
}) })
// Default permissions everybody has // Default permissions everybody has
|| match (old.state, &new.state) { || match (old.state, &new) {
// Returning things we've been using is okay. This includes both if // Returning things we've been using is okay. This includes both if
// they're being freed or marked as to be checked. // they're being freed or marked as to be checked.
(Status::InUse(who), Status::Free | Status::ToCheck(_)) if who == user => true, (Status::InUse(who), Status::Free | Status::ToCheck(_)) if who == user => true,
@ -79,20 +149,19 @@ impl Resource {
_ => false, _ => false,
} }
{ {
self.set_state(new); self.set_status(new);
} }
} }
pub async fn give_back(&self, session: SessionHandle) { pub async fn give_back(&self, session: SessionHandle) {
if let Status::InUse(user) = self.get_state().state { if let Status::InUse(user) = self.get_state().state {
if user == session.get_user() { if user == session.get_user() {
self.set_state(MachineState::free()); self.set_state(MachineState::free(Some(user)));
self.set_previous_user(user);
} }
} }
} }
pub async fn force_set(&self, new: MachineState) { pub async fn force_set(&self, new: Status) {
unimplemented!() unimplemented!()
} }

View File

@ -1,10 +1,13 @@
use std::ops::Deref;
use crate::utils::oid::ObjectIdentifier; use crate::utils::oid::ObjectIdentifier;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use rkyv::{Archive, Deserialize, Serialize}; use rkyv::{Archive, Archived, Deserialize, Serialize, Infallible};
use rkyv_dyn::{DynError, DynSerializer}; use rkyv_dyn::{DynError, DynSerializer};
use std::str::FromStr; use std::str::FromStr;
use crate::oidvalue; use crate::oidvalue;
use crate::resources::state::{State};
use crate::resources::state::value::Value;
use crate::session::SessionHandle; use crate::session::SessionHandle;
use crate::users::User; use crate::users::User;
@ -49,66 +52,77 @@ pub enum Status {
serde::Serialize, serde::Serialize,
serde::Deserialize, serde::Deserialize,
)] )]
#[archive_attr(derive(Debug, PartialEq, serde::Serialize, serde::Deserialize))] #[archive_attr(derive(Debug, PartialEq))]
/// The status of the machine /// The status of the machine
pub struct MachineState { pub struct MachineState {
pub state: Status, pub state: Status,
pub previous: Option<User>,
} }
impl MachineState { impl MachineState {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
state: Status::Free, state: Status::Free,
previous: None,
} }
} }
pub fn from(state: Status) -> Self { pub fn from(dbstate: &Archived<State>) -> Self {
Self { state } use std::any::TypeId;
let state: &Archived<MachineState> = &dbstate.inner;
Deserialize::deserialize(state, &mut Infallible).unwrap()
}
pub fn to_state(&self) -> State {
State {
inner: self.clone()
}
} }
pub fn free() -> Self { pub fn free(previous: Option<User>) -> Self {
Self { Self {
state: Status::Free, state: Status::Free,
previous,
} }
} }
pub fn used(user: User) -> Self { pub fn used(user: User, previous: Option<User>) -> Self {
Self { Self {
state: Status::InUse(user), state: Status::InUse(user),
previous,
} }
} }
pub fn blocked(user: User) -> Self { pub fn blocked(user: User, previous: Option<User>) -> Self {
Self { Self {
state: Status::Blocked(user), state: Status::Blocked(user),
previous,
} }
} }
pub fn disabled() -> Self { pub fn disabled(previous: Option<User>) -> Self {
Self { Self {
state: Status::Disabled, state: Status::Disabled,
previous,
} }
} }
pub fn reserved(user: User) -> Self { pub fn reserved(user: User, previous: Option<User>) -> Self {
Self { Self {
state: Status::Reserved(user), state: Status::Reserved(user),
previous,
} }
} }
pub fn check(user: User) -> Self { pub fn check(user: User) -> Self {
Self { Self {
state: Status::ToCheck(user), state: Status::ToCheck(user),
previous: Some(user),
} }
} }
pub fn make_used(&mut self, session: SessionHandle) -> Self {
unimplemented!()
}
} }
static OID_TYPE: Lazy<ObjectIdentifier> = pub static OID_TYPE: Lazy<ObjectIdentifier> =
Lazy::new(|| ObjectIdentifier::from_str("1.3.6.1.4.1.48398.612.1.14").unwrap()); Lazy::new(|| ObjectIdentifier::from_str("1.3.6.1.4.1.48398.612.1.14").unwrap());
static OID_VALUE: Lazy<ObjectIdentifier> = pub static OID_VALUE: Lazy<ObjectIdentifier> =
Lazy::new(|| ObjectIdentifier::from_str("1.3.6.1.4.1.48398.612.2.4").unwrap()); Lazy::new(|| ObjectIdentifier::from_str("1.3.6.1.4.1.48398.612.2.4").unwrap());
oidvalue!(OID_TYPE, MachineState, ArchivedMachineState); oidvalue!(OID_TYPE, MachineState, ArchivedMachineState);

View File

@ -29,7 +29,7 @@ use crate::resources::state::State;
type StateAdapter = AllocAdapter<State>; type StateAdapter = AllocAdapter<State>;
/// State Database containing the currently set state /// State Database containing the currently set state
#[derive(Clone, Debug)] #[derive(Debug)]
pub struct StateDB { pub struct StateDB {
/// The environment for all the databases below /// The environment for all the databases below
env: Arc<Environment>, env: Arc<Environment>,
@ -55,18 +55,6 @@ impl StateDB {
Self { env: Arc::new(env), input, output } Self { env: Arc::new(env), input, output }
} }
pub fn init<P: AsRef<Path>>(path: P) -> lmdb::Result<Self> {
let env = Self::open_env(path)?;
let input = unsafe {
DB::create(&env, Some("input"), DatabaseFlags::INTEGER_KEY)?
};
let output = unsafe {
DB::create(&env, Some("output"), DatabaseFlags::INTEGER_KEY)?
};
Ok(Self::new(env, input, output))
}
pub fn open<P: AsRef<Path>>(path: P) -> lmdb::Result<Self> { pub fn open<P: AsRef<Path>>(path: P) -> lmdb::Result<Self> {
let env = Self::open_env(path)?; let env = Self::open_env(path)?;
let input = unsafe { DB::open(&env, Some("input"))? }; let input = unsafe { DB::open(&env, Some("input"))? };
@ -84,17 +72,17 @@ impl StateDB {
Ok(Self::new(env, input, output)) Ok(Self::new(env, input, output))
} }
fn update_txn(&self, txn: &mut RwTransaction, key: u64, input: &State, output: &State) fn update_txn(&self, txn: &mut RwTransaction, key: impl AsRef<[u8]>, input: &State, output: &State)
-> Result<(), DBError> -> Result<(), DBError>
{ {
let flags = WriteFlags::empty(); let flags = WriteFlags::empty();
let k = key.to_ne_bytes(); let k = key.as_ref();
self.input.put(txn, &k, input, flags)?; self.input.put(txn, &k, input, flags)?;
self.output.put(txn, &k, output, flags)?; self.output.put(txn, &k, output, flags)?;
Ok(()) Ok(())
} }
pub fn update(&self, key: u64, input: &State, output: &State) pub fn update(&self, key: impl AsRef<[u8]>, input: &State, output: &State)
-> Result<(), DBError> -> Result<(), DBError>
{ {
let mut txn = self.env.begin_rw_txn().map_err(StateAdapter::from_db_err)?; let mut txn = self.env.begin_rw_txn().map_err(StateAdapter::from_db_err)?;
@ -103,11 +91,11 @@ impl StateDB {
txn.commit().map_err(StateAdapter::from_db_err) txn.commit().map_err(StateAdapter::from_db_err)
} }
fn get(&self, db: &DB<StateAdapter>, key: u64) fn get(&self, db: &DB<StateAdapter>, key: impl AsRef<[u8]>)
-> Result<Option<LMDBorrow<RoTransaction, Archived<State>>>, DBError> -> Result<Option<LMDBorrow<RoTransaction, Archived<State>>>, DBError>
{ {
let txn = self.env.begin_ro_txn().map_err(StateAdapter::from_db_err)?; let txn = self.env.begin_ro_txn().map_err(StateAdapter::from_db_err)?;
if let Some(state) = db.get(&txn, &key.to_ne_bytes())? { if let Some(state) = db.get(&txn, &key.as_ref())? {
let ptr = state.into(); let ptr = state.into();
Ok(Some(unsafe { LMDBorrow::new(ptr, txn) })) Ok(Some(unsafe { LMDBorrow::new(ptr, txn) }))
} else { } else {
@ -116,46 +104,14 @@ impl StateDB {
} }
#[inline(always)] #[inline(always)]
pub fn get_input(&self, key: u64) pub fn get_input(&self, key: impl AsRef<[u8]>)
-> Result<Option<LMDBorrow<RoTransaction, Archived<State>>>, DBError> -> Result<Option<LMDBorrow<RoTransaction, Archived<State>>>, DBError>
{ self.get(&self.input, key) } { self.get(&self.input, key) }
#[inline(always)] #[inline(always)]
pub fn get_output(&self, key: u64) pub fn get_output(&self, key: impl AsRef<[u8]>)
-> Result<Option<LMDBorrow<RoTransaction, Archived<State>>>, DBError> -> Result<Option<LMDBorrow<RoTransaction, Archived<State>>>, DBError>
{ self.get(&self.output, key) } { self.get(&self.output, key) }
pub fn accessor(&self, key: u64) -> StateAccessor {
StateAccessor::new(key, self.clone())
}
}
#[derive(Debug)]
pub struct StateAccessor {
key: u64,
db: StateDB
}
impl StateAccessor {
pub fn new(key: u64, db: StateDB) -> Self {
Self { key, db }
}
pub fn get_input(&self)
-> Result<Option<LMDBorrow<RoTransaction, Archived<State>>>, DBError>
{
self.db.get_input(self.key)
}
pub fn get_output(&self)
-> Result<Option<LMDBorrow<RoTransaction, Archived<State>>>, DBError>
{
self.db.get_output(self.key)
}
pub fn set(&self, input: &State, output: &State) -> Result<(), DBError> {
self.db.update(self.key, input, output)
}
} }
#[cfg(test)] #[cfg(test)]
@ -171,7 +127,7 @@ mod tests {
let tmpdir = tempfile::tempdir().unwrap(); let tmpdir = tempfile::tempdir().unwrap();
let mut tmppath = tmpdir.path().to_owned(); let mut tmppath = tmpdir.path().to_owned();
tmppath.push("db"); tmppath.push("db");
let db = StateDB::init(tmppath).unwrap(); let db = StateDB::create(tmppath).unwrap();
let b = State::build() let b = State::build()
.add(OID_COLOUR.clone(), Box::new(Vec3u8 { a: 1, b: 2, c: 3})) .add(OID_COLOUR.clone(), Box::new(Vec3u8 { a: 1, b: 2, c: 3}))
.add(OID_POWERED.clone(), Box::new(true)) .add(OID_POWERED.clone(), Box::new(true))

View File

@ -16,159 +16,72 @@ use rkyv::{
out_field, out_field,
Serialize, Serialize,
}; };
use serde::de::{Error, MapAccess}; use serde::de::{Error, MapAccess, Unexpected};
use serde::Deserializer; use serde::Deserializer;
use serde::ser::SerializeMap; use serde::ser::SerializeMap;
use value::{RegisteredImpl, SerializeValue}; use value::{RegisteredImpl, SerializeValue};
use crate::MachineState;
use crate::resources::modules::fabaccess::OID_VALUE;
use crate::utils::oid::ObjectIdentifier; use crate::utils::oid::ObjectIdentifier;
use crate::resources::state::value::{DynOwnedVal, DynVal, TypeOid, }; use crate::resources::state::value::{DynOwnedVal, DynVal, TypeOid, Value};
pub mod value; pub mod value;
pub mod db; pub mod db;
#[derive(serde::Serialize, serde::Deserialize)]
#[derive(Archive, Serialize, Deserialize)] #[derive(Archive, Serialize, Deserialize)]
#[derive(Clone, PartialEq)] #[derive(Clone, PartialEq, Eq)]
#[archive_attr(derive(Debug))] #[archive_attr(derive(Debug))]
/// State object of a resources
///
/// This object serves three functions:
/// 1. it is constructed by modification via Claims or via internal resources logic
/// 2. it is serializable and storable in the database
/// 3. it is sendable and forwarded to all Actors and Notifys
pub struct State { pub struct State {
pub hash: u64, pub inner: MachineState,
pub inner: Vec<OwnedEntry>,
} }
impl State {
pub fn build() -> StateBuilder {
StateBuilder::new()
}
pub fn hash(&self) -> u64 {
self.hash
}
}
impl PartialEq<Archived<State>> for State {
fn eq(&self, other: &Archived<Self>) -> bool {
self.hash == other.hash
}
}
impl Eq for State {}
impl fmt::Debug for State { impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut sf = f.debug_struct("State"); let mut sf = f.debug_struct("State");
for OwnedEntry { oid, val } in self.inner.iter() { //for Entry { oid, val } in self.inner.iter() {
let k: String = oid.into(); let k: String = OID_VALUE.deref().into();
sf.field(k.as_ref(), val); sf.field(k.as_ref(), &self.inner);
} //}
sf.finish() sf.finish()
} }
} }
#[derive(Debug)] impl serde::Serialize for State {
pub struct StateBuilder {
hasher: DefaultHasher,
inner: Vec<OwnedEntry>
}
impl StateBuilder {
pub fn new() -> Self {
let hasher = DefaultHasher::new();
Self { inner: Vec::new(), hasher }
}
pub fn finish(self) -> State {
State {
hash: self.hasher.finish(),
inner: self.inner,
}
}
/// Add key-value pair to the State being built.
///
/// We have to use this split system here because type erasure prevents us from limiting values
/// to `Hash`. Specifically, you can't have a trait object of `Hash` because `Hash` depends on
/// `Self`. In this function however the compiler still knows the exact type of `V` and can
/// call statically call its `hash` method.
pub fn add<V>(mut self, oid: ObjectIdentifier, val: Box<V>) -> Self
where V: SerializeValue + Hash + Archive,
Archived<V>: TypeOid + RegisteredImpl,
{
// Hash before creating the StateEntry struct which removes the type information
oid.hash(&mut self.hasher);
val.hash(&mut self.hasher);
self.inner.push(OwnedEntry { oid, val });
self
}
}
#[derive(Debug)]
pub struct Entry<'a> {
pub oid: &'a ObjectIdentifier,
pub val: &'a dyn SerializeValue,
}
#[derive(Debug, Clone, Archive, Serialize, Deserialize)]
#[archive_attr(derive(Debug))]
pub struct OwnedEntry {
pub oid: ObjectIdentifier,
pub val: Box<dyn SerializeValue>,
}
impl PartialEq for OwnedEntry {
fn eq(&self, other: &Self) -> bool {
self.oid == other.oid && self.val.dyn_eq(other.val.as_value())
}
}
impl<'a> serde::Serialize for Entry<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer where S: serde::Serializer
{ {
let mut ser = serializer.serialize_map(Some(1))?; let mut ser = serializer.serialize_map(Some(1))?;
ser.serialize_entry(&self.oid, &DynVal(self.val))?; ser.serialize_entry(OID_VALUE.deref(), &self.inner)?;
ser.end() ser.end()
} }
} }
impl<'de> serde::Deserialize<'de> for State {
impl serde::Serialize for OwnedEntry {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer
{
let mut ser = serializer.serialize_map(Some(1))?;
ser.serialize_entry(&self.oid, &DynVal(self.val.deref()))?;
ser.end()
}
}
impl<'de> serde::Deserialize<'de> for OwnedEntry {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: Deserializer<'de> where D: Deserializer<'de>
{ {
deserializer.deserialize_map(OwnedEntryVisitor) deserializer.deserialize_map(StateVisitor)
} }
} }
struct OwnedEntryVisitor; struct StateVisitor;
impl<'de> serde::de::Visitor<'de> for OwnedEntryVisitor { impl<'de> serde::de::Visitor<'de> for StateVisitor {
type Value = OwnedEntry; type Value = State;
fn expecting(&self, formatter: &mut Formatter) -> fmt::Result { fn expecting(&self, formatter: &mut Formatter) -> fmt::Result {
write!(formatter, "an one entry map from OID to some value object") write!(formatter, "a map from OIDs to value objects")
} }
fn visit_map<A: MapAccess<'de>>(self, mut map: A) -> Result<Self::Value, A::Error> fn visit_map<A: MapAccess<'de>>(self, mut map: A) -> Result<Self::Value, A::Error>
{ {
let oid: ObjectIdentifier = map.next_key()? let oid: ObjectIdentifier = map.next_key()?
.ok_or(A::Error::missing_field("oid"))?; .ok_or(A::Error::missing_field("oid"))?;
let val: DynOwnedVal = map.next_value()?; if oid != *OID_VALUE.deref() {
Ok(OwnedEntry { oid, val: val.0 }) return Err(A::Error::invalid_value(Unexpected::Other("Unknown OID"), &"OID of fabaccess state"))
}
let val: MachineState = map.next_value()?;
Ok(State { inner: val })
} }
} }

View File

@ -276,8 +276,8 @@ impl Clone for Box<dyn SerializeValue> {
} }
#[ptr_meta::pointee] #[ptr_meta::pointee]
pub trait DeserializeValue: Value + DeserializeDynOid {} pub trait DeserializeValue: DeserializeDynOid {}
impl<T: Value + DeserializeDynOid> DeserializeValue for T {} impl<T: DeserializeDynOid> DeserializeValue for T {}
impl ArchivePointee for dyn DeserializeValue { impl ArchivePointee for dyn DeserializeValue {
type ArchivedMetadata = ArchivedValueMetadata; type ArchivedMetadata = ArchivedValueMetadata;

View File

@ -1,5 +1,5 @@
use clap::{Arg, Command}; use clap::{Arg, Command};
use diflouroborane::db::{Databases, Dump}; use diflouroborane::db::Dump;
use diflouroborane::{config, Diflouroborane, error::Error}; use diflouroborane::{config, Diflouroborane, error::Error};
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::os::unix::prelude::AsRawFd; use std::os::unix::prelude::AsRawFd;