diff --git a/Cargo.toml b/Cargo.toml index 4d50c95..4a53de0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,10 +11,12 @@ edition = "2018" #tokio = { version = "0.2", features = ["full"] } async-std = "1.5" -futures = "0.3" +futures = { version = "0.3", features = ["thread-pool", "compat"] } futures-util = "0.3" futures-signals = "0.3" +signal-hook = { version = "0.1", features = ["tokio-support"] } + slog = { version = "2.5", features = ["max_level_trace"] } slog-term = "2.5" slog-async = "2.4" @@ -29,5 +31,7 @@ casbin = "0.2" uuid = { version = "0.8", features = ["serde", "v4"] } +clap = "2.33" + [build-dependencies] capnpc = "0.12" diff --git a/src/main.rs b/src/main.rs index ad5e82e..104a127 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,7 +24,9 @@ use session::Session; use futures::prelude::*; use futures::executor::{LocalPool, ThreadPool}; +use futures::compat::Stream01CompatExt; use futures::join; +use futures::task::SpawnExt; use capnp_rpc::twoparty::{VatNetwork, VatId}; use capnp_rpc::rpc_twoparty_capnp::Side; @@ -110,19 +112,21 @@ fn main() -> Result<(), Error> { // Bind to each address in config.listen. // This is a Stream over Futures so it will do absolutely nothing unless polled to completion - let listeners_s: futures::stream::Collect<_, Vec> = stream::iter((&config).listen.iter()) + let listeners_s: futures::stream::Collect<_, Vec> + = stream::iter((&config).listen.iter()) .map(|l| { let addr = l.address.clone(); - TcpListener::bind((l.address.as_str(), l.port.unwrap_or(config::DEFAULT_PORT))) + let port = l.port.unwrap_or(config::DEFAULT_PORT); + TcpListener::bind((l.address.as_str(), port)) // If the bind errors, include the address so we can log it // Since this closure is lazy we need to have a cloned addr - .map_err(|e| { (addr, e) }) + .map_err(move |e| { (addr, port, e) }) }) .filter_map(|f| async { match f.await { Ok(l) => Some(l), - Err((addr, e)) => { - error!(&log, "Could not connect to {}: {}", addr, e); + Err((addr, port, e)) => { + error!(&log, "Could not setup socket on {} port {}: {}", addr, port, e); None } } @@ -130,7 +134,7 @@ fn main() -> Result<(), Error> { let (mdb, pdb, auth) = exec.run_until(async { // Rull all futures to completion in parallel. - // This will "block" until all three are done starting up. + // This will block until all three are done starting up. join!(machinedb_f, permission_f, authentication_f) }); @@ -141,29 +145,122 @@ fn main() -> Result<(), Error> { // Since the below closures will happen at a much later time we need to make sure all pointers // are still valid. Thus, Arc. - let l2 = log.clone(); - let l3 = log.clone(); + let start_log = log.clone(); + let stop_log = log.clone(); // Create a thread pool to run tasks on - let mut pool = ThreadPool::builder() - .after_start(move |i| { info!(l2.new(o!("system" => "threadpool")), "Starting Thread <{}>", i)}) - .before_stop(move |i| { info!(l3.new(o!("system" => "threadpool")), "Stopping Thread <{}>", i)}) + let pool = ThreadPool::builder() + .after_start(move |i| { + info!(start_log.new(o!("system" => "threadpool")), "Starting Thread <{}>", i) + }) + .before_stop(move |i| { + info!(stop_log.new(o!("system" => "threadpool")), "Stopping Thread <{}>", i) + }) .create()?; - // Spawner is a handle to the shared ThreadPool forwarded into each connection - let spawner = pool.clone(); + // Closure inefficiencies. Lucky cloning an Arc is pretty cheap. + let inner_log = log.clone(); + let loop_log = log.clone(); - let result: Result<(), Box> = exec.run_until(async move { + exec.run_until(async move { // Generate a stream of TcpStreams appearing on any of the interfaces we listen to let listeners = listeners_s.await; - let mut incoming = stream::select_all(listeners.iter().map(|l| l.incoming())); + let incoming = stream::select_all(listeners.iter().map(|l| l.incoming())); - // Runn - while let Some(socket) = incoming.next().await { + // Spawner is a handle to the shared ThreadPool forwarded into each connection + let spawner = pool.clone(); + + // For each incoming connection start a new task to handle it and throw it on the thread + // pool + let handle_sockets = incoming.map(|socket| { + // incoming.next() is an error when the underlying `accept` call yielded an error + // In POSIX those are protocol errors we can't really handle, so we just log the error + // and the move on + match socket { + Ok(socket) => { + // If we have it available add the peer's address to all log messages + let log = + if let Ok(addr) = socket.peer_addr() { + inner_log.new(o!("address" => addr)) + } else { + inner_log.new(o!()) + }; + + // Clone a log for potential error handling + let elog = log.clone(); + + // We handle the error using map_err, `let _` is used to quiet the compiler + // warning + let f = api::handle_connection(log.clone(), socket, spawner.clone()) + .map_err(move |e| { + error!(log, "Error occured during protocol handling: {}", e); + }) + // Void any and all results since pool.spawn allows no return value. + .map(|_| ()); + + // In this case only the error is relevant since the Value is always () + if let Err(e) = pool.spawn(f) { + error!(elog, "Failed to spawn connection handler: {}", e); + // Failing to spawn a handler means we are most likely overloaded + return LoopResult::Overloaded; + } + }, + Err(e) => { + error!(inner_log, "Socket `accept` error: {}", e); + } + } + + // Unless we are overloaded we just want to keep going. + return LoopResult::Continue; + }); + + // Check each signal as it arrives + // signals is a futures-0.1 stream, compat() makes it a futures-0.3 (which we use) stream + let handle_signals = signals.compat().map(|_signal| { + // _signal is the signal c_int. + // But since we only listen for SIGINT at the moment we don't really need to look at + // it. + return LoopResult::Stop; + }); + + // Now actually check if a connection was opened or a signal recv'd + let mut combined = stream::select(handle_signals, handle_sockets); + loop { + match combined.next().await { + // When the result says to continue, do exactly that + Some(LoopResult::Continue) => {} + Some(LoopResult::Overloaded) => { + // In case over server overload we should install a replacement handler that + // would instead just return `overloaded` for all connections until the + // situation is remedied. + // + // For now, just log the overload and keep going. + error!(loop_log, "Server overloaded"); + } + // None should never be returned because it would mean all sockets were closed and + // we can not receive any further signals. Still, in that case shut down cleanly + // anyway, the only reason this could happen are some heavy bugs in the runtime + Some(LoopResult::Stop) | None => { + warn!(loop_log, "Stopping server"); + break; + } + } } - - Ok(()) }); + // TODO: Run actual shut down code here + info!(log, "Shutting down..."); + + // Returning () is an implicit success so this will properly set the exit code as well Ok(()) } + +/// The result of one iteration of the core loop +enum LoopResult { + /// Everything was fine, keep going + Continue, + /// Something happened that means we should shut down + Stop, + /// The Server is currently overloaded + Overloaded, +}