diff --git a/examples/bffh.dhall b/examples/bffh.dhall index 3333dee..b5d892b 100644 --- a/examples/bffh.dhall +++ b/examples/bffh.dhall @@ -1,7 +1,13 @@ -- { actor_connections = [] : List { _1 : Text, _2 : Text } -{ actor_connections = [{ _1 = "Testmachine", _2 = "Actor" }] +{ actor_connections = + [ { _1 = "Testmachine", _2 = "Actor" } + , { _1 = "Another", _2 = "Actor2" } + , { _1 = "Yetmore", _2 = "Actor3" } + ] , actors = - { Actor = { module = "Shelly", params = {=} } + { Actor = { module = "Dummy", params = {=} } + , Actor2 = { module = "Dummy", params = {=} } + , Actor3 = { module = "Dummy", params = {=} } } , init_connections = [] : List { _1 : Text, _2 : Text } --, init_connections = [{ _1 = "Initiator", _2 = "Testmachine" }] @@ -11,6 +17,7 @@ , listens = [ { address = "127.0.0.1", port = Some 59661 } , { address = "::1", port = Some 59661 } + , { address = "192.168.0.114", port = Some 59661 } ] , machines = { Testmachine = @@ -38,12 +45,21 @@ , write = "lab.test.write" } } -, mqtt_url = "tcp://localhost:1883" +, mqtt_url = "" , db_path = "/tmp/bffh" , roles = - { Testrole = - { parents = [] : List Text - , permissions = [] : List Text + { testrole = + { permissions = [ "lab.test.*" ] } + , somerole = + { parents = ["testparent"] + , permissions = [ "lab.some.admin" ] + } + , testparent = + { permissions = + [ "lab.some.write" + , "lab.some.read" + , "lab.some.disclose" + ] } } } diff --git a/examples/users.toml b/examples/users.toml index 2d5ef75..3c4aa37 100644 --- a/examples/users.toml +++ b/examples/users.toml @@ -1,11 +1,13 @@ [Testuser] # Define them in roles.toml as well -roles = ["somerole/lmdb", "testrole/lmdb"] +roles = ["somerole/internal", "testrole/internal"] # If two or more users want to use the same machine at once the higher prio # wins priority = 0 +passwd = "secret" + # You can add whatever random data you want. # It will get stored in the `kv` field in UserData. noot = "noot!" diff --git a/src/actor.rs b/src/actor.rs index 1938cdd..8802615 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -126,11 +126,11 @@ impl Actuator for Dummy { } } -pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(ActorMap, Vec)> { +pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec)> { let mut map = HashMap::new(); let actuators = config.actors.iter() - .map(|(k,v)| (k, load_single(log, client, k, &v.module, &v.params))) + .map(|(k,v)| (k, load_single(log, k, &v.module, &v.params))) .filter_map(|(k, n)| match n { None => None, Some(a) => Some((k, a)) @@ -149,7 +149,6 @@ pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(Acto fn load_single( log: &Logger, - client: &AsyncClient, name: &String, module_name: &String, params: &HashMap @@ -157,14 +156,8 @@ fn load_single( { use crate::modules::*; + info!(log, "Loading actor \"{}\" with module {} and params {:?}", name, module_name, params); match module_name.as_ref() { - "Shelly" => { - if !params.is_empty() { - warn!(log, "\"{}\" module expects no parameters. Configured as \"{}\".", - module_name, name); - } - Some(Box::new(Shelly::new(log, name.clone(), client.clone()))) - }, "Dummy" => { Some(Box::new(Dummy::new(log))) } diff --git a/src/api/auth.rs b/src/api/auth.rs index fc8178a..eb43429 100644 --- a/src/api/auth.rs +++ b/src/api/auth.rs @@ -33,7 +33,6 @@ use crate::db::user::{Internal as UserDB, UserId, User}; use crate::db::access::AccessControl as AccessDB; pub struct AppData { - passdb: Arc, userdb: Arc, } pub struct SessionData { @@ -93,7 +92,7 @@ impl Auth { pub fn new(log: Logger, dbs: Databases, session: Rc>>) -> Self { let mut ctx = SASL::new().unwrap(); - let appdata = Box::new(AppData { passdb: dbs.passdb.clone(), userdb: dbs.userdb.clone() }); + let appdata = Box::new(AppData { userdb: dbs.userdb.clone() }); ctx.store(appdata); ctx.install_callback::(); diff --git a/src/api/machine.rs b/src/api/machine.rs index bfd85d6..256a492 100644 --- a/src/api/machine.rs +++ b/src/api/machine.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use capnp::capability::Promise; use capnp::Error; @@ -102,23 +103,33 @@ impl use_::Server for Machine { let userid = self.userid.clone(); let f = async move { let mut guard = machine.lock().await; - match guard.read_state().lock_ref().state { - Status::Free => { - guard.do_state_change(MachineState::used(Some(userid))); - }, - Status::Reserved(ref whom) => { - // If it's reserved for us or we're allowed to take over - if &userid == whom { - guard.do_state_change(MachineState::used(Some(userid))); - } - }, - _ => { } + let mut ok = false; + { + match { guard.read_state().lock_ref().state.clone() } { + Status::Free => { + ok = true; + }, + Status::Reserved(ref whom) => { + // If it's reserved for us or we're allowed to take over + if &userid == whom { + ok = true; + } + }, + _ => { } + } + } + + if ok { + guard.do_state_change(MachineState::used(Some(userid))); } Ok(()) }; - Promise::from_future(f) + let g = smol::future::race(f, smol::Timer::after(Duration::from_secs(4)) + .map(|_| Err(capnp::Error::failed("Waiting for machine lock timed out!".to_string())))); + + Promise::from_future(g) } } @@ -132,13 +143,20 @@ impl in_use::Server for Machine { let userid = self.userid.clone(); let f = async move { let mut guard = machine.lock().await; - match guard.read_state().lock_ref().state { - Status::InUse(ref whom) => { - if &Some(userid) == whom { - guard.reset_state() - } - }, - _ => {} + let mut ok = false; + { + match { guard.read_state().lock_ref().state.clone() } { + Status::InUse(ref whom) => { + if &Some(userid) == whom { + ok = true; + } + }, + _ => { } + } + } + + if ok { + guard.reset_state() } Ok(()) diff --git a/src/api/machines.rs b/src/api/machines.rs index 6aae2c5..9633d82 100644 --- a/src/api/machines.rs +++ b/src/api/machines.rs @@ -152,7 +152,6 @@ impl machines::Server for Machines { let machineapi = Machine::new(user.clone(), perms, machine.clone()); if perms.write { builder.set_use(capnp_rpc::new_client(machineapi.clone())); - builder.set_inuse(capnp_rpc::new_client(machineapi.clone())); } if perms.manage { builder.set_transfer(capnp_rpc::new_client(machineapi.clone())); @@ -163,17 +162,25 @@ impl machines::Server for Machines { builder.set_admin(capnp_rpc::new_client(machineapi.clone())); } - builder.set_info(capnp_rpc::new_client(machineapi)); let s = match machine.get_status().await { Status::Free => MachineState::Free, Status::Disabled => MachineState::Disabled, Status::Blocked(_) => MachineState::Blocked, - Status::InUse(_) => MachineState::InUse, + Status::InUse(u) => { + if let Some(owner) = u.as_ref() { + if owner == user { + builder.set_inuse(capnp_rpc::new_client(machineapi.clone())); + } + } + MachineState::InUse + }, Status::Reserved(_) => MachineState::Reserved, Status::ToCheck(_) => MachineState::ToCheck, }; builder.set_state(s); + + builder.set_info(capnp_rpc::new_client(machineapi)); }; Ok(()) diff --git a/src/config.rs b/src/config.rs index a78cb5c..a820abc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -44,8 +44,10 @@ pub struct Config { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RoleConfig { - parents: Vec, - permissions: Vec, + #[serde(default = "Vec::new")] + pub parents: Vec, + #[serde(default = "Vec::new")] + pub permissions: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/db.rs b/src/db.rs index 77d85ca..d4c1305 100644 --- a/src/db.rs +++ b/src/db.rs @@ -28,7 +28,6 @@ pub mod machine; pub struct Databases { pub access: Arc, pub machine: Arc, - pub passdb: Arc, pub userdb: Arc, } @@ -53,13 +52,10 @@ impl Databases { let permdb = access::init(log.new(o!("system" => "permissions")), &config, env.clone())?; let ac = access::AccessControl::new(permdb); - let passdb = pass::PassDB::init(log.new(o!("system" => "passwords")), env.clone()).unwrap(); - let userdb = user::init(log.new(o!("system" => "users")), &config, env.clone())?; Ok(Self { access: Arc::new(ac), - passdb: Arc::new(passdb), machine: Arc::new(mdb), userdb: Arc::new(userdb), }) diff --git a/src/db/access.rs b/src/db/access.rs index dae67d0..7ab54c5 100644 --- a/src/db/access.rs +++ b/src/db/access.rs @@ -1,6 +1,9 @@ //! Access control logic //! +use slog::Logger; +use std::sync::Arc; + use std::fmt; use std::collections::HashMap; use std::cmp::Ordering; @@ -15,11 +18,23 @@ use crate::error::Result; pub mod internal; +use crate::config::Config; use crate::db::user::UserData; -pub use internal::{init, Internal}; +pub use internal::Internal; pub struct AccessControl { - pub internal: Internal, + internal: HashMap, +} + +pub fn init(_log: Logger, config: &Config, _env: Arc) + -> std::result::Result, crate::error::Error> +{ + Ok(config.roles.iter().map(|(name, cfg)| { + let id = RoleIdentifier::new(name, "internal"); + let parents = cfg.parents.iter().map(|n| RoleIdentifier::new(n, "internal")).collect(); + let role = Role::new(parents, cfg.permissions.clone()); + (id, role) + }).collect()) } pub const ADMINPERM: &'static str = "bffh.admin"; @@ -28,7 +43,7 @@ pub fn admin_perm() -> &'static Permission { } impl AccessControl { - pub fn new(internal: Internal) -> Self { + pub fn new(internal: HashMap) -> Self { Self { internal, } @@ -39,7 +54,7 @@ impl AccessControl { } pub fn dump_roles(&self) -> Result> { - self.internal.dump_roles() + Ok(self.internal.iter().map(|(k,v)| (k.clone(), v.clone())).collect()) } } @@ -56,38 +71,6 @@ pub trait RoleDB { fn get_role(&self, role_id: &RoleIdentifier) -> Result>; - /// Check if a given user has the given permission - /// - /// Default implementation which adapter may overwrite with more efficient specialized - /// implementations. - fn check(&self, user: &UserData, perm: &Permission) -> Result { - self.check_roles(&user.roles, perm) - } - - /// Check if a given permission is granted by any of the given roles or their respective - /// parents - /// - /// A Default implementation exists which adapter may overwrite with more efficient specialized - /// implementations. - fn check_roles(&self, roles: &[RoleIdentifier], perm: &Permission) -> Result { - // Tally all roles. Makes dependent roles easier - let mut roleset = HashMap::new(); - for role_id in roles { - self.tally_role(&mut roleset, role_id)?; - } - - // Iter all unique role->permissions we've found and early return on match. - for (_roleid, role) in roleset.iter() { - for perm_rule in role.permissions.iter() { - if perm_rule.match_perm(&perm) { - return Ok(true); - } - } - } - - return Ok(false); - } - /// Tally a role dependency tree into a set /// /// A Default implementation exists which adapter may overwrite with more efficient @@ -125,6 +108,16 @@ pub trait RoleDB { } } +impl RoleDB for HashMap { + fn get_type_name(&self) -> &'static str { + "Internal" + } + + fn get_role(&self, role_id: &RoleIdentifier) -> Result> { + Ok(self.get(role_id).cloned()) + } +} + /// A "Role" from the Authorization perspective /// /// You can think of a role as a bundle of permissions relating to other roles. In most cases a @@ -162,6 +155,10 @@ impl Role { (RoleIdentifier::local_from_str("lmdb".to_string(), key), value) }))) } + + pub fn new(parents: Vec, permissions: Vec) -> Self { + Self { parents, permissions } + } } impl fmt::Display for Role { @@ -209,6 +206,15 @@ pub struct RoleIdentifier { source: SourceID, } +impl RoleIdentifier { + pub fn new<>(name: &str, source: &str) -> Self { + Self { name: name.to_string(), source: source.to_string() } + } + pub fn from_strings(name: String, source: String) -> Self { + Self { name, source } + } +} + impl fmt::Display for RoleIdentifier { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}/{}", self.name, self.source) diff --git a/src/db/access/internal.rs b/src/db/access/internal.rs index 5ce0458..d3748e3 100644 --- a/src/db/access/internal.rs +++ b/src/db/access/internal.rs @@ -145,11 +145,6 @@ impl RoleDB for Internal { "Internal" } - fn check(&self, user: &UserData, perm: &Permission) -> Result { - let txn = self.env.begin_ro_txn()?; - self._check(&txn, user, &perm) - } - fn get_role(&self, role_id: &RoleIdentifier) -> Result> { let txn = self.env.begin_ro_txn()?; self._get_role(&txn, role_id) diff --git a/src/db/user.rs b/src/db/user.rs index 541eeeb..b212647 100644 --- a/src/db/user.rs +++ b/src/db/user.rs @@ -115,11 +115,22 @@ pub fn load_file>(path: P) -> Result> { let f = fs::read(path)?; let mut map: HashMap = toml::from_slice(&f)?; - Ok(HashMap::from_iter(map.drain().map(|(uid, user_data)| + Ok(HashMap::from_iter(map.drain().map(|(uid, mut user_data)| { + user_data.passwd = user_data.passwd.map(|pw| if !pw.starts_with("$argon2") { + let config = argon2::Config::default(); + let salt: [u8; 16] = rand::random(); + let hash = argon2::hash_encoded(pw.as_bytes(), &salt, &config) + .expect(&format!("Failed to hash password for {}: ", uid)); + println!("Hashed pw for {} to {}", uid, hash); + + hash + } else { + pw + }); ( uid.clone() , User::new(UserId::new(uid, None, None), user_data) ) - ))) + }))) } pub fn init(log: Logger, _config: &Config, env: Arc) -> Result { diff --git a/src/initiator.rs b/src/initiator.rs index 03474c5..f9d9bf6 100644 --- a/src/initiator.rs +++ b/src/initiator.rs @@ -117,7 +117,8 @@ impl Future for Initiator { debug!(this.log, "Sensor returned a new state"); this.future.take(); let f = this.machine.as_mut().map(|machine| { - machine.request_state_change(user.as_ref(), state) + unimplemented!() + //machine.request_state_change(user.as_ref(), state) }); this.state_change_fut = f; } @@ -127,11 +128,11 @@ impl Future for Initiator { } } -pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(InitMap, Vec)> { +pub fn load(log: &Logger, config: &Config) -> Result<(InitMap, Vec)> { let mut map = HashMap::new(); let initiators = config.initiators.iter() - .map(|(k,v)| (k, load_single(log, client, k, &v.module, &v.params))) + .map(|(k,v)| (k, load_single(log, k, &v.module, &v.params))) .filter_map(|(k,n)| match n { None => None, Some(i) => Some((k, i)), @@ -149,7 +150,6 @@ pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(Init fn load_single( log: &Logger, - _client: &AsyncClient, name: &String, module_name: &String, _params: &HashMap diff --git a/src/machine.rs b/src/machine.rs index 1c25407..270056b 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -29,28 +29,10 @@ use crate::db::user::{User, UserData}; use crate::network::MachineMap; use crate::space; -// Registry of all machines configured. -// TODO: -// - Serialize machines into config -// - Deserialize machines from config -// - Index machines on deserialization to we can look them up efficiently -// - Maybe store that index too -// - Iterate over all or a subset of machines efficiently pub struct Machines { machines: Vec } -impl Machines { - /// Load machines from the config, looking up and linking the database entries as necessary - pub fn load() -> Self { - unimplemented!() - } - - pub fn lookup(id: String) -> Option { - unimplemented!() - } -} - #[derive(Debug, Clone)] pub struct Index { inner: HashMap, @@ -80,15 +62,13 @@ pub struct Machine { pub desc: MachineDescription, inner: Arc>, - access: Arc, } impl Machine { - pub fn new(inner: Inner, desc: MachineDescription, access: Arc) -> Self { + pub fn new(inner: Inner, desc: MachineDescription, ) -> Self { Self { id: uuid::Uuid::default(), inner: Arc::new(Mutex::new(inner)), - access, desc, } } @@ -97,83 +77,25 @@ impl Machine { ( id: MachineIdentifier , desc: MachineDescription , state: MachineState - , access: Arc ) -> Machine { - Self::new(Inner::new(id, state), desc, access) - } - - pub fn from_file>(path: P, access: Arc) - -> Result> - { - let mut map: HashMap = MachineDescription::load_file(path)?; - Ok(map.drain().map(|(id, desc)| { - Self::construct(id, desc, MachineState::new(), access.clone()) - }).collect()) - } - - /// Requests to use a machine. Returns a return token if successful. - /// - /// This will update the internal state of the machine, notifying connected actors, if any. - /// The return token is a channel that considers the machine 'returned' if anything is sent - /// along it or if the sending end gets dropped. Anybody who holds this token needs to check if - /// the receiving end was canceled which indicates that the machine has been taken off their - /// hands. - pub fn request_state_change(&self, who: Option<&User>, new_state: MachineState) - -> BoxFuture<'static, Result> - { - let this = self.clone(); - let perms: Vec = who - .map(|u| self.access.collect_permrules(&u.data)) - .and_then(|result| result.ok()) - .unwrap_or(vec![]); - let write: bool = perms.iter().any(|rule| rule.match_perm(&self.desc.privs.write)); - let manage: bool = perms.iter().any(|rule| rule.match_perm(&self.desc.privs.manage)); - - let f = async move { - match &new_state.state { - Status::Free => { - let mut guard = this.inner.lock().await; - guard.do_state_change(new_state); - return Ok(ReturnToken::new(this.inner.clone())); - }, - Status::InUse(_) | Status::ToCheck(_) if manage || write => { - let mut guard = this.inner.lock().await; - guard.do_state_change(new_state); - return Ok(ReturnToken::new(this.inner.clone())) - }, - Status::Blocked(_) | Status::Disabled | Status::Reserved(_) if manage => { - let mut guard = this.inner.lock().await; - guard.do_state_change(new_state); - return Ok(ReturnToken::new(this.inner.clone())) - }, - _ => { - return Err(Error::Denied); - } - } - }; - - Box::pin(f) + Self::new(Inner::new(id, state), desc) } pub fn do_state_change(&self, new_state: MachineState) - -> BoxFuture<'static, Result> + -> BoxFuture<'static, Result<()>> { let this = self.clone(); let f = async move { let mut guard = this.inner.lock().await; guard.do_state_change(new_state); - return Ok(ReturnToken::new(this.inner.clone())) + return Ok(()) }; Box::pin(f) } - pub fn create_token(&self) -> ReturnToken { - ReturnToken::new(self.inner.clone()) - } - pub async fn get_status(&self) -> Status { let guard = self.inner.lock().await; guard.state.get_cloned().state @@ -241,7 +163,9 @@ impl Inner { } pub fn do_state_change(&mut self, new_state: MachineState) { + print!("State {:?}", &new_state); let old_state = self.state.replace(new_state); + print!("<- {:?}", &old_state); self.reset.replace(old_state); } @@ -316,7 +240,7 @@ impl MachineDescription { } } -pub fn load(config: &crate::config::Config, access: Arc) +pub fn load(config: &crate::config::Config) -> Result { let mut map = config.machines.clone(); @@ -324,7 +248,7 @@ pub fn load(config: &crate::config::Config, access: Arc) let it = map.drain() .map(|(k,v)| { // TODO: Read state from the state db - (v.name.clone(), Machine::construct(k, v, MachineState::new(), access.clone())) + (v.name.clone(), Machine::construct(k, v, MachineState::new())) }); diff --git a/src/main.rs b/src/main.rs index 528f2b9..11faa80 100644 --- a/src/main.rs +++ b/src/main.rs @@ -69,7 +69,6 @@ fn main() { .help("Dump all databases into the given directory") .long("dump") .conflicts_with("load") - .takes_value(true) ) .arg(Arg::with_name("load") .help("Load databases from the given directory") @@ -145,6 +144,11 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { for (id, role) in v.iter() { info!(log, "Role {}:\n{}", id, role); } + + let v = db.userdb.list_users()?; + for user in v.iter() { + info!(log, "User {}:\n{:?}", user.id, user.data); + } Ok(()) } else if matches.is_present("load") { let db = db::Databases::new(&log, &config)?; @@ -158,41 +162,34 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { debug!(log, "Loaded users: {:?}", map); dir.pop(); - dir.push("roles.toml"); - db.access.internal.load_roles(&dir)?; - dir.pop(); - - dir.push("pass.toml"); - db.passdb.load_file(&dir); - dir.pop(); - Ok(()) } else { let ex = Executor::new(); let db = db::Databases::new(&log, &config)?; - let mqtt = AsyncClient::new(config.mqtt_url.clone())?; - let tok = mqtt.connect(paho_mqtt::ConnectOptions::new()); + //let mqtt = AsyncClient::new(config.mqtt_url.clone())?; + //let tok = mqtt.connect(paho_mqtt::ConnectOptions::new()); - smol::block_on(tok)?; + //smol::block_on(tok)?; - let machines = machine::load(&config, db.access.clone())?; - let (actor_map, actors) = actor::load(&log, &mqtt, &config)?; - let (init_map, initiators) = initiator::load(&log, &mqtt, &config)?; + let machines = machine::load(&config)?; + let (actor_map, actors) = actor::load(&log, &config)?; + let (init_map, initiators) = initiator::load(&log, &config)?; - // TODO: restore connections between initiators, machines, actors let mut network = network::Network::new(machines, actor_map, init_map); for (a,b) in config.actor_connections.iter() { if let Err(e) = network.connect_actor(a,b) { error!(log, "{}", e); } + info!(log, "[Actor] Connected {} to {}", a, b); } for (a,b) in config.init_connections.iter() { if let Err(e) = network.connect_init(a,b) { error!(log, "{}", e); } + info!(log, "[Initi] Connected {} to {}", a, b); } for actor in actors.into_iter() { @@ -202,22 +199,6 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { ex.spawn(init).detach(); } - let (signal, shutdown) = async_channel::bounded::<()>(1); - let (_, r) = easy_parallel::Parallel::new() - .each(0..4, |_| smol::block_on(ex.run(shutdown.recv()))) - .finish(|| { - // TODO: Spawn api connections on their own (non-main) thread, use the main thread to - // handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads - // when bffh should exit - let r = server::serve_api_connections(log.clone(), config, db, network); - - // One of them would be enough really, but *shrug* - signal.try_send(()); - std::mem::drop(signal); - - return r; - }); - - return r; + server::serve_api_connections(log.clone(), config, db, network, ex) } } diff --git a/src/network.rs b/src/network.rs index 3fcdec5..68d9a40 100644 --- a/src/network.rs +++ b/src/network.rs @@ -17,17 +17,17 @@ pub type InitMap = HashMap>>; #[derive(Debug, PartialEq, Eq)] pub enum Error { - NoSuchInitiator, - NoSuchMachine, - NoSuchActor, + NoSuchInitiator(String), + NoSuchMachine(String), + NoSuchActor(String), } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Error::NoSuchInitiator => write!(f, "No initiator found with that name"), - Error::NoSuchActor => write!(f, "No actor found with that name"), - Error::NoSuchMachine => write!(f, "No machine found with that name"), + Error::NoSuchInitiator(n) => write!(f, "No initiator \"{}\" found.", n), + Error::NoSuchActor(n) => write!(f, "No actor \"{}\" found.", n), + Error::NoSuchMachine(n) => write!(f, "No machine \"{}\" found.", n), } } } @@ -57,9 +57,9 @@ impl Network { pub fn connect_init(&self, init_key: &String, machine_key: &String) -> Result<()> { let init = self.inits.get(init_key) - .ok_or(Error::NoSuchInitiator)?; + .ok_or_else(|| Error::NoSuchInitiator(init_key.clone()))?; let machine = self.machines.get(machine_key) - .ok_or(Error::NoSuchMachine)?; + .ok_or_else(|| Error::NoSuchMachine(machine_key.clone()))?; init.set(Some(machine.clone())); Ok(()) @@ -69,13 +69,14 @@ impl Network { -> Result<()> { let machine = self.machines.get(machine_key) - .ok_or(Error::NoSuchMachine)?; + .ok_or_else(|| Error::NoSuchMachine(machine_key.clone()))?; let actor = self.actors.get(actor_key) - .ok_or(Error::NoSuchActor)?; + .ok_or_else(|| Error::NoSuchActor(actor_key.clone()))?; // FIXME Yeah this should not unwrap. Really, really shoudln't. let mut guard = actor.try_lock().unwrap(); - guard.try_send(Some(Box::new(machine.signal()))).map_err(|_| Error::NoSuchActor.into()) + guard.try_send(Some(Box::new(machine.signal()))) + .map_err(|_| Error::NoSuchActor(actor_key.clone()).into()) } } diff --git a/src/server.rs b/src/server.rs index 4c4a93f..a50ef99 100644 --- a/src/server.rs +++ b/src/server.rs @@ -8,6 +8,7 @@ use crate::connection; use smol::net::TcpListener; use smol::net::unix::UnixStream; use smol::LocalExecutor; +use smol::Executor; use futures::prelude::*; @@ -22,7 +23,7 @@ use crate::db::Databases; use crate::network::Network; /// Handle all API connections and run the RPC tasks spawned from that on the local thread. -pub fn serve_api_connections(log: Arc, config: Config, db: Databases, nw: Network) +pub fn serve_api_connections(log: Arc, config: Config, db: Databases, nw: Network, ex: Executor) -> Result<(), Error> { let signal = Box::pin(async { @@ -72,13 +73,11 @@ pub fn serve_api_connections(log: Arc, config: Config, db: Databases, nw let inner_log = log.clone(); let loop_log = log.clone(); - smol::block_on(local_ex.run(async { + let control_fut = async { // Generate a stream of TcpStreams appearing on any of the interfaces we listen to let listeners = listeners_s.await; let incoming = stream::select_all(listeners.iter().map(|l| l.incoming())); - let mut handler = connection::ConnectionHandler::new(inner_log.new(o!()), db, network.clone()); - // For each incoming connection start a new task to handle it let handle_sockets = incoming.map(|socket| { // incoming.next() returns an error when the underlying `accept` call yielded an error @@ -94,18 +93,26 @@ pub fn serve_api_connections(log: Arc, config: Config, db: Databases, nw inner_log.new(o!()) }; - // We handle the error using map_err - let f = handler.handle(socket) - .map_err(move |e| { - error!(log, "Error occured during protocol handling: {}", e); - }) - // Void any and all results since pool.spawn allows no return value. - .map(|_| ()); + let db = db.clone(); + let network = network.clone(); + let tlog = inner_log.new(o!()); + std::thread::spawn(move || { + let local_ex = LocalExecutor::new(); - // Spawn the connection context onto the local executor since it isn't Send - // Also `detach` it so the task isn't canceled as soon as it's dropped. - // TODO: Store all those tasks to have a easier way of managing them? - local_ex.spawn(f).detach(); + let mut handler = connection::ConnectionHandler::new(tlog, db, network); + // We handle the error using map_err + let f = handler.handle(socket) + .map_err(move |e| { + error!(log, "Error occured during protocol handling: {}", e); + }) + // Void any and all results since pool.spawn allows no return value. + .map(|_| ()); + + // Spawn the connection context onto the local executor since it isn't Send + // Also `detach` it so the task isn't canceled as soon as it's dropped. + // TODO: Store all those tasks to have a easier way of managing them? + smol::block_on(f); + }); }, Err(e) => { error!(inner_log, "Socket `accept` error: {}", e); @@ -147,7 +154,9 @@ pub fn serve_api_connections(log: Arc, config: Config, db: Databases, nw } } } - })); + }; + + smol::block_on(smol::future::race(control_fut, ex.run(smol::future::pending()))); // TODO: Run actual shut down code here info!(log, "Shutting down...");