mirror of
https://gitlab.com/fabinfra/fabaccess/fabfire_adapter.git
synced 2025-03-12 23:01:44 +01:00
19 lines
411 B
Python
19 lines
411 B
Python
import asyncio
|
|
from typing import Callable
|
|
|
|
import asyncio_mqtt
|
|
|
|
|
|
class Timer:
|
|
def __init__(self, timeout: int, callback: Callable):
|
|
self._timeout = timeout
|
|
self._callback = callback
|
|
self._task = asyncio.ensure_future(self._job())
|
|
|
|
async def _job(self):
|
|
await asyncio.sleep(self._timeout)
|
|
await self._callback()
|
|
|
|
def cancel(self):
|
|
self._task.cancel()
|