diff --git a/bffhd/actors/mod.rs b/bffhd/actors/mod.rs index d91da4a..9e0429c 100644 --- a/bffhd/actors/mod.rs +++ b/bffhd/actors/mod.rs @@ -5,7 +5,7 @@ use async_compat::CompatExt; use executor::pool::Executor; use futures_signals::signal::Signal; use futures_util::future::BoxFuture; -use rumqttc::{AsyncClient, ConnectionError, Event, Incoming, MqttOptions}; +use rumqttc::{AsyncClient, ConnectionError, Event, Incoming, MqttOptions, StateError}; use std::collections::HashMap; use std::future::Future; @@ -210,8 +210,25 @@ pub fn load(executor: Executor, config: &Config, resources: ResourcesHandle) -> tracing::error!(?error, "MQTT encountered IO error"); // *First* IO error does not close the client. } + Err(ConnectionError::MqttState(error)) => match error { + StateError::Io(_) if fault => { + tracing::error!(?error, "MQTT recurring IO error, closing client"); + // Repeating IO errors close client. Any Ok() in between resets fault to false. + return; + } + StateError::Io(_) => { + fault = true; + tracing::error!(?error, "MQTT encountered IO error"); + // *First* IO error does not close the client. + } + _ => { + tracing::error!(?error, "MQTT encountered state error"); + return; + } + } Err(error) => { tracing::error!(?error, "MQTT client encountered unhandled error"); + return; } }