Merge branch 'feature/new_initiator' into development

* feature/new_initiator:
  Process initiator working
  Reimplement the dummy initiator
This commit is contained in:
Nadja Reitzenstein 2022-06-18 16:52:59 +02:00
commit 728c33f444
11 changed files with 536 additions and 146 deletions

23
Cargo.lock generated
View File

@ -171,9 +171,9 @@ dependencies = [
[[package]]
name = "async-io"
version = "1.6.0"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a811e6a479f2439f0c04038796b5cfb3d2ad56c230e0f2d3f7b04d68cfee607b"
checksum = "e5e18f61464ae81cde0a23e713ae8fd299580c54d697a35820cfd0625b8b0e07"
dependencies = [
"concurrent-queue",
"futures-lite",
@ -226,6 +226,23 @@ dependencies = [
"futures-micro",
]
[[package]]
name = "async-process"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf2c06e30a24e8c78a3987d07f0930edf76ef35e027e7bdb063fccafdad1f60c"
dependencies = [
"async-io",
"blocking",
"cfg-if",
"event-listener",
"futures-lite",
"libc",
"once_cell",
"signal-hook",
"winapi",
]
[[package]]
name = "async-std"
version = "1.10.0"
@ -836,8 +853,10 @@ dependencies = [
"api",
"async-channel",
"async-compat",
"async-io",
"async-net",
"async-oneshot",
"async-process",
"async-trait",
"backtrace",
"capnp",

View File

@ -37,6 +37,8 @@ pin-utils = "0.1.0"
futures-util = "0.3"
futures-lite = "1.12.0"
async-net = "1.6.1"
async-io = "1.7.0"
async-process = "1.4.0"
backtrace = "0.3.65"
miette = { version = "4.7.1", features = ["fancy"] }
thiserror = "1.0.31"

118
bffhd/initiators/dummy.rs Normal file
View File

@ -0,0 +1,118 @@
use miette::{miette, Diagnostic};
use thiserror::Error;
use super::Initiator;
use crate::initiators::InitiatorCallbacks;
use crate::resources::modules::fabaccess::Status;
use crate::session::SessionHandle;
use crate::users::UserRef;
use async_io::Timer;
use futures_util::future::BoxFuture;
use futures_util::ready;
use lmdb::Stat;
use std::collections::HashMap;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
pub struct Dummy {
callbacks: InitiatorCallbacks,
session: SessionHandle,
state: DummyState,
}
enum DummyState {
Empty,
Sleeping(Timer, Option<Status>),
Updating(BoxFuture<'static, Status>),
}
impl Dummy {
fn timer() -> Timer {
Timer::after(Duration::from_secs(2))
}
fn flip(&self, status: Status) -> BoxFuture<'static, Status> {
let session = self.session.clone();
let mut callbacks = self.callbacks.clone();
Box::pin(async move {
let next = match &status {
Status::Free => Status::InUse(session.get_user_ref()),
Status::InUse(_) => Status::Free,
_ => Status::Free,
};
callbacks.try_update(session, status).await;
next
})
}
}
#[derive(Debug, Error, Diagnostic)]
pub enum DummyError {}
impl Future for Dummy {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let span = tracing::debug_span!("Dummy initiator poll");
let _guard = span.enter();
tracing::trace!("polling Dummy initiator");
loop {
match &mut self.state {
DummyState::Empty => {
tracing::trace!("Dummy initiator is empty, initializing…");
mem::replace(
&mut self.state,
DummyState::Sleeping(Self::timer(), Some(Status::Free)),
);
}
DummyState::Sleeping(timer, next) => {
tracing::trace!("Sleep timer exists, polling it.");
let _: Instant = ready!(Pin::new(timer).poll(cx));
tracing::trace!("Timer has fired, poking out an update!");
let status = next.take().unwrap();
let f = self.flip(status);
mem::replace(&mut self.state, DummyState::Updating(f));
}
DummyState::Updating(f) => {
tracing::trace!("Update future exists, polling it .");
let next = ready!(Pin::new(f).poll(cx));
tracing::trace!("Update future completed, sleeping!");
mem::replace(
&mut self.state,
DummyState::Sleeping(Self::timer(), Some(next)),
);
}
}
}
}
}
impl Initiator for Dummy {
fn new(params: &HashMap<String, String>, callbacks: InitiatorCallbacks) -> miette::Result<Self>
where
Self: Sized,
{
let uid = params
.get("uid")
.ok_or_else(|| miette!("Dummy initiator configured without an UID"))?;
let session = callbacks
.open_session(uid)
.ok_or_else(|| miette!("The configured user for the dummy initiator does not exist"))?;
Ok(Self {
callbacks,
session,
state: DummyState::Empty,
})
}
}

View File

@ -1,161 +1,157 @@
use crate::initiators::dummy::Dummy;
use crate::initiators::process::Process;
use crate::resources::modules::fabaccess::Status;
use crate::session::SessionHandle;
use crate::{
AuthenticationHandle, Config, MachineState, Resource, ResourcesHandle, SessionManager,
};
use async_compat::CompatExt;
use executor::prelude::Executor;
use futures_util::ready;
use miette::IntoDiagnostic;
use rumqttc::ConnectReturnCode::Success;
use rumqttc::{AsyncClient, ConnectionError, Event, Incoming, MqttOptions};
use std::collections::HashMap;
use std::fmt::Display;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use async_channel as channel;
use async_oneshot as oneshot;
use futures_signals::signal::Signal;
use futures_util::future::BoxFuture;
use crate::resources::claim::{ResourceID, UserID};
use crate::resources::state::State;
use std::time::Duration;
use url::Url;
pub enum UpdateError {
/// We're not connected to anything anymore. You can't do anything about this error and the
/// only reason why you even get it is because your future was called a last time before
/// being shelved so best way to handle this error is to just return from your loop entirely,
/// cleaning up any state that doesn't survive a freeze.
Closed,
mod dummy;
mod process;
Denied,
Other(Box<dyn std::error::Error + Send>),
pub trait Initiator: Future<Output = ()> {
fn new(params: &HashMap<String, String>, callbacks: InitiatorCallbacks) -> miette::Result<Self>
where
Self: Sized;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
<Self as Future>::poll(self, cx)
}
pub trait InitiatorError: std::error::Error + Send {
}
pub trait Initiator {
fn start_for(&mut self, machine: ResourceID)
-> BoxFuture<'static, Result<(), Box<dyn InitiatorError>>>;
fn run(&mut self, request: &mut UpdateSink)
-> BoxFuture<'static, Result<(), Box<dyn InitiatorError>>>;
}
#[derive(Clone)]
pub struct UpdateSink {
tx: channel::Sender<(Option<UserID>, State)>,
rx: channel::Receiver<Result<(), Error>>,
pub struct InitiatorCallbacks {
resource: Resource,
sessions: SessionManager,
}
impl InitiatorCallbacks {
pub fn new(resource: Resource, sessions: SessionManager) -> Self {
Self { resource, sessions }
}
impl UpdateSink {
fn new(tx: channel::Sender<(Option<UserID>, State)>,
rx: channel::Receiver<Result<(), Error>>)
-> Self
pub async fn try_update(&mut self, session: SessionHandle, status: Status) {
self.resource.try_update(session, status).await
}
pub fn open_session(&self, uid: &str) -> Option<SessionHandle> {
self.sessions.open(uid)
}
}
pub struct InitiatorDriver {
name: String,
initiator: Box<dyn Initiator + Unpin + Send>,
}
impl InitiatorDriver {
pub fn new<I>(
name: String,
params: &HashMap<String, String>,
resource: Resource,
sessions: SessionManager,
) -> miette::Result<Self>
where
I: 'static + Initiator + Unpin + Send,
{
Self { tx, rx }
}
async fn send(&mut self, userid: Option<UserID>, state: State)
-> Result<(), UpdateError>
{
if let Err(_e) = self.tx.send((userid, state)).await {
return Err(UpdateError::Closed);
}
match self.rx.recv().await {
Ok(Ok(())) => Ok(()),
Ok(Err(Error::Denied)) => Err(UpdateError::Denied),
Ok(Err(Error::Internal(e))) => Err(UpdateError::Other(e)),
// RecvError is send only when the channel is closed
Err(_) => Err(UpdateError::Closed),
}
let callbacks = InitiatorCallbacks::new(resource, sessions);
let initiator = Box::new(I::new(params, callbacks)?);
Ok(Self { name, initiator })
}
}
struct Resource;
pub struct InitiatorDriver<S, I: Initiator> {
// TODO: make this a static reference to the resources because it's much easier and we don't
// need to replace resources at runtime at the moment.
resource_signal: S,
resource: Option<channel::Sender<Update>>,
// TODO: Initiators should instead
error_channel: Option<oneshot::Receiver<Error>>,
initiator: I,
initiator_future: Option<BoxFuture<'static, Result<(), Box<dyn InitiatorError>>>>,
update_sink: UpdateSink,
initiator_req_rx: channel::Receiver<(Option<UserID>, State)>,
initiator_reply_tx: channel::Sender<Result<(), Error>>,
}
pub struct ResourceSink {
pub id: ResourceID,
pub state_sink: channel::Sender<Update>,
}
impl<S: Signal<Item=ResourceSink>, I: Initiator> InitiatorDriver<S, I> {
pub fn new(resource_signal: S, initiator: I) -> Self {
let (initiator_reply_tx, initiator_reply_rx) = channel::bounded(1);
let (initiator_req_tx, initiator_req_rx) = async_channel::bounded(1);
let update_sink = UpdateSink::new(initiator_req_tx, initiator_reply_rx);
Self {
resource: None,
resource_signal,
error_channel: None,
initiator,
initiator_future: None,
update_sink,
initiator_req_rx,
initiator_reply_tx,
}
}
}
impl<S: Signal<Item=ResourceSink> + Unpin, I: Initiator + Unpin> Future for InitiatorDriver<S, I> {
impl Future for InitiatorDriver {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.resource_signal).poll_change(cx) {
Poll::Ready(Some(resource)) => {
self.resource = Some(resource.state_sink);
self.error_channel = None;
let f = Box::pin(self.initiator.start_for(resource.id));
self.initiator_future.replace(f);
},
Poll::Ready(None) => self.resource = None,
Poll::Pending => {}
}
let _guard = tracing::info_span!("initiator poll", initiator=%self.name);
tracing::trace!(initiator=%self.name, "polling initiator");
// do while there is work to do
while {
// First things first:
// If we've send an update to the resources in question we have error channel set, so
// we poll that first to determine if the resources has acted on it yet.
if let Some(ref mut errchan) = self.error_channel {
match Pin::new(errchan).poll(cx) {
// In case there's an ongoing
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(error)) => {
self.error_channel = None;
self.initiator_reply_tx.send(Err(error));
}
Poll::Ready(Err(_closed)) => {
// Error channel was dropped which means there was no error
self.error_channel = None;
self.initiator_reply_tx.send(Ok(()));
}
}
}
ready!(Pin::new(&mut self.initiator).poll(cx));
if let Some(ref mut init_fut) = self.initiator_future {
match init_fut.as_mut().poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_e)) => {
// TODO: Log initiator error here
}
}
} else if let Some(ref mut _resource) = self.resource {
let mut s = self.update_sink.clone();
let f = self.initiator.run(&mut s);
self.initiator_future.replace(f);
}
self.error_channel.is_some()
} {}
tracing::warn!(initiator=%self.name, "initiator module ran to completion!");
Poll::Ready(())
}
}
pub fn load(
executor: Executor,
config: &Config,
resources: ResourcesHandle,
sessions: SessionManager,
authentication: AuthenticationHandle,
) -> miette::Result<()> {
let span = tracing::info_span!("loading initiators");
let _guard = span.enter();
let mut initiator_map: HashMap<String, Resource> = config
.init_connections
.iter()
.filter_map(|(k, v)| {
if let Some(resource) = resources.get_by_id(v) {
Some((k.clone(), resource.clone()))
} else {
tracing::error!(initiator=%k, machine=%v,
"Machine configured for initiator not found!");
None
}
})
.collect();
for (name, cfg) in config.initiators.iter() {
if let Some(resource) = initiator_map.remove(name) {
if let Some(driver) = load_single(name, &cfg.module, &cfg.params, resource, &sessions) {
tracing::debug!(module_name=%cfg.module, %name, "starting initiator task");
executor.spawn(driver);
} else {
tracing::error!(module_name=%cfg.module, %name, "Initiator module could not be configured");
}
} else {
tracing::warn!(actor=%name, ?config, "Initiator has no machine configured. Skipping!");
}
}
Ok(())
}
fn load_single(
name: &String,
module_name: &String,
params: &HashMap<String, String>,
resource: Resource,
sessions: &SessionManager,
) -> Option<InitiatorDriver> {
tracing::info!(%name, %module_name, ?params, "Loading initiator");
let o = match module_name.as_ref() {
"Dummy" => Some(InitiatorDriver::new::<Dummy>(
name.clone(),
params,
resource,
sessions.clone(),
)),
"Process" => Some(InitiatorDriver::new::<Process>(
name.clone(),
params,
resource,
sessions.clone(),
)),
_ => None,
};
o.transpose().unwrap_or_else(|error| {
tracing::error!(%error, "failed to configure initiator");
None
})
}

166
bffhd/initiators/process.rs Normal file
View File

@ -0,0 +1,166 @@
use super::Initiator;
use super::InitiatorCallbacks;
use crate::resources::state::State;
use crate::utils::linebuffer::LineBuffer;
use async_process::{Child, ChildStdout, Command, Stdio};
use futures_lite::{ready, AsyncRead};
use miette::{miette, IntoDiagnostic};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
#[derive(Debug, Serialize, Deserialize)]
pub enum InputMessage {
#[serde(rename = "state")]
SetState(State),
}
#[derive(Serialize, Deserialize)]
pub struct OutputLine {}
pub struct Process {
pub cmd: String,
pub args: Vec<String>,
state: Option<ProcessState>,
buffer: LineBuffer,
callbacks: InitiatorCallbacks,
}
impl Process {
fn spawn(&mut self) -> io::Result<()> {
let mut child = Command::new(&self.cmd)
.args(&self.args)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
self.state = Some(ProcessState::new(
child
.stdout
.take()
.expect("Child just spawned with piped stdout has no stdout"),
child,
));
Ok(())
}
}
struct ProcessState {
pub child: Child,
pub stdout: ChildStdout,
}
impl ProcessState {
pub fn new(stdout: ChildStdout, child: Child) -> Self {
Self { stdout, child }
}
fn try_process(&mut self, buffer: &[u8], callbacks: &mut InitiatorCallbacks) -> usize {
tracing::trace!("trying to process current buffer");
let mut end = 0;
while let Some(idx) = buffer[end..].iter().position(|b| *b == b'\n') {
if idx == 0 {
end += 1;
continue;
}
let line = &buffer[end..(end + idx)];
self.process_line(line, callbacks);
end = idx;
}
end
}
fn process_line(&mut self, line: &[u8], callbacks: &mut InitiatorCallbacks) {
match serde_json::from_slice::<InputMessage>(line) {
Ok(state) => {
tracing::trace!(?state, "got new state for process initiator");
}
Err(error) => tracing::warn!(%error, "process initiator did not send a valid line"),
}
}
}
impl Future for Process {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Process {
state: Some(state),
buffer,
callbacks,
..
} = self.get_mut()
{
match state.child.try_status() {
Err(error) => {
tracing::error!(%error, "checking child exit code returned an error");
return Poll::Ready(());
}
Ok(Some(exitcode)) => {
tracing::warn!(%exitcode, "child process exited");
return Poll::Ready(());
}
Ok(None) => {
tracing::trace!("process initiator checking on process");
let stdout = &mut state.stdout;
loop {
let buf = buffer.get_mut_write(512);
match AsyncRead::poll_read(Pin::new(stdout), cx, buf) {
Poll::Pending => break,
Poll::Ready(Ok(read)) => {
buffer.advance_valid(read);
continue;
}
Poll::Ready(Err(error)) => {
tracing::warn!(%error, "reading from child stdout errored");
return Poll::Ready(());
}
}
}
let processed = state.try_process(buffer, callbacks);
buffer.consume(processed);
return Poll::Pending;
}
}
} else {
tracing::warn!("process initiator has no process attached!");
}
Poll::Ready(())
}
}
impl Initiator for Process {
fn new(params: &HashMap<String, String>, callbacks: InitiatorCallbacks) -> miette::Result<Self>
where
Self: Sized,
{
let cmd = params
.get("cmd")
.ok_or(miette!("Process initiator requires a `cmd` parameter."))?
.clone();
let args = params
.get("args")
.map(|argv| argv.split_whitespace().map(|s| s.to_string()).collect())
.unwrap_or_else(Vec::new);
let mut this = Self {
cmd,
args,
state: None,
buffer: LineBuffer::new(),
callbacks,
};
this.spawn().into_diagnostic()?;
Ok(this)
}
}

View File

@ -24,6 +24,7 @@ pub mod users;
pub mod resources;
pub mod actors;
pub mod initiators;
pub mod sensors;
@ -118,15 +119,22 @@ impl Diflouroborane {
.into_diagnostic()
.wrap_err("Failed to construct signal handler")?;
let sessionmanager = SessionManager::new(self.users.clone(), self.roles.clone());
let authentication = AuthenticationHandle::new(self.users.clone());
initiators::load(
self.executor.clone(),
&self.config,
self.resources.clone(),
sessionmanager.clone(),
authentication.clone(),
);
actors::load(self.executor.clone(), &self.config, self.resources.clone())?;
let tlsconfig = TlsConfig::new(self.config.tlskeylog.as_ref(), !self.config.is_quiet())
.into_diagnostic()?;
let acceptor = tlsconfig.make_tls_acceptor(&self.config.tlsconfig)?;
let sessionmanager = SessionManager::new(self.users.clone(), self.roles.clone());
let authentication = AuthenticationHandle::new(self.users.clone());
let apiserver = self.executor.run(APIServer::bind(
self.executor.clone(),
&self.config.listens,

View File

@ -25,6 +25,7 @@ pub mod modules;
pub struct PermissionDenied;
#[derive(Debug)]
pub(crate) struct Inner {
id: String,
db: StateDB,
@ -94,7 +95,7 @@ impl Inner {
}
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Resource {
inner: Arc<Inner>,
}

View File

@ -1,9 +1,9 @@
use crate::config::deser_option;
use crate::utils::oid::ObjectIdentifier;
use once_cell::sync::Lazy;
use rkyv::{Archive, Archived, Deserialize, Infallible};
use std::fmt;
use std::fmt::Write;
use std::str::FromStr;
//use crate::oidvalue;
@ -54,6 +54,11 @@ pub enum Status {
/// The status of the machine
pub struct MachineState {
pub state: Status,
#[serde(
default,
skip_serializing_if = "Option::is_none",
deserialize_with = "deser_option"
)]
pub previous: Option<UserRef>,
}

60
bffhd/utils/linebuffer.rs Normal file
View File

@ -0,0 +1,60 @@
use std::ops::{Deref, DerefMut};
pub struct LineBuffer {
buffer: Vec<u8>,
valid: usize,
}
impl LineBuffer {
pub fn new() -> Self {
Self {
buffer: Vec::new(),
valid: 0,
}
}
/// Resize the internal Vec so that buffer.len() == buffer.capacity()
fn resize(&mut self) {
// SAFETY: Whatever is in memory is always valid as u8.
unsafe { self.buffer.set_len(self.buffer.capacity()) }
}
/// Get an (initialized but empty) writeable buffer of at least `atleast` bytes
pub fn get_mut_write(&mut self, atleast: usize) -> &mut [u8] {
let avail = self.buffer.len() - self.valid;
if avail < atleast {
self.buffer.reserve(atleast - avail);
self.resize()
}
&mut self.buffer[self.valid..]
}
pub fn advance_valid(&mut self, amount: usize) {
self.valid += amount
}
/// Mark `amount` bytes as 'consumed'
///
/// This will move any remaining data to the start of the buffer for future processing
pub fn consume(&mut self, amount: usize) {
assert!(amount <= self.valid);
if amount < self.valid {
self.buffer.copy_within(amount..self.valid, 0);
}
self.valid -= amount;
}
}
impl Deref for LineBuffer {
type Target = [u8];
fn deref(&self) -> &Self::Target {
&self.buffer[0..self.valid]
}
}
impl DerefMut for LineBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.buffer[0..self.valid]
}
}

View File

@ -8,3 +8,5 @@ pub mod varint;
pub mod l10nstring;
pub mod uuid;
pub mod linebuffer;

13
examples/init.py Executable file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env python
import sys
import time
while True:
print('{ "state": { "1.3.6.1.4.1.48398.612.2.4": { "state": "Free" } } }')
sys.stdout.flush()
time.sleep(2)
print('{ "state": { "1.3.6.1.4.1.48398.612.2.4": { "state": { "InUse": { "id": "Testuser" } } } } }')
sys.stdout.flush()
time.sleep(2)