Splits off MachineDB

This commit is contained in:
Gregor Reitzenstein 2020-11-10 13:34:09 +01:00
parent fe89175113
commit 62e1e9276f
8 changed files with 165 additions and 139 deletions

View File

@ -42,8 +42,8 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
clap = "2.33" clap = "2.33"
# TODO update this if bindgen breaks (again) # TODO update this if bindgen breaks (again)
#rsasl = "0.2.3" rsasl = "0.2.3"
rsasl = { path = "../../rsasl" } #rsasl = { path = "../../rsasl" }
# rumqtt needs tokio which I'm trying to get away from # rumqtt needs tokio which I'm trying to get away from
paho-mqtt = { git = "https://github.com/dequbed/paho.mqtt.rust.git", branch = "master", features = ["build_bindgen"] } paho-mqtt = { git = "https://github.com/dequbed/paho.mqtt.rust.git", branch = "master", features = ["build_bindgen"] }

View File

@ -55,14 +55,23 @@ impl connection_capnp::bootstrap::Server for Connection {
} }
async fn handshake(log: &Logger, stream: &mut TcpStream) -> Result<()> { async fn handshake(log: &Logger, stream: &mut TcpStream) -> Result<()> {
if let Some(m) = capnp_futures::serialize::read_message(stream, Default::default()).await? { if let Some(m) = capnp_futures::serialize::read_message(stream.clone(), Default::default()).await? {
let greeting = m.get_root::<connection_capnp::greeting::Reader>()?; let greeting = m.get_root::<connection_capnp::greeting::Reader>()?;
let major = greeting.get_major(); let major = greeting.get_major();
let minor = greeting.get_minor(); let minor = greeting.get_minor();
if major != 1 { if major != 0 {
Err(Error::BadVersion((major, minor))) Err(Error::BadVersion((major, minor)))
} else { } else {
let program = format!("{}-{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
let mut answer = ::capnp::message::Builder::new_default();
let mut b = answer.init_root::<connection_capnp::greeting::Builder>();
b.set_program(&program);
b.set_host("localhost");
b.set_major(0);
b.set_minor(1);
capnp_futures::serialize::write_message(stream, answer).await?;
info!(log, "Handshake successful with peer {} running {}, API {}.{}", info!(log, "Handshake successful with peer {} running {}, API {}.{}",
greeting.get_host()?, greeting.get_program()?, major, minor); greeting.get_host()?, greeting.get_program()?, major, minor);
Ok(()) Ok(())
@ -73,7 +82,7 @@ async fn handshake(log: &Logger, stream: &mut TcpStream) -> Result<()> {
} }
pub async fn handle_connection(log: Logger, mut stream: TcpStream) -> Result<()> { pub async fn handle_connection(log: Logger, mut stream: TcpStream) -> Result<()> {
handshake(&log, &mut stream).await?; //handshake(&log, &mut stream).await?;
let mut conn = Connection::new(log); let mut conn = Connection::new(log);
let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(conn); let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(conn);

View File

@ -21,11 +21,11 @@ use lmdb::{Environment, Transaction, RwTransaction, Cursor};
use crate::config::Settings; use crate::config::Settings;
use crate::error::Result; use crate::error::Result;
mod adapter_lmdb; mod internal;
use crate::db::user::User; use crate::db::user::User;
use adapter_lmdb::PermissionsDB; use internal::PermissionsDB;
pub use adapter_lmdb::init; pub use internal::init;
pub trait RoleDB { pub trait RoleDB {
fn get_role(&self, roleID: &RoleIdentifier) -> Result<Option<Role>>; fn get_role(&self, roleID: &RoleIdentifier) -> Result<Option<Role>>;
@ -40,8 +40,8 @@ pub trait RoleDB {
/// Check if a given permission is granted by any of the given roles or their respective /// Check if a given permission is granted by any of the given roles or their respective
/// parents /// parents
/// ///
/// Default implementation which adapter may overwrite with more efficient specialized /// A Default implementation exists which adapter may overwrite with more efficient specialized
/// implementations. /// implementations.
fn check_roles(&self, roles: &[RoleIdentifier], permID: &PermIdentifier) -> Result<bool> { fn check_roles(&self, roles: &[RoleIdentifier], permID: &PermIdentifier) -> Result<bool> {
// Tally all roles. Makes dependent roles easier // Tally all roles. Makes dependent roles easier
@ -64,7 +64,8 @@ pub trait RoleDB {
/// Tally a role dependency tree into a set /// Tally a role dependency tree into a set
/// ///
/// Default implementation which adapter may overwrite with more efficient implementations /// A Default implementation exists which adapter may overwrite with more efficient
/// implementations.
fn tally_role(&self, roles: &mut HashSet<Role>, roleID: &RoleIdentifier) -> Result<()> { fn tally_role(&self, roles: &mut HashSet<Role>, roleID: &RoleIdentifier) -> Result<()> {
if let Some(role) = self.get_role(roleID)? { if let Some(role) = self.get_role(roleID)? {
// Only check and tally parents of a role at the role itself if it's the first time we // Only check and tally parents of a role at the role itself if it's the first time we

View File

@ -31,7 +31,10 @@ use futures_signals::signal::*;
use crate::registries::StatusSignal; use crate::registries::StatusSignal;
use crate::db::user::User; use crate::db::user::User;
pub type ID = Uuid; mod internal;
use internal::Internal;
pub type MachineIdentifier = Uuid;
/// Status of a Machine /// Status of a Machine
#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)] #[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)]
@ -45,23 +48,13 @@ pub enum Status {
Blocked, Blocked,
} }
#[derive(Clone)]
pub struct Machines {
inner: Arc<RwLock<MachinesProvider>>,
}
impl Machines {
pub fn new(inner: Arc<RwLock<MachinesProvider>>) -> Self {
Self { inner }
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct GiveBack { pub struct GiveBack {
mdb: Arc<RwLock<MachinesProvider>>, mdb: Arc<Box<dyn MachineDB>>,
uuid: Uuid, uuid: Uuid,
} }
impl GiveBack { impl GiveBack {
pub fn new(mdb: Arc<RwLock<MachinesProvider>>, uuid: Uuid) -> Self { pub fn new(mdb: Arc<Box<dyn MachineDB>>, uuid: Uuid) -> Self {
Self { mdb, uuid } Self { mdb, uuid }
} }
} }
@ -80,18 +73,6 @@ fn api_from_uuid(uuid: Uuid, mut wr: crate::api::api_capnp::u_u_i_d::Builder) {
wr.set_uuid1(uuid1); wr.set_uuid1(uuid1);
} }
#[derive(Clone)]
pub struct MachineManager {
mdb: Arc<RwLock<MachinesProvider>>,
uuid: Uuid,
}
impl MachineManager {
pub fn new(uuid: Uuid, mdb: Arc<RwLock<MachinesProvider>>) -> Self {
Self { mdb, uuid }
}
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
/// Internal machine representation /// Internal machine representation
/// ///
@ -102,7 +83,7 @@ pub struct Machine {
/// Computer-readable identifier for this machine /// Computer-readable identifier for this machine
// Implicit in database since it's the key. // Implicit in database since it's the key.
#[serde(skip)] #[serde(skip)]
id: ID, id: MachineIdentifier,
/// The human-readable name of the machine. Does not need to be unique /// The human-readable name of the machine. Does not need to be unique
name: String, name: String,
@ -132,10 +113,10 @@ impl Machine {
/// A signal is a lossy stream of state changes. Lossy in that if changes happen in quick /// A signal is a lossy stream of state changes. Lossy in that if changes happen in quick
/// succession intermediary values may be lost. But this isn't really relevant in this case /// succession intermediary values may be lost. But this isn't really relevant in this case
/// since the only relevant state is the latest one. /// since the only relevant state is the latest one.
/// dedupe ensures that if state is changed but only changes to the value it had beforehand
/// (could for example happen if the machine changes current user but stays activated) no
/// update is sent.
pub fn signal(&self) -> StatusSignal { pub fn signal(&self) -> StatusSignal {
// dedupe ensures that if state is changed but only changes to the value it had beforehand
// (could for example happen if the machine changes current user but stays activated) no
// update is sent.
Box::pin(self.state.signal().dedupe()) Box::pin(self.state.signal().dedupe())
} }
@ -161,108 +142,17 @@ impl Machine {
} }
} }
pub struct MachinesProvider { // TODO split up for non-writable Definition Databases
log: Logger, pub trait MachineDB {
db: lmdb::Database, fn get_machine(&self, machID: &MachineIdentifier) -> Result<Option<Machine>>;
fn put_machine(&self, machID: &MachineIdentifier, machine: Machine) -> Result<()>;
} }
impl MachinesProvider { pub fn init(log: Logger, config: &Settings, env: Arc<lmdb::Environment>) -> Result<Internal> {
pub fn new(log: Logger, db: lmdb::Database) -> Self {
Self { log, db }
}
pub fn get_machine<T: Transaction>(&self, txn: &T, uuid: Uuid)
-> Result<Option<Machine>>
{
match txn.get(self.db, &uuid.as_bytes()) {
Ok(bytes) => {
let mut machine: Machine = flexbuffers::from_slice(bytes)?;
machine.id = uuid;
Ok(Some(machine))
},
Err(lmdb::Error::NotFound) => { Ok(None) },
Err(e) => { Err(e.into()) },
}
}
pub fn put_machine( &self, txn: &mut RwTransaction, uuid: Uuid, machine: Machine)
-> Result<()>
{
let bytes = flexbuffers::to_vec(machine)?;
txn.put(self.db, &uuid.as_bytes(), &bytes, lmdb::WriteFlags::empty())?;
Ok(())
}
pub fn load_db(&mut self, txn: &mut RwTransaction, mut path: PathBuf) -> Result<()> {
path.push("machines");
for entry in std::fs::read_dir(path)? {
let entry = entry?;
let path = entry.path();
if path.is_file() {
// will only ever be none if the path has no file name and then how is it a file?!
let machID_str = path
.file_stem().expect("Found a file with no filename?")
.to_str().expect("Found an OsStr that isn't valid Unicode. Fix your OS!");
let machID = match uuid::Uuid::from_str(machID_str) {
Ok(i) => i,
Err(e) => {
warn!(self.log, "File {} had a invalid name. Expected an u64 in [0-9a-z] hex with optional file ending: {}. Skipping!", path.display(), e);
continue;
}
};
let s = match fs::read_to_string(path.as_path()) {
Ok(s) => s,
Err(e) => {
warn!(self.log, "Failed to open file {}: {}, skipping!"
, path.display()
, e);
continue;
}
};
let mach: Machine = match toml::from_str(&s) {
Ok(r) => r,
Err(e) => {
warn!(self.log, "Failed to parse mach at path {}: {}, skipping!"
, path.display()
, e);
continue;
}
};
self.put_machine(txn, machID, mach)?;
debug!(self.log, "Loaded machine {}", machID);
} else {
warn!(self.log, "Path {} is not a file, skipping!", path.display());
}
}
Ok(())
}
pub fn dump_db<T: Transaction>(&self, txn: &T, mut path: PathBuf) -> Result<()> {
path.push("machines");
let mut mach_cursor = txn.open_ro_cursor(self.db)?;
for buf in mach_cursor.iter_start() {
let (kbuf, vbuf) = buf?;
let machID = uuid::Uuid::from_slice(kbuf).unwrap();
let mach: Machine = flexbuffers::from_slice(vbuf)?;
let filename = format!("{}.yml", machID.to_hyphenated().to_string());
path.set_file_name(filename);
let mut fp = std::fs::File::create(&path)?;
let out = toml::to_vec(&mach)?;
fp.write_all(&out)?;
}
Ok(())
}
}
pub fn init(log: Logger, config: &Settings, env: Arc<lmdb::Environment>) -> Result<MachinesProvider> {
let mut flags = lmdb::DatabaseFlags::empty(); let mut flags = lmdb::DatabaseFlags::empty();
flags.set(lmdb::DatabaseFlags::INTEGER_KEY, true); flags.set(lmdb::DatabaseFlags::INTEGER_KEY, true);
let machdb = env.create_db(Some("machines"), flags)?; let machdb = env.create_db(Some("machines"), flags)?;
debug!(&log, "Opened machine db successfully."); debug!(&log, "Opened machine db successfully.");
Ok(MachinesProvider::new(log, machdb)) Ok(Internal::new(log, env, machdb))
} }

127
src/db/machine/internal.rs Normal file
View File

@ -0,0 +1,127 @@
use std::sync::Arc;
use std::fs;
use std::io::Write;
use std::str::FromStr;
use std::path::PathBuf;
use slog::Logger;
use uuid::Uuid;
use lmdb::{Environment, Transaction, RwTransaction, Cursor};
use super::{MachineIdentifier, Machine, MachineDB};
use crate::error::Result;
#[derive(Clone, Debug)]
pub struct Internal {
log: Logger,
env: Arc<Environment>,
db: lmdb::Database,
}
impl Internal {
pub fn new(log: Logger, env: Arc<Environment>, db: lmdb::Database) -> Self {
Self { log, env, db }
}
pub fn _get_machine<T: Transaction>(&self, txn: &T, uuid: &Uuid)
-> Result<Option<Machine>>
{
match txn.get(self.db, uuid.as_bytes()) {
Ok(bytes) => {
let mut machine: Machine = flexbuffers::from_slice(bytes)?;
machine.id = *uuid;
Ok(Some(machine))
},
Err(lmdb::Error::NotFound) => { Ok(None) },
Err(e) => { Err(e.into()) },
}
}
pub fn _put_machine( &self, txn: &mut RwTransaction, uuid: &Uuid, machine: Machine)
-> Result<()>
{
let bytes = flexbuffers::to_vec(machine)?;
txn.put(self.db, uuid.as_bytes(), &bytes, lmdb::WriteFlags::empty())?;
Ok(())
}
pub fn load_db(&mut self, txn: &mut RwTransaction, mut path: PathBuf) -> Result<()> {
path.push("machines");
for entry in std::fs::read_dir(path)? {
let entry = entry?;
let path = entry.path();
if path.is_file() {
// will only ever be none if the path has no file name and then how is it a file?!
let machID_str = path
.file_stem().expect("Found a file with no filename?")
.to_str().expect("Found an OsStr that isn't valid Unicode. Fix your OS!");
let machID = match uuid::Uuid::from_str(machID_str) {
Ok(i) => i,
Err(e) => {
warn!(self.log, "File {} had a invalid name. Expected an u64 in [0-9a-z] hex with optional file ending: {}. Skipping!", path.display(), e);
continue;
}
};
let s = match fs::read_to_string(path.as_path()) {
Ok(s) => s,
Err(e) => {
warn!(self.log, "Failed to open file {}: {}, skipping!"
, path.display()
, e);
continue;
}
};
let mach: Machine = match toml::from_str(&s) {
Ok(r) => r,
Err(e) => {
warn!(self.log, "Failed to parse mach at path {}: {}, skipping!"
, path.display()
, e);
continue;
}
};
self._put_machine(txn, &machID, mach)?;
debug!(self.log, "Loaded machine {}", machID);
} else {
warn!(self.log, "Path {} is not a file, skipping!", path.display());
}
}
Ok(())
}
pub fn dump_db<T: Transaction>(&self, txn: &T, mut path: PathBuf) -> Result<()> {
path.push("machines");
let mut mach_cursor = txn.open_ro_cursor(self.db)?;
for buf in mach_cursor.iter_start() {
let (kbuf, vbuf) = buf?;
let machID = uuid::Uuid::from_slice(kbuf).unwrap();
let mach: Machine = flexbuffers::from_slice(vbuf)?;
let filename = format!("{}.yml", machID.to_hyphenated().to_string());
path.set_file_name(filename);
let mut fp = std::fs::File::create(&path)?;
let out = toml::to_vec(&mach)?;
fp.write_all(&out)?;
}
Ok(())
}
}
impl MachineDB for Internal {
fn get_machine(&self, machID: &MachineIdentifier) -> Result<Option<Machine>> {
let txn = self.env.begin_ro_txn()?;
self._get_machine(&txn, machID)
}
fn put_machine(&self, machID: &MachineIdentifier, machine: Machine) -> Result<()> {
let mut txn = self.env.begin_rw_txn()?;
self._put_machine(&mut txn, machID, machine)?;
txn.commit()?;
Ok(())
}
}

View File

@ -52,7 +52,6 @@ const LMDB_MAX_DB: u32 = 16;
// Returning a `Result` from `main` allows us to use the `?` shorthand. // Returning a `Result` from `main` allows us to use the `?` shorthand.
// In the case of an Err it will be printed using `fmt::Debug` // In the case of an Err it will be printed using `fmt::Debug`
fn main() -> Result<(), Error> { fn main() -> Result<(), Error> {
let signal = Box::pin(async { let signal = Box::pin(async {
let (tx, mut rx) = UnixStream::pair()?; let (tx, mut rx) = UnixStream::pair()?;
// Initialize signal handler. // Initialize signal handler.

View File

@ -35,5 +35,5 @@ impl Network {
enum Event { enum Event {
/// An user wants to use a machine /// An user wants to use a machine
// TODO: Define /what/ an user wants to do with said machine? // TODO: Define /what/ an user wants to do with said machine?
MachineRequest(machine::ID, UserIdentifier), MachineRequest(machine::MachineIdentifier, UserIdentifier),
} }