mirror of
https://gitlab.com/sfz.aalen/infra/fabaccess.git
synced 2025-03-12 15:01:47 +01:00
101 lines
3.2 KiB
Python
101 lines
3.2 KiB
Python
import json
|
|
import os
|
|
from config import Config
|
|
from mqtt_client import MqttHandler
|
|
from keycloak_handler import KeycloakHandler
|
|
from sql_handler import SQLHandler
|
|
|
|
def has_permission(user_permissions, machine_id):
|
|
parsed_permissions = [permission.split('.') for permission in user_permissions]
|
|
parsed_machine_id = machine_id.split('.')
|
|
|
|
for permission in parsed_permissions:
|
|
missmatch = False
|
|
for i, id_sequence in enumerate(parsed_machine_id):
|
|
if permission[i] == '*':
|
|
return True
|
|
if permission[i] != id_sequence:
|
|
missmatch = True
|
|
break
|
|
if not missmatch:
|
|
return True
|
|
|
|
return False
|
|
|
|
def gen_display_name(user):
|
|
# display names must be not longer than 8 chrs
|
|
if 'firstName' in user.keys() and 'lastName' in user.keys():
|
|
full_name = f'{user["firstName"]} {user["lastName"]}'
|
|
if len(full_name) > 8:
|
|
display_name = f'{user["firstName"][0]}.{user["lastName"][:6]}'
|
|
else:
|
|
try:
|
|
display_name = user['username'][:8]
|
|
except KeyError:
|
|
print('user has no username')
|
|
return 'Error'
|
|
return(display_name)
|
|
|
|
def handle_request(msg, client):
|
|
print('')
|
|
print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')
|
|
fabcard_id = json.loads(msg.payload.decode())['UID']
|
|
reader_id = msg.topic.split('/')[-1]
|
|
|
|
KeycloakHandler.login()
|
|
user = KeycloakHandler.get_user_by_card_id(fabcard_id)
|
|
if not user:
|
|
MqttHandler.print_to_display(reader_id, 16, fabcard_id)
|
|
return
|
|
|
|
db_data = SQLHandler.get_machine_data(reader_id)
|
|
|
|
machine_id = db_data["machine_id"]
|
|
last_user = db_data["last_user"]
|
|
machine_status = db_data["machine_status"]
|
|
plug_id = db_data["plug_id"]
|
|
|
|
try:
|
|
user_permissions = json.loads(user['attributes']['FabPermissions'][0])
|
|
except KeyError:
|
|
print(f'user with id {fabcard_id} is missing FabPermissions attr')
|
|
except IndexError:
|
|
print(f'user with id {fabcard_id} is missing FabPermissions attr')
|
|
|
|
if not has_permission(user_permissions, machine_id):
|
|
print(f"user with id {fabcard_id} is missing {machine_id}")
|
|
MqttHandler.print_to_display(reader_id, 7, '')
|
|
return
|
|
|
|
username = user['username']
|
|
display_name = gen_display_name(user)
|
|
|
|
if not machine_status:
|
|
print(f'Turn Plug {plug_id} on')
|
|
MqttHandler.switch_plug(plug_id, 1)
|
|
MqttHandler.print_to_display(reader_id, 20, f'Login\n{display_name}')
|
|
else:
|
|
if not (username == last_user or KeycloakHandler.user_is_privileged(username)):
|
|
MqttHandler.print_to_display(reader_id, 9, last_user)
|
|
return
|
|
print(f'Turn Plug {plug_id} off')
|
|
MqttHandler.switch_plug(plug_id, 0)
|
|
MqttHandler.print_to_display(reader_id, 20, f'Bitte anmelden')
|
|
|
|
SQLHandler.update_machine(reader_id, username, machine_status)
|
|
MqttHandler.publish(f'/FabLogging/{plug_id}/USER', username)
|
|
|
|
|
|
def main():
|
|
MqttHandler.setup(handle_request)
|
|
KeycloakHandler.login()
|
|
SQLHandler.setup()
|
|
|
|
MqttHandler.connect_mqtt()
|
|
MqttHandler.subscribe("/rfid_reader/#")
|
|
SQLHandler.init_db()
|
|
MqttHandler.loop()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|