mirror of
https://gitlab.com/fabinfra/fabaccess/bffh.git
synced 2024-11-22 06:47:56 +01:00
Merge branch 'feature/event-network' into 'development'
Feature/event network See merge request fabinfra/fabaccess/bffh!3
This commit is contained in:
commit
832baea07d
@ -19,9 +19,9 @@ use crate::config::Settings;
|
|||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
|
|
||||||
// FIXME: fabinfra/fabaccess/bffh#3
|
// FIXME: fabinfra/fabaccess/bffh#3
|
||||||
type UserIdentifier = u64;
|
pub type UserIdentifier = u64;
|
||||||
type RoleIdentifier = u64;
|
pub type RoleIdentifier = u64;
|
||||||
type PermIdentifier = u64;
|
pub type PermIdentifier = u64;
|
||||||
|
|
||||||
pub struct PermissionsProvider {
|
pub struct PermissionsProvider {
|
||||||
log: Logger,
|
log: Logger,
|
||||||
|
143
src/machine.rs
143
src/machine.rs
@ -1,6 +1,9 @@
|
|||||||
|
use std::str::FromStr;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::fs;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
|
||||||
use slog::Logger;
|
use slog::Logger;
|
||||||
|
|
||||||
@ -11,19 +14,25 @@ use smol::lock::RwLock;
|
|||||||
|
|
||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
use crate::config::Settings;
|
use crate::config::Settings;
|
||||||
|
use crate::access;
|
||||||
|
|
||||||
use capnp::Error;
|
use capnp::Error;
|
||||||
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use lmdb::{Transaction, RwTransaction};
|
use lmdb::{Transaction, RwTransaction, Cursor};
|
||||||
|
|
||||||
use smol::channel::{Receiver, Sender};
|
use smol::channel::{Receiver, Sender};
|
||||||
|
|
||||||
use futures_signals::signal::*;
|
use futures_signals::signal::*;
|
||||||
|
|
||||||
|
use crate::registries::StatusSignal;
|
||||||
|
|
||||||
|
pub type ID = Uuid;
|
||||||
|
|
||||||
/// Status of a Machine
|
/// Status of a Machine
|
||||||
#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)]
|
||||||
|
#[repr(u8)]
|
||||||
pub enum Status {
|
pub enum Status {
|
||||||
/// Not currently used by anybody
|
/// Not currently used by anybody
|
||||||
Free,
|
Free,
|
||||||
@ -33,17 +42,6 @@ pub enum Status {
|
|||||||
Blocked,
|
Blocked,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct MachinesProvider {
|
|
||||||
log: Logger,
|
|
||||||
mdb: MachineDB,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MachinesProvider {
|
|
||||||
pub fn new(log: Logger, mdb: MachineDB) -> Self {
|
|
||||||
Self { log, mdb }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Machines {
|
pub struct Machines {
|
||||||
inner: Arc<RwLock<MachinesProvider>>,
|
inner: Arc<RwLock<MachinesProvider>>,
|
||||||
@ -98,11 +96,16 @@ impl MachineManager {
|
|||||||
/// machine, checking that the user who wants the machine (de)activated has the required
|
/// machine, checking that the user who wants the machine (de)activated has the required
|
||||||
/// permissions.
|
/// permissions.
|
||||||
pub struct Machine {
|
pub struct Machine {
|
||||||
|
/// Computer-readable identifier for this machine
|
||||||
|
// Implicit in database since it's the key.
|
||||||
|
#[serde(skip)]
|
||||||
|
id: ID,
|
||||||
|
|
||||||
/// The human-readable name of the machine. Does not need to be unique
|
/// The human-readable name of the machine. Does not need to be unique
|
||||||
name: String,
|
name: String,
|
||||||
|
|
||||||
/// The required permission to use this machine.
|
/// The required permission to use this machine.
|
||||||
perm: String,
|
perm: access::PermIdentifier,
|
||||||
|
|
||||||
/// The state of the machine as bffh thinks the machine *should* be in.
|
/// The state of the machine as bffh thinks the machine *should* be in.
|
||||||
///
|
///
|
||||||
@ -112,8 +115,9 @@ pub struct Machine {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Machine {
|
impl Machine {
|
||||||
pub fn new(name: String, perm: String) -> Machine {
|
pub fn new(id: Uuid, name: String, perm: access::PermIdentifier) -> Machine {
|
||||||
Machine {
|
Machine {
|
||||||
|
id: id,
|
||||||
name: name,
|
name: name,
|
||||||
perm: perm,
|
perm: perm,
|
||||||
state: Mutable::new(Status::Free),
|
state: Mutable::new(Status::Free),
|
||||||
@ -128,18 +132,41 @@ impl Machine {
|
|||||||
/// dedupe ensures that if state is changed but only changes to the value it had beforehand
|
/// dedupe ensures that if state is changed but only changes to the value it had beforehand
|
||||||
/// (could for example happen if the machine changes current user but stays activated) no
|
/// (could for example happen if the machine changes current user but stays activated) no
|
||||||
/// update is sent.
|
/// update is sent.
|
||||||
pub fn signal(&self) -> impl Signal {
|
pub fn signal(&self) -> StatusSignal {
|
||||||
self.state.signal().dedupe()
|
Box::pin(self.state.signal().dedupe())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Requests to use a machine. Returns `true` if successful.
|
||||||
|
///
|
||||||
|
/// This will update the internal state of the machine, notifying connected actors, if any.
|
||||||
|
pub fn request_use<T: Transaction>
|
||||||
|
( &mut self
|
||||||
|
, txn: &T
|
||||||
|
, pp: &access::PermissionsProvider
|
||||||
|
, who: access::UserIdentifier
|
||||||
|
) -> Result<bool>
|
||||||
|
{
|
||||||
|
if pp.check(txn, who, self.perm)? {
|
||||||
|
self.state.set(Status::Occupied);
|
||||||
|
return Ok(true);
|
||||||
|
} else {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_state(&mut self, state: Status) {
|
||||||
|
self.state.set(state)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct MachineDB {
|
pub struct MachinesProvider {
|
||||||
|
log: Logger,
|
||||||
db: lmdb::Database,
|
db: lmdb::Database,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MachineDB {
|
impl MachinesProvider {
|
||||||
pub fn new(db: lmdb::Database) -> Self {
|
pub fn new(log: Logger, db: lmdb::Database) -> Self {
|
||||||
Self { db }
|
Self { log, db }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_machine<T: Transaction>(&self, txn: &T, uuid: Uuid)
|
pub fn get_machine<T: Transaction>(&self, txn: &T, uuid: Uuid)
|
||||||
@ -147,7 +174,10 @@ impl MachineDB {
|
|||||||
{
|
{
|
||||||
match txn.get(self.db, &uuid.as_bytes()) {
|
match txn.get(self.db, &uuid.as_bytes()) {
|
||||||
Ok(bytes) => {
|
Ok(bytes) => {
|
||||||
Ok(Some(flexbuffers::from_slice(bytes)?))
|
let mut machine: Machine = flexbuffers::from_slice(bytes)?;
|
||||||
|
machine.id = uuid;
|
||||||
|
|
||||||
|
Ok(Some(machine))
|
||||||
},
|
},
|
||||||
Err(lmdb::Error::NotFound) => { Ok(None) },
|
Err(lmdb::Error::NotFound) => { Ok(None) },
|
||||||
Err(e) => { Err(e.into()) },
|
Err(e) => { Err(e.into()) },
|
||||||
@ -162,8 +192,75 @@ impl MachineDB {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn load_db(&mut self, txn: &mut RwTransaction, mut path: PathBuf) -> Result<()> {
|
||||||
|
path.push("machines");
|
||||||
|
for entry in std::fs::read_dir(path)? {
|
||||||
|
let entry = entry?;
|
||||||
|
let path = entry.path();
|
||||||
|
if path.is_file() {
|
||||||
|
// will only ever be none if the path has no file name and then how is it a file?!
|
||||||
|
let machID_str = path
|
||||||
|
.file_stem().expect("Found a file with no filename?")
|
||||||
|
.to_str().expect("Found an OsStr that isn't valid Unicode. Fix your OS!");
|
||||||
|
let machID = match uuid::Uuid::from_str(machID_str) {
|
||||||
|
Ok(i) => i,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(self.log, "File {} had a invalid name. Expected an u64 in [0-9a-z] hex with optional file ending: {}. Skipping!", path.display(), e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let s = match fs::read_to_string(path.as_path()) {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(self.log, "Failed to open file {}: {}, skipping!"
|
||||||
|
, path.display()
|
||||||
|
, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let mach: Machine = match toml::from_str(&s) {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(self.log, "Failed to parse mach at path {}: {}, skipping!"
|
||||||
|
, path.display()
|
||||||
|
, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
self.put_machine(txn, machID, mach)?;
|
||||||
|
debug!(self.log, "Loaded machine {}", machID);
|
||||||
|
} else {
|
||||||
|
warn!(self.log, "Path {} is not a file, skipping!", path.display());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn dump_db<T: Transaction>(&self, txn: &T, mut path: PathBuf) -> Result<()> {
|
||||||
|
path.push("machines");
|
||||||
|
let mut mach_cursor = txn.open_ro_cursor(self.db)?;
|
||||||
|
for buf in mach_cursor.iter_start() {
|
||||||
|
let (kbuf, vbuf) = buf?;
|
||||||
|
let machID = uuid::Uuid::from_slice(kbuf).unwrap();
|
||||||
|
let mach: Machine = flexbuffers::from_slice(vbuf)?;
|
||||||
|
let filename = format!("{}.yml", machID.to_hyphenated().to_string());
|
||||||
|
path.set_file_name(filename);
|
||||||
|
let mut fp = std::fs::File::create(&path)?;
|
||||||
|
let out = toml::to_vec(&mach)?;
|
||||||
|
fp.write_all(&out)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn init(log: Logger, config: &Settings) -> Result<MachinesProvider> {
|
pub fn init(log: Logger, config: &Settings, env: &lmdb::Environment) -> Result<MachinesProvider> {
|
||||||
unimplemented!()
|
let mut flags = lmdb::DatabaseFlags::empty();
|
||||||
|
flags.set(lmdb::DatabaseFlags::INTEGER_KEY, true);
|
||||||
|
let machdb = env.create_db(Some("machines"), flags)?;
|
||||||
|
debug!(&log, "Opened machine db successfully.");
|
||||||
|
|
||||||
|
Ok(MachinesProvider::new(log, machdb))
|
||||||
}
|
}
|
||||||
|
27
src/main.rs
27
src/main.rs
@ -132,7 +132,7 @@ fn main() -> Result<(), Error> {
|
|||||||
// Start loading the machine database, authentication system and permission system
|
// Start loading the machine database, authentication system and permission system
|
||||||
// All of those get a custom logger so the source of a log message can be better traced and
|
// All of those get a custom logger so the source of a log message can be better traced and
|
||||||
// filtered
|
// filtered
|
||||||
let machinedb_f = machine::init(log.new(o!("system" => "machines")), &config);
|
let mdb = machine::init(log.new(o!("system" => "machines")), &config, &env);
|
||||||
let pdb = access::init(log.new(o!("system" => "permissions")), &config, &env);
|
let pdb = access::init(log.new(o!("system" => "permissions")), &config, &env);
|
||||||
let authentication_f = auth::init(log.new(o!("system" => "authentication")), config.clone());
|
let authentication_f = auth::init(log.new(o!("system" => "authentication")), config.clone());
|
||||||
|
|
||||||
@ -148,7 +148,8 @@ fn main() -> Result<(), Error> {
|
|||||||
|
|
||||||
let mut txn = env.begin_rw_txn()?;
|
let mut txn = env.begin_rw_txn()?;
|
||||||
let path = path.to_path_buf();
|
let path = path.to_path_buf();
|
||||||
pdb?.load_db(&mut txn, path)?;
|
pdb?.load_db(&mut txn, path.clone())?;
|
||||||
|
mdb?.load_db(&mut txn, path)?;
|
||||||
txn.commit();
|
txn.commit();
|
||||||
} else {
|
} else {
|
||||||
error!(log, "You must provide a directory path to load from");
|
error!(log, "You must provide a directory path to load from");
|
||||||
@ -165,9 +166,9 @@ fn main() -> Result<(), Error> {
|
|||||||
|
|
||||||
let txn = env.begin_ro_txn()?;
|
let txn = env.begin_ro_txn()?;
|
||||||
let path = path.to_path_buf();
|
let path = path.to_path_buf();
|
||||||
pdb?.dump_db(&txn, path)?;
|
pdb?.dump_db(&txn, path.clone())?;
|
||||||
|
mdb?.dump_db(&txn, path)?;
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
error!(log, "You must provide a directory path to dump into");
|
error!(log, "You must provide a directory path to dump into");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -197,16 +198,16 @@ fn main() -> Result<(), Error> {
|
|||||||
}
|
}
|
||||||
}).collect();
|
}).collect();
|
||||||
|
|
||||||
let (mach, auth) = exec.run_until(async {
|
//let (mach, auth) = exec.run_until(async {
|
||||||
// Rull all futures to completion in parallel.
|
// // Rull all futures to completion in parallel.
|
||||||
// This will block until all three are done starting up.
|
// // This will block until all three are done starting up.
|
||||||
join!(machinedb_f, authentication_f)
|
// join!(machinedb_f, authentication_f)
|
||||||
});
|
//});
|
||||||
|
|
||||||
// Error out if any of the subsystems failed to start.
|
// Error out if any of the subsystems failed to start.
|
||||||
let mach = mach?;
|
let mdb = mdb?;
|
||||||
let pdb = pdb?;
|
let pdb = pdb?;
|
||||||
let auth = auth?;
|
//let auth = auth?;
|
||||||
|
|
||||||
// Since the below closures will happen at a much later time we need to make sure all pointers
|
// Since the below closures will happen at a much later time we need to make sure all pointers
|
||||||
// are still valid. Thus, Arc.
|
// are still valid. Thus, Arc.
|
||||||
@ -228,8 +229,8 @@ fn main() -> Result<(), Error> {
|
|||||||
// FIXME: implement notification so the modules can shut down cleanly instead of being killed
|
// FIXME: implement notification so the modules can shut down cleanly instead of being killed
|
||||||
// without warning.
|
// without warning.
|
||||||
let modlog = log.clone();
|
let modlog = log.clone();
|
||||||
let regs = Registries::new();
|
let mut regs = Registries::new();
|
||||||
match modules::init(modlog.new(o!("system" => "modules")), &config, &local_spawn, regs) {
|
match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs.clone())) {
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(modlog, "Module startup failed: {}", e);
|
error!(modlog, "Module startup failed: {}", e);
|
||||||
|
@ -10,16 +10,15 @@ use slog::Logger;
|
|||||||
mod shelly;
|
mod shelly;
|
||||||
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::task::LocalSpawn;
|
use futures::task::Spawn;
|
||||||
|
|
||||||
use crate::config::Settings;
|
use crate::config::Settings;
|
||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
use crate::registries::Registries;
|
use crate::registries::Registries;
|
||||||
|
|
||||||
// spawner is a type that allows 'tasks' to be spawned on it, running them to completion.
|
// spawner is a type that allows 'tasks' to be spawned on it, running them to completion.
|
||||||
pub fn init<S: LocalSpawn>(log: Logger, config: &Settings, spawner: &S, registries: Registries) -> Result<()> {
|
pub async fn init<S: Spawn + Clone + Send>(log: Logger, config: Settings, spawner: S, registries: Registries) -> Result<()> {
|
||||||
let f = Box::new(shelly::run(log.clone(), config.clone(), registries.clone()));
|
shelly::run(log.clone(), config.clone(), registries.clone(), spawner.clone()).await;
|
||||||
spawner.spawn_local_obj(f.into())?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -1,30 +1,32 @@
|
|||||||
use slog::Logger;
|
use slog::Logger;
|
||||||
|
|
||||||
use crate::config::Settings;
|
use crate::config::Settings;
|
||||||
use crate::registries::{Registries, Actuator, ActBox};
|
use crate::registries::{Registries, Actuator, ActBox, StatusSignal};
|
||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
|
use crate::machine::Status;
|
||||||
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
|
use futures::channel::mpsc;
|
||||||
use futures::ready;
|
use futures::ready;
|
||||||
use futures::task::{Poll, Context};
|
use futures::task::{Poll, Context, Waker, Spawn, FutureObj};
|
||||||
|
use futures::StreamExt;
|
||||||
|
use futures_signals::signal::Signal;
|
||||||
|
|
||||||
use paho_mqtt as mqtt;
|
use paho_mqtt as mqtt;
|
||||||
|
|
||||||
// TODO: Late config parsing. Right now the config is validated at the very startup in its
|
// TODO: Late config parsing. Right now the config is validated at the very startup in its
|
||||||
// entirety. This works reasonably enough for this static modules here but if we do dynamic loading
|
// entirety. This works reasonably enough for this static modules here but if we do dynamic loading
|
||||||
// via dlopen(), lua API, python API etc it will not.
|
// via dlopen(), lua API, python API etc it will not.
|
||||||
pub async fn run(log: Logger, config: Settings, registries: Registries) {
|
pub async fn run<S: Spawn>(log: Logger, config: Settings, registries: Registries, spawner: S) {
|
||||||
let shelly_r = Shelly::new(config).await;
|
let (tx, rx) = mpsc::channel(1);
|
||||||
if let Err(e) = shelly_r {
|
let mut shelly = Shelly::new(log, config, rx).await;
|
||||||
error!(log, "Shelly module errored: {}", e);
|
|
||||||
return;
|
let r = registries.actuators.register("shelly".to_string(), tx).await;
|
||||||
}
|
|
||||||
|
let f = shelly.for_each(|f| f);
|
||||||
|
spawner.spawn_obj(FutureObj::from(Box::pin(f)));
|
||||||
|
|
||||||
let r = registries.actuators.register(
|
|
||||||
"shelly".to_string(),
|
|
||||||
shelly_r.unwrap()
|
|
||||||
).await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An actuator for all Shellies connected listening on one MQTT broker
|
/// An actuator for all Shellies connected listening on one MQTT broker
|
||||||
@ -32,31 +34,87 @@ pub async fn run(log: Logger, config: Settings, registries: Registries) {
|
|||||||
/// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If
|
/// This actuator can power toggle an arbitrariy named shelly on the broker it is connected to. If
|
||||||
/// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator.
|
/// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator.
|
||||||
struct Shelly {
|
struct Shelly {
|
||||||
|
log: Logger,
|
||||||
|
sigchan: mpsc::Receiver<StatusSignal>,
|
||||||
|
signal: Option<StatusSignal>,
|
||||||
|
waker: Option<Waker>,
|
||||||
|
name: String,
|
||||||
client: mqtt::AsyncClient,
|
client: mqtt::AsyncClient,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Shelly {
|
impl Shelly {
|
||||||
pub async fn new(config: Settings) -> Result<ActBox> {
|
// Can't use Error, it's not Send. fabinfra/fabaccess/bffh#7
|
||||||
let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url)?;
|
pub async fn new(log: Logger, config: Settings, sigchan: mpsc::Receiver<StatusSignal>) -> Self {
|
||||||
|
let client = mqtt::AsyncClient::new(config.shelly.unwrap().mqtt_url).unwrap();
|
||||||
|
|
||||||
client.connect(mqtt::ConnectOptions::new()).await?;
|
let o = client.connect(mqtt::ConnectOptions::new()).await.unwrap();
|
||||||
|
println!("{:?}", o);
|
||||||
|
|
||||||
Ok(Box::new(Shelly { client }) as ActBox)
|
let name = "test".to_string();
|
||||||
|
let signal: Option<StatusSignal> = None;
|
||||||
|
let waker = None;
|
||||||
|
|
||||||
|
Shelly { log, sigchan, signal, waker, name, client }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl Actuator for Shelly {
|
impl Actuator for Shelly {
|
||||||
async fn power_on(&mut self, name: String) {
|
fn subscribe(&mut self, signal: StatusSignal) {
|
||||||
let topic = format!("shellies/{}/relay/0/command", name);
|
self.signal.replace(signal);
|
||||||
let msg = mqtt::Message::new(topic, "on", 0);
|
if let Some(waker) = self.waker.take() {
|
||||||
self.client.publish(msg).map(|_| ()).await
|
waker.wake();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
async fn power_off(&mut self, name: String) {
|
}
|
||||||
let topic = format!("shellies/{}/relay/0/command", name);
|
|
||||||
let msg = mqtt::Message::new(topic, "off", 0);
|
impl Stream for Shelly {
|
||||||
self.client.publish(msg).map(|_| ()).await
|
type Item = future::BoxFuture<'static, ()>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
|
let unpin = Pin::into_inner(self);
|
||||||
|
|
||||||
|
info!(unpin.log, "tick {}", unpin.signal.is_some());
|
||||||
|
|
||||||
|
if let Poll::Ready(v) = Stream::poll_next(Pin::new(&mut unpin.sigchan), cx) {
|
||||||
|
if let Some(s) = v {
|
||||||
|
// We have received a new signal to use
|
||||||
|
unpin.signal.replace(s);
|
||||||
|
// We use `if let` instead of .and_then because we want the waker to be dropped
|
||||||
|
// afterwards. It's only there to ensure the future is called when a signal is
|
||||||
|
// installed the first time
|
||||||
|
// TODO probably don't need that here because we're polling it either way directly
|
||||||
|
// afterwards, eh?
|
||||||
|
if let Some(waker) = unpin.waker.take() {
|
||||||
|
waker.wake();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
info!(unpin.log, "bye");
|
||||||
|
// This means that the sending end was dropped, so we shut down
|
||||||
|
unpin.signal.take();
|
||||||
|
unpin.waker.take();
|
||||||
|
return Poll::Ready(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(ref mut s) = unpin.signal {
|
||||||
|
if let Some(status) = ready!(Signal::poll_change(Pin::new(s), cx)) {
|
||||||
|
info!(unpin.log, "Machine Status changed: {:?}", status);
|
||||||
|
let topic = format!("shellies/{}/relay/0/command", unpin.name);
|
||||||
|
let pl = match status {
|
||||||
|
Status::Free | Status::Blocked => "off",
|
||||||
|
Status::Occupied => "on",
|
||||||
|
};
|
||||||
|
let msg = mqtt::Message::new(topic, pl, 0);
|
||||||
|
let f = unpin.client.publish(msg).map(|_| ());
|
||||||
|
|
||||||
|
return Poll::Ready(Some(Box::pin(f)));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
info!(unpin.log, "I ain't got no signal son");
|
||||||
|
unpin.waker.replace(cx.waker().clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,8 @@
|
|||||||
use futures_signals::signal::Signal;
|
use futures_signals::signal::Signal;
|
||||||
|
|
||||||
|
use crate::machine;
|
||||||
|
use crate::access;
|
||||||
|
|
||||||
struct Network {
|
struct Network {
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -24,10 +27,12 @@ impl Network {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The internal bffh event type
|
||||||
|
///
|
||||||
|
/// Everything that BFFH considers an event is contained in an instance of this.
|
||||||
|
#[derive(PartialEq, Eq, Clone, PartialOrd, Ord, Debug)]
|
||||||
enum Event {
|
enum Event {
|
||||||
|
/// An user wants to use a machine
|
||||||
}
|
// TODO: Define /what/ an user wants to do with said machine?
|
||||||
|
MachineRequest(machine::ID, access::UserIdentifier),
|
||||||
trait Filter<S> {
|
|
||||||
fn filter(&self, f: Fn(&S) -> bool);
|
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
mod actuators;
|
mod actuators;
|
||||||
|
mod sensors;
|
||||||
|
|
||||||
pub use actuators::{Actuator, ActBox};
|
pub use actuators::{Actuator, ActBox, StatusSignal};
|
||||||
|
pub use sensors::{Sensor, SensBox};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
/// BFFH registries
|
/// BFFH registries
|
||||||
@ -9,12 +11,14 @@ pub use actuators::{Actuator, ActBox};
|
|||||||
/// reference, not clone the registries
|
/// reference, not clone the registries
|
||||||
pub struct Registries {
|
pub struct Registries {
|
||||||
pub actuators: actuators::Actuators,
|
pub actuators: actuators::Actuators,
|
||||||
|
pub sensors: sensors::Sensors,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Registries {
|
impl Registries {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Registries {
|
Registries {
|
||||||
actuators: actuators::Actuators::new()
|
actuators: actuators::Actuators::new(),
|
||||||
|
sensors: sensors::Sensors::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,16 @@
|
|||||||
|
use slog::Logger;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use smol::lock::RwLock;
|
use smol::lock::RwLock;
|
||||||
|
|
||||||
|
use std::pin::Pin;
|
||||||
|
use futures::ready;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
|
use futures::channel::mpsc;
|
||||||
|
use futures::task::{Context, Poll, Spawn};
|
||||||
|
use futures_signals::signal::Signal;
|
||||||
|
|
||||||
|
use crate::machine::Status;
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
@ -10,11 +19,9 @@ pub struct Actuators {
|
|||||||
inner: Arc<RwLock<Inner>>,
|
inner: Arc<RwLock<Inner>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe impl Send for Actuators { }
|
pub type ActBox = Box<dyn Actuator + Sync + Send + Unpin>;
|
||||||
|
|
||||||
pub type ActBox = Box<dyn Actuator>;
|
type Inner = HashMap<String, mpsc::Sender<StatusSignal>>;
|
||||||
|
|
||||||
type Inner = HashMap<String, ActBox>;
|
|
||||||
|
|
||||||
impl Actuators {
|
impl Actuators {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
@ -23,31 +30,53 @@ impl Actuators {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn register(&self, name: String, act: ActBox) {
|
pub async fn register(&self, name: String, tx: mpsc::Sender<StatusSignal>) {
|
||||||
let mut wlock = self.inner.write().await;
|
let mut wlock = self.inner.write().await;
|
||||||
// TODO: Log an error or something if that name was already taken
|
// TODO: Log an error or something if that name was already taken
|
||||||
wlock.insert(name, act);
|
wlock.insert(name, tx);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn subscribe(&mut self, name: String, signal: StatusSignal) {
|
||||||
|
let mut wlock = self.inner.write().await;
|
||||||
|
if let Some(tx) = wlock.get_mut(&name) {
|
||||||
|
tx.send(signal).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub type StatusSignal = Pin<Box<dyn Signal<Item = Status> + Send + Sync>>;
|
||||||
|
|
||||||
#[async_trait]
|
pub trait Actuator: Stream<Item = future::BoxFuture<'static, ()>> {
|
||||||
pub trait Actuator {
|
fn subscribe(&mut self, signal: StatusSignal);
|
||||||
// TODO: Is it smarter to pass a (reference to?) a machine instead of 'name'? Do we need to
|
|
||||||
// pass basically arbitrary parameters to the Actuator?
|
|
||||||
async fn power_on(&mut self, name: String);
|
|
||||||
async fn power_off(&mut self, name: String);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is merely a proof that Actuator *can* be implemented on a finite, known type. Yay for type
|
// This is merely a proof that Actuator *can* be implemented on a finite, known type. Yay for type
|
||||||
// systems with halting problems.
|
// systems with halting problems.
|
||||||
struct Dummy;
|
struct Dummy {
|
||||||
#[async_trait]
|
log: Logger,
|
||||||
|
sigchan: mpsc::Receiver<StatusSignal>,
|
||||||
|
signal: Option<StatusSignal>,
|
||||||
|
}
|
||||||
|
|
||||||
impl Actuator for Dummy {
|
impl Actuator for Dummy {
|
||||||
async fn power_on(&mut self, _name: String) {
|
fn subscribe(&mut self, signal: StatusSignal) {
|
||||||
return
|
self.signal.replace(signal);
|
||||||
}
|
}
|
||||||
async fn power_off(&mut self, _name: String) {
|
}
|
||||||
return
|
|
||||||
|
impl Stream for Dummy {
|
||||||
|
type Item = future::BoxFuture<'static, ()>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
|
let unpin = Pin::into_inner(self);
|
||||||
|
if let Some(ref mut s) = unpin.signal {
|
||||||
|
let status = ready!(Signal::poll_change(Pin::new(s), cx));
|
||||||
|
|
||||||
|
info!(unpin.log, "Dummy actuator would set status to {:?}, but is a Dummy", status);
|
||||||
|
|
||||||
|
Poll::Ready(Some(Box::pin(futures::future::ready(()))))
|
||||||
|
} else {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
73
src/registries/sensors.rs
Normal file
73
src/registries/sensors.rs
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
use std::pin::Pin;
|
||||||
|
use futures::task::{Context, Poll};
|
||||||
|
use futures::{Future, Stream};
|
||||||
|
use futures::future::BoxFuture;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use smol::lock::RwLock;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Sensors {
|
||||||
|
inner: Arc<RwLock<Inner>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Sensors {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Sensors {
|
||||||
|
inner: Arc::new(RwLock::new(Inner::new())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type SensBox = Box<dyn Sensor + Send + Sync>;
|
||||||
|
type Inner = HashMap<String, SensBox>;
|
||||||
|
|
||||||
|
|
||||||
|
// Implementing Sensors.
|
||||||
|
//
|
||||||
|
// Given the coroutine/task split stays as it is - Sensor input to machine update being one,
|
||||||
|
// machine update signal to actor doing thing being another, a Sensor implementation would send a
|
||||||
|
// Stream of futures - each future being an atomic Machine update.
|
||||||
|
#[async_trait]
|
||||||
|
/// BFFH Sensor
|
||||||
|
///
|
||||||
|
/// A sensor is anything that can forward an intent of an user to do something to bffh.
|
||||||
|
/// This may be a card reader connected to a machine, a website allowing users to select a machine
|
||||||
|
/// they want to use or something like QRHello
|
||||||
|
pub trait Sensor: Stream<Item = BoxFuture<'static, ()>> {
|
||||||
|
/// Setup the Sensor.
|
||||||
|
///
|
||||||
|
/// After this async function completes the Stream implementation should be able to generate
|
||||||
|
/// futures when polled.
|
||||||
|
/// Implementations can rely on this function being polled to completeion before the stream
|
||||||
|
/// is polled.
|
||||||
|
// TODO Is this sensible vs just having module-specific setup fns?
|
||||||
|
async fn setup(&mut self);
|
||||||
|
|
||||||
|
/// Shutdown the sensor gracefully
|
||||||
|
///
|
||||||
|
/// Implementations can rely on that the stream will not be polled after this function has been
|
||||||
|
/// called.
|
||||||
|
async fn shutdown(&mut self);
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Dummy;
|
||||||
|
#[async_trait]
|
||||||
|
impl Sensor for Dummy {
|
||||||
|
async fn setup(&mut self) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn shutdown(&mut self) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for Dummy {
|
||||||
|
type Item = BoxFuture<'static, ()>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
|
Poll::Ready(Some(Box::pin(futures::future::ready(()))))
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user