From 53cdfeda5d337c54f413bbf000071365cc82021f Mon Sep 17 00:00:00 2001 From: Nadja Reitzenstein Date: Sat, 26 Feb 2022 14:00:00 +0100 Subject: [PATCH] Fix warnings --- src/actor.rs | 77 +++++++++++++++++++++++++++++++++++---------- src/api.rs | 6 ++-- src/api/auth.rs | 8 ++--- src/api/machine.rs | 12 +++---- src/api/machines.rs | 2 +- src/api/users.rs | 6 ++-- src/audit.rs | 2 +- src/connection.rs | 16 +++++----- src/db.rs | 9 +++--- src/db/machine.rs | 2 +- src/initiator.rs | 6 ++-- src/machine.rs | 16 +++++----- src/server.rs | 4 +-- 13 files changed, 105 insertions(+), 61 deletions(-) diff --git a/src/actor.rs b/src/actor.rs index 34dab10..a295f4f 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -2,20 +2,24 @@ use std::pin::Pin; use std::task::{Poll, Context}; use std::sync::Mutex; use std::collections::HashMap; +use std::convert::TryFrom; use std::future::Future; use std::time::Duration; use futures::{future::BoxFuture, Stream}; use futures::channel::mpsc; use futures_signals::signal::Signal; +use rumqttc::{AsyncClient, ConnectionError, Event, Incoming, MqttOptions, OptionError, Transport}; use crate::db::machine::MachineState; use crate::config::Config; use crate::error::Result; use crate::network::ActorMap; -use paho_mqtt::AsyncClient; use slog::Logger; +use url::Url; +use crate::Error; +use crate::Error::{BadConfiguration, MQTTConnectionError}; pub trait Actuator { fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()>; @@ -130,24 +134,65 @@ impl Actuator for Dummy { pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec)> { let mut map = HashMap::new(); - let mut mqtt = AsyncClient::new(config.mqtt_url.clone())?; + let mqtt_url = Url::parse(config.mqtt_url.as_str())?; + let mut mqttoptions = MqttOptions::try_from(mqtt_url) + .map_err(|opt| Error::Boxed(Box::new(opt)))?; + mqttoptions.set_keep_alive(Duration::from_secs(20)); + + let (mut mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 256); let dlog = log.clone(); - mqtt.set_disconnected_callback(move |c, prop, reason| { - error!(dlog, "got Disconnect({}) message from MQTT Broker: {:?}", reason, prop); - let tok = c.reconnect(); - smol::block_on(tok); - }); + let mut eventloop = smol::block_on(async move { + match eventloop.poll().await { + Ok(Event::Incoming(Incoming::Connect(connect))) => {}, + Ok(e) => { + warn!(dlog, "Got unexpected mqtt event {:?}", e); + } + Err(connerror) => { + error!(dlog, "MQTT connection failed: {:?}", &connerror); + return Err(MQTTConnectionError(connerror)); + } + } + + Ok(eventloop) + })?; let dlog = log.clone(); - mqtt.set_connection_lost_callback(move |c| { - error!(dlog, "lost connection to MQTT Broker!"); - let tok = c.reconnect(); - smol::block_on(tok); + smol::spawn(async move { + let mut fault = false; + loop { + match eventloop.poll().await { + Ok(_) => { + fault = false; + // TODO: Handle incoming MQTT messages + } + Err(ConnectionError::Cancel) | + Err(ConnectionError::StreamDone) | + Err(ConnectionError::RequestsDone) => { + // Normal exit + info!(dlog, "MQTT request queue closed, stopping client."); + return; + } + Err(ConnectionError::Timeout(_)) => { + error!(dlog, "MQTT operation timed out!"); + warn!(dlog, "MQTT client will continue, but messages may have been lost.") + // Timeout does not close the client + } + Err(ConnectionError::Io(e)) if fault => { + error!(dlog, "MQTT recurring IO error, closing client: {}", e); + // Repeating IO errors close client. Any Ok() in between resets fault to false. + return; + } + Err(ConnectionError::Io(e)) => { + fault = true; + error!(dlog, "MQTT encountered IO error: {}", e); + // *First* IO error does not close the client. + } + Err(e) => { + error!(dlog, "MQTT client encountered unhandled error: {:?}", e); + return; + } + } + } }); - let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() - .keep_alive_interval(Duration::from_secs(20)) - .finalize(); - let tok = mqtt.connect(conn_opts); - smol::block_on(tok)?; let actuators = config.actors.iter() .map(|(k,v)| (k, load_single(log, k, &v.module, &v.params, mqtt.clone()))) diff --git a/src/api.rs b/src/api.rs index 797b692..0918dde 100644 --- a/src/api.rs +++ b/src/api.rs @@ -6,13 +6,13 @@ use slog::Logger; use std::sync::Arc; -use capnp::capability::{Params, Results, Promise}; +use capnp::capability::{Promise}; use crate::schema::connection_capnp; use crate::connection::Session; use crate::db::Databases; -use crate::db::user::UserId; + use crate::network::Network; @@ -108,7 +108,7 @@ impl connection_capnp::bootstrap::Server for Bootstrap { _: GetAPIVersionParams, mut results: GetAPIVersionResults ) -> Promise<(), capnp::Error> { - let mut builder = results.get(); + let builder = results.get(); let mut builder = builder.init_version(); builder.set_major(API_VERSION_MAJOR); builder.set_minor(API_VERSION_MINOR); diff --git a/src/api/auth.rs b/src/api/auth.rs index eb43429..4269bbb 100644 --- a/src/api/auth.rs +++ b/src/api/auth.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use std::rc::Rc; use std::cell::RefCell; -use std::ops::Deref; + use slog::Logger; @@ -22,14 +22,14 @@ use rsasl::{ use serde::{Serialize, Deserialize}; -use capnp::capability::{Params, Results, Promise}; +use capnp::capability::{Promise}; use crate::api::Session; pub use crate::schema::authenticationsystem_capnp as auth_system; use crate::db::Databases; -use crate::db::pass::PassDB; -use crate::db::user::{Internal as UserDB, UserId, User}; + +use crate::db::user::{Internal as UserDB, User}; use crate::db::access::AccessControl as AccessDB; pub struct AppData { diff --git a/src/api/machine.rs b/src/api/machine.rs index ab30a7f..c2774b3 100644 --- a/src/api/machine.rs +++ b/src/api/machine.rs @@ -1,12 +1,12 @@ -use std::sync::Arc; + use std::time::Duration; use capnp::capability::Promise; -use capnp::Error; + use futures::FutureExt; -use crate::db::access::{PrivilegesBuf, PermRule, Perms}; +use crate::db::access::{Perms}; use crate::db::user::UserId; use crate::db::machine::{Status, MachineState}; use crate::machine::Machine as NwMachine; @@ -36,7 +36,7 @@ impl info::Server for Machine { let perms = self.perms.clone(); let f = async move { if perms.manage { - let mut builder = results.get(); + let builder = results.get(); let mut extinfo = builder.init_machine_info_extended(); let guard = machine.lock().await; @@ -83,7 +83,7 @@ impl info::Server for Machine { fn get_reservation_list( &mut self, _: info::GetReservationListParams, - mut results: info::GetReservationListResults, + _results: info::GetReservationListResults, ) -> Promise<(), capnp::Error> { Promise::err(capnp::Error::unimplemented("Reservations are unavailable".to_string())) } @@ -91,7 +91,7 @@ impl info::Server for Machine { fn get_property_list( &mut self, _: info::GetPropertyListParams, - mut results: info::GetPropertyListResults, + _results: info::GetPropertyListResults, ) -> Promise<(), capnp::Error> { Promise::err(capnp::Error::unimplemented("Extended Properties are unavailable".to_string())) } diff --git a/src/api/machines.rs b/src/api/machines.rs index 97a0d87..03c0be1 100644 --- a/src/api/machines.rs +++ b/src/api/machines.rs @@ -13,7 +13,7 @@ use crate::schema::machinesystem_capnp::machine_system; use crate::schema::machinesystem_capnp::machine_system::info as machines; use crate::network::Network; use crate::db::user::UserId; -use crate::db::access::{PermRule, admin_perm, Permission, Perms}; +use crate::db::access::{PermRule, admin_perm, Perms}; use crate::connection::Session; use crate::machine::Machine as NwMachine; diff --git a/src/api/users.rs b/src/api/users.rs index 3258edd..c617821 100644 --- a/src/api/users.rs +++ b/src/api/users.rs @@ -7,8 +7,8 @@ use capnp::capability::Promise; use crate::api::user::User; use crate::connection::Session; -use crate::db::access::{PermRule, Permission}; -use crate::db::user::{UserId, Internal as UserDB}; +use crate::db::access::{Permission}; +use crate::db::user::{Internal as UserDB}; use crate::schema::usersystem_capnp::user_system; use crate::schema::usersystem_capnp::user_system::{info, manage}; use crate::error; @@ -86,7 +86,7 @@ impl manage::Server for Users { match result { Ok(()) => Promise::ok(()), - Err(e) => Promise::err(capnp::Error::failed("User lookup failed: {}".to_string())), + Err(_e) => Promise::err(capnp::Error::failed("User lookup failed: {}".to_string())), } } diff --git a/src/audit.rs b/src/audit.rs index 833bca9..2cdf9ff 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -2,7 +2,7 @@ use std::fs::{File, OpenOptions}; use std::io; use std::io::{LineWriter, Write}; use std::sync::Mutex; -use std::time::Instant; + use crate::Config; use serde::{Serialize, Deserialize}; use serde_json::Serializer; diff --git a/src/connection.rs b/src/connection.rs index 6072463..769c94a 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,23 +1,23 @@ use std::fmt::Debug; -use std::ops::DerefMut; + use futures::{AsyncRead, AsyncWrite, FutureExt}; use std::future::Future; -use std::io::{IoSlice, IoSliceMut}; -use std::pin::Pin; -use std::rc::Rc; + + + use std::sync::Arc; -use std::task::{Context, Poll}; + use async_rustls::server::TlsStream; use slog::Logger; -use smol::lock::Mutex; + use crate::api::Bootstrap; use crate::error::Result; use capnp_rpc::{rpc_twoparty_capnp, twoparty}; -use futures_util::{pin_mut, ready}; + use smol::io::split; use crate::schema::connection_capnp; @@ -86,7 +86,7 @@ impl ConnectionHandler { pub fn handle(&mut self, stream: TlsStream) -> impl Future> { - let (mut reader, mut writer) = split(stream); + let (reader, writer) = split(stream); let boots = Bootstrap::new(self.log.new(o!()), self.db.clone(), self.network.clone()); let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots); diff --git a/src/db.rs b/src/db.rs index 5bec845..7581cb8 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use std::path::PathBuf; -use std::str::FromStr; -use std::ops::{Deref, DerefMut}; + + + use slog::Logger; @@ -74,7 +74,6 @@ use lmdb::{ WriteFlags, Cursor, RoCursor, - RwCursor, Iter, }; @@ -359,7 +358,7 @@ mod tests { let db = DB::new(e.env.clone(), ldb); - let adapter = TestAdapter; + let _adapter = TestAdapter; let testdb = TestDB::new(db.clone()); let mut val = "value"; diff --git a/src/db/machine.rs b/src/db/machine.rs index a9353ae..f9158c7 100644 --- a/src/db/machine.rs +++ b/src/db/machine.rs @@ -75,7 +75,7 @@ impl MachineState { } pub fn init(log: Logger, config: &Config, env: Arc) -> Result { - let mut flags = lmdb::DatabaseFlags::empty(); + let flags = lmdb::DatabaseFlags::empty(); //flags.set(lmdb::DatabaseFlags::INTEGER_KEY, true); let machdb = env.create_db(Some("machines"), flags)?; debug!(&log, "Opened machine db successfully."); diff --git a/src/initiator.rs b/src/initiator.rs index 39ca5d8..d4ebda1 100644 --- a/src/initiator.rs +++ b/src/initiator.rs @@ -7,14 +7,14 @@ use smol::Timer; use slog::Logger; -use paho_mqtt::AsyncClient; + use futures::future::BoxFuture; use futures_signals::signal::{Signal, Mutable, MutableSignalCloned}; use crate::machine::Machine; use crate::db::machine::MachineState; -use crate::db::user::{User, UserId, UserData}; +use crate::db::user::{UserId}; use crate::network::InitMap; @@ -90,7 +90,7 @@ impl Future for Initiator { debug!(this.log, "State change blocked"); return Poll::Pending; }, - Poll::Ready(Ok(rt)) => { + Poll::Ready(Ok(_rt)) => { debug!(this.log, "State change returned ok"); // Explicity drop the future let _ = this.state_change_fut.take(); diff --git a/src/machine.rs b/src/machine.rs index d644592..c2d9269 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -3,34 +3,34 @@ use std::iter::FromIterator; use std::sync::Arc; use futures_util::lock::Mutex; use std::path::Path; -use std::task::{Poll, Context}; -use std::pin::Pin; -use std::future::Future; + + + use std::collections::HashMap; use std::fs; use serde::{Serialize, Deserialize}; -use futures::Stream; + use futures::future::BoxFuture; -use futures::channel::{mpsc, oneshot}; + use futures_signals::signal::Signal; use futures_signals::signal::SignalExt; use futures_signals::signal::{Mutable, ReadOnlyMutable}; use slog::Logger; -use crate::error::{Result, Error}; +use crate::error::{Result}; use crate::db::{access, Databases, MachineDB, UserDB}; use crate::db::access::{AccessControl, Perms}; use crate::db::machine::{MachineIdentifier, MachineState, Status}; -use crate::db::user::{User, UserData, UserId}; +use crate::db::user::{UserId}; use crate::Error::Denied; use crate::network::MachineMap; -use crate::space; + use crate::config::deser_option; diff --git a/src/server.rs b/src/server.rs index 7775330..4f24991 100644 --- a/src/server.rs +++ b/src/server.rs @@ -22,7 +22,7 @@ use std::os::unix::io::AsRawFd; use std::path::Path; use async_rustls::TlsAcceptor; use rustls::{Certificate, KeyLogFile, NoClientAuth, PrivateKey, ServerConfig}; -use rustls_pemfile::Item; + use signal_hook::low_level::pipe as sigpipe; use crate::db::Databases; @@ -103,7 +103,7 @@ pub fn serve_api_connections(log: Arc, config: Config, db: Databases, nw } }).collect(); - let local_ex = LocalExecutor::new(); + let _local_ex = LocalExecutor::new(); let network = Arc::new(nw);