Use our own MQTT URL dissector so existing configs don't break

This commit is contained in:
Nadja Reitzenstein 2022-02-26 14:16:46 +01:00
parent e6cb1a958d
commit e9b1ba1f50

View File

@ -2,7 +2,6 @@ use std::pin::Pin;
use std::task::{Poll, Context}; use std::task::{Poll, Context};
use std::sync::Mutex; use std::sync::Mutex;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::TryFrom;
use std::future::Future; use std::future::Future;
use std::time::Duration; use std::time::Duration;
@ -18,8 +17,7 @@ use crate::network::ActorMap;
use slog::Logger; use slog::Logger;
use url::Url; use url::Url;
use crate::Error; use crate::Error::{BadConfiguration, MQTTConnectionError};
use crate::Error::{MQTTConnectionError};
pub trait Actuator { pub trait Actuator {
fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()>; fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()>;
@ -135,9 +133,32 @@ pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec<Actor>)> {
let mut map = HashMap::new(); let mut map = HashMap::new();
let mqtt_url = Url::parse(config.mqtt_url.as_str())?; let mqtt_url = Url::parse(config.mqtt_url.as_str())?;
let mut mqttoptions = MqttOptions::try_from(mqtt_url) let (transport, default_port) = match mqtt_url.scheme() {
.map_err(|opt| Error::Boxed(Box::new(opt)))?; "mqtts" | "ssl" =>
mqttoptions.set_keep_alive(Duration::from_secs(20)); (rumqttc::Transport::tls_with_config(rumqttc::ClientConfig::new().into()), 8883),
"mqtt" | "tcp" => (rumqttc::Transport::tcp(), 1883),
scheme => {
error!(log, "MQTT url uses invalid scheme {}", scheme);
return Err(BadConfiguration);
}
};
let host = mqtt_url.host_str().ok_or_else(|| {
error!(log, "MQTT url must contain a hostname");
BadConfiguration
})?;
let port = mqtt_url.port().unwrap_or(default_port);
let mut mqttoptions = MqttOptions::new("bffh", host, port);
mqttoptions
.set_transport(transport)
.set_keep_alive(Duration::from_secs(20));
if !mqtt_url.username().is_empty() {
mqttoptions.set_credentials(mqtt_url.username(), mqtt_url.password().unwrap_or_default());
}
let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 256); let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 256);
let dlog = log.clone(); let dlog = log.clone();