From 68418161d7d25ef85d8af5ce8dfcdcfe796c7f8a Mon Sep 17 00:00:00 2001 From: Nadja Reitzenstein Date: Sat, 26 Feb 2022 14:00:26 +0100 Subject: [PATCH] Replaces paho_mqtt with rumqttc --- Cargo.lock | 238 +++++++++++++++++++++++------------------- Cargo.toml | 4 +- src/actor.rs | 2 +- src/error.rs | 28 +++-- src/main.rs | 4 +- src/modules/shelly.rs | 21 ++-- 6 files changed, 169 insertions(+), 128 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4507936..85f06e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -329,6 +329,12 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + [[package]] name = "cache-padded" version = "1.2.0" @@ -424,15 +430,6 @@ dependencies = [ "vec_map", ] -[[package]] -name = "cmake" -version = "0.1.48" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a" -dependencies = [ - "cc", -] - [[package]] name = "concurrent-queue" version = "1.2.2" @@ -536,9 +533,9 @@ dependencies = [ "lazy_static", "libc", "lmdb-rkv", - "paho-mqtt", "rand", "rsasl", + "rumqttc", "rust-argon2", "rustls", "rustls-pemfile", @@ -553,6 +550,7 @@ dependencies = [ "tempfile", "time", "toml", + "url", "uuid", "walkdir", ] @@ -676,6 +674,12 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -717,28 +721,12 @@ dependencies = [ "futures-sink", ] -[[package]] -name = "futures-channel-preview" -version = "0.3.0-alpha.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5e5f4df964fa9c1c2f8bddeb5c3611631cacd93baf810fc8bb2fb4b495c263a" -dependencies = [ - "futures-core-preview", - "futures-sink-preview", -] - [[package]] name = "futures-core" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" -[[package]] -name = "futures-core-preview" -version = "0.3.0-alpha.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b35b6263fb1ef523c3056565fa67b1d16f0a8604ff12b11b08c25f28a734c60a" - [[package]] name = "futures-executor" version = "0.3.21" @@ -751,29 +739,12 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "futures-executor-preview" -version = "0.3.0-alpha.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75236e88bd9fe88e5e8bfcd175b665d0528fe03ca4c5207fabc028c8f9d93e98" -dependencies = [ - "futures-core-preview", - "futures-util-preview", - "num_cpus", -] - [[package]] name = "futures-io" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" -[[package]] -name = "futures-io-preview" -version = "0.3.0-alpha.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4914ae450db1921a56c91bde97a27846287d062087d4a652efc09bb3a01ebda" - [[package]] name = "futures-lite" version = "1.12.0" @@ -800,20 +771,6 @@ dependencies = [ "syn", ] -[[package]] -name = "futures-preview" -version = "0.3.0-alpha.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b1dce2a0267ada5c6ff75a8ba864b4e679a9e2aa44262af7a3b5516d530d76e" -dependencies = [ - "futures-channel-preview", - "futures-core-preview", - "futures-executor-preview", - "futures-io-preview", - "futures-sink-preview", - "futures-util-preview", -] - [[package]] name = "futures-signals" version = "0.3.24" @@ -835,12 +792,6 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" -[[package]] -name = "futures-sink-preview" -version = "0.3.0-alpha.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f148ef6b69f75bb610d4f9a2336d4fc88c4b5b67129d1a340dd0fd362efeec" - [[package]] name = "futures-task" version = "0.3.21" @@ -864,16 +815,6 @@ dependencies = [ "pin-utils", ] -[[package]] -name = "futures-timer" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f9eb554aa23143abc64ec4d0016f038caf53bb7cbc3d91490835c54edc96550" -dependencies = [ - "futures-preview", - "pin-utils", -] - [[package]] name = "futures-util" version = "0.3.21" @@ -893,21 +834,6 @@ dependencies = [ "slab", ] -[[package]] -name = "futures-util-preview" -version = "0.3.0-alpha.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce968633c17e5f97936bd2797b6e38fb56cf16a7422319f7ec2e30d3c470e8d" -dependencies = [ - "futures-channel-preview", - "futures-core-preview", - "futures-io-preview", - "futures-sink-preview", - "memchr", - "pin-utils", - "slab", -] - [[package]] name = "genawaiter" version = "0.99.1" @@ -1019,6 +945,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "http" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "humantime" version = "1.3.0" @@ -1173,6 +1110,37 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "mio" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi", +] + +[[package]] +name = "mqttbytes" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7bd39d24e28e1544d74ff5746e322a477e52353c8ba7adcaa83d2e760752853" +dependencies = [ + "bytes", +] + [[package]] name = "nom" version = "5.1.2" @@ -1184,6 +1152,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "ntapi" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" +dependencies = [ + "winapi", +] + [[package]] name = "num_cpus" version = "1.13.1" @@ -1242,28 +1219,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" -[[package]] -name = "paho-mqtt" -version = "0.8.0" -source = "git+https://github.com/dequbed/paho.mqtt.rust.git?branch=master#14ec804ecf284564ee71b04345d1fdf1f75571df" -dependencies = [ - "futures 0.3.21", - "futures-timer", - "libc", - "log", - "paho-mqtt-sys", - "thiserror", -] - -[[package]] -name = "paho-mqtt-sys" -version = "0.4.1" -source = "git+https://github.com/dequbed/paho.mqtt.rust.git?branch=master#14ec804ecf284564ee71b04345d1fdf1f75571df" -dependencies = [ - "bindgen", - "cmake", -] - [[package]] name = "parking" version = "2.0.0" @@ -1398,6 +1353,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "pollster" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5da3b0203fd7ee5720aa0b5e790b591aa5d3f41c3ed2c34a3a393382198af2f7" + [[package]] name = "ppv-lite86" version = "0.2.16" @@ -1580,6 +1541,25 @@ dependencies = [ "libc", ] +[[package]] +name = "rumqttc" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e63ee9fd315db8880bf3fd3c20684dee03ca42cdd59b7d5cfdd4378f100a2aa0" +dependencies = [ + "async-channel", + "bytes", + "http", + "log", + "mqttbytes", + "pollster", + "thiserror", + "tokio", + "tokio-rustls", + "url", + "webpki", +] + [[package]] name = "rust-argon2" version = "1.0.0" @@ -1978,6 +1958,44 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" +[[package]] +name = "tokio" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +dependencies = [ + "bytes", + "libc", + "memchr", + "mio", + "pin-project-lite", + "socket2", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-rustls" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + [[package]] name = "toml" version = "0.5.8" diff --git a/Cargo.toml b/Cargo.toml index 312d48e..85541ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,8 +50,8 @@ clap = "2.33.3" rsasl = "1.4.0" #rsasl = { path = "../../rsasl" } -# rumqtt needs tokio which I'm trying to get away from -paho-mqtt = { git = "https://github.com/dequbed/paho.mqtt.rust.git", branch = "master", features = ["build_bindgen"] } +rumqttc = { version = "0.10", features = ["url"] } +url = "2.2.2" #mlua = { version = "0.4", features = ["async", "luajit"] } diff --git a/src/actor.rs b/src/actor.rs index a295f4f..3a898a3 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -192,7 +192,7 @@ pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec)> { } } } - }); + }).detach(); let actuators = config.actors.iter() .map(|(k,v)| (k, load_single(log, k, &v.module, &v.params, mqtt.clone()))) diff --git a/src/error.rs b/src/error.rs index e61cb00..8256dfa 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,12 +8,11 @@ use rsasl::SaslError; // SpawnError is a somewhat ambigous name, `use as` to make it futures::SpawnError instead. use futures::task as futures_task; -use paho_mqtt::errors as mqtt; - use crate::network; #[derive(Debug)] pub enum Error { + BadConfiguration, TomlDe(toml::de::Error), TomlSer(toml::ser::Error), Dhall(serde_dhall::Error), @@ -25,11 +24,13 @@ pub enum Error { FlexbuffersDe(flexbuffers::DeserializationError), FlexbuffersSer(flexbuffers::SerializationError), FuturesSpawn(futures_task::SpawnError), - MQTT(mqtt::Error), + MQTT(rumqttc::ClientError), + MQTTConnectionError(rumqttc::ConnectionError), BadVersion((u32,u32)), Argon2(argon2::Error), EventNetwork(network::Error), RustTls(rustls::TLSError), + ParseUrl(url::ParseError), Denied, } @@ -70,7 +71,7 @@ impl fmt::Display for Error { write!(f, "Future could not be spawned: {}", e) }, Error::MQTT(e) => { - write!(f, "Paho MQTT encountered an error: {}", e) + write!(f, "MQTT client encountered an error: {:?}", e) }, Error::Argon2(e) => { write!(f, "Argon2 en/decoding failure: {}", e) @@ -87,6 +88,9 @@ impl fmt::Display for Error { Error::RustTls(e) => { write!(f, "TLS Error: {}", e) } + Error::ParseUrl(e) => write!(f, "Failed to parse URL: {}", e), + Error::BadConfiguration => write!(f, "Bad configuration provided"), + Error::MQTTConnectionError(e) => write!(f, "MQTT connection error: {:?}", e), } } } @@ -157,18 +161,30 @@ impl From for Error { } } -impl From for Error { - fn from(e: mqtt::Error) -> Error { +impl From for Error { + fn from(e: rumqttc::ClientError) -> Error { Error::MQTT(e) } } +impl From for Error { + fn from(e: rumqttc::ConnectionError) -> Error { + Error::MQTTConnectionError(e) + } +} + impl From for Error { fn from(e: network::Error) -> Error { Error::EventNetwork(e) } } +impl From for Error { + fn from(e: url::ParseError) -> Error { + Error::ParseUrl(e) + } +} + impl From for Error { fn from(e: argon2::Error) -> Error { Error::Argon2(e) diff --git a/src/main.rs b/src/main.rs index 9857ef8..4ed00a7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -42,7 +42,7 @@ use error::Error; use slog::Logger; -use paho_mqtt::AsyncClient; + use crate::config::{ActorConn, Config, InitiatorConn}; const RELEASE: &'static str = env!("BFFHD_RELEASE_STRING"); @@ -101,7 +101,7 @@ fn main() { } else if matches.is_present("check config") { let configpath = matches.value_of("config").unwrap_or("/etc/diflouroborane.dhall"); match config::read(&PathBuf::from_str(configpath).unwrap()) { - Ok(cfg) => { + Ok(_cfg) => { //TODO: print a normalized version of the supplied config println!("config is valid"); std::process::exit(0); diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index 843e2a3..bd7f102 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -3,14 +3,12 @@ use slog::Logger; use crate::db::machine::Status; -use futures::prelude::*; use futures::future::BoxFuture; +use rumqttc::{AsyncClient, QoS}; use crate::actor::Actuator; use crate::db::machine::MachineState; -use paho_mqtt as mqtt; - /// An actuator for a Shellie connected listening on one MQTT broker /// /// This actuator will toggle the shellie with the given `name`. @@ -19,12 +17,12 @@ use paho_mqtt as mqtt; pub struct Shelly { log: Logger, name: String, - client: mqtt::AsyncClient, + client: AsyncClient, topic: String, } impl Shelly { - pub fn new(log: Logger, name: String, client: mqtt::AsyncClient, params: &HashMap) -> Self { + pub fn new(log: Logger, name: String, client: AsyncClient, params: &HashMap) -> Self { let topic = if let Some(topic) = params.get("topic") { format!("shellies/{}/relay/0/command", topic) } else { @@ -55,8 +53,17 @@ impl Actuator for Shelly { Status::InUse(_) => "on", _ => "off", }; - let msg = mqtt::Message::new(&self.topic.clone(), pl, 0); - let f = self.client.publish(msg).map(|_| ()); + + let elog = self.log.clone(); + let name = self.name.clone(); + let client = self.client.clone(); + let topic = self.topic.clone(); + let f = async move { + let res = client.publish(topic, QoS::AtLeastOnce, false, pl).await; + if let Err(e) = res { + error!(elog,"Shelly actor {} failed to update state: {:?}",name,e,); + } + }; return Box::pin(f); }