diff --git a/src/db/machine.rs b/src/db/machine.rs index e508643..c94a179 100644 --- a/src/db/machine.rs +++ b/src/db/machine.rs @@ -92,11 +92,16 @@ type MachMap = HashMap; pub struct MachineDB { state_db: Internal, def_db: MachMap, + signals_db: HashMap>, } impl MachineDB { pub fn new(state_db: Internal, def_db: MachMap) -> Self { - Self { state_db, def_db } + Self { + state_db: state_db, + def_db: def_db, + signals_db: HashMap::new(), + } } pub fn exists(&self, id: MachineIdentifier) -> bool { @@ -111,4 +116,16 @@ impl MachineDB { // TODO: Error Handling self.state_db.get(id).unwrap_or(None) } + + pub fn update_state(&self, id: &MachineIdentifier, new_state: MachineState) -> Result<()> { + // If an error happens the new state was not applied so this will not desync the sources + self.state_db.put(id, &new_state)?; + self.signals_db.get(id).map(|mutable| mutable.set(new_state)); + + Ok(()) + } + + pub fn get_signal(&self, id: &MachineIdentifier) -> Option> { + self.signals_db.get(&id).map(|mutable| mutable.signal_cloned()) + } } diff --git a/src/db/machine/internal.rs b/src/db/machine/internal.rs index 8b6f90b..1079dc9 100644 --- a/src/db/machine/internal.rs +++ b/src/db/machine/internal.rs @@ -48,7 +48,7 @@ impl Internal { self.get_with_txn(&txn, id) } - pub fn put_with_txn(&self, txn: &mut RwTransaction, uuid: &Uuid, status: MachineState) + pub fn put_with_txn(&self, txn: &mut RwTransaction, uuid: &Uuid, status: &MachineState) -> Result<()> { let bytes = flexbuffers::to_vec(status)?; @@ -57,6 +57,12 @@ impl Internal { Ok(()) } + pub fn put(&self, id: &MachineIdentifier, status: &MachineState) -> Result<()> { + let mut txn = self.env.begin_rw_txn()?; + self.put_with_txn(&mut txn, id, status)?; + txn.commit().map_err(Into::into) + } + pub fn iter(&self, txn: &T) -> Result> { let mut cursor = txn.open_ro_cursor(self.db)?; Ok(cursor.iter_start().map(|buf| { diff --git a/src/modules/shelly.rs b/src/modules/shelly.rs index b8037c0..6b40967 100644 --- a/src/modules/shelly.rs +++ b/src/modules/shelly.rs @@ -19,11 +19,9 @@ use paho_mqtt as mqtt; // entirety. This works reasonably enough for this static modules here but if we do dynamic loading // via dlopen(), lua API, python API etc it will not. pub async fn run(log: Logger, config: Settings, registries: Registries, spawner: S) { - let (tx, rx) = mpsc::channel(1); + let rx = registries.actuators.register("shelly".to_string()).await; let mut shelly = Shelly::new(log, config, rx).await; - let r = registries.actuators.register("shelly".to_string(), tx).await; - let f = shelly.for_each(|f| f); spawner.spawn_obj(FutureObj::from(Box::pin(f))); diff --git a/src/registries/actuators.rs b/src/registries/actuators.rs index 5094f3c..b23049f 100644 --- a/src/registries/actuators.rs +++ b/src/registries/actuators.rs @@ -30,10 +30,13 @@ impl Actuators { } } - pub async fn register(&self, name: String, tx: mpsc::Sender) { + pub async fn register(&self, name: String) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(1); let mut wlock = self.inner.write().await; // TODO: Log an error or something if that name was already taken wlock.insert(name, tx); + + return rx; } pub async fn subscribe(&mut self, name: String, signal: StatusSignal) {