Ensure the task polling a shelly stream gets woken up if shelly is subscribed to something

This commit is contained in:
Gregor Reitzenstein 2020-09-17 16:00:23 +02:00
parent 33d9d76755
commit 173ef6d055

View File

@ -8,7 +8,7 @@ use crate::machine::Status;
use std::pin::Pin; use std::pin::Pin;
use futures::prelude::*; use futures::prelude::*;
use futures::ready; use futures::ready;
use futures::task::{Poll, Context}; use futures::task::{Poll, Context, Waker};
use futures_signals::signal::Signal; use futures_signals::signal::Signal;
use paho_mqtt as mqtt; use paho_mqtt as mqtt;
@ -28,6 +28,7 @@ pub async fn run(log: Logger, config: Settings, registries: Registries) {
/// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator. /// you need to toggle shellies on multiple brokers you need multiple instanced of this actuator.
struct Shelly { struct Shelly {
signal: Option<StatusSignal>, signal: Option<StatusSignal>,
waker: Option<Waker>,
name: String, name: String,
client: mqtt::AsyncClient, client: mqtt::AsyncClient,
} }
@ -41,8 +42,9 @@ impl Shelly {
let name = "test".to_string(); let name = "test".to_string();
let signal: Option<StatusSignal> = None; let signal: Option<StatusSignal> = None;
let waker = None;
Box::new(Shelly { signal, name, client }) Box::new(Shelly { signal, waker, name, client })
} }
} }
@ -51,6 +53,9 @@ impl Shelly {
impl Actuator for Shelly { impl Actuator for Shelly {
async fn subscribe(&mut self, signal: StatusSignal) { async fn subscribe(&mut self, signal: StatusSignal) {
self.signal.replace(signal); self.signal.replace(signal);
if let Some(waker) = self.waker.take() {
waker.wake();
}
} }
} }
@ -71,6 +76,8 @@ impl Stream for Shelly {
return Poll::Ready(Some(Box::pin(f))); return Poll::Ready(Some(Box::pin(f)));
} }
} else {
unpin.waker.replace(cx.waker().clone());
} }
Poll::Pending Poll::Pending