From 62e1e9276fe7f52c971c8e2e963a54364d87a17b Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Tue, 10 Nov 2020 13:34:09 +0100 Subject: [PATCH] Splits off MachineDB --- Cargo.toml | 4 +- src/connection.rs | 15 +- src/db/access.rs | 13 +- .../access/{adapter_lmdb.rs => internal.rs} | 0 src/db/machine.rs | 142 ++---------------- src/db/machine/internal.rs | 127 ++++++++++++++++ src/main.rs | 1 - src/network.rs | 2 +- 8 files changed, 165 insertions(+), 139 deletions(-) rename src/db/access/{adapter_lmdb.rs => internal.rs} (100%) create mode 100644 src/db/machine/internal.rs diff --git a/Cargo.toml b/Cargo.toml index 32efcb1..7c13c58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,8 +42,8 @@ uuid = { version = "0.8", features = ["serde", "v4"] } clap = "2.33" # TODO update this if bindgen breaks (again) -#rsasl = "0.2.3" -rsasl = { path = "../../rsasl" } +rsasl = "0.2.3" +#rsasl = { path = "../../rsasl" } # rumqtt needs tokio which I'm trying to get away from paho-mqtt = { git = "https://github.com/dequbed/paho.mqtt.rust.git", branch = "master", features = ["build_bindgen"] } diff --git a/src/connection.rs b/src/connection.rs index 7014c3d..faaaca5 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -55,14 +55,23 @@ impl connection_capnp::bootstrap::Server for Connection { } async fn handshake(log: &Logger, stream: &mut TcpStream) -> Result<()> { - if let Some(m) = capnp_futures::serialize::read_message(stream, Default::default()).await? { + if let Some(m) = capnp_futures::serialize::read_message(stream.clone(), Default::default()).await? { let greeting = m.get_root::()?; let major = greeting.get_major(); let minor = greeting.get_minor(); - if major != 1 { + if major != 0 { Err(Error::BadVersion((major, minor))) } else { + let program = format!("{}-{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")); + + let mut answer = ::capnp::message::Builder::new_default(); + let mut b = answer.init_root::(); + b.set_program(&program); + b.set_host("localhost"); + b.set_major(0); + b.set_minor(1); + capnp_futures::serialize::write_message(stream, answer).await?; info!(log, "Handshake successful with peer {} running {}, API {}.{}", greeting.get_host()?, greeting.get_program()?, major, minor); Ok(()) @@ -73,7 +82,7 @@ async fn handshake(log: &Logger, stream: &mut TcpStream) -> Result<()> { } pub async fn handle_connection(log: Logger, mut stream: TcpStream) -> Result<()> { - handshake(&log, &mut stream).await?; + //handshake(&log, &mut stream).await?; let mut conn = Connection::new(log); let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(conn); diff --git a/src/db/access.rs b/src/db/access.rs index 9fc654f..94608ec 100644 --- a/src/db/access.rs +++ b/src/db/access.rs @@ -21,11 +21,11 @@ use lmdb::{Environment, Transaction, RwTransaction, Cursor}; use crate::config::Settings; use crate::error::Result; -mod adapter_lmdb; +mod internal; use crate::db::user::User; -use adapter_lmdb::PermissionsDB; -pub use adapter_lmdb::init; +use internal::PermissionsDB; +pub use internal::init; pub trait RoleDB { fn get_role(&self, roleID: &RoleIdentifier) -> Result>; @@ -40,8 +40,8 @@ pub trait RoleDB { /// Check if a given permission is granted by any of the given roles or their respective /// parents - /// - /// Default implementation which adapter may overwrite with more efficient specialized + /// + /// A Default implementation exists which adapter may overwrite with more efficient specialized /// implementations. fn check_roles(&self, roles: &[RoleIdentifier], permID: &PermIdentifier) -> Result { // Tally all roles. Makes dependent roles easier @@ -64,7 +64,8 @@ pub trait RoleDB { /// Tally a role dependency tree into a set /// - /// Default implementation which adapter may overwrite with more efficient implementations + /// A Default implementation exists which adapter may overwrite with more efficient + /// implementations. fn tally_role(&self, roles: &mut HashSet, roleID: &RoleIdentifier) -> Result<()> { if let Some(role) = self.get_role(roleID)? { // Only check and tally parents of a role at the role itself if it's the first time we diff --git a/src/db/access/adapter_lmdb.rs b/src/db/access/internal.rs similarity index 100% rename from src/db/access/adapter_lmdb.rs rename to src/db/access/internal.rs diff --git a/src/db/machine.rs b/src/db/machine.rs index 1b2f089..8290d1d 100644 --- a/src/db/machine.rs +++ b/src/db/machine.rs @@ -31,7 +31,10 @@ use futures_signals::signal::*; use crate::registries::StatusSignal; use crate::db::user::User; -pub type ID = Uuid; +mod internal; +use internal::Internal; + +pub type MachineIdentifier = Uuid; /// Status of a Machine #[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)] @@ -45,23 +48,13 @@ pub enum Status { Blocked, } -#[derive(Clone)] -pub struct Machines { - inner: Arc>, -} -impl Machines { - pub fn new(inner: Arc>) -> Self { - Self { inner } - } -} - #[derive(Clone)] pub struct GiveBack { - mdb: Arc>, + mdb: Arc>, uuid: Uuid, } impl GiveBack { - pub fn new(mdb: Arc>, uuid: Uuid) -> Self { + pub fn new(mdb: Arc>, uuid: Uuid) -> Self { Self { mdb, uuid } } } @@ -80,18 +73,6 @@ fn api_from_uuid(uuid: Uuid, mut wr: crate::api::api_capnp::u_u_i_d::Builder) { wr.set_uuid1(uuid1); } -#[derive(Clone)] -pub struct MachineManager { - mdb: Arc>, - uuid: Uuid, -} - -impl MachineManager { - pub fn new(uuid: Uuid, mdb: Arc>) -> Self { - Self { mdb, uuid } - } -} - #[derive(Debug, Serialize, Deserialize)] /// Internal machine representation /// @@ -102,7 +83,7 @@ pub struct Machine { /// Computer-readable identifier for this machine // Implicit in database since it's the key. #[serde(skip)] - id: ID, + id: MachineIdentifier, /// The human-readable name of the machine. Does not need to be unique name: String, @@ -132,10 +113,10 @@ impl Machine { /// A signal is a lossy stream of state changes. Lossy in that if changes happen in quick /// succession intermediary values may be lost. But this isn't really relevant in this case /// since the only relevant state is the latest one. - /// dedupe ensures that if state is changed but only changes to the value it had beforehand - /// (could for example happen if the machine changes current user but stays activated) no - /// update is sent. pub fn signal(&self) -> StatusSignal { + // dedupe ensures that if state is changed but only changes to the value it had beforehand + // (could for example happen if the machine changes current user but stays activated) no + // update is sent. Box::pin(self.state.signal().dedupe()) } @@ -161,108 +142,17 @@ impl Machine { } } -pub struct MachinesProvider { - log: Logger, - db: lmdb::Database, +// TODO split up for non-writable Definition Databases +pub trait MachineDB { + fn get_machine(&self, machID: &MachineIdentifier) -> Result>; + fn put_machine(&self, machID: &MachineIdentifier, machine: Machine) -> Result<()>; } -impl MachinesProvider { - pub fn new(log: Logger, db: lmdb::Database) -> Self { - Self { log, db } - } - - pub fn get_machine(&self, txn: &T, uuid: Uuid) - -> Result> - { - match txn.get(self.db, &uuid.as_bytes()) { - Ok(bytes) => { - let mut machine: Machine = flexbuffers::from_slice(bytes)?; - machine.id = uuid; - - Ok(Some(machine)) - }, - Err(lmdb::Error::NotFound) => { Ok(None) }, - Err(e) => { Err(e.into()) }, - } - } - - pub fn put_machine( &self, txn: &mut RwTransaction, uuid: Uuid, machine: Machine) - -> Result<()> - { - let bytes = flexbuffers::to_vec(machine)?; - txn.put(self.db, &uuid.as_bytes(), &bytes, lmdb::WriteFlags::empty())?; - - Ok(()) - } - - pub fn load_db(&mut self, txn: &mut RwTransaction, mut path: PathBuf) -> Result<()> { - path.push("machines"); - for entry in std::fs::read_dir(path)? { - let entry = entry?; - let path = entry.path(); - if path.is_file() { - // will only ever be none if the path has no file name and then how is it a file?! - let machID_str = path - .file_stem().expect("Found a file with no filename?") - .to_str().expect("Found an OsStr that isn't valid Unicode. Fix your OS!"); - let machID = match uuid::Uuid::from_str(machID_str) { - Ok(i) => i, - Err(e) => { - warn!(self.log, "File {} had a invalid name. Expected an u64 in [0-9a-z] hex with optional file ending: {}. Skipping!", path.display(), e); - continue; - } - }; - let s = match fs::read_to_string(path.as_path()) { - Ok(s) => s, - Err(e) => { - warn!(self.log, "Failed to open file {}: {}, skipping!" - , path.display() - , e); - continue; - } - }; - let mach: Machine = match toml::from_str(&s) { - Ok(r) => r, - Err(e) => { - warn!(self.log, "Failed to parse mach at path {}: {}, skipping!" - , path.display() - , e); - continue; - } - }; - self.put_machine(txn, machID, mach)?; - debug!(self.log, "Loaded machine {}", machID); - } else { - warn!(self.log, "Path {} is not a file, skipping!", path.display()); - } - } - - Ok(()) - } - - pub fn dump_db(&self, txn: &T, mut path: PathBuf) -> Result<()> { - path.push("machines"); - let mut mach_cursor = txn.open_ro_cursor(self.db)?; - for buf in mach_cursor.iter_start() { - let (kbuf, vbuf) = buf?; - let machID = uuid::Uuid::from_slice(kbuf).unwrap(); - let mach: Machine = flexbuffers::from_slice(vbuf)?; - let filename = format!("{}.yml", machID.to_hyphenated().to_string()); - path.set_file_name(filename); - let mut fp = std::fs::File::create(&path)?; - let out = toml::to_vec(&mach)?; - fp.write_all(&out)?; - } - - Ok(()) - } -} - -pub fn init(log: Logger, config: &Settings, env: Arc) -> Result { +pub fn init(log: Logger, config: &Settings, env: Arc) -> Result { let mut flags = lmdb::DatabaseFlags::empty(); flags.set(lmdb::DatabaseFlags::INTEGER_KEY, true); let machdb = env.create_db(Some("machines"), flags)?; debug!(&log, "Opened machine db successfully."); - Ok(MachinesProvider::new(log, machdb)) + Ok(Internal::new(log, env, machdb)) } diff --git a/src/db/machine/internal.rs b/src/db/machine/internal.rs new file mode 100644 index 0000000..1334787 --- /dev/null +++ b/src/db/machine/internal.rs @@ -0,0 +1,127 @@ +use std::sync::Arc; +use std::fs; +use std::io::Write; +use std::str::FromStr; +use std::path::PathBuf; + +use slog::Logger; +use uuid::Uuid; + +use lmdb::{Environment, Transaction, RwTransaction, Cursor}; + +use super::{MachineIdentifier, Machine, MachineDB}; +use crate::error::Result; + +#[derive(Clone, Debug)] +pub struct Internal { + log: Logger, + env: Arc, + db: lmdb::Database, +} + +impl Internal { + pub fn new(log: Logger, env: Arc, db: lmdb::Database) -> Self { + Self { log, env, db } + } + + pub fn _get_machine(&self, txn: &T, uuid: &Uuid) + -> Result> + { + match txn.get(self.db, uuid.as_bytes()) { + Ok(bytes) => { + let mut machine: Machine = flexbuffers::from_slice(bytes)?; + machine.id = *uuid; + + Ok(Some(machine)) + }, + Err(lmdb::Error::NotFound) => { Ok(None) }, + Err(e) => { Err(e.into()) }, + } + } + + pub fn _put_machine( &self, txn: &mut RwTransaction, uuid: &Uuid, machine: Machine) + -> Result<()> + { + let bytes = flexbuffers::to_vec(machine)?; + txn.put(self.db, uuid.as_bytes(), &bytes, lmdb::WriteFlags::empty())?; + + Ok(()) + } + + pub fn load_db(&mut self, txn: &mut RwTransaction, mut path: PathBuf) -> Result<()> { + path.push("machines"); + for entry in std::fs::read_dir(path)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() { + // will only ever be none if the path has no file name and then how is it a file?! + let machID_str = path + .file_stem().expect("Found a file with no filename?") + .to_str().expect("Found an OsStr that isn't valid Unicode. Fix your OS!"); + let machID = match uuid::Uuid::from_str(machID_str) { + Ok(i) => i, + Err(e) => { + warn!(self.log, "File {} had a invalid name. Expected an u64 in [0-9a-z] hex with optional file ending: {}. Skipping!", path.display(), e); + continue; + } + }; + let s = match fs::read_to_string(path.as_path()) { + Ok(s) => s, + Err(e) => { + warn!(self.log, "Failed to open file {}: {}, skipping!" + , path.display() + , e); + continue; + } + }; + let mach: Machine = match toml::from_str(&s) { + Ok(r) => r, + Err(e) => { + warn!(self.log, "Failed to parse mach at path {}: {}, skipping!" + , path.display() + , e); + continue; + } + }; + self._put_machine(txn, &machID, mach)?; + debug!(self.log, "Loaded machine {}", machID); + } else { + warn!(self.log, "Path {} is not a file, skipping!", path.display()); + } + } + + Ok(()) + } + + pub fn dump_db(&self, txn: &T, mut path: PathBuf) -> Result<()> { + path.push("machines"); + let mut mach_cursor = txn.open_ro_cursor(self.db)?; + for buf in mach_cursor.iter_start() { + let (kbuf, vbuf) = buf?; + let machID = uuid::Uuid::from_slice(kbuf).unwrap(); + let mach: Machine = flexbuffers::from_slice(vbuf)?; + let filename = format!("{}.yml", machID.to_hyphenated().to_string()); + path.set_file_name(filename); + let mut fp = std::fs::File::create(&path)?; + let out = toml::to_vec(&mach)?; + fp.write_all(&out)?; + } + + Ok(()) + } +} + +impl MachineDB for Internal { + fn get_machine(&self, machID: &MachineIdentifier) -> Result> { + let txn = self.env.begin_ro_txn()?; + self._get_machine(&txn, machID) + } + + fn put_machine(&self, machID: &MachineIdentifier, machine: Machine) -> Result<()> { + let mut txn = self.env.begin_rw_txn()?; + self._put_machine(&mut txn, machID, machine)?; + txn.commit()?; + + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index 9698202..5387e2b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,7 +52,6 @@ const LMDB_MAX_DB: u32 = 16; // Returning a `Result` from `main` allows us to use the `?` shorthand. // In the case of an Err it will be printed using `fmt::Debug` fn main() -> Result<(), Error> { - let signal = Box::pin(async { let (tx, mut rx) = UnixStream::pair()?; // Initialize signal handler. diff --git a/src/network.rs b/src/network.rs index bb69984..99057f2 100644 --- a/src/network.rs +++ b/src/network.rs @@ -35,5 +35,5 @@ impl Network { enum Event { /// An user wants to use a machine // TODO: Define /what/ an user wants to do with said machine? - MachineRequest(machine::ID, UserIdentifier), + MachineRequest(machine::MachineIdentifier, UserIdentifier), }