From a8af3b287e9773f108368533d8ebd8e14686480a Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein <me@dequbed.space> Date: Mon, 7 Dec 2020 15:58:25 +0100 Subject: [PATCH] Move initialization and recon into network --- src/actor.rs | 28 ++++++++++++++++++---------- src/error.rs | 4 ++-- src/initiator.rs | 25 +++++++++++++++++-------- src/machine.rs | 13 ++++++++----- src/main.rs | 19 ++++++------------- src/network.rs | 30 ++++++++++++++++++++---------- 6 files changed, 71 insertions(+), 48 deletions(-) diff --git a/src/actor.rs b/src/actor.rs index b6e37f9..c331874 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -1,6 +1,7 @@ use std::pin::Pin; use std::task::{Poll, Context}; use std::sync::Arc; +use std::collections::HashMap; use std::future::Future; use smol::Executor; @@ -14,7 +15,11 @@ use crate::registries::actuators::Actuator; use crate::config::Settings; use crate::error::Result; -pub struct Actor<S: Signal> { +use crate::network::ActorMap; + +pub type ActorSignal = Box<dyn Signal<Item=MachineState> + Unpin + Send>; + +pub struct Actor { // FIXME: This should really be a Signal. // But, alas, MutableSignalCloned is itself not `Clone`. For good reason as keeping track of // the changes itself happens in a way that Clone won't work (well). @@ -22,15 +27,15 @@ pub struct Actor<S: Signal> { // of a task context. In short, using Mutable isn't possible and we would have to write our own // implementation of MutableSignal*'s . Preferably with the correct optimizations for our case // where there is only one consumer. So a mpsc channel that drops all but the last input. - rx: mpsc::Receiver<Option<S>>, - inner: Option<S>, + rx: mpsc::Receiver<Option<ActorSignal>>, + inner: Option<ActorSignal>, actuator: Box<dyn Actuator + Send + Sync>, future: Option<BoxFuture<'static, ()>>, } -impl<S: Signal + Unpin> Actor<S> { - pub fn new(rx: mpsc::Receiver<Option<S>>, actuator: Box<dyn Actuator + Send + Sync>) -> Self { +impl Actor { + pub fn new(rx: mpsc::Receiver<Option<ActorSignal>>, actuator: Box<dyn Actuator + Send + Sync>) -> Self { Self { rx: rx, inner: None, @@ -39,13 +44,13 @@ impl<S: Signal + Unpin> Actor<S> { } } - pub fn wrap(actuator: Box<dyn Actuator + Send + Sync>) -> (mpsc::Sender<Option<S>>, Self) { + pub fn wrap(actuator: Box<dyn Actuator + Send + Sync>) -> (mpsc::Sender<Option<ActorSignal>>, Self) { let (tx, rx) = mpsc::channel(1); (tx, Self::new(rx, actuator)) } } -impl<S: Signal<Item=MachineState> + Unpin> Future for Actor<S> { +impl Future for Actor { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { @@ -87,9 +92,12 @@ impl<S: Signal<Item=MachineState> + Unpin> Future for Actor<S> { } } -pub fn load<S: Signal<Item=MachineState> + Unpin>() -> Result<(mpsc::Sender<Option<S>>, Actor<S>)> { +pub fn load() -> Result<(ActorMap, Vec<Actor>)> { let d = Box::new(crate::registries::actuators::Dummy); - let a = Actor::wrap(d); + let (tx, a) = Actor::wrap(d); - Ok(a) + let mut map = HashMap::new(); + map.insert("Dummy".to_string(), tx); + + Ok(( map, vec![a] )) } diff --git a/src/error.rs b/src/error.rs index cd74c64..03f57e0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -140,8 +140,8 @@ impl From<flexbuffers::SerializationError> for Error { } } -impl From<futures::SpawnError> for Error { - fn from(e: futures::SpawnError) -> Error { +impl From<futures_task::SpawnError> for Error { + fn from(e: futures_task::SpawnError) -> Error { Error::FuturesSpawn(e) } } diff --git a/src/initiator.rs b/src/initiator.rs index 56c7da2..a5cb2d2 100644 --- a/src/initiator.rs +++ b/src/initiator.rs @@ -1,6 +1,7 @@ use std::pin::Pin; use std::task::{Poll, Context}; use std::future::Future; +use std::collections::HashMap; use smol::{Task, Timer}; use futures::FutureExt; @@ -14,19 +15,22 @@ use crate::db::machine::MachineState; use crate::db::user::{User, UserId, UserData}; use crate::registries::sensors::Sensor; +use crate::network::InitMap; use crate::error::Result; -pub struct Initiator<S: Sensor> { +type BoxSensor = Box<dyn Sensor + Send>; + +pub struct Initiator { signal: MutableSignalCloned<Option<Machine>>, machine: Option<Machine>, future: Option<BoxFuture<'static, (Option<User>, MachineState)>>, token: Option<ReturnToken>, - sensor: Box<S>, + sensor: BoxSensor, } -impl<S: Sensor> Initiator<S> { - pub fn new(sensor: Box<S>, signal: MutableSignalCloned<Option<Machine>>) -> Self { +impl Initiator { + pub fn new(sensor: BoxSensor, signal: MutableSignalCloned<Option<Machine>>) -> Self { Self { signal: signal, machine: None, @@ -36,7 +40,7 @@ impl<S: Sensor> Initiator<S> { } } - pub fn wrap(sensor: Box<S>) -> (Mutable<Option<Machine>>, Self) { + pub fn wrap(sensor: BoxSensor) -> (Mutable<Option<Machine>>, Self) { let m = Mutable::new(None); let s = m.signal_cloned(); @@ -44,7 +48,7 @@ impl<S: Sensor> Initiator<S> { } } -impl<S: Sensor> Future for Initiator<S> { +impl Future for Initiator { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { @@ -75,9 +79,14 @@ impl<S: Sensor> Future for Initiator<S> { } } -pub fn load() -> Result<(Mutable<Option<Machine>>, Initiator<Dummy>)> { +pub fn load() -> Result<(InitMap, Vec<Initiator>)> { let d = Box::new(Dummy::new()); - Ok(Initiator::wrap(d)) + let (m, i) = Initiator::wrap(d); + + let mut map = HashMap::new(); + map.insert("Dummy".to_string(), m); + + Ok((map, vec![i])) } pub struct Dummy { diff --git a/src/machine.rs b/src/machine.rs index 0872098..62ee38a 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -1,4 +1,5 @@ use std::ops::{Deref, DerefMut}; +use std::iter::FromIterator; use std::sync::Arc; use futures_util::lock::Mutex; use std::path::Path; @@ -23,6 +24,8 @@ use crate::db::access; use crate::db::machine::{MachineIdentifier, Status, MachineState}; use crate::db::user::User; +use crate::network::MachineMap; + #[derive(Debug, Clone)] pub struct Index { inner: HashMap<String, Machine>, @@ -231,15 +234,15 @@ impl MachineDescription { } } -pub fn load(config: &crate::config::Settings) -> Result<Vec<Machine>> { +pub fn load(config: &crate::config::Settings) -> Result<MachineMap> { let mut map = MachineDescription::load_file(&config.machines)?; - Ok(map.drain() + let it = map.drain() .map(|(k,v)| { // TODO: Read state from the state db - Machine::construct(k, v, MachineState::new()) - }) - .collect()) + (v.name.clone(), Machine::construct(k, v, MachineState::new())) + }); + Ok(HashMap::from_iter(it)) } #[cfg(test_DISABLED)] diff --git a/src/main.rs b/src/main.rs index 173ad8a..ce8f4a8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -136,32 +136,25 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> { let ex = Executor::new(); let machines = machine::load(&config)?; - let m = futures_signals::signal::Mutable::new(crate::db::machine::MachineState::new()); - let (mut tx, actor) = actor::load()?; + let (mut actor_map, actors) = actor::load()?; + let (mut init_map, initiators) = initiator::load()?; + + let network = network::Network::new(machines, actor_map, init_map); - let (mut init_machine, initiator) = initiator::load()?; // TODO HERE: restore connections between initiators, machines, actors - // Like so - let m = machines[0].signal(); - tx.try_send(Some(m)).unwrap(); - // TODO HERE: Spawn all actors & inits // Like so - let t = ex.spawn(actor); - let t2 = ex.spawn(initiator); + let actor_tasks = actors.into_iter().map(|actor| ex.spawn(actor)); + let init_tasks = initiators.into_iter().map(|init| ex.spawn(init)); let (signal, shutdown) = async_channel::bounded::<()>(1); easy_parallel::Parallel::new() .each(0..4, |_| smol::block_on(ex.run(shutdown.recv()))) .run(); - smol::block_on(t); - - - let db = db::Databases::new(&log, &config)?; // TODO: Spawn api connections on their own (non-main) thread, use the main thread to // handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads diff --git a/src/network.rs b/src/network.rs index 9b652df..d45fac4 100644 --- a/src/network.rs +++ b/src/network.rs @@ -9,16 +9,17 @@ use futures::channel::mpsc; use futures_signals::signal::{Signal, MutableSignalCloned, Mutable}; use crate::machine::Machine; -use crate::actor::Actor; +use crate::actor::{Actor, ActorSignal}; use crate::initiator::Initiator; use crate::db::machine::MachineState; use crate::error::Result; -type MachineMap = HashMap<String, Machine>; -type ActorMap = HashMap<String, mpsc::Sender<Option<MutableSignalCloned<MachineState>>>>; -type InitMap = HashMap<String, Mutable<Option<Machine>>>; +pub type MachineMap = HashMap<String, Machine>; +pub type ActorMap = HashMap<String, mpsc::Sender<Option<ActorSignal>>>; +pub type InitMap = HashMap<String, Mutable<Option<Machine>>>; +#[derive(Debug, PartialEq, Eq)] pub enum Error { NoSuchInitiator, NoSuchMachine, @@ -39,9 +40,17 @@ impl fmt::Display for Error { /// /// Network as per FRP, not the one with packages and frames pub struct Network { - machines: MachineMap, - actors: ActorMap, inits: InitMap, + + // Store connections + //miconn: Vec<(String, String)>, + + machines: MachineMap, + + // Store connections + //maconn: Vec<(String, String)>, + + actors: ActorMap, } impl Network { @@ -55,15 +64,16 @@ impl Network { let machine = self.machines.get(machine_key) .ok_or(Error::NoSuchMachine)?; - init.set(machine); + init.set(Some(machine.clone())); + Ok(()) } - pub fn connect_actor(&self, machine_key: &String, actor_key: &String) -> Result<()> { + pub fn connect_actor(&mut self, machine_key: &String, actor_key: &String) -> Result<()> { let machine = self.machines.get(machine_key) .ok_or(Error::NoSuchMachine)?; - let actor = self.actors.get(actor_key) + let actor = self.actors.get_mut(actor_key) .ok_or(Error::NoSuchActor)?; - actor.try_send(Some(machine.signal())).map_err(|_| Error::NoSuchActor.into()) + actor.try_send(Some(Box::new(machine.signal()))).map_err(|_| Error::NoSuchActor.into()) } }