2021-10-27 23:20:35 +02:00
|
|
|
use std::fmt::Debug;
|
2021-10-06 13:53:14 +02:00
|
|
|
use async_trait::async_trait;
|
|
|
|
|
|
|
|
use futures_signals::signal::Mutable;
|
2021-10-27 17:53:00 +02:00
|
|
|
use async_oneshot::Sender;
|
|
|
|
use async_channel::Receiver;
|
2021-10-06 13:53:14 +02:00
|
|
|
|
2021-11-26 02:25:48 +01:00
|
|
|
use state::State;
|
2021-11-26 21:01:43 +01:00
|
|
|
use state::db::StateAccessor;
|
2021-10-06 13:53:14 +02:00
|
|
|
|
2021-11-26 02:25:48 +01:00
|
|
|
pub mod state;
|
|
|
|
pub mod claim;
|
|
|
|
|
2021-11-26 21:01:43 +01:00
|
|
|
|
2021-10-06 13:53:14 +02:00
|
|
|
/// A resource in BFFH has to contain several different parts;
|
|
|
|
/// - Currently set state
|
|
|
|
/// - Execution state of attached actors (⇒ BFFH's job)
|
|
|
|
/// - Output of interal logic of a resource
|
|
|
|
/// ⇒ Resource logic gets read access to set state and write access to output state.
|
|
|
|
/// ⇒ state `update` happens via resource logic. This logic should do access control. If the update
|
|
|
|
/// succeeds then BFFH stores those input parameters ("set" state) and results / output state.
|
|
|
|
/// Storing input parameters is relevant so that BFFH can know that an "update" is a no-op
|
|
|
|
/// without having to run the module code.
|
|
|
|
/// ⇒ in fact actors only really care about the output state, and shouldn't (need to) see "set"
|
|
|
|
/// state.
|
|
|
|
/// ⇒ example reserving:
|
|
|
|
/// - Claimant sends 'update' message with a new state
|
|
|
|
/// - Doesn't set the state until `update` has returned Ok.
|
|
|
|
/// - This runs the `update` function with that new state and the claimants user context returning
|
|
|
|
/// either an Ok or an Error.
|
|
|
|
/// - Error is returned to Claimant to show user, stop.
|
|
|
|
/// - On ok:
|
|
|
|
/// - Commit new "set" state, storing it and making it visible to all other claimants
|
|
|
|
/// - Commit new output state, storing it and notifying all connected actors / Notify
|
|
|
|
/// ⇒ BFFHs job in this whole ordeal is:
|
|
|
|
/// - Message passing primitives so that update message are queued
|
|
|
|
/// - As reliable as possible storage system for input and output state
|
|
|
|
/// - Again message passing so that updates are broadcasted to all Notify and Actors.
|
|
|
|
/// ⇒ Resource module's job is:
|
|
|
|
/// - Validating updates semantically i.e. are the types correct
|
|
|
|
/// - Check authorization of updates i.e. is this user allowed to do that
|
|
|
|
#[async_trait]
|
2021-10-27 23:20:35 +02:00
|
|
|
pub trait Resource: Debug {
|
2021-10-06 13:53:14 +02:00
|
|
|
/// Run whatever internal logic this resource has for the given State update, and return the
|
|
|
|
/// new output state that this update produces.
|
2021-10-27 21:32:50 +02:00
|
|
|
async fn on_update(&mut self, input: &State) -> Result<State, Error>;
|
|
|
|
async fn shutdown(&mut self);
|
2021-09-30 10:07:42 +02:00
|
|
|
}
|
|
|
|
|
2021-10-27 23:20:35 +02:00
|
|
|
#[derive(Debug)]
|
2021-10-27 21:32:50 +02:00
|
|
|
pub struct Passthrough;
|
|
|
|
#[async_trait]
|
|
|
|
impl Resource for Passthrough {
|
|
|
|
async fn on_update(&mut self, input: &State) -> Result<State, Error> {
|
|
|
|
Ok(input.clone())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn shutdown(&mut self) {}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Error type a resource implementation can produce
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub enum Error {
|
|
|
|
Internal(Box<dyn std::error::Error>),
|
|
|
|
Denied,
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: more message context
|
2021-10-28 01:10:35 +02:00
|
|
|
#[derive(Debug)]
|
2021-10-06 13:53:14 +02:00
|
|
|
pub struct Update {
|
|
|
|
pub state: State,
|
2021-10-27 21:32:50 +02:00
|
|
|
pub errchan: Sender<Error>,
|
2021-10-06 13:53:14 +02:00
|
|
|
}
|
|
|
|
|
2021-10-27 23:20:35 +02:00
|
|
|
#[derive(Debug)]
|
2021-10-06 13:53:14 +02:00
|
|
|
pub struct ResourceDriver {
|
2021-10-13 04:57:40 +02:00
|
|
|
// putput
|
2021-10-06 13:53:14 +02:00
|
|
|
res: Box<dyn Resource>,
|
|
|
|
|
2021-10-13 04:57:40 +02:00
|
|
|
// input
|
2021-10-06 13:53:14 +02:00
|
|
|
rx: Receiver<Update>,
|
2021-10-13 04:57:40 +02:00
|
|
|
|
|
|
|
// output
|
|
|
|
db: StateAccessor,
|
|
|
|
|
2021-10-06 13:53:14 +02:00
|
|
|
signal: Mutable<State>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ResourceDriver {
|
|
|
|
pub async fn drive_to_end(&mut self) {
|
|
|
|
while let Ok(update) = self.rx.recv().await {
|
|
|
|
let state = update.state;
|
2021-10-27 14:49:45 +02:00
|
|
|
let mut errchan = update.errchan;
|
2021-10-06 13:53:14 +02:00
|
|
|
|
2021-10-27 21:32:50 +02:00
|
|
|
match self.res.on_update(&state).await {
|
2021-10-06 13:53:14 +02:00
|
|
|
Ok(outstate) => {
|
|
|
|
// FIXME: Send any error here to some global error collector. A failed write to
|
|
|
|
// the DB is not necessarily fatal, but it means that BFFH is now in an
|
|
|
|
// inconsistent state until a future update succeeds with writing to the DB.
|
|
|
|
// Not applying the new state isn't correct either since we don't know what the
|
|
|
|
// internal logic of the resource has done to make this happen.
|
|
|
|
// Another half right solution is to unwrap and recreate everything.
|
2021-10-13 04:57:40 +02:00
|
|
|
// "Best" solution would be to tell the resource to rollback their interal
|
|
|
|
// changes on a fatal failure and then notify the Claimant, while simply trying
|
|
|
|
// again for temporary failures.
|
|
|
|
let _ = self.db.set(&state, &outstate);
|
|
|
|
self.signal.set(outstate);
|
2021-10-06 13:53:14 +02:00
|
|
|
},
|
|
|
|
Err(e) => {
|
2021-10-13 04:57:40 +02:00
|
|
|
let _ = errchan.send(e);
|
2021-10-06 13:53:14 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-09-30 10:07:42 +02:00
|
|
|
}
|
2021-10-27 21:32:50 +02:00
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
|
|
|
use std::pin::Pin;
|
|
|
|
use std::task::Poll;
|
|
|
|
use std::future::Future;
|
|
|
|
use super::*;
|
|
|
|
|
|
|
|
#[futures_test::test]
|
|
|
|
async fn test_passthrough_is_id() {
|
2021-11-26 02:25:48 +01:00
|
|
|
let inp = state::tests::gen_random();
|
2021-10-27 21:32:50 +02:00
|
|
|
|
|
|
|
let mut res = Passthrough;
|
|
|
|
let out = res.on_update(&inp).await.unwrap();
|
|
|
|
assert_eq!(inp, out);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_passthrough_is_always_ready() {
|
|
|
|
let inp = State::build().finish();
|
|
|
|
|
|
|
|
let mut res = Passthrough;
|
|
|
|
let mut cx = futures_test::task::panic_context();
|
|
|
|
if let Poll::Ready(_) = Pin::new(&mut res.on_update(&inp)).poll(&mut cx) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
panic!("Passthrough returned Poll::Pending")
|
|
|
|
}
|
|
|
|
}
|