mirror of
https://gitlab.com/fabinfra/fabaccess/fabfire_adapter.git
synced 2025-03-12 23:01:44 +01:00
added support for multiple readers
This commit is contained in:
parent
233b9af159
commit
c45de56b67
16
config.toml
Normal file
16
config.toml
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
[mqtt]
|
||||||
|
hostname = "192.168.178.31"
|
||||||
|
port = 1883
|
||||||
|
|
||||||
|
[bffhd]
|
||||||
|
hostname = "192.168.178.31"
|
||||||
|
port = 59661
|
||||||
|
|
||||||
|
[readers]
|
||||||
|
[readers.abc]
|
||||||
|
id = "111"
|
||||||
|
machine = "urn:fabaccess:resource:Testmachine"
|
||||||
|
|
||||||
|
[readers.def]
|
||||||
|
id = "222"
|
||||||
|
machine = "urn:fabaccess:resource:Another"
|
@ -2,6 +2,7 @@ import asyncio
|
|||||||
import socket
|
import socket
|
||||||
import ssl
|
import ssl
|
||||||
import capnp
|
import capnp
|
||||||
|
import logging
|
||||||
|
|
||||||
connection_capnp = capnp.load('schema/connection.capnp')
|
connection_capnp = capnp.load('schema/connection.capnp')
|
||||||
authenticationsystem_capnp = capnp.load('schema/authenticationsystem.capnp')
|
authenticationsystem_capnp = capnp.load('schema/authenticationsystem.capnp')
|
||||||
@ -53,9 +54,10 @@ async def connect(host, port, user, pw):
|
|||||||
rep_prom = req.send()
|
rep_prom = req.send()
|
||||||
response = await rep_prom.a_wait()
|
response = await rep_prom.a_wait()
|
||||||
if response.response.outcome.result == "successful":
|
if response.response.outcome.result == "successful":
|
||||||
|
logging.info("Auth completed successfully")
|
||||||
return cap
|
return cap
|
||||||
else:
|
else:
|
||||||
print("Auth failed")
|
logging.info("Auth failed")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@ -91,12 +93,12 @@ async def connect_with_fabfire_initial(host, port, uid):
|
|||||||
req.request.initialResponse.initial = uid
|
req.request.initialResponse.initial = uid
|
||||||
rep_prom = req.send()
|
rep_prom = req.send()
|
||||||
response = await rep_prom.a_wait()
|
response = await rep_prom.a_wait()
|
||||||
print(response.response.which())
|
logging.debug(f"got response type: {response.response.which()}")
|
||||||
if response.response.which() == "challence":
|
if response.response.which() == "challence":
|
||||||
print(f"challenge: {response.response.challence}")
|
logging.debug(f"challenge: {response.response.challence}")
|
||||||
return auth, response.response.challence, cap
|
return auth, response.response.challence, cap
|
||||||
else:
|
else:
|
||||||
print(f"Auth failed: {response.response.outcome.result}, additional info: {response.response.outcome.helpText}")
|
logging.error(f"Auth failed: {response.response.outcome.result}, additional info: {response.response.outcome.helpText}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@ -106,11 +108,11 @@ async def connect_with_fabfire_step(auth, msg):
|
|||||||
rep_prom = req.send()
|
rep_prom = req.send()
|
||||||
response = await rep_prom.a_wait()
|
response = await rep_prom.a_wait()
|
||||||
if response.response.which() == "challence":
|
if response.response.which() == "challence":
|
||||||
print(f"challenge: {response.response.challence}")
|
logging.debug(f"challenge: {response.response.challence}")
|
||||||
return response.response.challence, False # auth cap, challenge, not done
|
return response.response.challence, False # auth cap, challenge, not done
|
||||||
elif response.response.outcome.result == "successful":
|
elif response.response.outcome.result == "successful":
|
||||||
print(f"Auth completed successfully! Got additional Data: {response.response.outcome.additionalData.additional}")
|
logging.info(f"Auth completed successfully! Got additional Data: {response.response.outcome.additionalData.additional}")
|
||||||
return response.response.outcome.additionalData.additional, True # dont care, message, we are done
|
return response.response.outcome.additionalData.additional, True # dont care, message, we are done
|
||||||
else:
|
else:
|
||||||
print(f"Auth failed: {response.response.outcome.result}, additional info: {response.response.outcome.helpText}")
|
logging.error(f"Auth failed: {response.response.outcome.result}, additional info: {response.response.outcome.helpText}")
|
||||||
return None
|
return None
|
||||||
|
139
main.py
Normal file
139
main.py
Normal file
@ -0,0 +1,139 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from contextlib import AsyncExitStack, asynccontextmanager
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Optional
|
||||||
|
import asyncio_mqtt
|
||||||
|
from asyncio_mqtt import Client, MqttError
|
||||||
|
import fabapi
|
||||||
|
import toml
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Host:
|
||||||
|
hostname: str
|
||||||
|
port: int
|
||||||
|
username: Optional[str] = None
|
||||||
|
password: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
class Reader:
|
||||||
|
def __init__(self, reader_id: str, machine_urn: str, bffhd_address: Host):
|
||||||
|
self.reader_id = reader_id
|
||||||
|
self.machine_urn = machine_urn
|
||||||
|
self.bffhd_address = bffhd_address
|
||||||
|
self.is_authenticated = False
|
||||||
|
self.auth_cap = None
|
||||||
|
self.bootstrap_cap = None
|
||||||
|
|
||||||
|
async def handle_messages(self, messages, client: asyncio_mqtt.Client):
|
||||||
|
async for message in messages:
|
||||||
|
response_for_reader = None
|
||||||
|
if not self.auth_cap:
|
||||||
|
self.auth_cap, response_for_reader, self.bootstrap_cap = await fabapi.connect_with_fabfire_initial(
|
||||||
|
self.bffhd_address.hostname, self.bffhd_address.port, message.payload)
|
||||||
|
elif not self.is_authenticated:
|
||||||
|
response_for_reader, self.is_authenticated = await fabapi.connect_with_fabfire_step(self.auth_cap,
|
||||||
|
message.payload)
|
||||||
|
if self.is_authenticated:
|
||||||
|
await client.publish(f"/cmnd/reader/{self.reader_id}", payload='{"Cmd":"haltPICC"}', qos=2,
|
||||||
|
retain=False)
|
||||||
|
ms = await self.bootstrap_cap.machineSystem().a_wait()
|
||||||
|
info = await ms.machineSystem.info().a_wait()
|
||||||
|
ma = await info.info.getMachineURN(f"{self.machine_urn}").a_wait()
|
||||||
|
if ma.machine.state == "inUse":
|
||||||
|
await ma.machine.inuse.giveBack().a_wait()
|
||||||
|
else:
|
||||||
|
await ma.machine.use.use().a_wait()
|
||||||
|
self.is_authenticated = False
|
||||||
|
self.bootstrap_cap = None
|
||||||
|
self.auth_cap = None
|
||||||
|
response_for_reader = None
|
||||||
|
|
||||||
|
if response_for_reader:
|
||||||
|
await client.publish(f"/cmnd/reader/{self.reader_id}", payload=response_for_reader, qos=2, retain=False)
|
||||||
|
|
||||||
|
|
||||||
|
async def run_adapter(mqtt_host: Host, readers: list[Reader]):
|
||||||
|
# We 💛 context managers. Let's create a stack to help
|
||||||
|
# us manage them.
|
||||||
|
async with AsyncExitStack() as stack:
|
||||||
|
# Keep track of the asyncio tasks that we create, so that
|
||||||
|
# we can cancel them on exit
|
||||||
|
tasks = set()
|
||||||
|
stack.push_async_callback(cancel_tasks, tasks)
|
||||||
|
|
||||||
|
# Connect to the MQTT broker
|
||||||
|
client = Client(hostname=mqtt_host.hostname, port=mqtt_host.port, username=mqtt_host.username,
|
||||||
|
password=mqtt_host.password)
|
||||||
|
await stack.enter_async_context(client)
|
||||||
|
|
||||||
|
# setup async handler for readers
|
||||||
|
for reader in readers:
|
||||||
|
manager = client.filtered_messages(f"/rfid_reader/{reader.reader_id}")
|
||||||
|
messages = await stack.enter_async_context(manager)
|
||||||
|
task = asyncio.create_task(reader.handle_messages(messages, client))
|
||||||
|
tasks.add(task)
|
||||||
|
logging.info(f"Registered handler for reader {reader.reader_id}")
|
||||||
|
|
||||||
|
# Messages that doesn't match a filter will get logged here
|
||||||
|
messages = await stack.enter_async_context(client.unfiltered_messages())
|
||||||
|
task = asyncio.create_task(log_unhandled(messages))
|
||||||
|
tasks.add(task)
|
||||||
|
|
||||||
|
# Subscribe to topic(s)
|
||||||
|
# 🤔 Note that we subscribe *after* starting the message
|
||||||
|
# loggers. Otherwise, we may miss retained messages.
|
||||||
|
await client.subscribe("/rfid_reader/#")
|
||||||
|
|
||||||
|
logging.info("Initialization done")
|
||||||
|
|
||||||
|
# Wait for everything to complete (or fail due to, e.g., network
|
||||||
|
# errors)
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
|
|
||||||
|
|
||||||
|
async def log_unhandled(messages):
|
||||||
|
async for message in messages:
|
||||||
|
logging.warning(f"received unhandled message: {message.payload.decode()}")
|
||||||
|
|
||||||
|
|
||||||
|
async def cancel_tasks(tasks):
|
||||||
|
for task in tasks:
|
||||||
|
if task.done():
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
task.cancel()
|
||||||
|
await task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
# Run the advanced_example indefinitely. Reconnect automatically
|
||||||
|
# if the connection is lost.
|
||||||
|
reconnect_interval = 3 # [seconds]
|
||||||
|
|
||||||
|
config = toml.load("config.toml")
|
||||||
|
|
||||||
|
mqtt_host = Host(config["mqtt"]["hostname"], config["mqtt"]["port"], config["mqtt"].setdefault("username", None),
|
||||||
|
config["mqtt"].setdefault("password", None))
|
||||||
|
bffhd_host = Host(config["bffhd"]["hostname"], config["bffhd"]["port"])
|
||||||
|
|
||||||
|
readers = map(lambda reader: Reader(reader["id"], reader["machine"], bffhd_host), config["readers"].values())
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
await run_adapter(mqtt_host, readers)
|
||||||
|
except MqttError as error:
|
||||||
|
logging.error(f'"{error}". Reconnecting in {reconnect_interval} seconds.')
|
||||||
|
finally:
|
||||||
|
await asyncio.sleep(reconnect_interval)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
LOGLEVEL = os.environ.get('LOGLEVEL', 'INFO').upper()
|
||||||
|
logging.basicConfig(level=LOGLEVEL)
|
||||||
|
|
||||||
|
asyncio.run(main())
|
4
requirements.txt
Normal file
4
requirements.txt
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
asyncio-mqtt==0.12.1
|
||||||
|
paho-mqtt==1.6.1
|
||||||
|
pycapnp==1.1.0
|
||||||
|
toml==0.10.2
|
44
single.py
44
single.py
@ -1,44 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
|
|
||||||
from asyncio_mqtt import Client
|
|
||||||
import json
|
|
||||||
import fabapi
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
done = False
|
|
||||||
cap = None
|
|
||||||
auth = None
|
|
||||||
msg = None
|
|
||||||
|
|
||||||
async with Client("192.168.178.31") as client:
|
|
||||||
await client.publish("/cmnd/reader", payload='{"Cmd":"haltPICC"}', qos=2, retain=False)
|
|
||||||
await client.publish("/cmnd/reader", payload='{"Cmd": "message", "MssgID": 0, "AddnTxt":" Karte auflegen"}', qos=2, retain=False)
|
|
||||||
async with client.filtered_messages("/rfid_reader/111") as messages:
|
|
||||||
await client.subscribe("/rfid_reader/#")
|
|
||||||
async for message in messages:
|
|
||||||
if not auth:
|
|
||||||
auth, msg, cap = await fabapi.connect_with_fabfire_initial("192.168.178.31", 59661, message.payload)
|
|
||||||
elif not done:
|
|
||||||
msg, done = await fabapi.connect_with_fabfire_step(auth, message.payload)
|
|
||||||
if done:
|
|
||||||
await client.publish("/cmnd/reader", payload='{"Cmd":"haltPICC"}', qos=2, retain=False)
|
|
||||||
ms = await cap.machineSystem().a_wait()
|
|
||||||
info = await ms.machineSystem.info().a_wait()
|
|
||||||
ma = await info.info.getMachineURN("urn:fabaccess:resource:Testmachine").a_wait()
|
|
||||||
if ma.machine.state == "inUse":
|
|
||||||
await ma.machine.inuse.giveBack().a_wait()
|
|
||||||
else:
|
|
||||||
await ma.machine.use.use().a_wait()
|
|
||||||
done = False
|
|
||||||
cap = None
|
|
||||||
auth = None
|
|
||||||
msg = None
|
|
||||||
|
|
||||||
await client.publish("/cmnd/reader", payload=msg, qos=2, retain=False)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
loop.run_until_complete(main())
|
|
Loading…
x
Reference in New Issue
Block a user