From 4231ff71ba4de58f36023ffe9dba0b73dec8a67f Mon Sep 17 00:00:00 2001 From: Luke Bonaccorsi Date: Fri, 4 Aug 2023 17:21:33 +0100 Subject: [PATCH] Final work to switch to StateVacuumEntity --- custom_components/robovac/vacuum.py | 251 +++++++++++----------------- 1 file changed, 96 insertions(+), 155 deletions(-) diff --git a/custom_components/robovac/vacuum.py b/custom_components/robovac/vacuum.py index 4b21aa7..873312a 100644 --- a/custom_components/robovac/vacuum.py +++ b/custom_components/robovac/vacuum.py @@ -25,7 +25,7 @@ import time import ast from typing import Any -from enum import IntEnum +from enum import IntEnum, StrEnum from homeassistant.loader import bind_hass from homeassistant.components.vacuum import ( StateVacuumEntity, @@ -78,6 +78,7 @@ class RoboVacEntityFeature(IntEnum): ATTR_BATTERY_ICON = "battery_icon" +ATTR_ERROR = "error" ATTR_FAN_SPEED = "fan_speed" ATTR_FAN_SPEED_LIST = "fan_speed_list" ATTR_STATUS = "status" @@ -96,17 +97,59 @@ _LOGGER = logging.getLogger(__name__) REFRESH_RATE = 20 SCAN_INTERVAL = timedelta(seconds=REFRESH_RATE) +ERROR_MESSAGES = { + "IP_ADDRESS": "IP Address not set", + 1:"Error: Front bumper stuck", + 2:"Error: Wheel stuck", + 3:"Error: Side brush", + 4:"Error: Rolling brush bar stuck", + 5:"Error: Device trapped", + 6:"Error: Device trapped", + 7:"Error: Wheel suspended", + 8:"Error: Low battery", + 9:"Error: Magnetic boundary", + 12:"Error: Right wall sensor", + 13:"Error: Device tilted", + 14:"Error: Insert dust collector", + 17:"Error: Restricted area detected", + 18:"Error: Laser cover stuck", + 19:"Error: Laser sesor stuck", + 20:"Error: Laser sensor blocked", + 21:"Error: Base blocked", + "S1":"Error: Battery", + "S2":"Error: Wheel Module", + "S3":"Error: Side Brush", + "S4":"Error: Suction Fan", + "S5":"Error: Rolling Brush", + "S8":"Error: Path Tracking Sensor", + "Wheel_stuck":"Error: Wheel stuck", + "R_brush_stuck":"Error: Rolling brush stuck", + "Crash_bar_stuck":"Error: Front bumper stuck", + "sensor_dirty":"Error: Sensor dirty", + "N_enough_pow":"Error: Low battery", + "Stuck_5_min":"Error: Device trapped", + "Fan_stuck":"Error: Fan stuck", + "S_brush_stuck":"Error: Side brush stuck", +} + +class TUYA_CODES(StrEnum): + BATTERY_LEVEL = "104" + STATE = "15" + ERROR_CODE = "106" + MODE = "5" + FAN_SPEED = "102" + CLEANING_AREA = "110" + CLEANING_TIME = "109" + AUTO_RETURN = "135" + DO_NOT_DISTURB = "107" + BOOST_IQ = "118" + G_CONSUMABLES = "142" + X_CONSUMABLES = "116" + class robovac(TuyaDevice): """""" - -@bind_hass -def is_on(hass: HomeAssistant, entity_id: str) -> bool: - """Return if the vacuum is on based on the statemachine.""" - return hass.states.is_state(entity_id, STATE_ON) - - async def async_setup_entry( hass: HomeAssistant, config_entry: ConfigEntry, @@ -192,10 +235,24 @@ class RoboVacEntity(StateVacuumEntity): return self._attr_ip_address @property - def state_attributes(self) -> dict[str, Any]: - """Return the state attributes of the vacuum cleaner.""" - data = super().state_attributes - # data: dict[str, Any] = {} + def state(self) -> str | None: + if self.tuya_state is None or (type(self.error_code) is not None and self.error_code not in [0, "no_error"]): + return STATE_ERROR + elif self.tuya_state == "Charging" or self.tuya_state == "completed": + return STATE_DOCKED + elif self.tuya_state == "Recharge": + return STATE_RETURNING + elif self.tuya_state == "Sleeping" or self.tuya_state == "standby": + return STATE_IDLE + else: + return STATE_CLEANING + + @property + def extra_state_attributes(self) -> dict[str, Any]: + """Return the device-specific state attributes of this vacuum.""" + data: dict[str, Any] = {} + data[ATTR_ERROR] = ERROR_MESSAGES.get(self.error_code, self.error_code) + if self.supported_features & VacuumEntityFeature.BATTERY: data[ATTR_BATTERY_LEVEL] = self.battery_level data[ATTR_BATTERY_ICON] = self.battery_icon @@ -218,29 +275,10 @@ class RoboVacEntity(StateVacuumEntity): data[ATTR_MODE] = self.mode return data - @property - def capability_attributes(self) -> Mapping[str, Any] | None: - """Return capability attributes.""" - if self.supported_features & VacuumEntityFeature.FAN_SPEED: - return { - ATTR_FAN_SPEED_LIST: self.fan_speed_list, - # CONF_ACCESS_TOKEN: self.access_token, - CONF_IP_ADDRESS: self.ip_address, - ATTR_MODEL_CODE: self.model_code, - } - else: - return { - # CONF_ACCESS_TOKEN: self.access_token, - CONF_IP_ADDRESS: self.ip_address, - ATTR_MODEL_CODE: self.model_code, - } - def __init__(self, item) -> None: - """Initialize mytest2 Sensor.""" + """Initialize Eufy Robovac""" super().__init__() - self._extra_state_attributes = {} self._attr_battery_level = 0 - self._attr_is_on = False self._attr_name = item[CONF_NAME] self._attr_unique_id = item[CONF_ID] self._attr_supported_features = 0 @@ -357,11 +395,11 @@ class RoboVacEntity(StateVacuumEntity): self.tuyastatus = self.vacuum._dps print("Tuya local API Result:", self.tuyastatus) # for 15C - self._attr_battery_level = self.tuyastatus.get("104") - self.tuya_state = self.tuyastatus.get("15") - self.error_code = self.tuyastatus.get("106") - self._attr_mode = self.tuyastatus.get("5") - self._attr_fan_speed = self.tuyastatus.get("102") + self._attr_battery_level = self.tuyastatus.get(TUYA_CODES.BATTERY_LEVEL) + self.tuya_state = self.tuyastatus.get(TUYA_CODES.STATE) + self.error_code = self.tuyastatus.get(TUYA_CODES.ERROR_CODE) + self._attr_mode = self.tuyastatus.get(TUYA_CODES.MODE) + self._attr_fan_speed = self.tuyastatus.get(TUYA_CODES.FAN_SPEED) if self.fan_speed == "No_suction": self._attr_fan_speed = "No Suction" elif self.fan_speed == "Boost_IQ": @@ -369,17 +407,17 @@ class RoboVacEntity(StateVacuumEntity): elif self.fan_speed == "Quiet": self._attr_fan_speed = "Pure" # for G30 - self._attr_cleaning_area = self.tuyastatus.get("110") - self._attr_cleaning_time = self.tuyastatus.get("109") - self._attr_auto_return = self.tuyastatus.get("135") - self._attr_do_not_disturb = self.tuyastatus.get("107") + self._attr_cleaning_area = self.tuyastatus.get(TUYA_CODES.CLEANING_AREA) + self._attr_cleaning_time = self.tuyastatus.get(TUYA_CODES.CLEANING_TIME) + self._attr_auto_return = self.tuyastatus.get(TUYA_CODES.AUTO_RETURN) + self._attr_do_not_disturb = self.tuyastatus.get(TUYA_CODES.DO_NOT_DISTURB) if self.tuyastatus.get("142") is not None: self._attr_consumables = ast.literal_eval( base64.b64decode(self.tuyastatus.get("142")).decode("ascii") )["consumable"]["duration"] print(self.consumables) # For X8 - self._attr_boost_iq = self.tuyastatus.get("118") + self._attr_boost_iq = self.tuyastatus.get(TUYA_CODES.BOOST_IQ) # self.map_data = self.tuyastatus.get("121") # self.erro_msg? = self.tuyastatus.get("124") if self.tuyastatus.get("116") is not None: @@ -388,105 +426,6 @@ class RoboVacEntity(StateVacuumEntity): )["consumable"]["duration"] print(self.consumables) - @property - def status(self): - """Return the status of the vacuum cleaner.""" - print("status:", self.error_code, self.tuya_state) - if self.ip_address == "": - return "Error: Set the IP Address" - if type(self.error_code) is not None and self.error_code not in [0, "no_error"]: - self._attr_is_on = False - if self.error_code == 1: - return "Error: Front bumper stuck" - elif self.error_code == 2: - return "Error: Wheel stuck" - elif self.error_code == 3: - return "Error: Side brush" - elif self.error_code == 4: - return "Error: Rolling brush bar stuck" - elif self.error_code == 5: - return "Error: Device trapped" - elif self.error_code == 6: - return "Error: Device trapped" - elif self.error_code == 7: - return "Error: Wheel suspended" - elif self.error_code == 8: - return "Error: Low battery" - elif self.error_code == 9: - return "Error: Magnetic boundary" - elif self.error_code == 12: - return "Error: Right wall sensor" - elif self.error_code == 13: - return "Error: Device tilted" - elif self.error_code == 14: - return "Error: Insert dust collector" - elif self.error_code == 17: - return "Error: Restricted area detected" - elif self.error_code == 18: - return "Error: Laser cover stuck" - elif self.error_code == 19: - return "Error: Laser sesor stuck" - elif self.error_code == 20: - return "Error: Laser sensor blocked" - elif self.error_code == 21: - return "Error: Base blocked" - elif self.error_code == "S1": - return "Error: Battery" - elif self.error_code == "S2": - return "Error: Wheel Module" - elif self.error_code == "S3": - return "Error: Side Brush" - elif self.error_code == "S4": - return "Error: Suction Fan" - elif self.error_code == "S5": - return "Error: Rolling Brush" - elif self.error_code == "S8": - return "Error: Path Tracking Sensor" - elif self.error_code == "Wheel_stuck": - return "Error: Wheel stuck" - elif self.error_code == "R_brush_stuck": - return "Error: Rolling brush stuck" - elif self.error_code == "Crash_bar_stuck": - return "Error: Front bumper stuck" - elif self.error_code == "sensor_dirty": - return "Error: Sensor dirty" - elif self.error_code == "N_enough_pow": - return "Error: Low battery" - elif self.error_code == "Stuck_5_min": - return "Error: Device trapped" - elif self.error_code == "Fan_stuck": - return "Error: Fan stuck" - elif self.error_code == "S_brush_stuck": - return "Error: Side brush stuck" - else: - return "Error: " + str(self.error_code) - elif self.tuya_state == "Running": - self._attr_is_on = True - return "cleaning" - elif self.tuya_state == "Locating": - self._attr_is_on = True - return "cleaning" - elif self.tuya_state == "remote": - self._attr_is_on = True - return "cleaning" - elif self.tuya_state == "Charging": - self._attr_is_on = False - return "charging" - elif self.tuya_state == "completed": - self._attr_is_on = False - return "docked" - elif self.tuya_state == "Recharge": - self._attr_is_on = True - return "returning" - elif self.tuya_state == "Sleeping": - self._attr_is_on = False - return "paused" - elif self.tuya_state == "standby": - self._attr_is_on = False - return "paused" - else: - return "cleaning" - async def async_locate(self, **kwargs): """Locate the vacuum cleaner.""" print("Locate Pressed") @@ -504,23 +443,25 @@ class RoboVacEntity(StateVacuumEntity): await asyncio.sleep(1) self.async_update - async def async_start_pause(self, **kwargs): - """Pause the cleaning task or resume it.""" - print("Start/Pause Pressed") - _LOGGER.info("Start/Pause Pressed") - if self.tuya_state in ["Recharge", "Running", "Locating", "remote"]: - await self.vacuum.async_set({"2": False}, None) - else: - if self.mode == "Nosweep": - self._attr_mode = "auto" - elif self.mode == "room" and ( - self.status == "Charging" or self.status == "completed" - ): - self._attr_mode = "auto" - await self.vacuum.async_set({"5": self.mode}, None) + async def async_start(self, **kwargs): + if self.mode == "Nosweep": + self._attr_mode = "auto" + elif self.mode == "room" and ( + self.status == "Charging" or self.status == "completed" + ): + self._attr_mode = "auto" + await self.vacuum.async_set({"5": self.mode}, None) await asyncio.sleep(1) self.async_update + async def async_pause(self, **kwargs): + await self.vacuum.async_set({"2": False}, None) + await asyncio.sleep(1) + self.async_update + + async def async_stop(self, **kwargs): + await self.async_return_to_base() + async def async_clean_spot(self, **kwargs): """Perform a spot clean-up.""" print("Spot Clean Pressed")