use crate::initiators::dummy::Dummy; use crate::initiators::process::Process; use crate::resources::modules::fabaccess::Status; use crate::session::SessionHandle; use crate::{ AuthenticationHandle, Config, Resource, ResourcesHandle, SessionManager, }; use executor::prelude::Executor; use futures_util::ready; use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use tracing::Span; mod dummy; mod process; pub trait Initiator: Future { fn new(params: &HashMap, callbacks: InitiatorCallbacks) -> miette::Result where Self: Sized; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { ::poll(self, cx) } } #[derive(Clone)] pub struct InitiatorCallbacks { span: Span, resource: Resource, sessions: SessionManager, } impl InitiatorCallbacks { pub fn new(span: Span, resource: Resource, sessions: SessionManager) -> Self { Self { span, resource, sessions, } } pub async fn try_update(&mut self, session: SessionHandle, status: Status) { self.resource.try_update(session, status).await } pub fn set_status(&mut self, status: Status) { self.resource.set_status(status) } pub fn open_session(&self, uid: &str) -> Option { self.sessions.try_open(&self.span, uid) } } pub struct InitiatorDriver { span: Span, name: String, initiator: Box, } impl InitiatorDriver { pub fn new( span: Span, name: String, params: &HashMap, resource: Resource, sessions: SessionManager, ) -> miette::Result where I: 'static + Initiator + Unpin + Send, { let callbacks = InitiatorCallbacks::new(span.clone(), resource, sessions); let initiator = Box::new(I::new(params, callbacks)?); Ok(Self { span, name, initiator, }) } } impl Future for InitiatorDriver { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let _guard = tracing::info_span!("initiator poll", initiator=%self.name); tracing::trace!(initiator=%self.name, "polling initiator"); ready!(Pin::new(&mut self.initiator).poll(cx)); tracing::warn!(initiator=%self.name, "initiator module ran to completion!"); Poll::Ready(()) } } pub fn load( executor: Executor, config: &Config, resources: ResourcesHandle, sessions: SessionManager, authentication: AuthenticationHandle, ) -> miette::Result<()> { let span = tracing::info_span!("loading initiators"); let _guard = span.enter(); let mut initiator_map: HashMap = config .init_connections .iter() .filter_map(|(k, v)| { if let Some(resource) = resources.get_by_id(v) { Some((k.clone(), resource.clone())) } else { tracing::error!(initiator=%k, machine=%v, "Machine configured for initiator not found!"); None } }) .collect(); for (name, cfg) in config.initiators.iter() { if let Some(resource) = initiator_map.remove(name) { if let Some(driver) = load_single(name, &cfg.module, &cfg.params, resource, &sessions) { tracing::debug!(module_name=%cfg.module, %name, "starting initiator task"); executor.spawn(driver); } else { tracing::error!(module_name=%cfg.module, %name, "Initiator module could not be configured"); } } else { tracing::warn!(actor=%name, ?config, "Initiator has no machine configured. Skipping!"); } } Ok(()) } fn load_single( name: &String, module_name: &String, params: &HashMap, resource: Resource, sessions: &SessionManager, ) -> Option { let span = tracing::info_span!( "initiator", name = %name, module = %module_name, ); tracing::info!(%name, %module_name, ?params, "Loading initiator"); let o = match module_name.as_ref() { "Dummy" => Some(InitiatorDriver::new::( span, name.clone(), params, resource, sessions.clone(), )), "Process" => Some(InitiatorDriver::new::( span, name.clone(), params, resource, sessions.clone(), )), _ => None, }; o.transpose().unwrap_or_else(|error| { tracing::error!(%error, "failed to configure initiator"); None }) }