Move initialization and recon into network

This commit is contained in:
Gregor Reitzenstein 2020-12-07 15:58:25 +01:00
parent 81ea99405c
commit a8af3b287e
6 changed files with 71 additions and 48 deletions

View File

@ -1,6 +1,7 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{Poll, Context}; use std::task::{Poll, Context};
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashMap;
use std::future::Future; use std::future::Future;
use smol::Executor; use smol::Executor;
@ -14,7 +15,11 @@ use crate::registries::actuators::Actuator;
use crate::config::Settings; use crate::config::Settings;
use crate::error::Result; use crate::error::Result;
pub struct Actor<S: Signal> { use crate::network::ActorMap;
pub type ActorSignal = Box<dyn Signal<Item=MachineState> + Unpin + Send>;
pub struct Actor {
// FIXME: This should really be a Signal. // FIXME: This should really be a Signal.
// But, alas, MutableSignalCloned is itself not `Clone`. For good reason as keeping track of // But, alas, MutableSignalCloned is itself not `Clone`. For good reason as keeping track of
// the changes itself happens in a way that Clone won't work (well). // the changes itself happens in a way that Clone won't work (well).
@ -22,15 +27,15 @@ pub struct Actor<S: Signal> {
// of a task context. In short, using Mutable isn't possible and we would have to write our own // of a task context. In short, using Mutable isn't possible and we would have to write our own
// implementation of MutableSignal*'s . Preferably with the correct optimizations for our case // implementation of MutableSignal*'s . Preferably with the correct optimizations for our case
// where there is only one consumer. So a mpsc channel that drops all but the last input. // where there is only one consumer. So a mpsc channel that drops all but the last input.
rx: mpsc::Receiver<Option<S>>, rx: mpsc::Receiver<Option<ActorSignal>>,
inner: Option<S>, inner: Option<ActorSignal>,
actuator: Box<dyn Actuator + Send + Sync>, actuator: Box<dyn Actuator + Send + Sync>,
future: Option<BoxFuture<'static, ()>>, future: Option<BoxFuture<'static, ()>>,
} }
impl<S: Signal + Unpin> Actor<S> { impl Actor {
pub fn new(rx: mpsc::Receiver<Option<S>>, actuator: Box<dyn Actuator + Send + Sync>) -> Self { pub fn new(rx: mpsc::Receiver<Option<ActorSignal>>, actuator: Box<dyn Actuator + Send + Sync>) -> Self {
Self { Self {
rx: rx, rx: rx,
inner: None, inner: None,
@ -39,13 +44,13 @@ impl<S: Signal + Unpin> Actor<S> {
} }
} }
pub fn wrap(actuator: Box<dyn Actuator + Send + Sync>) -> (mpsc::Sender<Option<S>>, Self) { pub fn wrap(actuator: Box<dyn Actuator + Send + Sync>) -> (mpsc::Sender<Option<ActorSignal>>, Self) {
let (tx, rx) = mpsc::channel(1); let (tx, rx) = mpsc::channel(1);
(tx, Self::new(rx, actuator)) (tx, Self::new(rx, actuator))
} }
} }
impl<S: Signal<Item=MachineState> + Unpin> Future for Actor<S> { impl Future for Actor {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@ -87,9 +92,12 @@ impl<S: Signal<Item=MachineState> + Unpin> Future for Actor<S> {
} }
} }
pub fn load<S: Signal<Item=MachineState> + Unpin>() -> Result<(mpsc::Sender<Option<S>>, Actor<S>)> { pub fn load() -> Result<(ActorMap, Vec<Actor>)> {
let d = Box::new(crate::registries::actuators::Dummy); let d = Box::new(crate::registries::actuators::Dummy);
let a = Actor::wrap(d); let (tx, a) = Actor::wrap(d);
Ok(a) let mut map = HashMap::new();
map.insert("Dummy".to_string(), tx);
Ok(( map, vec![a] ))
} }

View File

@ -140,8 +140,8 @@ impl From<flexbuffers::SerializationError> for Error {
} }
} }
impl From<futures::SpawnError> for Error { impl From<futures_task::SpawnError> for Error {
fn from(e: futures::SpawnError) -> Error { fn from(e: futures_task::SpawnError) -> Error {
Error::FuturesSpawn(e) Error::FuturesSpawn(e)
} }
} }

View File

@ -1,6 +1,7 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{Poll, Context}; use std::task::{Poll, Context};
use std::future::Future; use std::future::Future;
use std::collections::HashMap;
use smol::{Task, Timer}; use smol::{Task, Timer};
use futures::FutureExt; use futures::FutureExt;
@ -14,19 +15,22 @@ use crate::db::machine::MachineState;
use crate::db::user::{User, UserId, UserData}; use crate::db::user::{User, UserId, UserData};
use crate::registries::sensors::Sensor; use crate::registries::sensors::Sensor;
use crate::network::InitMap;
use crate::error::Result; use crate::error::Result;
pub struct Initiator<S: Sensor> { type BoxSensor = Box<dyn Sensor + Send>;
pub struct Initiator {
signal: MutableSignalCloned<Option<Machine>>, signal: MutableSignalCloned<Option<Machine>>,
machine: Option<Machine>, machine: Option<Machine>,
future: Option<BoxFuture<'static, (Option<User>, MachineState)>>, future: Option<BoxFuture<'static, (Option<User>, MachineState)>>,
token: Option<ReturnToken>, token: Option<ReturnToken>,
sensor: Box<S>, sensor: BoxSensor,
} }
impl<S: Sensor> Initiator<S> { impl Initiator {
pub fn new(sensor: Box<S>, signal: MutableSignalCloned<Option<Machine>>) -> Self { pub fn new(sensor: BoxSensor, signal: MutableSignalCloned<Option<Machine>>) -> Self {
Self { Self {
signal: signal, signal: signal,
machine: None, machine: None,
@ -36,7 +40,7 @@ impl<S: Sensor> Initiator<S> {
} }
} }
pub fn wrap(sensor: Box<S>) -> (Mutable<Option<Machine>>, Self) { pub fn wrap(sensor: BoxSensor) -> (Mutable<Option<Machine>>, Self) {
let m = Mutable::new(None); let m = Mutable::new(None);
let s = m.signal_cloned(); let s = m.signal_cloned();
@ -44,7 +48,7 @@ impl<S: Sensor> Initiator<S> {
} }
} }
impl<S: Sensor> Future for Initiator<S> { impl Future for Initiator {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@ -75,9 +79,14 @@ impl<S: Sensor> Future for Initiator<S> {
} }
} }
pub fn load() -> Result<(Mutable<Option<Machine>>, Initiator<Dummy>)> { pub fn load() -> Result<(InitMap, Vec<Initiator>)> {
let d = Box::new(Dummy::new()); let d = Box::new(Dummy::new());
Ok(Initiator::wrap(d)) let (m, i) = Initiator::wrap(d);
let mut map = HashMap::new();
map.insert("Dummy".to_string(), m);
Ok((map, vec![i]))
} }
pub struct Dummy { pub struct Dummy {

View File

@ -1,4 +1,5 @@
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::iter::FromIterator;
use std::sync::Arc; use std::sync::Arc;
use futures_util::lock::Mutex; use futures_util::lock::Mutex;
use std::path::Path; use std::path::Path;
@ -23,6 +24,8 @@ use crate::db::access;
use crate::db::machine::{MachineIdentifier, Status, MachineState}; use crate::db::machine::{MachineIdentifier, Status, MachineState};
use crate::db::user::User; use crate::db::user::User;
use crate::network::MachineMap;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Index { pub struct Index {
inner: HashMap<String, Machine>, inner: HashMap<String, Machine>,
@ -231,15 +234,15 @@ impl MachineDescription {
} }
} }
pub fn load(config: &crate::config::Settings) -> Result<Vec<Machine>> { pub fn load(config: &crate::config::Settings) -> Result<MachineMap> {
let mut map = MachineDescription::load_file(&config.machines)?; let mut map = MachineDescription::load_file(&config.machines)?;
Ok(map.drain() let it = map.drain()
.map(|(k,v)| { .map(|(k,v)| {
// TODO: Read state from the state db // TODO: Read state from the state db
Machine::construct(k, v, MachineState::new()) (v.name.clone(), Machine::construct(k, v, MachineState::new()))
}) });
.collect()) Ok(HashMap::from_iter(it))
} }
#[cfg(test_DISABLED)] #[cfg(test_DISABLED)]

View File

@ -136,32 +136,25 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
let ex = Executor::new(); let ex = Executor::new();
let machines = machine::load(&config)?; let machines = machine::load(&config)?;
let m = futures_signals::signal::Mutable::new(crate::db::machine::MachineState::new()); let (mut actor_map, actors) = actor::load()?;
let (mut tx, actor) = actor::load()?; let (mut init_map, initiators) = initiator::load()?;
let network = network::Network::new(machines, actor_map, init_map);
let (mut init_machine, initiator) = initiator::load()?;
// TODO HERE: restore connections between initiators, machines, actors // TODO HERE: restore connections between initiators, machines, actors
// Like so
let m = machines[0].signal();
tx.try_send(Some(m)).unwrap();
// TODO HERE: Spawn all actors & inits // TODO HERE: Spawn all actors & inits
// Like so // Like so
let t = ex.spawn(actor); let actor_tasks = actors.into_iter().map(|actor| ex.spawn(actor));
let t2 = ex.spawn(initiator); let init_tasks = initiators.into_iter().map(|init| ex.spawn(init));
let (signal, shutdown) = async_channel::bounded::<()>(1); let (signal, shutdown) = async_channel::bounded::<()>(1);
easy_parallel::Parallel::new() easy_parallel::Parallel::new()
.each(0..4, |_| smol::block_on(ex.run(shutdown.recv()))) .each(0..4, |_| smol::block_on(ex.run(shutdown.recv())))
.run(); .run();
smol::block_on(t);
let db = db::Databases::new(&log, &config)?; let db = db::Databases::new(&log, &config)?;
// TODO: Spawn api connections on their own (non-main) thread, use the main thread to // TODO: Spawn api connections on their own (non-main) thread, use the main thread to
// handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads // handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads

View File

@ -9,16 +9,17 @@ use futures::channel::mpsc;
use futures_signals::signal::{Signal, MutableSignalCloned, Mutable}; use futures_signals::signal::{Signal, MutableSignalCloned, Mutable};
use crate::machine::Machine; use crate::machine::Machine;
use crate::actor::Actor; use crate::actor::{Actor, ActorSignal};
use crate::initiator::Initiator; use crate::initiator::Initiator;
use crate::db::machine::MachineState; use crate::db::machine::MachineState;
use crate::error::Result; use crate::error::Result;
type MachineMap = HashMap<String, Machine>; pub type MachineMap = HashMap<String, Machine>;
type ActorMap = HashMap<String, mpsc::Sender<Option<MutableSignalCloned<MachineState>>>>; pub type ActorMap = HashMap<String, mpsc::Sender<Option<ActorSignal>>>;
type InitMap = HashMap<String, Mutable<Option<Machine>>>; pub type InitMap = HashMap<String, Mutable<Option<Machine>>>;
#[derive(Debug, PartialEq, Eq)]
pub enum Error { pub enum Error {
NoSuchInitiator, NoSuchInitiator,
NoSuchMachine, NoSuchMachine,
@ -39,9 +40,17 @@ impl fmt::Display for Error {
/// ///
/// Network as per FRP, not the one with packages and frames /// Network as per FRP, not the one with packages and frames
pub struct Network { pub struct Network {
machines: MachineMap,
actors: ActorMap,
inits: InitMap, inits: InitMap,
// Store connections
//miconn: Vec<(String, String)>,
machines: MachineMap,
// Store connections
//maconn: Vec<(String, String)>,
actors: ActorMap,
} }
impl Network { impl Network {
@ -55,15 +64,16 @@ impl Network {
let machine = self.machines.get(machine_key) let machine = self.machines.get(machine_key)
.ok_or(Error::NoSuchMachine)?; .ok_or(Error::NoSuchMachine)?;
init.set(machine); init.set(Some(machine.clone()));
Ok(())
} }
pub fn connect_actor(&self, machine_key: &String, actor_key: &String) -> Result<()> { pub fn connect_actor(&mut self, machine_key: &String, actor_key: &String) -> Result<()> {
let machine = self.machines.get(machine_key) let machine = self.machines.get(machine_key)
.ok_or(Error::NoSuchMachine)?; .ok_or(Error::NoSuchMachine)?;
let actor = self.actors.get(actor_key) let actor = self.actors.get_mut(actor_key)
.ok_or(Error::NoSuchActor)?; .ok_or(Error::NoSuchActor)?;
actor.try_send(Some(machine.signal())).map_err(|_| Error::NoSuchActor.into()) actor.try_send(Some(Box::new(machine.signal()))).map_err(|_| Error::NoSuchActor.into())
} }
} }