Term restructure

This commit is contained in:
Gregor Reitzenstein 2020-11-03 11:45:48 +01:00
parent c49ee16c61
commit d0440942b1
8 changed files with 181 additions and 147 deletions

View File

@ -31,5 +31,7 @@ slog = "2.5"
libc = "0.2" libc = "0.2"
rsasl = "0.2.3" rsasl = "0.2.3"
linefeed = "0.6"
[build-dependencies] [build-dependencies]
capnpc = "0.13" capnpc = "0.13"

View File

@ -3,14 +3,15 @@ use std::task::{Context, Poll};
use std::sync::{Arc, Mutex, MutexGuard}; use std::sync::{Arc, Mutex, MutexGuard};
use futures::prelude::*; use futures::prelude::*;
use futures_signals::signal::{Mutable, Signal, MutableSignalCloned}; use futures_signals::signal::{Mutable, Signal, MutableSignalCloned, MutableLockMut};
use termion::event::Key; use linefeed::memory::MemoryTerminal;
use crate::input::Inputs; use crate::input::Inputs;
use crate::schema::Api; use crate::schema::API;
use slog::{ use slog::{
Logger,
Drain, Drain,
Level, Level,
Record, Record,
@ -36,14 +37,14 @@ pub struct Sute<'a, S> {
statesig: MutableSignalCloned<SuteState<'a>>, statesig: MutableSignalCloned<SuteState<'a>>,
signal: S, signal: S,
inputs: Inputs, inputs: Inputs,
api: Option<Api>, api: Option<API>,
new: bool log: Logger,
} }
impl<'a, S: Unpin> Sute<'a, S> { impl<'a, S: Unpin> Sute<'a, S> {
pub fn new(s: S, log: Arc<LogDrain<'a>>, api: Api) -> Self { pub fn new(term: MemoryTerminal, s: S, log: Logger, drain: Arc<LogDrain<'a>>, api: API) -> Self {
let inputs = Inputs::new(); let inputs = Inputs::new(term);
let state = Mutable::new(SuteState::new(log)); let state = Mutable::new(SuteState::new(drain));
Self { Self {
statesig: state.signal_cloned(), statesig: state.signal_cloned(),
@ -51,30 +52,18 @@ impl<'a, S: Unpin> Sute<'a, S> {
signal: s, signal: s,
inputs: inputs, inputs: inputs,
api: Some(api), api: Some(api),
new: true, log: log,
} }
} }
fn run_cmd(&mut self, cmd: String) {
}
fn handle_resize(&mut self, new_size: (u16,u16)) { fn handle_resize(&mut self, new_size: (u16,u16)) {
(self.state.lock_mut()).size = new_size; (self.state.lock_mut()).size = new_size;
} }
fn handle_input(&mut self, key: Key) {
// TODO modify signal implementation so we don't have to modify the state on all ticks.
let mut state = self.state.lock_mut();
state.tick = state.tick + 1;
match key {
Key::Char('q') => state.running = false,
Key::Char('?') => state.active_win = Window::Help,
Key::Esc => {
if state.active_win == Window::Help {
state.active_win = Window::Main;
}
}
_ => {}
}
}
pub fn get_state(&self) -> SuteState<'a> { pub fn get_state(&self) -> SuteState<'a> {
self.state.get_cloned() self.state.get_cloned()
} }
@ -83,8 +72,14 @@ impl<'a, S: Unpin> Sute<'a, S> {
impl<'a, S: Signal<Item=(u16,u16)> + Unpin> Signal for Sute<'a, S> { impl<'a, S: Signal<Item=(u16,u16)> + Unpin> Signal for Sute<'a, S> {
type Item = SuteState<'a>; type Item = SuteState<'a>;
fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(key)) = Pin::new(&mut self.inputs).poll_next(cx) { match Pin::new(&mut self.inputs).poll_next(cx) {
self.handle_input(key); Poll::Ready(Some(line)) => self.run_cmd(line),
// If the input closes stop the program
Poll::Ready(None) => {
self.state.lock_mut().running = false;
return Poll::Ready(None);
},
Poll::Pending => { },
} }
if let Poll::Ready(Some(size)) = Pin::new(&mut self.signal).poll_change(cx) { if let Poll::Ready(Some(size)) = Pin::new(&mut self.signal).poll_change(cx) {
self.handle_resize(size); self.handle_resize(size);
@ -95,14 +90,33 @@ impl<'a, S: Signal<Item=(u16,u16)> + Unpin> Signal for Sute<'a, S> {
} }
} }
impl <'a, S> Future for Sute<'a, S> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if self.state.lock_ref().running {
Poll::Pending
} else {
Poll::Ready(())
}
}
}
#[derive(Debug, Clone)]
// TODO: `Signal` struct changes using this struct?
// TODO: If so, procmacro here?
enum SuteStateChange {
active_win(Window),
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct SuteState<'a> { pub struct SuteState<'a> {
pub active_win: Window, pub active_win: Window,
pub size: (u16,u16), pub size: (u16,u16),
pub running: bool, pub running: bool,
pub tick: usize, pub tick: usize,
pub server: Option<String>, pub server: Option<&'a str>,
pub log: Arc<LogDrain<'a>>, pub log: Arc<LogDrain<'a>>,
pub cmd_line: String,
} }
impl<'a> SuteState<'a> { impl<'a> SuteState<'a> {
@ -114,6 +128,7 @@ impl<'a> SuteState<'a> {
tick: 0, tick: 0,
server: None, server: None,
log: log, log: log,
cmd_line: String::new(),
} }
} }
} }

View File

@ -3,32 +3,28 @@ use std::thread;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use termion::event::Key; use linefeed::{Interface, ReadResult};
use termion::input::TermRead; use linefeed::memory::MemoryTerminal;
use futures::Stream; use futures::Stream;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::SinkExt; use futures::SinkExt;
pub struct Inputs { pub struct Inputs {
rx: mpsc::Receiver<Key>, rx: mpsc::Receiver<String>,
hndl: thread::JoinHandle<()>, hndl: thread::JoinHandle<()>,
} }
impl Inputs { impl Inputs {
pub fn new() -> Self { pub fn new(term: MemoryTerminal) -> Self {
let (mut tx, rx) = mpsc::channel(64); let (mut tx, rx) = mpsc::channel(64);
let hndl = thread::spawn(move || { let hndl = thread::spawn(move || {
let stdin = io::stdin(); let stdin = io::stdin();
let keys = stdin.keys(); let mut reader = Interface::with_term("sute", term).unwrap();
for key in keys { reader.set_prompt("> ").unwrap();
if key.is_err() { while let ReadResult::Input(line) = reader.read_line().unwrap() {
break; smol::block_on(tx.send(line));
}
if let Err(_) = smol::block_on(tx.send(key.unwrap())) {
break; // and thus stop the thread
}
} }
}); });
@ -37,7 +33,7 @@ impl Inputs {
} }
impl Stream for Inputs { impl Stream for Inputs {
type Item = Key; type Item = String;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.rx).poll_next(cx) Pin::new(&mut self.rx).poll_next(cx)

View File

@ -3,7 +3,9 @@ extern crate slog;
use std::io; use std::io;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread;
use smol::Task;
use smol::net::TcpStream; use smol::net::TcpStream;
use tui::backend::{Backend, TermionBackend}; use tui::backend::{Backend, TermionBackend};
@ -27,6 +29,8 @@ mod schema;
use banner::BANNER; use banner::BANNER;
static DEFAULT_SERVER: &'static str = "localhost:59661";
fn main() -> Result<(), io::Error> { fn main() -> Result<(), io::Error> {
let matches = App::new("sute 🌸") let matches = App::new("sute 🌸")
@ -47,70 +51,61 @@ fn main() -> Result<(), io::Error> {
.takes_value(true)) .takes_value(true))
.get_matches(); .get_matches();
let server = matches.value_of("server").unwrap_or("localhost:59661"); //
let server = matches.value_of("server").unwrap_or(DEFAULT_SERVER);
let stdout = io::stdout().into_raw_mode()?;
let backend = TermionBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
terminal.hide_cursor()?;
// Refresh the screen once by resizing the terminal
if let Ok((x,y)) = termion::terminal_size() {
terminal.resize(tui::layout::Rect::new(0, 0, x,y)).unwrap();
}
// Set up logging
let drain = Arc::new(app::LogDrain::new()); let drain = Arc::new(app::LogDrain::new());
let log = slog::Logger::root(slog::Fuse::new(drain.clone()), o!()); let log = slog::Logger::root(slog::Fuse::new(drain.clone()), o!());
let resize = util::Resize::new()?;
let lex = smol::LocalExecutor::new(); let lex = smol::LocalExecutor::new();
let resize = util::Resize::new()?;
let stream_f = async move { let stream_f = async move {
TcpStream::connect(server).await.unwrap() TcpStream::connect(server).await.unwrap()
}; };
let stream = smol::block_on(lex.run(stream_f)); let stream = smol::block_on(lex.run(stream_f));
let (f1, mut api) = schema::Api::from_stream(stream); let (rpc_future, mut api) = schema::bootstrap(log.clone(), stream);
lex.spawn(f1).detach(); let app = app::Sute::new(term.clone(), resize, log.clone(), drain, api);
let i = log.clone();
let f = async {
println!("API ready");
let mut auth = api.authentication().await;
println!("AUTH ready: {:?}", &auth);
let mechs = auth.mechanisms().await;
println!("MECHS ready: {:?}", &mechs);
for mech in mechs {
println!("{}", mech);
}
};
let app = app::Sute::new(resize, drain, api);
crit!(log, "This is a test: {}", 42);
error!(log, "This is a test: {}", 42);
warn!(log, "This is a test: {}", 42);
info!(log, "This is a test: {}", 42);
debug!(log, "This is a test: {}", 42);
trace!(log, "This is a test: {}", 42);
let mut state = app.get_state();
terminal.draw(|f| ui::draw_ui(f, &mut state))?;
let mut stream = app.to_stream(); let mut stream = app.to_stream();
loop {
if let Some(mut state) = smol::block_on(stream.next()) { let stdout = io::stdout().into_raw_mode()?;
if !state.running { let backend = TermionBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
terminal.hide_cursor()?;
let ui_future = async move {
// Refresh the screen once by resizing the terminal
if let Ok((x,y)) = termion::terminal_size() {
terminal.resize(tui::layout::Rect::new(0, 0, x,y)).unwrap();
}
loop {
if let Some(mut state) = stream.next().await {
if !state.running {
break;
}
terminal.draw(|f| ui::draw_ui(f, &mut state))?;
} else {
break; break;
} }
terminal.draw(|f| ui::draw_ui(f, &mut state))?; } else {
break;
} }
}
terminal.show_cursor()?; // TODO: Ensure that will always be run
terminal.show_cursor()?;
Ok(())
};
lex.spawn(rpc_future).detach();
let r: Task<Result<(), io::Error>> = lex.spawn(ui_future);
//smol::block_on(lex.run(Box::pin(app)));
smol::block_on(r);
Ok(()) Ok(())
} }

View File

@ -1,25 +1,22 @@
use std::fmt;
use slog::Logger;
use super::connection_capnp::bootstrap::Client;
use super::Authentication;
pub struct API { pub struct API {
inner: connection_capnp::bootstrap::Client, inner: Client,
log: Logger, log: Logger,
} }
impl Api { impl API {
fn new(log: Logger, inner: connection_capnp::bootstrap::Client) -> Self { pub fn new(log: Logger, inner: Client) -> Self {
Self { inner} Self { log, inner}
}
pub fn from_stream(stream: TcpStream) -> (impl Future, Self) {
let network = Box::new(twoparty::VatNetwork::new(stream.clone(), stream.clone(),
rpc_twoparty_capnp::Side::Client, Default::default()));
let mut rpc_system = RpcSystem::new(network, None);
let bffh: connection_capnp::bootstrap::Client
= rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
(rpc_system, Api::new(bffh))
} }
pub async fn authentication(&mut self) -> Authentication { pub async fn authentication(&mut self) -> Authentication {
let req = self.bffh.auth_request().send().promise; let req = self.inner.auth_request().send().promise;
// TODO: When's that an Err? // TODO: When's that an Err?
let res = req.await.unwrap(); let res = req.await.unwrap();
// TODO: When's that an Err? // TODO: When's that an Err?
@ -27,16 +24,6 @@ impl Api {
Authentication::new(tmp.get_auth().unwrap()) Authentication::new(tmp.get_auth().unwrap())
} }
async fn authenticate(&mut self, log: Logger) {
let r = self.bffh.auth_request();
let auth = r.send().pipeline.get_auth();
let m = auth.mechanisms_request().send().promise.await.unwrap();
for t in m.get().unwrap().get_mechs().unwrap().iter() {
info!(log, "Mechanism {} available", t.unwrap());
}
}
///// Authenticate to the server. Returns true on success, false on error ///// Authenticate to the server. Returns true on success, false on error
//async fn authenticate(&mut self) -> Result<bool, io::Error> { //async fn authenticate(&mut self) -> Result<bool, io::Error> {
// let mut sasl = SASL::new().unwrap(); // let mut sasl = SASL::new().unwrap();

View File

@ -1,5 +1,11 @@
use std::fmt;
use std::any::Any;
use slog::Logger;
use super::auth_capnp::authentication::Client;
pub struct Authentication { pub struct Authentication {
inner: auth_capnp::authentication::Client, inner: Client,
} }
impl fmt::Debug for Authentication { impl fmt::Debug for Authentication {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -10,7 +16,7 @@ impl fmt::Debug for Authentication {
} }
impl Authentication { impl Authentication {
pub fn new(inner: auth_capnp::authentication::Client) -> Self { pub fn new(inner: Client) -> Self {
Self { inner } Self { inner }
} }
@ -21,32 +27,3 @@ impl Authentication {
tmp.get_mechs().unwrap().iter().map(|x| x.unwrap().to_string()).collect() tmp.get_mechs().unwrap().iter().map(|x| x.unwrap().to_string()).collect()
} }
} }
async fn handshake(log: Logger, mut stream: &mut TcpStream) -> Result<(), io::Error> {
let host = "localhost";
let program = format!("{}-{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
let version = (0u32,1u32);
let mut message = capnp::message::Builder::new_default();
let mut builder = message.init_root::<connection_capnp::greeting::Builder>();
builder.set_host(host);
builder.set_major(version.0);
builder.set_minor(version.1);
builder.set_program(&program);
capnp_futures::serialize::write_message(&mut stream, message).await.unwrap();
if let Some(m) = capnp_futures::serialize::read_message(&mut stream, Default::default()).await.unwrap() {
let greeting = m.get_root::<connection_capnp::greeting::Reader>().unwrap();
let peer_host = greeting.get_host().unwrap();
let peer_program = greeting.get_program().unwrap();
let major = greeting.get_major();
let minor = greeting.get_minor();
info!(log, "Peer {} running {} API {}.{}", peer_host, peer_program, major, minor)
} else {
error!(log, "Oh noes");
}
Ok(())
}

View File

@ -1,3 +1,14 @@
use std::future::Future;
use std::io;
use slog::Logger;
use smol::net::TcpStream;
use capnp_rpc::twoparty;
use capnp_rpc::RpcSystem;
use capnp_rpc::rpc_twoparty_capnp;
mod auth_capnp { mod auth_capnp {
include!(concat!(env!("OUT_DIR"), "/schema/auth_capnp.rs")); include!(concat!(env!("OUT_DIR"), "/schema/auth_capnp.rs"));
} }
@ -9,4 +20,52 @@ mod api_capnp {
} }
mod api; mod api;
pub use api::API;
mod authentication; mod authentication;
pub use authentication::Authentication;
pub fn bootstrap(log: Logger, stream: TcpStream) -> (impl Future, API) {
debug!(log, "Bootstrapping API…");
let network = Box::new(twoparty::VatNetwork::new(stream.clone(), stream,
rpc_twoparty_capnp::Side::Client, Default::default()));
let mut rpc = RpcSystem::new(network, None);
let client: connection_capnp::bootstrap::Client
= rpc.bootstrap(rpc_twoparty_capnp::Side::Server);
return (rpc, API::new(log, client));
}
async fn handshake(log: &Logger, mut stream: &mut TcpStream) -> Result<(), io::Error> {
let host = "localhost";
let program = format!("{}-{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
let version = (0u32,1u32);
debug!(log, "Sending handshake as {}@{} speaking API v{}.{}",
program, host, version.0, version.1);
let mut message = capnp::message::Builder::new_default();
let mut builder = message.init_root::<connection_capnp::greeting::Builder>();
builder.set_host(host);
builder.set_major(version.0);
builder.set_minor(version.1);
builder.set_program(&program);
capnp_futures::serialize::write_message(&mut stream, message).await.unwrap();
if let Some(m) = capnp_futures::serialize::read_message(&mut stream, Default::default()).await.unwrap() {
let greeting = m.get_root::<connection_capnp::greeting::Reader>().unwrap();
let peer_host = greeting.get_host().unwrap();
let peer_program = greeting.get_program().unwrap();
let major = greeting.get_major();
let minor = greeting.get_minor();
info!(log, "Peer {} running {} API {}.{}", peer_host, peer_program, major, minor)
} else {
error!(log, "Oh noes");
}
Ok(())
}

View File

@ -36,7 +36,7 @@ fn draw_main<B: Backend>(f: &mut Frame<B>, app: &mut SuteState, layout_chunk: Re
.constraints([Constraint::Percentage(20), Constraint::Percentage(80)].as_ref()) .constraints([Constraint::Percentage(20), Constraint::Percentage(80)].as_ref())
.split(layout_chunk); .split(layout_chunk);
f.render_widget(Paragraph::new(app.server.as_ref().map(|s| s.as_str()).unwrap_or("Not connected")), chunk[0]); f.render_widget(Paragraph::new(app.server.unwrap_or("Not connected")), chunk[0]);
draw_logs(f, app, chunk[1]) draw_logs(f, app, chunk[1])
} }
@ -47,8 +47,11 @@ fn draw_logs<B: Backend>(f: &mut Frame<B>, app: &mut SuteState, layout_chunk: Re
} }
fn draw_command_line<B: Backend>(f: &mut Frame<B>, app: &mut SuteState, layout_chunk: Rect) { fn draw_command_line<B: Backend>(f: &mut Frame<B>, app: &mut SuteState, layout_chunk: Rect) {
f.render_widget(Block::default() let block = Block::default()
.title("Command line") .title("Command line")
.borders(Borders::ALL), layout_chunk); .borders(Borders::ALL);
f.render_widget(Paragraph::new(">"), layout_chunk); let inner_rect = block.inner(layout_chunk);
f.render_widget(block, layout_chunk);
f.render_widget(Paragraph::new(app.cmd_line.as_ref()), inner_rect);
} }