Commit current state

This commit is contained in:
Nadja Reitzenstein
2021-12-17 16:43:31 +01:00
parent d7a66e2149
commit 4778c7a8d3
11 changed files with 282 additions and 58 deletions

View File

@ -6,18 +6,48 @@ use async_oneshot as oneshot;
use futures_signals::signal::Signal;
use futures_util::future::BoxFuture;
use smol::future::FutureExt;
use sdk::initiators::{Initiator, InitiatorError, UpdateError, UpdateSink, UserID, ResourceID};
use crate::resource::{Error, Update};
use crate::resource::claim::{ResourceID, UserID};
use crate::resource::state::State;
pub enum UpdateError {
/// We're not connected to anything anymore. You can't do anything about this error and the
/// only reason why you even get it is because your future was called a last time before
/// being shelved so best way to handle this error is to just return from your loop entirely,
/// cleaning up any state that doesn't survive a freeze.
Closed,
Denied,
Other(Box<dyn std::error::Error + Send>),
}
pub trait InitiatorError: std::error::Error + Send {
}
pub trait Initiator {
fn start_for(&mut self, machine: ResourceID)
-> BoxFuture<'static, Result<(), Box<dyn InitiatorError>>>;
fn run(&mut self, request: &mut UpdateSink)
-> BoxFuture<'static, Result<(), Box<dyn InitiatorError>>>;
}
#[derive(Clone)]
pub struct BffhUpdateSink {
tx: channel::Sender<(Option<UserID>, sdk::initiators::State)>,
pub struct UpdateSink {
tx: channel::Sender<(Option<UserID>, State)>,
rx: channel::Receiver<Result<(), Error>>,
}
#[async_trait::async_trait]
impl UpdateSink for BffhUpdateSink {
async fn send(&mut self, userid: Option<UserID>, state: sdk::initiators::State)
impl UpdateSink {
fn new(tx: channel::Sender<(Option<UserID>, State)>,
rx: channel::Receiver<Result<(), Error>>)
-> Self
{
Self { tx, rx }
}
async fn send(&mut self, userid: Option<UserID>, state: State)
-> Result<(), UpdateError>
{
if let Err(_e) = self.tx.send((userid, state)).await {
@ -34,15 +64,6 @@ impl UpdateSink for BffhUpdateSink {
}
}
impl BffhUpdateSink {
fn new(tx: channel::Sender<(Option<UserID>, sdk::initiators::State)>,
rx: channel::Receiver<Result<(), Error>>)
-> Self
{
Self { tx, rx }
}
}
struct Resource;
pub struct InitiatorDriver<S, I: Initiator> {
resource_signal: S,
@ -51,8 +72,8 @@ pub struct InitiatorDriver<S, I: Initiator> {
initiator: I,
initiator_future: Option<BoxFuture<'static, Result<(), Box<dyn InitiatorError>>>>,
update_sink: BffhUpdateSink,
initiator_req_rx: channel::Receiver<(Option<UserID>, sdk::initiators::State)>,
update_sink: UpdateSink,
initiator_req_rx: channel::Receiver<(Option<UserID>, State)>,
initiator_reply_tx: channel::Sender<Result<(), Error>>,
}
@ -65,7 +86,7 @@ impl<S: Signal<Item=ResourceSink>, I: Initiator> InitiatorDriver<S, I> {
pub fn new(resource_signal: S, initiator: I) -> Self {
let (initiator_reply_tx, initiator_reply_rx) = channel::bounded(1);
let (initiator_req_tx, initiator_req_rx) = async_channel::bounded(1);
let update_sink = BffhUpdateSink::new(initiator_req_tx, initiator_reply_rx);
let update_sink = UpdateSink::new(initiator_req_tx, initiator_reply_rx);
Self {
resource: None,
resource_signal,

View File

@ -1,4 +1,6 @@
use std::collections::HashSet;
use std::sync::Arc;
use lmdb::{RwTransaction, Transaction};
use crate::db::{RawDB, DB, AllocAdapter, Environment, Result};
use crate::db::{DatabaseFlags, LMDBorrow, RoTransaction, WriteFlags, };
use super::User;
@ -61,4 +63,60 @@ impl UserDB {
Ok(out)
}
}
pub struct UserIndex {
env: Arc<Environment>,
usernames: RawDB,
roles: RawDB,
}
impl UserIndex {
pub fn update(&self, old: &User, new: &User) -> Result<()> {
assert_eq!(old.id, new.id);
let mut txn = self.env.begin_rw_txn()?;
if old.username != new.username {
self.update_username(&mut txn, new.id, &old.username, &new.username)?;
}
let mut to_remove: HashSet<&String> = old.roles.iter().collect();
let mut to_add: HashSet<&String> = HashSet::new();
for role in new.roles.iter() {
// If a role wasn't found in the old ones it's a new one that's being added
if !to_remove.remove(role) {
to_add.insert(role);
}
// Otherwise it's in both sets so we just ignore it.
}
self.update_roles(&mut txn, new.id, to_remove, to_add)?;
txn.commit()?;
Ok(())
}
fn update_username(&self, txn: &mut RwTransaction, uid: u128, old: &String, new: &String)
-> Result<()>
{
let flags = WriteFlags::empty();
self.usernames.del(txn, &old.as_bytes(), Some(&uid.to_ne_bytes()))?;
self.usernames.put(txn, &new.as_bytes(), &uid.to_ne_bytes(), flags)?;
Ok(())
}
fn update_roles(&self,
txn: &mut RwTransaction,
uid: u128,
remove: HashSet<&String>,
add: HashSet<&String>
) -> Result<()>
{
let flags = WriteFlags::empty();
for role in remove.iter() {
self.roles.del(txn, &role.as_bytes(), Some(&uid.to_ne_bytes()))?;
}
for role in add.iter() {
self.roles.put(txn, &role.as_bytes(), &uid.to_ne_bytes(), flags)?;
}
Ok(())
}
}

View File

@ -1,3 +1,11 @@
/*
* Copyright (c) 2021. Lorem ipsum dolor sit amet, consectetur adipiscing elit.
* Morbi non lorem porttitor neque feugiat blandit. Ut vitae ipsum eget quam lacinia accumsan.
* Etiam sed turpis ac ipsum condimentum fringilla. Maecenas magna.
* Proin dapibus sapien vel ante. Aliquam erat volutpat. Pellentesque sagittis ligula eget metus.
* Vestibulum commodo. Ut rhoncus gravida arcu.
*/
use rkyv::{Archive, Serialize, Deserialize};
use capnp::capability::Promise;
@ -26,7 +34,9 @@ pub struct User {
}
impl User {
pub fn new(id: u128, username: String, roles: Vec<String>) -> Self {
User { id, username, roles }
}
}
impl info::Server for User {