mirror of
https://gitlab.com/fabinfra/fabaccess/bffh.git
synced 2024-11-25 16:17:56 +01:00
Better tls connection handling with smol::io::split
This commit is contained in:
parent
9571afbcc7
commit
f397e1e636
@ -18,6 +18,7 @@ use crate::error::Result;
|
|||||||
|
|
||||||
use capnp_rpc::{rpc_twoparty_capnp, twoparty};
|
use capnp_rpc::{rpc_twoparty_capnp, twoparty};
|
||||||
use futures_util::{pin_mut, ready};
|
use futures_util::{pin_mut, ready};
|
||||||
|
use smol::io::split;
|
||||||
|
|
||||||
use crate::schema::connection_capnp;
|
use crate::schema::connection_capnp;
|
||||||
|
|
||||||
@ -85,14 +86,14 @@ impl ConnectionHandler {
|
|||||||
pub fn handle<IO: 'static + Unpin + AsyncWrite + AsyncRead>(&mut self, stream: TlsStream<IO>)
|
pub fn handle<IO: 'static + Unpin + AsyncWrite + AsyncRead>(&mut self, stream: TlsStream<IO>)
|
||||||
-> impl Future<Output=Result<()>>
|
-> impl Future<Output=Result<()>>
|
||||||
{
|
{
|
||||||
let conn = Connection::new(stream);
|
let (mut reader, mut writer) = split(stream);
|
||||||
|
|
||||||
let boots = Bootstrap::new(self.log.new(o!()), self.db.clone(), self.network.clone());
|
let boots = Bootstrap::new(self.log.new(o!()), self.db.clone(), self.network.clone());
|
||||||
let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots);
|
let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots);
|
||||||
|
|
||||||
let network = twoparty::VatNetwork::new(
|
let network = twoparty::VatNetwork::new(
|
||||||
conn.clone(),
|
reader,
|
||||||
conn,
|
writer,
|
||||||
rpc_twoparty_capnp::Side::Server,
|
rpc_twoparty_capnp::Side::Server,
|
||||||
Default::default(),
|
Default::default(),
|
||||||
);
|
);
|
||||||
@ -101,77 +102,4 @@ impl ConnectionHandler {
|
|||||||
// Convert the error type to one of our errors
|
// Convert the error type to one of our errors
|
||||||
rpc_system.map(|r| r.map_err(Into::into))
|
rpc_system.map(|r| r.map_err(Into::into))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
struct Connection<IO> {
|
|
||||||
inner: Rc<Mutex<TlsStream<IO>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<IO> Connection<IO> {
|
|
||||||
pub fn new(stream: TlsStream<IO>) -> Self {
|
|
||||||
Self {
|
|
||||||
inner: Rc::new(Mutex::new(stream)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<IO> Clone for Connection<IO> {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
Self {
|
|
||||||
inner: self.inner.clone()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
impl<IO: 'static + AsyncRead + AsyncWrite + Unpin> AsyncRead for Connection<IO> {
|
|
||||||
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
|
|
||||||
let f = self.inner.lock();
|
|
||||||
pin_mut!(f);
|
|
||||||
let mut guard = ready!(f.poll(cx));
|
|
||||||
let stream = guard.deref_mut();
|
|
||||||
Pin::new(stream).poll_read(cx, buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) -> Poll<std::io::Result<usize>> {
|
|
||||||
let f = self.inner.lock();
|
|
||||||
pin_mut!(f);
|
|
||||||
let mut guard = ready!(f.poll(cx));
|
|
||||||
let stream = guard.deref_mut();
|
|
||||||
Pin::new(stream).poll_read_vectored(cx, bufs)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<IO: 'static + AsyncWrite + AsyncRead + Unpin> AsyncWrite for Connection<IO> {
|
|
||||||
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
|
|
||||||
let f = self.inner.lock();
|
|
||||||
pin_mut!(f);
|
|
||||||
let mut guard = ready!(f.poll(cx));
|
|
||||||
let stream = guard.deref_mut();
|
|
||||||
Pin::new(stream).poll_write(cx, buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) -> Poll<std::io::Result<usize>> {
|
|
||||||
let f = self.inner.lock();
|
|
||||||
pin_mut!(f);
|
|
||||||
let mut guard = ready!(f.poll(cx));
|
|
||||||
let stream = guard.deref_mut();
|
|
||||||
Pin::new(stream).poll_write_vectored(cx, bufs)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
|
||||||
let f = self.inner.lock();
|
|
||||||
pin_mut!(f);
|
|
||||||
let mut guard = ready!(f.poll(cx));
|
|
||||||
let stream = guard.deref_mut();
|
|
||||||
Pin::new(stream).poll_flush(cx)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
|
||||||
let f = self.inner.lock();
|
|
||||||
pin_mut!(f);
|
|
||||||
let mut guard = ready!(f.poll(cx));
|
|
||||||
let stream = guard.deref_mut();
|
|
||||||
Pin::new(stream).poll_close(cx)
|
|
||||||
}
|
|
||||||
}
|
}
|
@ -138,28 +138,25 @@ pub fn serve_api_connections(log: Arc<Logger>, config: Config, db: Databases, nw
|
|||||||
let tls_acceptor_clone = tls_acceptor.clone();
|
let tls_acceptor_clone = tls_acceptor.clone();
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
let tls_acceptor = tls_acceptor_clone;
|
let tls_acceptor = tls_acceptor_clone;
|
||||||
let local_ex = LocalExecutor::new();
|
|
||||||
|
|
||||||
info!(tlog, "New connection from on {:?}", socket);
|
info!(tlog, "New connection from on {:?}", socket);
|
||||||
|
let peer = socket.peer_addr();
|
||||||
let mut handler = connection::ConnectionHandler::new(tlog, db, network);
|
let mut handler = connection::ConnectionHandler::new(tlog, db, network);
|
||||||
// We handle the error using map_err
|
// We handle the error using map_err
|
||||||
let log2 = log.clone();
|
|
||||||
let f = tls_acceptor.accept(socket)
|
let f = tls_acceptor.accept(socket)
|
||||||
.map_err(move |e| {
|
.map_err(Error::IO)
|
||||||
error!(log, "Error occured during protocol handling: {}", e);
|
.and_then(|stream| handler.handle(stream));
|
||||||
})
|
|
||||||
.and_then(|stream| {
|
|
||||||
handler.handle(stream).map_err(move |e| {
|
|
||||||
error!(log2, "Error occured during protocol handling: {}", e);
|
|
||||||
})
|
|
||||||
})
|
|
||||||
// Void any and all results since pool.spawn allows no return value.
|
|
||||||
.map(|_| ());
|
|
||||||
|
|
||||||
// Spawn the connection context onto the local executor since it isn't Send
|
// Spawn the connection context onto the local executor since it isn't Send
|
||||||
// Also `detach` it so the task isn't canceled as soon as it's dropped.
|
// Also `detach` it so the task isn't canceled as soon as it's dropped.
|
||||||
// TODO: Store all those tasks to have a easier way of managing them?
|
// TODO: Store all those tasks to have a easier way of managing them?
|
||||||
smol::block_on(f);
|
if let Err(e) = smol::block_on(f) {
|
||||||
|
error!(log, "Error occurred during connection handling: {:?}", e)
|
||||||
|
} else if let Ok(peer) = peer {
|
||||||
|
debug!(log, "Closed connection with {:?}", peer);
|
||||||
|
} else {
|
||||||
|
debug!(log, "Closed connection with unknown peer");
|
||||||
|
}
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
Loading…
Reference in New Issue
Block a user