diff --git a/custom_components/localtuya/config_flow.py b/custom_components/localtuya/config_flow.py index 87c82a290..9c6adced9 100644 --- a/custom_components/localtuya/config_flow.py +++ b/custom_components/localtuya/config_flow.py @@ -1102,8 +1102,12 @@ def dps_string_list(dps_data: dict[str, dict], cloud_dp_codes: dict[str, dict]) for dp, func in cloud_dp_codes.items(): # Default Manual dp value is -1, we will replace it if it in cloud. add_dp = dp not in dps_data or dps_data.get(dp) == -1 - if add_dp and ((value := func.get("value")) or value is not None): - dps_data[dp] = f"{value}, cloud pull" + if add_dp: + value = func.get("value", "null") + if func.get("accessMode", "null") == "wr": + dps_data[dp] = f"{value}, write-only" + else: + dps_data[dp] = f"{value}, cloud pull" for dp, value in dps_data.items(): if (dp_data := cloud_dp_codes.get(dp)) and (code := dp_data.get("code")): diff --git a/custom_components/localtuya/coordinator.py b/custom_components/localtuya/coordinator.py index 442440d8d..5da4f565a 100644 --- a/custom_components/localtuya/coordinator.py +++ b/custom_components/localtuya/coordinator.py @@ -70,6 +70,7 @@ def __init__( self._hass_entry: HassLocalTuyaData = hass.data[DOMAIN][entry.entry_id] self._device_config = DeviceConfig(device_config.copy()) self.id = self._device_config.id + self.write_only = False # Used for BLE bulbs, see LocalTuyaLight self._status = {} self._interface = None @@ -414,6 +415,9 @@ async def set_status(self): payload, self._pending_status = self._pending_status.copy(), {} try: await self._interface.set_dps(payload, cid=self._node_id) + if self.write_only: + # The device never replies, process its status change now + self.status_updated(payload) except Exception as ex: # pylint: disable=broad-except self.debug(f"Failed to set values {payload} --> {ex}", force=True) elif not self.connected: diff --git a/custom_components/localtuya/core/ha_entities/base.py b/custom_components/localtuya/core/ha_entities/base.py index 78d987606..762a37cca 100644 --- a/custom_components/localtuya/core/ha_entities/base.py +++ b/custom_components/localtuya/core/ha_entities/base.py @@ -188,6 +188,7 @@ class DPCode(StrEnum): COLOR_DATA_V2 = "color_data_v2" COLOUR_DATA = "colour_data" # Colored light mode COLOUR_DATA_HSV = "colour_data_hsv" # Colored light mode + COLOUR_DATA_RAW = "colour_data_raw" # Colored light mode for BLE COLOUR_DATA_V2 = "colour_data_v2" # Colored light mode COMPRESSOR_COMMAND = "compressor_command" CONCENTRATION_SET = "concentration_set" # Concentration setting @@ -504,6 +505,7 @@ class DPCode(StrEnum): SCENE_8 = "scene_8" SCENE_9 = "scene_9" SCENE_DATA = "scene_data" # Colored light mode + SCENE_DATA_RAW = "scene_data_raw" # Colored light mode for BLE SCENE_DATA_V2 = "scene_data_v2" # Colored light mode SEEK = "seek" SENS = "sens" # Ikuu SXSEN003PIR IP65 Motion Detector (Wi-Fi) diff --git a/custom_components/localtuya/core/ha_entities/lights.py b/custom_components/localtuya/core/ha_entities/lights.py index dfc580b9e..1f3ecb747 100644 --- a/custom_components/localtuya/core/ha_entities/lights.py +++ b/custom_components/localtuya/core/ha_entities/lights.py @@ -84,8 +84,8 @@ def localtuya_light( color_mode=DPCode.WORK_MODE, brightness=(DPCode.BRIGHT_VALUE_V2, DPCode.BRIGHT_VALUE), color_temp=(DPCode.TEMP_VALUE_V2, DPCode.TEMP_VALUE), - color=(DPCode.COLOUR_DATA_V2, DPCode.COLOUR_DATA), - scene=(DPCode.SCENE_DATA_V2, DPCode.SCENE_DATA), + color=(DPCode.COLOUR_DATA_V2, DPCode.COLOUR_DATA_RAW, DPCode.COLOUR_DATA), + scene=(DPCode.SCENE_DATA_V2, DPCode.SCENE_DATA_RAW, DPCode.SCENE_DATA), custom_configs=localtuya_light(29, 1000, 2700, 6500, False, True), ), # Not documented diff --git a/custom_components/localtuya/entity.py b/custom_components/localtuya/entity.py index b3bc9b74e..844405652 100644 --- a/custom_components/localtuya/entity.py +++ b/custom_components/localtuya/entity.py @@ -284,6 +284,20 @@ def dp_value(self, key, default=None) -> Any | None: return value + def dp_code(self, key): + """Returns DP code, if available from the cloud""" + dp_id = self._config.get(key) + if dp_id is None: + return None + for dp in self._device_config.dps_strings: + all = dp.split(" ") + if dp_id == all[0]: + if len(all) > 3 and all[2] == "code:": + return all[3] + else: + return None + return None + def status_updated(self) -> None: """Device status was updated. diff --git a/custom_components/localtuya/light.py b/custom_components/localtuya/light.py index 8818060b8..c2e386c3a 100644 --- a/custom_components/localtuya/light.py +++ b/custom_components/localtuya/light.py @@ -1,5 +1,6 @@ """Platform to locally control Tuya-based light devices.""" +import base64 import logging import textwrap import homeassistant.util.color as color_util @@ -20,6 +21,7 @@ ) from homeassistant.const import CONF_BRIGHTNESS, CONF_COLOR_TEMP, CONF_SCENE +from .core.ha_entities.base import DPCode from .config_flow import col_to_select from .entity import LocalTuyaEntity, async_setup_entry from .const import ( @@ -51,11 +53,11 @@ MODE_SCENE = "scene" MODE_WHITE = "white" -SCENE_CUSTOM = "Custom" SCENE_MUSIC = "Music" MODES_SET = {"Colour, Music, Scene and White": 0, "Manual, Music, Scene and White": 1} +# https://developer.tuya.com/en/docs/iot/dj?id=K9i5ql3v98hn3#title-10-scene_data SCENE_LIST_RGBW_255 = { "Night": "bd76000168ffff", "Read": "fffcf70168ffff", @@ -66,20 +68,45 @@ "Scenario 3": "scene_3", "Scenario 4": "scene_4", } + +# https://developer.tuya.com/en/docs/iot/dj?id=K9i5ql3v98hn3#title-11-scene_data_v2 SCENE_LIST_RGBW_1000 = { - "Night": "000e0d0000000000000000c80000", - "Read": "010e0d0000000000000003e801f4", + "Night 1": "000e0d0000000000000000c80000", + "Night 2": "000e0d00002e03e802cc00000000", + "Read 1": "010e0d0000000000000003e801f4", + "Read 2": "010e0d000084000003e800000000", "Meeting": "020e0d0000000000000003e803e8", - "Leasure": "030e0d0000000000000001f401f4", + "Working": "020e0d00001403e803e800000000", + "Leasure 1": "030e0d0000000000000001f401f4", + "Leisure 2": "030e0d0000e80383031c00000000", "Soft": "04464602007803e803e800000000464602007803e8000a00000000", "Rainbow": "05464601000003e803e800000000464601007803e803e80000000046460100f003e803" + "e800000000", - "Shine": "06464601000003e803e800000000464601007803e803e80000000046460100f003e803e8" - + "00000000", + "Colorful": "06464601000003e803e800000000464601007803e803e80000000046460100f003e80" + + "3e800000000464601003d03e803e80000000046460100ae03e803e800000000464601011303e803" + + "e800000000", "Beautiful": "07464602000003e803e800000000464602007803e803e80000000046460200f003e8" + "03e800000000464602003d03e803e80000000046460200ae03e803e800000000464602011303e80" + "3e800000000", + "Forest": "19464601007803e803e800000000464602006e0320025800000000464602005a038403e8" + + "00000000", + "Dream": "1c4646020104032003e800000000464602011802bc03e800000000464602011303e803e80" + + "0000000", + "F Style": "1e323201015e01f403e800000000323202003201f403e80000000032320200a001f403e" + + "800000000", + "A Style": "1f46460100dc02bc03e800000000464602006e03200258000000004646020014038403e" + + "800000000464601012703e802ee0000000046460100000384028a00000000", + "Halloween": "28464601011303e803e800000000464601001e03e803e800000000", + "Christmas": "225a5a0100f003e803e8000000005a5a01003d03e803e800000000464601000003e80" + + "3e8000000005a5a0100ae03e803e8000000005a5a01011303e803e800000000464601007803e803e" + + "800000000", + "Birthday": "20646401003d03e803e800000000646401007803e803e8000000005a5a01011303e803" + + "e8000000005a5a0100ae03e803e800000000646401003201f403e800000000646401000003e803e8" + + "00000000", + "Wedding Anniversary": "21323202015e01f403e800000000323202011303e803e800000000", } + +# Same format as SCENE_LIST_RGBW_1000 SCENE_LIST_RGB_1000 = { "Night": "000e0d00002e03e802cc00000000", "Read": "010e0d000084000003e800000000", @@ -96,6 +123,16 @@ + "0000000", } +# BASE64-encoded 1-byte numbers. +# Other numbers up to 0x10 were tested to no avail. +SCENE_LIST_RGBW_BLE = { + "Good Night": "AA==", # 00 + "Leisure": "Aw==", # 01 + "Gorgeous": "Bw==", # 07 + "Dream": "HA==", # 1C + "Sunflower": "GA==", # 18 + "Grassland": "BA==", # 04 +} @dataclass(frozen=True) class Mode: @@ -162,8 +199,13 @@ def __init__( ): """Initialize the Tuya light.""" super().__init__(device, config_entry, lightid, _LOGGER, **kwargs) - self._state = False - self._brightness = None + # Light is an active device (mains powered). It should be able + # to respond at any time. But Tuya BLE bulbs are write-only. + self._write_only = self._is_write_only + if self._write_only: + self._device.write_only = self._write_only + + self._state = None self._color_temp = None self._lower_brightness = int( self._config.get(CONF_BRIGHTNESS_LOWER, DEFAULT_LOWER_BRIGHTNESS) @@ -171,6 +213,7 @@ def __init__( self._upper_brightness = int( self._config.get(CONF_BRIGHTNESS_UPPER, DEFAULT_UPPER_BRIGHTNESS) ) + self._brightness = None if not self._write_only else self._upper_brightness self._upper_color_temp = self._upper_brightness self._min_kelvin = int( self._config.get(CONF_COLOR_TEMP_MIN_KELVIN, DEFAULT_MIN_KELVIN) @@ -187,28 +230,70 @@ def __init__( self._effect_list = [] self._scenes = {} - custom_scenes = False if self.has_config(CONF_SCENE): - if self.has_config(CONF_SCENE_VALUES): - custom_scenes = True + if self.has_config(CONF_SCENE_VALUES) and len(self._config.get(CONF_SCENE_VALUES)): values_list = list(self._config.get(CONF_SCENE_VALUES)) values_name = list(self._config.get(CONF_SCENE_VALUES).values()) self._scenes = dict(zip(values_name, values_list)) - elif int(self._config.get(CONF_SCENE)) < 20: - self._scenes = SCENE_LIST_RGBW_255 - elif self._config.get(CONF_BRIGHTNESS) is None: - self._scenes = SCENE_LIST_RGB_1000 else: - self._scenes = SCENE_LIST_RGBW_1000 - - if not custom_scenes: - self._scenes = {**self._modes.as_dict(), **self._scenes} + scene_code = self.dp_code(CONF_SCENE) + if scene_code is None: + # Using fuzzy logic to detect scene data format + if self._write_only: # BLE bulbs + self._scenes = SCENE_LIST_RGBW_BLE + elif int(self._config.get(CONF_SCENE)) < 20: + self._scenes = SCENE_LIST_RGBW_255 + elif self._config.get(CONF_BRIGHTNESS) is None: + self._scenes = SCENE_LIST_RGB_1000 + else: + self._scenes = SCENE_LIST_RGBW_1000 + elif scene_code == DPCode.SCENE_DATA_V2: + self._scenes = SCENE_LIST_RGBW_1000 + elif scene_code == DPCode.SCENE_DATA_RAW: + self._scenes = SCENE_LIST_RGBW_BLE + elif scene_code == DPCode.SCENE_DATA: + self._scenes = SCENE_LIST_RGBW_255 + + self._scenes = {**self._modes.as_dict(), **self._scenes} self._effect_list = list(self._scenes.keys()) if self._config.get(CONF_MUSIC_MODE): self._effect_list.append(SCENE_MUSIC) + if self.has_config(CONF_COLOR): + color_code = self.dp_code(CONF_COLOR) + if color_code is None: + self.__to_color = self.__to_color_common + self.__from_color = self.__from_color_common + elif color_code in (DPCode.COLOUR_DATA_V2, DPCode.COLOR_DATA_V2): + self.__to_color = self.__to_color_v2 + self.__from_color = self.__from_color_v2 + elif color_code == DPCode.COLOUR_DATA_RAW: + self.__to_color = self.__to_color_raw + self.__from_color = self.__from_color_raw + elif color_code == DPCode.COLOUR_DATA: + self.__to_color = self.__to_color_ + self.__from_color = self.__from_color_ + else: + self.__to_color = self.__to_color_common + self.__from_color = self.__from_color_common + + @property + def _is_write_only(self): + """Return if this sub-device is write-only (BLE).""" + if not self._device.is_subdevice: + return False + for dp in self._device_config.dps_strings: + all = dp.split(" ") + if all[0] == self._dp_id: + if "write-only" in all or "cloud" in all: + return True + else: + break + # Setup without cloud? + return "0" in self._device_config.manual_dps.split(",") + @property def is_on(self): """Check if Tuya light is on.""" @@ -249,7 +334,7 @@ def hs_color(self): def color_temp(self): """Return the color_temp of the light.""" if self._color_temp is None: - return + return None if self.has_config(CONF_COLOR_TEMP): color_temp = ( self._upper_color_temp - self._color_temp @@ -364,8 +449,8 @@ def __is_color_rgb_encoded(self): def __find_scene_by_scene_data(self, data): return next( (item for item in self._effect_list if self._scenes.get(item) == data), - SCENE_CUSTOM, - ) + None, + ) if data is not None else None def __get_color_mode(self): return ( @@ -374,14 +459,98 @@ def __get_color_mode(self): else self._modes.white ) + def __to_color_raw(self, hs, brightness): + return base64.b64encode( +# BASE64-encoded 4-byte value: HHSL + bytes([ + round(hs[0]) // 256, + round(hs[0]) % 256, + round(hs[1]), + round(brightness * 100 / self._upper_brightness) + ]) + ).decode("ascii") + + def __to_color_(self, hs, brightness): +# https://developer.tuya.com/en/docs/iot/dj?id=K9i5ql3v98hn3#title-8-colour_data + return "{:04x}{:02x}{:02x}".format( + round(hs[0]), + round(hs[1] * 255 / 100), + round(brightness * 255 / self._upper_brightness) + ) + + def __to_color_v2(self, hs, brightness): +# https://developer.tuya.com/en/docs/iot/dj?id=K9i5ql3v98hn3#title-9-colour_data_v2 + return "{:04x}{:04x}{:04x}".format( + round(hs[0]), + round(hs[1] * 10.0), + brightness + ) + + def __to_color_common(self, hs, brightness): + """Converts HSB values to a string.""" + if self.__is_color_rgb_encoded(): + # Not documented format + rgb = color_util.color_hsv_to_RGB( + hs[0], hs[1], int(brightness * 100 / self._upper_brightness) + ) + return "{:02x}{:02x}{:02x}{:04x}{:02x}{:02x}".format( + round(rgb[0]), + round(rgb[1]), + round(rgb[2]), + round(hs[0]), + round(hs[1] * 255 / 100), + brightness, + ) + else: + return self.__to_color_v2(hs, brightness) + + def __from_color_raw(self, color): +# BASE64-encoded 4-byte value: HHSL + hsl = int.from_bytes( + base64.b64decode(color), byteorder='big', signed=False + ) + hue = hsl // 65536 + sat = (hsl // 256) % 256 + value = (hsl % 256) * self._upper_brightness / 100 + self._hs = [hue, sat] + self._brightness = value + + def __from_color_(self, color): +# https://developer.tuya.com/en/docs/iot/dj?id=K9i5ql3v98hn3#title-8-colour_data + hue, sat, value = [ + int(value, 16) for value in textwrap.wrap(color, 4) + ] + self._hs = [hue, sat * 100 / 255] + self._brightness = value * self._upper_brightness / 100 + + def __from_color_v2(self, color): +# https://developer.tuya.com/en/docs/iot/dj?id=K9i5ql3v98hn3#title-9-colour_data_v2 + hue, sat, value = [ + int(value, 16) for value in textwrap.wrap(color, 4) + ] + self._hs = [hue, sat / 10.0] + self._brightness = value + + def __from_color_common(self, color): + """Convert a string to HSL values.""" + if self.__is_color_rgb_encoded(): + hue = int(color[6:10], 16) + sat = int(color[10:12], 16) + value = int(color[12:14], 16) + self._hs = [hue, sat] + self._brightness = value + else: + self.__from_color_v2(color) + async def async_turn_on(self, **kwargs): """Turn on or control the light.""" states = {} - if not self.is_on: + if not self.is_on or self._write_only: states[self._dp_id] = True features = self.supported_features color_modes = self.supported_color_modes brightness = None + color_mode = None if ATTR_EFFECT in kwargs and (features & LightEntityFeature.EFFECT): effect = kwargs[ATTR_EFFECT] scene = self._scenes.get(effect) @@ -390,14 +559,14 @@ async def async_turn_on(self, **kwargs): self._modes.white, self._modes.color, ): - states[self._config.get(CONF_COLOR_MODE)] = scene + color_mode = scene else: - states[self._config.get(CONF_COLOR_MODE)] = self._modes.scene + color_mode = self._modes.scene states[self._config.get(CONF_SCENE)] = scene elif effect in self._modes.as_list(): - states[self._config.get(CONF_COLOR_MODE)] = effect + color_mode = effect elif effect == self._modes.music: - states[self._config.get(CONF_COLOR_MODE)] = self._modes.music + color_mode = self._modes.music if ATTR_BRIGHTNESS in kwargs and ( ColorMode.BRIGHTNESS in color_modes @@ -411,29 +580,12 @@ async def async_turn_on(self, **kwargs): self._lower_brightness, self._upper_brightness, ) - if self.is_white_mode or self.dp_value(CONF_COLOR) is None: - states[self._config.get(CONF_BRIGHTNESS)] = brightness + if self.is_color_mode and self._hs is not None: + states[self._config.get(CONF_COLOR)] = self.__to_color(self._hs, brightness) + color_mode = self._modes.color else: - if self.__is_color_rgb_encoded(): - rgb = color_util.color_hsv_to_RGB( - self._hs[0], - self._hs[1], - int(brightness * 100 / self._upper_brightness), - ) - color = "{:02x}{:02x}{:02x}{:04x}{:02x}{:02x}".format( - round(rgb[0]), - round(rgb[1]), - round(rgb[2]), - round(self._hs[0]), - round(self._hs[1] * 255 / 100), - brightness, - ) - else: - color = "{:04x}{:04x}{:04x}".format( - round(self._hs[0]), round(self._hs[1] * 10.0), brightness - ) - states[self._config.get(CONF_COLOR)] = color - states[self._config.get(CONF_COLOR_MODE)] = self._modes.color + states[self._config.get(CONF_BRIGHTNESS)] = brightness + color_mode = self._modes.white if ATTR_HS_COLOR in kwargs and ColorMode.HS in color_modes: if brightness is None: @@ -441,26 +593,10 @@ async def async_turn_on(self, **kwargs): hs = kwargs[ATTR_HS_COLOR] if hs[1] == 0 and self.has_config(CONF_BRIGHTNESS): states[self._config.get(CONF_BRIGHTNESS)] = brightness - states[self._config.get(CONF_COLOR_MODE)] = self._modes.white + color_mode = self._modes.white else: - if self.__is_color_rgb_encoded(): - rgb = color_util.color_hsv_to_RGB( - hs[0], hs[1], int(brightness * 100 / self._upper_brightness) - ) - color = "{:02x}{:02x}{:02x}{:04x}{:02x}{:02x}".format( - round(rgb[0]), - round(rgb[1]), - round(rgb[2]), - round(hs[0]), - round(hs[1] * 255 / 100), - brightness, - ) - else: - color = "{:04x}{:04x}{:04x}".format( - round(hs[0]), round(hs[1] * 10.0), brightness - ) - states[self._config.get(CONF_COLOR)] = color - states[self._config.get(CONF_COLOR_MODE)] = self._modes.color + states[self._config.get(CONF_COLOR)] = self.__to_color(hs, brightness) + color_mode = self._modes.color if ATTR_COLOR_TEMP in kwargs and ColorMode.COLOR_TEMP in color_modes: if brightness is None: @@ -477,10 +613,13 @@ async def async_turn_on(self, **kwargs): - (self._upper_color_temp / (self.max_mireds - self.min_mireds)) * (mired - self.min_mireds) ) - states[self._config.get(CONF_COLOR_MODE)] = self._modes.white + color_mode = self._modes.white states[self._config.get(CONF_BRIGHTNESS)] = brightness states[self._config.get(CONF_COLOR_TEMP)] = color_temp + if color_mode is not None: + states[self._config.get(CONF_COLOR_MODE)] = color_mode + await self._device.set_dps(states) async def async_turn_off(self, **kwargs): @@ -499,38 +638,23 @@ def status_updated(self): if ColorMode.HS in self.supported_color_modes: color = self.dp_value(CONF_COLOR) if color is not None and not self.is_white_mode: - if self.__is_color_rgb_encoded(): - hue = int(color[6:10], 16) - sat = int(color[10:12], 16) - value = int(color[12:14], 16) - self._hs = [hue, (sat * 100 / 255)] - self._brightness = value - else: - hue, sat, value = [ - int(value, 16) for value in textwrap.wrap(color, 4) - ] - self._hs = [hue, sat / 10.0] - self._brightness = value + self.__from_color(color) elif self._brightness is None: - self._brightness = 20 + self._brightness = self._upper_brightness if ColorMode.COLOR_TEMP in self.supported_color_modes: self._color_temp = self.dp_value(CONF_COLOR_TEMP) if self.is_scene_mode and supported & LightEntityFeature.EFFECT: - if self.dp_value(CONF_COLOR_MODE) != self._modes.scene: - self._effect = self.__find_scene_by_scene_data( - self.dp_value(CONF_COLOR_MODE) - ) + color_mode = self.dp_value(CONF_COLOR_MODE) + if color_mode != self._modes.scene: + self._effect = self.__find_scene_by_scene_data(color_mode) else: self._effect = self.__find_scene_by_scene_data( self.dp_value(CONF_SCENE) ) - if self._effect == SCENE_CUSTOM: - if SCENE_CUSTOM not in self._effect_list: - self._effect_list.append(SCENE_CUSTOM) - elif SCENE_CUSTOM in self._effect_list: - self._effect_list.remove(SCENE_CUSTOM) + if self._effect is None: + self._effect = self.__find_scene_by_scene_data(color_mode) if self.is_music_mode and supported & LightEntityFeature.EFFECT: self._effect = SCENE_MUSIC