mirror of
https://gitlab.com/sfz.aalen/infra/fabaccess.git
synced 2025-03-12 15:01:47 +01:00
65 lines
2.7 KiB
Python
65 lines
2.7 KiB
Python
|
import psycopg2
|
||
|
from psycopg2 import sql, extensions
|
||
|
|
||
|
from config import Config
|
||
|
|
||
|
class SQLHandler:
|
||
|
@staticmethod
|
||
|
def setup():
|
||
|
SQLHandler.cursor = None
|
||
|
SQLHandler.conn = None
|
||
|
SQLHandler.conn = psycopg2.connect(host=Config.db_host_name, user=Config.db_user_name, port=Config.db_port, password=Config.db_password, dbname=Config.db_database)
|
||
|
# get the isolation leve for autocommit
|
||
|
autocommit = extensions.ISOLATION_LEVEL_AUTOCOMMIT
|
||
|
print ("ISOLATION_LEVEL_AUTOCOMMIT:", extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||
|
|
||
|
# set the isolation level for the connection's cursors
|
||
|
# will raise ActiveSqlTransaction exception otherwise
|
||
|
SQLHandler.conn.set_isolation_level( autocommit )
|
||
|
SQLHandler.cursor = SQLHandler.conn.cursor()
|
||
|
|
||
|
@staticmethod
|
||
|
def get_machine_data(reader_id):
|
||
|
# TODO Database
|
||
|
# - get machine_id from database
|
||
|
# - get last_user from database
|
||
|
# - get machine_status from database, is 0 for off, 1 for on
|
||
|
# - get plug_id from database
|
||
|
SQLHandler.cursor.execute("SELECT machine_id,last_user,machine_status,plug_id FROM readerplug WHERE reader_id = %s;", (reader_id,))
|
||
|
data = [row for row in SQLHandler.cursor.fetchall()]
|
||
|
print(data)
|
||
|
return {
|
||
|
'machine_id': data[0],
|
||
|
'last_user': data[1],
|
||
|
'machine_status': data[2],
|
||
|
'plug_id': data[3]
|
||
|
}
|
||
|
|
||
|
|
||
|
@staticmethod
|
||
|
def init_db():
|
||
|
SQLHandler.cursor.execute("SELECT datname FROM pg_database;")
|
||
|
dbs = [row[0] for row in SQLHandler.cursor.fetchall()]
|
||
|
if not (Config.db_database in dbs):
|
||
|
print(f"Missing database ({Config.db_database}) -> creating new db")
|
||
|
SQLHandler.cursor.execute(sql.SQL("CREATE DATABASE {};").format(sql.Identifier( Config.db_database )))
|
||
|
else:
|
||
|
print(f"Found DB {Config.db_database} -> Using existing one")
|
||
|
|
||
|
SQLHandler.cursor.execute("SELECT * FROM pg_catalog.pg_tables\
|
||
|
WHERE schemaname != 'pg_catalog' AND \
|
||
|
schemaname != 'information_schema';")
|
||
|
tables = [row[1] for row in SQLHandler.cursor.fetchall()]
|
||
|
if not ("readerplug" in tables):
|
||
|
print("Missing table -> creating new table in db")
|
||
|
SQLHandler.cursor.execute("\
|
||
|
CREATE TABLE readerplug (\
|
||
|
reader_id int NOT NULL, \
|
||
|
plug_id varchar(255) NOT NULL, \
|
||
|
machine_id varchar(255) NOT NULL, \
|
||
|
machine_status boolean NOT NULL, \
|
||
|
last_user varchar(255) NOT NULL \
|
||
|
)")
|
||
|
else:
|
||
|
print("Found Table -> Using existing one")
|
||
|
SQLHandler.conn.commit()
|