Final work to switch to StateVacuumEntity

This commit is contained in:
Luke Bonaccorsi 2023-08-04 17:21:33 +01:00
parent d55a1ceda5
commit 4231ff71ba
1 changed files with 96 additions and 155 deletions

View File

@ -25,7 +25,7 @@ import time
import ast
from typing import Any
from enum import IntEnum
from enum import IntEnum, StrEnum
from homeassistant.loader import bind_hass
from homeassistant.components.vacuum import (
StateVacuumEntity,
@ -78,6 +78,7 @@ class RoboVacEntityFeature(IntEnum):
ATTR_BATTERY_ICON = "battery_icon"
ATTR_ERROR = "error"
ATTR_FAN_SPEED = "fan_speed"
ATTR_FAN_SPEED_LIST = "fan_speed_list"
ATTR_STATUS = "status"
@ -96,17 +97,59 @@ _LOGGER = logging.getLogger(__name__)
REFRESH_RATE = 20
SCAN_INTERVAL = timedelta(seconds=REFRESH_RATE)
ERROR_MESSAGES = {
"IP_ADDRESS": "IP Address not set",
1:"Error: Front bumper stuck",
2:"Error: Wheel stuck",
3:"Error: Side brush",
4:"Error: Rolling brush bar stuck",
5:"Error: Device trapped",
6:"Error: Device trapped",
7:"Error: Wheel suspended",
8:"Error: Low battery",
9:"Error: Magnetic boundary",
12:"Error: Right wall sensor",
13:"Error: Device tilted",
14:"Error: Insert dust collector",
17:"Error: Restricted area detected",
18:"Error: Laser cover stuck",
19:"Error: Laser sesor stuck",
20:"Error: Laser sensor blocked",
21:"Error: Base blocked",
"S1":"Error: Battery",
"S2":"Error: Wheel Module",
"S3":"Error: Side Brush",
"S4":"Error: Suction Fan",
"S5":"Error: Rolling Brush",
"S8":"Error: Path Tracking Sensor",
"Wheel_stuck":"Error: Wheel stuck",
"R_brush_stuck":"Error: Rolling brush stuck",
"Crash_bar_stuck":"Error: Front bumper stuck",
"sensor_dirty":"Error: Sensor dirty",
"N_enough_pow":"Error: Low battery",
"Stuck_5_min":"Error: Device trapped",
"Fan_stuck":"Error: Fan stuck",
"S_brush_stuck":"Error: Side brush stuck",
}
class TUYA_CODES(StrEnum):
BATTERY_LEVEL = "104"
STATE = "15"
ERROR_CODE = "106"
MODE = "5"
FAN_SPEED = "102"
CLEANING_AREA = "110"
CLEANING_TIME = "109"
AUTO_RETURN = "135"
DO_NOT_DISTURB = "107"
BOOST_IQ = "118"
G_CONSUMABLES = "142"
X_CONSUMABLES = "116"
class robovac(TuyaDevice):
""""""
@bind_hass
def is_on(hass: HomeAssistant, entity_id: str) -> bool:
"""Return if the vacuum is on based on the statemachine."""
return hass.states.is_state(entity_id, STATE_ON)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
@ -192,10 +235,24 @@ class RoboVacEntity(StateVacuumEntity):
return self._attr_ip_address
@property
def state_attributes(self) -> dict[str, Any]:
"""Return the state attributes of the vacuum cleaner."""
data = super().state_attributes
# data: dict[str, Any] = {}
def state(self) -> str | None:
if self.tuya_state is None or (type(self.error_code) is not None and self.error_code not in [0, "no_error"]):
return STATE_ERROR
elif self.tuya_state == "Charging" or self.tuya_state == "completed":
return STATE_DOCKED
elif self.tuya_state == "Recharge":
return STATE_RETURNING
elif self.tuya_state == "Sleeping" or self.tuya_state == "standby":
return STATE_IDLE
else:
return STATE_CLEANING
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the device-specific state attributes of this vacuum."""
data: dict[str, Any] = {}
data[ATTR_ERROR] = ERROR_MESSAGES.get(self.error_code, self.error_code)
if self.supported_features & VacuumEntityFeature.BATTERY:
data[ATTR_BATTERY_LEVEL] = self.battery_level
data[ATTR_BATTERY_ICON] = self.battery_icon
@ -218,29 +275,10 @@ class RoboVacEntity(StateVacuumEntity):
data[ATTR_MODE] = self.mode
return data
@property
def capability_attributes(self) -> Mapping[str, Any] | None:
"""Return capability attributes."""
if self.supported_features & VacuumEntityFeature.FAN_SPEED:
return {
ATTR_FAN_SPEED_LIST: self.fan_speed_list,
# CONF_ACCESS_TOKEN: self.access_token,
CONF_IP_ADDRESS: self.ip_address,
ATTR_MODEL_CODE: self.model_code,
}
else:
return {
# CONF_ACCESS_TOKEN: self.access_token,
CONF_IP_ADDRESS: self.ip_address,
ATTR_MODEL_CODE: self.model_code,
}
def __init__(self, item) -> None:
"""Initialize mytest2 Sensor."""
"""Initialize Eufy Robovac"""
super().__init__()
self._extra_state_attributes = {}
self._attr_battery_level = 0
self._attr_is_on = False
self._attr_name = item[CONF_NAME]
self._attr_unique_id = item[CONF_ID]
self._attr_supported_features = 0
@ -357,11 +395,11 @@ class RoboVacEntity(StateVacuumEntity):
self.tuyastatus = self.vacuum._dps
print("Tuya local API Result:", self.tuyastatus)
# for 15C
self._attr_battery_level = self.tuyastatus.get("104")
self.tuya_state = self.tuyastatus.get("15")
self.error_code = self.tuyastatus.get("106")
self._attr_mode = self.tuyastatus.get("5")
self._attr_fan_speed = self.tuyastatus.get("102")
self._attr_battery_level = self.tuyastatus.get(TUYA_CODES.BATTERY_LEVEL)
self.tuya_state = self.tuyastatus.get(TUYA_CODES.STATE)
self.error_code = self.tuyastatus.get(TUYA_CODES.ERROR_CODE)
self._attr_mode = self.tuyastatus.get(TUYA_CODES.MODE)
self._attr_fan_speed = self.tuyastatus.get(TUYA_CODES.FAN_SPEED)
if self.fan_speed == "No_suction":
self._attr_fan_speed = "No Suction"
elif self.fan_speed == "Boost_IQ":
@ -369,17 +407,17 @@ class RoboVacEntity(StateVacuumEntity):
elif self.fan_speed == "Quiet":
self._attr_fan_speed = "Pure"
# for G30
self._attr_cleaning_area = self.tuyastatus.get("110")
self._attr_cleaning_time = self.tuyastatus.get("109")
self._attr_auto_return = self.tuyastatus.get("135")
self._attr_do_not_disturb = self.tuyastatus.get("107")
self._attr_cleaning_area = self.tuyastatus.get(TUYA_CODES.CLEANING_AREA)
self._attr_cleaning_time = self.tuyastatus.get(TUYA_CODES.CLEANING_TIME)
self._attr_auto_return = self.tuyastatus.get(TUYA_CODES.AUTO_RETURN)
self._attr_do_not_disturb = self.tuyastatus.get(TUYA_CODES.DO_NOT_DISTURB)
if self.tuyastatus.get("142") is not None:
self._attr_consumables = ast.literal_eval(
base64.b64decode(self.tuyastatus.get("142")).decode("ascii")
)["consumable"]["duration"]
print(self.consumables)
# For X8
self._attr_boost_iq = self.tuyastatus.get("118")
self._attr_boost_iq = self.tuyastatus.get(TUYA_CODES.BOOST_IQ)
# self.map_data = self.tuyastatus.get("121")
# self.erro_msg? = self.tuyastatus.get("124")
if self.tuyastatus.get("116") is not None:
@ -388,105 +426,6 @@ class RoboVacEntity(StateVacuumEntity):
)["consumable"]["duration"]
print(self.consumables)
@property
def status(self):
"""Return the status of the vacuum cleaner."""
print("status:", self.error_code, self.tuya_state)
if self.ip_address == "":
return "Error: Set the IP Address"
if type(self.error_code) is not None and self.error_code not in [0, "no_error"]:
self._attr_is_on = False
if self.error_code == 1:
return "Error: Front bumper stuck"
elif self.error_code == 2:
return "Error: Wheel stuck"
elif self.error_code == 3:
return "Error: Side brush"
elif self.error_code == 4:
return "Error: Rolling brush bar stuck"
elif self.error_code == 5:
return "Error: Device trapped"
elif self.error_code == 6:
return "Error: Device trapped"
elif self.error_code == 7:
return "Error: Wheel suspended"
elif self.error_code == 8:
return "Error: Low battery"
elif self.error_code == 9:
return "Error: Magnetic boundary"
elif self.error_code == 12:
return "Error: Right wall sensor"
elif self.error_code == 13:
return "Error: Device tilted"
elif self.error_code == 14:
return "Error: Insert dust collector"
elif self.error_code == 17:
return "Error: Restricted area detected"
elif self.error_code == 18:
return "Error: Laser cover stuck"
elif self.error_code == 19:
return "Error: Laser sesor stuck"
elif self.error_code == 20:
return "Error: Laser sensor blocked"
elif self.error_code == 21:
return "Error: Base blocked"
elif self.error_code == "S1":
return "Error: Battery"
elif self.error_code == "S2":
return "Error: Wheel Module"
elif self.error_code == "S3":
return "Error: Side Brush"
elif self.error_code == "S4":
return "Error: Suction Fan"
elif self.error_code == "S5":
return "Error: Rolling Brush"
elif self.error_code == "S8":
return "Error: Path Tracking Sensor"
elif self.error_code == "Wheel_stuck":
return "Error: Wheel stuck"
elif self.error_code == "R_brush_stuck":
return "Error: Rolling brush stuck"
elif self.error_code == "Crash_bar_stuck":
return "Error: Front bumper stuck"
elif self.error_code == "sensor_dirty":
return "Error: Sensor dirty"
elif self.error_code == "N_enough_pow":
return "Error: Low battery"
elif self.error_code == "Stuck_5_min":
return "Error: Device trapped"
elif self.error_code == "Fan_stuck":
return "Error: Fan stuck"
elif self.error_code == "S_brush_stuck":
return "Error: Side brush stuck"
else:
return "Error: " + str(self.error_code)
elif self.tuya_state == "Running":
self._attr_is_on = True
return "cleaning"
elif self.tuya_state == "Locating":
self._attr_is_on = True
return "cleaning"
elif self.tuya_state == "remote":
self._attr_is_on = True
return "cleaning"
elif self.tuya_state == "Charging":
self._attr_is_on = False
return "charging"
elif self.tuya_state == "completed":
self._attr_is_on = False
return "docked"
elif self.tuya_state == "Recharge":
self._attr_is_on = True
return "returning"
elif self.tuya_state == "Sleeping":
self._attr_is_on = False
return "paused"
elif self.tuya_state == "standby":
self._attr_is_on = False
return "paused"
else:
return "cleaning"
async def async_locate(self, **kwargs):
"""Locate the vacuum cleaner."""
print("Locate Pressed")
@ -504,23 +443,25 @@ class RoboVacEntity(StateVacuumEntity):
await asyncio.sleep(1)
self.async_update
async def async_start_pause(self, **kwargs):
"""Pause the cleaning task or resume it."""
print("Start/Pause Pressed")
_LOGGER.info("Start/Pause Pressed")
if self.tuya_state in ["Recharge", "Running", "Locating", "remote"]:
await self.vacuum.async_set({"2": False}, None)
else:
if self.mode == "Nosweep":
self._attr_mode = "auto"
elif self.mode == "room" and (
self.status == "Charging" or self.status == "completed"
):
self._attr_mode = "auto"
await self.vacuum.async_set({"5": self.mode}, None)
async def async_start(self, **kwargs):
if self.mode == "Nosweep":
self._attr_mode = "auto"
elif self.mode == "room" and (
self.status == "Charging" or self.status == "completed"
):
self._attr_mode = "auto"
await self.vacuum.async_set({"5": self.mode}, None)
await asyncio.sleep(1)
self.async_update
async def async_pause(self, **kwargs):
await self.vacuum.async_set({"2": False}, None)
await asyncio.sleep(1)
self.async_update
async def async_stop(self, **kwargs):
await self.async_return_to_base()
async def async_clean_spot(self, **kwargs):
"""Perform a spot clean-up."""
print("Spot Clean Pressed")