diff --git a/Cargo.lock b/Cargo.lock index 41f97f7..f57cdc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -146,6 +146,18 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-native-tls" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e9e7a929bd34c68a82d58a4de7f86fffdaf97fb2af850162a7bb19dd7269b33" +dependencies = [ + "async-std", + "native-tls", + "thiserror", + "url", +] + [[package]] name = "async-net" version = "1.6.1" @@ -183,6 +195,26 @@ dependencies = [ "winapi", ] +[[package]] +name = "async-std" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8056f1455169ab86dd47b47391e4ab0cbd25410a70e9fe675544f49bafaf952" +dependencies = [ + "async-channel", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + [[package]] name = "async-task" version = "4.0.3" @@ -323,6 +355,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "bumpalo" +version = "3.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1e260c3a9040a7c19a12468758f4c16f31a81a1fe087482be9570ec864bb6c" + [[package]] name = "byte-tools" version = "0.3.1" @@ -487,6 +525,22 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" +[[package]] +name = "core-foundation" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6888e10551bb93e424d8df1d07f1a8b4fceb0001a3a4b048bfc47554946f47b3" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" + [[package]] name = "cpufeatures" version = "0.2.1" @@ -556,6 +610,7 @@ name = "diflouroborane" version = "0.3.0" dependencies = [ "async-channel", + "async-native-tls", "async-oneshot", "async-trait", "bincode", @@ -679,6 +734,21 @@ dependencies = [ "instant", ] +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -1085,6 +1155,15 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +[[package]] +name = "js-sys" +version = "0.3.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1185,6 +1264,24 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "native-tls" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48ba9f7719b5a0f42f338907614285fb5fd70e53858141f69898a1fb7203b24d" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nom" version = "5.1.2" @@ -1243,6 +1340,39 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl" +version = "0.10.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d9facdb76fec0b73c406f125d44d86fdad818d66fef0531eec9233ca425ff4a" +dependencies = [ + "bitflags", + "cfg-if 1.0.0", + "foreign-types", + "libc", + "once_cell", + "openssl-sys", +] + +[[package]] +name = "openssl-probe" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" + +[[package]] +name = "openssl-sys" +version = "0.9.67" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69df2d8dfc6ce3aaf44b40dec6f487d5a886516cf6879c49e98e0710f310a058" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "paho-mqtt" version = "0.8.0" @@ -1677,12 +1807,45 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +dependencies = [ + "lazy_static", + "winapi", +] + [[package]] name = "seahash" version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "security-framework" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525bc1abfda2e1998d152c45cf13e696f76d0a4972310b22fac1658b05df7c87" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9dd14d83160b528b7bfd66439110573efcfbe281b17fc2ca9f39f550d619c7e" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.130" @@ -2083,6 +2246,12 @@ dependencies = [ "serde", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" @@ -2118,6 +2287,82 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasm-bindgen" +version = "0.2.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" +dependencies = [ + "cfg-if 1.0.0", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e8d7523cb1f2a4c96c1317ca690031b714a51cc14e05f712446691f413f5d39" +dependencies = [ + "cfg-if 1.0.0", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc" + +[[package]] +name = "web-sys" +version = "0.3.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "wepoll-ffi" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index 96e56c0..4050cfe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ libc = "0.2.101" lazy_static = "1.4.0" uuid = { version = "0.8.2", features = ["serde", "v4"] } async-trait = "0.1.51" +async-native-tls = "0.3" # Runtime smol = "1.2.5" diff --git a/src/access/internal.rs b/src/access/internal.rs deleted file mode 100644 index fa6d2ce..0000000 --- a/src/access/internal.rs +++ /dev/null @@ -1,155 +0,0 @@ -use std::collections::HashMap; - -use std::path::Path; -use std::sync::Arc; - -use slog::Logger; -use lmdb::{Environment, Transaction, RwTransaction, Cursor}; - -use crate::config::Config; -use crate::error::Result; - -use crate::db::access::{Permission, Role, RoleIdentifier, RoleDB}; - -#[derive(Clone, Debug)] -pub struct Internal { - log: Logger, - env: Arc, - roledb: lmdb::Database, -} - -impl Internal { - pub fn new(log: Logger, env: Arc, roledb: lmdb::Database) -> Self { - Self { log, env, roledb, } - } - - /// Check if a given user has the given permission - #[allow(unused)] - pub fn _check>(&self, txn: &T, user: &UserData, perm: &P) - -> Result - { - tracing::debug!("Checking user {:?} for permission {:?}", user, perm.as_ref()); - // Tally all roles. Makes dependent roles easier - let mut roles = HashMap::new(); - for role_id in user.roles.iter() { - tracing::debug!("Tallying role {} for its parents", role_id); - self._tally_role(txn, &mut roles, role_id)?; - } - - // Iter all unique role->permissions we've found and early return on match. - // TODO: Change this for negative permissions? - for (roleid, role) in roles.iter() { - tracing::debug!(" checking role {}", roleid); - for perm_rule in role.permissions.iter() { - if perm_rule.match_perm(perm) { - tracing::debug!(" matches permission rule {}", perm_rule); - return Ok(true); - } - tracing::trace!(" rejecting permission rule {}", perm_rule); - } - } - - tracing::debug!("Checked all roles, rejecting access"); - - return Ok(false); - } - - fn _tally_role(&self, txn: &T, roles: &mut HashMap, role_id: &RoleIdentifier) -> Result<()> { - if let Some(role) = self._get_role(txn, role_id)? { - // Only check and tally parents of a role at the role itself if it's the first time we - // see it - if !roles.contains_key(&role_id) { - for parent in role.parents.iter() { - self._tally_role(txn, roles, parent)?; - } - - roles.insert(role_id.clone(), role); - } - } else { - tracing::warn!("Did not find role {} while trying to tally", role_id); - } - - Ok(()) - } - - pub fn _get_role<'txn, T: Transaction>(&self, txn: &'txn T, role_id: &RoleIdentifier) -> Result> { - tracing::debug!("Reading role '{}'", role_id.name); - match txn.get(self.roledb, &role_id.name.as_bytes()) { - Ok(bytes) => { - Ok(Some(flexbuffers::from_slice(bytes)?)) - }, - Err(lmdb::Error::NotFound) => { Ok(None) }, - Err(e) => { Err(e.into()) } - } - } - - fn put_role(&self, txn: &mut RwTransaction, role_id: &RoleIdentifier, role: Role) -> Result<()> { - let bytes = flexbuffers::to_vec(role)?; - txn.put(self.roledb, &role_id.name.as_bytes(), &bytes, lmdb::WriteFlags::empty())?; - - Ok(()) - } - - - pub fn dump_roles(&self) -> Result> { - let txn = self.env.begin_ro_txn()?; - self.dump_roles_txn(&txn) - } - pub fn dump_roles_txn(&self, txn: &T) -> Result> { - let mut cursor = txn.open_ro_cursor(self.roledb)?; - - let mut vec = Vec::new(); - for r in cursor.iter_start() { - match r { - Ok( (k,v) ) => { - let role_name_str = unsafe { std::str::from_utf8_unchecked(k) }; - let role_id = RoleIdentifier::local_from_str("lmdb".to_string(), role_name_str.to_string()); - match flexbuffers::from_slice(v) { - Ok(role) => vec.push((role_id, role)), - Err(e) => tracing::error!("Bad format for roleid {}: {}", role_id, - e), - } - }, - Err(e) => return Err(e.into()), - } - } - - Ok(vec) - } - - pub fn load_roles>(&self, path: P) -> Result<()> { - let mut txn = self.env.begin_rw_txn()?; - self.load_roles_txn(&mut txn, path.as_ref())?; - - // In case the above didn't error, commit. - txn.commit()?; - Ok(()) - } - fn load_roles_txn(&self, txn: &mut RwTransaction, path: &Path) -> Result<()> { - let roles = Role::load_file(path)?; - - for (k,v) in roles.iter() { - self.put_role(txn, k, v.clone())?; - } - - tracing::debug!("Loaded roles: {:?}", roles); - - Ok(()) - } -} - -impl RoleDB for Internal { - fn get_type_name(&self) -> &'static str { - "Internal" - } - - fn get_role(&self, role_id: &RoleIdentifier) -> Result> { - let txn = self.env.begin_ro_txn()?; - self._get_role(&txn, role_id) - } - - fn tally_role(&self, roles: &mut HashMap, role_id: &RoleIdentifier) -> Result<()> { - let txn = self.env.begin_ro_txn()?; - self._tally_role(&txn, roles, role_id) - } -} \ No newline at end of file diff --git a/src/actor.rs b/src/actor.rs deleted file mode 100644 index 8d00cf7..0000000 --- a/src/actor.rs +++ /dev/null @@ -1,181 +0,0 @@ -use std::pin::Pin; -use std::task::{Poll, Context}; -use std::sync::Mutex; -use std::collections::HashMap; -use std::future::Future; - -use futures::{future::BoxFuture, Stream}; -use futures::channel::mpsc; -use futures_signals::signal::Signal; - -use crate::db::machine::MachineState; -use crate::config::Config; -use crate::error::Result; - -use paho_mqtt::AsyncClient; -use slog::Logger; - -pub trait Actuator { - fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()>; -} - -pub type ActorSignal = Box + Unpin + Send>; - -pub struct Actor { - // FIXME: This should really be a Signal. - // But, alas, MutableSignalCloned is itself not `Clone`. For good reason as keeping track of - // the changes itself happens in a way that Clone won't work (well). - // So, you can't clone it, you can't copy it and you can't get at the variable inside outside - // of a task context. In short, using Mutable isn't possible and we would have to write our own - // implementation of MutableSignal*'s . Preferably with the correct optimizations for our case - // where there is only one consumer. So a mpsc channel that drops all but the last input. - rx: mpsc::Receiver>, - inner: Option, - - actuator: Box, - future: Option>, -} - -impl Actor { - pub fn new(rx: mpsc::Receiver>, actuator: Box) -> Self { - Self { - rx: rx, - inner: None, - actuator: actuator, - future: None, - } - } - - pub fn wrap(actuator: Box) -> (mpsc::Sender>, Self) { - let (tx, rx) = mpsc::channel(1); - (tx, Self::new(rx, actuator)) - } -} - -impl Future for Actor { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let mut this = &mut *self; - let mut done = false; // Is the channel with new state-signals exhausted? - - // FIXME: This is potentially invalid, and may lead to the situation that the signal is - // replaced *twice* but the second change will not be honoured since this implementation of - // events is *EDGE*-triggered! - // Update the signal we're polling from, if there is an update that is. - match Stream::poll_next(Pin::new(&mut this.rx), cx) { - Poll::Ready(None) => done = true, - Poll::Ready(Some(new_signal)) => this.inner = new_signal, - Poll::Pending => { }, - } - - // Work until there is no more work to do. - loop { - - // Poll the `apply` future. And ensure it's completed before the next one is started - match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) { - // Skip and poll for a new future to do - None => { } - - // This apply future is done, get a new one - Some(Poll::Ready(_)) => this.future = None, - - // This future would block so we return to continue work another time - Some(Poll::Pending) => return Poll::Pending, - } - - // Poll the signal and apply any change that happen to the inner Actuator - match this.inner.as_mut().map(|inner| Signal::poll_change(Pin::new(inner), cx)) { - // No signal to poll - None => return Poll::Pending, - Some(Poll::Pending) => return Poll::Pending, - Some(Poll::Ready(None)) => { - this.inner = None; - - if done { - return Poll::Ready(()); - } else { - return Poll::Pending; - } - }, - Some(Poll::Ready(Some(state))) => { - // This future MUST be polled before we exit from the Actor::poll because if we - // do not do that it will not register the dependency and thus NOT BE POLLED. - this.future.replace(this.actuator.apply(state)); - } - } - } - } -} - -pub struct Dummy { - log: Logger, -} - -impl Dummy { - pub fn new(log: Logger) -> Self { - Self { log } - } -} - -impl Actuator for Dummy { - fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()> { - info!(self.log, "New state for dummy actuator: {:?}", state); - Box::pin(smol::future::ready(())) - } -} - -pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec)> { - let mut map = HashMap::new(); - - let mqtt = AsyncClient::new(config.mqtt_url.clone())?; - let tok = mqtt.connect(paho_mqtt::ConnectOptions::new()); - smol::block_on(tok)?; - - let actuators = config.actors.iter() - .map(|(k,v)| (k, load_single(log, k, &v.module, &v.params, mqtt.clone()))) - .filter_map(|(k, n)| match n { - None => None, - Some(a) => Some((k, a)) - }); - - let mut v = Vec::new(); - for (name, actuator) in actuators { - let (tx, a) = Actor::wrap(actuator); - map.insert(name.clone(), Mutex::new(tx)); - v.push(a); - } - - - Ok(( map, v )) -} - -fn load_single( - log: &Logger, - name: &String, - module_name: &String, - params: &HashMap, - client: AsyncClient, - ) -> Option> -{ - use crate::modules::*; - - info!(log, "Loading actor \"{}\" with module {} and params {:?}", name, module_name, params); - let log = log.new(o!("name" => name.clone())); - match module_name.as_ref() { - "Dummy" => { - Some(Box::new(Dummy::new(log))) - } - "Process" => { - Process::new(log, name.clone(), params) - .map(|a| a.into_boxed_actuator()) - } - "Shelly" => { - Some(Box::new(Shelly::new(log, name.clone(), client))) - } - _ => { - error!(log, "No actor found with name \"{}\", configured as \"{}\".", module_name, name); - None - }, - } -} diff --git a/src/api/auth.rs b/src/api/auth.rs deleted file mode 100644 index 2daba86..0000000 --- a/src/api/auth.rs +++ /dev/null @@ -1,292 +0,0 @@ -//! Authentication subsystem -//! -//! Authorization is over in `permissions` -//! Authentication using SASL - -use std::sync::Arc; -use std::rc::Rc; -use std::cell::RefCell; -use std::ops::Deref; - -use slog::Logger; - -use rsasl::{ - SASL, - RSASL, - Property, - Session as SaslSession, - ReturnCode, - Callback, - Step, -}; - -use serde::{Serialize, Deserialize}; - -use capnp::capability::{Params, Results, Promise}; - -use crate::api::Session; - -pub use crate::schema::authenticationsystem_capnp as auth_system; -use crate::db::Databases; -use crate::db::pass::PassDB; -use crate::db::user::{Internal as UserDB, UserId, User}; -use crate::db::access::AccessControl as AccessDB; - -pub struct AppData { - userdb: Arc, -} -pub struct SessionData { - authz: Option, -} - -struct CB; -impl Callback for CB { - fn callback(sasl: &mut SASL, - session: &mut SaslSession, - prop: Property - ) -> Result<(), ReturnCode> - { - let ret = match prop { - Property::GSASL_VALIDATE_SIMPLE => { - // FIXME: get_property and retrieve_mut can't be used interleaved but that's - // technically safe. - - let authid: &str = session - .get_property(Property::GSASL_AUTHID) - .ok_or(ReturnCode::GSASL_NO_AUTHID) - .and_then(|a| match a.to_str() { - Ok(s) => Ok(s), - Err(_) => Err(ReturnCode::GSASL_SASLPREP_ERROR), - })?; - - let pass = session.get_property(Property::GSASL_PASSWORD) - .ok_or(ReturnCode::GSASL_NO_PASSWORD)?; - - - if let Some(appdata) = sasl.retrieve_mut() { - if let Ok(Some(user)) = appdata.userdb.login(authid, pass.to_bytes()) { - session.retrieve_mut().unwrap().authz.replace(user); - return Ok(()); - } - } - - ReturnCode::GSASL_AUTHENTICATION_ERROR - } - p => { - println!("Callback called with property {:?}", p); - ReturnCode::GSASL_NO_CALLBACK - } - }; - Err(ret) - } -} - -pub struct Auth { - pub ctx: RSASL, - session: Rc>>, - access: Arc, - log: Logger, -} - -impl Auth { - pub fn new(log: Logger, dbs: Databases, session: Rc>>) -> Self { - let mut ctx = SASL::new().unwrap(); - - let appdata = Box::new(AppData { userdb: dbs.userdb.clone() }); - - ctx.store(appdata); - ctx.install_callback::(); - - Self { log, ctx, session, access: dbs.access.clone() } - } -} - -use crate::schema::authenticationsystem_capnp::*; -impl authentication_system::Server for Auth { - fn mechanisms(&mut self, - _: authentication_system::MechanismsParams, - mut res: authentication_system::MechanismsResults - ) -> Promise<(), capnp::Error> { - /*let mechs = match self.ctx.server_mech_list() { - Ok(m) => m, - Err(e) => { - return Promise::err(capnp::Error { - kind: capnp::ErrorKind::Failed, - description: format!("SASL Failure: {}", e), - }) - }, - }; - - let mechvec: Vec<&str> = mechs.iter().collect(); - - let mut res_mechs = res.get().init_mechs(mechvec.len() as u32); - for (i, m) in mechvec.into_iter().enumerate() { - res_mechs.set(i as u32, m); - }*/ - // For now, only PLAIN - let mut res_mechs = res.get().init_mechs(1); - res_mechs.set(0, "PLAIN"); - - Promise::ok(()) - } - - // TODO: return Outcome instead of exceptions - fn start(&mut self, - params: authentication_system::StartParams, - mut res: authentication_system::StartResults - ) -> Promise<(), capnp::Error> { - let req = pry!(pry!(params.get()).get_request()); - - // Extract the MECHANISM the client wants to use and start a session. - // Or fail at that and thrown an exception TODO: return Outcome - let mech = pry!(req.get_mechanism()); - if pry!(req.get_mechanism()) != "PLAIN" { - return Promise::err(capnp::Error { - kind: capnp::ErrorKind::Failed, - description: format!("Invalid SASL mech"), - }) - } - - let mut session = match self.ctx.server_start(mech) { - Ok(s) => s, - Err(e) => - return Promise::err(capnp::Error { - kind: capnp::ErrorKind::Failed, - description: format!("SASL error: {}", e), - }), - }; - - session.store(Box::new(SessionData { authz: None })); - - // If the client has provided initial data go use that - use request::initial_response::Which; - let step_res = match req.get_initial_response().which() { - Err(capnp::NotInSchema(_)) => - return Promise::err(capnp::Error { - kind: capnp::ErrorKind::Failed, - description: "Initial data is badly formatted".to_string(), - }), - - Ok(Which::None(_)) => { - // FIXME: Actually this needs to indicate NO data instead of SOME data of 0 length - session.step(&[]) - } - Ok(Which::Initial(data)) => { - session.step(pry!(data)) - } - }; - - // The step may either return an error, a success or the need for more data - // TODO: Set the session user. Needs a lookup though <.> - use response::Result as Resres; - match step_res { - Ok(Step::Done(b)) => { - let user = session - .retrieve_mut() - .and_then(|data| { - data.authz.take() - }) - .expect("Authentication returned OK but didn't set user id"); - - let perms = pry!(self.access.collect_permrules(&user.data) - .map_err(|e| capnp::Error::failed(format!("AccessDB lookup failed: {}", e)))); - self.session.replace(Some(Session::new( - self.log.new(o!()), - user.id, - "".to_string(), - user.data.roles.into_boxed_slice(), - perms.into_boxed_slice() - ))); - - let mut outcome = pry!(res.get().get_response()).init_outcome(); - outcome.reborrow().set_result(Resres::Successful); - if b.len() != 0 { - outcome.init_additional_data().set_additional(&b); - } - Promise::ok(()) - }, - Ok(Step::NeedsMore(b)) => { - pry!(res.get().get_response()).set_challence(&b); - Promise::ok(()) - } - // TODO: This should really be an outcome because this is failed auth just as much atm. - Err(e) => { - let mut outcome = pry!(res.get().get_response()).init_outcome(); - outcome.reborrow().set_result(Resres::Failed); - let text = format!("{}", e); - outcome.set_help_text(&text); - Promise::ok(()) - } - } - - } -} - -// Use the newtype pattern here to make the type system work for us; even though AuthCId is for all -// intents and purposes just a String the compiler will still complain if you return or more -// importantly pass a String intead of a AuthCId. This prevents bugs where you get an object from -// somewhere and pass it somewhere else and in between don't check if it's the right type and -// accidentally pass the authzid where the authcid should have gone. - -// What is a man?! A miserable little pile of secrets! -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] -/// Authentication/Authorization user object. -/// -/// This struct describes the user as can be gathered from API authentication exchanges. -/// Specifically this is the value bffh gets after a successful authentication. -/// -pub struct AuthenticationData { - /// Contains the Authentication ID used - /// - /// The authentication ID is an identifier for the authentication exchange. This is - /// conceptually different than the ID of the user to be authenticated; for example when using - /// x509 the authcid is the dn of the certificate, when using GSSAPI the authcid is of form - /// `@` - authcid: String, - - /// Authorization ID - /// - /// The authzid represents the identity that a client wants to act as. In our case this is - /// always an user id. If unset no preference is indicated and the server will authenticate the - /// client as whatever user — if any — they associate with the authcid. Setting the authzid is - /// useful in a number if situations: - /// If somebody wants to authenticate as somebody else, su-style. - /// If a person wants to authenticate as a higher-permissions account, e.g. foo may set authzid foo+admin - /// to split normal user and "admin" accounts. - /// If a method requires a specific authcid that is different from the identifier of the user - /// to authenticate as, e.g. GSSAPI, x509 client certificates, API TOKEN authentication. - authzid: String, - - /// Contains the authentication method used - /// - /// For the most part this is the SASL method - auth_method: String, - - /// Method-specific key-value pairs - /// - /// Each method can use their own key-value pairs. - /// E.g. EXTERNAL encodes the actual method used (x509 client certs, UID/GID for unix sockets, - /// ...) - kvs: Box<[(String, String)]>, -} - -// Authentication has two parts: Granting the authentication itself and then performing the -// authentication. -// Granting the authentication checks if -// a) the given authcid fits with the given (authMethod, kvs). In general a failure here indicates -// a programming failure — the authcid come from the same source as that tuple -// b) the given authcid may authenticate as the given authzid. E.g. if a given client certificate -// has been configured for that user, if a GSSAPI user maps to a given user, - -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] -pub enum AuthError { - /// Authentication ID is bad/unknown/.. - BadAuthcid, - /// Authorization ID is unknown/.. - BadAuthzid, - /// Authorization ID is not of form user+uid@realm - MalformedAuthzid, - /// User may not use that authorization id - NotAllowedAuthzid, - -} diff --git a/src/api/machine.rs b/src/api/machine.rs deleted file mode 100644 index 256a492..0000000 --- a/src/api/machine.rs +++ /dev/null @@ -1,253 +0,0 @@ -use std::sync::Arc; -use std::time::Duration; - -use capnp::capability::Promise; -use capnp::Error; - -use futures::FutureExt; - -use crate::db::access::{PrivilegesBuf, PermRule}; -use crate::db::user::UserId; -use crate::db::machine::{Status, MachineState}; -use crate::machine::Machine as NwMachine; -use crate::schema::machine_capnp::machine::*; -use crate::schema::machine_capnp::machine::MachineState as APIMState; - -#[derive(Clone, Copy)] -pub struct Perms { - pub disclose: bool, - pub read: bool, - pub write: bool, - pub manage: bool, -} - -impl Perms { - pub fn get_for<'a, I: Iterator>(privs: &'a PrivilegesBuf, rules: I) -> Self { - let mut disclose = false; - let mut read = false; - let mut write = false; - let mut manage = false; - for rule in rules { - if rule.match_perm(&privs.disclose) { - disclose = true; - } - if rule.match_perm(&privs.read) { - read = true; - } - if rule.match_perm(&privs.write) { - write = true; - } - if rule.match_perm(&privs.manage) { - manage = true; - } - } - - Self { disclose, read, write, manage } - } -} - -#[derive(Clone)] -pub struct Machine { - userid: UserId, - perms: Perms, - machine: NwMachine, -} - -impl Machine { - pub fn new(userid: UserId, perms: Perms, machine: NwMachine) -> Self { - Self { userid, perms, machine } - } -} - -impl info::Server for Machine { - fn get_machine_info_extended( - &mut self, - _: info::GetMachineInfoExtendedParams, - _results: info::GetMachineInfoExtendedResults, - ) -> Promise<(), capnp::Error> { - /*if self.perms.manage { - let mut builder = results.get(); - let mut extinfo = builder.init_machine_info_extended(); - let mut current = extinfo.init_current_user(); - // FIXME fill user - } - Promise::ok(())*/ - - Promise::err(capnp::Error::unimplemented("Extended Infos are unavailable".to_string())) - } - - fn get_reservation_list( - &mut self, - _: info::GetReservationListParams, - mut results: info::GetReservationListResults, - ) -> Promise<(), capnp::Error> { - Promise::err(capnp::Error::unimplemented("Reservations are unavailable".to_string())) - } - - fn get_property_list( - &mut self, - _: info::GetPropertyListParams, - mut results: info::GetPropertyListResults, - ) -> Promise<(), capnp::Error> { - Promise::err(capnp::Error::unimplemented("Extended Properties are unavailable".to_string())) - } -} - -impl use_::Server for Machine { - fn use_( - &mut self, - _: use_::UseParams, - _: use_::UseResults - ) -> Promise<(), capnp::Error> { - let machine = self.machine.get_inner(); - let userid = self.userid.clone(); - let f = async move { - let mut guard = machine.lock().await; - let mut ok = false; - { - match { guard.read_state().lock_ref().state.clone() } { - Status::Free => { - ok = true; - }, - Status::Reserved(ref whom) => { - // If it's reserved for us or we're allowed to take over - if &userid == whom { - ok = true; - } - }, - _ => { } - } - } - - if ok { - guard.do_state_change(MachineState::used(Some(userid))); - } - - Ok(()) - }; - - let g = smol::future::race(f, smol::Timer::after(Duration::from_secs(4)) - .map(|_| Err(capnp::Error::failed("Waiting for machine lock timed out!".to_string())))); - - Promise::from_future(g) - } -} - -impl in_use::Server for Machine { - fn give_back( - &mut self, - _:in_use::GiveBackParams, - _:in_use::GiveBackResults - ) -> Promise<(), capnp::Error> { - let machine = self.machine.get_inner(); - let userid = self.userid.clone(); - let f = async move { - let mut guard = machine.lock().await; - let mut ok = false; - { - match { guard.read_state().lock_ref().state.clone() } { - Status::InUse(ref whom) => { - if &Some(userid) == whom { - ok = true; - } - }, - _ => { } - } - } - - if ok { - guard.reset_state() - } - - Ok(()) - }; - - Promise::from_future(f) - } -} - -impl transfer::Server for Machine { -} - -impl check::Server for Machine { -} - -impl manage::Server for Machine { - fn force_free(&mut self, - _: manage::ForceFreeParams, - _: manage::ForceFreeResults - ) -> Promise<(), capnp::Error> { - let machine = self.machine.get_inner(); - let f = async move { - let mut guard = machine.lock().await; - guard.do_state_change(MachineState::free()); - Ok(()) - }; - Promise::from_future(f) - } - - fn force_use(&mut self, - _: manage::ForceUseParams, - _: manage::ForceUseResults - ) -> Promise<(), capnp::Error> { - let machine = self.machine.get_inner(); - let f = async move { - let mut guard = machine.lock().await; - guard.do_state_change(MachineState::used(None)); - Ok(()) - }; - Promise::from_future(f) - } - - fn block(&mut self, - _:manage::BlockParams, - _:manage::BlockResults - ) -> Promise<(), capnp::Error> { - let machine = self.machine.get_inner(); - let uid = self.userid.clone(); - let f = async move { - let mut guard = machine.lock().await; - guard.do_state_change(MachineState::blocked(uid)); - Ok(()) - }; - Promise::from_future(f) - } - - fn disabled(&mut self, - _:manage::DisabledParams, - _:manage::DisabledResults - ) -> Promise<(), capnp::Error> { - let machine = self.machine.get_inner(); - let f = async move { - let mut guard = machine.lock().await; - guard.do_state_change(MachineState::disabled()); - Ok(()) - }; - Promise::from_future(f) - } -} - -impl admin::Server for Machine { - fn force_set_state(&mut self, - params: admin::ForceSetStateParams, - _:admin::ForceSetStateResults - ) -> Promise<(), capnp::Error> { - let uid = self.userid.clone(); - let state = match pry!(pry!(params.get()).get_state()) { - APIMState::Free => MachineState::free(), - APIMState::Blocked => MachineState::blocked(uid), - APIMState::Disabled => MachineState::disabled(), - APIMState::InUse => MachineState::used(Some(uid)), - APIMState::Reserved => MachineState::reserved(uid), - APIMState::ToCheck => MachineState::check(uid), - }; - let machine = self.machine.get_inner(); - let f = async move { - let mut guard = machine.lock().await; - guard.do_state_change(state); - Ok(()) - }; - Promise::from_future(f) - } - -} diff --git a/src/api/machines.rs b/src/api/machines.rs deleted file mode 100644 index 82edf71..0000000 --- a/src/api/machines.rs +++ /dev/null @@ -1,194 +0,0 @@ -use std::sync::Arc; -use std::cell::RefCell; -use std::rc::Rc; -use std::ops::Deref; - -use capnp::capability::Promise; -use capnp::Error; - -use crate::db::machine::Status; -use crate::api::machine::*; -use crate::schema::machine_capnp::machine::MachineState; -use crate::schema::machinesystem_capnp::machine_system; -use crate::schema::machinesystem_capnp::machine_system::info as machines; -use crate::network::Network; -use crate::db::user::UserId; -use crate::db::access::{PermRule, admin_perm}; -use crate::connection::Session; - -/// An implementation of the `Machines` API -#[derive(Clone)] -pub struct Machines { - session: Rc>>, - network: Arc, -} - -impl Machines { - pub fn new(session: Rc>>, network: Arc) -> Self { - Self { session, network } - } -} - -impl machine_system::Server for Machines { - // This function shouldn't exist. See fabaccess-api issue #16 - fn info(&mut self, - _:machine_system::InfoParams, - mut results: machine_system::InfoResults - ) -> capnp::capability::Promise<(), capnp::Error> - { - results.get().set_info(capnp_rpc::new_client(self.clone())); - Promise::ok(()) - } -} - -impl machines::Server for Machines { - fn get_machine_list(&mut self, - _params: machines::GetMachineListParams, - mut results: machines::GetMachineListResults) - -> Promise<(), Error> - { - let rc = Rc::clone(&self.session); - let session = self.session.borrow(); - if session.deref().is_some() { - let v: Vec<(String, crate::machine::Machine)> = self.network.machines.iter() - .filter(|(_name, machine)| { - let required_disclose = &machine.desc.privs.disclose; - for perm_rule in session.as_ref().unwrap().perms.iter() { - if perm_rule.match_perm(required_disclose) { - return true; - } - } - - false - }) - .map(|(n,m)| (n.clone(), m.clone())) - .collect(); - - let f = async move { - let session = rc.borrow(); - let user = &session.as_ref().unwrap().authzid; - let permissions = &session.as_ref().unwrap().perms; - - let mut machines = results.get().init_machine_list(v.len() as u32); - for (i, (name, machine)) in v.into_iter().enumerate() { - let perms = Perms::get_for(&machine.desc.privs, permissions.iter()); - - let mut builder = machines.reborrow().get(i as u32); - builder.set_name(&name); - if let Some(ref desc) = machine.desc.description { - builder.set_description(desc); - } - - let machineapi = Machine::new(user.clone(), perms, machine.clone()); - - let state = machine.get_status().await; - let s = match state { - Status::Free => MachineState::Free, - Status::Disabled => MachineState::Disabled, - Status::Blocked(_) => MachineState::Blocked, - Status::InUse(ref u) => { - if let Some(owner) = u.as_ref() { - if owner == user { - builder.set_inuse(capnp_rpc::new_client(machineapi.clone())); - } - } - - MachineState::InUse - }, - Status::Reserved(_) => MachineState::Reserved, - Status::ToCheck(_) => MachineState::ToCheck, - }; - builder.set_state(s); - - if perms.write && state == Status::Free { - builder.set_use(capnp_rpc::new_client(machineapi.clone())); - } - if perms.manage { - //builder.set_transfer(capnp_rpc::new_client(machineapi.clone())); - //builder.set_check(capnp_rpc::new_client(machineapi.clone())); - builder.set_manage(capnp_rpc::new_client(machineapi.clone())); - } - if permissions.iter().any(|r| r.match_perm(&admin_perm())) { - builder.set_admin(capnp_rpc::new_client(machineapi.clone())); - } - - builder.set_info(capnp_rpc::new_client(machineapi)); - } - - Ok(()) - }; - - Promise::from_future(f) - } else { - Promise::ok(()) - } - } - - fn get_machine(&mut self, - params: machines::GetMachineParams, - mut results: machines::GetMachineResults - ) -> Promise<(), capnp::Error> { - let rc = Rc::clone(&self.session); - if self.session.borrow().is_some() { - let name = { - let params = pry!(params.get()); - pry!(params.get_name()).to_string() - }; - - let network = self.network.clone(); - let f = async move { - let session = rc.borrow(); - let user = &session.as_ref().unwrap().authzid; - let permissions = &session.as_ref().unwrap().perms; - - if let Some(machine) = network.machines.get(&name) { - let mut builder = results.get().init_machine(); - let perms = Perms::get_for(&machine.desc.privs, permissions.iter()); - builder.set_name(&name); - if let Some(ref desc) = machine.desc.description { - builder.set_description(desc); - } - - let machineapi = Machine::new(user.clone(), perms, machine.clone()); - let state = machine.get_status().await; - if perms.write && state == Status::Free { - builder.set_use(capnp_rpc::new_client(machineapi.clone())); - } - if perms.manage { - //builder.set_transfer(capnp_rpc::new_client(machineapi.clone())); - //builder.set_check(capnp_rpc::new_client(machineapi.clone())); - builder.set_manage(capnp_rpc::new_client(machineapi.clone())); - } - if permissions.iter().any(|r| r.match_perm(&admin_perm())) { - builder.set_admin(capnp_rpc::new_client(machineapi.clone())); - } - - - let s = match machine.get_status().await { - Status::Free => MachineState::Free, - Status::Disabled => MachineState::Disabled, - Status::Blocked(_) => MachineState::Blocked, - Status::InUse(u) => { - if let Some(owner) = u.as_ref() { - if owner == user { - builder.set_inuse(capnp_rpc::new_client(machineapi.clone())); - } - } - MachineState::InUse - }, - Status::Reserved(_) => MachineState::Reserved, - Status::ToCheck(_) => MachineState::ToCheck, - }; - builder.set_state(s); - - builder.set_info(capnp_rpc::new_client(machineapi)); - }; - - Ok(()) - }; - Promise::from_future(f) - } else { - Promise::ok(()) - } - } -} diff --git a/src/api/user.rs b/src/api/user.rs deleted file mode 100644 index 381d46d..0000000 --- a/src/api/user.rs +++ /dev/null @@ -1,42 +0,0 @@ -use std::rc::Rc; -use std::cell::RefCell; -use std::ops::Deref; - -use crate::connection::Session; -use crate::db::user as db; -use crate::schema::user_capnp::user::*; - -#[derive(Clone)] -pub struct User { - session: Rc>>, -} - -impl User { - pub fn new(session: Rc>>) -> Self { - Self { session } - } - - pub fn fill_self(&self, builder: &mut Builder) { - if let Some(session) = self.session.borrow().deref() { - self.fill_userid(builder, &session.authzid); - } - } - - pub fn fill_with(&self, builder: &mut Builder, user: db::User) { - self.fill_userid(builder, &user.id) - } - - pub fn fill_userid(&self, builder: &mut Builder, uid: &db::UserId) { - builder.set_username(&uid.uid); - if let Some(ref realm) = &uid.realm { - let mut space = builder.reborrow().init_space(); - space.set_name(&realm); - } - } -} - -impl info::Server for User { - -} -impl manage::Server for User {} -impl admin::Server for User {} diff --git a/src/api/users.rs b/src/api/users.rs deleted file mode 100644 index 3258edd..0000000 --- a/src/api/users.rs +++ /dev/null @@ -1,106 +0,0 @@ -use std::cell::RefCell; -use std::rc::Rc; -use std::sync::Arc; -use std::ops::Deref; - -use capnp::capability::Promise; - -use crate::api::user::User; -use crate::connection::Session; -use crate::db::access::{PermRule, Permission}; -use crate::db::user::{UserId, Internal as UserDB}; -use crate::schema::usersystem_capnp::user_system; -use crate::schema::usersystem_capnp::user_system::{info, manage}; -use crate::error; - -#[derive(Clone, Debug)] -pub struct Users { - session: Rc>>, - userdb: Arc, -} - -impl Users { - pub fn new(session: Rc>>, userdb: Arc) -> Self { - Self { session, userdb } - } -} - -impl user_system::Server for Users { - fn info( - &mut self, - _: user_system::InfoParams, - mut results: user_system::InfoResults, - ) -> Promise<(), capnp::Error> { - results.get().set_info(capnp_rpc::new_client(self.clone())); - Promise::ok(()) - } - - fn manage( - &mut self, - _: user_system::ManageParams, - mut results: user_system::ManageResults, - ) -> Promise<(), capnp::Error> { - let perm: &Permission = Permission::new("bffh.users.manage"); - if let Some(session) = self.session.borrow().deref() { - if session.perms.iter().any(|rule| rule.match_perm(perm)) { - results - .get() - .set_manage(capnp_rpc::new_client(self.clone())); - } - } - - Promise::ok(()) - } -} - -impl info::Server for Users { - fn get_user_self( - &mut self, - _: info::GetUserSelfParams, - mut results: info::GetUserSelfResults, - ) -> Promise<(), capnp::Error> { - let user = User::new(Rc::clone(&self.session)); - let mut builder = results.get().init_user(); - user.fill_self(&mut builder); - Promise::ok(()) - } -} - -impl manage::Server for Users { - fn get_user_list( - &mut self, - _: manage::GetUserListParams, - mut results: manage::GetUserListResults, - ) -> Promise<(), capnp::Error> { - let result: Result<(), error::Error> = - self.userdb.list_users() - .and_then(|users| { - let mut builder = results.get().init_user_list(users.len() as u32); - let u = User::new(Rc::clone(&self.session)); - for (i, user) in users.into_iter().enumerate() { - let mut b = builder.reborrow().get(i as u32); - u.fill_with(&mut b, user); - } - Ok(()) - }); - - match result { - Ok(()) => Promise::ok(()), - Err(e) => Promise::err(capnp::Error::failed("User lookup failed: {}".to_string())), - } - } - - /*fn add_user( - &mut self, - params: manage::AddUserParams, - mut results: manage::AddUserResults, - ) -> Promise<(), capnp::Error> { - } - - fn remove_user( - &mut self, - _: manage::RemoveUserParams, - mut results: manage::RemoveUserResults, - ) -> Promise<(), capnp::Error> { - }*/ -} diff --git a/src/builtin.rs b/src/builtin.rs deleted file mode 100644 index d867643..0000000 --- a/src/builtin.rs +++ /dev/null @@ -1,35 +0,0 @@ -use lazy_static::lazy_static; -use crate::db::access::Permission; - -lazy_static! { - static ref AUTH_PERM: &'static Permission = Permission::new("bffh.auth"); -} - -// -// lazy_static! { -// pub static ref AUTH_ROLE: RoleIdentifier = { -// RoleIdentifier::Local { -// name: "mayauth".to_string(), -// source: "builtin".to_string(), -// } -// }; -// } -// -// lazy_static! { -// pub static ref DEFAULT_ROLEIDS: [RoleIdentifier; 1] = { -// [ AUTH_ROLE.clone(), ] -// }; -// -// pub static ref DEFAULT_ROLES: HashMap = { -// let mut m = HashMap::new(); -// m.insert(AUTH_ROLE.clone(), -// Role { -// parents: vec![], -// permissions: vec![ -// PermRule::Base(PermissionBuf::from_perm(AUTH_PERM)), -// ] -// } -// ); -// m -// }; -// } diff --git a/src/connection.rs b/src/connection.rs deleted file mode 100644 index deaef3a..0000000 --- a/src/connection.rs +++ /dev/null @@ -1,89 +0,0 @@ -use futures::FutureExt; -use std::future::Future; -use std::sync::Arc; - -use slog::Logger; - -use smol::lock::Mutex; -use smol::net::TcpStream; - -use crate::error::Result; - -use capnp_rpc::{rpc_twoparty_capnp, twoparty}; - - -#[derive(Debug)] -/// Connection context -// TODO this should track over several connections -pub struct Session { - // Session-spezific log - pub log: Logger, - - /// User this session has been authorized as. - /// - /// Slightly different than the authnid which indicates as what this session has been - /// authenticated as (e.g. using EXTERNAL auth the authnid would be the CN of the client - /// certificate, but the authzid would be an user) - pub authzid: UserId, - - pub authnid: String, - - /// Roles this session has been assigned via group memberships, direct role assignment or - /// authentication types - pub roles: Box<[RoleIdentifier]>, - - /// Permissions this session has. - /// - /// This is a snapshot of the permissions the underlying user has - /// take at time of creation (i.e. session establishment) - pub perms: Box<[PermRule]>, -} - -impl Session { - pub fn new( - log: Logger, - authzid: UserId, - authnid: String, - roles: Box<[RoleIdentifier]>, - perms: Box<[PermRule]>, - ) -> Self { - Session { - log, - authzid, - authnid, - roles, - perms, - } - } -} - -pub struct ConnectionHandler { - log: Logger, - db: Databases, - network: Arc, -} - -impl ConnectionHandler { - pub fn new(log: Logger, db: Databases, network: Arc) -> Self { - Self { log, db, network } - } - - pub fn handle(&mut self, stream: TcpStream) -> impl Future> { - info!(self.log, "New connection from on {:?}", stream); - let boots = Bootstrap::new(self.log.new(o!()), self.db.clone(), self.network.clone()); - unimplemented!(); - /*let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots); - - let network = twoparty::VatNetwork::new( - stream.clone(), - stream, - rpc_twoparty_capnp::Side::Server, - Default::default(), - ); - let rpc_system = capnp_rpc::RpcSystem::new(Box::new(network), Some(rpc.client)); - - // Convert the error type to one of our errors - rpc_system.map(|r| r.map_err(Into::into)) - */ - } -} diff --git a/src/db/mod.rs b/src/db.rs similarity index 100% rename from src/db/mod.rs rename to src/db.rs diff --git a/src/initiator.rs b/src/initiator.rs deleted file mode 100644 index e6bc689..0000000 --- a/src/initiator.rs +++ /dev/null @@ -1,207 +0,0 @@ -use std::sync::Arc; -use std::pin::Pin; -use std::task::{Poll, Context}; -use std::future::Future; -use std::collections::HashMap; - -use smol::Timer; - -use slog::Logger; - -use futures::future::BoxFuture; - -use futures_signals::signal::{Signal, Mutable, MutableSignalCloned}; -use crate::machine::{Machine, ReturnToken}; -use crate::db::machine::MachineState; -use crate::db::user::{User, UserId, UserData, Internal as UserDB}; -use crate::db::access::AccessControl; - -use crate::network::InitMap; - -use crate::error::Result; -use crate::config::Config; - -pub trait Sensor { - fn run_sensor(&mut self) -> BoxFuture<'static, (Option, MachineState)>; -} - -type BoxSensor = Box; - -pub struct Initiator { - log: Logger, - signal: MutableSignalCloned>, - machine: Option, - future: Option, MachineState)>>, - // TODO: Prepare the init for async state change requests. - state_change_fut: Option>>, - token: Option, - sensor: BoxSensor, - - userdb: UserDB, - access: AccessControl, -} - -impl Initiator { - pub fn new(log: Logger, sensor: BoxSensor, signal: MutableSignalCloned>, userdb: UserDB, access: AccessControl) -> Self { - Self { - log: log, - signal: signal, - machine: None, - future: None, - state_change_fut: None, - token: None, - sensor: sensor, - userdb, - access, - } - } - - pub fn wrap(log: Logger, sensor: BoxSensor, userdb: UserDB, access: Arc) -> (Mutable>, Self) { - let m = Mutable::new(None); - let s = m.signal_cloned(); - - (m, Self::new(log, sensor, s, userdb, access)) - } -} - -impl Future for Initiator { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let mut this = &mut *self; - - // First of course, see what machine we should work with. - match Signal::poll_change(Pin::new(&mut this.signal), cx) { - Poll::Pending => { } - Poll::Ready(None) => return Poll::Ready(()), - // Keep in mind this is actually an Option - Poll::Ready(Some(machine)) => { - - match machine.as_ref().map(|m| m.try_lock()) { - None => info!(this.log, "Deinstalled machine"), - Some(None) => info!(this.log, "Installed new machine with locked mutex!"), - Some(Some(g)) => info!(this.log, "Installed new machine {}", g.id), - } - - this.machine = machine; - }, - } - - // Do as much work as we can: - loop { - // Always poll the state change future first - if let Some(ref mut f) = this.state_change_fut { - match Future::poll(Pin::new(f), cx) { - // If there is a state change future and it would block we return early - Poll::Pending => { - debug!(this.log, "State change blocked"); - return Poll::Pending; - }, - Poll::Ready(Ok(tok)) => { - debug!(this.log, "State change returned ok"); - // Explicity drop the future - let _ = this.state_change_fut.take(); - - // Store the given return token for future use - this.token.replace(tok); - } - Poll::Ready(Err(e)) => { - info!(this.log, "State change returned err: {}", e); - // Explicity drop the future - let _ = this.state_change_fut.take(); - } - } - } - - // If there is a future, poll it - match this.future.as_mut().map(|future| Future::poll(Pin::new(future), cx)) { - None => { - this.future = Some(this.sensor.run_sensor()); - }, - Some(Poll::Ready((uid, state))) => { - debug!(this.log, "Sensor returned a new state"); - this.future.take(); - let f = this.machine.as_mut().map(|machine| { - machine.request_state_change(state, this.access.clone(), user) - }); - this.state_change_fut = f; - } - Some(Poll::Pending) => return Poll::Pending, - } - } - } -} - -pub fn load(log: &Logger, config: &Config, userdb: UserDB, access: Arc) -> Result<(InitMap, Vec)> { - let mut map = HashMap::new(); - - let initiators = config.initiators.iter() - .map(|(k,v)| (k, load_single(log, k, &v.module, &v.params))) - .filter_map(|(k,n)| match n { - None => None, - Some(i) => Some((k, i)), - }); - - let mut v = Vec::new(); - for (name, initiator) in initiators { - let (m, i) = Initiator::wrap(log.new(o!("name" => name.clone())), initiator, userdb.clone(), access.clone()); - map.insert(name.clone(), m); - v.push(i); - } - - Ok((map, v)) -} - -fn load_single( - log: &Logger, - name: &String, - module_name: &String, - _params: &HashMap - ) -> Option -{ - match module_name.as_ref() { - "Dummy" => { - Some(Box::new(Dummy::new(log))) - }, - _ => { - error!(log, "No initiator found with name \"{}\", configured as \"{}\"", - module_name, name); - None - } - } -} - -pub struct Dummy { - log: Logger, - step: bool, -} - -impl Dummy { - pub fn new(log: &Logger) -> Self { - Self { log: log.new(o!("module" => "Dummy Initiator")), step: false } - } -} - -impl Sensor for Dummy { - fn run_sensor(&mut self) - -> BoxFuture<'static, (Option, MachineState)> - { - let step = self.step; - self.step = !step; - - info!(self.log, "Kicking off new dummy initiator state change: {}", step); - - let f = async move { - Timer::after(std::time::Duration::from_secs(1)).await; - if step { - return (None, MachineState::free()); - } else { - let user = UserId::new("test".to_string(), None, None); - return (Some(user), MachineState::used(Some(user))); - } - }; - - Box::pin(f) - } -} - diff --git a/src/lib.rs b/src/lib.rs index f200d37..394aec4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,15 +11,14 @@ //mod server; //mod actor; //mod initiator; -mod space; +mod config; +mod db; +mod error; +mod network; +mod oid; +mod permissions; mod resource; mod schema; mod state; -pub mod db; -mod network; -pub mod oid; -mod varint; -pub mod error; -pub mod config; -mod permissions; \ No newline at end of file +mod varint; \ No newline at end of file diff --git a/src/log.rs b/src/log.rs deleted file mode 100644 index 0b0715b..0000000 --- a/src/log.rs +++ /dev/null @@ -1,11 +0,0 @@ -use slog::{Drain, Logger}; -use slog_async; -use slog_term::{TermDecorator, FullFormat}; - -pub fn init() -> Logger { - let decorator = TermDecorator::new().build(); - let drain = FullFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - - return slog::Logger::root(drain, o!()); -} diff --git a/src/machine.rs b/src/machine.rs deleted file mode 100644 index 04cd2ec..0000000 --- a/src/machine.rs +++ /dev/null @@ -1,363 +0,0 @@ -use std::ops::Deref; -use std::iter::FromIterator; -use std::sync::Arc; -use std::path::Path; -use std::task::{Poll, Context}; -use std::pin::Pin; -use std::future::Future; - -use std::collections::HashMap; -use std::fs; - -use serde::{Serialize, Deserialize}; - -use futures::Stream; -use futures::future::BoxFuture; -use futures::channel::{mpsc, oneshot}; - -use futures_signals::signal::Signal; -use futures_signals::signal::SignalExt; -use futures_signals::signal::{Mutable, ReadOnlyMutable}; - -use crate::error::{Result, Error}; - -use crate::db::access::{AccessControl, PrivilegesBuf, PermissionBuf}; -use crate::db::machine::{MachineIdentifier, MachineState, Status}; -use crate::db::user::{User, UserData, UserId}; - -use crate::space; - -pub struct Machines { - machines: Vec -} - -#[derive(Debug, Clone)] -pub struct Index { - inner: HashMap, -} - -impl Index { - pub fn new() -> Self { - Self { - inner: HashMap::new(), - } - } - - pub fn insert(&mut self, key: String, value: Machine) -> Option { - self.inner.insert(key, value) - } - - pub fn get(&mut self, key: &String) -> Option { - self.inner.get(key).map(|m| m.clone()) - } -} - -// Access data of one machine efficiently, using getters/setters for data stored in LMDB backed -// memory -#[derive(Debug, Clone)] -pub struct Machine { - pub id: uuid::Uuid, - pub desc: MachineDescription, - - inner: Arc>, -} - -impl Machine { - pub fn new(inner: Inner, desc: MachineDescription, ) -> Self { - Self { - id: uuid::Uuid::default(), - inner: Arc::new(Mutex::new(inner)), - desc, - } - } - - pub fn construct - ( id: MachineIdentifier - , desc: MachineDescription - , state: MachineState - ) -> Machine - { - Self::new(Inner::new(id, state), desc) - } - - fn match_perm(&self, status: &Status) -> Option<&PermissionBuf> { - let p = self.desc.privs; - match status { - // If you were allowed to use it you're allowed to give it back - Status::Free - | Status::ToCheck(_) - => None, - - Status::Blocked(_) - | Status::Disabled - | Status::Reserved(_) - => Some(&p.manage), - - Status::InUse(_) => Some(&p.write), - } - } - - pub fn request_state_change(&self, new_state: MachineState, access: AccessControl, user: &User) - -> BoxFuture<'static, Result<()>> - { - let this = self.clone(); - let perm = self.match_perm(&new_state.state); - let grant = perm.map(|p| access.check(&user.data, p).unwrap_or(false)); - - let uid = user.id.clone(); - // is it a return - let is_ret = new_state.state == Status::Free; - // is it a (normal) write /the user is allowed to do/? - let is_wri = new_state.state == Status::InUse(Some(uid)) - && access.check(&user.data, self.desc.privs.write).unwrap_or(false); - - let f = async move { - let mut guard = this.inner.lock().await; - // either e.g. InUse() => Free or I'm allowed to overwrite - if (is_ret && guard.is_self(uid)) - || (is_wri && guard.is_free()) - || grant.unwrap_or(false) - { - guard.do_state_change(new_state); - } - return Ok(()) - }; - - Box::pin(f) - } - - pub fn do_state_change(&self, new_state: MachineState) - -> BoxFuture<'static, Result<()>> - { - let this = self.clone(); - - let f = async move { - let mut guard = this.inner.lock().await; - guard.do_state_change(new_state); - return Ok(()) - }; - - Box::pin(f) - } - - pub async fn get_status(&self) -> Status { - let guard = self.inner.lock().await; - guard.state.get_cloned().state - } - - pub fn signal(&self) -> impl Signal { - let guard = self.inner.try_lock().unwrap(); - guard.signal() - } - - pub fn get_inner(&self) -> Arc> { - self.inner.clone() - } -} - -impl Deref for Machine { - type Target = Mutex; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - - -#[derive(Debug)] -/// Internal machine representation -/// -/// A machine connects an event from a sensor to an actor activating/deactivating a real-world -/// machine, checking that the user who wants the machine (de)activated has the required -/// permissions. -/// -/// Machines have a rather complex state machine since they have to be eventually consistent and -/// can fail at any point in time (e.g. because power cuts out suddenly, a different task on this -/// thread panics, some loaded code produces a segfault, ...) -pub struct Inner { - /// Globally unique machine readable identifier - pub id: MachineIdentifier, - - /// The state of the machine as bffh thinks the machine *should* be in. - /// - /// This is a Signal generator. Subscribers to this signal will be notified of changes. In the - /// case of an actor it should then make sure that the real world matches up with the set state - state: Mutable, - reset: Option, -} - -impl Inner { - pub fn new ( id: MachineIdentifier - , state: MachineState - ) -> Inner - { - Inner { - id, - state: Mutable::new(state), - reset: None, - } - } - - /// Generate a signal from the internal state. - /// - /// A signal is a lossy stream of state changes. Lossy in that if changes happen in quick - /// succession intermediary values may be lost. But this isn't really relevant in this case - /// since the only relevant state is the latest one. - pub fn signal(&self) -> impl Signal { - // dedupe ensures that if state is changed but only changes to the value it had beforehand - // (could for example happen if the machine changes current user but stays activated) no - // update is sent. - Box::pin(self.state.signal_cloned().dedupe_cloned()) - } - - pub fn do_state_change(&mut self, new_state: MachineState) { - let old_state = self.state.replace(new_state); - self.reset.replace(old_state); - } - - pub fn read_state(&self) -> ReadOnlyMutable { - self.state.read_only() - } - - pub fn get_signal(&self) -> impl Signal { - self.state.signal_cloned() - } - - pub fn reset_state(&mut self) { - if let Some(state) = self.reset.take() { - self.state.replace(state); - } - } - - pub fn is_self(&mut self, uid: UserId) -> bool { - match self.read_state().get_cloned().state { - Status::InUse(u) if u == uid => true, - _ => false, - } - } - - pub fn is_free(&mut self) -> bool { - match self.read_state().get_cloned().state { - Status::Free => true, - _ => false, - } - } -} - -//pub type ReturnToken = futures::channel::oneshot::Sender<()>; -pub struct ReturnToken { - f: Option>, -} - -impl ReturnToken { - pub fn new(inner: Arc>) -> Self { - let f = async move { - let mut guard = inner.lock().await; - guard.reset_state(); - }; - - Self { f: Some(Box::pin(f)) } - } -} - -impl Future for ReturnToken { - type Output = (); // FIXME: This should probably be a Result<(), Error> - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let mut this = &mut *self; - - match this.f.as_mut().map(|f| Future::poll(Pin::new(f), cx)) { - None => Poll::Ready(()), // TODO: Is it saner to return Pending here? This can only happen after the future completed - Some(Poll::Pending) => Poll::Pending, - Some(Poll::Ready(())) => { - let _ = this.f.take(); // Remove the future to not poll after completion - Poll::Ready(()) - } - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -/// A description of a machine -/// -/// This is the struct that a machine is serialized to/from. -/// Combining this with the actual state of the system will return a machine -pub struct MachineDescription { - /// The name of the machine. Doesn't need to be unique but is what humans will be presented. - pub name: String, - /// An optional description of the Machine. - pub description: Option, - - /// The permission required - #[serde(flatten)] - pub privs: PrivilegesBuf, -} - -impl MachineDescription { - pub fn load_file>(path: P) -> Result> { - let content = fs::read(path)?; - Ok(toml::from_slice(&content[..])?) - } -} - -pub fn load(config: &crate::config::Config) - -> Result -{ - let mut map = config.machines.clone(); - - let it = map.drain() - .map(|(k,v)| { - // TODO: Read state from the state db - (v.name.clone(), Machine::construct(k, v, MachineState::new())) - }); - - - Ok(HashMap::from_iter(it)) -} - -#[cfg(test_DISABLED)] -mod tests { - use super::*; - use std::iter::FromIterator; - - use crate::db::access::{PermissionBuf, PrivilegesBuf}; - - #[test] - fn load_examples_descriptions_test() { - let mut machines = MachineDescription::load_file("examples/machines.toml") - .expect("Couldn't load the example machine defs. Does `examples/machines.toml` exist?"); - - let expected = - vec![ - (Uuid::parse_str("e5408099-d3e5-440b-a92b-3aabf7683d6b").unwrap(), - MachineDescription { - name: "Somemachine".to_string(), - description: None, - privs: PrivilegesBuf { - disclose: PermissionBuf::from_string("lab.some.disclose".to_string()), - read: PermissionBuf::from_string("lab.some.read".to_string()), - write: PermissionBuf::from_string("lab.some.write".to_string()), - manage: PermissionBuf::from_string("lab.some.admin".to_string()), - }, - }), - (Uuid::parse_str("eaabebae-34d1-4a3a-912a-967b495d3d6e").unwrap(), - MachineDescription { - name: "Testmachine".to_string(), - description: Some("An optional description".to_string()), - privs: PrivilegesBuf { - disclose: PermissionBuf::from_string("lab.test.read".to_string()), - read: PermissionBuf::from_string("lab.test.read".to_string()), - write: PermissionBuf::from_string("lab.test.write".to_string()), - manage: PermissionBuf::from_string("lab.test.admin".to_string()), - }, - }), - ]; - - for (id, machine) in expected.into_iter() { - - assert_eq!(machines.remove(&id).unwrap(), machine); - } - - assert!(machines.is_empty()); - } -} diff --git a/src/modules.rs b/src/modules.rs deleted file mode 100644 index d95aa42..0000000 --- a/src/modules.rs +++ /dev/null @@ -1,15 +0,0 @@ -//! Indpendent Communication modules -//! -//! This is where dynamic modules are implemented later on using libloading / abi_stable_crates et -//! al. -//! Additionally, FFI modules to other languages (Python/Lua/...) make the most sense in here as -//! well. - -mod shelly; -pub use shelly::Shelly; - -mod process; -pub use process::Process; - -mod batch; -pub use batch::Batch; diff --git a/src/modules/batch.rs b/src/modules/batch.rs deleted file mode 100644 index db6327e..0000000 --- a/src/modules/batch.rs +++ /dev/null @@ -1,171 +0,0 @@ -use std::io::{Read, Write}; -use std::pin::Pin; -use std::cell::RefCell; - -use std::collections::HashMap; -use std::process::Stdio; -use smol::process::{Command, Child}; -use smol::io::{AsyncWrite, AsyncWriteExt, AsyncReadExt}; - -use futures::future::{Future, FutureExt}; - -use crate::actor::Actuator; -use crate::initiator::Sensor; -use crate::db::machine::{MachineState, Status}; -use crate::db::user::{User, Internal as UserDB}; -use futures::future::BoxFuture; - -use slog::Logger; - -use serde::{Serialize, Deserialize}; - -pub struct Batch { - log: Logger, - userdb: UserDB, - name: String, - cmd: String, - args: Vec, - kill: bool, - child: Child, - stdout: Pin>, -} - -impl Batch { - pub fn new(log: Logger, name: String, params: &HashMap, userdb: UserDB) - -> Option - { - let cmd = params.get("cmd").map(|s| s.to_string())?; - let args = params.get("args").map(|argv| - argv.split_whitespace() - .map(|s| s.to_string()) - .collect()) - .unwrap_or_else(Vec::new); - - let kill = params - .get("kill_on_exit") - .and_then(|kill| - kill.parse() - .or_else(|_| { - warn!(log, "Can't parse `kill_on_exit` for {} set as {} as boolean. \ - Must be either \"True\" or \"False\".", &name, &s); - Ok(false) - }) - .ok()) - .unwrap_or(false); - - info!(log, "Starting {} ({})…", &name, &cmd); - let mut child = Self::start(&name, &cmd, &args) - .map_err(|err| error!(log, "Failed to spawn {} ({}): {}", &name, &cmd, err)) - .ok()?; - let stdout = Self::get_stdout(&mut child); - - Ok(Self { log, userdb, name, cmd, args, kill, child, stdout }) - } - - fn start(name: &String, cmd: &String, args: &Vec) -> std::io::Result { - let mut command = Command::new(cmd); - command - .stdin(Stdio::piped()) - .stdout(Stdio::null()) - .stderr(Stdio::piped()) - .args(args.iter()) - .arg(name); - - command - .spawn() - } - - fn get_stdout(child: &mut Child) -> Pin> { - let stdout = child.stdout.expect("Actor child has closed stdout"); - stdout.boxed_writer() - } - - fn maybe_restart(&mut self, f: &mut Option>) -> bool { - let stat = self.child.try_status(); - if stat.is_err() { - error!(self.log, "Can't check process for {} ({}) [{}]: {}", - &self.name, &self.cmd, self.child.id(), stat.unwrap_err()); - return false; - } - if let Some(status) = stat.unwrap() { - warn!(self.log, "Process for {} ({}) exited with code {}", - &self.name, &self.cmd, status); - let errlog = self.log.new(o!("pid" => self.child.id())); - // If we have any stderr try to log it - if let Some(stderr) = self.child.stderr.take() { - *f = Some(Box::pin(async move { - let mut out = String::new(); - match stderr.read_to_string(&mut out).await { - Err(e) => warn!(errlog, "Failed to read child stderr: {}", e), - Ok(n) => if n != 0 { - let errstr = String::from_utf8_lossy(out); - for line in errstr.lines() { - warn!(errlog, "{}", line); - } - } - } - })); - } - info!(self.log, "Attempting to re-start {}", &self.name); - let mut child = Self::start(&self.name, &self.cmd, &self.args) - .map_err(|err| error!(self.log, "Failed to spawn {} ({}): {}", &self.name, &self.cmd, err)) - .ok(); - // Nothing else to do with the currect architecture. In reality we should fail here - // because we *didn't apply* the change. - if child.is_none() { - false - } - self.child = child.unwrap(); - } - - true - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct StateChangeObj { - name: String, - state: MachineState, -} - -impl StateChangeObj { - pub fn new(name: String, state: MachineState) -> Self { - Self { name, state } - } -} - -impl Actuator for Batch { - fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()> { - debug!(self.log, "Giving {} ({}) new state: {:?}", &self.name, &self.cmd, &state); - - let mut f = None; - if !self.maybe_restart(&mut f) { - return Box::pin(futures::future::ready(())); - } - - let mut json = String::new(); - // Per default compact - let ser = serde_json::ser::Serializer::new(&mut json); - - let change = StateChangeObj::new(self.name.clone(), state); - change.serialize(&mut ser); - - // Verify that this "line" does not contain any whitespace. - debug_assert!(!json.chars().any(|c| c == "\n")); - - let stdin = self.child.stdin.take().expect("Batch actor child has closed stdin?!"); - - let errlog = self.log.new(o!("pid" => self.child.id())); - let g = async move { - if let Some(f) = f { - f.await; - } - - if let Err(e) = stdin.write(json).await { - error!(errlog, "Failed to send statechange to child: {}", e); - } - }; - - Box::pin(g); - } -} diff --git a/src/modules/process.rs b/src/modules/process.rs deleted file mode 100644 index bcbfd38..0000000 --- a/src/modules/process.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::collections::HashMap; -use std::process::Stdio; -use smol::process::Command; - -use futures::future::FutureExt; - -use crate::actor::Actuator; -use crate::db::machine::{MachineState, Status}; -use futures::future::BoxFuture; - -use slog::Logger; - -pub struct Process { - log: Logger, - name: String, - cmd: String, - args: Vec, -} - -impl Process { - pub fn new(log: Logger, name: String, params: &HashMap) -> Option { - let cmd = params.get("cmd").map(|s| s.to_string())?; - let args = params.get("args").map(|argv| - argv.split_whitespace() - .map(|s| s.to_string()) - .collect()) - .unwrap_or_else(Vec::new); - - Some(Self { log, name, cmd, args }) - } - - pub fn into_boxed_actuator(self) -> Box { - Box::new(self) - } -} - -impl Actuator for Process { - fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()> { - debug!(self.log, "Running {} ({}) for {:?}", &self.name, &self.cmd, &state); - let mut command = Command::new(&self.cmd); - command - .stdin(Stdio::null()) - .args(self.args.iter()) - .arg(&self.name); - - let fstate = state.state.clone(); - match state.state { - Status::Free => { - command.arg("free"); - } - Status::InUse(by) => { - command.arg("inuse"); - by.map(|user| command.arg(format!("{}", user))); - } - Status::ToCheck(by) => { - command.arg("tocheck") - .arg(format!("{}", by)); - } - Status::Blocked(by) => { - command.arg("blocked") - .arg(format!("{}", by)); - } - Status::Disabled => { command.arg("disabled"); }, - Status::Reserved(by) => { - command.arg("reserved") - .arg(format!("{}", by)); - } - } - - let flog = self.log.new(o!()); - let name = self.name.clone(); - Box::pin(command.output().map(move |res| match res { - Ok(retv) if retv.status.success() => { - trace!(flog, "Actor was successful"); - let outstr = String::from_utf8_lossy(&retv.stdout); - for line in outstr.lines() { - debug!(flog, "{}", line); - } - } - Ok(retv) => { - warn!(flog, "Actor {} returned nonzero status {} for State={:?}", name, retv.status, fstate); - if !retv.stderr.is_empty() { - let errstr = String::from_utf8_lossy(&retv.stderr); - for line in errstr.lines() { - warn!(flog, "{}", line); - } - } - } - Err(err) => { warn!(flog, "Actor {} failed to run cmd: {}", name, err); } - })) - } -} diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs deleted file mode 100644 index 3ae1975..0000000 --- a/src/modules/shelly.rs +++ /dev/null @@ -1,52 +0,0 @@ -use slog::Logger; - -use crate::db::machine::Status; - -use futures::prelude::*; -use futures::future::BoxFuture; - -use crate::actor::Actuator; -use crate::db::machine::MachineState; - -use paho_mqtt as mqtt; - -/// An actuator for a Shellie connected listening on one MQTT broker -/// -/// This actuator will toggle the shellie with the given `name`. -/// If you need to toggle shellies on multiple brokers you need multiple instanced of this -/// actuator with different clients. -pub struct Shelly { - log: Logger, - name: String, - client: mqtt::AsyncClient, -} - -impl Shelly { - pub fn new(log: Logger, name: String, client: mqtt::AsyncClient) -> Self { - debug!(log, "Starting shelly module for {}", &name); - Shelly { log, name, client, } - } - - /// Set the name to a new one. This changes the shelly that will be activated - pub fn set_name(&mut self, new_name: String) { - let log = self.log.new(o!("shelly_name" => new_name.clone())); - self.name = new_name; - self.log = log; - } -} - - -impl Actuator for Shelly { - fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()> { - info!(self.log, "Machine Status changed: {:?}", state); - let topic = format!("shellies/{}/relay/0/command", self.name); - let pl = match state.state { - Status::InUse(_) => "on", - _ => "off", - }; - let msg = mqtt::Message::new(topic, pl, 0); - let f = self.client.publish(msg).map(|_| ()); - - return Box::pin(f); - } -} diff --git a/src/server.rs b/src/server.rs deleted file mode 100644 index a50ef99..0000000 --- a/src/server.rs +++ /dev/null @@ -1,176 +0,0 @@ -use slog::Logger; - -use crate::config; -use crate::config::Config; -use crate::error::Error; -use crate::connection; - -use smol::net::TcpListener; -use smol::net::unix::UnixStream; -use smol::LocalExecutor; -use smol::Executor; - -use futures::prelude::*; - -use std::io; - -use std::sync::Arc; - -use std::os::unix::io::AsRawFd; -use signal_hook::low_level::pipe as sigpipe; - -use crate::db::Databases; -use crate::network::Network; - -/// Handle all API connections and run the RPC tasks spawned from that on the local thread. -pub fn serve_api_connections(log: Arc, config: Config, db: Databases, nw: Network, ex: Executor) - -> Result<(), Error> -{ - let signal = Box::pin(async { - let (tx, mut rx) = UnixStream::pair()?; - // Initialize signal handler. - // We currently only care about Ctrl-C so SIGINT it is. - // TODO: Make this do SIGHUP and a few others too. (By cloning the tx end of the pipe) - sigpipe::register(signal_hook::consts::SIGINT, tx.as_raw_fd())?; - // When a signal is received this future can complete and read a byte from the underlying - // socket — the actual data is discarded but the act of being able to receive data tells us - // that we received a SIGINT. - - // FIXME: What errors are possible and how to handle them properly? - rx.read_exact(&mut [0u8]).await?; - - io::Result::Ok(LoopResult::Stop) - }); - - // Bind to each address in config.listens. - // This is a Stream over Futures so it will do absolutely nothing unless polled to completion - let listeners_s: futures::stream::Collect<_, Vec> - = stream::iter((&config).listens.iter()) - .map(|l| { - let addr = l.address.clone(); - let port = l.port.unwrap_or(config::DEFAULT_PORT); - info!(&log, "Binding to {} port {}.", l.address.as_str(), &port); - TcpListener::bind((l.address.as_str(), port)) - // If the bind errors, include the address so we can log it - // Since this closure is lazy we need to have a cloned addr - .map_err(move |e| { (addr, port, e) }) - }) - // Filter out the sockets we couldn't open and log those - .filter_map(|f| async { - match f.await { - Ok(l) => Some(l), - Err((addr, port, e)) => { - error!(&log, "Could not setup socket on {} port {}: {}", addr, port, e); - None - } - } - }).collect(); - - let local_ex = LocalExecutor::new(); - - let network = Arc::new(nw); - - let inner_log = log.clone(); - let loop_log = log.clone(); - - let control_fut = async { - // Generate a stream of TcpStreams appearing on any of the interfaces we listen to - let listeners = listeners_s.await; - let incoming = stream::select_all(listeners.iter().map(|l| l.incoming())); - - // For each incoming connection start a new task to handle it - let handle_sockets = incoming.map(|socket| { - // incoming.next() returns an error when the underlying `accept` call yielded an error - // In POSIX those are protocol errors we can't really handle, so we just log the error - // and the move on - match socket { - Ok(socket) => { - // If we have it available add the peer's address to all log messages - let log = - if let Ok(addr) = socket.peer_addr() { - inner_log.new(o!("address" => addr)) - } else { - inner_log.new(o!()) - }; - - let db = db.clone(); - let network = network.clone(); - let tlog = inner_log.new(o!()); - std::thread::spawn(move || { - let local_ex = LocalExecutor::new(); - - let mut handler = connection::ConnectionHandler::new(tlog, db, network); - // We handle the error using map_err - let f = handler.handle(socket) - .map_err(move |e| { - error!(log, "Error occured during protocol handling: {}", e); - }) - // Void any and all results since pool.spawn allows no return value. - .map(|_| ()); - - // Spawn the connection context onto the local executor since it isn't Send - // Also `detach` it so the task isn't canceled as soon as it's dropped. - // TODO: Store all those tasks to have a easier way of managing them? - smol::block_on(f); - }); - }, - Err(e) => { - error!(inner_log, "Socket `accept` error: {}", e); - } - } - - // Unless we are overloaded we just want to keep going. - return LoopResult::Continue; - }); - - info!(&log, "Started"); - - // Check each signal as it arrives - let handle_signals = signal.map(|r| { r.unwrap() }).into_stream(); - - let mut combined = stream::select(handle_signals, handle_sockets); - - // This is the basic main loop that drives execution - loop { - match combined.next().await { - // When the result says to continue, do exactly that - Some(LoopResult::Continue) => {} - Some(LoopResult::Overloaded) => { - // In case over server overload we should install a replacement handler that - // would instead just return `overloaded` for all connections until the - // situation is remedied. - // - // For now, just log the overload and keep going. - error!(loop_log, "Server overloaded"); - } - // When the result says to stop the server, do exactly that. - // Also catches a `None` from the stream; None should never be returned because it - // would mean all sockets were closed and we can not receive any further signals. - // Still, in that case shut down cleanly anyway, the only reason this could happen - // are some heavy bugs in the runtime - Some(LoopResult::Stop) | None => { - warn!(loop_log, "Stopping server"); - break; - } - } - } - }; - - smol::block_on(smol::future::race(control_fut, ex.run(smol::future::pending()))); - - // TODO: Run actual shut down code here - info!(log, "Shutting down..."); - - // Returning () is an implicit success so this will properly set the exit code as well - Ok(()) -} - -/// The result of one iteration of the core loop -pub enum LoopResult { - /// Everything was fine, keep going - Continue, - /// Something happened that means we should shut down - Stop, - /// The Server is currently overloaded - Overloaded, -} diff --git a/src/space.rs b/src/space.rs deleted file mode 100644 index 6e28bce..0000000 --- a/src/space.rs +++ /dev/null @@ -1,9 +0,0 @@ -use uuid::Uuid; - -#[derive(Debug, Clone)] -pub struct Space { - pub id: Uuid, - pub name: String, - pub info: String, -} - diff --git a/src/state/mod.rs b/src/state.rs similarity index 100% rename from src/state/mod.rs rename to src/state.rs