fabfire_adapter/main.py

98 lines
3.3 KiB
Python
Raw Normal View History

2022-03-13 19:16:45 +01:00
from __future__ import annotations
2022-03-13 17:54:59 +01:00
import asyncio
import logging
import os
from contextlib import AsyncExitStack, asynccontextmanager
from asyncio_mqtt import Client, MqttError
import toml
2022-03-16 05:59:18 +01:00
from host import Host
from reader import Reader
2022-03-13 17:54:59 +01:00
async def run_adapter(mqtt_host: Host, readers: list[Reader]):
# We 💛 context managers. Let's create a stack to help
# us manage them.
async with AsyncExitStack() as stack:
# Keep track of the asyncio tasks that we create, so that
# we can cancel them on exit
tasks = set()
stack.push_async_callback(cancel_tasks, tasks)
# Connect to the MQTT broker
client = Client(hostname=mqtt_host.hostname, port=mqtt_host.port, username=mqtt_host.username,
password=mqtt_host.password)
await stack.enter_async_context(client)
# setup async handler for readers
for reader in readers:
manager = client.filtered_messages(f"/rfid_reader/{reader.reader_id}")
messages = await stack.enter_async_context(manager)
task = asyncio.create_task(reader.handle_messages(messages, client))
tasks.add(task)
2022-03-16 05:59:18 +01:00
await client.publish(f"/cmnd/reader/{reader.reader_id}", payload='{"Cmd":"haltPICC"}', qos=2,
retain=False) # reset readers
2022-03-13 17:54:59 +01:00
logging.info(f"Registered handler for reader {reader.reader_id}")
# Messages that doesn't match a filter will get logged here
messages = await stack.enter_async_context(client.unfiltered_messages())
task = asyncio.create_task(log_unhandled(messages))
tasks.add(task)
# Subscribe to topic(s)
# 🤔 Note that we subscribe *after* starting the message
# loggers. Otherwise, we may miss retained messages.
await client.subscribe("/rfid_reader/#")
logging.info("Initialization done")
# Wait for everything to complete (or fail due to, e.g., network
# errors)
await asyncio.gather(*tasks)
async def log_unhandled(messages):
async for message in messages:
logging.warning(f"received unhandled message: {message.payload.decode()}")
async def cancel_tasks(tasks):
for task in tasks:
if task.done():
continue
try:
task.cancel()
await task
except asyncio.CancelledError:
pass
async def main():
# Run the advanced_example indefinitely. Reconnect automatically
# if the connection is lost.
reconnect_interval = 3 # [seconds]
2022-03-16 05:59:18 +01:00
config = toml.load("config/config.toml")
2022-03-13 17:54:59 +01:00
mqtt_host = Host(config["mqtt"]["hostname"], config["mqtt"]["port"], config["mqtt"].setdefault("username", None),
config["mqtt"].setdefault("password", None))
bffhd_host = Host(config["bffhd"]["hostname"], config["bffhd"]["port"])
readers = map(lambda reader: Reader(reader["id"], reader["machine"], bffhd_host), config["readers"].values())
while True:
try:
await run_adapter(mqtt_host, readers)
except MqttError as error:
logging.error(f'"{error}". Reconnecting in {reconnect_interval} seconds.')
finally:
await asyncio.sleep(reconnect_interval)
if __name__ == "__main__":
LOGLEVEL = os.environ.get('LOGLEVEL', 'INFO').upper()
logging.basicConfig(level=LOGLEVEL)
asyncio.run(main())