From f2cdb78b9070fcdb21b2d9492e8cc2dea574990a Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Mon, 2 Nov 2020 14:56:45 +0100 Subject: [PATCH] Start rewrite --- Cargo.toml | 2 +- src/app.rs | 24 +++-------- src/main.rs | 30 ++++++++++++-- src/{schema.rs => schema/mod.rs} | 68 +++++++++++++++++++++----------- 4 files changed, 80 insertions(+), 44 deletions(-) rename src/{schema.rs => schema/mod.rs} (70%) diff --git a/Cargo.toml b/Cargo.toml index 2179424..a847a66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ signal-hook = "0.1" slog = "2.5" libc = "0.2" -rsasl = "0.1" +rsasl = "0.2.3" [build-dependencies] capnpc = "0.13" diff --git a/src/app.rs b/src/app.rs index 03ae9ac..f71997b 100644 --- a/src/app.rs +++ b/src/app.rs @@ -26,13 +26,8 @@ pub enum Window { Help, } -enum ConnState { - Connecting(F), - Connected(Api) -} - /// Application state struct -pub struct Sute<'a, S, F> { +pub struct Sute<'a, S> { // TODO: BE SMART. Inputs change the state, resize signals change the state, futures completing // change the state. @@ -41,12 +36,12 @@ pub struct Sute<'a, S, F> { statesig: MutableSignalCloned>, signal: S, inputs: Inputs, - api: Option>, + api: Option, new: bool } -impl<'a, S: Unpin, F: Unpin> Sute<'a, S, F> { - pub fn new(s: S, log: Arc>, api: F) -> Self { +impl<'a, S: Unpin> Sute<'a, S> { + pub fn new(s: S, log: Arc>, api: Api) -> Self { let inputs = Inputs::new(); let state = Mutable::new(SuteState::new(log)); @@ -55,7 +50,7 @@ impl<'a, S: Unpin, F: Unpin> Sute<'a, S, F> { state: state, signal: s, inputs: inputs, - api: Some(ConnState::Connecting(api)), + api: Some(api), new: true, } } @@ -85,7 +80,7 @@ impl<'a, S: Unpin, F: Unpin> Sute<'a, S, F> { } } -impl<'a, S: Signal + Unpin, F: Future + Unpin> Signal for Sute<'a, S, F> { +impl<'a, S: Signal + Unpin> Signal for Sute<'a, S> { type Item = SuteState<'a>; fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Poll::Ready(Some(key)) = Pin::new(&mut self.inputs).poll_next(cx) { @@ -94,13 +89,6 @@ impl<'a, S: Signal + Unpin, F: Future + Unpin> Signa if let Poll::Ready(Some(size)) = Pin::new(&mut self.signal).poll_change(cx) { self.handle_resize(size); } - if let Some(ConnState::Connecting(mut apif)) = self.api.take() { - if let Poll::Ready(api) = Pin::new(&mut apif).poll(cx) { - self.api = Some(ConnState::Connected(api)); - } else { - self.api = Some(ConnState::Connecting(apif)); - } - } // TODO chunk this? Pin::new(&mut self.statesig).poll_change(cx) diff --git a/src/main.rs b/src/main.rs index f4bf134..25e1be5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,8 @@ extern crate slog; use std::io; use std::sync::{Arc, Mutex}; +use smol::net::TcpStream; + use tui::backend::{Backend, TermionBackend}; use tui::Terminal; use termion::raw::IntoRawMode; @@ -45,7 +47,7 @@ fn main() -> Result<(), io::Error> { .takes_value(true)) .get_matches(); - let server = matches.value_of("server").unwrap_or("localhost"); + let server = matches.value_of("server").unwrap_or("localhost:59661"); let stdout = io::stdout().into_raw_mode()?; let backend = TermionBackend::new(stdout); @@ -61,8 +63,30 @@ fn main() -> Result<(), io::Error> { let log = slog::Logger::root(slog::Fuse::new(drain.clone()), o!()); let resize = util::Resize::new()?; - let api = schema::Api::connect(log.clone(), server); - let app = app::Sute::new(resize, drain, Box::pin(api)); + let lex = smol::LocalExecutor::new(); + let stream_f = async move { + TcpStream::connect(server).await.unwrap() + }; + + let stream = smol::block_on(lex.run(stream_f)); + + let (f1, mut api) = schema::Api::from_stream(stream); + + lex.spawn(f1).detach(); + + let i = log.clone(); + let f = async { + println!("API ready"); + let mut auth = api.authentication().await; + println!("AUTH ready: {:?}", &auth); + let mechs = auth.mechanisms().await; + println!("MECHS ready: {:?}", &mechs); + for mech in mechs { + println!("{}", mech); + } + }; + + let app = app::Sute::new(resize, drain, api); crit!(log, "This is a test: {}", 42); error!(log, "This is a test: {}", 42); diff --git a/src/schema.rs b/src/schema/mod.rs similarity index 70% rename from src/schema.rs rename to src/schema/mod.rs index 238849f..be75f82 100644 --- a/src/schema.rs +++ b/src/schema/mod.rs @@ -1,3 +1,5 @@ +use std::fmt; +use std::any::Any; use std::ffi::CStr; use slog::Logger; @@ -25,35 +27,33 @@ mod api_capnp { const PLAIN: *const libc::c_char = b"PLAIN" as *const u8 as *const libc::c_char; -pub struct Api { - stream: TcpStream, - bffh: connection_capnp::bootstrap::Client, +pub struct API { + inner: connection_capnp::bootstrap::Client, + log: Logger, } impl Api { - pub fn new(stream: TcpStream, bffh: connection_capnp::bootstrap::Client) -> Self { - Self { stream, bffh } + fn new(log: Logger, inner: connection_capnp::bootstrap::Client) -> Self { + Self { inner} } - pub fn connect(log: Logger, addr: A) -> impl Future { - let f = async { - let mut stream = TcpStream::connect(addr).await.unwrap(); + pub fn from_stream(stream: TcpStream) -> (impl Future, Self) { + let network = Box::new(twoparty::VatNetwork::new(stream.clone(), stream.clone(), + rpc_twoparty_capnp::Side::Client, Default::default())); - handshake(log.clone(), &mut stream).await.unwrap(); - let network = Box::new(twoparty::VatNetwork::new(stream.clone(), stream.clone(), - rpc_twoparty_capnp::Side::Client, Default::default())); + let mut rpc_system = RpcSystem::new(network, None); + let bffh: connection_capnp::bootstrap::Client + = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); - let mut rpc_system = RpcSystem::new(network, None); - let bffh: connection_capnp::bootstrap::Client - = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + (rpc_system, Api::new(bffh)) + } - let mut api = Api::new(stream, bffh); - - api.authenticate(log).await; - - api - }; - - f + pub async fn authentication(&mut self) -> Authentication { + let req = self.bffh.auth_request().send().promise; + // TODO: When's that an Err? + let res = req.await.unwrap(); + // TODO: When's that an Err? + let tmp = res.get().unwrap(); + Authentication::new(tmp.get_auth().unwrap()) } async fn authenticate(&mut self, log: Logger) { @@ -114,6 +114,30 @@ impl Api { //} } +pub struct Authentication { + inner: auth_capnp::authentication::Client, +} +impl fmt::Debug for Authentication { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Authentication") + .field("inner", &self.inner.type_id()) + .finish() + } +} + +impl Authentication { + pub fn new(inner: auth_capnp::authentication::Client) -> Self { + Self { inner } + } + + pub async fn mechanisms(&mut self) -> Vec { + let req = self.inner.mechanisms_request().send().promise; + let res = req.await.unwrap(); + let tmp = res.get().unwrap(); + tmp.get_mechs().unwrap().iter().map(|x| x.unwrap().to_string()).collect() + } +} + async fn handshake(log: Logger, mut stream: &mut TcpStream) -> Result<(), io::Error> { let host = "localhost"; let program = format!("{}-{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));