"""MQTT adapter between RFID readers and a bffhd server.

Each configured reader publishes its messages on ``/rfid_reader/<id>``.
The adapter forwards those payloads to ``fabapi`` to run the fabfire
authentication exchange, publishes any response on ``/cmnd/reader/<id>``
and, once authentication succeeds, toggles the machine assigned to that
reader.

Configuration is read from ``config.toml`` in the working directory; see
``main`` for the keys that are accessed.
"""
from __future__ import annotations

import asyncio
import logging
import os
from contextlib import AsyncExitStack, asynccontextmanager
from dataclasses import dataclass
from typing import Optional

import asyncio_mqtt
from asyncio_mqtt import Client, MqttError

import fabapi
import toml


@dataclass(frozen=True)
class Host:
    """Immutable network endpoint with optional credentials."""

    hostname: str
    port: int
    username: Optional[str] = None
    password: Optional[str] = None


class Reader:
    """Authentication state for one RFID reader and the machine it controls."""

    def __init__(self, reader_id: str, machine_urn: str, bffhd_address: Host):
        """Create a reader with no authentication session in progress.

        Args:
            reader_id: Identifier used in the reader's MQTT topics.
            machine_urn: URN of the machine looked up after authentication.
            bffhd_address: Host and port passed to ``fabapi`` when a new
                authentication session is started.
        """
        self.reader_id = reader_id
        self.machine_urn = machine_urn
        self.bffhd_address = bffhd_address
        self.is_authenticated = False
        # Opaque objects returned by fabapi; both are None while no session
        # is in progress.
        self.auth_cap = None
        self.bootstrap_cap = None

    async def handle_messages(self, messages, client: asyncio_mqtt.Client) -> None:
        """Process this reader's messages until the message stream ends.

        The first message of a session is handed to
        ``fabapi.connect_with_fabfire_initial``; subsequent messages go to
        ``fabapi.connect_with_fabfire_step`` until it reports success. On
        success the reader is sent the ``haltPICC`` command, the machine is
        given back if its state is ``"inUse"`` and taken into use otherwise,
        and the session state is cleared so the next message starts over.

        Args:
            messages: Async iterator of MQTT messages for this reader.
            client: Connected MQTT client used to publish replies.
        """
        command_topic = f"/cmnd/reader/{self.reader_id}"

        async for message in messages:
            response_for_reader = None

            if not self.auth_cap:
                # No session yet: open one with the payload just received.
                (
                    self.auth_cap,
                    response_for_reader,
                    self.bootstrap_cap,
                ) = await fabapi.connect_with_fabfire_initial(
                    self.bffhd_address.hostname,
                    self.bffhd_address.port,
                    message.payload,
                )
            elif not self.is_authenticated:
                (
                    response_for_reader,
                    self.is_authenticated,
                ) = await fabapi.connect_with_fabfire_step(
                    self.auth_cap, message.payload
                )

                if self.is_authenticated:
                    await client.publish(
                        command_topic,
                        payload='{"Cmd":"haltPICC"}',
                        qos=2,
                        retain=False,
                    )

                    ms = await self.bootstrap_cap.machineSystem().a_wait()
                    info = await ms.machineSystem.info().a_wait()
                    ma = await info.info.getMachineURN(f"{self.machine_urn}").a_wait()

                    # Toggle: give the machine back if in use, else claim it.
                    if ma.machine.state == "inUse":
                        await ma.machine.inuse.giveBack().a_wait()
                    else:
                        await ma.machine.use.use().a_wait()

                    # Drop the finished session and suppress the last step's
                    # response; the reader has already been sent haltPICC.
                    self.is_authenticated = False
                    self.bootstrap_cap = None
                    self.auth_cap = None
                    response_for_reader = None

            if response_for_reader:
                await client.publish(
                    command_topic,
                    payload=response_for_reader,
                    qos=2,
                    retain=False,
                )


async def run_adapter(mqtt_host: Host, readers: list[Reader]) -> None:
    """Connect to the broker and serve all readers until a task finishes or fails.

    Args:
        mqtt_host: Broker address and optional credentials.
        readers: Readers to register handlers for. This is iterated once per
            call, so it must be re-iterable (e.g. a list) if the function is
            called again after a reconnect.

    Raises:
        Whatever the client or any handler task raises; ``main`` catches
        ``MqttError`` and reconnects.
    """
    # The exit stack unwinds every context manager (and the task-cancel
    # callback) in reverse order when this block is left.
    async with AsyncExitStack() as stack:
        # Keep track of the asyncio tasks that we create, so that
        # we can cancel them on exit.
        tasks = set()
        stack.push_async_callback(cancel_tasks, tasks)

        # Connect to the MQTT broker.
        client = Client(
            hostname=mqtt_host.hostname,
            port=mqtt_host.port,
            username=mqtt_host.username,
            password=mqtt_host.password,
        )
        await stack.enter_async_context(client)

        # Set up one async handler per reader.
        for reader in readers:
            manager = client.filtered_messages(f"/rfid_reader/{reader.reader_id}")
            messages = await stack.enter_async_context(manager)
            task = asyncio.create_task(reader.handle_messages(messages, client))
            tasks.add(task)
            logging.info(f"Registered handler for reader {reader.reader_id}")

        # Messages that don't match a filter get logged here.
        messages = await stack.enter_async_context(client.unfiltered_messages())
        task = asyncio.create_task(log_unhandled(messages))
        tasks.add(task)

        # Subscribe to topic(s).
        # Note that we subscribe *after* starting the message handlers.
        # Otherwise, we may miss retained messages.
        await client.subscribe("/rfid_reader/#")
        logging.info("Initialization done")

        # Wait for everything to complete (or fail due to, e.g., network
        # errors).
        await asyncio.gather(*tasks)


async def log_unhandled(messages) -> None:
    """Log a warning for every message that no reader filter matched."""
    async for message in messages:
        logging.warning(f"received unhandled message: {message.payload.decode()}")


async def cancel_tasks(tasks) -> None:
    """Cancel every task that is still running and wait for it to stop.

    Args:
        tasks: Iterable of ``asyncio.Task`` objects.
    """
    for task in tasks:
        if task.done():
            continue
        try:
            task.cancel()
            await task
        except asyncio.CancelledError:
            # Expected result of cancelling the task.
            pass


async def main() -> None:
    """Load ``config.toml`` and run the adapter indefinitely.

    Keys read from the configuration:
      * ``mqtt.hostname``, ``mqtt.port``, optional ``mqtt.username`` and
        ``mqtt.password``
      * ``bffhd.hostname``, ``bffhd.port``
      * ``readers``: a table whose values each have ``id`` and ``machine``

    The adapter reconnects automatically if the MQTT connection is lost.
    """
    reconnect_interval = 3  # [seconds]

    config = toml.load("config.toml")
    mqtt_config = config["mqtt"]
    mqtt_host = Host(
        mqtt_config["hostname"],
        mqtt_config["port"],
        mqtt_config.get("username"),
        mqtt_config.get("password"),
    )
    bffhd_host = Host(config["bffhd"]["hostname"], config["bffhd"]["port"])

    # Must be a list, not a lazy ``map``: run_adapter iterates the readers on
    # every (re)connect, and a one-shot iterator would be empty after the
    # first attempt, leaving no handlers registered.
    readers = [
        Reader(reader["id"], reader["machine"], bffhd_host)
        for reader in config["readers"].values()
    ]

    while True:
        try:
            await run_adapter(mqtt_host, readers)
        except MqttError as error:
            logging.error(f'"{error}". Reconnecting in {reconnect_interval} seconds.')
        finally:
            await asyncio.sleep(reconnect_interval)


if __name__ == "__main__":
    LOGLEVEL = os.environ.get('LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=LOGLEVEL)
    asyncio.run(main())