From dfdeaaefa6774d88df294688d3756ec8dec3af18 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Mon, 11 May 2020 18:21:45 +0200 Subject: [PATCH] Simple message exchange working --- Cargo.toml | 5 ++++- src/api.rs | 1 + src/connection.rs | 48 +++++++++++++++++++++++++++++++++++++++++++++++ src/error.rs | 10 ++++++++++ src/main.rs | 2 +- 5 files changed, 64 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5faa1e7..9f6852f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,11 +11,13 @@ edition = "2018" # TODO: reduce the feature groups for faster compilation #tokio = { version = "0.2", features = ["full"] } -async-std = "1.5" futures = { version = "0.3", features = ["thread-pool", "compat"] } futures-util = "0.3" futures-signals = "0.3" +async-std = "1.5" +smol = "0.1" + signal-hook = { version = "0.1", features = ["tokio-support"] } slog = { version = "2.5", features = ["max_level_trace"] } @@ -24,6 +26,7 @@ slog-async = "2.4" capnp = "0.12" capnp-rpc = "0.12" +capnp-futures = "0.12" toml = "0.5" serde = { version = "1.0", features = ["derive"] } diff --git a/src/api.rs b/src/api.rs index 30dcbb4..0d2f369 100644 --- a/src/api.rs +++ b/src/api.rs @@ -5,6 +5,7 @@ pub mod gen { } use async_std::net::TcpStream; +use futures::io::{AsyncRead, AsyncWrite}; use slog::Logger; diff --git a/src/connection.rs b/src/connection.rs index af15a73..14cd4b5 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,4 +1,52 @@ +use slog::Logger; + +use async_std::net::TcpStream; +use futures::io::AsyncWriteExt; + +use crate::error::Result; + pub mod gen { include!(concat!(env!("OUT_DIR"), "/schema/connection_capnp.rs")); } + +pub async fn handle_connection(log: Logger, mut stream: TcpStream) -> Result<()> { + let host = "localhost"; + let program = "Difluoroborane-0.1.0"; + let version = (0u32,1u32); + + let mut message = capnp::message::Builder::new_default(); + let greet_outer = message.init_root::(); + let mut greeting = greet_outer.init_greet(); + greeting.set_host(host); + greeting.set_program(program); + greeting.set_major(version.0); + greeting.set_minor(version.1); + + capnp_futures::serialize::write_message(&mut stream, message).await?; + + stream.flush().await?; + + let receive_options = capnp::message::ReaderOptions::default(); + let message = capnp_futures::serialize::read_message(&mut stream, receive_options).await.unwrap().unwrap(); + let body: capnp::any_pointer::Reader = message.get_root().unwrap(); + let m = body.get_as::().unwrap(); + + if m.has_greet() { + match m.which() { + Ok(gen::message::Which::Greet(Ok(r))) => { + println!("Host {} with program {} is saying hello. They speak API version {}.{}.", + r.get_host().unwrap(), + r.get_program().unwrap(), + r.get_major(), + r.get_minor()) + }, + _ => { + // We *JUST* checked that it's a greeting. This can not happen + unreachable!() + } + } + } + + Ok(()) +} diff --git a/src/error.rs b/src/error.rs index 7a34267..2ffaa48 100644 --- a/src/error.rs +++ b/src/error.rs @@ -11,6 +11,7 @@ pub enum Error { SASL(SaslError), IO(io::Error), Boxed(Box), + Capnp(capnp::Error), } impl fmt::Display for Error { @@ -31,6 +32,9 @@ impl fmt::Display for Error { Error::Boxed(e) => { write!(f, "{}", e) } + Error::Capnp(e) => { + write!(f, "Cap'n Proto Error: {}", e) + } } } } @@ -65,4 +69,10 @@ impl From> for Error { } } +impl From for Error { + fn from(e: capnp::Error) -> Error { + Error::Capnp(e) + } +} + pub type Result = std::result::Result; diff --git a/src/main.rs b/src/main.rs index 4c4ec23..0a6d2df 100644 --- a/src/main.rs +++ b/src/main.rs @@ -191,7 +191,7 @@ fn main() -> Result<(), Error> { let elog = log.clone(); // We handle the error using map_err - let f = api::handle_connection(log.clone(), socket) + let f = connection::handle_connection(log.clone(), socket) .map_err(move |e| { error!(log, "Error occured during protocol handling: {}", e); })