Better database abstraction

This commit is contained in:
Gregor Reitzenstein 2021-09-10 16:22:58 +02:00 committed by Nadja Reitzenstein
parent d0b73c9b49
commit 8ceee0bd94
3 changed files with 316 additions and 0 deletions

34
Cargo.lock generated
View File

@ -200,6 +200,15 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bincode"
version = "2.0.0-dev"
source = "git+https://github.com/dequbed/bincode.git?branch=feature/in_place_buffer#bdea73bc3542257ef3b9788d4d54e53af236ed7a"
dependencies = [
"byteorder",
"serde",
]
[[package]]
name = "bindgen"
version = "0.55.1"
@ -510,6 +519,7 @@ version = "0.2.0"
dependencies = [
"async-channel",
"async-trait",
"bincode",
"capnp",
"capnp-futures",
"capnp-rpc",
@ -536,6 +546,7 @@ dependencies = [
"slog-async",
"slog-term",
"smol",
"tempfile",
"toml",
"uuid",
"walkdir",
@ -1536,6 +1547,15 @@ version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "remove_dir_all"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi",
]
[[package]]
name = "rsasl"
version = "1.4.0"
@ -1790,6 +1810,20 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
[[package]]
name = "tempfile"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22"
dependencies = [
"cfg-if 1.0.0",
"libc",
"rand",
"redox_syscall",
"remove_dir_all",
"winapi",
]
[[package]]
name = "term"
version = "0.7.0"

View File

@ -35,6 +35,7 @@ capnp-futures = "0.14.1"
serde = { version = "1.0.130", features = ["derive"] }
toml = "0.5.8"
flexbuffers = "2.0.0"
bincode = "2.0.0-dev"
serde_dhall = { version = "0.10.1", default-features = false }
@ -72,3 +73,7 @@ walkdir = "2.3.2"
[dev-dependencies]
futures-test = "0.3.16"
tempfile = "3.2"
[patch.crates-io]
bincode = { git = "https://github.com/dequbed/bincode.git", branch = "feature/in_place_buffer" }

277
src/db.rs
View File

@ -64,3 +64,280 @@ impl Databases {
})
}
}
use lmdb::{
Environment,
Database,
Transaction,
RoTransaction,
RwTransaction,
WriteFlags,
};
#[derive(Debug, Clone)]
pub struct DB {
env: Arc<Environment>,
db: Database,
}
impl DB {
pub fn new(env: Arc<Environment>, db: Database) -> Self {
Self { env, db }
}
pub fn open(env: Arc<Environment>, name: &str) -> lmdb::Result<Self> {
env.open_db(Some(name)).map(|db| { Self::new(env.clone(), db) })
}
pub fn get<'txn, T: Transaction, K>(&self, txn: &'txn T, key: &K) -> lmdb::Result<&'txn [u8]>
where K: AsRef<[u8]>
{
txn.get(self.db, key)
}
pub fn put<K, V>(&self, txn: &mut RwTransaction, key: &K, value: &V, flags: WriteFlags)
-> lmdb::Result<()>
where K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
txn.put(self.db, key, value, flags)
}
pub fn reserve<'txn, K>(&self, txn: &'txn mut RwTransaction, key: &K, size: usize, flags: WriteFlags)
-> lmdb::Result<&'txn mut [u8]>
where K: AsRef<[u8]>
{
txn.reserve(self.db, key, size, flags)
}
pub fn del<K, V>(&self, txn: &mut RwTransaction, key: &K, value: Option<&V>) -> lmdb::Result<()>
where K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
txn.del(self.db, key, value.map(AsRef::as_ref))
}
pub fn begin_ro_txn<'env>(&'env self) -> lmdb::Result<RoTransaction<'env>> {
self.env.begin_ro_txn()
}
pub fn begin_rw_txn<'env>(&'env self) -> lmdb::Result<RwTransaction<'env>> {
self.env.begin_rw_txn()
}
}
use std::result::Result as StdResult;
use serde::{Serialize, Deserialize};
use bincode::Options;
pub trait DatabaseAdapter {
type Key: ?Sized;
type Err: From<lmdb::Error> + From<bincode::Error>;
fn serialize_key(key: &Self::Key) -> &[u8];
fn deserialize_key<'de>(input: &'de [u8]) -> StdResult<&'de Self::Key, Self::Err>;
}
// Should we for some reason ever need to have different Options for different adapters we can have
// this in the DatabaseAdapter trait too
fn bincode_default() -> impl bincode::Options {
bincode::DefaultOptions::new()
.with_varint_encoding()
}
use std::marker::PhantomData;
pub struct TypedDatabase<'a, A, V: ?Sized> {
pub db: DB,
adapter: A,
marker: PhantomData<&'a V>
}
impl<A, V: ?Sized> TypedDatabase<'_, A, V> {
pub fn new(db: DB, adapter: A) -> Self {
Self {
db: db,
adapter: adapter,
marker: PhantomData,
}
}
}
impl<'txn, A, V> TypedDatabase<'txn, A, V>
where A: DatabaseAdapter,
V: ?Sized + Serialize + Deserialize<'txn>,
{
pub fn get<T: Transaction>(&self, txn: &'txn T, key: &A::Key)
-> StdResult<Option<V>, A::Err>
{
let opts = bincode_default();
self.db.get(txn, &A::serialize_key(key))
.map_or_else(
|err| match err {
lmdb::Error::NotFound => Ok(None),
e => Err(e.into()),
},
|ok| opts.deserialize(ok)
.map_err(|e| e.into())
.map(Option::Some)
)
}
/// Update `value` in-place from the database
///
/// Returns `Ok(false)` if the key wasn't found. If this functions returns an error `value`
/// will be in an indeterminate state where some parts may be updated from the db.
pub fn get_in_place<T: Transaction>(&self, txn: &'txn T, key: &A::Key, value: &mut V)
-> StdResult<bool, A::Err>
{
let opts = bincode_default();
self.db.get(txn, &A::serialize_key(key))
.map_or_else(
|err| match err {
lmdb::Error::NotFound => Ok(false),
e => Err(e.into()),
},
|ok| opts.deserialize_in_place_buffer(ok, value)
.map_err(|e| e.into())
.map(|()| true)
)
}
pub fn put(&self, txn: &'txn mut RwTransaction, key: &A::Key, value: &V, flags: lmdb::WriteFlags)
-> StdResult<(), A::Err>
{
let opts = bincode::DefaultOptions::new()
.with_varint_encoding();
// Serialized values are always at most as big as their memory representation.
// So even if usize is 32 bit this is safe given no segmenting is taking place.
let bufsize = opts.serialized_size(value)? as usize;
let buffer = self.db.reserve(txn, &A::serialize_key(key), bufsize, flags)?;
opts.serialize_into(buffer, value).map_err(|e| e.into())
}
pub fn del(&self, txn: &'txn mut RwTransaction, key: &A::Key)
-> StdResult<(), A::Err>
{
self.db.del::<&[u8], &[u8]>(txn, &A::serialize_key(key), None).map_err(|e| e.into())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::result::Result;
use std::ops::Deref;
use lmdb::{
EnvironmentFlags as EF,
DatabaseFlags as DF,
WriteFlags as WF,
};
pub struct TempEnv {
dir: tempfile::TempDir,
env: Arc<Environment>,
}
impl Deref for TempEnv {
type Target = Arc<Environment>;
fn deref(&self) -> &Self::Target {
&self.env
}
}
pub fn open_test_env() -> TempEnv {
let dir = tempfile::tempdir().expect("Failed to create tempdir for testdb");
let env = Environment::new()
.set_flags(EF::NO_SYNC | EF::WRITE_MAP)
.open(dir.path()).expect("Failed to open lmdb");
let env = Arc::new(env);
TempEnv { dir, env }
}
struct TestAdapter;
#[derive(Debug)]
enum TestErr {
Utf8(std::str::Utf8Error),
Binc(Box<bincode::ErrorKind>),
LMDB(lmdb::Error),
}
impl From<lmdb::Error> for TestErr {
fn from(e: lmdb::Error) -> TestErr {
TestErr::LMDB(e)
}
}
impl From<std::str::Utf8Error> for TestErr {
fn from(e: std::str::Utf8Error) -> TestErr {
TestErr::Utf8(e)
}
}
impl From<bincode::Error> for TestErr {
fn from(e: bincode::Error) -> TestErr {
TestErr::Binc(e)
}
}
impl DatabaseAdapter for TestAdapter {
type Key = str;
type Err = TestErr;
fn serialize_key(key: &Self::Key) -> &[u8] {
key.as_bytes()
}
fn deserialize_key<'de>(input: &'de [u8]) -> Result<&'de Self::Key, Self::Err> {
std::str::from_utf8(input).map_err(|e| e.into())
}
}
type TestDB<'txn> = TypedDatabase<'txn, TestAdapter, &'txn str>;
#[test]
fn simple_get() {
let e = open_test_env();
let ldb = e.create_db(None, DF::empty()).expect("Failed to create lmdb db");
let db = DB::new(e.env.clone(), ldb);
let adapter = TestAdapter;
let testdb = TestDB::new(db.clone(), adapter);
let mut val = "value";
let mut txn = db.begin_rw_txn().expect("Failed to being rw txn");
testdb.put(&mut txn, "key", &val, WF::empty()).expect("Failed to insert");
txn.commit().expect("commit failed");
{
let txn;
txn = db.begin_ro_txn().unwrap();
let val = testdb.get(&txn, "key").expect("Failed to retrieve");
assert_eq!(Some("value"), val);
}
{
let val2 = "longer_value";
let mut txn = db.begin_rw_txn().unwrap();
testdb.put(&mut txn, "key", &val2, WF::empty()).expect("Failed to update");
txn.commit().unwrap();
}
{
let txn = db.begin_ro_txn().unwrap();
let found = testdb.get_in_place(&txn, "key", &mut val).expect("Failed to retrieve update");
assert!(found);
assert_eq!("longer_value", val);
}
}
}