fabaccess-bffh/bffhd/capnp/mod.rs

174 lines
5.2 KiB
Rust
Raw Normal View History

use async_net::TcpListener;
2022-03-15 20:00:43 +01:00
2022-03-10 20:52:34 +01:00
use capnp_rpc::rpc_twoparty_capnp::Side;
use capnp_rpc::twoparty::VatNetwork;
use capnp_rpc::RpcSystem;
2022-06-23 21:19:31 +02:00
use executor::prelude::{Executor, GroupId, SupervisionRegistry};
2022-03-08 16:41:38 +01:00
use futures_rustls::server::TlsStream;
use futures_rustls::TlsAcceptor;
use futures_util::stream::FuturesUnordered;
use futures_util::{stream, AsyncRead, AsyncWrite, FutureExt, StreamExt};
2022-03-15 20:00:43 +01:00
use std::future::Future;
use std::io;
2022-03-15 20:00:43 +01:00
2022-03-15 19:56:41 +01:00
use std::net::SocketAddr;
2022-03-15 20:00:43 +01:00
2022-03-12 17:31:53 +01:00
use crate::authentication::AuthenticationHandle;
use crate::session::SessionManager;
2021-11-26 21:01:43 +01:00
2022-03-20 21:22:15 +01:00
mod config;
pub use config::{Listen, TlsListen};
2022-03-10 20:52:34 +01:00
mod authenticationsystem;
mod connection;
2022-03-10 20:52:34 +01:00
mod machine;
mod machinesystem;
mod permissionsystem;
mod session;
mod user;
mod user_system;
2021-11-26 22:11:24 +01:00
pub struct APIServer {
executor: Executor<'static>,
sockets: Vec<TcpListener>,
acceptor: TlsAcceptor,
2022-03-12 17:31:53 +01:00
sessionmanager: SessionManager,
authentication: AuthenticationHandle,
}
impl APIServer {
pub fn new(
executor: Executor<'static>,
sockets: Vec<TcpListener>,
acceptor: TlsAcceptor,
2022-03-12 17:31:53 +01:00
sessionmanager: SessionManager,
authentication: AuthenticationHandle,
) -> Self {
Self {
executor,
sockets,
acceptor,
2022-03-12 17:31:53 +01:00
sessionmanager,
authentication,
}
}
pub async fn bind(
executor: Executor<'static>,
listens: impl IntoIterator<Item = &Listen>,
acceptor: TlsAcceptor,
2022-03-12 17:31:53 +01:00
sessionmanager: SessionManager,
authentication: AuthenticationHandle,
2022-06-02 17:46:26 +02:00
) -> miette::Result<Self> {
let span = tracing::info_span!("binding API listen sockets");
let _guard = span.enter();
2022-03-10 20:52:34 +01:00
2022-03-15 20:00:43 +01:00
let sockets = FuturesUnordered::new();
listens
.into_iter()
2022-05-05 15:50:44 +02:00
.map(|a| async move { (async_net::resolve(a.to_tuple()).await, a) })
.collect::<FuturesUnordered<_>>()
.filter_map(|(res, addr)| async move {
match res {
Ok(a) => Some(a),
Err(e) => {
tracing::error!("Failed to resolve {:?}: {}", addr, e);
None
}
}
})
.for_each(|addrs| async {
for addr in addrs {
sockets.push(async move { (TcpListener::bind(addr).await, addr) })
}
})
.await;
let sockets: Vec<TcpListener> = sockets
.filter_map(|(res, addr)| async move {
match res {
Ok(s) => {
tracing::info!("Opened listen socket on {}", addr);
Some(s)
}
Err(e) => {
tracing::error!("Failed to open socket on {}: {}", addr, e);
None
}
}
})
.collect()
.await;
2021-11-26 22:11:24 +01:00
2022-03-15 19:56:41 +01:00
tracing::info!("listening on {:?}", sockets);
if sockets.is_empty() {
tracing::warn!("No usable listen addresses configured for the API server!");
}
2021-11-26 22:11:24 +01:00
2022-05-05 15:50:44 +02:00
Ok(Self::new(
executor,
sockets,
acceptor,
sessionmanager,
authentication,
))
2022-03-10 20:52:34 +01:00
}
2021-12-06 21:53:42 +01:00
2022-03-12 17:31:53 +01:00
pub async fn handle_until(self, stop: impl Future) {
stream::select_all(
self.sockets
.iter()
.map(|tcplistener| tcplistener.incoming()),
)
.take_until(stop)
.for_each(|stream| async {
match stream {
2022-03-15 19:56:41 +01:00
Ok(stream) => {
if let Ok(peer_addr) = stream.peer_addr() {
self.handle(peer_addr, self.acceptor.accept(stream))
} else {
tracing::error!(?stream, "failing a TCP connection with no peer addr");
}
2022-05-05 15:50:44 +02:00
}
Err(e) => tracing::warn!("Failed to accept stream: {}", e),
}
2022-05-05 15:50:44 +02:00
})
.await;
2022-03-15 19:56:41 +01:00
tracing::info!("closing down API handler");
2021-11-26 22:11:24 +01:00
}
fn handle<IO: 'static + Unpin + AsyncRead + AsyncWrite>(
&self,
2022-03-15 19:56:41 +01:00
peer_addr: SocketAddr,
stream: impl Future<Output = io::Result<TlsStream<IO>>>,
) {
2022-03-15 19:56:41 +01:00
tracing::debug!("handling new API connection");
let f = async move {
let stream = match stream.await {
Ok(stream) => stream,
Err(e) => {
tracing::error!("TLS handshake failed: {}", e);
return;
}
};
let (rx, tx) = futures_lite::io::split(stream);
let vat = VatNetwork::new(rx, tx, Side::Server, Default::default());
2022-03-12 17:31:53 +01:00
2022-05-05 15:50:44 +02:00
let bootstrap: connection::Client = capnp_rpc::new_client(connection::BootCap::new(
peer_addr,
self.authentication.clone(),
self.sessionmanager.clone(),
));
if let Err(e) = RpcSystem::new(Box::new(vat), Some(bootstrap.client)).await {
tracing::error!("Error during RPC handling: {}", e);
}
};
2022-06-23 21:19:31 +02:00
let cgroup = SupervisionRegistry::with(SupervisionRegistry::new_group);
self.executor.spawn_local_cgroup(f, cgroup);
2021-11-26 22:11:24 +01:00
}
}