Turns out none of that works.

This commit is contained in:
Gregor Reitzenstein 2020-12-01 16:06:39 +01:00
parent 1041afd0ab
commit 6cf4b1d078
8 changed files with 110 additions and 38 deletions

18
Cargo.lock generated
View File

@ -422,6 +422,7 @@ dependencies = [
"flexbuffers", "flexbuffers",
"futures 0.3.7", "futures 0.3.7",
"futures-signals", "futures-signals",
"futures-test",
"futures-util", "futures-util",
"glob", "glob",
"lazy_static", "lazy_static",
@ -678,6 +679,23 @@ dependencies = [
"once_cell", "once_cell",
] ]
[[package]]
name = "futures-test"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a493f2b7cb378c703e494b8c7e1f4505236ca54e9db6d7f1f769d4f1441d6771"
dependencies = [
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
"once_cell",
"pin-project",
"pin-utils",
]
[[package]] [[package]]
name = "futures-timer" name = "futures-timer"
version = "0.3.0" version = "0.3.0"

View File

@ -65,3 +65,6 @@ easy-parallel = "3.1"
[build-dependencies] [build-dependencies]
capnpc = "0.13" capnpc = "0.13"
[dev-dependencies]
futures-test = "0.3"

View File

@ -6,53 +6,104 @@ use std::future::Future;
use smol::Executor; use smol::Executor;
use futures::{future::BoxFuture, Stream, StreamExt}; use futures::{future::BoxFuture, Stream, StreamExt};
use futures_signals::signal::Signal; use futures::channel::mpsc;
use futures_signals::signal::{Signal, MutableSignalCloned, MutableSignal, Mutable};
use crate::db::machine::MachineState; use crate::db::machine::MachineState;
use crate::registries::Actuator; use crate::registries::Actuator;
use crate::config::Settings; use crate::config::Settings;
use crate::error::Result; use crate::error::Result;
pub struct Actor { pub struct Actor<S: Signal> {
inner: Box<dyn Actuator + Unpin>, // FIXME: This should really be a Signal.
f: Option<BoxFuture<'static, ()>> // But, alas, MutableSignalCloned is itself not `Clone`. For good reason as keeping track of
// the changes itself happens in a way that Clone won't work (well).
// So, you can't clone it, you can't copy it and you can't get at the variable inside outside
// of a task context. In short, using Mutable isn't possible and we would have to write our own
// implementation of MutableSignal*'s . Preferably with the correct optimizations for our case
// where there is only one consumer. So a mpsc channel that drops all but the last input.
rx: mpsc::Receiver<Option<S>>
inner: S,
} }
unsafe impl Send for Actor {} pub fn load() {
let s = Mutable::new(MachineState::new());
impl Actor { Ok(())
pub fn new(inner: Box<dyn Actuator + Unpin>) -> Self {
Self {
inner: inner,
f: None,
}
}
} }
impl Future for Actor { #[must_use = "Signals do nothing unless polled"]
type Output = (); pub struct MaybeFlatten<A: Signal, B: Signal> {
signal: Option<A>,
inner: Option<B>,
}
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { // Poll parent => Has inner => Poll inner => Output
// --------------------------------------------------------
// Some(Some(inner)) => => Some(value) => Some(value)
// Some(Some(inner)) => => => Pending
// Some(None) => => => Pending
// None => Some(inner) => Some(value) => Some(value)
// None => Some(inner) => None => None
// None => Some(inner) => Pending => Pending
// None => None => => None
// Pending => Some(inner) => Some(value) => Some(value)
// Pending => Some(inner) => None => Pending
// Pending => Some(inner) => Pending => Pending
// Pending => None => => Pending
impl<A, B> Signal for MaybeFlatten<A, B>
where A: Signal<Item=Option<B>> + Unpin,
B: Signal + Unpin,
{
type Item = B::Item;
#[inline]
fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut this = &mut *self; let mut this = &mut *self;
// If we have a future at the moment, poll it let done = match this.signal.as_mut().map(|signal| Signal::poll_change(Pin::new(signal), cx)) {
if let Some(mut f) = this.f.take() { None => true,
if Future::poll(Pin::new(&mut f), cx).is_pending() { Some(Poll::Ready(None)) => {
this.f.replace(f); this.signal = None;
} true
},
Some(Poll::Ready(Some(new_inner))) => {
this.inner = new_inner;
false
},
Some(Poll::Pending) => false,
};
match this.inner.as_mut().map(|inner| Signal::poll_change(Pin::new(inner), cx)) {
Some(Poll::Ready(None)) => {
this.inner = None;
},
Some(poll) => {
return poll;
},
None => {},
} }
match Stream::poll_next(Pin::new(&mut this.inner), cx) { if done {
Poll::Ready(None) => Poll::Ready(()), Poll::Ready(None)
Poll::Ready(Some(f)) => {
this.f.replace(f); } else {
Poll::Pending Poll::Pending
} }
Poll::Pending => Poll::Pending
}
} }
} }
pub fn load(config: &Settings) -> Result<Vec<Actor>> { #[cfg(test)]
unimplemented!() mod tests {
use super::*;
use futures_test::*;
use futures_signals::signal::Signal;
#[test]
fn load_test() {
let (a, s, m) = super::load().unwrap();
let cx = task::panic_context();
a.signal.poll_change(&mut cx);
}
} }

View File

@ -527,7 +527,7 @@ impl TryFrom<String> for PermRule {
} }
} }
#[cfg(test)] #[cfg(test_DISABLED)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -85,7 +85,7 @@ const fn default_priority() -> u64 {
0 0
} }
#[cfg(test)] #[cfg(test_DISABLED)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -1,9 +1,13 @@
use std::future::Future; use std::future::Future;
use smol::Task; use smol::Task;
use futures_signals::signal::Signal;
use crate::machine::Machine;
use crate::error::Result; use crate::error::Result;
pub struct Initiator { pub struct Initiator {
machine: Box<dyn Signal<Item=Machine> + Send>,
} }
impl Initiator { impl Initiator {

View File

@ -213,7 +213,7 @@ pub fn load(config: &crate::config::Settings) -> Result<Vec<Machine>> {
unimplemented!() unimplemented!()
} }
#[cfg(test)] #[cfg(test_DISABLED)]
mod tests { mod tests {
use super::*; use super::*;
use std::iter::FromIterator; use std::iter::FromIterator;

View File

@ -141,20 +141,16 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
// handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads // handle signals (a cli if stdin is not closed?) and make it stop and clean up all threads
// when bffh should exit // when bffh should exit
let machines = machine::load(&config)?; let machines = machine::load(&config)?;
let actors = actor::load(&config)?; let actors = actor::load()?;
let initiators = initiator::load(&config)?; let initiators = initiator::load(&config)?;
// TODO restore connections between initiators, machines, actors
let ex = Executor::new(); let ex = Executor::new();
for i in initiators.into_iter() { for i in initiators.into_iter() {
ex.spawn(i.run()); ex.spawn(i.run());
} }
for a in actors.into_iter() { // TODO HERE: restore connections between initiators, machines, actors
ex.spawn(a);
}
let (signal, shutdown) = async_channel::bounded::<()>(1); let (signal, shutdown) = async_channel::bounded::<()>(1);
easy_parallel::Parallel::new() easy_parallel::Parallel::new()