Refactor fab_access python

This commit is contained in:
Lukas Brodschelm 2022-11-03 00:41:51 +01:00
parent 094139f796
commit ce5ca3bee6
5 changed files with 212 additions and 178 deletions

9
fab_access/config.py Normal file
View File

@ -0,0 +1,9 @@
class Config:
broker = 'mosquitto'
port = 1883
client_id = f'FabMan'
keycloak_url = 'http://keycloak:8080/auth/'
keycloak_username = os.environ['KEYCLOAK_USER_NAME']
keycloak_password = os.environ['KEYCLOAK_USER_PW']
keycloak_realm = os.environ['KEYCLOAK_REALM']

View File

@ -0,0 +1,44 @@
from config import Config
from keycloak import KeycloakAdmin
class KeycloakHandler:
Config.keycloak_password
@staticmethod
def login():
KeycloakHandler.admin = KeycloakAdmin(
server_url=Config.keycloak_url,
username=Config.keycloak_username,
password=Config.keycloak_password,
realm_name=Config.keycloak_realm,
verify=True
)
@staticmethod
def get_user_by_card_id(card_id):
users = KeycloakHandler.admin.get_users(
{ 'attributes': { 'FabCard': card_id }}
)
print(f'Found {len(users)} users with card_id: {card_id}')
match len(users):
case 0:
return None
case 1:
print(f'FabCard matches with user {users[0]["username"]}')
return users[0]
case other:
print(f'Error! too many users with card_id: {card_id}')
return None
@staticmethod
def user_is_privileged(username):
groups = KeycloakHandler.admin.get_user_groups(user_id=username)
groups = [group['name'] for group in groups]
if 'Mentoren' in groups:
print('Overrided becouse of "Mentor" group')
return True
else:
return False

98
fab_access/main.py Normal file
View File

@ -0,0 +1,98 @@
import json
import os
from config import Config
from mqtt_client import MqttHandler
from keycloak_handler import KeycloakHandler
def has_permission(user_permissions, machine_id):
parsed_permissions = [permission.split('.') for permission in user_permissions]
parsed_machine_id = machine_id.split('.')
for permission in parsed_permissions:
missmatch = False
for i, id_sequence in enumerate(parsed_machine_id):
if permission[i] == '*':
return True
if permission[i] != id_sequence:
missmatch = True
break
if not missmatch:
return True
return False
def gen_display_name(user):
# display names must be not longer than 8 chrs
if 'firstName' in user.keys() and 'lastName' in user.keys():
full_name = f'{user["firstName"]} {user["lastName"]}'
if len(full_name) > 8:
full_name = 'f{user["firstName"][0]}.{user["lastName"][:6]}'
else:
try:
display_name = user['username'][:8]
except KeyError:
print('user has no username')
return 'Error'
def handle_request(msg, client):
print('')
print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')
fabcard_id = json.loads(msg.payload.decode())['UID']
reader_id = msg.topic.split('/')[-1]
user = KeycloakHandler.get_user_by_card_id(fabcard_id)
if not user:
MqttHandler.print_to_display(reader_id, 16, fabcard_id)
return
# TODO Database
# - get machine_id from database
# - get last_user from database
# - get machine_status from database, is 0 for off, 1 for on
# - get plug_id from database
machine_id = 'space.foo.lazerspacer'
last_user = 'foo.bar'
machine_status = 0 # or 1
plug_id = 'lazerspacer'
try:
user_permissions = json.loads(user['attributes']['FabPermissions'][0])
except KeyError:
print(f'user with id {fabcard_id} is missing FabPermissions attr')
except IndexError:
print(f'user with id {fabcard_id} is missing FabPermissions attr')
if not has_permission(user_permissions, machine_id):
MqttHandler.print_to_display(reader_id, 7, '')
return
username = user['username']
display_name = gen_display_name(user)
if machine_status == 0:
print(f'Turn Plug {plug_id} on')
MqttHandler.switch_plug(plug_id, 1)
MqttHandler.print_to_display(reader_id, 20, f'Login\n{display_name}')
else:
if not (username == last_user or KeycloakHandler.user_is_privileged(username)):
MqttHandler.print_to_display(reader_id, 9, last_user)
return
print(f'Turn Plug {plug_id} off')
MqttHandler.switch_plug(plug_id, 0)
MqttHandler.print_to_display(reader_id, 20, f'Bitte anmelden')
# TODO Update Database:
# - last user
# - machine_status
MqttHandler.publish(f'/FabLogging/{plug_id}/USER', username)
def main():
MqttHandler.setup(handle_request)
MqttHandler.subscribe("/rfid_reader/#")
MqttHandler.loop()
if __name__ == '__main__':
main()

61
fab_access/mqtt_client.py Normal file
View File

@ -0,0 +1,61 @@
from paho.mqtt import client as mqtt_client
from config import Config
import json
class MqttHandler:
@staticmethod
def setup(msg_handler):
MqttHandler.msg_handler = msg_handler
MqttHandler.client = None
@staticmethod
def connect_mqtt():
def on_connect(userdata, flags, rc):
if rc == 0:
print('Connected to MQTT Broker!')
else:
print('Failed to connect, return code %d\n', rc)
MqttHandler.client = mqtt_client.Client(Config.client_id)
MqttHandler.client.username_pw_set('admin', 'user')
MqttHandler.client.on_connect = on_connect
MqttHandler.client.connect(Config.broker, Config.port)
@staticmethod
def publish(topic, msg):
result = MqttHandler.client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
#print(f'Send `{msg}` to topic `{topic}`')
pass
else:
print(f'Failed to send message to topic {topic}')
@staticmethod
def subscribe(topic):
def on_message(client, userdata, msg):
MqttHandler.msg_handler(msg, client)
MqttHandler.client.subscribe(topic)
MqttHandler.client.on_message = on_message
@staticmethod
def loop():
MqttHandler.client.loop_forever()
@staticmethod
def switch_plug(PlugID, state):
MqttHandler.publish(f'/FabLogging/{PlugID}/POWER', 1 if state else 0)
MqttHandler.publish(f'/aktoren/{PlugID}/cmnd/POWER', 'On' if state else 'OFF')
@staticmethod
def print_to_display(MachineID, status, text):
message = {
'Cmd': 'message',
'MssgID': status,
'ClrTxt': '',
'AddnTxt': text,
}
MqttHandler.publish(f'/cmnd/reader/{MachineID}', json.dumps(message))

178
main.py
View File

@ -1,178 +0,0 @@
from paho.mqtt import client as mqtt_client
from keycloak import KeycloakAdmin
import json
import mysql.connector
import os
broker = 'mosquitto'
port = 1883
client_id = f'FabMan'
KEYCLOAK_URL = "http://keycloak:8080/auth/"
KEYCLOAK_USERNAME = os.environ['KEYCLOAK_USER_NAME']
KEYCLOAK_PASSWORD = os.environ['KEYCLOAK_USER_PW']
REALM = os.environ['KEYCLOAK_REALM']
FabDB = mysql.connector.connect(
host="FabDB",
user=os.environ['FABDB_DB_USER_NAME'],
password=os.environ['FABDB_DB_USER_PW'],
database=os.environ['FABDB_DB_NAME']
)
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set("admin", "user")
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client,topic,msg):
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
#print(f"Send `{msg}` to topic `{topic}`")
pass
else:
print(f"Failed to send message to topic {topic}")
def changePlug(client,PlugID,state):
publish(client,f"/FabLogging/{PlugID}/POWER",state)
cmd = ["OFF", "On"][state]
publish(client,f"/aktoren/{PlugID}/cmnd/POWER",cmd)
def changeDisplay(client,MachineID,status,text):
publish(client,f"/cmnd/reader/{MachineID}",'{"Cmd": "message", "MssgID": %s , "ClrTxt":"" , "AddnTxt":"%s"}' % ((status), (text)))
def checkPermission(UserPermsJSON,PermissionPath):
permission = False
error = 0
MachinePermArray = PermissionPath.split(".")
for UserPerm in UserPermsJSON:
#print(f"check {UserPerm}")
UserPermArray = UserPerm.split(".")
x = 0
for UserPermPart in UserPermArray:
if(error == 0):
#print(f"Compare {UserPermPart} and {MachinePermArray[x]}")
if not (MachinePermArray[x] == UserPermPart):
if(UserPermPart == "*"):
#print("* regelt")
permission = True
else:
error = 1
#print(f"MISmatch between {MachinePermArray[x]} and {UserPermPart}")
else:
pass
#print(f"Match between {MachinePermArray[x]} and {UserPermPart}")
x = x + 1
if(error == 1):
pass
#print("Error")
else:
#print("Hurra")
permission = True
error = 0
return permission
def msg_handler(msg,client):
print("")
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
topic = msg.topic
MQTT_KEY = json.loads(msg.payload.decode())["UID"]
keycloak_admin = KeycloakAdmin( server_url=KEYCLOAK_URL, username=KEYCLOAK_USERNAME, password=KEYCLOAK_PASSWORD, realm_name=REALM, verify=True)
users = keycloak_admin.get_users({})
knownUser = False
ReaderID = msg.topic.split("/")[-1]
for user in users:
try:
FabCardID = user['attributes']['FabCard'][0]
UserPermissions = user['attributes']['FabPermissions'][0]
if (FabCardID == MQTT_KEY):
knownUser = True
print(f"FabCard matches with user {user['username']}")
mycursor = FabDB.cursor()
mycursor.execute(f'SELECT PlugName,PermissionPath,Status,LastUser FROM ReaderPlug WHERE ReaderID="{ReaderID}";')
myValues = mycursor.fetchall()[0]
PlugID = myValues[0]
PermissionPath = myValues[1]
MachineStatus = myValues[2]
LastUser = myValues[3]
#print(PermissionPath)
UserPermsJSON = json.loads(UserPermissions)
permission = checkPermission(UserPermsJSON, PermissionPath)
print('System "use" access %s' % ['denied','granted'][permission])
if(permission):
error = False
status = MachineStatus
ActualUser = user['username']
if(status):
status = False
if(LastUser == ActualUser):
error = False
else:
try:
if (keycloak_admin.get_user_groups(user_id=user['id'])[0]['name'] == 'Mentoren'):
print('Overrided becouse of "Mentor" Group')
else:
print(f'Bereits benutzt von {LastUser}')
error = True
except IndexError:
print("No groups available")
error = True
if(error):
changeDisplay(client, ReaderID, 9, LastUser)
else:
status = True
error = False
if(error == False):
#KSstatus ^= True
switch = ["off", "on"][status]
print(f"Turn Plug {switch}")
#print(status)
changePlug(client, PlugID, status)
publish(client,f"/FabLogging/{PlugID}/USER",ActualUser)
try:
firstCombo = user['firstName']+" "
DisplayUser = firstCombo+user['lastName']
if(len(firstCombo) >= 7):
DisplayUser = user['firstName'][:7]
except KeyError:
DisplayUser = user['username']
if(len(DisplayUser) > 7):
DisplayUser = DisplayUser[0:7] + "." + DisplayUser[7+1: ]
DisplayUser = DisplayUser[:8]
changeDisplay(client, ReaderID, [20, 20][status], ["Bitte anmelden", f"Login\n{DisplayUser}"][status])
mycursor.execute(f'UPDATE ReaderPlug SET Status={status}, LastUser="{ActualUser}" WHERE ReaderID="{ReaderID}";')
FabDB.commit()
else:
changeDisplay(client, ReaderID, 7, "")
except KeyError:
# Key is not present
pass
if not (knownUser):
changeDisplay(client, ReaderID, 16, MQTT_KEY)
def subscribe(client: mqtt_client, topic):
def on_message(client, userdata, msg):
msg_handler(msg,client)
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client,"/rfid_reader/#")
client.loop_forever()
if __name__ == '__main__':
run()