diff --git a/Cargo.toml b/Cargo.toml index a847a66..dbf60d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,5 +31,7 @@ slog = "2.5" libc = "0.2" rsasl = "0.2.3" +linefeed = "0.6" + [build-dependencies] capnpc = "0.13" diff --git a/src/app.rs b/src/app.rs index f71997b..47073de 100644 --- a/src/app.rs +++ b/src/app.rs @@ -3,14 +3,15 @@ use std::task::{Context, Poll}; use std::sync::{Arc, Mutex, MutexGuard}; use futures::prelude::*; -use futures_signals::signal::{Mutable, Signal, MutableSignalCloned}; +use futures_signals::signal::{Mutable, Signal, MutableSignalCloned, MutableLockMut}; -use termion::event::Key; +use linefeed::memory::MemoryTerminal; use crate::input::Inputs; -use crate::schema::Api; +use crate::schema::API; use slog::{ + Logger, Drain, Level, Record, @@ -36,14 +37,14 @@ pub struct Sute<'a, S> { statesig: MutableSignalCloned>, signal: S, inputs: Inputs, - api: Option, - new: bool + api: Option, + log: Logger, } impl<'a, S: Unpin> Sute<'a, S> { - pub fn new(s: S, log: Arc>, api: Api) -> Self { - let inputs = Inputs::new(); - let state = Mutable::new(SuteState::new(log)); + pub fn new(term: MemoryTerminal, s: S, log: Logger, drain: Arc>, api: API) -> Self { + let inputs = Inputs::new(term); + let state = Mutable::new(SuteState::new(drain)); Self { statesig: state.signal_cloned(), @@ -51,30 +52,18 @@ impl<'a, S: Unpin> Sute<'a, S> { signal: s, inputs: inputs, api: Some(api), - new: true, + log: log, } } + fn run_cmd(&mut self, cmd: String) { + + } + fn handle_resize(&mut self, new_size: (u16,u16)) { (self.state.lock_mut()).size = new_size; } - fn handle_input(&mut self, key: Key) { - // TODO modify signal implementation so we don't have to modify the state on all ticks. - let mut state = self.state.lock_mut(); - state.tick = state.tick + 1; - match key { - Key::Char('q') => state.running = false, - Key::Char('?') => state.active_win = Window::Help, - Key::Esc => { - if state.active_win == Window::Help { - state.active_win = Window::Main; - } - } - _ => {} - } - } - pub fn get_state(&self) -> SuteState<'a> { self.state.get_cloned() } @@ -83,8 +72,14 @@ impl<'a, S: Unpin> Sute<'a, S> { impl<'a, S: Signal + Unpin> Signal for Sute<'a, S> { type Item = SuteState<'a>; fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - if let Poll::Ready(Some(key)) = Pin::new(&mut self.inputs).poll_next(cx) { - self.handle_input(key); + match Pin::new(&mut self.inputs).poll_next(cx) { + Poll::Ready(Some(line)) => self.run_cmd(line), + // If the input closes stop the program + Poll::Ready(None) => { + self.state.lock_mut().running = false; + return Poll::Ready(None); + }, + Poll::Pending => { }, } if let Poll::Ready(Some(size)) = Pin::new(&mut self.signal).poll_change(cx) { self.handle_resize(size); @@ -95,14 +90,33 @@ impl<'a, S: Signal + Unpin> Signal for Sute<'a, S> { } } +impl <'a, S> Future for Sute<'a, S> { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + if self.state.lock_ref().running { + Poll::Pending + } else { + Poll::Ready(()) + } + } +} + +#[derive(Debug, Clone)] +// TODO: `Signal` struct changes using this struct? +// TODO: If so, procmacro here? +enum SuteStateChange { + active_win(Window), +} + #[derive(Debug, Clone)] pub struct SuteState<'a> { pub active_win: Window, pub size: (u16,u16), pub running: bool, pub tick: usize, - pub server: Option, + pub server: Option<&'a str>, pub log: Arc>, + pub cmd_line: String, } impl<'a> SuteState<'a> { @@ -114,6 +128,7 @@ impl<'a> SuteState<'a> { tick: 0, server: None, log: log, + cmd_line: String::new(), } } } diff --git a/src/input.rs b/src/input.rs index 49e23b5..a7f7d84 100644 --- a/src/input.rs +++ b/src/input.rs @@ -3,32 +3,28 @@ use std::thread; use std::pin::Pin; use std::task::{Context, Poll}; -use termion::event::Key; -use termion::input::TermRead; +use linefeed::{Interface, ReadResult}; +use linefeed::memory::MemoryTerminal; use futures::Stream; use futures::channel::mpsc; use futures::SinkExt; pub struct Inputs { - rx: mpsc::Receiver, + rx: mpsc::Receiver, hndl: thread::JoinHandle<()>, } impl Inputs { - pub fn new() -> Self { + pub fn new(term: MemoryTerminal) -> Self { let (mut tx, rx) = mpsc::channel(64); let hndl = thread::spawn(move || { let stdin = io::stdin(); - let keys = stdin.keys(); - for key in keys { - if key.is_err() { - break; - } - if let Err(_) = smol::block_on(tx.send(key.unwrap())) { - break; // and thus stop the thread - } + let mut reader = Interface::with_term("sute", term).unwrap(); + reader.set_prompt("> ").unwrap(); + while let ReadResult::Input(line) = reader.read_line().unwrap() { + smol::block_on(tx.send(line)); } }); @@ -37,7 +33,7 @@ impl Inputs { } impl Stream for Inputs { - type Item = Key; + type Item = String; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { Pin::new(&mut self.rx).poll_next(cx) diff --git a/src/main.rs b/src/main.rs index 25e1be5..c5c2814 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,9 @@ extern crate slog; use std::io; use std::sync::{Arc, Mutex}; +use std::thread; +use smol::Task; use smol::net::TcpStream; use tui::backend::{Backend, TermionBackend}; @@ -27,6 +29,8 @@ mod schema; use banner::BANNER; +static DEFAULT_SERVER: &'static str = "localhost:59661"; + fn main() -> Result<(), io::Error> { let matches = App::new("sute 🌸") @@ -47,70 +51,61 @@ fn main() -> Result<(), io::Error> { .takes_value(true)) .get_matches(); - let server = matches.value_of("server").unwrap_or("localhost:59661"); - - let stdout = io::stdout().into_raw_mode()?; - let backend = TermionBackend::new(stdout); - let mut terminal = Terminal::new(backend)?; - terminal.hide_cursor()?; - - // Refresh the screen once by resizing the terminal - if let Ok((x,y)) = termion::terminal_size() { - terminal.resize(tui::layout::Rect::new(0, 0, x,y)).unwrap(); - } + // + let server = matches.value_of("server").unwrap_or(DEFAULT_SERVER); + // Set up logging let drain = Arc::new(app::LogDrain::new()); let log = slog::Logger::root(slog::Fuse::new(drain.clone()), o!()); - let resize = util::Resize::new()?; let lex = smol::LocalExecutor::new(); + + let resize = util::Resize::new()?; let stream_f = async move { TcpStream::connect(server).await.unwrap() }; let stream = smol::block_on(lex.run(stream_f)); - let (f1, mut api) = schema::Api::from_stream(stream); + let (rpc_future, mut api) = schema::bootstrap(log.clone(), stream); - lex.spawn(f1).detach(); - - let i = log.clone(); - let f = async { - println!("API ready"); - let mut auth = api.authentication().await; - println!("AUTH ready: {:?}", &auth); - let mechs = auth.mechanisms().await; - println!("MECHS ready: {:?}", &mechs); - for mech in mechs { - println!("{}", mech); - } - }; - - let app = app::Sute::new(resize, drain, api); - - crit!(log, "This is a test: {}", 42); - error!(log, "This is a test: {}", 42); - warn!(log, "This is a test: {}", 42); - info!(log, "This is a test: {}", 42); - debug!(log, "This is a test: {}", 42); - trace!(log, "This is a test: {}", 42); - - let mut state = app.get_state(); - terminal.draw(|f| ui::draw_ui(f, &mut state))?; + let app = app::Sute::new(term.clone(), resize, log.clone(), drain, api); let mut stream = app.to_stream(); - loop { - if let Some(mut state) = smol::block_on(stream.next()) { - if !state.running { + + let stdout = io::stdout().into_raw_mode()?; + let backend = TermionBackend::new(stdout); + let mut terminal = Terminal::new(backend)?; + terminal.hide_cursor()?; + let ui_future = async move { + + // Refresh the screen once by resizing the terminal + if let Ok((x,y)) = termion::terminal_size() { + terminal.resize(tui::layout::Rect::new(0, 0, x,y)).unwrap(); + } + + loop { + if let Some(mut state) = stream.next().await { + if !state.running { + break; + } + + terminal.draw(|f| ui::draw_ui(f, &mut state))?; + } else { break; } - - terminal.draw(|f| ui::draw_ui(f, &mut state))?; } else { - break; } - } - terminal.show_cursor()?; + // TODO: Ensure that will always be run + terminal.show_cursor()?; + + Ok(()) + }; + + lex.spawn(rpc_future).detach(); + let r: Task> = lex.spawn(ui_future); + //smol::block_on(lex.run(Box::pin(app))); + smol::block_on(r); Ok(()) } diff --git a/src/schema/api.rs b/src/schema/api.rs index a76cb3e..9536c0f 100644 --- a/src/schema/api.rs +++ b/src/schema/api.rs @@ -1,25 +1,22 @@ +use std::fmt; + +use slog::Logger; + +use super::connection_capnp::bootstrap::Client; +use super::Authentication; + pub struct API { - inner: connection_capnp::bootstrap::Client, + inner: Client, log: Logger, } -impl Api { - fn new(log: Logger, inner: connection_capnp::bootstrap::Client) -> Self { - Self { inner} - } - pub fn from_stream(stream: TcpStream) -> (impl Future, Self) { - let network = Box::new(twoparty::VatNetwork::new(stream.clone(), stream.clone(), - rpc_twoparty_capnp::Side::Client, Default::default())); - - let mut rpc_system = RpcSystem::new(network, None); - let bffh: connection_capnp::bootstrap::Client - = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); - - (rpc_system, Api::new(bffh)) +impl API { + pub fn new(log: Logger, inner: Client) -> Self { + Self { log, inner} } pub async fn authentication(&mut self) -> Authentication { - let req = self.bffh.auth_request().send().promise; + let req = self.inner.auth_request().send().promise; // TODO: When's that an Err? let res = req.await.unwrap(); // TODO: When's that an Err? @@ -27,16 +24,6 @@ impl Api { Authentication::new(tmp.get_auth().unwrap()) } - async fn authenticate(&mut self, log: Logger) { - let r = self.bffh.auth_request(); - let auth = r.send().pipeline.get_auth(); - let m = auth.mechanisms_request().send().promise.await.unwrap(); - - for t in m.get().unwrap().get_mechs().unwrap().iter() { - info!(log, "Mechanism {} available", t.unwrap()); - } - } - ///// Authenticate to the server. Returns true on success, false on error //async fn authenticate(&mut self) -> Result { // let mut sasl = SASL::new().unwrap(); diff --git a/src/schema/authentication.rs b/src/schema/authentication.rs index 9f06007..c1202bd 100644 --- a/src/schema/authentication.rs +++ b/src/schema/authentication.rs @@ -1,5 +1,11 @@ +use std::fmt; +use std::any::Any; + +use slog::Logger; +use super::auth_capnp::authentication::Client; + pub struct Authentication { - inner: auth_capnp::authentication::Client, + inner: Client, } impl fmt::Debug for Authentication { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -10,7 +16,7 @@ impl fmt::Debug for Authentication { } impl Authentication { - pub fn new(inner: auth_capnp::authentication::Client) -> Self { + pub fn new(inner: Client) -> Self { Self { inner } } @@ -21,32 +27,3 @@ impl Authentication { tmp.get_mechs().unwrap().iter().map(|x| x.unwrap().to_string()).collect() } } - -async fn handshake(log: Logger, mut stream: &mut TcpStream) -> Result<(), io::Error> { - let host = "localhost"; - let program = format!("{}-{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")); - let version = (0u32,1u32); - - let mut message = capnp::message::Builder::new_default(); - let mut builder = message.init_root::(); - - builder.set_host(host); - builder.set_major(version.0); - builder.set_minor(version.1); - builder.set_program(&program); - - capnp_futures::serialize::write_message(&mut stream, message).await.unwrap(); - if let Some(m) = capnp_futures::serialize::read_message(&mut stream, Default::default()).await.unwrap() { - let greeting = m.get_root::().unwrap(); - let peer_host = greeting.get_host().unwrap(); - let peer_program = greeting.get_program().unwrap(); - let major = greeting.get_major(); - let minor = greeting.get_minor(); - - info!(log, "Peer {} running {} API {}.{}", peer_host, peer_program, major, minor) - } else { - error!(log, "Oh noes"); - } - - Ok(()) -} diff --git a/src/schema/mod.rs b/src/schema/mod.rs index 3ebff42..6bfcccf 100644 --- a/src/schema/mod.rs +++ b/src/schema/mod.rs @@ -1,3 +1,14 @@ +use std::future::Future; +use std::io; + +use slog::Logger; + +use smol::net::TcpStream; + +use capnp_rpc::twoparty; +use capnp_rpc::RpcSystem; +use capnp_rpc::rpc_twoparty_capnp; + mod auth_capnp { include!(concat!(env!("OUT_DIR"), "/schema/auth_capnp.rs")); } @@ -9,4 +20,52 @@ mod api_capnp { } mod api; +pub use api::API; + mod authentication; +pub use authentication::Authentication; + +pub fn bootstrap(log: Logger, stream: TcpStream) -> (impl Future, API) { + debug!(log, "Bootstrapping API…"); + + let network = Box::new(twoparty::VatNetwork::new(stream.clone(), stream, + rpc_twoparty_capnp::Side::Client, Default::default())); + + let mut rpc = RpcSystem::new(network, None); + let client: connection_capnp::bootstrap::Client + = rpc.bootstrap(rpc_twoparty_capnp::Side::Server); + + return (rpc, API::new(log, client)); +} + +async fn handshake(log: &Logger, mut stream: &mut TcpStream) -> Result<(), io::Error> { + let host = "localhost"; + let program = format!("{}-{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")); + let version = (0u32,1u32); + + debug!(log, "Sending handshake as {}@{} speaking API v{}.{}", + program, host, version.0, version.1); + + let mut message = capnp::message::Builder::new_default(); + let mut builder = message.init_root::(); + + builder.set_host(host); + builder.set_major(version.0); + builder.set_minor(version.1); + builder.set_program(&program); + + capnp_futures::serialize::write_message(&mut stream, message).await.unwrap(); + if let Some(m) = capnp_futures::serialize::read_message(&mut stream, Default::default()).await.unwrap() { + let greeting = m.get_root::().unwrap(); + let peer_host = greeting.get_host().unwrap(); + let peer_program = greeting.get_program().unwrap(); + let major = greeting.get_major(); + let minor = greeting.get_minor(); + + info!(log, "Peer {} running {} API {}.{}", peer_host, peer_program, major, minor) + } else { + error!(log, "Oh noes"); + } + + Ok(()) +} diff --git a/src/ui/mod.rs b/src/ui/mod.rs index 0db9e1a..203993b 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -36,7 +36,7 @@ fn draw_main(f: &mut Frame, app: &mut SuteState, layout_chunk: Re .constraints([Constraint::Percentage(20), Constraint::Percentage(80)].as_ref()) .split(layout_chunk); - f.render_widget(Paragraph::new(app.server.as_ref().map(|s| s.as_str()).unwrap_or("Not connected")), chunk[0]); + f.render_widget(Paragraph::new(app.server.unwrap_or("Not connected")), chunk[0]); draw_logs(f, app, chunk[1]) } @@ -47,8 +47,11 @@ fn draw_logs(f: &mut Frame, app: &mut SuteState, layout_chunk: Re } fn draw_command_line(f: &mut Frame, app: &mut SuteState, layout_chunk: Rect) { - f.render_widget(Block::default() + let block = Block::default() .title("Command line") - .borders(Borders::ALL), layout_chunk); - f.render_widget(Paragraph::new(">"), layout_chunk); + .borders(Borders::ALL); + let inner_rect = block.inner(layout_chunk); + + f.render_widget(block, layout_chunk); + f.render_widget(Paragraph::new(app.cmd_line.as_ref()), inner_rect); }