Whoo boy this is a big one

This commit is contained in:
Nadja Reitzenstein 2021-10-07 16:44:01 +02:00
parent 6a6bc4e452
commit 33131f38c4
9 changed files with 731 additions and 499 deletions

368
src/db.rs
View File

@ -1,368 +0,0 @@
use std::{
mem::size_of,
ops::Deref,
ptr::NonNull,
rc::Rc,
sync::Arc,
marker::PhantomData,
hash::{
Hash,
Hasher,
BuildHasher,
},
collections::hash_map::RandomState,
};
use rkyv::{
Archive,
Archived,
archived_root,
Serialize,
Deserialize,
ser::serializers::AllocScratchError,
};
use lmdb::{
Database,
Cursor,
RoCursor,
Iter,
};
pub use rkyv::{
Fallible,
};
pub use lmdb::{
Environment,
DatabaseFlags,
WriteFlags,
Transaction,
RoTransaction,
RwTransaction,
};
#[derive(Debug, Clone)]
pub struct RawDB {
db: Database,
}
impl RawDB {
pub fn open(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
env.open_db(name).map(|db| Self { db })
}
pub fn create(env: &Environment, name: Option<&str>, flags: DatabaseFlags) -> lmdb::Result<Self> {
env.create_db(name, flags).map(|db| Self { db })
}
pub fn get<'txn, T: Transaction, K>(&self, txn: &'txn T, key: &K) -> lmdb::Result<Option<&'txn [u8]>>
where K: AsRef<[u8]>
{
match txn.get(self.db, key) {
Ok(buf) => Ok(Some(buf)),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(e),
}
}
pub fn put<K, V>(&self, txn: &mut RwTransaction, key: &K, value: &V, flags: WriteFlags)
-> lmdb::Result<()>
where K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
txn.put(self.db, key, value, flags)
}
pub fn reserve<'txn, K>(&self, txn: &'txn mut RwTransaction, key: &K, size: usize, flags: WriteFlags)
-> lmdb::Result<&'txn mut [u8]>
where K: AsRef<[u8]>
{
txn.reserve(self.db, key, size, flags)
}
pub fn del<K, V>(&self, txn: &mut RwTransaction, key: &K, value: Option<&V>) -> lmdb::Result<()>
where K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
txn.del(self.db, key, value.map(AsRef::as_ref))
}
pub fn iter<'txn, C: Cursor<'txn>>(&self, cursor: &'txn mut C) -> Iter<'txn> {
cursor.iter_start()
}
pub fn open_ro_cursor<'txn, T: Transaction>(&self, txn: &'txn T) -> lmdb::Result<RoCursor<'txn>> {
txn.open_ro_cursor(self.db)
}
}
/// An read-only entry reference
pub struct EntryPtr<'txn, K, V> {
key: &'txn K,
val: &'txn V,
}
#[derive(Archive, Serialize, Deserialize)]
/// The entry as it is stored inside the database.
struct Entry<K: Archive, V: Archive> {
key: K,
val: V,
}
pub struct HashDB<'txn, K, V, S = RandomState> {
db: RawDB,
hash_builder: S,
phantom: &'txn PhantomData<(K,V)>,
}
impl<K, V> HashDB<'_, K, V>
{
pub fn create(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
Self::create_with_hasher(env, name, RandomState::new())
}
pub fn open(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
Self::open_with_hasher(env, name, RandomState::new())
}
}
impl<K, V, S> HashDB<'_, K, V, S>
{
pub fn create_with_hasher(env: &Environment, name: Option<&str>, hash_builder: S) -> lmdb::Result<Self> {
let flags = DatabaseFlags::INTEGER_KEY | DatabaseFlags::DUP_SORT;
let db = RawDB::create(env, name, flags)?;
Ok(Self {
db,
hash_builder,
phantom: &PhantomData,
})
}
pub fn open_with_hasher(env: &Environment, name: Option<&str>, hash_builder: S) -> lmdb::Result<Self> {
let db = RawDB::open(env, name)?;
Ok(Self {
db,
hash_builder,
phantom: &PhantomData,
})
}
}
impl<'txn, K, V, S> HashDB<'txn, K, V, S>
where K: Eq + Hash + Archive,
V: Archive,
S: BuildHasher,
K::Archived: PartialEq<K>,
{
/// Retrieve an entry from the hashdb
///
/// The result is a view pinned to the lifetime of the transaction. You can get owned Values
/// using [`Deserialize`].
pub fn get<T: Transaction>(&self, txn: &'txn T, key: &K) -> lmdb::Result<Option<&'txn Archived<Entry<K, V>>>>
{
let mut hasher = self.hash_builder.build_hasher();
key.hash(&mut hasher);
let hash = hasher.finish();
let mut cursor = self.db.open_ro_cursor(txn)?;
for res in cursor.iter_dup_of(&hash.to_ne_bytes()) {
let (_keybuf, valbuf) = res?;
let entry: &Archived<Entry<K, V>> = unsafe { archived_root::<Entry<K,V>>(valbuf.as_ref()) };
if &entry.key == key {
return Ok(Some(entry)) /*(EntryPtr {
key: &entry.key,
val: &entry.val,
}))*/;
}
}
Ok(None)
}
pub fn insert(&self, txn: &mut RwTransaction, entry: Archived<Entry<K, V>>) -> lmdb::Result<()> {
}
}
/// Memory Fixpoint for a value in the DB
///
/// LMDB binds lifetimes of buffers to the transaction that returned the buffer. As long as this
/// transaction is not `commit()`ed, `abort()`ed or `reset()`ed the pages containing these values
/// are not returned into circulation.
/// This struct encodes this by binding a live reference to the Transaction to the returned
/// and interpreted buffer. The placeholder `T` is the container for the transaction. This may be a
/// plain `RoTransaction<'env>`, a `Rc<RoTxn>` (meaning Fix is !Send) or an `Arc<RoTxn>`, depending
/// on your needs.
pub struct Fix<T, V: Archive> {
ptr: NonNull<V::Archived>,
txn: T,
}
pub type PinnedGet<'env, V> = Fix<RoTransaction<'env>, V>;
pub type LocalKeep<'env, V> = Fix<Rc<RoTransaction<'env>>, V>;
pub type GlobalKeep<'env, V> = Fix<Arc<RoTransaction<'env>>, V>;
impl<'env, T, V> Fix<T, V>
where T: AsRef<RoTransaction<'env>>,
V: Archive,
{
pub fn get(txn: T, db: &DB<V>, key: u64) -> lmdb::Result<Option<Self>> {
match db.get(txn.as_ref(), &key.to_ne_bytes()) {
Ok(buf) => Ok(Some(
Self {
ptr: unsafe { archived_root::<V>(buf.as_ref()).into() },
txn,
}
)),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(e),
}
}
}
impl<'env, T, V> Deref for Fix<T, V>
where T: AsRef<RoTransaction<'env>>,
V: Archive,
{
type Target = V::Archived;
fn deref(&self) -> &Self::Target {
// As long as the transaction is kept alive (which it is, because it's in self) state is a
// valid pointer so this is safe.
unsafe { self.ptr.as_ref() }
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::result::Result;
use std::ops::Deref;
use lmdb::{
EnvironmentFlags as EF,
DatabaseFlags as DF,
WriteFlags as WF,
};
pub struct TempEnv {
dir: tempfile::TempDir,
env: Arc<Environment>,
}
impl Deref for TempEnv {
type Target = Arc<Environment>;
fn deref(&self) -> &Self::Target {
&self.env
}
}
pub fn open_test_env() -> TempEnv {
let dir = tempfile::tempdir().expect("Failed to create tempdir for testdb");
let env = Environment::new()
.set_flags(EF::NO_SYNC | EF::WRITE_MAP)
.open(dir.path()).expect("Failed to open lmdb");
let env = Arc::new(env);
TempEnv { dir, env }
}
struct TestAdapter;
#[derive(Debug)]
enum TestErr {
Utf8(std::str::Utf8Error),
Binc(Box<bincode::ErrorKind>),
LMDB(lmdb::Error),
}
impl From<lmdb::Error> for TestErr {
fn from(e: lmdb::Error) -> TestErr {
TestErr::LMDB(e)
}
}
impl From<std::str::Utf8Error> for TestErr {
fn from(e: std::str::Utf8Error) -> TestErr {
TestErr::Utf8(e)
}
}
impl From<bincode::Error> for TestErr {
fn from(e: bincode::Error) -> TestErr {
TestErr::Binc(e)
}
}
impl DatabaseAdapter for TestAdapter {
type Key = str;
type Err = TestErr;
fn serialize_key(key: &Self::Key) -> &[u8] {
key.as_bytes()
}
fn deserialize_key<'de>(input: &'de [u8]) -> Result<&'de Self::Key, Self::Err> {
std::str::from_utf8(input).map_err(|e| e.into())
}
}
type TestDB<'txn> = Objectstore<'txn, TestAdapter, &'txn str>;
#[test]
fn simple_get() {
let e = open_test_env();
let ldb = e.create_db(None, DF::empty()).expect("Failed to create lmdb db");
let db = DB::new(e.env.clone(), ldb);
let testdb = TestDB::new(db.clone());
let mut val = "value";
let mut txn = db.begin_rw_txn().expect("Failed to being rw txn");
testdb.put(&mut txn, "key", &val, WF::empty()).expect("Failed to insert");
testdb.put(&mut txn, "key2", &val, WF::empty()).expect("Failed to insert");
testdb.put(&mut txn, "key3", &val, WF::empty()).expect("Failed to insert");
testdb.put(&mut txn, "key4", &val, WF::empty()).expect("Failed to insert");
testdb.put(&mut txn, "key5", &val, WF::empty()).expect("Failed to insert");
txn.commit().expect("commit failed");
{
let txn;
txn = db.begin_ro_txn().unwrap();
let val = testdb.get(&txn, "key").expect("Failed to retrieve");
assert_eq!(Some("value"), val);
}
{
let val2 = "longer_value";
let mut txn = db.begin_rw_txn().unwrap();
testdb.put(&mut txn, "key", &val2, WF::empty()).expect("Failed to update");
txn.commit().unwrap();
}
{
let txn = db.begin_ro_txn().unwrap();
let found = testdb.get_in_place(&txn, "key", &mut val).expect("Failed to retrieve update");
assert!(found);
assert_eq!("longer_value", val);
}
{
let txn = db.begin_ro_txn().unwrap();
let mut it = testdb.iter(&txn).unwrap();
assert_eq!("longer_value", it.next().unwrap().unwrap());
let mut i = 0;
while let Some(e) = it.next() {
assert_eq!("value", e.unwrap());
i += 1;
}
assert_eq!(i, 4)
}
}
}

41
src/db/fix.rs Normal file
View File

@ -0,0 +1,41 @@
use std::{
ptr::NonNull,
ops::Deref,
};
use lmdb::Transaction;
/// Memory Fixpoint for a value in the DB
///
/// LMDB binds lifetimes of buffers to the transaction that returned the buffer. As long as this
/// transaction is not `commit()`ed, `abort()`ed or `reset()`ed the pages containing these values
/// are not returned into circulation.
/// This struct encodes this by binding a live reference to the Transaction to the returned
/// and interpreted buffer. The placeholder `T` is the container for the transaction. This may be a
/// plain `RoTransaction<'env>`, a `Rc<RoTxn>` (meaning Fix is !Send) or an `Arc<RoTxn>`, depending
/// on your needs.
pub struct LMDBorrow<T, V> {
ptr: NonNull<V>,
txn: T,
}
impl<'env, T, V> LMDBorrow<T, V>
where T: Transaction,
{
pub unsafe fn fix(txn: T, ptr: &'_ V) -> Self {
Self { ptr: ptr.into(), txn, }
}
}
impl<'env, T, V> Deref for LMDBorrow<T, V>
{
type Target = V;
fn deref(&self) -> &Self::Target {
// As long as the transaction is kept alive (which it is, because it's in self) state is a
// valid pointer so this is safe.
unsafe { self.ptr.as_ref() }
}
}

163
src/db/hash.rs Normal file
View File

@ -0,0 +1,163 @@
use std::{
marker::PhantomData,
hash::{
Hash,
Hasher,
BuildHasher,
},
collections::hash_map::RandomState,
};
use rkyv::{
Archive,
Archived,
Serialize,
Deserialize,
Fallible,
};
use super::{
DB,
Adapter,
OutputBuffer,
Environment,
DatabaseFlags,
WriteFlags,
Transaction,
RwTransaction,
};
#[derive(Archive, Serialize, Deserialize)]
/// The entry as it is stored inside the database.
pub struct Entry<K: Archive, V: Archive> {
pub key: K,
pub val: V,
}
pub struct HashAdapter<K, A> {
k: PhantomData<K>,
a: PhantomData<A>,
}
impl<K, A> HashAdapter<K, A> {
pub fn new() -> Self {
Self { k: PhantomData, a: PhantomData }
}
}
impl<K, A: Fallible> Fallible for HashAdapter<K, A> { type Error = <A as Fallible>::Error; }
impl<K, A: Adapter> Adapter for HashAdapter<K, A>
where K: Archive,
Entry<K, A::Value>: Serialize<A::Serializer>,
{
type Value = Entry<K, A::Value>;
type Serializer = A::Serializer;
fn new_serializer() -> Self::Serializer
{ A::new_serializer() }
fn from_db_err(e: lmdb::Error) -> <A as Fallible>::Error
{ A::from_db_err(e) }
fn from_ser_err(e: <Self::Serializer as Fallible>::Error) -> <A as Fallible>::Error
{ A::from_ser_err(e) }
}
const DEFAULT_HASH_FLAGS: libc::c_uint =
DatabaseFlags::INTEGER_KEY.bits() + DatabaseFlags::DUP_SORT.bits();
pub struct HashDB<A, K, H = RandomState>
{
db: DB<HashAdapter<K, A>>,
hash_builder: H,
}
impl<A, K> HashDB<A, K>
{
pub unsafe fn create(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
Self::create_with_hasher(env, name, RandomState::new())
}
pub unsafe fn open(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
Self::open_with_hasher(env, name, RandomState::new())
}
}
impl<A, K, H: BuildHasher> HashDB<A, K, H>
{
fn new(db: DB<HashAdapter<K, A>>, hash_builder: H) -> Self {
Self { db, hash_builder }
}
pub unsafe fn create_with_hasher(env: &Environment, name: Option<&str>, hash_builder: H)
-> lmdb::Result<Self>
{
let flags = DatabaseFlags::from_bits(DEFAULT_HASH_FLAGS).unwrap();
DB::create(env, name, flags).map(|db| Self::new(db, hash_builder))
}
pub unsafe fn open_with_hasher(env: &Environment, name: Option<&str>, hash_builder: H)
-> lmdb::Result<Self>
{
DB::open(env, name).map(|db| Self::new(db, hash_builder))
}
}
impl<A, K, H> HashDB<A, K, H>
where A: Adapter,
HashAdapter<K, A>: Adapter<Value=Entry<K, A::Value>>,
H: BuildHasher,
K: Hash + Archive,
K::Archived: PartialEq<K>,
{
/// Retrieve an entry from the hashdb
///
/// The result is a view pinned to the lifetime of the transaction. You can get owned Values
/// using [`Deserialize`].
pub fn get<'txn, T: Transaction>(&self, txn: &'txn T, key: &K)
-> Result<
Option<&'txn Archived<<HashAdapter<K, A> as Adapter>::Value>>,
<HashAdapter<K, A> as Fallible>::Error
>
{
let mut hasher = self.hash_builder.build_hasher();
key.hash(&mut hasher);
let hash = hasher.finish();
let mut cursor = self.db.open_ro_cursor(txn)?;
let i = cursor
.iter_dup_of(&hash.to_ne_bytes()).filter_map(|r| r.ok())
.map(|(_keybuf, entry)| entry);
for entry in i {
let entry: &Archived<Entry<K, A::Value>> = entry;
if entry.key == *key {
return Ok(Some(entry));
}
}
Ok(None)
}
}
impl<'a, A, K, H> HashDB<A, K, H>
where A: Adapter,
A::Serializer: OutputBuffer,
H: BuildHasher,
K: Hash + Serialize<A::Serializer>,
K::Archived: PartialEq<K>,
{
pub fn insert_entry(&self, txn: &mut RwTransaction, entry: &Entry<K, A::Value>)
-> Result<(), A::Error>
{
let mut hasher = self.hash_builder.build_hasher();
entry.key.hash(&mut hasher);
let hash = hasher.finish();
self.db.put(txn, &hash.to_ne_bytes(), entry, WriteFlags::empty())?;
Ok(())
}
}

164
src/db/mod.rs Normal file
View File

@ -0,0 +1,164 @@
pub use lmdb::{
Environment,
DatabaseFlags,
WriteFlags,
EnvironmentFlags,
Transaction,
RoTransaction,
RwTransaction,
};
mod raw;
use raw::RawDB;
mod typed;
// re-exports
pub use typed::{
DB,
Cursor,
Adapter,
OutputBuffer,
OutputWriter,
};
mod hash;
pub use hash::{
HashDB,
Entry,
};
mod fix;
pub use fix::LMDBorrow;
#[cfg(test)]
mod tests {
use super::*;
use std::result::Result;
use std::ops::Deref;
use lmdb::{
EnvironmentFlags as EF,
DatabaseFlags as DF,
WriteFlags as WF,
};
pub struct TempEnv {
dir: tempfile::TempDir,
env: Arc<Environment>,
}
impl Deref for TempEnv {
type Target = Arc<Environment>;
fn deref(&self) -> &Self::Target {
&self.env
}
}
pub fn open_test_env() -> TempEnv {
let dir = tempfile::tempdir().expect("Failed to create tempdir for testdb");
let env = Environment::new()
.set_flags(EF::NO_SYNC | EF::WRITE_MAP)
.open(dir.path()).expect("Failed to open lmdb");
let env = Arc::new(env);
TempEnv { dir, env }
}
struct TestAdapter;
#[derive(Debug)]
enum TestErr {
Utf8(std::str::Utf8Error),
Binc(Box<bincode::ErrorKind>),
LMDB(lmdb::Error),
}
impl From<lmdb::Error> for TestErr {
fn from(e: lmdb::Error) -> TestErr {
TestErr::LMDB(e)
}
}
impl From<std::str::Utf8Error> for TestErr {
fn from(e: std::str::Utf8Error) -> TestErr {
TestErr::Utf8(e)
}
}
impl From<bincode::Error> for TestErr {
fn from(e: bincode::Error) -> TestErr {
TestErr::Binc(e)
}
}
impl DatabaseAdapter for TestAdapter {
type Key = str;
type Err = TestErr;
fn serialize_key(key: &Self::Key) -> &[u8] {
key.as_bytes()
}
fn deserialize_key<'de>(input: &'de [u8]) -> Result<&'de Self::Key, Self::Err> {
std::str::from_utf8(input).map_err(|e| e.into())
}
}
type TestDB<'txn> = Objectstore<'txn, TestAdapter, &'txn str>;
#[test]
fn simple_get() {
let e = open_test_env();
let ldb = e.create_db(None, DF::empty()).expect("Failed to create lmdb db");
let db = DB::new(e.env.clone(), ldb);
let testdb = TestDB::new(db.clone());
let mut val = "value";
let mut txn = db.begin_rw_txn().expect("Failed to being rw txn");
testdb.put(&mut txn, "key", &val, WF::empty()).expect("Failed to insert");
testdb.put(&mut txn, "key2", &val, WF::empty()).expect("Failed to insert");
testdb.put(&mut txn, "key3", &val, WF::empty()).expect("Failed to insert");
testdb.put(&mut txn, "key4", &val, WF::empty()).expect("Failed to insert");
testdb.put(&mut txn, "key5", &val, WF::empty()).expect("Failed to insert");
txn.commit().expect("commit failed");
{
let txn;
txn = db.begin_ro_txn().unwrap();
let val = testdb.get(&txn, "key").expect("Failed to retrieve");
assert_eq!(Some("value"), val);
}
{
let val2 = "longer_value";
let mut txn = db.begin_rw_txn().unwrap();
testdb.put(&mut txn, "key", &val2, WF::empty()).expect("Failed to update");
txn.commit().unwrap();
}
{
let txn = db.begin_ro_txn().unwrap();
let found = testdb.get_in_place(&txn, "key", &mut val).expect("Failed to retrieve update");
assert!(found);
assert_eq!("longer_value", val);
}
{
let txn = db.begin_ro_txn().unwrap();
let mut it = testdb.iter(&txn).unwrap();
assert_eq!("longer_value", it.next().unwrap().unwrap());
let mut i = 0;
while let Some(e) = it.next() {
assert_eq!("value", e.unwrap());
i += 1;
}
assert_eq!(i, 4)
}
}
}

62
src/db/raw.rs Normal file
View File

@ -0,0 +1,62 @@
use lmdb::{
Transaction,
RwTransaction,
Environment,
DatabaseFlags,
WriteFlags,
};
#[derive(Debug, Clone)]
pub struct RawDB {
db: lmdb::Database,
}
impl RawDB {
pub fn open(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
env.open_db(name).map(|db| Self { db })
}
pub fn create(env: &Environment, name: Option<&str>, flags: DatabaseFlags) -> lmdb::Result<Self> {
env.create_db(name, flags).map(|db| Self { db })
}
pub fn get<'txn, T: Transaction, K>(&self, txn: &'txn T, key: &K) -> lmdb::Result<Option<&'txn [u8]>>
where K: AsRef<[u8]>
{
match txn.get(self.db, key) {
Ok(buf) => Ok(Some(buf)),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(e),
}
}
pub fn put<K, V>(&self, txn: &mut RwTransaction, key: &K, value: &V, flags: WriteFlags)
-> lmdb::Result<()>
where K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
txn.put(self.db, key, value, flags)
}
pub fn reserve<'txn, K>(&self, txn: &'txn mut RwTransaction, key: &K, size: usize, flags: WriteFlags)
-> lmdb::Result<&'txn mut [u8]>
where K: AsRef<[u8]>
{
txn.reserve(self.db, key, size, flags)
}
pub fn del<K, V>(&self, txn: &mut RwTransaction, key: &K, value: Option<&V>) -> lmdb::Result<()>
where K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
txn.del(self.db, key, value.map(AsRef::as_ref))
}
pub fn iter<'txn, C: lmdb::Cursor<'txn>>(&self, cursor: &'txn mut C) -> lmdb::Iter<'txn> {
cursor.iter_start()
}
pub fn open_ro_cursor<'txn, T: Transaction>(&self, txn: &'txn T) -> lmdb::Result<lmdb::RoCursor<'txn>> {
txn.open_ro_cursor(self.db)
}
}

View File

@ -1,13 +1,18 @@
use std::fmt; use std::{
use std::rc::Rc; fmt,
use std::sync::Arc;
use std::any::Any; any::Any,
use std::collections::{HashMap, hash_map::DefaultHasher};
use std::hash::{Hash, Hasher}; collections::{
use std::default::Default; hash_map::DefaultHasher
use std::ptr::NonNull; },
use std::alloc::Layout; hash::{
use std::ops::Deref; Hash,
Hasher
},
path::Path,
};
use rkyv::{ use rkyv::{
Archive, Archive,
@ -16,28 +21,29 @@ use rkyv::{
Serialize, Serialize,
Deserialize, Deserialize,
Fallible,
ser::{
Serializer,
ScratchSpace,
serializers::*,
},
string::{
StringResolver,
ArchivedString,
},
out_field, out_field,
archived_root,
Fallible,
ser::serializers::AllocSerializer,
}; };
use rkyv_dyn::{ use rkyv_dyn::{
archive_dyn, archive_dyn,
}; };
use rkyv_typename::TypeName; use rkyv_typename::TypeName;
use crate::error::Error; use crate::db::{
use crate::db::{DB, Environment, WriteFlags, Transaction, RoTransaction}; DB,
Environment,
EnvironmentFlags,
DatabaseFlags,
WriteFlags,
Adapter,
Transaction,
RwTransaction,
};
#[archive_dyn(deserialize)] #[archive_dyn(deserialize)]
/// Trait to be implemented by any value in the state map. /// Trait to be implemented by any value in the state map.
@ -160,122 +166,101 @@ impl StateBuilder {
} }
} }
pub struct StateStorage { struct StateAdapter;
key: u64,
db: StateDB enum StateError {
LMDB(lmdb::Error),
RKYV(<AllocSerializer<1024> as Fallible>::Error),
} }
impl StateStorage { impl From<lmdb::Error> for StateError {
pub fn new(key: u64, db: StateDB) -> Self { fn from(e: lmdb::Error) -> Self {
Self { key, db } Self::LMDB(e)
}
pub fn store(&mut self, instate: &State, outstate: &State) -> Result<(), Error> {
self.db.store(self.key, instate, outstate)
} }
} }
struct SizeSerializer { impl Fallible for StateAdapter {
pos: usize, type Error = StateError;
scratch: FallbackScratch<HeapScratch<1024>, AllocScratch>,
} }
impl SizeSerializer { impl Adapter for StateAdapter {
pub fn new() -> Self { type Serializer = AllocSerializer<1024>;
Self { pos: 0, scratch: FallbackScratch::default() } type Value = State;
}
} fn new_serializer() -> Self::Serializer {
impl Fallible for SizeSerializer { Self::Serializer::default()
type Error = AllocScratchError;
}
impl Serializer for SizeSerializer {
fn pos(&self) -> usize {
self.pos
}
fn write(&mut self, bytes: &[u8]) -> Result<(), Self::Error> {
self.pos += bytes.len();
Ok(())
}
}
impl ScratchSpace for SizeSerializer {
unsafe fn push_scratch(
&mut self,
layout: Layout
) -> Result<NonNull<[u8]>, Self::Error> {
self.scratch.push_scratch(layout)
} }
unsafe fn pop_scratch( fn from_ser_err(e: <Self::Serializer as Fallible>::Error) -> Self::Error {
&mut self, StateError::RKYV(e)
ptr: NonNull<u8>, }
layout: Layout fn from_db_err(e: lmdb::Error) -> Self::Error {
) -> Result<(), Self::Error> { e.into()
self.scratch.pop_scratch(ptr, layout)
} }
} }
type LmdbSerializer<B, const N: usize> = CompositeSerializer< /// State Database containing the currently set state
BufferSerializer<B>,
FallbackScratch<HeapScratch<N>, AllocScratch>,
SharedSerializeMap,
>;
pub struct StateDB { pub struct StateDB {
input: DB, /// The environment for all the databases below
output: DB, env: Environment,
input: DB<StateAdapter>,
output: DB<StateAdapter>,
// TODO: Index resource name/id/uuid -> u64
} }
impl StateDB { impl StateDB {
pub fn new(input: DB, output: DB) -> Self { fn open_env<P: AsRef<Path>>(path: &P) -> lmdb::Result<Environment> {
Self { input, output } Environment::new()
.set_flags( EnvironmentFlags::WRITE_MAP
| EnvironmentFlags::NO_SUB_DIR
| EnvironmentFlags::NO_TLS
| EnvironmentFlags::NO_READAHEAD)
.set_max_dbs(2)
.open(path.as_ref())
} }
fn get_size(&self, state: &State) -> usize { fn new(env: Environment, input: DB<StateAdapter>, output: DB<StateAdapter>) -> Self {
let mut serializer = SizeSerializer::new(); Self { env, input, output }
serializer.serialize_value(state);
serializer.pos()
} }
pub fn store(&self, key: u64, instate: &State, outstate: &State) -> Result<(), Error> { pub fn init<P: AsRef<Path>>(path: &P) -> lmdb::Result<Self> {
let insize = self.get_size(instate); let env = Self::open_env(path)?;
let outsize = self.get_size(outstate); let input = unsafe {
DB::create(&env, Some("input"), DatabaseFlags::INTEGER_KEY)?
};
let output = unsafe {
DB::create(&env, Some("output"), DatabaseFlags::INTEGER_KEY)?
};
let mut txn = self.input.begin_rw_txn()?; Ok(Self::new(env, input, output))
}
let mut inbuf = self.input.reserve(&mut txn, &key.to_ne_bytes(), insize, WriteFlags::empty())?; pub fn open<P: AsRef<Path>>(path: &P) -> lmdb::Result<Self> {
let bufser = BufferSerializer::new(inbuf); let env = Self::open_env(path)?;
let ser: LmdbSerializer<&mut [u8], 1024> = LmdbSerializer::new( let input = unsafe { DB::open(&env, Some("input"))? };
bufser, let output = unsafe { DB::open(&env, Some("output"))? };
FallbackScratch::default(),
SharedSerializeMap::default()
);
let mut outbuf = self.output.reserve(&mut txn, &key.to_ne_bytes(), outsize, WriteFlags::empty())?; Ok(Self::new(env, input, output))
let bufser = BufferSerializer::new(outbuf); }
let ser: LmdbSerializer<&mut [u8], 1024> = LmdbSerializer::new(
bufser,
FallbackScratch::default(),
SharedSerializeMap::default()
);
txn.commit()?;
fn update_txn(&self, txn: &mut RwTransaction, key: u64, input: &State, output: &State)
-> Result<(), <StateAdapter as Fallible>::Error>
{
let flags = WriteFlags::empty();
let k = key.to_ne_bytes();
self.input.put(txn, &k, input, flags)?;
self.output.put(txn, &k, output, flags)?;
Ok(()) Ok(())
} }
pub fn get_txn<'txn, T: Transaction>(&self, key: u64, txn: &'txn T) fn update(&self, key: u64, input: &State, output: &State)
-> Result<(&'txn ArchivedState, &'txn ArchivedState), Error> -> Result<(), <StateAdapter as Fallible>::Error>
{ {
let inbuf = self.input.get(txn, &key.to_ne_bytes())?; let mut txn = self.env.begin_rw_txn().map_err(StateAdapter::from_db_err)?;
let outbuf = self.output.get(txn, &key.to_ne_bytes())?; self.update_txn(&mut txn, key, input, output)?;
let instate = unsafe {
archived_root::<State>(inbuf.as_ref())
};
let outstate = unsafe {
archived_root::<State>(outbuf.as_ref())
};
Ok((instate, outstate)) txn.commit().map_err(StateAdapter::from_db_err)
} }
} }

198
src/db/typed.rs Normal file
View File

@ -0,0 +1,198 @@
use std::{
marker::PhantomData,
};
use rkyv::{
Archived,
archived_root,
Serialize,
ser::{
Serializer,
serializers::AllocSerializer,
},
util::AlignedVec,
Fallible,
};
use lmdb::{
Environment,
DatabaseFlags,
WriteFlags,
Transaction,
RwTransaction,
};
use super::RawDB;
pub trait Adapter: Fallible {
type Serializer: rkyv::ser::Serializer;
type Value: Serialize<Self::Serializer>;
fn new_serializer() -> Self::Serializer;
fn from_ser_err(e: <Self::Serializer as Fallible>::Error) -> <Self as Fallible>::Error;
fn from_db_err(e: lmdb::Error) -> <Self as Fallible>::Error;
}
pub trait OutputBuffer {
type Buffer: AsRef<[u8]>;
fn into_slice(self) -> Self::Buffer;
}
impl<const N: usize> OutputBuffer for AllocSerializer<N> {
type Buffer = AlignedVec;
fn into_slice(self) -> Self::Buffer {
self.into_serializer().into_inner()
}
}
pub trait OutputWriter: Fallible {
fn write_into(&mut self, buf: &mut [u8]) -> Result<(), Self::Error>;
}
pub struct DB<A> {
db: RawDB,
phantom: PhantomData<A>,
}
impl<A> DB<A> {
fn new(db: RawDB) -> Self {
Self { db, phantom: PhantomData }
}
/// Open the underlying DB, creating it if necessary
///
/// This function is unsafe since if the DB does not contain `A::Archived` we may end up doing
/// random memory reads or writes
pub unsafe fn create(env: &Environment, name: Option<&str>, flags: DatabaseFlags)
-> lmdb::Result<Self>
{
RawDB::create(env, name, flags).map(Self::new)
}
/// Open the underlying DB
///
/// This function is unsafe since if the DB does not contain `A::Archived` we may end up doing
/// random memory reads or writes
pub unsafe fn open(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
RawDB::open(env, name).map(Self::new)
}
}
impl<A: Adapter> DB<A>
{
pub fn del<K: AsRef<[u8]>>(&self, txn: &mut RwTransaction, key: &K) -> Result<(), A::Error> {
let v: Option<&Vec<u8>> = None;
self.db.del(txn, key, v).map_err(A::from_db_err)
}
}
impl<A: Adapter> DB<A>
{
pub fn get<'txn, T: Transaction, K: AsRef<[u8]>>(&self, txn: &'txn T, key: &K)
-> Result<Option<&'txn Archived<A::Value>>, A::Error>
{
if let Some(buf) = self.db.get(txn, key).map_err(A::from_db_err)? {
Ok(Some(unsafe { archived_root::<A::Value>(buf.as_ref()) }))
} else {
Ok(None)
}
}
pub fn open_ro_cursor<'txn, T: Transaction>(&self, txn: &'txn T)
-> Result<Cursor<lmdb::RoCursor<'txn>, A>, A::Error>
{
let c = self.db.open_ro_cursor(txn)
.map_err(A::from_db_err)?;
// Safe because we are providing both Adapter and cursor and know it matches
Ok(unsafe { Cursor::new(c) })
}
}
impl<'a, A> DB<A>
where A: Adapter,
A::Serializer: OutputBuffer,
{
pub fn put<K: AsRef<[u8]>>(&self, txn: &mut RwTransaction, key: &K, val: &A::Value, flags: WriteFlags)
-> Result<usize, A::Error>
{
let mut serializer = A::new_serializer();
let pos = serializer.serialize_value(val)
.map_err(A::from_ser_err)?;
let buf = serializer.into_slice();
self.db.put(txn, key, &buf, flags)
.map_err(A::from_db_err)?;
Ok(pos)
}
}
impl<'a, A> DB<A>
where A: Adapter,
A::Serializer: OutputWriter,
{
pub fn put_nocopy<K: AsRef<[u8]>>(&self, txn: &mut RwTransaction, key: &K, val: &A::Value, flags: WriteFlags)
-> Result<usize, A::Error>
{
let mut serializer = A::new_serializer();
let pos = serializer.serialize_value(val)
.map_err(A::from_ser_err)?;
let mut buf = self.db.reserve(txn, &key.as_ref(), pos, flags)
.map_err(A::from_db_err)?;
serializer.write_into(&mut buf)
.map_err(A::from_ser_err)?;
Ok(pos)
}
}
pub struct Cursor<C, A> {
cursor: C,
phantom: PhantomData<A>,
}
impl<'txn, C, A> Cursor<C, A>
where C: lmdb::Cursor<'txn>,
A: Adapter,
{
// Unsafe because we don't know if the given adapter matches the given cursor
pub unsafe fn new(cursor: C) -> Self {
Self { cursor, phantom: PhantomData }
}
pub fn iter_dup_of<K: AsRef<[u8]>>(&mut self, key: &K) -> Iter<'txn, A> {
let iter = self.cursor.iter_dup_of(key);
// Safe because `new` isn't :P
unsafe { Iter::new(iter) }
}
}
pub struct Iter<'txn, A> {
iter: lmdb::Iter<'txn>,
phantom: PhantomData<A>,
}
impl<'txn, A: Adapter> Iter<'txn, A> {
pub unsafe fn new(iter: lmdb::Iter<'txn>) -> Self {
Self { iter, phantom: PhantomData }
}
}
impl<'txn, A: Adapter> Iterator for Iter<'txn, A>
where Archived<A::Value>: 'txn
{
type Item = Result<(&'txn [u8], &'txn Archived<A::Value>), A::Error>;
fn next(&mut self) -> Option<Self::Item> {
self.iter.next().map(|r| r
.map_err(A::from_db_err)
.map(|(key, buf)| { (key, unsafe { archived_root::<A::Value>(buf) }) }))
}
}

View File

@ -1,11 +1,6 @@
// FIXME: No. // FIXME: No.
#![allow(dead_code)] #![allow(dead_code)]
#![forbid(unused_imports)]
#[macro_use]
extern crate slog;
#[macro_use]
extern crate capnp_rpc;
extern crate async_trait; extern crate async_trait;

View File

@ -1,20 +1,12 @@
use async_trait::async_trait; use async_trait::async_trait;
use std::pin::Pin;
use std::task::{Poll, Context};
use futures::ready;
use futures::future::{Future, BoxFuture};
use futures::channel::oneshot; use futures::channel::oneshot;
use futures::sink::Sink;
use futures_signals::signal::Mutable; use futures_signals::signal::Mutable;
use smol::prelude::*; use smol::channel::Receiver;
use smol::future::FutureExt;
use smol::channel::{Sender, Receiver};
use crate::error::Error; use crate::error::Error;
use crate::state::{State, StateStorage}; use crate::state::{State, StateDB};
/// A resource in BFFH has to contain several different parts; /// A resource in BFFH has to contain several different parts;
/// - Currently set state /// - Currently set state
@ -60,7 +52,7 @@ pub struct Update {
pub struct ResourceDriver { pub struct ResourceDriver {
res: Box<dyn Resource>, res: Box<dyn Resource>,
db: StateStorage, db: StateDB,
rx: Receiver<Update>, rx: Receiver<Update>,
signal: Mutable<State>, signal: Mutable<State>,
@ -80,7 +72,7 @@ impl ResourceDriver {
// Not applying the new state isn't correct either since we don't know what the // Not applying the new state isn't correct either since we don't know what the
// internal logic of the resource has done to make this happen. // internal logic of the resource has done to make this happen.
// Another half right solution is to unwrap and recreate everything. // Another half right solution is to unwrap and recreate everything.
self.db.store(&state, &outstate); //self.db.store(&state, &outstate);
self.signal.set_neq(outstate); self.signal.set_neq(outstate);
}, },
Err(e) => { Err(e) => {