Burn more CPUs!

This commit is contained in:
Nadja Reitzenstein 2021-10-27 21:32:50 +02:00
parent 4844fcc0c9
commit b95d21a092
12 changed files with 253 additions and 201 deletions

145
Cargo.lock generated
View File

@ -413,7 +413,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a15248c8facb189a3c5fee74fbf1ff3adc134261d27da663b89c7d19ebaf983"
dependencies = [
"capnp",
"futures 0.3.17",
"futures",
]
[[package]]
@ -424,7 +424,7 @@ checksum = "4c4f17f96f68f2c1168ed7105d9e5cb4a095a5bef3578aee0f9c0644b85ca95e"
dependencies = [
"capnp",
"capnp-futures",
"futures 0.3.17",
"futures",
]
[[package]]
@ -472,6 +472,8 @@ dependencies = [
"libc",
"num-integer",
"num-traits",
"serde",
"time",
"winapi",
]
@ -501,15 +503,6 @@ dependencies = [
"vec_map",
]
[[package]]
name = "cmake"
version = "0.1.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7b858541263efe664aead4a5209a4ae5c5d2811167d4ed4ee0944503f8d2089"
dependencies = [
"cc",
]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
@ -618,6 +611,7 @@ dependencies = [
"capnp-futures",
"capnp-rpc",
"capnpc",
"chrono",
"clap",
"erased-serde",
"futures-signals",
@ -626,7 +620,6 @@ dependencies = [
"lazy_static",
"libc",
"lmdb-rkv",
"paho-mqtt",
"ptr_meta",
"rand",
"rkyv",
@ -759,12 +752,6 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678"
[[package]]
name = "futures"
version = "0.3.17"
@ -773,7 +760,6 @@ checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -790,28 +776,12 @@ dependencies = [
"futures-sink",
]
[[package]]
name = "futures-channel-preview"
version = "0.3.0-alpha.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5e5f4df964fa9c1c2f8bddeb5c3611631cacd93baf810fc8bb2fb4b495c263a"
dependencies = [
"futures-core-preview",
"futures-sink-preview",
]
[[package]]
name = "futures-core"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d"
[[package]]
name = "futures-core-preview"
version = "0.3.0-alpha.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b35b6263fb1ef523c3056565fa67b1d16f0a8604ff12b11b08c25f28a734c60a"
[[package]]
name = "futures-executor"
version = "0.3.17"
@ -823,29 +793,12 @@ dependencies = [
"futures-util",
]
[[package]]
name = "futures-executor-preview"
version = "0.3.0-alpha.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75236e88bd9fe88e5e8bfcd175b665d0528fe03ca4c5207fabc028c8f9d93e98"
dependencies = [
"futures-core-preview",
"futures-util-preview",
"num_cpus",
]
[[package]]
name = "futures-io"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377"
[[package]]
name = "futures-io-preview"
version = "0.3.0-alpha.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4914ae450db1921a56c91bde97a27846287d062087d4a652efc09bb3a01ebda"
[[package]]
name = "futures-lite"
version = "1.12.0"
@ -883,20 +836,6 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "futures-preview"
version = "0.3.0-alpha.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b1dce2a0267ada5c6ff75a8ba864b4e679a9e2aa44262af7a3b5516d530d76e"
dependencies = [
"futures-channel-preview",
"futures-core-preview",
"futures-executor-preview",
"futures-io-preview",
"futures-sink-preview",
"futures-util-preview",
]
[[package]]
name = "futures-signals"
version = "0.3.23"
@ -917,12 +856,6 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36ea153c13024fe480590b3e3d4cad89a0cfacecc24577b68f86c6ced9c2bc11"
[[package]]
name = "futures-sink-preview"
version = "0.3.0-alpha.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f148ef6b69f75bb610d4f9a2336d4fc88c4b5b67129d1a340dd0fd362efeec"
[[package]]
name = "futures-task"
version = "0.3.17"
@ -946,16 +879,6 @@ dependencies = [
"pin-utils",
]
[[package]]
name = "futures-timer"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f9eb554aa23143abc64ec4d0016f038caf53bb7cbc3d91490835c54edc96550"
dependencies = [
"futures-preview",
"pin-utils",
]
[[package]]
name = "futures-util"
version = "0.3.17"
@ -963,7 +886,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481"
dependencies = [
"autocfg",
"futures 0.1.31",
"futures-channel",
"futures-core",
"futures-io",
@ -978,21 +900,6 @@ dependencies = [
"slab",
]
[[package]]
name = "futures-util-preview"
version = "0.3.0-alpha.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ce968633c17e5f97936bd2797b6e38fb56cf16a7422319f7ec2e30d3c470e8d"
dependencies = [
"futures-channel-preview",
"futures-core-preview",
"futures-io-preview",
"futures-sink-preview",
"memchr",
"pin-utils",
"slab",
]
[[package]]
name = "generic-array"
version = "0.12.4"
@ -1312,16 +1219,6 @@ dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "once_cell"
version = "1.8.0"
@ -1373,28 +1270,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "paho-mqtt"
version = "0.8.0"
source = "git+https://github.com/dequbed/paho.mqtt.rust.git?branch=master#14ec804ecf284564ee71b04345d1fdf1f75571df"
dependencies = [
"futures 0.3.17",
"futures-timer",
"libc",
"log",
"paho-mqtt-sys",
"thiserror",
]
[[package]]
name = "paho-mqtt-sys"
version = "0.4.1"
source = "git+https://github.com/dequbed/paho.mqtt.rust.git?branch=master#14ec804ecf284564ee71b04345d1fdf1f75571df"
dependencies = [
"bindgen",
"cmake",
]
[[package]]
name = "parking"
version = "2.0.0"
@ -2089,6 +1964,16 @@ dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "tinyvec"
version = "1.5.0"

View File

@ -9,9 +9,6 @@ authors = [ "dequbed <me@dequbed.space>"
license = "LGPL-3.0"
edition = "2018"
[profile.dev]
lto = "thin"
[profile.release]
opt-level = 3
debug = true
@ -40,6 +37,7 @@ ptr_meta = "0.1"
rkyv_typename = "0.7"
rkyv_dyn = "0.7"
inventory = "0.1"
chrono = { version = "0.4", features = ["serde"] }
# Password hashing for internal users
rust-argon2 = "0.8.3"
@ -69,9 +67,6 @@ erased-serde = "0.3"
serde_dhall = { version = "0.10.1", default-features = false }
serde_json = "1.0"
# Shelly support
paho-mqtt = { git = "https://github.com/dequbed/paho.mqtt.rust.git", branch = "master", features = ["build_bindgen"] }
[build-dependencies]
capnpc = "0.14.4"
# Used in build.rs to iterate over all files in schema/

View File

@ -37,8 +37,8 @@ pub struct Config {
pub mqtt_url: String,
pub actor_connections: Box<[(String, String)]>,
pub init_connections: Box<[(String, String)]>,
pub actor_connections: Vec<(String, String)>,
pub init_connections: Vec<(String, String)>,
pub db_path: PathBuf,
@ -174,12 +174,12 @@ impl Default for Config {
actors,
initiators,
mqtt_url: "tcp://localhost:1883".to_string(),
actor_connections: Box::new([
actor_connections: vec![
("Testmachine".to_string(), "Actor".to_string()),
]),
init_connections: Box::new([
],
init_connections: vec![
("Initiator".to_string(), "Testmachine".to_string()),
]),
],
db_path: PathBuf::from("/run/bffh/database"),
roles: HashMap::new(),

View File

@ -66,7 +66,7 @@ use std::sync::Arc;
use std::path::Path;
use crate::db::user::User;
use std::collections::HashMap;
use crate::state::{State, OwnedEntry};
use crate::state::{OwnedEntry, State};
use std::iter::FromIterator;
use std::ops::Deref;
use crate::oid::{ArchivedObjectIdentifier, ObjectIdentifier};
@ -188,7 +188,8 @@ impl Dump {
let input = dbs.statedb.get_input(id)?.map(|input| {
let input: &Archived<State> = input.deref();
let hash: u64 = input.hash;
let inner: Vec<OwnedEntry> = input.inner.iter().map(|entry| {
let inner = input.inner.iter()
.map(|entry| {
let oid: &ArchivedObjectIdentifier = &entry.oid;
let bytes: &[u8] = oid.deref();
@ -207,7 +208,7 @@ impl Dump {
let output = dbs.statedb.get_output(id)?.map(|output| {
let output: &Archived<State> = output.deref();
let hash: u64 = output.hash;
let inner: Vec<OwnedEntry> = output.inner.iter().map(|entry| {
let inner = output.inner.iter().map(|entry| {
let oid: &ArchivedObjectIdentifier = &entry.oid;
let bytes: &[u8] = oid.deref();

View File

@ -54,17 +54,17 @@ impl<K, A: Adapter> Adapter for HashAdapter<K, A>
where K: Archive,
Entry<K, A::Value>: Serialize<A::Serializer>,
{
type Value = Entry<K, A::Value>;
type Serializer = A::Serializer;
type Value = Entry<K, A::Value>;
fn new_serializer() -> Self::Serializer
{ A::new_serializer() }
fn from_db_err(e: lmdb::Error) -> <A as Fallible>::Error
{ A::from_db_err(e) }
fn from_ser_err(e: <Self::Serializer as Fallible>::Error) -> <A as Fallible>::Error
{ A::from_ser_err(e) }
fn from_db_err(e: lmdb::Error) -> <A as Fallible>::Error
{ A::from_db_err(e) }
}

View File

@ -4,7 +4,6 @@ use serde_dhall;
use rsasl::SaslError;
use paho_mqtt::errors as mqtt;
use crate::db::DBError;
//FIXME use crate::network;
@ -17,7 +16,6 @@ pub enum Error {
Boxed(Box<dyn std::error::Error>),
Capnp(capnp::Error),
DB(DBError),
MQTT(mqtt::Error),
Denied,
}
@ -42,9 +40,6 @@ impl fmt::Display for Error {
Error::DB(e) => {
write!(f, "DB Error: {:?}", e)
},
Error::MQTT(e) => {
write!(f, "Paho MQTT encountered an error: {}", e)
},
Error::Denied => {
write!(f, "You do not have the permission required to do that.")
}
@ -88,10 +83,4 @@ impl From<DBError> for Error {
}
}
impl From<mqtt::Error> for Error {
fn from(e: mqtt::Error) -> Error {
Error::MQTT(e)
}
}
pub(crate) type Result<T> = std::result::Result<T, Error>;

View File

@ -1,24 +1,12 @@
// FIXME: No.
#![allow(dead_code)]
#![forbid(unused_imports)]
//mod modules;
//mod log;
//mod config;
//mod connection;
//mod machine;
//mod builtin;
//mod server;
//mod actor;
//mod initiator;
mod config;
mod db;
mod error;
mod network;
mod oid;
mod permissions;
mod resource;
mod schema;
mod state;
mod varint;
pub mod config;
pub mod db;
pub mod error;
pub mod network;
pub mod oid;
pub mod permissions;
pub mod resource;
pub mod schema;
pub mod state;
pub mod varint;

View File

@ -505,10 +505,21 @@ mod serde_support {
}
#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;
use std::convert::TryInto;
pub(crate) fn gen_random() -> ObjectIdentifier {
let amt: u8 = rand::random::<u8>() % 10 + 1;
let mut children = Vec::new();
for i in 0..amt {
children.push(rand::random());
}
ObjectIdentifier::build(ObjectIdentifierRoot::JointIsoItuT, 25, children)
.unwrap()
}
#[test]
fn bincode_serde_roundtrip() {
let expected = ObjectIdentifier::build(

View File

@ -7,7 +7,6 @@ use async_channel::Receiver;
use crate::state::State;
use crate::db::{
state::StateAccessor,
DBError,
};
/// A resource in BFFH has to contain several different parts;
@ -41,12 +40,31 @@ use crate::db::{
pub trait Resource {
/// Run whatever internal logic this resource has for the given State update, and return the
/// new output state that this update produces.
async fn update(&mut self, input: &State /*, internal: &State*/) -> Result<State, DBError>;
async fn on_update(&mut self, input: &State) -> Result<State, Error>;
async fn shutdown(&mut self);
}
pub struct Passthrough;
#[async_trait]
impl Resource for Passthrough {
async fn on_update(&mut self, input: &State) -> Result<State, Error> {
Ok(input.clone())
}
async fn shutdown(&mut self) {}
}
/// Error type a resource implementation can produce
#[derive(Debug)]
pub enum Error {
Internal(Box<dyn std::error::Error>),
Denied,
}
// TODO: more message context
pub struct Update {
pub state: State,
pub errchan: Sender<DBError>,
pub errchan: Sender<Error>,
}
pub struct ResourceDriver {
@ -68,7 +86,7 @@ impl ResourceDriver {
let state = update.state;
let mut errchan = update.errchan;
match self.res.update(&state).await {
match self.res.on_update(&state).await {
Ok(outstate) => {
// FIXME: Send any error here to some global error collector. A failed write to
// the DB is not necessarily fatal, but it means that BFFH is now in an
@ -89,3 +107,32 @@ impl ResourceDriver {
}
}
}
#[cfg(test)]
mod tests {
use std::pin::Pin;
use std::task::Poll;
use std::future::Future;
use super::*;
#[futures_test::test]
async fn test_passthrough_is_id() {
let inp = crate::state::tests::gen_random();
let mut res = Passthrough;
let out = res.on_update(&inp).await.unwrap();
assert_eq!(inp, out);
}
#[test]
fn test_passthrough_is_always_ready() {
let inp = State::build().finish();
let mut res = Passthrough;
let mut cx = futures_test::task::panic_context();
if let Poll::Ready(_) = Pin::new(&mut res.on_update(&inp)).poll(&mut cx) {
return;
}
panic!("Passthrough returned Poll::Pending")
}
}

View File

@ -33,6 +33,7 @@ use serde::de::Error as _;
#[derive(serde::Serialize, serde::Deserialize)]
#[derive(Archive, Serialize, Deserialize)]
#[derive(Clone, PartialEq)]
#[archive_attr(derive(Debug))]
/// State object of a resource
///
@ -54,12 +55,6 @@ impl State {
}
}
impl PartialEq for State {
fn eq(&self, other: &Self) -> bool {
self.hash == other.hash
}
}
impl PartialEq<Archived<State>> for State {
fn eq(&self, other: &Archived<Self>) -> bool {
self.hash == other.hash
@ -122,13 +117,19 @@ pub struct Entry<'a> {
pub val: &'a dyn SerializeValue,
}
#[derive(Debug, Archive, Serialize, Deserialize)]
#[derive(Debug, Clone, Archive, Serialize, Deserialize)]
#[archive_attr(derive(Debug))]
pub struct OwnedEntry {
pub oid: ObjectIdentifier,
pub val: Box<dyn SerializeValue>,
}
impl PartialEq for OwnedEntry {
fn eq(&self, other: &Self) -> bool {
self.oid == other.oid && self.val.dyn_eq(other.val.as_value())
}
}
impl<'a> serde::Serialize for Entry<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer
@ -172,3 +173,75 @@ impl<'de> serde::de::Visitor<'de> for OwnedEntryVisitor {
Ok(OwnedEntry { oid, val: val.0 })
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::state::value::*;
pub(crate) fn gen_random() -> State {
let amt: u8 = rand::random::<u8>() % 20;
let mut sb = State::build();
for _ in 0..amt {
let oid = crate::oid::tests::gen_random();
sb = match rand::random::<u32>()%12 {
0 => sb.add(oid, Box::new(rand::random::<bool>())),
1 => sb.add(oid, Box::new(rand::random::<u8>())),
2 => sb.add(oid, Box::new(rand::random::<u16>())),
3 => sb.add(oid, Box::new(rand::random::<u32>())),
4 => sb.add(oid, Box::new(rand::random::<u64>())),
5 => sb.add(oid, Box::new(rand::random::<u128>())),
6 => sb.add(oid, Box::new(rand::random::<i8>())),
7 => sb.add(oid, Box::new(rand::random::<i16>())),
8 => sb.add(oid, Box::new(rand::random::<i32>())),
9 => sb.add(oid, Box::new(rand::random::<i64>())),
10 => sb.add(oid, Box::new(rand::random::<i128>())),
11 => sb.add(oid, Box::new(rand::random::<Vec3u8>())),
_ => unreachable!(),
}
}
sb.finish()
}
#[test]
fn test_equal_state_is_eq() {
let stateA = State::build()
.add(OID_POWERED.clone(), Box::new(false))
.add(OID_INTENSITY.clone(), Box::new(1024))
.finish();
let stateB = State::build()
.add(OID_POWERED.clone(), Box::new(false))
.add(OID_INTENSITY.clone(), Box::new(1024))
.finish();
assert_eq!(stateA, stateB);
}
#[test]
fn test_unequal_state_is_ne() {
let stateA = State::build()
.add(OID_POWERED.clone(), Box::new(true))
.add(OID_INTENSITY.clone(), Box::new(512))
.finish();
let stateB = State::build()
.add(OID_POWERED.clone(), Box::new(false))
.add(OID_INTENSITY.clone(), Box::new(1024))
.finish();
assert_ne!(stateA, stateB);
}
#[test]
fn test_state_is_clone() {
let stateA = gen_random();
let stateB = stateA.clone();
let stateC = stateB.clone();
drop(stateA);
assert_eq!(stateC, stateB);
}
}

View File

@ -30,7 +30,7 @@ use std::mem::MaybeUninit;
/// 2. Implement rkyv's [`Serialize`](rkyv::Serialize).
/// 3. Implement TypeOid on your Archived type (i.e. `<T as Archive>::Archived`)
/// 4. Implement this
pub trait Value: Any + fmt::Debug + erased_serde::Serialize {
pub trait Value: Any + fmt::Debug + erased_serde::Serialize + Sync {
/// Initialize `&mut self` from `deserializer`
///
/// At the point this is called &mut self is of undefined value but guaranteed to be well
@ -41,13 +41,23 @@ pub trait Value: Any + fmt::Debug + erased_serde::Serialize {
/// implementations this is important to keep in mind.
fn deserialize_init<'de>(&mut self, deserializer: &mut dyn erased_serde::Deserializer<'de>)
-> Result<(), erased_serde::Error>;
/// Implement `PartialEq` dynamically.
///
/// This should return `true` iff the Value is of the same type and `self` == `other` for
/// non-dynamic types would return `true`.
/// It is safe to always return `false`.
fn dyn_eq(&self, other: &dyn Value) -> bool;
fn as_value(&self) -> &dyn Value;
fn as_any(&self) -> &dyn Any;
}
erased_serde::serialize_trait_object!(Value);
erased_serde::serialize_trait_object!(SerializeValue);
erased_serde::serialize_trait_object!(DeserializeValue);
impl<T> Value for T
where T: Any + fmt::Debug
where T: Any + fmt::Debug + PartialEq + Sync
+ erased_serde::Serialize
+ for<'de> serde::Deserialize<'de>
{
@ -57,6 +67,24 @@ impl<T> Value for T
*self = erased_serde::deserialize(deserializer)?;
Ok(())
}
fn dyn_eq(&self, other: &dyn Value) -> bool {
other.as_any().downcast_ref().map_or(false, |other: &T| other == self)
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_value(&self) -> &dyn Value {
self
}
}
impl PartialEq for dyn Value {
fn eq(&self, other: &Self) -> bool {
self.dyn_eq(other)
}
}
#[repr(transparent)]
@ -216,12 +244,30 @@ pub trait DeserializeDynOid {
}
#[ptr_meta::pointee]
pub trait SerializeValue: Value + SerializeDynOid {}
pub trait SerializeValue: Value + SerializeDynOid {
fn dyn_clone(&self) -> Box<dyn SerializeValue>;
}
impl<T: Archive + Value + SerializeDynOid> SerializeValue for T
impl<T: Archive + Value + SerializeDynOid + Clone> SerializeValue for T
where
T::Archived: RegisteredImpl
{}
{
fn dyn_clone(&self) -> Box<dyn SerializeValue> {
Box::new(self.clone())
}
}
impl PartialEq for dyn SerializeValue {
fn eq(&self, other: &Self) -> bool {
self.dyn_eq(other.as_value())
}
}
impl Clone for Box<dyn SerializeValue> {
fn clone(&self) -> Self {
self.dyn_clone()
}
}
#[ptr_meta::pointee]
pub trait DeserializeValue: Value + DeserializeDynOid {}
@ -519,10 +565,27 @@ oidvalue!(OID_I128, i128);
#[derive(serde::Serialize, serde::Deserialize)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
#[archive_attr(derive(TypeName, Debug, serde::Serialize, serde::Deserialize))]
#[archive_attr(derive(TypeName, Debug, PartialEq, serde::Serialize, serde::Deserialize))]
pub struct Vec3u8 {
pub a: u8,
pub b: u8,
pub c: u8,
}
oidvalue!(OID_VEC3U8, Vec3u8, ArchivedVec3u8);
oidvalue!(OID_VEC3U8, Vec3u8, ArchivedVec3u8);
#[cfg(test)]
mod tests {
use rand::Rng;
use rand::distributions::Standard;
use rand::prelude::Distribution;
use crate::state::value::Vec3u8;
impl Distribution<Vec3u8> for Standard {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Vec3u8 {
let a = self.sample(rng);
let b = self.sample(rng);
let c = self.sample(rng);
Vec3u8 { a, b, c }
}
}
}