diff --git a/Cargo.lock b/Cargo.lock index d94a644..87f98a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -192,18 +192,6 @@ dependencies = [ "event-listener", ] -[[package]] -name = "async-native-tls" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e9e7a929bd34c68a82d58a4de7f86fffdaf97fb2af850162a7bb19dd7269b33" -dependencies = [ - "async-std", - "native-tls", - "thiserror", - "url", -] - [[package]] name = "async-net" version = "1.6.1" @@ -248,8 +236,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c86f33abd5a4f3e2d6d9251a9e0c6a7e52eb1113caf893dae8429bf4a53f378" dependencies = [ "futures-lite", - "rustls 0.19.1", - "webpki 0.21.4", + "rustls", + "webpki", ] [[package]] @@ -287,9 +275,9 @@ checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" [[package]] name = "async-trait" -version = "0.1.51" +version = "0.1.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" +checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3" dependencies = [ "proc-macro2", "quote", @@ -609,22 +597,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" -[[package]] -name = "core-foundation" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6888e10551bb93e424d8df1d07f1a8b4fceb0001a3a4b048bfc47554946f47b3" -dependencies = [ - "core-foundation-sys", - "libc", -] - -[[package]] -name = "core-foundation-sys" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" - [[package]] name = "cpufeatures" version = "0.2.1" @@ -837,7 +809,6 @@ version = "0.4.0" dependencies = [ "api", "async-channel", - "async-native-tls", "async-oneshot", "async-rustls", "async-trait", @@ -850,7 +821,7 @@ dependencies = [ "executor", "futures-signals", "futures-test", - "intmap", + "futures-util", "inventory", "lazy_static", "libc", @@ -864,7 +835,8 @@ dependencies = [ "rkyv_typename", "rsasl", "rust-argon2", - "rustls 0.20.2", + "rustls", + "sdk", "serde", "serde_dhall", "serde_json", @@ -991,21 +963,6 @@ dependencies = [ "instant", ] -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "form_urlencoded" version = "1.0.1" @@ -1308,12 +1265,6 @@ dependencies = [ "cfg-if 1.0.0", ] -[[package]] -name = "intmap" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e50930385956f6c4a0b99f3dd654adcc40788456c36e17c5b20e1d1ceb523ec6" - [[package]] name = "inventory" version = "0.1.10" @@ -1542,24 +1493,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "native-tls" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48ba9f7719b5a0f42f338907614285fb5fd70e53858141f69898a1fb7203b24d" -dependencies = [ - "lazy_static", - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "nom" version = "5.1.2" @@ -1624,39 +1557,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" -[[package]] -name = "openssl" -version = "0.10.36" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d9facdb76fec0b73c406f125d44d86fdad818d66fef0531eec9233ca425ff4a" -dependencies = [ - "bitflags", - "cfg-if 1.0.0", - "foreign-types", - "libc", - "once_cell", - "openssl-sys", -] - -[[package]] -name = "openssl-probe" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" - -[[package]] -name = "openssl-sys" -version = "0.9.67" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69df2d8dfc6ce3aaf44b40dec6f487d5a886516cf6879c49e98e0710f310a058" -dependencies = [ - "autocfg", - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "parking" version = "2.0.0" @@ -2165,20 +2065,8 @@ dependencies = [ "base64", "log", "ring", - "sct 0.6.1", - "webpki 0.21.4", -] - -[[package]] -name = "rustls" -version = "0.20.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d37e5e2290f3e040b594b1a9e04377c2c671f1a1cfd9bfdef82106ac1c113f84" -dependencies = [ - "log", - "ring", - "sct 0.7.0", - "webpki 0.22.0", + "sct", + "webpki", ] [[package]] @@ -2196,16 +2084,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "schannel" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" -dependencies = [ - "lazy_static", - "winapi", -] - [[package]] name = "scopeguard" version = "1.1.0" @@ -2222,19 +2100,13 @@ dependencies = [ "untrusted", ] -[[package]] -name = "sct" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "sdk" version = "0.1.0" +dependencies = [ + "async-trait", + "futures-util", +] [[package]] name = "seahash" @@ -2242,29 +2114,6 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" -[[package]] -name = "security-framework" -version = "2.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "525bc1abfda2e1998d152c45cf13e696f76d0a4972310b22fac1658b05df7c87" -dependencies = [ - "bitflags", - "core-foundation", - "core-foundation-sys", - "libc", - "security-framework-sys", -] - -[[package]] -name = "security-framework-sys" -version = "2.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9dd14d83160b528b7bfd66439110573efcfbe281b17fc2ca9f39f550d619c7e" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "semver" version = "1.0.4" @@ -2733,12 +2582,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "vec_map" version = "0.8.2" @@ -2860,16 +2703,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "webpki" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "wepoll-ffi" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index 5b46990..5666a4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,12 +25,13 @@ path = "bin/bffhd/main.rs" [dependencies] libc = "0.2.101" + +sdk = { path = "modules/sdk", default-features = false } lazy_static = "1.4.0" uuid = { version = "0.8.2", features = ["serde", "v4"] } async-trait = "0.1.51" -async-native-tls = "0.3" -intmap = "0.7" pin-utils = "0.1.0" +futures-util = "0.3" # Runtime executor = { path = "runtime/executor" } @@ -80,7 +81,7 @@ serde_json = "1.0" once_cell = "1.8" -rustls = "0.20" +rustls = "~0.19" async-rustls = "0.2" [dev-dependencies] diff --git a/bffhd/actors/mod.rs b/bffhd/actors/mod.rs new file mode 100644 index 0000000..193cc7b --- /dev/null +++ b/bffhd/actors/mod.rs @@ -0,0 +1,99 @@ +use std::cell::Cell; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use futures_signals::signal::{MutableSignalRef, ReadOnlyMutable, Signal}; +use futures_util::future::BoxFuture; +use crate::resource::state::State; + +pub trait Actor { + fn apply(&mut self, state: State) -> BoxFuture<'static, ()>; +} + +fn loader>(cell: &Cell>) -> Option { + cell.take() +} + +pub struct ActorDriver { + rx: MutableSignalRef>, &'static dyn Fn(&Cell>) -> Option>, + signal: Option, + + actor: Box, + future: Option>, +} + +impl> ActorDriver +{ + pub fn new(rx: &ReadOnlyMutable>>, actor: Box) + -> Self + { + let f: &'static dyn Fn(&Cell>) -> Option = &loader; + let rx = rx.signal_ref(f); + Self { + rx, + signal: None, + actor, + future: None, + } + } +} + + +impl Future for ActorDriver + where S: Signal + Unpin + Send, +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut done = false; // Is the channel with new state-signals exhausted? + + match Pin::new(&mut self.rx).poll_change(cx) { + Poll::Ready(Some(new_signal)) => { self.signal = new_signal; }, + Poll::Ready(None) => done = true, + Poll::Pending => {}, + } + + // Work until there is no more work to do. + loop { + + // Poll the `apply` future. And ensure it's completed before the next one is started + match self.future.as_mut() + .map(|future| Future::poll(Pin::new(future), cx)) + { + // Skip and poll for a new future to do + None => { } + + // This apply future is done, get a new one + Some(Poll::Ready(_)) => self.future = None, + + // This future would block so we return to continue work another time + Some(Poll::Pending) => return Poll::Pending, + } + + // Poll the signal and apply any change that happen to the inner Actuator + match self.signal.as_mut() + .map(|inner| S::poll_change(Pin::new(inner), cx)) + { + // No signal to poll + None => return Poll::Pending, + Some(Poll::Pending) => return Poll::Pending, + Some(Poll::Ready(None)) => { + self.signal = None; + + if done { + return Poll::Ready(()); + } else { + return Poll::Pending; + } + }, + Some(Poll::Ready(Some(state))) => { + // This future MUST be polled before we exit from the Actor::poll because if we + // do not do that it will not register the dependency and thus NOT BE POLLED. + let f = self.actor.apply(state); + self.future.replace(f); + } + } + } + } +} + diff --git a/bffhd/error.rs b/bffhd/error.rs index 9ec0b38..d07ed9a 100644 --- a/bffhd/error.rs +++ b/bffhd/error.rs @@ -1,12 +1,8 @@ use std::io; use std::fmt; - use rsasl::SaslError; - use crate::db::DBError; -//FIXME use crate::network; - #[derive(Debug)] /// Shared error type pub enum Error { @@ -71,4 +67,6 @@ impl From for Error { fn from(e: DBError) -> Error { Error::DB(e) } -} \ No newline at end of file +} + +pub type Result = std::result::Result; \ No newline at end of file diff --git a/bffhd/initiators/mod.rs b/bffhd/initiators/mod.rs new file mode 100644 index 0000000..1996985 --- /dev/null +++ b/bffhd/initiators/mod.rs @@ -0,0 +1,138 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use async_channel as channel; +use async_oneshot as oneshot; +use futures_signals::signal::Signal; +use futures_util::future::BoxFuture; +use smol::future::FutureExt; +use sdk::initiators::{Initiator, InitiatorError, UpdateError, UpdateSink, UserID, ResourceID}; +use crate::resource::{Error, Update}; + +#[derive(Clone)] +pub struct BffhUpdateSink { + tx: channel::Sender<(Option, sdk::initiators::State)>, + rx: channel::Receiver>, +} + +#[async_trait::async_trait] +impl UpdateSink for BffhUpdateSink { + async fn send(&mut self, userid: Option, state: sdk::initiators::State) + -> Result<(), UpdateError> + { + if let Err(_e) = self.tx.send((userid, state)).await { + return Err(UpdateError::Closed); + } + + match self.rx.recv().await { + Ok(Ok(())) => Ok(()), + Ok(Err(Error::Denied)) => Err(UpdateError::Denied), + Ok(Err(Error::Internal(e))) => Err(UpdateError::Other(e)), + // RecvError is send only when the channel is closed + Err(_) => Err(UpdateError::Closed), + } + } +} + +impl BffhUpdateSink { + fn new(tx: channel::Sender<(Option, sdk::initiators::State)>, + rx: channel::Receiver>) + -> Self + { + Self { tx, rx } + } +} + +struct Resource; +pub struct InitiatorDriver { + resource_signal: S, + resource: Option>, + error_channel: Option>, + + initiator: I, + initiator_future: Option>>>, + update_sink: BffhUpdateSink, + initiator_req_rx: channel::Receiver<(Option, sdk::initiators::State)>, + initiator_reply_tx: channel::Sender>, +} + +pub struct ResourceSink { + pub id: ResourceID, + pub state_sink: channel::Sender, +} + +impl, I: Initiator> InitiatorDriver { + pub fn new(resource_signal: S, initiator: I) -> Self { + let (initiator_reply_tx, initiator_reply_rx) = channel::bounded(1); + let (initiator_req_tx, initiator_req_rx) = async_channel::bounded(1); + let update_sink = BffhUpdateSink::new(initiator_req_tx, initiator_reply_rx); + Self { + resource: None, + resource_signal, + error_channel: None, + + initiator, + initiator_future: None, + update_sink, + initiator_req_rx, + initiator_reply_tx, + } + } +} + +impl + Unpin, I: Initiator + Unpin> Future for InitiatorDriver { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.resource_signal).poll_change(cx) { + Poll::Ready(Some(resource)) => { + self.resource = Some(resource.state_sink); + self.error_channel = None; + let f = Box::pin(self.initiator.start_for(resource.id)); + self.initiator_future.replace(f); + }, + Poll::Ready(None) => self.resource = None, + Poll::Pending => {} + } + + // do while there is work to do + while { + // First things first: + // If we've send an update to the resource in question we have error channel set, so + // we poll that first to determine if the resource has acted on it yet. + if let Some(ref mut errchan) = self.error_channel { + match errchan.poll(cx) { + // In case there's an ongoing + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(error)) => { + self.error_channel = None; + self.initiator_reply_tx.send(Err(error)); + } + Poll::Ready(Err(_closed)) => { + // Error channel was dropped which means there was no error + self.error_channel = None; + self.initiator_reply_tx.send(Ok(())); + } + } + } + + if let Some(ref mut init_fut) = self.initiator_future { + match init_fut.poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(())) => {}, + Poll::Ready(Err(_e)) => { + // TODO: Log initiator error here + } + } + } else if let Some(ref mut resource) = self.resource { + let mut s = self.update_sink.clone(); + let f = self.initiator.run(&mut s); + self.initiator_future.replace(f); + } + + self.error_channel.is_some() + } {} + + Poll::Ready(()) + } +} \ No newline at end of file diff --git a/bffhd/lib.rs b/bffhd/lib.rs index 57702ea..7a99a1e 100644 --- a/bffhd/lib.rs +++ b/bffhd/lib.rs @@ -26,6 +26,12 @@ pub mod users; pub mod resource; pub mod resources; +pub mod actors; + +pub mod initiators; + +pub mod sensors; + pub mod server; pub mod utils; \ No newline at end of file diff --git a/bffhd/resource/claim.rs b/bffhd/resource/claim.rs index 38dcd3e..1c1b0cc 100644 --- a/bffhd/resource/claim.rs +++ b/bffhd/resource/claim.rs @@ -1,6 +1,28 @@ +use std::sync::Arc; use async_channel::Sender; +use lmdb::Environment; use crate::resource::Update; +#[derive(Clone, Debug)] +/// Database of currently valid claims, interests and notify, as far as applicable +pub struct ClaimDB { + env: Arc, +} + +pub type UserID = String; +pub type ResourceID = String; +pub struct ClaimEntry { + subject: UserID, + target: ResourceID, + level: Level, +} + +enum Level { + Claim(Claim), + Interest(Interest), + Notify(Notify), +} + #[derive(Debug)] /// A claim on a resource grants permission to update state /// diff --git a/bffhd/resource/mod.rs b/bffhd/resource/mod.rs index 4eb6965..fd1e337 100644 --- a/bffhd/resource/mod.rs +++ b/bffhd/resource/mod.rs @@ -40,7 +40,7 @@ pub mod claim; /// - Validating updates semantically i.e. are the types correct /// - Check authorization of updates i.e. is this user allowed to do that #[async_trait] -pub trait Resource: Debug { +pub trait ResourceModel: Debug { /// Run whatever internal logic this resource has for the given State update, and return the /// new output state that this update produces. async fn on_update(&mut self, input: &State) -> Result; @@ -50,7 +50,7 @@ pub trait Resource: Debug { #[derive(Debug)] pub struct Passthrough; #[async_trait] -impl Resource for Passthrough { +impl ResourceModel for Passthrough { async fn on_update(&mut self, input: &State) -> Result { Ok(input.clone()) } @@ -61,7 +61,7 @@ impl Resource for Passthrough { /// Error type a resource implementation can produce #[derive(Debug)] pub enum Error { - Internal(Box), + Internal(Box), Denied, } @@ -75,7 +75,7 @@ pub struct Update { #[derive(Debug)] pub struct ResourceDriver { // putput - res: Box, + res: Box, // input rx: Receiver, diff --git a/bffhd/server/tls.rs b/bffhd/sensors/mod.rs similarity index 100% rename from bffhd/server/tls.rs rename to bffhd/sensors/mod.rs diff --git a/bffhd/server/authentication.rs b/bffhd/server/authentication.rs index 51ecf11..e6ec2e9 100644 --- a/bffhd/server/authentication.rs +++ b/bffhd/server/authentication.rs @@ -1,11 +1,12 @@ use api::utils::l10n_string; +use crate::error; use std::ops::Deref; use capnp::capability::Promise; use capnp::Error; use capnp_rpc::pry; -use rsasl::{rsasl_err_to_str, SASL, Session as SaslSession, Property, ReturnCode}; +use rsasl::{rsasl_err_to_str, SASL, Session as SaslSession, Property, ReturnCode, RSASL, DiscardOnDrop, Mechanisms}; use rsasl::session::Step::{Done, NeedsMore}; use api::auth::authentication::{ @@ -21,13 +22,50 @@ use api::auth::response::{ }; use crate::users::{UserDB, PassDB}; +#[derive(Debug)] +pub struct AuthenticationProvider { + sasl: RSASL, +} + +impl AuthenticationProvider { + pub fn new() -> error::Result { + let sasl = SASL::new()?; + Ok(Self { sasl }) + } + + pub fn mechanisms(&self) -> error::Result { + Ok(self.sasl.server_mech_list()?) + } + + pub fn try_start_session(&mut self, mechanism: &str) -> error::Result { + let session = self.sasl.server_start(mechanism)?; + Ok(Authentication { + state: State::Running(session), + }) + } + + pub fn bad_mechanism(&self) -> Authentication { + Authentication { + state: State::InvalidMechanism, + } + } + + pub fn start_session(&mut self, mechanism: &str) -> Authentication { + self.try_start_session(mechanism) + .unwrap_or_else(|_| self.bad_mechanism()) + } +} + +#[derive(Debug)] struct Callback; +#[derive(Debug)] struct AppData { userdb: UserDB, passdb: PassDB, } +#[derive(Debug)] struct SessionData; impl rsasl::Callback for Callback { @@ -67,38 +105,43 @@ impl rsasl::Callback for Callback { } } +#[derive(Debug)] pub struct Authentication { state: State, } +#[derive(Debug)] enum State { InvalidMechanism, Finished, Aborted, - Running(SaslSession) + Running(DiscardOnDrop>) } impl Server for Authentication { fn step(&mut self, params: StepParams, mut results: StepResults) -> Promise<(), Error> { use State::*; - match self.state { + let new = match self.state { InvalidMechanism => { let builder = results.get(); let mut b = builder.init_error(); b.set_reason(Reason::BadMechanism); b.set_action(Action::Permanent); + None }, Finished => { let builder = results.get(); let mut b = builder.init_error(); b.set_reason(Reason::Finished); b.set_action(Action::Permanent); + None }, Aborted => { let builder = results.get(); let mut b = builder.init_error(); b.set_reason(Reason::Aborted); b.set_action(Action::Permanent); + None }, Running(ref mut session) => { // TODO: If null what happens? @@ -114,23 +157,31 @@ impl Server for Authentication { let mut session_builder = b.init_session(); let session = super::session::Session::new(); session.build(&mut session_builder); + Some(State::Finished) }, Ok(NeedsMore(data)) => { builder.set_challenge(data.deref()); + None }, Err(_) => { let mut b = builder.init_error(); b.set_reason(Reason::Aborted); b.set_action(Action::Permanent); + Some(State::Aborted) } } } + }; + + if let Some(new) = new { + std::mem::replace(&mut self.state, new); } Promise::ok(()) } fn abort(&mut self, _: AbortParams, _: AbortResults) -> Promise<(), Error> { + std::mem::replace(&mut self.state, State::Aborted); Promise::ok(()) } } diff --git a/bffhd/server/mod.rs b/bffhd/server/mod.rs index 0f93994..e56c9bd 100644 --- a/bffhd/server/mod.rs +++ b/bffhd/server/mod.rs @@ -1,7 +1,17 @@ +use std::future::Future; +use futures_util::future::FutureExt; +use async_rustls::TlsStream; use capnp::capability::Promise; use capnp::Error; +use capnp_rpc::rpc_twoparty_capnp::Side; +use capnp_rpc::RpcSystem; +use capnp_rpc::twoparty::VatNetwork; +use smol::io::{AsyncRead, AsyncWrite}; + +use crate::error::Result; use api::bootstrap::{ + Client, Server, MechanismsParams, MechanismsResults, @@ -9,12 +19,36 @@ use api::bootstrap::{ CreateSessionResults }; -mod tls; mod authentication; mod session; mod users; mod resources; +#[derive(Debug)] +pub struct APIHandler { + +} + +impl APIHandler { + pub fn handle(&mut self, stream: TlsStream) + -> impl Future> + { + let (mut reader, mut writer) = smol::io::split(stream); + + let bootstrap = ApiSystem {}; + let rpc: Client = capnp_rpc::new_client(bootstrap); + let network = VatNetwork::new( + reader, + writer, + Side::Server, + Default::default(), + ); + let rpc_system = RpcSystem::new(Box::new(network), Some(rpc.client)); + + rpc_system.map(|r| r.map_err(Into::into)) + } +} + #[derive(Debug)] /// Cap'n Proto API Handler struct ApiSystem { diff --git a/modules/sdk/Cargo.toml b/modules/sdk/Cargo.toml index 7fb36fa..9cb2c61 100644 --- a/modules/sdk/Cargo.toml +++ b/modules/sdk/Cargo.toml @@ -6,3 +6,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1.51" +futures-util = "0.3" diff --git a/modules/sdk/src/initiators/mod.rs b/modules/sdk/src/initiators/mod.rs new file mode 100644 index 0000000..ac9dfca --- /dev/null +++ b/modules/sdk/src/initiators/mod.rs @@ -0,0 +1,35 @@ +use async_trait::async_trait; +use futures_util::future::BoxFuture; + +pub struct State; +pub struct UserID; +pub struct ResourceID; +pub struct Error; + +pub enum UpdateError { + /// We're not connected to anything anymore. You can't do anything about this error and the + /// only reason why you even get it is because your future was called a last time before + /// being shelved so best way to handle this error is to just return from your loop entirely, + /// cleaning up any state that doesn't survive a freeze. + Closed, + + Denied, + + Other(Box), +} + +#[async_trait] +pub trait UpdateSink: Send { + async fn send(&mut self, userid: Option, state: State) -> Result<(), UpdateError>; +} + +pub trait InitiatorError: std::error::Error + Send { +} + +pub trait Initiator { + fn start_for(&mut self, machine: ResourceID) + -> BoxFuture<'static, Result<(), Box>>; + + fn run(&mut self, request: &mut impl UpdateSink) + -> BoxFuture<'static, Result<(), Box>>; +} diff --git a/modules/sdk/src/lib.rs b/modules/sdk/src/lib.rs index 31e1bb2..e750ce4 100644 --- a/modules/sdk/src/lib.rs +++ b/modules/sdk/src/lib.rs @@ -1,3 +1,7 @@ +#[forbid(private_in_public)] + +pub mod initiators; + #[cfg(test)] mod tests { #[test]