2022-11-03 21:22:13 +01:00

100 lines
3.1 KiB
Python

import json
import os
from config import Config
from mqtt_client import MqttHandler
from keycloak_handler import KeycloakHandler
def has_permission(user_permissions, machine_id):
parsed_permissions = [permission.split('.') for permission in user_permissions]
parsed_machine_id = machine_id.split('.')
for permission in parsed_permissions:
missmatch = False
for i, id_sequence in enumerate(parsed_machine_id):
if permission[i] == '*':
return True
if permission[i] != id_sequence:
missmatch = True
break
if not missmatch:
return True
return False
def gen_display_name(user):
# display names must be not longer than 8 chrs
if 'firstName' in user.keys() and 'lastName' in user.keys():
full_name = f'{user["firstName"]} {user["lastName"]}'
if len(full_name) > 8:
full_name = 'f{user["firstName"][0]}.{user["lastName"][:6]}'
else:
try:
display_name = user['username'][:8]
except KeyError:
print('user has no username')
return 'Error'
def handle_request(msg, client):
print('')
print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')
fabcard_id = json.loads(msg.payload.decode())['UID']
reader_id = msg.topic.split('/')[-1]
user = KeycloakHandler.get_user_by_card_id(fabcard_id)
if not user:
MqttHandler.print_to_display(reader_id, 16, fabcard_id)
return
# TODO Database
# - get machine_id from database
# - get last_user from database
# - get machine_status from database, is 0 for off, 1 for on
# - get plug_id from database
machine_id = 'space.foo.lazerspacer'
last_user = 'foo.bar'
machine_status = 0 # or 1
plug_id = 'lazerspacer'
try:
user_permissions = json.loads(user['attributes']['FabPermissions'][0])
except KeyError:
print(f'user with id {fabcard_id} is missing FabPermissions attr')
except IndexError:
print(f'user with id {fabcard_id} is missing FabPermissions attr')
if not has_permission(user_permissions, machine_id):
MqttHandler.print_to_display(reader_id, 7, '')
return
username = user['username']
display_name = gen_display_name(user)
if machine_status == 0:
print(f'Turn Plug {plug_id} on')
MqttHandler.switch_plug(plug_id, 1)
MqttHandler.print_to_display(reader_id, 20, f'Login\n{display_name}')
else:
if not (username == last_user or KeycloakHandler.user_is_privileged(username)):
MqttHandler.print_to_display(reader_id, 9, last_user)
return
print(f'Turn Plug {plug_id} off')
MqttHandler.switch_plug(plug_id, 0)
MqttHandler.print_to_display(reader_id, 20, f'Bitte anmelden')
# TODO Update Database:
# - last user
# - machine_status
MqttHandler.publish(f'/FabLogging/{plug_id}/USER', username)
def main():
MqttHandler.setup(handle_request)
MqttHandler.connect_mqtt()
MqttHandler.subscribe("/rfid_reader/#")
MqttHandler.loop()
if __name__ == '__main__':
main()