initial commit

This commit is contained in:
LastExceed 2024-02-11 11:54:02 +01:00
commit 938dffc63d
10 changed files with 1561 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

1019
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

21
Cargo.toml Normal file
View File

@ -0,0 +1,21 @@
[package]
name = "spacermake"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# paho-mqtt = { version = "0.12.3", default-features=false, features = ["bundled"] }
tokio = { version = "1.35.1", features = ["rt-multi-thread", "fs", "sync"] }
rumqttc = "0.23.0"
json = "0.12.4"
boolinator = "2.4.0"
itertools = "0.12.1"
csv = "1.3.0"
chrono = "0.4.33"
serde = { version = "1.0.196", features = ["derive"] }
tap = "1.0.1"
lazy_static = "1.4.0"
toml = "0.8.10"
futures = "0.3.30"

53
src/main.rs Normal file
View File

@ -0,0 +1,53 @@
use std::time::Duration;
use std::collections::{HashMap, HashSet};
use lazy_static::*;
use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS};
use state::{Announcer, Listener, State};
use utils::parse_toml_file;
mod state;
mod utils;
pub const BOOKING_TOPIC: &str = "fabaccess/log";
lazy_static! {
static ref SLAVES_BY_MASTER: HashMap<String, HashSet<String>> = parse_toml_file("master-slave_relations.toml");
static ref SLAVE_PROPERTIES: HashMap<String, [bool; 3]> = parse_toml_file("slave_properties.toml");
static ref MACHINE_IDS: HashMap<String, String> = parse_toml_file::<toml::Table>("/root/fabfire/config.toml")
["readers"]
.as_table()
.unwrap()
.iter()
.map(|(_key, value)| {
let entry = value.as_table().unwrap();
(
entry["machine"].as_str().unwrap().replace("urn:fabaccess:resource:", ""),
entry["id"].as_str().unwrap().into()
)
})
.collect();
}
#[tokio::main]
async fn main() {
let (client, event_loop) = create_client().await;
let listener = State::new(Listener, client);
let announcer = listener.duplicate_as(Announcer);
tokio::spawn(announcer.run());
listener.run(event_loop).await;
}
async fn create_client() -> (AsyncClient, EventLoop) {
let mut mqttoptions = MqttOptions::new("spacermake", "mqtt.makerspace-bocholt.local", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
let (client, event_loop) = AsyncClient::new(mqttoptions, 10);
client.subscribe("tele/+/MARGINS", QoS::AtMostOnce).await.expect("failed to subscribe");
client.subscribe(BOOKING_TOPIC, QoS::AtMostOnce).await.expect("failed to subscribe");
(client, event_loop)
}

66
src/state.rs Normal file
View File

@ -0,0 +1,66 @@
use std::time::Instant;
use std::sync::Arc;
use std::collections::{HashMap, VecDeque};
use rumqttc::{AsyncClient, QoS};
use tokio::sync::RwLock;
use crate::utils::booking::Booking;
use crate::SLAVE_PROPERTIES;
mod announcer;
mod listener;
//markers
pub struct Listener;
pub struct Announcer;
pub struct State<Kind> {
pub kind: Kind,
pub client: Arc<RwLock<AsyncClient>>,
pub bookings: Arc<RwLock<HashMap<String, Booking>>>,
pub scheduled_shutdowns: Arc<RwLock<VecDeque<(Instant, String)>>>
}
impl<Kind> State<Kind> {
pub fn new(kind: Kind, client: AsyncClient) -> Self {
Self {
kind,
client: Arc::new(RwLock::new(client)),
bookings: Default::default(),
scheduled_shutdowns: Default::default()
}
}
pub fn duplicate_as<NewKind>(&self, kind: NewKind) -> State<NewKind> {
State {
kind,
client: Arc::clone(&self.client),
bookings: Arc::clone(&self.bookings),
scheduled_shutdowns: Arc::clone(&self.scheduled_shutdowns)
}
}
//probably doesn't belong here, dunno where else to put it
async fn update_power_state(&self, machine: &str, new_state: bool) {
self.client
.read()
.await
.publish(
get_slave_topic(machine),
QoS::AtMostOnce,
false,
if new_state { b"ON".as_slice() } else { b"OFF".as_slice() }
)
.await
.expect("failed to publish");
}
}
fn get_slave_topic(machine: &str) -> String {
if SLAVE_PROPERTIES[machine][2] {
format!("cmnd/{machine}/Power")
} else {
format!("shellies/{machine}/relay/0")
}
}

73
src/state/announcer.rs Normal file
View File

@ -0,0 +1,73 @@
use std::time::{Duration, Instant};
use futures::join;
use futures::future::join_all;
use rumqttc::QoS;
use tap::Pipe;
use tokio::time::sleep;
use crate::MACHINE_IDS;
use crate::utils::{create_display_update_message, minute_mark};
use crate::{Announcer, State};
impl State<Announcer> {
pub async fn run(self) {
loop {
join!(
self.update_all_runtime_displays(),
self.perform_scheduled_shutdowns()
);
sleep(Duration::from_secs(1)).await;
}
}
async fn update_all_runtime_displays(&self) {
self.bookings
.read()
.await
.iter()
.filter_map(|(machine, booking)| {
if !booking.is_running() || !minute_mark(booking.total_runtime()) {
return None;
}
let Some(id) = MACHINE_IDS.get(machine) else {
println!("error: no ID found for {machine}");
return None;
};
let future = self.update_runtime_display(id, booking.total_runtime());
Some(future)
})
.pipe(join_all)
.await;
}
async fn update_runtime_display(&self, machine_id: &str, runtime: Duration) {
self.client
.read()
.await
.publish(
format!("/cmnd/reader/{machine_id}"),
QoS::AtMostOnce,
false,
create_display_update_message(runtime)
)
.await
.expect("failed to publish display update");
}
async fn perform_scheduled_shutdowns(&self) {
let now = Instant::now();
let mut schedule = self.scheduled_shutdowns.write().await;
while let Some((time, machine)) = schedule.front() {
if time > &now {
break;
}
self.update_power_state(&machine, false).await;
schedule.pop_front();
}
}
}

163
src/state/listener.rs Normal file
View File

@ -0,0 +1,163 @@
use std::ops::Sub;
use std::time::{Duration, Instant};
use boolinator::Boolinator;
use rumqttc::EventLoop;
use rumqttc::Event::Incoming;
use rumqttc::Packet::Publish;
use crate::{State, Listener, BOOKING_TOPIC, SLAVES_BY_MASTER, SLAVE_PROPERTIES};
use crate::utils::get_power_state;
use crate::utils::logs::{log_debug, machinelog};
use crate::utils::booking::Booking;
impl State<Listener> {
pub async fn run(mut self, mut event_loop: EventLoop) {
loop {
let Incoming(Publish(publish)) = event_loop
.poll()
.await
.expect("notification error")
else { continue };
self.on_publish(publish).await;
}
}
async fn on_publish(&mut self, publish: rumqttc::Publish) {
//pretty ugly, cant figure out a clean way to do this
let Ok(payload) = String::from_utf8(publish.payload.clone().into())
else {
log_debug(&publish.topic, &format!("{:?}", &publish.payload), Err("non-utf8 payload"))
.expect("debug log failed");
return;
};
//end of ugly
let result = self.handle_payload(&publish.topic, &payload).await;
log_debug(&publish.topic, &payload, result)
.expect("debug log failed")
}
async fn handle_payload(&mut self, topic: &str, payload: &str) -> Result<(), &str> {
let splits: Result<[_; 3], _> = topic
.split('/')
.collect::<Vec<_>>()
.try_into();
match splits {
Ok(["tele", machine_name, "MARGINS"])
=> self.on_machine_activity(payload, &machine_name.into()).await,
_ if topic == BOOKING_TOPIC
=> self.on_booking_change(payload).await,
_ => Err("unknown topic")
}
}
async fn on_booking_change(&mut self, payload: &str) -> Result<(), &'static str> {
let [machine, user, status] = payload
.split(';')
.map(String::from)
.collect::<Vec<_>>()
.try_into()
.map_err(|_| "unexpected data count in payload")?;
match status.as_str() {
"booked" => self.try_book(&machine, &user).await?,
"released" => self.try_release(&machine).await?,
_ => return Ok(()) //ignore other statuses
}
println!("info: {user} {status} {machine}");
Ok(())
}
#[allow(clippy::ptr_arg)] //false positive
async fn try_book(&mut self, machine: &String, user: &String) -> Result<(), &'static str> {
let mut bookings = self.bookings.write().await;
if bookings.contains_key(machine) {
return Err("machine got double-booked");
}
bookings.insert(machine.clone(), Booking::new(user.clone()));
drop(bookings);
self.update_slaves(machine, false, true, true).await
}
async fn try_release(&mut self, machine: &String) -> Result<(), &'static str> {
let mut booking = self
.bookings
.write()
.await
.remove(machine)
.ok_or("released unbooked machine")?;
let was_running = booking.track(false);
self.update_slaves(machine, was_running, true, false).await?;
machinelog(machine, &booking)
.expect("machine log failed");
Ok(())
}
async fn on_machine_activity(&mut self, payload: &str, machine: &String) -> Result<(), &'static str> {
let power_string = get_power_state(payload)?;
let (power, err) =
match power_string.as_str() {
"ON" => (true, "machine was turned on while already running"),
"OFF" => (false, "machine was turned off without running in the first place"),
_ => return Err("unknown power state")
};
self.bookings
.write()
.await
.get_mut(machine)
.ok_or("received activity from unbooked machine")?
.track(power)
.as_result((), err)?;
self.update_slaves(machine, true, false, power).await?;
println!("info: {machine} got turned {power_string}");
Ok(())
}
pub async fn update_slaves(&mut self, master: &String, short_slaves: bool, long_slaves: bool, power: bool) -> Result<(), &'static str> {
let slaves_used_by_others = self
.bookings
.read()
.await
.iter()
.filter(|(_, booking)| booking.is_running())
.flat_map(|(machine, _)| &SLAVES_BY_MASTER[machine]) //todo: error handing
.cloned()
.collect();
let slaves_to_update = SLAVES_BY_MASTER
.get(master)
.ok_or("unknown master")?
.sub(&slaves_used_by_others)
.into_iter()
.filter(|slave| if SLAVE_PROPERTIES[slave][0] { long_slaves } else { short_slaves });
for slave in slaves_to_update {
if !power && SLAVE_PROPERTIES[&slave][1] {
let shutdown_timestamp = Instant::now() + Duration::from_secs(30);
self.scheduled_shutdowns.write().await.push_back((shutdown_timestamp, slave));
continue;
}
self.update_power_state(&slave, power).await;
}
Ok(())
}
}

46
src/utils.rs Normal file
View File

@ -0,0 +1,46 @@
use std::fs;
use std::time::Duration;
use serde::de::DeserializeOwned;
pub mod logs;
pub mod booking;
pub fn parse_toml_file<T: DeserializeOwned>(path: &str) -> T {
let file_content = fs::read_to_string(path).expect("failed to read .toml file");
toml::from_str(&file_content).expect("failed to parse toml")
}
pub fn get_power_state(payload: &str) -> Result<String, &'static str> {
//todo: there gotta be an easier way to do this
json::parse(payload)
.map_err(|_| "payload is not a valid json string")?
.entries()
.find(|(key, _value)| *key == "MARGINS")
.ok_or("no MARGINS data present in payload")?
.1
.entries()
.find(|(key, _value)| *key == "PowerHigh")
.ok_or("no powerHigh information")?
.1
.as_str()
.ok_or("powerHigh state was not a string")
.map(str::to_string)
}
///whether this duration crossed a minute boundary within the last second
pub fn minute_mark(duration: Duration) -> bool {
duration.as_secs() % 60 == 0
}
pub fn create_display_update_message(runtime: Duration) -> String {
let hours = runtime.as_secs() / 3600;
let minutes = runtime.as_secs() / 60 % 60;
json::object! {
Cmd: "message",
MssgID: 12,
ClrTxt: "Nutzungsdauer",
AddnTxt: format!("{hours:.0}:{minutes:0>2.0}")
}.to_string()
}

55
src/utils/booking.rs Normal file
View File

@ -0,0 +1,55 @@
use std::time::{Duration, Instant};
use chrono::{DateTime, Local};
pub struct Booking {
pub user: String,
pub creation_datetime: DateTime<Local>,
pub creation_instant: Instant,
pub currently_running_since: Option<Instant>,
runtime_accumulator: Duration
}
impl Booking {
pub fn new(user: String) -> Self {
Self {
user,
creation_datetime: Local::now(),
creation_instant: Instant::now(),
currently_running_since: None,
runtime_accumulator: Duration::ZERO
}
}
pub fn track(&mut self, power: bool) -> bool {
if self.is_running() == power {
return false;
}
if power {
self.currently_running_since = Some(Instant::now());
} else {
self.runtime_accumulator += self
.currently_running_since
.take()
.unwrap()
.elapsed();
}
true
}
pub fn is_running(&self) -> bool {
self.currently_running_since.is_some()
}
pub fn total_runtime(&self) -> Duration {
let mut total = self.runtime_accumulator;
if let Some(startup) = self.currently_running_since {
total += startup.elapsed();
}
total
}
}

64
src/utils/logs.rs Normal file
View File

@ -0,0 +1,64 @@
use std::time::Duration;
use std::ops::Div;
use std::io::{self, Write};
use std::fs::File;
use chrono::Local;
use serde::Serialize;
use tap::Pipe;
use crate::utils::booking::Booking;
#[derive(Serialize)]
struct Record<'s> {
machine: &'s str,
date: String,
time_booked: String,
time_released: String,
booking_duration: i32, //minutes
runtime: Duration, //minutes
user: &'s str
}
pub fn machinelog(machine: &str, booking: &Booking) -> io::Result<()> {
let record = Record {
machine,
date: booking.creation_datetime.date_naive().to_string(),
time_booked: booking.creation_datetime.time().to_string(),
time_released: Local::now().time().to_string(),
booking_duration: booking.creation_instant.elapsed().as_secs_f32().div(60.0).ceil() as _,
runtime: booking.total_runtime(),
user: &booking.user
};
File::options()
.append(true)
.open("/root/machinelog")?
.pipe(csv::Writer::from_writer)
.serialize(record)
.map_err(|_| io::ErrorKind::Other.into())
}
pub fn log_debug(topic: &str, payload: &str, result: Result<(), &str>) -> io::Result<()> {
if let Err(error) = result {
println!("error: {error}");
println!(" topic: {topic}");
println!(" payload: {payload}");
println!()
}
let time = Local::now().to_string();
let result = result.err().unwrap_or("ok");
let record = format!("
time: {time}
topic: {topic}
payload: {payload}
result: {result}",
);
File::options()
.append(true)
.open("/root/machinelog_debug.csv")?
.write_all(record.as_bytes())
}