diff --git a/Cargo.lock b/Cargo.lock index e5717db..1a9d6f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -171,9 +171,9 @@ dependencies = [ [[package]] name = "async-io" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a811e6a479f2439f0c04038796b5cfb3d2ad56c230e0f2d3f7b04d68cfee607b" +checksum = "e5e18f61464ae81cde0a23e713ae8fd299580c54d697a35820cfd0625b8b0e07" dependencies = [ "concurrent-queue", "futures-lite", @@ -226,6 +226,23 @@ dependencies = [ "futures-micro", ] +[[package]] +name = "async-process" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf2c06e30a24e8c78a3987d07f0930edf76ef35e027e7bdb063fccafdad1f60c" +dependencies = [ + "async-io", + "blocking", + "cfg-if", + "event-listener", + "futures-lite", + "libc", + "once_cell", + "signal-hook", + "winapi", +] + [[package]] name = "async-std" version = "1.10.0" @@ -836,8 +853,10 @@ dependencies = [ "api", "async-channel", "async-compat", + "async-io", "async-net", "async-oneshot", + "async-process", "async-trait", "backtrace", "capnp", diff --git a/Cargo.toml b/Cargo.toml index 86e0029..a50d54c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,8 @@ pin-utils = "0.1.0" futures-util = "0.3" futures-lite = "1.12.0" async-net = "1.6.1" +async-io = "1.7.0" +async-process = "1.4.0" backtrace = "0.3.65" miette = { version = "4.7.1", features = ["fancy"] } thiserror = "1.0.31" diff --git a/bffhd/initiators/dummy.rs b/bffhd/initiators/dummy.rs new file mode 100644 index 0000000..1fd2a3e --- /dev/null +++ b/bffhd/initiators/dummy.rs @@ -0,0 +1,118 @@ +use miette::{miette, Diagnostic}; +use thiserror::Error; + +use super::Initiator; +use crate::initiators::InitiatorCallbacks; +use crate::resources::modules::fabaccess::Status; +use crate::session::SessionHandle; +use crate::users::UserRef; +use async_io::Timer; +use futures_util::future::BoxFuture; +use futures_util::ready; +use lmdb::Stat; +use std::collections::HashMap; +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +pub struct Dummy { + callbacks: InitiatorCallbacks, + session: SessionHandle, + state: DummyState, +} + +enum DummyState { + Empty, + Sleeping(Timer, Option), + Updating(BoxFuture<'static, Status>), +} + +impl Dummy { + fn timer() -> Timer { + Timer::after(Duration::from_secs(2)) + } + + fn flip(&self, status: Status) -> BoxFuture<'static, Status> { + let session = self.session.clone(); + let mut callbacks = self.callbacks.clone(); + Box::pin(async move { + let next = match &status { + Status::Free => Status::InUse(session.get_user_ref()), + Status::InUse(_) => Status::Free, + _ => Status::Free, + }; + callbacks.try_update(session, status).await; + + next + }) + } +} + +#[derive(Debug, Error, Diagnostic)] +pub enum DummyError {} + +impl Future for Dummy { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let span = tracing::debug_span!("Dummy initiator poll"); + let _guard = span.enter(); + tracing::trace!("polling Dummy initiator"); + loop { + match &mut self.state { + DummyState::Empty => { + tracing::trace!("Dummy initiator is empty, initializing…"); + mem::replace( + &mut self.state, + DummyState::Sleeping(Self::timer(), Some(Status::Free)), + ); + } + DummyState::Sleeping(timer, next) => { + tracing::trace!("Sleep timer exists, polling it."); + + let _: Instant = ready!(Pin::new(timer).poll(cx)); + + tracing::trace!("Timer has fired, poking out an update!"); + + let status = next.take().unwrap(); + let f = self.flip(status); + mem::replace(&mut self.state, DummyState::Updating(f)); + } + DummyState::Updating(f) => { + tracing::trace!("Update future exists, polling it ."); + + let next = ready!(Pin::new(f).poll(cx)); + + tracing::trace!("Update future completed, sleeping!"); + + mem::replace( + &mut self.state, + DummyState::Sleeping(Self::timer(), Some(next)), + ); + } + } + } + } +} + +impl Initiator for Dummy { + fn new(params: &HashMap, callbacks: InitiatorCallbacks) -> miette::Result + where + Self: Sized, + { + let uid = params + .get("uid") + .ok_or_else(|| miette!("Dummy initiator configured without an UID"))?; + let session = callbacks + .open_session(uid) + .ok_or_else(|| miette!("The configured user for the dummy initiator does not exist"))?; + + Ok(Self { + callbacks, + session, + state: DummyState::Empty, + }) + } +} diff --git a/bffhd/initiators/mod.rs b/bffhd/initiators/mod.rs index 59d58e7..e4c9714 100644 --- a/bffhd/initiators/mod.rs +++ b/bffhd/initiators/mod.rs @@ -1,161 +1,157 @@ +use crate::initiators::dummy::Dummy; +use crate::initiators::process::Process; +use crate::resources::modules::fabaccess::Status; +use crate::session::SessionHandle; +use crate::{ + AuthenticationHandle, Config, MachineState, Resource, ResourcesHandle, SessionManager, +}; +use async_compat::CompatExt; +use executor::prelude::Executor; +use futures_util::ready; +use miette::IntoDiagnostic; +use rumqttc::ConnectReturnCode::Success; +use rumqttc::{AsyncClient, ConnectionError, Event, Incoming, MqttOptions}; +use std::collections::HashMap; +use std::fmt::Display; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use async_channel as channel; -use async_oneshot as oneshot; -use futures_signals::signal::Signal; -use futures_util::future::BoxFuture; -use crate::resources::claim::{ResourceID, UserID}; -use crate::resources::state::State; +use std::time::Duration; +use url::Url; -pub enum UpdateError { - /// We're not connected to anything anymore. You can't do anything about this error and the - /// only reason why you even get it is because your future was called a last time before - /// being shelved so best way to handle this error is to just return from your loop entirely, - /// cleaning up any state that doesn't survive a freeze. - Closed, +mod dummy; +mod process; - Denied, - - Other(Box), -} - -pub trait InitiatorError: std::error::Error + Send { -} - -pub trait Initiator { - fn start_for(&mut self, machine: ResourceID) - -> BoxFuture<'static, Result<(), Box>>; - - fn run(&mut self, request: &mut UpdateSink) - -> BoxFuture<'static, Result<(), Box>>; +pub trait Initiator: Future { + fn new(params: &HashMap, callbacks: InitiatorCallbacks) -> miette::Result + where + Self: Sized; + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { + ::poll(self, cx) + } } #[derive(Clone)] -pub struct UpdateSink { - tx: channel::Sender<(Option, State)>, - rx: channel::Receiver>, +pub struct InitiatorCallbacks { + resource: Resource, + sessions: SessionManager, +} +impl InitiatorCallbacks { + pub fn new(resource: Resource, sessions: SessionManager) -> Self { + Self { resource, sessions } + } + + pub async fn try_update(&mut self, session: SessionHandle, status: Status) { + self.resource.try_update(session, status).await + } + + pub fn open_session(&self, uid: &str) -> Option { + self.sessions.open(uid) + } } -impl UpdateSink { - fn new(tx: channel::Sender<(Option, State)>, - rx: channel::Receiver>) - -> Self +pub struct InitiatorDriver { + name: String, + initiator: Box, +} + +impl InitiatorDriver { + pub fn new( + name: String, + params: &HashMap, + resource: Resource, + sessions: SessionManager, + ) -> miette::Result + where + I: 'static + Initiator + Unpin + Send, { - Self { tx, rx } - } - - async fn send(&mut self, userid: Option, state: State) - -> Result<(), UpdateError> - { - if let Err(_e) = self.tx.send((userid, state)).await { - return Err(UpdateError::Closed); - } - - match self.rx.recv().await { - Ok(Ok(())) => Ok(()), - Ok(Err(Error::Denied)) => Err(UpdateError::Denied), - Ok(Err(Error::Internal(e))) => Err(UpdateError::Other(e)), - // RecvError is send only when the channel is closed - Err(_) => Err(UpdateError::Closed), - } + let callbacks = InitiatorCallbacks::new(resource, sessions); + let initiator = Box::new(I::new(params, callbacks)?); + Ok(Self { name, initiator }) } } -struct Resource; -pub struct InitiatorDriver { - // TODO: make this a static reference to the resources because it's much easier and we don't - // need to replace resources at runtime at the moment. - resource_signal: S, - resource: Option>, - - // TODO: Initiators should instead - error_channel: Option>, - - initiator: I, - initiator_future: Option>>>, - update_sink: UpdateSink, - initiator_req_rx: channel::Receiver<(Option, State)>, - initiator_reply_tx: channel::Sender>, -} - -pub struct ResourceSink { - pub id: ResourceID, - pub state_sink: channel::Sender, -} - -impl, I: Initiator> InitiatorDriver { - pub fn new(resource_signal: S, initiator: I) -> Self { - let (initiator_reply_tx, initiator_reply_rx) = channel::bounded(1); - let (initiator_req_tx, initiator_req_rx) = async_channel::bounded(1); - let update_sink = UpdateSink::new(initiator_req_tx, initiator_reply_rx); - Self { - resource: None, - resource_signal, - error_channel: None, - - initiator, - initiator_future: None, - update_sink, - initiator_req_rx, - initiator_reply_tx, - } - } -} - -impl + Unpin, I: Initiator + Unpin> Future for InitiatorDriver { +impl Future for InitiatorDriver { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.resource_signal).poll_change(cx) { - Poll::Ready(Some(resource)) => { - self.resource = Some(resource.state_sink); - self.error_channel = None; - let f = Box::pin(self.initiator.start_for(resource.id)); - self.initiator_future.replace(f); - }, - Poll::Ready(None) => self.resource = None, - Poll::Pending => {} - } + let _guard = tracing::info_span!("initiator poll", initiator=%self.name); + tracing::trace!(initiator=%self.name, "polling initiator"); - // do while there is work to do - while { - // First things first: - // If we've send an update to the resources in question we have error channel set, so - // we poll that first to determine if the resources has acted on it yet. - if let Some(ref mut errchan) = self.error_channel { - match Pin::new(errchan).poll(cx) { - // In case there's an ongoing - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(error)) => { - self.error_channel = None; - self.initiator_reply_tx.send(Err(error)); - } - Poll::Ready(Err(_closed)) => { - // Error channel was dropped which means there was no error - self.error_channel = None; - self.initiator_reply_tx.send(Ok(())); - } - } - } + ready!(Pin::new(&mut self.initiator).poll(cx)); - if let Some(ref mut init_fut) = self.initiator_future { - match init_fut.as_mut().poll(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(())) => {}, - Poll::Ready(Err(_e)) => { - // TODO: Log initiator error here - } - } - } else if let Some(ref mut _resource) = self.resource { - let mut s = self.update_sink.clone(); - let f = self.initiator.run(&mut s); - self.initiator_future.replace(f); - } - - self.error_channel.is_some() - } {} + tracing::warn!(initiator=%self.name, "initiator module ran to completion!"); Poll::Ready(()) } -} \ No newline at end of file +} + +pub fn load( + executor: Executor, + config: &Config, + resources: ResourcesHandle, + sessions: SessionManager, + authentication: AuthenticationHandle, +) -> miette::Result<()> { + let span = tracing::info_span!("loading initiators"); + let _guard = span.enter(); + + let mut initiator_map: HashMap = config + .init_connections + .iter() + .filter_map(|(k, v)| { + if let Some(resource) = resources.get_by_id(v) { + Some((k.clone(), resource.clone())) + } else { + tracing::error!(initiator=%k, machine=%v, + "Machine configured for initiator not found!"); + None + } + }) + .collect(); + + for (name, cfg) in config.initiators.iter() { + if let Some(resource) = initiator_map.remove(name) { + if let Some(driver) = load_single(name, &cfg.module, &cfg.params, resource, &sessions) { + tracing::debug!(module_name=%cfg.module, %name, "starting initiator task"); + executor.spawn(driver); + } else { + tracing::error!(module_name=%cfg.module, %name, "Initiator module could not be configured"); + } + } else { + tracing::warn!(actor=%name, ?config, "Initiator has no machine configured. Skipping!"); + } + } + + Ok(()) +} + +fn load_single( + name: &String, + module_name: &String, + params: &HashMap, + resource: Resource, + sessions: &SessionManager, +) -> Option { + tracing::info!(%name, %module_name, ?params, "Loading initiator"); + let o = match module_name.as_ref() { + "Dummy" => Some(InitiatorDriver::new::( + name.clone(), + params, + resource, + sessions.clone(), + )), + "Process" => Some(InitiatorDriver::new::( + name.clone(), + params, + resource, + sessions.clone(), + )), + _ => None, + }; + + o.transpose().unwrap_or_else(|error| { + tracing::error!(%error, "failed to configure initiator"); + None + }) +} diff --git a/bffhd/initiators/process.rs b/bffhd/initiators/process.rs new file mode 100644 index 0000000..74de4f5 --- /dev/null +++ b/bffhd/initiators/process.rs @@ -0,0 +1,166 @@ +use super::Initiator; +use super::InitiatorCallbacks; +use crate::resources::state::State; +use crate::utils::linebuffer::LineBuffer; +use async_process::{Child, ChildStdout, Command, Stdio}; +use futures_lite::{ready, AsyncRead}; +use miette::{miette, IntoDiagnostic}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[derive(Debug, Serialize, Deserialize)] +pub enum InputMessage { + #[serde(rename = "state")] + SetState(State), +} + +#[derive(Serialize, Deserialize)] +pub struct OutputLine {} + +pub struct Process { + pub cmd: String, + pub args: Vec, + state: Option, + buffer: LineBuffer, + callbacks: InitiatorCallbacks, +} + +impl Process { + fn spawn(&mut self) -> io::Result<()> { + let mut child = Command::new(&self.cmd) + .args(&self.args) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + self.state = Some(ProcessState::new( + child + .stdout + .take() + .expect("Child just spawned with piped stdout has no stdout"), + child, + )); + Ok(()) + } +} + +struct ProcessState { + pub child: Child, + pub stdout: ChildStdout, +} + +impl ProcessState { + pub fn new(stdout: ChildStdout, child: Child) -> Self { + Self { stdout, child } + } + + fn try_process(&mut self, buffer: &[u8], callbacks: &mut InitiatorCallbacks) -> usize { + tracing::trace!("trying to process current buffer"); + + let mut end = 0; + + while let Some(idx) = buffer[end..].iter().position(|b| *b == b'\n') { + if idx == 0 { + end += 1; + continue; + } + let line = &buffer[end..(end + idx)]; + self.process_line(line, callbacks); + end = idx; + } + + end + } + + fn process_line(&mut self, line: &[u8], callbacks: &mut InitiatorCallbacks) { + match serde_json::from_slice::(line) { + Ok(state) => { + tracing::trace!(?state, "got new state for process initiator"); + } + Err(error) => tracing::warn!(%error, "process initiator did not send a valid line"), + } + } +} + +impl Future for Process { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Process { + state: Some(state), + buffer, + callbacks, + .. + } = self.get_mut() + { + match state.child.try_status() { + Err(error) => { + tracing::error!(%error, "checking child exit code returned an error"); + return Poll::Ready(()); + } + Ok(Some(exitcode)) => { + tracing::warn!(%exitcode, "child process exited"); + return Poll::Ready(()); + } + Ok(None) => { + tracing::trace!("process initiator checking on process"); + + let stdout = &mut state.stdout; + + loop { + let buf = buffer.get_mut_write(512); + match AsyncRead::poll_read(Pin::new(stdout), cx, buf) { + Poll::Pending => break, + Poll::Ready(Ok(read)) => { + buffer.advance_valid(read); + continue; + } + Poll::Ready(Err(error)) => { + tracing::warn!(%error, "reading from child stdout errored"); + return Poll::Ready(()); + } + } + } + + let processed = state.try_process(buffer, callbacks); + buffer.consume(processed); + + return Poll::Pending; + } + } + } else { + tracing::warn!("process initiator has no process attached!"); + } + + Poll::Ready(()) + } +} + +impl Initiator for Process { + fn new(params: &HashMap, callbacks: InitiatorCallbacks) -> miette::Result + where + Self: Sized, + { + let cmd = params + .get("cmd") + .ok_or(miette!("Process initiator requires a `cmd` parameter."))? + .clone(); + let args = params + .get("args") + .map(|argv| argv.split_whitespace().map(|s| s.to_string()).collect()) + .unwrap_or_else(Vec::new); + let mut this = Self { + cmd, + args, + state: None, + buffer: LineBuffer::new(), + callbacks, + }; + this.spawn().into_diagnostic()?; + Ok(this) + } +} diff --git a/bffhd/lib.rs b/bffhd/lib.rs index 3c34192..21ff90c 100644 --- a/bffhd/lib.rs +++ b/bffhd/lib.rs @@ -24,6 +24,7 @@ pub mod users; pub mod resources; pub mod actors; +pub mod initiators; pub mod sensors; @@ -118,15 +119,22 @@ impl Diflouroborane { .into_diagnostic() .wrap_err("Failed to construct signal handler")?; + let sessionmanager = SessionManager::new(self.users.clone(), self.roles.clone()); + let authentication = AuthenticationHandle::new(self.users.clone()); + + initiators::load( + self.executor.clone(), + &self.config, + self.resources.clone(), + sessionmanager.clone(), + authentication.clone(), + ); actors::load(self.executor.clone(), &self.config, self.resources.clone())?; let tlsconfig = TlsConfig::new(self.config.tlskeylog.as_ref(), !self.config.is_quiet()) .into_diagnostic()?; let acceptor = tlsconfig.make_tls_acceptor(&self.config.tlsconfig)?; - let sessionmanager = SessionManager::new(self.users.clone(), self.roles.clone()); - let authentication = AuthenticationHandle::new(self.users.clone()); - let apiserver = self.executor.run(APIServer::bind( self.executor.clone(), &self.config.listens, diff --git a/bffhd/resources/mod.rs b/bffhd/resources/mod.rs index 3a7fc10..c222467 100644 --- a/bffhd/resources/mod.rs +++ b/bffhd/resources/mod.rs @@ -25,6 +25,7 @@ pub mod modules; pub struct PermissionDenied; +#[derive(Debug)] pub(crate) struct Inner { id: String, db: StateDB, @@ -94,7 +95,7 @@ impl Inner { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Resource { inner: Arc, } diff --git a/bffhd/resources/modules/fabaccess.rs b/bffhd/resources/modules/fabaccess.rs index 1863aab..b50e9b6 100644 --- a/bffhd/resources/modules/fabaccess.rs +++ b/bffhd/resources/modules/fabaccess.rs @@ -1,9 +1,9 @@ +use crate::config::deser_option; use crate::utils::oid::ObjectIdentifier; use once_cell::sync::Lazy; use rkyv::{Archive, Archived, Deserialize, Infallible}; use std::fmt; use std::fmt::Write; - use std::str::FromStr; //use crate::oidvalue; @@ -54,6 +54,11 @@ pub enum Status { /// The status of the machine pub struct MachineState { pub state: Status, + #[serde( + default, + skip_serializing_if = "Option::is_none", + deserialize_with = "deser_option" + )] pub previous: Option, } diff --git a/bffhd/utils/linebuffer.rs b/bffhd/utils/linebuffer.rs new file mode 100644 index 0000000..87d6559 --- /dev/null +++ b/bffhd/utils/linebuffer.rs @@ -0,0 +1,60 @@ +use std::ops::{Deref, DerefMut}; + +pub struct LineBuffer { + buffer: Vec, + valid: usize, +} + +impl LineBuffer { + pub fn new() -> Self { + Self { + buffer: Vec::new(), + valid: 0, + } + } + + /// Resize the internal Vec so that buffer.len() == buffer.capacity() + fn resize(&mut self) { + // SAFETY: Whatever is in memory is always valid as u8. + unsafe { self.buffer.set_len(self.buffer.capacity()) } + } + + /// Get an (initialized but empty) writeable buffer of at least `atleast` bytes + pub fn get_mut_write(&mut self, atleast: usize) -> &mut [u8] { + let avail = self.buffer.len() - self.valid; + if avail < atleast { + self.buffer.reserve(atleast - avail); + self.resize() + } + &mut self.buffer[self.valid..] + } + + pub fn advance_valid(&mut self, amount: usize) { + self.valid += amount + } + + /// Mark `amount` bytes as 'consumed' + /// + /// This will move any remaining data to the start of the buffer for future processing + pub fn consume(&mut self, amount: usize) { + assert!(amount <= self.valid); + + if amount < self.valid { + self.buffer.copy_within(amount..self.valid, 0); + } + self.valid -= amount; + } +} + +impl Deref for LineBuffer { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.buffer[0..self.valid] + } +} +impl DerefMut for LineBuffer { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.buffer[0..self.valid] + } +} diff --git a/bffhd/utils/mod.rs b/bffhd/utils/mod.rs index c14ec8f..d65f75c 100644 --- a/bffhd/utils/mod.rs +++ b/bffhd/utils/mod.rs @@ -8,3 +8,5 @@ pub mod varint; pub mod l10nstring; pub mod uuid; + +pub mod linebuffer; diff --git a/examples/init.py b/examples/init.py new file mode 100755 index 0000000..73ed4e2 --- /dev/null +++ b/examples/init.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python + +import sys +import time + +while True: + print('{ "state": { "1.3.6.1.4.1.48398.612.2.4": { "state": "Free" } } }') + sys.stdout.flush() + time.sleep(2) + + print('{ "state": { "1.3.6.1.4.1.48398.612.2.4": { "state": { "InUse": { "id": "Testuser" } } } } }') + sys.stdout.flush() + time.sleep(2)