refactor python codebase

variable configuration through config.py and code splitting into
multiple files. Also added requirements.txt and gitignore.
This commit is contained in:
Philipp Fruck 2022-11-01 21:44:04 +01:00
parent 69c7933894
commit bb144d78bc
No known key found for this signature in database
GPG Key ID: 9B7D2672DB7F47AD
6 changed files with 382 additions and 178 deletions

160
.gitignore vendored Normal file
View File

@ -0,0 +1,160 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

33
fab_access/config.py Normal file
View File

@ -0,0 +1,33 @@
import os
def _read_from_env(key: str, default: str = None) -> str:
if (value := os.environ.get(key, default)) is not None:
return value
raise Exception(f'[Config] Error: Cannot find required value {key}')
def get_keycloak_config():
return {
'server_url': _read_from_env('KEYCLOAK_URL'),
'username': _read_from_env('KEYCLOAK_USERNAME'),
'password': _read_from_env('KEYCLOAK_PASSWORD'),
'realm_name': _read_from_env('REALM'),
}
def get_database_config():
return {
'host': _read_from_env('DB_HOSTNAME'),
'user': _read_from_env('DB_USERNAME'),
'port': int(_read_from_env('DB_PORT', '3306')),
'password': _read_from_env('DB_PASSWORD'),
'dbname': _read_from_env('DB_DATABASE'),
}
def get_mqtt_config():
print("port", int(_read_from_env('MQTT_PORT', '1883')))
return {
'client_name': _read_from_env('MQTT_CLIENT'),
'username': _read_from_env('MQTT_USERNAME', ''),
'password': _read_from_env('MQTT_PASSWORD', ''),
'broker': _read_from_env('MQTT_BROKER'),
'port': int(_read_from_env('MQTT_PORT', '1883')),
}

153
fab_access/main.py Normal file
View File

@ -0,0 +1,153 @@
from keycloak import KeycloakAdmin
import json
import psycopg
import os
from paho.mqtt.client import Client as MQTTClient
import config
from mqtt_helper import MQTTHelper
conn: psycopg.Connection
keycloak_admin: KeycloakAdmin
def publish(client, topic,msg):
result = client.publish(topic, msg)
status: 0 | 1 = result[0]
if status == 0:
#print(f"Send `{msg}` to topic `{topic}`")
pass
else:
print(f"Failed to send message to topic {topic}")
def changePlug(client: MQTTClient, plug_id: str, is_running: bool):
publish(client,f"/FabLogging/{plug_id}/POWER", is_running)
cmd = "On" if is_running else "Off"
publish(client, f"/aktoren/{plug_id}/cmnd/POWER", cmd)
def changeDisplay(client: MQTTClient, reader_id: str, status: int, fab_card_id: str):
publish(client, f"/cmnd/reader/{reader_id}", '{"Cmd": "message", "MssgID": %s , "ClrTxt":"" , "AddnTxt":"%s"}' % (status, fab_card_id))
def hasPermission(UserPermsJSON, PermissionPath):
# ToDo: refactor this
permission = False
error = 0
MachinePermArray = PermissionPath.split(".")
for UserPerm in UserPermsJSON:
#print(f"check {UserPerm}")
UserPermArray = UserPerm.split(".")
x = 0
for UserPermPart in UserPermArray:
if(error == 0):
#print(f"Compare {UserPermPart} and {MachinePermArray[x]}")
if not (MachinePermArray[x] == UserPermPart):
if(UserPermPart == "*"):
#print("* regelt")
permission = True
else:
error = 1
#print(f"MISmatch between {MachinePermArray[x]} and {UserPermPart}")
else:
pass
#print(f"Match between {MachinePermArray[x]} and {UserPermPart}")
x = x + 1
if(error == 1):
pass
#print("Error")
else:
#print("Hurra")
permission = True
error = 0
return permission
def handle_msg(client: MQTTClient, userdata, msg):
global conn
payload = msg.payload.decode()
print(f"Received `{payload}` from `{msg.topic}` topic")
fab_card_id = json.loads(msg.payload.decode())["UID"]
reader_id = msg.topic.split("/")[-1]
# check user exists
users = keycloak_admin.get_users({ 'attributes': { 'FabCard': fab_card_id } })
if (len(users) != 1):
print(f'Found {len(users)} users with {fab_card_id=}')
changeDisplay(client, reader_id, 16, fab_card_id)
return
# retrieve user attributes from DB
user = users[0]
print(f"FabCard matches with user {user['username']}")
with conn.cursor() as cursor:
cursor.execute(f'SELECT PlugName,PermissionPath,Status,LastUser FROM ReaderPlug WHERE ReaderID=?;', (reader_id,))
res = cursor.fetchone()
if res is None:
print(f'Error fetching card info from db for {reader_id=}')
return
plug_id, permission_path, is_running, last_user = res
# check permissions
user_permissions = user['attributes']['FabPermissions'][0]
if not hasPermission(user_permissions, permission_path):
changeDisplay(client, reader_id, 7, "")
return
# check for mentor
if is_running and last_user != user['username']:
try:
if (keycloak_admin.get_user_groups(user_id=user['id'])[0]['name'] == 'Mentoren'):
print('Overrided becouse of "Mentor" Group')
else:
print(f'Bereits benutzt von {last_user}')
error = True
except IndexError:
print("No groups available")
error = True
if(error):
changeDisplay(client, reader_id, 9, last_user)
# toggle machine state
print(f"Turn Plug {'off' if is_running else 'on'}")
changePlug(client, plug_id, is_running)
publish(client, f"/FabLogging/{plug_id}/USER", user['username'])
# ToDo: refactor display name construction
try:
firstCombo = user['firstName']+" "
DisplayUser = firstCombo+user['lastName']
if(len(firstCombo) >= 7):
DisplayUser = user['firstName'][:7]
except KeyError:
DisplayUser = user['username']
if(len(DisplayUser) > 7):
DisplayUser = DisplayUser[0:7] + "." + DisplayUser[7+1: ]
DisplayUser = DisplayUser[:8]
changeDisplay(client, reader_id, 20, f"Login\n{DisplayUser}" if is_running else "Bitte anmelden")
# write new status to db
with conn.cursor() as cursor:
cursor.execute(
f'UPDATE ReaderPlug SET Status=?, LastUser="?" WHERE ReaderID="?";',
(is_running, user['username'], reader_id),
)
conn.commit()
def main():
global conn
mqtt_client = MQTTHelper(**config.get_mqtt_config())
mqtt_client.subscribe("/rfid_reader/#", handle_msg)
mqtt_client.loop_forever()
try:
conn = psycopg.connect(**config.get_database_config())
except mariadb.Error as e:
print(f"Error connecting to MariaDB Platform")
raise e
keycloak_admin = KeycloakAdmin(**config.get_keycloak_config(), verify=True)
if __name__ == '__main__':
main()

32
fab_access/mqtt_helper.py Normal file
View File

@ -0,0 +1,32 @@
from paho.mqtt import client as mqtt_client
import config
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
raise Exception("Failed to connect, return code %d\n", rc)
class MQTTHelper():
def __init__(self, client_name: str, username: str, password: str, broker: str, port: int):
client = mqtt_client.Client(client_name)
if username and password:
client.username_pw_set(username, password)
else:
print("Connecting to MQTT without credentials")
client.on_connect = on_connect
try:
client.connect(broker, port)
except Exception as e:
raise Exception(f"Error connecting to MQTT {broker}:{port}") from e
self.client = client
def subscribe(self, topic: str, handler):
self.client.subscribe(topic)
self.client.on_message = handler
def loop_forever(self):
self.client.loop_forever()

178
main.py
View File

@ -1,178 +0,0 @@
from paho.mqtt import client as mqtt_client
from keycloak import KeycloakAdmin
import json
import mysql.connector
import os
broker = 'mosquitto'
port = 1883
client_id = f'FabMan'
KEYCLOAK_URL = "http://keycloak:8080/auth/"
KEYCLOAK_USERNAME = os.environ['KEYCLOAK_USER_NAME']
KEYCLOAK_PASSWORD = os.environ['KEYCLOAK_USER_PW']
REALM = os.environ['KEYCLOAK_REALM']
FabDB = mysql.connector.connect(
host="FabDB",
user=os.environ['FABDB_DB_USER_NAME'],
password=os.environ['FABDB_DB_USER_PW'],
database=os.environ['FABDB_DB_NAME']
)
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set("admin", "user")
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client,topic,msg):
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
#print(f"Send `{msg}` to topic `{topic}`")
pass
else:
print(f"Failed to send message to topic {topic}")
def changePlug(client,PlugID,state):
publish(client,f"/FabLogging/{PlugID}/POWER",state)
cmd = ["OFF", "On"][state]
publish(client,f"/aktoren/{PlugID}/cmnd/POWER",cmd)
def changeDisplay(client,MachineID,status,text):
publish(client,f"/cmnd/reader/{MachineID}",'{"Cmd": "message", "MssgID": %s , "ClrTxt":"" , "AddnTxt":"%s"}' % ((status), (text)))
def checkPermission(UserPermsJSON,PermissionPath):
permission = False
error = 0
MachinePermArray = PermissionPath.split(".")
for UserPerm in UserPermsJSON:
#print(f"check {UserPerm}")
UserPermArray = UserPerm.split(".")
x = 0
for UserPermPart in UserPermArray:
if(error == 0):
#print(f"Compare {UserPermPart} and {MachinePermArray[x]}")
if not (MachinePermArray[x] == UserPermPart):
if(UserPermPart == "*"):
#print("* regelt")
permission = True
else:
error = 1
#print(f"MISmatch between {MachinePermArray[x]} and {UserPermPart}")
else:
pass
#print(f"Match between {MachinePermArray[x]} and {UserPermPart}")
x = x + 1
if(error == 1):
pass
#print("Error")
else:
#print("Hurra")
permission = True
error = 0
return permission
def msg_handler(msg,client):
print("")
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
topic = msg.topic
MQTT_KEY = json.loads(msg.payload.decode())["UID"]
keycloak_admin = KeycloakAdmin( server_url=KEYCLOAK_URL, username=KEYCLOAK_USERNAME, password=KEYCLOAK_PASSWORD, realm_name=REALM, verify=True)
users = keycloak_admin.get_users({})
knownUser = False
ReaderID = msg.topic.split("/")[-1]
for user in users:
try:
FabCardID = user['attributes']['FabCard'][0]
UserPermissions = user['attributes']['FabPermissions'][0]
if (FabCardID == MQTT_KEY):
knownUser = True
print(f"FabCard matches with user {user['username']}")
mycursor = FabDB.cursor()
mycursor.execute(f'SELECT PlugName,PermissionPath,Status,LastUser FROM ReaderPlug WHERE ReaderID="{ReaderID}";')
myValues = mycursor.fetchall()[0]
PlugID = myValues[0]
PermissionPath = myValues[1]
MachineStatus = myValues[2]
LastUser = myValues[3]
#print(PermissionPath)
UserPermsJSON = json.loads(UserPermissions)
permission = checkPermission(UserPermsJSON, PermissionPath)
print('System "use" access %s' % ['denied','granted'][permission])
if(permission):
error = False
status = MachineStatus
ActualUser = user['username']
if(status):
status = False
if(LastUser == ActualUser):
error = False
else:
try:
if (keycloak_admin.get_user_groups(user_id=user['id'])[0]['name'] == 'Mentoren'):
print('Overrided becouse of "Mentor" Group')
else:
print(f'Bereits benutzt von {LastUser}')
error = True
except IndexError:
print("No groups available")
error = True
if(error):
changeDisplay(client, ReaderID, 9, LastUser)
else:
status = True
error = False
if(error == False):
#KSstatus ^= True
switch = ["off", "on"][status]
print(f"Turn Plug {switch}")
#print(status)
changePlug(client, PlugID, status)
publish(client,f"/FabLogging/{PlugID}/USER",ActualUser)
try:
firstCombo = user['firstName']+" "
DisplayUser = firstCombo+user['lastName']
if(len(firstCombo) >= 7):
DisplayUser = user['firstName'][:7]
except KeyError:
DisplayUser = user['username']
if(len(DisplayUser) > 7):
DisplayUser = DisplayUser[0:7] + "." + DisplayUser[7+1: ]
DisplayUser = DisplayUser[:8]
changeDisplay(client, ReaderID, [20, 20][status], ["Bitte anmelden", f"Login\n{DisplayUser}"][status])
mycursor.execute(f'UPDATE ReaderPlug SET Status={status}, LastUser="{ActualUser}" WHERE ReaderID="{ReaderID}";')
FabDB.commit()
else:
changeDisplay(client, ReaderID, 7, "")
except KeyError:
# Key is not present
pass
if not (knownUser):
changeDisplay(client, ReaderID, 16, MQTT_KEY)
def subscribe(client: mqtt_client, topic):
def on_message(client, userdata, msg):
msg_handler(msg,client)
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client,"/rfid_reader/#")
client.loop_forever()
if __name__ == '__main__':
run()

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
keycloak_wrapper
paho-mqtt
psycopg[binary]
python-keycloak