163 lines
5.6 KiB
Python
163 lines
5.6 KiB
Python
|
'''
|
||
|
Hilfe:
|
||
|
- Scanner wird nicht mehr erkannt? Abziehen vom USB-Slot für 1-2 Minuten hilft + Stoppen dieses Programms für die gleiche Zeit
|
||
|
- Scanner-Wert wird nicht übertragen? -> Knopf ca. 1 Sekunde gedrückt halten. Nicht bloß antippen!
|
||
|
'''
|
||
|
import usb.core
|
||
|
import usb.util
|
||
|
import time
|
||
|
from datetime import datetime
|
||
|
import pytz
|
||
|
from systemd import journal
|
||
|
import sqlite3
|
||
|
from sqlite3 import Error
|
||
|
from contextlib import closing
|
||
|
import os.path
|
||
|
|
||
|
idVendor=0x0581
|
||
|
idProduct=0x0115
|
||
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
|
db_file = os.path.join(BASE_DIR, "kugelstossmeeting-prod.db")
|
||
|
|
||
|
def printwrite(str): #write to log and to stdout
|
||
|
journal.write(str)
|
||
|
print(str)
|
||
|
|
||
|
def hid2ascii(lst):
|
||
|
#array('B', [0, 0, 0, 0, 0, 0, 0, 0]) # nothing, ignore
|
||
|
conv_table = {
|
||
|
0:['', ''],
|
||
|
4:['a', 'A'],
|
||
|
5:['b', 'B'],
|
||
|
6:['c', 'C'],
|
||
|
7:['d', 'D'],
|
||
|
8:['e', 'E'],
|
||
|
9:['f', 'F'],
|
||
|
10:['g', 'G'],
|
||
|
11:['h', 'H'],
|
||
|
12:['i', 'I'],
|
||
|
13:['j', 'J'],
|
||
|
14:['k', 'K'],
|
||
|
15:['l', 'L'],
|
||
|
16:['m', 'M'],
|
||
|
17:['n', 'N'],
|
||
|
18:['o', 'O'],
|
||
|
19:['p', 'P'],
|
||
|
20:['q', 'Q'],
|
||
|
21:['r', 'R'],
|
||
|
22:['s', 'S'],
|
||
|
23:['t', 'T'],
|
||
|
24:['u', 'U'],
|
||
|
25:['v', 'V'],
|
||
|
26:['w', 'W'],
|
||
|
27:['x', 'X'],
|
||
|
28:['z', 'Z'],
|
||
|
29:['y', 'Y'],
|
||
|
30:['1', '!'],
|
||
|
31:['2', '@'],
|
||
|
32:['3', '#'],
|
||
|
33:['4', '$'],
|
||
|
34:['5', '%'],
|
||
|
35:['6', '^'],
|
||
|
36:['7' ,'&'],
|
||
|
37:['8', '*'],
|
||
|
38:['9', '('],
|
||
|
39:['0', ')'],
|
||
|
40:['\n', '\n'],
|
||
|
41:['\x1b', '\x1b'],
|
||
|
42:['\b', '\b'],
|
||
|
43:['\t', '\t'],
|
||
|
44:[' ', ' '],
|
||
|
45:['_', '_'],
|
||
|
46:['=', '+'],
|
||
|
47:['[', '{'],
|
||
|
48:[']', '}'],
|
||
|
49:['\\', '|'],
|
||
|
50:['#', '~'],
|
||
|
51:[';', ':'],
|
||
|
52:["'", '"'],
|
||
|
53:['`', '~'],
|
||
|
54:[',', '<'],
|
||
|
55:['.', '>'],
|
||
|
56:['/', '?'],
|
||
|
100:['\\', '|'],
|
||
|
103:['=', '='],
|
||
|
}
|
||
|
|
||
|
line = ''
|
||
|
char = ''
|
||
|
shift = 0
|
||
|
|
||
|
#print(lst)
|
||
|
|
||
|
for byte in lst: #raw byte array
|
||
|
if byte == 2:
|
||
|
shift = 1
|
||
|
if byte != 0:
|
||
|
if byte not in conv_table and byte != 2:
|
||
|
printwrite("Warning: byte {0} not in conversion table".format(byte))
|
||
|
char = ''
|
||
|
line += char
|
||
|
elif byte != 40 and byte != 2: #skip newline character
|
||
|
char = conv_table[byte][shift]
|
||
|
line += char
|
||
|
shift = 0 #reset shift
|
||
|
return line
|
||
|
|
||
|
sleepTime = 5 #secs
|
||
|
def mainloop():
|
||
|
while True:
|
||
|
dev = usb.core.find(idVendor=idVendor, idProduct=idProduct)
|
||
|
time.sleep(sleepTime) #in jedem Fall etwas warten
|
||
|
if dev is None:
|
||
|
printwrite("No USB Scanner. Restarting mainloop {0}".format(sleepTime))
|
||
|
mainloop()
|
||
|
else:
|
||
|
try:
|
||
|
detached = dev.is_kernel_driver_active(0)
|
||
|
if detached is False:
|
||
|
printwrite("USB Scanner found! Device is already detached")
|
||
|
if detached is True:
|
||
|
dev.detach_kernel_driver(0)
|
||
|
#journal.write("Detached USB device from kernel driver")
|
||
|
printwrite("USB Scanner found! Detached from kernel driver")
|
||
|
|
||
|
cfg = dev.get_active_configuration()
|
||
|
intf = cfg[(0,0)]
|
||
|
ep = usb.util.find_descriptor(intf, custom_match = lambda e: \
|
||
|
usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN)
|
||
|
assert ep is not None, "Endpoint for USB device not found. Something is wrong."
|
||
|
while cfg is not None:
|
||
|
print("... in the mainloop")
|
||
|
# Wait up to 1 seconds for data
|
||
|
data = ep.read(1000)
|
||
|
line = hid2ascii(data)
|
||
|
if len(line) == 5: #wir akzepieren nur Codes, die 5-stellig sind (so, wie sie auf dem Ticket sind)
|
||
|
#printwrite(line)
|
||
|
sql = "INSERT INTO scans(unixtimestamp_created,barcode,validated,hint,unixtimestamp_checked,skipped) VALUES(?,?,?,?,?,?)"
|
||
|
with closing(sqlite3.connect(db_file)) as conn:
|
||
|
with closing(conn.cursor()) as cur:
|
||
|
#standardmäßig validated=0 und leerer Hinweis. Das wird vom GUI befüllt!
|
||
|
ts = time.time_ns()
|
||
|
res = cur.execute(sql,(int(ts),line,0,None,None,0))
|
||
|
printwrite("{0}: Inserting {1} into database".format(
|
||
|
datetime.fromtimestamp(int(str(ts)[0:10]), pytz.timezone("Europe/Berlin")).strftime('%d.%m.%Y %H:%M:%S'),
|
||
|
line))
|
||
|
conn.commit()
|
||
|
time.sleep(0.1)
|
||
|
except KeyboardInterrupt:
|
||
|
printwrite("Stopping program")
|
||
|
break
|
||
|
except usb.core.USBError as e:
|
||
|
printwrite("Device disappeared. Restarting mainloop | Error: {}".format(e))
|
||
|
#Error: [Errno None] Configuration not set
|
||
|
#Error: [Errno 5] Input/Output Error
|
||
|
#Error: [Errno 16] Resource busy
|
||
|
#Error: [Errno 13] Access denied (insufficient permissions)
|
||
|
#Error: [Errno 19] No such device (it may have been disconnected)
|
||
|
#Error: [Errno 110] Operation timed out
|
||
|
time.sleep(sleepTime)
|
||
|
#continue
|
||
|
mainloop()
|
||
|
mainloop()
|