Network'd

This commit is contained in:
Gregor Reitzenstein 2020-12-01 10:21:39 +01:00
parent aace3c1b32
commit 1041afd0ab
11 changed files with 75 additions and 73 deletions

8
Cargo.lock generated
View File

@ -410,6 +410,7 @@ dependencies = [
name = "diflouroborane" name = "diflouroborane"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-channel",
"async-trait", "async-trait",
"capnp", "capnp",
"capnp-futures", "capnp-futures",
@ -417,6 +418,7 @@ dependencies = [
"capnpc", "capnpc",
"clap", "clap",
"config", "config",
"easy-parallel",
"flexbuffers", "flexbuffers",
"futures 0.3.7", "futures 0.3.7",
"futures-signals", "futures-signals",
@ -466,6 +468,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
[[package]]
name = "easy-parallel"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dd4afd79212583ff429b913ad6605242ed7eec277e950b1438f300748f948f4"
[[package]] [[package]]
name = "env_logger" name = "env_logger"
version = "0.7.1" version = "0.7.1"

View File

@ -60,5 +60,8 @@ lazy_static = "1.4.0"
rust-argon2 = "0.8" rust-argon2 = "0.8"
rand = "0.7" rand = "0.7"
async-channel = "1.5"
easy-parallel = "3.1"
[build-dependencies] [build-dependencies]
capnpc = "0.13" capnpc = "0.13"

View File

@ -1,5 +1,11 @@
use std::pin::Pin;
use std::task::{Poll, Context};
use std::sync::Arc;
use std::future::Future; use std::future::Future;
use smol::Executor;
use futures::{future::BoxFuture, Stream, StreamExt};
use futures_signals::signal::Signal; use futures_signals::signal::Signal;
use crate::db::machine::MachineState; use crate::db::machine::MachineState;
@ -8,18 +14,42 @@ use crate::config::Settings;
use crate::error::Result; use crate::error::Result;
pub struct Actor { pub struct Actor {
inner: Box<dyn Actuator> inner: Box<dyn Actuator + Unpin>,
f: Option<BoxFuture<'static, ()>>
} }
impl Actor { unsafe impl Send for Actor {}
pub fn new(inner: Box<dyn Actuator>) -> Self {
Self { inner }
}
pub fn run(self, ex: Arc<Executor>) -> impl Future<Output=()> { impl Actor {
inner.for_each(|fut| { pub fn new(inner: Box<dyn Actuator + Unpin>) -> Self {
ex.run(fut); Self {
}) inner: inner,
f: None,
}
}
}
impl Future for Actor {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut this = &mut *self;
// If we have a future at the moment, poll it
if let Some(mut f) = this.f.take() {
if Future::poll(Pin::new(&mut f), cx).is_pending() {
this.f.replace(f);
}
}
match Stream::poll_next(Pin::new(&mut this.inner), cx) {
Poll::Ready(None) => Poll::Ready(()),
Poll::Ready(Some(f)) => {
this.f.replace(f);
Poll::Pending
}
Poll::Pending => Poll::Pending
}
} }
} }

View File

@ -29,33 +29,7 @@ impl Machine {
} }
pub fn fill_info(&self, builder: &mut m_info::Builder) { pub fn fill_info(&self, builder: &mut m_info::Builder) {
if let Some(desc) = self.db.machine.get_desc(&self.id) { unimplemented!()
builder.set_name(&desc.name);
if let Some(d) = desc.description.as_ref() {
builder.set_description(d);
}
// TODO: Set `responsible`
// TODO: Error Handling
if let Some(state) = self.db.machine.get_state(&self.id) {
match state.state {
Status::Free => builder.set_state(State::Free),
Status::InUse(_u, _p) => {
builder.set_state(State::InUse);
}
Status::ToCheck(_u, _p) => {
builder.set_state(State::ToCheck);
}
Status::Blocked(_u, _p) => {
builder.set_state(State::Blocked);
}
Status::Disabled => builder.set_state(State::Disabled),
Status::Reserved(_u, _p) => {
builder.set_state(State::Reserved);
}
}
}
}
} }
} }

View File

@ -41,26 +41,6 @@ impl machines::Server for Machines {
mut results: machines::GetMachineResults) mut results: machines::GetMachineResults)
-> Promise<(), Error> -> Promise<(), Error>
{ {
match params.get() { unimplemented!()
Ok(reader) => {
if let Ok(api_id) = reader.get_uuid() {
let id = uuid_from_api(api_id);
if self.db.machine.exists(id) {
debug!(self.session.log, "Accessing machine {}", id);
// TODO check disclose permission
let mut builder = results.get().init_machine();
let m = Machine::new(self.session.clone(), id, self.db.clone());
Machine::fill(Arc::new(m), &mut builder);
} else {
debug!(self.session.log, "Client requested nonexisting machine {}", id);
}
}
Promise::ok(())
}
Err(e) => Promise::err(e),
}
} }
} }

View File

@ -30,10 +30,10 @@ impl Internal {
Self { log, env, db } Self { log, env, db }
} }
pub fn get_with_txn<T: Transaction>(&self, txn: &T, uuid: &Uuid) pub fn get_with_txn<T: Transaction>(&self, txn: &T, id: &String)
-> Result<Option<MachineState>> -> Result<Option<MachineState>>
{ {
match txn.get(self.db, uuid.as_bytes()) { match txn.get(self.db, &id.as_bytes()) {
Ok(bytes) => { Ok(bytes) => {
let mut machine: MachineState = flexbuffers::from_slice(bytes)?; let mut machine: MachineState = flexbuffers::from_slice(bytes)?;
Ok(Some(machine)) Ok(Some(machine))
@ -48,11 +48,11 @@ impl Internal {
self.get_with_txn(&txn, id) self.get_with_txn(&txn, id)
} }
pub fn put_with_txn(&self, txn: &mut RwTransaction, uuid: &Uuid, status: &MachineState) pub fn put_with_txn(&self, txn: &mut RwTransaction, uuid: &String, status: &MachineState)
-> Result<()> -> Result<()>
{ {
let bytes = flexbuffers::to_vec(status)?; let bytes = flexbuffers::to_vec(status)?;
txn.put(self.db, uuid.as_bytes(), &bytes, lmdb::WriteFlags::empty())?; txn.put(self.db, &uuid.as_bytes(), &bytes, lmdb::WriteFlags::empty())?;
Ok(()) Ok(())
} }
@ -67,7 +67,6 @@ impl Internal {
let mut cursor = txn.open_ro_cursor(self.db)?; let mut cursor = txn.open_ro_cursor(self.db)?;
Ok(cursor.iter_start().map(|buf| { Ok(cursor.iter_start().map(|buf| {
let (kbuf, vbuf) = buf.unwrap(); let (kbuf, vbuf) = buf.unwrap();
let machID = uuid::Uuid::from_slice(kbuf).unwrap();
flexbuffers::from_slice(vbuf).unwrap() flexbuffers::from_slice(vbuf).unwrap()
})) }))
} }

View File

@ -1,7 +1,15 @@
use std::future::Future;
use smol::Task; use smol::Task;
use crate::error::Result;
pub struct Initiator { pub struct Initiator {
inner: Task<()>, }
impl Initiator {
pub fn run(self) -> impl Future<Output=()> {
futures::future::pending()
}
} }
pub fn load(config: &crate::config::Settings) -> Result<Vec<Initiator>> { pub fn load(config: &crate::config::Settings) -> Result<Vec<Initiator>> {

View File

@ -3,7 +3,7 @@ use slog_async;
use slog_term::{TermDecorator, FullFormat}; use slog_term::{TermDecorator, FullFormat};
use crate::config::Settings; use crate::config::Settings;
pub fn init(_config: &Settings) -> Logger { pub fn init() -> Logger {
let decorator = TermDecorator::new().build(); let decorator = TermDecorator::new().build();
let drain = FullFormat::new(decorator).build().fuse(); let drain = FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse(); let drain = slog_async::Async::new(drain).build().fuse();

View File

@ -64,7 +64,7 @@ impl Machine {
} }
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Vec<Machine>> { pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Vec<Machine>> {
let map: HashMap<MachineIdentifier, MachineDescription> = MachineDescription::load_file(path)?; let mut map: HashMap<MachineIdentifier, MachineDescription> = MachineDescription::load_file(path)?;
Ok(map.drain().map(|(id, desc)| { Ok(map.drain().map(|(id, desc)| {
Self::construct(id, desc, MachineState::new()) Self::construct(id, desc, MachineState::new())
}).collect()) }).collect())

View File

@ -94,7 +94,7 @@ fn main() {
handle.write_all(&encoded).unwrap(); handle.write_all(&encoded).unwrap();
// Early return to exit. // Early return to exit.
return Ok(()); return;
} }
let retval; let retval;
@ -142,24 +142,23 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
// when bffh should exit // when bffh should exit
let machines = machine::load(&config)?; let machines = machine::load(&config)?;
let actors = actor::load(&config)?; let actors = actor::load(&config)?;
let initiators = load_initiators(&config)?; let initiators = initiator::load(&config)?;
// TODO restore connections between initiators, machines, actors // TODO restore connections between initiators, machines, actors
let ex = Arc::new(Executor::new()); let ex = Executor::new();
for i in initiators.into_iter() { for i in initiators.into_iter() {
ex.spawn(i.run()); ex.spawn(i.run());
} }
for a in actors.into_iter() { for a in actors.into_iter() {
ex.spawn(a.run()); ex.spawn(a);
} }
let (signal, shutdown) = futures::channel::oneshot::channel(); let (signal, shutdown) = async_channel::bounded::<()>(1);
for i in 0..4 { easy_parallel::Parallel::new()
std::thread::spawn(|| smol::block_on(ex.run(shutdown.recv()))); .each(0..4, |_| smol::block_on(ex.run(shutdown.recv())));
}
server::serve_api_connections(log.clone(), config, db) server::serve_api_connections(log.clone(), config, db)
// Signal is dropped here, stopping all executor threads as well. // Signal is dropped here, stopping all executor threads as well.

View File

@ -26,6 +26,7 @@ use std::sync::Arc;
use crate::db::Databases; use crate::db::Databases;
/// Handle all API connections and run the RPC tasks spawned from that on the local thread.
pub fn serve_api_connections(log: Arc<Logger>, config: Settings, db: Databases) -> Result<(), Error> { pub fn serve_api_connections(log: Arc<Logger>, config: Settings, db: Databases) -> Result<(), Error> {
let signal = Box::pin(async { let signal = Box::pin(async {
let (tx, mut rx) = UnixStream::pair()?; let (tx, mut rx) = UnixStream::pair()?;