Actor loading & configuring

This commit is contained in:
Nadja Reitzenstein 2022-03-13 21:30:26 +01:00
parent bd98f13f67
commit 613e62c7e6
6 changed files with 512 additions and 30 deletions

244
Cargo.lock generated
View File

@ -96,6 +96,19 @@ dependencies = [
"futures-core",
]
[[package]]
name = "async-compat"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b48b4ff0c2026db683dea961cd8ea874737f56cffca86fa84415eaddc51c00d"
dependencies = [
"futures-core",
"futures-io",
"once_cell",
"pin-project-lite",
"tokio",
]
[[package]]
name = "async-executor"
version = "1.4.1"
@ -383,6 +396,12 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cache-padded"
version = "1.1.1"
@ -502,6 +521,22 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]]
name = "core-foundation"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "cpufeatures"
version = "0.2.1"
@ -633,7 +668,7 @@ checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
dependencies = [
"bstr",
"csv-core",
"itoa",
"itoa 0.4.8",
"ryu",
"serde",
]
@ -699,6 +734,7 @@ dependencies = [
"anyhow",
"api",
"async-channel",
"async-compat",
"async-net",
"async-oneshot",
"async-trait",
@ -726,9 +762,11 @@ dependencies = [
"rkyv_dyn",
"rkyv_typename",
"rsasl",
"rumqttc",
"rust-argon2",
"rustls",
"rustls-pemfile",
"rustls-native-certs",
"rustls-pemfile 0.3.0",
"serde",
"serde_dhall",
"serde_json",
@ -738,6 +776,7 @@ dependencies = [
"tracing",
"tracing-futures",
"tracing-subscriber 0.2.25",
"url",
"uuid",
]
@ -860,6 +899,12 @@ dependencies = [
"instant",
]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "form_urlencoded"
version = "1.0.1"
@ -1135,6 +1180,17 @@ dependencies = [
"digest 0.10.3",
]
[[package]]
name = "http"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03"
dependencies = [
"bytes",
"fnv",
"itoa 1.0.1",
]
[[package]]
name = "idna"
version = "0.2.3"
@ -1211,6 +1267,12 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "itoa"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
name = "js-sys"
version = "0.3.55"
@ -1250,9 +1312,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.108"
version = "0.2.119"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119"
checksum = "1bf2e165bb3457c8e098ea76f3e3bc9db55f87aa90d52d0e6be741470916aaa4"
[[package]]
name = "lightproc"
@ -1366,6 +1428,28 @@ dependencies = [
"autocfg",
]
[[package]]
name = "mio"
version = "0.7.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc"
dependencies = [
"libc",
"log",
"miow",
"ntapi",
"winapi",
]
[[package]]
name = "miow"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
dependencies = [
"winapi",
]
[[package]]
name = "nix"
version = "0.23.1"
@ -1390,6 +1474,15 @@ dependencies = [
"version_check",
]
[[package]]
name = "ntapi"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f"
dependencies = [
"winapi",
]
[[package]]
name = "num-integer"
version = "0.1.44"
@ -1443,6 +1536,12 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "os_str_bytes"
version = "6.0.0"
@ -1617,6 +1716,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "pollster"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5da3b0203fd7ee5720aa0b5e790b591aa5d3f41c3ed2c34a3a393382198af2f7"
[[package]]
name = "ppv-lite86"
version = "0.2.15"
@ -1900,6 +2005,24 @@ dependencies = [
"stringprep",
]
[[package]]
name = "rumqttc"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29fa50cc13f4c9c4962d925c3f99f822fb19995cc527ec1d556ee7635dfa2e3d"
dependencies = [
"async-channel",
"bytes",
"http",
"log",
"pollster",
"rustls-pemfile 0.3.0",
"thiserror",
"tokio",
"tokio-rustls",
"webpki",
]
[[package]]
name = "rust-argon2"
version = "0.8.3"
@ -1933,6 +2056,27 @@ dependencies = [
"webpki",
]
[[package]]
name = "rustls-native-certs"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca9ebdfa27d3fc180e42879037b5338ab1c040c06affd00d8338598e7800943"
dependencies = [
"openssl-probe",
"rustls-pemfile 0.2.1",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-pemfile"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9"
dependencies = [
"base64",
]
[[package]]
name = "rustls-pemfile"
version = "0.3.0"
@ -1957,6 +2101,16 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "schannel"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75"
dependencies = [
"lazy_static",
"winapi",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
@ -1998,6 +2152,29 @@ version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b"
[[package]]
name = "security-framework"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc"
dependencies = [
"bitflags",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "semver"
version = "1.0.4"
@ -2053,7 +2230,7 @@ version = "1.0.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8"
dependencies = [
"itoa",
"itoa 0.4.8",
"ryu",
"serde",
]
@ -2256,6 +2433,26 @@ version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
[[package]]
name = "thiserror"
version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.3"
@ -2300,6 +2497,43 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a"
dependencies = [
"bytes",
"libc",
"memchr",
"mio",
"pin-project-lite",
"tokio-macros",
"winapi",
]
[[package]]
name = "tokio-macros"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-rustls"
version = "0.23.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a27d5f2b839802bd8267fa19b0530f5a08b9c08cd417976be2a65d130fe1c11b"
dependencies = [
"rustls",
"tokio",
"webpki",
]
[[package]]
name = "toml"
version = "0.5.8"

View File

@ -89,6 +89,11 @@ rustls = "0.20"
rustls-pemfile = "0.3.0"
futures-rustls = "0.22"
rumqttc = "0.11.0"
async-compat = "0.2.1"
url = "2.2.2"
rustls-native-certs = "0.6.1"
[dev-dependencies]
futures-test = "0.3.16"
tempfile = "3.2"

View File

@ -1,10 +1,23 @@
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::actors::shelly::Shelly;
use crate::resources::state::State;
use crate::{Config, ResourcesHandle};
use async_compat::CompatExt;
use executor::pool::Executor;
use futures_signals::signal::{MutableSignalRef, ReadOnlyMutable, Signal};
use futures_util::future::BoxFuture;
use crate::resources::state::State;
use rumqttc::{AsyncClient, ConnectionError, Event, Incoming, MqttOptions};
use std::cell::Cell;
use std::collections::HashMap;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll};
use std::time::Duration;
use anyhow::Context as _;
use once_cell::sync::Lazy;
use rustls::{Certificate, RootCertStore};
use url::Url;
mod shelly;
@ -23,11 +36,8 @@ pub struct ActorDriver<S: 'static> {
future: Option<BoxFuture<'static, ()>>,
}
impl<S: Signal<Item = State>> ActorDriver<S>
{
pub fn new(signal: S, actor: Box<dyn Actor + Send + Sync>)
-> Self
{
impl<S: Signal<Item = State>> ActorDriver<S> {
pub fn new(signal: S, actor: Box<dyn Actor + Send + Sync>) -> Self {
Self {
signal,
actor,
@ -36,22 +46,23 @@ impl<S: Signal<Item = State>> ActorDriver<S>
}
}
impl<S> Future for ActorDriver<S>
where S: Signal<Item=State> + Unpin + Send,
where
S: Signal<Item = State> + Unpin + Send,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// Work until there is no more work to do.
loop {
// Poll the `apply` future. And ensure it's completed before the next one is started
match self.future.as_mut()
match self
.future
.as_mut()
.map(|future| Future::poll(Pin::new(future), cx))
{
// Skip and poll for a new future to do
None => { }
None => {}
// This apply future is done, get a new one
Some(Poll::Ready(_)) => self.future = None,
@ -61,10 +72,9 @@ impl<S> Future for ActorDriver<S>
}
// Poll the signal and apply any change that happen to the inner Actuator
match Pin::new(&mut self.signal).poll_change(cx)
{
match Pin::new(&mut self.signal).poll_change(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(state)) => {
// This future MUST be polled before we exit from the Actor::poll because if we
// do not do that it will not register the dependency and thus NOT BE POLLED.
@ -76,3 +86,170 @@ impl<S> Future for ActorDriver<S>
}
}
static ROOT_CERTS: Lazy<RootCertStore> = Lazy::new(|| {
let span = tracing::info_span!("loading system certificates");
let _guard = span.enter();
let mut store = RootCertStore::empty();
match rustls_native_certs::load_native_certs() {
Ok(certs) => {
let certs: Vec<Vec<u8>> = certs.into_iter().map(|c| c.0).collect();
let (loaded, ignored) = store.add_parsable_certificates(&certs[..]);
if ignored != 0 {
tracing::info!(loaded, ignored, "certificates loaded, some ignored");
} else {
tracing::info!(loaded, "certificates loaded");
}
},
Err(error) => {
tracing::error!(%error, "failed to load system certificates");
}
}
store
});
pub fn load(executor: Executor, config: &Config, resources: ResourcesHandle) -> anyhow::Result<()> {
let span = tracing::info_span!("loading actors");
let _guard = span;
let mqtt_url = Url::parse(config.mqtt_url.as_str())?;
let (transport, default_port) = match mqtt_url.scheme() {
"mqtts" | "ssl" => (
rumqttc::Transport::tls_with_config(
rumqttc::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(ROOT_CERTS.clone())
.with_no_client_auth()
.into(),
),
8883,
),
"mqtt" | "tcp" => (rumqttc::Transport::tcp(), 1883),
scheme => {
tracing::error!(%scheme, "MQTT url uses invalid scheme");
anyhow::bail!("invalid config");
}
};
let host = mqtt_url.host_str().ok_or_else(|| {
tracing::error!("MQTT url must contain a hostname");
anyhow::anyhow!("invalid config")
})?;
let port = mqtt_url.port().unwrap_or(default_port);
let mut mqttoptions = MqttOptions::new("bffh", host, port);
mqttoptions
.set_transport(transport)
.set_keep_alive(Duration::from_secs(20));
if !mqtt_url.username().is_empty() {
mqttoptions.set_credentials(mqtt_url.username(), mqtt_url.password().unwrap_or_default());
}
let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 256);
let mut eventloop = executor.run(
async move {
match eventloop.poll().await {
Ok(Event::Incoming(Incoming::Connect(_connect))) => {}
Ok(event) => {
tracing::warn!(?event, "Got unexpected mqtt event");
}
Err(error) => {
tracing::error!(?error, "MQTT connection failed");
anyhow::bail!("mqtt connection failed")
}
}
Ok(eventloop)
}
.compat(),
)?;
executor.spawn(
async move {
let mut fault = false;
loop {
match eventloop.poll().compat().await {
Ok(_) => {
fault = false;
// TODO: Handle incoming MQTT messages
}
Err(ConnectionError::Cancel)
| Err(ConnectionError::StreamDone)
| Err(ConnectionError::RequestsDone) => {
// Normal exit
tracing::info!("MQTT request queue closed, stopping client.");
return;
}
Err(ConnectionError::Timeout(_)) => {
tracing::error!("MQTT operation timed out!");
tracing::warn!(
"MQTT client will continue, but messages may have been lost."
)
// Timeout does not close the client
}
Err(ConnectionError::Io(error)) if fault => {
tracing::error!(?error, "MQTT recurring IO error, closing client");
// Repeating IO errors close client. Any Ok() in between resets fault to false.
return;
}
Err(ConnectionError::Io(error)) => {
fault = true;
tracing::error!(?error, "MQTT encountered IO error");
// *First* IO error does not close the client.
}
Err(error) => {
tracing::error!(?error, "MQTT client encountered unhandled error");
return;
}
}
}
}
.compat(),
);
let mut actor_map: HashMap<String, _> = config.actor_connections.iter()
.filter_map(|(k,v)| {
if let Some(resource) = resources.get_by_id(v) {
Some((k.clone(), resource.get_signal()))
} else {
tracing::error!(actor=%k, machine=%v, "Machine configured for actor not found!");
None
}
})
.collect();
for (name, cfg) in config.actors.iter() {
if let Some(sig) = actor_map.remove(name) {
if let Some(actor) = load_single(name, &cfg.module, &cfg.params, mqtt.clone()) {
let driver = ActorDriver::new(sig, actor);
tracing::debug!(module_name=%cfg.module, %name, "starting actor task");
executor.spawn(driver);
} else {
tracing::error!(module_name=%cfg.module, %name, "Actor module type not found");
}
} else {
tracing::warn!(actor=%name, ?config, "Actor has no machine configured. Skipping!");
}
}
Ok(())
}
fn load_single(
name: &String,
module_name: &String,
params: &HashMap<String, String>,
client: AsyncClient,
) -> Option<Box<dyn Actor + Sync + Send>> {
tracing::info!(%name, %module_name, ?params, "Loading actor");
match module_name.as_ref() {
//"Dummy" => Some(Box::new(Dummy::new())),
//"Process" => Process::new(name.clone(), params).map(|a| a.into_boxed_actuator()),
"Shelly" => Some(Box::new(Shelly::new(name.clone(), client, params))),
_ => {
None
}
}
}

View File

@ -0,0 +1,60 @@
use std::collections::HashMap;
use futures_util::future::BoxFuture;
use rumqttc::{AsyncClient, QoS};
use crate::actors::Actor;
use crate::resources::modules::fabaccess::Status;
use crate::resources::state::State;
/// An actuator for a Shellie connected listening on one MQTT broker
///
/// This actuator will toggle the shellie with the given `name`.
/// If you need to toggle shellies on multiple brokers you need multiple instanced of this
/// actuator with different clients.
pub struct Shelly {
name: String,
client: AsyncClient,
topic: String,
}
impl Shelly {
pub fn new(name: String, client: AsyncClient, params: &HashMap<String, String>) -> Self {
let topic = if let Some(topic) = params.get("topic") {
format!("shellies/{}/relay/0/command", topic)
} else {
format!("shellies/{}/relay/0/command", name)
};
tracing::debug!(%name,%topic,"Starting shelly module");
Shelly { name, client, topic, }
}
/// Set the name to a new one. This changes the shelly that will be activated
pub fn set_name(&mut self, new_name: String) {
tracing::debug!(old=%self.name, new=%new_name, "Renaming shelly actor");
self.name = new_name;
}
}
impl Actor for Shelly {
fn apply(&mut self, state: State) -> BoxFuture<'static, ()> {
tracing::debug!(?state, "Shelly changing state");
let pl = match state.inner.state {
Status::InUse(_) => "on",
_ => "off",
};
let name = self.name.clone();
let client = self.client.clone();
let topic = self.topic.clone();
let f = async move {
let res = client.publish(topic, QoS::AtLeastOnce, false, pl).await;
if let Err(error) = res {
tracing::error!(?error, %name, "`Shelly` actor failed to update state");
}
};
return Box::pin(f);
}
}

View File

@ -39,10 +39,12 @@ mod logging;
mod audit;
mod session;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::Context;
use futures_rustls::TlsAcceptor;
use futures_util::StreamExt;
@ -101,7 +103,6 @@ impl Diflouroborane {
}));
RESOURCES.set(resources);
let tlsconfig = TlsConfig::new(config.tlskeylog.as_ref(), !config.is_quiet())?;
let acceptor = tlsconfig.make_tls_acceptor(&config.tlsconfig)?;
@ -127,4 +128,5 @@ impl Diflouroborane {
self.executor.run(f);
Ok(())
}
}
}

View File

@ -24,7 +24,7 @@ pub struct PermissionDenied;
pub(crate) struct Inner {
id: String,
db: Arc<StateDB>,
signal: Mutable<MachineState>,
signal: Mutable<State>,
desc: MachineDescription,
}
impl Inner {
@ -37,12 +37,12 @@ impl Inner {
tracing::info!(%id, "No previous state, defaulting to `free`");
MachineState::free(None)
};
let signal = Mutable::new(state);
let signal = Mutable::new(state.to_state());
Self { id, db, signal, desc }
}
pub fn signal(&self) -> impl Signal<Item=MachineState> {
pub fn signal(&self) -> impl Signal<Item=State> {
Box::pin(self.signal.signal_cloned().dedupe_cloned())
}
@ -62,7 +62,7 @@ impl Inner {
let update = state.to_state();
self.db.update(self.id.as_bytes(), &update, &update).unwrap();
tracing::trace!("Updated DB, sending update signal");
self.signal.set(state);
self.signal.set(update);
tracing::trace!("Sent update signal");
}
}
@ -89,6 +89,10 @@ impl Resource {
&self.inner.id
}
pub fn get_signal(&self) -> impl Signal<Item=State> {
self.inner.signal()
}
fn set_state(&self, state: MachineState) {
}