ununimpl API impl

This commit is contained in:
Gregor Reitzenstein 2020-10-29 13:04:20 +01:00
parent 4339537ec9
commit 7b9d3cd560
2 changed files with 47 additions and 5 deletions

View File

@ -2,20 +2,30 @@ use slog::Logger;
use smol::net::TcpStream; use smol::net::TcpStream;
use crate::error::Result; use crate::error::{Error, Result};
use crate::auth; use crate::auth;
use crate::api; use crate::api;
pub use crate::schema::connection_capnp; pub use crate::schema::connection_capnp;
use capnp_rpc::{twoparty, rpc_twoparty_capnp};
use capnp::capability::{Params, Results, Promise, FromServer}; use capnp::capability::{Params, Results, Promise, FromServer};
/// Connection context /// Connection context
struct Connection { struct Connection {
stream: TcpStream, log: Logger,
user: Option<auth::User>, user: Option<auth::User>,
} }
impl Connection {
pub fn new(log: Logger) -> Self {
let user = None;
Self { log, user }
}
}
use connection_capnp::bootstrap::*; use connection_capnp::bootstrap::*;
impl connection_capnp::bootstrap::Server for Connection { impl connection_capnp::bootstrap::Server for Connection {
@ -38,13 +48,41 @@ impl connection_capnp::bootstrap::Server for Connection {
mut res: Results<permissions_results::Owned> mut res: Results<permissions_results::Owned>
) -> Promise<(), capnp::Error> { ) -> Promise<(), capnp::Error> {
if self.user.is_some() { if self.user.is_some() {
} }
Promise::ok(()) Promise::ok(())
} }
} }
pub async fn handle_connection(log: Logger, mut stream: TcpStream) -> Result<()> { async fn handshake(log: &Logger, stream: &mut TcpStream) -> Result<()> {
unimplemented!() if let Some(m) = capnp_futures::serialize::read_message(stream, Default::default()).await? {
let greeting = m.get_root::<connection_capnp::greeting::Reader>()?;
let major = greeting.get_major();
let minor = greeting.get_minor();
if major != 1 {
Err(Error::BadVersion((major, minor)))
} else {
info!(log, "Handshake successful with peer {} running {}, API {}.{}",
greeting.get_host()?, greeting.get_program()?, major, minor);
Ok(())
}
} else {
unimplemented!()
}
}
pub async fn handle_connection(log: Logger, mut stream: TcpStream) -> Result<()> {
handshake(&log, &mut stream).await?;
let mut conn = Connection::new(log);
let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(conn);
let network = twoparty::VatNetwork::new(stream.clone(), stream,
rpc_twoparty_capnp::Side::Server, Default::default());
let rpc_system = capnp_rpc::RpcSystem::new(Box::new(network),
Some(rpc.client));
rpc_system.await;
Ok(())
} }

View File

@ -23,6 +23,7 @@ pub enum Error {
FuturesSpawn(futures::SpawnError), FuturesSpawn(futures::SpawnError),
MQTT(mqtt::Error), MQTT(mqtt::Error),
Config(config::ConfigError), Config(config::ConfigError),
BadVersion((u32,u32)),
} }
impl fmt::Display for Error { impl fmt::Display for Error {
@ -64,6 +65,9 @@ impl fmt::Display for Error {
Error::Config(e) => { Error::Config(e) => {
write!(f, "Failed to parse config: {}", e) write!(f, "Failed to parse config: {}", e)
} }
Error::BadVersion((major,minor)) => {
write!(f, "Peer uses API version {}.{} which is incompatible!", major, minor)
}
} }
} }
} }