mirror of
https://gitlab.com/fabinfra/fabaccess/bffh.git
synced 2024-11-25 08:07:57 +01:00
main.rs cleanup
This commit is contained in:
parent
f5b37f2006
commit
764b08d4fa
183
src/main.rs
183
src/main.rs
@ -12,75 +12,158 @@ mod api;
|
|||||||
mod config;
|
mod config;
|
||||||
mod error;
|
mod error;
|
||||||
mod machine;
|
mod machine;
|
||||||
|
mod session;
|
||||||
|
|
||||||
|
use signal_hook::iterator::Signals;
|
||||||
|
|
||||||
|
use clap::{App, Arg};
|
||||||
|
|
||||||
use api::api as api_capnp;
|
use api::api as api_capnp;
|
||||||
|
|
||||||
use futures::prelude::*;
|
use session::Session;
|
||||||
use futures_signals::signal::Mutable;
|
|
||||||
use futures::task::LocalSpawn;
|
|
||||||
|
|
||||||
fn main() {
|
use futures::prelude::*;
|
||||||
let log = log::init();
|
use futures::executor::{LocalPool, ThreadPool};
|
||||||
|
use futures::join;
|
||||||
|
|
||||||
|
use capnp_rpc::twoparty::{VatNetwork, VatId};
|
||||||
|
use capnp_rpc::rpc_twoparty_capnp::Side;
|
||||||
|
|
||||||
|
use async_std::net::{TcpListener, TcpStream};
|
||||||
|
|
||||||
|
use std::io;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::str::FromStr;
|
||||||
|
use std::mem::drop;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use error::Error;
|
||||||
|
|
||||||
|
// Returning a `Result` from `main` allows us to use the `?` shorthand.
|
||||||
|
// In the case of an Err it will be printed using `fmt::Debug`
|
||||||
|
fn main() -> Result<(), Error> {
|
||||||
|
// Initialize signal handler.
|
||||||
|
// Specifically, this is a Stream of c_int representing received signals
|
||||||
|
// We currently only care about Ctrl-C so SIGINT it is.
|
||||||
|
// TODO: Make this do SIGHUP and a few others too.
|
||||||
|
let signals = Signals::new(&[signal_hook::SIGINT])?.into_async()?;
|
||||||
|
|
||||||
|
use clap::{crate_version, crate_description, crate_name};
|
||||||
|
|
||||||
|
// Argument parsing
|
||||||
|
// values for the name, description and version are pulled from `Cargo.toml`.
|
||||||
|
let matches = App::new(crate_name!())
|
||||||
|
.about(crate_description!())
|
||||||
|
.version(crate_version!())
|
||||||
|
.arg(Arg::with_name("config")
|
||||||
|
.help("Path to the config file to use")
|
||||||
|
.long("config")
|
||||||
|
.short("c")
|
||||||
|
.takes_value(true)
|
||||||
|
)
|
||||||
|
.arg(Arg::with_name("print default")
|
||||||
|
.help("Print a default config to stdout instead of running")
|
||||||
|
.long("print-default")
|
||||||
|
)
|
||||||
|
.get_matches();
|
||||||
|
|
||||||
|
// Check for the --print-default option first because we don't need to do anything else in that
|
||||||
|
// case.
|
||||||
|
if matches.is_present("print default") {
|
||||||
|
let config = config::Config::default();
|
||||||
|
let encoded = toml::to_vec(&config)?;
|
||||||
|
|
||||||
|
// Direct writing to fd 1 is faster but also prevents any print-formatting that could
|
||||||
|
// invalidate the generated TOML
|
||||||
|
let stdout = io::stdout();
|
||||||
|
let mut handle = stdout.lock();
|
||||||
|
handle.write_all(&encoded)?;
|
||||||
|
|
||||||
|
// Early return to exit.
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no `config` option is given use a preset default.
|
||||||
|
let configpath = matches.value_of("config").unwrap_or("/etc/diflouroborane.toml");
|
||||||
|
let config = config::read(&PathBuf::from_str(configpath).unwrap())?;
|
||||||
|
|
||||||
|
// Initialize the logging subsystem first to be able to better document the progress from now
|
||||||
|
// on.
|
||||||
|
// TODO: Now would be a really good time to close stdin/out and move logging to syslog
|
||||||
|
// Log is in an Arc so we can do very cheap clones in closures.
|
||||||
|
let log = Arc::new(log::init(&config));
|
||||||
info!(log, "Starting");
|
info!(log, "Starting");
|
||||||
|
|
||||||
let config = config::read().unwrap();
|
// Kick up an executor
|
||||||
|
// Most initializations from now on do some amount of IO and are much better done in an
|
||||||
|
// asyncronous fashion.
|
||||||
|
let mut exec = LocalPool::new();
|
||||||
|
|
||||||
modules::init(log.new(o!()));
|
// Start loading the machine database, authentication system and permission system
|
||||||
api::init();
|
// All of those get a custom logger so the source of a log message can be better traced and
|
||||||
|
// filtered
|
||||||
|
let machinedb_f = machine::init(log.new(o!("system" => "machinedb")), &config);
|
||||||
|
let permission_f = access::init(log.new(o!("system" => "permissions")), &config);
|
||||||
|
let authentication_f = auth::init(log.new(o!("system" => "authentication")), &config);
|
||||||
|
|
||||||
let m = machine::init(&config).unwrap();
|
// Bind to each address in config.listen.
|
||||||
let m = Mutable::new(m);
|
// This is a Stream over Futures so it will do absolutely nothing unless polled to completion
|
||||||
let m2 = m.clone();
|
let listeners_s: futures::stream::Collect<_, Vec<TcpListener>> = stream::iter((&config).listen.iter())
|
||||||
let c2 = config.clone();
|
.map(|l| {
|
||||||
|
let addr = l.address.clone();
|
||||||
|
TcpListener::bind((l.address.as_str(), l.port.unwrap_or(config::DEFAULT_PORT)))
|
||||||
|
// If the bind errors, include the address so we can log it
|
||||||
|
// Since this closure is lazy we need to have a cloned addr
|
||||||
|
.map_err(|e| { (addr, e) })
|
||||||
|
})
|
||||||
|
.filter_map(|f| async {
|
||||||
|
match f.await {
|
||||||
|
Ok(l) => Some(l),
|
||||||
|
Err((addr, e)) => {
|
||||||
|
error!(&log, "Could not connect to {}: {}", addr, e);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).collect();
|
||||||
|
|
||||||
|
let (mdb, pdb, auth) = exec.run_until(async {
|
||||||
let mut exec = futures::executor::LocalPool::new();
|
// Rull all futures to completion in parallel.
|
||||||
|
// This will "block" until all three are done starting up.
|
||||||
let enf = exec.run_until(async {
|
join!(machinedb_f, permission_f, authentication_f)
|
||||||
let e = access::init(&config).await.unwrap();
|
|
||||||
Mutable::new(e)
|
|
||||||
});
|
});
|
||||||
|
|
||||||
let p = auth::open_passdb(&config.passdb).unwrap();
|
// Error out if any of the subsystems failed to start.
|
||||||
let p = Mutable::new(p);
|
let mdb = mdb?;
|
||||||
let authp = auth::AuthenticationProvider::new(p, enf.clone());
|
let pdb = pdb.unwrap();
|
||||||
let authp = Mutable::new(authp);
|
let auth = auth?;
|
||||||
|
|
||||||
use std::net::ToSocketAddrs;
|
// Since the below closures will happen at a much later time we need to make sure all pointers
|
||||||
let args: Vec<String> = ::std::env::args().collect();
|
// are still valid. Thus, Arc.
|
||||||
if args.len() != 2 {
|
let l2 = log.clone();
|
||||||
println!("usage: {} ADDRESS[:PORT]", args[0]);
|
let l3 = log.clone();
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let addr = args[1].to_socket_addrs().unwrap().next().expect("could not parse address");
|
// Create a thread pool to run tasks on
|
||||||
|
let mut pool = ThreadPool::builder()
|
||||||
|
.after_start(move |i| { info!(l2.new(o!("system" => "threadpool")), "Starting Thread <{}>", i)})
|
||||||
|
.before_stop(move |i| { info!(l3.new(o!("system" => "threadpool")), "Stopping Thread <{}>", i)})
|
||||||
|
.create()?;
|
||||||
|
|
||||||
let permlog = log.new(o!());
|
// Spawner is a handle to the shared ThreadPool forwarded into each connection
|
||||||
let machlog = log.new(o!());
|
let spawner = pool.clone();
|
||||||
|
|
||||||
machine::save(&c2, &m2.lock_ref()).expect("MachineDB save");
|
|
||||||
|
|
||||||
let spawner = exec.spawner();
|
|
||||||
let result: Result<(), Box<dyn std::error::Error>> = exec.run_until(async move {
|
let result: Result<(), Box<dyn std::error::Error>> = exec.run_until(async move {
|
||||||
let listener = async_std::net::TcpListener::bind(&addr).await?;
|
// Generate a stream of TcpStreams appearing on any of the interfaces we listen to
|
||||||
let mut incoming = listener.incoming();
|
let listeners = listeners_s.await;
|
||||||
while let Some(socket) = incoming.next().await {
|
let mut incoming = stream::select_all(listeners.iter().map(|l| l.incoming()));
|
||||||
let socket = socket?;
|
|
||||||
trace!(log, "New connection from {:?}", socket.peer_addr());
|
|
||||||
// TODO: Prettify session handling
|
|
||||||
let auth = auth::Authentication::new(authp.clone());
|
|
||||||
trace!(log, "Init auth");
|
|
||||||
let perm = access::Permissions::new(permlog.clone(), enf.clone(), auth.clone());
|
|
||||||
trace!(log, "Init perm");
|
|
||||||
let mach = machine::Machines::new(machlog.clone(), m.clone(), perm.clone());
|
|
||||||
trace!(log, "Init mach");
|
|
||||||
|
|
||||||
let rpc_system = api::process_socket(auth, perm, mach, socket);
|
// Runn
|
||||||
spawner.spawn_local_obj(
|
while let Some(socket) = incoming.next().await {
|
||||||
Box::pin(rpc_system.map_err(|e| println!("error: {:?}", e)).map(|_|())).into()).expect("spawn")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
result.expect("main");
|
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user