diff --git a/cli/README.md b/cli/README.md index b7e48dc9..ea61a82e 100644 --- a/cli/README.md +++ b/cli/README.md @@ -1,138 +1,150 @@ -Command line interface for python-broadlink -=========================================== +# Command line interface for python-broadlink This is a command line interface for the python-broadlink API. +## Requirements -Requirements ------------- You need to install the module first: -``` + +```shell pip3 install broadlink ``` -Installation ------------ -Download "broadlink_cli" and "broadlink_discovery". - +broadlink_cli should then be in your path -Programs --------- -* broadlink_discovery: Discover Broadlink devices connected to the local network. +## Programs * broadlink_cli: Send commands and query the Broadlink device. - -Device specification formats ----------------------------- +## Device specification formats Using separate parameters for each information: -``` + +```shell broadlink_cli --type 0x2712 --host 1.1.1.1 --mac aaaaaaaaaa --temp ``` Using all parameters as a single argument: -``` + +```shell broadlink_cli --device "0x2712 1.1.1.1 aaaaaaaaaa" --temp ``` Using file with parameters: -``` + +```shell broadlink_cli --device @BEDROOM.device --temp ``` + This is prefered as the configuration is stored in a file and you can change it later to point to a different device. -Example usage -------------- +## Example usage ### Common commands #### Join device to the Wi-Fi network -``` + +```shell broadlink_cli --joinwifi SSID PASSWORD ``` #### Discover devices connected to the local network -``` + +```shell broadlink_discovery ``` ### Universal remotes #### Learn IR code and show at console -``` -broadlink_cli --device @BEDROOM.device --learn + +```shell +broadlink_cli --device @BEDROOM.device --irlearn ``` #### Learn RF code and show at console -``` -broadlink_cli --device @BEDROOM.device --rfscanlearn + +```shell +broadlink_cli --device @BEDROOM.device --rflearn ``` #### Learn IR code and save to file -``` + +```shell broadlink_cli --device @BEDROOM.device --learnfile LG-TV.power ``` #### Learn RF code and save to file -``` -broadlink_cli --device @BEDROOM.device --rfscanlearn --learnfile LG-TV.power + +```shell +broadlink_cli --device @BEDROOM.device --rflearn --learnfile LG-TV.power ``` #### Send code -``` + +```shell broadlink_cli --device @BEDROOM.device --send DATA ``` #### Send code from file -``` + +```shell broadlink_cli --device @BEDROOM.device --send @LG-TV.power ``` #### Check temperature -``` + +```shell broadlink_cli --device @BEDROOM.device --temperature ``` #### Check humidity -``` + +```shell broadlink_cli --device @BEDROOM.device --humidity ``` ### Smart plugs #### Turn on -``` + +```shell broadlink_cli --device @BEDROOM.device --turnon ``` #### Turn off -``` + +```shell broadlink_cli --device @BEDROOM.device --turnoff ``` #### Turn on nightlight -``` + +```shell broadlink_cli --device @BEDROOM.device --turnnlon ``` #### Turn off nightlight -``` + +```shell broadlink_cli --device @BEDROOM.device --turnnloff ``` #### Check power state -``` + +```shell broadlink_cli --device @BEDROOM.device --check ``` #### Check nightlight state -``` + +```shell broadlink_cli --device @BEDROOM.device --checknl ``` #### Check power consumption -``` + +```shell broadlink_cli --device @BEDROOM.device --energy ``` diff --git a/cli/broadlink_cli b/cli/broadlink_cli index 7913e332..a572c8e7 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -1,6 +1,14 @@ #!/usr/bin/env python3 +""" +A command line interface to the broadlink library for executing common actions to +control and interact with Broadlink devices +""" + import argparse import base64 +import os +import json +import sys import time from typing import List @@ -9,18 +17,254 @@ from broadlink.const import DEFAULT_PORT from broadlink.exceptions import ReadError, StorageError from broadlink.remote import data_to_pulses, pulses_to_data -TIMEOUT = 30 +DEFAULT_TIMEOUT = 30 # seconds +DEFAULT_DEVICE = 0x2712 -def auto_int(x): +def auto_int(x) -> int: + """Parse the given value to an integer""" return int(x, 0) +def auto_hex(x) -> bytearray: + """Parse the given hex string to a byte array""" + return bytearray.fromhex(x) + + +def parse_args() -> object: + """parse and process the commandline arguments""" + + parser = argparse.ArgumentParser( + fromfile_prefix_chars="@", + description="Control and interact with Broadlink devices.", + formatter_class=argparse.RawTextHelpFormatter, + ) + + action_group = parser.add_argument_group( + "Actions", "Specify the action to perform." + ) + action_group.add_argument( + "--sensors", + dest="action", + action="store_const", + const="sensors", + help="Check all sensors", + ) + action_group.add_argument( + "--temperature", + dest="action", + action="store_const", + const="temperature", + help="Request temperature from the device", + ) + action_group.add_argument( + "--humidity", + dest="action", + action="store_const", + const="humidity", + help="Request humidity from the device", + ) + + action_group.add_argument( + "--energy", + dest="action", + action="store_const", + const="energy", + help="Request energy consumption from the device", + ) + action_group.add_argument( + "--check", + dest="action", + action="store_const", + const="check", + help="Check current power state", + ) + action_group.add_argument( + "--checknl", + dest="action", + action="store_const", + const="checknl", + help="Check current nightlight state", + ) + + action_group.add_argument( + "--turnon", + dest="action", + action="store_const", + const="turnon", + help="Turn on the device", + ) + action_group.add_argument( + "--turnoff", + dest="action", + action="store_const", + const="turnoff", + help="Turn off the device", + ) + action_group.add_argument( + "--turnnlon", + dest="action", + action="store_const", + const="turnnlon", + help="Turn on nightlight on the device", + ) + action_group.add_argument( + "--turnnloff", + dest="action", + action="store_const", + const="turnnloff", + help="Turn off nightlight on the device", + ) + action_group.add_argument( + "--switch", + dest="action", + action="store_const", + const="switch", + help="Switch state from on to off and off to on", + ) + + action_group.add_argument( + "--send", dest="action", action="store_const", const="send", help="Send command" + ) + + action_group.add_argument( + "--convert", + dest="action", + action="store_const", + const="convert", + help="Convert input data to durations", + ) + + action_group.add_argument( + "--discover", + dest="action", + action="store_const", + const="discover", + help="Scan the local network to discover Broadlink devices", + ) + action_group.add_argument( + "--irlearn", + "--learn", + dest="action", + action="store_const", + const="irlearn", + help="IR learning", + ) + action_group.add_argument( + "--rflearn", + dest="action", + action="store_const", + const="rflearn", + help="RF scan learning", + ) + + parser.add_argument( + "--frequency", + type=float, + help="Specify radiofrequency for --rflean, otherwise scan", + ) + + parser.add_argument("--device", help="Device definition as 'type host mac'") + parser.add_argument( + "--type", + type=auto_int, + default=DEFAULT_DEVICE, + dest="devtype", + help="Type of device", + ) + parser.add_argument("--host", help="Host address") + parser.add_argument( + "--mac", + type=auto_hex, + help="MAC address (hex reverse), as used by python-broadlink library", + ) + + parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="Broadlink port") + parser.add_argument( + "--timeout", + type=int, + default=DEFAULT_TIMEOUT, + help="The timeout period for actions", + ) + + parser.add_argument( + "--joinwifi", + nargs=2, + help="SSID and PASSPHRASE to configure Broadlink device with", + ) + + parser.add_argument( + "--learntemplate", + help="Specified template for learning and outputing multiple controls " + '- e.g. --learntemplate \'{ "on" : "raw", "off": "raw" }\'', + ) + parser.add_argument("--learnfile", help="Save learned command to a specified file") + + parser.add_argument( + "--output", + dest="out_fmt", + choices=["json", "text", "raw", "base64", "pulses"], + default="json", + help="Specify the output format. 'json' (default), 'text', or a single learned field:" + "'raw' (hex; default Broadlink format), 'base64', 'pulses' (microsecond-format) ", + ) + + parser.add_argument( + "--durations", + action="store_true", + help="Parse [data] durations in microsecond- instead of Broadlink-format", + ) + parser.add_argument("--data", nargs="*", help="Data to send or convert") + + args = parser.parse_args() + + if args.durations: + args.data = pulses_to_data(parse_pulses(args.data)) + elif args.data: + args.data = bytes.fromhex("".join(args.data)) + + if args.device: + values = args.device.split() + args.devtype = int(values[0], 0) + args.host = values[1] + args.mac = bytearray.fromhex(values[2]) + + if not args.action: + parser.print_help() + print("Error: You must specify an action", file=sys.stderr) + sys.exit(1) + elif args.action not in ["convert", "discover"]: + if not (args.device or (args.devtype and args.host and args.mac)): + parser.error( + f"For --{args.action} you must also specify --device " + "or provide --devtype, --host, and --mac." + ) + elif args.action not in ["irlearn", "rflearn"]: + if args.learntemplate: + parser.error("--learntemplate can only be used with --learn or --rflearn") + if args.out_fmt not in ["json", "text"]: + parser.error( + f"--output {args.out_fmt} can only be used with --learn or --rflearn" + ) + + if args.learntemplate: + if args.out_fmt != "json": + parser.error("--learntemplate can only be used with --output json") + try: + args.learntemplate = json.loads(args.learntemplate) + except json.JSONDecodeError as e: + parser.error(f"--learntemplate is not valid JSON - {e.args[0]}") + + if not isinstance(args.learntemplate, dict): + parser.error("invalid --learntemplate structure - must be a dict") + + return args + + def format_pulses(pulses: List[int]) -> str: - """Format pulses.""" + """Concatentate the list of pulses""" return " ".join( - f"+{pulse}" if i % 2 == 0 else f"-{pulse}" - for i, pulse in enumerate(pulses) + f"+{pulse}" if i % 2 == 0 else f"-{pulse}" for i, pulse in enumerate(pulses) ) @@ -29,193 +273,248 @@ def parse_pulses(data: List[str]) -> List[int]: return [abs(int(s)) for s in data] -parser = argparse.ArgumentParser(fromfile_prefix_chars='@') -parser.add_argument("--device", help="device definition as 'type host mac'") -parser.add_argument("--type", type=auto_int, default=0x2712, help="type of device") -parser.add_argument("--host", help="host address") -parser.add_argument("--mac", help="mac address (hex reverse), as used by python-broadlink library") -parser.add_argument("--temperature", action="store_true", help="request temperature from device") -parser.add_argument("--humidity", action="store_true", help="request humidity from device") -parser.add_argument("--energy", action="store_true", help="request energy consumption from device") -parser.add_argument("--check", action="store_true", help="check current power state") -parser.add_argument("--checknl", action="store_true", help="check current nightlight state") -parser.add_argument("--turnon", action="store_true", help="turn on device") -parser.add_argument("--turnoff", action="store_true", help="turn off device") -parser.add_argument("--turnnlon", action="store_true", help="turn on nightlight on the device") -parser.add_argument("--turnnloff", action="store_true", help="turn off nightlight on the device") -parser.add_argument("--switch", action="store_true", help="switch state from on to off and off to on") -parser.add_argument("--send", action="store_true", help="send command") -parser.add_argument("--sensors", action="store_true", help="check all sensors") -parser.add_argument("--learn", action="store_true", help="learn command") -parser.add_argument("--rflearn", action="store_true", help="rf scan learning") -parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning") -parser.add_argument("--learnfile", help="save learned command to a specified file") -parser.add_argument("--durations", action="store_true", - help="use durations in micro seconds instead of the Broadlink format") -parser.add_argument("--convert", action="store_true", help="convert input data to durations") -parser.add_argument("--joinwifi", nargs=2, help="Args are SSID PASSPHRASE to configure Broadlink device with") -parser.add_argument("data", nargs='*', help="Data to send or convert") -args = parser.parse_args() - -if args.device: - values = args.device.split() - devtype = int(values[0], 0) - host = values[1] - mac = bytearray.fromhex(values[2]) -elif args.mac: - devtype = args.type - host = args.host - mac = bytearray.fromhex(args.mac) - -if args.host or args.device: - dev = broadlink.gendevice(devtype, (host, DEFAULT_PORT), mac) - dev.auth() - -if args.joinwifi: - broadlink.setup(args.joinwifi[0], args.joinwifi[1], 4) - -if args.convert: - data = bytearray.fromhex(''.join(args.data)) - pulses = data_to_pulses(data) - print(format_pulses(pulses)) -if args.temperature: - print(dev.check_temperature()) -if args.humidity: - print(dev.check_humidity()) -if args.energy: - print(dev.get_energy()) -if args.sensors: - data = dev.check_sensors() - for key in data: - print("{} {}".format(key, data[key])) -if args.send: - data = ( - pulses_to_data(parse_pulses(args.data)) - if args.durations - else bytes.fromhex(''.join(args.data)) - ) - dev.send_data(data) -if args.learn or (args.learnfile and not args.rflearn): - dev.enter_learning() - print("Learning...") +def get_frequency(dev: object, timeout: int): + """Scan to detect RF remote frequency""" + dev.sweep_frequency() + print( + "Detecting radiofrequency, press and HOLD the button to learn...", + file=sys.stderr, + ) + start = time.time() - while time.time() - start < TIMEOUT: + while time.time() - start < timeout: time.sleep(1) - try: - data = dev.check_data() - except (ReadError, StorageError): - continue - else: + locked, frequency = dev.check_frequency() + if locked: break else: - print("No data received...") - exit(1) - - print("Packet found!") - raw_fmt = data.hex() - base64_fmt = base64.b64encode(data).decode('ascii') - pulse_fmt = format_pulses(data_to_pulses(data)) - - print("Raw:", raw_fmt) - print("Base64:", base64_fmt) - print("Pulses:", pulse_fmt) - - if args.learnfile: - print("Saving to {}".format(args.learnfile)) - with open(args.learnfile, "w") as text_file: - text_file.write(pulse_fmt if args.durations else raw_fmt) -if args.check: - if dev.check_power(): - print('* ON *') - else: - print('* OFF *') -if args.checknl: - if dev.check_nightlight(): - print('* ON *') - else: - print('* OFF *') -if args.turnon: - dev.set_power(True) - if dev.check_power(): - print('== Turned * ON * ==') - else: - print('!! Still OFF !!') -if args.turnoff: - dev.set_power(False) - if dev.check_power(): - print('!! Still ON !!') - else: - print('== Turned * OFF * ==') -if args.turnnlon: - dev.set_nightlight(True) - if dev.check_nightlight(): - print('== Turned * ON * ==') - else: - print('!! Still OFF !!') -if args.turnnloff: - dev.set_nightlight(False) - if dev.check_nightlight(): - print('!! Still ON !!') - else: - print('== Turned * OFF * ==') -if args.switch: - if dev.check_power(): - dev.set_power(False) - print('* Switch to OFF *') - else: - dev.set_power(True) - print('* Switch to ON *') -if args.rflearn: - if args.frequency: - frequency = args.frequency - print("Press the button you want to learn, a short press...") - else: - dev.sweep_frequency() - print("Detecting radiofrequency, press and hold the button to learn...") - - start = time.time() - while time.time() - start < TIMEOUT: - time.sleep(1) - locked, frequency = dev.check_frequency() - if locked: - break - else: - print("Radiofrequency not found") - dev.cancel_sweep_frequency() - exit(1) + print("Radiofrequency not found", file=sys.stderr) + dev.cancel_sweep_frequency() + return 1 - print("Radiofrequency detected: {}MHz".format(frequency)) - print("You can now let go of the button") + print(f"Radiofrequency detected: {frequency}MHz", file=sys.stderr) + print("You can now LET GO of the button", file=sys.stderr) + time.sleep(0.5) - input("Press enter to continue...") + input("Press enter to continue...") - print("Press the button again, now a short press.") + return frequency - dev.find_rf_packet(frequency) + +def format_packet(data: object) -> dict: + """Return the broadlink data packet as a well-formated object""" + return { + "raw": data.hex(), + "base64": base64.b64encode(data).decode("ascii"), + "pulses": format_pulses(data_to_pulses(data)), + } + + +def get_data( + transmitter: str, dev: object, timeout: int, frequency: float, prompt: str = None +) -> object: + """Get data of transmitter type (RF|IR) from the given device-""" + + print( + f"Awaiting {transmitter} code{ f' for {prompt} button' if prompt else '' }...", + file=sys.stderr, + end=None, + ) + if transmitter == "RF": + dev.find_rf_packet(frequency) + else: + dev.enter_learning() start = time.time() - while time.time() - start < TIMEOUT: + while time.time() - start < timeout: time.sleep(1) try: data = dev.check_data() - except (ReadError, StorageError): + except ReadError: + print("_", end="") + continue + except StorageError: + print(".", end="") continue else: break else: - print("No data received...") - exit(1) - - print("Packet found!") - raw_fmt = data.hex() - base64_fmt = base64.b64encode(data).decode('ascii') - pulse_fmt = format_pulses(data_to_pulses(data)) - - print("Raw:", raw_fmt) - print("Base64:", base64_fmt) - print("Pulses:", pulse_fmt) - - if args.learnfile: - print("Saving to {}".format(args.learnfile)) - with open(args.learnfile, "w") as text_file: - text_file.write(pulse_fmt if args.durations else raw_fmt) + print(file=sys.stderr) + print(f"No {transmitter} data received...", file=sys.stderr) + return None + + print(file=sys.stderr) + print("Packet found!", file=sys.stderr) + return format_packet(data) + + +def print_data(out_fmt: str, data: any, sep: str = " "): + """Output the given data in the given format""" + if out_fmt == "json": + print(json.dumps(data, indent=4)) + return + + if isinstance(data, dict): + data = [data] + + if out_fmt == "text": + for key in data[0].keys(): + print(key, sep, end="") + print() + + for item in data: + if out_fmt == "text": + for key in data[0].keys(): + print(item[key], sep, end="") + print() + else: + print(item[out_fmt]) + + +def do_learn( + transmitter: str, + dev: object, + out_fmt: str, + learntemplate: str, + learnfile: str, + timeout: int, + frequency: float = None, +) -> int: + """ + Learning + """ + + if transmitter == "RF" and not frequency: + frequency = get_frequency(dev, timeout) + + if learntemplate: + data = {} + for key, value in learntemplate.items(): + item_data = get_data(transmitter, dev, timeout, frequency, prompt=key) + if not item_data: + return 1 + data[key] = item_data[value] + else: + data = get_data(transmitter, dev, timeout, frequency) + + print_data(out_fmt, data, sep=os.linesep) + + if learnfile: + if out_fmt == "json": + output = json.dumps(data, indent=4) + elif out_fmt == "text": + output = " ".join(data.keys()) + os.linesep + " ".join(data.values()) + else: + output = data[out_fmt] + + print(f"Saving {out_fmt} data to {learnfile}", file=sys.stderr) + with open(learnfile, "w", encoding="utf-8") as text_file: + text_file.write(output) + + return 0 + + +def discover_devices(out_fmt: str, ip=None) -> int: + """Scan local network for Broadlink devices""" + try: + print( + f"Scanning { (ip + ' on ') if ip else '' }local network...", file=sys.stderr + ) + devices = broadlink.discover(local_ip_address=ip) + for device in devices: + print_data( + out_fmt, + { + "devtype": device.devtype, + "host": device.host[0], + "mac": device.mac.hex(), + }, + ) + return 0 + except Exception as e: + print(f"Error in discover_devices: {e}", file=sys.stderr) + return 1 + + +def main() -> int: + """Main function - execute the specified action""" + args = parse_args() + + if args.host and args.mac and args.devtype: + dev = broadlink.gendevice(args.devtype, (args.host, args.port), args.mac) + dev.auth() + + try: + match args.action: + case "temperature": + print_data(args.out_fmt, {"temperature": dev.check_temperature()}) + case "humidity": + print_data(args.out_fmt, {"humidity": dev.check_humidity()}) + case "energy": + print_data(args.out_fmt, {"energy": dev.get_energy()}) + case "sensors": + print_data(args.out_fmt, dev.check_sensors()) + case "check": + print_data(args.out_fmt, {"power": dev.check_power()}) + case "checknl": + print_data(args.out_fmt, {"nightlight": dev.check_nightlight()}) + + case "turnon": + dev.set_power(True) + print_data(args.out_fmt, {"power": dev.check_power()}) + case "turnoff": + dev.set_power(False) + print_data(args.out_fmt, {"power": dev.check_power()}) + case "switch": + dev.set_power(not dev.check_power()) + print_data(args.out_fmt, {"power": dev.check_nightlight()}) + + case "turnnlon": + dev.set_nightlight(True) + print_data(args.out_fmt, {"nightlight": dev.check_nightlight()}) + case "turnnloff": + dev.set_nightlight(False) + print_data(args.out_fmt, {"nightlight": dev.check_nightlight()}) + + case "send": + dev.send_data(args.data) + + case "discover": + return discover_devices(args.out_fmt, args.host) + case "irlearn": + return do_learn( + "IR", + dev, + args.out_fmt, + args.learntemplate, + args.learnfile, + args.timeout, + ) + case "rflearn": + return do_learn( + "RF", + dev, + args.out_fmt, + args.learntemplate, + args.learnfile, + args.timeout, + args.frequency, + ) + + case "convert": + print(data_to_pulses(bytearray.fromhex("".join(args.data)))) + + case _: + print(f"Unknown action: {args.action}") + return 255 + except AttributeError as e: + if hasattr(e.obj, "TYPE"): + print(f"Action {e.name} not supported by device {dev.TYPE} {dev.devtype}", file=sys.stderr) + return 2 + else: + raise e + + return 0 + + +sys.exit(main()) diff --git a/setup.py b/setup.py index 0426f148..0e613c92 100644 --- a/setup.py +++ b/setup.py @@ -1,20 +1,26 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +""" +Setup script for the python-broadlink package. +This script defines the installation requirements and metadata for the package. +To install the package, run: + python setup.py install +""" from setuptools import setup, find_packages -version = '0.19.0' +VERSION = '0.20.0' setup( name="broadlink", - version=version, + version=VERSION, author="Matthew Garrett", author_email="mjg59@srcf.ucam.org", url="http://github.com/mjg59/python-broadlink", packages=find_packages(), - scripts=[], + scripts=["cli/broadlink_cli"], install_requires=["cryptography>=3.2"], description="Python API for controlling Broadlink devices", classifiers=[ @@ -26,4 +32,9 @@ ], include_package_data=True, zip_safe=False, + entry_points={ + "console_scripts": [ + "broadlink-cli=broadlink.cli.broadlink_cli:main", + ], + }, )