# Mirror of https://gitlab.com/fabinfra/fabaccess/fabfire_adapter.git
# (synced 2025-03-12 14:51:50 +01:00; 95 lines, 3.1 KiB, Python)
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from contextlib import AsyncExitStack, asynccontextmanager
|
|
from asyncio_mqtt import Client, MqttError
|
|
import toml
|
|
|
|
from host import Host
|
|
from reader import Reader
|
|
|
|
|
|
async def run_adapter(mqtt_host: Host, readers: list[Reader]):
    """Connect to the MQTT broker and dispatch reader messages until failure.

    A client is opened for ``mqtt_host``. For every entry in ``readers`` a
    task is started that feeds ``reader.handle_messages`` with the messages
    published below ``fabreader/<reader_id>/``. Messages that match none of
    those filters are passed to ``log_unhandled``. The coroutine then waits
    on all started tasks, so it returns once they have all completed and
    raises as soon as one of them raises.
    """
    async with AsyncExitStack() as exit_stack:
        # Every task spawned below is collected here; the callback registered
        # on the stack cancels whatever is still running when we leave.
        running: set = set()
        exit_stack.push_async_callback(cancel_tasks, running)

        # Open the broker connection; the stack closes it again on exit.
        broker = Client(
            hostname=mqtt_host.hostname,
            port=mqtt_host.port,
            username=mqtt_host.username,
            password=mqtt_host.password,
        )
        await exit_stack.enter_async_context(broker)

        # One filtered message stream and one handler task per reader.
        for reader in readers:
            topic_filter = f"fabreader/{reader.reader_id}/#"
            reader_feed = await exit_stack.enter_async_context(
                broker.filtered_messages(topic_filter)
            )
            handler = asyncio.create_task(reader.handle_messages(reader_feed, broker))
            running.add(handler)
            logging.info(f"Registered handler for reader {reader.reader_id}")

        # Messages that do not match any reader filter are only logged.
        leftover_feed = await exit_stack.enter_async_context(broker.unfiltered_messages())
        running.add(asyncio.create_task(log_unhandled(leftover_feed)))

        # Subscribe only now that all handlers are attached; subscribing
        # earlier could let retained messages slip past them.
        await broker.subscribe("fabreader/#")

        logging.info("Initialization done")

        # Block until every task has finished, or until one fails (for
        # example because of a network error).
        await asyncio.gather(*running)
|
|
|
|
|
|
async def log_unhandled(messages):
    """Log a warning naming the topic of every message yielded by *messages*."""
    async for stray in messages:
        topic = stray.topic
        logging.warning(f"received unhandled message on: {topic}")
|
|
|
|
|
|
async def cancel_tasks(tasks):
    """Cancel every unfinished task in *tasks* and wait until all have stopped.

    Tasks that are already done are left untouched. All remaining tasks are
    asked to cancel first and are awaited together afterwards, so a task that
    raises while winding down cannot prevent the others from being cancelled.
    Such failures are logged instead of being raised.

    Args:
        tasks: Iterable of ``asyncio.Task`` objects; may be empty.
    """
    pending = [task for task in tasks if not task.done()]
    if not pending:
        return

    # Request cancellation everywhere before awaiting anything, so a slow or
    # failing task does not keep its siblings running.
    for task in pending:
        task.cancel()

    # return_exceptions=True hands each task's outcome back as a value, which
    # lets us wait for all of them even when some fail during cancellation.
    outcomes = await asyncio.gather(*pending, return_exceptions=True)
    for task, outcome in zip(pending, outcomes):
        failed = isinstance(outcome, BaseException)
        if failed and not isinstance(outcome, asyncio.CancelledError):
            logging.error(f"Task {task!r} failed while being cancelled: {outcome!r}")
|
|
|
|
|
|
async def main():
    """Load the configuration and keep the adapter running indefinitely.

    Reads ``config/config.toml``, builds the MQTT and bffhd host descriptions
    and one ``Reader`` per entry of the ``readers`` table, then calls
    ``run_adapter`` in an endless loop. An ``MqttError`` is logged, and after
    ``reconnect_interval`` seconds the adapter is started again.
    """
    reconnect_interval = 3  # [seconds]

    config = toml.load("config/config.toml")
    mqtt_config = config["mqtt"]
    bffhd_config = config["bffhd"]

    # Credentials are optional; a missing key yields None.
    mqtt_host = Host(
        mqtt_config["hostname"],
        mqtt_config["port"],
        mqtt_config.get("username"),
        mqtt_config.get("password"),
    )
    bffhd_host = Host(bffhd_config["hostname"], bffhd_config["port"])

    # Build a real list: run_adapter iterates the readers on every
    # (re)connect, and a one-shot iterator such as map() would be empty
    # after the first attempt, leaving no handlers registered.
    readers = [
        Reader(entry["id"], entry["machine"], bffhd_host)
        for entry in config["readers"].values()
    ]

    while True:
        try:
            await run_adapter(mqtt_host, readers)
        except MqttError as error:
            logging.error(f'"{error}". Reconnecting in {reconnect_interval} seconds.')
        finally:
            # Pause before the next attempt, whatever ended the previous one.
            await asyncio.sleep(reconnect_interval)
|
|
|
|
|
|
if __name__ == "__main__":
    # Log verbosity is taken from the LOGLEVEL environment variable
    # (case-insensitive), falling back to INFO when it is not set.
    LOGLEVEL = os.getenv('LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=LOGLEVEL)

    # Start the event loop and run the adapter.
    asyncio.run(main())
|