diff --git a/README.md b/README.md index 487db5c..c0a5371 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Introduction -This is a Python 3.5+ module aiming to interact with the Chamberlain MyQ API. +This is a Python 3.8+ module aiming to interact with the Chamberlain MyQ API. Code is licensed under the MIT license. @@ -51,6 +51,14 @@ async def main() -> None: devices = myq.covers # >>> {"serial_number123": } + # Return only lamps devices: + devices = myq.lamps + # >>> {"serial_number123": } + + # Return only gateway devices: + devices = myq.gateways + # >>> {"serial_number123": } + # Return *all* devices: devices = myq.devices # >>> {"serial_number123": , "serial_number456": } @@ -58,29 +66,74 @@ async def main() -> None: asyncio.get_event_loop().run_until_complete(main()) ``` +## API Properties + +* `accounts`: dictionary with all accounts +* `covers`: dictionary with all covers +* `devices`: dictionary with all devices +* `gateways`: dictionary with all gateways +* `lamps`: dictionary with all lamps +* `last_state_update`: datetime (in UTC) last state update was retrieved +* `password`: password used for authentication. Can only be set, not retrieved +* `username`: username for authentication. + +## Account Properties + +* `id`: ID for the account +* `name`: Name of the account ## Device Properties +* `account`: Return account associated with device * `close_allowed`: Return whether the device can be closed unattended. * `device_family`: Return the family in which this device lives. * `device_id`: Return the device ID (serial number). * `device_platform`: Return the device platform. * `device_type`: Return the device type. * `firmware_version`: Return the family in which this device lives. +* `href`: URI for device * `name`: Return the device name. * `online`: Return whether the device is online. * `open_allowed`: Return whether the device can be opened unattended. * `parent_device_id`: Return the device ID (serial number) of this device's parent. * `state`: Return the current state of the device. +* `state_update`: Returns datetime when device was last updated + +## API Methods + +These are coroutines and need to be `await`ed – see `example.py` for examples. + +* `authenticate`: Authenticate (or re-authenticate) to MyQ. Call this to + re-authenticate immediately after changing username and/or password otherwise + new username/password will only be used when token has to be refreshed. +* `update_device_info`: Retrieve info and status for accounts and devices -## Methods + +## Device Methods All of the routines on the `MyQDevice` class are coroutines and need to be `await`ed – see `example.py` for examples. -* `close`: close the device -* `open`: open the device -* `update`: get the latest device info (state, etc.) +* `update`: get the latest device info (state, etc.). Note that + this runs api.update_device_info and thus all accounts/devices will be updated + +## Cover Methods + +All Device methods in addition to: +* `close`: close the cover +* `open`: open the cover + +## Lamp Methods + +All Device methods in addition to: +* `turnon`: turn lamp on +* `turnoff`: turn lamp off + + +# Acknowledgement + +Huge thank you to [hjdhjd](https://github.com/hjdhjd) for figuring out the updated V6 API and +sharing his work with us. # Disclaimer diff --git a/example.py b/example.py index d1f7fe1..4cdd67a 100644 --- a/example.py +++ b/example.py @@ -6,12 +6,32 @@ from pymyq import login from pymyq.errors import MyQError, RequestError +from pymyq.garagedoor import STATE_OPEN, STATE_CLOSED _LOGGER = logging.getLogger() EMAIL = "" PASSWORD = "" -OPEN_CLOSE = False +ISSUE_COMMANDS = True + +def print_info(number: int, device): + print(f" Device {number + 1}: {device.name}") + print(f" Device Online: {device.online}") + print(f" Device ID: {device.device_id}") + print( + f" Parent Device ID: {device.parent_device_id}", + ) + print(f" Device Family: {device.device_family}") + print( + f" Device Platform: {device.device_platform}", + ) + print(f" Device Type: {device.device_type}") + print(f" Firmware Version: {device.firmware_version}") + print(f" Open Allowed: {device.open_allowed}") + print(f" Close Allowed: {device.close_allowed}") + print(f" Current State: {device.state}") + print(" ---------") + async def main() -> None: """Create the aiohttp session and run the example.""" @@ -22,33 +42,94 @@ async def main() -> None: print(f"{EMAIL} {PASSWORD}") api = await login(EMAIL, PASSWORD, websession) - # Get the account ID: - _LOGGER.info("Account ID: %s", api.account_id) - - # Get all devices listed with this account – note that you can use - # api.covers to only examine covers: - for idx, device_id in enumerate(api.devices): - device = api.devices[device_id] - _LOGGER.info("---------") - _LOGGER.info("Device %s: %s", idx + 1, device.name) - _LOGGER.info("Device Online: %s", device.online) - _LOGGER.info("Device ID: %s", device.device_id) - _LOGGER.info("Parent Device ID: %s", device.parent_device_id) - _LOGGER.info("Device Family: %s", device.device_family) - _LOGGER.info("Device Platform: %s", device.device_platform) - _LOGGER.info("Device Type: %s", device.device_type) - _LOGGER.info("Firmware Version: %s", device.firmware_version) - _LOGGER.info("Open Allowed: %s", device.open_allowed) - _LOGGER.info("Close Allowed: %s", device.close_allowed) - _LOGGER.info("Current State: %s", device.state) - - if OPEN_CLOSE: - try: - await device.open() - await asyncio.sleep(15) - await device.close() - except RequestError as err: - _LOGGER.error(err) + for account in api.accounts: + print(f"Account ID: {account}") + print(f"Account Name: {api.accounts[account]}") + + # Get all devices listed with this account – note that you can use + # api.covers to only examine covers or api.lamps for only lamps. + print(f" GarageDoors: {len(api.covers)}") + print(" ---------------") + if len(api.covers) != 0: + for idx, device_id in enumerate( + device_id + for device_id in api.covers + if api.devices[device_id].account == account + ): + device = api.devices[device_id] + print_info(number=idx, device=device) + + if ISSUE_COMMANDS: + try: + if device.open_allowed: + if device.state == STATE_OPEN: + print(f"Garage door {device.name} is already open") + else: + print(f"Opening garage door {device.name}") + try: + if await device.open(wait_for_state=True): + print(f"Garage door {device.name} has been opened.") + else: + print(f"Failed to open garage door {device.name}.") + except MyQError as err: + _LOGGER.error(f"Error when trying to open {device.name}: {str(err)}") + else: + print(f"Opening of garage door {device.name} is not allowed.") + + if device.close_allowed: + if device.state == STATE_CLOSED: + print(f"Garage door {device.name} is already closed") + else: + print(f"Closing garage door {device.name}") + try: + wait_task = await device.close(wait_for_state=False) + except MyQError as err: + _LOGGER.error(f"Error when trying to close {device.name}: {str(err)}") + + print(f"Device {device.name} is {device.state}") + + if await wait_task: + print(f"Garage door {device.name} has been closed.") + else: + print(f"Failed to close garage door {device.name}.") + + except RequestError as err: + _LOGGER.error(err) + print(" ------------------------------") + print(f" Lamps: {len(api.lamps)}") + print(" ---------") + if len(api.lamps) != 0: + for idx, device_id in enumerate( + device_id + for device_id in api.lamps + if api.devices[device_id].account == account + ): + device = api.devices[device_id] + print_info(number=idx, device=device) + + if ISSUE_COMMANDS: + try: + print(f"Turning lamp {device.name} on") + await device.turnon() + await asyncio.sleep(15) + print(f"Turning lamp {device.name} off") + await device.turnoff() + except RequestError as err: + _LOGGER.error(err) + print(" ------------------------------") + + print(f" Gateways: {len(api.gateways)}") + print(" ------------") + if len(api.gateways) != 0: + for idx, device_id in enumerate( + device_id + for device_id in api.gateways + if api.devices[device_id].account == account + ): + device = api.devices[device_id] + print_info(number=idx, device=device) + + print("------------------------------") except MyQError as err: _LOGGER.error("There was an error: %s", err) diff --git a/pymyq/__version__.py b/pymyq/__version__.py index 6a9202d..d5f828d 100644 --- a/pymyq/__version__.py +++ b/pymyq/__version__.py @@ -1,2 +1,2 @@ """Define a version constant.""" -__version__ = '2.0.15' +__version__ = '3.0.0' diff --git a/pymyq/api.py b/pymyq/api.py index e200a94..3529645 100644 --- a/pymyq/api.py +++ b/pymyq/api.py @@ -1,317 +1,662 @@ """Define the MyQ API.""" import asyncio import logging -import string +from bs4 import BeautifulSoup from datetime import datetime, timedelta -from random import choices -from typing import Dict, Optional - -from aiohttp import ClientSession -from aiohttp.client_exceptions import ClientError -from json import JSONDecodeError - -from .const import \ - ACCOUNTS_ENDPOINT,\ - ACCOUNT_INFORMATION_ENDPOINT,\ - API_BASE,\ - BASE_API_VERSION,\ - DEVICES_API_VERSION,\ - DEVICES_ENDPOINT,\ - LOGIN_ENDPOINT - +from typing import Dict, Optional, Union, Tuple +from urllib.parse import urlsplit, parse_qs + +from aiohttp import ClientSession, ClientResponse +from aiohttp.client_exceptions import ClientError, ClientResponseError +from pkce import generate_code_verifier, get_code_challenge + +from .const import ( + ACCOUNTS_ENDPOINT, + DEVICES_ENDPOINT, + DEVICE_FAMILY_GARAGEDOOR, + DEVICE_FAMILY_GATEWAY, + DEVICE_FAMLY_LAMP, + OAUTH_CLIENT_ID, + OAUTH_CLIENT_SECRET, + OAUTH_AUTHORIZE_URI, + OAUTH_BASE_URI, + OAUTH_TOKEN_URI, + OAUTH_REDIRECT_URI, +) from .device import MyQDevice -from .errors import InvalidCredentialsError, RequestError +from .errors import AuthenticationError, InvalidCredentialsError, RequestError +from .garagedoor import MyQGaragedoor +from .lamp import MyQLamp +from .request import MyQRequest, REQUEST_METHODS _LOGGER = logging.getLogger(__name__) -DEFAULT_APP_ID = "JVM/G9Nwih5BwKgNCjLxiFUQxQijAebyyg8QUHr7JOrP+tuPb8iHfRHKwTmDzHOu" -# Generate random string for User Agent. -DEFAULT_USER_AGENT = "".join(choices(string.ascii_letters + string.digits, k=5)) -DEFAULT_BRAND_ID = 2 -DEFAULT_REQUEST_RETRIES = 5 -DEFAULT_CULTURE = "en" -MYQ_HEADERS = { - "Content-Type": "application/json", - "MyQApplicationId": DEFAULT_APP_ID, - "ApiVersion": str(DEVICES_API_VERSION), - "BrandId": str(DEFAULT_BRAND_ID), - "Culture": DEFAULT_CULTURE -} -DEFAULT_STATE_UPDATE_INTERVAL = timedelta(seconds=5) -NON_COVER_DEVICE_FAMILIES = "gateway" +DEFAULT_STATE_UPDATE_INTERVAL = timedelta(seconds=10) +DEFAULT_TOKEN_REFRESH = 10 * 60 # 10 minutes class API: # pylint: disable=too-many-instance-attributes """Define a class for interacting with the MyQ iOS App API.""" - def __init__(self, websession: ClientSession = None) -> None: + def __init__( + self, username: str, password: str, websession: ClientSession = None + ) -> None: """Initialize.""" - self._account_info = {} - self._credentials = {} - self._last_state_update = None # type: Optional[datetime] - self._lock = asyncio.Lock() - self._security_token = None # type: Optional[str] - self._websession = websession + self.__credentials = {"username": username, "password": password} + self._myqrequests = MyQRequest(websession or ClientSession()) + self._authentication_task = None # type:Optional[asyncio.Task] + self._codeverifier = None # type: Optional[str] + self._invalid_credentials = False # type: bool + self._lock = asyncio.Lock() # type: asyncio.Lock + self._update = asyncio.Lock() # type: asyncio.Lock + self._security_token = ( + None, + None, + None, + ) # type: Tuple[Optional[str], Optional[datetime], Optional[datetime]] + + self.accounts = {} # type: Dict[str, str] self.devices = {} # type: Dict[str, MyQDevice] + self.last_state_update = None # type: Optional[datetime] + + @property + def covers(self) -> Dict[str, MyQGaragedoor]: + """Return only those devices that are covers.""" + return { + device_id: device + for device_id, device in self.devices.items() + if device.device_json["device_family"] == DEVICE_FAMILY_GARAGEDOOR + } @property - def account_id(self) -> str: - """Return the account ID.""" - return self._account_info["Account"]["Id"] + def lamps(self) -> Dict[str, MyQDevice]: + """Return only those devices that are covers.""" + return { + device_id: device + for device_id, device in self.devices.items() + if device.device_json["device_family"] == DEVICE_FAMLY_LAMP + } @property - def covers(self) -> Dict[str, MyQDevice]: + def gateways(self) -> Dict[str, MyQDevice]: """Return only those devices that are covers.""" return { device_id: device for device_id, device in self.devices.items() - if device.device_json["device_family"] not in NON_COVER_DEVICE_FAMILIES + if device.device_json["device_family"] == DEVICE_FAMILY_GATEWAY } - async def _send_request( + @property + def _code_verifier(self) -> str: + if self._codeverifier is None: + self._codeverifier = generate_code_verifier(length=43) + return self._codeverifier + + @property + def username(self) -> str: + return self.__credentials["username"] + + @username.setter + def username(self, username: str) -> None: + self._invalid_credentials = False + self.__credentials["username"] = username + + @property + def password(self) -> None: + return None + + @password.setter + def password(self, password: str) -> None: + self._invalid_credentials = False + self.__credentials["password"] = password + + async def request( self, method: str, + returns: str, url: str, + websession: ClientSession = None, headers: dict = None, params: dict = None, + data: dict = None, json: dict = None, + allow_redirects: bool = True, login_request: bool = False, - ) -> dict: + ) -> (ClientResponse, Union[dict, str, None]): + """Make a request.""" - attempt = 0 - while attempt < DEFAULT_REQUEST_RETRIES - 1: - if attempt != 0: - wait_for = min(2 ** attempt, 5) - _LOGGER.warning(f"Device update failed; trying again in {wait_for} seconds") - await asyncio.sleep(wait_for) + # Determine the method to call based on what is to be returned. + call_method = REQUEST_METHODS.get(returns) + if call_method is None: + raise RequestError(f"Invalid return object requested: {returns}") - attempt += 1 - data = {} - request_headers = headers - if not login_request: - request_headers["SecurityToken"] = self._security_token + call_method = getattr(self._myqrequests, call_method) + # if this is a request as part of authentication to have it go through in parallel. + if login_request: try: - _LOGGER.debug(f"Sending myq api request {url}") - async with self._websession.request( - method, url, headers=request_headers, params=params, json=json, raise_for_status=True) as resp: - data = await resp.json(content_type=None) - return data + return await call_method( + method=method, + url=url, + websession=websession, + headers=headers, + params=params, + data=data, + json=json, + allow_redirects=allow_redirects, + ) + except ClientResponseError as err: + message = ( + f"Error requesting data from {url}: {err.status} - {err.message}" + ) + _LOGGER.debug(message) + raise RequestError(message) + except ClientError as err: - _LOGGER.debug(f"Attempt {attempt} request failed with exception:: {str(err)}") - status_code = err.status if hasattr(err, "status") else "" + message = f"Error requesting data from {url}: {str(err)}" + _LOGGER.debug(message) + raise RequestError(message) - message = f"Error requesting data from {url}: {str(status_code)} - {data.get('description', str(err))}" - if status_code == 401: - if login_request: - self._credentials = {} - raise InvalidCredentialsError("Invalid username/password") + # The MyQ API can time out if multiple concurrent requests are made, so + # ensure that only one gets through at a time. + # Exception is when this is a login request AND there is already a lock, in that case + # we're sending the request anyways as we know there is no active request now. + async with self._lock: - _LOGGER.debug(f"Security token has expired, re-authenticating to MyQ") + # If we had something for an authentication task and it is done then get the result and clear it out. + if self._authentication_task is not None: + authentication_task = await self.authenticate(wait=False) + if authentication_task.done(): + _LOGGER.debug( + "Scheduled token refresh completed, ensuring no exception." + ) + self._authentication_task = None try: - await self.authenticate(self._credentials.get("username"), - self._credentials.get("password"), - True) - except RequestError as auth_err: + # Get the result so any exception is raised. + authentication_task.result() + except asyncio.CancelledError: + pass + except (RequestError, AuthenticationError) as auth_err: + message = f"Scheduled token refresh failed: {str(auth_err)}" + _LOGGER.debug(message) + + # Check if token has to be refreshed. + if ( + self._security_token[1] is None + or self._security_token[1] <= datetime.utcnow() + ): + # Token has to be refreshed, get authentication task if running otherwise start a new one. + if self._security_token[0] is None: + # Wait for authentication task to be completed. + _LOGGER.debug( + f"Waiting for updated token, last refresh was {self._security_token[2]}" + ) + try: + await self.authenticate(wait=True) + except AuthenticationError as auth_err: message = f"Error trying to re-authenticate to myQ service: {str(auth_err)}" - _LOGGER.error(message) - raise RequestError(message) + _LOGGER.debug(message) + raise AuthenticationError(message) + else: + # We still have a token, we can continue this request with that token and schedule + # task to refresh token unless one is already running + await self.authenticate(wait=False) + + if not headers: + headers = {} + + headers["Authorization"] = self._security_token[0] + + _LOGGER.debug(f"Sending {method} request to {url}.") + # Do the request + try: + # First try + try: + return await call_method( + method=method, + url=url, + websession=websession, + headers=headers, + params=params, + data=data, + json=json, + allow_redirects=allow_redirects, + ) + except ClientResponseError as err: + # Handle only if status is 401, we then re-authenticate and retry the request + if err.status == 401: + self._security_token = (None, None, self._security_token[2]) + _LOGGER.debug("Status 401 received, re-authenticating.") + try: + await self.authenticate(wait=True) + except AuthenticationError as auth_err: + # Raise authentication error, we need a new token to continue and not getting it right + # now. + message = f"Error trying to re-authenticate to myQ service: {str(auth_err)}" + _LOGGER.debug(message) + raise AuthenticationError(message) + else: + # Some other error, re-raise. + raise err + + # Re-authentication worked, resend request that had failed. + return await call_method( + method=method, + url=url, + websession=websession, + headers=headers, + params=params, + data=data, + json=json, + allow_redirects=allow_redirects, + ) + + except ClientResponseError as err: + message = ( + f"Error requesting data from {url}: {err.status} - {err.message}" + ) + _LOGGER.debug(message) + if getattr(err, "status") and err.status == 401: + # Received unauthorized, reset token and start task to get a new one. + self._security_token = (None, None, self._security_token[2]) + await self.authenticate(wait=False) + raise AuthenticationError(message) - # Reset the attempt counter as we're now re-authenticated. - attempt = 0 - continue - except JSONDecodeError as err: - message=f"JSON Decoder error {err.msg} in response at line {err.lineno} column {err.colno}. Response " \ - f"received was:\n{err.doc}" - _LOGGER.error(message) raise RequestError(message) + except ClientError as err: + message = f"Error requesting data from {url}: {str(err)}" + _LOGGER.debug(message) + raise RequestError(message) - _LOGGER.error(message) - raise RequestError(message) + async def _oauth_authenticate(self) -> (str, int): - async def request( - self, - method: str, - endpoint: str, - headers: dict = None, - params: dict = None, - json: dict = None, - login_request: bool = False, - api_version: str = BASE_API_VERSION, - ) -> dict: - """Make a request.""" - api_base = API_BASE.format(api_version) - url = f"{api_base}/{endpoint}" + async with ClientSession() as session: + # retrieve authentication page + _LOGGER.debug("Retrieving authentication page") + resp, html = await self.request( + method="get", + returns="text", + url=OAUTH_AUTHORIZE_URI, + websession=session, + headers={ + "redirect": "follow", + }, + params={ + "client_id": OAUTH_CLIENT_ID, + "code_challenge": get_code_challenge(self._code_verifier), + "code_challenge_method": "S256", + "redirect_uri": OAUTH_REDIRECT_URI, + "response_type": "code", + "scope": "MyQ_Residential offline_access", + }, + login_request=True, + ) - if not headers: - headers = {} - headers.update(MYQ_HEADERS) + # Scanning returned web page for required fields. + _LOGGER.debug("Scanning login page for fields to return") + soup = BeautifulSoup(html, "html.parser") - # The MyQ API can time out if multiple concurrent requests are made, so - # ensure that only one gets through at a time. - # Exception is when this is a login request AND there is already a lock, in that case - # we're sending the request anyways as we know there is no active request now. - if login_request and self._lock.locked(): - return await self._send_request( - method=method, - url=url, - headers=headers, - params=params, - json=json, - login_request=login_request, + # Go through all potential forms in the page returned. This is in case multiple forms are returned. + forms = soup.find_all("form") + data = {} + for form in forms: + have_email = False + have_password = False + have_submit = False + # Go through all the input fields. + for field in form.find_all("input"): + if field.get("type"): + # Hidden value, include so we return back + if field.get("type").lower() == "hidden": + data.update( + { + field.get("name", "NONAME"): field.get( + "value", "NOVALUE" + ) + } + ) + # Email field + elif field.get("type").lower() == "email": + data.update({field.get("name", "Email"): self.username}) + have_email = True + # Password field + elif field.get("type").lower() == "password": + data.update( + { + field.get( + "name", "Password" + ): self.__credentials.get("password") + } + ) + have_password = True + # To confirm this form also has a submit button + elif field.get("type").lower() == "submit": + have_submit = True + + # Confirm we found email, password, and submit in the form to be submitted + if have_email and have_password and have_submit: + break + + # If we're here then this is not the form to submit. + data = {} + + # If data is empty then we did not find the valid form and are unable to continue. + if len(data) == 0: + _LOGGER.debug("Form with required fields not found") + raise RequestError( + "Form containing fields for email, password and submit not found." + "Unable to continue login process." + ) + + # Perform login to MyQ + _LOGGER.debug("Performing login to MyQ") + resp, _ = await self.request( + method="post", + returns="response", + url=resp.url, + websession=session, + headers={ + "Content-Type": "application/x-www-form-urlencoded", + "Cookie": resp.cookies.output(attrs=[]), + }, + data=data, + allow_redirects=False, + login_request=True, ) - async with self._lock: - return await self._send_request( - method=method, - url=url, - headers=headers, - params=params, - json=json, - login_request=login_request, + # We're supposed to receive back at least 2 cookies. If not then authentication failed. + if len(resp.cookies) < 2: + message = "Invalid MyQ credentials provided. Please recheck login and password." + self._invalid_credentials = True + _LOGGER.debug(message) + raise InvalidCredentialsError(message) + + # Intercept redirect back to MyQ iOS app + _LOGGER.debug("Calling redirect page") + resp, _ = await self.request( + method="get", + returns="response", + url=f"{OAUTH_BASE_URI}{resp.headers['Location']}", + websession=session, + headers={ + "Cookie": resp.cookies.output(attrs=[]), + "User-Agent": "null", + }, + allow_redirects=False, + login_request=True, + ) + + # Retrieve token + _LOGGER.debug("Getting token") + redirect_url = f"{OAUTH_BASE_URI}{resp.headers['Location']}" + + resp, data = await self.request( + returns="json", + method="post", + url=OAUTH_TOKEN_URI, + websession=session, + headers={ + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": "null", + }, + data={ + "client_id": OAUTH_CLIENT_ID, + "client_secret": OAUTH_CLIENT_SECRET, + "code": parse_qs(urlsplit(redirect_url).query).get("code", ""), + "code_verifier": self._code_verifier, + "grant_type": "authorization_code", + "redirect_uri": OAUTH_REDIRECT_URI, + "scope": parse_qs(urlsplit(redirect_url).query).get( + "code", "MyQ_Residential offline_access" + ), + }, + login_request=True, ) + token = f"{data.get('token_type')} {data.get('access_token')}" + try: + expires = int(data.get("expires_in", DEFAULT_TOKEN_REFRESH)) + except ValueError: + _LOGGER.debug( + f"Expires {data.get('expires_in')} received is not an integer, using default." + ) + expires = DEFAULT_TOKEN_REFRESH * 2 - async def authenticate(self, username: str, password: str, token_only = False) -> None: + if expires < DEFAULT_TOKEN_REFRESH * 2: + _LOGGER.debug( + f"Expires {expires} is less then default {DEFAULT_TOKEN_REFRESH}, setting to default instead." + ) + expires = DEFAULT_TOKEN_REFRESH * 2 + + return token, expires + + async def authenticate(self, wait: bool = True) -> Optional[asyncio.Task]: """Authenticate and get a security token.""" - if username is None or password is None: - _LOGGER.debug("No username/password, most likely due to previous failed authentication.") - return + if self.username is None or self.__credentials["password"] is None: + message = "No username/password, most likely due to previous failed authentication." + _LOGGER.debug(message) + raise InvalidCredentialsError(message) + + if self._invalid_credentials: + message = "Credentials are invalid, update username/password to re-try authentication." + _LOGGER.debug(message) + raise InvalidCredentialsError(message) + + if self._authentication_task is None: + # No authentication task is currently running, start one + _LOGGER.debug( + f"Scheduling token refresh, last refresh was {self._security_token[2]}" + ) + self._authentication_task = asyncio.create_task( + self._authenticate(), name="MyQ_Authenticate" + ) + + if wait: + try: + await self._authentication_task + except (RequestError, AuthenticationError) as auth_err: + # Raise authentication error, we need a new token to continue and not getting it right + # now. + self._authentication_task = None + raise AuthenticationError(str(auth_err)) + self._authentication_task = None - self._credentials = {"username": username, "password": password} + return self._authentication_task + + async def _authenticate(self) -> None: # Retrieve and store the initial security token: - _LOGGER.debug("Sending authentication request to MyQ") - auth_resp = await self.request( - method="post", - endpoint=LOGIN_ENDPOINT, - json={"Username": username, "Password": password}, - login_request=True, - ) - self._security_token = auth_resp.get("SecurityToken") - if self._security_token is None: + _LOGGER.debug("Initiating OAuth authentication") + token, expires = await self._oauth_authenticate() + + if token is None: _LOGGER.debug("No security token received.") - raise RequestError( + raise AuthenticationError( "Authentication response did not contain a security token yet one is expected." ) - if token_only: - return + _LOGGER.debug(f"Received token that will expire in {expires} seconds") + self._security_token = ( + token, + datetime.utcnow() + timedelta(seconds=int(expires / 2)), + datetime.now(), + ) + + async def _get_accounts(self) -> Optional[dict]: - # Retrieve and store account info: _LOGGER.debug("Retrieving account information") - self._account_info = await self.request( - method="get", - endpoint=ACCOUNT_INFORMATION_ENDPOINT, - params={"expand": "account"} + + # Retrieve the accounts + _, accounts_resp = await self.request( + method="get", returns="json", url=ACCOUNTS_ENDPOINT ) - # Retrieve and store initial set of devices: - _LOGGER.debug("Retrieving MyQ information") - await self.update_device_info() + if accounts_resp is not None and accounts_resp.get("accounts") is not None: + accounts = {} + for account in accounts_resp["accounts"]: + account_id = account.get("id") + if account_id is not None: + _LOGGER.debug( + f"Got account {account_id} with name {account.get('name')}" + ) + accounts.update({account_id: account.get("name")}) + else: + _LOGGER.debug(f"No accounts found") + accounts = None - async def update_device_info(self) -> dict: - """Get up-to-date device info.""" - # The MyQ API can time out if state updates are too frequent; therefore, - # if back-to-back requests occur within a threshold, respond to only the first: - call_dt = datetime.utcnow() - if not self._last_state_update: - self._last_state_update = call_dt - DEFAULT_STATE_UPDATE_INTERVAL - next_available_call_dt = self._last_state_update + DEFAULT_STATE_UPDATE_INTERVAL - - if call_dt < next_available_call_dt: - _LOGGER.debug("Ignoring subsequent request within throttle window") - return - - devices = [] - _LOGGER.debug("Retrieving accounts") - accounts_resp = await self.request( - method="get", - endpoint=ACCOUNTS_ENDPOINT - ) - if accounts_resp is not None and accounts_resp.get("Items") is not None: - for account in accounts_resp["Items"]: - account_id = account["Id"] - _LOGGER.debug(f"Retrieving devices for account {account_id}") - devices_resp = await self.request( - method="get", - endpoint=DEVICES_ENDPOINT.format(account_id), - api_version=DEVICES_API_VERSION - ) - if devices_resp is not None and devices_resp.get("items") is not None: - devices += devices_resp["items"] + return accounts - if not devices: - _LOGGER.debug("Retrieving device information") - devices_resp = await self.request( - method="get", - endpoint=DEVICES_ENDPOINT.format(self.account_id), - api_version=DEVICES_API_VERSION - ) + async def _get_devices_for_account(self, account) -> None: + + _LOGGER.debug(f"Retrieving devices for account {self.accounts[account]}") - if devices_resp is not None and devices_resp.get("items") is not None: - devices += devices_resp["items"] + _, devices_resp = await self.request( + method="get", + returns="json", + url=DEVICES_ENDPOINT.format(account_id=account), + ) - if not devices: - _LOGGER.debug("Response did not contain any devices, no updates.") - return + state_update_timestmp = datetime.utcnow() + if devices_resp is not None and devices_resp.get("items") is not None: + for device in devices_resp.get("items"): + serial_number = device.get("serial_number") + if serial_number is None: + _LOGGER.debug( + f"No serial number for device with name {device.get('name')}." + ) + continue - for device_json in devices: - serial_number = device_json.get("serial_number") - if serial_number is None: - _LOGGER.debug(f"No serial number for device with name {device_json.get('name')}.") - continue + if serial_number in self.devices: + _LOGGER.debug( + f"Updating information for device with serial number {serial_number}" + ) + myqdevice = self.devices[serial_number] + + # When performing commands we might update the state temporary, need to ensure + # that the state is not set back to something else if MyQ does not yet have updated + # state + last_update = myqdevice.device_json["state"].get("last_update") + myqdevice.device_json = device + + if ( + myqdevice.device_json["state"].get("last_update") is not None + and myqdevice.device_json["state"].get("last_update") + != last_update + ): + # MyQ has updated device state, reset ours ensuring we have the one from MyQ. + myqdevice.state = None + _LOGGER.debug( + f"State for device {myqdevice.name} was updated to {myqdevice.state}" + ) + + myqdevice.state_update = state_update_timestmp + else: + if device.get("device_family") == DEVICE_FAMILY_GARAGEDOOR: + _LOGGER.debug( + f"Adding new garage door with serial number {serial_number}" + ) + self.devices[serial_number] = MyQGaragedoor( + api=self, + account=account, + device_json=device, + state_update=state_update_timestmp, + ) + elif device.get("device_family") == DEVICE_FAMLY_LAMP: + _LOGGER.debug( + f"Adding new lamp with serial number {serial_number}" + ) + self.devices[serial_number] = MyQLamp( + api=self, + account=account, + device_json=device, + state_update=state_update_timestmp, + ) + elif device.get("device_family") == DEVICE_FAMILY_GATEWAY: + _LOGGER.debug( + f"Adding new gateway with serial number {serial_number}" + ) + self.devices[serial_number] = MyQDevice( + api=self, + account=account, + device_json=device, + state_update=state_update_timestmp, + ) + else: + _LOGGER.warning( + f"Unknown device family {device.get('device_family')}" + ) + else: + _LOGGER.debug(f"No devices found for account {self.accounts[account]}") + + async def update_device_info(self, for_account: str = None) -> None: + """Get up-to-date device info.""" + # The MyQ API can time out if state updates are too frequent; therefore, + # if back-to-back requests occur within a threshold, respond to only the first + # Ensure only 1 update task can run at a time. + async with self._update: + call_dt = datetime.utcnow() + if not self.last_state_update: + self.last_state_update = call_dt - DEFAULT_STATE_UPDATE_INTERVAL + next_available_call_dt = ( + self.last_state_update + DEFAULT_STATE_UPDATE_INTERVAL + ) - if serial_number in self.devices: - device = self.devices[serial_number] - device.device_json = device_json - else: - self.devices[device_json["serial_number"]] = MyQDevice( - self, device_json + # Ensure we're within our minimum update interval AND update request is not for a specific device + if call_dt < next_available_call_dt and for_account is None: + _LOGGER.debug( + "Ignoring device update request as it is within throttle window" ) + return + + _LOGGER.debug("Updating device information") + # If update request is for a specific account then do not retrieve account information. + if for_account is None: + self.accounts = await self._get_accounts() + + if self.accounts is None: + _LOGGER.debug(f"No accounts found") + self.devices = [] + accounts = {} + else: + accounts = self.accounts + else: + # Request is for specific account, thus restrict retrieval to the 1 account. + if self.accounts.get(for_account) is None: + # Checking to ensure we know the account, but this should never happen. + _LOGGER.debug( + f"Unable to perform update request for account {for_account} as it is not known." + ) + accounts = {} + else: + accounts = {for_account: self.accounts.get(for_account)} - self._last_state_update = datetime.utcnow() + for account in accounts: + await self._get_devices_for_account(account=account) + # Update our last update timestamp UNLESS this is for a specific account + if for_account is None: + self.last_state_update = datetime.utcnow() -async def login(username: str, password: str, websession: ClientSession = None, useragent: str = None) -> API: - """Log in to the API.""" - if useragent is None: - # Retrieve user agent from GitHub if not provided for login. - _LOGGER.debug("No user agent provided, trying to retrieve from GitHub.") - url = f"https://raw.githubusercontent.com/arraylabs/pymyq/master/.USER_AGENT" - try: - async with ClientSession() as session: - async with session.get(url) as resp: - useragent = await resp.text() - resp.raise_for_status() - _LOGGER.debug(f"Retrieved user agent {useragent} from GitHub.") - - except ClientError as exc: - # Default user agent to random string with length of 5 if failure to retrieve it from GitHub. - useragent = "#RANDOM:5" - _LOGGER.warning(f"Failed retrieving user agent from GitHub, will use randomized user agent " - f"instead: {str(exc)}") - else: - _LOGGER.debug(f"Received user agent {useragent}.") - - # Check if value for useragent is to create a random user agent. - useragent_list = useragent.split(":") - if useragent_list[0] == "#RANDOM": - # Create a random string, check if length is provided for the random string, if not then default is 5. - try: - randomlength = int(useragent_list[1]) if len(useragent_list) == 2 else 5 - except ValueError: - _LOGGER.debug(f"Random length value {useragent_list[1]} in user agent {useragent} is not an integer. " - f"Setting to 5 instead.") - randomlength = 5 - - # Create the random user agent. - useragent = "".join(choices(string.ascii_letters + string.digits, k=randomlength)) - _LOGGER.debug(f"User agent set to randomized value: {useragent}.") +async def login(username: str, password: str, websession: ClientSession = None) -> API: + """Log in to the API.""" # Set the user agent in the headers. - MYQ_HEADERS.update({"User-Agent": useragent}) - api = API(websession) - await api.authenticate(username, password, False) + api = API(username=username, password=password, websession=websession) + _LOGGER.debug("Performing initial authentication into MyQ") + try: + await api.authenticate(wait=True) + except InvalidCredentialsError as err: + _LOGGER.error( + f"Username and/or password are invalid. Update username/password." + ) + raise err + except AuthenticationError as err: + _LOGGER.error(f"Authentication failed: {str(err)}") + raise err + + # Retrieve and store initial set of devices: + _LOGGER.debug("Retrieving MyQ information") + await api.update_device_info() + return api diff --git a/pymyq/const.py b/pymyq/const.py index d3ed75a..fe7af12 100644 --- a/pymyq/const.py +++ b/pymyq/const.py @@ -1,19 +1,23 @@ """The myq constants.""" -API_BASE = "https://api.myqdevice.com/api/v{0}" -BASE_API_VERSION = 5 -DEVICES_API_VERSION = 5.1 +OAUTH_CLIENT_ID = "IOS_CGI_MYQ" +OAUTH_CLIENT_SECRET = "VUQ0RFhuS3lQV3EyNUJTdw==" +OAUTH_BASE_URI = "https://partner-identity.myq-cloud.com" +OAUTH_AUTHORIZE_URI = f"{OAUTH_BASE_URI}/connect/authorize" +OAUTH_REDIRECT_URI = "com.myqops://ios" +OAUTH_TOKEN_URI = f"{OAUTH_BASE_URI}/connect/token" -LOGIN_ENDPOINT = "Login" -ACCOUNT_INFORMATION_ENDPOINT = "My" -ACCOUNTS_ENDPOINT = "Accounts" -DEVICES_ENDPOINT = f"{ACCOUNTS_ENDPOINT}/{{0}}/Devices" -ACTION_ENDPOINT = f"{ACCOUNTS_ENDPOINT}/{{0}}/Devices/{{1}}/actions" +ACCOUNTS_ENDPOINT = "https://accounts.myq-cloud.com/api/v6.0/accounts" +DEVICES_ENDPOINT = "https://devices.myq-cloud.com/api/v5.2/Accounts/{account_id}/Devices" + +WAIT_TIMEOUT = 60 DEVICE_TYPE = "device_type" DEVICE_TYPE_GATE = "gate" DEVICE_FAMILY = "device_family" DEVICE_FAMILY_GATEWAY = "gateway" +DEVICE_FAMILY_GARAGEDOOR = "garagedoor" +DEVICE_FAMLY_LAMP = "lamp" DEVICE_STATE = "state" DEVICE_STATE_ONLINE = "online" diff --git a/pymyq/device.py b/pymyq/device.py index cfc1335..dab644b 100644 --- a/pymyq/device.py +++ b/pymyq/device.py @@ -1,41 +1,35 @@ """Define MyQ devices.""" +import asyncio import logging -import re -from typing import TYPE_CHECKING, Optional +from datetime import datetime +from typing import TYPE_CHECKING, Optional, Tuple, List -from .errors import RequestError - -from .const import ACTION_ENDPOINT, DEVICE_TYPE, DEVICES_API_VERSION +from .const import DEVICE_TYPE, WAIT_TIMEOUT +from .errors import RequestError, MyQError if TYPE_CHECKING: from .api import API _LOGGER = logging.getLogger(__name__) -COMMAND_CLOSE = "close" -COMMAND_OPEN = "open" - -STATE_CLOSED = "closed" -STATE_CLOSING = "closing" -STATE_OPEN = "open" -STATE_OPENING = "opening" -STATE_STOPPED = "stopped" -STATE_TRANSITION = "transition" -STATE_UNKNOWN = "unknown" - class MyQDevice: """Define a generic device.""" - def __init__(self, api: "API", device_json: dict): - """Initialize.""" - self._api = api - self.device_json = device_json + def __init__(self, api: "API", device_json: dict, account: str, state_update: datetime) -> None: + """Initialize. + :type account: str + """ + self._api = api # type: "API" + self._account = account # type: str + self.device_json = device_json # type: dict + self.state_update = state_update # type: datetime + self._device_state = None # Type: Optional[str] @property - def close_allowed(self) -> bool: - """Return whether the device can be closed unattended.""" - return self.device_json["state"].get("is_unattended_close_allowed") is True + def account(self) -> str: + """Return account associated with device""" + return self._account @property def device_family(self) -> str: @@ -72,34 +66,40 @@ def online(self) -> bool: """Return whether the device is online.""" return self.device_json["state"].get("online") is True - @property - def open_allowed(self) -> bool: - """Return whether the device can be opened unattended.""" - return self.device_json["state"].get("is_unattended_open_allowed") is True - @property def parent_device_id(self) -> Optional[str]: """Return the device ID (serial number) of this device's parent.""" return self.device_json.get("parent_device_id") - @property - def state(self) -> Optional[str]: - """Return the current state of the device.""" - return self.device_json["state"].get("door_state") - @property def href(self) -> Optional[str]: """Return the hyperlinks of the device.""" return self.device_json.get("href") + @property + def state(self) -> Optional[str]: + return self._device_state or self.device_state + @state.setter def state(self, value: str) -> None: """Set the current state of the device.""" - if not self.device_json["state"].get("door_state"): - return - self.device_json["state"]["door_state"] = value + self._device_state = value - async def _send_state_command(self, state_command: str) -> None: + @property + def device_state(self) -> Optional[str]: + return None + + @property + def close_allowed(self) -> bool: + """Return whether the device can be closed unattended.""" + return False + + @property + def open_allowed(self) -> bool: + """Return whether the device can be opened unattended.""" + return False + + async def _send_state_command(self, url: str, command: str) -> None: """Instruct the API to change the state of the device.""" # If the user tries to open or close, say, a gateway, throw an exception: if not self.state: @@ -107,42 +107,47 @@ async def _send_state_command(self, state_command: str) -> None: f"Cannot change state of device type: {self.device_type}" ) - account_id = self._api.account_id - if self.href is not None: - rule = r".*/accounts/(.*)/devices/(.*)" - infos = re.search(rule, self.href) - if infos is not None: - account_id = infos.group(1) - _LOGGER.debug(f"Sending command {state_command} for {self.name}") + _LOGGER.debug(f"Sending command {command} for {self.name}") await self._api.request( method="put", - endpoint=ACTION_ENDPOINT.format( - account_id, self.device_id - ), - json={"action_type": state_command}, - api_version=DEVICES_API_VERSION + returns="response", + url=url, ) - async def close(self) -> None: - """Close the device.""" - if self.state in (STATE_CLOSED, STATE_CLOSING): - return - - # Set the current state to "closing" right away (in case the user doesn't - # run update() before checking): - self.state = STATE_CLOSING - await self._send_state_command(COMMAND_CLOSE) - - async def open(self) -> None: - """Open the device.""" - if self.state in (STATE_OPEN, STATE_OPENING): - return - - # Set the current state to "opening" right away (in case the user doesn't - # run update() before checking): - self.state = STATE_OPENING - await self._send_state_command(COMMAND_OPEN) - async def update(self) -> None: """Get the latest info for this device.""" await self._api.update_device_info() + + async def wait_for_state(self, current_state: List, new_state: List, last_state_update: datetime) -> bool: + # First wait until door state is actually updated. + _LOGGER.debug(f"Waiting until device state has been updated for {self.name}") + wait_timeout = WAIT_TIMEOUT + while ( + last_state_update == self.device_json["state"].get("last_update", datetime.utcnow()) and wait_timeout > 0 + ): + wait_timeout = wait_timeout - 5 + await asyncio.sleep(5) + try: + await self._api.update_device_info(for_account=self.account) + except MyQError: + # Ignoring + pass + + # Wait until the state is to what we want it to be + _LOGGER.debug(f"Waiting until device state for {self.name} is {new_state}") + wait_timeout = WAIT_TIMEOUT + while self.state in current_state and wait_timeout > 0: + wait_timeout = wait_timeout - 5 + await asyncio.sleep(5) + try: + await self._api.update_device_info(for_account=self.account) + except MyQError: + # Ignoring + pass + + # Reset self.state ensuring it reflects actual device state. Only do this if state is still what it would + # have been, this to ensure if something else had updated it to something else we don't override. + if self._device_state == current_state: + self.state = None + + return self.state in new_state diff --git a/pymyq/errors.py b/pymyq/errors.py index d3e2910..ea289cd 100644 --- a/pymyq/errors.py +++ b/pymyq/errors.py @@ -13,6 +13,12 @@ class InvalidCredentialsError(MyQError): pass +class AuthenticationError(MyQError): + """Define an exception related to invalid credentials.""" + + pass + + class RequestError(MyQError): """Define an exception related to bad HTTP requests.""" diff --git a/pymyq/garagedoor.py b/pymyq/garagedoor.py new file mode 100644 index 0000000..5f42a69 --- /dev/null +++ b/pymyq/garagedoor.py @@ -0,0 +1,115 @@ +"""Define MyQ devices.""" +import asyncio +import logging +from datetime import datetime +from typing import TYPE_CHECKING, Optional, Union + +from .device import MyQDevice +from .errors import RequestError + +if TYPE_CHECKING: + from .api import API + +_LOGGER = logging.getLogger(__name__) + +COMMAND_URI = \ + "https://account-devices-gdo.myq-cloud.com/api/v5.2/Accounts/{account_id}/door_openers/{device_serial}/{command}" +COMMAND_CLOSE = "close" +COMMAND_OPEN = "open" +STATE_CLOSED = "closed" +STATE_CLOSING = "closing" +STATE_OPEN = "open" +STATE_OPENING = "opening" +STATE_STOPPED = "stopped" +STATE_TRANSITION = "transition" +STATE_UNKNOWN = "unknown" + + +class MyQGaragedoor(MyQDevice): + """Define a generic device.""" + + def __init__(self, api: "API", device_json: dict, account: str, state_update: datetime) -> None: + """Initialize. + :type account: str + """ + super().__init__(api=api, account=account, device_json=device_json, state_update=state_update) + + @property + def close_allowed(self) -> bool: + """Return whether the device can be closed unattended.""" + return self.device_json["state"].get("is_unattended_close_allowed") is True + + @property + def open_allowed(self) -> bool: + """Return whether the device can be opened unattended.""" + return self.device_json["state"].get("is_unattended_open_allowed") is True + + @property + def device_state(self) -> Optional[str]: + """Return the current state of the device.""" + return ( + self.device_json["state"].get("door_state") + if self.device_json.get("state") is not None + else None + ) + + async def close(self, wait_for_state: bool = False) -> Union[asyncio.Task, bool]: + """Close the device.""" + if self.state != self.device_state: + raise RequestError(f"Device is currently {self.state}, wait until complete.") + + if self.state not in (STATE_CLOSED, STATE_CLOSING): + # If our state is different from device state then it means an action is already being performed. + if self.state != self.device_state: + raise RequestError(f"Device is currently {self.state}, wait until complete.") + + # Device is currently not closed or closing, send command to close + await self._send_state_command( + url=COMMAND_URI.format( + account_id=self.account, + device_serial=self.device_id, + command=COMMAND_CLOSE, + ), + command=COMMAND_CLOSE, + ) + self.state = STATE_CLOSING + + wait_for_state_task = asyncio.create_task(self.wait_for_state( + current_state=[STATE_CLOSING], + new_state=[STATE_CLOSED], + last_state_update=self.device_json["state"].get("last_update"), + ), name="MyQ_WaitForClose", + ) + if not wait_for_state: + return wait_for_state_task + + _LOGGER.debug("Waiting till garage is closed") + return await wait_for_state_task + + async def open(self, wait_for_state: bool = False) -> Union[asyncio.Task, bool]: + """Open the device.""" + if self.state not in (STATE_OPEN, STATE_OPENING): + # Set the current state to "opening" right away (in case the user doesn't + # run update() before checking): + await self._send_state_command( + url=COMMAND_URI.format( + account_id=self.account, + device_serial=self.device_id, + command=COMMAND_OPEN, + ), + command=COMMAND_OPEN, + ) + self.state = STATE_OPENING + + wait_for_state_task = asyncio.create_task(self.wait_for_state( + current_state=[STATE_OPENING], + new_state=[STATE_OPEN], + last_state_update=self.device_json["state"].get("last_update"), + ), name="MyQ_WaitForOpen", + ) + + if not wait_for_state: + return wait_for_state_task + + _LOGGER.debug("Waiting till garage is open") + return await wait_for_state_task diff --git a/pymyq/lamp.py b/pymyq/lamp.py new file mode 100644 index 0000000..f66eaad --- /dev/null +++ b/pymyq/lamp.py @@ -0,0 +1,71 @@ +"""Define MyQ devices.""" +import logging +from datetime import datetime +from typing import TYPE_CHECKING, Optional + +from .device import MyQDevice + +if TYPE_CHECKING: + from .api import API + +_LOGGER = logging.getLogger(__name__) + +COMMAND_URI = \ + "https://account-devices-lamp.myq-cloud.com/api/v5.2/Accounts/{account_id}/lamps/{device_serial}/{command}" +COMMAND_ON = "on" +COMMAND_OFF = "off" +STATE_ON = "on" +STATE_OFF = "off" + + +class MyQLamp(MyQDevice): + """Define a generic device.""" + + def __init__(self, api: "API", device_json: dict, account: str, state_update: datetime) -> None: + """Initialize. + :type account: str + """ + super().__init__(api=api, account=account, device_json=device_json, state_update=state_update) + + @property + def device_state(self) -> Optional[str]: + """Return the current state of the device.""" + return ( + self.device_json["state"].get("lamp_state") + if self.device_json.get("state") is not None + else None + ) + + async def turnoff(self) -> None: + """Close the device.""" + if self.state == STATE_OFF: + return + + # Set the current state to "closing" right away (in case the user doesn't + # run update() before checking): + self.state = STATE_OFF + await self._send_state_command( + url=COMMAND_URI.format( + account_id=self.account, + device_serial=self.device_id, + command=COMMAND_OFF, + ), + command=COMMAND_OFF, + ) + + async def turnon(self) -> None: + """Open the device.""" + if self.state == STATE_ON: + return + + # Set the current state to "opening" right away (in case the user doesn't + # run update() before checking): + self.state = STATE_ON + await self._send_state_command( + url=COMMAND_URI.format( + account_id=self.account, + device_serial=self.device_id, + command=COMMAND_ON, + ), + command=COMMAND_ON, + ) diff --git a/pymyq/request.py b/pymyq/request.py new file mode 100644 index 0000000..f0f7800 --- /dev/null +++ b/pymyq/request.py @@ -0,0 +1,177 @@ +"""Handle requests to MyQ.""" +import asyncio +import logging +from json import JSONDecodeError + +from aiohttp import ClientSession, ClientResponse +from aiohttp.client_exceptions import ClientError, ClientResponseError + +from .errors import RequestError + +_LOGGER = logging.getLogger(__name__) + +REQUEST_METHODS = dict( + json="request_json", text="request_text", response="request_response" +) +DEFAULT_REQUEST_RETRIES = 5 + + +class MyQRequest: # pylint: disable=too-many-instance-attributes + """Define a class to handle requests to MyQ""" + + def __init__(self, websession: ClientSession = None) -> None: + self._websession = websession or ClientSession() + + @staticmethod + async def _send_request( + method: str, + url: str, + websession: ClientSession, + headers: dict = None, + params: dict = None, + data: dict = None, + json: dict = None, + allow_redirects: bool = False, + ) -> ClientResponse: + + attempt = 0 + resp_exc = None + last_status = "" + last_error = "" + while attempt < DEFAULT_REQUEST_RETRIES: + if attempt != 0: + wait_for = min(2 ** attempt, 5) + _LOGGER.debug(f'Request failed with "{last_status} {last_error}" ' + f'(attempt #{attempt}/{DEFAULT_REQUEST_RETRIES})"; trying again in {wait_for} seconds') + await asyncio.sleep(wait_for) + + attempt += 1 + try: + _LOGGER.debug(f"Sending myq api request {url} and headers {headers} with connection pooling") + resp = await websession.request( + method, + url, + headers=headers, + params=params, + data=data, + json=json, + skip_auto_headers={"USER-AGENT"}, + allow_redirects=allow_redirects, + raise_for_status=True, + ) + + _LOGGER.debug("Response:") + _LOGGER.debug(f" Response Code: {resp.status}") + _LOGGER.debug(f" Headers: {resp.raw_headers}") + _LOGGER.debug(f" Body: {await resp.text()}") + return resp + except ClientResponseError as err: + _LOGGER.debug( + f"Attempt {attempt} request failed with exception : {err.status} - {err.message}" + ) + if err.status == 401: + raise err + last_status = err.status + last_error = err.message + resp_exc = err + except ClientError as err: + _LOGGER.debug( + f"Attempt {attempt} request failed with exception:: {str(err)}" + ) + last_status = "" + last_error = str(err) + resp_exc = err + + raise resp_exc + + async def request_json( + self, + method: str, + url: str, + websession: ClientSession = None, + headers: dict = None, + params: dict = None, + data: dict = None, + json: dict = None, + allow_redirects: bool = False, + ) -> (ClientResponse, dict): + + websession = websession or self._websession + + resp = await self._send_request( + method=method, + url=url, + headers=headers, + params=params, + data=data, + json=json, + allow_redirects=allow_redirects, + websession=websession, + ) + + try: + data = await resp.json(content_type=None) + except JSONDecodeError as err: + message = ( + f"JSON Decoder error {err.msg} in response at line {err.lineno} column {err.colno}. Response " + f"received was:\n{err.doc}" + ) + _LOGGER.error(message) + raise RequestError(message) + + return resp, data + + async def request_text( + self, + method: str, + url: str, + websession: ClientSession = None, + headers: dict = None, + params: dict = None, + data: dict = None, + json: dict = None, + allow_redirects: bool = False, + ) -> (ClientResponse, str): + + websession = websession or self._websession + + resp = await self._send_request( + method=method, + url=url, + headers=headers, + params=params, + data=data, + json=json, + allow_redirects=allow_redirects, + websession=websession, + ) + + return resp, await resp.text() + + async def request_response( + self, + method: str, + url: str, + websession: ClientSession = None, + headers: dict = None, + params: dict = None, + data: dict = None, + json: dict = None, + allow_redirects: bool = False, + ) -> (ClientResponse, None): + + websession = websession or self._websession + + return ( + await self._send_request( + method=method, + url=url, + headers=headers, + params=params, + data=data, + json=json, + allow_redirects=allow_redirects, + websession=websession, + ), + None, + ) diff --git a/requirements.txt b/requirements.txt index 6148cca..feb8c9a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ -i https://pypi.python.org/simple aiohttp>=3.7 +beautifulsoup4>=4.9.3 +pkce>=1.0.2 diff --git a/requirements_dev.txt b/requirements_dev.txt index c519536..990d1fd 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -1,6 +1,8 @@ -i https://pypi.python.org/simple aiohttp>=3.7 +beautifulsoup4>=4.9.3 black==20.8b1 flake8>=3.8.4 +pkce>=1.0.2 twine>=3.3.0 wheel>=0.36.2