mirror of
https://gitlab.com/fabinfra/fabaccess/bffh.git
synced 2024-11-22 06:47:56 +01:00
more main cleanup
This commit is contained in:
parent
a3fa03f0ee
commit
796e957b27
@ -11,10 +11,12 @@ edition = "2018"
|
|||||||
#tokio = { version = "0.2", features = ["full"] }
|
#tokio = { version = "0.2", features = ["full"] }
|
||||||
|
|
||||||
async-std = "1.5"
|
async-std = "1.5"
|
||||||
futures = "0.3"
|
futures = { version = "0.3", features = ["thread-pool", "compat"] }
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
futures-signals = "0.3"
|
futures-signals = "0.3"
|
||||||
|
|
||||||
|
signal-hook = { version = "0.1", features = ["tokio-support"] }
|
||||||
|
|
||||||
slog = { version = "2.5", features = ["max_level_trace"] }
|
slog = { version = "2.5", features = ["max_level_trace"] }
|
||||||
slog-term = "2.5"
|
slog-term = "2.5"
|
||||||
slog-async = "2.4"
|
slog-async = "2.4"
|
||||||
@ -29,5 +31,7 @@ casbin = "0.2"
|
|||||||
|
|
||||||
uuid = { version = "0.8", features = ["serde", "v4"] }
|
uuid = { version = "0.8", features = ["serde", "v4"] }
|
||||||
|
|
||||||
|
clap = "2.33"
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
capnpc = "0.12"
|
capnpc = "0.12"
|
||||||
|
135
src/main.rs
135
src/main.rs
@ -24,7 +24,9 @@ use session::Session;
|
|||||||
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::executor::{LocalPool, ThreadPool};
|
use futures::executor::{LocalPool, ThreadPool};
|
||||||
|
use futures::compat::Stream01CompatExt;
|
||||||
use futures::join;
|
use futures::join;
|
||||||
|
use futures::task::SpawnExt;
|
||||||
|
|
||||||
use capnp_rpc::twoparty::{VatNetwork, VatId};
|
use capnp_rpc::twoparty::{VatNetwork, VatId};
|
||||||
use capnp_rpc::rpc_twoparty_capnp::Side;
|
use capnp_rpc::rpc_twoparty_capnp::Side;
|
||||||
@ -110,19 +112,21 @@ fn main() -> Result<(), Error> {
|
|||||||
|
|
||||||
// Bind to each address in config.listen.
|
// Bind to each address in config.listen.
|
||||||
// This is a Stream over Futures so it will do absolutely nothing unless polled to completion
|
// This is a Stream over Futures so it will do absolutely nothing unless polled to completion
|
||||||
let listeners_s: futures::stream::Collect<_, Vec<TcpListener>> = stream::iter((&config).listen.iter())
|
let listeners_s: futures::stream::Collect<_, Vec<TcpListener>>
|
||||||
|
= stream::iter((&config).listen.iter())
|
||||||
.map(|l| {
|
.map(|l| {
|
||||||
let addr = l.address.clone();
|
let addr = l.address.clone();
|
||||||
TcpListener::bind((l.address.as_str(), l.port.unwrap_or(config::DEFAULT_PORT)))
|
let port = l.port.unwrap_or(config::DEFAULT_PORT);
|
||||||
|
TcpListener::bind((l.address.as_str(), port))
|
||||||
// If the bind errors, include the address so we can log it
|
// If the bind errors, include the address so we can log it
|
||||||
// Since this closure is lazy we need to have a cloned addr
|
// Since this closure is lazy we need to have a cloned addr
|
||||||
.map_err(|e| { (addr, e) })
|
.map_err(move |e| { (addr, port, e) })
|
||||||
})
|
})
|
||||||
.filter_map(|f| async {
|
.filter_map(|f| async {
|
||||||
match f.await {
|
match f.await {
|
||||||
Ok(l) => Some(l),
|
Ok(l) => Some(l),
|
||||||
Err((addr, e)) => {
|
Err((addr, port, e)) => {
|
||||||
error!(&log, "Could not connect to {}: {}", addr, e);
|
error!(&log, "Could not setup socket on {} port {}: {}", addr, port, e);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -130,7 +134,7 @@ fn main() -> Result<(), Error> {
|
|||||||
|
|
||||||
let (mdb, pdb, auth) = exec.run_until(async {
|
let (mdb, pdb, auth) = exec.run_until(async {
|
||||||
// Rull all futures to completion in parallel.
|
// Rull all futures to completion in parallel.
|
||||||
// This will "block" until all three are done starting up.
|
// This will block until all three are done starting up.
|
||||||
join!(machinedb_f, permission_f, authentication_f)
|
join!(machinedb_f, permission_f, authentication_f)
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -141,29 +145,122 @@ fn main() -> Result<(), Error> {
|
|||||||
|
|
||||||
// Since the below closures will happen at a much later time we need to make sure all pointers
|
// Since the below closures will happen at a much later time we need to make sure all pointers
|
||||||
// are still valid. Thus, Arc.
|
// are still valid. Thus, Arc.
|
||||||
let l2 = log.clone();
|
let start_log = log.clone();
|
||||||
let l3 = log.clone();
|
let stop_log = log.clone();
|
||||||
|
|
||||||
// Create a thread pool to run tasks on
|
// Create a thread pool to run tasks on
|
||||||
let mut pool = ThreadPool::builder()
|
let pool = ThreadPool::builder()
|
||||||
.after_start(move |i| { info!(l2.new(o!("system" => "threadpool")), "Starting Thread <{}>", i)})
|
.after_start(move |i| {
|
||||||
.before_stop(move |i| { info!(l3.new(o!("system" => "threadpool")), "Stopping Thread <{}>", i)})
|
info!(start_log.new(o!("system" => "threadpool")), "Starting Thread <{}>", i)
|
||||||
|
})
|
||||||
|
.before_stop(move |i| {
|
||||||
|
info!(stop_log.new(o!("system" => "threadpool")), "Stopping Thread <{}>", i)
|
||||||
|
})
|
||||||
.create()?;
|
.create()?;
|
||||||
|
|
||||||
// Spawner is a handle to the shared ThreadPool forwarded into each connection
|
// Closure inefficiencies. Lucky cloning an Arc is pretty cheap.
|
||||||
let spawner = pool.clone();
|
let inner_log = log.clone();
|
||||||
|
let loop_log = log.clone();
|
||||||
|
|
||||||
let result: Result<(), Box<dyn std::error::Error>> = exec.run_until(async move {
|
exec.run_until(async move {
|
||||||
// Generate a stream of TcpStreams appearing on any of the interfaces we listen to
|
// Generate a stream of TcpStreams appearing on any of the interfaces we listen to
|
||||||
let listeners = listeners_s.await;
|
let listeners = listeners_s.await;
|
||||||
let mut incoming = stream::select_all(listeners.iter().map(|l| l.incoming()));
|
let incoming = stream::select_all(listeners.iter().map(|l| l.incoming()));
|
||||||
|
|
||||||
// Runn
|
// Spawner is a handle to the shared ThreadPool forwarded into each connection
|
||||||
while let Some(socket) = incoming.next().await {
|
let spawner = pool.clone();
|
||||||
|
|
||||||
|
// For each incoming connection start a new task to handle it and throw it on the thread
|
||||||
|
// pool
|
||||||
|
let handle_sockets = incoming.map(|socket| {
|
||||||
|
// incoming.next() is an error when the underlying `accept` call yielded an error
|
||||||
|
// In POSIX those are protocol errors we can't really handle, so we just log the error
|
||||||
|
// and the move on
|
||||||
|
match socket {
|
||||||
|
Ok(socket) => {
|
||||||
|
// If we have it available add the peer's address to all log messages
|
||||||
|
let log =
|
||||||
|
if let Ok(addr) = socket.peer_addr() {
|
||||||
|
inner_log.new(o!("address" => addr))
|
||||||
|
} else {
|
||||||
|
inner_log.new(o!())
|
||||||
|
};
|
||||||
|
|
||||||
|
// Clone a log for potential error handling
|
||||||
|
let elog = log.clone();
|
||||||
|
|
||||||
|
// We handle the error using map_err, `let _` is used to quiet the compiler
|
||||||
|
// warning
|
||||||
|
let f = api::handle_connection(log.clone(), socket, spawner.clone())
|
||||||
|
.map_err(move |e| {
|
||||||
|
error!(log, "Error occured during protocol handling: {}", e);
|
||||||
|
})
|
||||||
|
// Void any and all results since pool.spawn allows no return value.
|
||||||
|
.map(|_| ());
|
||||||
|
|
||||||
|
// In this case only the error is relevant since the Value is always ()
|
||||||
|
if let Err(e) = pool.spawn(f) {
|
||||||
|
error!(elog, "Failed to spawn connection handler: {}", e);
|
||||||
|
// Failing to spawn a handler means we are most likely overloaded
|
||||||
|
return LoopResult::Overloaded;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
error!(inner_log, "Socket `accept` error: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unless we are overloaded we just want to keep going.
|
||||||
|
return LoopResult::Continue;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Check each signal as it arrives
|
||||||
|
// signals is a futures-0.1 stream, compat() makes it a futures-0.3 (which we use) stream
|
||||||
|
let handle_signals = signals.compat().map(|_signal| {
|
||||||
|
// _signal is the signal c_int.
|
||||||
|
// But since we only listen for SIGINT at the moment we don't really need to look at
|
||||||
|
// it.
|
||||||
|
return LoopResult::Stop;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Now actually check if a connection was opened or a signal recv'd
|
||||||
|
let mut combined = stream::select(handle_signals, handle_sockets);
|
||||||
|
loop {
|
||||||
|
match combined.next().await {
|
||||||
|
// When the result says to continue, do exactly that
|
||||||
|
Some(LoopResult::Continue) => {}
|
||||||
|
Some(LoopResult::Overloaded) => {
|
||||||
|
// In case over server overload we should install a replacement handler that
|
||||||
|
// would instead just return `overloaded` for all connections until the
|
||||||
|
// situation is remedied.
|
||||||
|
//
|
||||||
|
// For now, just log the overload and keep going.
|
||||||
|
error!(loop_log, "Server overloaded");
|
||||||
|
}
|
||||||
|
// None should never be returned because it would mean all sockets were closed and
|
||||||
|
// we can not receive any further signals. Still, in that case shut down cleanly
|
||||||
|
// anyway, the only reason this could happen are some heavy bugs in the runtime
|
||||||
|
Some(LoopResult::Stop) | None => {
|
||||||
|
warn!(loop_log, "Stopping server");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// TODO: Run actual shut down code here
|
||||||
|
info!(log, "Shutting down...");
|
||||||
|
|
||||||
|
// Returning () is an implicit success so this will properly set the exit code as well
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The result of one iteration of the core loop
|
||||||
|
enum LoopResult {
|
||||||
|
/// Everything was fine, keep going
|
||||||
|
Continue,
|
||||||
|
/// Something happened that means we should shut down
|
||||||
|
Stop,
|
||||||
|
/// The Server is currently overloaded
|
||||||
|
Overloaded,
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user