From 6cf4b1d078e6e7bc8b1573f622fb8a8247c3c767 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Tue, 1 Dec 2020 16:06:39 +0100 Subject: [PATCH] Turns out none of that works. --- Cargo.lock | 18 ++++++++ Cargo.toml | 3 ++ src/actor.rs | 109 ++++++++++++++++++++++++++++++++++------------- src/db/access.rs | 2 +- src/db/user.rs | 2 +- src/initiator.rs | 4 ++ src/machine.rs | 2 +- src/main.rs | 8 +--- 8 files changed, 110 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 035fddd..e630afb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -422,6 +422,7 @@ dependencies = [ "flexbuffers", "futures 0.3.7", "futures-signals", + "futures-test", "futures-util", "glob", "lazy_static", @@ -678,6 +679,23 @@ dependencies = [ "once_cell", ] +[[package]] +name = "futures-test" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a493f2b7cb378c703e494b8c7e1f4505236ca54e9db6d7f1f769d4f1441d6771" +dependencies = [ + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", + "once_cell", + "pin-project", + "pin-utils", +] + [[package]] name = "futures-timer" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index c4b80ad..31d3abf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,3 +65,6 @@ easy-parallel = "3.1" [build-dependencies] capnpc = "0.13" + +[dev-dependencies] +futures-test = "0.3" diff --git a/src/actor.rs b/src/actor.rs index 8897288..4d22ded 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -6,53 +6,104 @@ use std::future::Future; use smol::Executor; use futures::{future::BoxFuture, Stream, StreamExt}; -use futures_signals::signal::Signal; +use futures::channel::mpsc; +use futures_signals::signal::{Signal, MutableSignalCloned, MutableSignal, Mutable}; use crate::db::machine::MachineState; use crate::registries::Actuator; use crate::config::Settings; use crate::error::Result; -pub struct Actor { - inner: Box, - f: Option> +pub struct Actor { + // FIXME: This should really be a Signal. + // But, alas, MutableSignalCloned is itself not `Clone`. For good reason as keeping track of + // the changes itself happens in a way that Clone won't work (well). + // So, you can't clone it, you can't copy it and you can't get at the variable inside outside + // of a task context. In short, using Mutable isn't possible and we would have to write our own + // implementation of MutableSignal*'s . Preferably with the correct optimizations for our case + // where there is only one consumer. So a mpsc channel that drops all but the last input. + rx: mpsc::Receiver> + inner: S, } -unsafe impl Send for Actor {} +pub fn load() { + let s = Mutable::new(MachineState::new()); -impl Actor { - pub fn new(inner: Box) -> Self { - Self { - inner: inner, - f: None, - } - } + Ok(()) } -impl Future for Actor { - type Output = (); +#[must_use = "Signals do nothing unless polled"] +pub struct MaybeFlatten { + signal: Option, + inner: Option, +} - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { +// Poll parent => Has inner => Poll inner => Output +// -------------------------------------------------------- +// Some(Some(inner)) => => Some(value) => Some(value) +// Some(Some(inner)) => => => Pending +// Some(None) => => => Pending +// None => Some(inner) => Some(value) => Some(value) +// None => Some(inner) => None => None +// None => Some(inner) => Pending => Pending +// None => None => => None +// Pending => Some(inner) => Some(value) => Some(value) +// Pending => Some(inner) => None => Pending +// Pending => Some(inner) => Pending => Pending +// Pending => None => => Pending +impl Signal for MaybeFlatten + where A: Signal> + Unpin, + B: Signal + Unpin, +{ + type Item = B::Item; + + #[inline] + fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut this = &mut *self; - // If we have a future at the moment, poll it - if let Some(mut f) = this.f.take() { - if Future::poll(Pin::new(&mut f), cx).is_pending() { - this.f.replace(f); - } + let done = match this.signal.as_mut().map(|signal| Signal::poll_change(Pin::new(signal), cx)) { + None => true, + Some(Poll::Ready(None)) => { + this.signal = None; + true + }, + Some(Poll::Ready(Some(new_inner))) => { + this.inner = new_inner; + false + }, + Some(Poll::Pending) => false, + }; + + match this.inner.as_mut().map(|inner| Signal::poll_change(Pin::new(inner), cx)) { + Some(Poll::Ready(None)) => { + this.inner = None; + }, + Some(poll) => { + return poll; + }, + None => {}, } - match Stream::poll_next(Pin::new(&mut this.inner), cx) { - Poll::Ready(None) => Poll::Ready(()), - Poll::Ready(Some(f)) => { - this.f.replace(f); - Poll::Pending - } - Poll::Pending => Poll::Pending + if done { + Poll::Ready(None) + + } else { + Poll::Pending } } } -pub fn load(config: &Settings) -> Result> { - unimplemented!() +#[cfg(test)] +mod tests { + use super::*; + use futures_test::*; + use futures_signals::signal::Signal; + + #[test] + fn load_test() { + let (a, s, m) = super::load().unwrap(); + + let cx = task::panic_context(); + a.signal.poll_change(&mut cx); + } } diff --git a/src/db/access.rs b/src/db/access.rs index 9c6bad6..7d7b0ab 100644 --- a/src/db/access.rs +++ b/src/db/access.rs @@ -527,7 +527,7 @@ impl TryFrom for PermRule { } } -#[cfg(test)] +#[cfg(test_DISABLED)] mod tests { use super::*; diff --git a/src/db/user.rs b/src/db/user.rs index 41519c5..6962ffd 100644 --- a/src/db/user.rs +++ b/src/db/user.rs @@ -85,7 +85,7 @@ const fn default_priority() -> u64 { 0 } -#[cfg(test)] +#[cfg(test_DISABLED)] mod tests { use super::*; diff --git a/src/initiator.rs b/src/initiator.rs index 088e1e0..5b64bb2 100644 --- a/src/initiator.rs +++ b/src/initiator.rs @@ -1,9 +1,13 @@ use std::future::Future; use smol::Task; +use futures_signals::signal::Signal; +use crate::machine::Machine; + use crate::error::Result; pub struct Initiator { + machine: Box + Send>, } impl Initiator { diff --git a/src/machine.rs b/src/machine.rs index f6934a4..2ed0f0f 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -213,7 +213,7 @@ pub fn load(config: &crate::config::Settings) -> Result> { unimplemented!() } -#[cfg(test)] +#[cfg(test_DISABLED)] mod tests { use super::*; use std::iter::FromIterator; diff --git a/src/main.rs b/src/main.rs index 3dcb68c..e2ea63b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -141,20 +141,16 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { // handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads // when bffh should exit let machines = machine::load(&config)?; - let actors = actor::load(&config)?; + let actors = actor::load()?; let initiators = initiator::load(&config)?; - // TODO restore connections between initiators, machines, actors - let ex = Executor::new(); for i in initiators.into_iter() { ex.spawn(i.run()); } - for a in actors.into_iter() { - ex.spawn(a); - } + // TODO HERE: restore connections between initiators, machines, actors let (signal, shutdown) = async_channel::bounded::<()>(1); easy_parallel::Parallel::new()