diff --git a/src/connection.rs b/src/connection.rs index d367793..e958405 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,4 +1,6 @@ use std::sync::Arc; +use std::future::Future; +use futures::FutureExt; use slog::Logger; @@ -71,17 +73,27 @@ async fn handshake(log: &Logger, stream: &mut TcpStream) -> Result<()> { } } -pub async fn handle_connection(log: Logger, mut stream: TcpStream, db: Databases) -> Result<()> { - info!(log, "New connection from on {:?}", stream); - let session = Arc::new(Session::new(log, db.access.clone())); - let boots = Bootstrap::new(session, db); - let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots); - - let network = twoparty::VatNetwork::new(stream.clone(), stream, - rpc_twoparty_capnp::Side::Server, Default::default()); - let rpc_system = capnp_rpc::RpcSystem::new(Box::new(network), - Some(rpc.client)); - - rpc_system.await.unwrap(); - Ok(()) +pub struct ConnectionHandler { + log: Logger, + db: Databases, +} + +impl ConnectionHandler { + pub fn new(log: Logger, db: Databases) -> Self { + Self { log, db } + } + + pub fn handle(&mut self, mut stream: TcpStream) -> impl Future<Output = Result<()>> { + info!(self.log, "New connection from on {:?}", stream); + let session = Arc::new(Session::new(self.log.new(o!()), self.db.access.clone())); + let boots = Bootstrap::new(session, self.db.clone()); + let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots); + + let network = twoparty::VatNetwork::new(stream.clone(), stream, + rpc_twoparty_capnp::Side::Server, Default::default()); + let rpc_system = capnp_rpc::RpcSystem::new(Box::new(network), Some(rpc.client)); + + // Convert the error type to one of our errors + rpc_system.map(|r| r.map_err(Into::into)) + } } diff --git a/src/db.rs b/src/db.rs index 97140d5..35c7eea 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,4 +1,11 @@ use std::sync::Arc; +use std::path::PathBuf; +use std::str::FromStr; + +use slog::Logger; + +use crate::error::Result; +use 
crate::config::Settings; /// (Hashed) password database pub mod pass; @@ -22,3 +29,41 @@ pub struct Databases { pub machine: Arc<machine::MachineDB>, pub passdb: Arc<pass::PassDB>, } + +const LMDB_MAX_DB: u32 = 16; + +impl Databases { + pub fn new(log: &Logger, config: &Settings) -> Result<Self> { + + // Initialize the LMDB environment. This blocks untill the mmap() finishes + info!(log, "LMDB env"); + let env = lmdb::Environment::new() + .set_flags(lmdb::EnvironmentFlags::MAP_ASYNC | lmdb::EnvironmentFlags::NO_SUB_DIR) + .set_max_dbs(LMDB_MAX_DB as libc::c_uint) + .open(&PathBuf::from_str("/tmp/a.db").unwrap())?; + + // Start loading the machine database, authentication system and permission system + // All of those get a custom logger so the source of a log message can be better traced and + // filtered + let env = Arc::new(env); + let mdb = machine::init(log.new(o!("system" => "machines")), &config, env.clone())?; + + // Error out if any of the subsystems failed to start. + let defs = crate::machine::MachineDescription::load_file(&config.machines)?; + let machdb = machine::MachineDB::new(mdb, defs); + + + let mut ac = access::AccessControl::new(); + + let permdb = access::init(log.new(o!("system" => "permissions")), &config, env.clone())?; + ac.add_source_unchecked("Internal".to_string(), Box::new(permdb)); + + let passdb = pass::PassDB::init(log.new(o!("system" => "passwords")), env.clone()).unwrap(); + + Ok(Self { + access: Arc::new(ac), + machine: Arc::new(machdb), + passdb: Arc::new(passdb), + }) + } +} diff --git a/src/main.rs b/src/main.rs index b5a223b..c0f53c0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,7 @@ mod schema; mod db; mod machine; mod builtin; +mod server; use clap::{App, Arg}; @@ -27,9 +28,6 @@ use futures::compat::Stream01CompatExt; use futures::join; use futures::task::LocalSpawn; -use smol::net::TcpListener; -use smol::net::unix::UnixStream; - use std::io; use std::io::Write; use std::path::PathBuf; @@ -38,32 +36,15 @@ use std::str::FromStr; use std::sync::Arc; use 
lmdb::Transaction; +use smol::net::TcpListener; use error::Error; use registries::Registries; -const LMDB_MAX_DB: u32 = 16; - // Returning a `Result` from `main` allows us to use the `?` shorthand. // In the case of an Err it will be printed using `fmt::Debug` fn main() -> Result<(), Error> { - let signal = Box::pin(async { - let (tx, mut rx) = UnixStream::pair()?; - // Initialize signal handler. - // We currently only care about Ctrl-C so SIGINT it is. - // TODO: Make this do SIGHUP and a few others too. (By cloning the tx end of the pipe) - signal_hook::pipe::register(signal_hook::SIGINT, tx)?; - // When a signal is received this future can complete and read a byte from the underlying - // socket — the actual data is discarded but the act of being able to receive data tells us - // that we received a SIGINT. - - // FIXME: What errors are possible and how to handle them properly? - rx.read_exact(&mut [0u8]).await?; - - io::Result::Ok(LoopResult::Stop) - }); - use clap::{crate_version, crate_description, crate_name}; // Argument parsing @@ -109,6 +90,9 @@ fn main() -> Result<(), Error> { // Early return to exit. return Ok(()) + } else if matches.is_present("dump") { + } else if matches.is_present("load") { + } else { } @@ -123,229 +107,13 @@ fn main() -> Result<(), Error> { let log = Arc::new(log::init(&config)); info!(log, "Starting"); - // Initialize the LMDB environment. This blocks untill the mmap() finishes - let e_config = config.clone(); - info!(log, "LMDB env"); - let env = lmdb::Environment::new() - .set_flags(lmdb::EnvironmentFlags::MAP_ASYNC | lmdb::EnvironmentFlags::NO_SUB_DIR) - .set_max_dbs(LMDB_MAX_DB as libc::c_uint) - .open(&PathBuf::from_str("/tmp/a.db").unwrap())?; + let db = db::Databases::new(&log, &config)?; - // Kick up an executor - // Most initializations from now on do some amount of IO and are much better done in an - // asyncronous fashion. 
- let mut exec = LocalPool::new(); - - - // Start loading the machine database, authentication system and permission system - // All of those get a custom logger so the source of a log message can be better traced and - // filtered - let env = Arc::new(env); - let mdb = db::machine::init(log.new(o!("system" => "machines")), &config, env.clone()); - let pdb = db::access::init(log.new(o!("system" => "permissions")), &config, env.clone()); - - // If --load or --dump is given we can stop at this point and load/dump the database and then - // exit. - if matches.is_present("load") { - if let Some(pathstr) = matches.value_of("load") { - let path = std::path::Path::new(pathstr); - - let mut txn = env.begin_rw_txn()?; - let path = path.to_path_buf(); - pdb?.load_db(&mut txn, path.clone())?; - //mdb?.load_db(&mut txn, path)?; - txn.commit()?; - } else { - error!(log, "You must provide a directory path to load from"); - } - - return Ok(()) - } else if matches.is_present("dump") { - if let Some(pathstr) = matches.value_of("dump") { - let path = std::path::Path::new(pathstr); - if let Err(e) = std::fs::create_dir_all(path) { - error!(log, "The provided path could not be created: {}", e); - return Ok(()) - } - - let txn = env.begin_ro_txn()?; - let path = path.to_path_buf(); - pdb?.dump_db(&txn, path.clone())?; - //mdb?.dump_db(&txn, path)?; - } else { - error!(log, "You must provide a directory path to dump into"); - } - - return Ok(()) - } - - - // Bind to each address in config.listen. 
- // This is a Stream over Futures so it will do absolutely nothing unless polled to completion - let listeners_s: futures::stream::Collect<_, Vec<TcpListener>> - = stream::iter((&config).listens.iter()) - .map(|l| { - let addr = l.address.clone(); - let port = l.port.unwrap_or(config::DEFAULT_PORT); - TcpListener::bind((l.address.as_str(), port)) - // If the bind errors, include the address so we can log it - // Since this closure is lazy we need to have a cloned addr - .map_err(move |e| { (addr, port, e) }) - }) - .filter_map(|f| async { - match f.await { - Ok(l) => Some(l), - Err((addr, port, e)) => { - error!(&log, "Could not setup socket on {} port {}: {}", addr, port, e); - None - } - } - }).collect(); - - //let (mach, auth) = exec.run_until(async { - // // Rull all futures to completion in parallel. - // // This will block until all three are done starting up. - // join!(machinedb_f, authentication_f) - //}); - - // Error out if any of the subsystems failed to start. - let mdb = mdb?; - let defs = machine::MachineDescription::load_file(&config.machines)?; - let machdb = db::machine::MachineDB::new(mdb, defs); - info!(log, "{:?}", machdb); - let pdb = pdb?; - let mut ac = db::access::AccessControl::new(); - ac.add_source_unchecked("Internal".to_string(), Box::new(pdb)); - let machdb = Arc::new(machdb); - - let passdb = db::pass::PassDB::init(log.new(o!("system" => "passwords")), env.clone()).unwrap(); - let db = db::Databases { - access: Arc::new(db::access::AccessControl::new()), - machine: machdb.clone(), - passdb: Arc::new(passdb), - }; - - // Since the below closures will happen at a much later time we need to make sure all pointers - // are still valid. Thus, Arc. 
- let start_log = log.clone(); - let stop_log = log.clone(); - - // Create a thread pool to run tasks on - let pool = ThreadPool::builder() - .after_start(move |i| { - info!(start_log.new(o!("system" => "threadpool")), "Starting Thread <{}>", i) - }) - .before_stop(move |i| { - info!(stop_log.new(o!("system" => "threadpool")), "Stopping Thread <{}>", i) - }) - .create()?; - let local_spawn = exec.spawner(); - - // Start all modules on the threadpool. The pool will run the modules until it is dropped. - // FIXME: implement notification so the modules can shut down cleanly instead of being killed - // without warning. - let modlog = log.clone(); - let mut regs = Registries::new(machdb.clone()); - match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs.clone())) { - Ok(()) => {} - Err(e) => { - error!(modlog, "Module startup failed: {}", e); - return Err(e); - } - } - - // Closure inefficiencies. Lucky cloning an Arc is pretty cheap. - let inner_log = log.clone(); - let loop_log = log.clone(); - - exec.run_until(async move { - // Generate a stream of TcpStreams appearing on any of the interfaces we listen to - let listeners = listeners_s.await; - let incoming = stream::select_all(listeners.iter().map(|l| l.incoming())); - - // For each incoming connection start a new task to handle it - let handle_sockets = incoming.map(|socket| { - // incoming.next() returns an error when the underlying `accept` call yielded an error - // In POSIX those are protocol errors we can't really handle, so we just log the error - // and the move on - match socket { - Ok(socket) => { - // If we have it available add the peer's address to all log messages - let log = - if let Ok(addr) = socket.peer_addr() { - inner_log.new(o!("address" => addr)) - } else { - inner_log.new(o!()) - }; - - // Clone a log for potential error handling - let elog = log.clone(); - - // We handle the error using map_err - let f = 
connection::handle_connection(log.clone(), socket, db.clone()) - .map_err(move |e| { - error!(log, "Error occured during protocol handling: {}", e); - }) - // Void any and all results since pool.spawn allows no return value. - .map(|_| ()); - - // In this case only the error is relevant since the Value is always () - // The future is Boxed to make it the `LocalFutureObj` that LocalSpawn expects - if let Err(e) = local_spawn.spawn_local_obj(Box::new(f).into()) { - error!(elog, "Failed to spawn connection handler: {}", e); - // Failing to spawn a handler means we are most likely overloaded - return LoopResult::Overloaded; - } - }, - Err(e) => { - error!(inner_log, "Socket `accept` error: {}", e); - } - } - - // Unless we are overloaded we just want to keep going. - return LoopResult::Continue; - }); - - // Check each signal as it arrives - let handle_signals = signal.map(|r| { r.unwrap() }).into_stream(); - - let mut combined = stream::select(handle_signals, handle_sockets); - - // This is the basic main loop that drives execution - loop { - match combined.next().await { - // When the result says to continue, do exactly that - Some(LoopResult::Continue) => {} - Some(LoopResult::Overloaded) => { - // In case over server overload we should install a replacement handler that - // would instead just return `overloaded` for all connections until the - // situation is remedied. - // - // For now, just log the overload and keep going. - error!(loop_log, "Server overloaded"); - } - // When the result says to stop the server, do exactly that. - // Also catches a `None` from the stream; None should never be returned because it - // would mean all sockets were closed and we can not receive any further signals. 
- // Still, in that case shut down cleanly anyway, the only reason this could happen - // are some heavy bugs in the runtime - Some(LoopResult::Stop) | None => { - warn!(loop_log, "Stopping server"); - break; - } - } - } - }); - - // TODO: Run actual shut down code here - info!(log, "Shutting down..."); - - // Returning () is an implicit success so this will properly set the exit code as well - Ok(()) + server::serve_api_connections(log, config, db) } /// The result of one iteration of the core loop -enum LoopResult { +pub enum LoopResult { /// Everything was fine, keep going Continue, /// Something happened that means we should shut down