diff --git a/custom_components/robovac/__init__.py b/custom_components/robovac/__init__.py new file mode 100644 index 0000000..a627561 --- /dev/null +++ b/custom_components/robovac/__init__.py @@ -0,0 +1,53 @@ +# Copyright 2022 Brendan McCluskey +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""The Eufy Robovac integration.""" +from __future__ import annotations + +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import Platform +from homeassistant.core import HomeAssistant +from .const import DOMAIN + +PLATFORMS = [Platform.VACUUM] + + +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Set up Eufy Robovac from a config entry.""" + hass.data.setdefault(DOMAIN, {}) + + # hass_data = dict(entry.data) + # Registers update listener to update config entry when options are updated. + # unsub_options_update_listener = entry.add_update_listener(options_update_listener) + # Store a reference to the unsubscribe function to cleanup if an entry is unloaded. + # hass_data["unsub_options_update_listener"] = unsub_options_update_listener + # hass.data[DOMAIN][entry.entry_id] = hass_data + entry.async_on_unload(entry.add_update_listener(update_listener)) + + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) + return True + + +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Unload a config entry.""" + if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS): + """Nothing""" + return unload_ok + + +async def update_listener(hass, entry): + """Handle options update.""" + await async_unload_entry(hass, entry) + await async_setup_entry(hass, entry) diff --git a/custom_components/robovac/config_flow.py b/custom_components/robovac/config_flow.py new file mode 100644 index 0000000..49100e5 --- /dev/null +++ b/custom_components/robovac/config_flow.py @@ -0,0 +1,210 @@ +# Copyright 2022 Brendan McCluskey +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Config flow for Eufy Robovac integration.""" +from __future__ import annotations + +import logging +from typing import Any, Optional +from copy import deepcopy + +import voluptuous as vol +import homeassistant.helpers.config_validation as cv + +from homeassistant import config_entries +from homeassistant.core import callback +from homeassistant.core import HomeAssistant +from homeassistant.data_entry_flow import FlowResult +from homeassistant.exceptions import HomeAssistantError +from homeassistant.helpers.selector import selector + +from homeassistant.const import ( + CONF_ACCESS_TOKEN, + CONF_NAME, + CONF_ID, + CONF_MODEL, + CONF_USERNAME, + CONF_PASSWORD, + CONF_IP_ADDRESS, + CONF_DESCRIPTION, + CONF_MAC, + CONF_LOCATION, + CONF_CLIENT_ID, +) + +from .const import DOMAIN, CONF_VACS, CONF_PHONE_CODE + +from .tuyawebapi import TuyaAPISession +from .eufywebapi import EufyLogon + +_LOGGER = logging.getLogger(__name__) + +USER_SCHEMA = vol.Schema( + { + vol.Required(CONF_USERNAME): cv.string, + vol.Required(CONF_PASSWORD): cv.string, + } +) + + +def get_eufy_vacuums(self): + """Login to Eufy and get the vacuum details""" + + eufy_session = EufyLogon(self["username"], self["password"]) + response = eufy_session.get_user_info() + if response.status_code != 200: + raise CannotConnect + + user_response = response.json() + if user_response["res_code"] != 1: + raise InvalidAuth + + response = eufy_session.get_device_info( + user_response["user_info"]["request_host"], + user_response["user_info"]["id"], + user_response["access_token"], + ) + + device_response = response.json() + self[CONF_CLIENT_ID] = user_response["user_info"]["id"] + self[CONF_PHONE_CODE] = user_response["user_info"]["phone_code"] + + # self[CONF_VACS] = {} + items = device_response["items"] + allvacs = {} + for item in items: + if item["device"]["product"]["appliance"] == "Cleaning": + vac_details = { + CONF_ID: item["device"]["id"], + CONF_MODEL: item["device"]["product"]["product_code"], + CONF_NAME: item["device"]["alias_name"], + CONF_DESCRIPTION: item["device"]["name"], + CONF_MAC: item["device"]["wifi"]["mac"], + CONF_IP_ADDRESS: "", + } + allvacs[item["device"]["id"]] = vac_details + self[CONF_VACS] = allvacs + + tuya_client = TuyaAPISession( + username="eh-" + self[CONF_CLIENT_ID], country_code=self[CONF_PHONE_CODE] + ) + for home in tuya_client.list_homes(): + for device in tuya_client.list_devices(home["groupId"]): + self[CONF_VACS][device["devId"]][CONF_ACCESS_TOKEN] = device["localKey"] + self[CONF_VACS][device["devId"]][CONF_LOCATION] = home["groupId"] + + return response + + +async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, Any]: + """Validate the user input allows us to connect.""" + await hass.async_add_executor_job(get_eufy_vacuums, data) + return data + + +class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): + """Handle a config flow for Eufy Robovac.""" + + data: Optional[dict[str, Any]] + + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Handle the initial step.""" + if user_input is None: + return self.async_show_form(step_id="user", data_schema=USER_SCHEMA) + errors = {} + try: + unique_id = user_input[CONF_USERNAME] + valid_data = await validate_input(self.hass, user_input) + except CannotConnect: + errors["base"] = "cannot_connect" + except InvalidAuth: + errors["base"] = "invalid_auth" + except Exception: # pylint: disable=broad-except + _LOGGER.exception("Unexpected exception") + errors["base"] = "unknown" + else: + await self.async_set_unique_id(unique_id) + self._abort_if_unique_id_configured() + # return await self.async_step_repo(valid_data) + return self.async_create_entry(title=unique_id, data=valid_data) + return self.async_show_form( + step_id="user", data_schema=USER_SCHEMA, errors=errors + ) + + @staticmethod + @callback + def async_get_options_flow(config_entry): + """Get the options flow for this handler.""" + return OptionsFlowHandler(config_entry) + + +class CannotConnect(HomeAssistantError): + """Error to indicate we cannot connect.""" + + +class InvalidAuth(HomeAssistantError): + """Error to indicate there is invalid auth.""" + + +class OptionsFlowHandler(config_entries.OptionsFlow): + """Handles options flow for the component.""" + + def __init__(self, config_entry: config_entries.ConfigEntry) -> None: + self.config_entry = config_entry + + async def async_step_init( + self, user_input: dict[str, Any] = None + ) -> dict[str, Any]: + """Manage the options for the custom component.""" + errors: dict[str, str] = {} + + vac_names = [] + vacuums = self.config_entry.data[CONF_VACS] + for item in vacuums: + item_settings = vacuums[item] + vac_names.append(item_settings["name"]) + if user_input is not None: + for item in vacuums: + item_settings = vacuums[item] + if item_settings["name"] == user_input["vacuum"]: + item_settings[CONF_IP_ADDRESS] = user_input[CONF_IP_ADDRESS] + updated_repos = deepcopy(self.config_entry.data[CONF_VACS]) + + # print("Updated", updated_repos) + if not errors: + # Value of data will be set on the options property of our config_entry + # instance. + return self.async_create_entry( + title="", + data={CONF_VACS: updated_repos}, + ) + + options_schema = vol.Schema( + { + vol.Optional("vacuum", default=1): selector( + { + "select": { + "options": vac_names, + } + } + ), + vol.Optional(CONF_IP_ADDRESS): cv.string, + } + ) + + return self.async_show_form( + step_id="init", data_schema=options_schema, errors=errors + ) diff --git a/custom_components/robovac/const.py b/custom_components/robovac/const.py new file mode 100644 index 0000000..a95a06d --- /dev/null +++ b/custom_components/robovac/const.py @@ -0,0 +1,5 @@ +"""Constants for the Eufy Robovac integration.""" + +DOMAIN = "robovac" +CONF_VACS = "vacuums" +CONF_PHONE_CODE = "phone_code" diff --git a/custom_components/robovac/eufywebapi.py b/custom_components/robovac/eufywebapi.py new file mode 100644 index 0000000..72345f8 --- /dev/null +++ b/custom_components/robovac/eufywebapi.py @@ -0,0 +1,41 @@ +"""Original Work from here: Andre Borie https://gitlab.com/Rjevski/eufy-device-id-and-local-key-grabber""" + +import requests + +eufyheaders = { + "User-Agent": "EufyHome-Android-2.4.0", + "timezone": "Europe/London", + "category": "Home", + "token": "", + "uid": "", + "openudid": "sdk_gphone64_arm64", + "clientType": "2", + "language": "en", + "country": "US", + "Accept-Encoding": "gzip", +} + + +class EufyLogon: + def __init__(self, username, password): + self.username = username + self.password = password + + def get_user_info(self): + login_url = "https://home-api.eufylife.com/v1/user/email/login" + login_auth = { + "client_Secret": "GQCpr9dSp3uQpsOMgJ4xQ", + "client_id": "eufyhome-app", + "email": self.username, + "password": self.password, + } + + return requests.post( + login_url, json=login_auth, headers=eufyheaders, timeout=1.5 + ) + + def get_device_info(self, url, userid, token): + device_url = url + "/v1/device/list/devices-and-groups" + eufyheaders["token"] = token + eufyheaders["id"] = userid + return requests.request("GET", device_url, headers=eufyheaders, timeout=1.5) diff --git a/custom_components/robovac/manifest.json b/custom_components/robovac/manifest.json new file mode 100644 index 0000000..58b201c --- /dev/null +++ b/custom_components/robovac/manifest.json @@ -0,0 +1,14 @@ +{ + "domain": "robovac", + "name": "Eufy Robovac", + "config_flow": true, + "documentation": "https://www.home-assistant.io/integrations/robovac", + "requirements": [], + "ssdp": [], + "zeroconf": [], + "homekit": {}, + "dependencies": [], + "codeowners": ["@bmccluskey"], + "iot_class": "local_polling", + "version": "1" +} diff --git a/custom_components/robovac/strings.json b/custom_components/robovac/strings.json new file mode 100644 index 0000000..8536026 --- /dev/null +++ b/custom_components/robovac/strings.json @@ -0,0 +1,21 @@ +{ + "config": { + "step": { + "user": { + "data": { + "username": "[%key:common::config_flow::data::username%]", + "password": "[%key:common::config_flow::data::password%]" + } + } + }, + "error": { + "cannot_connect": "[%key:common::config_flow::error::cannot_connect%]", + "invalid_auth": "[%key:common::config_flow::error::invalid_auth%]", + "unknown": "[%key:common::config_flow::error::unknown%]" + }, + "abort": { + "already_configured": "[%key:common::config_flow::abort::already_configured_device%]" + } + } +} + diff --git a/custom_components/robovac/translations/en.json b/custom_components/robovac/translations/en.json new file mode 100644 index 0000000..3905c80 --- /dev/null +++ b/custom_components/robovac/translations/en.json @@ -0,0 +1,37 @@ +{ + "config": { + "abort": { + "already_configured": "Device is already configured" + }, + "error": { + "cannot_connect": "Failed to connect", + "invalid_auth": "Invalid authentication", + "unknown": "Unexpected error" + }, + "step": { + "user": { + "data": { + "host": "Host", + "password": "Password", + "username": "Username" + }, + "description": "Enter your Eufy account details" + } + } + }, + "options": { + "error": { + "invalid_path": "The path provided is not valid. Should be in the format `user/repo-name` and should be a valid github repository." + }, + "step": { + "init": { + "title": "Manage IPs", + "data": { + "vacuum": "Select the Vacuum to edit", + "ip_address": "IP address of vacuum" + }, + "description": "Add or update a vacuums IP address" + } + } + } +} \ No newline at end of file diff --git a/custom_components/robovac/tuyalocalapi.py b/custom_components/robovac/tuyalocalapi.py new file mode 100644 index 0000000..4e62d3f --- /dev/null +++ b/custom_components/robovac/tuyalocalapi.py @@ -0,0 +1,772 @@ +# -*- coding: utf-8 -*- + +# Copyright 2019 Richard Mitchell +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Based on portions of https://github.com/codetheweb/tuyapi/ +# +# MIT License +# +# Copyright (c) 2017 Max Isom +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import asyncio +import base64 +import json +import logging +import socket +import struct +import sys +import time + +from cryptography.hazmat.backends.openssl import backend as openssl_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.hashes import Hash, MD5 +from cryptography.hazmat.primitives.padding import PKCS7 + + +_LOGGER = logging.getLogger(__name__) +MESSAGE_PREFIX_FORMAT = ">IIII" +MESSAGE_SUFFIX_FORMAT = ">II" +MAGIC_PREFIX = 0x000055AA +MAGIC_SUFFIX = 0x0000AA55 +MAGIC_SUFFIX_BYTES = struct.pack(">I", MAGIC_SUFFIX) +CRC_32_TABLE = [ + 0x00000000, + 0x77073096, + 0xEE0E612C, + 0x990951BA, + 0x076DC419, + 0x706AF48F, + 0xE963A535, + 0x9E6495A3, + 0x0EDB8832, + 0x79DCB8A4, + 0xE0D5E91E, + 0x97D2D988, + 0x09B64C2B, + 0x7EB17CBD, + 0xE7B82D07, + 0x90BF1D91, + 0x1DB71064, + 0x6AB020F2, + 0xF3B97148, + 0x84BE41DE, + 0x1ADAD47D, + 0x6DDDE4EB, + 0xF4D4B551, + 0x83D385C7, + 0x136C9856, + 0x646BA8C0, + 0xFD62F97A, + 0x8A65C9EC, + 0x14015C4F, + 0x63066CD9, + 0xFA0F3D63, + 0x8D080DF5, + 0x3B6E20C8, + 0x4C69105E, + 0xD56041E4, + 0xA2677172, + 0x3C03E4D1, + 0x4B04D447, + 0xD20D85FD, + 0xA50AB56B, + 0x35B5A8FA, + 0x42B2986C, + 0xDBBBC9D6, + 0xACBCF940, + 0x32D86CE3, + 0x45DF5C75, + 0xDCD60DCF, + 0xABD13D59, + 0x26D930AC, + 0x51DE003A, + 0xC8D75180, + 0xBFD06116, + 0x21B4F4B5, + 0x56B3C423, + 0xCFBA9599, + 0xB8BDA50F, + 0x2802B89E, + 0x5F058808, + 0xC60CD9B2, + 0xB10BE924, + 0x2F6F7C87, + 0x58684C11, + 0xC1611DAB, + 0xB6662D3D, + 0x76DC4190, + 0x01DB7106, + 0x98D220BC, + 0xEFD5102A, + 0x71B18589, + 0x06B6B51F, + 0x9FBFE4A5, + 0xE8B8D433, + 0x7807C9A2, + 0x0F00F934, + 0x9609A88E, + 0xE10E9818, + 0x7F6A0DBB, + 0x086D3D2D, + 0x91646C97, + 0xE6635C01, + 0x6B6B51F4, + 0x1C6C6162, + 0x856530D8, + 0xF262004E, + 0x6C0695ED, + 0x1B01A57B, + 0x8208F4C1, + 0xF50FC457, + 0x65B0D9C6, + 0x12B7E950, + 0x8BBEB8EA, + 0xFCB9887C, + 0x62DD1DDF, + 0x15DA2D49, + 0x8CD37CF3, + 0xFBD44C65, + 0x4DB26158, + 0x3AB551CE, + 0xA3BC0074, + 0xD4BB30E2, + 0x4ADFA541, + 0x3DD895D7, + 0xA4D1C46D, + 0xD3D6F4FB, + 0x4369E96A, + 0x346ED9FC, + 0xAD678846, + 0xDA60B8D0, + 0x44042D73, + 0x33031DE5, + 0xAA0A4C5F, + 0xDD0D7CC9, + 0x5005713C, + 0x270241AA, + 0xBE0B1010, + 0xC90C2086, + 0x5768B525, + 0x206F85B3, + 0xB966D409, + 0xCE61E49F, + 0x5EDEF90E, + 0x29D9C998, + 0xB0D09822, + 0xC7D7A8B4, + 0x59B33D17, + 0x2EB40D81, + 0xB7BD5C3B, + 0xC0BA6CAD, + 0xEDB88320, + 0x9ABFB3B6, + 0x03B6E20C, + 0x74B1D29A, + 0xEAD54739, + 0x9DD277AF, + 0x04DB2615, + 0x73DC1683, + 0xE3630B12, + 0x94643B84, + 0x0D6D6A3E, + 0x7A6A5AA8, + 0xE40ECF0B, + 0x9309FF9D, + 0x0A00AE27, + 0x7D079EB1, + 0xF00F9344, + 0x8708A3D2, + 0x1E01F268, + 0x6906C2FE, + 0xF762575D, + 0x806567CB, + 0x196C3671, + 0x6E6B06E7, + 0xFED41B76, + 0x89D32BE0, + 0x10DA7A5A, + 0x67DD4ACC, + 0xF9B9DF6F, + 0x8EBEEFF9, + 0x17B7BE43, + 0x60B08ED5, + 0xD6D6A3E8, + 0xA1D1937E, + 0x38D8C2C4, + 0x4FDFF252, + 0xD1BB67F1, + 0xA6BC5767, + 0x3FB506DD, + 0x48B2364B, + 0xD80D2BDA, + 0xAF0A1B4C, + 0x36034AF6, + 0x41047A60, + 0xDF60EFC3, + 0xA867DF55, + 0x316E8EEF, + 0x4669BE79, + 0xCB61B38C, + 0xBC66831A, + 0x256FD2A0, + 0x5268E236, + 0xCC0C7795, + 0xBB0B4703, + 0x220216B9, + 0x5505262F, + 0xC5BA3BBE, + 0xB2BD0B28, + 0x2BB45A92, + 0x5CB36A04, + 0xC2D7FFA7, + 0xB5D0CF31, + 0x2CD99E8B, + 0x5BDEAE1D, + 0x9B64C2B0, + 0xEC63F226, + 0x756AA39C, + 0x026D930A, + 0x9C0906A9, + 0xEB0E363F, + 0x72076785, + 0x05005713, + 0x95BF4A82, + 0xE2B87A14, + 0x7BB12BAE, + 0x0CB61B38, + 0x92D28E9B, + 0xE5D5BE0D, + 0x7CDCEFB7, + 0x0BDBDF21, + 0x86D3D2D4, + 0xF1D4E242, + 0x68DDB3F8, + 0x1FDA836E, + 0x81BE16CD, + 0xF6B9265B, + 0x6FB077E1, + 0x18B74777, + 0x88085AE6, + 0xFF0F6A70, + 0x66063BCA, + 0x11010B5C, + 0x8F659EFF, + 0xF862AE69, + 0x616BFFD3, + 0x166CCF45, + 0xA00AE278, + 0xD70DD2EE, + 0x4E048354, + 0x3903B3C2, + 0xA7672661, + 0xD06016F7, + 0x4969474D, + 0x3E6E77DB, + 0xAED16A4A, + 0xD9D65ADC, + 0x40DF0B66, + 0x37D83BF0, + 0xA9BCAE53, + 0xDEBB9EC5, + 0x47B2CF7F, + 0x30B5FFE9, + 0xBDBDF21C, + 0xCABAC28A, + 0x53B39330, + 0x24B4A3A6, + 0xBAD03605, + 0xCDD70693, + 0x54DE5729, + 0x23D967BF, + 0xB3667A2E, + 0xC4614AB8, + 0x5D681B02, + 0x2A6F2B94, + 0xB40BBE37, + 0xC30C8EA1, + 0x5A05DF1B, + 0x2D02EF8D, +] + + +class TuyaException(Exception): + """Base for Tuya exceptions.""" + + +class InvalidKey(TuyaException): + """The local key is invalid.""" + + +class InvalidMessage(TuyaException): + """The message received is invalid.""" + + +class MessageDecodeFailed(TuyaException): + """The message received cannot be decoded as JSON.""" + + +class ConnectionException(TuyaException): + """The socket connection failed.""" + + +class ConnectionTimeoutException(ConnectionException): + """The socket connection timed out.""" + + +class RequestResponseCommandMismatch(TuyaException): + """The command in the response didn't match the one from the request.""" + + +class TuyaCipher: + """Tuya cryptographic helpers.""" + + def __init__(self, key, version): + """Initialize the cipher.""" + self.version = version + self.key = key + self.cipher = Cipher( + algorithms.AES(key.encode("ascii")), modes.ECB(), backend=openssl_backend + ) + + def get_prefix_size_and_validate(self, command, encrypted_data): + try: + version = tuple(map(int, encrypted_data[:3].decode("utf8").split("."))) + except ValueError: + version = (0, 0) + if version != self.version: + return 0 + if version < (3, 3): + hash = encrypted_data[3:19].decode("ascii") + expected_hash = self.hash(encrypted_data[19:]) + if hash != expected_hash: + return 0 + return 19 + else: + if command in (Message.SET_COMMAND, Message.GRATUITOUS_UPDATE): + _, sequence, __, ___ = struct.unpack_from(">IIIH", encrypted_data, 3) + return 15 + return 0 + + def decrypt(self, command, data): + prefix_size = self.get_prefix_size_and_validate(command, data) + data = data[prefix_size:] + decryptor = self.cipher.decryptor() + if self.version < (3, 3): + data = base64.b64decode(data) + decrypted_data = decryptor.update(data) + decrypted_data += decryptor.finalize() + unpadder = PKCS7(128).unpadder() + unpadded_data = unpadder.update(decrypted_data) + unpadded_data += unpadder.finalize() + + return unpadded_data + + def encrypt(self, command, data): + encrypted_data = b"" + if data: + padder = PKCS7(128).padder() + padded_data = padder.update(data) + padded_data += padder.finalize() + encryptor = self.cipher.encryptor() + encrypted_data = encryptor.update(padded_data) + encrypted_data += encryptor.finalize() + + prefix = ".".join(map(str, self.version)).encode("utf8") + if self.version < (3, 3): + payload = base64.b64encode(encrypted_data) + hash = self.hash(payload) + prefix += hash.encode("utf8") + else: + payload = encrypted_data + if command in (Message.SET_COMMAND, Message.GRATUITOUS_UPDATE): + prefix += b"\x00" * 12 + else: + prefix = b"" + + return prefix + payload + + def hash(self, data): + digest = Hash(MD5(), backend=openssl_backend) + to_hash = "data={}||lpv={}||{}".format( + data.decode("ascii"), ".".join(map(str, self.version)), self.key + ) + digest.update(to_hash.encode("utf8")) + intermediate = digest.finalize().hex() + return intermediate[8:24] + + +def crc(data): + """Calculate the Tuya-flavored CRC of some data.""" + c = 0xFFFFFFFF + for b in data: + c = (c >> 8) ^ CRC_32_TABLE[(c ^ b) & 255] + + return c ^ 0xFFFFFFFF + + +class Message: + + PING_COMMAND = 0x09 + GET_COMMAND = 0x0A + SET_COMMAND = 0x07 + GRATUITOUS_UPDATE = 0x08 + + def __init__(self, command, payload=None, sequence=None, encrypt_for=None): + if payload is None: + payload = b"" + self.payload = payload + self.command = command + if sequence is None: + # Use millisecond process time as the sequence number. Not ideal, + # but good for one month's continuous connection time though. + sequence = int(time.perf_counter() * 1000) & 0xFFFFFFFF + self.sequence = sequence + self.encrypt = False + self.device = None + if encrypt_for is not None: + self.device = encrypt_for + self.encrypt = True + + def __repr__(self): + return "{}({}, {!r}, {!r}, {})".format( + self.__class__.__name__, + hex(self.command), + self.payload, + self.sequence, + "".format(self.device) if self.device else None, + ) + + def hex(self): + return self.bytes().hex() + + def bytes(self): + payload_data = self.payload + if isinstance(payload_data, dict): + payload_data = json.dumps(payload_data, separators=(",", ":")) + if not isinstance(payload_data, bytes): + payload_data = payload_data.encode("utf8") + + if self.encrypt: + payload_data = self.device.cipher.encrypt(self.command, payload_data) + + payload_size = len(payload_data) + struct.calcsize(MESSAGE_SUFFIX_FORMAT) + + header = struct.pack( + MESSAGE_PREFIX_FORMAT, + MAGIC_PREFIX, + self.sequence, + self.command, + payload_size, + ) + if self.device and self.device.version >= (3, 3): + checksum = crc(header + payload_data) + else: + checksum = crc(payload_data) + footer = struct.pack(MESSAGE_SUFFIX_FORMAT, checksum, MAGIC_SUFFIX) + return header + payload_data + footer + + __bytes__ = bytes + + class AsyncWrappedCallback: + def __init__(self, request, callback): + self.request = request + self.callback = callback + self.devices = [] + + def register(self, device): + self.devices.append(device) + device._handlers.setdefault(self.request.command, []) + device._handlers[self.request.command].append(self) + + def unregister(self, device): + self.devices.remove(device) + device._handlers[self.request.command].remove(self) + + def unregister_all(self): + while self.devices: + device = self.devices.pop() + device._handlers[self.request.command].remove(self) + + async def __call__(self, response, device): + if response.sequence == self.request.sequence: + asyncio.ensure_future(self.callback(response, device)) + self.unregister(device) + + async def async_send(self, device, callback=None): + if callback is not None: + wrapped = self.AsyncWrappedCallback(self, callback) + wrapped.register(device) + await device._async_send(self) + + @classmethod + def from_bytes(cls, data, cipher=None): + try: + prefix, sequence, command, payload_size = struct.unpack_from( + MESSAGE_PREFIX_FORMAT, data + ) + except struct.error as e: + raise InvalidMessage("Invalid message header format.") from e + if prefix != MAGIC_PREFIX: + raise InvalidMessage("Magic prefix missing from message.") + + # check for an optional return code + header_size = struct.calcsize(MESSAGE_PREFIX_FORMAT) + try: + (return_code,) = struct.unpack_from(">I", data, header_size) + except struct.error as e: + raise InvalidMessage("Unable to unpack return code.") from e + if return_code >> 8: + payload_data = data[ + header_size : header_size + + payload_size + - struct.calcsize(MESSAGE_SUFFIX_FORMAT) + ] + return_code = None + else: + payload_data = data[ + header_size + + struct.calcsize(">I") : header_size + + payload_size + - struct.calcsize(MESSAGE_SUFFIX_FORMAT) + ] + + try: + expected_crc, suffix = struct.unpack_from( + MESSAGE_SUFFIX_FORMAT, + data, + header_size + payload_size - struct.calcsize(MESSAGE_SUFFIX_FORMAT), + ) + except struct.error as e: + raise InvalidMessage("Invalid message suffix format.") from e + if suffix != MAGIC_SUFFIX: + raise InvalidMessage("Magic suffix missing from message") + + actual_crc = crc( + data[: header_size + payload_size - struct.calcsize(MESSAGE_SUFFIX_FORMAT)] + ) + if expected_crc != actual_crc: + raise InvalidMessage("CRC check failed") + + payload = None + if payload_data: + try: + payload_data = cipher.decrypt(command, payload_data) + except ValueError as e: + pass + try: + payload_text = payload_data.decode("utf8") + except UnicodeDecodeError as e: + _LOGGER.debug(payload_data.hex()) + _LOGGER.error(e) + raise MessageDecodeFailed() from e + try: + payload = json.loads(payload_text) + except json.decoder.JSONDecodeError as e: + # data may be encrypted + _LOGGER.debug(payload_data.hex()) + _LOGGER.error(e) + raise MessageDecodeFailed() from e + + return cls(command, payload, sequence) + + +def _call_async(fn, *args): + loop = None + if sys.version_info >= (3, 7): + try: + loop = asyncio.get_running_loop() + except RuntimeError: + pass + + loop = asyncio.get_event_loop() + + def wrapper(fn, *args): + asyncio.ensure_future(fn(*args)) + + loop.call_soon(wrapper, fn, *args) + + +class TuyaDevice: + """Represents a generic Tuya device.""" + + # PING_INTERVAL = 10 + + def __init__( + self, + device_id, + host, + timeout, + ping_interval, + local_key=None, + port=6668, + gateway_id=None, + version=(3, 3), + ): + """Initialize the device.""" + self.device_id = device_id + self.host = host + self.port = port + if not gateway_id: + gateway_id = self.device_id + self.gateway_id = gateway_id + self.version = version + self.timeout = timeout + self.last_pong = 0 + self.ping_interval = ping_interval + + if len(local_key) != 16: + raise InvalidKey("Local key should be a 16-character string") + + self.cipher = TuyaCipher(local_key, self.version) + self.writer = None + self._handlers = { + Message.GET_COMMAND: [self.async_update_state], + Message.GRATUITOUS_UPDATE: [self.async_update_state], + Message.PING_COMMAND: [self._async_pong_received], + } + self._dps = {} + self._connected = False + + def __repr__(self): + return "{}({!r}, {!r}, {!r}, {!r})".format( + self.__class__.__name__, + self.device_id, + self.host, + self.port, + self.cipher.key, + ) + + def __str__(self): + return "{} ({}:{})".format(self.device_id, self.host, self.port) + + async def async_connect(self, callback=None): + if self._connected: + return + sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) + sock.settimeout(self.timeout) + _LOGGER.debug("Connecting to {}".format(self)) + try: + sock.connect((self.host, self.port)) + except socket.timeout as e: + raise ConnectionTimeoutException("Connection timed out") from e + self.reader, self.writer = await asyncio.open_connection(sock=sock) + self._connected = True + asyncio.ensure_future(self._async_handle_message()) + asyncio.ensure_future(self._async_ping(self.ping_interval)) + asyncio.ensure_future(self.async_get(callback)) + + async def async_disconnect(self): + _LOGGER.debug("Disconnected from {}".format(self)) + self._connected = False + self.last_pong = 0 + if self.writer is not None: + self.writer.close() + + async def async_get(self, callback=None): + payload = {"gwId": self.gateway_id, "devId": self.device_id} + maybe_self = None if self.version < (3, 3) else self + message = Message(Message.GET_COMMAND, payload, encrypt_for=maybe_self) + return await message.async_send(self, callback) + + async def async_set(self, dps, callback=None): + t = int(time.time()) + payload = {"devId": self.device_id, "uid": "", "t": t, "dps": dps} + message = Message(Message.SET_COMMAND, payload, encrypt_for=self) + await message.async_send(self, callback) + + def set(self, dps): + _call_async(self.async_set, dps) + + async def _async_ping(self, ping_interval): + # print("ping") + self.last_ping = time.time() + maybe_self = None if self.version < (3, 3) else self + message = Message(Message.PING_COMMAND, sequence=0, encrypt_for=maybe_self) + await self._async_send(message) + await asyncio.sleep(ping_interval) + if self.last_pong < self.last_ping: + await self.async_disconnect() + else: + asyncio.ensure_future(self._async_ping(self.ping_interval)) + + async def _async_pong_received(self, message, device): + self.last_pong = time.time() + + async def async_update_state(self, state_message, _): + self._dps.update(state_message.payload["dps"]) + _LOGGER.info("Received updated state {}: {}".format(self, self._dps)) + + @property + def state(self): + return dict(self._dps) + + @state.setter + def state_setter(self, new_values): + asyncio.ensure_future(self.async_set(new_values)) + + async def _async_handle_message(self): + try: + response_data = await self.reader.readuntil(MAGIC_SUFFIX_BYTES) + except socket.error as e: + _LOGGER.error("Connection to {} failed: {}".format(self, e)) + asyncio.ensure_future(self.async_disconnect()) + return + except asyncio.IncompleteReadError as e: + _LOGGER.error("Incomplete read from: {} : {}".format(self, e)) + return + + try: + message = Message.from_bytes(response_data, self.cipher) + except InvalidMessage as e: + _LOGGER.error("Invalid message from {}: {}".format(self, e)) + except MessageDecodeFailed as e: + _LOGGER.error("Failed to decrypt message from {}".format(self)) + else: + _LOGGER.debug("Received message from {}: {}".format(self, message)) + for c in self._handlers.get(message.command, []): + asyncio.ensure_future(c(message, self)) + + asyncio.ensure_future(self._async_handle_message()) + + async def _async_send(self, message, retries=4): + try: + await self.async_connect() + except (socket.timeout, socket.error, OSError) as e: + if retries == 0: + raise ConnectionException( + "Failed to send data to {}".format(self) + ) from e + await self.async_connect() + await self._async_send(message, retries=retries - 1) + _LOGGER.debug("Sending to {}: {}".format(self, message)) + self.writer.write(message.bytes()) diff --git a/custom_components/robovac/tuyawebapi.py b/custom_components/robovac/tuyawebapi.py new file mode 100644 index 0000000..e44fb2a --- /dev/null +++ b/custom_components/robovac/tuyawebapi.py @@ -0,0 +1,223 @@ +"""Original Work from here: Andre Borie https://gitlab.com/Rjevski/eufy-device-id-and-local-key-grabber""" + +from hashlib import md5, sha256 +import hmac +import json +import math +import random +import string +import time +import uuid + +from cryptography.hazmat.backends.openssl import backend as openssl_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +import requests + +TUYA_INITIAL_BASE_URL = "https://a1.tuyaeu.com" + +EUFY_HMAC_KEY = ( + "A_cepev5pfnhua4dkqkdpmnrdxx378mpjr_s8x78u7xwymasd9kqa7a73pjhxqsedaj".encode() +) + + +def unpadded_rsa(key_exponent: int, key_n: int, plaintext: bytes) -> bytes: + keylength = math.ceil(key_n.bit_length() / 8) + input_nr = int.from_bytes(plaintext, byteorder="big") + crypted_nr = pow(input_nr, key_exponent, key_n) + return crypted_nr.to_bytes(keylength, byteorder="big") + + +def shuffled_md5(value: str) -> str: + _hash = md5(value.encode("utf-8")).hexdigest() + return _hash[8:16] + _hash[0:8] + _hash[24:32] + _hash[16:24] + + +TUYA_PASSWORD_INNER_CIPHER = Cipher( + algorithms.AES( + bytearray( + [36, 78, 109, 138, 86, 172, 135, 145, 36, 67, 45, 139, 108, 188, 162, 196] + ) + ), + modes.CBC( + bytearray( + [119, 36, 86, 242, 167, 102, 76, 243, 57, 44, 53, 151, 233, 62, 87, 71] + ) + ), + backend=openssl_backend, +) + +DEFAULT_TUYA_HEADERS = {"User-Agent": "TY-UA=APP/Android/2.4.0/SDK/null"} + +SIGNATURE_RELEVANT_PARAMETERS = { + "a", + "v", + "lat", + "lon", + "lang", + "deviceId", + "appVersion", + "ttid", + "isH5", + "h5Token", + "os", + "clientId", + "postData", + "time", + "requestId", + "et", + "n4h5", + "sid", + "sp", +} + +DEFAULT_TUYA_QUERY_PARAMS = { + "appVersion": "2.4.0", + "deviceId": "", + "platform": "sdk_gphone64_arm64", + "clientId": "yx5v9uc3ef9wg3v9atje", + "lang": "en", + "osSystem": "12", + "os": "Android", + "timeZoneId": "Europe/London", + "ttid": "android", + "et": "0.0.1", + "sdkVersion": "3.0.8cAnker", +} + + +class TuyaAPISession: + + username = None + country_code = None + session_id = None + + def __init__(self, username, country_code): + self.session = requests.session() + self.session.headers = DEFAULT_TUYA_HEADERS.copy() + self.default_query_params = DEFAULT_TUYA_QUERY_PARAMS.copy() + self.default_query_params["deviceId"] = self.generate_new_device_id() + self.username = username + self.country_code = country_code + self.base_url = TUYA_INITIAL_BASE_URL + + @staticmethod + def generate_new_device_id(): + expected_length = 44 + base64_characters = string.ascii_letters + string.digits + device_id_dependent_part = "8534c8ec0ed0" + return device_id_dependent_part + "".join( + random.choice(base64_characters) + for _ in range(expected_length - len(device_id_dependent_part)) + ) + + @staticmethod + def get_signature(query_params: dict, encoded_post_data: str): + query_params = query_params.copy() + if encoded_post_data: + query_params["postData"] = encoded_post_data + sorted_pairs = sorted(query_params.items()) + filtered_pairs = filter( + lambda p: p[0] and p[0] in SIGNATURE_RELEVANT_PARAMETERS, sorted_pairs + ) + mapped_pairs = map( + # postData is pre-emptively hashed (for performance reasons?), everything else is included as-is + lambda p: p[0] + "=" + (shuffled_md5(p[1]) if p[0] == "postData" else p[1]), + filtered_pairs, + ) + message = "||".join(mapped_pairs) + return hmac.HMAC( + key=EUFY_HMAC_KEY, msg=message.encode("utf-8"), digestmod=sha256 + ).hexdigest() + + def _request( + self, + action: str, + version="1.0", + data: dict = None, + query_params: dict = None, + _requires_session=True, + ): + if not self.session_id and _requires_session: + self.acquire_session() + + current_time = time.time() + request_id = uuid.uuid4() + extra_query_params = { + "time": str(int(current_time)), + "requestId": str(request_id), + "a": action, + "v": version, + **(query_params or {}), + } + query_params = {**self.default_query_params, **extra_query_params} + encoded_post_data = json.dumps(data, separators=(",", ":")) if data else "" + resp = self.session.post( + self.base_url + "/api.json", + params={ + **query_params, + "sign": self.get_signature(query_params, encoded_post_data), + }, + data={"postData": encoded_post_data} if encoded_post_data else None, + ) + resp.raise_for_status() + data = resp.json() + if "result" not in data: + raise Exception( + f"No 'result' key in the response - the entire response is {data}." + ) + return data["result"] + + def request_token(self, username, country_code): + return self._request( + action="tuya.m.user.uid.token.create", + data={"uid": username, "countryCode": country_code}, + _requires_session=False, + ) + + def determine_password(self, username: str): + new_uid = username + padded_size = 16 * math.ceil(len(new_uid) / 16) + password_uid = new_uid.zfill(padded_size) + encryptor = TUYA_PASSWORD_INNER_CIPHER.encryptor() + encrypted_uid = encryptor.update(password_uid.encode("utf8")) + encrypted_uid += encryptor.finalize() + return md5(encrypted_uid.hex().upper().encode("utf-8")).hexdigest() + + def request_session(self, username, country_code): + password = self.determine_password(username) + token_response = self.request_token(username, country_code) + encrypted_password = unpadded_rsa( + key_exponent=int(token_response["exponent"]), + key_n=int(token_response["publicKey"]), + plaintext=password.encode("utf-8"), + ) + data = { + "uid": username, + "createGroup": True, + "ifencrypt": 1, + "passwd": encrypted_password.hex(), + "countryCode": country_code, + "options": '{"group": 1}', + "token": token_response["token"], + } + session_response = self._request( + action="tuya.m.user.uid.password.login.reg", + data=data, + _requires_session=False, + ) + return session_response + + def acquire_session(self): + session_response = self.request_session(self.username, self.country_code) + self.session_id = self.default_query_params["sid"] = session_response["sid"] + self.base_url = session_response["domain"]["mobileApiUrl"] + + def list_homes(self): + return self._request(action="tuya.m.location.list", version="2.1") + + def list_devices(self, home_id: str): + return self._request( + action="tuya.m.my.group.device.list", + version="1.0", + query_params={"gid": home_id}, + ) diff --git a/custom_components/robovac/vacuum.py b/custom_components/robovac/vacuum.py new file mode 100644 index 0000000..ed456d0 --- /dev/null +++ b/custom_components/robovac/vacuum.py @@ -0,0 +1,475 @@ +# Copyright 2022 Brendan McCluskey +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Eufy Robovac sensor platform.""" +from __future__ import annotations +from collections.abc import Mapping + +from datetime import timedelta +import logging +import asyncio +import base64 +import json +import time +import ast + +from typing import Any +from homeassistant.components.vacuum import StateVacuumEntity, VacuumEntityFeature +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant +from homeassistant.helpers.device_registry import ( + CONNECTION_NETWORK_MAC, +) +from homeassistant.helpers.entity import DeviceInfo + +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.const import ( + CONF_ACCESS_TOKEN, + CONF_MODEL, + CONF_NAME, + CONF_ID, + CONF_IP_ADDRESS, + CONF_DESCRIPTION, + CONF_MAC, +) + +from .const import CONF_VACS, DOMAIN + +from .tuyalocalapi import TuyaDevice + +from homeassistant.const import ATTR_BATTERY_LEVEL + +ATTR_BATTERY_ICON = "battery_icon" +ATTR_FAN_SPEED = "fan_speed" +ATTR_FAN_SPEED_LIST = "fan_speed_list" +ATTR_STATUS = "status" +ATTR_ERROR_CODE = "error_code" +ATTR_MODEL_CODE = "model_code" +ATTR_CLEANING_AREA = "cleaning_area" +ATTR_CLEANING_TIME = "cleaning_time" +ATTR_AUTO_RETURN = "auto_return" +ATTR_DO_NOT_DISTURB = "do_not_disturb" +ATTR_BOOST_IQ = "boost_iq" +ATTR_CONSUMABLES = "consumables" +ATTR_MODE = "mode" + +_LOGGER = logging.getLogger(__name__) +# Time between updating data from GitHub +REFRESH_RATE = 20 +SCAN_INTERVAL = timedelta(seconds=REFRESH_RATE) + + +class robovac(TuyaDevice): + """""" + + +async def async_setup_entry( + hass: HomeAssistant, + config_entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Initialize my test integration 2 config entry.""" + # print("vacuum:async_setup_entry") + vacuums = config_entry.data[CONF_VACS] + # print("Vac:", vacuums) + for item in vacuums: + item = vacuums[item] + # print("item") + async_add_entities([RoboVacEntity(item)]) + + +class RoboVacEntity(StateVacuumEntity): + """Eufy Robovac version of a Vacuum entity""" + + _attr_should_poll = True + + _attr_access_token: str | None = None + _attr_ip_address: str | None = None + _attr_model_code: str | None = None + _attr_cleaning_area: str | None = None + _attr_cleaning_time: str | None = None + _attr_auto_return: str | None = None + _attr_do_not_disturb: str | None = None + _attr_boost_iq: str | None = None + _attr_consumables: str | None = None + _attr_mode: str | None = None + + @property + def mode(self) -> str | None: + """Return the cleaning mode of the vacuum cleaner.""" + return self._attr_mode + + @property + def consumables(self) -> str | None: + """Return the consumables status of the vacuum cleaner.""" + return self._attr_consumables + + @property + def cleaning_area(self) -> str | None: + """Return the cleaning area of the vacuum cleaner.""" + return self._attr_cleaning_area + + @property + def cleaning_time(self) -> str | None: + """Return the cleaning time of the vacuum cleaner.""" + return self._attr_cleaning_time + + @property + def auto_return(self) -> str | None: + """Return the auto_return mode of the vacuum cleaner.""" + return self._attr_auto_return + + @property + def do_not_disturb(self) -> str | None: + """Return the do not disturb mode of the vacuum cleaner.""" + return self._attr_do_not_disturb + + @property + def boost_iq(self) -> str | None: + """Return the boost iq mode of the vacuum cleaner.""" + return self._attr_boost_iq + + @property + def model_code(self) -> str | None: + """Return the model code of the vacuum cleaner.""" + return self._attr_model_code + + @property + def access_token(self) -> str | None: + """Return the fan speed of the vacuum cleaner.""" + return self._attr_access_token + + @property + def ip_address(self) -> str | None: + """Return the ip address of the vacuum cleaner.""" + return self._attr_ip_address + + @property + def state_attributes(self) -> dict[str, Any]: + """Return the state attributes of the vacuum cleaner.""" + data: dict[str, Any] = {} + if self.supported_features & VacuumEntityFeature.BATTERY: + data[ATTR_BATTERY_LEVEL] = self.battery_level + data[ATTR_BATTERY_ICON] = self.battery_icon + if self.supported_features & VacuumEntityFeature.FAN_SPEED: + data[ATTR_FAN_SPEED] = self.fan_speed + if self.supported_features & VacuumEntityFeature.STATUS: + data[ATTR_STATUS] = self.status + data[ATTR_CLEANING_AREA] = self.cleaning_area + data[ATTR_CLEANING_TIME] = self.cleaning_time + data[ATTR_AUTO_RETURN] = self.auto_return + data[ATTR_DO_NOT_DISTURB] = self.do_not_disturb + data[ATTR_BOOST_IQ] = self.boost_iq + data[ATTR_CONSUMABLES] = self.consumables + data[ATTR_MODE] = self.mode + return data + + @property + def capability_attributes(self) -> Mapping[str, Any] | None: + """Return capability attributes.""" + if self.supported_features & VacuumEntityFeature.FAN_SPEED: + return { + ATTR_FAN_SPEED_LIST: self.fan_speed_list, + # CONF_ACCESS_TOKEN: self.access_token, + CONF_IP_ADDRESS: self.ip_address, + ATTR_MODEL_CODE: self.model_code, + } + else: + return { + # CONF_ACCESS_TOKEN: self.access_token, + CONF_IP_ADDRESS: self.ip_address, + ATTR_MODEL_CODE: self.model_code, + } + + def __init__(self, item) -> None: + # print("vacuum:RoboVacEntity") + # print("init_item", item) + """Initialize mytest2 Sensor.""" + super().__init__() + self._attr_name = item[CONF_NAME] + self._attr_unique_id = item[CONF_ID] + self._attr_supported_features = 4084 + self._attr_model_code = item[CONF_MODEL] + self._attr_ip_address = item[CONF_IP_ADDRESS] + self._attr_access_token = item[CONF_ACCESS_TOKEN] + if self.model_code in [ + "T2103", + "T2117", + "T2118", + "T2119", + "T2120", + "T2123", + "T2128", + "T2130", + ]: # C + self._attr_fan_speed_list = ["No Suction", "Standard", "Boost IQ", "Max"] + elif self.model_code in ["T1250", "T2250", "T2251", "T2252", "T2253"]: # G + self._attr_fan_speed_list = ["Standard", "Turbo", "Max", "Boost IQ"] + elif self.model_code in ["T2262"]: # X + self._attr_fan_speed_list = ["Pure", "Standard", "Turbo", "Max"] + else: + self._attr_fan_speed_list = ["Standard"] + self._attr_mode = None + self._attr_consumables = None + self._attr_device_info = DeviceInfo( + identifiers={(DOMAIN, item[CONF_ID])}, + name=item[CONF_NAME], + manufacturer="Eufy", + model=item[CONF_DESCRIPTION], + connections=[ + (CONNECTION_NETWORK_MAC, item[CONF_MAC]), + ], + access_token=item[CONF_ACCESS_TOKEN], + ip_address=item[CONF_IP_ADDRESS], + ) + self.vacuum = robovac( + device_id=self.unique_id, + host=self.ip_address, + local_key=self.access_token, + timeout=2, + ping_interval=10 + # ping_interval=REFRESH_RATE / 2, + ) + self.error_code = None + self.tuya_state = None + self.tuyastatus = None + print("vac:", self.vacuum) + + async def async_update(self): + """Synchronise state from the vacuum.""" + print("update:", self.name) + if self.ip_address == "": + return + await self.vacuum.async_get() + self.tuyastatus = self.vacuum._dps + print("Tuya local API Result:", self.tuyastatus) + # for 15C + self._attr_battery_level = self.tuyastatus.get("104") + self.tuya_state = self.tuyastatus.get("15") + self.error_code = self.tuyastatus.get("106") + self._attr_mode = self.tuyastatus.get("5") + self._attr_fan_speed = self.tuyastatus.get("102") + if self.fan_speed == "No_suction": + self._attr_fan_speed = "No Suction" + elif self.fan_speed == "Boost_IQ": + self._attr_fan_speed = "Boost IQ" + elif self.fan_speed == "Quiet": + self._attr_fan_speed = "Pure" + # for G30 + self._attr_cleaning_area = self.tuyastatus.get("110") + self._attr_cleaning_time = self.tuyastatus.get("109") + self._attr_auto_return = self.tuyastatus.get("135") + self._attr_do_not_disturb = self.tuyastatus.get("107") + if self.tuyastatus.get("142") is not None: + self._attr_consumables = ast.literal_eval( + base64.b64decode(self.tuyastatus.get("142")).decode("ascii") + )["consumable"]["duration"] + print(self.consumables) + # For X8 + self._attr_boost_iq = self.tuyastatus.get("118") + # self.map_data = self.tuyastatus.get("121") + # self.erro_msg? = self.tuyastatus.get("124") + if self.tuyastatus.get("116") is not None: + self._attr_consumables = ast.literal_eval( + base64.b64decode(self.tuyastatus.get("116")).decode("ascii") + )["consumable"]["duration"] + print(self.consumables) + + @property + def status(self): + """Return the status of the vacuum cleaner.""" + print("status:", self.error_code, self.tuya_state) + if self.ip_address == "": + return "Error: Set the IP Address" + if type(self.error_code) is not None and self.error_code not in [0, "no_error"]: + if self.error_code == 1: + return "Error: Front bumper stuck" + elif self.error_code == 2: + return "Error: Wheel stuck" + elif self.error_code == 3: + return "Error: Side brush" + elif self.error_code == 4: + return "Error: Rolling brush bar stuck" + elif self.error_code == 5: + return "Error: Device trapped" + elif self.error_code == 6: + return "Error: Device trapped" + elif self.error_code == 7: + return "Error: Wheel suspended" + elif self.error_code == 8: + return "Error: Low battery" + elif self.error_code == 9: + return "Error: Magnetic boundary" + elif self.error_code == 12: + return "Error: Right wall sensor" + elif self.error_code == 13: + return "Error: Device tilted" + elif self.error_code == 14: + return "Error: Insert dust collector" + elif self.error_code == 17: + return "Error: Restricted area detected" + elif self.error_code == 18: + return "Error: Laser cover stuck" + elif self.error_code == 19: + return "Error: Laser sesor stuck" + elif self.error_code == 20: + return "Error: Laser sensor blocked" + elif self.error_code == 21: + return "Error: Base blocked" + elif self.error_code == "S1": + return "Error: Battery" + elif self.error_code == "S2": + return "Error: Wheel Module" + elif self.error_code == "S3": + return "Error: Side Brush" + elif self.error_code == "S4": + return "Error: Suction Fan" + elif self.error_code == "S5": + return "Error: Rolling Brush" + elif self.error_code == "S8": + return "Error: Path Tracking Sensor" + elif self.error_code == "Wheel_stuck": + return "Error: Wheel stuck" + elif self.error_code == "R_brush_stuck": + return "Error: Rolling brush stuck" + elif self.error_code == "Crash_bar_stuck": + return "Error: Front bumper stuck" + elif self.error_code == "sensor_dirty": + return "Error: Sensor dirty" + elif self.error_code == "N_enough_pow": + return "Error: Low battery" + elif self.error_code == "Stuck_5_min": + return "Error: Device trapped" + elif self.error_code == "Fan_stuck": + return "Error: Fan stuck" + elif self.error_code == "S_brush_stuck": + return "Error: Side brush stuck" + else: + return "Error: " + str(self.error_code) + elif self.tuya_state == "Running": + return "Cleaning" + elif self.tuya_state == "Locating": + return "Locating" + elif self.tuya_state == "remote": + return "Cleaning" + elif self.tuya_state == "Charging": + return "Charging" + elif self.tuya_state == "completed": + return "Docked" + elif self.tuya_state == "Recharge": + return "Returning" + elif self.tuya_state == "Sleeping": + return "Sleeping" + elif self.tuya_state == "standby": + return "Standby" + else: + return "Cleaning" + + async def async_locate(self, **kwargs): + """Locate the vacuum cleaner.""" + print("Locate Pressed") + _LOGGER.info("Locate Pressed") + if self.tuyastatus.get("103"): + await self.vacuum.async_set({"103": False}, None) + else: + await self.vacuum.async_set({"103": True}, None) + + async def async_return_to_base(self, **kwargs): + """Set the vacuum cleaner to return to the dock.""" + print("Return home Pressed") + _LOGGER.info("Return home Pressed") + await self.vacuum.async_set({"101": True}, None) + await asyncio.sleep(1) + self.async_update + + async def async_start_pause(self, **kwargs): + """Pause the cleaning task or resume it.""" + print("Start/Pause Pressed") + _LOGGER.info("Start/Pause Pressed") + if self.tuyastatus.get("2") or self.tuya_state == "Recharge": + await self.vacuum.async_set({"2": False}, None) + else: + if self.mode == "Nosweep": + self._attr_mode = "auto" + elif self.mode == "room" and ( + self.status == "Charging" or self.status == "completed" + ): + self._attr_mode = "auto" + await self.vacuum.async_set({"5": self.mode}, None) + await asyncio.sleep(1) + self.async_update + + async def async_clean_spot(self, **kwargs): + """Perform a spot clean-up.""" + print("Spot Clean Pressed") + _LOGGER.info("Spot Clean Pressed") + await self.vacuum.async_set({"5": "Spot"}, None) + await asyncio.sleep(1) + self.async_update + + async def async_set_fan_speed(self, fan_speed, **kwargs): + """Set fan speed.""" + print("Fan Speed Selected", fan_speed) + _LOGGER.info("Fan Speed Selected") + if fan_speed == "No Suction": + fan_speed = "No_suction" + elif fan_speed == "Boost IQ": + fan_speed = "Boost_IQ" + elif fan_speed == "Pure": + fan_speed = "Quiet" + await self.vacuum.async_set({"102": fan_speed}, None) + await asyncio.sleep(1) + self.async_update + + async def async_send_command( + self, command: str, params: dict | list | None = None, **kwargs + ) -> None: + """Send a command to a vacuum cleaner.""" + _LOGGER.info("Send Command %s Pressed", command) + if command == "edgeClean": + await self.vacuum.async_set({"5": "Edge"}, None) + elif command == "smallRoomClean": + await self.vacuum.async_set({"5": "SmallRoom"}, None) + elif command == "autoClean": + await self.vacuum.async_set({"5": "auto"}, None) + elif command == "autoReturn": + if self.auto_return: + await self.vacuum.async_set({"135": False}, None) + else: + await self.vacuum.async_set({"135": True}, None) + elif command == "doNotDisturb": + if self.do_not_disturb: + await self.vacuum.async_set({"139": "MEQ4MDAwMDAw"}, None) + await self.vacuum.async_set({"107": False}, None) + else: + await self.vacuum.async_set({"139": "MTAwMDAwMDAw"}, None) + await self.vacuum.async_set({"107": True}, None) + elif command == "boostIQ": + if self.boost_iq: + await self.vacuum.async_set({"118": False}, None) + else: + await self.vacuum.async_set({"118": True}, None) + elif command == "roomClean": + roomIds = params.get("roomIds", [1]) + count = params.get("count", 1) + clean_request = {"roomIds": roomIds, "cleanTimes": count} + method_call = { + "method": "selectRoomsClean", + "data": clean_request, + "timestamp": round(time.time() * 1000), + } + json_str = json.dumps(method_call, separators=(",", ":")) + base64_str = base64.b64encode(json_str.encode("utf8")).decode("utf8") + _LOGGER.info("roomClean call %s", json_str) + await self.vacuum.async_set({"124": base64_str}, None)