diff --git a/src/actor.rs b/src/actor.rs index 25ec244..fdbff1c 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -1,6 +1,6 @@ use std::pin::Pin; use std::task::{Poll, Context}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::collections::HashMap; use std::future::Future; @@ -141,7 +141,7 @@ pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(Acto let mut v = Vec::new(); for (name, actuator) in actuators { let (tx, a) = Actor::wrap(actuator); - map.insert(name.clone(), tx); + map.insert(name.clone(), Mutex::new(tx)); v.push(a); } diff --git a/src/api.rs b/src/api.rs index 279ab19..70604b7 100644 --- a/src/api.rs +++ b/src/api.rs @@ -11,6 +11,8 @@ use crate::db::Databases; use crate::builtin; +use crate::network::Network; + pub mod auth; mod machine; mod machines; @@ -21,12 +23,13 @@ use machines::Machines; pub struct Bootstrap { session: Arc, db: Databases, + nw: Arc, } impl Bootstrap { - pub fn new(session: Arc, db: Databases) -> Self { + pub fn new(session: Arc, db: Databases, nw: Arc) -> Self { info!(session.log, "Created Bootstrap"); - Self { session, db } + Self { session, db, nw } } } @@ -57,7 +60,7 @@ impl connection_capnp::bootstrap::Server for Bootstrap { mut res: Results ) -> Promise<(), capnp::Error> { // TODO actual permission check and stuff - let c = capnp_rpc::new_client(Machines::new(self.session.clone(), self.db.clone())); + let c = capnp_rpc::new_client(Machines::new(self.session.clone(), self.db.clone(), self.nw.clone())); res.get().set_machines(c); Promise::ok(()) diff --git a/src/api/machines.rs b/src/api/machines.rs index 285f0cf..76220c2 100644 --- a/src/api/machines.rs +++ b/src/api/machines.rs @@ -9,6 +9,8 @@ use crate::connection::Session; use crate::db::Databases; use crate::db::machine::uuid_from_api; +use crate::network::Network; + use super::machine::Machine; /// An implementation of the `Machines` API @@ -18,12 +20,13 @@ pub struct Machines { session: Arc, db: Databases, + network: Arc, } impl Machines { - pub fn new(session: Arc, db: Databases) -> Self { + pub fn new(session: Arc, db: Databases, network: Arc) -> Self { info!(session.log, "Machines created"); - Self { session, db } + Self { session, db, network } } } diff --git a/src/connection.rs b/src/connection.rs index 4f4961e..7022c7d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -18,6 +18,7 @@ use crate::db::Databases; use crate::db::access::{AccessControl, Permission}; use crate::db::user::User; use crate::builtin; +use crate::network::Network; #[derive(Debug, Clone)] /// Connection context @@ -49,17 +50,18 @@ impl Session { pub struct ConnectionHandler { log: Logger, db: Databases, + network: Arc, } impl ConnectionHandler { - pub fn new(log: Logger, db: Databases) -> Self { - Self { log, db } + pub fn new(log: Logger, db: Databases, network: Arc) -> Self { + Self { log, db, network } } pub fn handle(&mut self, mut stream: TcpStream) -> impl Future> { info!(self.log, "New connection from on {:?}", stream); let session = Arc::new(Session::new(self.log.new(o!()), self.db.access.clone())); - let boots = Bootstrap::new(session, self.db.clone()); + let boots = Bootstrap::new(session, self.db.clone(), self.network.clone()); let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots); let network = twoparty::VatNetwork::new(stream.clone(), stream, diff --git a/src/network.rs b/src/network.rs index 3e93bf4..65f8403 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1,6 +1,6 @@ use std::fmt; -use std::sync::Arc; +use std::sync::{Arc, Mutex, MutexGuard, TryLockResult}; use std::collections::HashMap; use smol::Executor; @@ -16,7 +16,7 @@ use crate::db::machine::MachineState; use crate::error::Result; pub type MachineMap = HashMap; -pub type ActorMap = HashMap>>; +pub type ActorMap = HashMap>>>; pub type InitMap = HashMap>>; #[derive(Debug, PartialEq, Eq)] @@ -69,12 +69,17 @@ impl Network { Ok(()) } - pub fn connect_actor(&mut self, machine_key: &String, actor_key: &String) -> Result<()> { + pub fn connect_actor(&mut self, machine_key: &String, actor_key: &String) + -> Result<()> + { let machine = self.machines.get(machine_key) .ok_or(Error::NoSuchMachine)?; - let actor = self.actors.get_mut(actor_key) + let actor = self.actors.get(actor_key) .ok_or(Error::NoSuchActor)?; - actor.try_send(Some(Box::new(machine.signal()))).map_err(|_| Error::NoSuchActor.into()) + // FIXME Yeah this should not unwrap. Really, really shoudln't. + let mut guard = actor.try_lock().unwrap(); + + guard.try_send(Some(Box::new(machine.signal()))).map_err(|_| Error::NoSuchActor.into()) } } diff --git a/src/server.rs b/src/server.rs index 4828733..252b1a8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -25,6 +25,7 @@ use std::str::FromStr; use std::sync::Arc; use crate::db::Databases; +use crate::network::Network; /// Handle all API connections and run the RPC tasks spawned from that on the local thread. pub fn serve_api_connections(log: Arc, config: Settings, db: Databases, nw: Network) @@ -71,6 +72,8 @@ pub fn serve_api_connections(log: Arc, config: Settings, db: Databases, let local_ex = LocalExecutor::new(); + let network = Arc::new(nw); + let inner_log = log.clone(); let loop_log = log.clone(); @@ -79,7 +82,7 @@ pub fn serve_api_connections(log: Arc, config: Settings, db: Databases, let listeners = listeners_s.await; let incoming = stream::select_all(listeners.iter().map(|l| l.incoming())); - let mut handler = connection::ConnectionHandler::new(inner_log.new(o!()), db); + let mut handler = connection::ConnectionHandler::new(inner_log.new(o!()), db, network.clone()); // For each incoming connection start a new task to handle it let handle_sockets = incoming.map(|socket| {