diff --git a/src/db.rs b/src/db.rs deleted file mode 100644 index e25a57c..0000000 --- a/src/db.rs +++ /dev/null @@ -1,368 +0,0 @@ -use std::{ - mem::size_of, - ops::Deref, - ptr::NonNull, - rc::Rc, - sync::Arc, - marker::PhantomData, - hash::{ - Hash, - Hasher, - BuildHasher, - }, - collections::hash_map::RandomState, -}; - -use rkyv::{ - Archive, - Archived, - archived_root, - - Serialize, - Deserialize, - - ser::serializers::AllocScratchError, -}; - -use lmdb::{ - Database, - Cursor, - RoCursor, - Iter, -}; - -pub use rkyv::{ - Fallible, -}; -pub use lmdb::{ - Environment, - - DatabaseFlags, - WriteFlags, - - Transaction, - RoTransaction, - RwTransaction, -}; - - -#[derive(Debug, Clone)] -pub struct RawDB { - db: Database, -} - -impl RawDB { - pub fn open(env: &Environment, name: Option<&str>) -> lmdb::Result { - env.open_db(name).map(|db| Self { db }) - } - - pub fn create(env: &Environment, name: Option<&str>, flags: DatabaseFlags) -> lmdb::Result { - env.create_db(name, flags).map(|db| Self { db }) - } - - pub fn get<'txn, T: Transaction, K>(&self, txn: &'txn T, key: &K) -> lmdb::Result> - where K: AsRef<[u8]> - { - match txn.get(self.db, key) { - Ok(buf) => Ok(Some(buf)), - Err(lmdb::Error::NotFound) => Ok(None), - Err(e) => Err(e), - } - } - - pub fn put(&self, txn: &mut RwTransaction, key: &K, value: &V, flags: WriteFlags) - -> lmdb::Result<()> - where K: AsRef<[u8]>, - V: AsRef<[u8]>, - { - txn.put(self.db, key, value, flags) - } - - pub fn reserve<'txn, K>(&self, txn: &'txn mut RwTransaction, key: &K, size: usize, flags: WriteFlags) - -> lmdb::Result<&'txn mut [u8]> - where K: AsRef<[u8]> - { - txn.reserve(self.db, key, size, flags) - } - - pub fn del(&self, txn: &mut RwTransaction, key: &K, value: Option<&V>) -> lmdb::Result<()> - where K: AsRef<[u8]>, - V: AsRef<[u8]>, - { - txn.del(self.db, key, value.map(AsRef::as_ref)) - } - - pub fn iter<'txn, C: Cursor<'txn>>(&self, cursor: &'txn mut C) -> Iter<'txn> { - cursor.iter_start() - } - - pub fn open_ro_cursor<'txn, T: Transaction>(&self, txn: &'txn T) -> lmdb::Result> { - txn.open_ro_cursor(self.db) - } -} - -/// An read-only entry reference -pub struct EntryPtr<'txn, K, V> { - key: &'txn K, - val: &'txn V, -} - -#[derive(Archive, Serialize, Deserialize)] -/// The entry as it is stored inside the database. -struct Entry { - key: K, - val: V, -} - -pub struct HashDB<'txn, K, V, S = RandomState> { - db: RawDB, - hash_builder: S, - phantom: &'txn PhantomData<(K,V)>, -} - -impl HashDB<'_, K, V> -{ - pub fn create(env: &Environment, name: Option<&str>) -> lmdb::Result { - Self::create_with_hasher(env, name, RandomState::new()) - } - pub fn open(env: &Environment, name: Option<&str>) -> lmdb::Result { - Self::open_with_hasher(env, name, RandomState::new()) - } -} - -impl HashDB<'_, K, V, S> -{ - pub fn create_with_hasher(env: &Environment, name: Option<&str>, hash_builder: S) -> lmdb::Result { - let flags = DatabaseFlags::INTEGER_KEY | DatabaseFlags::DUP_SORT; - let db = RawDB::create(env, name, flags)?; - - Ok(Self { - db, - hash_builder, - phantom: &PhantomData, - }) - } - pub fn open_with_hasher(env: &Environment, name: Option<&str>, hash_builder: S) -> lmdb::Result { - let db = RawDB::open(env, name)?; - - Ok(Self { - db, - hash_builder, - phantom: &PhantomData, - }) - } - -} - -impl<'txn, K, V, S> HashDB<'txn, K, V, S> - where K: Eq + Hash + Archive, - V: Archive, - S: BuildHasher, - K::Archived: PartialEq, -{ - /// Retrieve an entry from the hashdb - /// - /// The result is a view pinned to the lifetime of the transaction. You can get owned Values - /// using [`Deserialize`]. - pub fn get(&self, txn: &'txn T, key: &K) -> lmdb::Result>>> - { - let mut hasher = self.hash_builder.build_hasher(); - key.hash(&mut hasher); - let hash = hasher.finish(); - - let mut cursor = self.db.open_ro_cursor(txn)?; - for res in cursor.iter_dup_of(&hash.to_ne_bytes()) { - let (_keybuf, valbuf) = res?; - let entry: &Archived> = unsafe { archived_root::>(valbuf.as_ref()) }; - - if &entry.key == key { - return Ok(Some(entry)) /*(EntryPtr { - key: &entry.key, - val: &entry.val, - }))*/; - } - } - - Ok(None) - } - - pub fn insert(&self, txn: &mut RwTransaction, entry: Archived>) -> lmdb::Result<()> { - - } -} - -/// Memory Fixpoint for a value in the DB -/// -/// LMDB binds lifetimes of buffers to the transaction that returned the buffer. As long as this -/// transaction is not `commit()`ed, `abort()`ed or `reset()`ed the pages containing these values -/// are not returned into circulation. -/// This struct encodes this by binding a live reference to the Transaction to the returned -/// and interpreted buffer. The placeholder `T` is the container for the transaction. This may be a -/// plain `RoTransaction<'env>`, a `Rc` (meaning Fix is !Send) or an `Arc`, depending -/// on your needs. -pub struct Fix { - ptr: NonNull, - txn: T, -} -pub type PinnedGet<'env, V> = Fix, V>; -pub type LocalKeep<'env, V> = Fix>, V>; -pub type GlobalKeep<'env, V> = Fix>, V>; - -impl<'env, T, V> Fix - where T: AsRef>, - V: Archive, -{ - pub fn get(txn: T, db: &DB, key: u64) -> lmdb::Result> { - match db.get(txn.as_ref(), &key.to_ne_bytes()) { - Ok(buf) => Ok(Some( - Self { - ptr: unsafe { archived_root::(buf.as_ref()).into() }, - txn, - } - )), - Err(lmdb::Error::NotFound) => Ok(None), - Err(e) => Err(e), - } - } -} -impl<'env, T, V> Deref for Fix - where T: AsRef>, - V: Archive, -{ - type Target = V::Archived; - - fn deref(&self) -> &Self::Target { - // As long as the transaction is kept alive (which it is, because it's in self) state is a - // valid pointer so this is safe. - unsafe { self.ptr.as_ref() } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::result::Result; - use std::ops::Deref; - - use lmdb::{ - EnvironmentFlags as EF, - DatabaseFlags as DF, - WriteFlags as WF, - }; - - pub struct TempEnv { - dir: tempfile::TempDir, - env: Arc, - } - - impl Deref for TempEnv { - type Target = Arc; - fn deref(&self) -> &Self::Target { - &self.env - } - } - - pub fn open_test_env() -> TempEnv { - let dir = tempfile::tempdir().expect("Failed to create tempdir for testdb"); - let env = Environment::new() - .set_flags(EF::NO_SYNC | EF::WRITE_MAP) - .open(dir.path()).expect("Failed to open lmdb"); - let env = Arc::new(env); - - TempEnv { dir, env } - } - - struct TestAdapter; - - #[derive(Debug)] - enum TestErr { - Utf8(std::str::Utf8Error), - Binc(Box), - LMDB(lmdb::Error), - } - - impl From for TestErr { - fn from(e: lmdb::Error) -> TestErr { - TestErr::LMDB(e) - } - } - - impl From for TestErr { - fn from(e: std::str::Utf8Error) -> TestErr { - TestErr::Utf8(e) - } - } - - impl From for TestErr { - fn from(e: bincode::Error) -> TestErr { - TestErr::Binc(e) - } - } - - impl DatabaseAdapter for TestAdapter { - type Key = str; - type Err = TestErr; - - fn serialize_key(key: &Self::Key) -> &[u8] { - key.as_bytes() - } - fn deserialize_key<'de>(input: &'de [u8]) -> Result<&'de Self::Key, Self::Err> { - std::str::from_utf8(input).map_err(|e| e.into()) - } - } - - type TestDB<'txn> = Objectstore<'txn, TestAdapter, &'txn str>; - - #[test] - fn simple_get() { - let e = open_test_env(); - let ldb = e.create_db(None, DF::empty()).expect("Failed to create lmdb db"); - - let db = DB::new(e.env.clone(), ldb); - - let testdb = TestDB::new(db.clone()); - - let mut val = "value"; - let mut txn = db.begin_rw_txn().expect("Failed to being rw txn"); - testdb.put(&mut txn, "key", &val, WF::empty()).expect("Failed to insert"); - testdb.put(&mut txn, "key2", &val, WF::empty()).expect("Failed to insert"); - testdb.put(&mut txn, "key3", &val, WF::empty()).expect("Failed to insert"); - testdb.put(&mut txn, "key4", &val, WF::empty()).expect("Failed to insert"); - testdb.put(&mut txn, "key5", &val, WF::empty()).expect("Failed to insert"); - txn.commit().expect("commit failed"); - - { - let txn; - txn = db.begin_ro_txn().unwrap(); - - let val = testdb.get(&txn, "key").expect("Failed to retrieve"); - assert_eq!(Some("value"), val); - } - - { - let val2 = "longer_value"; - let mut txn = db.begin_rw_txn().unwrap(); - testdb.put(&mut txn, "key", &val2, WF::empty()).expect("Failed to update"); - txn.commit().unwrap(); - } - - { - let txn = db.begin_ro_txn().unwrap(); - let found = testdb.get_in_place(&txn, "key", &mut val).expect("Failed to retrieve update"); - assert!(found); - assert_eq!("longer_value", val); - } - - { - let txn = db.begin_ro_txn().unwrap(); - let mut it = testdb.iter(&txn).unwrap(); - assert_eq!("longer_value", it.next().unwrap().unwrap()); - let mut i = 0; - while let Some(e) = it.next() { - assert_eq!("value", e.unwrap()); - i += 1; - } - assert_eq!(i, 4) - } - } -} diff --git a/src/db/fix.rs b/src/db/fix.rs new file mode 100644 index 0000000..fd08c3c --- /dev/null +++ b/src/db/fix.rs @@ -0,0 +1,41 @@ +use std::{ + ptr::NonNull, + ops::Deref, +}; + +use lmdb::Transaction; + +/// Memory Fixpoint for a value in the DB +/// +/// LMDB binds lifetimes of buffers to the transaction that returned the buffer. As long as this +/// transaction is not `commit()`ed, `abort()`ed or `reset()`ed the pages containing these values +/// are not returned into circulation. +/// This struct encodes this by binding a live reference to the Transaction to the returned +/// and interpreted buffer. The placeholder `T` is the container for the transaction. This may be a +/// plain `RoTransaction<'env>`, a `Rc` (meaning Fix is !Send) or an `Arc`, depending +/// on your needs. +pub struct LMDBorrow { + ptr: NonNull, + txn: T, +} + +impl<'env, T, V> LMDBorrow + where T: Transaction, +{ + pub unsafe fn fix(txn: T, ptr: &'_ V) -> Self { + Self { ptr: ptr.into(), txn, } + } +} + +impl<'env, T, V> Deref for LMDBorrow +{ + type Target = V; + + fn deref(&self) -> &Self::Target { + // As long as the transaction is kept alive (which it is, because it's in self) state is a + // valid pointer so this is safe. + unsafe { self.ptr.as_ref() } + } +} + + diff --git a/src/db/hash.rs b/src/db/hash.rs new file mode 100644 index 0000000..962fda4 --- /dev/null +++ b/src/db/hash.rs @@ -0,0 +1,163 @@ +use std::{ + marker::PhantomData, + hash::{ + Hash, + Hasher, + BuildHasher, + }, + collections::hash_map::RandomState, +}; + +use rkyv::{ + Archive, + Archived, + Serialize, + Deserialize, + Fallible, +}; + +use super::{ + DB, + Adapter, + OutputBuffer, + + Environment, + + DatabaseFlags, + WriteFlags, + + Transaction, + RwTransaction, +}; + + +#[derive(Archive, Serialize, Deserialize)] +/// The entry as it is stored inside the database. +pub struct Entry { + pub key: K, + pub val: V, +} + +pub struct HashAdapter { + k: PhantomData, + a: PhantomData, +} +impl HashAdapter { + pub fn new() -> Self { + Self { k: PhantomData, a: PhantomData } + } +} + +impl Fallible for HashAdapter { type Error = ::Error; } +impl Adapter for HashAdapter + where K: Archive, + Entry: Serialize, +{ + type Value = Entry; + type Serializer = A::Serializer; + + fn new_serializer() -> Self::Serializer + { A::new_serializer() } + + fn from_db_err(e: lmdb::Error) -> ::Error + { A::from_db_err(e) } + + fn from_ser_err(e: ::Error) -> ::Error + { A::from_ser_err(e) } +} + + +const DEFAULT_HASH_FLAGS: libc::c_uint = + DatabaseFlags::INTEGER_KEY.bits() + DatabaseFlags::DUP_SORT.bits(); + +pub struct HashDB +{ + db: DB>, + hash_builder: H, +} + +impl HashDB +{ + pub unsafe fn create(env: &Environment, name: Option<&str>) -> lmdb::Result { + Self::create_with_hasher(env, name, RandomState::new()) + } + pub unsafe fn open(env: &Environment, name: Option<&str>) -> lmdb::Result { + Self::open_with_hasher(env, name, RandomState::new()) + } +} + +impl HashDB +{ + fn new(db: DB>, hash_builder: H) -> Self { + Self { db, hash_builder } + } + + pub unsafe fn create_with_hasher(env: &Environment, name: Option<&str>, hash_builder: H) + -> lmdb::Result + { + let flags = DatabaseFlags::from_bits(DEFAULT_HASH_FLAGS).unwrap(); + DB::create(env, name, flags).map(|db| Self::new(db, hash_builder)) + } + pub unsafe fn open_with_hasher(env: &Environment, name: Option<&str>, hash_builder: H) + -> lmdb::Result + { + DB::open(env, name).map(|db| Self::new(db, hash_builder)) + } + +} + +impl HashDB + where A: Adapter, + HashAdapter: Adapter>, + H: BuildHasher, + K: Hash + Archive, + K::Archived: PartialEq, +{ + /// Retrieve an entry from the hashdb + /// + /// The result is a view pinned to the lifetime of the transaction. You can get owned Values + /// using [`Deserialize`]. + pub fn get<'txn, T: Transaction>(&self, txn: &'txn T, key: &K) + -> Result< + Option<&'txn Archived< as Adapter>::Value>>, + as Fallible>::Error + > + { + let mut hasher = self.hash_builder.build_hasher(); + key.hash(&mut hasher); + let hash = hasher.finish(); + + let mut cursor = self.db.open_ro_cursor(txn)?; + let i = cursor + .iter_dup_of(&hash.to_ne_bytes()).filter_map(|r| r.ok()) + .map(|(_keybuf, entry)| entry); + for entry in i { + let entry: &Archived> = entry; + if entry.key == *key { + return Ok(Some(entry)); + } + } + + Ok(None) + } +} + +impl<'a, A, K, H> HashDB + where A: Adapter, + A::Serializer: OutputBuffer, + H: BuildHasher, + K: Hash + Serialize, + K::Archived: PartialEq, +{ + pub fn insert_entry(&self, txn: &mut RwTransaction, entry: &Entry) + -> Result<(), A::Error> + { + let mut hasher = self.hash_builder.build_hasher(); + entry.key.hash(&mut hasher); + let hash = hasher.finish(); + + self.db.put(txn, &hash.to_ne_bytes(), entry, WriteFlags::empty())?; + + Ok(()) + } +} diff --git a/src/db/mod.rs b/src/db/mod.rs new file mode 100644 index 0000000..0c7fa33 --- /dev/null +++ b/src/db/mod.rs @@ -0,0 +1,164 @@ +pub use lmdb::{ + Environment, + + DatabaseFlags, + WriteFlags, + EnvironmentFlags, + + Transaction, + RoTransaction, + RwTransaction, +}; + +mod raw; +use raw::RawDB; + +mod typed; +// re-exports +pub use typed::{ + DB, + Cursor, + + Adapter, + OutputBuffer, + OutputWriter, +}; + +mod hash; +pub use hash::{ + HashDB, + Entry, +}; + +mod fix; +pub use fix::LMDBorrow; + + +#[cfg(test)] +mod tests { + use super::*; + use std::result::Result; + use std::ops::Deref; + + use lmdb::{ + EnvironmentFlags as EF, + DatabaseFlags as DF, + WriteFlags as WF, + }; + + pub struct TempEnv { + dir: tempfile::TempDir, + env: Arc, + } + + impl Deref for TempEnv { + type Target = Arc; + fn deref(&self) -> &Self::Target { + &self.env + } + } + + pub fn open_test_env() -> TempEnv { + let dir = tempfile::tempdir().expect("Failed to create tempdir for testdb"); + let env = Environment::new() + .set_flags(EF::NO_SYNC | EF::WRITE_MAP) + .open(dir.path()).expect("Failed to open lmdb"); + let env = Arc::new(env); + + TempEnv { dir, env } + } + + struct TestAdapter; + + #[derive(Debug)] + enum TestErr { + Utf8(std::str::Utf8Error), + Binc(Box), + LMDB(lmdb::Error), + } + + impl From for TestErr { + fn from(e: lmdb::Error) -> TestErr { + TestErr::LMDB(e) + } + } + + impl From for TestErr { + fn from(e: std::str::Utf8Error) -> TestErr { + TestErr::Utf8(e) + } + } + + impl From for TestErr { + fn from(e: bincode::Error) -> TestErr { + TestErr::Binc(e) + } + } + + impl DatabaseAdapter for TestAdapter { + type Key = str; + type Err = TestErr; + + fn serialize_key(key: &Self::Key) -> &[u8] { + key.as_bytes() + } + fn deserialize_key<'de>(input: &'de [u8]) -> Result<&'de Self::Key, Self::Err> { + std::str::from_utf8(input).map_err(|e| e.into()) + } + } + + type TestDB<'txn> = Objectstore<'txn, TestAdapter, &'txn str>; + + #[test] + fn simple_get() { + let e = open_test_env(); + let ldb = e.create_db(None, DF::empty()).expect("Failed to create lmdb db"); + + let db = DB::new(e.env.clone(), ldb); + + let testdb = TestDB::new(db.clone()); + + let mut val = "value"; + let mut txn = db.begin_rw_txn().expect("Failed to being rw txn"); + testdb.put(&mut txn, "key", &val, WF::empty()).expect("Failed to insert"); + testdb.put(&mut txn, "key2", &val, WF::empty()).expect("Failed to insert"); + testdb.put(&mut txn, "key3", &val, WF::empty()).expect("Failed to insert"); + testdb.put(&mut txn, "key4", &val, WF::empty()).expect("Failed to insert"); + testdb.put(&mut txn, "key5", &val, WF::empty()).expect("Failed to insert"); + txn.commit().expect("commit failed"); + + { + let txn; + txn = db.begin_ro_txn().unwrap(); + + let val = testdb.get(&txn, "key").expect("Failed to retrieve"); + assert_eq!(Some("value"), val); + } + + { + let val2 = "longer_value"; + let mut txn = db.begin_rw_txn().unwrap(); + testdb.put(&mut txn, "key", &val2, WF::empty()).expect("Failed to update"); + txn.commit().unwrap(); + } + + { + let txn = db.begin_ro_txn().unwrap(); + let found = testdb.get_in_place(&txn, "key", &mut val).expect("Failed to retrieve update"); + assert!(found); + assert_eq!("longer_value", val); + } + + { + let txn = db.begin_ro_txn().unwrap(); + let mut it = testdb.iter(&txn).unwrap(); + assert_eq!("longer_value", it.next().unwrap().unwrap()); + let mut i = 0; + while let Some(e) = it.next() { + assert_eq!("value", e.unwrap()); + i += 1; + } + assert_eq!(i, 4) + } + } +} diff --git a/src/db/raw.rs b/src/db/raw.rs new file mode 100644 index 0000000..64d8105 --- /dev/null +++ b/src/db/raw.rs @@ -0,0 +1,62 @@ +use lmdb::{ + Transaction, + RwTransaction, + Environment, + DatabaseFlags, + WriteFlags, +}; + +#[derive(Debug, Clone)] +pub struct RawDB { + db: lmdb::Database, +} + +impl RawDB { + pub fn open(env: &Environment, name: Option<&str>) -> lmdb::Result { + env.open_db(name).map(|db| Self { db }) + } + + pub fn create(env: &Environment, name: Option<&str>, flags: DatabaseFlags) -> lmdb::Result { + env.create_db(name, flags).map(|db| Self { db }) + } + + pub fn get<'txn, T: Transaction, K>(&self, txn: &'txn T, key: &K) -> lmdb::Result> + where K: AsRef<[u8]> + { + match txn.get(self.db, key) { + Ok(buf) => Ok(Some(buf)), + Err(lmdb::Error::NotFound) => Ok(None), + Err(e) => Err(e), + } + } + + pub fn put(&self, txn: &mut RwTransaction, key: &K, value: &V, flags: WriteFlags) + -> lmdb::Result<()> + where K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + txn.put(self.db, key, value, flags) + } + + pub fn reserve<'txn, K>(&self, txn: &'txn mut RwTransaction, key: &K, size: usize, flags: WriteFlags) + -> lmdb::Result<&'txn mut [u8]> + where K: AsRef<[u8]> + { + txn.reserve(self.db, key, size, flags) + } + + pub fn del(&self, txn: &mut RwTransaction, key: &K, value: Option<&V>) -> lmdb::Result<()> + where K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + txn.del(self.db, key, value.map(AsRef::as_ref)) + } + + pub fn iter<'txn, C: lmdb::Cursor<'txn>>(&self, cursor: &'txn mut C) -> lmdb::Iter<'txn> { + cursor.iter_start() + } + + pub fn open_ro_cursor<'txn, T: Transaction>(&self, txn: &'txn T) -> lmdb::Result> { + txn.open_ro_cursor(self.db) + } +} diff --git a/src/state.rs b/src/db/state.rs similarity index 61% rename from src/state.rs rename to src/db/state.rs index 9c0e643..c78998f 100644 --- a/src/state.rs +++ b/src/db/state.rs @@ -1,13 +1,18 @@ -use std::fmt; -use std::rc::Rc; -use std::sync::Arc; -use std::any::Any; -use std::collections::{HashMap, hash_map::DefaultHasher}; -use std::hash::{Hash, Hasher}; -use std::default::Default; -use std::ptr::NonNull; -use std::alloc::Layout; -use std::ops::Deref; +use std::{ + fmt, + + any::Any, + + collections::{ + hash_map::DefaultHasher + }, + hash::{ + Hash, + Hasher + }, + + path::Path, +}; use rkyv::{ Archive, @@ -16,28 +21,29 @@ use rkyv::{ Serialize, Deserialize, - Fallible, - ser::{ - Serializer, - ScratchSpace, - serializers::*, - }, - - string::{ - StringResolver, - ArchivedString, - }, - out_field, - archived_root, + + Fallible, + ser::serializers::AllocSerializer, }; use rkyv_dyn::{ archive_dyn, }; use rkyv_typename::TypeName; -use crate::error::Error; -use crate::db::{DB, Environment, WriteFlags, Transaction, RoTransaction}; +use crate::db::{ + DB, + Environment, + + EnvironmentFlags, + DatabaseFlags, + WriteFlags, + + Adapter, + + Transaction, + RwTransaction, +}; #[archive_dyn(deserialize)] /// Trait to be implemented by any value in the state map. @@ -160,122 +166,101 @@ impl StateBuilder { } } -pub struct StateStorage { - key: u64, - db: StateDB +struct StateAdapter; + +enum StateError { + LMDB(lmdb::Error), + RKYV( as Fallible>::Error), } -impl StateStorage { - pub fn new(key: u64, db: StateDB) -> Self { - Self { key, db } - } - - pub fn store(&mut self, instate: &State, outstate: &State) -> Result<(), Error> { - self.db.store(self.key, instate, outstate) +impl From for StateError { + fn from(e: lmdb::Error) -> Self { + Self::LMDB(e) } } -struct SizeSerializer { - pos: usize, - scratch: FallbackScratch, AllocScratch>, +impl Fallible for StateAdapter { + type Error = StateError; } -impl SizeSerializer { - pub fn new() -> Self { - Self { pos: 0, scratch: FallbackScratch::default() } - } -} -impl Fallible for SizeSerializer { - type Error = AllocScratchError; -} -impl Serializer for SizeSerializer { - fn pos(&self) -> usize { - self.pos - } - fn write(&mut self, bytes: &[u8]) -> Result<(), Self::Error> { - self.pos += bytes.len(); - Ok(()) - } -} -impl ScratchSpace for SizeSerializer { - unsafe fn push_scratch( - &mut self, - layout: Layout - ) -> Result, Self::Error> { - self.scratch.push_scratch(layout) +impl Adapter for StateAdapter { + type Serializer = AllocSerializer<1024>; + type Value = State; + + fn new_serializer() -> Self::Serializer { + Self::Serializer::default() } - unsafe fn pop_scratch( - &mut self, - ptr: NonNull, - layout: Layout - ) -> Result<(), Self::Error> { - self.scratch.pop_scratch(ptr, layout) + fn from_ser_err(e: ::Error) -> Self::Error { + StateError::RKYV(e) + } + fn from_db_err(e: lmdb::Error) -> Self::Error { + e.into() } } -type LmdbSerializer = CompositeSerializer< - BufferSerializer, - FallbackScratch, AllocScratch>, - SharedSerializeMap, ->; - - +/// State Database containing the currently set state pub struct StateDB { - input: DB, - output: DB, + /// The environment for all the databases below + env: Environment, + + input: DB, + output: DB, + + // TODO: Index resource name/id/uuid -> u64 } impl StateDB { - pub fn new(input: DB, output: DB) -> Self { - Self { input, output } + fn open_env>(path: &P) -> lmdb::Result { + Environment::new() + .set_flags( EnvironmentFlags::WRITE_MAP + | EnvironmentFlags::NO_SUB_DIR + | EnvironmentFlags::NO_TLS + | EnvironmentFlags::NO_READAHEAD) + .set_max_dbs(2) + .open(path.as_ref()) } - fn get_size(&self, state: &State) -> usize { - let mut serializer = SizeSerializer::new(); - serializer.serialize_value(state); - serializer.pos() + fn new(env: Environment, input: DB, output: DB) -> Self { + Self { env, input, output } } - pub fn store(&self, key: u64, instate: &State, outstate: &State) -> Result<(), Error> { - let insize = self.get_size(instate); - let outsize = self.get_size(outstate); + pub fn init>(path: &P) -> lmdb::Result { + let env = Self::open_env(path)?; + let input = unsafe { + DB::create(&env, Some("input"), DatabaseFlags::INTEGER_KEY)? + }; + let output = unsafe { + DB::create(&env, Some("output"), DatabaseFlags::INTEGER_KEY)? + }; - let mut txn = self.input.begin_rw_txn()?; + Ok(Self::new(env, input, output)) + } - let mut inbuf = self.input.reserve(&mut txn, &key.to_ne_bytes(), insize, WriteFlags::empty())?; - let bufser = BufferSerializer::new(inbuf); - let ser: LmdbSerializer<&mut [u8], 1024> = LmdbSerializer::new( - bufser, - FallbackScratch::default(), - SharedSerializeMap::default() - ); + pub fn open>(path: &P) -> lmdb::Result { + let env = Self::open_env(path)?; + let input = unsafe { DB::open(&env, Some("input"))? }; + let output = unsafe { DB::open(&env, Some("output"))? }; - let mut outbuf = self.output.reserve(&mut txn, &key.to_ne_bytes(), outsize, WriteFlags::empty())?; - let bufser = BufferSerializer::new(outbuf); - let ser: LmdbSerializer<&mut [u8], 1024> = LmdbSerializer::new( - bufser, - FallbackScratch::default(), - SharedSerializeMap::default() - ); - - txn.commit()?; + Ok(Self::new(env, input, output)) + } + fn update_txn(&self, txn: &mut RwTransaction, key: u64, input: &State, output: &State) + -> Result<(), ::Error> + { + let flags = WriteFlags::empty(); + let k = key.to_ne_bytes(); + self.input.put(txn, &k, input, flags)?; + self.output.put(txn, &k, output, flags)?; Ok(()) } - pub fn get_txn<'txn, T: Transaction>(&self, key: u64, txn: &'txn T) - -> Result<(&'txn ArchivedState, &'txn ArchivedState), Error> + fn update(&self, key: u64, input: &State, output: &State) + -> Result<(), ::Error> { - let inbuf = self.input.get(txn, &key.to_ne_bytes())?; - let outbuf = self.output.get(txn, &key.to_ne_bytes())?; - let instate = unsafe { - archived_root::(inbuf.as_ref()) - }; - let outstate = unsafe { - archived_root::(outbuf.as_ref()) - }; + let mut txn = self.env.begin_rw_txn().map_err(StateAdapter::from_db_err)?; + self.update_txn(&mut txn, key, input, output)?; - Ok((instate, outstate)) + txn.commit().map_err(StateAdapter::from_db_err) } } diff --git a/src/db/typed.rs b/src/db/typed.rs new file mode 100644 index 0000000..08e99ff --- /dev/null +++ b/src/db/typed.rs @@ -0,0 +1,198 @@ +use std::{ + marker::PhantomData, +}; + +use rkyv::{ + Archived, + archived_root, + + Serialize, + + ser::{ + Serializer, + serializers::AllocSerializer, + }, + + util::AlignedVec, + + Fallible, +}; + +use lmdb::{ + Environment, + DatabaseFlags, + WriteFlags, + + Transaction, + RwTransaction, +}; + +use super::RawDB; + +pub trait Adapter: Fallible { + type Serializer: rkyv::ser::Serializer; + type Value: Serialize; + + fn new_serializer() -> Self::Serializer; + + fn from_ser_err(e: ::Error) -> ::Error; + fn from_db_err(e: lmdb::Error) -> ::Error; +} + +pub trait OutputBuffer { + type Buffer: AsRef<[u8]>; + fn into_slice(self) -> Self::Buffer; +} + +impl OutputBuffer for AllocSerializer { + type Buffer = AlignedVec; + fn into_slice(self) -> Self::Buffer { + self.into_serializer().into_inner() + } +} + +pub trait OutputWriter: Fallible { + fn write_into(&mut self, buf: &mut [u8]) -> Result<(), Self::Error>; +} + +pub struct DB { + db: RawDB, + phantom: PhantomData, +} + +impl DB { + fn new(db: RawDB) -> Self { + Self { db, phantom: PhantomData } + } + + /// Open the underlying DB, creating it if necessary + /// + /// This function is unsafe since if the DB does not contain `A::Archived` we may end up doing + /// random memory reads or writes + pub unsafe fn create(env: &Environment, name: Option<&str>, flags: DatabaseFlags) + -> lmdb::Result + { + RawDB::create(env, name, flags).map(Self::new) + } + + /// Open the underlying DB + /// + /// This function is unsafe since if the DB does not contain `A::Archived` we may end up doing + /// random memory reads or writes + pub unsafe fn open(env: &Environment, name: Option<&str>) -> lmdb::Result { + RawDB::open(env, name).map(Self::new) + } +} + +impl DB +{ + pub fn del>(&self, txn: &mut RwTransaction, key: &K) -> Result<(), A::Error> { + let v: Option<&Vec> = None; + self.db.del(txn, key, v).map_err(A::from_db_err) + } +} + +impl DB +{ + pub fn get<'txn, T: Transaction, K: AsRef<[u8]>>(&self, txn: &'txn T, key: &K) + -> Result>, A::Error> + { + if let Some(buf) = self.db.get(txn, key).map_err(A::from_db_err)? { + Ok(Some(unsafe { archived_root::(buf.as_ref()) })) + } else { + Ok(None) + } + } + + pub fn open_ro_cursor<'txn, T: Transaction>(&self, txn: &'txn T) + -> Result, A>, A::Error> + { + let c = self.db.open_ro_cursor(txn) + .map_err(A::from_db_err)?; + // Safe because we are providing both Adapter and cursor and know it matches + Ok(unsafe { Cursor::new(c) }) + } +} + +impl<'a, A> DB + where A: Adapter, + A::Serializer: OutputBuffer, +{ + pub fn put>(&self, txn: &mut RwTransaction, key: &K, val: &A::Value, flags: WriteFlags) + -> Result + { + let mut serializer = A::new_serializer(); + let pos = serializer.serialize_value(val) + .map_err(A::from_ser_err)?; + + let buf = serializer.into_slice(); + self.db.put(txn, key, &buf, flags) + .map_err(A::from_db_err)?; + + Ok(pos) + } +} + +impl<'a, A> DB + where A: Adapter, + A::Serializer: OutputWriter, +{ + pub fn put_nocopy>(&self, txn: &mut RwTransaction, key: &K, val: &A::Value, flags: WriteFlags) + -> Result + { + let mut serializer = A::new_serializer(); + let pos = serializer.serialize_value(val) + .map_err(A::from_ser_err)?; + + let mut buf = self.db.reserve(txn, &key.as_ref(), pos, flags) + .map_err(A::from_db_err)?; + serializer.write_into(&mut buf) + .map_err(A::from_ser_err)?; + + Ok(pos) + } +} + +pub struct Cursor { + cursor: C, + phantom: PhantomData, +} + +impl<'txn, C, A> Cursor + where C: lmdb::Cursor<'txn>, + A: Adapter, +{ + // Unsafe because we don't know if the given adapter matches the given cursor + pub unsafe fn new(cursor: C) -> Self { + Self { cursor, phantom: PhantomData } + } + + pub fn iter_dup_of>(&mut self, key: &K) -> Iter<'txn, A> { + let iter = self.cursor.iter_dup_of(key); + // Safe because `new` isn't :P + unsafe { Iter::new(iter) } + } +} + +pub struct Iter<'txn, A> { + iter: lmdb::Iter<'txn>, + phantom: PhantomData, +} + +impl<'txn, A: Adapter> Iter<'txn, A> { + pub unsafe fn new(iter: lmdb::Iter<'txn>) -> Self { + Self { iter, phantom: PhantomData } + } +} + +impl<'txn, A: Adapter> Iterator for Iter<'txn, A> + where Archived: 'txn +{ + type Item = Result<(&'txn [u8], &'txn Archived), A::Error>; + + fn next(&mut self) -> Option { + self.iter.next().map(|r| r + .map_err(A::from_db_err) + .map(|(key, buf)| { (key, unsafe { archived_root::(buf) }) })) + } +} diff --git a/src/main.rs b/src/main.rs index a4206a1..703cb9b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,6 @@ // FIXME: No. #![allow(dead_code)] - -#[macro_use] -extern crate slog; - -#[macro_use] -extern crate capnp_rpc; +#![forbid(unused_imports)] extern crate async_trait; diff --git a/src/resource.rs b/src/resource.rs index bf9a31b..d35ceb3 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -1,20 +1,12 @@ use async_trait::async_trait; -use std::pin::Pin; -use std::task::{Poll, Context}; - -use futures::ready; -use futures::future::{Future, BoxFuture}; use futures::channel::oneshot; -use futures::sink::Sink; use futures_signals::signal::Mutable; -use smol::prelude::*; -use smol::future::FutureExt; -use smol::channel::{Sender, Receiver}; +use smol::channel::Receiver; use crate::error::Error; -use crate::state::{State, StateStorage}; +use crate::state::{State, StateDB}; /// A resource in BFFH has to contain several different parts; /// - Currently set state @@ -60,7 +52,7 @@ pub struct Update { pub struct ResourceDriver { res: Box, - db: StateStorage, + db: StateDB, rx: Receiver, signal: Mutable, @@ -80,7 +72,7 @@ impl ResourceDriver { // Not applying the new state isn't correct either since we don't know what the // internal logic of the resource has done to make this happen. // Another half right solution is to unwrap and recreate everything. - self.db.store(&state, &outstate); + //self.db.store(&state, &outstate); self.signal.set_neq(outstate); }, Err(e) => {