This commit is contained in:
Gregor Reitzenstein 2020-11-24 10:44:53 +01:00
parent 2fbb1bb4b6
commit 9227b632e4
4 changed files with 30 additions and 6 deletions

View File

@ -92,11 +92,16 @@ type MachMap = HashMap<MachineIdentifier, MachineDescription>;
pub struct MachineDB { pub struct MachineDB {
state_db: Internal, state_db: Internal,
def_db: MachMap, def_db: MachMap,
signals_db: HashMap<MachineIdentifier, Mutable<MachineState>>,
} }
impl MachineDB { impl MachineDB {
pub fn new(state_db: Internal, def_db: MachMap) -> Self { pub fn new(state_db: Internal, def_db: MachMap) -> Self {
Self { state_db, def_db } Self {
state_db: state_db,
def_db: def_db,
signals_db: HashMap::new(),
}
} }
pub fn exists(&self, id: MachineIdentifier) -> bool { pub fn exists(&self, id: MachineIdentifier) -> bool {
@ -111,4 +116,16 @@ impl MachineDB {
// TODO: Error Handling // TODO: Error Handling
self.state_db.get(id).unwrap_or(None) self.state_db.get(id).unwrap_or(None)
} }
pub fn update_state(&self, id: &MachineIdentifier, new_state: MachineState) -> Result<()> {
// If an error happens the new state was not applied so this will not desync the sources
self.state_db.put(id, &new_state)?;
self.signals_db.get(id).map(|mutable| mutable.set(new_state));
Ok(())
}
pub fn get_signal(&self, id: &MachineIdentifier) -> Option<MutableSignalCloned<MachineState>> {
self.signals_db.get(&id).map(|mutable| mutable.signal_cloned())
}
} }

View File

@ -48,7 +48,7 @@ impl Internal {
self.get_with_txn(&txn, id) self.get_with_txn(&txn, id)
} }
pub fn put_with_txn(&self, txn: &mut RwTransaction, uuid: &Uuid, status: MachineState) pub fn put_with_txn(&self, txn: &mut RwTransaction, uuid: &Uuid, status: &MachineState)
-> Result<()> -> Result<()>
{ {
let bytes = flexbuffers::to_vec(status)?; let bytes = flexbuffers::to_vec(status)?;
@ -57,6 +57,12 @@ impl Internal {
Ok(()) Ok(())
} }
pub fn put(&self, id: &MachineIdentifier, status: &MachineState) -> Result<()> {
let mut txn = self.env.begin_rw_txn()?;
self.put_with_txn(&mut txn, id, status)?;
txn.commit().map_err(Into::into)
}
pub fn iter<T: Transaction>(&self, txn: &T) -> Result<impl Iterator<Item=MachineState>> { pub fn iter<T: Transaction>(&self, txn: &T) -> Result<impl Iterator<Item=MachineState>> {
let mut cursor = txn.open_ro_cursor(self.db)?; let mut cursor = txn.open_ro_cursor(self.db)?;
Ok(cursor.iter_start().map(|buf| { Ok(cursor.iter_start().map(|buf| {

View File

@ -19,11 +19,9 @@ use paho_mqtt as mqtt;
// entirety. This works reasonably enough for this static modules here but if we do dynamic loading // entirety. This works reasonably enough for this static modules here but if we do dynamic loading
// via dlopen(), lua API, python API etc it will not. // via dlopen(), lua API, python API etc it will not.
pub async fn run<S: Spawn>(log: Logger, config: Settings, registries: Registries, spawner: S) { pub async fn run<S: Spawn>(log: Logger, config: Settings, registries: Registries, spawner: S) {
let (tx, rx) = mpsc::channel(1); let rx = registries.actuators.register("shelly".to_string()).await;
let mut shelly = Shelly::new(log, config, rx).await; let mut shelly = Shelly::new(log, config, rx).await;
let r = registries.actuators.register("shelly".to_string(), tx).await;
let f = shelly.for_each(|f| f); let f = shelly.for_each(|f| f);
spawner.spawn_obj(FutureObj::from(Box::pin(f))); spawner.spawn_obj(FutureObj::from(Box::pin(f)));

View File

@ -30,10 +30,13 @@ impl Actuators {
} }
} }
pub async fn register(&self, name: String, tx: mpsc::Sender<StatusSignal>) { pub async fn register(&self, name: String) -> mpsc::Receiver<StatusSignal> {
let (tx, rx) = mpsc::channel(1);
let mut wlock = self.inner.write().await; let mut wlock = self.inner.write().await;
// TODO: Log an error or something if that name was already taken // TODO: Log an error or something if that name was already taken
wlock.insert(name, tx); wlock.insert(name, tx);
return rx;
} }
pub async fn subscribe(&mut self, name: String, signal: StatusSignal) { pub async fn subscribe(&mut self, name: String, signal: StatusSignal) {