diff --git a/Cargo.lock b/Cargo.lock index c74eb6d..cd65160 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,6 +96,19 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-compat" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b48b4ff0c2026db683dea961cd8ea874737f56cffca86fa84415eaddc51c00d" +dependencies = [ + "futures-core", + "futures-io", + "once_cell", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-executor" version = "1.4.1" @@ -383,6 +396,12 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + [[package]] name = "cache-padded" version = "1.1.1" @@ -502,6 +521,22 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" + [[package]] name = "cpufeatures" version = "0.2.1" @@ -633,7 +668,7 @@ checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" dependencies = [ "bstr", "csv-core", - "itoa", + "itoa 0.4.8", "ryu", "serde", ] @@ -699,6 +734,7 @@ dependencies = [ "anyhow", "api", "async-channel", + "async-compat", "async-net", "async-oneshot", "async-trait", @@ -726,9 +762,11 @@ dependencies = [ "rkyv_dyn", "rkyv_typename", "rsasl", + "rumqttc", "rust-argon2", "rustls", - "rustls-pemfile", + "rustls-native-certs", + "rustls-pemfile 0.3.0", "serde", "serde_dhall", "serde_json", @@ -738,6 +776,7 @@ dependencies = [ "tracing", "tracing-futures", "tracing-subscriber 0.2.25", + "url", "uuid", ] @@ -860,6 +899,12 @@ dependencies = [ "instant", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -1135,6 +1180,17 @@ dependencies = [ "digest 0.10.3", ] +[[package]] +name = "http" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" +dependencies = [ + "bytes", + "fnv", + "itoa 1.0.1", +] + [[package]] name = "idna" version = "0.2.3" @@ -1211,6 +1267,12 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +[[package]] +name = "itoa" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" + [[package]] name = "js-sys" version = "0.3.55" @@ -1250,9 +1312,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.108" +version = "0.2.119" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119" +checksum = "1bf2e165bb3457c8e098ea76f3e3bc9db55f87aa90d52d0e6be741470916aaa4" [[package]] name = "lightproc" @@ -1366,6 +1428,28 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mio" +version = "0.7.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi", +] + [[package]] name = "nix" version = "0.23.1" @@ -1390,6 +1474,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "ntapi" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" +dependencies = [ + "winapi", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -1443,6 +1536,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "os_str_bytes" version = "6.0.0" @@ -1617,6 +1716,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "pollster" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5da3b0203fd7ee5720aa0b5e790b591aa5d3f41c3ed2c34a3a393382198af2f7" + [[package]] name = "ppv-lite86" version = "0.2.15" @@ -1900,6 +2005,24 @@ dependencies = [ "stringprep", ] +[[package]] +name = "rumqttc" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29fa50cc13f4c9c4962d925c3f99f822fb19995cc527ec1d556ee7635dfa2e3d" +dependencies = [ + "async-channel", + "bytes", + "http", + "log", + "pollster", + "rustls-pemfile 0.3.0", + "thiserror", + "tokio", + "tokio-rustls", + "webpki", +] + [[package]] name = "rust-argon2" version = "0.8.3" @@ -1933,6 +2056,27 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca9ebdfa27d3fc180e42879037b5338ab1c040c06affd00d8338598e7800943" +dependencies = [ + "openssl-probe", + "rustls-pemfile 0.2.1", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +dependencies = [ + "base64", +] + [[package]] name = "rustls-pemfile" version = "0.3.0" @@ -1957,6 +2101,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +dependencies = [ + "lazy_static", + "winapi", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -1998,6 +2152,29 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "security-framework" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.4" @@ -2053,7 +2230,7 @@ version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8" dependencies = [ - "itoa", + "itoa 0.4.8", "ryu", "serde", ] @@ -2256,6 +2433,26 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" +[[package]] +name = "thiserror" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.3" @@ -2300,6 +2497,43 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" +[[package]] +name = "tokio" +version = "1.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a" +dependencies = [ + "bytes", + "libc", + "memchr", + "mio", + "pin-project-lite", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-rustls" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a27d5f2b839802bd8267fa19b0530f5a08b9c08cd417976be2a65d130fe1c11b" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + [[package]] name = "toml" version = "0.5.8" diff --git a/Cargo.toml b/Cargo.toml index 7068fd1..c865912 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,6 +89,11 @@ rustls = "0.20" rustls-pemfile = "0.3.0" futures-rustls = "0.22" +rumqttc = "0.11.0" +async-compat = "0.2.1" +url = "2.2.2" +rustls-native-certs = "0.6.1" + [dev-dependencies] futures-test = "0.3.16" tempfile = "3.2" diff --git a/bffhd/actors/mod.rs b/bffhd/actors/mod.rs index a0323a9..00fccee 100644 --- a/bffhd/actors/mod.rs +++ b/bffhd/actors/mod.rs @@ -1,10 +1,23 @@ -use std::cell::Cell; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use crate::actors::shelly::Shelly; +use crate::resources::state::State; +use crate::{Config, ResourcesHandle}; +use async_compat::CompatExt; +use executor::pool::Executor; use futures_signals::signal::{MutableSignalRef, ReadOnlyMutable, Signal}; use futures_util::future::BoxFuture; -use crate::resources::state::State; +use rumqttc::{AsyncClient, ConnectionError, Event, Incoming, MqttOptions}; +use std::cell::Cell; +use std::collections::HashMap; +use std::future::Future; +use std::ops::Deref; +use std::pin::Pin; +use std::sync::Mutex; +use std::task::{Context, Poll}; +use std::time::Duration; +use anyhow::Context as _; +use once_cell::sync::Lazy; +use rustls::{Certificate, RootCertStore}; +use url::Url; mod shelly; @@ -23,11 +36,8 @@ pub struct ActorDriver { future: Option>, } -impl> ActorDriver -{ - pub fn new(signal: S, actor: Box) - -> Self - { +impl> ActorDriver { + pub fn new(signal: S, actor: Box) -> Self { Self { signal, actor, @@ -36,22 +46,23 @@ impl> ActorDriver } } - impl Future for ActorDriver - where S: Signal + Unpin + Send, +where + S: Signal + Unpin + Send, { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { // Work until there is no more work to do. loop { - // Poll the `apply` future. And ensure it's completed before the next one is started - match self.future.as_mut() + match self + .future + .as_mut() .map(|future| Future::poll(Pin::new(future), cx)) { // Skip and poll for a new future to do - None => { } + None => {} // This apply future is done, get a new one Some(Poll::Ready(_)) => self.future = None, @@ -61,10 +72,9 @@ impl Future for ActorDriver } // Poll the signal and apply any change that happen to the inner Actuator - match Pin::new(&mut self.signal).poll_change(cx) - { + match Pin::new(&mut self.signal).poll_change(cx) { Poll::Pending => return Poll::Pending, - Poll::Ready(None) => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(state)) => { // This future MUST be polled before we exit from the Actor::poll because if we // do not do that it will not register the dependency and thus NOT BE POLLED. @@ -76,3 +86,170 @@ impl Future for ActorDriver } } +static ROOT_CERTS: Lazy = Lazy::new(|| { + let span = tracing::info_span!("loading system certificates"); + let _guard = span.enter(); + let mut store = RootCertStore::empty(); + match rustls_native_certs::load_native_certs() { + Ok(certs) => { + let certs: Vec> = certs.into_iter().map(|c| c.0).collect(); + let (loaded, ignored) = store.add_parsable_certificates(&certs[..]); + if ignored != 0 { + tracing::info!(loaded, ignored, "certificates loaded, some ignored"); + } else { + tracing::info!(loaded, "certificates loaded"); + } + }, + Err(error) => { + tracing::error!(%error, "failed to load system certificates"); + } + } + store +}); + +pub fn load(executor: Executor, config: &Config, resources: ResourcesHandle) -> anyhow::Result<()> { + let span = tracing::info_span!("loading actors"); + let _guard = span; + + let mqtt_url = Url::parse(config.mqtt_url.as_str())?; + let (transport, default_port) = match mqtt_url.scheme() { + "mqtts" | "ssl" => ( + rumqttc::Transport::tls_with_config( + rumqttc::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(ROOT_CERTS.clone()) + .with_no_client_auth() + .into(), + ), + 8883, + ), + + "mqtt" | "tcp" => (rumqttc::Transport::tcp(), 1883), + + scheme => { + tracing::error!(%scheme, "MQTT url uses invalid scheme"); + anyhow::bail!("invalid config"); + } + }; + let host = mqtt_url.host_str().ok_or_else(|| { + tracing::error!("MQTT url must contain a hostname"); + anyhow::anyhow!("invalid config") + })?; + let port = mqtt_url.port().unwrap_or(default_port); + + let mut mqttoptions = MqttOptions::new("bffh", host, port); + + mqttoptions + .set_transport(transport) + .set_keep_alive(Duration::from_secs(20)); + + if !mqtt_url.username().is_empty() { + mqttoptions.set_credentials(mqtt_url.username(), mqtt_url.password().unwrap_or_default()); + } + + let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 256); + let mut eventloop = executor.run( + async move { + match eventloop.poll().await { + Ok(Event::Incoming(Incoming::Connect(_connect))) => {} + Ok(event) => { + tracing::warn!(?event, "Got unexpected mqtt event"); + } + Err(error) => { + tracing::error!(?error, "MQTT connection failed"); + anyhow::bail!("mqtt connection failed") + } + } + + Ok(eventloop) + } + .compat(), + )?; + + executor.spawn( + async move { + let mut fault = false; + loop { + match eventloop.poll().compat().await { + Ok(_) => { + fault = false; + // TODO: Handle incoming MQTT messages + } + Err(ConnectionError::Cancel) + | Err(ConnectionError::StreamDone) + | Err(ConnectionError::RequestsDone) => { + // Normal exit + tracing::info!("MQTT request queue closed, stopping client."); + return; + } + Err(ConnectionError::Timeout(_)) => { + tracing::error!("MQTT operation timed out!"); + tracing::warn!( + "MQTT client will continue, but messages may have been lost." + ) + // Timeout does not close the client + } + Err(ConnectionError::Io(error)) if fault => { + tracing::error!(?error, "MQTT recurring IO error, closing client"); + // Repeating IO errors close client. Any Ok() in between resets fault to false. + return; + } + Err(ConnectionError::Io(error)) => { + fault = true; + tracing::error!(?error, "MQTT encountered IO error"); + // *First* IO error does not close the client. + } + Err(error) => { + tracing::error!(?error, "MQTT client encountered unhandled error"); + return; + } + } + } + } + .compat(), + ); + + let mut actor_map: HashMap = config.actor_connections.iter() + .filter_map(|(k,v)| { + if let Some(resource) = resources.get_by_id(v) { + Some((k.clone(), resource.get_signal())) + } else { + tracing::error!(actor=%k, machine=%v, "Machine configured for actor not found!"); + None + } + }) + .collect(); + + for (name, cfg) in config.actors.iter() { + if let Some(sig) = actor_map.remove(name) { + if let Some(actor) = load_single(name, &cfg.module, &cfg.params, mqtt.clone()) { + let driver = ActorDriver::new(sig, actor); + tracing::debug!(module_name=%cfg.module, %name, "starting actor task"); + executor.spawn(driver); + } else { + tracing::error!(module_name=%cfg.module, %name, "Actor module type not found"); + } + } else { + tracing::warn!(actor=%name, ?config, "Actor has no machine configured. Skipping!"); + } + } + + Ok(()) +} + +fn load_single( + name: &String, + module_name: &String, + params: &HashMap, + client: AsyncClient, +) -> Option> { + tracing::info!(%name, %module_name, ?params, "Loading actor"); + match module_name.as_ref() { + //"Dummy" => Some(Box::new(Dummy::new())), + //"Process" => Process::new(name.clone(), params).map(|a| a.into_boxed_actuator()), + "Shelly" => Some(Box::new(Shelly::new(name.clone(), client, params))), + _ => { + None + } + } +} diff --git a/bffhd/actors/shelly.rs b/bffhd/actors/shelly.rs index e69de29..b80f530 100644 --- a/bffhd/actors/shelly.rs +++ b/bffhd/actors/shelly.rs @@ -0,0 +1,60 @@ +use std::collections::HashMap; +use futures_util::future::BoxFuture; +use rumqttc::{AsyncClient, QoS}; +use crate::actors::Actor; +use crate::resources::modules::fabaccess::Status; +use crate::resources::state::State; + +/// An actuator for a Shellie connected listening on one MQTT broker +/// +/// This actuator will toggle the shellie with the given `name`. +/// If you need to toggle shellies on multiple brokers you need multiple instanced of this +/// actuator with different clients. +pub struct Shelly { + name: String, + client: AsyncClient, + topic: String, +} + +impl Shelly { + pub fn new(name: String, client: AsyncClient, params: &HashMap) -> Self { + let topic = if let Some(topic) = params.get("topic") { + format!("shellies/{}/relay/0/command", topic) + } else { + format!("shellies/{}/relay/0/command", name) + }; + + tracing::debug!(%name,%topic,"Starting shelly module"); + + Shelly { name, client, topic, } + } + + /// Set the name to a new one. This changes the shelly that will be activated + pub fn set_name(&mut self, new_name: String) { + tracing::debug!(old=%self.name, new=%new_name, "Renaming shelly actor"); + self.name = new_name; + } +} + + +impl Actor for Shelly { + fn apply(&mut self, state: State) -> BoxFuture<'static, ()> { + tracing::debug!(?state, "Shelly changing state"); + let pl = match state.inner.state { + Status::InUse(_) => "on", + _ => "off", + }; + + let name = self.name.clone(); + let client = self.client.clone(); + let topic = self.topic.clone(); + let f = async move { + let res = client.publish(topic, QoS::AtLeastOnce, false, pl).await; + if let Err(error) = res { + tracing::error!(?error, %name, "`Shelly` actor failed to update state"); + } + }; + + return Box::pin(f); + } +} diff --git a/bffhd/lib.rs b/bffhd/lib.rs index 27fd686..501ec6d 100644 --- a/bffhd/lib.rs +++ b/bffhd/lib.rs @@ -39,10 +39,12 @@ mod logging; mod audit; mod session; +use std::collections::HashMap; use std::fs::File; use std::io::BufReader; use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use anyhow::Context; use futures_rustls::TlsAcceptor; use futures_util::StreamExt; @@ -101,7 +103,6 @@ impl Diflouroborane { })); RESOURCES.set(resources); - let tlsconfig = TlsConfig::new(config.tlskeylog.as_ref(), !config.is_quiet())?; let acceptor = tlsconfig.make_tls_acceptor(&config.tlsconfig)?; @@ -127,4 +128,5 @@ impl Diflouroborane { self.executor.run(f); Ok(()) } -} \ No newline at end of file +} + diff --git a/bffhd/resources/mod.rs b/bffhd/resources/mod.rs index 5191cde..a37f60d 100644 --- a/bffhd/resources/mod.rs +++ b/bffhd/resources/mod.rs @@ -24,7 +24,7 @@ pub struct PermissionDenied; pub(crate) struct Inner { id: String, db: Arc, - signal: Mutable, + signal: Mutable, desc: MachineDescription, } impl Inner { @@ -37,12 +37,12 @@ impl Inner { tracing::info!(%id, "No previous state, defaulting to `free`"); MachineState::free(None) }; - let signal = Mutable::new(state); + let signal = Mutable::new(state.to_state()); Self { id, db, signal, desc } } - pub fn signal(&self) -> impl Signal { + pub fn signal(&self) -> impl Signal { Box::pin(self.signal.signal_cloned().dedupe_cloned()) } @@ -62,7 +62,7 @@ impl Inner { let update = state.to_state(); self.db.update(self.id.as_bytes(), &update, &update).unwrap(); tracing::trace!("Updated DB, sending update signal"); - self.signal.set(state); + self.signal.set(update); tracing::trace!("Sent update signal"); } } @@ -89,6 +89,10 @@ impl Resource { &self.inner.id } + pub fn get_signal(&self) -> impl Signal { + self.inner.signal() + } + fn set_state(&self, state: MachineState) { }