From 5bd5cd57df302ff481ac8dee4f922403c83701cc Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 10:18:02 +0200 Subject: [PATCH 01/18] Using events --- src/machine.rs | 2 ++ src/network.rs | 15 ++++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/machine.rs b/src/machine.rs index d7fb90e..4ad62ed 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -22,6 +22,8 @@ use smol::channel::{Receiver, Sender}; use futures_signals::signal::*; +pub type ID = Uuid; + /// Status of a Machine #[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)] pub enum Status { diff --git a/src/network.rs b/src/network.rs index 5e4559d..9a72bfd 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1,5 +1,8 @@ use futures_signals::signal::Signal; +use crate::machine; +use crate::access; + struct Network { } @@ -24,10 +27,12 @@ impl Network { } } +/// The internal bffh event type +/// +/// Everything that BFFH considers an event is contained in an instance of this. +#[derive(PartialEq, Eq, Clone, PartialOrd, Ord, Debug)] enum Event { - -} - -trait Filter { - fn filter(&self, f: Fn(&S) -> bool); + /// An user wants to use a machine + // TODO: Define /what/ an user wants to do with said machine? + MachineRequest(machine::ID, access::UserIdentifer), } From 2686ea112ff37671525de53f88e8a25942477539 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 10:51:51 +0200 Subject: [PATCH 02/18] Refines machines --- src/access.rs | 6 +++--- src/machine.rs | 35 ++++++++++++++++++++++++++++++++--- src/network.rs | 2 +- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/src/access.rs b/src/access.rs index bdcee5a..cef7470 100644 --- a/src/access.rs +++ b/src/access.rs @@ -19,9 +19,9 @@ use crate::config::Settings; use crate::error::Result; // FIXME: fabinfra/fabaccess/bffh#3 -type UserIdentifier = u64; -type RoleIdentifier = u64; -type PermIdentifier = u64; +pub type UserIdentifier = u64; +pub type RoleIdentifier = u64; +pub type PermIdentifier = u64; pub struct PermissionsProvider { log: Logger, diff --git a/src/machine.rs b/src/machine.rs index 4ad62ed..c60e369 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -11,6 +11,7 @@ use smol::lock::RwLock; use crate::error::Result; use crate::config::Settings; +use crate::access; use capnp::Error; @@ -26,6 +27,7 @@ pub type ID = Uuid; /// Status of a Machine #[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)] +#[repr(u8)] pub enum Status { /// Not currently used by anybody Free, @@ -100,11 +102,16 @@ impl MachineManager { /// machine, checking that the user who wants the machine (de)activated has the required /// permissions. pub struct Machine { + /// Computer-readable identifier for this machine + // Implicit in database since it's the key. + #[serde(skip)] + id: ID, + /// The human-readable name of the machine. Does not need to be unique name: String, /// The required permission to use this machine. - perm: String, + perm: access::PermIdentifier, /// The state of the machine as bffh thinks the machine *should* be in. /// @@ -114,8 +121,9 @@ pub struct Machine { } impl Machine { - pub fn new(name: String, perm: String) -> Machine { + pub fn new(id: Uuid, name: String, perm: access::PermIdentifier) -> Machine { Machine { + id: id, name: name, perm: perm, state: Mutable::new(Status::Free), @@ -133,6 +141,24 @@ impl Machine { pub fn signal(&self) -> impl Signal { self.state.signal().dedupe() } + + /// Requests to use a machine. Returns `true` if successful. + /// + /// This will update the internal state of the machine, notifying connected actors, if any. + pub fn request_use + ( &mut self + , txn: &T + , pp: &access::PermissionsProvider + , who: access::UserIdentifier + ) -> Result + { + if pp.check(txn, who, self.perm)? { + self.state.set(Status::Occupied); + return Ok(true); + } else { + return Ok(false); + } + } } pub struct MachineDB { @@ -149,7 +175,10 @@ impl MachineDB { { match txn.get(self.db, &uuid.as_bytes()) { Ok(bytes) => { - Ok(Some(flexbuffers::from_slice(bytes)?)) + let mut machine: Machine = flexbuffers::from_slice(bytes)?; + machine.id = uuid; + + Ok(Some(machine)) }, Err(lmdb::Error::NotFound) => { Ok(None) }, Err(e) => { Err(e.into()) }, diff --git a/src/network.rs b/src/network.rs index 9a72bfd..a93b8b0 100644 --- a/src/network.rs +++ b/src/network.rs @@ -34,5 +34,5 @@ impl Network { enum Event { /// An user wants to use a machine // TODO: Define /what/ an user wants to do with said machine? - MachineRequest(machine::ID, access::UserIdentifer), + MachineRequest(machine::ID, access::UserIdentifier), } From 406cebadb89f1eb3fa8bc98e9c9e684ae2265398 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 11:20:55 +0200 Subject: [PATCH 03/18] Sensor implementation draft --- src/registries/sensors.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 src/registries/sensors.rs diff --git a/src/registries/sensors.rs b/src/registries/sensors.rs new file mode 100644 index 0000000..3508b2e --- /dev/null +++ b/src/registries/sensors.rs @@ -0,0 +1,16 @@ +use futures::{Future, Stream}; + +pub struct Sensors { + +} + + +// Implementing Sensors. +// +// Given the coroutine/task split stays as it is - Sensor input to machine update being one, +// machine update signal to actor doing thing being another, a Sensor implementation would send a +// Stream of futures - each future being an atomic Machine update. +#[async_trait] +pub trait Sensor: Stream>> { + async fn setup() -> Self; +} From fc1480314f4f92ac266cf5332992e13c1610c05f Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 11:43:43 +0200 Subject: [PATCH 04/18] Adds Sensors registry --- src/registries.rs | 1 + src/registries/sensors.rs | 55 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/src/registries.rs b/src/registries.rs index e5d218c..bf7d371 100644 --- a/src/registries.rs +++ b/src/registries.rs @@ -1,4 +1,5 @@ mod actuators; +mod sensors; pub use actuators::{Actuator, ActBox}; diff --git a/src/registries/sensors.rs b/src/registries/sensors.rs index 3508b2e..25c41ee 100644 --- a/src/registries/sensors.rs +++ b/src/registries/sensors.rs @@ -1,9 +1,19 @@ +use std::pin::Pin; +use futures::task::{Context, Poll}; use futures::{Future, Stream}; +use futures::future::BoxFuture; + +use std::sync::Arc; +use smol::lock::RwLock; +use std::collections::HashMap; pub struct Sensors { - + inner: Arc>, } +pub type SensBox<'a> = Box>; +type Inner = HashMap>; + // Implementing Sensors. // @@ -11,6 +21,45 @@ pub struct Sensors { // machine update signal to actor doing thing being another, a Sensor implementation would send a // Stream of futures - each future being an atomic Machine update. #[async_trait] -pub trait Sensor: Stream>> { - async fn setup() -> Self; +/// BFFH Sensor +/// +/// A sensor is anything that can forward an intent of an user to do something to bffh. +/// This may be a card reader connected to a machine, a website allowing users to select a machine +/// they want to use or something like QRHello +pub trait Sensor<'a>: Stream> { + /// Setup the Sensor. + /// + /// After this async function completes the Stream implementation should be able to generate + /// futures when polled. + /// Implementations can rely on this function being polled to completeion before the stream + /// is polled. + async fn setup(&mut self); + + /// Shutdown the sensor gracefully + /// + /// Implementations can rely on that the stream will not be polled after this function has been + /// called. + async fn shutdown(&mut self); +} + +struct Dummy<'a> { + phantom: &'a std::marker::PhantomData<()>, +} +#[async_trait] +impl<'a> Sensor<'a> for Dummy<'a> { + async fn setup(&mut self) { + return; + } + + async fn shutdown(&mut self) { + return; + } +} + +impl<'a> Stream for Dummy<'a> { + type Item = BoxFuture<'a, ()>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + todo!() + } } From 637490bd75c0bfa959e7c662af9074c5da71ed46 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 11:46:05 +0200 Subject: [PATCH 05/18] Add sensors registry to global registries --- src/registries.rs | 5 ++++- src/registries/sensors.rs | 9 +++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/registries.rs b/src/registries.rs index bf7d371..069403a 100644 --- a/src/registries.rs +++ b/src/registries.rs @@ -2,6 +2,7 @@ mod actuators; mod sensors; pub use actuators::{Actuator, ActBox}; +pub use sensors::{Sensor, SensBox}; #[derive(Clone)] /// BFFH registries @@ -10,12 +11,14 @@ pub use actuators::{Actuator, ActBox}; /// reference, not clone the registries pub struct Registries { pub actuators: actuators::Actuators, + pub sensors: sensors::Sensors, } impl Registries { pub fn new() -> Self { Registries { - actuators: actuators::Actuators::new() + actuators: actuators::Actuators::new(), + sensors: sensors::Sensors::new(), } } } diff --git a/src/registries/sensors.rs b/src/registries/sensors.rs index 25c41ee..042a00c 100644 --- a/src/registries/sensors.rs +++ b/src/registries/sensors.rs @@ -7,10 +7,19 @@ use std::sync::Arc; use smol::lock::RwLock; use std::collections::HashMap; +#[derive(Clone)] pub struct Sensors { inner: Arc>, } +impl Sensors { + pub fn new() -> Self { + Sensors { + inner: Arc::new(RwLock::new(Inner::new())), + } + } +} + pub type SensBox<'a> = Box>; type Inner = HashMap>; From 267ff63016fc826438ffd9f202cad5ac0f549351 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 11:57:45 +0200 Subject: [PATCH 06/18] Improves lifetimes to be more ergonomic --- src/registries/sensors.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/registries/sensors.rs b/src/registries/sensors.rs index 042a00c..b926989 100644 --- a/src/registries/sensors.rs +++ b/src/registries/sensors.rs @@ -20,8 +20,8 @@ impl Sensors { } } -pub type SensBox<'a> = Box>; -type Inner = HashMap>; +pub type SensBox = Box; +type Inner = HashMap; // Implementing Sensors. @@ -35,13 +35,14 @@ type Inner = HashMap>; /// A sensor is anything that can forward an intent of an user to do something to bffh. /// This may be a card reader connected to a machine, a website allowing users to select a machine /// they want to use or something like QRHello -pub trait Sensor<'a>: Stream> { +pub trait Sensor: Stream> { /// Setup the Sensor. /// /// After this async function completes the Stream implementation should be able to generate /// futures when polled. /// Implementations can rely on this function being polled to completeion before the stream /// is polled. + // TODO Is this sensible vs just having module-specific setup fns? async fn setup(&mut self); /// Shutdown the sensor gracefully @@ -51,11 +52,9 @@ pub trait Sensor<'a>: Stream> { async fn shutdown(&mut self); } -struct Dummy<'a> { - phantom: &'a std::marker::PhantomData<()>, -} +struct Dummy; #[async_trait] -impl<'a> Sensor<'a> for Dummy<'a> { +impl Sensor for Dummy { async fn setup(&mut self) { return; } @@ -65,10 +64,10 @@ impl<'a> Sensor<'a> for Dummy<'a> { } } -impl<'a> Stream for Dummy<'a> { - type Item = BoxFuture<'a, ()>; +impl Stream for Dummy { + type Item = BoxFuture<'static, ()>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - todo!() + Poll::Ready(Some(Box::pin(futures::future::ready(())))) } } From c943e78cc6b918392c9c368bcfd4850208ead65f Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 14:32:53 +0200 Subject: [PATCH 07/18] Move modules back to threadpool --- src/main.rs | 2 +- src/modules.rs | 6 +++--- src/modules/shelly.rs | 20 +++++++------------- src/registries/actuators.rs | 4 +--- src/registries/sensors.rs | 2 +- 5 files changed, 13 insertions(+), 21 deletions(-) diff --git a/src/main.rs b/src/main.rs index 25bc642..2cd09e9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -229,7 +229,7 @@ fn main() -> Result<(), Error> { // without warning. let modlog = log.clone(); let regs = Registries::new(); - match modules::init(modlog.new(o!("system" => "modules")), &config, &local_spawn, regs) { + match modules::init(modlog.new(o!("system" => "modules")), &config, &pool, regs) { Ok(()) => {} Err(e) => { error!(modlog, "Module startup failed: {}", e); diff --git a/src/modules.rs b/src/modules.rs index 8faaf35..8bb06bb 100644 --- a/src/modules.rs +++ b/src/modules.rs @@ -10,16 +10,16 @@ use slog::Logger; mod shelly; use futures::prelude::*; -use futures::task::LocalSpawn; +use futures::task::Spawn; use crate::config::Settings; use crate::error::Result; use crate::registries::Registries; // spawner is a type that allows 'tasks' to be spawned on it, running them to completion. -pub fn init(log: Logger, config: &Settings, spawner: &S, registries: Registries) -> Result<()> { +pub fn init(log: Logger, config: &Settings, spawner: &S, registries: Registries) -> Result<()> { let f = Box::new(shelly::run(log.clone(), config.clone(), registries.clone())); - spawner.spawn_local_obj(f.into())?; + spawner.spawn_obj(f.into())?; Ok(()) } diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 732782c..c51ceb8 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -15,33 +15,27 @@ use paho_mqtt as mqtt; // entirety. This works reasonably enough for this static modules here but if we do dynamic loading // via dlopen(), lua API, python API etc it will not. pub async fn run(log: Logger, config: Settings, registries: Registries) { - let shelly_r = Shelly::new(config).await; - if let Err(e) = shelly_r { - error!(log, "Shelly module errored: {}", e); - return; - } + let shelly = Shelly::new(config).await; - let r = registries.actuators.register( - "shelly".to_string(), - shelly_r.unwrap() - ).await; + let r = registries.actuators.register("shelly".to_string(), shelly).await; } /// An actuator for all Shellies connected listening on one MQTT broker /// /// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If /// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator. +#[derive(Clone)] struct Shelly { client: mqtt::AsyncClient, } impl Shelly { - pub async fn new(config: Settings) -> Result { - let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url)?; + pub async fn new(config: Settings) -> ActBox { + let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap(); - client.connect(mqtt::ConnectOptions::new()).await?; + client.connect(mqtt::ConnectOptions::new()).await.unwrap(); - Ok(Box::new(Shelly { client }) as ActBox) + Box::new(Shelly { client }) } } diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index 890fc1e..3110ada 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -10,9 +10,7 @@ pub struct Actuators { inner: Arc>, } -unsafe impl Send for Actuators { } - -pub type ActBox = Box; +pub type ActBox = Box; type Inner = HashMap; diff --git a/src/registries/sensors.rs b/src/registries/sensors.rs index b926989..2b7819e 100644 --- a/src/registries/sensors.rs +++ b/src/registries/sensors.rs @@ -20,7 +20,7 @@ impl Sensors { } } -pub type SensBox = Box; +pub type SensBox = Box; type Inner = HashMap; From 7e6748ad15b4ec1c49580c1b056db15101f749ef Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 14:34:57 +0200 Subject: [PATCH 08/18] =?UTF-8?q?Error=20is=20not=20Send=20=C2=AF\(?= =?UTF-8?q?=E3=83=84)/=C2=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/modules/shelly.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index c51ceb8..2db6be8 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -30,6 +30,7 @@ struct Shelly { } impl Shelly { + // Can't use Error, it's not Send. fabinfra/fabaccess/bffh#7 pub async fn new(config: Settings) -> ActBox { let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap(); From 0ea9177e1417fe768960ec4279a475bbb32509a6 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 15:34:35 +0200 Subject: [PATCH 09/18] Moves actuators to be coroutines --- src/modules/shelly.rs | 29 ++++++++++++++++++++--- src/registries.rs | 2 +- src/registries/actuators.rs | 46 +++++++++++++++++++++++++++++++++++-- 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 2db6be8..7e706f5 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -1,13 +1,14 @@ use slog::Logger; use crate::config::Settings; -use crate::registries::{Registries, Actuator, ActBox}; +use crate::registries::{Registries, Actuator, ActBox, StatusSignal}; use crate::error::Result; use std::pin::Pin; use futures::prelude::*; use futures::ready; use futures::task::{Poll, Context}; +use futures_signals::signal::Signal; use paho_mqtt as mqtt; @@ -24,8 +25,9 @@ pub async fn run(log: Logger, config: Settings, registries: Registries) { /// /// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If /// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator. -#[derive(Clone)] struct Shelly { + signal: Option, + name: String, client: mqtt::AsyncClient, } @@ -36,7 +38,10 @@ impl Shelly { client.connect(mqtt::ConnectOptions::new()).await.unwrap(); - Box::new(Shelly { client }) + let name = "test".to_string(); + let signal: Option = None; + + Box::new(Shelly { signal, name, client }) } } @@ -55,3 +60,21 @@ impl Actuator for Shelly { self.client.publish(msg).map(|_| ()).await } } + +impl Stream for Shelly { + type Item = future::BoxFuture<'static, ()>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let unpin = Pin::into_inner(self); + if let Some(ref mut s) = unpin.signal { + let status = ready!(Signal::poll_change(Pin::new(s), cx)); + let topic = format!("shellies/{}/relay/0/command", unpin.name); + let msg = mqtt::Message::new(topic, "on", 0); + let f = unpin.client.publish(msg).map(|_| ()); + + Poll::Ready(Some(Box::pin(f))) + } else { + Poll::Pending + } + } +} diff --git a/src/registries.rs b/src/registries.rs index 069403a..0975bab 100644 --- a/src/registries.rs +++ b/src/registries.rs @@ -1,7 +1,7 @@ mod actuators; mod sensors; -pub use actuators::{Actuator, ActBox}; +pub use actuators::{Actuator, ActBox, StatusSignal}; pub use sensors::{Sensor, SensBox}; #[derive(Clone)] diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index 3110ada..1e6a6e5 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -1,7 +1,15 @@ +use slog::Logger; + use std::sync::Arc; use smol::lock::RwLock; +use std::pin::Pin; +use futures::ready; use futures::prelude::*; +use futures::task::{Context, Poll}; +use futures_signals::signal::Signal; + +use crate::machine::Status; use std::collections::HashMap; @@ -30,16 +38,26 @@ impl Actuators { #[async_trait] -pub trait Actuator { +pub trait Actuator: Stream> { // TODO: Is it smarter to pass a (reference to?) a machine instead of 'name'? Do we need to // pass basically arbitrary parameters to the Actuator? async fn power_on(&mut self, name: String); async fn power_off(&mut self, name: String); } +pub type StatusSignal = Pin + Send + Sync>>; + +#[async_trait] +pub trait Subscriber { + async fn subscribe(&mut self, signal: StatusSignal); +} + // This is merely a proof that Actuator *can* be implemented on a finite, known type. Yay for type // systems with halting problems. -struct Dummy; +struct Dummy { + log: Logger, + signal: Option +} #[async_trait] impl Actuator for Dummy { async fn power_on(&mut self, _name: String) { @@ -49,3 +67,27 @@ impl Actuator for Dummy { return } } + +#[async_trait] +impl Subscriber for Dummy { + async fn subscribe(&mut self, signal: StatusSignal) { + self.signal.replace(signal); + } +} + +impl Stream for Dummy { + type Item = future::BoxFuture<'static, ()>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let unpin = Pin::into_inner(self); + if let Some(ref mut s) = unpin.signal { + let status = ready!(Signal::poll_change(Pin::new(s), cx)); + + info!(unpin.log, "Dummy actuator would set status to {:?}, but is a Dummy", status); + + Poll::Ready(Some(Box::pin(futures::future::ready(())))) + } else { + Poll::Pending + } + } +} From 4bd62216e277097ef6fca6d374e339e99487ec35 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 15:36:42 +0200 Subject: [PATCH 10/18] Subscriber is Actuator now --- src/modules/shelly.rs | 12 ++---------- src/registries/actuators.rs | 21 +-------------------- 2 files changed, 3 insertions(+), 30 deletions(-) diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 7e706f5..d7da62b 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -48,16 +48,8 @@ impl Shelly { #[async_trait] impl Actuator for Shelly { - async fn power_on(&mut self, name: String) { - let topic = format!("shellies/{}/relay/0/command", name); - let msg = mqtt::Message::new(topic, "on", 0); - self.client.publish(msg).map(|_| ()).await - } - - async fn power_off(&mut self, name: String) { - let topic = format!("shellies/{}/relay/0/command", name); - let msg = mqtt::Message::new(topic, "off", 0); - self.client.publish(msg).map(|_| ()).await + async fn subscribe(&mut self, signal: StatusSignal) { + self.signal.replace(signal); } } diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index 1e6a6e5..936d503 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -36,19 +36,10 @@ impl Actuators { } } - -#[async_trait] -pub trait Actuator: Stream> { - // TODO: Is it smarter to pass a (reference to?) a machine instead of 'name'? Do we need to - // pass basically arbitrary parameters to the Actuator? - async fn power_on(&mut self, name: String); - async fn power_off(&mut self, name: String); -} - pub type StatusSignal = Pin + Send + Sync>>; #[async_trait] -pub trait Subscriber { +pub trait Actuator: Stream> { async fn subscribe(&mut self, signal: StatusSignal); } @@ -60,16 +51,6 @@ struct Dummy { } #[async_trait] impl Actuator for Dummy { - async fn power_on(&mut self, _name: String) { - return - } - async fn power_off(&mut self, _name: String) { - return - } -} - -#[async_trait] -impl Subscriber for Dummy { async fn subscribe(&mut self, signal: StatusSignal) { self.signal.replace(signal); } From f4d5a70841214aac70f3ec9eea4d4d8f889840db Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 15:45:43 +0200 Subject: [PATCH 11/18] Actually make the shellies switch --- src/modules/shelly.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index d7da62b..5cb4640 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -3,6 +3,7 @@ use slog::Logger; use crate::config::Settings; use crate::registries::{Registries, Actuator, ActBox, StatusSignal}; use crate::error::Result; +use crate::machine::Status; use std::pin::Pin; use futures::prelude::*; @@ -59,14 +60,20 @@ impl Stream for Shelly { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let unpin = Pin::into_inner(self); if let Some(ref mut s) = unpin.signal { - let status = ready!(Signal::poll_change(Pin::new(s), cx)); - let topic = format!("shellies/{}/relay/0/command", unpin.name); - let msg = mqtt::Message::new(topic, "on", 0); - let f = unpin.client.publish(msg).map(|_| ()); + if let Some(status) = ready!(Signal::poll_change(Pin::new(s), cx)) { + let topic = format!("shellies/{}/relay/0/command", unpin.name); + let pl = match status { + Status::Free => "off", + Status::Occupied => "on", + Status::Blocked => "off", + }; + let msg = mqtt::Message::new(topic, pl, 0); + let f = unpin.client.publish(msg).map(|_| ()); - Poll::Ready(Some(Box::pin(f))) - } else { - Poll::Pending + return Poll::Ready(Some(Box::pin(f))); + } } + + Poll::Pending } } From 33d9d76755201871875c2bf2f409a01fb9e7619d Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 15:47:41 +0200 Subject: [PATCH 12/18] Cleanup --- src/modules/shelly.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 5cb4640..4dc1d11 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -63,9 +63,8 @@ impl Stream for Shelly { if let Some(status) = ready!(Signal::poll_change(Pin::new(s), cx)) { let topic = format!("shellies/{}/relay/0/command", unpin.name); let pl = match status { - Status::Free => "off", + Status::Free | Status::Blocked => "off", Status::Occupied => "on", - Status::Blocked => "off", }; let msg = mqtt::Message::new(topic, pl, 0); let f = unpin.client.publish(msg).map(|_| ()); From 173ef6d055e068d6f9e7dce05b0a4a1dd51d3d03 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 16:00:23 +0200 Subject: [PATCH 13/18] Ensure the task polling a shelly stream gets woken up if shelly is subscribed to something --- src/modules/shelly.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 4dc1d11..3da55b6 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -8,7 +8,7 @@ use crate::machine::Status; use std::pin::Pin; use futures::prelude::*; use futures::ready; -use futures::task::{Poll, Context}; +use futures::task::{Poll, Context, Waker}; use futures_signals::signal::Signal; use paho_mqtt as mqtt; @@ -28,6 +28,7 @@ pub async fn run(log: Logger, config: Settings, registries: Registries) { /// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator. struct Shelly { signal: Option, + waker: Option, name: String, client: mqtt::AsyncClient, } @@ -41,8 +42,9 @@ impl Shelly { let name = "test".to_string(); let signal: Option = None; + let waker = None; - Box::new(Shelly { signal, name, client }) + Box::new(Shelly { signal, waker, name, client }) } } @@ -51,6 +53,9 @@ impl Shelly { impl Actuator for Shelly { async fn subscribe(&mut self, signal: StatusSignal) { self.signal.replace(signal); + if let Some(waker) = self.waker.take() { + waker.wake(); + } } } @@ -71,6 +76,8 @@ impl Stream for Shelly { return Poll::Ready(Some(Box::pin(f))); } + } else { + unpin.waker.replace(cx.waker().clone()); } Poll::Pending From e0c1ce868d75acc65e4d1c3a03dd1bdf6b03e87f Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 16:05:46 +0200 Subject: [PATCH 14/18] Make Actuator trait non-async --- src/modules/shelly.rs | 3 +-- src/registries/actuators.rs | 7 +++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 3da55b6..ec23895 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -49,9 +49,8 @@ impl Shelly { } -#[async_trait] impl Actuator for Shelly { - async fn subscribe(&mut self, signal: StatusSignal) { + fn subscribe(&mut self, signal: StatusSignal) { self.signal.replace(signal); if let Some(waker) = self.waker.take() { waker.wake(); diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index 936d503..15d64fa 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -38,9 +38,8 @@ impl Actuators { pub type StatusSignal = Pin + Send + Sync>>; -#[async_trait] pub trait Actuator: Stream> { - async fn subscribe(&mut self, signal: StatusSignal); + fn subscribe(&mut self, signal: StatusSignal); } // This is merely a proof that Actuator *can* be implemented on a finite, known type. Yay for type @@ -49,9 +48,9 @@ struct Dummy { log: Logger, signal: Option } -#[async_trait] + impl Actuator for Dummy { - async fn subscribe(&mut self, signal: StatusSignal) { + fn subscribe(&mut self, signal: StatusSignal) { self.signal.replace(signal); } } From 7a876a538d4ebfada334c2c5be5e64e462f0ca2e Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Thu, 17 Sep 2020 21:12:30 +0200 Subject: [PATCH 15/18] Status commit --- src/main.rs | 2 +- src/modules.rs | 5 ++--- src/modules/shelly.rs | 9 +++++---- src/registries/actuators.rs | 14 ++++++++++++++ 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/src/main.rs b/src/main.rs index 2cd09e9..a923881 100644 --- a/src/main.rs +++ b/src/main.rs @@ -229,7 +229,7 @@ fn main() -> Result<(), Error> { // without warning. let modlog = log.clone(); let regs = Registries::new(); - match modules::init(modlog.new(o!("system" => "modules")), &config, &pool, regs) { + match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs)) { Ok(()) => {} Err(e) => { error!(modlog, "Module startup failed: {}", e); diff --git a/src/modules.rs b/src/modules.rs index 8bb06bb..e200129 100644 --- a/src/modules.rs +++ b/src/modules.rs @@ -17,9 +17,8 @@ use crate::error::Result; use crate::registries::Registries; // spawner is a type that allows 'tasks' to be spawned on it, running them to completion. -pub fn init(log: Logger, config: &Settings, spawner: &S, registries: Registries) -> Result<()> { - let f = Box::new(shelly::run(log.clone(), config.clone(), registries.clone())); - spawner.spawn_obj(f.into())?; +pub async fn init(log: Logger, config: Settings, spawner: S, registries: Registries) -> Result<()> { + shelly::run(log.clone(), config.clone(), registries.clone()).await; Ok(()) } diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index ec23895..9abf94a 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -8,7 +8,8 @@ use crate::machine::Status; use std::pin::Pin; use futures::prelude::*; use futures::ready; -use futures::task::{Poll, Context, Waker}; +use futures::task::{Poll, Context, Waker, Spawn}; +use futures::StreamExt; use futures_signals::signal::Signal; use paho_mqtt as mqtt; @@ -19,7 +20,7 @@ use paho_mqtt as mqtt; pub async fn run(log: Logger, config: Settings, registries: Registries) { let shelly = Shelly::new(config).await; - let r = registries.actuators.register("shelly".to_string(), shelly).await; + let r = registries.actuators.register("shelly".to_string(), Box::new(shelly)).await; } /// An actuator for all Shellies connected listening on one MQTT broker @@ -35,7 +36,7 @@ struct Shelly { impl Shelly { // Can't use Error, it's not Send. fabinfra/fabaccess/bffh#7 - pub async fn new(config: Settings) -> ActBox { + pub async fn new(config: Settings) -> Self { let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap(); client.connect(mqtt::ConnectOptions::new()).await.unwrap(); @@ -44,7 +45,7 @@ impl Shelly { let signal: Option = None; let waker = None; - Box::new(Shelly { signal, waker, name, client }) + Shelly { signal, waker, name, client } } } diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index 15d64fa..1183b16 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -34,6 +34,20 @@ impl Actuators { // TODO: Log an error or something if that name was already taken wlock.insert(name, act); } + + pub async fn run(&mut self) { + let mut wlock = self.inner.write().await; + for (_name, act) in wlock.into_iter() { + + } + } + + pub async fn subscribe(&mut self, name: String, signal: StatusSignal) { + let mut wlock = self.inner.write().await; + if let Some(act) = wlock.get_mut(&name) { + act.subscribe(signal); + } + } } pub type StatusSignal = Pin + Send + Sync>>; From 026aa40019c90d8efe150489633d946787e3f0a1 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Fri, 18 Sep 2020 12:34:18 +0200 Subject: [PATCH 16/18] Look ma, an event network! --- src/machine.rs | 10 ++++++-- src/main.rs | 40 +++++++++++++++++++++++------- src/modules.rs | 2 +- src/modules/shelly.rs | 49 +++++++++++++++++++++++++++++++------ src/registries/actuators.rs | 25 ++++++++----------- 5 files changed, 92 insertions(+), 34 deletions(-) diff --git a/src/machine.rs b/src/machine.rs index c60e369..9229e76 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -23,6 +23,8 @@ use smol::channel::{Receiver, Sender}; use futures_signals::signal::*; +use crate::registries::StatusSignal; + pub type ID = Uuid; /// Status of a Machine @@ -138,8 +140,8 @@ impl Machine { /// dedupe ensures that if state is changed but only changes to the value it had beforehand /// (could for example happen if the machine changes current user but stays activated) no /// update is sent. - pub fn signal(&self) -> impl Signal { - self.state.signal().dedupe() + pub fn signal(&self) -> StatusSignal { + Box::pin(self.state.signal().dedupe()) } /// Requests to use a machine. Returns `true` if successful. @@ -159,6 +161,10 @@ impl Machine { return Ok(false); } } + + pub fn set_state(&mut self, state: Status) { + self.state.set(state) + } } pub struct MachineDB { diff --git a/src/main.rs b/src/main.rs index a923881..7e8739e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -197,16 +197,16 @@ fn main() -> Result<(), Error> { } }).collect(); - let (mach, auth) = exec.run_until(async { - // Rull all futures to completion in parallel. - // This will block until all three are done starting up. - join!(machinedb_f, authentication_f) - }); + //let (mach, auth) = exec.run_until(async { + // // Rull all futures to completion in parallel. + // // This will block until all three are done starting up. + // join!(machinedb_f, authentication_f) + //}); // Error out if any of the subsystems failed to start. - let mach = mach?; + //let mach = mach?; let pdb = pdb?; - let auth = auth?; + //let auth = auth?; // Since the below closures will happen at a much later time we need to make sure all pointers // are still valid. Thus, Arc. @@ -228,8 +228,8 @@ fn main() -> Result<(), Error> { // FIXME: implement notification so the modules can shut down cleanly instead of being killed // without warning. let modlog = log.clone(); - let regs = Registries::new(); - match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs)) { + let mut regs = Registries::new(); + match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs.clone())) { Ok(()) => {} Err(e) => { error!(modlog, "Module startup failed: {}", e); @@ -237,6 +237,28 @@ fn main() -> Result<(), Error> { } } + use uuid::Uuid; + use machine::{Status, Machine}; + let mut machine = Machine::new(Uuid::new_v4(), "Testmachine".to_string(), 0); + let f = regs.actuators.subscribe("shelly".to_string(), machine.signal()); + exec.run_until(f); + + let postlog = log.clone(); + use std::thread; + use std::time::Duration; + thread::spawn(move || { + info!(postlog, "Zzz"); + thread::sleep(Duration::from_millis(1000)); + machine.set_state(Status::Occupied); + info!(postlog, "Beep"); + thread::sleep(Duration::from_millis(2000)); + machine.set_state(Status::Blocked); + info!(postlog, "Bap"); + thread::sleep(Duration::from_millis(3000)); + machine.set_state(Status::Free); + info!(postlog, "Boop"); + }); + // Closure inefficiencies. Lucky cloning an Arc is pretty cheap. let inner_log = log.clone(); let loop_log = log.clone(); diff --git a/src/modules.rs b/src/modules.rs index e200129..9f8a21e 100644 --- a/src/modules.rs +++ b/src/modules.rs @@ -18,7 +18,7 @@ use crate::registries::Registries; // spawner is a type that allows 'tasks' to be spawned on it, running them to completion. pub async fn init(log: Logger, config: Settings, spawner: S, registries: Registries) -> Result<()> { - shelly::run(log.clone(), config.clone(), registries.clone()).await; + shelly::run(log.clone(), config.clone(), registries.clone(), spawner.clone()).await; Ok(()) } diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 9abf94a..91ebee6 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -7,8 +7,9 @@ use crate::machine::Status; use std::pin::Pin; use futures::prelude::*; +use futures::channel::mpsc; use futures::ready; -use futures::task::{Poll, Context, Waker, Spawn}; +use futures::task::{Poll, Context, Waker, Spawn, FutureObj}; use futures::StreamExt; use futures_signals::signal::Signal; @@ -17,10 +18,15 @@ use paho_mqtt as mqtt; // TODO: Late config parsing. Right now the config is validated at the very startup in its // entirety. This works reasonably enough for this static modules here but if we do dynamic loading // via dlopen(), lua API, python API etc it will not. -pub async fn run(log: Logger, config: Settings, registries: Registries) { - let shelly = Shelly::new(config).await; +pub async fn run(log: Logger, config: Settings, registries: Registries, spawner: S) { + let (tx, rx) = mpsc::channel(1); + let mut shelly = Shelly::new(log, config, rx).await; + + let r = registries.actuators.register("shelly".to_string(), tx).await; + + let f = shelly.for_each(|f| f); + spawner.spawn_obj(FutureObj::from(Box::pin(f))); - let r = registries.actuators.register("shelly".to_string(), Box::new(shelly)).await; } /// An actuator for all Shellies connected listening on one MQTT broker @@ -28,6 +34,8 @@ pub async fn run(log: Logger, config: Settings, registries: Registries) { /// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If /// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator. struct Shelly { + log: Logger, + sigchan: mpsc::Receiver, signal: Option, waker: Option, name: String, @@ -36,16 +44,17 @@ struct Shelly { impl Shelly { // Can't use Error, it's not Send. fabinfra/fabaccess/bffh#7 - pub async fn new(config: Settings) -> Self { + pub async fn new(log: Logger, config: Settings, sigchan: mpsc::Receiver) -> Self { let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap(); - client.connect(mqtt::ConnectOptions::new()).await.unwrap(); + let o = client.connect(mqtt::ConnectOptions::new()).await.unwrap(); + println!("{:?}", o); let name = "test".to_string(); let signal: Option = None; let waker = None; - Shelly { signal, waker, name, client } + Shelly { log, sigchan, signal, waker, name, client } } } @@ -64,8 +73,33 @@ impl Stream for Shelly { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let unpin = Pin::into_inner(self); + + info!(unpin.log, "tick {}", unpin.signal.is_some()); + + if let Poll::Ready(v) = Stream::poll_next(Pin::new(&mut unpin.sigchan), cx) { + if let Some(s) = v { + // We have received a new signal to use + unpin.signal.replace(s); + // We use `if let` instead of .and_then because we want the waker to be dropped + // afterwards. It's only there to ensure the future is called when a signal is + // installed the first time + // TODO probably don't need that here because we're polling it either way directly + // afterwards, eh? + if let Some(waker) = unpin.waker.take() { + waker.wake(); + } + } else { + info!(unpin.log, "bye"); + // This means that the sending end was dropped, so we shut down + unpin.signal.take(); + unpin.waker.take(); + return Poll::Ready(None); + } + } + if let Some(ref mut s) = unpin.signal { if let Some(status) = ready!(Signal::poll_change(Pin::new(s), cx)) { + info!(unpin.log, "Machine Status changed: {:?}", status); let topic = format!("shellies/{}/relay/0/command", unpin.name); let pl = match status { Status::Free | Status::Blocked => "off", @@ -77,6 +111,7 @@ impl Stream for Shelly { return Poll::Ready(Some(Box::pin(f))); } } else { + info!(unpin.log, "I ain't got no signal son"); unpin.waker.replace(cx.waker().clone()); } diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index 1183b16..2ffa81f 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -6,7 +6,8 @@ use smol::lock::RwLock; use std::pin::Pin; use futures::ready; use futures::prelude::*; -use futures::task::{Context, Poll}; +use futures::channel::mpsc; +use futures::task::{Context, Poll, Spawn}; use futures_signals::signal::Signal; use crate::machine::Status; @@ -18,9 +19,9 @@ pub struct Actuators { inner: Arc>, } -pub type ActBox = Box; +pub type ActBox = Box; -type Inner = HashMap; +type Inner = HashMap>; impl Actuators { pub fn new() -> Self { @@ -29,23 +30,16 @@ impl Actuators { } } - pub async fn register(&self, name: String, act: ActBox) { + pub async fn register(&self, name: String, tx: mpsc::Sender) { let mut wlock = self.inner.write().await; // TODO: Log an error or something if that name was already taken - wlock.insert(name, act); - } - - pub async fn run(&mut self) { - let mut wlock = self.inner.write().await; - for (_name, act) in wlock.into_iter() { - - } + wlock.insert(name, tx); } pub async fn subscribe(&mut self, name: String, signal: StatusSignal) { let mut wlock = self.inner.write().await; - if let Some(act) = wlock.get_mut(&name) { - act.subscribe(signal); + if let Some(tx) = wlock.get_mut(&name) { + tx.send(signal).await; } } } @@ -60,7 +54,8 @@ pub trait Actuator: Stream> { // systems with halting problems. struct Dummy { log: Logger, - signal: Option + sigchan: mpsc::Receiver, + signal: Option, } impl Actuator for Dummy { From 9d7ba0eeda37358825adaa29bf5091ccb09c2636 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Fri, 18 Sep 2020 13:14:24 +0200 Subject: [PATCH 17/18] MachineDB dump/load --- src/machine.rs | 96 ++++++++++++++++++++++++++++++++++++++++---------- src/main.rs | 12 ++++--- 2 files changed, 85 insertions(+), 23 deletions(-) diff --git a/src/machine.rs b/src/machine.rs index 9229e76..3cd8a62 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -1,6 +1,9 @@ +use std::str::FromStr; use std::collections::HashMap; +use std::fs; use std::fs::File; use std::io::{Read, Write}; +use std::path::{Path, PathBuf}; use slog::Logger; @@ -17,7 +20,7 @@ use capnp::Error; use uuid::Uuid; -use lmdb::{Transaction, RwTransaction}; +use lmdb::{Transaction, RwTransaction, Cursor}; use smol::channel::{Receiver, Sender}; @@ -39,17 +42,6 @@ pub enum Status { Blocked, } -pub struct MachinesProvider { - log: Logger, - mdb: MachineDB, -} - -impl MachinesProvider { - pub fn new(log: Logger, mdb: MachineDB) -> Self { - Self { log, mdb } - } -} - #[derive(Clone)] pub struct Machines { inner: Arc>, @@ -167,13 +159,14 @@ impl Machine { } } -pub struct MachineDB { +pub struct MachinesProvider { + log: Logger, db: lmdb::Database, } -impl MachineDB { - pub fn new(db: lmdb::Database) -> Self { - Self { db } +impl MachinesProvider { + pub fn new(log: Logger, db: lmdb::Database) -> Self { + Self { log, db } } pub fn get_machine(&self, txn: &T, uuid: Uuid) @@ -199,8 +192,75 @@ impl MachineDB { Ok(()) } + + pub fn load_db(&mut self, txn: &mut RwTransaction, mut path: PathBuf) -> Result<()> { + path.push("machines"); + for entry in std::fs::read_dir(path)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() { + // will only ever be none if the path has no file name and then how is it a file?! + let machID_str = path + .file_stem().expect("Found a file with no filename?") + .to_str().expect("Found an OsStr that isn't valid Unicode. Fix your OS!"); + let machID = match uuid::Uuid::from_str(machID_str) { + Ok(i) => i, + Err(e) => { + warn!(self.log, "File {} had a invalid name. Expected an u64 in [0-9a-z] hex with optional file ending: {}. Skipping!", path.display(), e); + continue; + } + }; + let s = match fs::read_to_string(path.as_path()) { + Ok(s) => s, + Err(e) => { + warn!(self.log, "Failed to open file {}: {}, skipping!" + , path.display() + , e); + continue; + } + }; + let mach: Machine = match toml::from_str(&s) { + Ok(r) => r, + Err(e) => { + warn!(self.log, "Failed to parse mach at path {}: {}, skipping!" + , path.display() + , e); + continue; + } + }; + self.put_machine(txn, machID, mach)?; + debug!(self.log, "Loaded machine {}", machID); + } else { + warn!(self.log, "Path {} is not a file, skipping!", path.display()); + } + } + + Ok(()) + } + + pub fn dump_db(&self, txn: &T, mut path: PathBuf) -> Result<()> { + path.push("machines"); + let mut mach_cursor = txn.open_ro_cursor(self.db)?; + for buf in mach_cursor.iter_start() { + let (kbuf, vbuf) = buf?; + let machID = uuid::Uuid::from_slice(kbuf).unwrap(); + let mach: Machine = flexbuffers::from_slice(vbuf)?; + let filename = format!("{}.yml", machID.to_hyphenated().to_string()); + path.set_file_name(filename); + let mut fp = std::fs::File::create(&path)?; + let out = toml::to_vec(&mach)?; + fp.write_all(&out)?; + } + + Ok(()) + } } -pub async fn init(log: Logger, config: &Settings) -> Result { - unimplemented!() +pub fn init(log: Logger, config: &Settings, env: &lmdb::Environment) -> Result { + let mut flags = lmdb::DatabaseFlags::empty(); + flags.set(lmdb::DatabaseFlags::INTEGER_KEY, true); + let machdb = env.create_db(Some("machines"), flags)?; + debug!(&log, "Opened machine db successfully."); + + Ok(MachinesProvider::new(log, machdb)) } diff --git a/src/main.rs b/src/main.rs index 7e8739e..facdb4c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,7 +132,7 @@ fn main() -> Result<(), Error> { // Start loading the machine database, authentication system and permission system // All of those get a custom logger so the source of a log message can be better traced and // filtered - let machinedb_f = machine::init(log.new(o!("system" => "machines")), &config); + let mdb = machine::init(log.new(o!("system" => "machines")), &config, &env); let pdb = access::init(log.new(o!("system" => "permissions")), &config, &env); let authentication_f = auth::init(log.new(o!("system" => "authentication")), config.clone()); @@ -148,7 +148,8 @@ fn main() -> Result<(), Error> { let mut txn = env.begin_rw_txn()?; let path = path.to_path_buf(); - pdb?.load_db(&mut txn, path)?; + pdb?.load_db(&mut txn, path.clone())?; + mdb?.load_db(&mut txn, path)?; txn.commit(); } else { error!(log, "You must provide a directory path to load from"); @@ -165,9 +166,9 @@ fn main() -> Result<(), Error> { let txn = env.begin_ro_txn()?; let path = path.to_path_buf(); - pdb?.dump_db(&txn, path)?; + pdb?.dump_db(&txn, path.clone())?; + mdb?.dump_db(&txn, path)?; } else { - error!(log, "You must provide a directory path to dump into"); } @@ -204,7 +205,7 @@ fn main() -> Result<(), Error> { //}); // Error out if any of the subsystems failed to start. - //let mach = mach?; + let mdb = mdb?; let pdb = pdb?; //let auth = auth?; @@ -240,6 +241,7 @@ fn main() -> Result<(), Error> { use uuid::Uuid; use machine::{Status, Machine}; let mut machine = Machine::new(Uuid::new_v4(), "Testmachine".to_string(), 0); + println!("{}", toml::to_string(&machine).unwrap()); let f = regs.actuators.subscribe("shelly".to_string(), machine.signal()); exec.run_until(f); From ce8ba084d5796983e83e806460f824f0db8b3616 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Fri, 18 Sep 2020 13:20:04 +0200 Subject: [PATCH 18/18] Remove in-promptu test code --- src/main.rs | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/src/main.rs b/src/main.rs index facdb4c..1572ae2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -238,29 +238,6 @@ fn main() -> Result<(), Error> { } } - use uuid::Uuid; - use machine::{Status, Machine}; - let mut machine = Machine::new(Uuid::new_v4(), "Testmachine".to_string(), 0); - println!("{}", toml::to_string(&machine).unwrap()); - let f = regs.actuators.subscribe("shelly".to_string(), machine.signal()); - exec.run_until(f); - - let postlog = log.clone(); - use std::thread; - use std::time::Duration; - thread::spawn(move || { - info!(postlog, "Zzz"); - thread::sleep(Duration::from_millis(1000)); - machine.set_state(Status::Occupied); - info!(postlog, "Beep"); - thread::sleep(Duration::from_millis(2000)); - machine.set_state(Status::Blocked); - info!(postlog, "Bap"); - thread::sleep(Duration::from_millis(3000)); - machine.set_state(Status::Free); - info!(postlog, "Boop"); - }); - // Closure inefficiencies. Lucky cloning an Arc is pretty cheap. let inner_log = log.clone(); let loop_log = log.clone();