diff --git a/bffhd/actors/mod.rs b/bffhd/actors/mod.rs index e56c0e9..a1ca01d 100644 --- a/bffhd/actors/mod.rs +++ b/bffhd/actors/mod.rs @@ -31,11 +31,18 @@ mod process; mod shelly; pub trait Actor { + /// The state is being restored after a restart or recreation of the actor + fn restore(&mut self, state: ArchivedValue) -> BoxFuture<'static, ()> { + self.apply(state) + } + + /// The state is a changed state that is applied fn apply(&mut self, state: ArchivedValue) -> BoxFuture<'static, ()>; } pub struct ActorDriver { signal: S, + first: bool, actor: Box, future: Option>, @@ -45,6 +52,7 @@ impl>> ActorDriver { pub fn new(signal: S, actor: Box) -> Self { Self { signal, + first: true, actor, future: None, } diff --git a/bffhd/actors/process.rs b/bffhd/actors/process.rs index ee5a26b..bf57a79 100644 --- a/bffhd/actors/process.rs +++ b/bffhd/actors/process.rs @@ -28,10 +28,12 @@ impl Process { pub fn into_boxed_actuator(self) -> Box { Box::new(self) } -} -impl Actor for Process { - fn apply(&mut self, state: ArchivedValue) -> BoxFuture<'static, ()> { + fn build_command(&mut self, state: &ArchivedValue, extra_args: I) -> Command + where + I: IntoIterator, + S: AsRef, + { tracing::debug!(name=%self.name, cmd=%self.cmd, ?state, "Process actor updating state"); let mut command = Command::new(&self.cmd); @@ -66,6 +68,42 @@ impl Actor for Process { command.arg("raw").arg(b64); } + command + } +} + +impl Actor for Process { + fn restore(&mut self, state: ArchivedValue) -> BoxFuture<'static, ()> { + let mut command = self.build_command(&state, ["--restore"]); + let name = self.name.clone(); + Box::pin(async move { + match command.output() { + Ok(retv) if retv.status.success() => { + tracing::trace!("Actor was restored"); + let outstr = String::from_utf8_lossy(&retv.stdout); + for line in outstr.lines() { + tracing::debug!(%name, %line, "actor stdout"); + } + } + Ok(retv) => { + tracing::warn!(%name, ?state, code=?retv.status, + "Actor failed to restore: nonzero exitcode" + ); + if !retv.stderr.is_empty() { + let errstr = String::from_utf8_lossy(&retv.stderr); + for line in errstr.lines() { + tracing::warn!(%name, %line, "actor stderr"); + } + } + } + Err(error) => tracing::warn!(%name, ?error, "process actor failed to run cmd"), + } + }) + } + + fn apply(&mut self, state: ArchivedValue) -> BoxFuture<'static, ()> { + let empty: [&str; 0] = []; + let mut command = self.build_command(&state, empty); let name = self.name.clone(); Box::pin(async move { match command.output() { diff --git a/bffhd/capnp/machine.rs b/bffhd/capnp/machine.rs index 8f60dee..3038a71 100644 --- a/bffhd/capnp/machine.rs +++ b/bffhd/capnp/machine.rs @@ -179,10 +179,8 @@ impl InUseServer for Machine { let data: Vec = pry!(pry!(params.get()).get_data()).to_vec(); let resource = self.resource.clone(); let session = self.session.clone(); - Promise::from_future(async move { - resource.send_raw(data).await; - Ok(()) - }) + resource.send_raw(data); + Promise::ok(()) } } diff --git a/bffhd/resources/mod.rs b/bffhd/resources/mod.rs index 3621e1f..18f4d7a 100644 --- a/bffhd/resources/mod.rs +++ b/bffhd/resources/mod.rs @@ -177,7 +177,7 @@ impl Resource { self.set_state(new); } - pub async fn send_raw(&self, data: Vec) { + pub fn send_raw(&self, data: Vec) { let mut serializer = AllocSerializer::<1024>::default(); let old_state_ref = self.inner.get_state_ref(); let old_state: &ArchivedState = old_state_ref.as_ref(); diff --git a/examples/bffh.dhall b/examples/bffh.dhall index d8cab77..7d1afb2 100644 --- a/examples/bffh.dhall +++ b/examples/bffh.dhall @@ -213,7 +213,7 @@ -- actor can only be connected to one machine. actor_connections = [ { machine = "Testmachine", actor = "Shelly1234" }, - { machine = "Another", actor = "Bash" }, + { machine = "Another", actor = "DoorControl1" }, { machine = "Yetmore", actor = "Bash2" }, { machine = "Yetmore", actor = "FailBash"} ],