Fix warnings

This commit is contained in:
Nadja Reitzenstein 2022-02-26 14:00:00 +01:00
parent 07e181b107
commit 53cdfeda5d
13 changed files with 105 additions and 61 deletions

View File

@ -2,20 +2,24 @@ use std::pin::Pin;
use std::task::{Poll, Context}; use std::task::{Poll, Context};
use std::sync::Mutex; use std::sync::Mutex;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::TryFrom;
use std::future::Future; use std::future::Future;
use std::time::Duration; use std::time::Duration;
use futures::{future::BoxFuture, Stream}; use futures::{future::BoxFuture, Stream};
use futures::channel::mpsc; use futures::channel::mpsc;
use futures_signals::signal::Signal; use futures_signals::signal::Signal;
use rumqttc::{AsyncClient, ConnectionError, Event, Incoming, MqttOptions, OptionError, Transport};
use crate::db::machine::MachineState; use crate::db::machine::MachineState;
use crate::config::Config; use crate::config::Config;
use crate::error::Result; use crate::error::Result;
use crate::network::ActorMap; use crate::network::ActorMap;
use paho_mqtt::AsyncClient;
use slog::Logger; use slog::Logger;
use url::Url;
use crate::Error;
use crate::Error::{BadConfiguration, MQTTConnectionError};
pub trait Actuator { pub trait Actuator {
fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()>; fn apply(&mut self, state: MachineState) -> BoxFuture<'static, ()>;
@ -130,24 +134,65 @@ impl Actuator for Dummy {
pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec<Actor>)> { pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec<Actor>)> {
let mut map = HashMap::new(); let mut map = HashMap::new();
let mut mqtt = AsyncClient::new(config.mqtt_url.clone())?; let mqtt_url = Url::parse(config.mqtt_url.as_str())?;
let mut mqttoptions = MqttOptions::try_from(mqtt_url)
.map_err(|opt| Error::Boxed(Box::new(opt)))?;
mqttoptions.set_keep_alive(Duration::from_secs(20));
let (mut mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 256);
let dlog = log.clone(); let dlog = log.clone();
mqtt.set_disconnected_callback(move |c, prop, reason| { let mut eventloop = smol::block_on(async move {
error!(dlog, "got Disconnect({}) message from MQTT Broker: {:?}", reason, prop); match eventloop.poll().await {
let tok = c.reconnect(); Ok(Event::Incoming(Incoming::Connect(connect))) => {},
smol::block_on(tok); Ok(e) => {
}); warn!(dlog, "Got unexpected mqtt event {:?}", e);
}
Err(connerror) => {
error!(dlog, "MQTT connection failed: {:?}", &connerror);
return Err(MQTTConnectionError(connerror));
}
}
Ok(eventloop)
})?;
let dlog = log.clone(); let dlog = log.clone();
mqtt.set_connection_lost_callback(move |c| { smol::spawn(async move {
error!(dlog, "lost connection to MQTT Broker!"); let mut fault = false;
let tok = c.reconnect(); loop {
smol::block_on(tok); match eventloop.poll().await {
Ok(_) => {
fault = false;
// TODO: Handle incoming MQTT messages
}
Err(ConnectionError::Cancel) |
Err(ConnectionError::StreamDone) |
Err(ConnectionError::RequestsDone) => {
// Normal exit
info!(dlog, "MQTT request queue closed, stopping client.");
return;
}
Err(ConnectionError::Timeout(_)) => {
error!(dlog, "MQTT operation timed out!");
warn!(dlog, "MQTT client will continue, but messages may have been lost.")
// Timeout does not close the client
}
Err(ConnectionError::Io(e)) if fault => {
error!(dlog, "MQTT recurring IO error, closing client: {}", e);
// Repeating IO errors close client. Any Ok() in between resets fault to false.
return;
}
Err(ConnectionError::Io(e)) => {
fault = true;
error!(dlog, "MQTT encountered IO error: {}", e);
// *First* IO error does not close the client.
}
Err(e) => {
error!(dlog, "MQTT client encountered unhandled error: {:?}", e);
return;
}
}
}
}); });
let conn_opts = paho_mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.finalize();
let tok = mqtt.connect(conn_opts);
smol::block_on(tok)?;
let actuators = config.actors.iter() let actuators = config.actors.iter()
.map(|(k,v)| (k, load_single(log, k, &v.module, &v.params, mqtt.clone()))) .map(|(k,v)| (k, load_single(log, k, &v.module, &v.params, mqtt.clone())))

View File

@ -6,13 +6,13 @@ use slog::Logger;
use std::sync::Arc; use std::sync::Arc;
use capnp::capability::{Params, Results, Promise}; use capnp::capability::{Promise};
use crate::schema::connection_capnp; use crate::schema::connection_capnp;
use crate::connection::Session; use crate::connection::Session;
use crate::db::Databases; use crate::db::Databases;
use crate::db::user::UserId;
use crate::network::Network; use crate::network::Network;
@ -108,7 +108,7 @@ impl connection_capnp::bootstrap::Server for Bootstrap {
_: GetAPIVersionParams, _: GetAPIVersionParams,
mut results: GetAPIVersionResults mut results: GetAPIVersionResults
) -> Promise<(), capnp::Error> { ) -> Promise<(), capnp::Error> {
let mut builder = results.get(); let builder = results.get();
let mut builder = builder.init_version(); let mut builder = builder.init_version();
builder.set_major(API_VERSION_MAJOR); builder.set_major(API_VERSION_MAJOR);
builder.set_minor(API_VERSION_MINOR); builder.set_minor(API_VERSION_MINOR);

View File

@ -6,7 +6,7 @@
use std::sync::Arc; use std::sync::Arc;
use std::rc::Rc; use std::rc::Rc;
use std::cell::RefCell; use std::cell::RefCell;
use std::ops::Deref;
use slog::Logger; use slog::Logger;
@ -22,14 +22,14 @@ use rsasl::{
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use capnp::capability::{Params, Results, Promise}; use capnp::capability::{Promise};
use crate::api::Session; use crate::api::Session;
pub use crate::schema::authenticationsystem_capnp as auth_system; pub use crate::schema::authenticationsystem_capnp as auth_system;
use crate::db::Databases; use crate::db::Databases;
use crate::db::pass::PassDB;
use crate::db::user::{Internal as UserDB, UserId, User}; use crate::db::user::{Internal as UserDB, User};
use crate::db::access::AccessControl as AccessDB; use crate::db::access::AccessControl as AccessDB;
pub struct AppData { pub struct AppData {

View File

@ -1,12 +1,12 @@
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use capnp::capability::Promise; use capnp::capability::Promise;
use capnp::Error;
use futures::FutureExt; use futures::FutureExt;
use crate::db::access::{PrivilegesBuf, PermRule, Perms}; use crate::db::access::{Perms};
use crate::db::user::UserId; use crate::db::user::UserId;
use crate::db::machine::{Status, MachineState}; use crate::db::machine::{Status, MachineState};
use crate::machine::Machine as NwMachine; use crate::machine::Machine as NwMachine;
@ -36,7 +36,7 @@ impl info::Server for Machine {
let perms = self.perms.clone(); let perms = self.perms.clone();
let f = async move { let f = async move {
if perms.manage { if perms.manage {
let mut builder = results.get(); let builder = results.get();
let mut extinfo = builder.init_machine_info_extended(); let mut extinfo = builder.init_machine_info_extended();
let guard = machine.lock().await; let guard = machine.lock().await;
@ -83,7 +83,7 @@ impl info::Server for Machine {
fn get_reservation_list( fn get_reservation_list(
&mut self, &mut self,
_: info::GetReservationListParams, _: info::GetReservationListParams,
mut results: info::GetReservationListResults, _results: info::GetReservationListResults,
) -> Promise<(), capnp::Error> { ) -> Promise<(), capnp::Error> {
Promise::err(capnp::Error::unimplemented("Reservations are unavailable".to_string())) Promise::err(capnp::Error::unimplemented("Reservations are unavailable".to_string()))
} }
@ -91,7 +91,7 @@ impl info::Server for Machine {
fn get_property_list( fn get_property_list(
&mut self, &mut self,
_: info::GetPropertyListParams, _: info::GetPropertyListParams,
mut results: info::GetPropertyListResults, _results: info::GetPropertyListResults,
) -> Promise<(), capnp::Error> { ) -> Promise<(), capnp::Error> {
Promise::err(capnp::Error::unimplemented("Extended Properties are unavailable".to_string())) Promise::err(capnp::Error::unimplemented("Extended Properties are unavailable".to_string()))
} }

View File

@ -13,7 +13,7 @@ use crate::schema::machinesystem_capnp::machine_system;
use crate::schema::machinesystem_capnp::machine_system::info as machines; use crate::schema::machinesystem_capnp::machine_system::info as machines;
use crate::network::Network; use crate::network::Network;
use crate::db::user::UserId; use crate::db::user::UserId;
use crate::db::access::{PermRule, admin_perm, Permission, Perms}; use crate::db::access::{PermRule, admin_perm, Perms};
use crate::connection::Session; use crate::connection::Session;
use crate::machine::Machine as NwMachine; use crate::machine::Machine as NwMachine;

View File

@ -7,8 +7,8 @@ use capnp::capability::Promise;
use crate::api::user::User; use crate::api::user::User;
use crate::connection::Session; use crate::connection::Session;
use crate::db::access::{PermRule, Permission}; use crate::db::access::{Permission};
use crate::db::user::{UserId, Internal as UserDB}; use crate::db::user::{Internal as UserDB};
use crate::schema::usersystem_capnp::user_system; use crate::schema::usersystem_capnp::user_system;
use crate::schema::usersystem_capnp::user_system::{info, manage}; use crate::schema::usersystem_capnp::user_system::{info, manage};
use crate::error; use crate::error;
@ -86,7 +86,7 @@ impl manage::Server for Users {
match result { match result {
Ok(()) => Promise::ok(()), Ok(()) => Promise::ok(()),
Err(e) => Promise::err(capnp::Error::failed("User lookup failed: {}".to_string())), Err(_e) => Promise::err(capnp::Error::failed("User lookup failed: {}".to_string())),
} }
} }

View File

@ -2,7 +2,7 @@ use std::fs::{File, OpenOptions};
use std::io; use std::io;
use std::io::{LineWriter, Write}; use std::io::{LineWriter, Write};
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Instant;
use crate::Config; use crate::Config;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use serde_json::Serializer; use serde_json::Serializer;

View File

@ -1,23 +1,23 @@
use std::fmt::Debug; use std::fmt::Debug;
use std::ops::DerefMut;
use futures::{AsyncRead, AsyncWrite, FutureExt}; use futures::{AsyncRead, AsyncWrite, FutureExt};
use std::future::Future; use std::future::Future;
use std::io::{IoSlice, IoSliceMut};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll};
use async_rustls::server::TlsStream; use async_rustls::server::TlsStream;
use slog::Logger; use slog::Logger;
use smol::lock::Mutex;
use crate::api::Bootstrap; use crate::api::Bootstrap;
use crate::error::Result; use crate::error::Result;
use capnp_rpc::{rpc_twoparty_capnp, twoparty}; use capnp_rpc::{rpc_twoparty_capnp, twoparty};
use futures_util::{pin_mut, ready};
use smol::io::split; use smol::io::split;
use crate::schema::connection_capnp; use crate::schema::connection_capnp;
@ -86,7 +86,7 @@ impl ConnectionHandler {
pub fn handle<IO: 'static + Unpin + AsyncWrite + AsyncRead>(&mut self, stream: TlsStream<IO>) pub fn handle<IO: 'static + Unpin + AsyncWrite + AsyncRead>(&mut self, stream: TlsStream<IO>)
-> impl Future<Output=Result<()>> -> impl Future<Output=Result<()>>
{ {
let (mut reader, mut writer) = split(stream); let (reader, writer) = split(stream);
let boots = Bootstrap::new(self.log.new(o!()), self.db.clone(), self.network.clone()); let boots = Bootstrap::new(self.log.new(o!()), self.db.clone(), self.network.clone());
let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots); let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots);

View File

@ -1,7 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use std::path::PathBuf;
use std::str::FromStr;
use std::ops::{Deref, DerefMut};
use slog::Logger; use slog::Logger;
@ -74,7 +74,6 @@ use lmdb::{
WriteFlags, WriteFlags,
Cursor, Cursor,
RoCursor, RoCursor,
RwCursor,
Iter, Iter,
}; };
@ -359,7 +358,7 @@ mod tests {
let db = DB::new(e.env.clone(), ldb); let db = DB::new(e.env.clone(), ldb);
let adapter = TestAdapter; let _adapter = TestAdapter;
let testdb = TestDB::new(db.clone()); let testdb = TestDB::new(db.clone());
let mut val = "value"; let mut val = "value";

View File

@ -75,7 +75,7 @@ impl MachineState {
} }
pub fn init(log: Logger, config: &Config, env: Arc<lmdb::Environment>) -> Result<Internal> { pub fn init(log: Logger, config: &Config, env: Arc<lmdb::Environment>) -> Result<Internal> {
let mut flags = lmdb::DatabaseFlags::empty(); let flags = lmdb::DatabaseFlags::empty();
//flags.set(lmdb::DatabaseFlags::INTEGER_KEY, true); //flags.set(lmdb::DatabaseFlags::INTEGER_KEY, true);
let machdb = env.create_db(Some("machines"), flags)?; let machdb = env.create_db(Some("machines"), flags)?;
debug!(&log, "Opened machine db successfully."); debug!(&log, "Opened machine db successfully.");

View File

@ -7,14 +7,14 @@ use smol::Timer;
use slog::Logger; use slog::Logger;
use paho_mqtt::AsyncClient;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures_signals::signal::{Signal, Mutable, MutableSignalCloned}; use futures_signals::signal::{Signal, Mutable, MutableSignalCloned};
use crate::machine::Machine; use crate::machine::Machine;
use crate::db::machine::MachineState; use crate::db::machine::MachineState;
use crate::db::user::{User, UserId, UserData}; use crate::db::user::{UserId};
use crate::network::InitMap; use crate::network::InitMap;
@ -90,7 +90,7 @@ impl Future for Initiator {
debug!(this.log, "State change blocked"); debug!(this.log, "State change blocked");
return Poll::Pending; return Poll::Pending;
}, },
Poll::Ready(Ok(rt)) => { Poll::Ready(Ok(_rt)) => {
debug!(this.log, "State change returned ok"); debug!(this.log, "State change returned ok");
// Explicity drop the future // Explicity drop the future
let _ = this.state_change_fut.take(); let _ = this.state_change_fut.take();

View File

@ -3,34 +3,34 @@ use std::iter::FromIterator;
use std::sync::Arc; use std::sync::Arc;
use futures_util::lock::Mutex; use futures_util::lock::Mutex;
use std::path::Path; use std::path::Path;
use std::task::{Poll, Context};
use std::pin::Pin;
use std::future::Future;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs; use std::fs;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use futures::Stream;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::channel::{mpsc, oneshot};
use futures_signals::signal::Signal; use futures_signals::signal::Signal;
use futures_signals::signal::SignalExt; use futures_signals::signal::SignalExt;
use futures_signals::signal::{Mutable, ReadOnlyMutable}; use futures_signals::signal::{Mutable, ReadOnlyMutable};
use slog::Logger; use slog::Logger;
use crate::error::{Result, Error}; use crate::error::{Result};
use crate::db::{access, Databases, MachineDB, UserDB}; use crate::db::{access, Databases, MachineDB, UserDB};
use crate::db::access::{AccessControl, Perms}; use crate::db::access::{AccessControl, Perms};
use crate::db::machine::{MachineIdentifier, MachineState, Status}; use crate::db::machine::{MachineIdentifier, MachineState, Status};
use crate::db::user::{User, UserData, UserId}; use crate::db::user::{UserId};
use crate::Error::Denied; use crate::Error::Denied;
use crate::network::MachineMap; use crate::network::MachineMap;
use crate::space;
use crate::config::deser_option; use crate::config::deser_option;

View File

@ -22,7 +22,7 @@ use std::os::unix::io::AsRawFd;
use std::path::Path; use std::path::Path;
use async_rustls::TlsAcceptor; use async_rustls::TlsAcceptor;
use rustls::{Certificate, KeyLogFile, NoClientAuth, PrivateKey, ServerConfig}; use rustls::{Certificate, KeyLogFile, NoClientAuth, PrivateKey, ServerConfig};
use rustls_pemfile::Item;
use signal_hook::low_level::pipe as sigpipe; use signal_hook::low_level::pipe as sigpipe;
use crate::db::Databases; use crate::db::Databases;
@ -103,7 +103,7 @@ pub fn serve_api_connections(log: Arc<Logger>, config: Config, db: Databases, nw
} }
}).collect(); }).collect();
let local_ex = LocalExecutor::new(); let _local_ex = LocalExecutor::new();
let network = Arc::new(nw); let network = Arc::new(nw);