[docs]classAIOUSBWatcher:"""A watcher for USB devices that uses asyncio."""def__init__(self)->None:self._path=Path(_PATH)self._loop=asyncio.get_running_loop()self._task:asyncio.Task[None]|None=Noneself._callbacks:set[Callable[[],None]]=set()
[docs]defasync_start(self)->Callable[[],None]:"""Start the watcher."""ifself._taskisnotNone:raiseRuntimeError("Watcher already started")if_INOTIFY_EXCEPTIONisnotNone:raiseInotifyNotAvailableError("Inotify not available on this platform")from_INOTIFY_EXCEPTIONself._task=self._loop.create_task(self._watcher())returnself._async_stop
[docs]defasync_register_callback(self,callback:Callable[[],None])->Callable[[],None]:"""Register callback that will be called when a USB device is added/removed."""self._callbacks.add(callback)returnpartial(self._async_unregister_callback,callback)
def_async_stop(self)->None:"""Stop the watcher."""assertself._taskisnotNone# noqaself._task.cancel()self._task=Noneasyncdef_watcher(self)->None:mask=(Mask.CREATE|Mask.MOVED_FROM|Mask.MOVED_TO|Mask.CREATE|Mask.DELETE_SELF|Mask.DELETE|Mask.IGNORED)withInotify()asinotify:fordirectoryinawait_async_get_directories_recursive(self._loop,self._path):inotify.add_watch(directory,mask)asyncforeventininotify:# Add subdirectories to watch if a new directory is added.ifMask.CREATEinevent.maskandevent.pathisnotNone:fordirectoryinawait_async_get_directories_recursive(self._loop,event.path):inotify.add_watch(directory,mask)# If there is at least some overlap, assume the user wants this event.ifevent.mask&mask:self._async_call_callbacks()def_async_unregister_callback(self,callback:Callable[[],None])->None:self._callbacks.remove(callback)def_async_call_callbacks(self)->None:forcallbackinself._callbacks:try:callback()exceptExceptionase:_LOGGER.exception("Error calling callback %s",callback,exc_info=e)