# eufy-robovac/custom_components/robovac/tuyawebapi.py
#
# 245 lines
# 7.7 KiB
# Python

"""Original work by Andre Borie: https://gitlab.com/Rjevski/eufy-device-id-and-local-key-grabber"""
from hashlib import md5, sha256
import hmac
import json
import math
import random
import string
import time
import uuid
from cryptography.hazmat.backends.openssl import backend as openssl_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import requests
# Base URL of the EU Tuya endpoint.  TuyaAPISession chooses its own base URL
# from the region passed to it, so this constant is only a module-level
# default value.
TUYA_INITIAL_BASE_URL = "https://a1.tuyaeu.com"

# Key for the HMAC-SHA256 request signature computed in
# TuyaAPISession.get_signature.
EUFY_HMAC_KEY = b"A_cepev5pfnhua4dkqkdpmnrdxx378mpjr_s8x78u7xwymasd9kqa7a73pjhxqsedaj"
def unpadded_rsa(key_exponent: int, key_n: int, plaintext: bytes) -> bytes:
    """Apply raw modular exponentiation (RSA without any padding scheme).

    ``plaintext`` is interpreted as a big-endian unsigned integer, raised to
    ``key_exponent`` modulo ``key_n``, and the result is returned as
    big-endian bytes, left-padded with zero bytes to the byte length of
    ``key_n``.

    Args:
        key_exponent: The exponent to raise the message to.
        key_n: The modulus; its bit length determines the output size.
        plaintext: The message bytes.

    Returns:
        The result as ``ceil(key_n.bit_length() / 8)`` bytes.
    """
    # Integer ceiling division: number of whole bytes needed for the modulus.
    modulus_size = (key_n.bit_length() + 7) // 8
    message = int.from_bytes(plaintext, "big")
    return pow(message, key_exponent, key_n).to_bytes(modulus_size, "big")
def shuffled_md5(value: str) -> str:
    """Return the MD5 hex digest of ``value`` with its 8-character chunks reordered.

    The 32-character digest of the UTF-8 encoded input is split into four
    chunks of eight hex characters (numbered 1-4) and emitted in the order
    2, 1, 4, 3.
    """
    digest = md5(value.encode("utf-8")).hexdigest()
    first, second, third, fourth = (digest[i:i + 8] for i in range(0, 32, 8))
    return second + first + fourth + third
# AES cipher in CBC mode with a fixed 16-byte key and a fixed 16-byte IV.
# TuyaAPISession.determine_password creates a fresh encryptor from it for
# every call.
TUYA_PASSWORD_INNER_CIPHER = Cipher(
    # Key bytes (hex form of the 16 fixed key values).
    algorithms.AES(
        bytearray.fromhex("24 4e 6d 8a 56 ac 87 91 24 43 2d 8b 6c bc a2 c4")
    ),
    # Initialisation vector bytes (hex form of the 16 fixed IV values).
    modes.CBC(
        bytearray.fromhex("77 24 56 f2 a7 66 4c f3 39 2c 35 97 e9 3e 57 47")
    ),
    backend=openssl_backend,
)
# HTTP headers installed on every TuyaAPISession's requests session.
DEFAULT_TUYA_HEADERS = {"User-Agent": "TY-UA=APP/Android/2.4.0/SDK/null"}

# Names of the query parameters that take part in the request signature;
# TuyaAPISession.get_signature ignores every parameter not listed here.
SIGNATURE_RELEVANT_PARAMETERS = {
    "a", "appVersion", "clientId", "deviceId", "et",
    "h5Token", "isH5", "lang", "lat", "lon",
    "n4h5", "os", "postData", "requestId", "sid",
    "sp", "time", "ttid", "v",
}

# Query parameters sent with every API call.  "deviceId" and "timeZoneId"
# are left empty here; TuyaAPISession.__init__ supplies them at construction
# time.
DEFAULT_TUYA_QUERY_PARAMS = dict(
    appVersion="2.4.0",
    deviceId="",
    platform="sdk_gphone64_arm64",
    clientId="yx5v9uc3ef9wg3v9atje",
    lang="en",
    osSystem="12",
    os="Android",
    timeZoneId="",
    ttid="android",
    et="0.0.1",
    sdkVersion="3.0.8cAnker",
)
class TuyaAPISession:
    """Session against the Tuya JSON API endpoint (``<base_url>/api.json``).

    Holds a ``requests`` session, the query parameters sent with every call,
    and the session id (``sid``) obtained by logging in with a password
    derived from the username.  The login is performed lazily, the first
    time a request that needs a session is made.
    """

    # Class-level defaults; instances overwrite them in __init__ and
    # acquire_session.
    username = None
    country_code = None
    session_id = None

    def __init__(self, username, region, timezone):
        """Prepare a session for ``username``; no network call is made here.

        Args:
            username: User id used for both token and login requests.
            region: Region code.  ``"EU"`` and ``"AY"`` select dedicated
                endpoints and country codes; anything else falls back to
                the US endpoint and country code ``"1"``.
            timezone: Value sent as the ``timeZoneId`` query parameter.
        """
        self.session = requests.session()
        # NOTE: this replaces the session's header mapping with a plain dict,
        # so only DEFAULT_TUYA_HEADERS are configured at session level.
        self.session.headers = DEFAULT_TUYA_HEADERS.copy()

        self.default_query_params = DEFAULT_TUYA_QUERY_PARAMS.copy()
        self.default_query_params["deviceId"] = self.generate_new_device_id()
        # Store the timezone on this instance's copy.  Writing it to the
        # module-level defaults (as was done before) happened after the copy
        # above, so this instance never sent it, and the value leaked into
        # every session created afterwards.
        self.default_query_params["timeZoneId"] = timezone

        self.username = username
        self.country_code = self.getCountryCode(region)
        self.base_url = {
            "EU": "https://a1.tuyaeu.com",
            "AY": "https://a1.tuyacn.com",
        }.get(region, "https://a1.tuyaus.com")

    @staticmethod
    def generate_new_device_id():
        """Return a 44-character device id: a fixed prefix plus random alphanumerics."""
        expected_length = 44
        alphabet = string.ascii_letters + string.digits
        fixed_prefix = "8534c8ec0ed0"
        random_tail = "".join(
            random.choice(alphabet)
            for _ in range(expected_length - len(fixed_prefix))
        )
        return fixed_prefix + random_tail

    @staticmethod
    def get_signature(query_params: dict, encoded_post_data: str):
        """Compute the HMAC-SHA256 hex signature for a request.

        Only parameters named in ``SIGNATURE_RELEVANT_PARAMETERS`` are
        signed.  They are sorted by name, rendered as ``name=value`` and
        joined with ``"||"``.  All values must be strings.

        Args:
            query_params: Query parameters of the request (not modified).
            encoded_post_data: JSON body as a string; when non-empty it is
                signed as the ``postData`` parameter.

        Returns:
            The signature as a lowercase hex string.
        """
        signed_params = dict(query_params)
        if encoded_post_data:
            signed_params["postData"] = encoded_post_data

        # postData is pre-emptively hashed (for performance reasons?),
        # everything else is included as-is.
        parts = [
            f"{name}={shuffled_md5(value) if name == 'postData' else value}"
            for name, value in sorted(signed_params.items())
            if name in SIGNATURE_RELEVANT_PARAMETERS
        ]
        message = "||".join(parts)
        return hmac.HMAC(
            key=EUFY_HMAC_KEY, msg=message.encode("utf-8"), digestmod=sha256
        ).hexdigest()

    def _request(
        self,
        action: str,
        version="1.0",
        data: dict = None,
        query_params: dict = None,
        _requires_session=True,
    ):
        """POST one API action and return the ``result`` member of the reply.

        Args:
            action: API action name, sent as query parameter ``a``.
            version: API version, sent as query parameter ``v``.
            data: Optional payload; serialised to compact JSON and sent as
                the ``postData`` form field.  Falsy values send no body.
            query_params: Extra query parameters; these override both the
                defaults and the generated ``time``/``requestId``/``a``/``v``.
            _requires_session: When true and no session id is stored yet,
                ``acquire_session`` is called first.

        Returns:
            The value stored under ``"result"`` in the JSON response.

        Raises:
            requests.HTTPError: If the HTTP status indicates an error.
            Exception: If the JSON response has no ``"result"`` key.
        """
        if _requires_session and not self.session_id:
            self.acquire_session()

        # Built after a possible login so that the freshly stored "sid" is
        # part of the defaults.
        request_params = {
            **self.default_query_params,
            "time": str(int(time.time())),
            "requestId": str(uuid.uuid4()),
            "a": action,
            "v": version,
            **(query_params or {}),
        }
        encoded_post_data = json.dumps(data, separators=(",", ":")) if data else ""

        # NOTE(review): no timeout is passed, so this call can block for as
        # long as the server keeps the connection open.
        resp = self.session.post(
            self.base_url + "/api.json",
            params={
                **request_params,
                "sign": self.get_signature(request_params, encoded_post_data),
            },
            data={"postData": encoded_post_data} if encoded_post_data else None,
        )
        resp.raise_for_status()

        payload = resp.json()
        if "result" not in payload:
            raise Exception(
                f"No 'result' key in the response - the entire response is {payload}."
            )
        return payload["result"]

    def request_token(self, username, country_code):
        """Request a login token for ``username``; does not need a session.

        ``request_session`` reads the ``exponent``, ``publicKey`` and
        ``token`` members of the returned mapping.
        """
        return self._request(
            action="tuya.m.user.uid.token.create",
            data={"uid": username, "countryCode": country_code},
            _requires_session=False,
        )

    def determine_password(self, username: str):
        """Derive the login password from ``username``.

        The username is left-padded with ``"0"`` to a multiple of 16
        characters, encrypted with ``TUYA_PASSWORD_INNER_CIPHER``, rendered
        as uppercase hex, and the MD5 hex digest of that text is returned.
        """
        padded_size = 16 * math.ceil(len(username) / 16)
        padded_uid = username.zfill(padded_size)
        # NOTE(review): the padding is counted in characters, so this
        # presumably expects ASCII usernames - a multi-byte UTF-8 encoding
        # would not be a whole number of 16-byte blocks.
        encryptor = TUYA_PASSWORD_INNER_CIPHER.encryptor()
        encrypted_uid = encryptor.update(padded_uid.encode("utf8"))
        encrypted_uid += encryptor.finalize()
        return md5(encrypted_uid.hex().upper().encode("utf-8")).hexdigest()

    def request_session(self, username, password, country_code):
        """Log in and return the login response.

        A token is requested first, and ``password`` is encrypted with the
        key it carries via ``unpadded_rsa``.  If the login request fails for
        any reason, it is retried once with the MD5 hex digest of
        ``"12345678"`` as the password.

        Raises:
            Exception: Whatever the retry raised, when the fallback
                password fails as well.
        """
        token_response = self.request_token(username, country_code)
        encrypted_password = unpadded_rsa(
            key_exponent=int(token_response["exponent"]),
            key_n=int(token_response["publicKey"]),
            plaintext=password.encode("utf-8"),
        )
        login_data = {
            "uid": username,
            "createGroup": True,
            "ifencrypt": 1,
            "passwd": encrypted_password.hex(),
            "countryCode": country_code,
            "options": '{"group": 1}',
            "token": token_response["token"],
        }
        try:
            return self._request(
                action="tuya.m.user.uid.password.login.reg",
                data=login_data,
                _requires_session=False,
            )
        except Exception:
            fallback_password = md5("12345678".encode("utf8")).hexdigest()
            if password == fallback_password:
                # Already the fallback attempt: re-raise with the original
                # traceback instead of recursing again.
                raise
            return self.request_session(username, fallback_password, country_code)

    def acquire_session(self):
        """Log in and store the session id, API base URL and country code.

        The session id is also stored as the ``sid`` default query
        parameter, so later requests carry it.  The country code is taken
        from the response's ``phoneCode`` when that is truthy, otherwise it
        is derived from the response's region code.
        """
        password = self.determine_password(self.username)
        session_response = self.request_session(
            self.username, password, self.country_code
        )
        self.session_id = self.default_query_params["sid"] = session_response["sid"]
        self.base_url = session_response["domain"]["mobileApiUrl"]
        self.country_code = session_response["phoneCode"] or self.getCountryCode(
            session_response["domain"]["regionCode"]
        )

    def list_homes(self):
        """Call the ``tuya.m.location.list`` action (v2.1) and return its result."""
        return self._request(action="tuya.m.location.list", version="2.1")

    def get_device(self, devId):
        """Call the ``tuya.m.device.get`` action for ``devId`` and return its result."""
        return self._request(
            action="tuya.m.device.get",
            version="1.0",
            data={"devId": devId},
        )

    def getCountryCode(self, region_code):
        """Map a region code to a country code: EU -> "44", AY -> "86", otherwise "1"."""
        return {"EU": "44", "AY": "86"}.get(region_code, "1")