From 764b08d4fa214a0aa7caf2f59cca0638b276be99 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Tue, 18 Feb 2020 01:30:40 +0100 Subject: [PATCH] main.rs cleanup --- src/main.rs | 177 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 130 insertions(+), 47 deletions(-) diff --git a/src/main.rs b/src/main.rs index 58a104d..2105adf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,75 +12,158 @@ mod api; mod config; mod error; mod machine; +mod session; + +use signal_hook::iterator::Signals; + +use clap::{App, Arg}; use api::api as api_capnp; +use session::Session; + use futures::prelude::*; -use futures_signals::signal::Mutable; -use futures::task::LocalSpawn; +use futures::executor::{LocalPool, ThreadPool}; +use futures::join; -fn main() { - let log = log::init(); - info!(log, "Starting"); +use capnp_rpc::twoparty::{VatNetwork, VatId}; +use capnp_rpc::rpc_twoparty_capnp::Side; - let config = config::read().unwrap(); +use async_std::net::{TcpListener, TcpStream}; - modules::init(log.new(o!())); - api::init(); +use std::io; +use std::io::Write; +use std::path::PathBuf; +use std::str::FromStr; +use std::mem::drop; - let m = machine::init(&config).unwrap(); - let m = Mutable::new(m); - let m2 = m.clone(); - let c2 = config.clone(); +use std::sync::Arc; +use error::Error; - let mut exec = futures::executor::LocalPool::new(); +// Returning a `Result` from `main` allows us to use the `?` shorthand. +// In the case of an Err it will be printed using `fmt::Debug` +fn main() -> Result<(), Error> { + // Initialize signal handler. + // Specifically, this is a Stream of c_int representing received signals + // We currently only care about Ctrl-C so SIGINT it is. + // TODO: Make this do SIGHUP and a few others too. + let signals = Signals::new(&[signal_hook::SIGINT])?.into_async()?; - let enf = exec.run_until(async { - let e = access::init(&config).await.unwrap(); - Mutable::new(e) - }); + use clap::{crate_version, crate_description, crate_name}; - let p = auth::open_passdb(&config.passdb).unwrap(); - let p = Mutable::new(p); - let authp = auth::AuthenticationProvider::new(p, enf.clone()); - let authp = Mutable::new(authp); + // Argument parsing + // values for the name, description and version are pulled from `Cargo.toml`. + let matches = App::new(crate_name!()) + .about(crate_description!()) + .version(crate_version!()) + .arg(Arg::with_name("config") + .help("Path to the config file to use") + .long("config") + .short("c") + .takes_value(true) + ) + .arg(Arg::with_name("print default") + .help("Print a default config to stdout instead of running") + .long("print-default") + ) + .get_matches(); - use std::net::ToSocketAddrs; - let args: Vec = ::std::env::args().collect(); - if args.len() != 2 { - println!("usage: {} ADDRESS[:PORT]", args[0]); - return; + // Check for the --print-default option first because we don't need to do anything else in that + // case. + if matches.is_present("print default") { + let config = config::Config::default(); + let encoded = toml::to_vec(&config)?; + + // Direct writing to fd 1 is faster but also prevents any print-formatting that could + // invalidate the generated TOML + let stdout = io::stdout(); + let mut handle = stdout.lock(); + handle.write_all(&encoded)?; + + // Early return to exit. + return Ok(()) } - let addr = args[1].to_socket_addrs().unwrap().next().expect("could not parse address"); + // If no `config` option is given use a preset default. + let configpath = matches.value_of("config").unwrap_or("/etc/diflouroborane.toml"); + let config = config::read(&PathBuf::from_str(configpath).unwrap())?; - let permlog = log.new(o!()); - let machlog = log.new(o!()); + // Initialize the logging subsystem first to be able to better document the progress from now + // on. + // TODO: Now would be a really good time to close stdin/out and move logging to syslog + // Log is in an Arc so we can do very cheap clones in closures. + let log = Arc::new(log::init(&config)); + info!(log, "Starting"); - machine::save(&c2, &m2.lock_ref()).expect("MachineDB save"); + // Kick up an executor + // Most initializations from now on do some amount of IO and are much better done in an + // asyncronous fashion. + let mut exec = LocalPool::new(); + + // Start loading the machine database, authentication system and permission system + // All of those get a custom logger so the source of a log message can be better traced and + // filtered + let machinedb_f = machine::init(log.new(o!("system" => "machinedb")), &config); + let permission_f = access::init(log.new(o!("system" => "permissions")), &config); + let authentication_f = auth::init(log.new(o!("system" => "authentication")), &config); + + // Bind to each address in config.listen. + // This is a Stream over Futures so it will do absolutely nothing unless polled to completion + let listeners_s: futures::stream::Collect<_, Vec> = stream::iter((&config).listen.iter()) + .map(|l| { + let addr = l.address.clone(); + TcpListener::bind((l.address.as_str(), l.port.unwrap_or(config::DEFAULT_PORT))) + // If the bind errors, include the address so we can log it + // Since this closure is lazy we need to have a cloned addr + .map_err(|e| { (addr, e) }) + }) + .filter_map(|f| async { + match f.await { + Ok(l) => Some(l), + Err((addr, e)) => { + error!(&log, "Could not connect to {}: {}", addr, e); + None + } + } + }).collect(); + + let (mdb, pdb, auth) = exec.run_until(async { + // Rull all futures to completion in parallel. + // This will "block" until all three are done starting up. + join!(machinedb_f, permission_f, authentication_f) + }); + + // Error out if any of the subsystems failed to start. + let mdb = mdb?; + let pdb = pdb.unwrap(); + let auth = auth?; + + // Since the below closures will happen at a much later time we need to make sure all pointers + // are still valid. Thus, Arc. + let l2 = log.clone(); + let l3 = log.clone(); + + // Create a thread pool to run tasks on + let mut pool = ThreadPool::builder() + .after_start(move |i| { info!(l2.new(o!("system" => "threadpool")), "Starting Thread <{}>", i)}) + .before_stop(move |i| { info!(l3.new(o!("system" => "threadpool")), "Stopping Thread <{}>", i)}) + .create()?; + + // Spawner is a handle to the shared ThreadPool forwarded into each connection + let spawner = pool.clone(); - let spawner = exec.spawner(); let result: Result<(), Box> = exec.run_until(async move { - let listener = async_std::net::TcpListener::bind(&addr).await?; - let mut incoming = listener.incoming(); - while let Some(socket) = incoming.next().await { - let socket = socket?; - trace!(log, "New connection from {:?}", socket.peer_addr()); - // TODO: Prettify session handling - let auth = auth::Authentication::new(authp.clone()); - trace!(log, "Init auth"); - let perm = access::Permissions::new(permlog.clone(), enf.clone(), auth.clone()); - trace!(log, "Init perm"); - let mach = machine::Machines::new(machlog.clone(), m.clone(), perm.clone()); - trace!(log, "Init mach"); + // Generate a stream of TcpStreams appearing on any of the interfaces we listen to + let listeners = listeners_s.await; + let mut incoming = stream::select_all(listeners.iter().map(|l| l.incoming())); - let rpc_system = api::process_socket(auth, perm, mach, socket); - spawner.spawn_local_obj( - Box::pin(rpc_system.map_err(|e| println!("error: {:?}", e)).map(|_|())).into()).expect("spawn") + // Runn + while let Some(socket) = incoming.next().await { } + Ok(()) }); - result.expect("main"); + Ok(()) }