mirror of
https://gitlab.com/fabinfra/fabaccess/bffh.git
synced 2024-11-22 23:07:56 +01:00
Main refactor #2
This commit is contained in:
parent
e08b9e43d8
commit
a0d60a574f
@ -1,4 +1,6 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::future::Future;
|
||||||
|
use futures::FutureExt;
|
||||||
|
|
||||||
use slog::Logger;
|
use slog::Logger;
|
||||||
|
|
||||||
@ -71,17 +73,27 @@ async fn handshake(log: &Logger, stream: &mut TcpStream) -> Result<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_connection(log: Logger, mut stream: TcpStream, db: Databases) -> Result<()> {
|
pub struct ConnectionHandler {
|
||||||
info!(log, "New connection from on {:?}", stream);
|
log: Logger,
|
||||||
let session = Arc::new(Session::new(log, db.access.clone()));
|
db: Databases,
|
||||||
let boots = Bootstrap::new(session, db);
|
}
|
||||||
|
|
||||||
|
impl ConnectionHandler {
|
||||||
|
pub fn new(log: Logger, db: Databases) -> Self {
|
||||||
|
Self { log, db }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn handle(&mut self, mut stream: TcpStream) -> impl Future<Output=Result<()>> {
|
||||||
|
info!(self.log, "New connection from on {:?}", stream);
|
||||||
|
let session = Arc::new(Session::new(self.log.new(o!()), self.db.access.clone()));
|
||||||
|
let boots = Bootstrap::new(session, self.db.clone());
|
||||||
let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots);
|
let rpc: connection_capnp::bootstrap::Client = capnp_rpc::new_client(boots);
|
||||||
|
|
||||||
let network = twoparty::VatNetwork::new(stream.clone(), stream,
|
let network = twoparty::VatNetwork::new(stream.clone(), stream,
|
||||||
rpc_twoparty_capnp::Side::Server, Default::default());
|
rpc_twoparty_capnp::Side::Server, Default::default());
|
||||||
let rpc_system = capnp_rpc::RpcSystem::new(Box::new(network),
|
let rpc_system = capnp_rpc::RpcSystem::new(Box::new(network), Some(rpc.client));
|
||||||
Some(rpc.client));
|
|
||||||
|
|
||||||
rpc_system.await.unwrap();
|
// Convert the error type to one of our errors
|
||||||
Ok(())
|
rpc_system.map(|r| r.map_err(Into::into))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
45
src/db.rs
45
src/db.rs
@ -1,4 +1,11 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use slog::Logger;
|
||||||
|
|
||||||
|
use crate::error::Result;
|
||||||
|
use crate::config::Settings;
|
||||||
|
|
||||||
/// (Hashed) password database
|
/// (Hashed) password database
|
||||||
pub mod pass;
|
pub mod pass;
|
||||||
@ -22,3 +29,41 @@ pub struct Databases {
|
|||||||
pub machine: Arc<machine::MachineDB>,
|
pub machine: Arc<machine::MachineDB>,
|
||||||
pub passdb: Arc<pass::PassDB>,
|
pub passdb: Arc<pass::PassDB>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const LMDB_MAX_DB: u32 = 16;
|
||||||
|
|
||||||
|
impl Databases {
|
||||||
|
pub fn new(log: &Logger, config: &Settings) -> Result<Self> {
|
||||||
|
|
||||||
|
// Initialize the LMDB environment. This blocks untill the mmap() finishes
|
||||||
|
info!(log, "LMDB env");
|
||||||
|
let env = lmdb::Environment::new()
|
||||||
|
.set_flags(lmdb::EnvironmentFlags::MAP_ASYNC | lmdb::EnvironmentFlags::NO_SUB_DIR)
|
||||||
|
.set_max_dbs(LMDB_MAX_DB as libc::c_uint)
|
||||||
|
.open(&PathBuf::from_str("/tmp/a.db").unwrap())?;
|
||||||
|
|
||||||
|
// Start loading the machine database, authentication system and permission system
|
||||||
|
// All of those get a custom logger so the source of a log message can be better traced and
|
||||||
|
// filtered
|
||||||
|
let env = Arc::new(env);
|
||||||
|
let mdb = machine::init(log.new(o!("system" => "machines")), &config, env.clone())?;
|
||||||
|
|
||||||
|
// Error out if any of the subsystems failed to start.
|
||||||
|
let defs = crate::machine::MachineDescription::load_file(&config.machines)?;
|
||||||
|
let machdb = machine::MachineDB::new(mdb, defs);
|
||||||
|
|
||||||
|
|
||||||
|
let mut ac = access::AccessControl::new();
|
||||||
|
|
||||||
|
let permdb = access::init(log.new(o!("system" => "permissions")), &config, env.clone())?;
|
||||||
|
ac.add_source_unchecked("Internal".to_string(), Box::new(permdb));
|
||||||
|
|
||||||
|
let passdb = pass::PassDB::init(log.new(o!("system" => "passwords")), env.clone()).unwrap();
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
access: Arc::new(ac),
|
||||||
|
machine: Arc::new(machdb),
|
||||||
|
passdb: Arc::new(passdb),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
248
src/main.rs
248
src/main.rs
@ -18,6 +18,7 @@ mod schema;
|
|||||||
mod db;
|
mod db;
|
||||||
mod machine;
|
mod machine;
|
||||||
mod builtin;
|
mod builtin;
|
||||||
|
mod server;
|
||||||
|
|
||||||
use clap::{App, Arg};
|
use clap::{App, Arg};
|
||||||
|
|
||||||
@ -27,9 +28,6 @@ use futures::compat::Stream01CompatExt;
|
|||||||
use futures::join;
|
use futures::join;
|
||||||
use futures::task::LocalSpawn;
|
use futures::task::LocalSpawn;
|
||||||
|
|
||||||
use smol::net::TcpListener;
|
|
||||||
use smol::net::unix::UnixStream;
|
|
||||||
|
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@ -38,32 +36,15 @@ use std::str::FromStr;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use lmdb::Transaction;
|
use lmdb::Transaction;
|
||||||
|
use smol::net::TcpListener;
|
||||||
|
|
||||||
use error::Error;
|
use error::Error;
|
||||||
|
|
||||||
use registries::Registries;
|
use registries::Registries;
|
||||||
|
|
||||||
const LMDB_MAX_DB: u32 = 16;
|
|
||||||
|
|
||||||
// Returning a `Result` from `main` allows us to use the `?` shorthand.
|
// Returning a `Result` from `main` allows us to use the `?` shorthand.
|
||||||
// In the case of an Err it will be printed using `fmt::Debug`
|
// In the case of an Err it will be printed using `fmt::Debug`
|
||||||
fn main() -> Result<(), Error> {
|
fn main() -> Result<(), Error> {
|
||||||
let signal = Box::pin(async {
|
|
||||||
let (tx, mut rx) = UnixStream::pair()?;
|
|
||||||
// Initialize signal handler.
|
|
||||||
// We currently only care about Ctrl-C so SIGINT it is.
|
|
||||||
// TODO: Make this do SIGHUP and a few others too. (By cloning the tx end of the pipe)
|
|
||||||
signal_hook::pipe::register(signal_hook::SIGINT, tx)?;
|
|
||||||
// When a signal is received this future can complete and read a byte from the underlying
|
|
||||||
// socket — the actual data is discarded but the act of being able to receive data tells us
|
|
||||||
// that we received a SIGINT.
|
|
||||||
|
|
||||||
// FIXME: What errors are possible and how to handle them properly?
|
|
||||||
rx.read_exact(&mut [0u8]).await?;
|
|
||||||
|
|
||||||
io::Result::Ok(LoopResult::Stop)
|
|
||||||
});
|
|
||||||
|
|
||||||
use clap::{crate_version, crate_description, crate_name};
|
use clap::{crate_version, crate_description, crate_name};
|
||||||
|
|
||||||
// Argument parsing
|
// Argument parsing
|
||||||
@ -109,6 +90,9 @@ fn main() -> Result<(), Error> {
|
|||||||
|
|
||||||
// Early return to exit.
|
// Early return to exit.
|
||||||
return Ok(())
|
return Ok(())
|
||||||
|
} else if matches.is_present("dump") {
|
||||||
|
} else if matches.is_present("load") {
|
||||||
|
} else {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -123,229 +107,13 @@ fn main() -> Result<(), Error> {
|
|||||||
let log = Arc::new(log::init(&config));
|
let log = Arc::new(log::init(&config));
|
||||||
info!(log, "Starting");
|
info!(log, "Starting");
|
||||||
|
|
||||||
// Initialize the LMDB environment. This blocks untill the mmap() finishes
|
let db = db::Databases::new(&log, &config)?;
|
||||||
let e_config = config.clone();
|
|
||||||
info!(log, "LMDB env");
|
|
||||||
let env = lmdb::Environment::new()
|
|
||||||
.set_flags(lmdb::EnvironmentFlags::MAP_ASYNC | lmdb::EnvironmentFlags::NO_SUB_DIR)
|
|
||||||
.set_max_dbs(LMDB_MAX_DB as libc::c_uint)
|
|
||||||
.open(&PathBuf::from_str("/tmp/a.db").unwrap())?;
|
|
||||||
|
|
||||||
// Kick up an executor
|
server::serve_api_connections(log, config, db)
|
||||||
// Most initializations from now on do some amount of IO and are much better done in an
|
|
||||||
// asyncronous fashion.
|
|
||||||
let mut exec = LocalPool::new();
|
|
||||||
|
|
||||||
|
|
||||||
// Start loading the machine database, authentication system and permission system
|
|
||||||
// All of those get a custom logger so the source of a log message can be better traced and
|
|
||||||
// filtered
|
|
||||||
let env = Arc::new(env);
|
|
||||||
let mdb = db::machine::init(log.new(o!("system" => "machines")), &config, env.clone());
|
|
||||||
let pdb = db::access::init(log.new(o!("system" => "permissions")), &config, env.clone());
|
|
||||||
|
|
||||||
// If --load or --dump is given we can stop at this point and load/dump the database and then
|
|
||||||
// exit.
|
|
||||||
if matches.is_present("load") {
|
|
||||||
if let Some(pathstr) = matches.value_of("load") {
|
|
||||||
let path = std::path::Path::new(pathstr);
|
|
||||||
|
|
||||||
let mut txn = env.begin_rw_txn()?;
|
|
||||||
let path = path.to_path_buf();
|
|
||||||
pdb?.load_db(&mut txn, path.clone())?;
|
|
||||||
//mdb?.load_db(&mut txn, path)?;
|
|
||||||
txn.commit()?;
|
|
||||||
} else {
|
|
||||||
error!(log, "You must provide a directory path to load from");
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(())
|
|
||||||
} else if matches.is_present("dump") {
|
|
||||||
if let Some(pathstr) = matches.value_of("dump") {
|
|
||||||
let path = std::path::Path::new(pathstr);
|
|
||||||
if let Err(e) = std::fs::create_dir_all(path) {
|
|
||||||
error!(log, "The provided path could not be created: {}", e);
|
|
||||||
return Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
let txn = env.begin_ro_txn()?;
|
|
||||||
let path = path.to_path_buf();
|
|
||||||
pdb?.dump_db(&txn, path.clone())?;
|
|
||||||
//mdb?.dump_db(&txn, path)?;
|
|
||||||
} else {
|
|
||||||
error!(log, "You must provide a directory path to dump into");
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Bind to each address in config.listen.
|
|
||||||
// This is a Stream over Futures so it will do absolutely nothing unless polled to completion
|
|
||||||
let listeners_s: futures::stream::Collect<_, Vec<TcpListener>>
|
|
||||||
= stream::iter((&config).listens.iter())
|
|
||||||
.map(|l| {
|
|
||||||
let addr = l.address.clone();
|
|
||||||
let port = l.port.unwrap_or(config::DEFAULT_PORT);
|
|
||||||
TcpListener::bind((l.address.as_str(), port))
|
|
||||||
// If the bind errors, include the address so we can log it
|
|
||||||
// Since this closure is lazy we need to have a cloned addr
|
|
||||||
.map_err(move |e| { (addr, port, e) })
|
|
||||||
})
|
|
||||||
.filter_map(|f| async {
|
|
||||||
match f.await {
|
|
||||||
Ok(l) => Some(l),
|
|
||||||
Err((addr, port, e)) => {
|
|
||||||
error!(&log, "Could not setup socket on {} port {}: {}", addr, port, e);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}).collect();
|
|
||||||
|
|
||||||
//let (mach, auth) = exec.run_until(async {
|
|
||||||
// // Rull all futures to completion in parallel.
|
|
||||||
// // This will block until all three are done starting up.
|
|
||||||
// join!(machinedb_f, authentication_f)
|
|
||||||
//});
|
|
||||||
|
|
||||||
// Error out if any of the subsystems failed to start.
|
|
||||||
let mdb = mdb?;
|
|
||||||
let defs = machine::MachineDescription::load_file(&config.machines)?;
|
|
||||||
let machdb = db::machine::MachineDB::new(mdb, defs);
|
|
||||||
info!(log, "{:?}", machdb);
|
|
||||||
let pdb = pdb?;
|
|
||||||
let mut ac = db::access::AccessControl::new();
|
|
||||||
ac.add_source_unchecked("Internal".to_string(), Box::new(pdb));
|
|
||||||
let machdb = Arc::new(machdb);
|
|
||||||
|
|
||||||
let passdb = db::pass::PassDB::init(log.new(o!("system" => "passwords")), env.clone()).unwrap();
|
|
||||||
let db = db::Databases {
|
|
||||||
access: Arc::new(db::access::AccessControl::new()),
|
|
||||||
machine: machdb.clone(),
|
|
||||||
passdb: Arc::new(passdb),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Since the below closures will happen at a much later time we need to make sure all pointers
|
|
||||||
// are still valid. Thus, Arc.
|
|
||||||
let start_log = log.clone();
|
|
||||||
let stop_log = log.clone();
|
|
||||||
|
|
||||||
// Create a thread pool to run tasks on
|
|
||||||
let pool = ThreadPool::builder()
|
|
||||||
.after_start(move |i| {
|
|
||||||
info!(start_log.new(o!("system" => "threadpool")), "Starting Thread <{}>", i)
|
|
||||||
})
|
|
||||||
.before_stop(move |i| {
|
|
||||||
info!(stop_log.new(o!("system" => "threadpool")), "Stopping Thread <{}>", i)
|
|
||||||
})
|
|
||||||
.create()?;
|
|
||||||
let local_spawn = exec.spawner();
|
|
||||||
|
|
||||||
// Start all modules on the threadpool. The pool will run the modules until it is dropped.
|
|
||||||
// FIXME: implement notification so the modules can shut down cleanly instead of being killed
|
|
||||||
// without warning.
|
|
||||||
let modlog = log.clone();
|
|
||||||
let mut regs = Registries::new(machdb.clone());
|
|
||||||
match exec.run_until(modules::init(modlog.new(o!("system" => "modules")), config.clone(), pool.clone(), regs.clone())) {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(e) => {
|
|
||||||
error!(modlog, "Module startup failed: {}", e);
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Closure inefficiencies. Lucky cloning an Arc is pretty cheap.
|
|
||||||
let inner_log = log.clone();
|
|
||||||
let loop_log = log.clone();
|
|
||||||
|
|
||||||
exec.run_until(async move {
|
|
||||||
// Generate a stream of TcpStreams appearing on any of the interfaces we listen to
|
|
||||||
let listeners = listeners_s.await;
|
|
||||||
let incoming = stream::select_all(listeners.iter().map(|l| l.incoming()));
|
|
||||||
|
|
||||||
// For each incoming connection start a new task to handle it
|
|
||||||
let handle_sockets = incoming.map(|socket| {
|
|
||||||
// incoming.next() returns an error when the underlying `accept` call yielded an error
|
|
||||||
// In POSIX those are protocol errors we can't really handle, so we just log the error
|
|
||||||
// and the move on
|
|
||||||
match socket {
|
|
||||||
Ok(socket) => {
|
|
||||||
// If we have it available add the peer's address to all log messages
|
|
||||||
let log =
|
|
||||||
if let Ok(addr) = socket.peer_addr() {
|
|
||||||
inner_log.new(o!("address" => addr))
|
|
||||||
} else {
|
|
||||||
inner_log.new(o!())
|
|
||||||
};
|
|
||||||
|
|
||||||
// Clone a log for potential error handling
|
|
||||||
let elog = log.clone();
|
|
||||||
|
|
||||||
// We handle the error using map_err
|
|
||||||
let f = connection::handle_connection(log.clone(), socket, db.clone())
|
|
||||||
.map_err(move |e| {
|
|
||||||
error!(log, "Error occured during protocol handling: {}", e);
|
|
||||||
})
|
|
||||||
// Void any and all results since pool.spawn allows no return value.
|
|
||||||
.map(|_| ());
|
|
||||||
|
|
||||||
// In this case only the error is relevant since the Value is always ()
|
|
||||||
// The future is Boxed to make it the `LocalFutureObj` that LocalSpawn expects
|
|
||||||
if let Err(e) = local_spawn.spawn_local_obj(Box::new(f).into()) {
|
|
||||||
error!(elog, "Failed to spawn connection handler: {}", e);
|
|
||||||
// Failing to spawn a handler means we are most likely overloaded
|
|
||||||
return LoopResult::Overloaded;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
error!(inner_log, "Socket `accept` error: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unless we are overloaded we just want to keep going.
|
|
||||||
return LoopResult::Continue;
|
|
||||||
});
|
|
||||||
|
|
||||||
// Check each signal as it arrives
|
|
||||||
let handle_signals = signal.map(|r| { r.unwrap() }).into_stream();
|
|
||||||
|
|
||||||
let mut combined = stream::select(handle_signals, handle_sockets);
|
|
||||||
|
|
||||||
// This is the basic main loop that drives execution
|
|
||||||
loop {
|
|
||||||
match combined.next().await {
|
|
||||||
// When the result says to continue, do exactly that
|
|
||||||
Some(LoopResult::Continue) => {}
|
|
||||||
Some(LoopResult::Overloaded) => {
|
|
||||||
// In case over server overload we should install a replacement handler that
|
|
||||||
// would instead just return `overloaded` for all connections until the
|
|
||||||
// situation is remedied.
|
|
||||||
//
|
|
||||||
// For now, just log the overload and keep going.
|
|
||||||
error!(loop_log, "Server overloaded");
|
|
||||||
}
|
|
||||||
// When the result says to stop the server, do exactly that.
|
|
||||||
// Also catches a `None` from the stream; None should never be returned because it
|
|
||||||
// would mean all sockets were closed and we can not receive any further signals.
|
|
||||||
// Still, in that case shut down cleanly anyway, the only reason this could happen
|
|
||||||
// are some heavy bugs in the runtime
|
|
||||||
Some(LoopResult::Stop) | None => {
|
|
||||||
warn!(loop_log, "Stopping server");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// TODO: Run actual shut down code here
|
|
||||||
info!(log, "Shutting down...");
|
|
||||||
|
|
||||||
// Returning () is an implicit success so this will properly set the exit code as well
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The result of one iteration of the core loop
|
/// The result of one iteration of the core loop
|
||||||
enum LoopResult {
|
pub enum LoopResult {
|
||||||
/// Everything was fine, keep going
|
/// Everything was fine, keep going
|
||||||
Continue,
|
Continue,
|
||||||
/// Something happened that means we should shut down
|
/// Something happened that means we should shut down
|
||||||
|
Loading…
Reference in New Issue
Block a user