actors.eq3-eqiva-smartlock/eq3-eqiva-smartlock.py

123 lines
4.0 KiB
Python
Raw Permalink Normal View History

2025-02-16 23:18:24 +01:00
#!/usr/bin/env python3
import sys
import argparse
import paho.mqtt
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
import time
def on_free(args, actor_name):
    """Handle the actor's machine changing to the ``free`` state.

    Args:
        args: Parsed command-line namespace. ``args.verbose`` is compared
            against an int (``main`` replaces ``None`` with 0 before
            dispatching) and ``args.quiet`` suppresses the error message.
        actor_name: Name of the actor this process was invoked for.

    Raises:
        SystemExit: With code -1 when ``actor_name`` is not one of the
            names handled here.
    """
    if args.verbose > 2:
        print("on_free called!")

    if actor_name == "DoorControl1":
        # Only announces the action; no command is sent from this branch.
        print("I'm locking door 1!")
    elif actor_name == "DoorControl2":
        # Second door: likewise only a message is printed.
        print("I'm locking door 2!")
    else:
        if not args.quiet:
            print("process called with unknown id %s for state `Free`" % actor_name)
        # Use sys.exit instead of the builtin exit(): the latter is injected
        # by the `site` module for interactive use and is not guaranteed to
        # exist (e.g. under `python -S`).
        sys.exit(-1)
def on_use(args, actor_name, user_id):
    """Handle the actor's machine changing to the ``inuse`` state.

    For the actor named ``OpenTheDoor`` a single MQTT message with payload
    ``"4"`` is published to the topic ``door_lock/command`` on the broker
    ``mqtt.makerspace-bocholt.local:1883``.
    NOTE(review): presumably "4" is the lock's open command - confirm
    against the consumer of that topic.

    Args:
        args: Parsed command-line namespace. ``args.verbose`` is compared
            against an int and ``args.quiet`` suppresses the error message.
        actor_name: Name of the actor this process was invoked for.
        user_id: User that is now using the machine (not used here).

    Raises:
        SystemExit: With code -1 when ``actor_name`` is not handled here.
    """
    if args.verbose > 2:
        print("on_use called!")

    if actor_name == "OpenTheDoor":
        print("I'm opening door 1 for 10 seconds!")
        # One-shot publish: connects, sends the message and disconnects.
        publish.single(
            "door_lock/command",
            payload="4",
            qos=0,
            retain=False,
            hostname="mqtt.makerspace-bocholt.local",
            port=1883,
            client_id="",
            keepalive=60,
            will=None,
            auth=None,
            tls=None,
            protocol=mqtt.MQTTv311,
        )
    else:
        if not args.quiet:
            print("process called with unknown id %s for state `InUse`" % actor_name)
        # sys.exit instead of the site-provided exit(), which is meant for
        # interactive sessions and may be missing (e.g. `python -S`).
        sys.exit(-1)
def on_tocheck(args, actor_name, user_id):
    """Reject the ``tocheck`` state, which this actor does not handle.

    Args:
        args: Parsed command-line namespace. ``args.verbose`` is compared
            against an int and ``args.quiet`` suppresses the error message.
        actor_name: Name of the actor this process was invoked for.
        user_id: User that should check the machine (not used here).

    Raises:
        SystemExit: Always, with code -1.
    """
    if args.verbose > 2:
        print("on_tocheck called!")

    if not args.quiet:
        print("process called with unexpected combo id %s and state 'ToCheck'" % actor_name)
    # sys.exit instead of the site-provided exit(), which is intended for
    # interactive use and may be unavailable (e.g. `python -S`).
    sys.exit(-1)
def on_blocked(args, actor_name, user_id):
    """Reject the ``blocked`` state, which this actor does not handle.

    Args:
        args: Parsed command-line namespace. ``args.verbose`` is compared
            against an int and ``args.quiet`` suppresses the error message.
        actor_name: Name of the actor this process was invoked for.
        user_id: User that marked the machine as blocked (not used here).

    Raises:
        SystemExit: Always, with code -1.
    """
    if args.verbose > 2:
        print("on_blocked called!")

    if not args.quiet:
        print("process called with unexpected combo id %s and state 'Blocked'" % actor_name)
    # sys.exit instead of the site-provided exit(), which is intended for
    # interactive use and may be unavailable (e.g. `python -S`).
    sys.exit(-1)
def on_disabled(args, actor_name):
    """Reject the ``disabled`` state, which this actor does not handle.

    Args:
        args: Parsed command-line namespace; only ``args.quiet`` is read,
            to suppress the error message.
        actor_name: Name of the actor this process was invoked for.

    Raises:
        SystemExit: Always, with code -1.
    """
    if not args.quiet:
        print("process called with unexpected combo id %s and state 'Disabled'" % actor_name)
    # sys.exit instead of the site-provided exit(), which is intended for
    # interactive use and may be unavailable (e.g. `python -S`).
    sys.exit(-1)
def on_reserve(args, actor_name, user_id):
    """Reject the ``reserved`` state, which this actor does not handle.

    Args:
        args: Parsed command-line namespace; only ``args.quiet`` is read,
            to suppress the error message.
        actor_name: Name of the actor this process was invoked for.
        user_id: User that reserved the machine (not used here).

    Raises:
        SystemExit: Always, with code -1.
    """
    if not args.quiet:
        print("process called with unexpected combo id %s and state 'Reserved'" % actor_name)
    # sys.exit instead of the site-provided exit(), which is intended for
    # interactive use and may be unavailable (e.g. `python -S`).
    sys.exit(-1)
def main(args):
    """Normalise the verbosity setting and dispatch on the requested state.

    Args:
        args: Parsed command-line namespace providing ``verbose`` (``None``
            or a count), ``state``, ``name`` and, for the states that carry
            a user, ``userid``. ``args.verbose`` is rewritten to 0 in place
            when it is ``None`` so the handlers can compare it numerically.

    Raises:
        SystemExit: With code -2 when ``verbose`` is greater than 4, with
            code -1 when ``state`` is not recognised, or from the state
            handler that is invoked.
    """
    if args.verbose is not None:
        if args.verbose == 1:
            print("verbose output enabled")
        elif args.verbose == 2:
            print("loud output enabled!")
        elif args.verbose == 3:
            print("LOUD output enabled!!!")
        elif args.verbose > 4:
            # A count of exactly 4 is accepted silently; only more is refused.
            print("Okay stop you're being ridiculous.")
            sys.exit(-2)
    else:
        args.verbose = 0

    new_state = args.state
    if new_state == "free":
        on_free(args, args.name)
    elif new_state == "inuse":
        on_use(args, args.name, args.userid)
    elif new_state == "tocheck":
        on_tocheck(args, args.name, args.userid)
    elif new_state == "blocked":
        on_blocked(args, args.name, args.userid)
    elif new_state == "disabled":
        on_disabled(args, args.name)
    elif new_state == "reserved":
        on_reserve(args, args.name, args.userid)
    else:
        print("Process actor called with unknown state %s" % new_state)
        # Match the handlers: an unexpected invocation must not end with a
        # success status, otherwise the failure is invisible to the caller.
        sys.exit(-1)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-q", "--quiet", help="be less verbose", action="store_true")
parser.add_argument("-v", "--verbose", help="be more verbose", action="count")
parser.add_argument("name",
help="name of this actor as configured in bffh.dhall"
)
subparsers = parser.add_subparsers(required=True, dest="state")
parser_free = subparsers.add_parser("free")
parser_inuse = subparsers.add_parser("inuse")
parser_inuse.add_argument("userid", help="The user that is now using the machine")
parser_tocheck = subparsers.add_parser("tocheck")
parser_tocheck.add_argument("userid", help="The user that should go check the machine")
parser_blocked = subparsers.add_parser("blocked")
parser_blocked.add_argument("userid", help="The user that marked the machine as blocked")
parser_disabled = subparsers.add_parser("disabled")
parser_reserved = subparsers.add_parser("reserved")
parser_reserved.add_argument("userid", help="The user that reserved the machine")
args = parser.parse_args()
main(args)