mirror of
https://gitlab.com/fabinfra/fabaccess/bffh.git
synced 2024-11-26 08:34:55 +01:00
Look ma, an event network!
This commit is contained in:
parent
7a876a538d
commit
026aa40019
@ -23,6 +23,8 @@ use smol::channel::{Receiver, Sender};
|
|||||||
|
|
||||||
use futures_signals::signal::*;
|
use futures_signals::signal::*;
|
||||||
|
|
||||||
|
use crate::registries::StatusSignal;
|
||||||
|
|
||||||
pub type ID = Uuid;
|
pub type ID = Uuid;
|
||||||
|
|
||||||
/// Status of a Machine
|
/// Status of a Machine
|
||||||
@ -138,8 +140,8 @@ impl Machine {
|
|||||||
/// dedupe ensures that if state is changed but only changes to the value it had beforehand
|
/// dedupe ensures that if state is changed but only changes to the value it had beforehand
|
||||||
/// (could for example happen if the machine changes current user but stays activated) no
|
/// (could for example happen if the machine changes current user but stays activated) no
|
||||||
/// update is sent.
|
/// update is sent.
|
||||||
pub fn signal(&self) -> impl Signal {
|
pub fn signal(&self) -> StatusSignal {
|
||||||
self.state.signal().dedupe()
|
Box::pin(self.state.signal().dedupe())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Requests to use a machine. Returns `true` if successful.
|
/// Requests to use a machine. Returns `true` if successful.
|
||||||
@ -159,6 +161,10 @@ impl Machine {
|
|||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_state(&mut self, state: Status) {
|
||||||
|
self.state.set(state)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct MachineDB {
|
pub struct MachineDB {
|
||||||
|
40
src/main.rs
40
src/main.rs
@ -197,16 +197,16 @@ fn main() -> Result<(), Error> {
|
|||||||
}
|
}
|
||||||
}).collect();
|
}).collect();
|
||||||
|
|
||||||
let (mach, auth) = exec.run_until(async {
|
//let (mach, auth) = exec.run_until(async {
|
||||||
// Rull all futures to completion in parallel.
|
// // Rull all futures to completion in parallel.
|
||||||
// This will block until all three are done starting up.
|
// // This will block until all three are done starting up.
|
||||||
join!(machinedb_f, authentication_f)
|
// join!(machinedb_f, authentication_f)
|
||||||
});
|
//});
|
||||||
|
|
||||||
// Error out if any of the subsystems failed to start.
|
// Error out if any of the subsystems failed to start.
|
||||||
let mach = mach?;
|
//let mach = mach?;
|
||||||
let pdb = pdb?;
|
let pdb = pdb?;
|
||||||
let auth = auth?;
|
//let auth = auth?;
|
||||||
|
|
||||||
// Since the below closures will happen at a much later time we need to make sure all pointers
|
// Since the below closures will happen at a much later time we need to make sure all pointers
|
||||||
// are still valid. Thus, Arc.
|
// are still valid. Thus, Arc.
|
||||||
@ -228,8 +228,8 @@ fn main() -> Result<(), Error> {
|
|||||||
// FIXME: implement notification so the modules can shut down cleanly instead of being killed
|
// FIXME: implement notification so the modules can shut down cleanly instead of being killed
|
||||||
// without warning.
|
// without warning.
|
||||||
let modlog = log.clone();
|
let modlog = log.clone();
|
||||||
let regs = Registries::new();
|
let mut regs = Registries::new();
|
||||||
match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs)) {
|
match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs.clone())) {
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(modlog, "Module startup failed: {}", e);
|
error!(modlog, "Module startup failed: {}", e);
|
||||||
@ -237,6 +237,28 @@ fn main() -> Result<(), Error> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use uuid::Uuid;
|
||||||
|
use machine::{Status, Machine};
|
||||||
|
let mut machine = Machine::new(Uuid::new_v4(), "Testmachine".to_string(), 0);
|
||||||
|
let f = regs.actuators.subscribe("shelly".to_string(), machine.signal());
|
||||||
|
exec.run_until(f);
|
||||||
|
|
||||||
|
let postlog = log.clone();
|
||||||
|
use std::thread;
|
||||||
|
use std::time::Duration;
|
||||||
|
thread::spawn(move || {
|
||||||
|
info!(postlog, "Zzz");
|
||||||
|
thread::sleep(Duration::from_millis(1000));
|
||||||
|
machine.set_state(Status::Occupied);
|
||||||
|
info!(postlog, "Beep");
|
||||||
|
thread::sleep(Duration::from_millis(2000));
|
||||||
|
machine.set_state(Status::Blocked);
|
||||||
|
info!(postlog, "Bap");
|
||||||
|
thread::sleep(Duration::from_millis(3000));
|
||||||
|
machine.set_state(Status::Free);
|
||||||
|
info!(postlog, "Boop");
|
||||||
|
});
|
||||||
|
|
||||||
// Closure inefficiencies. Lucky cloning an Arc is pretty cheap.
|
// Closure inefficiencies. Lucky cloning an Arc is pretty cheap.
|
||||||
let inner_log = log.clone();
|
let inner_log = log.clone();
|
||||||
let loop_log = log.clone();
|
let loop_log = log.clone();
|
||||||
|
@ -18,7 +18,7 @@ use crate::registries::Registries;
|
|||||||
|
|
||||||
// spawner is a type that allows 'tasks' to be spawned on it, running them to completion.
|
// spawner is a type that allows 'tasks' to be spawned on it, running them to completion.
|
||||||
pub async fn init<S: Spawn + Clone + Send>(log: Logger, config: Settings, spawner: S, registries: Registries) -> Result<()> {
|
pub async fn init<S: Spawn + Clone + Send>(log: Logger, config: Settings, spawner: S, registries: Registries) -> Result<()> {
|
||||||
shelly::run(log.clone(), config.clone(), registries.clone()).await;
|
shelly::run(log.clone(), config.clone(), registries.clone(), spawner.clone()).await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -7,8 +7,9 @@ use crate::machine::Status;
|
|||||||
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
|
use futures::channel::mpsc;
|
||||||
use futures::ready;
|
use futures::ready;
|
||||||
use futures::task::{Poll, Context, Waker, Spawn};
|
use futures::task::{Poll, Context, Waker, Spawn, FutureObj};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use futures_signals::signal::Signal;
|
use futures_signals::signal::Signal;
|
||||||
|
|
||||||
@ -17,10 +18,15 @@ use paho_mqtt as mqtt;
|
|||||||
// TODO: Late config parsing. Right now the config is validated at the very startup in its
|
// TODO: Late config parsing. Right now the config is validated at the very startup in its
|
||||||
// entirety. This works reasonably enough for this static modules here but if we do dynamic loading
|
// entirety. This works reasonably enough for this static modules here but if we do dynamic loading
|
||||||
// via dlopen(), lua API, python API etc it will not.
|
// via dlopen(), lua API, python API etc it will not.
|
||||||
pub async fn run(log: Logger, config: Settings, registries: Registries) {
|
pub async fn run<S: Spawn>(log: Logger, config: Settings, registries: Registries, spawner: S) {
|
||||||
let shelly = Shelly::new(config).await;
|
let (tx, rx) = mpsc::channel(1);
|
||||||
|
let mut shelly = Shelly::new(log, config, rx).await;
|
||||||
|
|
||||||
|
let r = registries.actuators.register("shelly".to_string(), tx).await;
|
||||||
|
|
||||||
|
let f = shelly.for_each(|f| f);
|
||||||
|
spawner.spawn_obj(FutureObj::from(Box::pin(f)));
|
||||||
|
|
||||||
let r = registries.actuators.register("shelly".to_string(), Box::new(shelly)).await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An actuator for all Shellies connected listening on one MQTT broker
|
/// An actuator for all Shellies connected listening on one MQTT broker
|
||||||
@ -28,6 +34,8 @@ pub async fn run(log: Logger, config: Settings, registries: Registries) {
|
|||||||
/// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If
|
/// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If
|
||||||
/// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator.
|
/// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator.
|
||||||
struct Shelly {
|
struct Shelly {
|
||||||
|
log: Logger,
|
||||||
|
sigchan: mpsc::Receiver<StatusSignal>,
|
||||||
signal: Option<StatusSignal>,
|
signal: Option<StatusSignal>,
|
||||||
waker: Option<Waker>,
|
waker: Option<Waker>,
|
||||||
name: String,
|
name: String,
|
||||||
@ -36,16 +44,17 @@ struct Shelly {
|
|||||||
|
|
||||||
impl Shelly {
|
impl Shelly {
|
||||||
// Can't use Error, it's not Send. fabinfra/fabaccess/bffh#7
|
// Can't use Error, it's not Send. fabinfra/fabaccess/bffh#7
|
||||||
pub async fn new(config: Settings) -> Self {
|
pub async fn new(log: Logger, config: Settings, sigchan: mpsc::Receiver<StatusSignal>) -> Self {
|
||||||
let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap();
|
let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap();
|
||||||
|
|
||||||
client.connect(mqtt::ConnectOptions::new()).await.unwrap();
|
let o = client.connect(mqtt::ConnectOptions::new()).await.unwrap();
|
||||||
|
println!("{:?}", o);
|
||||||
|
|
||||||
let name = "test".to_string();
|
let name = "test".to_string();
|
||||||
let signal: Option<StatusSignal> = None;
|
let signal: Option<StatusSignal> = None;
|
||||||
let waker = None;
|
let waker = None;
|
||||||
|
|
||||||
Shelly { signal, waker, name, client }
|
Shelly { log, sigchan, signal, waker, name, client }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -64,8 +73,33 @@ impl Stream for Shelly {
|
|||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
let unpin = Pin::into_inner(self);
|
let unpin = Pin::into_inner(self);
|
||||||
|
|
||||||
|
info!(unpin.log, "tick {}", unpin.signal.is_some());
|
||||||
|
|
||||||
|
if let Poll::Ready(v) = Stream::poll_next(Pin::new(&mut unpin.sigchan), cx) {
|
||||||
|
if let Some(s) = v {
|
||||||
|
// We have received a new signal to use
|
||||||
|
unpin.signal.replace(s);
|
||||||
|
// We use `if let` instead of .and_then because we want the waker to be dropped
|
||||||
|
// afterwards. It's only there to ensure the future is called when a signal is
|
||||||
|
// installed the first time
|
||||||
|
// TODO probably don't need that here because we're polling it either way directly
|
||||||
|
// afterwards, eh?
|
||||||
|
if let Some(waker) = unpin.waker.take() {
|
||||||
|
waker.wake();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
info!(unpin.log, "bye");
|
||||||
|
// This means that the sending end was dropped, so we shut down
|
||||||
|
unpin.signal.take();
|
||||||
|
unpin.waker.take();
|
||||||
|
return Poll::Ready(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(ref mut s) = unpin.signal {
|
if let Some(ref mut s) = unpin.signal {
|
||||||
if let Some(status) = ready!(Signal::poll_change(Pin::new(s), cx)) {
|
if let Some(status) = ready!(Signal::poll_change(Pin::new(s), cx)) {
|
||||||
|
info!(unpin.log, "Machine Status changed: {:?}", status);
|
||||||
let topic = format!("shellies/{}/relay/0/command", unpin.name);
|
let topic = format!("shellies/{}/relay/0/command", unpin.name);
|
||||||
let pl = match status {
|
let pl = match status {
|
||||||
Status::Free | Status::Blocked => "off",
|
Status::Free | Status::Blocked => "off",
|
||||||
@ -77,6 +111,7 @@ impl Stream for Shelly {
|
|||||||
return Poll::Ready(Some(Box::pin(f)));
|
return Poll::Ready(Some(Box::pin(f)));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
info!(unpin.log, "I ain't got no signal son");
|
||||||
unpin.waker.replace(cx.waker().clone());
|
unpin.waker.replace(cx.waker().clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,7 +6,8 @@ use smol::lock::RwLock;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use futures::ready;
|
use futures::ready;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::task::{Context, Poll};
|
use futures::channel::mpsc;
|
||||||
|
use futures::task::{Context, Poll, Spawn};
|
||||||
use futures_signals::signal::Signal;
|
use futures_signals::signal::Signal;
|
||||||
|
|
||||||
use crate::machine::Status;
|
use crate::machine::Status;
|
||||||
@ -18,9 +19,9 @@ pub struct Actuators {
|
|||||||
inner: Arc<RwLock<Inner>>,
|
inner: Arc<RwLock<Inner>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type ActBox = Box<dyn Actuator + Sync + Send>;
|
pub type ActBox = Box<dyn Actuator + Sync + Send + Unpin>;
|
||||||
|
|
||||||
type Inner = HashMap<String, ActBox>;
|
type Inner = HashMap<String, mpsc::Sender<StatusSignal>>;
|
||||||
|
|
||||||
impl Actuators {
|
impl Actuators {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
@ -29,23 +30,16 @@ impl Actuators {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn register(&self, name: String, act: ActBox) {
|
pub async fn register(&self, name: String, tx: mpsc::Sender<StatusSignal>) {
|
||||||
let mut wlock = self.inner.write().await;
|
let mut wlock = self.inner.write().await;
|
||||||
// TODO: Log an error or something if that name was already taken
|
// TODO: Log an error or something if that name was already taken
|
||||||
wlock.insert(name, act);
|
wlock.insert(name, tx);
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run<S: Spawn>(&mut self) {
|
|
||||||
let mut wlock = self.inner.write().await;
|
|
||||||
for (_name, act) in wlock.into_iter() {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn subscribe(&mut self, name: String, signal: StatusSignal) {
|
pub async fn subscribe(&mut self, name: String, signal: StatusSignal) {
|
||||||
let mut wlock = self.inner.write().await;
|
let mut wlock = self.inner.write().await;
|
||||||
if let Some(act) = wlock.get_mut(&name) {
|
if let Some(tx) = wlock.get_mut(&name) {
|
||||||
act.subscribe(signal);
|
tx.send(signal).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -60,7 +54,8 @@ pub trait Actuator: Stream<Item = future::BoxFuture<'static, ()>> {
|
|||||||
// systems with halting problems.
|
// systems with halting problems.
|
||||||
struct Dummy {
|
struct Dummy {
|
||||||
log: Logger,
|
log: Logger,
|
||||||
signal: Option<StatusSignal>
|
sigchan: mpsc::Receiver<StatusSignal>,
|
||||||
|
signal: Option<StatusSignal>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Actuator for Dummy {
|
impl Actuator for Dummy {
|
||||||
|
Loading…
Reference in New Issue
Block a user