From f397e1e63699ec85305fd0de1d27446682815fad Mon Sep 17 00:00:00 2001 From: Nadja Reitzenstein Date: Thu, 9 Dec 2021 21:54:57 +0100 Subject: [PATCH] Better tls connection handling with smol::io::split --- src/connection.rs | 80 +++-------------------------------------------- src/server.rs | 23 ++++++-------- 2 files changed, 14 insertions(+), 89 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 967ced3..6072463 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -18,6 +18,7 @@ use crate::error::Result; use capnp_rpc::{rpc_twoparty_capnp, twoparty}; use futures_util::{pin_mut, ready}; +use smol::io::split; use crate::schema::connection_capnp; @@ -85,14 +86,14 @@ impl ConnectionHandler { pub fn handle(&mut self, stream: TlsStream) -> impl Future> { - let conn = Connection::new(stream); + let (mut reader, mut writer) = split(stream); let boots = Bootstrap::new(self.log.new(o!()), self.db.clone(), self.network.clone()); let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots); let network = twoparty::VatNetwork::new( - conn.clone(), - conn, + reader, + writer, rpc_twoparty_capnp::Side::Server, Default::default(), ); @@ -101,77 +102,4 @@ impl ConnectionHandler { // Convert the error type to one of our errors rpc_system.map(|r| r.map_err(Into::into)) } -} - -struct Connection { - inner: Rc>>, -} - -impl Connection { - pub fn new(stream: TlsStream) -> Self { - Self { - inner: Rc::new(Mutex::new(stream)), - } - } -} - -impl Clone for Connection { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone() - } - } -} - - -impl AsyncRead for Connection { - fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - let f = self.inner.lock(); - pin_mut!(f); - let mut guard = ready!(f.poll(cx)); - let stream = guard.deref_mut(); - Pin::new(stream).poll_read(cx, buf) - } - - fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) -> Poll> { - let f = self.inner.lock(); - pin_mut!(f); - let mut guard = ready!(f.poll(cx)); - let stream = guard.deref_mut(); - Pin::new(stream).poll_read_vectored(cx, bufs) - } -} - -impl AsyncWrite for Connection { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - let f = self.inner.lock(); - pin_mut!(f); - let mut guard = ready!(f.poll(cx)); - let stream = guard.deref_mut(); - Pin::new(stream).poll_write(cx, buf) - } - - fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) -> Poll> { - let f = self.inner.lock(); - pin_mut!(f); - let mut guard = ready!(f.poll(cx)); - let stream = guard.deref_mut(); - Pin::new(stream).poll_write_vectored(cx, bufs) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let f = self.inner.lock(); - pin_mut!(f); - let mut guard = ready!(f.poll(cx)); - let stream = guard.deref_mut(); - Pin::new(stream).poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let f = self.inner.lock(); - pin_mut!(f); - let mut guard = ready!(f.poll(cx)); - let stream = guard.deref_mut(); - Pin::new(stream).poll_close(cx) - } } \ No newline at end of file diff --git a/src/server.rs b/src/server.rs index 4b9c500..7775330 100644 --- a/src/server.rs +++ b/src/server.rs @@ -138,28 +138,25 @@ pub fn serve_api_connections(log: Arc, config: Config, db: Databases, nw let tls_acceptor_clone = tls_acceptor.clone(); std::thread::spawn(move || { let tls_acceptor = tls_acceptor_clone; - let local_ex = LocalExecutor::new(); info!(tlog, "New connection from on {:?}", socket); + let peer = socket.peer_addr(); let mut handler = connection::ConnectionHandler::new(tlog, db, network); // We handle the error using map_err - let log2 = log.clone(); let f = tls_acceptor.accept(socket) - .map_err(move |e| { - error!(log, "Error occured during protocol handling: {}", e); - }) - .and_then(|stream| { - handler.handle(stream).map_err(move |e| { - error!(log2, "Error occured during protocol handling: {}", e); - }) - }) - // Void any and all results since pool.spawn allows no return value. - .map(|_| ()); + .map_err(Error::IO) + .and_then(|stream| handler.handle(stream)); // Spawn the connection context onto the local executor since it isn't Send // Also `detach` it so the task isn't canceled as soon as it's dropped. // TODO: Store all those tasks to have a easier way of managing them? - smol::block_on(f); + if let Err(e) = smol::block_on(f) { + error!(log, "Error occurred during connection handling: {:?}", e) + } else if let Ok(peer) = peer { + debug!(log, "Closed connection with {:?}", peer); + } else { + debug!(log, "Closed connection with unknown peer"); + } }); }, Err(e) => {