Tear it down so you can rebuild it again

This commit is contained in:
Nadja Reitzenstein 2021-10-06 13:53:14 +02:00
parent 65830af01d
commit 6a6bc4e452
12 changed files with 860 additions and 368 deletions

226
Cargo.lock generated
View File

@ -23,6 +23,17 @@ dependencies = [
"pretty",
]
[[package]]
name = "ahash"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98"
dependencies = [
"getrandom",
"once_cell",
"version_check",
]
[[package]]
name = "aho-corasick"
version = "0.7.18"
@ -300,6 +311,27 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"
[[package]]
name = "bytecheck"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb738a1e65989ecdcd5bba16079641bd7209688fa546e1064832fd6e012fd32a"
dependencies = [
"bytecheck_derive",
"ptr_meta",
]
[[package]]
name = "bytecheck_derive"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3b4dff26fdc9f847dab475c9fec16f2cba82d5aa1f09981b87c44520721e10a"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "byteorder"
version = "1.4.3"
@ -467,6 +499,16 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "ctor"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccc0a48a9b826acdf4028595adc9db92caea352f7af011a3034acd172a52a0aa"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "derivative"
version = "2.2.0"
@ -515,7 +557,7 @@ dependencies = [
[[package]]
name = "diflouroborane"
version = "0.2.0"
version = "0.3.0"
dependencies = [
"async-channel",
"async-trait",
@ -536,7 +578,11 @@ dependencies = [
"libc",
"lmdb-rkv",
"paho-mqtt",
"ptr_meta",
"rand",
"rkyv",
"rkyv_dyn",
"rkyv_typename",
"rsasl",
"rust-argon2",
"serde",
@ -549,6 +595,7 @@ dependencies = [
"smol",
"tempfile",
"toml",
"tracing",
"uuid",
"walkdir",
]
@ -968,6 +1015,17 @@ dependencies = [
"wasi",
]
[[package]]
name = "ghost"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a5bcf1bbeab73aa4cf2fde60a846858dc036163c7c33bec309f8d17de785479"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "glob"
version = "0.3.0"
@ -994,6 +1052,9 @@ name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash",
]
[[package]]
name = "hermit-abi"
@ -1049,6 +1110,28 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "inventory"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f0f7efb804ec95e33db9ad49e4252f049e37e8b0a4652e3cd61f7999f2eff7f"
dependencies = [
"ctor",
"ghost",
"inventory-impl",
]
[[package]]
name = "inventory-impl"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75c094e94816723ab936484666968f5b58060492e880f3c8d00489a1e244fa51"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "itertools"
version = "0.9.0"
@ -1408,9 +1491,9 @@ dependencies = [
[[package]]
name = "proc-macro-crate"
version = "1.0.0"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41fdbd1df62156fbc5945f4762632564d7d038153091c3fcf1067f6aef7cff92"
checksum = "1ebace6889caf889b4d3f76becee12e90353f2b8c7d875534a71e5742f8f6f83"
dependencies = [
"thiserror",
"toml",
@ -1463,6 +1546,26 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "ptr_meta"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1"
dependencies = [
"ptr_meta_derive",
]
[[package]]
name = "ptr_meta_derive"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "quick-error"
version = "1.2.3"
@ -1563,6 +1666,85 @@ dependencies = [
"winapi",
]
[[package]]
name = "rend"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d0351a2e529ee30d571ef31faa5a4e0b9addaad087697b77efb20d2809e41c7"
dependencies = [
"bytecheck",
]
[[package]]
name = "rkyv"
version = "0.7.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f371763528cb5a45ff01879d08355c39d49b849e2e6e665989abe3262997bde0"
dependencies = [
"bytecheck",
"hashbrown",
"ptr_meta",
"rend",
"rkyv_derive",
"seahash",
]
[[package]]
name = "rkyv_derive"
version = "0.7.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fc962ad6ffbcc218212c465318c7c151221474350b281b4912f4890ea590b5"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "rkyv_dyn"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75e044390151cefefc450726260d1a164348c4038ccd9c81c1066e8b52155b2"
dependencies = [
"inventory",
"lazy_static",
"ptr_meta",
"rkyv",
"rkyv_dyn_derive",
"rkyv_typename",
]
[[package]]
name = "rkyv_dyn_derive"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "184c2b11917482e21af6c380505d535e27db4929ad81bc49091a7350cabacd7d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "rkyv_typename"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41ea58eb82006d34657d395fafa4e62521c017b02ee84fd90fbd25b1b67fa923"
dependencies = [
"rkyv_typename_derive",
]
[[package]]
name = "rkyv_typename_derive"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b362b976c52da543e6cab9325b62fc5fec98852c6fb984e71c146014f79e28f"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "rsasl"
version = "1.4.0"
@ -1613,6 +1795,12 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "seahash"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b"
[[package]]
name = "serde"
version = "1.0.130"
@ -1934,6 +2122,38 @@ dependencies = [
"serde",
]
[[package]]
name = "tracing"
version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8"
dependencies = [
"cfg-if 1.0.0",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4f915eb6abf914599c200260efced9203504c4c37380af10cdf3b7d36970650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
dependencies = [
"lazy_static",
]
[[package]]
name = "typed-arena"
version = "1.7.0"

View File

@ -22,6 +22,8 @@ futures-signals = "0.3.22"
smol = "1.2.5"
tracing = "0.1.28"
signal-hook = "0.3.9"
slog = { version = "2.7.0", features = ["max_level_trace"] }
@ -29,6 +31,7 @@ slog-term = "2.8.0"
slog-async = "2.7.0"
capnp = "0.14.3"
capnpc = "0.14.4"
capnp-rpc = "0.14.1"
capnp-futures = "0.14.1"
@ -37,6 +40,11 @@ toml = "0.5.8"
flexbuffers = "2.0.0"
bincode = "2.0.0-dev"
rkyv = "0.7"
ptr_meta = "0.1"
rkyv_typename = "0.7"
rkyv_dyn = "0.7"
serde_dhall = { version = "0.10.1", default-features = false }
serde_json = "1.0"

2
schema

@ -1 +1 @@
Subproject commit 2c01a62085937fbe89223b1bce65f1f2913878cc
Subproject commit 2a14158a76e96de753f37a50af0b604e0ccfa7b7

View File

@ -1,102 +0,0 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::ops::Deref;
use slog::Logger;
use std::sync::Arc;
use capnp::capability::{Params, Results, Promise};
use crate::schema::connection_capnp;
use crate::connection::Session;
use crate::db::Databases;
use crate::db::user::UserId;
use crate::network::Network;
pub mod auth;
mod machine;
mod machines;
use machines::Machines;
mod user;
mod users;
use users::Users;
// TODO Session restoration by making the Bootstrap cap a SturdyRef
pub struct Bootstrap {
log: Logger,
db: Databases,
nw: Arc<Network>,
session: Rc<RefCell<Option<Session>>>,
}
impl Bootstrap {
pub fn new(log: Logger, db: Databases, nw: Arc<Network>) -> Self {
info!(log, "Created Bootstrap");
let session = Rc::new(RefCell::new(None));
Self { session, db, nw, log }
}
}
use connection_capnp::bootstrap::*;
impl connection_capnp::bootstrap::Server for Bootstrap {
fn authentication_system(&mut self,
_: AuthenticationSystemParams,
mut res: AuthenticationSystemResults
) -> Promise<(), capnp::Error> {
// TODO: Forbid mutltiple authentication for now
// TODO: When should we allow multiple auth and how do me make sure that does not leak
// priviledges (e.g. due to previously issues caps)?
// If this Rc has a strong count of 1 then there's no other cap issued yet meaning we can
// safely transform the inner session with an auth.
if Rc::strong_count(&self.session) == 1 {
let session = Rc::clone(&self.session);
let db = self.db.clone();
res.get().set_authentication_system(capnp_rpc::new_client(
auth::Auth::new(self.log.new(o!()), db, session))
);
}
Promise::ok(())
}
fn machine_system(&mut self,
_: MachineSystemParams,
mut res: MachineSystemResults
) -> Promise<(), capnp::Error> {
if let Some(session) = self.session.borrow().deref() {
debug!(self.log, "Giving MachineSystem cap to user {} with perms:", session.authzid);
for r in session.perms.iter() {
debug!(session.log, " {}", r);
}
// TODO actual permission check and stuff
// Right now we only check that the user has authenticated at all.
let c = capnp_rpc::new_client(Machines::new(Rc::clone(&self.session), self.nw.clone()));
res.get().set_machine_system(c);
}
Promise::ok(())
}
fn user_system(
&mut self,
_: UserSystemParams,
mut results: UserSystemResults
) -> Promise<(), capnp::Error> {
if self.session.borrow().is_some() {
// TODO actual permission check and stuff
// Right now we only check that the user has authenticated at all.
let c = capnp_rpc::new_client(Users::new(Rc::clone(&self.session), self.db.userdb.clone()));
results.get().set_user_system(c);
}
Promise::ok(())
}
}

View File

@ -7,7 +7,6 @@ use slog::Logger;
use smol::lock::Mutex;
use smol::net::TcpStream;
use crate::api::Bootstrap;
use crate::error::Result;
use capnp_rpc::{rpc_twoparty_capnp, twoparty};
@ -78,7 +77,8 @@ impl ConnectionHandler {
pub fn handle(&mut self, stream: TcpStream) -> impl Future<Output = Result<()>> {
info!(self.log, "New connection from on {:?}", stream);
let boots = Bootstrap::new(self.log.new(o!()), self.db.clone(), self.network.clone());
let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots);
unimplemented!();
/*let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots);
let network = twoparty::VatNetwork::new(
stream.clone(),
@ -90,5 +90,6 @@ impl ConnectionHandler {
// Convert the error type to one of our errors
rpc_system.map(|r| r.map_err(Into::into))
*/
}
}

353
src/db.rs
View File

@ -1,99 +1,73 @@
use std::sync::Arc;
use std::path::PathBuf;
use std::str::FromStr;
use std::ops::{Deref, DerefMut};
use std::{
mem::size_of,
ops::Deref,
ptr::NonNull,
rc::Rc,
sync::Arc,
marker::PhantomData,
hash::{
Hash,
Hasher,
BuildHasher,
},
collections::hash_map::RandomState,
};
use slog::Logger;
use rkyv::{
Archive,
Archived,
archived_root,
use crate::error::Result;
use crate::config::Config;
Serialize,
Deserialize,
/// (Hashed) password database
pub mod pass;
/// User storage
pub mod user;
/// Access control storage
///
/// Stores&Retrieves Permissions and Roles
pub mod access;
/// Machine storage
///
/// Stores&Retrieves Machines
pub mod machine;
#[derive(Clone)]
pub struct Databases {
pub access: Arc<access::AccessControl>,
pub machine: Arc<machine::internal::Internal>,
pub userdb: Arc<user::Internal>,
}
const LMDB_MAX_DB: u32 = 16;
impl Databases {
pub fn new(log: &Logger, config: &Config) -> Result<Self> {
// Initialize the LMDB environment. This blocks until the mmap() finishes
info!(log, "LMDB env");
let env = lmdb::Environment::new()
.set_flags(lmdb::EnvironmentFlags::MAP_ASYNC | lmdb::EnvironmentFlags::NO_SUB_DIR)
.set_max_dbs(LMDB_MAX_DB as libc::c_uint)
.open(config.db_path.as_path())?;
// Start loading the machine database, authentication system and permission system
// All of those get a custom logger so the source of a log message can be better traced and
// filtered
let env = Arc::new(env);
let mdb = machine::init(log.new(o!("system" => "machines")), &config, env.clone())?;
let permdb = access::init(log.new(o!("system" => "permissions")), &config, env.clone())?;
let ac = access::AccessControl::new(permdb);
let userdb = user::init(log.new(o!("system" => "users")), &config, env.clone())?;
Ok(Self {
access: Arc::new(ac),
machine: Arc::new(mdb),
userdb: Arc::new(userdb),
})
}
}
ser::serializers::AllocScratchError,
};
use lmdb::{
Environment,
Database,
Transaction,
RoTransaction,
RwTransaction,
WriteFlags,
Cursor,
RoCursor,
RwCursor,
Iter,
};
pub use rkyv::{
Fallible,
};
pub use lmdb::{
Environment,
DatabaseFlags,
WriteFlags,
Transaction,
RoTransaction,
RwTransaction,
};
#[derive(Debug, Clone)]
pub struct DB {
env: Arc<Environment>,
pub struct RawDB {
db: Database,
}
impl DB {
pub fn new(env: Arc<Environment>, db: Database) -> Self {
Self { env, db }
impl RawDB {
pub fn open(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
env.open_db(name).map(|db| Self { db })
}
pub fn open(env: Arc<Environment>, name: &str) -> lmdb::Result<Self> {
env.open_db(Some(name)).map(|db| { Self::new(env.clone(), db) })
pub fn create(env: &Environment, name: Option<&str>, flags: DatabaseFlags) -> lmdb::Result<Self> {
env.create_db(name, flags).map(|db| Self { db })
}
pub fn get<'txn, T: Transaction, K>(&self, txn: &'txn T, key: &K) -> lmdb::Result<&'txn [u8]>
pub fn get<'txn, T: Transaction, K>(&self, txn: &'txn T, key: &K) -> lmdb::Result<Option<&'txn [u8]>>
where K: AsRef<[u8]>
{
txn.get(self.db, key)
match txn.get(self.db, key) {
Ok(buf) => Ok(Some(buf)),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(e),
}
}
pub fn put<K, V>(&self, txn: &mut RwTransaction, key: &K, value: &V, flags: WriteFlags)
@ -125,154 +99,144 @@ impl DB {
pub fn open_ro_cursor<'txn, T: Transaction>(&self, txn: &'txn T) -> lmdb::Result<RoCursor<'txn>> {
txn.open_ro_cursor(self.db)
}
pub fn begin_ro_txn<'env>(&'env self) -> lmdb::Result<RoTransaction<'env>> {
self.env.begin_ro_txn()
}
pub fn begin_rw_txn<'env>(&'env self) -> lmdb::Result<RwTransaction<'env>> {
self.env.begin_rw_txn()
}
}
use std::result::Result as StdResult;
use serde::{Serialize, Deserialize};
use bincode::Options;
pub trait DatabaseAdapter {
type Key: ?Sized;
type Err: From<lmdb::Error> + From<bincode::Error>;
fn serialize_key(key: &Self::Key) -> &[u8];
fn deserialize_key<'de>(input: &'de [u8]) -> StdResult<&'de Self::Key, Self::Err>;
/// An read-only entry reference
pub struct EntryPtr<'txn, K, V> {
key: &'txn K,
val: &'txn V,
}
// Should we for some reason ever need to have different Options for different adapters we can have
// this in the DatabaseAdapter trait too
fn bincode_default() -> impl bincode::Options {
bincode::DefaultOptions::new()
.with_varint_encoding()
#[derive(Archive, Serialize, Deserialize)]
/// The entry as it is stored inside the database.
struct Entry<K: Archive, V: Archive> {
key: K,
val: V,
}
use std::marker::PhantomData;
pub struct Objectstore<'a, A, V: ?Sized> {
pub db: DB,
adapter: PhantomData<A>,
marker: PhantomData<&'a V>
pub struct HashDB<'txn, K, V, S = RandomState> {
db: RawDB,
hash_builder: S,
phantom: &'txn PhantomData<(K,V)>,
}
impl<A, V: ?Sized> Objectstore<'_, A, V> {
pub fn new(db: DB) -> Self {
Self {
db: db,
adapter: PhantomData,
marker: PhantomData,
}
}
}
impl<'txn, A, V> Objectstore<'txn, A, V>
where A: DatabaseAdapter,
V: ?Sized + Serialize + Deserialize<'txn>,
impl<K, V> HashDB<'_, K, V>
{
pub fn get<T: Transaction>(&self, txn: &'txn T, key: &A::Key)
-> StdResult<Option<V>, A::Err>
{
let opts = bincode_default();
pub fn create(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
Self::create_with_hasher(env, name, RandomState::new())
}
pub fn open(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
Self::open_with_hasher(env, name, RandomState::new())
}
}
self.db.get(txn, &A::serialize_key(key))
.map_or_else(
|err| match err {
lmdb::Error::NotFound => Ok(None),
e => Err(e.into()),
},
|ok| opts.deserialize(ok)
.map_err(|e| e.into())
.map(Option::Some)
)
impl<K, V, S> HashDB<'_, K, V, S>
{
pub fn create_with_hasher(env: &Environment, name: Option<&str>, hash_builder: S) -> lmdb::Result<Self> {
let flags = DatabaseFlags::INTEGER_KEY | DatabaseFlags::DUP_SORT;
let db = RawDB::create(env, name, flags)?;
Ok(Self {
db,
hash_builder,
phantom: &PhantomData,
})
}
pub fn open_with_hasher(env: &Environment, name: Option<&str>, hash_builder: S) -> lmdb::Result<Self> {
let db = RawDB::open(env, name)?;
Ok(Self {
db,
hash_builder,
phantom: &PhantomData,
})
}
/// Update `value` in-place from the database
}
impl<'txn, K, V, S> HashDB<'txn, K, V, S>
where K: Eq + Hash + Archive,
V: Archive,
S: BuildHasher,
K::Archived: PartialEq<K>,
{
/// Retrieve an entry from the hashdb
///
/// Returns `Ok(false)` if the key wasn't found. If this functions returns an error `value`
/// will be in an indeterminate state where some parts may be updated from the db.
pub fn get_in_place<T: Transaction>(&self, txn: &'txn T, key: &A::Key, value: &mut V)
-> StdResult<bool, A::Err>
/// The result is a view pinned to the lifetime of the transaction. You can get owned Values
/// using [`Deserialize`].
pub fn get<T: Transaction>(&self, txn: &'txn T, key: &K) -> lmdb::Result<Option<&'txn Archived<Entry<K, V>>>>
{
let opts = bincode_default();
let mut hasher = self.hash_builder.build_hasher();
key.hash(&mut hasher);
let hash = hasher.finish();
self.db.get(txn, &A::serialize_key(key))
.map_or_else(
|err| match err {
lmdb::Error::NotFound => Ok(false),
e => Err(e.into()),
},
|ok| opts.deserialize_in_place_buffer(ok, value)
.map_err(|e| e.into())
.map(|()| true)
)
}
pub fn iter<T: Transaction>(&self, txn: &'txn T) -> StdResult<ObjectIter<'txn, A, V>, A::Err> {
let mut cursor = self.db.open_ro_cursor(txn)?;
let iter = cursor.iter_start();
Ok(ObjectIter::new(cursor, iter))
for res in cursor.iter_dup_of(&hash.to_ne_bytes()) {
let (_keybuf, valbuf) = res?;
let entry: &Archived<Entry<K, V>> = unsafe { archived_root::<Entry<K,V>>(valbuf.as_ref()) };
if &entry.key == key {
return Ok(Some(entry)) /*(EntryPtr {
key: &entry.key,
val: &entry.val,
}))*/;
}
}
pub fn put(&self, txn: &'txn mut RwTransaction, key: &A::Key, value: &V, flags: lmdb::WriteFlags)
-> StdResult<(), A::Err>
{
let opts = bincode::DefaultOptions::new()
.with_varint_encoding();
// Serialized values are always at most as big as their memory representation.
// So even if usize is 32 bit this is safe given no segmenting is taking place.
let bufsize = opts.serialized_size(value)? as usize;
let buffer = self.db.reserve(txn, &A::serialize_key(key), bufsize, flags)?;
opts.serialize_into(buffer, value).map_err(|e| e.into())
Ok(None)
}
pub fn del(&self, txn: &'txn mut RwTransaction, key: &A::Key)
-> StdResult<(), A::Err>
{
self.db.del::<&[u8], &[u8]>(txn, &A::serialize_key(key), None).map_err(|e| e.into())
pub fn insert(&self, txn: &mut RwTransaction, entry: Archived<Entry<K, V>>) -> lmdb::Result<()> {
}
}
pub struct ObjectIter<'txn, A, V: ?Sized> {
cursor: RoCursor<'txn>,
inner: Iter<'txn>,
adapter: PhantomData<A>,
marker: PhantomData<&'txn V>,
/// Memory Fixpoint for a value in the DB
///
/// LMDB binds lifetimes of buffers to the transaction that returned the buffer. As long as this
/// transaction is not `commit()`ed, `abort()`ed or `reset()`ed the pages containing these values
/// are not returned into circulation.
/// This struct encodes this by binding a live reference to the Transaction to the returned
/// and interpreted buffer. The placeholder `T` is the container for the transaction. This may be a
/// plain `RoTransaction<'env>`, a `Rc<RoTxn>` (meaning Fix is !Send) or an `Arc<RoTxn>`, depending
/// on your needs.
pub struct Fix<T, V: Archive> {
ptr: NonNull<V::Archived>,
txn: T,
}
pub type PinnedGet<'env, V> = Fix<RoTransaction<'env>, V>;
pub type LocalKeep<'env, V> = Fix<Rc<RoTransaction<'env>>, V>;
pub type GlobalKeep<'env, V> = Fix<Arc<RoTransaction<'env>>, V>;
impl<'txn, A, V: ?Sized> ObjectIter<'txn, A, V> {
pub fn new(cursor: RoCursor<'txn>, inner: Iter<'txn>) -> Self {
let marker = PhantomData;
let adapter = PhantomData;
Self { cursor, inner, adapter, marker }
}
}
impl<'txn, A, V> Iterator for ObjectIter<'txn, A, V>
where A: DatabaseAdapter,
V: ?Sized + Serialize + Deserialize<'txn>,
impl<'env, T, V> Fix<T, V>
where T: AsRef<RoTransaction<'env>>,
V: Archive,
{
type Item = StdResult<V, A::Err>;
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()?
.map_or_else(
|err| Some(Err(err.into())),
|(_, v)| Some(bincode_default().deserialize(v).map_err(|e| e.into()))
)
pub fn get(txn: T, db: &DB<V>, key: u64) -> lmdb::Result<Option<Self>> {
match db.get(txn.as_ref(), &key.to_ne_bytes()) {
Ok(buf) => Ok(Some(
Self {
ptr: unsafe { archived_root::<V>(buf.as_ref()).into() },
txn,
}
)),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(e),
}
}
}
impl<'env, T, V> Deref for Fix<T, V>
where T: AsRef<RoTransaction<'env>>,
V: Archive,
{
type Target = V::Archived;
fn deref(&self) -> &Self::Target {
// As long as the transaction is kept alive (which it is, because it's in self) state is a
// valid pointer so this is safe.
unsafe { self.ptr.as_ref() }
}
}
#[cfg(test)]
mod tests {
@ -356,7 +320,6 @@ mod tests {
let db = DB::new(e.env.clone(), ldb);
let adapter = TestAdapter;
let testdb = TestDB::new(db.clone());
let mut val = "value";

View File

@ -75,7 +75,7 @@ impl MachineState {
pub fn init(log: Logger, _config: &Config, env: Arc<lmdb::Environment>) -> Result<Internal> {
let mut flags = lmdb::DatabaseFlags::empty();
flags.set(lmdb::DatabaseFlags::INTEGER_KEY, true);
//flags.set(lmdb::DatabaseFlags::INTEGER_KEY, true);
let machdb = env.create_db(Some("machines"), flags)?;
debug!(&log, "Opened machine db successfully.");

View File

@ -10,7 +10,7 @@ use futures::task as futures_task;
use paho_mqtt::errors as mqtt;
use crate::network;
//FIXME use crate::network;
#[derive(Debug)]
pub enum Error {
@ -28,7 +28,7 @@ pub enum Error {
MQTT(mqtt::Error),
BadVersion((u32,u32)),
Argon2(argon2::Error),
EventNetwork(network::Error),
//EventNetwork(network::Error),
Denied,
}
@ -80,9 +80,9 @@ impl fmt::Display for Error {
Error::Denied => {
write!(f, "You do not have the permission required to do that.")
}
Error::EventNetwork(e) => {
/*Error::EventNetwork(e) => {
e.fmt(f)
}
}*/
}
}
}
@ -159,11 +159,11 @@ impl From<mqtt::Error> for Error {
}
}
impl From<network::Error> for Error {
/*impl From<network::Error> for Error {
fn from(e: network::Error) -> Error {
Error::EventNetwork(e)
}
}
}*/
impl From<argon2::Error> for Error {
fn from(e: argon2::Error) -> Error {

View File

@ -9,13 +9,11 @@ extern crate capnp_rpc;
extern crate async_trait;
/*
mod modules;
mod log;
mod api;
mod config;
mod error;
mod connection;
mod schema;
mod db;
mod machine;
mod builtin;
@ -24,8 +22,15 @@ mod network;
mod actor;
mod initiator;
mod space;
*/
mod resource;
mod schema;
mod state;
mod error;
mod db;
/*
use clap::{App, Arg};
@ -44,8 +49,13 @@ use slog::Logger;
use paho_mqtt::AsyncClient;
use crate::config::Config;
*/
fn main() {
pub fn main() {
}
/*fn main() {
use clap::{crate_version, crate_description, crate_name};
// Argument parsing
@ -199,3 +209,4 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
server::serve_api_connections(log.clone(), config, db, network, ex)
}
}
*/

View File

@ -1,25 +1,92 @@
use core::sync::atomic;
use async_trait::async_trait;
/// A something BFFH holds internal state of
pub struct Resource {
// claims
strong: atomic::AtomicUsize,
weak: atomic::AtomicUsize,
max_strong: usize,
use std::pin::Pin;
use std::task::{Poll, Context};
use futures::ready;
use futures::future::{Future, BoxFuture};
use futures::channel::oneshot;
use futures::sink::Sink;
use futures_signals::signal::Mutable;
use smol::prelude::*;
use smol::future::FutureExt;
use smol::channel::{Sender, Receiver};
use crate::error::Error;
use crate::state::{State, StateStorage};
/// A resource in BFFH has to contain several different parts;
/// - Currently set state
/// - Execution state of attached actors (⇒ BFFH's job)
/// - Output of interal logic of a resource
/// ⇒ Resource logic gets read access to set state and write access to output state.
/// ⇒ state `update` happens via resource logic. This logic should do access control. If the update
/// succeeds then BFFH stores those input parameters ("set" state) and results / output state.
/// Storing input parameters is relevant so that BFFH can know that an "update" is a no-op
/// without having to run the module code.
/// ⇒ in fact actors only really care about the output state, and shouldn't (need to) see "set"
/// state.
/// ⇒ example reserving:
/// - Claimant sends 'update' message with a new state
/// - Doesn't set the state until `update` has returned Ok.
/// - This runs the `update` function with that new state and the claimants user context returning
/// either an Ok or an Error.
/// - Error is returned to Claimant to show user, stop.
/// - On ok:
/// - Commit new "set" state, storing it and making it visible to all other claimants
/// - Commit new output state, storing it and notifying all connected actors / Notify
/// ⇒ BFFHs job in this whole ordeal is:
/// - Message passing primitives so that update message are queued
/// - As reliable as possible storage system for input and output state
/// - Again message passing so that updates are broadcasted to all Notify and Actors.
/// ⇒ Resource module's job is:
/// - Validating updates semantically i.e. are the types correct
/// - Check authorization of updates i.e. is this user allowed to do that
#[async_trait]
pub trait Resource {
/// Returns true if the given state is valid, and false otherwise
fn validate(&mut self, state: &State) -> bool;
/// Run whatever internal logic this resource has for the given State update, and return the
/// new output state that this update produces.
async fn update(&mut self, state: &State) -> Result<State, Error>;
}
/// A claim is taken in lieu of an user on a resource.
///
/// They come in two flavours: Weak, of which an infinite amount can exist, and Strong which may be
/// limited in number. Strong claims represent the right of the user to use this resource
/// "writable". A weak claim indicates co-usage of a resource and are mainly useful for notice and
/// information of the respective other ones. E.g. a space would be strongly claimed by keyholders
/// when they check in and released when they check out and weakly claimed by everybody else. In
/// that case the last strong claim could also fail to be released if there are outstanding weak
/// claims. Alternatively, releasing the last strong claim also releases all weak claims and sets
/// the resource to "Free" again.
///
/// Most importantly, claims can be released by *both* the claim holder and the resource.
pub struct Claim {
id: u128,
pub struct Update {
pub state: State,
pub errchan: oneshot::Sender<Error>,
}
pub struct ResourceDriver {
res: Box<dyn Resource>,
db: StateStorage,
rx: Receiver<Update>,
signal: Mutable<State>,
}
impl ResourceDriver {
pub async fn drive_to_end(&mut self) {
while let Ok(update) = self.rx.recv().await {
let state = update.state;
let errchan = update.errchan;
match self.res.update(&state).await {
Ok(outstate) => {
// FIXME: Send any error here to some global error collector. A failed write to
// the DB is not necessarily fatal, but it means that BFFH is now in an
// inconsistent state until a future update succeeds with writing to the DB.
// Not applying the new state isn't correct either since we don't know what the
// internal logic of the resource has done to make this happen.
// Another half right solution is to unwrap and recreate everything.
self.db.store(&state, &outstate);
self.signal.set_neq(outstate);
},
Err(e) => {
errchan.send(e);
}
}
}
}
}

View File

@ -1,31 +1,28 @@
pub use capnpc::schema_capnp;
#[allow(dead_code)]
pub mod authenticationsystem_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/authenticationsystem_capnp.rs"));
pub mod auth_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/auth_capnp.rs"));
}
#[allow(dead_code)]
pub mod connection_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/connection_capnp.rs"));
pub mod main_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/main_capnp.rs"));
}
#[allow(dead_code)]
pub mod general_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/general_capnp.rs"));
pub mod utils_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/utils_capnp.rs"));
}
#[allow(dead_code)]
pub mod machine_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/machine_capnp.rs"));
pub mod resource_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/resource_capnp.rs"));
}
#[allow(dead_code)]
pub mod machinesystem_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/machinesystem_capnp.rs"));
}
#[allow(dead_code)]
pub mod permissionsystem_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/permissionsystem_capnp.rs"));
pub mod resources_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/resources_capnp.rs"));
}
#[allow(dead_code)]
@ -33,17 +30,12 @@ pub mod role_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/role_capnp.rs"));
}
#[allow(dead_code)]
pub mod space_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/space_capnp.rs"));
}
#[allow(dead_code)]
pub mod user_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/user_capnp.rs"));
}
#[allow(dead_code)]
pub mod usersystem_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/usersystem_capnp.rs"));
pub mod users_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/users_capnp.rs"));
}

332
src/state.rs Normal file
View File

@ -0,0 +1,332 @@
use std::fmt;
use std::rc::Rc;
use std::sync::Arc;
use std::any::Any;
use std::collections::{HashMap, hash_map::DefaultHasher};
use std::hash::{Hash, Hasher};
use std::default::Default;
use std::ptr::NonNull;
use std::alloc::Layout;
use std::ops::Deref;
use rkyv::{
Archive,
Archived,
Serialize,
Deserialize,
Fallible,
ser::{
Serializer,
ScratchSpace,
serializers::*,
},
string::{
StringResolver,
ArchivedString,
},
out_field,
archived_root,
};
use rkyv_dyn::{
archive_dyn,
};
use rkyv_typename::TypeName;
use crate::error::Error;
use crate::db::{DB, Environment, WriteFlags, Transaction, RoTransaction};
#[archive_dyn(deserialize)]
/// Trait to be implemented by any value in the state map.
///
/// A value can be any type not having dangling references (with the added restriction that it has
/// to implement `Debug` for debugger QoL).
/// In fact Value *also* needs to implement Hash since BFFH checks if the state is different to
/// before on input and output before updating the resource re. notifying actors and notifys. This
/// dependency is not expressable via supertraits since it is not possible to make Hash into a
/// trait object.
/// To solve this [`State`] uses the [`StateBuilder`] which adds an `Hash` requirement for inputs
/// on [`add`](struct::StateBuilder::add). The hash is being created over all inserted values and
/// then used to check for equality. Note that in addition to collisions, Hash is not guaranteed
/// stable over ordering and will additionally not track overwrites, so if the order of insertions
/// changes or values are set and later overwritten then two equal States can and are likely to
/// have different hashes.
pub trait Value: Any + fmt::Debug { }
#[repr(transparent)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Archive, Serialize, Deserialize)]
#[archive_attr(derive(TypeName, Debug))]
pub struct Bool(bool);
#[archive_dyn(deserialize)]
impl Value for Bool { }
impl Value for Archived<Bool> { }
#[repr(transparent)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Archive, Serialize, Deserialize)]
#[archive_attr(derive(TypeName, Debug))]
pub struct UInt32(u32);
#[archive_dyn(deserialize)]
impl Value for UInt32 { }
impl Value for Archived<UInt32> { }
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Archive, Serialize, Deserialize)]
#[archive_attr(derive(TypeName, Debug))]
pub struct Vec3u8 {
a: u8,
b: u8,
c: u8,
}
#[archive_dyn(deserialize)]
impl Value for Vec3u8 { }
impl Value for Archived<Vec3u8> { }
#[derive(Archive, Serialize, Deserialize)]
/// State object of a resource
///
/// This object serves three functions:
/// 1. it is constructed by modification via Claims or via internal resource logic
/// 2. it is serializable and storable in the database
/// 3. it is sendable and forwarded to all Actors and Notifys
pub struct State {
hash: u64,
inner: Vec<(String, Box<dyn SerializeValue>)>,
}
impl PartialEq for State {
fn eq(&self, other: &Self) -> bool {
self.hash == other.hash
}
}
impl Eq for State {}
impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut sf = f.debug_struct("State");
for (k, v) in self.inner.iter() {
sf.field(k, v);
}
sf.finish()
}
}
impl State {
pub fn build() -> StateBuilder {
StateBuilder::new()
}
pub fn hash(&self) -> u64 {
self.hash
}
}
pub struct StateBuilder {
hasher: DefaultHasher,
inner: Vec<(String, Box<dyn SerializeValue>)>
}
impl StateBuilder {
pub fn new() -> Self {
let hasher = DefaultHasher::new();
Self { inner: Vec::new(), hasher }
}
pub fn finish(self) -> State {
State {
hash: self.hasher.finish(),
inner: self.inner,
}
}
/// Add key-value pair to the State being built.
///
/// We have to use this split system here because type erasure prevents us from limiting values
/// to `Hash`. Specifically, you can't have a trait object of `Hash` because `Hash` depends on
/// `Self`. In this function however the compiler still knows the exact type of `V` and can
/// call statically call its `hash` method.
pub fn add<V>(mut self, key: String, val: V) -> Self
where V: SerializeValue + Hash
{
key.hash(&mut self.hasher);
val.hash(&mut self.hasher);
self.inner.push((key, Box::new(val)));
self
}
}
pub struct StateStorage {
key: u64,
db: StateDB
}
impl StateStorage {
pub fn new(key: u64, db: StateDB) -> Self {
Self { key, db }
}
pub fn store(&mut self, instate: &State, outstate: &State) -> Result<(), Error> {
self.db.store(self.key, instate, outstate)
}
}
struct SizeSerializer {
pos: usize,
scratch: FallbackScratch<HeapScratch<1024>, AllocScratch>,
}
impl SizeSerializer {
pub fn new() -> Self {
Self { pos: 0, scratch: FallbackScratch::default() }
}
}
impl Fallible for SizeSerializer {
type Error = AllocScratchError;
}
impl Serializer for SizeSerializer {
fn pos(&self) -> usize {
self.pos
}
fn write(&mut self, bytes: &[u8]) -> Result<(), Self::Error> {
self.pos += bytes.len();
Ok(())
}
}
impl ScratchSpace for SizeSerializer {
unsafe fn push_scratch(
&mut self,
layout: Layout
) -> Result<NonNull<[u8]>, Self::Error> {
self.scratch.push_scratch(layout)
}
unsafe fn pop_scratch(
&mut self,
ptr: NonNull<u8>,
layout: Layout
) -> Result<(), Self::Error> {
self.scratch.pop_scratch(ptr, layout)
}
}
type LmdbSerializer<B, const N: usize> = CompositeSerializer<
BufferSerializer<B>,
FallbackScratch<HeapScratch<N>, AllocScratch>,
SharedSerializeMap,
>;
pub struct StateDB {
input: DB,
output: DB,
}
impl StateDB {
pub fn new(input: DB, output: DB) -> Self {
Self { input, output }
}
fn get_size(&self, state: &State) -> usize {
let mut serializer = SizeSerializer::new();
serializer.serialize_value(state);
serializer.pos()
}
pub fn store(&self, key: u64, instate: &State, outstate: &State) -> Result<(), Error> {
let insize = self.get_size(instate);
let outsize = self.get_size(outstate);
let mut txn = self.input.begin_rw_txn()?;
let mut inbuf = self.input.reserve(&mut txn, &key.to_ne_bytes(), insize, WriteFlags::empty())?;
let bufser = BufferSerializer::new(inbuf);
let ser: LmdbSerializer<&mut [u8], 1024> = LmdbSerializer::new(
bufser,
FallbackScratch::default(),
SharedSerializeMap::default()
);
let mut outbuf = self.output.reserve(&mut txn, &key.to_ne_bytes(), outsize, WriteFlags::empty())?;
let bufser = BufferSerializer::new(outbuf);
let ser: LmdbSerializer<&mut [u8], 1024> = LmdbSerializer::new(
bufser,
FallbackScratch::default(),
SharedSerializeMap::default()
);
txn.commit()?;
Ok(())
}
pub fn get_txn<'txn, T: Transaction>(&self, key: u64, txn: &'txn T)
-> Result<(&'txn ArchivedState, &'txn ArchivedState), Error>
{
let inbuf = self.input.get(txn, &key.to_ne_bytes())?;
let outbuf = self.output.get(txn, &key.to_ne_bytes())?;
let instate = unsafe {
archived_root::<State>(inbuf.as_ref())
};
let outstate = unsafe {
archived_root::<State>(outbuf.as_ref())
};
Ok((instate, outstate))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::tests::open_test_env;
use lmdb::{
EnvironmentFlags as EF,
DatabaseFlags as DF,
WriteFlags as WF,
};
use rkyv::Infallible;
use rkyv::ser::serializers::AllocSerializer;
use rkyv::archived_root;
use rkyv::util::archived_value;
#[test]
fn construct_state() {
let b = State::build()
.add("Colour".to_string(), Vec3u8 { a: 1, b: 2, c: 3})
.add("Powered".to_string(), Bool(true))
.add("Intensity".to_string(), UInt32(4242))
.finish();
println!("({}) {:?}", b.hash(), b);
let mut serializer = AllocSerializer::<256>::default();
let pos = serializer.serialize_value(&b).unwrap();
let buf = serializer.into_serializer().into_inner();
println!("Encsize: {}", buf.len());
let archived_state = unsafe {
archived_value::<State>(buf.as_ref(), pos)
};
let s: State = archived_state.deserialize(&mut Infallible).unwrap();
println!("({}) {:?}", pos, s);
}
#[test]
fn function_name_test() {
let te = open_text_env();
let ildb = e.create_db(Some("input"), DF::empty()).expect("Failed to create db file");
let oldb = e.create_db(Some("output"), DF::empty()).expect("Failed to create db file");
let idb = DB::new(e.env.clone(), ildb);
let odb = DB::new(e.env.clone(), oldb);
let db = StateDB::new(idb, odb);
}
}