Add files via upload

This commit is contained in:
bmccluskey 2022-09-14 08:09:01 +01:00 committed by GitHub
parent 5c5f273f17
commit ff6184af42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1851 additions and 0 deletions

View File

@ -0,0 +1,53 @@
# Copyright 2022 Brendan McCluskey
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Eufy Robovac integration."""
from __future__ import annotations
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from .const import DOMAIN
PLATFORMS = [Platform.VACUUM]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Eufy Robovac from a config entry."""
hass.data.setdefault(DOMAIN, {})
# hass_data = dict(entry.data)
# Registers update listener to update config entry when options are updated.
# unsub_options_update_listener = entry.add_update_listener(options_update_listener)
# Store a reference to the unsubscribe function to cleanup if an entry is unloaded.
# hass_data["unsub_options_update_listener"] = unsub_options_update_listener
# hass.data[DOMAIN][entry.entry_id] = hass_data
entry.async_on_unload(entry.add_update_listener(update_listener))
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
"""Nothing"""
return unload_ok
async def update_listener(hass, entry):
"""Handle options update."""
await async_unload_entry(hass, entry)
await async_setup_entry(hass, entry)

View File

@ -0,0 +1,210 @@
# Copyright 2022 Brendan McCluskey
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Config flow for Eufy Robovac integration."""
from __future__ import annotations
import logging
from typing import Any, Optional
from copy import deepcopy
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant import config_entries
from homeassistant.core import callback
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResult
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.selector import selector
from homeassistant.const import (
CONF_ACCESS_TOKEN,
CONF_NAME,
CONF_ID,
CONF_MODEL,
CONF_USERNAME,
CONF_PASSWORD,
CONF_IP_ADDRESS,
CONF_DESCRIPTION,
CONF_MAC,
CONF_LOCATION,
CONF_CLIENT_ID,
)
from .const import DOMAIN, CONF_VACS, CONF_PHONE_CODE
from .tuyawebapi import TuyaAPISession
from .eufywebapi import EufyLogon
_LOGGER = logging.getLogger(__name__)
USER_SCHEMA = vol.Schema(
{
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
}
)
def get_eufy_vacuums(self):
"""Login to Eufy and get the vacuum details"""
eufy_session = EufyLogon(self["username"], self["password"])
response = eufy_session.get_user_info()
if response.status_code != 200:
raise CannotConnect
user_response = response.json()
if user_response["res_code"] != 1:
raise InvalidAuth
response = eufy_session.get_device_info(
user_response["user_info"]["request_host"],
user_response["user_info"]["id"],
user_response["access_token"],
)
device_response = response.json()
self[CONF_CLIENT_ID] = user_response["user_info"]["id"]
self[CONF_PHONE_CODE] = user_response["user_info"]["phone_code"]
# self[CONF_VACS] = {}
items = device_response["items"]
allvacs = {}
for item in items:
if item["device"]["product"]["appliance"] == "Cleaning":
vac_details = {
CONF_ID: item["device"]["id"],
CONF_MODEL: item["device"]["product"]["product_code"],
CONF_NAME: item["device"]["alias_name"],
CONF_DESCRIPTION: item["device"]["name"],
CONF_MAC: item["device"]["wifi"]["mac"],
CONF_IP_ADDRESS: "",
}
allvacs[item["device"]["id"]] = vac_details
self[CONF_VACS] = allvacs
tuya_client = TuyaAPISession(
username="eh-" + self[CONF_CLIENT_ID], country_code=self[CONF_PHONE_CODE]
)
for home in tuya_client.list_homes():
for device in tuya_client.list_devices(home["groupId"]):
self[CONF_VACS][device["devId"]][CONF_ACCESS_TOKEN] = device["localKey"]
self[CONF_VACS][device["devId"]][CONF_LOCATION] = home["groupId"]
return response
async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, Any]:
"""Validate the user input allows us to connect."""
await hass.async_add_executor_job(get_eufy_vacuums, data)
return data
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Eufy Robovac."""
data: Optional[dict[str, Any]]
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the initial step."""
if user_input is None:
return self.async_show_form(step_id="user", data_schema=USER_SCHEMA)
errors = {}
try:
unique_id = user_input[CONF_USERNAME]
valid_data = await validate_input(self.hass, user_input)
except CannotConnect:
errors["base"] = "cannot_connect"
except InvalidAuth:
errors["base"] = "invalid_auth"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
# return await self.async_step_repo(valid_data)
return self.async_create_entry(title=unique_id, data=valid_data)
return self.async_show_form(
step_id="user", data_schema=USER_SCHEMA, errors=errors
)
@staticmethod
@callback
def async_get_options_flow(config_entry):
"""Get the options flow for this handler."""
return OptionsFlowHandler(config_entry)
class CannotConnect(HomeAssistantError):
"""Error to indicate we cannot connect."""
class InvalidAuth(HomeAssistantError):
"""Error to indicate there is invalid auth."""
class OptionsFlowHandler(config_entries.OptionsFlow):
"""Handles options flow for the component."""
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
self.config_entry = config_entry
async def async_step_init(
self, user_input: dict[str, Any] = None
) -> dict[str, Any]:
"""Manage the options for the custom component."""
errors: dict[str, str] = {}
vac_names = []
vacuums = self.config_entry.data[CONF_VACS]
for item in vacuums:
item_settings = vacuums[item]
vac_names.append(item_settings["name"])
if user_input is not None:
for item in vacuums:
item_settings = vacuums[item]
if item_settings["name"] == user_input["vacuum"]:
item_settings[CONF_IP_ADDRESS] = user_input[CONF_IP_ADDRESS]
updated_repos = deepcopy(self.config_entry.data[CONF_VACS])
# print("Updated", updated_repos)
if not errors:
# Value of data will be set on the options property of our config_entry
# instance.
return self.async_create_entry(
title="",
data={CONF_VACS: updated_repos},
)
options_schema = vol.Schema(
{
vol.Optional("vacuum", default=1): selector(
{
"select": {
"options": vac_names,
}
}
),
vol.Optional(CONF_IP_ADDRESS): cv.string,
}
)
return self.async_show_form(
step_id="init", data_schema=options_schema, errors=errors
)

View File

@ -0,0 +1,5 @@
"""Constants for the Eufy Robovac integration."""
DOMAIN = "robovac"
CONF_VACS = "vacuums"
CONF_PHONE_CODE = "phone_code"

View File

@ -0,0 +1,41 @@
"""Original Work from here: Andre Borie https://gitlab.com/Rjevski/eufy-device-id-and-local-key-grabber"""
import requests
eufyheaders = {
"User-Agent": "EufyHome-Android-2.4.0",
"timezone": "Europe/London",
"category": "Home",
"token": "",
"uid": "",
"openudid": "sdk_gphone64_arm64",
"clientType": "2",
"language": "en",
"country": "US",
"Accept-Encoding": "gzip",
}
class EufyLogon:
def __init__(self, username, password):
self.username = username
self.password = password
def get_user_info(self):
login_url = "https://home-api.eufylife.com/v1/user/email/login"
login_auth = {
"client_Secret": "GQCpr9dSp3uQpsOMgJ4xQ",
"client_id": "eufyhome-app",
"email": self.username,
"password": self.password,
}
return requests.post(
login_url, json=login_auth, headers=eufyheaders, timeout=1.5
)
def get_device_info(self, url, userid, token):
device_url = url + "/v1/device/list/devices-and-groups"
eufyheaders["token"] = token
eufyheaders["id"] = userid
return requests.request("GET", device_url, headers=eufyheaders, timeout=1.5)

View File

@ -0,0 +1,14 @@
{
"domain": "robovac",
"name": "Eufy Robovac",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/robovac",
"requirements": [],
"ssdp": [],
"zeroconf": [],
"homekit": {},
"dependencies": [],
"codeowners": ["@bmccluskey"],
"iot_class": "local_polling",
"version": "1"
}

View File

@ -0,0 +1,21 @@
{
"config": {
"step": {
"user": {
"data": {
"username": "[%key:common::config_flow::data::username%]",
"password": "[%key:common::config_flow::data::password%]"
}
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
}
}
}

View File

@ -0,0 +1,37 @@
{
"config": {
"abort": {
"already_configured": "Device is already configured"
},
"error": {
"cannot_connect": "Failed to connect",
"invalid_auth": "Invalid authentication",
"unknown": "Unexpected error"
},
"step": {
"user": {
"data": {
"host": "Host",
"password": "Password",
"username": "Username"
},
"description": "Enter your Eufy account details"
}
}
},
"options": {
"error": {
"invalid_path": "The path provided is not valid. Should be in the format `user/repo-name` and should be a valid github repository."
},
"step": {
"init": {
"title": "Manage IPs",
"data": {
"vacuum": "Select the Vacuum to edit",
"ip_address": "IP address of vacuum"
},
"description": "Add or update a vacuums IP address"
}
}
}
}

View File

@ -0,0 +1,772 @@
# -*- coding: utf-8 -*-
# Copyright 2019 Richard Mitchell
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Based on portions of https://github.com/codetheweb/tuyapi/
#
# MIT License
#
# Copyright (c) 2017 Max Isom
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
import base64
import json
import logging
import socket
import struct
import sys
import time
from cryptography.hazmat.backends.openssl import backend as openssl_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.hashes import Hash, MD5
from cryptography.hazmat.primitives.padding import PKCS7
_LOGGER = logging.getLogger(__name__)
MESSAGE_PREFIX_FORMAT = ">IIII"
MESSAGE_SUFFIX_FORMAT = ">II"
MAGIC_PREFIX = 0x000055AA
MAGIC_SUFFIX = 0x0000AA55
MAGIC_SUFFIX_BYTES = struct.pack(">I", MAGIC_SUFFIX)
CRC_32_TABLE = [
0x00000000,
0x77073096,
0xEE0E612C,
0x990951BA,
0x076DC419,
0x706AF48F,
0xE963A535,
0x9E6495A3,
0x0EDB8832,
0x79DCB8A4,
0xE0D5E91E,
0x97D2D988,
0x09B64C2B,
0x7EB17CBD,
0xE7B82D07,
0x90BF1D91,
0x1DB71064,
0x6AB020F2,
0xF3B97148,
0x84BE41DE,
0x1ADAD47D,
0x6DDDE4EB,
0xF4D4B551,
0x83D385C7,
0x136C9856,
0x646BA8C0,
0xFD62F97A,
0x8A65C9EC,
0x14015C4F,
0x63066CD9,
0xFA0F3D63,
0x8D080DF5,
0x3B6E20C8,
0x4C69105E,
0xD56041E4,
0xA2677172,
0x3C03E4D1,
0x4B04D447,
0xD20D85FD,
0xA50AB56B,
0x35B5A8FA,
0x42B2986C,
0xDBBBC9D6,
0xACBCF940,
0x32D86CE3,
0x45DF5C75,
0xDCD60DCF,
0xABD13D59,
0x26D930AC,
0x51DE003A,
0xC8D75180,
0xBFD06116,
0x21B4F4B5,
0x56B3C423,
0xCFBA9599,
0xB8BDA50F,
0x2802B89E,
0x5F058808,
0xC60CD9B2,
0xB10BE924,
0x2F6F7C87,
0x58684C11,
0xC1611DAB,
0xB6662D3D,
0x76DC4190,
0x01DB7106,
0x98D220BC,
0xEFD5102A,
0x71B18589,
0x06B6B51F,
0x9FBFE4A5,
0xE8B8D433,
0x7807C9A2,
0x0F00F934,
0x9609A88E,
0xE10E9818,
0x7F6A0DBB,
0x086D3D2D,
0x91646C97,
0xE6635C01,
0x6B6B51F4,
0x1C6C6162,
0x856530D8,
0xF262004E,
0x6C0695ED,
0x1B01A57B,
0x8208F4C1,
0xF50FC457,
0x65B0D9C6,
0x12B7E950,
0x8BBEB8EA,
0xFCB9887C,
0x62DD1DDF,
0x15DA2D49,
0x8CD37CF3,
0xFBD44C65,
0x4DB26158,
0x3AB551CE,
0xA3BC0074,
0xD4BB30E2,
0x4ADFA541,
0x3DD895D7,
0xA4D1C46D,
0xD3D6F4FB,
0x4369E96A,
0x346ED9FC,
0xAD678846,
0xDA60B8D0,
0x44042D73,
0x33031DE5,
0xAA0A4C5F,
0xDD0D7CC9,
0x5005713C,
0x270241AA,
0xBE0B1010,
0xC90C2086,
0x5768B525,
0x206F85B3,
0xB966D409,
0xCE61E49F,
0x5EDEF90E,
0x29D9C998,
0xB0D09822,
0xC7D7A8B4,
0x59B33D17,
0x2EB40D81,
0xB7BD5C3B,
0xC0BA6CAD,
0xEDB88320,
0x9ABFB3B6,
0x03B6E20C,
0x74B1D29A,
0xEAD54739,
0x9DD277AF,
0x04DB2615,
0x73DC1683,
0xE3630B12,
0x94643B84,
0x0D6D6A3E,
0x7A6A5AA8,
0xE40ECF0B,
0x9309FF9D,
0x0A00AE27,
0x7D079EB1,
0xF00F9344,
0x8708A3D2,
0x1E01F268,
0x6906C2FE,
0xF762575D,
0x806567CB,
0x196C3671,
0x6E6B06E7,
0xFED41B76,
0x89D32BE0,
0x10DA7A5A,
0x67DD4ACC,
0xF9B9DF6F,
0x8EBEEFF9,
0x17B7BE43,
0x60B08ED5,
0xD6D6A3E8,
0xA1D1937E,
0x38D8C2C4,
0x4FDFF252,
0xD1BB67F1,
0xA6BC5767,
0x3FB506DD,
0x48B2364B,
0xD80D2BDA,
0xAF0A1B4C,
0x36034AF6,
0x41047A60,
0xDF60EFC3,
0xA867DF55,
0x316E8EEF,
0x4669BE79,
0xCB61B38C,
0xBC66831A,
0x256FD2A0,
0x5268E236,
0xCC0C7795,
0xBB0B4703,
0x220216B9,
0x5505262F,
0xC5BA3BBE,
0xB2BD0B28,
0x2BB45A92,
0x5CB36A04,
0xC2D7FFA7,
0xB5D0CF31,
0x2CD99E8B,
0x5BDEAE1D,
0x9B64C2B0,
0xEC63F226,
0x756AA39C,
0x026D930A,
0x9C0906A9,
0xEB0E363F,
0x72076785,
0x05005713,
0x95BF4A82,
0xE2B87A14,
0x7BB12BAE,
0x0CB61B38,
0x92D28E9B,
0xE5D5BE0D,
0x7CDCEFB7,
0x0BDBDF21,
0x86D3D2D4,
0xF1D4E242,
0x68DDB3F8,
0x1FDA836E,
0x81BE16CD,
0xF6B9265B,
0x6FB077E1,
0x18B74777,
0x88085AE6,
0xFF0F6A70,
0x66063BCA,
0x11010B5C,
0x8F659EFF,
0xF862AE69,
0x616BFFD3,
0x166CCF45,
0xA00AE278,
0xD70DD2EE,
0x4E048354,
0x3903B3C2,
0xA7672661,
0xD06016F7,
0x4969474D,
0x3E6E77DB,
0xAED16A4A,
0xD9D65ADC,
0x40DF0B66,
0x37D83BF0,
0xA9BCAE53,
0xDEBB9EC5,
0x47B2CF7F,
0x30B5FFE9,
0xBDBDF21C,
0xCABAC28A,
0x53B39330,
0x24B4A3A6,
0xBAD03605,
0xCDD70693,
0x54DE5729,
0x23D967BF,
0xB3667A2E,
0xC4614AB8,
0x5D681B02,
0x2A6F2B94,
0xB40BBE37,
0xC30C8EA1,
0x5A05DF1B,
0x2D02EF8D,
]
class TuyaException(Exception):
"""Base for Tuya exceptions."""
class InvalidKey(TuyaException):
"""The local key is invalid."""
class InvalidMessage(TuyaException):
"""The message received is invalid."""
class MessageDecodeFailed(TuyaException):
"""The message received cannot be decoded as JSON."""
class ConnectionException(TuyaException):
"""The socket connection failed."""
class ConnectionTimeoutException(ConnectionException):
"""The socket connection timed out."""
class RequestResponseCommandMismatch(TuyaException):
"""The command in the response didn't match the one from the request."""
class TuyaCipher:
"""Tuya cryptographic helpers."""
def __init__(self, key, version):
"""Initialize the cipher."""
self.version = version
self.key = key
self.cipher = Cipher(
algorithms.AES(key.encode("ascii")), modes.ECB(), backend=openssl_backend
)
def get_prefix_size_and_validate(self, command, encrypted_data):
try:
version = tuple(map(int, encrypted_data[:3].decode("utf8").split(".")))
except ValueError:
version = (0, 0)
if version != self.version:
return 0
if version < (3, 3):
hash = encrypted_data[3:19].decode("ascii")
expected_hash = self.hash(encrypted_data[19:])
if hash != expected_hash:
return 0
return 19
else:
if command in (Message.SET_COMMAND, Message.GRATUITOUS_UPDATE):
_, sequence, __, ___ = struct.unpack_from(">IIIH", encrypted_data, 3)
return 15
return 0
def decrypt(self, command, data):
prefix_size = self.get_prefix_size_and_validate(command, data)
data = data[prefix_size:]
decryptor = self.cipher.decryptor()
if self.version < (3, 3):
data = base64.b64decode(data)
decrypted_data = decryptor.update(data)
decrypted_data += decryptor.finalize()
unpadder = PKCS7(128).unpadder()
unpadded_data = unpadder.update(decrypted_data)
unpadded_data += unpadder.finalize()
return unpadded_data
def encrypt(self, command, data):
encrypted_data = b""
if data:
padder = PKCS7(128).padder()
padded_data = padder.update(data)
padded_data += padder.finalize()
encryptor = self.cipher.encryptor()
encrypted_data = encryptor.update(padded_data)
encrypted_data += encryptor.finalize()
prefix = ".".join(map(str, self.version)).encode("utf8")
if self.version < (3, 3):
payload = base64.b64encode(encrypted_data)
hash = self.hash(payload)
prefix += hash.encode("utf8")
else:
payload = encrypted_data
if command in (Message.SET_COMMAND, Message.GRATUITOUS_UPDATE):
prefix += b"\x00" * 12
else:
prefix = b""
return prefix + payload
def hash(self, data):
digest = Hash(MD5(), backend=openssl_backend)
to_hash = "data={}||lpv={}||{}".format(
data.decode("ascii"), ".".join(map(str, self.version)), self.key
)
digest.update(to_hash.encode("utf8"))
intermediate = digest.finalize().hex()
return intermediate[8:24]
def crc(data):
"""Calculate the Tuya-flavored CRC of some data."""
c = 0xFFFFFFFF
for b in data:
c = (c >> 8) ^ CRC_32_TABLE[(c ^ b) & 255]
return c ^ 0xFFFFFFFF
class Message:
PING_COMMAND = 0x09
GET_COMMAND = 0x0A
SET_COMMAND = 0x07
GRATUITOUS_UPDATE = 0x08
def __init__(self, command, payload=None, sequence=None, encrypt_for=None):
if payload is None:
payload = b""
self.payload = payload
self.command = command
if sequence is None:
# Use millisecond process time as the sequence number. Not ideal,
# but good for one month's continuous connection time though.
sequence = int(time.perf_counter() * 1000) & 0xFFFFFFFF
self.sequence = sequence
self.encrypt = False
self.device = None
if encrypt_for is not None:
self.device = encrypt_for
self.encrypt = True
def __repr__(self):
return "{}({}, {!r}, {!r}, {})".format(
self.__class__.__name__,
hex(self.command),
self.payload,
self.sequence,
"<Device {}>".format(self.device) if self.device else None,
)
def hex(self):
return self.bytes().hex()
def bytes(self):
payload_data = self.payload
if isinstance(payload_data, dict):
payload_data = json.dumps(payload_data, separators=(",", ":"))
if not isinstance(payload_data, bytes):
payload_data = payload_data.encode("utf8")
if self.encrypt:
payload_data = self.device.cipher.encrypt(self.command, payload_data)
payload_size = len(payload_data) + struct.calcsize(MESSAGE_SUFFIX_FORMAT)
header = struct.pack(
MESSAGE_PREFIX_FORMAT,
MAGIC_PREFIX,
self.sequence,
self.command,
payload_size,
)
if self.device and self.device.version >= (3, 3):
checksum = crc(header + payload_data)
else:
checksum = crc(payload_data)
footer = struct.pack(MESSAGE_SUFFIX_FORMAT, checksum, MAGIC_SUFFIX)
return header + payload_data + footer
__bytes__ = bytes
class AsyncWrappedCallback:
def __init__(self, request, callback):
self.request = request
self.callback = callback
self.devices = []
def register(self, device):
self.devices.append(device)
device._handlers.setdefault(self.request.command, [])
device._handlers[self.request.command].append(self)
def unregister(self, device):
self.devices.remove(device)
device._handlers[self.request.command].remove(self)
def unregister_all(self):
while self.devices:
device = self.devices.pop()
device._handlers[self.request.command].remove(self)
async def __call__(self, response, device):
if response.sequence == self.request.sequence:
asyncio.ensure_future(self.callback(response, device))
self.unregister(device)
async def async_send(self, device, callback=None):
if callback is not None:
wrapped = self.AsyncWrappedCallback(self, callback)
wrapped.register(device)
await device._async_send(self)
@classmethod
def from_bytes(cls, data, cipher=None):
try:
prefix, sequence, command, payload_size = struct.unpack_from(
MESSAGE_PREFIX_FORMAT, data
)
except struct.error as e:
raise InvalidMessage("Invalid message header format.") from e
if prefix != MAGIC_PREFIX:
raise InvalidMessage("Magic prefix missing from message.")
# check for an optional return code
header_size = struct.calcsize(MESSAGE_PREFIX_FORMAT)
try:
(return_code,) = struct.unpack_from(">I", data, header_size)
except struct.error as e:
raise InvalidMessage("Unable to unpack return code.") from e
if return_code >> 8:
payload_data = data[
header_size : header_size
+ payload_size
- struct.calcsize(MESSAGE_SUFFIX_FORMAT)
]
return_code = None
else:
payload_data = data[
header_size
+ struct.calcsize(">I") : header_size
+ payload_size
- struct.calcsize(MESSAGE_SUFFIX_FORMAT)
]
try:
expected_crc, suffix = struct.unpack_from(
MESSAGE_SUFFIX_FORMAT,
data,
header_size + payload_size - struct.calcsize(MESSAGE_SUFFIX_FORMAT),
)
except struct.error as e:
raise InvalidMessage("Invalid message suffix format.") from e
if suffix != MAGIC_SUFFIX:
raise InvalidMessage("Magic suffix missing from message")
actual_crc = crc(
data[: header_size + payload_size - struct.calcsize(MESSAGE_SUFFIX_FORMAT)]
)
if expected_crc != actual_crc:
raise InvalidMessage("CRC check failed")
payload = None
if payload_data:
try:
payload_data = cipher.decrypt(command, payload_data)
except ValueError as e:
pass
try:
payload_text = payload_data.decode("utf8")
except UnicodeDecodeError as e:
_LOGGER.debug(payload_data.hex())
_LOGGER.error(e)
raise MessageDecodeFailed() from e
try:
payload = json.loads(payload_text)
except json.decoder.JSONDecodeError as e:
# data may be encrypted
_LOGGER.debug(payload_data.hex())
_LOGGER.error(e)
raise MessageDecodeFailed() from e
return cls(command, payload, sequence)
def _call_async(fn, *args):
loop = None
if sys.version_info >= (3, 7):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
pass
loop = asyncio.get_event_loop()
def wrapper(fn, *args):
asyncio.ensure_future(fn(*args))
loop.call_soon(wrapper, fn, *args)
class TuyaDevice:
"""Represents a generic Tuya device."""
# PING_INTERVAL = 10
def __init__(
self,
device_id,
host,
timeout,
ping_interval,
local_key=None,
port=6668,
gateway_id=None,
version=(3, 3),
):
"""Initialize the device."""
self.device_id = device_id
self.host = host
self.port = port
if not gateway_id:
gateway_id = self.device_id
self.gateway_id = gateway_id
self.version = version
self.timeout = timeout
self.last_pong = 0
self.ping_interval = ping_interval
if len(local_key) != 16:
raise InvalidKey("Local key should be a 16-character string")
self.cipher = TuyaCipher(local_key, self.version)
self.writer = None
self._handlers = {
Message.GET_COMMAND: [self.async_update_state],
Message.GRATUITOUS_UPDATE: [self.async_update_state],
Message.PING_COMMAND: [self._async_pong_received],
}
self._dps = {}
self._connected = False
def __repr__(self):
return "{}({!r}, {!r}, {!r}, {!r})".format(
self.__class__.__name__,
self.device_id,
self.host,
self.port,
self.cipher.key,
)
def __str__(self):
return "{} ({}:{})".format(self.device_id, self.host, self.port)
async def async_connect(self, callback=None):
if self._connected:
return
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
sock.settimeout(self.timeout)
_LOGGER.debug("Connecting to {}".format(self))
try:
sock.connect((self.host, self.port))
except socket.timeout as e:
raise ConnectionTimeoutException("Connection timed out") from e
self.reader, self.writer = await asyncio.open_connection(sock=sock)
self._connected = True
asyncio.ensure_future(self._async_handle_message())
asyncio.ensure_future(self._async_ping(self.ping_interval))
asyncio.ensure_future(self.async_get(callback))
async def async_disconnect(self):
_LOGGER.debug("Disconnected from {}".format(self))
self._connected = False
self.last_pong = 0
if self.writer is not None:
self.writer.close()
async def async_get(self, callback=None):
payload = {"gwId": self.gateway_id, "devId": self.device_id}
maybe_self = None if self.version < (3, 3) else self
message = Message(Message.GET_COMMAND, payload, encrypt_for=maybe_self)
return await message.async_send(self, callback)
async def async_set(self, dps, callback=None):
t = int(time.time())
payload = {"devId": self.device_id, "uid": "", "t": t, "dps": dps}
message = Message(Message.SET_COMMAND, payload, encrypt_for=self)
await message.async_send(self, callback)
def set(self, dps):
_call_async(self.async_set, dps)
async def _async_ping(self, ping_interval):
# print("ping")
self.last_ping = time.time()
maybe_self = None if self.version < (3, 3) else self
message = Message(Message.PING_COMMAND, sequence=0, encrypt_for=maybe_self)
await self._async_send(message)
await asyncio.sleep(ping_interval)
if self.last_pong < self.last_ping:
await self.async_disconnect()
else:
asyncio.ensure_future(self._async_ping(self.ping_interval))
async def _async_pong_received(self, message, device):
self.last_pong = time.time()
async def async_update_state(self, state_message, _):
self._dps.update(state_message.payload["dps"])
_LOGGER.info("Received updated state {}: {}".format(self, self._dps))
@property
def state(self):
return dict(self._dps)
@state.setter
def state_setter(self, new_values):
asyncio.ensure_future(self.async_set(new_values))
async def _async_handle_message(self):
try:
response_data = await self.reader.readuntil(MAGIC_SUFFIX_BYTES)
except socket.error as e:
_LOGGER.error("Connection to {} failed: {}".format(self, e))
asyncio.ensure_future(self.async_disconnect())
return
except asyncio.IncompleteReadError as e:
_LOGGER.error("Incomplete read from: {} : {}".format(self, e))
return
try:
message = Message.from_bytes(response_data, self.cipher)
except InvalidMessage as e:
_LOGGER.error("Invalid message from {}: {}".format(self, e))
except MessageDecodeFailed as e:
_LOGGER.error("Failed to decrypt message from {}".format(self))
else:
_LOGGER.debug("Received message from {}: {}".format(self, message))
for c in self._handlers.get(message.command, []):
asyncio.ensure_future(c(message, self))
asyncio.ensure_future(self._async_handle_message())
async def _async_send(self, message, retries=4):
try:
await self.async_connect()
except (socket.timeout, socket.error, OSError) as e:
if retries == 0:
raise ConnectionException(
"Failed to send data to {}".format(self)
) from e
await self.async_connect()
await self._async_send(message, retries=retries - 1)
_LOGGER.debug("Sending to {}: {}".format(self, message))
self.writer.write(message.bytes())

View File

@ -0,0 +1,223 @@
"""Original Work from here: Andre Borie https://gitlab.com/Rjevski/eufy-device-id-and-local-key-grabber"""
from hashlib import md5, sha256
import hmac
import json
import math
import random
import string
import time
import uuid
from cryptography.hazmat.backends.openssl import backend as openssl_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import requests
TUYA_INITIAL_BASE_URL = "https://a1.tuyaeu.com"
EUFY_HMAC_KEY = (
"A_cepev5pfnhua4dkqkdpmnrdxx378mpjr_s8x78u7xwymasd9kqa7a73pjhxqsedaj".encode()
)
def unpadded_rsa(key_exponent: int, key_n: int, plaintext: bytes) -> bytes:
keylength = math.ceil(key_n.bit_length() / 8)
input_nr = int.from_bytes(plaintext, byteorder="big")
crypted_nr = pow(input_nr, key_exponent, key_n)
return crypted_nr.to_bytes(keylength, byteorder="big")
def shuffled_md5(value: str) -> str:
_hash = md5(value.encode("utf-8")).hexdigest()
return _hash[8:16] + _hash[0:8] + _hash[24:32] + _hash[16:24]
TUYA_PASSWORD_INNER_CIPHER = Cipher(
algorithms.AES(
bytearray(
[36, 78, 109, 138, 86, 172, 135, 145, 36, 67, 45, 139, 108, 188, 162, 196]
)
),
modes.CBC(
bytearray(
[119, 36, 86, 242, 167, 102, 76, 243, 57, 44, 53, 151, 233, 62, 87, 71]
)
),
backend=openssl_backend,
)
DEFAULT_TUYA_HEADERS = {"User-Agent": "TY-UA=APP/Android/2.4.0/SDK/null"}
SIGNATURE_RELEVANT_PARAMETERS = {
"a",
"v",
"lat",
"lon",
"lang",
"deviceId",
"appVersion",
"ttid",
"isH5",
"h5Token",
"os",
"clientId",
"postData",
"time",
"requestId",
"et",
"n4h5",
"sid",
"sp",
}
DEFAULT_TUYA_QUERY_PARAMS = {
"appVersion": "2.4.0",
"deviceId": "",
"platform": "sdk_gphone64_arm64",
"clientId": "yx5v9uc3ef9wg3v9atje",
"lang": "en",
"osSystem": "12",
"os": "Android",
"timeZoneId": "Europe/London",
"ttid": "android",
"et": "0.0.1",
"sdkVersion": "3.0.8cAnker",
}
class TuyaAPISession:
username = None
country_code = None
session_id = None
def __init__(self, username, country_code):
self.session = requests.session()
self.session.headers = DEFAULT_TUYA_HEADERS.copy()
self.default_query_params = DEFAULT_TUYA_QUERY_PARAMS.copy()
self.default_query_params["deviceId"] = self.generate_new_device_id()
self.username = username
self.country_code = country_code
self.base_url = TUYA_INITIAL_BASE_URL
@staticmethod
def generate_new_device_id():
expected_length = 44
base64_characters = string.ascii_letters + string.digits
device_id_dependent_part = "8534c8ec0ed0"
return device_id_dependent_part + "".join(
random.choice(base64_characters)
for _ in range(expected_length - len(device_id_dependent_part))
)
@staticmethod
def get_signature(query_params: dict, encoded_post_data: str):
query_params = query_params.copy()
if encoded_post_data:
query_params["postData"] = encoded_post_data
sorted_pairs = sorted(query_params.items())
filtered_pairs = filter(
lambda p: p[0] and p[0] in SIGNATURE_RELEVANT_PARAMETERS, sorted_pairs
)
mapped_pairs = map(
# postData is pre-emptively hashed (for performance reasons?), everything else is included as-is
lambda p: p[0] + "=" + (shuffled_md5(p[1]) if p[0] == "postData" else p[1]),
filtered_pairs,
)
message = "||".join(mapped_pairs)
return hmac.HMAC(
key=EUFY_HMAC_KEY, msg=message.encode("utf-8"), digestmod=sha256
).hexdigest()
def _request(
self,
action: str,
version="1.0",
data: dict = None,
query_params: dict = None,
_requires_session=True,
):
if not self.session_id and _requires_session:
self.acquire_session()
current_time = time.time()
request_id = uuid.uuid4()
extra_query_params = {
"time": str(int(current_time)),
"requestId": str(request_id),
"a": action,
"v": version,
**(query_params or {}),
}
query_params = {**self.default_query_params, **extra_query_params}
encoded_post_data = json.dumps(data, separators=(",", ":")) if data else ""
resp = self.session.post(
self.base_url + "/api.json",
params={
**query_params,
"sign": self.get_signature(query_params, encoded_post_data),
},
data={"postData": encoded_post_data} if encoded_post_data else None,
)
resp.raise_for_status()
data = resp.json()
if "result" not in data:
raise Exception(
f"No 'result' key in the response - the entire response is {data}."
)
return data["result"]
def request_token(self, username, country_code):
return self._request(
action="tuya.m.user.uid.token.create",
data={"uid": username, "countryCode": country_code},
_requires_session=False,
)
def determine_password(self, username: str):
new_uid = username
padded_size = 16 * math.ceil(len(new_uid) / 16)
password_uid = new_uid.zfill(padded_size)
encryptor = TUYA_PASSWORD_INNER_CIPHER.encryptor()
encrypted_uid = encryptor.update(password_uid.encode("utf8"))
encrypted_uid += encryptor.finalize()
return md5(encrypted_uid.hex().upper().encode("utf-8")).hexdigest()
def request_session(self, username, country_code):
password = self.determine_password(username)
token_response = self.request_token(username, country_code)
encrypted_password = unpadded_rsa(
key_exponent=int(token_response["exponent"]),
key_n=int(token_response["publicKey"]),
plaintext=password.encode("utf-8"),
)
data = {
"uid": username,
"createGroup": True,
"ifencrypt": 1,
"passwd": encrypted_password.hex(),
"countryCode": country_code,
"options": '{"group": 1}',
"token": token_response["token"],
}
session_response = self._request(
action="tuya.m.user.uid.password.login.reg",
data=data,
_requires_session=False,
)
return session_response
def acquire_session(self):
session_response = self.request_session(self.username, self.country_code)
self.session_id = self.default_query_params["sid"] = session_response["sid"]
self.base_url = session_response["domain"]["mobileApiUrl"]
def list_homes(self):
return self._request(action="tuya.m.location.list", version="2.1")
def list_devices(self, home_id: str):
return self._request(
action="tuya.m.my.group.device.list",
version="1.0",
query_params={"gid": home_id},
)

View File

@ -0,0 +1,475 @@
# Copyright 2022 Brendan McCluskey
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Eufy Robovac sensor platform."""
from __future__ import annotations
from collections.abc import Mapping
from datetime import timedelta
import logging
import asyncio
import base64
import json
import time
import ast
from typing import Any
from homeassistant.components.vacuum import StateVacuumEntity, VacuumEntityFeature
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import (
CONNECTION_NETWORK_MAC,
)
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.const import (
CONF_ACCESS_TOKEN,
CONF_MODEL,
CONF_NAME,
CONF_ID,
CONF_IP_ADDRESS,
CONF_DESCRIPTION,
CONF_MAC,
)
from .const import CONF_VACS, DOMAIN
from .tuyalocalapi import TuyaDevice
from homeassistant.const import ATTR_BATTERY_LEVEL
ATTR_BATTERY_ICON = "battery_icon"
ATTR_FAN_SPEED = "fan_speed"
ATTR_FAN_SPEED_LIST = "fan_speed_list"
ATTR_STATUS = "status"
ATTR_ERROR_CODE = "error_code"
ATTR_MODEL_CODE = "model_code"
ATTR_CLEANING_AREA = "cleaning_area"
ATTR_CLEANING_TIME = "cleaning_time"
ATTR_AUTO_RETURN = "auto_return"
ATTR_DO_NOT_DISTURB = "do_not_disturb"
ATTR_BOOST_IQ = "boost_iq"
ATTR_CONSUMABLES = "consumables"
ATTR_MODE = "mode"
_LOGGER = logging.getLogger(__name__)
# Time between updating data from GitHub
REFRESH_RATE = 20
SCAN_INTERVAL = timedelta(seconds=REFRESH_RATE)
class robovac(TuyaDevice):
""""""
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Initialize my test integration 2 config entry."""
# print("vacuum:async_setup_entry")
vacuums = config_entry.data[CONF_VACS]
# print("Vac:", vacuums)
for item in vacuums:
item = vacuums[item]
# print("item")
async_add_entities([RoboVacEntity(item)])
class RoboVacEntity(StateVacuumEntity):
"""Eufy Robovac version of a Vacuum entity"""
_attr_should_poll = True
_attr_access_token: str | None = None
_attr_ip_address: str | None = None
_attr_model_code: str | None = None
_attr_cleaning_area: str | None = None
_attr_cleaning_time: str | None = None
_attr_auto_return: str | None = None
_attr_do_not_disturb: str | None = None
_attr_boost_iq: str | None = None
_attr_consumables: str | None = None
_attr_mode: str | None = None
@property
def mode(self) -> str | None:
"""Return the cleaning mode of the vacuum cleaner."""
return self._attr_mode
@property
def consumables(self) -> str | None:
"""Return the consumables status of the vacuum cleaner."""
return self._attr_consumables
@property
def cleaning_area(self) -> str | None:
"""Return the cleaning area of the vacuum cleaner."""
return self._attr_cleaning_area
@property
def cleaning_time(self) -> str | None:
"""Return the cleaning time of the vacuum cleaner."""
return self._attr_cleaning_time
@property
def auto_return(self) -> str | None:
"""Return the auto_return mode of the vacuum cleaner."""
return self._attr_auto_return
@property
def do_not_disturb(self) -> str | None:
"""Return the do not disturb mode of the vacuum cleaner."""
return self._attr_do_not_disturb
@property
def boost_iq(self) -> str | None:
"""Return the boost iq mode of the vacuum cleaner."""
return self._attr_boost_iq
@property
def model_code(self) -> str | None:
"""Return the model code of the vacuum cleaner."""
return self._attr_model_code
@property
def access_token(self) -> str | None:
"""Return the fan speed of the vacuum cleaner."""
return self._attr_access_token
@property
def ip_address(self) -> str | None:
"""Return the ip address of the vacuum cleaner."""
return self._attr_ip_address
@property
def state_attributes(self) -> dict[str, Any]:
"""Return the state attributes of the vacuum cleaner."""
data: dict[str, Any] = {}
if self.supported_features & VacuumEntityFeature.BATTERY:
data[ATTR_BATTERY_LEVEL] = self.battery_level
data[ATTR_BATTERY_ICON] = self.battery_icon
if self.supported_features & VacuumEntityFeature.FAN_SPEED:
data[ATTR_FAN_SPEED] = self.fan_speed
if self.supported_features & VacuumEntityFeature.STATUS:
data[ATTR_STATUS] = self.status
data[ATTR_CLEANING_AREA] = self.cleaning_area
data[ATTR_CLEANING_TIME] = self.cleaning_time
data[ATTR_AUTO_RETURN] = self.auto_return
data[ATTR_DO_NOT_DISTURB] = self.do_not_disturb
data[ATTR_BOOST_IQ] = self.boost_iq
data[ATTR_CONSUMABLES] = self.consumables
data[ATTR_MODE] = self.mode
return data
@property
def capability_attributes(self) -> Mapping[str, Any] | None:
"""Return capability attributes."""
if self.supported_features & VacuumEntityFeature.FAN_SPEED:
return {
ATTR_FAN_SPEED_LIST: self.fan_speed_list,
# CONF_ACCESS_TOKEN: self.access_token,
CONF_IP_ADDRESS: self.ip_address,
ATTR_MODEL_CODE: self.model_code,
}
else:
return {
# CONF_ACCESS_TOKEN: self.access_token,
CONF_IP_ADDRESS: self.ip_address,
ATTR_MODEL_CODE: self.model_code,
}
def __init__(self, item) -> None:
# print("vacuum:RoboVacEntity")
# print("init_item", item)
"""Initialize mytest2 Sensor."""
super().__init__()
self._attr_name = item[CONF_NAME]
self._attr_unique_id = item[CONF_ID]
self._attr_supported_features = 4084
self._attr_model_code = item[CONF_MODEL]
self._attr_ip_address = item[CONF_IP_ADDRESS]
self._attr_access_token = item[CONF_ACCESS_TOKEN]
if self.model_code in [
"T2103",
"T2117",
"T2118",
"T2119",
"T2120",
"T2123",
"T2128",
"T2130",
]: # C
self._attr_fan_speed_list = ["No Suction", "Standard", "Boost IQ", "Max"]
elif self.model_code in ["T1250", "T2250", "T2251", "T2252", "T2253"]: # G
self._attr_fan_speed_list = ["Standard", "Turbo", "Max", "Boost IQ"]
elif self.model_code in ["T2262"]: # X
self._attr_fan_speed_list = ["Pure", "Standard", "Turbo", "Max"]
else:
self._attr_fan_speed_list = ["Standard"]
self._attr_mode = None
self._attr_consumables = None
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, item[CONF_ID])},
name=item[CONF_NAME],
manufacturer="Eufy",
model=item[CONF_DESCRIPTION],
connections=[
(CONNECTION_NETWORK_MAC, item[CONF_MAC]),
],
access_token=item[CONF_ACCESS_TOKEN],
ip_address=item[CONF_IP_ADDRESS],
)
self.vacuum = robovac(
device_id=self.unique_id,
host=self.ip_address,
local_key=self.access_token,
timeout=2,
ping_interval=10
# ping_interval=REFRESH_RATE / 2,
)
self.error_code = None
self.tuya_state = None
self.tuyastatus = None
print("vac:", self.vacuum)
async def async_update(self):
"""Synchronise state from the vacuum."""
print("update:", self.name)
if self.ip_address == "":
return
await self.vacuum.async_get()
self.tuyastatus = self.vacuum._dps
print("Tuya local API Result:", self.tuyastatus)
# for 15C
self._attr_battery_level = self.tuyastatus.get("104")
self.tuya_state = self.tuyastatus.get("15")
self.error_code = self.tuyastatus.get("106")
self._attr_mode = self.tuyastatus.get("5")
self._attr_fan_speed = self.tuyastatus.get("102")
if self.fan_speed == "No_suction":
self._attr_fan_speed = "No Suction"
elif self.fan_speed == "Boost_IQ":
self._attr_fan_speed = "Boost IQ"
elif self.fan_speed == "Quiet":
self._attr_fan_speed = "Pure"
# for G30
self._attr_cleaning_area = self.tuyastatus.get("110")
self._attr_cleaning_time = self.tuyastatus.get("109")
self._attr_auto_return = self.tuyastatus.get("135")
self._attr_do_not_disturb = self.tuyastatus.get("107")
if self.tuyastatus.get("142") is not None:
self._attr_consumables = ast.literal_eval(
base64.b64decode(self.tuyastatus.get("142")).decode("ascii")
)["consumable"]["duration"]
print(self.consumables)
# For X8
self._attr_boost_iq = self.tuyastatus.get("118")
# self.map_data = self.tuyastatus.get("121")
# self.erro_msg? = self.tuyastatus.get("124")
if self.tuyastatus.get("116") is not None:
self._attr_consumables = ast.literal_eval(
base64.b64decode(self.tuyastatus.get("116")).decode("ascii")
)["consumable"]["duration"]
print(self.consumables)
@property
def status(self):
"""Return the status of the vacuum cleaner."""
print("status:", self.error_code, self.tuya_state)
if self.ip_address == "":
return "Error: Set the IP Address"
if type(self.error_code) is not None and self.error_code not in [0, "no_error"]:
if self.error_code == 1:
return "Error: Front bumper stuck"
elif self.error_code == 2:
return "Error: Wheel stuck"
elif self.error_code == 3:
return "Error: Side brush"
elif self.error_code == 4:
return "Error: Rolling brush bar stuck"
elif self.error_code == 5:
return "Error: Device trapped"
elif self.error_code == 6:
return "Error: Device trapped"
elif self.error_code == 7:
return "Error: Wheel suspended"
elif self.error_code == 8:
return "Error: Low battery"
elif self.error_code == 9:
return "Error: Magnetic boundary"
elif self.error_code == 12:
return "Error: Right wall sensor"
elif self.error_code == 13:
return "Error: Device tilted"
elif self.error_code == 14:
return "Error: Insert dust collector"
elif self.error_code == 17:
return "Error: Restricted area detected"
elif self.error_code == 18:
return "Error: Laser cover stuck"
elif self.error_code == 19:
return "Error: Laser sesor stuck"
elif self.error_code == 20:
return "Error: Laser sensor blocked"
elif self.error_code == 21:
return "Error: Base blocked"
elif self.error_code == "S1":
return "Error: Battery"
elif self.error_code == "S2":
return "Error: Wheel Module"
elif self.error_code == "S3":
return "Error: Side Brush"
elif self.error_code == "S4":
return "Error: Suction Fan"
elif self.error_code == "S5":
return "Error: Rolling Brush"
elif self.error_code == "S8":
return "Error: Path Tracking Sensor"
elif self.error_code == "Wheel_stuck":
return "Error: Wheel stuck"
elif self.error_code == "R_brush_stuck":
return "Error: Rolling brush stuck"
elif self.error_code == "Crash_bar_stuck":
return "Error: Front bumper stuck"
elif self.error_code == "sensor_dirty":
return "Error: Sensor dirty"
elif self.error_code == "N_enough_pow":
return "Error: Low battery"
elif self.error_code == "Stuck_5_min":
return "Error: Device trapped"
elif self.error_code == "Fan_stuck":
return "Error: Fan stuck"
elif self.error_code == "S_brush_stuck":
return "Error: Side brush stuck"
else:
return "Error: " + str(self.error_code)
elif self.tuya_state == "Running":
return "Cleaning"
elif self.tuya_state == "Locating":
return "Locating"
elif self.tuya_state == "remote":
return "Cleaning"
elif self.tuya_state == "Charging":
return "Charging"
elif self.tuya_state == "completed":
return "Docked"
elif self.tuya_state == "Recharge":
return "Returning"
elif self.tuya_state == "Sleeping":
return "Sleeping"
elif self.tuya_state == "standby":
return "Standby"
else:
return "Cleaning"
async def async_locate(self, **kwargs):
"""Locate the vacuum cleaner."""
print("Locate Pressed")
_LOGGER.info("Locate Pressed")
if self.tuyastatus.get("103"):
await self.vacuum.async_set({"103": False}, None)
else:
await self.vacuum.async_set({"103": True}, None)
async def async_return_to_base(self, **kwargs):
"""Set the vacuum cleaner to return to the dock."""
print("Return home Pressed")
_LOGGER.info("Return home Pressed")
await self.vacuum.async_set({"101": True}, None)
await asyncio.sleep(1)
self.async_update
async def async_start_pause(self, **kwargs):
"""Pause the cleaning task or resume it."""
print("Start/Pause Pressed")
_LOGGER.info("Start/Pause Pressed")
if self.tuyastatus.get("2") or self.tuya_state == "Recharge":
await self.vacuum.async_set({"2": False}, None)
else:
if self.mode == "Nosweep":
self._attr_mode = "auto"
elif self.mode == "room" and (
self.status == "Charging" or self.status == "completed"
):
self._attr_mode = "auto"
await self.vacuum.async_set({"5": self.mode}, None)
await asyncio.sleep(1)
self.async_update
async def async_clean_spot(self, **kwargs):
"""Perform a spot clean-up."""
print("Spot Clean Pressed")
_LOGGER.info("Spot Clean Pressed")
await self.vacuum.async_set({"5": "Spot"}, None)
await asyncio.sleep(1)
self.async_update
async def async_set_fan_speed(self, fan_speed, **kwargs):
"""Set fan speed."""
print("Fan Speed Selected", fan_speed)
_LOGGER.info("Fan Speed Selected")
if fan_speed == "No Suction":
fan_speed = "No_suction"
elif fan_speed == "Boost IQ":
fan_speed = "Boost_IQ"
elif fan_speed == "Pure":
fan_speed = "Quiet"
await self.vacuum.async_set({"102": fan_speed}, None)
await asyncio.sleep(1)
self.async_update
async def async_send_command(
self, command: str, params: dict | list | None = None, **kwargs
) -> None:
"""Send a command to a vacuum cleaner."""
_LOGGER.info("Send Command %s Pressed", command)
if command == "edgeClean":
await self.vacuum.async_set({"5": "Edge"}, None)
elif command == "smallRoomClean":
await self.vacuum.async_set({"5": "SmallRoom"}, None)
elif command == "autoClean":
await self.vacuum.async_set({"5": "auto"}, None)
elif command == "autoReturn":
if self.auto_return:
await self.vacuum.async_set({"135": False}, None)
else:
await self.vacuum.async_set({"135": True}, None)
elif command == "doNotDisturb":
if self.do_not_disturb:
await self.vacuum.async_set({"139": "MEQ4MDAwMDAw"}, None)
await self.vacuum.async_set({"107": False}, None)
else:
await self.vacuum.async_set({"139": "MTAwMDAwMDAw"}, None)
await self.vacuum.async_set({"107": True}, None)
elif command == "boostIQ":
if self.boost_iq:
await self.vacuum.async_set({"118": False}, None)
else:
await self.vacuum.async_set({"118": True}, None)
elif command == "roomClean":
roomIds = params.get("roomIds", [1])
count = params.get("count", 1)
clean_request = {"roomIds": roomIds, "cleanTimes": count}
method_call = {
"method": "selectRoomsClean",
"data": clean_request,
"timestamp": round(time.time() * 1000),
}
json_str = json.dumps(method_call, separators=(",", ":"))
base64_str = base64.b64encode(json_str.encode("utf8")).decode("utf8")
_LOGGER.info("roomClean call %s", json_str)
await self.vacuum.async_set({"124": base64_str}, None)