from __future__ import annotations

import asyncio
import logging
import os
from contextlib import AsyncExitStack, asynccontextmanager

from asyncio_mqtt import Client, MqttError
import toml

from host import Host
from reader import Reader


async def run_adapter(mqtt_host: Host, readers: list[Reader]) -> None:
    """Connect to the MQTT broker and feed incoming messages to the readers.

    One task per reader consumes the messages published below
    ``fabreader/<reader_id>/``; one extra task logs every message that no
    reader filter matched. The coroutine returns once all tasks have
    finished and raises as soon as one of them fails (e.g. with an
    ``MqttError`` on network problems). Tasks that are still running at
    that point are cancelled when the exit stack unwinds.

    Args:
        mqtt_host: Connection parameters of the MQTT broker.
        readers: Readers to register a message handler for. Iterated once
            per call, so it must be re-iterable if this coroutine is
            called again (e.g. on reconnect).
    """
    # We 💛 context managers. Let's create a stack to help
    # us manage them.
    async with AsyncExitStack() as stack:
        # Keep track of the asyncio tasks that we create, so that
        # we can cancel them on exit
        tasks = set()
        stack.push_async_callback(cancel_tasks, tasks)

        # Connect to the MQTT broker
        client = Client(
            hostname=mqtt_host.hostname,
            port=mqtt_host.port,
            username=mqtt_host.username,
            password=mqtt_host.password,
        )
        await stack.enter_async_context(client)

        # Set up an async handler for each reader
        for reader in readers:
            manager = client.filtered_messages(f"fabreader/{reader.reader_id}/#")
            messages = await stack.enter_async_context(manager)
            task = asyncio.create_task(reader.handle_messages(messages, client))
            tasks.add(task)
            logging.info(f"Registered handler for reader {reader.reader_id}")

        # Messages that don't match a filter will get logged here
        messages = await stack.enter_async_context(client.unfiltered_messages())
        task = asyncio.create_task(log_unhandled(messages))
        tasks.add(task)

        # Subscribe to topic(s)
        # 🤔 Note that we subscribe *after* starting the message
        # loggers. Otherwise, we may miss retained messages.
        await client.subscribe("fabreader/#")

        logging.info("Initialization done")

        # Wait for everything to complete (or fail due to, e.g., network
        # errors)
        await asyncio.gather(*tasks)


async def log_unhandled(messages) -> None:
    """Log a warning with the topic of every message yielded by *messages*."""
    async for message in messages:
        logging.warning(f"received unhandled message on: {message.topic}")


async def cancel_tasks(tasks) -> None:
    """Cancel every task in *tasks* that is not done yet and wait for it to stop.

    The ``CancelledError`` raised by awaiting a cancelled task is swallowed;
    any other exception raised by a task while it shuts down propagates.
    """
    for task in tasks:
        if task.done():
            continue
        try:
            task.cancel()
            await task
        except asyncio.CancelledError:
            pass


async def main() -> None:
    """Load the configuration and run the adapter forever.

    Reads ``config/config.toml``, builds the MQTT and bffhd hosts and the
    configured readers, then runs the adapter in a loop, reconnecting
    after ``reconnect_interval`` seconds whenever the connection is lost.
    """
    # Run the adapter indefinitely. Reconnect automatically
    # if the connection is lost.
    reconnect_interval = 3  # [seconds]

    config = toml.load("config/config.toml")
    mqtt_config = config["mqtt"]
    # Username and password are optional; they default to None.
    mqtt_host = Host(
        mqtt_config["hostname"],
        mqtt_config["port"],
        mqtt_config.get("username"),
        mqtt_config.get("password"),
    )
    bffhd_host = Host(config["bffhd"]["hostname"], config["bffhd"]["port"])
    # Build a real list: run_adapter() iterates the readers on every
    # (re)connect, and a one-shot map() iterator would be empty after the
    # first attempt, leaving no reader handlers registered.
    readers = [
        Reader(reader["id"], reader["machine"], bffhd_host)
        for reader in config["readers"].values()
    ]

    while True:
        try:
            await run_adapter(mqtt_host, readers)
        except MqttError as error:
            logging.error(f'"{error}". Reconnecting in {reconnect_interval} seconds.')
        # Deliberately not in a ``finally`` clause: cancellation and
        # unexpected errors should propagate immediately instead of being
        # held back by the reconnect delay.
        await asyncio.sleep(reconnect_interval)


if __name__ == "__main__":
    LOGLEVEL = os.environ.get('LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=LOGLEVEL)
    asyncio.run(main())