This commit is contained in:
TheJoKlLa 2022-10-01 21:31:18 +02:00
parent 8652b244ff
commit ab697f938e
3 changed files with 17 additions and 14 deletions

View File

@ -9,7 +9,7 @@ port = 59661
[readers] [readers]
[readers.00001] [readers.00001]
id = "00001" id = "00001"
machine = "urn:fabaccess:resource:MachineA1" machine = "urn:fabaccess:resource:Yetmore"
[readers.00002] [readers.00002]
id = "00002" id = "00002"

View File

@ -26,11 +26,10 @@ async def run_adapter(mqtt_host: Host, readers: list[Reader]):
# setup async handler for readers # setup async handler for readers
for reader in readers: for reader in readers:
manager = client.filtered_messages(f"/fabreader/{reader.reader_id}") manager = client.filtered_messages(f"fabreader/{reader.reader_id}/#")
messages = await stack.enter_async_context(manager) messages = await stack.enter_async_context(manager)
task = asyncio.create_task(reader.handle_messages(messages, client)) task = asyncio.create_task(reader.handle_messages(messages, client))
tasks.add(task) tasks.add(task)
await client.publish(f"fabreader/{reader.reader_id}/stopOTA", payload=None, qos=2, retain=False)
logging.info(f"Registered handler for reader {reader.reader_id}") logging.info(f"Registered handler for reader {reader.reader_id}")
# Messages that doesn't match a filter will get logged here # Messages that doesn't match a filter will get logged here
@ -52,7 +51,7 @@ async def run_adapter(mqtt_host: Host, readers: list[Reader]):
async def log_unhandled(messages): async def log_unhandled(messages):
async for message in messages: async for message in messages:
logging.warning(f"received unhandled message: {message.payload.decode()}") logging.warning(f"received unhandled message on: {message.topic}")
async def cancel_tasks(tasks): async def cancel_tasks(tasks):

View File

@ -18,8 +18,9 @@ class Reader:
self.timeout_timer = None self.timeout_timer = None
async def handle_messages(self, messages, client: asyncio_mqtt.Client): async def handle_messages(self, messages, client: asyncio_mqtt.Client):
try: async for message in messages:
async for message in messages: try:
print(message.topic)
response_for_reader = None response_for_reader = None
if message.topic == f"fabreader/{self.reader_id}/startOTA": if message.topic == f"fabreader/{self.reader_id}/startOTA":
self.timeout_timer = Timer(10, lambda: self.handle_timeout(client)) self.timeout_timer = Timer(10, lambda: self.handle_timeout(client))
@ -28,7 +29,7 @@ class Reader:
response_for_reader, self.session = await fabapi.connect_with_fabfire_step(self.auth_cap, message.payload) response_for_reader, self.session = await fabapi.connect_with_fabfire_step(self.auth_cap, message.payload)
if self.session: if self.session:
self.timeout_timer.cancel() self.timeout_timer.cancel()
await client.publish(f"fabreader/{self.reader_id}/stopOTA", payload=None, qos=2, retain=False) await client.publish(f"fabreader/{self.reader_id}/stopOTA", payload="", qos=2, retain=False)
info = self.session.machineSystem.info info = self.session.machineSystem.info
ma = await info.getMachineURN(f"{self.machine_urn}").a_wait() ma = await info.getMachineURN(f"{self.machine_urn}").a_wait()
@ -55,18 +56,21 @@ class Reader:
self.session = None self.session = None
self.auth_cap = None self.auth_cap = None
response_for_reader = None response_for_reader = None
self.timeout_timer.cancel()
if response_for_reader: if response_for_reader:
await client.publish(f"fabreader/{self.reader_id}/requestOTA", payload=response_for_reader, qos=2, retain=False) await client.publish(f"fabreader/{self.reader_id}/requestOTA", payload=response_for_reader, qos=2, retain=False)
except Exception as e: except Exception as e:
logging.error(f"Caught exception {e}") logging.error(f"Caught exception {e}")
await client.publish(f"fabreader/{self.reader_id}/stopOTA", payload=None, qos=2, retain=False) await client.publish(f"fabreader/{self.reader_id}/stopOTA", payload="", qos=2, retain=False)
self.session = None self.session = None
self.auth_cap = None self.auth_cap = None
response_for_reader = None
self.timeout_timer.cancel()
async def handle_timeout(self, client): async def handle_timeout(self, client):
await client.publish(f"fabreader/{self.reader_id}/stopOTA", payload=None, qos=2, retain=False) await client.publish(f"fabreader/{self.reader_id}/stopOTA", payload="", qos=2, retain=False)
logging.critical(f"authentication timed out on reader {self.reader_id}") logging.error(f"authentication timed out on reader {self.reader_id}")
self.auth_cap = None self.auth_cap = None
self.session = None self.session = None