Showcase version commit

This commit is contained in:
Gregor Reitzenstein 2020-02-18 16:55:19 +01:00
parent 796e957b27
commit 9c4144ac66
9 changed files with 368 additions and 184 deletions

View File

@ -1,5 +1,5 @@
[package]
name = "bffh"
name = "diflouroborane"
version = "0.1.0"
authors = ["Gregor Reitzenstein <me@dequbed.space>"]
edition = "2018"

View File

@ -5,37 +5,50 @@ use slog::Logger;
use casbin::prelude::*;
use super::config::Config;
use futures_signals::signal::Mutable;
use crate::api::api;
use crate::config::Config;
use crate::auth::Authentication;
use crate::error::Result;
use std::rc::Rc;
use async_std::sync::{Arc, RwLock};
use std::ops::Deref;
pub struct PermissionsProvider {
log: Logger,
pdb: Enforcer,
}
impl PermissionsProvider {
pub fn new(log: Logger, pdb: Enforcer) -> Self {
Self { log, pdb }
}
pub fn enforce(&self, actor: &str, object: &str, action: &str) -> Result<bool> {
let b = self.pdb.enforce(vec![actor, object, action])?;
Ok(b)
}
}
#[derive(Clone)]
pub struct Permissions {
log: Logger,
pdb: Mutable<Enforcer>,
auth: Authentication,
inner: Arc<RwLock<PermissionsProvider>>,
auth: Rc<Authentication>,
}
impl Permissions {
pub fn new(log: Logger, pdb: Mutable<Enforcer>, auth: Authentication) -> Self {
Self { log, pdb, auth }
pub fn new(inner: Arc<RwLock<PermissionsProvider>>, auth: Rc<Authentication>) -> Self {
Self { inner, auth }
}
pub fn enforce(&self, object: &str, action: &str) -> bool {
if let Some(actor) = self.auth.get_authzid() {
trace!(self.log, "Checking permission {} for {} on {}", action, actor, object);
let r = self.pdb.lock_ref().enforce(vec![&actor,object,action]).unwrap();
if !r {
info!(self.log, "Failed permission {} for {} on {}", action, actor, object);
}
return r;
pub async fn enforce(&self, object: &str, action: &str) -> Result<bool> {
if let Some(actor) = self.auth.state.read().await.deref() {
self.inner.read().await.enforce(&actor, object, action)
} else {
info!(self.log, "Attempted anonymous access: {} on {}", action, object);
false
Ok(false)
}
}
}
@ -45,11 +58,11 @@ impl api::permissions::Server for Permissions {
}
/// This line documents init
pub async fn init(config: &Config) -> std::result::Result<Enforcer, Box<dyn std::error::Error>> {
pub async fn init(log: Logger, config: &Config) -> std::result::Result<PermissionsProvider, Box<dyn std::error::Error>> {
let model = Model::from_file(config.access.model.clone()).await?;
let adapter = Box::new(FileAdapter::new(config.access.policy.clone()));
let e = Enforcer::new(model, adapter).await?;
return Ok(e);
return Ok(PermissionsProvider::new(log, e));
}

View File

@ -7,13 +7,20 @@ pub mod api {
use std::default::Default;
use async_std::net::TcpStream;
use futures::task::Spawn;
use futures::FutureExt;
use futures_signals::signal::Mutable;
use casbin::Enforcer;
use casbin::MgmtApi;
use crate::machine::Machines;
use crate::auth::Authentication;
use crate::access::Permissions;
use slog::Logger;
use std::rc::Rc;
use async_std::sync::{Arc, RwLock};
use crate::machine::{MachinesProvider, Machines};
use crate::auth::{AuthenticationProvider, Authentication};
use crate::access::{PermissionsProvider, Permissions};
use capnp::{Error};
use capnp::capability::Promise;
@ -21,35 +28,76 @@ use capnp_rpc::RpcSystem;
use capnp_rpc::twoparty::VatNetwork;
use capnp_rpc::rpc_twoparty_capnp::Side;
use std::ops::Deref;
use api::diflouroborane;
pub fn init() {
}
#[derive(Clone)]
pub struct API<S> {
auth: Arc<RwLock<AuthenticationProvider>>,
perm: Arc<RwLock<PermissionsProvider>>,
mach: Arc<RwLock<MachinesProvider>>,
pub async fn process_socket(auth: Authentication, perm: Permissions, mach: Machines, socket: TcpStream)
-> Result<(), Error>
spawner: S,
}
impl<S: Spawn> API<S> {
pub fn new(auth: AuthenticationProvider,
perm: PermissionsProvider,
mach: MachinesProvider,
spawner: S)
-> Self
{
let api = Api { auth, perm, mach };
let a = api::diflouroborane::ToClient::new(api).into_client::<capnp_rpc::Server>();
let netw = VatNetwork::new(socket.clone(), socket, Side::Server, Default::default());
let rpc = RpcSystem::new(Box::new(netw), Some(a.clone().client));
rpc.await
let auth = Arc::new(RwLock::new(auth));
let perm = Arc::new(RwLock::new(perm));
let mach = Arc::new(RwLock::new(mach));
Self { auth, perm, mach, spawner }
}
struct Api {
auth: Authentication,
perm: Permissions,
pub fn into_connection(self) -> Bootstrap {
let auth = Rc::new(Authentication::new(self.auth));
let perm = Rc::new(Permissions::new(self.perm, auth.clone()));
let mach = Machines::new(self.mach, perm.clone());
Bootstrap {
auth: auth,
perm: perm,
mach: mach,
}
}
}
pub async fn handle_connection<S: Spawn>(api: API<S>, log: Logger, socket: TcpStream) -> Result<(), Error> {
info!(log, "A new connection");
let client = api.into_connection();
let a = api::diflouroborane::ToClient::new(client).into_client::<capnp_rpc::Server>();
let netw = VatNetwork::new(socket.clone(), socket, Side::Server, Default::default());
let rpc = RpcSystem::new(Box::new(netw), Some(a.clone().client)).map(|_| ());
rpc.await;
Ok(())
}
/// Bootstrap capability of the Diflouroborane API
///
/// This is the starting point for any client connecting
#[derive(Clone)]
pub struct Bootstrap {
auth: Rc<Authentication>,
perm: Rc<Permissions>,
mach: Machines,
}
impl diflouroborane::Server for Api {
impl diflouroborane::Server for Bootstrap {
fn authentication(&mut self,
_params: diflouroborane::AuthenticationParams,
mut results: diflouroborane::AuthenticationResults)
-> Promise<(), Error>
{
let mut b = results.get();
let auth = api::authentication::ToClient::new(self.auth.clone()).into_client::<capnp_rpc::Server>();
let auth = api::authentication::ToClient::new(self.auth.deref().clone()).into_client::<capnp_rpc::Server>();
b.set_auth(auth);
Promise::ok(())
}
@ -59,9 +107,9 @@ impl diflouroborane::Server for Api {
mut results: diflouroborane::PermissionsResults)
-> Promise<(), Error>
{
let mut b = results.get();
let perm = api::permissions::ToClient::new(self.perm.clone()).into_client::<capnp_rpc::Server>();
b.set_perm(perm);
//let mut b = results.get();
//let perm = api::permissions::ToClient::new(self.perm).into_client::<capnp_rpc::Server>();
//b.set_perm(perm);
Promise::ok(())
}

View File

@ -11,6 +11,9 @@ use std::fs::File;
use std::io::{Read, Write};
use std::ops::Deref;
use async_std::sync::{Arc, RwLock};
use capnp::capability::Promise;
use futures_signals::signal::Mutable;
use casbin::{Enforcer, Model, FileAdapter};
@ -133,24 +136,16 @@ impl AuthenticationProvider {
#[derive(Clone)]
pub struct Authentication {
state: Mutable<Option<String>>,
provider: Mutable<AuthenticationProvider>,
pub state: Arc<RwLock<Option<String>>>,
provider: Arc<RwLock<AuthenticationProvider>>,
}
impl Authentication {
pub fn new(provider: Mutable<AuthenticationProvider>) -> Self {
pub fn new(provider: Arc<RwLock<AuthenticationProvider>>) -> Self {
Self {
state: Mutable::new(None),
state: Arc::new(RwLock::new(None)),
provider: provider,
}
}
pub fn get_authzid(&self) -> Option<String> {
self.state.lock_ref().clone()
}
pub fn mechs(&self) -> Vec<&'static str> {
self.provider.lock_ref().mechs()
}
}
@ -162,15 +157,19 @@ impl api::authentication::Server for Authentication {
mut results: api::authentication::AvailableMechanismsResults)
-> ::capnp::capability::Promise<(), ::capnp::Error>
{
let m = self.mechs();
let p = self.provider.clone();
let f = async move {
let m = p.read().await.mechs();
let mut b = results.get()
.init_mechanisms(m.len() as u32);
for (i, mech) in m.iter().enumerate() {
let mut bldr = b.reborrow();
bldr.set(i as u32, mech);
}
Ok(())
};
::capnp::capability::Promise::ok(())
::capnp::capability::Promise::from_future(f)
}
fn initialize_authentication(&mut self,
@ -178,20 +177,24 @@ impl api::authentication::Server for Authentication {
mut results: api::authentication::InitializeAuthenticationResults)
-> ::capnp::capability::Promise<(), ::capnp::Error>
{
let params = pry!(params.get());
let mechanism = pry!(params.get_mechanism());
let prov = self.provider.clone();
let stat = self.state.clone();
Promise::from_future(async move {
let params = params.get()?;
let mechanism = params.get_mechanism()?;
match mechanism {
"PLAIN" => {
use api::authentication::maybe_data::Which;
let data = pry!(params.get_initial_data());
let data = params.get_initial_data()?;
if let Ok(Which::Some(data)) = data.which() {
let data = pry!(data);
if let Ok((b, name)) = self.provider.lock_ref().plain.step(data) {
// If login was successful, also set the current authzid
let data = data?;
if let Ok((b, name)) = prov.read().await.plain.step(data) {
// If login was successful set the authzid
if b {
self.state.lock_mut().replace(name.to_string());
stat.write().await.replace(name.to_string());
}
let outcome = Outcome::value(b);
@ -201,19 +204,20 @@ impl api::authentication::Server for Authentication {
.set_outcome(api::authentication::outcome::ToClient::new(outcome)
.into_client::<::capnp_rpc::Server>());
}
::capnp::capability::Promise::ok(())
Ok(())
} else {
return
::capnp::capability::Promise::err(::capnp::Error::unimplemented(
"SASL PLAIN requires initial data set".to_string()));
Err(::capnp::Error::unimplemented(
"SASL PLAIN requires initial data set".to_string()))
}
},
m => {
return
::capnp::capability::Promise::err(::capnp::Error::unimplemented(
format!("SASL Mechanism {} is not implemented", m)));
Err(::capnp::Error::unimplemented(
format!("SASL Mechanism {} is not implemented", m)
))
}
}
})
}
fn get_authzid(&mut self,
@ -221,12 +225,18 @@ impl api::authentication::Server for Authentication {
mut results: api::authentication::GetAuthzidResults)
-> ::capnp::capability::Promise<(), ::capnp::Error>
{
if let Some(zid) = self.state.lock_ref().deref() {
results.get().set_authzid(zid);
let state = self.state.clone();
let f = async move {
if let Some(zid) = state.read().await.deref() {
results.get().set_authzid(&zid);
} else {
results.get().set_authzid("");
}
::capnp::capability::Promise::ok(())
Ok(())
};
Promise::from_future(f)
}
}

View File

@ -1,25 +1,28 @@
use std::str::FromStr;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use serde::{Serialize, Deserialize};
use std::io::Read;
use std::fs::File;
use crate::error::Result;
pub fn read() -> Result<Config> {
Ok(Config {
machinedb: PathBuf::from_str("/tmp/machines.db").unwrap(),
access: Access {
model: PathBuf::from_str("/tmp/model.conf").unwrap(),
policy: PathBuf::from_str("/tmp/policy.csv").unwrap(),
},
passdb: PathBuf::from_str("/tmp/passwd.db").unwrap(),
})
use std::default::Default;
pub fn read(path: &Path) -> Result<Config> {
let mut fp = File::open(path)?;
let mut contents = String::new();
fp.read_to_string(&mut contents)?;
let config = toml::from_str(&contents)?;
Ok(config)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
pub(crate) access: Access,
pub machinedb: PathBuf,
pub passdb: PathBuf,
pub(crate) access: Access,
pub listen: Box<[Listen]>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -27,3 +30,33 @@ pub struct Access {
pub(crate) model: PathBuf,
pub(crate) policy: PathBuf
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Listen {
pub address: String,
pub port: Option<u16>,
}
impl Default for Config {
fn default() -> Self {
Config {
machinedb: PathBuf::from_str("/tmp/machines.db").unwrap(),
access: Access {
model: PathBuf::from_str("/tmp/model.conf").unwrap(),
policy: PathBuf::from_str("/tmp/policy.csv").unwrap(),
},
passdb: PathBuf::from_str("/tmp/passwd.db").unwrap(),
listen: Box::new([Listen {
address: "127.0.0.1".to_string(),
port: Some(DEFAULT_PORT)
},
Listen {
address: "::1".to_string(),
port: Some(DEFAULT_PORT)
}]),
}
}
}
// The default port in the non-assignable i.e. free-use area
pub const DEFAULT_PORT: u16 = 59661;

View File

@ -9,6 +9,7 @@ pub enum Error {
TomlSer(toml::ser::Error),
SASL(SASLError),
IO(io::Error),
Boxed(Box<dyn std::error::Error>),
}
impl From<SASLError> for Error {
@ -35,4 +36,10 @@ impl From<toml::ser::Error> for Error {
}
}
impl From<Box<dyn std::error::Error>> for Error {
fn from(e: Box<dyn std::error::Error>) -> Error {
Error::Boxed(e)
}
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@ -1,8 +1,9 @@
use slog::{Drain, Logger};
use slog_async;
use slog_term::{TermDecorator, FullFormat};
use crate::config::Config;
pub fn init() -> Logger {
pub fn init(_config: &Config) -> Logger {
let decorator = TermDecorator::new().build();
let drain = FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();

View File

@ -15,6 +15,9 @@ use crate::config::Config;
use crate::api::api;
use crate::access::Permissions;
use std::rc::Rc;
use async_std::sync::{Arc, RwLock};
use capnp::capability::Promise;
use capnp::Error;
use capnp_rpc::Server;
@ -32,16 +35,64 @@ pub enum Status {
Blocked,
}
#[derive(Clone)]
pub struct Machines {
pub struct MachinesProvider {
log: Logger,
mdb: Mutable<MachineDB>,
perm: Permissions
mdb: MachineDB,
}
impl MachinesProvider {
pub fn new(log: Logger, mdb: MachineDB) -> Self {
Self { log, mdb }
}
pub fn use_(&mut self, uuid: &Uuid) -> std::result::Result<(), capnp::Error> {
if let Some(m) = self.mdb.get_mut(uuid) {
match m.status {
Status::Free => {
trace!(self.log, "Granted use on machine {}", uuid);
m.status = Status::Occupied;
Ok(())
},
Status::Occupied => {
info!(self.log, "Attempted use on an occupied machine {}", uuid);
Err(Error::failed("Machine is occupied".to_string()))
},
Status::Blocked => {
info!(self.log, "Attempted use on a blocked machine {}", uuid);
Err(Error::failed("Machine is blocked".to_string()))
}
}
} else {
info!(self.log, "Attempted use on invalid machine {}", uuid);
Err(Error::failed("No such machine".to_string()))
}
}
pub fn give_back(&mut self, uuid: &Uuid) -> std::result::Result<(), capnp::Error> {
if let Some(m) = self.mdb.get_mut(uuid) {
m.status = Status::Free;
} else {
warn!(self.log, "A giveback was issued for a unknown machine {}", uuid);
}
Ok(())
}
pub fn get_perm_req(&self, uuid: &Uuid) -> Option<String> {
self.mdb.get(uuid).map(|m| m.perm.clone())
}
}
#[derive(Clone)]
pub struct Machines {
inner: Arc<RwLock<MachinesProvider>>,
perm: Rc<Permissions>,
}
impl Machines {
pub fn new(log: Logger, mdb: Mutable<MachineDB>, perm: Permissions) -> Self {
Self { log, mdb, perm }
pub fn new(inner: Arc<RwLock<MachinesProvider>>, perm: Rc<Permissions>) -> Self {
Self { inner, perm }
}
}
impl api::machines::Server for Machines {
@ -52,79 +103,95 @@ impl api::machines::Server for Machines {
{
let params = pry!(params.get());
let uuid_s = pry!(params.get_uuid());
let uuid = uuid_from_api(uuid_s);
let db = self.mdb.lock_ref();
// We need to copy the Arc here because we don't have access to it from within the closure
// witout moving it out of self.
let i = self.inner.clone();
let p = self.perm.clone();
if let Some(m) = db.get(&uuid) {
let manager = MachineManager::new(uuid, self.mdb.clone());
let f = async move {
// We only need a read lock at first there's no reason to aquire a write lock.
let i_lock = i.read().await;
if self.perm.enforce(&m.perm, "manage") {
if let Some(ps) = i_lock.get_perm_req(&uuid) {
// drop the lock as soon as possible to prevent locking as much as possible
drop(i_lock);
if let Ok(true) = p.enforce(&ps, "manage").await {
// We're here and have not returned an error yet - that means we're free to
// send a successful manage back.
let mut b = results.get();
let mngr = api::machines::manage::ToClient::new(manager).into_client::<Server>();
b.set_manage(mngr);
trace!(self.log, "Granted manage on machine {}", uuid);
Promise::ok(())
} else {
Promise::err(Error::failed("Permission denied".to_string()))
// Magic incantation to get a capability to send
// Also since we move i in here we at this point *must* have dropped
// all locks we may still have on it.
b.set_manage(api::machines::give_back::ToClient::new(
MachineManager::new(i, uuid)).into_client::<Server>());
}
} else {
info!(self.log, "Attempted manage on invalid machine {}", uuid);
Promise::err(Error::failed("No such machine".to_string()))
}
Ok(())
};
Promise::from_future(f)
}
fn use_(&mut self,
params: api::machines::UseParams,
mut results: api::machines::UseResults)
-> Promise<(), Error>
-> Promise<(), capnp::Error>
{
let params = pry!(params.get());
let uuid_s = pry!(params.get_uuid());
let uuid = uuid_from_api(uuid_s);
let mdb = self.mdb.lock_ref();
if let Some(m) = mdb.get(&uuid) {
match m.status {
Status::Free => {
trace!(self.log, "Granted use on machine {}", uuid);
// We need to copy the Arc here because we don't have access to it from within the closure
// witout moving it out of self.
let i = self.inner.clone();
let p = self.perm.clone();
let f = async move {
// We only need a read lock at first there's no reason to aquire a write lock.
let i_lock = i.read().await;
if let Some(ps) = i_lock.get_perm_req(&uuid) {
// drop the lock as soon as possible to prevent locking as much as possible
drop(i_lock);
if let Ok(true) = p.enforce(&ps, "write").await {
{
// If use_() returns an error that is our error. If it doesn't that means we can use
// the machine
// Using a subscope to again make the time the lock is valid as short as
// possible. Less locking == more good
let mut i_lock = i.write().await;
i_lock.use_(&uuid)?;
}
// We're here and have not returned an error yet - that means we're free to
// send a successful use back.
let mut b = results.get();
let gb = api::machines::give_back::ToClient::new(
GiveBack::new(self.log.new(o!()), uuid, self.mdb.clone())
).into_client::<Server>();
// Magic incantation to get a capability to send
// Also since we move i in here we at this point *must* have dropped
// all locks we may still have on it.
b.set_giveback(api::machines::give_back::ToClient::new(
GiveBack::new(i, uuid)).into_client::<Server>());
}
}
Ok(())
};
b.set_giveback(gb);
Promise::ok(())
},
Status::Occupied => {
info!(self.log, "Attempted use on an occupied machine {}", uuid);
Promise::err(Error::failed("Machine is occupied".to_string()))
},
Status::Blocked => {
info!(self.log, "Attempted use on a blocked machine {}", uuid);
Promise::err(Error::failed("Machine is blocked".to_string()))
}
}
} else {
info!(self.log, "Attempted use on invalid machine {}", uuid);
Promise::err(Error::failed("No such machine".to_string()))
}
Promise::from_future(f)
}
}
#[derive(Clone)]
pub struct GiveBack {
log: Logger,
mdb: Mutable<MachineDB>,
mdb: Arc<RwLock<MachinesProvider>>,
uuid: Uuid,
}
impl GiveBack {
pub fn new(log: Logger, uuid: Uuid, mdb: Mutable<MachineDB>) -> Self {
trace!(log, "Giveback initialized for {}", uuid);
Self { log, mdb, uuid }
pub fn new(mdb: Arc<RwLock<MachinesProvider>>, uuid: Uuid) -> Self {
Self { mdb, uuid }
}
}
@ -134,19 +201,16 @@ impl api::machines::give_back::Server for GiveBack {
_results: api::machines::give_back::GivebackResults)
-> Promise<(), Error>
{
trace!(log, "Returning {}...", uuid);
let mut mdb = self.mdb.lock_mut();
if let Some(m) = mdb.get_mut(&self.uuid) {
m.status = Status::Free;
} else {
warn!(self.log, "A giveback was issued for a unknown machine {}", self.uuid);
}
let mdb = self.mdb.clone();
let uuid = self.uuid.clone();
let f = async move {
mdb.write().await.give_back(&uuid)
};
Promise::ok(())
Promise::from_future(f)
}
}
// FIXME: Test this exhaustively!
fn uuid_from_api(uuid: api::u_u_i_d::Reader) -> Uuid {
let uuid0 = uuid.get_uuid0() as u128;
let uuid1 = uuid.get_uuid1() as u128;
@ -163,12 +227,12 @@ fn api_from_uuid(uuid: Uuid, mut wr: api::u_u_i_d::Builder) {
#[derive(Clone)]
pub struct MachineManager {
mdb: Mutable<MachineDB>,
mdb: Arc<RwLock<MachinesProvider>>,
uuid: Uuid,
}
impl MachineManager {
pub fn new(uuid: Uuid, mdb: Mutable<MachineDB>) -> Self {
pub fn new(uuid: Uuid, mdb: Arc<RwLock<MachineDB>>) -> Self {
Self { mdb, uuid }
}
}
@ -222,16 +286,18 @@ impl Machine {
pub type MachineDB = HashMap<Uuid, Machine>;
pub fn init(config: &Config) -> Result<MachineDB> {
if config.machinedb.is_file() {
pub async fn init(log: Logger, config: &Config) -> Result<MachinesProvider> {
let mdb = if config.machinedb.is_file() {
let mut fp = File::open(&config.machinedb)?;
let mut content = String::new();
fp.read_to_string(&mut content)?;
let map = toml::from_str(&content)?;
return Ok(map);
map
} else {
return Ok(HashMap::new());
}
HashMap::new()
};
Ok(MachinesProvider::new(log, mdb))
}
pub fn save(config: &Config, mdb: &MachineDB) -> Result<()> {

View File

@ -26,7 +26,7 @@ use futures::prelude::*;
use futures::executor::{LocalPool, ThreadPool};
use futures::compat::Stream01CompatExt;
use futures::join;
use futures::task::SpawnExt;
use futures::task::{SpawnExt, LocalSpawn};
use capnp_rpc::twoparty::{VatNetwork, VatId};
use capnp_rpc::rpc_twoparty_capnp::Side;
@ -43,6 +43,8 @@ use std::sync::Arc;
use error::Error;
use api::API;
// Returning a `Result` from `main` allows us to use the `?` shorthand.
// In the case of an Err it will be printed using `fmt::Debug`
fn main() -> Result<(), Error> {
@ -106,7 +108,7 @@ fn main() -> Result<(), Error> {
// Start loading the machine database, authentication system and permission system
// All of those get a custom logger so the source of a log message can be better traced and
// filtered
let machinedb_f = machine::init(log.new(o!("system" => "machinedb")), &config);
let machinedb_f = machine::init(log.new(o!("system" => "machines")), &config);
let permission_f = access::init(log.new(o!("system" => "permissions")), &config);
let authentication_f = auth::init(log.new(o!("system" => "authentication")), config.clone());
@ -132,14 +134,14 @@ fn main() -> Result<(), Error> {
}
}).collect();
let (mdb, pdb, auth) = exec.run_until(async {
let (mach, pdb, auth) = exec.run_until(async {
// Rull all futures to completion in parallel.
// This will block until all three are done starting up.
join!(machinedb_f, permission_f, authentication_f)
});
// Error out if any of the subsystems failed to start.
let mdb = mdb?;
let mach = mach?;
let pdb = pdb.unwrap();
let auth = auth?;
@ -157,6 +159,12 @@ fn main() -> Result<(), Error> {
info!(stop_log.new(o!("system" => "threadpool")), "Stopping Thread <{}>", i)
})
.create()?;
let local_spawn = exec.spawner();
// The API has access to all subsystems it needs and the Threadpool as capability to spawn new
// tasks for CPU-intensive work
let api = API::new(auth, pdb, mach, pool);
// Closure inefficiencies. Lucky cloning an Arc is pretty cheap.
let inner_log = log.clone();
@ -167,9 +175,6 @@ fn main() -> Result<(), Error> {
let listeners = listeners_s.await;
let incoming = stream::select_all(listeners.iter().map(|l| l.incoming()));
// Spawner is a handle to the shared ThreadPool forwarded into each connection
let spawner = pool.clone();
// For each incoming connection start a new task to handle it and throw it on the thread
// pool
let handle_sockets = incoming.map(|socket| {
@ -191,7 +196,7 @@ fn main() -> Result<(), Error> {
// We handle the error using map_err, `let _` is used to quiet the compiler
// warning
let f = api::handle_connection(log.clone(), socket, spawner.clone())
let f = api::handle_connection(api.clone(), log.clone(), socket)
.map_err(move |e| {
error!(log, "Error occured during protocol handling: {}", e);
})
@ -199,7 +204,8 @@ fn main() -> Result<(), Error> {
.map(|_| ());
// In this case only the error is relevant since the Value is always ()
if let Err(e) = pool.spawn(f) {
// The future is Boxed to make it the `LocalFutureObj` that LocalSpawn expects
if let Err(e) = local_spawn.spawn_local_obj(Box::new(f).into()) {
error!(elog, "Failed to spawn connection handler: {}", e);
// Failing to spawn a handler means we are most likely overloaded
return LoopResult::Overloaded;