mirror of
https://gitlab.com/fabinfra/fabaccess/bffh.git
synced 2024-11-23 23:27:57 +01:00
Improvements for process actor and raw_write
This commit is contained in:
parent
1602879e1e
commit
98ed9efec9
@ -31,11 +31,18 @@ mod process;
|
|||||||
mod shelly;
|
mod shelly;
|
||||||
|
|
||||||
pub trait Actor {
|
pub trait Actor {
|
||||||
|
/// The state is being restored after a restart or recreation of the actor
|
||||||
|
fn restore(&mut self, state: ArchivedValue<State>) -> BoxFuture<'static, ()> {
|
||||||
|
self.apply(state)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The state is a changed state that is applied
|
||||||
fn apply(&mut self, state: ArchivedValue<State>) -> BoxFuture<'static, ()>;
|
fn apply(&mut self, state: ArchivedValue<State>) -> BoxFuture<'static, ()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ActorDriver<S: 'static> {
|
pub struct ActorDriver<S: 'static> {
|
||||||
signal: S,
|
signal: S,
|
||||||
|
first: bool,
|
||||||
|
|
||||||
actor: Box<dyn Actor + Send + Sync>,
|
actor: Box<dyn Actor + Send + Sync>,
|
||||||
future: Option<BoxFuture<'static, ()>>,
|
future: Option<BoxFuture<'static, ()>>,
|
||||||
@ -45,6 +52,7 @@ impl<S: Signal<Item = ArchivedValue<State>>> ActorDriver<S> {
|
|||||||
pub fn new(signal: S, actor: Box<dyn Actor + Send + Sync>) -> Self {
|
pub fn new(signal: S, actor: Box<dyn Actor + Send + Sync>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
signal,
|
signal,
|
||||||
|
first: true,
|
||||||
actor,
|
actor,
|
||||||
future: None,
|
future: None,
|
||||||
}
|
}
|
||||||
|
@ -28,10 +28,12 @@ impl Process {
|
|||||||
pub fn into_boxed_actuator(self) -> Box<dyn Actor + Sync + Send> {
|
pub fn into_boxed_actuator(self) -> Box<dyn Actor + Sync + Send> {
|
||||||
Box::new(self)
|
Box::new(self)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl Actor for Process {
|
fn build_command<I, S>(&mut self, state: &ArchivedValue<State>, extra_args: I) -> Command
|
||||||
fn apply(&mut self, state: ArchivedValue<State>) -> BoxFuture<'static, ()> {
|
where
|
||||||
|
I: IntoIterator<Item = S>,
|
||||||
|
S: AsRef<std::ffi::OsStr>,
|
||||||
|
{
|
||||||
tracing::debug!(name=%self.name, cmd=%self.cmd, ?state,
|
tracing::debug!(name=%self.name, cmd=%self.cmd, ?state,
|
||||||
"Process actor updating state");
|
"Process actor updating state");
|
||||||
let mut command = Command::new(&self.cmd);
|
let mut command = Command::new(&self.cmd);
|
||||||
@ -66,6 +68,42 @@ impl Actor for Process {
|
|||||||
command.arg("raw").arg(b64);
|
command.arg("raw").arg(b64);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
command
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Actor for Process {
|
||||||
|
fn restore(&mut self, state: ArchivedValue<State>) -> BoxFuture<'static, ()> {
|
||||||
|
let mut command = self.build_command(&state, ["--restore"]);
|
||||||
|
let name = self.name.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
match command.output() {
|
||||||
|
Ok(retv) if retv.status.success() => {
|
||||||
|
tracing::trace!("Actor was restored");
|
||||||
|
let outstr = String::from_utf8_lossy(&retv.stdout);
|
||||||
|
for line in outstr.lines() {
|
||||||
|
tracing::debug!(%name, %line, "actor stdout");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(retv) => {
|
||||||
|
tracing::warn!(%name, ?state, code=?retv.status,
|
||||||
|
"Actor failed to restore: nonzero exitcode"
|
||||||
|
);
|
||||||
|
if !retv.stderr.is_empty() {
|
||||||
|
let errstr = String::from_utf8_lossy(&retv.stderr);
|
||||||
|
for line in errstr.lines() {
|
||||||
|
tracing::warn!(%name, %line, "actor stderr");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(error) => tracing::warn!(%name, ?error, "process actor failed to run cmd"),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn apply(&mut self, state: ArchivedValue<State>) -> BoxFuture<'static, ()> {
|
||||||
|
let empty: [&str; 0] = [];
|
||||||
|
let mut command = self.build_command(&state, empty);
|
||||||
let name = self.name.clone();
|
let name = self.name.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
match command.output() {
|
match command.output() {
|
||||||
|
@ -179,10 +179,8 @@ impl InUseServer for Machine {
|
|||||||
let data: Vec<u8> = pry!(pry!(params.get()).get_data()).to_vec();
|
let data: Vec<u8> = pry!(pry!(params.get()).get_data()).to_vec();
|
||||||
let resource = self.resource.clone();
|
let resource = self.resource.clone();
|
||||||
let session = self.session.clone();
|
let session = self.session.clone();
|
||||||
Promise::from_future(async move {
|
resource.send_raw(data);
|
||||||
resource.send_raw(data).await;
|
Promise::ok(())
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,7 +177,7 @@ impl Resource {
|
|||||||
self.set_state(new);
|
self.set_state(new);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_raw(&self, data: Vec<u8>) {
|
pub fn send_raw(&self, data: Vec<u8>) {
|
||||||
let mut serializer = AllocSerializer::<1024>::default();
|
let mut serializer = AllocSerializer::<1024>::default();
|
||||||
let old_state_ref = self.inner.get_state_ref();
|
let old_state_ref = self.inner.get_state_ref();
|
||||||
let old_state: &ArchivedState = old_state_ref.as_ref();
|
let old_state: &ArchivedState = old_state_ref.as_ref();
|
||||||
|
@ -213,7 +213,7 @@
|
|||||||
-- actor can only be connected to one machine.
|
-- actor can only be connected to one machine.
|
||||||
actor_connections = [
|
actor_connections = [
|
||||||
{ machine = "Testmachine", actor = "Shelly1234" },
|
{ machine = "Testmachine", actor = "Shelly1234" },
|
||||||
{ machine = "Another", actor = "Bash" },
|
{ machine = "Another", actor = "DoorControl1" },
|
||||||
{ machine = "Yetmore", actor = "Bash2" },
|
{ machine = "Yetmore", actor = "Bash2" },
|
||||||
{ machine = "Yetmore", actor = "FailBash"}
|
{ machine = "Yetmore", actor = "FailBash"}
|
||||||
],
|
],
|
||||||
|
Loading…
Reference in New Issue
Block a user