Re-enable MQTT

This commit is contained in:
Nadja Reitzenstein 2021-09-21 18:45:35 +02:00
parent 73f63238a8
commit ba73fe80c5
4 changed files with 20 additions and 17 deletions

View File

@ -1,14 +1,14 @@
-- { actor_connections = [] : List { _1 : Text, _2 : Text } -- { actor_connections = [] : List { _1 : Text, _2 : Text }
{ actor_connections = { actor_connections =
-- Link up machines to actors -- Link up machines to actors
[ { _1 = "Testmachine", _2 = "Actor" } [ { _1 = "Testmachine", _2 = "Shelly_1234" }
, { _1 = "Another", _2 = "Bash" } , { _1 = "Another", _2 = "Bash" }
-- One machine can have as many actors as it wants -- One machine can have as many actors as it wants
, { _1 = "Yetmore", _2 = "Bash2" } , { _1 = "Yetmore", _2 = "Bash2" }
, { _1 = "Yetmore", _2 = "FailBash"} , { _1 = "Yetmore", _2 = "FailBash"}
] ]
, actors = , actors =
{ Actor = { module = "Dummy", params = {=} } { Shelly_1234 = { module = "Shelly", params = {=} }
, Bash = { module = "Process", params = , Bash = { module = "Process", params =
{ cmd = "./examples/actor.sh" { cmd = "./examples/actor.sh"
, args = "your ad could be here" , args = "your ad could be here"
@ -23,8 +23,8 @@
} }
, init_connections = [] : List { _1 : Text, _2 : Text } , init_connections = [] : List { _1 : Text, _2 : Text }
--, init_connections = [{ _1 = "Initiator", _2 = "Testmachine" }] --, init_connections = [{ _1 = "Initiator", _2 = "Testmachine" }]
, initiators = --{=} , initiators = {=}
{ Initiator = { module = "Dummy", params = {=} } } --{ Initiator = { module = "Dummy", params = {=} } }
, listens = , listens =
[ { address = "127.0.0.1", port = Some 59661 } [ { address = "127.0.0.1", port = Some 59661 }
, { address = "::1", port = Some 59661 } , { address = "::1", port = Some 59661 }
@ -56,7 +56,7 @@
, write = "lab.test.write" , write = "lab.test.write"
} }
} }
, mqtt_url = "" , mqtt_url = "tcp://localhost:1883"
, db_path = "/tmp/bffh" , db_path = "/tmp/bffh"
, roles = , roles =
{ testrole = { testrole =

View File

@ -114,8 +114,8 @@ pub struct Dummy {
} }
impl Dummy { impl Dummy {
pub fn new(log: &Logger) -> Self { pub fn new(log: Logger) -> Self {
Self { log: log.new(o!("module" => "Dummy Actor")) } Self { log }
} }
} }
@ -129,8 +129,12 @@ impl Actuator for Dummy {
pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec<Actor>)> { pub fn load(log: &Logger, config: &Config) -> Result<(ActorMap, Vec<Actor>)> {
let mut map = HashMap::new(); let mut map = HashMap::new();
let mqtt = AsyncClient::new(config.mqtt_url.clone())?;
let tok = mqtt.connect(paho_mqtt::ConnectOptions::new());
smol::block_on(tok)?;
let actuators = config.actors.iter() let actuators = config.actors.iter()
.map(|(k,v)| (k, load_single(log, k, &v.module, &v.params))) .map(|(k,v)| (k, load_single(log, k, &v.module, &v.params, mqtt.clone())))
.filter_map(|(k, n)| match n { .filter_map(|(k, n)| match n {
None => None, None => None,
Some(a) => Some((k, a)) Some(a) => Some((k, a))
@ -151,20 +155,25 @@ fn load_single(
log: &Logger, log: &Logger,
name: &String, name: &String,
module_name: &String, module_name: &String,
params: &HashMap<String, String> params: &HashMap<String, String>,
client: AsyncClient,
) -> Option<Box<dyn Actuator + Sync + Send>> ) -> Option<Box<dyn Actuator + Sync + Send>>
{ {
use crate::modules::*; use crate::modules::*;
info!(log, "Loading actor \"{}\" with module {} and params {:?}", name, module_name, params); info!(log, "Loading actor \"{}\" with module {} and params {:?}", name, module_name, params);
let log = log.new(o!("name" => name.clone()));
match module_name.as_ref() { match module_name.as_ref() {
"Dummy" => { "Dummy" => {
Some(Box::new(Dummy::new(log))) Some(Box::new(Dummy::new(log)))
} }
"Process" => { "Process" => {
Process::new(log.new(o!("name" => name.clone())), name.clone(), params) Process::new(log, name.clone(), params)
.map(|a| a.into_boxed_actuator()) .map(|a| a.into_boxed_actuator())
} }
"Shelly" => {
Some(Box::new(Shelly::new(log, name.clone(), client)))
}
_ => { _ => {
error!(log, "No actor found with name \"{}\", configured as \"{}\".", module_name, name); error!(log, "No actor found with name \"{}\", configured as \"{}\".", module_name, name);
None None

View File

@ -167,11 +167,6 @@ fn maybe(matches: clap::ArgMatches, log: Arc<Logger>) -> Result<(), Error> {
let ex = Executor::new(); let ex = Executor::new();
let db = db::Databases::new(&log, &config)?; let db = db::Databases::new(&log, &config)?;
//let mqtt = AsyncClient::new(config.mqtt_url.clone())?;
//let tok = mqtt.connect(paho_mqtt::ConnectOptions::new());
//smol::block_on(tok)?;
let machines = machine::load(&config)?; let machines = machine::load(&config)?;
let (actor_map, actors) = actor::load(&log, &config)?; let (actor_map, actors) = actor::load(&log, &config)?;
let (init_map, initiators) = initiator::load(&log, &config)?; let (init_map, initiators) = initiator::load(&log, &config)?;

View File

@ -22,8 +22,7 @@ pub struct Shelly {
} }
impl Shelly { impl Shelly {
pub fn new(log_view: &Logger, name: String, client: mqtt::AsyncClient) -> Self { pub fn new(log: Logger, name: String, client: mqtt::AsyncClient) -> Self {
let log = log_view.new(o!("shelly_name" => name.clone()));
debug!(log, "Starting shelly module for {}", &name); debug!(log, "Starting shelly module for {}", &name);
Shelly { log, name, client, } Shelly { log, name, client, }
} }