From 40480cc319ea81cde228993e96d330f76efe6b4b Mon Sep 17 00:00:00 2001 From: Luke Bonaccorsi Date: Thu, 28 Sep 2023 10:15:45 +0100 Subject: [PATCH] Improve network handling --- custom_components/robovac/tuyalocalapi.py | 84 +++++++------- custom_components/robovac/vacuum.py | 129 ++++++++++------------ 2 files changed, 104 insertions(+), 109 deletions(-) diff --git a/custom_components/robovac/tuyalocalapi.py b/custom_components/robovac/tuyalocalapi.py index 77639db..7d86ce1 100644 --- a/custom_components/robovac/tuyalocalapi.py +++ b/custom_components/robovac/tuyalocalapi.py @@ -496,36 +496,23 @@ class Message: __bytes__ = bytes - class AsyncWrappedCallback: - def __init__(self, request, callback): - self.request = request - self.callback = callback - self.devices = [] - - def register(self, device): - self.devices.append(device) - device._handlers.setdefault(self.request.command, []) - device._handlers[self.request.command].append(self) - - def unregister(self, device): - self.devices.remove(device) - device._handlers[self.request.command].remove(self) - - def unregister_all(self): - while self.devices: - device = self.devices.pop() - device._handlers[self.request.command].remove(self) - - async def __call__(self, response, device): - if response.sequence == self.request.sequence: - asyncio.ensure_future(self.callback(response, device)) - self.unregister(device) - - async def async_send(self, device, callback=None): - if callback is not None: - wrapped = self.AsyncWrappedCallback(self, callback) - wrapped.register(device) + async def async_send(self, device): + device._listeners[self.sequence] = asyncio.Semaphore(0) await device._async_send(self) + try: + await asyncio.wait_for( + device._listeners[self.sequence].acquire(), timeout=device.timeout + ) + except: + _LOGGER.debug( + "Timed out waiting for response to sequence number {}".format( + self.sequence + ) + ) + del device._listeners[self.sequence] + raise + + return device._listeners.pop(self.sequence) @classmethod def from_bytes(cls, data, cipher=None): @@ -626,6 +613,7 @@ class TuyaDevice: host, timeout, ping_interval, + update_entity_state, local_key=None, port=6668, gateway_id=None, @@ -642,6 +630,7 @@ class TuyaDevice: self.timeout = timeout self.last_pong = 0 self.ping_interval = ping_interval + self.update_entity_state_cb = update_entity_state if len(local_key) != 16: raise InvalidKey("Local key should be a 16-character string") @@ -649,12 +638,12 @@ class TuyaDevice: self.cipher = TuyaCipher(local_key, self.version) self.writer = None self._handlers = { - Message.GET_COMMAND: [self.async_update_state], - Message.GRATUITOUS_UPDATE: [self.async_update_state], + Message.GRATUITOUS_UPDATE: [self.async_gratuitous_update_state], Message.PING_COMMAND: [self._async_pong_received], } self._dps = {} self._connected = False + self._listeners = {} def __repr__(self): return "{}({!r}, {!r}, {!r}, {!r})".format( @@ -681,6 +670,8 @@ class TuyaDevice: raise ConnectionTimeoutException("Connection timed out") from e self.reader, self.writer = await asyncio.open_connection(sock=sock) self._connected = True + asyncio.ensure_future(self._async_ping(self.ping_interval)) + asyncio.ensure_future(self._async_handle_message()) async def async_disconnect(self): _LOGGER.debug("Disconnected from {}".format(self)) @@ -689,17 +680,18 @@ class TuyaDevice: if self.writer is not None: self.writer.close() - async def async_get(self, callback=None): + async def async_get(self): payload = {"gwId": self.gateway_id, "devId": self.device_id} maybe_self = None if self.version < (3, 3) else self message = Message(Message.GET_COMMAND, payload, encrypt_for=maybe_self) - return await message.async_send(self, callback) + response = await message.async_send(self) + await self.async_update_state(response) - async def async_set(self, dps, callback=None): + async def async_set(self, dps): t = int(time.time()) payload = {"devId": self.device_id, "uid": "", "t": t, "dps": dps} message = Message(Message.SET_COMMAND, payload, encrypt_for=self) - await message.async_send(self, callback) + await message.async_send(self) def set(self, dps): _call_async(self.async_set, dps) @@ -721,7 +713,11 @@ class TuyaDevice: async def _async_pong_received(self, message, device): self.last_pong = time.time() - async def async_update_state(self, state_message, _): + async def async_gratuitous_update_state(self, state_message, _): + await self.async_update_state(state_message) + await self.update_entity_state_cb() + + async def async_update_state(self, state_message, _=None): _LOGGER.info("Received updated state {}: {}".format(self, self._dps)) self._dps.update(state_message.payload["dps"]) @@ -734,9 +730,8 @@ class TuyaDevice: asyncio.ensure_future(self.async_set(new_values)) async def _async_handle_message(self): - response_data = await self.reader.readuntil(MAGIC_SUFFIX_BYTES) - try: + response_data = await self.reader.readuntil(MAGIC_SUFFIX_BYTES) message = Message.from_bytes(response_data, self.cipher) except InvalidMessage as e: _LOGGER.error("Invalid message from {}: {}".format(self, e)) @@ -744,8 +739,16 @@ class TuyaDevice: _LOGGER.error("Failed to decrypt message from {}".format(self)) else: _LOGGER.debug("Received message from {}: {}".format(self, message)) - for c in self._handlers.get(message.command, []): - asyncio.ensure_future(c(message, self)) + if message.sequence in self._listeners: + sem = self._listeners[message.sequence] + if isinstance(sem, asyncio.Semaphore): + self._listeners[message.sequence] = message + sem.release() + else: + for c in self._handlers.get(message.command, []): + asyncio.ensure_future(c(message, self)) + + asyncio.ensure_future(self._async_handle_message()) async def _async_send(self, message, retries=4): try: @@ -753,7 +756,6 @@ class TuyaDevice: _LOGGER.debug("Sending to {}: {}".format(self, message)) self.writer.write(message.bytes()) await self.writer.drain() - await self._async_handle_message() except Exception as e: if retries == 0: if isinstance(e, socket.error): diff --git a/custom_components/robovac/vacuum.py b/custom_components/robovac/vacuum.py index c916924..5885f57 100644 --- a/custom_components/robovac/vacuum.py +++ b/custom_components/robovac/vacuum.py @@ -274,8 +274,9 @@ class RoboVacEntity(StateVacuumEntity): host=self.ip_address, local_key=self.access_token, timeout=2, - ping_interval=REFRESH_RATE, + ping_interval=REFRESH_RATE / 2, model_code=self.model_code[0:5], + update_entity_state=self.pushed_update_handler, ) except ModelNotSupportedException: self.error_code = "UNSUPPORTED_MODEL" @@ -311,43 +312,8 @@ class RoboVacEntity(StateVacuumEntity): try: await self.vacuum.async_get() - self.update_failures = 0 - self.tuyastatus = self.vacuum._dps - - # for 15C - self._attr_battery_level = self.tuyastatus.get(TUYA_CODES.BATTERY_LEVEL) - self.tuya_state = self.tuyastatus.get(TUYA_CODES.STATE) - self.error_code = self.tuyastatus.get(TUYA_CODES.ERROR_CODE) - self._attr_mode = self.tuyastatus.get(TUYA_CODES.MODE) - self._attr_fan_speed = self.tuyastatus.get(TUYA_CODES.FAN_SPEED) - if self.fan_speed == "No_suction": - self._attr_fan_speed = "No Suction" - elif self.fan_speed == "Boost_IQ": - self._attr_fan_speed = "Boost IQ" - elif self.fan_speed == "Quiet": - self._attr_fan_speed = "Pure" - # for G30 - self._attr_cleaning_area = self.tuyastatus.get(TUYA_CODES.CLEANING_AREA) - self._attr_cleaning_time = self.tuyastatus.get(TUYA_CODES.CLEANING_TIME) - self._attr_auto_return = self.tuyastatus.get(TUYA_CODES.AUTO_RETURN) - self._attr_do_not_disturb = self.tuyastatus.get(TUYA_CODES.DO_NOT_DISTURB) - self._attr_boost_iq = self.tuyastatus.get(TUYA_CODES.BOOST_IQ) - # self.map_data = self.tuyastatus.get("121") - # self.erro_msg? = self.tuyastatus.get("124") - if self.robovac_supported & RoboVacEntityFeature.CONSUMABLES: - for CONSUMABLE_CODE in TUYA_CONSUMABLES_CODES: - if ( - CONSUMABLE_CODE in self.tuyastatus - and self.tuyastatus.get(CONSUMABLE_CODE) is not None - ): - self._attr_consumables = ast.literal_eval( - base64.b64decode( - self.tuyastatus.get(CONSUMABLE_CODE) - ).decode("ascii") - )["consumable"]["duration"] - - self.async_write_ha_state() + self.update_entity_values() except TuyaException as e: self.update_failures += 1 _LOGGER.debug( @@ -360,31 +326,64 @@ class RoboVacEntity(StateVacuumEntity): self.error_code = "CONNECTION_FAILED" raise e + async def pushed_update_handler(self): + self.update_entity_values() + self.async_write_ha_state() + + def update_entity_values(self): + self.tuyastatus = self.vacuum._dps + + # for 15C + self._attr_battery_level = self.tuyastatus.get(TUYA_CODES.BATTERY_LEVEL) + self.tuya_state = self.tuyastatus.get(TUYA_CODES.STATE) + self.error_code = self.tuyastatus.get(TUYA_CODES.ERROR_CODE) + self._attr_mode = self.tuyastatus.get(TUYA_CODES.MODE) + self._attr_fan_speed = self.tuyastatus.get(TUYA_CODES.FAN_SPEED) + if self.fan_speed == "No_suction": + self._attr_fan_speed = "No Suction" + elif self.fan_speed == "Boost_IQ": + self._attr_fan_speed = "Boost IQ" + elif self.fan_speed == "Quiet": + self._attr_fan_speed = "Pure" + # for G30 + self._attr_cleaning_area = self.tuyastatus.get(TUYA_CODES.CLEANING_AREA) + self._attr_cleaning_time = self.tuyastatus.get(TUYA_CODES.CLEANING_TIME) + self._attr_auto_return = self.tuyastatus.get(TUYA_CODES.AUTO_RETURN) + self._attr_do_not_disturb = self.tuyastatus.get(TUYA_CODES.DO_NOT_DISTURB) + self._attr_boost_iq = self.tuyastatus.get(TUYA_CODES.BOOST_IQ) + # self.map_data = self.tuyastatus.get("121") + # self.erro_msg? = self.tuyastatus.get("124") + if self.robovac_supported & RoboVacEntityFeature.CONSUMABLES: + for CONSUMABLE_CODE in TUYA_CONSUMABLES_CODES: + if ( + CONSUMABLE_CODE in self.tuyastatus + and self.tuyastatus.get(CONSUMABLE_CODE) is not None + ): + self._attr_consumables = ast.literal_eval( + base64.b64decode(self.tuyastatus.get(CONSUMABLE_CODE)).decode( + "ascii" + ) + )["consumable"]["duration"] + async def async_locate(self, **kwargs): """Locate the vacuum cleaner.""" _LOGGER.info("Locate Pressed") if self.tuyastatus.get("103"): - await self.vacuum.async_set({"103": False}, None) + await self.vacuum.async_set({"103": False}) else: - await self.vacuum.async_set({"103": True}, None) + await self.vacuum.async_set({"103": True}) async def async_return_to_base(self, **kwargs): """Set the vacuum cleaner to return to the dock.""" _LOGGER.info("Return home Pressed") - await self.vacuum.async_set({"101": True}, None) - await asyncio.sleep(1) - self.async_update + await self.vacuum.async_set({"101": True}) async def async_start(self, **kwargs): self._attr_mode = "auto" - await self.vacuum.async_set({"5": self.mode}, None) - await asyncio.sleep(1) - self.async_update + await self.vacuum.async_set({"5": self.mode}) async def async_pause(self, **kwargs): - await self.vacuum.async_set({"2": False}, None) - await asyncio.sleep(1) - self.async_update + await self.vacuum.async_set({"2": False}) async def async_stop(self, **kwargs): await self.async_return_to_base() @@ -392,9 +391,7 @@ class RoboVacEntity(StateVacuumEntity): async def async_clean_spot(self, **kwargs): """Perform a spot clean-up.""" _LOGGER.info("Spot Clean Pressed") - await self.vacuum.async_set({"5": "Spot"}, None) - await asyncio.sleep(1) - self.async_update + await self.vacuum.async_set({"5": "Spot"}) async def async_set_fan_speed(self, fan_speed, **kwargs): """Set fan speed.""" @@ -405,9 +402,7 @@ class RoboVacEntity(StateVacuumEntity): fan_speed = "Boost_IQ" elif fan_speed == "Pure": fan_speed = "Quiet" - await self.vacuum.async_set({"102": fan_speed}, None) - await asyncio.sleep(1) - self.async_update + await self.vacuum.async_set({"102": fan_speed}) async def async_send_command( self, command: str, params: dict | list | None = None, **kwargs @@ -415,28 +410,28 @@ class RoboVacEntity(StateVacuumEntity): """Send a command to a vacuum cleaner.""" _LOGGER.info("Send Command %s Pressed", command) if command == "edgeClean": - await self.vacuum.async_set({"5": "Edge"}, None) + await self.vacuum.async_set({"5": "Edge"}) elif command == "smallRoomClean": - await self.vacuum.async_set({"5": "SmallRoom"}, None) + await self.vacuum.async_set({"5": "SmallRoom"}) elif command == "autoClean": - await self.vacuum.async_set({"5": "auto"}, None) + await self.vacuum.async_set({"5": "auto"}) elif command == "autoReturn": if self.auto_return: - await self.vacuum.async_set({"135": False}, None) + await self.vacuum.async_set({"135": False}) else: - await self.vacuum.async_set({"135": True}, None) + await self.vacuum.async_set({"135": True}) elif command == "doNotDisturb": if self.do_not_disturb: - await self.vacuum.async_set({"139": "MEQ4MDAwMDAw"}, None) - await self.vacuum.async_set({"107": False}, None) + await self.vacuum.async_set({"139": "MEQ4MDAwMDAw"}) + await self.vacuum.async_set({"107": False}) else: - await self.vacuum.async_set({"139": "MTAwMDAwMDAw"}, None) - await self.vacuum.async_set({"107": True}, None) + await self.vacuum.async_set({"139": "MTAwMDAwMDAw"}) + await self.vacuum.async_set({"107": True}) elif command == "boostIQ": if self.boost_iq: - await self.vacuum.async_set({"118": False}, None) + await self.vacuum.async_set({"118": False}) else: - await self.vacuum.async_set({"118": True}, None) + await self.vacuum.async_set({"118": True}) elif command == "roomClean": roomIds = params.get("roomIds", [1]) count = params.get("count", 1) @@ -449,9 +444,7 @@ class RoboVacEntity(StateVacuumEntity): json_str = json.dumps(method_call, separators=(",", ":")) base64_str = base64.b64encode(json_str.encode("utf8")).decode("utf8") _LOGGER.info("roomClean call %s", json_str) - await self.vacuum.async_set({"124": base64_str}, None) - await asyncio.sleep(1) - self.async_update + await self.vacuum.async_set({"124": base64_str}) async def async_will_remove_from_hass(self): await self.vacuum.async_disconnect()