From 9c4144ac6634a38e4413340530f45c6dbe220ed8 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Tue, 18 Feb 2020 16:55:19 +0100 Subject: [PATCH] Showcase version commit --- Cargo.toml | 2 +- src/access.rs | 51 ++++++++----- src/api.rs | 86 ++++++++++++++++----- src/auth.rs | 122 +++++++++++++++-------------- src/config.rs | 55 +++++++++++--- src/error.rs | 7 ++ src/log.rs | 3 +- src/machine.rs | 202 ++++++++++++++++++++++++++++++++----------------- src/main.rs | 24 +++--- 9 files changed, 368 insertions(+), 184 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4a53de0..678b276 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "bffh" +name = "diflouroborane" version = "0.1.0" authors = ["Gregor Reitzenstein "] edition = "2018" diff --git a/src/access.rs b/src/access.rs index 0d87576..a3e06ce 100644 --- a/src/access.rs +++ b/src/access.rs @@ -5,37 +5,50 @@ use slog::Logger; use casbin::prelude::*; -use super::config::Config; - use futures_signals::signal::Mutable; use crate::api::api; +use crate::config::Config; use crate::auth::Authentication; use crate::error::Result; +use std::rc::Rc; +use async_std::sync::{Arc, RwLock}; + +use std::ops::Deref; + +pub struct PermissionsProvider { + log: Logger, + pdb: Enforcer, +} + +impl PermissionsProvider { + pub fn new(log: Logger, pdb: Enforcer) -> Self { + Self { log, pdb } + } + + pub fn enforce(&self, actor: &str, object: &str, action: &str) -> Result { + let b = self.pdb.enforce(vec![actor, object, action])?; + Ok(b) + } +} + #[derive(Clone)] pub struct Permissions { - log: Logger, - pdb: Mutable, - auth: Authentication, + inner: Arc>, + auth: Rc, } impl Permissions { - pub fn new(log: Logger, pdb: Mutable, auth: Authentication) -> Self { - Self { log, pdb, auth } + pub fn new(inner: Arc>, auth: Rc) -> Self { + Self { inner, auth } } - pub fn enforce(&self, object: &str, action: &str) -> bool { - if let Some(actor) = self.auth.get_authzid() { - trace!(self.log, "Checking permission {} for {} on {}", action, actor, object); - let r = self.pdb.lock_ref().enforce(vec![&actor,object,action]).unwrap(); - if !r { - info!(self.log, "Failed permission {} for {} on {}", action, actor, object); - } - return r; + pub async fn enforce(&self, object: &str, action: &str) -> Result { + if let Some(actor) = self.auth.state.read().await.deref() { + self.inner.read().await.enforce(&actor, object, action) } else { - info!(self.log, "Attempted anonymous access: {} on {}", action, object); - false + Ok(false) } } } @@ -45,11 +58,11 @@ impl api::permissions::Server for Permissions { } /// This line documents init -pub async fn init(config: &Config) -> std::result::Result> { +pub async fn init(log: Logger, config: &Config) -> std::result::Result> { let model = Model::from_file(config.access.model.clone()).await?; let adapter = Box::new(FileAdapter::new(config.access.policy.clone())); let e = Enforcer::new(model, adapter).await?; - return Ok(e); + return Ok(PermissionsProvider::new(log, e)); } diff --git a/src/api.rs b/src/api.rs index 58d631f..4876e05 100644 --- a/src/api.rs +++ b/src/api.rs @@ -7,13 +7,20 @@ pub mod api { use std::default::Default; use async_std::net::TcpStream; +use futures::task::Spawn; +use futures::FutureExt; use futures_signals::signal::Mutable; use casbin::Enforcer; use casbin::MgmtApi; -use crate::machine::Machines; -use crate::auth::Authentication; -use crate::access::Permissions; +use slog::Logger; + +use std::rc::Rc; +use async_std::sync::{Arc, RwLock}; + +use crate::machine::{MachinesProvider, Machines}; +use crate::auth::{AuthenticationProvider, Authentication}; +use crate::access::{PermissionsProvider, Permissions}; use capnp::{Error}; use capnp::capability::Promise; @@ -21,35 +28,76 @@ use capnp_rpc::RpcSystem; use capnp_rpc::twoparty::VatNetwork; use capnp_rpc::rpc_twoparty_capnp::Side; +use std::ops::Deref; + use api::diflouroborane; -pub fn init() { +#[derive(Clone)] +pub struct API { + auth: Arc>, + perm: Arc>, + mach: Arc>, + + spawner: S, +} +impl API { + pub fn new(auth: AuthenticationProvider, + perm: PermissionsProvider, + mach: MachinesProvider, + spawner: S) + -> Self + { + let auth = Arc::new(RwLock::new(auth)); + let perm = Arc::new(RwLock::new(perm)); + let mach = Arc::new(RwLock::new(mach)); + + Self { auth, perm, mach, spawner } + } + + pub fn into_connection(self) -> Bootstrap { + let auth = Rc::new(Authentication::new(self.auth)); + let perm = Rc::new(Permissions::new(self.perm, auth.clone())); + let mach = Machines::new(self.mach, perm.clone()); + Bootstrap { + auth: auth, + perm: perm, + mach: mach, + } + } } -pub async fn process_socket(auth: Authentication, perm: Permissions, mach: Machines, socket: TcpStream) - -> Result<(), Error> -{ - let api = Api { auth, perm, mach }; - let a = api::diflouroborane::ToClient::new(api).into_client::(); +pub async fn handle_connection(api: API, log: Logger, socket: TcpStream) -> Result<(), Error> { + info!(log, "A new connection"); + let client = api.into_connection(); + let a = api::diflouroborane::ToClient::new(client).into_client::(); + let netw = VatNetwork::new(socket.clone(), socket, Side::Server, Default::default()); - let rpc = RpcSystem::new(Box::new(netw), Some(a.clone().client)); - rpc.await + + let rpc = RpcSystem::new(Box::new(netw), Some(a.clone().client)).map(|_| ()); + + rpc.await; + + Ok(()) } -struct Api { - auth: Authentication, - perm: Permissions, +/// Bootstrap capability of the Diflouroborane API +/// +/// This is the starting point for any client connecting +#[derive(Clone)] +pub struct Bootstrap { + auth: Rc, + perm: Rc, mach: Machines, } -impl diflouroborane::Server for Api { +impl diflouroborane::Server for Bootstrap { fn authentication(&mut self, _params: diflouroborane::AuthenticationParams, mut results: diflouroborane::AuthenticationResults) -> Promise<(), Error> { let mut b = results.get(); - let auth = api::authentication::ToClient::new(self.auth.clone()).into_client::(); + let auth = api::authentication::ToClient::new(self.auth.deref().clone()).into_client::(); b.set_auth(auth); Promise::ok(()) } @@ -59,9 +107,9 @@ impl diflouroborane::Server for Api { mut results: diflouroborane::PermissionsResults) -> Promise<(), Error> { - let mut b = results.get(); - let perm = api::permissions::ToClient::new(self.perm.clone()).into_client::(); - b.set_perm(perm); + //let mut b = results.get(); + //let perm = api::permissions::ToClient::new(self.perm).into_client::(); + //b.set_perm(perm); Promise::ok(()) } diff --git a/src/auth.rs b/src/auth.rs index 44cbc8a..9f06684 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -11,6 +11,9 @@ use std::fs::File; use std::io::{Read, Write}; use std::ops::Deref; +use async_std::sync::{Arc, RwLock}; +use capnp::capability::Promise; + use futures_signals::signal::Mutable; use casbin::{Enforcer, Model, FileAdapter}; @@ -133,24 +136,16 @@ impl AuthenticationProvider { #[derive(Clone)] pub struct Authentication { - state: Mutable>, - provider: Mutable, + pub state: Arc>>, + provider: Arc>, } impl Authentication { - pub fn new(provider: Mutable) -> Self { + pub fn new(provider: Arc>) -> Self { Self { - state: Mutable::new(None), + state: Arc::new(RwLock::new(None)), provider: provider, } } - - pub fn get_authzid(&self) -> Option { - self.state.lock_ref().clone() - } - - pub fn mechs(&self) -> Vec<&'static str> { - self.provider.lock_ref().mechs() - } } @@ -162,15 +157,19 @@ impl api::authentication::Server for Authentication { mut results: api::authentication::AvailableMechanismsResults) -> ::capnp::capability::Promise<(), ::capnp::Error> { - let m = self.mechs(); - let mut b = results.get() - .init_mechanisms(m.len() as u32); - for (i, mech) in m.iter().enumerate() { - let mut bldr = b.reborrow(); - bldr.set(i as u32, mech); - } + let p = self.provider.clone(); + let f = async move { + let m = p.read().await.mechs(); + let mut b = results.get() + .init_mechanisms(m.len() as u32); + for (i, mech) in m.iter().enumerate() { + let mut bldr = b.reborrow(); + bldr.set(i as u32, mech); + } + Ok(()) + }; - ::capnp::capability::Promise::ok(()) + ::capnp::capability::Promise::from_future(f) } fn initialize_authentication(&mut self, @@ -178,42 +177,47 @@ impl api::authentication::Server for Authentication { mut results: api::authentication::InitializeAuthenticationResults) -> ::capnp::capability::Promise<(), ::capnp::Error> { - let params = pry!(params.get()); - let mechanism = pry!(params.get_mechanism()); - match mechanism { - "PLAIN" => { - use api::authentication::maybe_data::Which; + let prov = self.provider.clone(); + let stat = self.state.clone(); - let data = pry!(params.get_initial_data()); - if let Ok(Which::Some(data)) = data.which() { - let data = pry!(data); - if let Ok((b, name)) = self.provider.lock_ref().plain.step(data) { + Promise::from_future(async move { + let params = params.get()?; + let mechanism = params.get_mechanism()?; - // If login was successful, also set the current authzid - if b { - self.state.lock_mut().replace(name.to_string()); + match mechanism { + "PLAIN" => { + use api::authentication::maybe_data::Which; + + let data = params.get_initial_data()?; + if let Ok(Which::Some(data)) = data.which() { + let data = data?; + if let Ok((b, name)) = prov.read().await.plain.step(data) { + // If login was successful set the authzid + if b { + stat.write().await.replace(name.to_string()); + } + + let outcome = Outcome::value(b); + results + .get() + .init_response() + .set_outcome(api::authentication::outcome::ToClient::new(outcome) + .into_client::<::capnp_rpc::Server>()); } - - let outcome = Outcome::value(b); - results - .get() - .init_response() - .set_outcome(api::authentication::outcome::ToClient::new(outcome) - .into_client::<::capnp_rpc::Server>()); + Ok(()) + } else { + Err(::capnp::Error::unimplemented( + "SASL PLAIN requires initial data set".to_string())) } - ::capnp::capability::Promise::ok(()) - } else { - return - ::capnp::capability::Promise::err(::capnp::Error::unimplemented( - "SASL PLAIN requires initial data set".to_string())); + }, + m => { + Err(::capnp::Error::unimplemented( + format!("SASL Mechanism {} is not implemented", m) + )) } - }, - m => { - return - ::capnp::capability::Promise::err(::capnp::Error::unimplemented( - format!("SASL Mechanism {} is not implemented", m))); + } - } + }) } fn get_authzid(&mut self, @@ -221,12 +225,18 @@ impl api::authentication::Server for Authentication { mut results: api::authentication::GetAuthzidResults) -> ::capnp::capability::Promise<(), ::capnp::Error> { - if let Some(zid) = self.state.lock_ref().deref() { - results.get().set_authzid(zid); - } else { - results.get().set_authzid(""); - } - ::capnp::capability::Promise::ok(()) + let state = self.state.clone(); + let f = async move { + if let Some(zid) = state.read().await.deref() { + results.get().set_authzid(&zid); + } else { + results.get().set_authzid(""); + } + + Ok(()) + }; + + Promise::from_future(f) } } diff --git a/src/config.rs b/src/config.rs index b7b81c8..dccc9c4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,25 +1,28 @@ use std::str::FromStr; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use serde::{Serialize, Deserialize}; +use std::io::Read; +use std::fs::File; use crate::error::Result; -pub fn read() -> Result { - Ok(Config { - machinedb: PathBuf::from_str("/tmp/machines.db").unwrap(), - access: Access { - model: PathBuf::from_str("/tmp/model.conf").unwrap(), - policy: PathBuf::from_str("/tmp/policy.csv").unwrap(), - }, - passdb: PathBuf::from_str("/tmp/passwd.db").unwrap(), - }) +use std::default::Default; + +pub fn read(path: &Path) -> Result { + let mut fp = File::open(path)?; + let mut contents = String::new(); + fp.read_to_string(&mut contents)?; + + let config = toml::from_str(&contents)?; + Ok(config) } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { - pub(crate) access: Access, pub machinedb: PathBuf, pub passdb: PathBuf, + pub(crate) access: Access, + pub listen: Box<[Listen]>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -27,3 +30,33 @@ pub struct Access { pub(crate) model: PathBuf, pub(crate) policy: PathBuf } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Listen { + pub address: String, + pub port: Option, +} + +impl Default for Config { + fn default() -> Self { + Config { + machinedb: PathBuf::from_str("/tmp/machines.db").unwrap(), + access: Access { + model: PathBuf::from_str("/tmp/model.conf").unwrap(), + policy: PathBuf::from_str("/tmp/policy.csv").unwrap(), + }, + passdb: PathBuf::from_str("/tmp/passwd.db").unwrap(), + listen: Box::new([Listen { + address: "127.0.0.1".to_string(), + port: Some(DEFAULT_PORT) + }, + Listen { + address: "::1".to_string(), + port: Some(DEFAULT_PORT) + }]), + } + } +} + +// The default port in the non-assignable i.e. free-use area +pub const DEFAULT_PORT: u16 = 59661; diff --git a/src/error.rs b/src/error.rs index 25fd6ea..3fe1f90 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,6 +9,7 @@ pub enum Error { TomlSer(toml::ser::Error), SASL(SASLError), IO(io::Error), + Boxed(Box), } impl From for Error { @@ -35,4 +36,10 @@ impl From for Error { } } +impl From> for Error { + fn from(e: Box) -> Error { + Error::Boxed(e) + } +} + pub type Result = std::result::Result; diff --git a/src/log.rs b/src/log.rs index 0b0715b..34c7b2c 100644 --- a/src/log.rs +++ b/src/log.rs @@ -1,8 +1,9 @@ use slog::{Drain, Logger}; use slog_async; use slog_term::{TermDecorator, FullFormat}; +use crate::config::Config; -pub fn init() -> Logger { +pub fn init(_config: &Config) -> Logger { let decorator = TermDecorator::new().build(); let drain = FullFormat::new(decorator).build().fuse(); let drain = slog_async::Async::new(drain).build().fuse(); diff --git a/src/machine.rs b/src/machine.rs index 2782adf..a4033c9 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -15,6 +15,9 @@ use crate::config::Config; use crate::api::api; use crate::access::Permissions; +use std::rc::Rc; +use async_std::sync::{Arc, RwLock}; + use capnp::capability::Promise; use capnp::Error; use capnp_rpc::Server; @@ -32,16 +35,64 @@ pub enum Status { Blocked, } -#[derive(Clone)] -pub struct Machines { +pub struct MachinesProvider { log: Logger, - mdb: Mutable, - perm: Permissions + mdb: MachineDB, } +impl MachinesProvider { + pub fn new(log: Logger, mdb: MachineDB) -> Self { + Self { log, mdb } + } + + pub fn use_(&mut self, uuid: &Uuid) -> std::result::Result<(), capnp::Error> { + if let Some(m) = self.mdb.get_mut(uuid) { + match m.status { + Status::Free => { + trace!(self.log, "Granted use on machine {}", uuid); + + m.status = Status::Occupied; + + Ok(()) + }, + Status::Occupied => { + info!(self.log, "Attempted use on an occupied machine {}", uuid); + Err(Error::failed("Machine is occupied".to_string())) + }, + Status::Blocked => { + info!(self.log, "Attempted use on a blocked machine {}", uuid); + Err(Error::failed("Machine is blocked".to_string())) + } + } + } else { + info!(self.log, "Attempted use on invalid machine {}", uuid); + Err(Error::failed("No such machine".to_string())) + } + } + + pub fn give_back(&mut self, uuid: &Uuid) -> std::result::Result<(), capnp::Error> { + if let Some(m) = self.mdb.get_mut(uuid) { + m.status = Status::Free; + } else { + warn!(self.log, "A giveback was issued for a unknown machine {}", uuid); + } + + Ok(()) + } + + pub fn get_perm_req(&self, uuid: &Uuid) -> Option { + self.mdb.get(uuid).map(|m| m.perm.clone()) + } +} + +#[derive(Clone)] +pub struct Machines { + inner: Arc>, + perm: Rc, +} impl Machines { - pub fn new(log: Logger, mdb: Mutable, perm: Permissions) -> Self { - Self { log, mdb, perm } + pub fn new(inner: Arc>, perm: Rc) -> Self { + Self { inner, perm } } } impl api::machines::Server for Machines { @@ -52,79 +103,95 @@ impl api::machines::Server for Machines { { let params = pry!(params.get()); let uuid_s = pry!(params.get_uuid()); - let uuid = uuid_from_api(uuid_s); - let db = self.mdb.lock_ref(); + // We need to copy the Arc here because we don't have access to it from within the closure + // witout moving it out of self. + let i = self.inner.clone(); + let p = self.perm.clone(); - if let Some(m) = db.get(&uuid) { - let manager = MachineManager::new(uuid, self.mdb.clone()); + let f = async move { + // We only need a read lock at first there's no reason to aquire a write lock. + let i_lock = i.read().await; - if self.perm.enforce(&m.perm, "manage") { - let mut b = results.get(); - let mngr = api::machines::manage::ToClient::new(manager).into_client::(); - b.set_manage(mngr); - trace!(self.log, "Granted manage on machine {}", uuid); - Promise::ok(()) - } else { - Promise::err(Error::failed("Permission denied".to_string())) + if let Some(ps) = i_lock.get_perm_req(&uuid) { + // drop the lock as soon as possible to prevent locking as much as possible + drop(i_lock); + if let Ok(true) = p.enforce(&ps, "manage").await { + // We're here and have not returned an error yet - that means we're free to + // send a successful manage back. + let mut b = results.get(); + + // Magic incantation to get a capability to send + // Also since we move i in here we at this point *must* have dropped + // all locks we may still have on it. + b.set_manage(api::machines::give_back::ToClient::new( + MachineManager::new(i, uuid)).into_client::()); + } } - } else { - info!(self.log, "Attempted manage on invalid machine {}", uuid); - Promise::err(Error::failed("No such machine".to_string())) - } + Ok(()) + }; + + Promise::from_future(f) } fn use_(&mut self, params: api::machines::UseParams, mut results: api::machines::UseResults) - -> Promise<(), Error> + -> Promise<(), capnp::Error> { let params = pry!(params.get()); let uuid_s = pry!(params.get_uuid()); let uuid = uuid_from_api(uuid_s); - let mdb = self.mdb.lock_ref(); - if let Some(m) = mdb.get(&uuid) { - match m.status { - Status::Free => { - trace!(self.log, "Granted use on machine {}", uuid); + // We need to copy the Arc here because we don't have access to it from within the closure + // witout moving it out of self. + let i = self.inner.clone(); + let p = self.perm.clone(); + let f = async move { + // We only need a read lock at first there's no reason to aquire a write lock. + let i_lock = i.read().await; + + if let Some(ps) = i_lock.get_perm_req(&uuid) { + // drop the lock as soon as possible to prevent locking as much as possible + drop(i_lock); + if let Ok(true) = p.enforce(&ps, "write").await { + { + // If use_() returns an error that is our error. If it doesn't that means we can use + // the machine + // Using a subscope to again make the time the lock is valid as short as + // possible. Less locking == more good + let mut i_lock = i.write().await; + i_lock.use_(&uuid)?; + } + + // We're here and have not returned an error yet - that means we're free to + // send a successful use back. let mut b = results.get(); - let gb = api::machines::give_back::ToClient::new( - GiveBack::new(self.log.new(o!()), uuid, self.mdb.clone()) - ).into_client::(); - - b.set_giveback(gb); - - Promise::ok(()) - }, - Status::Occupied => { - info!(self.log, "Attempted use on an occupied machine {}", uuid); - Promise::err(Error::failed("Machine is occupied".to_string())) - }, - Status::Blocked => { - info!(self.log, "Attempted use on a blocked machine {}", uuid); - Promise::err(Error::failed("Machine is blocked".to_string())) + // Magic incantation to get a capability to send + // Also since we move i in here we at this point *must* have dropped + // all locks we may still have on it. + b.set_giveback(api::machines::give_back::ToClient::new( + GiveBack::new(i, uuid)).into_client::()); } } - } else { - info!(self.log, "Attempted use on invalid machine {}", uuid); - Promise::err(Error::failed("No such machine".to_string())) - } + Ok(()) + }; + + Promise::from_future(f) } } +#[derive(Clone)] pub struct GiveBack { - log: Logger, - mdb: Mutable, + mdb: Arc>, uuid: Uuid, } impl GiveBack { - pub fn new(log: Logger, uuid: Uuid, mdb: Mutable) -> Self { - trace!(log, "Giveback initialized for {}", uuid); - Self { log, mdb, uuid } + pub fn new(mdb: Arc>, uuid: Uuid) -> Self { + Self { mdb, uuid } } } @@ -134,19 +201,16 @@ impl api::machines::give_back::Server for GiveBack { _results: api::machines::give_back::GivebackResults) -> Promise<(), Error> { - trace!(log, "Returning {}...", uuid); - let mut mdb = self.mdb.lock_mut(); - if let Some(m) = mdb.get_mut(&self.uuid) { - m.status = Status::Free; - } else { - warn!(self.log, "A giveback was issued for a unknown machine {}", self.uuid); - } + let mdb = self.mdb.clone(); + let uuid = self.uuid.clone(); + let f = async move { + mdb.write().await.give_back(&uuid) + }; - Promise::ok(()) + Promise::from_future(f) } } -// FIXME: Test this exhaustively! fn uuid_from_api(uuid: api::u_u_i_d::Reader) -> Uuid { let uuid0 = uuid.get_uuid0() as u128; let uuid1 = uuid.get_uuid1() as u128; @@ -163,12 +227,12 @@ fn api_from_uuid(uuid: Uuid, mut wr: api::u_u_i_d::Builder) { #[derive(Clone)] pub struct MachineManager { - mdb: Mutable, + mdb: Arc>, uuid: Uuid, } impl MachineManager { - pub fn new(uuid: Uuid, mdb: Mutable) -> Self { + pub fn new(uuid: Uuid, mdb: Arc>) -> Self { Self { mdb, uuid } } } @@ -222,16 +286,18 @@ impl Machine { pub type MachineDB = HashMap; -pub fn init(config: &Config) -> Result { - if config.machinedb.is_file() { +pub async fn init(log: Logger, config: &Config) -> Result { + let mdb = if config.machinedb.is_file() { let mut fp = File::open(&config.machinedb)?; let mut content = String::new(); fp.read_to_string(&mut content)?; let map = toml::from_str(&content)?; - return Ok(map); + map } else { - return Ok(HashMap::new()); - } + HashMap::new() + }; + + Ok(MachinesProvider::new(log, mdb)) } pub fn save(config: &Config, mdb: &MachineDB) -> Result<()> { diff --git a/src/main.rs b/src/main.rs index 104a127..814eae8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,7 @@ use futures::prelude::*; use futures::executor::{LocalPool, ThreadPool}; use futures::compat::Stream01CompatExt; use futures::join; -use futures::task::SpawnExt; +use futures::task::{SpawnExt, LocalSpawn}; use capnp_rpc::twoparty::{VatNetwork, VatId}; use capnp_rpc::rpc_twoparty_capnp::Side; @@ -43,6 +43,8 @@ use std::sync::Arc; use error::Error; +use api::API; + // Returning a `Result` from `main` allows us to use the `?` shorthand. // In the case of an Err it will be printed using `fmt::Debug` fn main() -> Result<(), Error> { @@ -106,7 +108,7 @@ fn main() -> Result<(), Error> { // Start loading the machine database, authentication system and permission system // All of those get a custom logger so the source of a log message can be better traced and // filtered - let machinedb_f = machine::init(log.new(o!("system" => "machinedb")), &config); + let machinedb_f = machine::init(log.new(o!("system" => "machines")), &config); let permission_f = access::init(log.new(o!("system" => "permissions")), &config); let authentication_f = auth::init(log.new(o!("system" => "authentication")), config.clone()); @@ -132,14 +134,14 @@ fn main() -> Result<(), Error> { } }).collect(); - let (mdb, pdb, auth) = exec.run_until(async { + let (mach, pdb, auth) = exec.run_until(async { // Rull all futures to completion in parallel. // This will block until all three are done starting up. join!(machinedb_f, permission_f, authentication_f) }); // Error out if any of the subsystems failed to start. - let mdb = mdb?; + let mach = mach?; let pdb = pdb.unwrap(); let auth = auth?; @@ -157,6 +159,12 @@ fn main() -> Result<(), Error> { info!(stop_log.new(o!("system" => "threadpool")), "Stopping Thread <{}>", i) }) .create()?; + let local_spawn = exec.spawner(); + + + // The API has access to all subsystems it needs and the Threadpool as capability to spawn new + // tasks for CPU-intensive work + let api = API::new(auth, pdb, mach, pool); // Closure inefficiencies. Lucky cloning an Arc is pretty cheap. let inner_log = log.clone(); @@ -167,9 +175,6 @@ fn main() -> Result<(), Error> { let listeners = listeners_s.await; let incoming = stream::select_all(listeners.iter().map(|l| l.incoming())); - // Spawner is a handle to the shared ThreadPool forwarded into each connection - let spawner = pool.clone(); - // For each incoming connection start a new task to handle it and throw it on the thread // pool let handle_sockets = incoming.map(|socket| { @@ -191,7 +196,7 @@ fn main() -> Result<(), Error> { // We handle the error using map_err, `let _` is used to quiet the compiler // warning - let f = api::handle_connection(log.clone(), socket, spawner.clone()) + let f = api::handle_connection(api.clone(), log.clone(), socket) .map_err(move |e| { error!(log, "Error occured during protocol handling: {}", e); }) @@ -199,7 +204,8 @@ fn main() -> Result<(), Error> { .map(|_| ()); // In this case only the error is relevant since the Value is always () - if let Err(e) = pool.spawn(f) { + // The future is Boxed to make it the `LocalFutureObj` that LocalSpawn expects + if let Err(e) = local_spawn.spawn_local_obj(Box::new(f).into()) { error!(elog, "Failed to spawn connection handler: {}", e); // Failing to spawn a handler means we are most likely overloaded return LoopResult::Overloaded;