Makes rumqttc futures run on the tokio runtime as required.

This commit is contained in:
Nadja Reitzenstein 2022-02-26 14:30:06 +01:00
parent e9b1ba1f50
commit ea863e71af
3 changed files with 19 additions and 3 deletions

14
Cargo.lock generated
View File

@ -79,6 +79,19 @@ dependencies = [
"futures-core", "futures-core",
] ]
[[package]]
name = "async-compat"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b48b4ff0c2026db683dea961cd8ea874737f56cffca86fa84415eaddc51c00d"
dependencies = [
"futures-core",
"futures-io",
"once_cell",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "async-executor" name = "async-executor"
version = "1.4.1" version = "1.4.1"
@ -515,6 +528,7 @@ name = "diflouroborane"
version = "0.3.2" version = "0.3.2"
dependencies = [ dependencies = [
"async-channel", "async-channel",
"async-compat",
"async-rustls", "async-rustls",
"async-trait", "async-trait",
"bincode", "bincode",

View File

@ -51,6 +51,7 @@ rsasl = "1.4.0"
#rsasl = { path = "../../rsasl" } #rsasl = { path = "../../rsasl" }
rumqttc = { version = "0.10", features = ["url"] } rumqttc = { version = "0.10", features = ["url"] }
async-compat = "0.2.1"
url = "2.2.2" url = "2.2.2"
#mlua = { version = "0.4", features = ["async", "luajit"] } #mlua = { version = "0.4", features = ["async", "luajit"] }

View File

@ -9,6 +9,7 @@ use futures::{future::BoxFuture, Stream};
use futures::channel::mpsc; use futures::channel::mpsc;
use futures_signals::signal::Signal; use futures_signals::signal::Signal;
use rumqttc::{AsyncClient, ConnectionError, Event, Incoming, MqttOptions}; use rumqttc::{AsyncClient, ConnectionError, Event, Incoming, MqttOptions};
use async_compat::CompatExt;
use crate::db::machine::MachineState; use crate::db::machine::MachineState;
use crate::config::Config; use crate::config::Config;
@ -175,12 +176,12 @@ pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec<Actor>)> {
} }
Ok(eventloop) Ok(eventloop)
})?; }.compat())?;
let dlog = log.clone(); let dlog = log.clone();
smol::spawn(async move { smol::spawn(async move {
let mut fault = false; let mut fault = false;
loop { loop {
match eventloop.poll().await { match eventloop.poll().compat().await {
Ok(_) => { Ok(_) => {
fault = false; fault = false;
// TODO: Handle incoming MQTT messages // TODO: Handle incoming MQTT messages
@ -213,7 +214,7 @@ pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec<Actor>)> {
} }
} }
} }
}).detach(); }.compat()).detach();
let actuators = config.actors.iter() let actuators = config.actors.iter()
.map(|(k,v)| (k, load_single(log, k, &v.module, &v.params, mqtt.clone()))) .map(|(k,v)| (k, load_single(log, k, &v.module, &v.params, mqtt.clone())))