diff --git a/src/actor.rs b/src/actor.rs index 28fc75a..0122512 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -2,7 +2,6 @@ use std::pin::Pin; use std::task::{Poll, Context}; use std::sync::Mutex; use std::collections::HashMap; -use std::convert::TryFrom; use std::future::Future; use std::time::Duration; @@ -18,8 +17,7 @@ use crate::network::ActorMap; use slog::Logger; use url::Url; -use crate::Error; -use crate::Error::{MQTTConnectionError}; +use crate::Error::{BadConfiguration, MQTTConnectionError}; pub trait Actuator { fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()>; @@ -135,9 +133,32 @@ pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec)> { let mut map = HashMap::new(); let mqtt_url = Url::parse(config.mqtt_url.as_str())?; - let mut mqttoptions = MqttOptions::try_from(mqtt_url) - .map_err(|opt| Error::Boxed(Box::new(opt)))?; - mqttoptions.set_keep_alive(Duration::from_secs(20)); + let (transport, default_port) = match mqtt_url.scheme() { + "mqtts" | "ssl" => + (rumqttc::Transport::tls_with_config(rumqttc::ClientConfig::new().into()), 8883), + + "mqtt" | "tcp" => (rumqttc::Transport::tcp(), 1883), + + scheme => { + error!(log, "MQTT url uses invalid scheme {}", scheme); + return Err(BadConfiguration); + } + }; + let host = mqtt_url.host_str().ok_or_else(|| { + error!(log, "MQTT url must contain a hostname"); + BadConfiguration + })?; + let port = mqtt_url.port().unwrap_or(default_port); + + let mut mqttoptions = MqttOptions::new("bffh", host, port); + + mqttoptions + .set_transport(transport) + .set_keep_alive(Duration::from_secs(20)); + + if !mqtt_url.username().is_empty() { + mqttoptions.set_credentials(mqtt_url.username(), mqtt_url.password().unwrap_or_default()); + } let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 256); let dlog = log.clone();