Make work

Nadja Reitzenstein 2021-09-21 07:48:19 +02:00
parent 143416a308
commit bd635d97ac
16 changed files with 198 additions and 238 deletions

View File

@ -1,7 +1,13 @@
-- { actor_connections = [] : List { _1 : Text, _2 : Text }
{ actor_connections = [{ _1 = "Testmachine", _2 = "Actor" }]
{ actor_connections =
[ { _1 = "Testmachine", _2 = "Actor" }
, { _1 = "Another", _2 = "Actor2" }
, { _1 = "Yetmore", _2 = "Actor3" }
]
, actors =
{ Actor = { module = "Shelly", params = {=} }
{ Actor = { module = "Dummy", params = {=} }
, Actor2 = { module = "Dummy", params = {=} }
, Actor3 = { module = "Dummy", params = {=} }
}
, init_connections = [] : List { _1 : Text, _2 : Text }
--, init_connections = [{ _1 = "Initiator", _2 = "Testmachine" }]
@ -11,6 +17,7 @@
, listens =
[ { address = "127.0.0.1", port = Some 59661 }
, { address = "::1", port = Some 59661 }
, { address = "192.168.0.114", port = Some 59661 }
]
, machines =
{ Testmachine =
@ -38,12 +45,21 @@
, write = "lab.test.write"
}
}
, mqtt_url = "tcp://localhost:1883"
, mqtt_url = ""
, db_path = "/tmp/bffh"
, roles =
{ Testrole =
{ parents = [] : List Text
, permissions = [] : List Text
{ testrole =
{ permissions = [ "lab.test.*" ] }
, somerole =
{ parents = ["testparent"]
, permissions = [ "lab.some.admin" ]
}
, testparent =
{ permissions =
[ "lab.some.write"
, "lab.some.read"
, "lab.some.disclose"
]
}
}
}

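The new roles block relies on the serde defaults added to RoleConfig further down in this commit, so a role such as testrole can omit its parents list entirely. A minimal sketch of that behaviour, with PermRule simplified to a plain String and serde_json standing in for the Dhall loader bffh actually uses:

use std::collections::HashMap;
use serde::Deserialize;

// Simplified mirror of the RoleConfig change in this commit; permissions are
// plain strings here instead of PermRule.
#[derive(Debug, Deserialize)]
struct RoleConfig {
    #[serde(default = "Vec::new")]
    parents: Vec<String>,
    #[serde(default = "Vec::new")]
    permissions: Vec<String>,
}

fn main() {
    // testrole omits `parents` entirely and still parses thanks to the default.
    let raw = r#"{ "testrole": { "permissions": ["lab.test.*"] },
                  "testparent": { "permissions": ["lab.some.write", "lab.some.read", "lab.some.disclose"] } }"#;
    let roles: HashMap<String, RoleConfig> = serde_json::from_str(raw).unwrap();
    assert!(roles["testrole"].parents.is_empty());
    println!("{:#?}", roles);
}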
View File

@ -1,11 +1,13 @@
[Testuser]
# Define them in roles.toml as well
roles = ["somerole/lmdb", "testrole/lmdb"]
roles = ["somerole/internal", "testrole/internal"]
# If two or more users want to use the same machine at once the higher prio
# wins
priority = 0
passwd = "secret"
# You can add whatever random data you want.
# It will get stored in the `kv` field in UserData.
noot = "noot!"

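The role references now point at the "internal" source instead of "lmdb", matching the RoleIdentifier name/source pair introduced in the access module below. A tiny sketch of how such a string decomposes; the helper and the "split at the last '/'" rule are assumptions for illustration, not bffh code:

// Hypothetical helper: split "role/source" at the last '/'.
fn split_role(s: &str) -> Option<(&str, &str)> {
    s.rsplit_once('/')
}

fn main() {
    assert_eq!(split_role("somerole/internal"), Some(("somerole", "internal")));
    assert_eq!(split_role("testrole/internal"), Some(("testrole", "internal")));
}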
View File

@ -126,11 +126,11 @@ impl Actuator for Dummy {
}
}
pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(ActorMap, Vec<Actor>)> {
pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec<Actor>)> {
let mut map = HashMap::new();
let actuators = config.actors.iter()
.map(|(k,v)| (k, load_single(log, client, k, &v.module, &v.params)))
.map(|(k,v)| (k, load_single(log, k, &v.module, &v.params)))
.filter_map(|(k, n)| match n {
None => None,
Some(a) => Some((k, a))
@ -149,7 +149,6 @@ pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(Acto
fn load_single(
log: &Logger,
client: &AsyncClient,
name: &String,
module_name: &String,
params: &HashMap<String, String>
@ -157,14 +156,8 @@ fn load_single(
{
use crate::modules::*;
info!(log, "Loading actor \"{}\" with module {} and params {:?}", name, module_name, params);
match module_name.as_ref() {
"Shelly" => {
if !params.is_empty() {
warn!(log, "\"{}\" module expects no parameters. Configured as \"{}\".",
module_name, name);
}
Some(Box::new(Shelly::new(log, name.clone(), client.clone())))
},
"Dummy" => {
Some(Box::new(Dummy::new(log)))
}

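With the MQTT client gone from these signatures, load_single can only construct actors that need no transport handle, which is why the "Shelly" arm disappears. A condensed sketch of the resulting shape, with the bffh types stubbed out; the filter_map in load could equally be written as n.map(|a| (k, a)):

use std::collections::HashMap;

// Stand-ins for the real bffh types; only the control flow matters here.
struct Logger;
struct Dummy;
trait Actuator {}
impl Actuator for Dummy {}

fn load_single(
    _log: &Logger,
    _name: &str,
    module_name: &str,
    _params: &HashMap<String, String>,
) -> Option<Box<dyn Actuator>> {
    match module_name {
        // The "Shelly" arm is gone for now: it needed the AsyncClient that is no longer passed in.
        "Dummy" => Some(Box::new(Dummy)),
        _ => None,
    }
}

fn main() {
    let log = Logger;
    assert!(load_single(&log, "Actor", "Dummy", &HashMap::new()).is_some());
    assert!(load_single(&log, "Testmachine", "Shelly", &HashMap::new()).is_none());
}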
View File

@ -33,7 +33,6 @@ use crate::db::user::{Internal as UserDB, UserId, User};
use crate::db::access::AccessControl as AccessDB;
pub struct AppData {
passdb: Arc<PassDB>,
userdb: Arc<UserDB>,
}
pub struct SessionData {
@ -93,7 +92,7 @@ impl Auth {
pub fn new(log: Logger, dbs: Databases, session: Rc<RefCell<Option<Session>>>) -> Self {
let mut ctx = SASL::new().unwrap();
let appdata = Box::new(AppData { passdb: dbs.passdb.clone(), userdb: dbs.userdb.clone() });
let appdata = Box::new(AppData { userdb: dbs.userdb.clone() });
ctx.store(appdata);
ctx.install_callback::<CB>();

View File

@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::Duration;
use capnp::capability::Promise;
use capnp::Error;
@ -102,23 +103,33 @@ impl use_::Server for Machine {
let userid = self.userid.clone();
let f = async move {
let mut guard = machine.lock().await;
match guard.read_state().lock_ref().state {
let mut ok = false;
{
match { guard.read_state().lock_ref().state.clone() } {
Status::Free => {
guard.do_state_change(MachineState::used(Some(userid)));
ok = true;
},
Status::Reserved(ref whom) => {
// If it's reserved for us or we're allowed to take over
if &userid == whom {
guard.do_state_change(MachineState::used(Some(userid)));
ok = true;
}
},
_ => { }
}
}
if ok {
guard.do_state_change(MachineState::used(Some(userid)));
}
Ok(())
};
Promise::from_future(f)
let g = smol::future::race(f, smol::Timer::after(Duration::from_secs(4))
.map(|_| Err(capnp::Error::failed("Waiting for machine lock timed out!".to_string()))));
Promise::from_future(g)
}
}
@ -132,14 +143,21 @@ impl in_use::Server for Machine {
let userid = self.userid.clone();
let f = async move {
let mut guard = machine.lock().await;
match guard.read_state().lock_ref().state {
let mut ok = false;
{
match { guard.read_state().lock_ref().state.clone() } {
Status::InUse(ref whom) => {
if &Some(userid) == whom {
guard.reset_state()
ok = true;
}
},
_ => { }
}
}
if ok {
guard.reset_state()
}
Ok(())
};

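The use/in_use handlers now follow the same pattern: take the lock, inspect a clone of the state, record whether the transition is allowed, only then mutate, and race the whole future against a four-second timer so a stuck lock cannot hang the RPC. A self-contained sketch of that race, using a second smol Timer in place of the real lock acquisition:

use std::time::Duration;

fn main() {
    // Stands in for "lock the machine and change its state" taking too long.
    let work = async {
        smol::Timer::after(Duration::from_secs(10)).await;
        Ok::<(), String>(())
    };
    // The watchdog: fires first and turns into an error, like the capnp::Error in the handler.
    let timeout = async {
        smol::Timer::after(Duration::from_secs(4)).await;
        Err::<(), String>("Waiting for machine lock timed out!".to_string())
    };
    // Whichever future finishes first wins; the loser is simply dropped.
    let result = smol::block_on(smol::future::race(work, timeout));
    assert!(result.is_err());
}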
View File

@ -152,7 +152,6 @@ impl machines::Server for Machines {
let machineapi = Machine::new(user.clone(), perms, machine.clone());
if perms.write {
builder.set_use(capnp_rpc::new_client(machineapi.clone()));
builder.set_inuse(capnp_rpc::new_client(machineapi.clone()));
}
if perms.manage {
builder.set_transfer(capnp_rpc::new_client(machineapi.clone()));
@ -163,17 +162,25 @@ impl machines::Server for Machines {
builder.set_admin(capnp_rpc::new_client(machineapi.clone()));
}
builder.set_info(capnp_rpc::new_client(machineapi));
let s = match machine.get_status().await {
Status::Free => MachineState::Free,
Status::Disabled => MachineState::Disabled,
Status::Blocked(_) => MachineState::Blocked,
Status::InUse(_) => MachineState::InUse,
Status::InUse(u) => {
if let Some(owner) = u.as_ref() {
if owner == user {
builder.set_inuse(capnp_rpc::new_client(machineapi.clone()));
}
}
MachineState::InUse
},
Status::Reserved(_) => MachineState::Reserved,
Status::ToCheck(_) => MachineState::ToCheck,
};
builder.set_state(s);
builder.set_info(capnp_rpc::new_client(machineapi));
};
Ok(())

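The give-back capability (inuse) is no longer handed to everyone with write permission; it is only set when the machine's current status names the requesting user as the one using it. A small sketch of that ownership check with the surrounding types reduced to stand-ins:

// Simplified stand-ins for the bffh user and status types.
#[derive(PartialEq)]
struct UserId(String);

#[allow(dead_code)]
enum Status {
    Free,
    InUse(Option<UserId>),
}

// Mirrors the new rule: only the user a machine is in use by gets the `inuse` capability.
fn may_give_back(status: &Status, user: &UserId) -> bool {
    match status {
        Status::InUse(Some(owner)) => owner == user,
        _ => false,
    }
}

fn main() {
    let status = Status::InUse(Some(UserId("Testuser".to_string())));
    assert!(may_give_back(&status, &UserId("Testuser".to_string())));
    assert!(!may_give_back(&status, &UserId("Somebody".to_string())));
}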
View File

@ -44,8 +44,10 @@ pub struct Config {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoleConfig {
parents: Vec<String>,
permissions: Vec<PermRule>,
#[serde(default = "Vec::new")]
pub parents: Vec<String>,
#[serde(default = "Vec::new")]
pub permissions: Vec<PermRule>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -28,7 +28,6 @@ pub mod machine;
pub struct Databases {
pub access: Arc<access::AccessControl>,
pub machine: Arc<machine::internal::Internal>,
pub passdb: Arc<pass::PassDB>,
pub userdb: Arc<user::Internal>,
}
@ -53,13 +52,10 @@ impl Databases {
let permdb = access::init(log.new(o!("system" => "permissions")), &config, env.clone())?;
let ac = access::AccessControl::new(permdb);
let passdb = pass::PassDB::init(log.new(o!("system" => "passwords")), env.clone()).unwrap();
let userdb = user::init(log.new(o!("system" => "users")), &config, env.clone())?;
Ok(Self {
access: Arc::new(ac),
passdb: Arc::new(passdb),
machine: Arc::new(mdb),
userdb: Arc::new(userdb),
})

View File

@ -1,6 +1,9 @@
//! Access control logic
//!
use slog::Logger;
use std::sync::Arc;
use std::fmt;
use std::collections::HashMap;
use std::cmp::Ordering;
@ -15,11 +18,23 @@ use crate::error::Result;
pub mod internal;
use crate::config::Config;
use crate::db::user::UserData;
pub use internal::{init, Internal};
pub use internal::Internal;
pub struct AccessControl {
pub internal: Internal,
internal: HashMap<RoleIdentifier, Role>,
}
pub fn init(_log: Logger, config: &Config, _env: Arc<lmdb::Environment>)
-> std::result::Result<HashMap<RoleIdentifier, Role>, crate::error::Error>
{
Ok(config.roles.iter().map(|(name, cfg)| {
let id = RoleIdentifier::new(name, "internal");
let parents = cfg.parents.iter().map(|n| RoleIdentifier::new(n, "internal")).collect();
let role = Role::new(parents, cfg.permissions.clone());
(id, role)
}).collect())
}
pub const ADMINPERM: &'static str = "bffh.admin";
@ -28,7 +43,7 @@ pub fn admin_perm() -> &'static Permission {
}
impl AccessControl {
pub fn new(internal: Internal) -> Self {
pub fn new(internal: HashMap<RoleIdentifier, Role>) -> Self {
Self {
internal,
}
@ -39,7 +54,7 @@ impl AccessControl {
}
pub fn dump_roles(&self) -> Result<Vec<(RoleIdentifier, Role)>> {
self.internal.dump_roles()
Ok(self.internal.iter().map(|(k,v)| (k.clone(), v.clone())).collect())
}
}
@ -56,38 +71,6 @@ pub trait RoleDB {
fn get_role(&self, role_id: &RoleIdentifier) -> Result<Option<Role>>;
/// Check if a given user has the given permission
///
/// Default implementation, which adapters may override with more efficient, specialized
/// implementations.
fn check(&self, user: &UserData, perm: &Permission) -> Result<bool> {
self.check_roles(&user.roles, perm)
}
/// Check if a given permission is granted by any of the given roles or their respective
/// parents
///
/// A default implementation exists, which adapters may override with more efficient, specialized
/// implementations.
fn check_roles(&self, roles: &[RoleIdentifier], perm: &Permission) -> Result<bool> {
// Tally all roles. Makes dependent roles easier
let mut roleset = HashMap::new();
for role_id in roles {
self.tally_role(&mut roleset, role_id)?;
}
// Iter all unique role->permissions we've found and early return on match.
for (_roleid, role) in roleset.iter() {
for perm_rule in role.permissions.iter() {
if perm_rule.match_perm(&perm) {
return Ok(true);
}
}
}
return Ok(false);
}
/// Tally a role dependency tree into a set
///
/// A Default implementation exists which adapter may overwrite with more efficient
@ -125,6 +108,16 @@ pub trait RoleDB {
}
}
impl RoleDB for HashMap<RoleIdentifier, Role> {
fn get_type_name(&self) -> &'static str {
"Internal"
}
fn get_role(&self, role_id: &RoleIdentifier) -> Result<Option<Role>> {
Ok(self.get(role_id).cloned())
}
}
/// A "Role" from the Authorization perspective
///
/// You can think of a role as a bundle of permissions relating to other roles. In most cases a
@ -162,6 +155,10 @@ impl Role {
(RoleIdentifier::local_from_str("lmdb".to_string(), key), value)
})))
}
pub fn new(parents: Vec<RoleIdentifier>, permissions: Vec<PermRule>) -> Self {
Self { parents, permissions }
}
}
impl fmt::Display for Role {
@ -209,6 +206,15 @@ pub struct RoleIdentifier {
source: SourceID,
}
impl RoleIdentifier {
pub fn new<>(name: &str, source: &str) -> Self {
Self { name: name.to_string(), source: source.to_string() }
}
pub fn from_strings(name: String, source: String) -> Self {
Self { name, source }
}
}
impl fmt::Display for RoleIdentifier {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.name, self.source)

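Roles now live in a plain HashMap built straight from the config, and that HashMap implements RoleDB via get_role. A sketch of the parent-walking check the removed trait defaults (check_roles/tally_role) spelled out, with identifiers reduced to strings and a PermRule reduced to an optional trailing ".*" wildcard:

use std::collections::HashMap;

#[derive(Clone)]
struct Role {
    parents: Vec<String>,
    permissions: Vec<String>,
}

// A PermRule reduced to "exact string, or prefix with a trailing .*".
fn match_perm(rule: &str, perm: &str) -> bool {
    match rule.strip_suffix(".*") {
        Some(prefix) => perm.starts_with(prefix),
        None => rule == perm,
    }
}

// Tally every role reachable through parents, then look for a matching rule.
fn check_roles(db: &HashMap<String, Role>, roles: &[String], perm: &str) -> bool {
    let mut seen: HashMap<String, Role> = HashMap::new();
    let mut stack: Vec<String> = roles.to_vec();
    while let Some(id) = stack.pop() {
        if seen.contains_key(&id) {
            continue;
        }
        if let Some(role) = db.get(&id) {
            stack.extend(role.parents.iter().cloned());
            seen.insert(id, role.clone());
        }
    }
    seen.values()
        .any(|role| role.permissions.iter().any(|rule| match_perm(rule, perm)))
}

fn main() {
    let mut db = HashMap::new();
    db.insert(
        "testparent/internal".to_string(),
        Role { parents: vec![], permissions: vec!["lab.some.write".to_string()] },
    );
    db.insert(
        "somerole/internal".to_string(),
        Role { parents: vec!["testparent/internal".to_string()], permissions: vec!["lab.some.admin".to_string()] },
    );
    // somerole grants lab.some.write indirectly, through its parent testparent.
    assert!(check_roles(&db, &["somerole/internal".to_string()], "lab.some.write"));
    assert!(!check_roles(&db, &["somerole/internal".to_string()], "lab.test.write"));
}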
View File

@ -145,11 +145,6 @@ impl RoleDB for Internal {
"Internal"
}
fn check(&self, user: &UserData, perm: &Permission) -> Result<bool> {
let txn = self.env.begin_ro_txn()?;
self._check(&txn, user, &perm)
}
fn get_role(&self, role_id: &RoleIdentifier) -> Result<Option<Role>> {
let txn = self.env.begin_ro_txn()?;
self._get_role(&txn, role_id)

View File

@ -115,11 +115,22 @@ pub fn load_file<P: AsRef<Path>>(path: P) -> Result<HashMap<String, User>> {
let f = fs::read(path)?;
let mut map: HashMap<String, UserData> = toml::from_slice(&f)?;
Ok(HashMap::from_iter(map.drain().map(|(uid, user_data)|
Ok(HashMap::from_iter(map.drain().map(|(uid, mut user_data)| {
user_data.passwd = user_data.passwd.map(|pw| if !pw.starts_with("$argon2") {
let config = argon2::Config::default();
let salt: [u8; 16] = rand::random();
let hash = argon2::hash_encoded(pw.as_bytes(), &salt, &config)
.expect(&format!("Failed to hash password for {}: ", uid));
println!("Hashed pw for {} to {}", uid, hash);
hash
} else {
pw
});
( uid.clone()
, User::new(UserId::new(uid, None, None), user_data)
)
)))
})))
}
pub fn init(log: Logger, _config: &Config, env: Arc<lmdb::Environment>) -> Result<Internal> {

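load_file now transparently upgrades plaintext passwords: anything in users.toml that does not already start with "$argon2" gets hashed with a fresh 16-byte random salt before being stored. A standalone sketch of that round trip using the rust-argon2 and rand crates; the verify_encoded call at the end is an assumption about how the stored hash would later be checked, not code from this commit:

// Plaintext passwords get upgraded on load; already-hashed values pass through untouched.
fn hash_if_plain(uid: &str, pw: String) -> String {
    if pw.starts_with("$argon2") {
        return pw;
    }
    let config = argon2::Config::default();
    let salt: [u8; 16] = rand::random();
    argon2::hash_encoded(pw.as_bytes(), &salt, &config)
        .unwrap_or_else(|e| panic!("Failed to hash password for {}: {}", uid, e))
}

fn main() {
    let stored = hash_if_plain("Testuser", "secret".to_string());
    assert!(stored.starts_with("$argon2"));
    // Assumed follow-up (not in this commit): checking a login attempt against the stored hash.
    assert!(argon2::verify_encoded(&stored, b"secret").unwrap());
}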
View File

@ -117,7 +117,8 @@ impl Future for Initiator {
debug!(this.log, "Sensor returned a new state");
this.future.take();
let f = this.machine.as_mut().map(|machine| {
machine.request_state_change(user.as_ref(), state)
unimplemented!()
//machine.request_state_change(user.as_ref(), state)
});
this.state_change_fut = f;
}
@ -127,11 +128,11 @@ impl Future for Initiator {
}
}
pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(InitMap, Vec<Initiator>)> {
pub fn load(log: &Logger, config: &Config) -> Result<(InitMap, Vec<Initiator>)> {
let mut map = HashMap::new();
let initiators = config.initiators.iter()
.map(|(k,v)| (k, load_single(log, client, k, &v.module, &v.params)))
.map(|(k,v)| (k, load_single(log, k, &v.module, &v.params)))
.filter_map(|(k,n)| match n {
None => None,
Some(i) => Some((k, i)),
@ -149,7 +150,6 @@ pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(Init
fn load_single(
log: &Logger,
_client: &AsyncClient,
name: &String,
module_name: &String,
_params: &HashMap<String, String>

View File

@ -29,28 +29,10 @@ use crate::db::user::{User, UserData};
use crate::network::MachineMap;
use crate::space;
// Registry of all machines configured.
// TODO:
// - Serialize machines into config
// - Deserialize machines from config
// - Index machines on deserialization so we can look them up efficiently
// - Maybe store that index too
// - Iterate over all or a subset of machines efficiently
pub struct Machines {
machines: Vec<Machine>
}
impl Machines {
/// Load machines from the config, looking up and linking the database entries as necessary
pub fn load() -> Self {
unimplemented!()
}
pub fn lookup(id: String) -> Option<Machine> {
unimplemented!()
}
}
#[derive(Debug, Clone)]
pub struct Index {
inner: HashMap<String, Machine>,
@ -80,15 +62,13 @@ pub struct Machine {
pub desc: MachineDescription,
inner: Arc<Mutex<Inner>>,
access: Arc<access::AccessControl>,
}
impl Machine {
pub fn new(inner: Inner, desc: MachineDescription, access: Arc<access::AccessControl>) -> Self {
pub fn new(inner: Inner, desc: MachineDescription, ) -> Self {
Self {
id: uuid::Uuid::default(),
inner: Arc::new(Mutex::new(inner)),
access,
desc,
}
}
@ -97,83 +77,25 @@ impl Machine {
( id: MachineIdentifier
, desc: MachineDescription
, state: MachineState
, access: Arc<access::AccessControl>
) -> Machine
{
Self::new(Inner::new(id, state), desc, access)
}
pub fn from_file<P: AsRef<Path>>(path: P, access: Arc<access::AccessControl>)
-> Result<Vec<Machine>>
{
let mut map: HashMap<MachineIdentifier, MachineDescription> = MachineDescription::load_file(path)?;
Ok(map.drain().map(|(id, desc)| {
Self::construct(id, desc, MachineState::new(), access.clone())
}).collect())
}
/// Requests to use a machine. Returns a return token if successful.
///
/// This will update the internal state of the machine, notifying connected actors, if any.
/// The return token is a channel that considers the machine 'returned' if anything is sent
/// along it or if the sending end gets dropped. Anybody who holds this token needs to check if
/// the receiving end was canceled which indicates that the machine has been taken off their
/// hands.
pub fn request_state_change(&self, who: Option<&User>, new_state: MachineState)
-> BoxFuture<'static, Result<ReturnToken>>
{
let this = self.clone();
let perms: Vec<access::PermRule> = who
.map(|u| self.access.collect_permrules(&u.data))
.and_then(|result| result.ok())
.unwrap_or(vec![]);
let write: bool = perms.iter().any(|rule| rule.match_perm(&self.desc.privs.write));
let manage: bool = perms.iter().any(|rule| rule.match_perm(&self.desc.privs.manage));
let f = async move {
match &new_state.state {
Status::Free => {
let mut guard = this.inner.lock().await;
guard.do_state_change(new_state);
return Ok(ReturnToken::new(this.inner.clone()));
},
Status::InUse(_) | Status::ToCheck(_) if manage || write => {
let mut guard = this.inner.lock().await;
guard.do_state_change(new_state);
return Ok(ReturnToken::new(this.inner.clone()))
},
Status::Blocked(_) | Status::Disabled | Status::Reserved(_) if manage => {
let mut guard = this.inner.lock().await;
guard.do_state_change(new_state);
return Ok(ReturnToken::new(this.inner.clone()))
},
_ => {
return Err(Error::Denied);
}
}
};
Box::pin(f)
Self::new(Inner::new(id, state), desc)
}
pub fn do_state_change(&self, new_state: MachineState)
-> BoxFuture<'static, Result<ReturnToken>>
-> BoxFuture<'static, Result<()>>
{
let this = self.clone();
let f = async move {
let mut guard = this.inner.lock().await;
guard.do_state_change(new_state);
return Ok(ReturnToken::new(this.inner.clone()))
return Ok(())
};
Box::pin(f)
}
pub fn create_token(&self) -> ReturnToken {
ReturnToken::new(self.inner.clone())
}
pub async fn get_status(&self) -> Status {
let guard = self.inner.lock().await;
guard.state.get_cloned().state
@ -241,7 +163,9 @@ impl Inner {
}
pub fn do_state_change(&mut self, new_state: MachineState) {
print!("State {:?}", &new_state);
let old_state = self.state.replace(new_state);
print!("<- {:?}", &old_state);
self.reset.replace(old_state);
}
@ -316,7 +240,7 @@ impl MachineDescription {
}
}
pub fn load(config: &crate::config::Config, access: Arc<access::AccessControl>)
pub fn load(config: &crate::config::Config)
-> Result<MachineMap>
{
let mut map = config.machines.clone();
@ -324,7 +248,7 @@ pub fn load(config: &crate::config::Config, access: Arc<access::AccessControl>)
let it = map.drain()
.map(|(k,v)| {
// TODO: Read state from the state db
(v.name.clone(), Machine::construct(k, v, MachineState::new(), access.clone()))
(v.name.clone(), Machine::construct(k, v, MachineState::new()))
});

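Permission checks move out of the machine itself (request_state_change and the AccessControl handle are gone), leaving do_state_change as a plain swap of the signalled state that remembers the previous value for reset_state. A much-reduced sketch of that state/reset pair; the real code keeps a richer MachineState and does not use an Option for the reset slot:

use futures_signals::signal::Mutable;

// Greatly simplified: the state is just a &str and the reset slot is an Option.
struct Inner {
    state: Mutable<&'static str>,
    reset: Option<&'static str>,
}

impl Inner {
    fn do_state_change(&mut self, new_state: &'static str) {
        // replace() hands back the old value, which reset_state later restores.
        let old_state = self.state.replace(new_state);
        self.reset = Some(old_state);
    }

    fn reset_state(&mut self) {
        if let Some(previous) = self.reset.take() {
            self.state.set(previous);
        }
    }
}

fn main() {
    let mut inner = Inner { state: Mutable::new("Free"), reset: None };
    inner.do_state_change("InUse");
    assert_eq!(inner.state.get_cloned(), "InUse");
    inner.reset_state();
    assert_eq!(inner.state.get_cloned(), "Free");
}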
View File

@ -69,7 +69,6 @@ fn main() {
.help("Dump all databases into the given directory")
.long("dump")
.conflicts_with("load")
.takes_value(true)
)
.arg(Arg::with_name("load")
.help("Load databases from the given directory")
@ -145,6 +144,11 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
for (id, role) in v.iter() {
info!(log, "Role {}:\n{}", id, role);
}
let v = db.userdb.list_users()?;
for user in v.iter() {
info!(log, "User {}:\n{:?}", user.id, user.data);
}
Ok(())
} else if matches.is_present("load") {
let db = db::Databases::new(&log, &config)?;
@ -158,41 +162,34 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
debug!(log, "Loaded users: {:?}", map);
dir.pop();
dir.push("roles.toml");
db.access.internal.load_roles(&dir)?;
dir.pop();
dir.push("pass.toml");
db.passdb.load_file(&dir);
dir.pop();
Ok(())
} else {
let ex = Executor::new();
let db = db::Databases::new(&log, &config)?;
let mqtt = AsyncClient::new(config.mqtt_url.clone())?;
let tok = mqtt.connect(paho_mqtt::ConnectOptions::new());
//let mqtt = AsyncClient::new(config.mqtt_url.clone())?;
//let tok = mqtt.connect(paho_mqtt::ConnectOptions::new());
smol::block_on(tok)?;
//smol::block_on(tok)?;
let machines = machine::load(&config, db.access.clone())?;
let (actor_map, actors) = actor::load(&log, &mqtt, &config)?;
let (init_map, initiators) = initiator::load(&log, &mqtt, &config)?;
let machines = machine::load(&config)?;
let (actor_map, actors) = actor::load(&log, &config)?;
let (init_map, initiators) = initiator::load(&log, &config)?;
// TODO: restore connections between initiators, machines, actors
let mut network = network::Network::new(machines, actor_map, init_map);
for (a,b) in config.actor_connections.iter() {
if let Err(e) = network.connect_actor(a,b) {
error!(log, "{}", e);
}
info!(log, "[Actor] Connected {} to {}", a, b);
}
for (a,b) in config.init_connections.iter() {
if let Err(e) = network.connect_init(a,b) {
error!(log, "{}", e);
}
info!(log, "[Initi] Connected {} to {}", a, b);
}
for actor in actors.into_iter() {
@ -202,22 +199,6 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
ex.spawn(init).detach();
}
let (signal, shutdown) = async_channel::bounded::<()>(1);
let (_, r) = easy_parallel::Parallel::new()
.each(0..4, |_| smol::block_on(ex.run(shutdown.recv())))
.finish(|| {
// TODO: Spawn api connections on their own (non-main) thread, use the main thread to
// handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads
// when bffh should exit
let r = server::serve_api_connections(log.clone(), config, db, network);
// One of them would be enough really, but *shrug*
signal.try_send(());
std::mem::drop(signal);
return r;
});
return r;
server::serve_api_connections(log.clone(), config, db, network, ex)
}
}

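main() no longer juggles easy_parallel and a shutdown channel; it builds one smol::Executor, spawns the actor and initiator futures on it, and hands the executor to serve_api_connections, which drives it alongside the accept loop. The essential wiring, reduced to toy tasks:

use smol::Executor;

fn main() {
    let ex = Executor::new();

    // Background work (an actor or initiator future in bffh) gets spawned and detached,
    // so dropping the Task handle does not cancel it.
    ex.spawn(async {
        println!("background task running");
    })
    .detach();

    // Something has to drive the executor; bffh does this inside serve_api_connections,
    // racing ex.run(pending()) against the control future. Here we just run one more task.
    let done = ex.spawn(async { 42 });
    let answer = smol::block_on(ex.run(done));
    assert_eq!(answer, 42);
}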
View File

@ -17,17 +17,17 @@ pub type InitMap = HashMap<String, Mutable<Option<Machine>>>;
#[derive(Debug, PartialEq, Eq)]
pub enum Error {
NoSuchInitiator,
NoSuchMachine,
NoSuchActor,
NoSuchInitiator(String),
NoSuchMachine(String),
NoSuchActor(String),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::NoSuchInitiator => write!(f, "No initiator found with that name"),
Error::NoSuchActor => write!(f, "No actor found with that name"),
Error::NoSuchMachine => write!(f, "No machine found with that name"),
Error::NoSuchInitiator(n) => write!(f, "No initiator \"{}\" found.", n),
Error::NoSuchActor(n) => write!(f, "No actor \"{}\" found.", n),
Error::NoSuchMachine(n) => write!(f, "No machine \"{}\" found.", n),
}
}
}
@ -57,9 +57,9 @@ impl Network {
pub fn connect_init(&self, init_key: &String, machine_key: &String) -> Result<()> {
let init = self.inits.get(init_key)
.ok_or(Error::NoSuchInitiator)?;
.ok_or_else(|| Error::NoSuchInitiator(init_key.clone()))?;
let machine = self.machines.get(machine_key)
.ok_or(Error::NoSuchMachine)?;
.ok_or_else(|| Error::NoSuchMachine(machine_key.clone()))?;
init.set(Some(machine.clone()));
Ok(())
@ -69,13 +69,14 @@ impl Network {
-> Result<()>
{
let machine = self.machines.get(machine_key)
.ok_or(Error::NoSuchMachine)?;
.ok_or_else(|| Error::NoSuchMachine(machine_key.clone()))?;
let actor = self.actors.get(actor_key)
.ok_or(Error::NoSuchActor)?;
.ok_or_else(|| Error::NoSuchActor(actor_key.clone()))?;
// FIXME Yeah this should not unwrap. Really, really shouldn't.
let mut guard = actor.try_lock().unwrap();
guard.try_send(Some(Box::new(machine.signal()))).map_err(|_| Error::NoSuchActor.into())
guard.try_send(Some(Box::new(machine.signal())))
.map_err(|_| Error::NoSuchActor(actor_key.clone()).into())
}
}

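Besides carrying the offending name in each error variant, the lookups switch from ok_or to ok_or_else so the error value (and its String clone) is only built when the lookup actually fails. A small sketch of the difference:

use std::collections::HashMap;

// ok_or_else defers constructing the error, so the key is only cloned into the
// message on the failure path; ok_or would format and clone eagerly on every call.
fn lookup<'a>(machines: &'a HashMap<String, u32>, key: &str) -> Result<&'a u32, String> {
    machines
        .get(key)
        .ok_or_else(|| format!("No machine \"{}\" found.", key))
}

fn main() {
    let mut machines = HashMap::new();
    machines.insert("Testmachine".to_string(), 1u32);
    assert!(lookup(&machines, "Testmachine").is_ok());
    assert_eq!(lookup(&machines, "Nope").unwrap_err(), "No machine \"Nope\" found.");
}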
View File

@ -8,6 +8,7 @@ use crate::connection;
use smol::net::TcpListener;
use smol::net::unix::UnixStream;
use smol::LocalExecutor;
use smol::Executor;
use futures::prelude::*;
@ -22,7 +23,7 @@ use crate::db::Databases;
use crate::network::Network;
/// Handle all API connections and run the RPC tasks spawned from that on the local thread.
pub fn serve_api_connections(log: Arc<Logger>, config: Config, db: Databases, nw: Network)
pub fn serve_api_connections(log: Arc<Logger>, config: Config, db: Databases, nw: Network, ex: Executor)
-> Result<(), Error>
{
let signal = Box::pin(async {
@ -72,13 +73,11 @@ pub fn serve_api_connections(log: Arc<Logger>, config: Config, db: Databases, nw
let inner_log = log.clone();
let loop_log = log.clone();
smol::block_on(local_ex.run(async {
let control_fut = async {
// Generate a stream of TcpStreams appearing on any of the interfaces we listen to
let listeners = listeners_s.await;
let incoming = stream::select_all(listeners.iter().map(|l| l.incoming()));
let mut handler = connection::ConnectionHandler::new(inner_log.new(o!()), db, network.clone());
// For each incoming connection start a new task to handle it
let handle_sockets = incoming.map(|socket| {
// incoming.next() returns an error when the underlying `accept` call yielded an error
@ -94,6 +93,13 @@ pub fn serve_api_connections(log: Arc<Logger>, config: Config, db: Databases, nw
inner_log.new(o!())
};
let db = db.clone();
let network = network.clone();
let tlog = inner_log.new(o!());
std::thread::spawn(move || {
let local_ex = LocalExecutor::new();
let mut handler = connection::ConnectionHandler::new(tlog, db, network);
// We handle the error using map_err
let f = handler.handle(socket)
.map_err(move |e| {
@ -105,7 +111,8 @@ pub fn serve_api_connections(log: Arc<Logger>, config: Config, db: Databases, nw
// Spawn the connection context onto the local executor since it isn't Send
// Also `detach` it so the task isn't canceled as soon as it's dropped.
// TODO: Store all those tasks to have an easier way of managing them?
local_ex.spawn(f).detach();
smol::block_on(f);
});
},
Err(e) => {
error!(inner_log, "Socket `accept` error: {}", e);
@ -147,7 +154,9 @@ pub fn serve_api_connections(log: Arc<Logger>, config: Config, db: Databases, nw
}
}
}
}));
};
smol::block_on(smol::future::race(control_fut, ex.run(smol::future::pending())));
// TODO: Run actual shut down code here
info!(log, "Shutting down...");
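Connection handling changes in the same spirit: instead of spawning the non-Send handler future on the shared LocalExecutor, every accepted socket gets its own OS thread that simply block_on()s its handler, while the main thread races the control future against ex.run(pending()). A sketch of the thread-per-connection part, with the handler reduced to a timer:

use std::time::Duration;

// Each accepted connection gets its own thread; the handler future never has to be Send
// across tasks because it is block_on()'d right where it runs.
fn handle(conn_id: usize) -> impl std::future::Future<Output = ()> {
    async move {
        smol::Timer::after(Duration::from_millis(10)).await;
        println!("connection {} handled", conn_id);
    }
}

fn main() {
    let mut threads = Vec::new();
    for conn_id in 0..3 {
        threads.push(std::thread::spawn(move || {
            smol::block_on(handle(conn_id));
        }));
    }
    for t in threads {
        t.join().unwrap();
    }
}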