From 1041afd0aba5b6a1dba43b47f02eb5e41e26da67 Mon Sep 17 00:00:00 2001 From: Gregor Reitzenstein Date: Tue, 1 Dec 2020 10:21:39 +0100 Subject: [PATCH] Network'd --- Cargo.lock | 8 +++++++ Cargo.toml | 3 +++ src/actor.rs | 48 +++++++++++++++++++++++++++++++------- src/api/machine.rs | 28 +--------------------- src/api/machines.rs | 22 +---------------- src/db/machine/internal.rs | 9 ++++--- src/initiator.rs | 10 +++++++- src/log.rs | 2 +- src/machine.rs | 2 +- src/main.rs | 15 ++++++------ src/server.rs | 1 + 11 files changed, 75 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 940b7f9..035fddd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -410,6 +410,7 @@ dependencies = [ name = "diflouroborane" version = "0.1.0" dependencies = [ + "async-channel", "async-trait", "capnp", "capnp-futures", @@ -417,6 +418,7 @@ dependencies = [ "capnpc", "clap", "config", + "easy-parallel", "flexbuffers", "futures 0.3.7", "futures-signals", @@ -466,6 +468,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "easy-parallel" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dd4afd79212583ff429b913ad6605242ed7eec277e950b1438f300748f948f4" + [[package]] name = "env_logger" version = "0.7.1" diff --git a/Cargo.toml b/Cargo.toml index 987af7f..c4b80ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,5 +60,8 @@ lazy_static = "1.4.0" rust-argon2 = "0.8" rand = "0.7" +async-channel = "1.5" +easy-parallel = "3.1" + [build-dependencies] capnpc = "0.13" diff --git a/src/actor.rs b/src/actor.rs index 78d8392..8897288 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -1,5 +1,11 @@ +use std::pin::Pin; +use std::task::{Poll, Context}; +use std::sync::Arc; use std::future::Future; +use smol::Executor; + +use futures::{future::BoxFuture, Stream, StreamExt}; use futures_signals::signal::Signal; use crate::db::machine::MachineState; @@ -8,18 +14,42 @@ use crate::config::Settings; use crate::error::Result; pub struct Actor { - inner: Box + inner: Box, + f: Option> } -impl Actor { - pub fn new(inner: Box) -> Self { - Self { inner } - } +unsafe impl Send for Actor {} - pub fn run(self, ex: Arc) -> impl Future { - inner.for_each(|fut| { - ex.run(fut); - }) +impl Actor { + pub fn new(inner: Box) -> Self { + Self { + inner: inner, + f: None, + } + } +} + +impl Future for Actor { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut this = &mut *self; + + // If we have a future at the moment, poll it + if let Some(mut f) = this.f.take() { + if Future::poll(Pin::new(&mut f), cx).is_pending() { + this.f.replace(f); + } + } + + match Stream::poll_next(Pin::new(&mut this.inner), cx) { + Poll::Ready(None) => Poll::Ready(()), + Poll::Ready(Some(f)) => { + this.f.replace(f); + Poll::Pending + } + Poll::Pending => Poll::Pending + } } } diff --git a/src/api/machine.rs b/src/api/machine.rs index f0bc8f9..1cdeb10 100644 --- a/src/api/machine.rs +++ b/src/api/machine.rs @@ -29,33 +29,7 @@ impl Machine { } pub fn fill_info(&self, builder: &mut m_info::Builder) { - if let Some(desc) = self.db.machine.get_desc(&self.id) { - builder.set_name(&desc.name); - if let Some(d) = desc.description.as_ref() { - builder.set_description(d); - } - - // TODO: Set `responsible` - // TODO: Error Handling - if let Some(state) = self.db.machine.get_state(&self.id) { - match state.state { - Status::Free => builder.set_state(State::Free), - Status::InUse(_u, _p) => { - builder.set_state(State::InUse); - } - Status::ToCheck(_u, _p) => { - builder.set_state(State::ToCheck); - } - Status::Blocked(_u, _p) => { - builder.set_state(State::Blocked); - } - Status::Disabled => builder.set_state(State::Disabled), - Status::Reserved(_u, _p) => { - builder.set_state(State::Reserved); - } - } - } - } + unimplemented!() } } diff --git a/src/api/machines.rs b/src/api/machines.rs index f02c67f..285f0cf 100644 --- a/src/api/machines.rs +++ b/src/api/machines.rs @@ -41,26 +41,6 @@ impl machines::Server for Machines { mut results: machines::GetMachineResults) -> Promise<(), Error> { - match params.get() { - Ok(reader) => { - if let Ok(api_id) = reader.get_uuid() { - let id = uuid_from_api(api_id); - if self.db.machine.exists(id) { - debug!(self.session.log, "Accessing machine {}", id); - // TODO check disclose permission - - let mut builder = results.get().init_machine(); - - let m = Machine::new(self.session.clone(), id, self.db.clone()); - - Machine::fill(Arc::new(m), &mut builder); - } else { - debug!(self.session.log, "Client requested nonexisting machine {}", id); - } - } - Promise::ok(()) - } - Err(e) => Promise::err(e), - } + unimplemented!() } } diff --git a/src/db/machine/internal.rs b/src/db/machine/internal.rs index 1079dc9..74cbadd 100644 --- a/src/db/machine/internal.rs +++ b/src/db/machine/internal.rs @@ -30,10 +30,10 @@ impl Internal { Self { log, env, db } } - pub fn get_with_txn(&self, txn: &T, uuid: &Uuid) + pub fn get_with_txn(&self, txn: &T, id: &String) -> Result> { - match txn.get(self.db, uuid.as_bytes()) { + match txn.get(self.db, &id.as_bytes()) { Ok(bytes) => { let mut machine: MachineState = flexbuffers::from_slice(bytes)?; Ok(Some(machine)) @@ -48,11 +48,11 @@ impl Internal { self.get_with_txn(&txn, id) } - pub fn put_with_txn(&self, txn: &mut RwTransaction, uuid: &Uuid, status: &MachineState) + pub fn put_with_txn(&self, txn: &mut RwTransaction, uuid: &String, status: &MachineState) -> Result<()> { let bytes = flexbuffers::to_vec(status)?; - txn.put(self.db, uuid.as_bytes(), &bytes, lmdb::WriteFlags::empty())?; + txn.put(self.db, &uuid.as_bytes(), &bytes, lmdb::WriteFlags::empty())?; Ok(()) } @@ -67,7 +67,6 @@ impl Internal { let mut cursor = txn.open_ro_cursor(self.db)?; Ok(cursor.iter_start().map(|buf| { let (kbuf, vbuf) = buf.unwrap(); - let machID = uuid::Uuid::from_slice(kbuf).unwrap(); flexbuffers::from_slice(vbuf).unwrap() })) } diff --git a/src/initiator.rs b/src/initiator.rs index 5314854..088e1e0 100644 --- a/src/initiator.rs +++ b/src/initiator.rs @@ -1,7 +1,15 @@ +use std::future::Future; use smol::Task; +use crate::error::Result; + pub struct Initiator { - inner: Task<()>, +} + +impl Initiator { + pub fn run(self) -> impl Future { + futures::future::pending() + } } pub fn load(config: &crate::config::Settings) -> Result> { diff --git a/src/log.rs b/src/log.rs index 0e1643a..9f052e1 100644 --- a/src/log.rs +++ b/src/log.rs @@ -3,7 +3,7 @@ use slog_async; use slog_term::{TermDecorator, FullFormat}; use crate::config::Settings; -pub fn init(_config: &Settings) -> Logger { +pub fn init() -> Logger { let decorator = TermDecorator::new().build(); let drain = FullFormat::new(decorator).build().fuse(); let drain = slog_async::Async::new(drain).build().fuse(); diff --git a/src/machine.rs b/src/machine.rs index d5c3deb..f6934a4 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -64,7 +64,7 @@ impl Machine { } pub fn from_file>(path: P) -> Result> { - let map: HashMap = MachineDescription::load_file(path)?; + let mut map: HashMap = MachineDescription::load_file(path)?; Ok(map.drain().map(|(id, desc)| { Self::construct(id, desc, MachineState::new()) }).collect()) diff --git a/src/main.rs b/src/main.rs index 20a4789..3dcb68c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -94,7 +94,7 @@ fn main() { handle.write_all(&encoded).unwrap(); // Early return to exit. - return Ok(()); + return; } let retval; @@ -142,24 +142,23 @@ fn maybe(matches: clap::ArgMatches, log: Arc) -> Result<(), Error> { // when bffh should exit let machines = machine::load(&config)?; let actors = actor::load(&config)?; - let initiators = load_initiators(&config)?; + let initiators = initiator::load(&config)?; // TODO restore connections between initiators, machines, actors - let ex = Arc::new(Executor::new()); + let ex = Executor::new(); for i in initiators.into_iter() { ex.spawn(i.run()); } for a in actors.into_iter() { - ex.spawn(a.run()); + ex.spawn(a); } - let (signal, shutdown) = futures::channel::oneshot::channel(); - for i in 0..4 { - std::thread::spawn(|| smol::block_on(ex.run(shutdown.recv()))); - } + let (signal, shutdown) = async_channel::bounded::<()>(1); + easy_parallel::Parallel::new() + .each(0..4, |_| smol::block_on(ex.run(shutdown.recv()))); server::serve_api_connections(log.clone(), config, db) // Signal is dropped here, stopping all executor threads as well. diff --git a/src/server.rs b/src/server.rs index b985614..fede905 100644 --- a/src/server.rs +++ b/src/server.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use crate::db::Databases; +/// Handle all API connections and run the RPC tasks spawned from that on the local thread. pub fn serve_api_connections(log: Arc, config: Settings, db: Databases) -> Result<(), Error> { let signal = Box::pin(async { let (tx, mut rx) = UnixStream::pair()?;