Remove DB code that assumes alignment

This commit is contained in:
Nadja Reitzenstein 2022-03-15 21:53:21 +01:00
parent a145efc948
commit 1156174d7a
8 changed files with 1 additions and 772 deletions

View File

@ -1,58 +0,0 @@
use std::{
ptr::NonNull,
ops::Deref,
};
use crate::db::Transaction;
use std::fmt::{Debug, Formatter};
/// Memory Fixpoint for a value in the DB
///
/// LMDB binds lifetimes of buffers to the transaction that returned the buffer. As long as this
/// transaction is not `commit()`ed, `abort()`ed or `reset()`ed the pages containing these values
/// are not returned into circulation.
/// This struct encodes this by binding a live reference to the Transaction to the returned
/// and interpreted buffer. The placeholder `T` is the container for the transaction. This may be a
/// plain `RoTransaction<'env>`, a `Rc<RoTxn>` (meaning Fix is !Send) or an `Arc<RoTxn>`, depending
/// on your needs.
pub struct LMDBorrow<T, V> {
ptr: NonNull<V>,
txn: T,
}
impl<'env, T, V> LMDBorrow<T, V>
where T: Transaction,
{
pub unsafe fn new(ptr: NonNull<V>, txn: T) -> Self {
Self { ptr: ptr.into(), txn }
}
pub fn unwrap_txn(self) -> T {
self.txn
}
}
impl<'env, T, V> Deref for LMDBorrow<T, V>
{
type Target = V;
fn deref(&self) -> &Self::Target {
// As long as the transaction is kept alive (which it is, because it's in self) state is a
// valid pointer so this is safe.
unsafe { self.ptr.as_ref() }
}
}
impl<'env, T, V: Debug> Debug for LMDBorrow<T, V> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.deref())
}
}
impl<'env, T, V: serde::Serialize> serde::Serialize for LMDBorrow<T, V> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer
{
self.deref().serialize(serializer)
}
}

View File

@ -1,183 +0,0 @@
use std::{
marker::PhantomData,
hash::{
Hash,
Hasher,
BuildHasher,
},
collections::hash_map::RandomState,
fmt,
fmt::Debug,
};
use std::fmt::Formatter;
use rkyv::{
Archive,
Archived,
Serialize,
Deserialize,
Fallible,
};
use super::{
DB,
Adapter,
OutputBuffer,
Environment,
DatabaseFlags,
WriteFlags,
Transaction,
RwTransaction,
};
#[derive(Archive, Serialize, Deserialize, Debug)]
/// The entry as it is stored inside the database.
pub struct Entry<K: Archive, V: Archive> {
pub key: K,
pub val: V,
}
#[derive(Clone, Copy)]
pub struct HashAdapter<K, A> {
k: PhantomData<K>,
a: PhantomData<A>,
}
impl<K, A> HashAdapter<K, A> {
pub fn new() -> Self {
Self { k: PhantomData, a: PhantomData }
}
}
impl<K, A> Debug for HashAdapter<K, A> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
use core::any::type_name;
write!(f, "HashAdapter<{}, {}>", type_name::<K>(), type_name::<A>())
}
}
impl<K, A: Fallible> Fallible for HashAdapter<K, A> { type Error = <A as Fallible>::Error; }
impl<K, A: Adapter> Adapter for HashAdapter<K, A>
where K: Archive,
Entry<K, A::Value>: Serialize<A::Serializer>,
{
type Serializer = A::Serializer;
type Value = Entry<K, A::Value>;
fn new_serializer() -> Self::Serializer
{ A::new_serializer() }
fn from_ser_err(e: <Self::Serializer as Fallible>::Error) -> <A as Fallible>::Error
{ A::from_ser_err(e) }
fn from_db_err(e: lmdb::Error) -> <A as Fallible>::Error
{ A::from_db_err(e) }
}
const DEFAULT_HASH_FLAGS: libc::c_uint =
DatabaseFlags::INTEGER_KEY.bits() + DatabaseFlags::DUP_SORT.bits();
pub struct HashDB<A, K, H = RandomState>
{
db: DB<HashAdapter<K, A>>,
hash_builder: H,
}
impl<A: Adapter, K, H: Debug> fmt::Debug for HashDB<A, K, H> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let adapter = HashAdapter::<A,K>::new();
f.debug_struct("HashDB")
.field("db", &adapter)
.field("hasher", &self.hash_builder)
.finish()
}
}
impl<A, K> HashDB<A, K>
{
pub unsafe fn create(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
Self::create_with_hasher(env, name, RandomState::new())
}
pub unsafe fn open(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
Self::open_with_hasher(env, name, RandomState::new())
}
}
impl<A, K, H: BuildHasher> HashDB<A, K, H>
{
fn new(db: DB<HashAdapter<K, A>>, hash_builder: H) -> Self {
Self { db, hash_builder }
}
pub unsafe fn create_with_hasher(env: &Environment, name: Option<&str>, hash_builder: H)
-> lmdb::Result<Self>
{
let flags = DatabaseFlags::from_bits(DEFAULT_HASH_FLAGS).unwrap();
DB::create(env, name, flags).map(|db| Self::new(db, hash_builder))
}
pub unsafe fn open_with_hasher(env: &Environment, name: Option<&str>, hash_builder: H)
-> lmdb::Result<Self>
{
DB::open(env, name).map(|db| Self::new(db, hash_builder))
}
}
impl<A, K, H> HashDB<A, K, H>
where A: Adapter,
HashAdapter<K, A>: Adapter<Value=Entry<K, A::Value>>,
H: BuildHasher,
K: Hash + Archive,
K::Archived: PartialEq<K>,
{
/// Retrieve an entry from the hashdb
///
/// The result is a view pinned to the lifetime of the transaction. You can get owned Values
/// using [`Deserialize`].
pub fn get<'txn, T: Transaction>(&self, txn: &'txn T, key: &K)
-> Result<
Option<&'txn Archived<<HashAdapter<K, A> as Adapter>::Value>>,
<HashAdapter<K, A> as Fallible>::Error
>
{
let mut hasher = self.hash_builder.build_hasher();
key.hash(&mut hasher);
let hash = hasher.finish();
let mut cursor = self.db.open_ro_cursor(txn)?;
let i = cursor
.iter_dup_of(&hash.to_ne_bytes()).filter_map(|r| r.ok())
.map(|(_keybuf, entry)| entry);
for entry in i {
let entry: &Archived<Entry<K, A::Value>> = entry;
if entry.key == *key {
return Ok(Some(entry));
}
}
Ok(None)
}
}
impl<'a, A, K, H> HashDB<A, K, H>
where A: Adapter,
A::Serializer: OutputBuffer,
H: BuildHasher,
K: Hash + Serialize<A::Serializer>,
K::Archived: PartialEq<K>,
{
pub fn insert_entry(&self, txn: &mut RwTransaction, entry: &Entry<K, A::Value>)
-> Result<(), A::Error>
{
let mut hasher = self.hash_builder.build_hasher();
entry.key.hash(&mut hasher);
let hash = hasher.finish();
self.db.put(txn, &hash.to_ne_bytes(), entry, WriteFlags::empty())?;
Ok(())
}
}

View File

@ -1,193 +0,0 @@
use std::{fs};
use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::io::Write;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{RwLock};
use anyhow::Context;
use rkyv::{Archive, Serialize, Deserialize, AlignedVec, Archived, with::Lock, Fallible};
use rkyv::de::deserializers::SharedDeserializeMap;
use rkyv::ser::Serializer;
use rkyv::ser::serializers::{AlignedSerializer, AllocScratch, AllocScratchError, AllocSerializer, CompositeSerializer, CompositeSerializerError, FallbackScratch, HeapScratch, ScratchTracker, SharedSerializeMap, SharedSerializeMapError};
use rkyv::with::LockError;
pub trait Index {
type Key: ?Sized;
fn lookup(&self, key: &Self::Key) -> Option<u64>;
fn update(&mut self, key: &Self::Key, value: u64);
}
pub struct StringIndex {
inner: HashMap<String, u64>,
}
impl Index for StringIndex {
type Key = str;
fn lookup(&self, key: &Self::Key) -> Option<u64> {
self.inner.get(key).map(|v| *v)
}
fn update(&mut self, key: &Self::Key, new: u64) {
let old = self.inner.insert(key.to_string(), new);
tracing::trace!(key, ?old, new, "updated string index");
}
}
#[derive(Debug, Archive, Serialize, Deserialize)]
pub struct DbIndexManager<I> {
name: String,
// TODO: use locking? Write are serialized anyway
generation: AtomicU64,
next_id: AtomicU64,
#[with(Lock)]
indices: RwLock<I>,
}
type S = CompositeSerializer<AlignedSerializer<AlignedVec>,
ScratchTracker<FallbackScratch<HeapScratch<1024>, AllocScratch>>, SharedSerializeMap>;
type SE = CompositeSerializerError<std::convert::Infallible, AllocScratchError, SharedSerializeMapError>;
#[derive(Debug)]
pub struct Ser (pub(super) S);
impl Default for Ser {
fn default() -> Self {
Self(CompositeSerializer::new(AlignedSerializer::default(), ScratchTracker::new(FallbackScratch::default()), SharedSerializeMap::default()))
}
}
#[derive(Debug)]
pub enum SerError {
Composite(SE),
Lock(LockError),
}
impl Display for SerError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Composite(e) => Display::fmt(e, f),
Self::Lock(e) => Display::fmt(e, f),
}
}
}
impl std::error::Error for SerError {}
impl From<SE> for SerError {
fn from(e: SE) -> Self {
Self::Composite(e)
}
}
impl From<LockError> for SerError {
fn from(e: LockError) -> Self {
Self::Lock(e)
}
}
impl Fallible for Ser { type Error = SerError; }
impl Serializer for Ser {
fn pos(&self) -> usize {
self.0.pos()
}
fn write(&mut self, bytes: &[u8]) -> Result<(), Self::Error> {
self.0.write(bytes).map_err(|e| e.into())
}
}
type Ser2 = AllocSerializer<4096>;
impl<I> DbIndexManager<I> {
pub fn new(name: String, generation: u64, next_id: u64, indices: I) -> Self {
tracing::debug!(%name, generation, next_id, "constructing db index");
Self {
name,
generation: AtomicU64::new(generation),
next_id: AtomicU64::new(next_id),
indices: RwLock::new(indices),
}
}
}
impl<I> DbIndexManager<I>
where I: 'static + Archive + Serialize<Ser>,
<I as Archive>::Archived: Deserialize<I, SharedDeserializeMap>,
{
pub fn store(&self, path: impl AsRef<Path>) -> anyhow::Result<usize> {
let path = path.as_ref();
let span = tracing::debug_span!("store",
name=%self.name, path=%path.display(),
"storing database index"
);
let _guard = span.enter();
tracing::trace!("opening db index file");
let mut fd = fs::File::create(path)
.with_context(|| format!("failed to open database index file {}", path.display()))?;
tracing::trace!(?fd, "opened db index file");
let mut serializer = Ser::default();
tracing::trace!(?serializer, "serializing db index");
let root = serializer.serialize_value(self).context("serializing database index failed")?;
let (s, c, _h) = serializer.0.into_components();
let v = s.into_inner();
tracing::trace!(%root,
len = v.len(),
max_bytes_allocated = c.max_bytes_allocated(),
max_allocations = c.max_allocations(),
max_alignment = c.max_alignment(),
min_buffer_size = c.min_buffer_size(),
min_buffer_size_max_error = c.min_buffer_size_max_error(),
"serialized db index");
let () = fd.write_all(v.as_slice())
.with_context(|| format!("failed to write {} bytes to database index file at {}", v.len(), path.display()))?;
Ok(v.len())
}
pub fn load<'a>(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let path = path.as_ref();
let span = tracing::debug_span!("load",
path=%path.display(),
"loading database index"
);
let _guard = span.enter();
tracing::trace!("reading db index file");
let data = fs::read(path).with_context(|| format!("failed to read database index file at {}", path.display()))?;
tracing::trace!(len=data.len(), "read db index file");
let res = unsafe {
let maybe_this: &Archived<Self> = rkyv::archived_root::<Self>(&data[..]);
// TODO: validate `maybe_this`
maybe_this
};
tracing::trace!("loaded db index from file");
let mut deser = SharedDeserializeMap::default();
let this: Self = Deserialize::<Self, _>::deserialize(res, &mut deser)?;
tracing::trace!(generation=this.generation.load(Ordering::Relaxed),
"deserialized db index from file");
Ok(this)
}
/// Return a new unused ID using an atomic fetch-add
pub fn get_next_id(&self) -> u64 {
self.next_id.fetch_add(1, Ordering::Release)
}
}
impl<I: Index> DbIndexManager<I> {
pub fn lookup(&self, key: &I::Key) -> Option<u64> {
self.indices.read().unwrap().lookup(key)
}
pub fn update(&self, key: &I::Key, value: u64) {
self.indices.write().unwrap().update(key, value)
}
}

View File

@ -17,26 +17,6 @@ use rkyv::{Fallible, Serialize, ser::serializers::AllocSerializer, AlignedVec};
mod raw;
pub use raw::RawDB;
mod typed;
// re-exports
pub use typed::{
DB,
TypedCursor,
Adapter,
OutputBuffer,
};
mod hash;
pub use hash::{
HashDB,
};
mod fix;
pub mod index;
pub use fix::LMDBorrow;
use lmdb::Error;
use rkyv::Deserialize;
use rkyv::ser::serializers::AlignedSerializer;
@ -54,79 +34,6 @@ use crate::resources::search::ResourcesHandle;
use crate::Users;
#[derive(Debug)]
pub enum DBError {
LMDB(lmdb::Error),
RKYV(<AllocSerializer<1024> as Fallible>::Error),
}
impl Display for DBError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::LMDB(e) => write!(f, "LMDB error: {}", e),
Self::RKYV(e) => write!(f, "rkyv error: {}", e),
}
}
}
impl std::error::Error for DBError { }
pub(crate) type Result<T> = std::result::Result<T, DBError>;
impl From<lmdb::Error> for DBError {
fn from(e: lmdb::Error) -> Self {
Self::LMDB(e)
}
}
type Ser = AllocSerializer<1024>;
#[derive(Clone)]
pub struct AllocAdapter<V> {
phantom: PhantomData<V>,
}
impl<V> Fallible for AllocAdapter<V> {
type Error = DBError;
}
impl<V: Serialize<Ser>> Adapter for AllocAdapter<V> {
type Serializer = Ser;
type Value = V;
fn new_serializer() -> Self::Serializer {
Self::Serializer::default()
}
fn from_ser_err(e: <Self::Serializer as Fallible>::Error) -> Self::Error {
DBError::RKYV(e)
}
fn from_db_err(e: lmdb::Error) -> Self::Error {
e.into()
}
}
#[derive(Copy, Clone, Debug)]
pub struct AlignedAdapter<V> {
phantom: PhantomData<V>,
}
impl<V> Fallible for AlignedAdapter<V> {
type Error = lmdb::Error;
}
impl<V: Serialize<AlignedSerializer<AlignedVec>>> Adapter for AlignedAdapter<V> {
type Serializer = AlignedSerializer<AlignedVec>;
type Value = V;
fn new_serializer() -> Self::Serializer {
Self::Serializer::default()
}
fn from_ser_err(_: <Self::Serializer as Fallible>::Error) -> <Self as Fallible>::Error {
unreachable!()
}
fn from_db_err(e: Error) -> <Self as Fallible>::Error {
e
}
}
#[derive(Debug, serde::Serialize)]
pub struct Dump {
users: HashMap<String, User>,

View File

@ -1,242 +0,0 @@
use std::{
fmt,
any::type_name,
marker::PhantomData,
};
use rkyv::{
Archived,
archived_root,
Serialize,
ser::{
Serializer,
serializers::AllocSerializer,
},
util::AlignedVec,
Fallible,
};
use lmdb::{
Environment,
DatabaseFlags,
WriteFlags,
Transaction,
RwTransaction,
};
use super::RawDB;
/// Database Adapter to create a typed DB returning Rust types
pub trait Adapter: Fallible {
/// The serializer that will be instantiated to resolve the stored types
type Serializer: rkyv::ser::Serializer;
/// Actual Value that will be extracted
type Value: Serialize<Self::Serializer>;
/// Create a new serializer
fn new_serializer() -> Self::Serializer;
/// Convert any Serializer Error in your shared error.
///
/// You *must* implement this if you don't use `Infallible` as Supertrait.
fn from_ser_err(e: <Self::Serializer as Fallible>::Error) -> <Self as Fallible>::Error;
/// Convert the Database Error type into your shared error.
// TODO: Extract both conversion into their own trait because there's a sensible impl for
// `Infallible` for both.
fn from_db_err(e: lmdb::Error) -> <Self as Fallible>::Error;
}
struct AdapterPrettyPrinter<A: Adapter>(PhantomData<A>);
impl<A: Adapter> AdapterPrettyPrinter<A> {
pub fn new() -> Self { Self(PhantomData) }
}
impl<A: Adapter> fmt::Debug for AdapterPrettyPrinter<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(&type_name::<A>())
.field("serializer", &type_name::<A::Serializer>())
.field("value", &type_name::<A::Value>())
.finish()
}
}
/// Deserialize adapter to write into an Buffer
pub trait OutputBuffer {
/// The kind of buffer
type Buffer: AsRef<[u8]>;
/// convert yourself into this buffer
fn into_slice(self) -> Self::Buffer;
}
impl<const N: usize> OutputBuffer for AllocSerializer<N> {
type Buffer = AlignedVec;
fn into_slice(self) -> Self::Buffer {
self.into_serializer().into_inner()
}
}
pub struct DB<A> {
db: RawDB,
phantom: PhantomData<A>,
}
impl<A> Clone for DB<A> {
fn clone(&self) -> Self {
Self {
db: self.db.clone(),
phantom: PhantomData,
}
}
}
impl<A: Adapter> fmt::Debug for DB<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DB")
.field("db", &self.db)
.field("adapter", &AdapterPrettyPrinter::<A>::new())
.finish()
}
}
impl<A> DB<A> {
pub unsafe fn new_unchecked(db: RawDB) -> Self {
Self { db, phantom: PhantomData }
}
fn new(db: RawDB) -> Self {
unsafe { Self::new_unchecked(db) }
}
/// Open the underlying DB, creating it if necessary
///
/// This function is unsafe since if the DB does not contain `A::Archived` we may end up doing
/// random memory reads or writes
pub unsafe fn create(env: &Environment, name: Option<&str>, flags: DatabaseFlags)
-> lmdb::Result<Self>
{
RawDB::create(env, name, flags).map(Self::new)
}
/// Open the underlying DB
///
/// This function is unsafe since if the DB does not contain `A::Archived` we may end up doing
/// random memory reads or writes
pub unsafe fn open(env: &Environment, name: Option<&str>) -> lmdb::Result<Self> {
RawDB::open(env, name).map(Self::new)
}
}
impl<A: Adapter> DB<A>
{
pub fn del<K: AsRef<[u8]>>(&self, txn: &mut RwTransaction, key: &K) -> Result<(), A::Error> {
let v: Option<&Vec<u8>> = None;
self.db.del(txn, key, v).map_err(A::from_db_err)
}
}
impl<A: Adapter> DB<A>
{
pub fn get<'txn, T: Transaction, K: AsRef<[u8]>>(&self, txn: &'txn T, key: &K)
-> Result<Option<&'txn Archived<A::Value>>, A::Error>
{
if let Some(buf) = self.db.get(txn, key).map_err(A::from_db_err)? {
tracing::trace!(?buf, ptr=?buf.as_ptr(), "db read");
Ok(Some(unsafe { archived_root::<A::Value>(buf.as_ref()) }))
} else {
Ok(None)
}
}
pub fn open_ro_cursor<'txn, T: Transaction>(&self, txn: &'txn T)
-> Result<TypedCursor<lmdb::RoCursor<'txn>, A>, A::Error>
{
let c = self.db.open_ro_cursor(txn)
.map_err(A::from_db_err)?;
// Safe because we are providing both Adapter and cursor and know it matches
Ok(unsafe { TypedCursor::new(c) })
}
}
impl<'a, A> DB<A>
where A: Adapter,
A::Serializer: OutputBuffer,
{
pub fn put<K: AsRef<[u8]>>(&self, txn: &mut RwTransaction, key: &K, val: &A::Value, flags: WriteFlags)
-> Result<usize, A::Error>
{
let mut serializer = A::new_serializer();
let pos = serializer.serialize_value(val)
.map_err(A::from_ser_err)?;
let buf = serializer.into_slice();
let buf = buf.as_ref();
println!("{:?}", buf);
tracing::trace!(len=buf.len(), pos, "writing value into db");
let mut stor = self.db.reserve(txn, key, buf.len(), flags)
.map_err(A::from_db_err)?;
tracing::trace!(store=?stor.as_ptr(), "store");
stor.copy_from_slice(&buf[..]);
Ok(pos)
}
}
#[derive(Debug)]
pub struct TypedCursor<C, A> {
cursor: C,
phantom: PhantomData<A>,
}
impl<'txn, C, A> TypedCursor<C, A>
where C: lmdb::Cursor<'txn>,
A: Adapter,
{
// Unsafe because we don't know if the given adapter matches the given cursor
pub unsafe fn new(cursor: C) -> Self {
Self { cursor, phantom: PhantomData }
}
pub fn iter_start(&mut self) -> Iter<'txn, A> {
let iter = self.cursor.iter_start();
// Safe because `new` isn't :P
unsafe { Iter::new(iter) }
}
pub fn iter_dup_of<K: AsRef<[u8]>>(&mut self, key: &K) -> Iter<'txn, A> {
let iter = self.cursor.iter_dup_of(key);
// Safe because `new` isn't :P
unsafe { Iter::new(iter) }
}
}
#[derive(Debug)]
pub struct Iter<'txn, A> {
iter: lmdb::Iter<'txn>,
phantom: PhantomData<A>,
}
impl<'txn, A: Adapter> Iter<'txn, A> {
pub unsafe fn new(iter: lmdb::Iter<'txn>) -> Self {
Self { iter, phantom: PhantomData }
}
}
impl<'txn, A: Adapter> Iterator for Iter<'txn, A>
where Archived<A::Value>: 'txn
{
type Item = Result<(&'txn [u8], &'txn Archived<A::Value>), A::Error>;
fn next(&mut self) -> Option<Self::Item> {
self.iter.next().map(|r| r
.map_err(A::from_db_err)
.map(|(key, buf)| { (key, unsafe { archived_root::<A::Value>(buf) }) }))
}
}

View File

@ -5,7 +5,6 @@ use lmdb::RoTransaction;
use rkyv::Archived;
use crate::authorization::permissions::PrivilegesBuf;
use crate::config::MachineDescription;
use crate::db::LMDBorrow;
use crate::resources::modules::fabaccess::{MachineState, Status};
use crate::resources::state::db::StateDB;
use crate::resources::state::State;

View File

@ -21,7 +21,6 @@ use crate::db::{
RoTransaction,
RwTransaction,
LMDBorrow,
};
use crate::resources::state::State;

View File

@ -1,5 +1,5 @@
use crate::db::{AllocAdapter, Environment, RawDB, Result, DB};
use crate::db::{DatabaseFlags, LMDBorrow, RoTransaction, WriteFlags};
use crate::db::{DatabaseFlags, RoTransaction, WriteFlags};
use lmdb::{Transaction};
use std::collections::{HashMap};