diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 00000000..c8e0941c --- /dev/null +++ b/.coveragerc @@ -0,0 +1,18 @@ +[run] +branch = True +source = tinytuya +omit = + */__init__.py + tests/* + sandbox/* + build/* + setup.py + test.py + +[report] +exclude_lines = + pragma: no cover + if __name__ == .__main__. + if TYPE_CHECKING: + pass +show_missing = True diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml index ebf8e669..eeb3ba75 100644 --- a/.github/workflows/pylint.yml +++ b/.github/workflows/pylint.yml @@ -21,4 +21,4 @@ jobs: pip install -r requirements.txt - name: Analyzing the code with pylint run: | - pylint --recursive y -E tinytuya/ + pylint --recursive y -E --ignore scanner_async.py tinytuya/ diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 197aadc0..cb063b4f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -25,7 +25,28 @@ jobs: python -VV python -m site python -m pip install --upgrade pip setuptools wheel - python -m pip install --upgrade cryptography requests colorama + python -m pip install --upgrade cryptography requests colorama pytest pytest-cov pytest-asyncio + # project editable install + pip install -e . - - name: "Run test.py and tests.py on ${{ matrix.python-version }}" - run: "python -m test.py && python -m tests" + - name: "Run legacy scripts" + run: python -m test.py && python -m tests + + - name: "Run pytest with coverage" + run: pytest --cov=tinytuya --cov-report=xml --cov-report=term-missing + + - name: "Upload coverage artifact" + uses: actions/upload-artifact@v4 + with: + name: coverage-${{ matrix.python-version }} + path: coverage.xml + + - name: "Codecov upload" + if: matrix.python-version == '3.10' + uses: codecov/codecov-action@v4 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: coverage.xml + flags: unittests + fail_ci_if_error: false + verbose: true diff --git a/ASYNC.md b/ASYNC.md new file mode 100644 index 00000000..f52aa633 --- /dev/null +++ b/ASYNC.md @@ -0,0 +1,141 @@ +# TinyTuya Async Roadmap (v2.x) + +## Vision +Provide a first-class asyncio-native API for local Tuya LAN control that: +* Preserves rock-solid backward compatibility with the existing synchronous API (1.x style) for the large installed base. +* Enables high-concurrency, low-latency operations (parallel status polling, batched control, streaming updates) across mixed protocol versions (3.1–3.5) without blocking threads. +* Establishes a sustainable architecture so future protocol changes (e.g. 3.6+, new discovery flows) can be integrated once at the async layer and selectively backported. + +## Goals +1. Async Core: Non-blocking socket connect, handshake, encrypt/decrypt, send/receive framed messages for all protocol versions. +2. High Throughput: Support dozens/hundreds of devices concurrently with graceful backpressure and timeout handling. +3. Pluggable Crypto & Parsing: Reuse existing message framing logic but allow async pipeline (reader / writer tasks) with cancellation. +4. Structured API: Mirror familiar synchronous class names with `Async` suffix (e.g. `XenonDeviceAsync`, `OutletDeviceAsync`). +5. Observability: Built-in debug / trace hooks and metrics counters (messages sent, retries, handshake duration) pluggable via callbacks. +6. Incremental Adoption: No forced migration—sync and async coexist; shared utility modules (e.g. encoding, DPS merge) remain single-source. + +## Out of Scope +* Replacing synchronous classes or removing sync code paths. +* Full async Cloud API (could follow later). + +## Architectural Overview (Planned) +``` ++------------------------------+ +---------------------------+ +| XenonDeviceAsync (base) | | MessageHelper (shared) | +| - state machine |<--calls--> | pack/unpack (sync funcs) | +| - connection supervisor | | crypto helpers | +| - protocol v3.1..v3.5 | +---------------------------+ +| - send queue (asyncio.Queue)| +| - recv task (reader loop) | +---------------------------+ +| - handshake coroutine |<--uses---->| Crypto (AESCipher) | ++--------------+---------------+ +---------------------------+ + | derives + +----------+-----------+ + | Async Device Mixins | + | (Outlet/Bulb/etc.) | + +----------------------+ +``` + +## Milestones +| Milestone | Description | Deliverables | Target Version | +|-----------|-------------|--------------|----------------| +| M0 | Planning & Version Bump | v2.0.0, `ASYNC.md`, release notes | 2.0.0 | +| M1 | Async Core Skeleton | `xasync/connection.py`, `XenonDeviceAsync` minimal connect + status (3.1/3.3) | 2.1.0 | +| M2 | Protocol Coverage | Support 3.4/3.5 handshake & GCM in async path | 2.2.0 | +| M3 | Device Classes | `OutletDeviceAsync`, `BulbDeviceAsync`, `CoverDeviceAsync` parity subset | 2.3.0 | +| M4 | High-Perf Scanner | Async scanner refactor (parallel probes, cancellation) | 2.4.0 | +| M5 | Test & Metrics | 85%+ coverage for async modules; metrics hooks | 2.5.0 | +| M6 | Examples & Docs | Async examples, README + PROTOCOL cross-links | 2.6.0 | +| M7 | Optimization | Connection pooling, adaptive retry, rate limiting | 2.7.0 | + +## Detailed Task Breakdown +### M1 – Async Core Skeleton +- [ ] Create package folder `tinytuya/asyncio/` (or `tinytuya/async_`) to avoid name collision. +- [ ] Implement `XenonDeviceAsync` with: + * `__init__(..., loop=None)` store config + * `_ensure_connection()` coroutine: open TCP, negotiate session key if needed + * `_reader_task()` coroutine: read frames, push to internal queue + * `_send_frame()` coroutine: pack + write + * `status()` -> `await get_status()` that sends DP_QUERY / CONTROL_NEW per version + * Graceful close / cancellation +- [ ] Reuse existing `pack_message` / `unpack_message` in a thread-safe way (they are CPU-bound but fast; optionally offload heavy crypto to default loop executor only if needed later). + +### M2 – Protocol v3.4 / v3.5 Handshake +- [ ] Async handshake coroutine with timeout + auto-retry +- [ ] Session key caching per open connection +- [ ] Automatic renegotiation on GCM tag failure + +### M3 – Device Class Parity +- [ ] Async mixins or subclasses replicating key sync API (`set_value`, `set_multiple_values`, `turn_on/off`) +- [ ] If return values differ (e.g. coroutines), document mapping in README +- [ ] Shared DPS merge logic factored into pure functions usable by both sync/async + +### M4 – Async Scanner +- [ ] Coroutine to probe IP ranges concurrently (configurable concurrency) +- [ ] Cancel outstanding probes on shutdown +- [ ] Integrate UDP discovery (v3.1–v3.5) with async sockets +- [ ] Provide `await scan_network(subnet, timeout)` returning structured device list + +### M5 – Tests & QA +- [ ] Pytest-asyncio test suite for: framing, handshake, reconnection, DPS updates +- [ ] Fake device server (async) to simulate v3.1, 3.3, 3.4, 3.5 behaviors +- [ ] Performance smoke test (N devices concurrently) gating PR merges + +### M6 – Documentation & Examples +- [ ] `examples/async/` directory with: basic status, bulk control, scanner usage, bulb effects +- [ ] README section: “Using the Async API” with migration notes +- [ ] Cross-link PROTOCOL.md for handshake & framing details + +### M7 – Optimization & Enhancements +- [ ] Connection pooling for multiple logical child devices (gateways) sharing transport +- [ ] Adaptive retry and exponential backoff for transient network errors +- [ ] Optional structured logging adapter (JSON events) +- [ ] Metrics hook interface (`on_event(event_name, **data)`) for integrations + +## Testing Strategy +| Layer | Strategy | +|-------|----------| +| Unit | Pure functions (framing, header parse) deterministic tests | +| Integration | Async fake device endpoints per protocol version | +| Performance | Timed concurrent status for N synthetic devices (assert throughput baseline) | +| Regression | Mirror critical sync tests with async equivalents | +| Fuzz (future) | Random DP payload mutation on fake server to harden parsing | + +## API Sketch (Draft) +```python +import asyncio +import tinytuya +from tinytuya.asyncio import XenonDeviceAsync + +async def main(): + dev = XenonDeviceAsync(dev_id, address=ip, local_key=key, version=3.5, persist=True) + status = await dev.status() # coroutine + print(status) + await dev.set_value(1, True) + await dev.close() + +asyncio.run(main()) +``` +Methods returning coroutines (awaitables): `status`, `set_value`, `set_multiple_values`, `heartbeat`, `updatedps`, `close`. + +## Backward Compatibility Plan +* Sync code paths untouched; existing imports remain default. +* Async lives under `tinytuya.asyncio` (explicit opt-in) to avoid polluting top-level namespace initially. +* When async reaches parity, consider promoting selected classes to top-level import in a minor release (opt-in alias only). + +## Open Questions / To Refine +- Should we introduce an event callback API for spontaneous DP updates vs polling? (Likely yes in M3/M4.) +- Provide context manager (`async with XenonDeviceAsync(...)`) for auto-connect/close? (Planned.) +- Rate limiting: global vs per-device? (Investigate after baseline performance metrics.) + +## Contribution Guidelines (Async Track) +* Prefer small, reviewable PRs per milestone task. +* Include tests & docs for every new public coroutine. +* Avoid breaking sync APIs—additive only. +* Mark experimental APIs with a leading `_` or mention in docstring. + +## Next Step +Implement Milestone M1 skeleton: create async package, base class, minimal status() for 3.1 & 3.3 devices. + +--- +Maintained with the goal of long-term stability for existing users while enabling modern async performance. diff --git a/RELEASE.md b/RELEASE.md index c16c71a5..26d3d5c9 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,5 +1,27 @@ # RELEASE NOTES +## v2.0.0 - Async Architecture Introduction (BREAKING MAJOR VERSION) + +This major release introduces the foundation for native asyncio-based device communication while fully preserving the existing synchronous API for backward compatibility. + +Highlights: +* Version bump to 2.x to signal new async subsystem (legacy sync classes unchanged). +* Planning document `ASYNC.md` added (vision, goals, milestones for XenonDeviceAsync & related classes). +* No behavioral changes to existing synchronous code paths in this initial 2.0.0 tag. +* Future minor releases (2.1.x+) will add new async classes and examples without removing sync support. + +Compatibility: +* Existing imports and synchronous usage continue to work (API surface of 1.x retained). +* New async classes will live alongside current modules (no name collisions) and require explicit opt‑in. +* Officially removed Python 2.7 support. + +Migration Guidance: +* You can adopt async incrementally—no action required if you stay with sync API. +* When async classes land, prefer `await device.status_async()` patterns in event loops for concurrency gains. + +See `ASYNC.md` for roadmap details. + + ## 1.17.4 - Cloud Config - Cloud: Add `configFile` option to the Cloud constructor, allowing users to specify the config file location (default remains 'tinytuya.json') by @blackw1ng in https://github.com/jasonacox/tinytuya/pull/640 diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 00000000..20b95dba --- /dev/null +++ b/codecov.yml @@ -0,0 +1,27 @@ +codecov: + require_ci_to_pass: no + +coverage: + precision: 2 + round: down + range: 60..95 + status: + project: + default: + target: auto + threshold: 2% + patch: + default: + target: auto + threshold: 2% + +ignore: + - "sandbox/*" + - "tests/*" + - "build/*" + - "docs/*" + +comment: + layout: "reach,diff,flags,files" + behavior: default + require_changes: false diff --git a/examples/README.md b/examples/README.md index 545633cf..bca97ae6 100644 --- a/examples/README.md +++ b/examples/README.md @@ -20,9 +20,9 @@ Tested devices: Peteme Smart Light Bulbs, Wi-Fi - [link](https://www.amazon.com [monitor.py](monitor.py) - This script uses a loop to listen to a Tuya device for any state changes. -## Async Send and Receive +## Non-blocking Send and Receive -[async_send_receive.py](async_send_receive.py) - This demonstrates how you can make a persistent connection to a Tuya device, send commands and monitor for responses in an async way. +[non_blocking_send_receive.py](non_blocking_send_receive.py) - This demonstrates how you can make a persistent connection to a Tuya device, send commands and monitor for responses without blocking. ## Send Raw DPS Values @@ -49,7 +49,7 @@ turn_on('Dining Room') ## Multi-Threaded Example -[threading.py](threading.py) - Example that uses python threading to connect to multiple devices and listen for updates. +[multi-threading.py](multi-threading.py) - Example that uses python threading to connect to multiple devices and listen for updates. ## Multiple Device Select Example diff --git a/examples/threading.py b/examples/multi-threading.py similarity index 100% rename from examples/threading.py rename to examples/multi-threading.py diff --git a/examples/async_send_receive.py b/examples/non_blocking_send_receive.py similarity index 96% rename from examples/async_send_receive.py rename to examples/non_blocking_send_receive.py index 7b1d5844..53a5c046 100644 --- a/examples/async_send_receive.py +++ b/examples/non_blocking_send_receive.py @@ -1,7 +1,7 @@ # TinyTuya Example # -*- coding: utf-8 -*- """ - TinyTuya - Example showing async persistent connection to device with + TinyTuya - Example showing non-blocking persistent connection to device with continual loop watching for device updates. Author: Jason A. Cox diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..228203a2 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,11 @@ +[pytest] +# Ignore experimental or local work areas +norecursedirs = sandbox build dist .git .venv venv +testpaths = tests +python_files = test_*.py +asyncio_mode = auto +addopts = -ra --cov=tinytuya --cov-report=term-missing --cov-report=xml +markers = + asyncio: mark a test as using asyncio + integration: integration tests (may hit network) + slow: slow tests diff --git a/server/server.py b/server/server.py index b28896a0..d0b91eec 100644 --- a/server/server.py +++ b/server/server.py @@ -34,7 +34,6 @@ """ # Modules -from __future__ import print_function import threading import time import logging diff --git a/test-devices.py b/test-devices.py new file mode 100644 index 00000000..f1e93871 --- /dev/null +++ b/test-devices.py @@ -0,0 +1,300 @@ +#!/usr/bin/env python3 +""" +Regression Test - DeviceAsync + +Async example: Fetch status for all devices listed in devices.json using DeviceAsync. + +Features: +- Loads devices.json in current working directory (same format produced by wizard) +- Creates DeviceAsync instances (auto-discovers IP if blank) +- Runs status() concurrently with optional concurrency limit + - Optional --rediscover flag ignores stored IPs and forces broadcast discovery per device +- Prints per-device summary line and optional JSON output +- Gracefully closes each connection + +Usage: + python test-devices.py # human-readable summary + python test-devices.py --json # JSON array output + python test-devices.py --include-sub # include sub (child) devices + python test-devices.py --limit 5 # max concurrent connections + python test-devices.py --rediscover # force IP rediscovery for all devices + +Exit code is 0 if all succeeded, 1 if any device failed. +""" +from __future__ import annotations +import asyncio, json, argparse, sys, time, os, socket +try: + from colorama import init as colorama_init, Fore, Style + colorama_init() + HAVE_COLORAMA = True +except ImportError: # fallback + HAVE_COLORAMA = False + class _Dummy: + RESET_ALL = '' + class _ForeDummy: + RED = GREEN = '' + class _StyleDummy: + BRIGHT = NORMAL = RESET_ALL = '' + Fore = _ForeDummy() + Style = _StyleDummy() +import tinytuya +try: + from tinytuya.async_scanner import shared_discover # native async UDP discovery +except Exception: # fallback if module path changes + shared_discover = None # type: ignore + +DEFAULT_DEVICES_FILE = 'devices.json' + +# -------------------- Data Loading -------------------- + +def load_devices(path: str, include_sub: bool) -> list[dict]: + try: + with open(path, 'r') as f: + data = json.load(f) + except FileNotFoundError: + print(f"ERROR: devices file not found: {path}", file=sys.stderr) + return [] + except Exception as e: + print(f"ERROR: unable to parse {path}: {e}", file=sys.stderr) + return [] + + devices = [] + for item in data: + if (not include_sub) and item.get('sub'): + continue + dev_id = item.get('id') or item.get('uuid') + if not dev_id: + continue + version_raw = (item.get('version') or '').strip() + try: + version = float(version_raw) if version_raw else 3.3 + except Exception: + version = 3.3 + devices.append({ + 'id': dev_id, + 'name': item.get('name') or dev_id, + 'ip': item.get('ip') or None, + 'key': item.get('key',''), + 'version': version, + 'sub': item.get('sub', False), + 'dev_type': 'default' + }) + return devices + +# -------------------- Async Fetch -------------------- + +COLOR_OK = Fore.GREEN +COLOR_FAIL = Fore.RED +COLOR_DIM = Style.DIM if hasattr(Style, 'DIM') else '' +COLOR_RESET = Style.RESET_ALL if hasattr(Style, 'RESET_ALL') else '' + +def _supports_color(args) -> bool: + if getattr(args, 'no_color', False): + return False + if getattr(args, 'color', False): + return True + return sys.stdout.isatty() and HAVE_COLORAMA + +def _short_error(msg: str, limit: int = 60) -> str: + if not msg: + return '' + msg = msg.replace('\n', ' ').replace('\r', ' ') + return (msg[:limit] + '…') if len(msg) > limit else msg + +async def fetch_status(meta: dict, sem: asyncio.Semaphore, timeout: float = 8.0) -> dict: + start = time.time() + async with sem: + try: + async with tinytuya.DeviceAsync( + meta['id'], + address=meta['ip'], + local_key=meta['key'], + version=meta['version'], + dev_type=meta['dev_type'], + persist=False + ) as dev: + # Directly call status(); DeviceAsync handles any lazy setup internally. + result = await asyncio.wait_for(dev.status(), timeout=timeout) + duration = time.time() - start + if not result: + raise RuntimeError('No response') + if isinstance(result, dict) and 'dps' in result: + dps = result['dps'] if isinstance(result['dps'], dict) else result['dps'] + else: + dps = result + return { + 'id': meta['id'], + 'name': meta['name'], + 'ip': dev.address, + 'version': meta['version'], + 'ok': True, + 'dps': dps, + 'elapsed': round(duration, 3), + 'category': 'ok' + } + except Exception as e: # gather error info + duration = time.time() - start + emsg = f"{e.__class__.__name__}: {e}" if isinstance(e, Exception) else str(e) + low = emsg.lower() + if isinstance(e, asyncio.TimeoutError) or 'timeout' in low: + cat = 'timeout' + elif isinstance(e, (ConnectionError, OSError, socket.timeout)) or 'network' in low or 'connect' in low: + cat = 'network' + elif 'key' in low: + cat = 'key' + elif 'payload' in low or 'decode' in low or 'protocol' in low: + cat = 'protocol' + else: + cat = 'other' + return { + 'id': meta['id'], + 'name': meta['name'], + 'ip': meta['ip'], + 'version': meta['version'], + 'ok': False, + 'error': emsg, + 'elapsed': round(duration, 3), + 'category': cat + } + +# -------------------- Main -------------------- + +async def run(args) -> int: + # Early exit if devices file missing per requirement + if not os.path.exists(args.file): + # Dynamic message with actual filename; still a successful (0) exit + print(f"No {os.path.basename(args.file)} found", flush=True) + return 0 + + meta_list = load_devices(args.file, args.include_sub) + if not meta_list: + print('No devices to query.') + return 1 + + if getattr(args, 'rediscover', False): + for m in meta_list: + m['ip'] = None # force AUTO discovery + print('Forcing IP discovery for all devices (ignoring stored IPs)...') + + # Shared discovery pre-pass (quick win) if any device missing IP or rediscover forced + need_discovery = any(m['ip'] is None for m in meta_list) + if need_discovery and shared_discover: + try: + print(f"Performing shared discovery ({args.discover_seconds:.1f}s)...", flush=True) + discovered = await shared_discover(listen_seconds=args.discover_seconds, include_app=True, verbose=False) + # discovered is dict: id -> info + hits = 0 + for m in meta_list: + info = discovered.get(m['id']) + if info and info.get('ip'): + m['ip'] = info.get('ip') + # update version if available + ver = info.get('version') + try: + if ver: + m['version'] = float(ver) + except Exception: + pass + hits += 1 + print(f"Discovery matched {hits} / {total} device IDs.") + except Exception as e: + print(f"Shared discovery failed: {e}") + + total = len(meta_list) + name_width = max(len(m['name']) for m in meta_list) + id_width = max(len(m['id']) for m in meta_list) + # Ensure space for full IPv4 (15 chars) or placeholder '---' + ip_width = max(15, max(len(m.get('ip') or '---') for m in meta_list)) + sem = asyncio.Semaphore(args.limit) + + tasks = [asyncio.create_task(fetch_status(meta, sem, timeout=args.timeout)) for meta in meta_list] + results = [] + + if args.json: + # Still stream-complete internally, just collect for JSON dump + for coro in asyncio.as_completed(tasks): + r = await coro + results.append(r) + print(json.dumps(results, indent=2)) + else: + print(f"Querying {total} device(s)...") + RES_PLAIN_WIDTH = 5 # visible width for result column (OK / FAIL) + TIME_WIDTH = 7 # width for time like '99.99s' + prefix_sample = f"[{total:>3}/{total}] " # sample to measure prefix width (includes trailing space) + prefix_width = len(prefix_sample) + header = (f"{'':<{prefix_width}}" + f"{'Name':<{name_width}} {'ID':<{id_width}} {'IP':<{ip_width}} " + f"{'Ver':<3} {'Res':<{RES_PLAIN_WIDTH}} {'Time':>{TIME_WIDTH}} DPS") + print(header) + completed = 0 + use_color = _supports_color(args) + for coro in asyncio.as_completed(tasks): + r = await coro + results.append(r) + completed += 1 + raw_time = f"{r['elapsed']:.2f}s" + time_str = raw_time.rjust(TIME_WIDTH) + if r['ok']: + plain = 'OK' + if use_color: + colored = f"{COLOR_OK}{plain}{COLOR_RESET}" + else: + colored = plain + pad = ' ' * (RES_PLAIN_WIDTH - len(plain)) + res_col = colored + pad + ip_disp = r['ip'] or '---' + line = (f"[{completed:>3}/{total}] {r['name']:<{name_width}} {r['id']:<{id_width}} {ip_disp:<{ip_width}} " + f"v{r['version']:<3} {res_col}{time_str} dps={len(r.get('dps',{}))}") + else: + plain = 'FAIL' + if use_color: + colored = f"{COLOR_FAIL}{plain}{COLOR_RESET}" + else: + colored = plain + pad = ' ' * (RES_PLAIN_WIDTH - len(plain)) + res_col = colored + pad + err = _short_error(r.get('error','')) + ip_disp = r['ip'] or '---' + line = (f"[{completed:>3}/{total}] {r['name']:<{name_width}} {r['id']:<{id_width}} {ip_disp:<{ip_width}} " + f"v{r['version']:<3} {res_col}{time_str} {err}") + print(line, flush=True) + + ok = sum(1 for r in results if r['ok']) + if use_color: + summary = (f"Done: {COLOR_OK}{ok}{COLOR_RESET}/{total} succeeded; " + f"{COLOR_FAIL}{total-ok}{COLOR_RESET} failed.") + else: + summary = f"Done: {ok}/{total} succeeded; {total-ok} failed." + print(summary) + + # Error category counters + categories = {} + for r in results: + cat = r.get('category', 'unknown') + categories[cat] = categories.get(cat, 0) + 1 + if any(not r['ok'] for r in results): + parts = [f"{k}={v}" for k, v in sorted(categories.items())] + print("Categories: " + ", ".join(parts)) + + return 0 if all(r['ok'] for r in results) else 1 + +# -------------------- CLI -------------------- + +def parse_args(argv=None): + p = argparse.ArgumentParser(description='Async status fetch for devices in devices.json') + p.add_argument('-f','--file', default=DEFAULT_DEVICES_FILE, help='Path to devices.json (default: devices.json)') + p.add_argument('--include-sub', action='store_true', help='Include sub / child devices') + p.add_argument('--rediscover', action='store_true', help='Ignore stored IPs and force broadcast discovery for every device') + p.add_argument('-l','--limit', type=int, default=10, help='Max concurrent connections (default: 10)') + p.add_argument('-t','--timeout', type=float, default=8.0, help='Per-device connect+status timeout seconds (default: 8)') + p.add_argument('--discover-seconds', type=float, default=3.0, help='Seconds for shared discovery pre-pass (default: 3.0)') + p.add_argument('--json', action='store_true', help='Output JSON array instead of text summary') + return p.parse_args(argv) + +if __name__ == '__main__': + args = parse_args() + try: + code = asyncio.run(run(args)) + except KeyboardInterrupt: + code = 130 + sys.exit(code) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..8e1cf6d1 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,7 @@ +# Ensure local project root is on sys.path before any site-packages version +# so tests import the in-repo tinytuya, not an installed one. +import os, sys +ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +if ROOT not in sys.path: + # Insert at position 0 for highest precedence + sys.path.insert(0, ROOT) diff --git a/tests/test_device_async.py b/tests/test_device_async.py new file mode 100644 index 00000000..ea338ed8 --- /dev/null +++ b/tests/test_device_async.py @@ -0,0 +1,56 @@ +import asyncio +import pytest +import tinytuya + +# Basic async tests for DeviceAsync +# These tests avoid real network I/O by monkeypatching low-level methods. + +@pytest.mark.asyncio +async def test_device_async_repr_and_defaults(): + dev = tinytuya.DeviceAsync('testid123', address='1.2.3.4', local_key='0123456789abcdef', version=3.3) + r = repr(dev) + assert 'testid123' in r + assert '1.2.3.4' in r + +@pytest.mark.asyncio +async def test_device_async_status_mock(monkeypatch): + dev = tinytuya.DeviceAsync('did123', address='10.0.0.99', local_key='0123456789abcdef', version=3.3) + + # Monkeypatch _send_receive to simulate a status response structure + async def fake_send_receive(payload, getresponse=True, decode_response=True, from_child=None): + return {"dps": {"1": True, "2": 42}} + monkeypatch.setattr(dev, '_send_receive', fake_send_receive) + + data = await dev.status() + assert data["dps"]["1"] is True + assert data["dps"]["2"] == 42 + +@pytest.mark.asyncio +async def test_device_async_send_nowait(monkeypatch): + dev = tinytuya.DeviceAsync('didABC', address='10.0.0.50', local_key='0123456789abcdef', version=3.1) + + sent = {} + async def fake_send_receive(payload, getresponse=True, decode_response=True, from_child=None): + sent['called'] = True + return None + monkeypatch.setattr(dev, '_send_receive', fake_send_receive) + + # nowait path (getresponse False) via send() + result = await dev._send(b'rawpayload') + assert sent['called'] is True + assert result is None + +@pytest.mark.asyncio +async def test_device_async_context_manager(monkeypatch): + dev = tinytuya.DeviceAsync('ctx1', address='10.0.0.10', local_key='0123456789abcdef', version=3.3) + + closed = {'count': 0} + async def fake_close(): + closed['count'] += 1 + monkeypatch.setattr(dev, '_close', fake_close) + + async with dev: + # inside context, nothing special to assert yet + assert isinstance(dev, tinytuya.DeviceAsync) + # ensure close() called once + assert closed['count'] == 1 diff --git a/tinytuya/core/DeviceAsync.py b/tinytuya/core/DeviceAsync.py new file mode 100644 index 00000000..1c642253 --- /dev/null +++ b/tinytuya/core/DeviceAsync.py @@ -0,0 +1,1489 @@ +# TinyTuya Module +# -*- coding: utf-8 -*- + +import asyncio +import binascii +import hmac +import json +from hashlib import md5, sha256 +import logging +import socket +import struct +import time + +from .const import DEVICEFILE, TCPPORT +from .crypto_helper import AESCipher +from .error_helper import ERR_CONNECT, ERR_DEVTYPE, ERR_JSON, ERR_KEY_OR_VER, ERR_OFFLINE, ERR_PAYLOAD, error_json +from .exceptions import DecodeError +from .message_helper import MessagePayload, TuyaMessage, pack_message, unpack_message, parse_header +from . import command_types as CT, header as H +from .core import merge_dps_results +from .XenonDevice import find_device, device_info + +log = logging.getLogger(__name__) + +# Async helper functions +async def find_device_async(dev_id=None, address=None): + # Use asyncio.to_thread if available (Python 3.9+), otherwise use executor + # Call the actual sync implementation defined above, not the wrapper + if hasattr(asyncio, 'to_thread'): + return await asyncio.to_thread(find_device, dev_id, address) + else: + # Python 3.8 compatibility + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, find_device, dev_id, address) + +async def device_info_async(dev_id): + # Use asyncio.to_thread if available (Python 3.9+), otherwise use executor + # Call the actual sync implementation defined above, not the wrapper + if hasattr(asyncio, 'to_thread'): + devinfo = await asyncio.to_thread(device_info, dev_id) + else: + # Python 3.8 compatibility + loop = asyncio.get_event_loop() + devinfo = await loop.run_in_executor(None, device_info, dev_id) + if not devinfo: + return {} + return devinfo + +class DeviceAsync(object): + def __init__( + self, dev_id, address=None, local_key="", dev_type="default", connection_timeout=5, + version=3.5, persist=True, cid=None, node_id=None, parent=None, + connection_retry_limit=5, connection_retry_delay=5, port=TCPPORT, + max_simultaneous_dps=0 + ): + """ + Represents a Tuya device. + + Args: + dev_id (str): The device id. + address (str): The network address. + local_key (str, optional): The encryption key. Defaults to None. + cid (str: Optional sub device id. Default to None. + node_id (str: alias for cid) + parent (object: gateway device this device is a child of) + + Attributes: + port (int): The port to connect to. + """ + + self.id = dev_id + self.address = address + self.auto_ip = (not address) or address == "Auto" or address == "0.0.0.0" + self.dev_type = dev_type + self.dev_type_auto = self.dev_type == 'default' + self.last_dev_type = '' + self.connection_timeout = connection_timeout + self.retry = True + self.disabledetect = False # if True do not detect device22 + self.port = port + self.socketPersistent = persist + self.socketNODELAY = True + self.socketRetryLimit = connection_retry_limit + self.socketRetryDelay = connection_retry_delay + self.seqno = 1 + self.sendWait = None + self.dps_cache = {} + self.parent = parent + self.children = {} + self.cid = cid if cid else node_id + self.auto_cid = not self.cid + self._auto_cid_children = False + self.received_wrong_cid_queue = [] + self.local_nonce = b'0123456789abcdef' # not-so-random random key + self.remote_nonce = b'' + self.payload_dict = None + self._historic_status = {} + self._last_status = {} + self._have_status = False + self.max_simultaneous_dps = max_simultaneous_dps if max_simultaneous_dps else 0 + self.raw_sent = None + self.raw_recv = [] + self.cmd_retcode = None + self.reader = None + self.writer = None + self.cipher = None + self.local_key = local_key.encode("latin1") + self.real_local_key = self.local_key + self.auto_key = not local_key + self.version = version + #self._callback_queue = asyncio.Queue() + self._callbacks_connect = [] + self._callbacks_disconnect = [] + self._callbacks_response = [] + self._deferred_callbacks = [] + self._deferred_task = None + self._deferred_task_running = False + self._send_lock = asyncio.Lock() + self._recv_lock = asyncio.Lock() + self._rcv2_lock = asyncio.Lock() + self._conn_lock = asyncio.Lock() + self.connected = asyncio.Event() + self._close_task = None + + if self.parent: + self._set_version(self.parent.version) + self.parent._register_child(self) + if self.auto_cid and not self.cid: + self.parent._auto_cid_children = True + else: + if self.version: + self._set_version(float(self.version)) + else: + self._set_version(3.1) + + def __repr__(self): + # FIXME can do better than this + if self.parent: + parent = self.parent.id + else: + parent = None + return ("%s( %r, address=%r, local_key=%r, dev_type=%r, connection_timeout=%r, version=%r, persist=%r, cid=%r, parent=%r, children=%r )" % + (self.__class__.__name__, self.id, self.address, self.real_local_key.decode(), self.dev_type, self.connection_timeout, self.version, self.socketPersistent, self.cid, parent, self.children)) + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self._close() + + async def _defer_callbacks(self, delay=True, pause=False): + if self._deferred_task and not self._deferred_task_running: + if (not (delay or pause)) or self._deferred_task.done(): + deferred = self._deferred_task + self._deferred_task = None + print('cancelling CB task') + deferred.cancel() + await deferred + print('CB task cancelled') + if self._deferred_callbacks and (not self._deferred_task) and (not pause): + if delay: + print('deferring cb for 100ms', len(self._deferred_callbacks)) + self._deferred_task = asyncio.create_task( self._run_deferred_callbacks_later() ) + else: + print('scheduling cb immediately', len(self._deferred_callbacks)) + # run immediate task doe snto support cancelling, so mark as running + self._deferred_task_running = True + self._deferred_task = asyncio.create_task( self._run_deferred_callbacks() ) + + async def _run_deferred_callbacks(self): + self._deferred_task_running = True + print('running CBs') + cbs = self._deferred_callbacks + self._deferred_callbacks = [] + for cb in cbs: + await cb + if self._deferred_callbacks: + print('running CBs again') + await self._run_deferred_callbacks() + print('CBs finished') + self._deferred_task_running = False + + async def _run_deferred_callbacks_later(self): + try: + await asyncio.sleep(0.1) + except asyncio.CancelledError: + return + self._deferred_task_running = True + await self._run_deferred_callbacks() + + async def _load_child_cids( self ): + self._auto_cid_children = False + for child in self.children: + # if we are a child then we should have a cid/node_id but none were given - try and find it the same way we look up local keys + if child.auto_cid and not child.cid: + devinfo = await device_info_async( child.id ) + if devinfo.get('node_id'): + child.cid = devinfo['node_id'] + child.auto_cid = False + if not child.cid: + # not fatal as the user could have set the device_id to the cid + # in that case dev_type should be 'zigbee' to set the proper fields in requests + log.debug( 'Child device but no cid/node_id given!' ) + + async def _load_local_key( self ): + devinfo = await device_info_async( self.id ) + if devinfo.get('key'): + self.local_key = devinfo['key'].encode("latin1") + self.real_local_key = self.local_key + + async def _ensure_connection(self, renew=False): + async with self._conn_lock: + return await self._ensure_connection_locked(renew=renew) + + async def _ensure_connection_locked(self, renew=False): + """ + error = self._ensure_socket_connection(renew=False) + + Returns 0 on success, or an ERR_* constant on error + """ + # Replaces _get_socket method + if renew and self.writer: + await self._close() + + if self._auto_cid_children: + await self._load_child_cids() + + if self.writer: + return 0 + + retries = 0 + err = ERR_OFFLINE + while retries < self.socketRetryLimit: + if self.auto_key: + await self._load_local_key() + + if self.auto_ip and not self.address: + from ..scanner_async import scanfor + bcast_data = await scanfor( self.id, timeout=True ) + if (not bcast_data) or (bcast_data['ip'] is None): + log.debug("Unable to find device on network (specify IP address)") + err = ERR_OFFLINE + break + self.address = bcast_data['ip'] + self._set_version(float(bcast_data['version'])) + + if not self.address: + log.debug("No address for device!") + err = ERR_OFFLINE + break + + if self.version > 3.1: + if not self.local_key: + log.debug("No local key for device!") + err = ERR_KEY_OR_VER + break + elif len(self.local_key) != 16: + log.debug("Bad local key length for device!") + err = ERR_KEY_OR_VER + break + try: + retries += 1 + fut = asyncio.open_connection(self.address, self.port) + self.reader, self.writer = await asyncio.wait_for(fut, timeout=self.connection_timeout) + + # TCP_NODELAY + sock = self.writer.get_extra_info('socket') + if sock and self.socketNODELAY: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + + if self.version >= 3.4: + # restart session key negotiation + if await self._negotiate_session_key(): + err = 0 + break + else: + err = ERR_KEY_OR_VER + if not self.auto_key: + await self._close(err) + break + elif retries < self.socketRetryLimit: + # only try reloading the key once + retries = self.socketRetryLimit - 1 + else: + err = 0 + break + except (asyncio.TimeoutError, socket.timeout): + log.debug(f"Connection timeout - retry {retries}/{self.socketRetryLimit}") + err = ERR_OFFLINE + except Exception as e: + log.debug(f"Connection failed (exception) - retry {retries}/{self.socketRetryLimit}", exc_info=True) + err = ERR_CONNECT + + await self._close(err) + if retries < self.socketRetryLimit: + await asyncio.sleep(self.socketRetryDelay) + if self.auto_ip: + self.address = None + + for cb in self._callbacks_connect: + self._deferred_callbacks.append( cb( self, err ) ) + await self._defer_callbacks() + + if err: + self.connected.clear() + else: + self.connected.set() + + return err + + async def _check_socket_close(self): + if not self.socketPersistent: + await self._schedule_close() + + async def _recv_all(self, length, timeout=True): + try: + if timeout is None: + return await self.reader.readexactly(length) + elif timeout is True: + return await asyncio.wait_for(self.reader.readexactly(length), timeout=self.connection_timeout) + else: + return await asyncio.wait_for(self.reader.readexactly(length), timeout=timeout) + except asyncio.IncompleteReadError as e: + log.debug(f"_recv_all(): no data?: {e}") + raise DecodeError('No data received - connection closed') + + async def _receive(self, timeout=True): + # make sure to use the parent's self.seqno and session key + if self.parent: + return await self.parent._receive(timeout) + async with self._rcv2_lock: + return await self._receive_locked( timeout ) + + async def _receive_locked(self, timeout): + # message consists of header + retcode + [data] + crc (4 or 32) + footer + min_len_55AA = struct.calcsize(H.MESSAGE_HEADER_FMT_55AA) + 4 + 4 + len(H.SUFFIX_BIN) + # message consists of header + iv + retcode + [data] + crc (16) + footer + min_len_6699 = struct.calcsize(H.MESSAGE_HEADER_FMT_6699) + 12 + 4 + 16 + len(H.SUFFIX_BIN) + min_len = min(min_len_55AA, min_len_6699) + + data = await self._recv_all(min_len, timeout) + + # search for the prefix. if not found, delete everything except + # the last (prefix_len - 1) bytes and recv more to replace it + prefix_offset_55AA = data.find(H.PREFIX_55AA_BIN) + prefix_offset_6699 = data.find(H.PREFIX_6699_BIN) + + while prefix_offset_55AA != 0 and prefix_offset_6699 != 0: + log.debug('Message prefix not at the beginning of the received data!') + log.debug('Offset 55AA: %d, 6699: %d, Received data: %r', prefix_offset_55AA, prefix_offset_6699, data) + if prefix_offset_55AA < 0 and prefix_offset_6699 < 0: + data = data[1 - len(H.PREFIX_55AA_BIN):] + else: + prefix_offset = prefix_offset_6699 if prefix_offset_55AA < 0 else prefix_offset_55AA + data = data[prefix_offset:] + + data += await self._recv_all(min_len - len(data), timeout) + prefix_offset_55AA = data.find(H.PREFIX_55AA_BIN) + prefix_offset_6699 = data.find(H.PREFIX_6699_BIN) + + header = parse_header(data) + remaining = header.total_length - len(data) + if remaining > 0: + data += await self._recv_all(remaining, timeout) + + log.debug("received data=%r", binascii.hexlify(data)) + hmac_key = self.local_key if self.version >= 3.4 else None + no_retcode = False #None if self.version >= 3.5 else False + return unpack_message(data, header=header, hmac_key=hmac_key, no_retcode=no_retcode) + + async def _send_message( self, payload ): + log.debug("sending payload: %r", payload) + enc_payload = self._encode_message(payload) if isinstance(payload, MessagePayload) else payload + self.writer.write(enc_payload) + await self.writer.drain() + try: + self.raw_sent = parse_header(enc_payload) + except: + self.raw_sent = None + return enc_payload + + # similar to _send_receive() but never retries sending and does not decode the response + async def _send_receive_quick(self, payload, recv_retries, from_child=None, timeout=True): + if self.parent: + return await self.parent._send_receive_quick(payload, recv_retries, from_child=self, timeout=timeout) + + self.raw_sent = None + self.raw_recv = [] + self.cmd_retcode = None + + # loop warning: + # _ensure_connection() calls _negotiate_session_key which calls us again! + if not self.writer: + return None + + if payload: + try: + await self._send_message( payload ) + except Exception: + await self._close(ERR_CONNECT) + return None + + if not recv_retries: + return True + while recv_retries: + try: + msg = await self._receive(timeout=timeout) + self.raw_recv.append(msg) + except Exception: + msg = None + if msg: + self._get_retcode(self.raw_sent, msg) # set self.cmd_retcode + if len(msg.payload) != 0: + return msg + recv_retries -= 1 + if recv_retries == 0: + log.debug("received null payload (%r) but out of recv retries, giving up", msg) + else: + log.debug("received null payload (%r), fetch new one - %s retries remaining", msg, recv_retries) + return False + + async def _send_receive(self, payload, getresponse=True, decode_response=True, from_child=None, timeout=True, retry=True): + """ + Send single buffer `payload` and receive a single buffer. + + Args: + payload(bytes): Data to send. Set to 'None' to receive only. + getresponse(bool): If True, wait for and return response. + """ + if self.parent: + return await self.parent._send_receive(payload, getresponse, decode_response, from_child=self, timeout=timeout, retry=retry) + + if (not payload) and getresponse and self.received_wrong_cid_queue: + if (not self.children) or (not from_child): + r = self.received_wrong_cid_queue[0] + self.received_wrong_cid_queue = self.received_wrong_cid_queue[1:] + return r + found_rq = False + for rq in self.received_wrong_cid_queue: + if rq[0] == from_child: + found_rq = rq + break + if found_rq: + self.received_wrong_cid_queue.remove(found_rq) + return found_rq[1] + + if getresponse and payload: + async with self._send_lock: + async with self._recv_lock: + await self._defer_callbacks(pause=True) + result = await self._send_receive_locked( payload=payload, getresponse=getresponse, decode_response=decode_response, from_child=from_child, timeout=timeout, retry=retry ) + elif getresponse: + async with self._recv_lock: + await self._defer_callbacks(pause=True) + result = await self._send_receive_locked( payload=payload, getresponse=getresponse, decode_response=decode_response, from_child=from_child, timeout=timeout, retry=retry ) + else: + async with self._send_lock: + await self._defer_callbacks(pause=True) + result = await self._send_receive_locked( payload=payload, getresponse=getresponse, decode_response=decode_response, from_child=from_child, timeout=timeout, retry=retry ) + await self._defer_callbacks(delay=False) + return result + + async def _send_receive_locked( self, payload=None, getresponse=True, decode_response=True, from_child=None, timeout=True, retry=True ): + success = False + partial_success = False + retries = 0 + recv_retries = 0 + max_recv_retries = self.socketRetryLimit if (self.retry and retry) else 0 + dev_type = self.dev_type + do_send = True + msg = None + self.raw_recv = [] + self.cmd_retcode = None + while not success: + # open up socket if device is available + # close and re-open if we're sending and socket is not supposed to be persistent + sock_error = await self._ensure_connection( renew=(payload and (not self.socketPersistent)) ) + if sock_error: + await self._close(sock_error) + return error_json(sock_error) + # send request to device + try: + if payload is not None and do_send: + await self._send_message( payload ) + if getresponse and self.sendWait is not None: + await asyncio.sleep(self.sendWait) + if getresponse: + do_send = False + rmsg = await self._receive(timeout) + # device may send null ack (28 byte) response before a full response + # consider it an ACK and do not retry the send even if we do not get a full response + if rmsg: + payload = None + partial_success = True + msg = rmsg + self.raw_recv.append(rmsg) + self._get_retcode(self.raw_sent, rmsg) # set self.cmd_retcode + if len(msg.payload) == 0: + for cb in self._callbacks_response: + self._deferred_callbacks.append( cb( self, None, msg ) ) + await self._defer_callbacks(delay=False) + if (not msg or len(msg.payload) == 0) and recv_retries <= max_recv_retries: + log.debug("received null payload (%r), fetch new one - retry %s / %s", msg, recv_retries, max_recv_retries) + recv_retries += 1 + if recv_retries > max_recv_retries: + success = True + else: + success = True + log.debug("received message=%r", msg) + else: + # legacy/default mode avoids persisting socket across commands + await self._check_socket_close() + return {} + except (KeyboardInterrupt, SystemExit) as err: + log.debug("Keyboard Interrupt - Exiting") + raise + except (asyncio.TimeoutError, socket.timeout): + # a socket timeout occurred + log.debug("Timeout in _send_receive()") + if payload is None: + # Receive only mode - return None + await self._check_socket_close() + return {} + do_send = True + retries += 1 + log.debug(f"Timeout in _send_receive() - retry {retries}/{self.socketRetryLimit}") + # if we exceed the limit of retries then lets get out of here + if retries > self.socketRetryLimit: + await self._close(ERR_KEY_OR_VER) + return error_json(ERR_KEY_OR_VER) + # toss old socket and get new one + await self._close(ERR_PAYLOAD) # FIXME is this the best error to use? + # wait a bit before retrying + await asyncio.sleep(0.1) + except DecodeError: + # connection closed! + log.debug("Connection closed during receive", exc_info=True) + await self._close(ERR_CONNECT) + recv_retries += 1 + if recv_retries > max_recv_retries: + if partial_success: + # we recieved at least 1 valid message with a null payload, so the send was successful + return {} + # device is probably rejecting the payload + return error_json(ERR_PAYLOAD if payload else ERR_CONNECT) + except Exception as err: + # likely network or connection error + if not partial_success: + do_send = True + retries += 1 + # toss old socket and get new one + await self._close(ERR_CONNECT) + log.debug(f"Network connection error - retry {retries}/{self.socketRetryLimit}", exc_info=True) + # if we exceed the limit of retries then lets get out of here + if retries > self.socketRetryLimit: + log.debug( + "Exceeded tinytuya retry limit (%s)", + self.socketRetryLimit + ) + log.debug("Unable to connect to device ") + # timeout reached - return error + return error_json(ERR_CONNECT) + # wait a bit before retrying + await asyncio.sleep(0.1) + # except + # while + + # could be None or have a null payload + if not decode_response: + await self._check_socket_close() + return msg + + return await self._process_message(msg, dev_type, from_child, decode_response, timeout, retry) + + async def _process_message( self, msg, dev_type=None, from_child=None, decode_response=True, timeout=True, retry=True ): + # null packet, nothing to decode + if not msg or len(msg.payload) == 0: + log.debug("raw unpacked message = %r", msg) + # legacy/default mode avoids persisting socket across commands + await self._check_socket_close() + return {} + + # Unpack Message into TuyaMessage format + # and return payload decrypted + try: + # Data available: seqno cmd retcode payload crc + log.debug("raw unpacked message = %r", msg) + payload = self._decrypt_payload(msg.payload) + except: + log.debug("error decrypting tuya JSON payload", exc_info=True) + payload = None + + if payload is None: + log.debug("_decrypt_payload() failed!") + result = error_json(ERR_PAYLOAD) + #result['invalid_msg'] = msg + elif( (not self.disabledetect) and + b"data unvalid" in payload and + self.version == 3.3 and + msg.retcode != 0 and + msg.cmd in (CT.DP_QUERY, CT.DP_QUERY_NEW) and + self.dev_type != "device22" + ): + dev_type = self.dev_type + self.dev_type = "device22" + # set at least one DPS + self.dps_to_request = {"1": None} + log.debug( + "'data unvalid' error detected: switching dev_type (%r -> %r) - Update payload and try again", + dev_type, + self.dev_type, + ) + + #log.debug("status() rebuilding payload for device22") + #payload = self._generate_payload(query_type) + #data = await self._send_receive(payload) + + # FIXME resend status request? + + + elif not payload.startswith(b"{"): + log.debug("Unexpected payload=%r", payload) + result = error_json(ERR_PAYLOAD, payload) + result['invalid_json'] = payload + else: + result = self._decode_payload(payload) + + found_child = False + if self.children: + found_cid = None + if result and 'cid' in result: + found_cid = result['cid'] + elif result and 'data' in result and type(result['data']) == dict and 'cid' in result['data']: + found_cid = result['data']['cid'] + + if found_cid: + for c in self.children: + if self.children[c].cid == found_cid: + result['device'] = found_child = self.children[c] + break + + if from_child and from_child is not True and from_child != found_child: + # async update from different CID, try again + log.debug( 'Recieved async update for wrong CID %s while looking for CID %s, trying again', found_cid, from_child.cid ) + if self.socketPersistent: + # if persistent, save response until the next receive() call + # otherwise, trash it + if found_child: + found_child._handle_response(result, msg) + for cb in found_child._callbacks_response: + self._deferred_callbacks.append( cb( found_child, result, msg ) ) + result = found_child._process_response(result) + else: + self._handle_response(result, msg) + for cb in self._callbacks_response: + self._deferred_callbacks.append( cb( self, result, msg ) ) + result = self._process_response(result) + await self._defer_callbacks() + self.received_wrong_cid_queue.append( (found_child, result) ) + # events should not be coming in so fast that we will never timeout a read, so don't worry about loops + return await self._send_receive_locked( None, True, decode_response, from_child=from_child, timeout=timeout, retry=retry) + + # legacy/default mode avoids persisting socket across commands + await self._check_socket_close() + + obj = self if not found_child else found_child + obj._handle_response(result, msg) + for cb in obj._callbacks_response: + self._deferred_callbacks.append( cb( obj, result, msg ) ) + await self._defer_callbacks(delay=False) + return obj._process_response(result) + + def _decrypt_payload(self, payload): + log.debug("decode payload=%r", payload) + cipher = AESCipher(self.local_key) + + if self.version == 3.4: + # 3.4 devices encrypt the version header in addition to the payload + try: + log.debug("decrypting=%r", payload) + payload = cipher.decrypt(payload, False, decode_text=False) + except: + log.debug("incomplete payload=%r (len:%d)", payload, len(payload), exc_info=True) + return error_json(ERR_PAYLOAD) + + log.debug("decrypted 3.x payload=%r", payload) + log.debug("payload type = %s", type(payload)) + + if payload.startswith(H.PROTOCOL_VERSION_BYTES_31): + # Received an encrypted payload + # Remove version header + payload = payload[len(H.PROTOCOL_VERSION_BYTES_31) :] + # Decrypt payload + # Remove 16-bytes of MD5 hexdigest of payload + payload = cipher.decrypt(payload[16:], decode_text=False) + elif self.version >= 3.2: # 3.2 or 3.3 or 3.4 or 3.5 + # Trim header for non-default device type + if payload.startswith( self.version_bytes ): + payload = payload[len(self.version_header) :] + log.debug("removing 3.x=%r", payload) + elif self.dev_type == "device22" and (len(payload) & 0x0F) != 0: + payload = payload[len(self.version_header) :] + log.debug("removing device22 3.x header=%r", payload) + + if self.version < 3.4: + try: + log.debug("decrypting=%r", payload) + payload = cipher.decrypt(payload, False, decode_text=False) + except: + log.debug("incomplete payload=%r (len:%d)", payload, len(payload), exc_info=True) + return error_json(ERR_PAYLOAD) + + log.debug("decrypted 3.x payload=%r", payload) + # Try to detect if device22 found + log.debug("payload type = %s", type(payload)) + + if isinstance(payload, str): + payload = payload.encode('utf-8') + + return payload + + def _decode_payload(self, payload): + invalid_json = None + if not isinstance(payload, dict): + if not isinstance(payload, str): + try: + payload = payload.decode() + except UnicodeDecodeError: + if (payload[:1] == b'{') and (payload[-1:] == b'}'): + try: + invalid_json = payload + payload = payload.decode( errors='replace' ) + except: + pass + except: + pass + + # if .decode() threw an exception, `payload` will still be bytes + if not isinstance(payload, str): + log.debug("payload was not string type and decoding failed") + return error_json(ERR_JSON, payload) + + log.debug("decoded results=%r", payload) + try: + json_payload = json.loads(payload) + except: + json_payload = error_json(ERR_JSON, payload) + json_payload['invalid_json'] = payload + else: + if invalid_json and isinstance(json_payload, dict): + # give it to the user so they can try to decode it if they want + json_payload['invalid_json'] = invalid_json + + # v3.4 stuffs it into {"data":{"dps":{"1":true}}, ...} + if "dps" not in json_payload and "data" in json_payload and "dps" in json_payload['data']: + json_payload['dps'] = json_payload['data']['dps'] + + return json_payload + + def _handle_response(self, response, raw_msg ): + """ + Cache the last DP values and kick off the data callbacks + """ + + # Save (cache) the last value of every DP + merge_dps_results(self._historic_status, response) + + if (not self.socketPersistent) or (not self.writer): + return + + log.debug('caching: %s', response) + merge_dps_results(self._last_status, response) + log.debug('merged: %s', self._last_status) + + def _process_response(self, response): # pylint: disable=R0201 + """ + Override this function in a sub-class if you want to do some processing on the received data + """ + return response + + async def _negotiate_session_key(self): + rkey = await self._send_receive_quick( self._negotiate_session_key_generate_step_1(), 2 ) + if not rkey: + log.debug('_negotiate_session_key: rkey is None!') + return False + step3 = self._negotiate_session_key_generate_step_3( rkey ) + if not step3: + return False + if not await self._send_receive_quick( step3, None ): + return False + self._negotiate_session_key_generate_finalize() + return True + + def _negotiate_session_key_generate_step_1( self ): + self.local_nonce = b'0123456789abcdef' # not-so-random random key + self.remote_nonce = b'' + self.local_key = self.real_local_key + + return MessagePayload(CT.SESS_KEY_NEG_START, self.local_nonce) + + def _negotiate_session_key_generate_step_3( self, rkey ): + if not rkey or type(rkey) != TuyaMessage or len(rkey.payload) < 48: + # error + log.debug("session key negotiation failed on step 1") + return False + + if rkey.cmd != CT.SESS_KEY_NEG_RESP: + log.debug("session key negotiation step 2 returned wrong command: %d", rkey.cmd) + return False + + payload = rkey.payload + if self.version == 3.4: + try: + log.debug("decrypting=%r", payload) + cipher = AESCipher(self.real_local_key) + payload = cipher.decrypt(payload, False, decode_text=False) + except: + log.debug("session key step 2 decrypt failed, payload=%r (len:%d)", payload, len(payload), exc_info=True) + return False + + log.debug("decrypted session key negotiation step 2 payload=%r", payload) + log.debug("payload type = %s len = %d", type(payload), len(payload)) + + if len(payload) < 48: + log.debug("session key negotiation step 2 failed, too short response") + return False + + self.remote_nonce = payload[:16] + hmac_check = hmac.new(self.local_key, self.local_nonce, sha256).digest() + + if hmac_check != payload[16:48]: + log.debug("session key negotiation step 2 failed HMAC check! wanted=%r but got=%r", binascii.hexlify(hmac_check), binascii.hexlify(payload[16:48])) + return False + + log.debug("session local nonce: %r remote nonce: %r", self.local_nonce, self.remote_nonce) + + rkey_hmac = hmac.new(self.local_key, self.remote_nonce, sha256).digest() + return MessagePayload(CT.SESS_KEY_NEG_FINISH, rkey_hmac) + + def _negotiate_session_key_generate_finalize( self ): + # Python 3: nonce XOR produces bytes object directly + self.local_key = bytes(a ^ b for (a, b) in zip(self.local_nonce, self.remote_nonce)) + log.debug("Session nonce XOR'd: %r", self.local_key) + + cipher = AESCipher(self.real_local_key) + if self.version == 3.4: + self.local_key = cipher.encrypt( self.local_key, False, pad=False ) + else: + iv = self.local_nonce[:12] + log.debug("Session IV: %r", iv) + self.local_key = cipher.encrypt( self.local_key, use_base64=False, pad=False, iv=iv )[12:28] + + log.debug("Session key negotiate success! session key: %r", self.local_key) + return True + + # adds protocol header (if needed) and encrypts + def _encode_message( self, msg ): + # make sure to use the parent's self.seqno and session key + if self.parent: + return self.parent._encode_message( msg ) + hmac_key = None + iv = None + payload = msg.payload + self.cipher = AESCipher(self.local_key) + + if self.version >= 3.4: + hmac_key = self.local_key + if msg.cmd not in H.NO_PROTOCOL_HEADER_CMDS: + # add the 3.x header + payload = self.version_header + payload + log.debug('final payload: %r', payload) + + if self.version >= 3.5: + iv = True + # seqno cmd retcode payload crc crc_good, prefix, iv + msg = TuyaMessage(self.seqno, msg.cmd, None, payload, 0, True, H.PREFIX_6699_VALUE, True) + self.seqno += 1 # increase message sequence number + data = pack_message(msg,hmac_key=self.local_key) + #log.debug("payload [%d] encrypted=%r",self.seqno, binascii.hexlify(data) ) + log.debug("payload %r encrypted=%r", msg, binascii.hexlify(data) ) + return data + + payload = self.cipher.encrypt(payload, False) + elif self.version >= 3.2: + # expect to connect and then disconnect to set new + payload = self.cipher.encrypt(payload, False) + if msg.cmd not in H.NO_PROTOCOL_HEADER_CMDS: + # add the 3.x header + payload = self.version_header + payload + elif msg.cmd == CT.CONTROL: + # need to encrypt + payload = self.cipher.encrypt(payload) + preMd5String = ( + b"data=" + + payload + + b"||lpv=" + + H.PROTOCOL_VERSION_BYTES_31 + + b"||" + + self.local_key + ) + m = md5() + m.update(preMd5String) + hexdigest = m.hexdigest() + # some tuya libraries strip 8: to :24 + payload = ( + H.PROTOCOL_VERSION_BYTES_31 + + hexdigest[8:][:16].encode("latin1") + + payload + ) + + self.cipher = None + msg = TuyaMessage(self.seqno, msg.cmd, 0, payload, 0, True, H.PREFIX_55AA_VALUE, False) + self.seqno += 1 # increase message sequence number + buffer = pack_message(msg,hmac_key=hmac_key) + log.debug("payload %r encrypted=%r", msg, binascii.hexlify(buffer)) + return buffer + + def _get_retcode(self, sent, msg): + """Try to get the retcode for the last sent message""" + if (not sent) or (not msg): + return + if sent.cmd != msg.cmd: + return + if self.version < 3.5: + # v3.5 devices respond with a global incrementing seqno, not the sent seqno + if sent.seqno != msg.seqno: + return + self.cmd_retcode = msg.retcode + + def _register_child(self, child): + if child.id in self.children and child != self.children[child.id]: + log.debug('Replacing existing child %r!', child.id) + self.children[child.id] = child + # disable device22 detection as some gateways return "json obj data unvalid" when the gateway is polled without a cid + self.disabledetect = True + self.payload_dict = None + + def is_connected(self): + return self.connected.is_set() + + async def receive(self, timeout=True): + """ + Poll device to read any payload in the buffer. Timeout results in an empty dict {} returned. + """ + return await self._send_receive(None, timeout=timeout) + + async def _send(self, payload): + """ + Send single buffer `payload`. + + Args: + payload(bytes): Data to send. + """ + return await self._send_receive(payload, getresponse=False) + + async def status(self): + """Return device status.""" + query_type = CT.DP_QUERY + log.debug("status() entry (dev_type is %s)", self.dev_type) + payload = self._generate_payload(query_type) + data = await self._send_receive(payload, getresponse=False) + log.debug("status() received data=%r", data) + return data + + async def cached_status(self, historic=False): + """ + Return device last status if a persistent connection is open. + + Args: + + Response: + json if cache is available, else None + """ + if historic: + return self._historic_status + if (not self._have_status) or (not self.writer) or (not self._last_status): + log.debug("Cache not available, returning None") + return None + return self._last_status + + def cache_clear(self): + self._last_status = {} + self._have_status = False + + async def subdev_query(self): + """Query for a list of sub-devices and their status""" + # final payload should look like: {"data":{"cids":[]},"reqType":"subdev_online_stat_query"} + payload = self._generate_payload(CT.LAN_EXT_STREAM, rawData={"cids":[]}, reqType='subdev_online_stat_query') + return await self._send_receive(payload, getresponse=False) + + async def detect_available_dps(self): + """Return which datapoints are supported by the device.""" + # device22 devices need a sort of bruteforce querying in order to detect the + # list of available dps experience shows that the dps available are usually + # in the ranges [1-25] and [100-110] need to split the bruteforcing in + # different steps due to request payload limitation (max. length = 255) + self.dps_cache = {} + ranges = [(2, 11), (11, 21), (21, 31), (100, 111)] + + for dps_range in ranges: + # dps 1 must always be sent, otherwise it might fail in case no dps is found + # in the requested range + self.dps_to_request = {"1": None} + self.add_dps_to_request(range(*dps_range)) + try: + data = await self.status() + except Exception as ex: + log.exception("Failed to get status: %s", ex) + raise + if data is not None and "dps" in data: + for k in data["dps"]: + self.dps_cache[k] = None + + if self.dev_type == "default": + self.dps_to_request = self.dps_cache + return self.dps_cache + log.debug("Detected dps: %s", self.dps_cache) + self.dps_to_request = self.dps_cache + return self.dps_cache + + def add_dps_to_request(self, dp_indicies): + """Add a datapoint (DP) to be included in requests.""" + if isinstance(dp_indicies, int): + self.dps_to_request[str(dp_indicies)] = None + else: + self.dps_to_request.update({str(index): None for index in dp_indicies}) + + def set_version(self, version): + return self._set_version(version) + + def _set_version(self, version): + # FIXME rework the await self.detect_available_dps() below + version = float(version) + self.version = version + self.version_str = "v" + str(version) + self.version_bytes = str(version).encode('latin1') + self.version_header = self.version_bytes + H.PROTOCOL_3x_HEADER + self.payload_dict = None + if version == 3.2: # 3.2 behaves like 3.3 with device22 + self.dev_type="device22" + if self.dps_to_request == {}: + # FIXME cannot await here + self.detect_available_dps() + + def set_socketPersistent(self, persist): + self.socketPersistent = persist + if not persist: + self._close() + + def set_socketNODELAY(self, nodelay): + self.socketNODELAY = nodelay + sock = self.writer.get_extra_info('socket') + if sock: + if nodelay: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + else: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 0) + + def set_socketRetryLimit(self, limit): + self.socketRetryLimit = limit + + def set_socketRetryDelay(self, delay): + self.socketRetryDelay = delay + + def set_socketTimeout(self, s): + self.connection_timeout = s + + def set_dpsUsed(self, dps_to_request): + self.dps_to_request = dps_to_request + + def set_retry(self, retry): + self.retry = retry + + def set_sendWait(self, s): + self.sendWait = s + + async def open(self): + await self._defer_callbacks(pause=True) + result = await self._ensure_connection() + await self._defer_callbacks(delay=False) + return result + + async def close(self, reason=None): + await self._close() + + async def _close(self, reason=None): + self.connected.clear() + if self.writer: + await self._defer_callbacks(pause=True) + try: + self.writer.close() + await self.writer.wait_closed() + except Exception as e: + log.debug(f"Error closing writer: {e}") + for cb in self._callbacks_disconnect: + self._deferred_callbacks.append( cb( self, reason ) ) + await self._defer_callbacks(delay=False) + self.writer = None + self.reader = None + self.cache_clear() + if (self._recv_lock.locked() or self._send_lock.locked()) and self._close_task: + # we are not being called from _run_close() if the lock has been acquired by someone + self._close_task.cancel() + await self._close_task + self._close_task = None + + async def _schedule_close(self): + if self._close_task: + self._close_task.cancel() + await self._close_task + self._close_task = None + print('closing socket in 100ms') + self._close_task = asyncio.create_task( self._run_close() ) + + async def _run_close(self): + try: + await asyncio.sleep(0.1) + except asyncio.CancelledError: + return + if self._send_lock.locked() or self._recv_lock.locked() or self._conn_lock.locked(): + # someone is using the socket + return + await self._close() + + def _generate_payload(self, command, data=None, gwId=None, devId=None, uid=None, rawData=None, reqType=None): + """ + Generate the payload to send. + + Args: + command(str): The type of command. + This is one of the entries from payload_dict + data(dict, optional): The data to send. + This is what will be passed via the 'dps' entry + gwId(str, optional): Will be used for gwId + devId(str, optional): Will be used for devId + uid(str, optional): Will be used for uid + """ + # dicts will get referenced instead of copied if we don't do this + def _deepcopy(dict1): + result = {} + for k in dict1: + if isinstance( dict1[k], dict ): + result[k] = _deepcopy( dict1[k] ) + else: + result[k] = dict1[k] + return result + + # dict2 will be merged into dict1 + # as dict2 is CT.payload_dict['...'] we only need to worry about copying 2 levels deep, + # the command id and "command"/"command_override" keys: i.e. dict2[CMD_ID]["command"] + def _merge_payload_dicts(dict1, dict2): + for cmd in dict2: + if cmd not in dict1: + # make a deep copy so we don't get a reference + dict1[cmd] = _deepcopy( dict2[cmd] ) + else: + for var in dict2[cmd]: + if not isinstance( dict2[cmd][var], dict ): + # not a dict, safe to copy + dict1[cmd][var] = dict2[cmd][var] + else: + # make a deep copy so we don't get a reference + dict1[cmd][var] = _deepcopy( dict2[cmd][var] ) + + # start merging down to the final payload dict + # later merges overwrite earlier merges + # "default" - ("gateway" if gateway) - ("zigbee" if sub-device) - [version string] - ('gateway_'+[version string] if gateway) - + # 'zigbee_'+[version string] if sub-device - [dev_type if not "default"] + if not self.payload_dict or self.last_dev_type != self.dev_type: + self.payload_dict = {} + _merge_payload_dicts( self.payload_dict, CT.payload_dict['default'] ) + if self.children: + _merge_payload_dicts( self.payload_dict, CT.payload_dict['gateway'] ) + if self.cid: + _merge_payload_dicts( self.payload_dict, CT.payload_dict['zigbee'] ) + if self.version_str in CT.payload_dict: + _merge_payload_dicts( self.payload_dict, CT.payload_dict[self.version_str] ) + if self.children and ('gateway_'+self.version_str) in CT.payload_dict: + _merge_payload_dicts( self.payload_dict, CT.payload_dict['gateway_'+self.version_str] ) + if self.cid and ('zigbee_'+self.version_str) in CT.payload_dict: + _merge_payload_dicts( self.payload_dict, CT.payload_dict['zigbee_'+self.version_str] ) + if self.dev_type != 'default': + _merge_payload_dicts( self.payload_dict, CT.payload_dict[self.dev_type] ) + log.debug( 'final payload_dict for %r (%r/%r): %r', self.id, self.version_str, self.dev_type, self.payload_dict ) + # save it so we don't have to calculate this again unless something changes + self.last_dev_type = self.dev_type + + json_data = command_override = None + + if command in self.payload_dict: + if 'command' in self.payload_dict[command]: + json_data = self.payload_dict[command]['command'] + if 'command_override' in self.payload_dict[command]: + command_override = self.payload_dict[command]['command_override'] + + if command_override is None: + command_override = command + + if command == CT.DP_QUERY or command == CT.DP_QUERY_NEW: + self._have_status = True + + if json_data is None: + # I have yet to see a device complain about included but unneeded attribs, but they *will* + # complain about missing attribs, so just include them all unless otherwise specified + json_data = {"gwId": "", "devId": "", "uid": "", "t": ""} + + # make sure we don't modify payload_dict + json_data = json_data.copy() + + if "gwId" in json_data: + if gwId is not None: + json_data["gwId"] = gwId + elif self.parent: + json_data["gwId"] = self.parent.id + else: + json_data["gwId"] = self.id + if "devId" in json_data: + if devId is not None: + json_data["devId"] = devId + else: + json_data["devId"] = self.id + if "uid" in json_data: + if uid is not None: + json_data["uid"] = uid + else: + json_data["uid"] = self.id + if self.cid: + json_data["cid"] = self.cid + if "data" in json_data: + json_data["data"]["cid"] = self.cid + json_data["data"]["ctype"] = 0 + #elif "cid" in json_data: + # del json_data['cid'] + if "t" in json_data: + if json_data['t'] == "int": + json_data["t"] = int(time.time()) + else: + json_data["t"] = str(int(time.time())) + if rawData is not None and "data" in json_data: + json_data["data"] = rawData + elif data is not None: + if "dpId" in json_data: + json_data["dpId"] = data + elif "data" in json_data: + json_data["data"]["dps"] = data + else: + json_data["dps"] = data + elif self.dev_type == "device22" and command == CT.DP_QUERY: + json_data["dps"] = self.dps_to_request + if reqType and "reqType" in json_data: + json_data["reqType"] = reqType + + # Create byte buffer from hex data + if json_data == "": + payload = "" + else: + payload = json.dumps(json_data) + # if spaces are not removed device does not respond! + payload = payload.replace(" ", "") + payload = payload.encode("utf-8") + log.debug("building command %s payload=%r", command, payload) + + # create Tuya message packet + return MessagePayload(command_override, payload) + + + def register_connect_handler( self, cb ): + if cb not in self._callbacks_connect: + self._callbacks_connect.append( cb ) + + def register_disconnect_handler( self, cb ): + if cb not in self._callbacks_disconnect: + self._callbacks_disconnect.append( cb ) + + def register_response_handler( self, cb ): + if cb not in self._callbacks_response: + self._callbacks_response.append( cb ) + + # + # The following methods are taken from the v1 Device class and modified to be async-compatible. + # + async def set_status(self, on, switch=1): + """ + Set status of the device to 'on' or 'off'. + + Args: + on(bool): True for 'on', False for 'off'. + switch(int): The switch to set + """ + # open device, send request, then close connection + if isinstance(switch, int): + switch = str(switch) # index and payload is a string + payload = self._generate_payload(CT.CONTROL, {switch: on}) + + data = await self._send_receive(payload, getresponse=False) + log.debug("set_status received data=%r", data) + + return data + + async def product(self): + """ + Request AP_CONFIG Product Info from device. [BETA] + + """ + # open device, send request, then close connection + payload = self._generate_payload(CT.AP_CONFIG) + data = await self._send_receive(payload) + log.debug("product received data=%r", data) + return data + + async def heartbeat(self): + """ + Send a keep-alive HEART_BEAT command to keep the TCP connection open. + + Devices only send an empty-payload response, so no need to wait for it. + + Args: + """ + #print('sending hb', self.id) + payload = self._generate_payload(CT.HEART_BEAT) + data = await self._send_receive(payload, getresponse=False) + log.debug("heartbeat received data=%r", data) + return data + + async def heartbeat_task(self, auto_open=False): + return await self._run_task(self._heartbeat_task_action, auto_open=auto_open) + + async def _heartbeat_task_action(self): + await asyncio.sleep(9) + if self.connected.is_set(): + await self.heartbeat() + + async def receive_task(self, auto_open=False): + return await self._run_task(self._receive_task_action, auto_open=auto_open) + + async def _receive_task_action(self): + msg = await self._send_receive(None, getresponse=True, timeout=None, retry=False) + + async def _run_task(self, task, auto_open=False): + if auto_open: + self.socketPersistent = True + + try: + while True: + if not self.connected.is_set(): + if auto_open: + async with self._recv_lock: + async with self._send_lock: + if await self._ensure_connection(): + # error connecting! + await asyncio.sleep(2) + continue + else: + try: + await self.connected.wait() + except Exception as e: + print('got conn wait exception:', e) + continue + + await task() + except asyncio.CancelledError as e: + print('got cancelled exception:', e) + pass + + async def updatedps(self, index=None): + """ + Request device to update index. + + Args: + index(array): list of dps to update (ex. [4, 5, 6, 18, 19, 20]) + """ + if index is None: + index = [1] + + log.debug("updatedps() entry (dev_type is %s)", self.dev_type) + # open device, send request, then close connection + payload = self._generate_payload(CT.UPDATEDPS, index) + data = await self._send_receive(payload, getresponse=False) + log.debug("updatedps received data=%r", data) + return data + + async def set_value(self, index, value): + """ + Set int value of any index. + + Args: + index(int): index to set + value(int): new value for the index + """ + # open device, send request, then close connection + if isinstance(index, int): + index = str(index) # index and payload is a string + + payload = self._generate_payload(CT.CONTROL, {index: value}) + + data = await self._send_receive(payload, getresponse=False) + + return data + + async def set_multiple_values(self, data): + """ + Set multiple indexes at the same time + + Args: + data(dict): array of index/value pairs to set + """ + # FIXME if nowait is set we can't detect failure + nowait = True + if nowait: + if self.max_simultaneous_dps > 0 and len(data) > self.max_simultaneous_dps: + # too many DPs, break it up into smaller chunks + ret = None + for k in data: + ret = await self.set_value(k, data[k]) + return ret + else: + # send them all. since nowait is set we can't detect failure + out = {} + for k in data: + out[str(k)] = data[k] + payload = self._generate_payload(CT.CONTROL, out) + return await self._send_receive(payload, getresponse=False) + + if self.max_simultaneous_dps > 0 and len(data) > self.max_simultaneous_dps: + # too many DPs, break it up into smaller chunks + ret = {} + for k in data: + if (not nowait) and bool(ret): + await asyncio.sleep(1) + result = await self.set_value(k, data[k]) + merge_dps_results(ret, result) + return ret + + # send them all, but try to detect devices which cannot handle multiple + out = {} + for k in data: + out[str(k)] = data[k] + + payload = self._generate_payload(CT.CONTROL, out) + result = await self._send_receive(payload, getresponse=False) + + if result and 'Err' in result and len(out) > 1: + # sending failed! device might only be able to handle 1 DP at a time + first_dp = next(iter( out )) + res = await self.set_value(first_dp, out[first_dp]) + del out[first_dp] + if res and 'Err' not in res: + # single DP succeeded! set limit to 1 + self.max_simultaneous_dps = 1 + result = res + for k in out: + res = await self.set_value(k, out[k]) + merge_dps_results(result, res) + return result + + async def turn_on(self, switch=1): + """Turn the device on""" + return await self.set_status(True, switch) + + async def turn_off(self, switch=1): + """Turn the device off""" + return await self.set_status(False, switch) + + async def set_timer(self, num_secs, dps_id=0): + """ + Set a timer. + + Args: + num_secs(int): Number of seconds + dps_id(int): DPS Index for Timer + """ + + # Query status, pick last device id as that is probably the timer + if dps_id == 0: + status = await self.status() + if "dps" in status: + devices = status["dps"] + devices_numbers = list(devices.keys()) + devices_numbers.sort() + dps_id = devices_numbers[-1] + else: + log.debug("set_timer received error=%r", status) + return status + + payload = self._generate_payload(CT.CONTROL, {dps_id: num_secs}) + + data = await self._send_receive(payload, getresponse=False) + log.debug("set_timer received data=%r", data) + return data diff --git a/tinytuya/core/XenonDevice.py b/tinytuya/core/XenonDevice.py index 89f08bb1..94c949ed 100644 --- a/tinytuya/core/XenonDevice.py +++ b/tinytuya/core/XenonDevice.py @@ -9,7 +9,6 @@ import socket import struct import time -import sys from .const import DEVICEFILE, TCPPORT from .crypto_helper import AESCipher @@ -17,11 +16,10 @@ from .exceptions import DecodeError from .message_helper import MessagePayload, TuyaMessage, pack_message, unpack_message, parse_header from . import command_types as CT, header as H +from .core import merge_dps_results log = logging.getLogger(__name__) -# Python 2 Support -IS_PY2 = sys.version_info[0] == 2 def find_device(dev_id=None, address=None): """Scans network for Tuya devices with either ID = dev_id or IP = address @@ -87,129 +85,6 @@ def device_info( dev_id ): return devinfo -def merge_dps_results(dest, src): - """Merge multiple receive() responses into a single dict - - `src` will be combined with and merged into `dest` - """ - if src and isinstance(src, dict) and 'Error' not in src and 'Err' not in src: - for k in src: - if k == 'dps' and src[k] and isinstance(src[k], dict): - if 'dps' not in dest or not isinstance(dest['dps'], dict): - dest['dps'] = {} - for dkey in src[k]: - dest['dps'][dkey] = src[k][dkey] - elif k == 'data' and src[k] and isinstance(src[k], dict) and 'dps' in src[k] and isinstance(src[k]['dps'], dict): - if k not in dest or not isinstance(dest[k], dict): - dest[k] = {'dps': {}} - if 'dps' not in dest[k] or not isinstance(dest[k]['dps'], dict): - dest[k]['dps'] = {} - for dkey in src[k]['dps']: - dest[k]['dps'][dkey] = src[k]['dps'][dkey] - else: - dest[k] = src[k] - -# Tuya Device Dictionary - Command and Payload Overrides -# -# 'default' devices require the 0a command for the DP_QUERY request -# 'device22' devices require the 0d command for the DP_QUERY request and a list of -# dps used set to Null in the request payload -# -# Any command not defined in payload_dict will be sent as-is with a -# payload of {"gwId": "", "devId": "", "uid": "", "t": ""} - -payload_dict = { - # Default Device - "default": { - CT.AP_CONFIG: { # [BETA] Set Control Values on Device - "command": {"gwId": "", "devId": "", "uid": "", "t": ""}, - }, - CT.CONTROL: { # Set Control Values on Device - "command": {"devId": "", "uid": "", "t": ""}, - }, - CT.STATUS: { # Get Status from Device - "command": {"gwId": "", "devId": ""}, - }, - CT.HEART_BEAT: {"command": {"gwId": "", "devId": ""}}, - CT.DP_QUERY: { # Get Data Points from Device - "command": {"gwId": "", "devId": "", "uid": "", "t": ""}, - }, - CT.CONTROL_NEW: {"command": {"devId": "", "uid": "", "t": ""}}, - CT.DP_QUERY_NEW: {"command": {"devId": "", "uid": "", "t": ""}}, - CT.UPDATEDPS: {"command": {"dpId": [18, 19, 20]}}, - CT.LAN_EXT_STREAM: { "command": { "reqType": "", "data": {} }}, - }, - # Special Case Device with 22 character ID - Some of these devices - # Require the 0d command as the DP_QUERY status request and the list of - # dps requested payload - "device22": { - CT.DP_QUERY: { # Get Data Points from Device - "command_override": CT.CONTROL_NEW, # Uses CONTROL_NEW command for some reason - "command": {"devId": "", "uid": "", "t": ""}, - }, - }, - # v3.3+ devices do not need devId/gwId/uid - "v3.4": { - CT.CONTROL: { - "command_override": CT.CONTROL_NEW, # Uses CONTROL_NEW command - "command": {"protocol":5, "t": "int", "data": {}} - }, - CT.CONTROL_NEW: { - "command": {"protocol":5, "t": "int", "data": {}} - }, - CT.DP_QUERY: { - "command_override": CT.DP_QUERY_NEW, - "command": {} #"protocol":4, "t": "int", "data": {}} - }, - CT.DP_QUERY_NEW: { - "command": {} - }, - }, - # v3.5 is just a copy of v3.4 - "v3.5": { - CT.CONTROL: { - "command_override": CT.CONTROL_NEW, # Uses CONTROL_NEW command - "command": {"protocol":5, "t": "int", "data": {}} - }, - CT.CONTROL_NEW: { - "command": {"protocol":5, "t": "int", "data": {}} - }, - CT.DP_QUERY: { - "command_override": CT.DP_QUERY_NEW, - "command": {} - }, - CT.DP_QUERY_NEW: { - "command": {} - }, - }, - # placeholders, not yet needed - "gateway": { }, - "gateway_v3.4": { }, - "gateway_v3.5": { }, - "zigbee": { - CT.CONTROL: { "command": {"t": "int", "cid": ""} }, - CT.DP_QUERY: { "command": {"t": "int", "cid": ""} }, - }, - "zigbee_v3.4": { - CT.CONTROL: { - "command_override": CT.CONTROL_NEW, - "command": {"protocol":5, "t": "int", "data": {"cid":""}} - }, - CT.CONTROL_NEW: { - "command": {"protocol":5, "t": "int", "data": {"cid":""}} - }, - }, - "zigbee_v3.5": { - CT.CONTROL: { - "command_override": CT.CONTROL_NEW, - "command": {"protocol":5, "t": "int", "data": {"cid":""}} - }, - CT.CONTROL_NEW: { - "command": {"protocol":5, "t": "int", "data": {"cid":""}} - }, - }, -} - class XenonDevice(object): def __init__( self, dev_id, address=None, local_key="", dev_type="default", connection_timeout=5, @@ -904,11 +779,8 @@ def _negotiate_session_key_generate_step_3( self, rkey ): return MessagePayload(CT.SESS_KEY_NEG_FINISH, rkey_hmac) def _negotiate_session_key_generate_finalize( self ): - if IS_PY2: - k = [ chr(ord(a)^ord(b)) for (a,b) in zip(self.local_nonce,self.remote_nonce) ] - self.local_key = ''.join(k) - else: - self.local_key = bytes( [ a^b for (a,b) in zip(self.local_nonce,self.remote_nonce) ] ) + # Python 3: nonce XOR produces bytes object directly + self.local_key = bytes(a ^ b for (a, b) in zip(self.local_nonce, self.remote_nonce)) log.debug("Session nonce XOR'd: %r", self.local_key) cipher = AESCipher(self.real_local_key) @@ -1206,7 +1078,7 @@ def _deepcopy(dict1): return result # dict2 will be merged into dict1 - # as dict2 is payload_dict['...'] we only need to worry about copying 2 levels deep, + # as dict2 is CT.payload_dict['...'] we only need to worry about copying 2 levels deep, # the command id and "command"/"command_override" keys: i.e. dict2[CMD_ID]["command"] def _merge_payload_dicts(dict1, dict2): for cmd in dict2: @@ -1228,19 +1100,19 @@ def _merge_payload_dicts(dict1, dict2): # 'zigbee_'+[version string] if sub-device - [dev_type if not "default"] if not self.payload_dict or self.last_dev_type != self.dev_type: self.payload_dict = {} - _merge_payload_dicts( self.payload_dict, payload_dict['default'] ) + _merge_payload_dicts( self.payload_dict, CT.payload_dict['default'] ) if self.children: - _merge_payload_dicts( self.payload_dict, payload_dict['gateway'] ) + _merge_payload_dicts( self.payload_dict, CT.payload_dict['gateway'] ) if self.cid: - _merge_payload_dicts( self.payload_dict, payload_dict['zigbee'] ) - if self.version_str in payload_dict: - _merge_payload_dicts( self.payload_dict, payload_dict[self.version_str] ) - if self.children and ('gateway_'+self.version_str) in payload_dict: - _merge_payload_dicts( self.payload_dict, payload_dict['gateway_'+self.version_str] ) - if self.cid and ('zigbee_'+self.version_str) in payload_dict: - _merge_payload_dicts( self.payload_dict, payload_dict['zigbee_'+self.version_str] ) + _merge_payload_dicts( self.payload_dict, CT.payload_dict['zigbee'] ) + if self.version_str in CT.payload_dict: + _merge_payload_dicts( self.payload_dict, CT.payload_dict[self.version_str] ) + if self.children and ('gateway_'+self.version_str) in CT.payload_dict: + _merge_payload_dicts( self.payload_dict, CT.payload_dict['gateway_'+self.version_str] ) + if self.cid and ('zigbee_'+self.version_str) in CT.payload_dict: + _merge_payload_dicts( self.payload_dict, CT.payload_dict['zigbee_'+self.version_str] ) if self.dev_type != 'default': - _merge_payload_dicts( self.payload_dict, payload_dict[self.dev_type] ) + _merge_payload_dicts( self.payload_dict, CT.payload_dict[self.dev_type] ) log.debug( 'final payload_dict for %r (%r/%r): %r', self.id, self.version_str, self.dev_type, self.payload_dict ) # save it so we don't have to calculate this again unless something changes self.last_dev_type = self.dev_type diff --git a/tinytuya/core/__init__.py b/tinytuya/core/__init__.py index 24d5e3dd..420997e1 100644 --- a/tinytuya/core/__init__.py +++ b/tinytuya/core/__init__.py @@ -1,4 +1,3 @@ - # TinyTuya Module # -*- coding: utf-8 -*- @@ -16,3 +15,5 @@ from .core import * from .core import __version__ from .core import __author__ + +from .DeviceAsync import * diff --git a/tinytuya/core/command_types.py b/tinytuya/core/command_types.py index 9ba22295..7ac55d49 100644 --- a/tinytuya/core/command_types.py +++ b/tinytuya/core/command_types.py @@ -26,3 +26,105 @@ BOARDCAST_LPV34 = 0x23 # 35 # FR_TYPE_BOARDCAST_LPV34 REQ_DEVINFO = 0x25 # broadcast to port 7000 to get v3.5 devices to send their info LAN_EXT_STREAM = 0x40 # 64 # FRM_LAN_EXT_STREAM + + +# Tuya Device Dictionary - Command and Payload Overrides +# +# 'default' devices require the 0a command for the DP_QUERY request +# 'device22' devices require the 0d command for the DP_QUERY request and a list of +# dps used set to Null in the request payload +# +# Any command not defined in payload_dict will be sent as-is with a +# payload of {"gwId": "", "devId": "", "uid": "", "t": ""} + +payload_dict = { + # Default Device + "default": { + AP_CONFIG: { # [BETA] Set Control Values on Device + "command": {"gwId": "", "devId": "", "uid": "", "t": ""}, + }, + CONTROL: { # Set Control Values on Device + "command": {"devId": "", "uid": "", "t": ""}, + }, + STATUS: { # Get Status from Device + "command": {"gwId": "", "devId": ""}, + }, + HEART_BEAT: {"command": {"gwId": "", "devId": ""}}, + DP_QUERY: { # Get Data Points from Device + "command": {"gwId": "", "devId": "", "uid": "", "t": ""}, + }, + CONTROL_NEW: {"command": {"devId": "", "uid": "", "t": ""}}, + DP_QUERY_NEW: {"command": {"devId": "", "uid": "", "t": ""}}, + UPDATEDPS: {"command": {"dpId": [18, 19, 20]}}, + LAN_EXT_STREAM: { "command": { "reqType": "", "data": {} }}, + }, + # Special Case Device with 22 character ID - Some of these devices + # Require the 0d command as the DP_QUERY status request and the list of + # dps requested payload + "device22": { + DP_QUERY: { # Get Data Points from Device + "command_override": CONTROL_NEW, # Uses CONTROL_NEW command for some reason + "command": {"devId": "", "uid": "", "t": ""}, + }, + }, + # v3.3+ devices do not need devId/gwId/uid + "v3.4": { + CONTROL: { + "command_override": CONTROL_NEW, # Uses CONTROL_NEW command + "command": {"protocol":5, "t": "int", "data": {}} + }, + CONTROL_NEW: { + "command": {"protocol":5, "t": "int", "data": {}} + }, + DP_QUERY: { + "command_override": DP_QUERY_NEW, + "command": {} #"protocol":4, "t": "int", "data": {}} + }, + DP_QUERY_NEW: { + "command": {} + }, + }, + # v3.5 is just a copy of v3.4 + "v3.5": { + CONTROL: { + "command_override": CONTROL_NEW, # Uses CONTROL_NEW command + "command": {"protocol":5, "t": "int", "data": {}} + }, + CONTROL_NEW: { + "command": {"protocol":5, "t": "int", "data": {}} + }, + DP_QUERY: { + "command_override": DP_QUERY_NEW, + "command": {} + }, + DP_QUERY_NEW: { + "command": {} + }, + }, + # placeholders, not yet needed + "gateway": { }, + "gateway_v3.4": { }, + "gateway_v3.5": { }, + "zigbee": { + CONTROL: { "command": {"t": "int", "cid": ""} }, + DP_QUERY: { "command": {"t": "int", "cid": ""} }, + }, + "zigbee_v3.4": { + CONTROL: { + "command_override": CONTROL_NEW, + "command": {"protocol":5, "t": "int", "data": {"cid":""}} + }, + CONTROL_NEW: { + "command": {"protocol":5, "t": "int", "data": {"cid":""}} + }, + }, + "zigbee_v3.5": { + CONTROL: { + "command_override": CONTROL_NEW, + "command": {"protocol":5, "t": "int", "data": {"cid":""}} + }, + CONTROL_NEW: { + "command": {"protocol":5, "t": "int", "data": {"cid":""}} + }, + }, +} diff --git a/tinytuya/core/core.py b/tinytuya/core/core.py index e8122223..0ebd037c 100644 --- a/tinytuya/core/core.py +++ b/tinytuya/core/core.py @@ -1,7 +1,8 @@ # TinyTuya Module # -*- coding: utf-8 -*- """ - Python module to interface with Tuya WiFi smart devices + Python module to interface with Tuya WiFi smart devices. + (Python 3 only as of v2.0.0 – legacy Python 2 support removed.) Author: Jason A. Cox For more information see https://github.com/jasonacox/tinytuya @@ -76,7 +77,6 @@ """ # Modules -from __future__ import print_function # python 2.7 support import logging import sys @@ -90,45 +90,24 @@ from .crypto_helper import AESCipher -# Backward compatibility for python2 -try: - input = raw_input -except NameError: - pass - # Colorama terminal color capability for all platforms if HAVE_COLORAMA: init() -version_tuple = (1, 17, 4) # Major, Minor, Patch +version_tuple = (2, 0, 0) # Major, Minor, Patch version = __version__ = "%d.%d.%d" % version_tuple __author__ = "jasonacox" log = logging.getLogger(__name__) -# Python 2 Support -IS_PY2 = sys.version_info[0] == 2 - - -# Misc Helpers def bin2hex(x, pretty=False): - if pretty: - space = " " - else: - space = "" - if IS_PY2: - result = "".join("%02X%s" % (ord(y), space) for y in x) - else: - result = "".join("%02X%s" % (y, space) for y in x) - return result + space = " " if pretty else "" + return "".join("%02X%s" % (b, space) for b in x) def hex2bin(x): - if IS_PY2: - return x.decode("hex") - else: - return bytes.fromhex(x) + return bytes.fromhex(x) def set_debug(toggle=True, color=True): """Enable tinytuya verbose logging""" @@ -184,6 +163,28 @@ def assign_dp_mappings( tuyadevices, mappings ): dev['mapping'] = None +def merge_dps_results(dest, src): + """Merge multiple receive() responses into a single dict + + `src` will be combined with and merged into `dest` + """ + if src and isinstance(src, dict) and 'Error' not in src and 'Err' not in src: + for k in src: + if k == 'dps' and src[k] and isinstance(src[k], dict): + if 'dps' not in dest or not isinstance(dest['dps'], dict): + dest['dps'] = {} + for dkey in src[k]: + dest['dps'][dkey] = src[k][dkey] + elif k == 'data' and src[k] and isinstance(src[k], dict) and 'dps' in src[k] and isinstance(src[k]['dps'], dict): + if k not in dest or not isinstance(dest[k], dict): + dest[k] = {'dps': {}} + if 'dps' not in dest[k] or not isinstance(dest[k]['dps'], dict): + dest[k]['dps'] = {} + for dkey in src[k]['dps']: + dest[k]['dps'][dkey] = src[k]['dps'][dkey] + else: + dest[k] = src[k] + ######################################################## # Core Classes and Functions diff --git a/tinytuya/core/crypto_helper.py b/tinytuya/core/crypto_helper.py index 43697343..a8d1583b 100644 --- a/tinytuya/core/crypto_helper.py +++ b/tinytuya/core/crypto_helper.py @@ -1,7 +1,6 @@ # TinyTuya Module # -*- coding: utf-8 -*- -from __future__ import print_function # python 2.7 support import base64 import logging import time diff --git a/tinytuya/scanner.py b/tinytuya/scanner.py index 0359e14b..01d9fb94 100644 --- a/tinytuya/scanner.py +++ b/tinytuya/scanner.py @@ -13,7 +13,6 @@ """ # Modules -from __future__ import print_function from collections import namedtuple import ipaddress import json @@ -42,11 +41,6 @@ #except: # SCANLIBS = False -# Backward compatibility for python2 -try: - input = raw_input -except NameError: - pass try: import netifaces # pylint: disable=E0401 @@ -379,8 +373,8 @@ def get_peer(self): r = self.sock.recv( 5000 ) if self.debug: print('Debug sock', self.ip, 'closed but received data?? Received:', r) - # ugh, ConnectionResetError and ConnectionRefusedError are not available on python 2.7 - #except ConnectionResetError: + # Connection retry logic + #except ConnectionResetError: # Python 3 specific except OSError as e: if self.initial_connect_retries and e.errno == errno.ECONNRESET: # connected, but then closed @@ -1060,8 +1054,6 @@ def scan(scantime=None, color=True, forcescan=False, discover=True, assume_yes=F def _generate_ip(networks, verbose, term): for netblock in networks: - if tinytuya.IS_PY2 and type(netblock) == str: - netblock = netblock.decode('latin1') try: network = ipaddress.ip_network(netblock, strict=False) log.debug("Starting brute force network scan %s", network) diff --git a/tinytuya/scanner_async.py b/tinytuya/scanner_async.py new file mode 100644 index 00000000..77cb796a --- /dev/null +++ b/tinytuya/scanner_async.py @@ -0,0 +1,853 @@ +# TinyTuya Setup Wizard +# -*- coding: utf-8 -*- +""" +TinyTuya Network Scanner for Tuya based WiFi smart devices + +Author: Jason A. Cox +For more information see https://github.com/jasonacox/tinytuya + +Description + Scan will scan the local network for Tuya devices and if a local devices.json is + present in the local directory, will use the Local KEYs to poll the devices for + status. + +""" +# Modules +import asyncio +from collections import namedtuple +import ipaddress +import json +import logging +import socket +import sys +import time +import errno +import base64 +import traceback +import tinytuya + +#tinytuya.set_debug() +#from .core import * +#from . import core +#import .core as core +#tinytuya = sys.modules['tinytuya'] +#print(sys.modules) +#for m in sys.modules: +# print(m) +#print(tinytuya.DEVICEFILE) + +#from .scanner_classes import * + +try: + from colorama import init + HAVE_COLORAMA = True +except ImportError: + HAVE_COLORAMA = False + +HAVE_COLOR = HAVE_COLORAMA or not sys.platform.startswith('win') + + +try: + import netifaces # pylint: disable=E0401 + NETIFLIBS = True +except ImportError: + NETIFLIBS = False + +try: + import psutil # pylint: disable=E0401 + PSULIBS = True +except ImportError: + PSULIBS = False + +# Colorama terminal color capability for all platforms +if HAVE_COLORAMA: + init() + +# Configuration Files +DEVICEFILE = tinytuya.DEVICEFILE +SNAPSHOTFILE = tinytuya.SNAPSHOTFILE + +# Global Network Configs +DEFAULT_NETWORK = tinytuya.DEFAULT_NETWORK +TCPTIMEOUT = tinytuya.TCPTIMEOUT # Seconds to wait for socket open for scanning +TCPPORT = tinytuya.TCPPORT # Tuya TCP Local Port +MAXCOUNT = tinytuya.MAXCOUNT # How many tries before stopping +UDPPORT = tinytuya.UDPPORT # Tuya 3.1 UDP Port +UDPPORTS = tinytuya.UDPPORTS # Tuya 3.3 encrypted UDP Port +UDPPORTAPP = tinytuya.UDPPORTAPP # Tuya app encrypted UDP Port +TIMEOUT = tinytuya.TIMEOUT # Socket Timeout +SCANTIME = tinytuya.SCANTIME # How many seconds to wait before stopping +BROADCASTTIME = 6 # How often to broadcast to port 7000 to get v3.5 devices to send us their info + +max_parallel = 300 +connect_timeout = 3 + +devinfo_keys = ('ip', 'mac', 'name', 'key', 'gwId', 'active', 'ability', 'encrypt', 'productKey', 'version', 'token', 'wf_cfg' ) +# id ver + +TermColors = namedtuple("TermColors", "bold, subbold, normal, dim, alert, alertdim, cyan, red, yellow") + +FoundDevice = namedtuple( 'FoundDevice', 'data, time' ) + +# Logging +log = logging.getLogger(__name__) + +scanner = None + +async def scanfor( did, timeout=True ): + global scanner + if scanner is None: + print('creating new scanner') + scanner = Scanner() + + return await scanner.scanfor( did, timeout ) + +# Helper Functions +def getmyIPaddr(): + # Fetch my IP address and assume /24 network + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) + r = str(s.getsockname()[0]) + s.close() + return r + +def getmyIP(): + r = getmyIPaddr().split('.') + # assume a /24 network + return '%s.%s.%s.0/24' % tuple(r[:3]) + +def getmyIPs( term, verbose, ask ): + if NETIFLIBS: + return getmyIPs_via_netifaces( term, verbose, ask ) + if PSULIBS: + return getmyIPs_via_psutil( term, verbose, ask ) + return None + +def getmyIPs_via_netifaces( term, verbose, ask ): + ips = {} + interfaces = netifaces.interfaces() + try: + # skip the loopback interface + interfaces.remove('lo') + except: + pass + for interface in interfaces: + addresses = netifaces.ifaddresses(interface) + #for address_family in (netifaces.AF_INET, netifaces.AF_INET6): + family_addresses = addresses.get(netifaces.AF_INET) + if not family_addresses: + continue + + for address in family_addresses: + k = str(ipaddress.IPv4Interface(address['addr']+'/'+address['netmask']).network) + if k[:4] == '127.': + # skip the loopback interface + continue + if ask: + if ask != 2: + answer = input( '%sScan network %s from interface %s?%s ([Y]es/[n]o/[a]ll yes): ' % (term.bold, k, str(interface), term.normal) ) + if answer[0:1].lower() == 'a': + ask = 2 + elif answer.lower().find('n') >= 0: + continue + if verbose: + print(term.dim + 'Adding Network', k, 'to the force-scan list') + ips[k] = True + return ips.keys() + +def getmyIPs_via_psutil( term, verbose, ask ): + ips = {} + interfaces = psutil.net_if_addrs() + for interface in interfaces: + addresses = interfaces[interface] + for addr in addresses: + if addr.family != socket.AF_INET: + continue + k = str(ipaddress.IPv4Interface(addr.address+'/'+addr.netmask).network) + if k[:4] == '127.': + # skip the loopback interface + continue + if ask: + if ask != 2: + answer = input( '%sScan network %s from interface %s?%s ([Y]es/[n]o/[a]ll yes): ' % (term.bold, k, str(interface), term.normal) ) + if answer[0:1].lower() == 'a': + ask = 2 + elif answer.lower().find('n') >= 0: + continue + if verbose: + print(term.dim + 'Adding Network', k, 'to the force-scan list') + ips[k] = True + return ips.keys() + +def get_ip_to_broadcast(): + ip_to_broadcast = {} + + if NETIFLIBS: + interfaces = netifaces.interfaces() + for interface in interfaces: + addresses = netifaces.ifaddresses(interface) + ipv4 = addresses.get(netifaces.AF_INET) + + if ipv4: + for addr in ipv4: + if 'broadcast' in addr and 'addr' in addr and addr['broadcast'] != addr['addr']: + ip_to_broadcast[addr['broadcast']] = addr['addr'] + + if ip_to_broadcast: + return ip_to_broadcast + + if PSULIBS: + interfaces = psutil.net_if_addrs() + for addresses in interfaces.values(): + for addr in addresses: + if addr.family == socket.AF_INET and addr.broadcast and addr.address and addr.address != addr.broadcast: # AF_INET is for IPv4 + ip_to_broadcast[addr.broadcast] = addr.address + + if ip_to_broadcast: + return ip_to_broadcast + + ip_to_broadcast['255.255.255.255'] = getmyIPaddr() + return ip_to_broadcast + +def send_discovery_request( iface_list=None ): + close_sockets = False + + if not tinytuya.AESCipher.CRYPTOLIB_HAS_GCM: + # GCM is required for discovery requests + return False + + if not iface_list: + close_sockets = True + iface_list = {} + client_bcast_addrs = get_ip_to_broadcast() + for bcast in client_bcast_addrs: + addr = client_bcast_addrs[bcast] + iface_list[addr] = { 'broadcast': bcast } + + at_least_one_succeeded = False + bcast_error_messages = [] + for address in iface_list: + iface = iface_list[address] + if 'socket' not in iface: + iface['socket'] = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP + iface['socket'].setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + try: + iface['socket'].bind( (address,0) ) + except: + log.debug( 'Failed to bind to address %r for discovery broadcasts, skipping interface!', address, exc_info=True ) + continue + + if 'payload' not in iface: + bcast = json.dumps( {"from":"app","ip":address} ).encode() + bcast_msg = tinytuya.TuyaMessage( 0, tinytuya.REQ_DEVINFO, None, bcast, 0, True, tinytuya.PREFIX_6699_VALUE, True ) + iface['payload'] = tinytuya.pack_message( bcast_msg, hmac_key=tinytuya.udpkey ) + + if 'port' not in iface: + iface['port'] = 7000 + + log.debug( 'Sending discovery broadcast from %r to %r on port %r', address, iface['broadcast'], iface['port'] ) + try: + iface['socket'].sendto( iface['payload'], (iface['broadcast'], iface['port']) ) + at_least_one_succeeded = True + except socket.error as e: + log.debug( f"Failed to send discovery broadcast from {address} to {iface['broadcast']}:{iface['port']}: {e}" ) + bcast_error_messages.append( f"Failed to send discovery broadcast from {address} to {iface['broadcast']}:{iface['port']}: {e}" ) + + if close_sockets: + iface['socket'].close() + del iface['socket'] + + if not at_least_one_succeeded: + if log.level != logging.DEBUG: + for line in bcast_error_messages: + log.error( line ) + log.error( 'Sending broadcast discovery packet failed, certain v3.5 devices will not be found!' ) + + return iface_list + +def _generate_ip(networks, verbose, term): + for netblock in networks: + try: + network = ipaddress.ip_network(netblock, strict=False) + log.debug("Starting brute force network scan %s", network) + except: + log.debug("Unable to get network for %r, ignoring", netblock) + if verbose: + print(term.alert + + 'ERROR: Unable to get network for %r, ignoring.' % netblock + term.normal) + print(traceback.format_exc()) + continue + + if verbose: + print(term.bold + ' Starting Scan for network %s%s' % (network, term.dim)) + # Loop through each host + for addr in ipaddress.IPv4Network(network): + yield str(addr) + +def _print_device_info( result, note, term, extra_message=None, verbose=True ): + if not verbose: + return + ip = result["ip"] + gwId = result["gwId"] + productKey = result["productKey"] if result["productKey"] else '?' + version = result["version"] if result["version"] and result["version"] != '0.0' else '??' + devicename = result["name"] + dkey = result["key"] + mac = result["mac"] + devicetype = result['dev_type'] if 'dev_type' in result else '??' + + suffix = term.dim + ", MAC = " + mac + "" + if not result['name']: + devicename = "%sUnknown v%s Device%s" % (term.alert, version, term.normal+term.dim) # (term.normal+term.dim, term.normal, version, term.dim) + else: + devicename = term.normal + result['name'] + term.dim + print( + "%s Product ID = %s [%s]:\n %sAddress = %s %sDevice ID = %s (len:%d) %sLocal Key = %s %sVersion = %s %sType = %s%s" + % ( + devicename, + productKey, + note, + term.subbold, + ip, + term.cyan, + gwId, + len(gwId), + term.red, + dkey, + term.yellow, + version, + term.cyan, + devicetype, + suffix + ) + ) + + if extra_message: + print( extra_message ) + +class UDPProtocol: + def __init__(self, scanner): + self.scanner = scanner + self.transport = None + + def connection_made(self, transport): + self.transport = transport + print('connection_made', transport) + + def datagram_received(self, data, addr): + #print("datagram_received: Received:", addr, data) + try: + ip = addr[0] + tgt_port = addr[1] + result = tinytuya.decrypt_udp( data ) + result = json.loads(result) + log.debug("Received valid UDP packet: %r", result) + except: + log.debug("Invalid UDP Packet from %r port %r - %r", ip, tgt_port, data) + return + + if 'from' in result: + # from app + pass + elif 'gwId' in result: + # from device + devid = result['gwId'] + self.scanner.found_devices[devid] = FoundDevice( result, time.time() ) + if devid in self.scanner.device_listeners: + for fut in self.scanner.device_listeners[devid]: + fut.set_result( result ) + + def error_received(self, exc): + print('Error received:', exc) + + def connection_lost(self, exc): + print("Connection closed", exc) + +class UDPProtocolClient(UDPProtocol): + pass + +class UDPProtocolClients(UDPProtocol): + pass + +class UDPProtocolApp(UDPProtocol): + pass + + +class Scanner(): + def __init__( self, forcescan=False, discover=True ): + self.forcescan = forcescan + self.discover = discover + self.found_devices = {} + self.device_listeners = {} + self.loop = asyncio.get_running_loop() + self.start = asyncio.Event() + + self.task = asyncio.create_task( self.background_scan() ) + + async def reload_tuyadevices(self): + pass + + async def scanfor( self, devid, timeout=True, use_cache=True ): + print('scanfor adding device:', devid) + self.start.set() # start it now in case we need to expire old data + if use_cache and devid in self.found_devices: + print('scanfor returning cached data:', devid, self.found_devices[devid]) + return self.found_devices[devid].data + future = self.loop.create_future() + if devid not in self.device_listeners: + self.device_listeners[devid] = [future] + else: + self.device_listeners[devid].append( future ) + if timeout is True: + timeout = tinytuya.SCANTIME + if timeout: + try: + await asyncio.wait_for(future, timeout=timeout) + result = future.result() + except: + result = None + else: + result = await future + print('scanfor got result:', result) + self.device_listeners[devid].remove( future ) + if not self.device_listeners[devid]: + del self.device_listeners[devid] + return result + + async def background_scan(self): + #if scantime is None: + self.scantime = 2 #tinytuya.SCANTIME + self.start.set() # start it now to fill the cache + tuyadevices = [] + discoverers = [] + client_ip_broadcast_list = {} + + while self.device_listeners or await self.start.wait(): + print('starting scan!') + self.start.clear() + await self.reload_tuyadevices() + + if self.forcescan and not self.tuyadevices: + # print warning + # print(term.alert + 'Warning: Force-scan requires keys in %s but no keys were found. Disabling force-scan.' % DEVICEFILE + term.normal) + if not self.discover: + continue + + if self.discover: + # Enable UDP listening broadcasting mode on UDP port 6666 - 3.1 Devices + #client.bind(("", UDPPORT)) + discoverers.append( await self.loop.create_datagram_endpoint( lambda: UDPProtocolClient(self), local_addr=('0.0.0.0',UDPPORT), reuse_port=True, allow_broadcast=True ) ) + + # Enable UDP listening broadcasting mode on encrypted UDP port 6667 - 3.3 Devices + #clients.bind(("", UDPPORTS)) + discoverers.append( await self.loop.create_datagram_endpoint( lambda: UDPProtocolClients(self), local_addr=('0.0.0.0',UDPPORTS), reuse_port=True, allow_broadcast=True ) ) + + # Enable UDP listening broadcasting mode on encrypted UDP port 7000 - App + #clientapp.bind(("", UDPPORTAPP)) + discoverers.append( await self.loop.create_datagram_endpoint( lambda: UDPProtocolApp(self), local_addr=('0.0.0.0',UDPPORTAPP), reuse_port=True, allow_broadcast=True ) ) + + for i in tuyadevices: + options['keylist'].append( KeyObj( i['id'], i['key'] ) ) + + if False: #forcescan: + # argparse gives us a list of lists + # the inner list is empty [[]] when no address specified + add_connected = True + if isinstance( forcescan, list ) or isinstance( forcescan, tuple ): + for ip in forcescan: + if isinstance( ip, list ) or isinstance( ip, tuple ): + for ip2 in ip: + networks.append( ip2 ) + add_connected = False + else: + networks.append( ip ) + add_connected = False + + if isinstance( forcescan, str ) or isinstance( forcescan, bytes ): + networks.append( forcescan ) + add_connected = False + + if add_connected: + if (not NETIFLIBS) and (not PSULIBS): + print(term.alert + + ' NOTE: neither module netifaces nor module psutil are available, multi-interface machines will be limited.\n' + ' (Requires: `pip install netifaces` or `pip install psutil`)\n' + term.dim) + try: + ip = getmyIP() + networks.append( ip ) + except: + #traceback.print_exc() + networks.append( u''+DEFAULT_NETWORK ) + log.debug("Unable to get local network, using default %r", DEFAULT_NETWORK) + if verbose: + print(term.alert + + 'ERROR: Unable to get your IP address and network automatically, using %s' % DEFAULT_NETWORK + + term.normal) + else: + networks = getmyIPs( term, verbose, not assume_yes ) + if not networks: + print(term.alert + 'No networks to force-scan, exiting.' + term.normal) + return None + + if networks: + if verbose: + log.debug("Force-scanning networks: %r", networks) + + scan_ips = _generate_ip( networks, verbose, term ) + ip_scan = ip_scan_running = True + if discover: + ip_scan_delay = time.time() + 5 + + ## If no scantime value set use default + #if not scantime: + # scantime = 0 if ip_scan_running else tinytuya.SCANTIME + + client_bcast_addrs = get_ip_to_broadcast() + for bcast in client_bcast_addrs: + addr = client_bcast_addrs[bcast] + client_ip_broadcast_list[addr] = { 'broadcast': bcast } + + + if False: + if 'from' in result and result['from'] == 'app': #sock is clientapp: + if ip not in broadcasted_apps: + broadcasted_apps[ip] = result + if verbose: + print( term.alertdim + 'New Broadcast from App at ' + str(ip) + term.dim + ' - ' + str(result) + term.normal ) + #continue + + while self.device_listeners: + await asyncio.sleep( 2 ) + + # keep going for at least this long to refresh the cache + await asyncio.sleep( self.scantime ) + + for client, protocol in discoverers: + client.close() + discoverers = [] + + self.start.clear() + continue + +def _get_gwid( old ): + if 'gwId' in old and old['gwId']: + return old["gwId"] + if 'id' in old and old['id']: + return old["id"] + return 0 + +def _build_item( old, new ): + item = {} + item['id'] = item['gwId'] = _get_gwid( old ) + ip = ver = 0 + items = { 'ip':0, 'version':0, 'name':'', 'key':'', 'mac':None } + for itm in items: + if new and itm in new and new[itm]: + item[itm] = new[itm] + elif itm in old and old[itm]: + item[itm] = old[itm] + else: + item[itm] = items[itm] + return item + +def _display_status( item, dps, term ): + name = item['name'] + if name == "": + name = item['gwId'] + ip = item['ip'] + if not ip: + print(" %s[%-25.25s] %sError: No IP found%s" % + (term.subbold, name, term.alert, term.normal)) + elif not dps: + print(" %s[%-25.25s] %s%-18s - %sNo Response" % + (term.subbold, name, term.dim, ip, term.alert)) + else: + if '1' in dps or '20' in dps: + state = term.alertdim + "[Off]" + term.dim + if '1' in dps and dps['1'] is True: + state = term.bold + "[On] " + term.dim + elif '20' in dps and dps['20'] is True: + state = term.bold + "[On] " + term.dim + print(" %s[%-25.25s] %s%-18s - %s - DPS: %r" % + (term.subbold, name, term.dim, ip, state, dps)) + else: + print(" %s[%-25.25s] %s%-18s - DPS: %r" % + (term.subbold, name, term.dim, ip, dps)) + +def _snapshot_load_item( itm ): + # normalize all the fields + itm['id'] = itm['gwId'] = _get_gwid( itm ) + if 'ver' in itm and itm['ver']: + itm['version'] = float(itm['ver']) + del itm['ver'] + elif 'version' in itm and itm['version']: + itm['version'] = float(itm['version']) + else: + itm['version'] = 0.0 + return itm + +def _snapshot_save_item( old ): + # normalize all the fields + # "version" is prefered over "ver", but saved as "ver" + # "gwId" is prefered over "id", but saved as "id" + item = {} + item['id'] = _get_gwid( old ) + items = { 'ip':'', 'ver':'', 'origin':'', 'name':'', 'key':'', 'mac':'' } + for itm in old: + item[itm] = old[itm] + + for itm in items: + if itm not in item or not item[itm]: + item[itm] = items[itm] + + if 'version' in old: + if old['version']: + item['ver'] = old['version'] + del item['version'] + + if 'gwId' in item: + del item['gwId'] + + item['ver'] = str(item['ver']) + + return item + +def load_snapshotfile(fname): + if (not fname) or (not isinstance(fname, str)): + fname = SNAPSHOTFILE + with open(fname) as json_file: + data = json.load(json_file) + devices = [] # pylint: disable=W0621 + if data and 'devices' in data: + for dev in data['devices']: + devices.append( _snapshot_load_item(dev) ) + if data: + data['devices'] = devices + return data + +def save_snapshotfile(fname, data, term=None): + if (not fname) or (not isinstance(fname, str)): + fname = SNAPSHOTFILE + if term: + norm = term.normal + bold = term.bold + else: + norm = bold = '' + devices = [] # pylint: disable=W0621 + if type(data) == dict: + data = list(data.values()) + for itm in data: + devices.append( _snapshot_save_item(itm) ) + current = {'timestamp' : time.time(), 'devices' : devices} + output = json.dumps(current, indent=4) + print(bold + "\n>> " + norm + "Saving device snapshot data to " + fname + "\n") + with open(fname, "w") as outfile: + outfile.write(output) + +# Scan Devices in snapshot.json +def snapshot(color=True, assume_yes=False, skip_poll=None): + """Uses snapshot.json to scan devices + + Parameters: + color = True or False, print output in color [Default: True] + assume_yes = True or False, auto-answer 'yes' to "Poll local devices?" (ignored when skip_poll is set) + skip_poll = True or False, auto-answer 'no' to "Poll local devices?" (overrides assume_yes) + """ + # Terminal formatting + color = color and HAVE_COLOR + termcolors = tinytuya.termcolor(color) + term = TermColors( *termcolors ) + + print( + "\n%sTinyTuya %s(Tuya device scanner)%s [%s]\n" + % (term.bold, term.normal, term.dim, tinytuya.__version__) + ) + + try: + data = load_snapshotfile(SNAPSHOTFILE) + except Exception as e: + #traceback.print_exc(0) + print("%s ERROR: Missing %s file:%s %s: %s\n" % (term.alert, SNAPSHOTFILE, term.normal, type(e).__name__, e)) + return + + print("%sLoaded %s - %d devices:\n" % (term.dim, SNAPSHOTFILE, len(data["devices"]))) + + # Print a table with all devices + table = [] + print("%s%-25s %-24s %-15s %-17s %-5s" % (term.normal, "Name","ID", "IP","Key","Version")) + print(term.dim) + by_ip = {} + devicesx = sorted(data["devices"], key=lambda x: x['name']) + for idx in devicesx: + device = _build_item( idx, None ) + ips = device['ip'].ljust(15) if device['ip'] else (term.alert + "E: No IP found " + term.normal) + dname = device['name'] + if dname == "": + dname = device['gwId'] + print("%s%-25.25s %s%-24s %s%s %s%-17s %s%-5s" % + (term.dim, dname, term.cyan, device['gwId'], term.subbold, ips, term.red, device['key'], term.yellow, device['version'])) + if device['ip']: + by_ip[device['ip']] = device + + # Find out if we should poll all devices + if skip_poll: + answer = 'n' + elif assume_yes: + answer = 'y' + else: + answer = input(term.subbold + '\nPoll local devices? ' + term.normal + '(Y/n): ') + if answer.lower().find('n') < 0: + print("") + print("%sPolling %s local devices from last snapshot..." % (term.normal, len(devicesx))) + result = devices(verbose=False, color=color, poll=True, byID=True, discover=False, snapshot=by_ip) + + for i in devicesx: + gwId = _get_gwid( i ) + if not gwId or gwId not in result: + item = _build_item( i, None ) + _display_status( item, None, term ) + else: + item = _build_item( i, result[gwId] ) + if 'dps' in result[gwId] and 'dps' in result[gwId]['dps'] and result[gwId]['dps']['dps']: + _display_status( item, result[gwId]['dps']['dps'], term ) + else: + _display_status( item, None, term ) + + # for loop + # if poll + print("%s\nDone.\n" % term.dim) + return + + +# Scan All Devices in devices.json +def alldevices(color=True, scantime=None, forcescan=False, discover=True, assume_yes=False, skip_poll=None): + """Uses devices.json to scan devices + + Parameters: + color = True or False, print output in color [Default: True] + """ + # Terminal formatting + color = color and HAVE_COLOR + #(bold, subbold, normal, dim, alert, alertdim, cyan, red, yellow) = tinytuya.termcolor(color) + termcolors = tinytuya.termcolor(color) + term = TermColors( *termcolors ) + + print( + "\n%sTinyTuya %s(Tuya device scanner)%s [%s]\n" + % (term.bold, term.normal, term.dim, tinytuya.__version__) + ) + # Check to see if we have additional Device info + try: + # Load defaults + with open(DEVICEFILE) as f: + tuyadevices = json.load(f) + log.debug("loaded=%s [%d devices]", DEVICEFILE, len(tuyadevices)) + except: + print("%s ERROR: Missing %s file\n" % (term.alert, DEVICEFILE)) + return + + print("%sLoaded %s - %d devices:" % (term.dim, DEVICEFILE, len(tuyadevices))) + + # Display device list + print("\n\n" + term.bold + "Device Listing\n" + term.dim) + output = json.dumps(sorted(tuyadevices,key=lambda x: x['name']), indent=4) + print(output) + + # Find out if we should poll all devices + if skip_poll: + answer = 'n' + elif assume_yes: + answer = 'y' + else: + answer = input(term.subbold + '\nPoll local devices? ' + term.normal + '(Y/n): ') + if answer.lower().find('n') < 0: + poll_and_display( tuyadevices, color=color, scantime=scantime, snapshot=True, forcescan=forcescan, discover=discover ) + + print("%s\nDone.\n" % term.dim) + return + +def poll_and_display( tuyadevices, color=True, scantime=None, snapshot=False, forcescan=False, discover=True ): # pylint: disable=W0621 + color = color and HAVE_COLOR + termcolors = tinytuya.termcolor(color) + term = TermColors( *termcolors ) + + by_id = [x['id'] for x in tuyadevices] + # Scan network for devices and provide polling data + print(term.normal + "\nScanning local network for Tuya devices...") + result = devices(verbose=False, poll=True, byID=True, scantime=scantime, wantids=by_id, show_timer=True, forcescan=forcescan, tuyadevices=tuyadevices, discover=discover) + print(" %s%s local devices discovered%s" % (term.dim, len(result), term.normal)) + print("") + + polling = [] + print("Polling local devices...") + # devices = sorted(data["devices"], key=lambda x: x['name']) + for idx in sorted(tuyadevices, key=lambda x: x['name']): + gwId = _get_gwid( idx ) + if gwId and gwId in result: + item = _build_item( idx, result[gwId] ) + if 'dps' in result[gwId] and 'dps' in result[gwId]['dps']: + _display_status( item, result[gwId]['dps']['dps'], term ) + else: + _display_status( item, None, term ) + else: + item = _build_item( idx, None ) + _display_status( item, None, term ) + polling.append(item) + # for loop + + if snapshot: + # Save polling data snapsot + save_snapshotfile( SNAPSHOTFILE, result, term ) + + return polling + +# Scan Devices in tuyascan.json - respond in JSON +def snapshotjson(): + """Uses snapshot.json to scan devices - respond with json + """ + polling = [] + + try: + data = load_snapshotfile(SNAPSHOTFILE) + except: + current = {'timestamp' : time.time(), 'error' : 'Could not load JSON snapshot file: %s' % SNAPSHOTFILE} + output = json.dumps(current, indent=4) + print(output) + return + + devicesx = sorted(data["devices"], key=lambda x: x['name']) + by_ip = {} + for idx in devicesx: + if 'ip' in idx and idx['ip']: + device = _build_item( idx, None ) + by_ip[idx['ip']] = device + + resp = devices(verbose=False, scantime=0, poll=True, byID=True, discover=False, snapshot=by_ip) + + for idx in devicesx: + gwId = _get_gwid( idx ) + + if gwId and gwId in resp: + item = _build_item( idx, resp[gwId] ) + else: + item = _build_item( idx, None ) + if not item['ip']: + item['error'] = "No IP" + elif gwId not in resp or 'dps' not in resp[gwId] or 'dps' not in resp[gwId]['dps'] or not resp[gwId]['dps']['dps']: + item['error'] = "No Response" + else: + item['dps'] = resp[gwId]['dps']['dps'] + polling.append(item) + # for loop + current = {'timestamp' : time.time(), 'devices' : polling} + output = json.dumps(current, indent=4) + print(output) + return + + +if __name__ == '__main__': + + try: + scan() + except KeyboardInterrupt: + pass diff --git a/tinytuya/wizard.py b/tinytuya/wizard.py index 55902de9..b8aeddda 100644 --- a/tinytuya/wizard.py +++ b/tinytuya/wizard.py @@ -22,7 +22,6 @@ The TuyAPI/CLI wizard inspired and informed this python version. """ # Modules -from __future__ import print_function import json from datetime import datetime import tinytuya @@ -36,11 +35,6 @@ HAVE_COLOR = HAVE_COLORAMA or not sys.platform.startswith('win') -# Backward compatibility for python2 -try: - input = raw_input -except NameError: - pass # Colorama terminal color capability for all platforms if HAVE_COLORAMA: diff --git a/tools/ttcorefunc.py b/tools/ttcorefunc.py index 031e39fb..8928ff49 100644 --- a/tools/ttcorefunc.py +++ b/tools/ttcorefunc.py @@ -2,7 +2,6 @@ # -*- coding: utf-8 -*- # Modules -from __future__ import print_function # python 2.7 support import binascii from collections import namedtuple import base64 @@ -96,8 +95,6 @@ NO_PROTOCOL_HEADER_CMDS = [DP_QUERY, DP_QUERY_NEW, UPDATEDPS, HEART_BEAT, SESS_KEY_NEG_START, SESS_KEY_NEG_RESP, SESS_KEY_NEG_FINISH, LAN_EXT_STREAM ] -# Python 2 Support -IS_PY2 = sys.version_info[0] == 2 # Tuya Packet Format TuyaHeader = namedtuple('TuyaHeader', 'prefix seqno cmd length total_length') @@ -241,21 +238,11 @@ def _unpad(s, verify_padding=False): # Misc Helpers def bin2hex(x, pretty=False): - if pretty: - space = " " - else: - space = "" - if IS_PY2: - result = "".join("%02X%s" % (ord(y), space) for y in x) - else: - result = "".join("%02X%s" % (y, space) for y in x) - return result + space = " " if pretty else "" + return "".join("%02X%s" % (y, space) for y in x) def hex2bin(x): - if IS_PY2: - return x.decode("hex") - else: - return bytes.fromhex(x) + return bytes.fromhex(x) def set_debug(toggle=True, color=True): """Enable tinytuya verbose logging"""