diff --git a/src/actor.rs b/src/actor.rs index b6e37f9..c331874 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -1,6 +1,7 @@ use std::pin::Pin; use std::task::{Poll, Context}; use std::sync::Arc; +use std::collections::HashMap; use std::future::Future; use smol::Executor; @@ -14,7 +15,11 @@ use crate::registries::actuators::Actuator; use crate::config::Settings; use crate::error::Result; -pub struct Actor { +use crate::network::ActorMap; + +pub type ActorSignal = Box + Unpin + Send>; + +pub struct Actor { // FIXME: This should really be a Signal. // But, alas, MutableSignalCloned is itself not `Clone`. For good reason as keeping track of // the changes itself happens in a way that Clone won't work (well). @@ -22,15 +27,15 @@ pub struct Actor { // of a task context. In short, using Mutable isn't possible and we would have to write our own // implementation of MutableSignal*'s . Preferably with the correct optimizations for our case // where there is only one consumer. So a mpsc channel that drops all but the last input. - rx: mpsc::Receiver>, - inner: Option, + rx: mpsc::Receiver>, + inner: Option, actuator: Box, future: Option>, } -impl Actor { - pub fn new(rx: mpsc::Receiver>, actuator: Box) -> Self { +impl Actor { + pub fn new(rx: mpsc::Receiver>, actuator: Box) -> Self { Self { rx: rx, inner: None, @@ -39,13 +44,13 @@ impl Actor { } } - pub fn wrap(actuator: Box) -> (mpsc::Sender>, Self) { + pub fn wrap(actuator: Box) -> (mpsc::Sender>, Self) { let (tx, rx) = mpsc::channel(1); (tx, Self::new(rx, actuator)) } } -impl + Unpin> Future for Actor { +impl Future for Actor { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -87,9 +92,12 @@ impl + Unpin> Future for Actor { } } -pub fn load + Unpin>() -> Result<(mpsc::Sender>, Actor)> { +pub fn load() -> Result<(ActorMap, Vec)> { let d = Box::new(crate::registries::actuators::Dummy); - let a = Actor::wrap(d); + let (tx, a) = Actor::wrap(d); - Ok(a) + let mut map = HashMap::new(); + map.insert("Dummy".to_string(), tx); + + Ok(( map, vec![a] )) } diff --git a/src/error.rs b/src/error.rs index cd74c64..03f57e0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -140,8 +140,8 @@ impl From for Error { } } -impl From for Error { - fn from(e: futures::SpawnError) -> Error { +impl From for Error { + fn from(e: futures_task::SpawnError) -> Error { Error::FuturesSpawn(e) } } diff --git a/src/initiator.rs b/src/initiator.rs index 56c7da2..a5cb2d2 100644 --- a/src/initiator.rs +++ b/src/initiator.rs @@ -1,6 +1,7 @@ use std::pin::Pin; use std::task::{Poll, Context}; use std::future::Future; +use std::collections::HashMap; use smol::{Task, Timer}; use futures::FutureExt; @@ -14,19 +15,22 @@ use crate::db::machine::MachineState; use crate::db::user::{User, UserId, UserData}; use crate::registries::sensors::Sensor; +use crate::network::InitMap; use crate::error::Result; -pub struct Initiator { +type BoxSensor = Box; + +pub struct Initiator { signal: MutableSignalCloned>, machine: Option, future: Option, MachineState)>>, token: Option, - sensor: Box, + sensor: BoxSensor, } -impl Initiator { - pub fn new(sensor: Box, signal: MutableSignalCloned>) -> Self { +impl Initiator { + pub fn new(sensor: BoxSensor, signal: MutableSignalCloned>) -> Self { Self { signal: signal, machine: None, @@ -36,7 +40,7 @@ impl Initiator { } } - pub fn wrap(sensor: Box) -> (Mutable>, Self) { + pub fn wrap(sensor: BoxSensor) -> (Mutable>, Self) { let m = Mutable::new(None); let s = m.signal_cloned(); @@ -44,7 +48,7 @@ impl Initiator { } } -impl Future for Initiator { +impl Future for Initiator { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -75,9 +79,14 @@ impl Future for Initiator { } } -pub fn load() -> Result<(Mutable>, Initiator)> { +pub fn load() -> Result<(InitMap, Vec)> { let d = Box::new(Dummy::new()); - Ok(Initiator::wrap(d)) + let (m, i) = Initiator::wrap(d); + + let mut map = HashMap::new(); + map.insert("Dummy".to_string(), m); + + Ok((map, vec![i])) } pub struct Dummy { diff --git a/src/machine.rs b/src/machine.rs index 0872098..62ee38a 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -1,4 +1,5 @@ use std::ops::{Deref, DerefMut}; +use std::iter::FromIterator; use std::sync::Arc; use futures_util::lock::Mutex; use std::path::Path; @@ -23,6 +24,8 @@ use crate::db::access; use crate::db::machine::{MachineIdentifier, Status, MachineState}; use crate::db::user::User; +use crate::network::MachineMap; + #[derive(Debug, Clone)] pub struct Index { inner: HashMap, @@ -231,15 +234,15 @@ impl MachineDescription { } } -pub fn load(config: &crate::config::Settings) -> Result> { +pub fn load(config: &crate::config::Settings) -> Result { let mut map = MachineDescription::load_file(&config.machines)?; - Ok(map.drain() + let it = map.drain() .map(|(k,v)| { // TODO: Read state from the state db - Machine::construct(k, v, MachineState::new()) - }) - .collect()) + (v.name.clone(), Machine::construct(k, v, MachineState::new())) + }); + Ok(HashMap::from_iter(it)) } #[cfg(test_DISABLED)] diff --git a/src/main.rs b/src/main.rs index 173ad8a..ce8f4a8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -136,32 +136,25 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { let ex = Executor::new(); let machines = machine::load(&config)?; - let m = futures_signals::signal::Mutable::new(crate::db::machine::MachineState::new()); - let (mut tx, actor) = actor::load()?; + let (mut actor_map, actors) = actor::load()?; + let (mut init_map, initiators) = initiator::load()?; + + let network = network::Network::new(machines, actor_map, init_map); - let (mut init_machine, initiator) = initiator::load()?; // TODO HERE: restore connections between initiators, machines, actors - // Like so - let m = machines[0].signal(); - tx.try_send(Some(m)).unwrap(); - // TODO HERE: Spawn all actors & inits // Like so - let t = ex.spawn(actor); - let t2 = ex.spawn(initiator); + let actor_tasks = actors.into_iter().map(|actor| ex.spawn(actor)); + let init_tasks = initiators.into_iter().map(|init| ex.spawn(init)); let (signal, shutdown) = async_channel::bounded::<()>(1); easy_parallel::Parallel::new() .each(0..4, |_| smol::block_on(ex.run(shutdown.recv()))) .run(); - smol::block_on(t); - - - let db = db::Databases::new(&log, &config)?; // TODO: Spawn api connections on their own (non-main) thread, use the main thread to // handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads diff --git a/src/network.rs b/src/network.rs index 9b652df..d45fac4 100644 --- a/src/network.rs +++ b/src/network.rs @@ -9,16 +9,17 @@ use futures::channel::mpsc; use futures_signals::signal::{Signal, MutableSignalCloned, Mutable}; use crate::machine::Machine; -use crate::actor::Actor; +use crate::actor::{Actor, ActorSignal}; use crate::initiator::Initiator; use crate::db::machine::MachineState; use crate::error::Result; -type MachineMap = HashMap; -type ActorMap = HashMap>>>; -type InitMap = HashMap>>; +pub type MachineMap = HashMap; +pub type ActorMap = HashMap>>; +pub type InitMap = HashMap>>; +#[derive(Debug, PartialEq, Eq)] pub enum Error { NoSuchInitiator, NoSuchMachine, @@ -39,9 +40,17 @@ impl fmt::Display for Error { /// /// Network as per FRP, not the one with packages and frames pub struct Network { - machines: MachineMap, - actors: ActorMap, inits: InitMap, + + // Store connections + //miconn: Vec<(String, String)>, + + machines: MachineMap, + + // Store connections + //maconn: Vec<(String, String)>, + + actors: ActorMap, } impl Network { @@ -55,15 +64,16 @@ impl Network { let machine = self.machines.get(machine_key) .ok_or(Error::NoSuchMachine)?; - init.set(machine); + init.set(Some(machine.clone())); + Ok(()) } - pub fn connect_actor(&self, machine_key: &String, actor_key: &String) -> Result<()> { + pub fn connect_actor(&mut self, machine_key: &String, actor_key: &String) -> Result<()> { let machine = self.machines.get(machine_key) .ok_or(Error::NoSuchMachine)?; - let actor = self.actors.get(actor_key) + let actor = self.actors.get_mut(actor_key) .ok_or(Error::NoSuchActor)?; - actor.try_send(Some(machine.signal())).map_err(|_| Error::NoSuchActor.into()) + actor.try_send(Some(Box::new(machine.signal()))).map_err(|_| Error::NoSuchActor.into()) } }