diff --git a/Cargo.lock b/Cargo.lock index 5455972..2420627 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,17 @@ dependencies = [ "pretty", ] +[[package]] +name = "ahash" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.18" @@ -300,6 +311,27 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" +[[package]] +name = "bytecheck" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb738a1e65989ecdcd5bba16079641bd7209688fa546e1064832fd6e012fd32a" +dependencies = [ + "bytecheck_derive", + "ptr_meta", +] + +[[package]] +name = "bytecheck_derive" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3b4dff26fdc9f847dab475c9fec16f2cba82d5aa1f09981b87c44520721e10a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "byteorder" version = "1.4.3" @@ -467,6 +499,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "ctor" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccc0a48a9b826acdf4028595adc9db92caea352f7af011a3034acd172a52a0aa" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "derivative" version = "2.2.0" @@ -515,7 +557,7 @@ dependencies = [ [[package]] name = "diflouroborane" -version = "0.2.0" +version = "0.3.0" dependencies = [ "async-channel", "async-trait", @@ -536,7 +578,11 @@ dependencies = [ "libc", "lmdb-rkv", "paho-mqtt", + "ptr_meta", "rand", + "rkyv", + "rkyv_dyn", + "rkyv_typename", "rsasl", "rust-argon2", "serde", @@ -549,6 +595,7 @@ dependencies = [ "smol", "tempfile", "toml", + "tracing", "uuid", "walkdir", ] @@ -968,6 +1015,17 @@ dependencies = [ "wasi", ] +[[package]] +name = "ghost" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a5bcf1bbeab73aa4cf2fde60a846858dc036163c7c33bec309f8d17de785479" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "glob" version = "0.3.0" @@ -994,6 +1052,9 @@ name = "hashbrown" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +dependencies = [ + "ahash", +] [[package]] name = "hermit-abi" @@ -1049,6 +1110,28 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "inventory" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f0f7efb804ec95e33db9ad49e4252f049e37e8b0a4652e3cd61f7999f2eff7f" +dependencies = [ + "ctor", + "ghost", + "inventory-impl", +] + +[[package]] +name = "inventory-impl" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75c094e94816723ab936484666968f5b58060492e880f3c8d00489a1e244fa51" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "itertools" version = "0.9.0" @@ -1408,9 +1491,9 @@ dependencies = [ [[package]] name = "proc-macro-crate" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41fdbd1df62156fbc5945f4762632564d7d038153091c3fcf1067f6aef7cff92" +checksum = "1ebace6889caf889b4d3f76becee12e90353f2b8c7d875534a71e5742f8f6f83" dependencies = [ "thiserror", "toml", @@ -1463,6 +1546,26 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "ptr_meta" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -1563,6 +1666,85 @@ dependencies = [ "winapi", ] +[[package]] +name = "rend" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0351a2e529ee30d571ef31faa5a4e0b9addaad087697b77efb20d2809e41c7" +dependencies = [ + "bytecheck", +] + +[[package]] +name = "rkyv" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f371763528cb5a45ff01879d08355c39d49b849e2e6e665989abe3262997bde0" +dependencies = [ + "bytecheck", + "hashbrown", + "ptr_meta", + "rend", + "rkyv_derive", + "seahash", +] + +[[package]] +name = "rkyv_derive" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fc962ad6ffbcc218212c465318c7c151221474350b281b4912f4890ea590b5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "rkyv_dyn" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75e044390151cefefc450726260d1a164348c4038ccd9c81c1066e8b52155b2" +dependencies = [ + "inventory", + "lazy_static", + "ptr_meta", + "rkyv", + "rkyv_dyn_derive", + "rkyv_typename", +] + +[[package]] +name = "rkyv_dyn_derive" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "184c2b11917482e21af6c380505d535e27db4929ad81bc49091a7350cabacd7d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "rkyv_typename" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41ea58eb82006d34657d395fafa4e62521c017b02ee84fd90fbd25b1b67fa923" +dependencies = [ + "rkyv_typename_derive", +] + +[[package]] +name = "rkyv_typename_derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b362b976c52da543e6cab9325b62fc5fec98852c6fb984e71c146014f79e28f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "rsasl" version = "1.4.0" @@ -1613,6 +1795,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "seahash" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + [[package]] name = "serde" version = "1.0.130" @@ -1934,6 +2122,38 @@ dependencies = [ "serde", ] +[[package]] +name = "tracing" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8" +dependencies = [ + "cfg-if 1.0.0", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4f915eb6abf914599c200260efced9203504c4c37380af10cdf3b7d36970650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" +dependencies = [ + "lazy_static", +] + [[package]] name = "typed-arena" version = "1.7.0" diff --git a/Cargo.toml b/Cargo.toml index 00178ab..04ad4de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,8 @@ futures-signals = "0.3.22" smol = "1.2.5" +tracing = "0.1.28" + signal-hook = "0.3.9" slog = { version = "2.7.0", features = ["max_level_trace"] } @@ -29,6 +31,7 @@ slog-term = "2.8.0" slog-async = "2.7.0" capnp = "0.14.3" +capnpc = "0.14.4" capnp-rpc = "0.14.1" capnp-futures = "0.14.1" @@ -37,6 +40,11 @@ toml = "0.5.8" flexbuffers = "2.0.0" bincode = "2.0.0-dev" +rkyv = "0.7" +ptr_meta = "0.1" +rkyv_typename = "0.7" +rkyv_dyn = "0.7" + serde_dhall = { version = "0.10.1", default-features = false } serde_json = "1.0" diff --git a/schema b/schema index 2c01a62..2a14158 160000 --- a/schema +++ b/schema @@ -1 +1 @@ -Subproject commit 2c01a62085937fbe89223b1bce65f1f2913878cc +Subproject commit 2a14158a76e96de753f37a50af0b604e0ccfa7b7 diff --git a/src/api.rs b/src/api.rs deleted file mode 100644 index 8effc5d..0000000 --- a/src/api.rs +++ /dev/null @@ -1,102 +0,0 @@ -use std::rc::Rc; -use std::cell::RefCell; -use std::ops::Deref; - -use slog::Logger; - -use std::sync::Arc; - -use capnp::capability::{Params, Results, Promise}; - -use crate::schema::connection_capnp; -use crate::connection::Session; - -use crate::db::Databases; -use crate::db::user::UserId; - -use crate::network::Network; - -pub mod auth; -mod machine; -mod machines; -use machines::Machines; - -mod user; -mod users; -use users::Users; - -// TODO Session restoration by making the Bootstrap cap a SturdyRef -pub struct Bootstrap { - log: Logger, - - db: Databases, - nw: Arc, - - session: Rc>>, -} - -impl Bootstrap { - pub fn new(log: Logger, db: Databases, nw: Arc) -> Self { - info!(log, "Created Bootstrap"); - let session = Rc::new(RefCell::new(None)); - Self { session, db, nw, log } - } -} - -use connection_capnp::bootstrap::*; -impl connection_capnp::bootstrap::Server for Bootstrap { - fn authentication_system(&mut self, - _: AuthenticationSystemParams, - mut res: AuthenticationSystemResults - ) -> Promise<(), capnp::Error> { - // TODO: Forbid mutltiple authentication for now - // TODO: When should we allow multiple auth and how do me make sure that does not leak - // priviledges (e.g. due to previously issues caps)? - - // If this Rc has a strong count of 1 then there's no other cap issued yet meaning we can - // safely transform the inner session with an auth. - if Rc::strong_count(&self.session) == 1 { - let session = Rc::clone(&self.session); - let db = self.db.clone(); - res.get().set_authentication_system(capnp_rpc::new_client( - auth::Auth::new(self.log.new(o!()), db, session)) - ); - } - - Promise::ok(()) - } - - fn machine_system(&mut self, - _: MachineSystemParams, - mut res: MachineSystemResults - ) -> Promise<(), capnp::Error> { - if let Some(session) = self.session.borrow().deref() { - debug!(self.log, "Giving MachineSystem cap to user {} with perms:", session.authzid); - for r in session.perms.iter() { - debug!(session.log, " {}", r); - } - - // TODO actual permission check and stuff - // Right now we only check that the user has authenticated at all. - let c = capnp_rpc::new_client(Machines::new(Rc::clone(&self.session), self.nw.clone())); - res.get().set_machine_system(c); - } - - Promise::ok(()) - } - - fn user_system( - &mut self, - _: UserSystemParams, - mut results: UserSystemResults - ) -> Promise<(), capnp::Error> { - if self.session.borrow().is_some() { - // TODO actual permission check and stuff - // Right now we only check that the user has authenticated at all. - let c = capnp_rpc::new_client(Users::new(Rc::clone(&self.session), self.db.userdb.clone())); - results.get().set_user_system(c); - } - - Promise::ok(()) - } -} diff --git a/src/connection.rs b/src/connection.rs index e2cbcd5..9dfaefd 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -7,7 +7,6 @@ use slog::Logger; use smol::lock::Mutex; use smol::net::TcpStream; -use crate::api::Bootstrap; use crate::error::Result; use capnp_rpc::{rpc_twoparty_capnp, twoparty}; @@ -78,7 +77,8 @@ impl ConnectionHandler { pub fn handle(&mut self, stream: TcpStream) -> impl Future> { info!(self.log, "New connection from on {:?}", stream); let boots = Bootstrap::new(self.log.new(o!()), self.db.clone(), self.network.clone()); - let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots); + unimplemented!(); + /*let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots); let network = twoparty::VatNetwork::new( stream.clone(), @@ -90,5 +90,6 @@ impl ConnectionHandler { // Convert the error type to one of our errors rpc_system.map(|r| r.map_err(Into::into)) + */ } } diff --git a/src/db.rs b/src/db.rs index d4c1305..e25a57c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,99 +1,73 @@ -use std::sync::Arc; -use std::path::PathBuf; -use std::str::FromStr; -use std::ops::{Deref, DerefMut}; +use std::{ + mem::size_of, + ops::Deref, + ptr::NonNull, + rc::Rc, + sync::Arc, + marker::PhantomData, + hash::{ + Hash, + Hasher, + BuildHasher, + }, + collections::hash_map::RandomState, +}; -use slog::Logger; +use rkyv::{ + Archive, + Archived, + archived_root, -use crate::error::Result; -use crate::config::Config; + Serialize, + Deserialize, -/// (Hashed) password database -pub mod pass; - -/// User storage -pub mod user; - -/// Access control storage -/// -/// Stores&Retrieves Permissions and Roles -pub mod access; - -/// Machine storage -/// -/// Stores&Retrieves Machines -pub mod machine; - -#[derive(Clone)] -pub struct Databases { - pub access: Arc, - pub machine: Arc, - pub userdb: Arc, -} - -const LMDB_MAX_DB: u32 = 16; - -impl Databases { - pub fn new(log: &Logger, config: &Config) -> Result { - - // Initialize the LMDB environment. This blocks until the mmap() finishes - info!(log, "LMDB env"); - let env = lmdb::Environment::new() - .set_flags(lmdb::EnvironmentFlags::MAP_ASYNC | lmdb::EnvironmentFlags::NO_SUB_DIR) - .set_max_dbs(LMDB_MAX_DB as libc::c_uint) - .open(config.db_path.as_path())?; - - // Start loading the machine database, authentication system and permission system - // All of those get a custom logger so the source of a log message can be better traced and - // filtered - let env = Arc::new(env); - let mdb = machine::init(log.new(o!("system" => "machines")), &config, env.clone())?; - - let permdb = access::init(log.new(o!("system" => "permissions")), &config, env.clone())?; - let ac = access::AccessControl::new(permdb); - - let userdb = user::init(log.new(o!("system" => "users")), &config, env.clone())?; - - Ok(Self { - access: Arc::new(ac), - machine: Arc::new(mdb), - userdb: Arc::new(userdb), - }) - } -} + ser::serializers::AllocScratchError, +}; use lmdb::{ - Environment, Database, - Transaction, - RoTransaction, - RwTransaction, - WriteFlags, Cursor, RoCursor, - RwCursor, Iter, }; +pub use rkyv::{ + Fallible, +}; +pub use lmdb::{ + Environment, + + DatabaseFlags, + WriteFlags, + + Transaction, + RoTransaction, + RwTransaction, +}; + + #[derive(Debug, Clone)] -pub struct DB { - env: Arc, +pub struct RawDB { db: Database, } -impl DB { - pub fn new(env: Arc, db: Database) -> Self { - Self { env, db } +impl RawDB { + pub fn open(env: &Environment, name: Option<&str>) -> lmdb::Result { + env.open_db(name).map(|db| Self { db }) + } + + pub fn create(env: &Environment, name: Option<&str>, flags: DatabaseFlags) -> lmdb::Result { + env.create_db(name, flags).map(|db| Self { db }) } - pub fn open(env: Arc, name: &str) -> lmdb::Result { - env.open_db(Some(name)).map(|db| { Self::new(env.clone(), db) }) - } - - pub fn get<'txn, T: Transaction, K>(&self, txn: &'txn T, key: &K) -> lmdb::Result<&'txn [u8]> + pub fn get<'txn, T: Transaction, K>(&self, txn: &'txn T, key: &K) -> lmdb::Result> where K: AsRef<[u8]> { - txn.get(self.db, key) + match txn.get(self.db, key) { + Ok(buf) => Ok(Some(buf)), + Err(lmdb::Error::NotFound) => Ok(None), + Err(e) => Err(e), + } } pub fn put(&self, txn: &mut RwTransaction, key: &K, value: &V, flags: WriteFlags) @@ -125,155 +99,145 @@ impl DB { pub fn open_ro_cursor<'txn, T: Transaction>(&self, txn: &'txn T) -> lmdb::Result> { txn.open_ro_cursor(self.db) } +} - pub fn begin_ro_txn<'env>(&'env self) -> lmdb::Result> { - self.env.begin_ro_txn() +/// An read-only entry reference +pub struct EntryPtr<'txn, K, V> { + key: &'txn K, + val: &'txn V, +} + +#[derive(Archive, Serialize, Deserialize)] +/// The entry as it is stored inside the database. +struct Entry { + key: K, + val: V, +} + +pub struct HashDB<'txn, K, V, S = RandomState> { + db: RawDB, + hash_builder: S, + phantom: &'txn PhantomData<(K,V)>, +} + +impl HashDB<'_, K, V> +{ + pub fn create(env: &Environment, name: Option<&str>) -> lmdb::Result { + Self::create_with_hasher(env, name, RandomState::new()) } - - pub fn begin_rw_txn<'env>(&'env self) -> lmdb::Result> { - self.env.begin_rw_txn() + pub fn open(env: &Environment, name: Option<&str>) -> lmdb::Result { + Self::open_with_hasher(env, name, RandomState::new()) } } -use std::result::Result as StdResult; -use serde::{Serialize, Deserialize}; -use bincode::Options; +impl HashDB<'_, K, V, S> +{ + pub fn create_with_hasher(env: &Environment, name: Option<&str>, hash_builder: S) -> lmdb::Result { + let flags = DatabaseFlags::INTEGER_KEY | DatabaseFlags::DUP_SORT; + let db = RawDB::create(env, name, flags)?; -pub trait DatabaseAdapter { - type Key: ?Sized; - type Err: From + From; + Ok(Self { + db, + hash_builder, + phantom: &PhantomData, + }) + } + pub fn open_with_hasher(env: &Environment, name: Option<&str>, hash_builder: S) -> lmdb::Result { + let db = RawDB::open(env, name)?; + + Ok(Self { + db, + hash_builder, + phantom: &PhantomData, + }) + } - fn serialize_key(key: &Self::Key) -> &[u8]; - fn deserialize_key<'de>(input: &'de [u8]) -> StdResult<&'de Self::Key, Self::Err>; } -// Should we for some reason ever need to have different Options for different adapters we can have -// this in the DatabaseAdapter trait too -fn bincode_default() -> impl bincode::Options { - bincode::DefaultOptions::new() - .with_varint_encoding() +impl<'txn, K, V, S> HashDB<'txn, K, V, S> + where K: Eq + Hash + Archive, + V: Archive, + S: BuildHasher, + K::Archived: PartialEq, +{ + /// Retrieve an entry from the hashdb + /// + /// The result is a view pinned to the lifetime of the transaction. You can get owned Values + /// using [`Deserialize`]. + pub fn get(&self, txn: &'txn T, key: &K) -> lmdb::Result>>> + { + let mut hasher = self.hash_builder.build_hasher(); + key.hash(&mut hasher); + let hash = hasher.finish(); + + let mut cursor = self.db.open_ro_cursor(txn)?; + for res in cursor.iter_dup_of(&hash.to_ne_bytes()) { + let (_keybuf, valbuf) = res?; + let entry: &Archived> = unsafe { archived_root::>(valbuf.as_ref()) }; + + if &entry.key == key { + return Ok(Some(entry)) /*(EntryPtr { + key: &entry.key, + val: &entry.val, + }))*/; + } + } + + Ok(None) + } + + pub fn insert(&self, txn: &mut RwTransaction, entry: Archived>) -> lmdb::Result<()> { + + } } -use std::marker::PhantomData; - -pub struct Objectstore<'a, A, V: ?Sized> { - pub db: DB, - adapter: PhantomData, - marker: PhantomData<&'a V> +/// Memory Fixpoint for a value in the DB +/// +/// LMDB binds lifetimes of buffers to the transaction that returned the buffer. As long as this +/// transaction is not `commit()`ed, `abort()`ed or `reset()`ed the pages containing these values +/// are not returned into circulation. +/// This struct encodes this by binding a live reference to the Transaction to the returned +/// and interpreted buffer. The placeholder `T` is the container for the transaction. This may be a +/// plain `RoTransaction<'env>`, a `Rc` (meaning Fix is !Send) or an `Arc`, depending +/// on your needs. +pub struct Fix { + ptr: NonNull, + txn: T, } +pub type PinnedGet<'env, V> = Fix, V>; +pub type LocalKeep<'env, V> = Fix>, V>; +pub type GlobalKeep<'env, V> = Fix>, V>; -impl Objectstore<'_, A, V> { - pub fn new(db: DB) -> Self { - Self { - db: db, - adapter: PhantomData, - marker: PhantomData, +impl<'env, T, V> Fix + where T: AsRef>, + V: Archive, +{ + pub fn get(txn: T, db: &DB, key: u64) -> lmdb::Result> { + match db.get(txn.as_ref(), &key.to_ne_bytes()) { + Ok(buf) => Ok(Some( + Self { + ptr: unsafe { archived_root::(buf.as_ref()).into() }, + txn, + } + )), + Err(lmdb::Error::NotFound) => Ok(None), + Err(e) => Err(e), } } } - -impl<'txn, A, V> Objectstore<'txn, A, V> - where A: DatabaseAdapter, - V: ?Sized + Serialize + Deserialize<'txn>, +impl<'env, T, V> Deref for Fix + where T: AsRef>, + V: Archive, { - pub fn get(&self, txn: &'txn T, key: &A::Key) - -> StdResult, A::Err> - { - let opts = bincode_default(); + type Target = V::Archived; - self.db.get(txn, &A::serialize_key(key)) - .map_or_else( - |err| match err { - lmdb::Error::NotFound => Ok(None), - e => Err(e.into()), - }, - |ok| opts.deserialize(ok) - .map_err(|e| e.into()) - .map(Option::Some) - ) - } - - /// Update `value` in-place from the database - /// - /// Returns `Ok(false)` if the key wasn't found. If this functions returns an error `value` - /// will be in an indeterminate state where some parts may be updated from the db. - pub fn get_in_place(&self, txn: &'txn T, key: &A::Key, value: &mut V) - -> StdResult - { - let opts = bincode_default(); - - self.db.get(txn, &A::serialize_key(key)) - .map_or_else( - |err| match err { - lmdb::Error::NotFound => Ok(false), - e => Err(e.into()), - }, - |ok| opts.deserialize_in_place_buffer(ok, value) - .map_err(|e| e.into()) - .map(|()| true) - ) - } - - pub fn iter(&self, txn: &'txn T) -> StdResult, A::Err> { - let mut cursor = self.db.open_ro_cursor(txn)?; - let iter = cursor.iter_start(); - Ok(ObjectIter::new(cursor, iter)) - } - - pub fn put(&self, txn: &'txn mut RwTransaction, key: &A::Key, value: &V, flags: lmdb::WriteFlags) - -> StdResult<(), A::Err> - { - let opts = bincode::DefaultOptions::new() - .with_varint_encoding(); - - // Serialized values are always at most as big as their memory representation. - // So even if usize is 32 bit this is safe given no segmenting is taking place. - let bufsize = opts.serialized_size(value)? as usize; - - let buffer = self.db.reserve(txn, &A::serialize_key(key), bufsize, flags)?; - - opts.serialize_into(buffer, value).map_err(|e| e.into()) - } - - pub fn del(&self, txn: &'txn mut RwTransaction, key: &A::Key) - -> StdResult<(), A::Err> - { - self.db.del::<&[u8], &[u8]>(txn, &A::serialize_key(key), None).map_err(|e| e.into()) + fn deref(&self) -> &Self::Target { + // As long as the transaction is kept alive (which it is, because it's in self) state is a + // valid pointer so this is safe. + unsafe { self.ptr.as_ref() } } } -pub struct ObjectIter<'txn, A, V: ?Sized> { - cursor: RoCursor<'txn>, - inner: Iter<'txn>, - - adapter: PhantomData, - marker: PhantomData<&'txn V>, -} - -impl<'txn, A, V: ?Sized> ObjectIter<'txn, A, V> { - pub fn new(cursor: RoCursor<'txn>, inner: Iter<'txn>) -> Self { - let marker = PhantomData; - let adapter = PhantomData; - Self { cursor, inner, adapter, marker } - } -} - -impl<'txn, A, V> Iterator for ObjectIter<'txn, A, V> - where A: DatabaseAdapter, - V: ?Sized + Serialize + Deserialize<'txn>, -{ - type Item = StdResult; - - fn next(&mut self) -> Option { - self.inner.next()? - .map_or_else( - |err| Some(Err(err.into())), - |(_, v)| Some(bincode_default().deserialize(v).map_err(|e| e.into())) - ) - } -} - - #[cfg(test)] mod tests { use super::*; @@ -356,7 +320,6 @@ mod tests { let db = DB::new(e.env.clone(), ldb); - let adapter = TestAdapter; let testdb = TestDB::new(db.clone()); let mut val = "value"; diff --git a/src/db/machine.rs b/src/db/machine.rs index 01b8002..7e371c4 100644 --- a/src/db/machine.rs +++ b/src/db/machine.rs @@ -75,7 +75,7 @@ impl MachineState { pub fn init(log: Logger, _config: &Config, env: Arc) -> Result { let mut flags = lmdb::DatabaseFlags::empty(); - flags.set(lmdb::DatabaseFlags::INTEGER_KEY, true); + //flags.set(lmdb::DatabaseFlags::INTEGER_KEY, true); let machdb = env.create_db(Some("machines"), flags)?; debug!(&log, "Opened machine db successfully."); diff --git a/src/error.rs b/src/error.rs index 801b435..bd6b7bc 100644 --- a/src/error.rs +++ b/src/error.rs @@ -10,7 +10,7 @@ use futures::task as futures_task; use paho_mqtt::errors as mqtt; -use crate::network; +//FIXME use crate::network; #[derive(Debug)] pub enum Error { @@ -28,7 +28,7 @@ pub enum Error { MQTT(mqtt::Error), BadVersion((u32,u32)), Argon2(argon2::Error), - EventNetwork(network::Error), + //EventNetwork(network::Error), Denied, } @@ -80,9 +80,9 @@ impl fmt::Display for Error { Error::Denied => { write!(f, "You do not have the permission required to do that.") } - Error::EventNetwork(e) => { + /*Error::EventNetwork(e) => { e.fmt(f) - } + }*/ } } } @@ -159,11 +159,11 @@ impl From for Error { } } -impl From for Error { +/*impl From for Error { fn from(e: network::Error) -> Error { Error::EventNetwork(e) } -} +}*/ impl From for Error { fn from(e: argon2::Error) -> Error { diff --git a/src/main.rs b/src/main.rs index 7fb60fa..a4206a1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,13 +9,11 @@ extern crate capnp_rpc; extern crate async_trait; +/* mod modules; mod log; -mod api; mod config; -mod error; mod connection; -mod schema; mod db; mod machine; mod builtin; @@ -24,8 +22,15 @@ mod network; mod actor; mod initiator; mod space; +*/ mod resource; +mod schema; +mod state; +mod error; +mod db; + +/* use clap::{App, Arg}; @@ -44,8 +49,13 @@ use slog::Logger; use paho_mqtt::AsyncClient; use crate::config::Config; +*/ -fn main() { +pub fn main() { + +} + +/*fn main() { use clap::{crate_version, crate_description, crate_name}; // Argument parsing @@ -199,3 +209,4 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { server::serve_api_connections(log.clone(), config, db, network, ex) } } +*/ diff --git a/src/resource.rs b/src/resource.rs index d800b4a..bf9a31b 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -1,25 +1,92 @@ -use core::sync::atomic; +use async_trait::async_trait; -/// A something BFFH holds internal state of -pub struct Resource { - // claims - strong: atomic::AtomicUsize, - weak: atomic::AtomicUsize, - max_strong: usize, +use std::pin::Pin; +use std::task::{Poll, Context}; + +use futures::ready; +use futures::future::{Future, BoxFuture}; +use futures::channel::oneshot; +use futures::sink::Sink; +use futures_signals::signal::Mutable; + +use smol::prelude::*; +use smol::future::FutureExt; +use smol::channel::{Sender, Receiver}; + +use crate::error::Error; +use crate::state::{State, StateStorage}; + +/// A resource in BFFH has to contain several different parts; +/// - Currently set state +/// - Execution state of attached actors (⇒ BFFH's job) +/// - Output of interal logic of a resource +/// ⇒ Resource logic gets read access to set state and write access to output state. +/// ⇒ state `update` happens via resource logic. This logic should do access control. If the update +/// succeeds then BFFH stores those input parameters ("set" state) and results / output state. +/// Storing input parameters is relevant so that BFFH can know that an "update" is a no-op +/// without having to run the module code. +/// ⇒ in fact actors only really care about the output state, and shouldn't (need to) see "set" +/// state. +/// ⇒ example reserving: +/// - Claimant sends 'update' message with a new state +/// - Doesn't set the state until `update` has returned Ok. +/// - This runs the `update` function with that new state and the claimants user context returning +/// either an Ok or an Error. +/// - Error is returned to Claimant to show user, stop. +/// - On ok: +/// - Commit new "set" state, storing it and making it visible to all other claimants +/// - Commit new output state, storing it and notifying all connected actors / Notify +/// ⇒ BFFHs job in this whole ordeal is: +/// - Message passing primitives so that update message are queued +/// - As reliable as possible storage system for input and output state +/// - Again message passing so that updates are broadcasted to all Notify and Actors. +/// ⇒ Resource module's job is: +/// - Validating updates semantically i.e. are the types correct +/// - Check authorization of updates i.e. is this user allowed to do that +#[async_trait] +pub trait Resource { + /// Returns true if the given state is valid, and false otherwise + fn validate(&mut self, state: &State) -> bool; + + /// Run whatever internal logic this resource has for the given State update, and return the + /// new output state that this update produces. + async fn update(&mut self, state: &State) -> Result; } -/// A claim is taken in lieu of an user on a resource. -/// -/// They come in two flavours: Weak, of which an infinite amount can exist, and Strong which may be -/// limited in number. Strong claims represent the right of the user to use this resource -/// "writable". A weak claim indicates co-usage of a resource and are mainly useful for notice and -/// information of the respective other ones. E.g. a space would be strongly claimed by keyholders -/// when they check in and released when they check out and weakly claimed by everybody else. In -/// that case the last strong claim could also fail to be released if there are outstanding weak -/// claims. Alternatively, releasing the last strong claim also releases all weak claims and sets -/// the resource to "Free" again. -/// -/// Most importantly, claims can be released by *both* the claim holder and the resource. -pub struct Claim { - id: u128, +pub struct Update { + pub state: State, + pub errchan: oneshot::Sender, +} + +pub struct ResourceDriver { + res: Box, + db: StateStorage, + + rx: Receiver, + signal: Mutable, +} + +impl ResourceDriver { + pub async fn drive_to_end(&mut self) { + while let Ok(update) = self.rx.recv().await { + let state = update.state; + let errchan = update.errchan; + + match self.res.update(&state).await { + Ok(outstate) => { + // FIXME: Send any error here to some global error collector. A failed write to + // the DB is not necessarily fatal, but it means that BFFH is now in an + // inconsistent state until a future update succeeds with writing to the DB. + // Not applying the new state isn't correct either since we don't know what the + // internal logic of the resource has done to make this happen. + // Another half right solution is to unwrap and recreate everything. + self.db.store(&state, &outstate); + self.signal.set_neq(outstate); + }, + Err(e) => { + errchan.send(e); + } + } + } + } } diff --git a/src/schema.rs b/src/schema.rs index 15314d8..70d6308 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -1,31 +1,28 @@ +pub use capnpc::schema_capnp; + #[allow(dead_code)] -pub mod authenticationsystem_capnp { - include!(concat!(env!("OUT_DIR"), "/schema/authenticationsystem_capnp.rs")); +pub mod auth_capnp { + include!(concat!(env!("OUT_DIR"), "/schema/auth_capnp.rs")); } #[allow(dead_code)] -pub mod connection_capnp { - include!(concat!(env!("OUT_DIR"), "/schema/connection_capnp.rs")); +pub mod main_capnp { + include!(concat!(env!("OUT_DIR"), "/schema/main_capnp.rs")); } #[allow(dead_code)] -pub mod general_capnp { - include!(concat!(env!("OUT_DIR"), "/schema/general_capnp.rs")); +pub mod utils_capnp { + include!(concat!(env!("OUT_DIR"), "/schema/utils_capnp.rs")); } #[allow(dead_code)] -pub mod machine_capnp { - include!(concat!(env!("OUT_DIR"), "/schema/machine_capnp.rs")); +pub mod resource_capnp { + include!(concat!(env!("OUT_DIR"), "/schema/resource_capnp.rs")); } #[allow(dead_code)] -pub mod machinesystem_capnp { - include!(concat!(env!("OUT_DIR"), "/schema/machinesystem_capnp.rs")); -} - -#[allow(dead_code)] -pub mod permissionsystem_capnp { - include!(concat!(env!("OUT_DIR"), "/schema/permissionsystem_capnp.rs")); +pub mod resources_capnp { + include!(concat!(env!("OUT_DIR"), "/schema/resources_capnp.rs")); } #[allow(dead_code)] @@ -33,17 +30,12 @@ pub mod role_capnp { include!(concat!(env!("OUT_DIR"), "/schema/role_capnp.rs")); } -#[allow(dead_code)] -pub mod space_capnp { - include!(concat!(env!("OUT_DIR"), "/schema/space_capnp.rs")); -} - #[allow(dead_code)] pub mod user_capnp { include!(concat!(env!("OUT_DIR"), "/schema/user_capnp.rs")); } #[allow(dead_code)] -pub mod usersystem_capnp { - include!(concat!(env!("OUT_DIR"), "/schema/usersystem_capnp.rs")); +pub mod users_capnp { + include!(concat!(env!("OUT_DIR"), "/schema/users_capnp.rs")); } diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000..9c0e643 --- /dev/null +++ b/src/state.rs @@ -0,0 +1,332 @@ +use std::fmt; +use std::rc::Rc; +use std::sync::Arc; +use std::any::Any; +use std::collections::{HashMap, hash_map::DefaultHasher}; +use std::hash::{Hash, Hasher}; +use std::default::Default; +use std::ptr::NonNull; +use std::alloc::Layout; +use std::ops::Deref; + +use rkyv::{ + Archive, + Archived, + + Serialize, + Deserialize, + + Fallible, + ser::{ + Serializer, + ScratchSpace, + serializers::*, + }, + + string::{ + StringResolver, + ArchivedString, + }, + + out_field, + archived_root, +}; +use rkyv_dyn::{ + archive_dyn, +}; +use rkyv_typename::TypeName; + +use crate::error::Error; +use crate::db::{DB, Environment, WriteFlags, Transaction, RoTransaction}; + +#[archive_dyn(deserialize)] +/// Trait to be implemented by any value in the state map. +/// +/// A value can be any type not having dangling references (with the added restriction that it has +/// to implement `Debug` for debugger QoL). +/// In fact Value *also* needs to implement Hash since BFFH checks if the state is different to +/// before on input and output before updating the resource re. notifying actors and notifys. This +/// dependency is not expressable via supertraits since it is not possible to make Hash into a +/// trait object. +/// To solve this [`State`] uses the [`StateBuilder`] which adds an `Hash` requirement for inputs +/// on [`add`](struct::StateBuilder::add). The hash is being created over all inserted values and +/// then used to check for equality. Note that in addition to collisions, Hash is not guaranteed +/// stable over ordering and will additionally not track overwrites, so if the order of insertions +/// changes or values are set and later overwritten then two equal States can and are likely to +/// have different hashes. +pub trait Value: Any + fmt::Debug { } + +#[repr(transparent)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Archive, Serialize, Deserialize)] +#[archive_attr(derive(TypeName, Debug))] +pub struct Bool(bool); + +#[archive_dyn(deserialize)] +impl Value for Bool { } +impl Value for Archived { } + +#[repr(transparent)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Archive, Serialize, Deserialize)] +#[archive_attr(derive(TypeName, Debug))] +pub struct UInt32(u32); + +#[archive_dyn(deserialize)] +impl Value for UInt32 { } +impl Value for Archived { } + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Archive, Serialize, Deserialize)] +#[archive_attr(derive(TypeName, Debug))] +pub struct Vec3u8 { + a: u8, + b: u8, + c: u8, +} + +#[archive_dyn(deserialize)] +impl Value for Vec3u8 { } +impl Value for Archived { } + +#[derive(Archive, Serialize, Deserialize)] +/// State object of a resource +/// +/// This object serves three functions: +/// 1. it is constructed by modification via Claims or via internal resource logic +/// 2. it is serializable and storable in the database +/// 3. it is sendable and forwarded to all Actors and Notifys +pub struct State { + hash: u64, + inner: Vec<(String, Box)>, +} +impl PartialEq for State { + fn eq(&self, other: &Self) -> bool { + self.hash == other.hash + } +} +impl Eq for State {} + +impl fmt::Debug for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut sf = f.debug_struct("State"); + for (k, v) in self.inner.iter() { + sf.field(k, v); + } + sf.finish() + } +} + +impl State { + pub fn build() -> StateBuilder { + StateBuilder::new() + } + + pub fn hash(&self) -> u64 { + self.hash + } +} + +pub struct StateBuilder { + hasher: DefaultHasher, + inner: Vec<(String, Box)> +} + +impl StateBuilder { + pub fn new() -> Self { + let hasher = DefaultHasher::new(); + Self { inner: Vec::new(), hasher } + } + + pub fn finish(self) -> State { + State { + hash: self.hasher.finish(), + inner: self.inner, + } + } + + /// Add key-value pair to the State being built. + /// + /// We have to use this split system here because type erasure prevents us from limiting values + /// to `Hash`. Specifically, you can't have a trait object of `Hash` because `Hash` depends on + /// `Self`. In this function however the compiler still knows the exact type of `V` and can + /// call statically call its `hash` method. + pub fn add(mut self, key: String, val: V) -> Self + where V: SerializeValue + Hash + { + key.hash(&mut self.hasher); + val.hash(&mut self.hasher); + + self.inner.push((key, Box::new(val))); + + self + } +} + +pub struct StateStorage { + key: u64, + db: StateDB +} + +impl StateStorage { + pub fn new(key: u64, db: StateDB) -> Self { + Self { key, db } + } + + pub fn store(&mut self, instate: &State, outstate: &State) -> Result<(), Error> { + self.db.store(self.key, instate, outstate) + } +} + +struct SizeSerializer { + pos: usize, + scratch: FallbackScratch, AllocScratch>, +} +impl SizeSerializer { + pub fn new() -> Self { + Self { pos: 0, scratch: FallbackScratch::default() } + } +} +impl Fallible for SizeSerializer { + type Error = AllocScratchError; +} +impl Serializer for SizeSerializer { + fn pos(&self) -> usize { + self.pos + } + fn write(&mut self, bytes: &[u8]) -> Result<(), Self::Error> { + self.pos += bytes.len(); + Ok(()) + } +} +impl ScratchSpace for SizeSerializer { + unsafe fn push_scratch( + &mut self, + layout: Layout + ) -> Result, Self::Error> { + self.scratch.push_scratch(layout) + } + + unsafe fn pop_scratch( + &mut self, + ptr: NonNull, + layout: Layout + ) -> Result<(), Self::Error> { + self.scratch.pop_scratch(ptr, layout) + } +} + +type LmdbSerializer = CompositeSerializer< + BufferSerializer, + FallbackScratch, AllocScratch>, + SharedSerializeMap, +>; + + +pub struct StateDB { + input: DB, + output: DB, +} + +impl StateDB { + pub fn new(input: DB, output: DB) -> Self { + Self { input, output } + } + + fn get_size(&self, state: &State) -> usize { + let mut serializer = SizeSerializer::new(); + serializer.serialize_value(state); + serializer.pos() + } + + pub fn store(&self, key: u64, instate: &State, outstate: &State) -> Result<(), Error> { + let insize = self.get_size(instate); + let outsize = self.get_size(outstate); + + let mut txn = self.input.begin_rw_txn()?; + + let mut inbuf = self.input.reserve(&mut txn, &key.to_ne_bytes(), insize, WriteFlags::empty())?; + let bufser = BufferSerializer::new(inbuf); + let ser: LmdbSerializer<&mut [u8], 1024> = LmdbSerializer::new( + bufser, + FallbackScratch::default(), + SharedSerializeMap::default() + ); + + let mut outbuf = self.output.reserve(&mut txn, &key.to_ne_bytes(), outsize, WriteFlags::empty())?; + let bufser = BufferSerializer::new(outbuf); + let ser: LmdbSerializer<&mut [u8], 1024> = LmdbSerializer::new( + bufser, + FallbackScratch::default(), + SharedSerializeMap::default() + ); + + txn.commit()?; + + Ok(()) + } + + pub fn get_txn<'txn, T: Transaction>(&self, key: u64, txn: &'txn T) + -> Result<(&'txn ArchivedState, &'txn ArchivedState), Error> + { + let inbuf = self.input.get(txn, &key.to_ne_bytes())?; + let outbuf = self.output.get(txn, &key.to_ne_bytes())?; + let instate = unsafe { + archived_root::(inbuf.as_ref()) + }; + let outstate = unsafe { + archived_root::(outbuf.as_ref()) + }; + + Ok((instate, outstate)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::db::tests::open_test_env; + use lmdb::{ + EnvironmentFlags as EF, + DatabaseFlags as DF, + WriteFlags as WF, + }; + + use rkyv::Infallible; + use rkyv::ser::serializers::AllocSerializer; + use rkyv::archived_root; + use rkyv::util::archived_value; + + #[test] + fn construct_state() { + let b = State::build() + .add("Colour".to_string(), Vec3u8 { a: 1, b: 2, c: 3}) + .add("Powered".to_string(), Bool(true)) + .add("Intensity".to_string(), UInt32(4242)) + .finish(); + + println!("({}) {:?}", b.hash(), b); + + let mut serializer = AllocSerializer::<256>::default(); + let pos = serializer.serialize_value(&b).unwrap(); + let buf = serializer.into_serializer().into_inner(); + + println!("Encsize: {}", buf.len()); + + let archived_state = unsafe { + archived_value::(buf.as_ref(), pos) + }; + let s: State = archived_state.deserialize(&mut Infallible).unwrap(); + + println!("({}) {:?}", pos, s); + } + + #[test] + fn function_name_test() { + let te = open_text_env(); + let ildb = e.create_db(Some("input"), DF::empty()).expect("Failed to create db file"); + let oldb = e.create_db(Some("output"), DF::empty()).expect("Failed to create db file"); + + let idb = DB::new(e.env.clone(), ildb); + let odb = DB::new(e.env.clone(), oldb); + let db = StateDB::new(idb, odb); + } +}