From c45de56b6774d0998d07b3edee8968fe8171ace0 Mon Sep 17 00:00:00 2001 From: Kai Jan Kriegel Date: Sun, 13 Mar 2022 17:54:59 +0100 Subject: [PATCH] added support for multiple readers --- config.toml | 16 ++++++ fabapi/connect.py | 16 +++--- main.py | 139 ++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 4 ++ single.py | 44 --------------- 5 files changed, 168 insertions(+), 51 deletions(-) create mode 100644 config.toml create mode 100644 main.py create mode 100644 requirements.txt delete mode 100644 single.py diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..d7a5e8f --- /dev/null +++ b/config.toml @@ -0,0 +1,16 @@ +[mqtt] +hostname = "192.168.178.31" +port = 1883 + +[bffhd] +hostname = "192.168.178.31" +port = 59661 + +[readers] + [readers.abc] + id = "111" + machine = "urn:fabaccess:resource:Testmachine" + + [readers.def] + id = "222" + machine = "urn:fabaccess:resource:Another" diff --git a/fabapi/connect.py b/fabapi/connect.py index 81b8ff1..9df7233 100644 --- a/fabapi/connect.py +++ b/fabapi/connect.py @@ -2,6 +2,7 @@ import asyncio import socket import ssl import capnp +import logging connection_capnp = capnp.load('schema/connection.capnp') authenticationsystem_capnp = capnp.load('schema/authenticationsystem.capnp') @@ -53,9 +54,10 @@ async def connect(host, port, user, pw): rep_prom = req.send() response = await rep_prom.a_wait() if response.response.outcome.result == "successful": + logging.info("Auth completed successfully") return cap else: - print("Auth failed") + logging.info("Auth failed") return None @@ -91,12 +93,12 @@ async def connect_with_fabfire_initial(host, port, uid): req.request.initialResponse.initial = uid rep_prom = req.send() response = await rep_prom.a_wait() - print(response.response.which()) + logging.debug(f"got response type: {response.response.which()}") if response.response.which() == "challence": - print(f"challenge: {response.response.challence}") + logging.debug(f"challenge: {response.response.challence}") return auth, response.response.challence, cap else: - print(f"Auth failed: {response.response.outcome.result}, additional info: {response.response.outcome.helpText}") + logging.error(f"Auth failed: {response.response.outcome.result}, additional info: {response.response.outcome.helpText}") return None @@ -106,11 +108,11 @@ async def connect_with_fabfire_step(auth, msg): rep_prom = req.send() response = await rep_prom.a_wait() if response.response.which() == "challence": - print(f"challenge: {response.response.challence}") + logging.debug(f"challenge: {response.response.challence}") return response.response.challence, False # auth cap, challenge, not done elif response.response.outcome.result == "successful": - print(f"Auth completed successfully! Got additional Data: {response.response.outcome.additionalData.additional}") + logging.info(f"Auth completed successfully! Got additional Data: {response.response.outcome.additionalData.additional}") return response.response.outcome.additionalData.additional, True # dont care, message, we are done else: - print(f"Auth failed: {response.response.outcome.result}, additional info: {response.response.outcome.helpText}") + logging.error(f"Auth failed: {response.response.outcome.result}, additional info: {response.response.outcome.helpText}") return None diff --git a/main.py b/main.py new file mode 100644 index 0000000..b7823b8 --- /dev/null +++ b/main.py @@ -0,0 +1,139 @@ +import asyncio +import logging +import os +from contextlib import AsyncExitStack, asynccontextmanager +from dataclasses import dataclass +from typing import Optional +import asyncio_mqtt +from asyncio_mqtt import Client, MqttError +import fabapi +import toml + + +@dataclass(frozen=True) +class Host: + hostname: str + port: int + username: Optional[str] = None + password: Optional[str] = None + + +class Reader: + def __init__(self, reader_id: str, machine_urn: str, bffhd_address: Host): + self.reader_id = reader_id + self.machine_urn = machine_urn + self.bffhd_address = bffhd_address + self.is_authenticated = False + self.auth_cap = None + self.bootstrap_cap = None + + async def handle_messages(self, messages, client: asyncio_mqtt.Client): + async for message in messages: + response_for_reader = None + if not self.auth_cap: + self.auth_cap, response_for_reader, self.bootstrap_cap = await fabapi.connect_with_fabfire_initial( + self.bffhd_address.hostname, self.bffhd_address.port, message.payload) + elif not self.is_authenticated: + response_for_reader, self.is_authenticated = await fabapi.connect_with_fabfire_step(self.auth_cap, + message.payload) + if self.is_authenticated: + await client.publish(f"/cmnd/reader/{self.reader_id}", payload='{"Cmd":"haltPICC"}', qos=2, + retain=False) + ms = await self.bootstrap_cap.machineSystem().a_wait() + info = await ms.machineSystem.info().a_wait() + ma = await info.info.getMachineURN(f"{self.machine_urn}").a_wait() + if ma.machine.state == "inUse": + await ma.machine.inuse.giveBack().a_wait() + else: + await ma.machine.use.use().a_wait() + self.is_authenticated = False + self.bootstrap_cap = None + self.auth_cap = None + response_for_reader = None + + if response_for_reader: + await client.publish(f"/cmnd/reader/{self.reader_id}", payload=response_for_reader, qos=2, retain=False) + + +async def run_adapter(mqtt_host: Host, readers: list[Reader]): + # We 💛 context managers. Let's create a stack to help + # us manage them. + async with AsyncExitStack() as stack: + # Keep track of the asyncio tasks that we create, so that + # we can cancel them on exit + tasks = set() + stack.push_async_callback(cancel_tasks, tasks) + + # Connect to the MQTT broker + client = Client(hostname=mqtt_host.hostname, port=mqtt_host.port, username=mqtt_host.username, + password=mqtt_host.password) + await stack.enter_async_context(client) + + # setup async handler for readers + for reader in readers: + manager = client.filtered_messages(f"/rfid_reader/{reader.reader_id}") + messages = await stack.enter_async_context(manager) + task = asyncio.create_task(reader.handle_messages(messages, client)) + tasks.add(task) + logging.info(f"Registered handler for reader {reader.reader_id}") + + # Messages that doesn't match a filter will get logged here + messages = await stack.enter_async_context(client.unfiltered_messages()) + task = asyncio.create_task(log_unhandled(messages)) + tasks.add(task) + + # Subscribe to topic(s) + # 🤔 Note that we subscribe *after* starting the message + # loggers. Otherwise, we may miss retained messages. + await client.subscribe("/rfid_reader/#") + + logging.info("Initialization done") + + # Wait for everything to complete (or fail due to, e.g., network + # errors) + await asyncio.gather(*tasks) + + +async def log_unhandled(messages): + async for message in messages: + logging.warning(f"received unhandled message: {message.payload.decode()}") + + +async def cancel_tasks(tasks): + for task in tasks: + if task.done(): + continue + try: + task.cancel() + await task + except asyncio.CancelledError: + pass + + +async def main(): + # Run the advanced_example indefinitely. Reconnect automatically + # if the connection is lost. + reconnect_interval = 3 # [seconds] + + config = toml.load("config.toml") + + mqtt_host = Host(config["mqtt"]["hostname"], config["mqtt"]["port"], config["mqtt"].setdefault("username", None), + config["mqtt"].setdefault("password", None)) + bffhd_host = Host(config["bffhd"]["hostname"], config["bffhd"]["port"]) + + readers = map(lambda reader: Reader(reader["id"], reader["machine"], bffhd_host), config["readers"].values()) + + while True: + try: + await run_adapter(mqtt_host, readers) + except MqttError as error: + logging.error(f'"{error}". Reconnecting in {reconnect_interval} seconds.') + finally: + await asyncio.sleep(reconnect_interval) + + +if __name__ == "__main__": + LOGLEVEL = os.environ.get('LOGLEVEL', 'INFO').upper() + logging.basicConfig(level=LOGLEVEL) + + asyncio.run(main()) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..8f99b95 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +asyncio-mqtt==0.12.1 +paho-mqtt==1.6.1 +pycapnp==1.1.0 +toml==0.10.2 diff --git a/single.py b/single.py deleted file mode 100644 index 6d62675..0000000 --- a/single.py +++ /dev/null @@ -1,44 +0,0 @@ -import asyncio - -from asyncio_mqtt import Client -import json -import fabapi - - - -async def main(): - done = False - cap = None - auth = None - msg = None - - async with Client("192.168.178.31") as client: - await client.publish("/cmnd/reader", payload='{"Cmd":"haltPICC"}', qos=2, retain=False) - await client.publish("/cmnd/reader", payload='{"Cmd": "message", "MssgID": 0, "AddnTxt":" Karte auflegen"}', qos=2, retain=False) - async with client.filtered_messages("/rfid_reader/111") as messages: - await client.subscribe("/rfid_reader/#") - async for message in messages: - if not auth: - auth, msg, cap = await fabapi.connect_with_fabfire_initial("192.168.178.31", 59661, message.payload) - elif not done: - msg, done = await fabapi.connect_with_fabfire_step(auth, message.payload) - if done: - await client.publish("/cmnd/reader", payload='{"Cmd":"haltPICC"}', qos=2, retain=False) - ms = await cap.machineSystem().a_wait() - info = await ms.machineSystem.info().a_wait() - ma = await info.info.getMachineURN("urn:fabaccess:resource:Testmachine").a_wait() - if ma.machine.state == "inUse": - await ma.machine.inuse.giveBack().a_wait() - else: - await ma.machine.use.use().a_wait() - done = False - cap = None - auth = None - msg = None - - await client.publish("/cmnd/reader", payload=msg, qos=2, retain=False) - - -if __name__ == "__main__": - loop = asyncio.get_event_loop() - loop.run_until_complete(main())