diff --git a/src/connection.rs b/src/connection.rs index 4d018da..7014c3d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -2,20 +2,30 @@ use slog::Logger; use smol::net::TcpStream; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::auth; use crate::api; pub use crate::schema::connection_capnp; +use capnp_rpc::{twoparty, rpc_twoparty_capnp}; + use capnp::capability::{Params, Results, Promise, FromServer}; /// Connection context struct Connection { - stream: TcpStream, + log: Logger, user: Option, } +impl Connection { + pub fn new(log: Logger) -> Self { + let user = None; + + Self { log, user } + } +} + use connection_capnp::bootstrap::*; impl connection_capnp::bootstrap::Server for Connection { @@ -38,13 +48,41 @@ impl connection_capnp::bootstrap::Server for Connection { mut res: Results ) -> Promise<(), capnp::Error> { if self.user.is_some() { - } Promise::ok(()) } } -pub async fn handle_connection(log: Logger, mut stream: TcpStream) -> Result<()> { - unimplemented!() +async fn handshake(log: &Logger, stream: &mut TcpStream) -> Result<()> { + if let Some(m) = capnp_futures::serialize::read_message(stream, Default::default()).await? { + let greeting = m.get_root::()?; + let major = greeting.get_major(); + let minor = greeting.get_minor(); + + if major != 1 { + Err(Error::BadVersion((major, minor))) + } else { + info!(log, "Handshake successful with peer {} running {}, API {}.{}", + greeting.get_host()?, greeting.get_program()?, major, minor); + Ok(()) + } + } else { + unimplemented!() + } +} + +pub async fn handle_connection(log: Logger, mut stream: TcpStream) -> Result<()> { + handshake(&log, &mut stream).await?; + + let mut conn = Connection::new(log); + let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(conn); + + let network = twoparty::VatNetwork::new(stream.clone(), stream, + rpc_twoparty_capnp::Side::Server, Default::default()); + let rpc_system = capnp_rpc::RpcSystem::new(Box::new(network), + Some(rpc.client)); + + rpc_system.await; + Ok(()) } diff --git a/src/error.rs b/src/error.rs index 7c8e537..758260d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,6 +23,7 @@ pub enum Error { FuturesSpawn(futures::SpawnError), MQTT(mqtt::Error), Config(config::ConfigError), + BadVersion((u32,u32)), } impl fmt::Display for Error { @@ -64,6 +65,9 @@ impl fmt::Display for Error { Error::Config(e) => { write!(f, "Failed to parse config: {}", e) } + Error::BadVersion((major,minor)) => { + write!(f, "Peer uses API version {}.{} which is incompatible!", major, minor) + } } } }