Pass Arc<Network> to everywhere

This commit is contained in:
Gregor Reitzenstein 2020-12-15 13:12:22 +01:00
parent e22ed819bb
commit ec20859f6d
6 changed files with 32 additions and 16 deletions

View File

@ -1,6 +1,6 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{Poll, Context}; use std::task::{Poll, Context};
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::collections::HashMap; use std::collections::HashMap;
use std::future::Future; use std::future::Future;
@ -141,7 +141,7 @@ pub fn load(log: &Logger, client: &AsyncClient, config: &Config) -> Result<(Acto
let mut v = Vec::new(); let mut v = Vec::new();
for (name, actuator) in actuators { for (name, actuator) in actuators {
let (tx, a) = Actor::wrap(actuator); let (tx, a) = Actor::wrap(actuator);
map.insert(name.clone(), tx); map.insert(name.clone(), Mutex::new(tx));
v.push(a); v.push(a);
} }

View File

@ -11,6 +11,8 @@ use crate::db::Databases;
use crate::builtin; use crate::builtin;
use crate::network::Network;
pub mod auth; pub mod auth;
mod machine; mod machine;
mod machines; mod machines;
@ -21,12 +23,13 @@ use machines::Machines;
pub struct Bootstrap { pub struct Bootstrap {
session: Arc<Session>, session: Arc<Session>,
db: Databases, db: Databases,
nw: Arc<Network>,
} }
impl Bootstrap { impl Bootstrap {
pub fn new(session: Arc<Session>, db: Databases) -> Self { pub fn new(session: Arc<Session>, db: Databases, nw: Arc<Network>) -> Self {
info!(session.log, "Created Bootstrap"); info!(session.log, "Created Bootstrap");
Self { session, db } Self { session, db, nw }
} }
} }
@ -57,7 +60,7 @@ impl connection_capnp::bootstrap::Server for Bootstrap {
mut res: Results<machines_results::Owned> mut res: Results<machines_results::Owned>
) -> Promise<(), capnp::Error> { ) -> Promise<(), capnp::Error> {
// TODO actual permission check and stuff // TODO actual permission check and stuff
let c = capnp_rpc::new_client(Machines::new(self.session.clone(), self.db.clone())); let c = capnp_rpc::new_client(Machines::new(self.session.clone(), self.db.clone(), self.nw.clone()));
res.get().set_machines(c); res.get().set_machines(c);
Promise::ok(()) Promise::ok(())

View File

@ -9,6 +9,8 @@ use crate::connection::Session;
use crate::db::Databases; use crate::db::Databases;
use crate::db::machine::uuid_from_api; use crate::db::machine::uuid_from_api;
use crate::network::Network;
use super::machine::Machine; use super::machine::Machine;
/// An implementation of the `Machines` API /// An implementation of the `Machines` API
@ -18,12 +20,13 @@ pub struct Machines {
session: Arc<Session>, session: Arc<Session>,
db: Databases, db: Databases,
network: Arc<Network>,
} }
impl Machines { impl Machines {
pub fn new(session: Arc<Session>, db: Databases) -> Self { pub fn new(session: Arc<Session>, db: Databases, network: Arc<Network>) -> Self {
info!(session.log, "Machines created"); info!(session.log, "Machines created");
Self { session, db } Self { session, db, network }
} }
} }

View File

@ -18,6 +18,7 @@ use crate::db::Databases;
use crate::db::access::{AccessControl, Permission}; use crate::db::access::{AccessControl, Permission};
use crate::db::user::User; use crate::db::user::User;
use crate::builtin; use crate::builtin;
use crate::network::Network;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
/// Connection context /// Connection context
@ -49,17 +50,18 @@ impl Session {
pub struct ConnectionHandler { pub struct ConnectionHandler {
log: Logger, log: Logger,
db: Databases, db: Databases,
network: Arc<Network>,
} }
impl ConnectionHandler { impl ConnectionHandler {
pub fn new(log: Logger, db: Databases) -> Self { pub fn new(log: Logger, db: Databases, network: Arc<Network>) -> Self {
Self { log, db } Self { log, db, network }
} }
pub fn handle(&mut self, mut stream: TcpStream) -> impl Future<Output=Result<()>> { pub fn handle(&mut self, mut stream: TcpStream) -> impl Future<Output=Result<()>> {
info!(self.log, "New connection from on {:?}", stream); info!(self.log, "New connection from on {:?}", stream);
let session = Arc::new(Session::new(self.log.new(o!()), self.db.access.clone())); let session = Arc::new(Session::new(self.log.new(o!()), self.db.access.clone()));
let boots = Bootstrap::new(session, self.db.clone()); let boots = Bootstrap::new(session, self.db.clone(), self.network.clone());
let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots); let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots);
let network = twoparty::VatNetwork::new(stream.clone(), stream, let network = twoparty::VatNetwork::new(stream.clone(), stream,

View File

@ -1,6 +1,6 @@
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::{Arc, Mutex, MutexGuard, TryLockResult};
use std::collections::HashMap; use std::collections::HashMap;
use smol::Executor; use smol::Executor;
@ -16,7 +16,7 @@ use crate::db::machine::MachineState;
use crate::error::Result; use crate::error::Result;
pub type MachineMap = HashMap<String, Machine>; pub type MachineMap = HashMap<String, Machine>;
pub type ActorMap = HashMap<String, mpsc::Sender<Option<ActorSignal>>>; pub type ActorMap = HashMap<String, Mutex<mpsc::Sender<Option<ActorSignal>>>>;
pub type InitMap = HashMap<String, Mutable<Option<Machine>>>; pub type InitMap = HashMap<String, Mutable<Option<Machine>>>;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
@ -69,12 +69,17 @@ impl Network {
Ok(()) Ok(())
} }
pub fn connect_actor(&mut self, machine_key: &String, actor_key: &String) -> Result<()> { pub fn connect_actor(&mut self, machine_key: &String, actor_key: &String)
-> Result<()>
{
let machine = self.machines.get(machine_key) let machine = self.machines.get(machine_key)
.ok_or(Error::NoSuchMachine)?; .ok_or(Error::NoSuchMachine)?;
let actor = self.actors.get_mut(actor_key) let actor = self.actors.get(actor_key)
.ok_or(Error::NoSuchActor)?; .ok_or(Error::NoSuchActor)?;
actor.try_send(Some(Box::new(machine.signal()))).map_err(|_| Error::NoSuchActor.into()) // FIXME Yeah this should not unwrap. Really, really shoudln't.
let mut guard = actor.try_lock().unwrap();
guard.try_send(Some(Box::new(machine.signal()))).map_err(|_| Error::NoSuchActor.into())
} }
} }

View File

@ -25,6 +25,7 @@ use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use crate::db::Databases; use crate::db::Databases;
use crate::network::Network;
/// Handle all API connections and run the RPC tasks spawned from that on the local thread. /// Handle all API connections and run the RPC tasks spawned from that on the local thread.
pub fn serve_api_connections(log: Arc<Logger>, config: Settings, db: Databases, nw: Network) pub fn serve_api_connections(log: Arc<Logger>, config: Settings, db: Databases, nw: Network)
@ -71,6 +72,8 @@ pub fn serve_api_connections(log: Arc<Logger>, config: Settings, db: Databases,
let local_ex = LocalExecutor::new(); let local_ex = LocalExecutor::new();
let network = Arc::new(nw);
let inner_log = log.clone(); let inner_log = log.clone();
let loop_log = log.clone(); let loop_log = log.clone();
@ -79,7 +82,7 @@ pub fn serve_api_connections(log: Arc<Logger>, config: Settings, db: Databases,
let listeners = listeners_s.await; let listeners = listeners_s.await;
let incoming = stream::select_all(listeners.iter().map(|l| l.incoming())); let incoming = stream::select_all(listeners.iter().map(|l| l.incoming()));
let mut handler = connection::ConnectionHandler::new(inner_log.new(o!()), db); let mut handler = connection::ConnectionHandler::new(inner_log.new(o!()), db, network.clone());
// For each incoming connection start a new task to handle it // For each incoming connection start a new task to handle it
let handle_sockets = incoming.map(|socket| { let handle_sockets = incoming.map(|socket| {