Reimplement the dummy initiator

This commit is contained in:
Nadja Reitzenstein 2022-06-07 14:05:46 +02:00
parent 9100811c50
commit 6d8d1384d9
6 changed files with 262 additions and 145 deletions

5
Cargo.lock generated
View File

@ -171,9 +171,9 @@ dependencies = [
[[package]] [[package]]
name = "async-io" name = "async-io"
version = "1.6.0" version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a811e6a479f2439f0c04038796b5cfb3d2ad56c230e0f2d3f7b04d68cfee607b" checksum = "e5e18f61464ae81cde0a23e713ae8fd299580c54d697a35820cfd0625b8b0e07"
dependencies = [ dependencies = [
"concurrent-queue", "concurrent-queue",
"futures-lite", "futures-lite",
@ -836,6 +836,7 @@ dependencies = [
"api", "api",
"async-channel", "async-channel",
"async-compat", "async-compat",
"async-io",
"async-net", "async-net",
"async-oneshot", "async-oneshot",
"async-trait", "async-trait",

View File

@ -37,6 +37,7 @@ pin-utils = "0.1.0"
futures-util = "0.3" futures-util = "0.3"
futures-lite = "1.12.0" futures-lite = "1.12.0"
async-net = "1.6.1" async-net = "1.6.1"
async-io = "1.7.0"
backtrace = "0.3.65" backtrace = "0.3.65"
miette = { version = "4.7.1", features = ["fancy"] } miette = { version = "4.7.1", features = ["fancy"] }
thiserror = "1.0.31" thiserror = "1.0.31"

118
bffhd/initiators/dummy.rs Normal file
View File

@ -0,0 +1,118 @@
use miette::{miette, Diagnostic};
use thiserror::Error;
use super::Initiator;
use crate::initiators::InitiatorCallbacks;
use crate::resources::modules::fabaccess::Status;
use crate::session::SessionHandle;
use crate::users::UserRef;
use async_io::Timer;
use futures_util::future::BoxFuture;
use futures_util::ready;
use lmdb::Stat;
use std::collections::HashMap;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
pub struct Dummy {
callbacks: InitiatorCallbacks,
session: SessionHandle,
state: DummyState,
}
enum DummyState {
Empty,
Sleeping(Timer, Option<Status>),
Updating(BoxFuture<'static, Status>),
}
impl Dummy {
fn timer() -> Timer {
Timer::after(Duration::from_secs(2))
}
fn flip(&self, status: Status) -> BoxFuture<'static, Status> {
let session = self.session.clone();
let mut callbacks = self.callbacks.clone();
Box::pin(async move {
let next = match &status {
Status::Free => Status::InUse(session.get_user_ref()),
Status::InUse(_) => Status::Free,
_ => Status::Free,
};
callbacks.try_update(session, status).await;
next
})
}
}
#[derive(Debug, Error, Diagnostic)]
pub enum DummyError {}
impl Future for Dummy {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let span = tracing::debug_span!("Dummy initiator poll");
let _guard = span.enter();
tracing::trace!("polling Dummy initiator");
loop {
match &mut self.state {
DummyState::Empty => {
tracing::trace!("Dummy initiator is empty, initializing…");
mem::replace(
&mut self.state,
DummyState::Sleeping(Self::timer(), Some(Status::Free)),
);
}
DummyState::Sleeping(timer, next) => {
tracing::trace!("Sleep timer exists, polling it.");
let _: Instant = ready!(Pin::new(timer).poll(cx));
tracing::trace!("Timer has fired, poking out an update!");
let status = next.take().unwrap();
let f = self.flip(status);
mem::replace(&mut self.state, DummyState::Updating(f));
}
DummyState::Updating(f) => {
tracing::trace!("Update future exists, polling it .");
let next = ready!(Pin::new(f).poll(cx));
tracing::trace!("Update future completed, sleeping!");
mem::replace(
&mut self.state,
DummyState::Sleeping(Self::timer(), Some(next)),
);
}
}
}
}
}
impl Initiator for Dummy {
fn new(params: &HashMap<String, String>, callbacks: InitiatorCallbacks) -> miette::Result<Self>
where
Self: Sized,
{
let uid = params
.get("uid")
.ok_or_else(|| miette!("Dummy initiator configured without an UID"))?;
let session = callbacks
.open_session(uid)
.ok_or_else(|| miette!("The configured user for the dummy initiator does not exist"))?;
Ok(Self {
callbacks,
session,
state: DummyState::Empty,
})
}
}

View File

@ -1,161 +1,149 @@
use crate::initiators::dummy::Dummy;
use crate::resources::modules::fabaccess::Status;
use crate::session::SessionHandle;
use crate::{
AuthenticationHandle, Config, MachineState, Resource, ResourcesHandle, SessionManager,
};
use async_compat::CompatExt;
use executor::prelude::Executor;
use futures_util::ready;
use miette::IntoDiagnostic;
use rumqttc::ConnectReturnCode::Success;
use rumqttc::{AsyncClient, ConnectionError, Event, Incoming, MqttOptions};
use std::collections::HashMap;
use std::fmt::Display;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use async_channel as channel; use std::time::Duration;
use async_oneshot as oneshot; use url::Url;
use futures_signals::signal::Signal;
use futures_util::future::BoxFuture;
use crate::resources::claim::{ResourceID, UserID};
use crate::resources::state::State;
pub enum UpdateError { mod dummy;
/// We're not connected to anything anymore. You can't do anything about this error and the
/// only reason why you even get it is because your future was called a last time before
/// being shelved so best way to handle this error is to just return from your loop entirely,
/// cleaning up any state that doesn't survive a freeze.
Closed,
Denied, pub trait Initiator: Future<Output = ()> {
fn new(params: &HashMap<String, String>, callbacks: InitiatorCallbacks) -> miette::Result<Self>
Other(Box<dyn std::error::Error + Send>), where
} Self: Sized;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
pub trait InitiatorError: std::error::Error + Send { <Self as Future>::poll(self, cx)
} }
pub trait Initiator {
fn start_for(&mut self, machine: ResourceID)
-> BoxFuture<'static, Result<(), Box<dyn InitiatorError>>>;
fn run(&mut self, request: &mut UpdateSink)
-> BoxFuture<'static, Result<(), Box<dyn InitiatorError>>>;
} }
#[derive(Clone)] #[derive(Clone)]
pub struct UpdateSink { pub struct InitiatorCallbacks {
tx: channel::Sender<(Option<UserID>, State)>, resource: Resource,
rx: channel::Receiver<Result<(), Error>>, sessions: SessionManager,
}
impl InitiatorCallbacks {
pub fn new(resource: Resource, sessions: SessionManager) -> Self {
Self { resource, sessions }
}
pub async fn try_update(&mut self, session: SessionHandle, status: Status) {
self.resource.try_update(session, status).await
}
pub fn open_session(&self, uid: &str) -> Option<SessionHandle> {
self.sessions.open(uid)
}
} }
impl UpdateSink { pub struct InitiatorDriver {
fn new(tx: channel::Sender<(Option<UserID>, State)>, name: String,
rx: channel::Receiver<Result<(), Error>>) initiator: Box<dyn Initiator + Unpin + Send>,
-> Self }
impl InitiatorDriver {
pub fn new<I>(
name: String,
params: &HashMap<String, String>,
resource: Resource,
sessions: SessionManager,
) -> miette::Result<Self>
where
I: 'static + Initiator + Unpin + Send,
{ {
Self { tx, rx } let callbacks = InitiatorCallbacks::new(resource, sessions);
} let initiator = Box::new(I::new(params, callbacks)?);
Ok(Self { name, initiator })
async fn send(&mut self, userid: Option<UserID>, state: State)
-> Result<(), UpdateError>
{
if let Err(_e) = self.tx.send((userid, state)).await {
return Err(UpdateError::Closed);
}
match self.rx.recv().await {
Ok(Ok(())) => Ok(()),
Ok(Err(Error::Denied)) => Err(UpdateError::Denied),
Ok(Err(Error::Internal(e))) => Err(UpdateError::Other(e)),
// RecvError is send only when the channel is closed
Err(_) => Err(UpdateError::Closed),
}
} }
} }
struct Resource; impl Future for InitiatorDriver {
pub struct InitiatorDriver<S, I: Initiator> {
// TODO: make this a static reference to the resources because it's much easier and we don't
// need to replace resources at runtime at the moment.
resource_signal: S,
resource: Option<channel::Sender<Update>>,
// TODO: Initiators should instead
error_channel: Option<oneshot::Receiver<Error>>,
initiator: I,
initiator_future: Option<BoxFuture<'static, Result<(), Box<dyn InitiatorError>>>>,
update_sink: UpdateSink,
initiator_req_rx: channel::Receiver<(Option<UserID>, State)>,
initiator_reply_tx: channel::Sender<Result<(), Error>>,
}
pub struct ResourceSink {
pub id: ResourceID,
pub state_sink: channel::Sender<Update>,
}
impl<S: Signal<Item=ResourceSink>, I: Initiator> InitiatorDriver<S, I> {
pub fn new(resource_signal: S, initiator: I) -> Self {
let (initiator_reply_tx, initiator_reply_rx) = channel::bounded(1);
let (initiator_req_tx, initiator_req_rx) = async_channel::bounded(1);
let update_sink = UpdateSink::new(initiator_req_tx, initiator_reply_rx);
Self {
resource: None,
resource_signal,
error_channel: None,
initiator,
initiator_future: None,
update_sink,
initiator_req_rx,
initiator_reply_tx,
}
}
}
impl<S: Signal<Item=ResourceSink> + Unpin, I: Initiator + Unpin> Future for InitiatorDriver<S, I> {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.resource_signal).poll_change(cx) { let _guard = tracing::info_span!("initiator poll", initiator=%self.name);
Poll::Ready(Some(resource)) => { tracing::trace!(initiator=%self.name, "polling initiator");
self.resource = Some(resource.state_sink);
self.error_channel = None;
let f = Box::pin(self.initiator.start_for(resource.id));
self.initiator_future.replace(f);
},
Poll::Ready(None) => self.resource = None,
Poll::Pending => {}
}
// do while there is work to do ready!(Pin::new(&mut self.initiator).poll(cx));
while {
// First things first:
// If we've send an update to the resources in question we have error channel set, so
// we poll that first to determine if the resources has acted on it yet.
if let Some(ref mut errchan) = self.error_channel {
match Pin::new(errchan).poll(cx) {
// In case there's an ongoing
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(error)) => {
self.error_channel = None;
self.initiator_reply_tx.send(Err(error));
}
Poll::Ready(Err(_closed)) => {
// Error channel was dropped which means there was no error
self.error_channel = None;
self.initiator_reply_tx.send(Ok(()));
}
}
}
if let Some(ref mut init_fut) = self.initiator_future { tracing::warn!(initiator=%self.name, "an initiator module ran to completion!");
match init_fut.as_mut().poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_e)) => {
// TODO: Log initiator error here
}
}
} else if let Some(ref mut _resource) = self.resource {
let mut s = self.update_sink.clone();
let f = self.initiator.run(&mut s);
self.initiator_future.replace(f);
}
self.error_channel.is_some()
} {}
Poll::Ready(()) Poll::Ready(())
} }
} }
pub fn load(
executor: Executor,
config: &Config,
resources: ResourcesHandle,
sessions: SessionManager,
authentication: AuthenticationHandle,
) -> miette::Result<()> {
let span = tracing::info_span!("loading initiators");
let _guard = span.enter();
let mut initiator_map: HashMap<String, Resource> = config
.init_connections
.iter()
.filter_map(|(k, v)| {
if let Some(resource) = resources.get_by_id(v) {
Some((k.clone(), resource.clone()))
} else {
tracing::error!(initiator=%k, machine=%v,
"Machine configured for initiator not found!");
None
}
})
.collect();
for (name, cfg) in config.initiators.iter() {
if let Some(resource) = initiator_map.remove(name) {
if let Some(driver) = load_single(name, &cfg.module, &cfg.params, resource, &sessions) {
tracing::debug!(module_name=%cfg.module, %name, "starting initiator task");
executor.spawn(driver);
} else {
tracing::error!(module_name=%cfg.module, %name, "Initiator module could not be configured");
}
} else {
tracing::warn!(actor=%name, ?config, "Initiator has no machine configured. Skipping!");
}
}
Ok(())
}
fn load_single(
name: &String,
module_name: &String,
params: &HashMap<String, String>,
resource: Resource,
sessions: &SessionManager,
) -> Option<InitiatorDriver> {
tracing::info!(%name, %module_name, ?params, "Loading initiator");
let o = match module_name.as_ref() {
"Dummy" => Some(InitiatorDriver::new::<Dummy>(
name.clone(),
params,
resource,
sessions.clone(),
)),
_ => None,
};
o.transpose().unwrap_or_else(|error| {
tracing::error!(%error, "failed to configure initiator");
None
})
}

View File

@ -24,6 +24,7 @@ pub mod users;
pub mod resources; pub mod resources;
pub mod actors; pub mod actors;
pub mod initiators;
pub mod sensors; pub mod sensors;
@ -118,15 +119,22 @@ impl Diflouroborane {
.into_diagnostic() .into_diagnostic()
.wrap_err("Failed to construct signal handler")?; .wrap_err("Failed to construct signal handler")?;
let sessionmanager = SessionManager::new(self.users.clone(), self.roles.clone());
let authentication = AuthenticationHandle::new(self.users.clone());
initiators::load(
self.executor.clone(),
&self.config,
self.resources.clone(),
sessionmanager.clone(),
authentication.clone(),
);
actors::load(self.executor.clone(), &self.config, self.resources.clone())?; actors::load(self.executor.clone(), &self.config, self.resources.clone())?;
let tlsconfig = TlsConfig::new(self.config.tlskeylog.as_ref(), !self.config.is_quiet()) let tlsconfig = TlsConfig::new(self.config.tlskeylog.as_ref(), !self.config.is_quiet())
.into_diagnostic()?; .into_diagnostic()?;
let acceptor = tlsconfig.make_tls_acceptor(&self.config.tlsconfig)?; let acceptor = tlsconfig.make_tls_acceptor(&self.config.tlsconfig)?;
let sessionmanager = SessionManager::new(self.users.clone(), self.roles.clone());
let authentication = AuthenticationHandle::new(self.users.clone());
let apiserver = self.executor.run(APIServer::bind( let apiserver = self.executor.run(APIServer::bind(
self.executor.clone(), self.executor.clone(),
&self.config.listens, &self.config.listens,

View File

@ -25,6 +25,7 @@ pub mod modules;
pub struct PermissionDenied; pub struct PermissionDenied;
#[derive(Debug)]
pub(crate) struct Inner { pub(crate) struct Inner {
id: String, id: String,
db: StateDB, db: StateDB,
@ -94,7 +95,7 @@ impl Inner {
} }
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct Resource { pub struct Resource {
inner: Arc<Inner>, inner: Arc<Inner>,
} }