From 6d8d1384d94ef421750b6f820c7962798dd8c7a4 Mon Sep 17 00:00:00 2001 From: Nadja Reitzenstein Date: Tue, 7 Jun 2022 14:05:46 +0200 Subject: [PATCH] Reimplement the dummy initiator --- Cargo.lock | 5 +- Cargo.toml | 1 + bffhd/initiators/dummy.rs | 118 +++++++++++++++++ bffhd/initiators/mod.rs | 266 ++++++++++++++++++-------------------- bffhd/lib.rs | 14 +- bffhd/resources/mod.rs | 3 +- 6 files changed, 262 insertions(+), 145 deletions(-) create mode 100644 bffhd/initiators/dummy.rs diff --git a/Cargo.lock b/Cargo.lock index e5717db..d8cc667 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -171,9 +171,9 @@ dependencies = [ [[package]] name = "async-io" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a811e6a479f2439f0c04038796b5cfb3d2ad56c230e0f2d3f7b04d68cfee607b" +checksum = "e5e18f61464ae81cde0a23e713ae8fd299580c54d697a35820cfd0625b8b0e07" dependencies = [ "concurrent-queue", "futures-lite", @@ -836,6 +836,7 @@ dependencies = [ "api", "async-channel", "async-compat", + "async-io", "async-net", "async-oneshot", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 86e0029..a082ad7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ pin-utils = "0.1.0" futures-util = "0.3" futures-lite = "1.12.0" async-net = "1.6.1" +async-io = "1.7.0" backtrace = "0.3.65" miette = { version = "4.7.1", features = ["fancy"] } thiserror = "1.0.31" diff --git a/bffhd/initiators/dummy.rs b/bffhd/initiators/dummy.rs new file mode 100644 index 0000000..1fd2a3e --- /dev/null +++ b/bffhd/initiators/dummy.rs @@ -0,0 +1,118 @@ +use miette::{miette, Diagnostic}; +use thiserror::Error; + +use super::Initiator; +use crate::initiators::InitiatorCallbacks; +use crate::resources::modules::fabaccess::Status; +use crate::session::SessionHandle; +use crate::users::UserRef; +use async_io::Timer; +use futures_util::future::BoxFuture; +use futures_util::ready; +use lmdb::Stat; +use std::collections::HashMap; +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +pub struct Dummy { + callbacks: InitiatorCallbacks, + session: SessionHandle, + state: DummyState, +} + +enum DummyState { + Empty, + Sleeping(Timer, Option), + Updating(BoxFuture<'static, Status>), +} + +impl Dummy { + fn timer() -> Timer { + Timer::after(Duration::from_secs(2)) + } + + fn flip(&self, status: Status) -> BoxFuture<'static, Status> { + let session = self.session.clone(); + let mut callbacks = self.callbacks.clone(); + Box::pin(async move { + let next = match &status { + Status::Free => Status::InUse(session.get_user_ref()), + Status::InUse(_) => Status::Free, + _ => Status::Free, + }; + callbacks.try_update(session, status).await; + + next + }) + } +} + +#[derive(Debug, Error, Diagnostic)] +pub enum DummyError {} + +impl Future for Dummy { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let span = tracing::debug_span!("Dummy initiator poll"); + let _guard = span.enter(); + tracing::trace!("polling Dummy initiator"); + loop { + match &mut self.state { + DummyState::Empty => { + tracing::trace!("Dummy initiator is empty, initializing…"); + mem::replace( + &mut self.state, + DummyState::Sleeping(Self::timer(), Some(Status::Free)), + ); + } + DummyState::Sleeping(timer, next) => { + tracing::trace!("Sleep timer exists, polling it."); + + let _: Instant = ready!(Pin::new(timer).poll(cx)); + + tracing::trace!("Timer has fired, poking out an update!"); + + let status = next.take().unwrap(); + let f = self.flip(status); + mem::replace(&mut self.state, DummyState::Updating(f)); + } + DummyState::Updating(f) => { + tracing::trace!("Update future exists, polling it ."); + + let next = ready!(Pin::new(f).poll(cx)); + + tracing::trace!("Update future completed, sleeping!"); + + mem::replace( + &mut self.state, + DummyState::Sleeping(Self::timer(), Some(next)), + ); + } + } + } + } +} + +impl Initiator for Dummy { + fn new(params: &HashMap, callbacks: InitiatorCallbacks) -> miette::Result + where + Self: Sized, + { + let uid = params + .get("uid") + .ok_or_else(|| miette!("Dummy initiator configured without an UID"))?; + let session = callbacks + .open_session(uid) + .ok_or_else(|| miette!("The configured user for the dummy initiator does not exist"))?; + + Ok(Self { + callbacks, + session, + state: DummyState::Empty, + }) + } +} diff --git a/bffhd/initiators/mod.rs b/bffhd/initiators/mod.rs index 59d58e7..7597cd8 100644 --- a/bffhd/initiators/mod.rs +++ b/bffhd/initiators/mod.rs @@ -1,161 +1,149 @@ +use crate::initiators::dummy::Dummy; +use crate::resources::modules::fabaccess::Status; +use crate::session::SessionHandle; +use crate::{ + AuthenticationHandle, Config, MachineState, Resource, ResourcesHandle, SessionManager, +}; +use async_compat::CompatExt; +use executor::prelude::Executor; +use futures_util::ready; +use miette::IntoDiagnostic; +use rumqttc::ConnectReturnCode::Success; +use rumqttc::{AsyncClient, ConnectionError, Event, Incoming, MqttOptions}; +use std::collections::HashMap; +use std::fmt::Display; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use async_channel as channel; -use async_oneshot as oneshot; -use futures_signals::signal::Signal; -use futures_util::future::BoxFuture; -use crate::resources::claim::{ResourceID, UserID}; -use crate::resources::state::State; +use std::time::Duration; +use url::Url; -pub enum UpdateError { - /// We're not connected to anything anymore. You can't do anything about this error and the - /// only reason why you even get it is because your future was called a last time before - /// being shelved so best way to handle this error is to just return from your loop entirely, - /// cleaning up any state that doesn't survive a freeze. - Closed, +mod dummy; - Denied, - - Other(Box), -} - -pub trait InitiatorError: std::error::Error + Send { -} - -pub trait Initiator { - fn start_for(&mut self, machine: ResourceID) - -> BoxFuture<'static, Result<(), Box>>; - - fn run(&mut self, request: &mut UpdateSink) - -> BoxFuture<'static, Result<(), Box>>; +pub trait Initiator: Future { + fn new(params: &HashMap, callbacks: InitiatorCallbacks) -> miette::Result + where + Self: Sized; + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { + ::poll(self, cx) + } } #[derive(Clone)] -pub struct UpdateSink { - tx: channel::Sender<(Option, State)>, - rx: channel::Receiver>, +pub struct InitiatorCallbacks { + resource: Resource, + sessions: SessionManager, +} +impl InitiatorCallbacks { + pub fn new(resource: Resource, sessions: SessionManager) -> Self { + Self { resource, sessions } + } + + pub async fn try_update(&mut self, session: SessionHandle, status: Status) { + self.resource.try_update(session, status).await + } + + pub fn open_session(&self, uid: &str) -> Option { + self.sessions.open(uid) + } } -impl UpdateSink { - fn new(tx: channel::Sender<(Option, State)>, - rx: channel::Receiver>) - -> Self +pub struct InitiatorDriver { + name: String, + initiator: Box, +} + +impl InitiatorDriver { + pub fn new( + name: String, + params: &HashMap, + resource: Resource, + sessions: SessionManager, + ) -> miette::Result + where + I: 'static + Initiator + Unpin + Send, { - Self { tx, rx } - } - - async fn send(&mut self, userid: Option, state: State) - -> Result<(), UpdateError> - { - if let Err(_e) = self.tx.send((userid, state)).await { - return Err(UpdateError::Closed); - } - - match self.rx.recv().await { - Ok(Ok(())) => Ok(()), - Ok(Err(Error::Denied)) => Err(UpdateError::Denied), - Ok(Err(Error::Internal(e))) => Err(UpdateError::Other(e)), - // RecvError is send only when the channel is closed - Err(_) => Err(UpdateError::Closed), - } + let callbacks = InitiatorCallbacks::new(resource, sessions); + let initiator = Box::new(I::new(params, callbacks)?); + Ok(Self { name, initiator }) } } -struct Resource; -pub struct InitiatorDriver { - // TODO: make this a static reference to the resources because it's much easier and we don't - // need to replace resources at runtime at the moment. - resource_signal: S, - resource: Option>, - - // TODO: Initiators should instead - error_channel: Option>, - - initiator: I, - initiator_future: Option>>>, - update_sink: UpdateSink, - initiator_req_rx: channel::Receiver<(Option, State)>, - initiator_reply_tx: channel::Sender>, -} - -pub struct ResourceSink { - pub id: ResourceID, - pub state_sink: channel::Sender, -} - -impl, I: Initiator> InitiatorDriver { - pub fn new(resource_signal: S, initiator: I) -> Self { - let (initiator_reply_tx, initiator_reply_rx) = channel::bounded(1); - let (initiator_req_tx, initiator_req_rx) = async_channel::bounded(1); - let update_sink = UpdateSink::new(initiator_req_tx, initiator_reply_rx); - Self { - resource: None, - resource_signal, - error_channel: None, - - initiator, - initiator_future: None, - update_sink, - initiator_req_rx, - initiator_reply_tx, - } - } -} - -impl + Unpin, I: Initiator + Unpin> Future for InitiatorDriver { +impl Future for InitiatorDriver { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.resource_signal).poll_change(cx) { - Poll::Ready(Some(resource)) => { - self.resource = Some(resource.state_sink); - self.error_channel = None; - let f = Box::pin(self.initiator.start_for(resource.id)); - self.initiator_future.replace(f); - }, - Poll::Ready(None) => self.resource = None, - Poll::Pending => {} - } + let _guard = tracing::info_span!("initiator poll", initiator=%self.name); + tracing::trace!(initiator=%self.name, "polling initiator"); - // do while there is work to do - while { - // First things first: - // If we've send an update to the resources in question we have error channel set, so - // we poll that first to determine if the resources has acted on it yet. - if let Some(ref mut errchan) = self.error_channel { - match Pin::new(errchan).poll(cx) { - // In case there's an ongoing - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(error)) => { - self.error_channel = None; - self.initiator_reply_tx.send(Err(error)); - } - Poll::Ready(Err(_closed)) => { - // Error channel was dropped which means there was no error - self.error_channel = None; - self.initiator_reply_tx.send(Ok(())); - } - } - } + ready!(Pin::new(&mut self.initiator).poll(cx)); - if let Some(ref mut init_fut) = self.initiator_future { - match init_fut.as_mut().poll(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(())) => {}, - Poll::Ready(Err(_e)) => { - // TODO: Log initiator error here - } - } - } else if let Some(ref mut _resource) = self.resource { - let mut s = self.update_sink.clone(); - let f = self.initiator.run(&mut s); - self.initiator_future.replace(f); - } - - self.error_channel.is_some() - } {} + tracing::warn!(initiator=%self.name, "an initiator module ran to completion!"); Poll::Ready(()) } -} \ No newline at end of file +} + +pub fn load( + executor: Executor, + config: &Config, + resources: ResourcesHandle, + sessions: SessionManager, + authentication: AuthenticationHandle, +) -> miette::Result<()> { + let span = tracing::info_span!("loading initiators"); + let _guard = span.enter(); + + let mut initiator_map: HashMap = config + .init_connections + .iter() + .filter_map(|(k, v)| { + if let Some(resource) = resources.get_by_id(v) { + Some((k.clone(), resource.clone())) + } else { + tracing::error!(initiator=%k, machine=%v, + "Machine configured for initiator not found!"); + None + } + }) + .collect(); + + for (name, cfg) in config.initiators.iter() { + if let Some(resource) = initiator_map.remove(name) { + if let Some(driver) = load_single(name, &cfg.module, &cfg.params, resource, &sessions) { + tracing::debug!(module_name=%cfg.module, %name, "starting initiator task"); + executor.spawn(driver); + } else { + tracing::error!(module_name=%cfg.module, %name, "Initiator module could not be configured"); + } + } else { + tracing::warn!(actor=%name, ?config, "Initiator has no machine configured. Skipping!"); + } + } + + Ok(()) +} + +fn load_single( + name: &String, + module_name: &String, + params: &HashMap, + resource: Resource, + sessions: &SessionManager, +) -> Option { + tracing::info!(%name, %module_name, ?params, "Loading initiator"); + let o = match module_name.as_ref() { + "Dummy" => Some(InitiatorDriver::new::( + name.clone(), + params, + resource, + sessions.clone(), + )), + _ => None, + }; + + o.transpose().unwrap_or_else(|error| { + tracing::error!(%error, "failed to configure initiator"); + None + }) +} diff --git a/bffhd/lib.rs b/bffhd/lib.rs index 3c34192..21ff90c 100644 --- a/bffhd/lib.rs +++ b/bffhd/lib.rs @@ -24,6 +24,7 @@ pub mod users; pub mod resources; pub mod actors; +pub mod initiators; pub mod sensors; @@ -118,15 +119,22 @@ impl Diflouroborane { .into_diagnostic() .wrap_err("Failed to construct signal handler")?; + let sessionmanager = SessionManager::new(self.users.clone(), self.roles.clone()); + let authentication = AuthenticationHandle::new(self.users.clone()); + + initiators::load( + self.executor.clone(), + &self.config, + self.resources.clone(), + sessionmanager.clone(), + authentication.clone(), + ); actors::load(self.executor.clone(), &self.config, self.resources.clone())?; let tlsconfig = TlsConfig::new(self.config.tlskeylog.as_ref(), !self.config.is_quiet()) .into_diagnostic()?; let acceptor = tlsconfig.make_tls_acceptor(&self.config.tlsconfig)?; - let sessionmanager = SessionManager::new(self.users.clone(), self.roles.clone()); - let authentication = AuthenticationHandle::new(self.users.clone()); - let apiserver = self.executor.run(APIServer::bind( self.executor.clone(), &self.config.listens, diff --git a/bffhd/resources/mod.rs b/bffhd/resources/mod.rs index 3a7fc10..c222467 100644 --- a/bffhd/resources/mod.rs +++ b/bffhd/resources/mod.rs @@ -25,6 +25,7 @@ pub mod modules; pub struct PermissionDenied; +#[derive(Debug)] pub(crate) struct Inner { id: String, db: StateDB, @@ -94,7 +95,7 @@ impl Inner { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Resource { inner: Arc, }