diff --git a/setup.py b/setup.py index 0ee97c0..1ecf379 100755 --- a/setup.py +++ b/setup.py @@ -156,7 +156,8 @@ def find_version(*paths): # package dependencies install_requires=[ - 'requests >= 1.15.1', + 'requests >= 2.10.0', + 'PySocks >= 1.5.6', 'dict2xml', 'f5-icontrol-rest', 'ciscoisesdk' diff --git a/src/rest/connector/__init__.py b/src/rest/connector/__init__.py index 5f82f5b..85dea7b 100644 --- a/src/rest/connector/__init__.py +++ b/src/rest/connector/__init__.py @@ -54,8 +54,8 @@ def __getattribute__(self, name): # Selector of methods/attributes to pick from abstracted # Can't use __getattr__ as BaseConnection is abstract and some already # exists - if name in ['api', 'get', 'post', 'put', 'patch', 'delete', - 'connect', 'disconnect', 'connected']: + if name in ['api', 'options', 'head', 'get', 'post', 'put', 'patch', + 'delete', 'connect', 'disconnect', 'connected']: return getattr(self._implementation, name) # Send the rest to normal __getattribute__ diff --git a/src/rest/connector/implementation.py b/src/rest/connector/implementation.py index e675a12..c6a1e1d 100644 --- a/src/rest/connector/implementation.py +++ b/src/rest/connector/implementation.py @@ -67,6 +67,16 @@ def disconnect(self): raise NotImplementedError + def options(self, *args, **kwargs): + '''OPTIONS REST Command to retrieve supported methods''' + + raise NotImplementedError + + def head(self, *args, **kwargs): + '''HEAD REST Command to retrieve header fields''' + + raise NotImplementedError + def get(self, *args, **kwargs): '''GET REST Command to retrieve information from the device''' diff --git a/src/rest/connector/libs/iosxe/implementation.py b/src/rest/connector/libs/iosxe/implementation.py index 843c2d2..839c69e 100644 --- a/src/rest/connector/libs/iosxe/implementation.py +++ b/src/rest/connector/libs/iosxe/implementation.py @@ -1,20 +1,21 @@ -import json -import logging -import re -import urllib.request -import requests -from dict2xml import dict2xml +from json import dumps as json_dumps +from logging import getLogger +from typing import Optional, Union + +from urllib.parse import urljoin +from requests import (codes as requests_codes, + Response as requests_Response) from requests.exceptions import RequestException from pyats.connections import BaseConnection -from rest.connector.implementation import Implementation as RestImplementation +from rest.connector.restconf_implementation import RestconfImplementation from rest.connector.utils import get_username_password # create a logger for this module -log = logging.getLogger(__name__) +log = getLogger(__name__) -class Implementation(RestImplementation): +class Implementation(RestconfImplementation): '''Rest Implementation for IOS-XE Implementation of Rest connection to IOS-XE devices supporting RESTCONF @@ -49,25 +50,34 @@ class Implementation(RestImplementation): >>> device.rest.connected True ''' + def __init__(self, *args, **kwargs): + """ + Prepare the object for consupmption and define any attributes not + explicitly set in the super() call. + + :param args: Arguments + :param kwargs: Keyword Arguments + """ super().__init__(*args, **kwargs) - if 'proxies' not in kwargs: - self.proxies = urllib.request.getproxies() + self.base_url = self.url + self.content_type = None @BaseConnection.locked def connect(self, - timeout=30, - default_content_type='json', - verbose=False, - port="443", - protocol='https'): + timeout: Optional[int] = None, + default_content_type: str = "json", + verbose: bool = False, + port: Union[int, str, None] = 443, # Legacy - port defined in testbed + protocol: Optional[str] = "https") -> None: # Legacy - protocol defined in testbed '''connect to the device via REST Arguments --------- - timeout (int): Timeout value - default_content_type: Default for content type, json or xml + timeout: (int) Timeout value + default_content_type: (str: json|xml) Default for content type, json or xml + verbose: (bool) Display more detail about the connection proxies: Specify the proxy to use for connection as seen below. {'http': 'http://proxy.esl.cisco.com:80/', 'ftp': 'http://proxy.esl.cisco.com:80/', @@ -92,104 +102,80 @@ def connect(self, if self.connected: return - log.debug("Content type: %s" % default_content_type) - log.debug("Timeout: %s" % timeout) - self.content_type = default_content_type - - # support sshtunnel - if 'sshtunnel' in self.connection_info: - try: - from unicon.sshutils import sshtunnel - except ImportError: - raise ImportError( - '`unicon` is not installed for `sshtunnel`. Please install by `pip install unicon`.' - ) - try: - tunnel_port = sshtunnel.auto_tunnel_add(self.device, self.via) - if tunnel_port: - ip = self.device.connections[self.via].sshtunnel.tunnel_ip - port = tunnel_port - except AttributeError as e: - raise AttributeError( - "Cannot add ssh tunnel. Connection %s may not have ip/host or port.\n%s" - % (self.via, e)) - else: - ip = self.connection_info.ip.exploded - port = self.connection_info.get('port', port) - - if 'protocol' in self.connection_info: - protocol = self.connection_info['protocol'] - - self.base_url = '{protocol}://{ip}:{port}'.format(protocol=protocol, - ip=ip, - port=port) + log.debug("Content type: %s", default_content_type) + log.debug("Timeout: %s", timeout) # --------------------------------------------------------------------- # Connect to "well-known" RESTCONF resource to "test", the # RESTCONF connection on 'connect'. Comparable to CLI (SSH) connection, # which triggers a "show version" on connect # --------------------------------------------------------------------- - log.info("Connecting to '{d}' with alias " - "'{a}'".format(d=self.device.name, a=self.alias)) - login_url = '{f}/restconf/data/Cisco-IOS-XE-native:native/version'.format(f=self.base_url) - username, password = get_username_password(self) + log.info("Connecting to '%s' with alias '%s'", + self.device.name, + self.alias) - self.session = requests.Session() + login_url = urljoin(self.base_url, + "/restconf/data/Cisco-IOS-XE-native:native/version") + + username, password = get_username_password(self) self.session.auth = (username, password) - header = 'application/yang-data+{fmt}' + self.content_type = default_content_type + + if timeout is not None: + self.session.timeout = timeout - if default_content_type.lower() == 'json': - accept_header = header.format(fmt='json') - elif default_content_type.lower() == 'xml': - accept_header = header.format(fmt='xml') - else: - accept_header = default_content_type - - self.session.headers.update({'Accept': accept_header}) + # Connect to the device directly via requests and set the connected + # flag on success. - # Connect to the device via requests - response = self.session.get( - login_url, proxies=self.proxies, timeout=timeout, verify=False) + # NOTE - all session attribute already set in __init__ + # (proxies, verify, etc) + response = self.session.get(login_url) output = response.text - log.debug("Response: {c} {r}, headers: {h}, payload {p}".format( - c=response.status_code, - r=response.reason, - h=response.headers, - p=response.text)) + log.debug("Response: %s %s, headers: %s, payload %s", + response.status_code, + response.reason, + response.headers, + response.text) if verbose: - log.info("Response text:\n%s" % output) + log.info("Response text:\n%s", output) # Make sure it returned requests.codes.ok - if response.status_code != requests.codes.ok: + if response.status_code != requests_codes.ok: # Something bad happened - raise RequestException("Connection to '{ip}:{port}' has returned the " - "following code '{c}', instead of the " - "expected status code '{ok}'" - .format(ip=ip, port=port, c=response.status_code, - ok=requests.codes.ok)) - self._is_connected = True - log.info("Connected successfully to '{d}'".format(d=self.device.name)) + # TODO - Custom exception class? + raise RequestException(f"Connection to '{self.host}:{self.port}' has returned the " # self.url??? + f"following code '{response.status_code}', instead of the " + f"expected status code '{requests_codes.ok}'") - return response + self._is_connected = True + log.info("Connected successfully to '%s'", self.device.name) @BaseConnection.locked def disconnect(self): """ Does not make sense to disconnect from a device. """ - self._is_connected = False - return + + log.info("Disconnecting from '%s' with alias '%s'", + self.device.name, self.alias) + try: + self.session.close() + finally: + self._is_connected = False + + log.info("Disconnected successfully from '%s'", + self.device.name) @BaseConnection.locked - def get(self, api_url, content_type=None, headers=None, - expected_status_codes=( - requests.codes.no_content, - requests.codes.ok - ), - timeout=30, - verbose=False): - '''GET REST Command to retrieve information from the device + def options(self, + api_url: str, + content_type: Optional[str] = None, + headers: Optional[dict] = None, + expected_status_codes: Union[list[int], set[int], tuple[int], None] = None, + timeout: Optional[int] = None, + verbose: Optional[bool] = False) -> requests_Response: + '''OPTIONS REST Command to retrieve supported methods from device Arguments --------- @@ -198,152 +184,78 @@ def get(self, api_url, content_type=None, headers=None, headers: dictionary of HTTP headers (optional) expected_status_codes: list of expected result codes (integers) timeout: timeout in seconds (default: 30) + verbose: Enable additional information in output (default: False) ''' - if not self.connected: - raise Exception("'{d}' is not connected for " - "alias '{a}'".format(d=self.device.name, - a=self.alias)) - if content_type is None: - content_type = self.content_type - - full_url = '{b}{a}'.format(b=self.base_url, a=api_url) - - header = 'application/yang-data+{fmt}' - - if content_type.lower() == 'json': - accept_header = header.format(fmt='json') - elif content_type.lower() == 'xml': - accept_header = header.format(fmt='xml') - else: - accept_header = content_type - - self.session.headers.update({'Accept': accept_header}) - if headers is not None: - self.session.headers.update(headers) - - log.debug("Sending GET command to '{d}': " - "{u}".format(d=self.device.name, u=full_url)) - log.debug("Request headers:{headers}".format( - headers=self.session.headers)) - - response = self.session.get(full_url, proxies=self.proxies, timeout=timeout) - output = response.text - log.debug("Response: {c} {r}, headers: {h}".format(c=response.status_code, - r=response.reason, h=response.headers)) - if verbose: - log.info("Output received:\n{output}".format(output=output)) - - # Make sure it returned requests.codes.ok - if response.status_code not in expected_status_codes: - # Something bad happened - raise RequestException("'{c}' result code has been returned " - "instead of the expected status code(s) " - "'{e}' for '{d}'\n{t}" - .format(d=self.device.name, - c=response.status_code, - e=expected_status_codes, - t=response.text)) - return response + return self._request(method="OPTIONS", + url=api_url, + content_type=content_type, + expected_status_codes=expected_status_codes, + verbose=verbose, + timeout=timeout) @BaseConnection.locked - def post(self, api_url, payload='', content_type=None, headers=None, - expected_status_codes=( - requests.codes.created, - requests.codes.no_content, - requests.codes.ok - ), - timeout=30, - verbose=False): - '''POST REST Command to configure information from the device + def head(self, + api_url: str, + content_type: Optional[str] = None, + headers: Optional[dict] = None, + expected_status_codes: Union[list[int], set[int], tuple[int], None] = None, + timeout: Optional[int] = None, + verbose: Optional[bool] = False) -> requests_Response: + '''OPTIONS REST Command to retrieve resource metadata via headers Arguments --------- api_url: API url string - payload: payload to sent, can be string or dict content_type: expected content type to be returned (xml or json) headers: dictionary of HTTP headers (optional) expected_status_codes: list of expected result codes (integers) timeout: timeout in seconds (default: 30) + verbose: Enable additional information in output (default: False) ''' + return self._request(method="HEAD", + url=api_url, + content_type=content_type, + expected_status_codes=expected_status_codes, + verbose=verbose, + timeout=timeout) - if not self.connected: - raise Exception("'{d}' is not connected for " - "alias '{a}'".format(d=self.device.name, - a=self.alias)) - - full_url = '{b}{a}'.format(b=self.base_url, a=api_url) - - request_payload = payload - if isinstance(payload, dict): - assert content_type is not None, 'content_type parameter required when passing dict' - if content_type == 'json': - request_payload = json.dumps(payload) - elif content_type == 'xml': - request_payload = dict2xml(payload) - - if content_type is None: - if re.match("<", payload.lstrip()) is not None: - content_type = 'xml' - else: - content_type = 'json' - - content_type_header = 'application/yang-data+{fmt}' - accept_header = 'application/yang-data+{fmt}' - - if content_type.lower() == 'json': - content_type_header = content_type_header.format(fmt='json') - accept_header = accept_header.format(fmt='json') - elif content_type.lower() == 'xml': - content_type_header = content_type_header.format(fmt='xml') - accept_header = accept_header.format(fmt='xml') - else: - content_type_header = content_type - accept_header = content_type - - self.session.headers.update({'Content-type': content_type_header}) - self.session.headers.update({'Accept': accept_header}) - if headers is not None: - self.session.headers.update(headers) - - log.debug("Sending POST command to '{d}': {u}" - .format(d=self.device.name, u=full_url)) - log.debug("Request headers: {h}\nPayload: {p}" - .format(h=self.session.headers, p=request_payload)) - if verbose: - log.info('Request payload:\n{payload}'.format( - payload=request_payload)) - - # Send to the device - response = self.session.post( - full_url, request_payload, proxies=self.proxies, timeout=timeout) - output = response.text - log.debug("Response: {c} {r}, headers: {h}".format(c=response.status_code, - r=response.reason, h=response.headers)) - if verbose: - log.info("Output received:\n{output}".format(output=output)) + @BaseConnection.locked + def get(self, + api_url: str, + content_type: Optional[str] = None, + headers: Optional[dict] = None, + expected_status_codes: Union[list[int], set[int], tuple[int], None] = None, + timeout: Optional[int] = None, + verbose: Optional[bool] = False) -> requests_Response: + '''GET REST Command to retrieve information from the device - # Make sure it returned requests.codes.ok - if response.status_code not in expected_status_codes: - # Something bad happened - raise RequestException("'{c}' result code has been returned " - "instead of the expected status code(s) " - "'{e}' for '{d}'\n{t}" - .format(d=self.device.name, - c=response.status_code, - e=expected_status_codes, - t=response.text)) - return response + Arguments + --------- + api_url: API url string + content_type: expected content type to be returned (xml or json) + headers: dictionary of HTTP headers (optional) + expected_status_codes: list of expected result codes (integers) + timeout: timeout in seconds (default: 30) + verbose: Enable additional information in output (default: False) + ''' + return self._request(method="GET", + url=api_url, + content_type=content_type, + headers=headers, + expected_status_codes=expected_status_codes, + verbose=verbose, + timeout=timeout) @BaseConnection.locked - def patch(self, api_url, payload, content_type=None, headers=None, - expected_status_codes=( - requests.codes.created, - requests.codes.no_content, - requests.codes.ok - ), - timeout=30, - verbose=False): - '''PATCH REST Command to configure information from the device + def post(self, + api_url: str, + payload: Union[dict, str], + content_type: Optional[str] = None, + headers: Optional[dict] = None, + expected_status_codes: Union[list[int], set[int], tuple[int], None] = None, + timeout: Optional[int] = None, + verbose: Optional[bool] = False) -> requests_Response: + '''POST REST Command to configure information from the device Arguments --------- @@ -353,85 +265,56 @@ def patch(self, api_url, payload, content_type=None, headers=None, headers: dictionary of HTTP headers (optional) expected_status_codes: list of expected result codes (integers) timeout: timeout in seconds (default: 30) + verbose: Enable additional information in output (default: False) ''' + return self._request(method="POST", + url=api_url, + content_type=content_type, + message_body=payload, + headers=headers, + expected_status_codes=expected_status_codes, + verbose=verbose, + timeout=timeout) - if not self.connected: - raise Exception("'{d}' is not connected for " - "alias '{a}'".format(d=self.device.name, - a=self.alias)) - - request_payload = payload - if isinstance(payload, dict): - assert content_type is not None, 'content_type parameter required when passing dict' - if content_type == 'json': - request_payload = json.dumps(payload) - elif content_type == 'xml': - request_payload = dict2xml(payload) - - full_url = '{b}{a}'.format(b=self.base_url, a=api_url) - - if content_type is None: - if re.match("<", payload.lstrip()) is not None: - content_type = 'xml' - else: - content_type = 'json' - - content_type_header = 'application/yang-data+{fmt}' - accept_header = 'application/yang-data+{fmt}' - - if content_type.lower() == 'json': - content_type_header = content_type_header.format(fmt='json') - accept_header = accept_header.format(fmt='json') - elif content_type.lower() == 'xml': - content_type_header = content_type_header.format(fmt='xml') - accept_header = accept_header.format(fmt='xml') - else: - content_type_header = content_type - accept_header = content_type - - self.session.headers.update({'Content-type': content_type_header}) - self.session.headers.update({'Accept': accept_header}) - if headers is not None: - self.session.headers.update(headers) - - log.debug("Sending PATCH command to '{d}': {u}".format( - d=self.device.name, u=full_url)) - log.debug("Request headers: {h}\nPayload:{p}".format(h=self.session.headers, - p=request_payload)) - if verbose: - log.info('Request payload:\n{payload}'.format( - payload=request_payload)) - - # Send to the device - response = self.session.patch( - full_url, request_payload, proxies=self.proxies, timeout=timeout) - output = response.text - log.debug("Response: {c} {r}, headers: {h}".format(c=response.status_code, - r=response.reason, h=response.headers)) - if verbose: - log.info("Output received:\n{output}".format(output=output)) + @BaseConnection.locked + def patch(self, + api_url: str, + payload: Union[dict, str], # , None] = None, + content_type: Optional[str] = None, + headers: Optional[dict] = None, + expected_status_codes: Union[list[int], set[int], tuple[int], None] = None, + timeout: Optional[int] = None, + verbose: Optional[bool] = False) -> requests_Response: + '''PATCH REST Command to configure information from the device - # Make sure it returned requests.codes.ok - if response.status_code not in expected_status_codes: - # Something bad happened - raise RequestException("'{c}' result code has been returned " - "instead of the expected status code(s) " - "'{e}' for '{d}'\n{t}" - .format(d=self.device.name, - c=response.status_code, - e=expected_status_codes, - t=response.text)) - return response + Arguments + --------- + api_url: API url string + payload: payload to sent, can be string or dict + content_type: expected content type to be returned (xml or json) + headers: dictionary of HTTP headers (optional) + expected_status_codes: list of expected result codes (integers) + timeout: timeout in seconds (default: 30) + verbose: Enable additional information in output (default: False) + ''' + return self._request(method="PATCH", + url=api_url, + content_type=content_type, + message_body=payload, + headers=headers, + expected_status_codes=expected_status_codes, + verbose=verbose, + timeout=timeout) @BaseConnection.locked - def put(self, api_url, payload, content_type=None, headers=None, - expected_status_codes=( - requests.codes.created, - requests.codes.no_content, - requests.codes.ok - ), - timeout=30, - verbose=False): + def put(self, + api_url: str, + payload: Union[dict, str], # , None] = None, + content_type: Optional[str] = None, + headers: Optional[dict] = None, + expected_status_codes: Union[list[int], set[int], tuple[int], None] = None, + timeout: Optional[int] = None, + verbose: Optional[bool] = False) -> requests_Response: '''PUT REST Command to configure information from the device Arguments @@ -442,84 +325,25 @@ def put(self, api_url, payload, content_type=None, headers=None, headers: dictionary of HTTP headers (optional) expected_status_codes: list of expected result codes (integers) timeout: timeout in seconds (default: 30) + verbose: Enable additional information in output (default: False) ''' - - if not self.connected: - raise Exception("'{d}' is not connected for " - "alias '{a}'".format(d=self.device.name, - a=self.alias)) - - full_url = '{b}{a}'.format(b=self.base_url, a=api_url) - - request_payload = payload - if isinstance(payload, dict): - assert content_type != None, 'content_type parameter required when passing dict' - if content_type == 'json': - request_payload = json.dumps(payload) - elif content_type == 'xml': - request_payload = dict2xml(payload) - - if content_type is None: - if re.match("<", payload.lstrip()) is not None: - content_type = 'xml' - else: - content_type = 'json' - - content_type_header = 'application/yang-data+{fmt}' - accept_header = 'application/yang-data+{fmt}' - - if content_type.lower() == 'json': - content_type_header = content_type_header.format(fmt='json') - accept_header = accept_header.format(fmt='json') - elif content_type.lower() == 'xml': - content_type_header = content_type_header.format(fmt='xml') - accept_header = accept_header.format(fmt='xml') - else: - content_type_header = content_type - accept_header = content_type - - self.session.headers.update({'Content-type': content_type_header}) - self.session.headers.update({'Accept': accept_header}) - if headers is not None: - self.session.headers.update(headers) - - log.debug("Sending PUT command to '{d}': {u}".format( - d=self.device.name, u=full_url)) - log.debug("Request headers: {h}\nPayload:{p}".format(h=self.session.headers, - p=request_payload)) - if verbose: - log.info('Request payload:\n{payload}'.format( - payload=request_payload)) - - # Send to the device - response = self.session.put(full_url, request_payload, proxies=self.proxies, timeout=timeout) - output = response.text - log.debug("Response: {c} {r}, headers: {h}".format(c=response.status_code, - r=response.reason, h=response.headers)) - if verbose: - log.info("Output received:\n{output}".format(output=output)) - - # Make sure it returned requests.codes.ok - if response.status_code not in expected_status_codes: - # Something bad happened - raise RequestException("'{c}' result code has been returned " - "instead of the expected status code(s) " - "'{e}' for '{d}'\n{t}" - .format(d=self.device.name, - c=response.status_code, - e=expected_status_codes, - t=response.text)) - return response + return self._request(method="PUT", + url=api_url, + content_type=content_type, + message_body=payload, + headers=headers, + expected_status_codes=expected_status_codes, + verbose=verbose, + timeout=timeout) @BaseConnection.locked - def delete(self, api_url, content_type=None, headers=None, - expected_status_codes=( - requests.codes.created, - requests.codes.no_content, - requests.codes.ok - ), - timeout=30, - verbose=False): + def delete(self, + api_url: str, + content_type: Optional[str] = None, + headers: Optional[dict] = None, + expected_status_codes: Union[list[int], set[int], tuple[int], None] = None, + timeout: Optional[int] = None, + verbose: Optional[bool] = False) -> requests_Response: '''DELETE REST Command to configure information from the device Arguments @@ -529,49 +353,12 @@ def delete(self, api_url, content_type=None, headers=None, headers: dictionary of HTTP headers (optional) expected_status_codes: list of expected result codes (integers) timeout: timeout in seconds (default: 30) + verbose: Enable additional information in output (default: False) ''' - - if not self.connected: - raise Exception("'{d}' is not connected for " - "alias '{a}'".format(d=self.device.name, - a=self.alias)) - - if content_type is None: - content_type = self.content_type - - full_url = '{b}{a}'.format(b=self.base_url, a=api_url) - - if content_type.lower() == 'json': - accept_header = 'application/yang-data+json' - elif content_type.lower() == 'xml': - accept_header = 'application/yang-data+xml' - else: - accept_header = content_type - - self.session.headers.update({'Accept': accept_header}) - if headers is not None: - self.session.headers.update(headers) - - log.debug("Sending DELETE command to '{d}': " - "{u}".format(d=self.device.name, u=full_url)) - log.debug("Request headers:{headers}".format( - headers=self.session.headers)) - - response = self.session.delete(full_url, proxies=self.proxies, timeout=timeout) - output = response.text - log.debug("Response: {c} {r}, headers: {h}".format(c=response.status_code, - r=response.reason, h=response.headers)) - if verbose: - log.info("Output received:\n{output}".format(output=output)) - - # Make sure it returned requests.codes.ok - if response.status_code not in expected_status_codes: - # Something bad happened - raise RequestException("'{c}' result code has been returned " - "instead of the expected status code(s) " - "'{e}' for '{d}'\n{t}" - .format(d=self.device.name, - c=response.status_code, - e=expected_status_codes, - t=response.text)) - return response \ No newline at end of file + return self._request(method="DELETE", + url=api_url, + content_type=content_type, + headers=headers, + expected_status_codes=expected_status_codes, + verbose=verbose, + timeout=timeout) diff --git a/src/rest/connector/restconf_implementation.py b/src/rest/connector/restconf_implementation.py new file mode 100644 index 0000000..49dc501 --- /dev/null +++ b/src/rest/connector/restconf_implementation.py @@ -0,0 +1,267 @@ +""" +The RestconfImplementation class provides a common interface for +implementations using the RESTCONF protocol (RFC8040) code duplication. + +To subclass, inherit +""" + +from logging import getLogger +from typing import Optional, Union +from json import dumps as json_dumps +from ipaddress import ip_address, IPv6Address + +from urllib.request import getproxies as urllib_getproxies +from urllib.parse import urljoin +from urllib3 import disable_warnings +from urllib3.exceptions import InsecureRequestWarning + +from dict2xml import dict2xml + +from pyats.connections import BaseConnection +from rest.connector.implementation import Implementation +from rest.connector.utils import default_response_hook + +from requests import (codes as requests_codes, + Response as requests_Response, + Session as requests_Session,) +from requests.exceptions import RequestException + +log = getLogger(__name__) + +# Default timeout +DEFAULT_REQUEST_TIMEOUT = 30 + +# Default status codes per RFC for each HTTP method. +DEFAULT_STATUS_CODES = { + "OPTIONS": (requests_codes.ok,), + "HEAD": (requests_codes.no_content, + requests_codes.ok,), + "GET": (requests_codes.no_content, + requests_codes.ok,), + "POST": (requests_codes.created, + requests_codes.no_content, + requests_codes.ok,), + "PATCH": (requests_codes.no_content, + requests_codes.ok,), + "PUT": (requests_codes.created, + requests_codes.no_content,), + "DELETE": (requests_codes.no_content,), +} + + +class RestconfImplementation(Implementation): + '''RESTCONF Implementation BaseClass + + Baseclass for Rest connection implementation + + YAML Example + ------------ + + devices: + PE1: + credentials: + rest: + username: admin + password: cisco123 + connections: + a: + protocol: telnet + ip: "1.2.3.4" + port: 2004 + vty: + protocol : telnet + ip : "2.3.4.5" + rest: + class: rest.connector.Rest + host: device.example.com + port: "443" + protocol: https + verify: true + credentials: + rest: + username: admin + password: cisco123 + + Example + ------- + + >>> from pyats.topology import loader + >>> testbed = loader.load('/users/xxx/xxx/asr22.yaml') + >>> device = testbed.devices['PE1'] + >>> device.connect(alias='rest', via='rest') + >>> device.rest.connected + True + ''' + def __init__(self, *args, **kwargs): + '''__init__ instantiates a single connection instance.''' + + # instantiate BaseConnection + super().__init__(*args, **kwargs) + + self._is_connected = False + self.protocol = self.connection_info.get("protocol", "https") + self.port = self.connection_info.get("port", 443) + + self.host = self.connection_info.get("host", self.connection_info.get("ip")) + + if "proxies" in kwargs: + self.proxies = kwargs["proxies"] + elif hasattr(self.connection_info, "proxies"): + self.proxies = self.connection_info.get("proxies") + else: + self.proxies = urllib_getproxies() + + if self.host is None: + raise AttributeError( + "Cannot connect to device. " + f"Connection {self.via} may not have host/ip defined." + ) + + # support sshtunnel + if 'sshtunnel' in self.connection_info: + try: + from unicon.sshutils import sshtunnel + except ImportError as e: + raise ImportError( + "`unicon` is not installed for `sshtunnel`. " + "Please install by `pip install unicon`." + ) from e + try: + tunnel_port = sshtunnel.auto_tunnel_add(self.device, self.via) + if tunnel_port: + self.host = self.device.connections[self.via].sshtunnel.tunnel_ip + self.port = tunnel_port + except AttributeError as e: + raise AttributeError( + f"Cannot add ssh tunnel. Connection {self.via} may not have ip/host or port.\n{e}" + ) from e + + + try: + # If IPv6 address provided, format for proper URL + self.host = ip_address(self.host) + if isinstance(self.host, IPv6Address): + self.host = f"[{self.host.exploded}]" + else: + self.host = self.host.exploded + except ValueError: + # Hostname provided (not IP) - leave as-is + pass + + self.url = f"{self.protocol}://{self.host}:{self.port}/" + + # Create and configure the Session instance + self.session = requests_Session() + + self.session.verify = self.connection_info.get("verify", True) + if self.session.verify: + # remove warnings for insecure HTTPS + disable_warnings(InsecureRequestWarning) + + self.session.timeout = DEFAULT_REQUEST_TIMEOUT + self.session.proxies.update(self.proxies) + + # Attach a default response hook to raise for status and re-raise + # caught exceptions + self.session.hooks["response"] = [default_response_hook] + + @BaseConnection.locked + def _request(self, + method: str, + url: str, + content_type: Optional[str] = None, + message_body: Union[dict, str, None] = None, + headers: Optional[dict] = None, + timeout: Optional[int] = None, + expected_status_codes: Union[list, set, tuple] = None, + verbose: Optional[bool] = False) -> requests_Response: + """ + Handle sending the request to the device and validating the response. + If successful, return the response object. + + :param method: HTTP method (verb) for the request + :param url: Relative URL of the resource to be joined to the base URL + :param content_type: Optional - override the content type + :param message_body: RESTCONF message body to send + :param headers: Optional - override the default headers + :param timeout: Optional - override the default timeout + :param expected_status_codes: Optional - override the expected status codes + :param verbose: Enable additional information about the request + :return: Response object + :raises: None + """ + if not self.connected: + # TODO: Consider more specific or pyATS-specific exception + raise Exception(f"'{self.device.name}' is not connected for alias '{self.alias}'") + + # Set the content-type + header = 'application/yang-data+{fmt}' + content_type = (content_type or self.content_type).lower() + if content_type.lower() in ("json", "xml"): + accept_header = header.format(fmt=content_type) + else: + accept_header = content_type + + self.session.headers.update( + { + "Content-Type": accept_header, + "Accept": accept_header + } + ) + + # Encode the message body for json | xml. Otherwise, leave as-is + if message_body is not None: + if content_type.lower() == "json": + message_body = json_dumps(message_body) + elif content_type.lower() == "xml" and isinstance(message_body, dict): + message_body = dict2xml(message_body) + + # Add any additional headers, but don't update the session headers + if headers is not None: + request_headers = self.session.headers.copy() + request_headers.update(headers) + else: + request_headers = self.session.headers + + # Use session timeout if not provided per-request + if timeout is None: + timeout = self.session.timeout + + # Default expected status codes, if not provided for the request + if expected_status_codes is None: + expected_status_codes = DEFAULT_STATUS_CODES[method.upper()] + + # Complete the URL + resource_url = urljoin(self.base_url, url) + + log.debug("Sending %s command to '%s': %s", method, self.device.name, url) + log.debug("Request headers: %s", self.session.headers) + log.debug("Request timeout: %s", timeout) + log.debug("Valid status codes: %s", expected_status_codes) + if message_body is not None: + log.debug("Message body: %s", message_body) + + # All other request parameters are handled in the Session object + response = self.session.request(method, + resource_url, + data=message_body, + headers=request_headers, + timeout=timeout) + + output = response.text + log.debug("Response: %s %s, headers: %s", + response.status_code, + response.reason, + response.headers) + if verbose: + log.info("Output received:\n%s", output) + + # Make sure it returned requests.codes.ok + if response.status_code not in expected_status_codes: + # Something bad happened + # TODO - custom exception class? + raise RequestException(f"'{response.status_code}' result code has been returned " + f"instead of the expected status code(s) " + f"'{expected_status_codes}' for " + f"'{self.device.name}'\n{response.text}") + return response diff --git a/src/rest/connector/tests/test_iosxe.py b/src/rest/connector/tests/test_iosxe.py deleted file mode 100644 index 6762d0b..0000000 --- a/src/rest/connector/tests/test_iosxe.py +++ /dev/null @@ -1,606 +0,0 @@ -#!/bin/env python -""" Unit tests for the IOS-XE REST implementation """ - -__copyright__ = "# Copyright (c) 2019 by cisco Systems, Inc. All rights reserved." -__author__ = "Maaz Mashood Mohiuddin " - -import os -import unittest -import requests_mock - -from pyats.topology import loader - -from rest.connector import Rest - -HERE = os.path.dirname(__file__) - -@requests_mock.Mocker(kw='mock') -class test_iosxe_test_connector(unittest.TestCase): -# @requests_mock.Mocker(kw='mock') -# class test_iosxe_test_connector(): - - def setUp(self): - self.testbed = loader.load(os.path.join(HERE, 'testbed.yaml')) - self.device = self.testbed.devices['eWLC'] - - def test_connect(self, **kwargs): - connection = Rest(device=self.device, alias='rest', via='rest') - response_text = """{ - "Cisco-IOS-XE-native:version": "17.3" - } - """ - kwargs['mock'].get('https://198.51.100.3:443/restconf/data/Cisco-IOS-XE-native:native/version', text=response_text) - output = connection.connect(verbose=True).text - self.assertEqual(output, response_text) - return connection - - def test_get(self, **kwargs): - connection = self.test_connect() - - response_text = """{ - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": [ - { - "profile-name": "default-ap-profile", - "description": "default ap profile", - "hyperlocation": { - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } - ] -} -""" - - kwargs['mock'].get('https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile', text=response_text) - output = connection.get('/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile', verbose=True).text - self.assertEqual(output, response_text) - connection.disconnect() - - self.assertEqual(connection.connected, False) - - def test_post(self, **kwargs): - connection = self.test_connect() - - payload = """ - { - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { - "profile-name": "test-profile", - "description": "test-profile", - "hyperlocation": { - "hyperlocation-enable": true, - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } -} -""" - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles' - kwargs['mock'].post(url, status_code=204) - output = connection.post('/restconf/data/site-cfg-data/ap-cfg-profiles', payload, content_type='json', verbose=True).text - self.assertEqual(output, '') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - - def test_post_dict_payload_without_content_type(self, **kwargs): - connection = self.test_connect() - - payload = { - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { - "profile-name": "test-profile", - "description": "test-profile", - "hyperlocation": { - "hyperlocation-enable": True, - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } -} - response_text = "" - - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles' - kwargs['mock'].post(url, text=response_text) - try: - output = connection.post('/restconf/data/site-cfg-data/ap-cfg-profiles', payload, verbose=True).text - except AssertionError as e: - self.assertEqual(str(e), 'content_type parameter required when passing dict') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - def test_post_dict_payload_with_json_content_type(self, **kwargs): - connection = self.test_connect() - - payload = { - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { - "profile-name": "test-profile", - "description": "test-profile", - "hyperlocation": { - "hyperlocation-enable": True, - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } -} - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles' - kwargs['mock'].post(url, status_code=204) - try: - output = connection.post('/restconf/data/site-cfg-data/ap-cfg-profiles', payload, content_type='json', verbose=True).text - except AssertionError as e: - self.assertEqual(str(e), 'content_type parameter required when passing dict') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - def test_post_dict_payload_with_xml_content_type(self, **kwargs): - connection = self.test_connect() - - payload = { - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { - "profile-name": "test-profile", - "description": "test-profile", - "hyperlocation": { - "hyperlocation-enable": True, - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } -} - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles' - kwargs['mock'].post(url, status_code=204) - try: - output = connection.post('/restconf/data/site-cfg-data/ap-cfg-profiles', payload, content_type='xml', verbose=True).text - except AssertionError as e: - self.assertEqual(str(e), 'content_type parameter required when passing dict') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - - def test_patch(self, **kwargs): - connection = self.test_connect() - - payload = """{ - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { - "hyperlocation": { - "hyperlocation-enable": true, - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } -} -""" - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=default-ap-profile' - kwargs['mock'].patch(url, status_code=204) - output = connection.patch('/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=default-ap-profile', payload, content_type='json', verbose=True).text - self.assertEqual(output, '') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - - def test_patch_dict_payload_without_content_type(self, **kwargs): - connection = self.test_connect() - - payload = { - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { - "hyperlocation": { - "hyperlocation-enable": True, - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } -} - response_text = "" - - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=default-ap-profile' - kwargs['mock'].patch(url, text=response_text) - try: - output = connection.patch('/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=default-ap-profile', payload, verbose=True).text - except AssertionError as e: - self.assertEqual(str(e), 'content_type parameter required when passing dict') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - def test_patch_dict_payload_with_json_content_type(self, **kwargs): - connection = self.test_connect() - - payload = { - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { - "profile-name": "test-profile", - "description": "test-profile", - "hyperlocation": { - "hyperlocation-enable": True, - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } -} - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=default-ap-profile' - kwargs['mock'].patch(url, status_code=204) - try: - output = connection.patch('/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=default-ap-profile', payload, content_type='json', verbose=True).text - except AssertionError as e: - self.assertEqual(str(e), 'content_type parameter required when passing dict') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - def test_patch_dict_payload_with_xml_content_type(self, **kwargs): - connection = self.test_connect() - - payload = { - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { - "hyperlocation": { - "hyperlocation-enable": True, - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } -} - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=default-ap-profile' - kwargs['mock'].patch(url, status_code=204) - try: - output = connection.patch('/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=default-ap-profile', payload, content_type='xml', verbose=True).text - except AssertionError as e: - self.assertEqual(str(e), 'content_type parameter required when passing dict') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - - def test_put(self, **kwargs): - connection = self.test_connect() - - payload = """{ - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { - "profile-name": "test-profile", - "description": "test-profile", - "hyperlocation": { - "hyperlocation-enable": true, - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } -} -""" - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles' - kwargs['mock'].put(url, status_code=204) - output = connection.put('/restconf/data/site-cfg-data/ap-cfg-profiles', payload, content_type='json', verbose=True).text - self.assertEqual(output, '') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - - def test_put_dict_payload_without_content_type(self, **kwargs): - connection = self.test_connect() - - payload = { - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { - "profile-name": "test-profile", - "description": "test-profile", - "hyperlocation": { - "hyperlocation-enable": True, - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } -} - response_text = "" - - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles' - kwargs['mock'].put(url, text=response_text) - try: - output = connection.put('/restconf/data/site-cfg-data/ap-cfg-profiles', payload, verbose=True).text - except AssertionError as e: - self.assertEqual(str(e), 'content_type parameter required when passing dict') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - def test_put_dict_payload_with_json_content_type(self, **kwargs): - connection = self.test_connect() - - payload = { - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { - "profile-name": "test-profile", - "description": "test-profile", - "hyperlocation": { - "hyperlocation-enable": True, - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } -} - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles' - kwargs['mock'].put(url, status_code=204) - try: - output = connection.put('/restconf/data/site-cfg-data/ap-cfg-profiles', payload, content_type='json', verbose=True).text - except AssertionError as e: - self.assertEqual(str(e), 'content_type parameter required when passing dict') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - def test_put_dict_payload_with_xml_content_type(self, **kwargs): - connection = self.test_connect() - - payload = { - "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { - "profile-name": "test-profile", - "description": "test-profile", - "hyperlocation": { - "hyperlocation-enable": True, - "pak-rssi-threshold-detection": -50 - }, - "halo-ble-entries": { - "halo-ble-entry": [ - { - "beacon-id": 0 - }, - { - "beacon-id": 1 - }, - { - "beacon-id": 2 - }, - { - "beacon-id": 3 - }, - { - "beacon-id": 4 - } - ] - } - } -} - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles' - kwargs['mock'].put(url, status_code=204) - try: - output = connection.put('/restconf/data/site-cfg-data/ap-cfg-profiles', payload, content_type='xml', verbose=True).text - except AssertionError as e: - self.assertEqual(str(e), 'content_type parameter required when passing dict') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - - def test_delete(self, **kwargs): - connection = self.test_connect() - - url = 'https://198.51.100.3:443/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=test-profile' - kwargs['mock'].delete(url, status_code=204) - output = connection.delete('/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=test-profile', verbose=True).text - self.assertEqual(output, '') - connection.disconnect() - - self.assertEqual(connection.connected, False) - - -if __name__ == "__main__": - import sys - import logging - - logging.basicConfig(stream=sys.stderr, level=logging.WARNING, format="%(asctime)s [%(levelname)8s]: %(message)s") - logger = logging.getLogger('rest') - logger.setLevel(logging.DEBUG) - unittest.main() \ No newline at end of file diff --git a/src/rest/connector/tests/test_iosxe/__init__.py b/src/rest/connector/tests/test_iosxe/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/rest/connector/tests/test_iosxe/fixtures.py b/src/rest/connector/tests/test_iosxe/fixtures.py new file mode 100644 index 0000000..1bb1580 --- /dev/null +++ b/src/rest/connector/tests/test_iosxe/fixtures.py @@ -0,0 +1,59 @@ +import json + +TEST_PAYLOAD_DICT = { + "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": [ + { + "profile-name": "default-ap-profile", + "description": "default ap profile", + "hyperlocation": { + "pak-rssi-threshold-detection": -50 + }, + "halo-ble-entries": { + "halo-ble-entry": [ + { + "beacon-id": 0 + }, + { + "beacon-id": 1 + }, + { + "beacon-id": 2 + }, + { + "beacon-id": 3 + }, + { + "beacon-id": 4 + } + ] + } + } + ] +} + +TEST_PAYLOAD_JSON = json.dumps(TEST_PAYLOAD_DICT) + +TEST_PAYLOAD_XML = """ + default ap profile + + + 0 + + + 1 + + + 2 + + + 3 + + + 4 + + + + -50 + + default-ap-profile +""" \ No newline at end of file diff --git a/src/rest/connector/tests/test_iosxe/test_iosxe.py b/src/rest/connector/tests/test_iosxe/test_iosxe.py new file mode 100644 index 0000000..0ce31be --- /dev/null +++ b/src/rest/connector/tests/test_iosxe/test_iosxe.py @@ -0,0 +1,618 @@ +#!/bin/env python +""" Unit tests for the IOS-XE REST implementation """ + +__copyright__ = "# Copyright (c) 2019 by cisco Systems, Inc. All rights reserved." +__author__ = "Maaz Mashood Mohiuddin " + +import unittest +import json + +from urllib.parse import urljoin +import requests_mock + +from requests import RequestException + +from rest.connector import Rest +from rest.connector.tests.utils import (generate_mock_server_url, + get_testbed, + return_all_request_data_from_mocker) + +from .fixtures import (TEST_PAYLOAD_DICT, + TEST_PAYLOAD_JSON, + TEST_PAYLOAD_XML) + + +@requests_mock.Mocker(kw='mock') +class test_iosxe_test_connector(unittest.TestCase): + + def setUp(self): + self.testbed = get_testbed() + self.device = self.testbed.devices['eWLC'] + self.testbed_connection_names = ["rest", "rest-ipv6", "rest-fqdn"] + + def generate_connection(self, **kwargs): + try: + connection_name = kwargs["connection_name"] + except KeyError: + connection_name = "rest" + + connection = Rest(device=self.device, alias="rest", via=connection_name) + + response_text = """{ + "Cisco-IOS-XE-native:version": "17.3" + } + """ + + # Set the mocker URL + mocker_url = generate_mock_server_url(self.device, connection_name) + login_url = urljoin(mocker_url, "/restconf/data/Cisco-IOS-XE-native:native/version") + kwargs['mock'].get(login_url, text=response_text) + connection.connect(verbose=True) + return connection + + def test_connect(self, **kwargs): + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + self.assertEqual(connection.connected, True) + + + def test_get(self, **kwargs): + response_text = TEST_PAYLOAD_JSON + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile" + kwargs['mock'].get(urljoin(mocker_url, restconf_path), text=response_text) + + output = connection.get(restconf_path, verbose=True).text + self.assertEqual(output, response_text) + + connection.disconnect() + self.assertEqual(connection.connected, False) + + def test_post(self, **kwargs): + payload = TEST_PAYLOAD_DICT + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles" + kwargs['mock'].post(urljoin(mocker_url, restconf_path), status_code=204) + + output = connection.post(restconf_path, payload, content_type='json', verbose=True).text + self.assertEqual(output, '') + + connection.disconnect() + self.assertEqual(connection.connected, False) + + + def test_post_xml_payload_without_content_type(self, **kwargs): + """ + Send an XML payload with the implementation default content-type (JSON) + should result in a server returning a 400-series which should be + caught by the response hook. + """ + payload = TEST_PAYLOAD_DICT + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles" + kwargs['mock'].post(urljoin(mocker_url, restconf_path), status_code=400) + + with self.assertRaises(RequestException): + connection.post(restconf_path, payload, verbose=True, content_type=None).text + + connection.disconnect() + self.assertEqual(connection.connected, False) + + def test_post_dict_payload_with_json_content_type(self, **kwargs): + """ + Default content-type is JSON, so a successful response should be + returned + """ + payload = TEST_PAYLOAD_DICT + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + # Set the mocker URL + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles" + kwargs['mock'].post(urljoin(mocker_url, restconf_path), status_code=204) + + output = connection.post(restconf_path, payload, content_type='json', verbose=True).text + self.assertEqual(output, '') + + connection.disconnect() + self.assertEqual(connection.connected, False) + + def test_post_dict_payload_with_xml_content_type(self, **kwargs): + + payload = TEST_PAYLOAD_DICT + expected_response = TEST_PAYLOAD_XML + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + # Set the mocker URL + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles" + kwargs['mock'].post(urljoin(mocker_url, restconf_path), text=return_all_request_data_from_mocker) + + output = connection.post(restconf_path, payload, content_type='xml', verbose=True).json() + self.assertEqual(output.get("body"), expected_response) + + connection.disconnect() + self.assertEqual(connection.connected, False) + + + def test_patch(self, **kwargs): + + payload = """{ + "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { + "hyperlocation": { + "hyperlocation-enable": true, + "pak-rssi-threshold-detection": -50 + }, + "halo-ble-entries": { + "halo-ble-entry": [ + { + "beacon-id": 0 + }, + { + "beacon-id": 1 + }, + { + "beacon-id": 2 + }, + { + "beacon-id": 3 + }, + { + "beacon-id": 4 + } + ] + } + } +} +""" + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + # Set the mocker URL + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=default-ap-profile" + kwargs['mock'].patch(urljoin(mocker_url, restconf_path), status_code=204) + output = connection.patch(restconf_path, payload, content_type='json', verbose=True).text + self.assertEqual(output, '') + connection.disconnect() + + self.assertEqual(connection.connected, False) + + + def test_patch_dict_payload_without_content_type(self, **kwargs): + + payload = { + "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { + "hyperlocation": { + "hyperlocation-enable": True, + "pak-rssi-threshold-detection": -50 + }, + "halo-ble-entries": { + "halo-ble-entry": [ + { + "beacon-id": 0 + }, + { + "beacon-id": 1 + }, + { + "beacon-id": 2 + }, + { + "beacon-id": 3 + }, + { + "beacon-id": 4 + } + ] + } + } +} + response_text = "" + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + # Set the mocker URL + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=default-ap-profile" + kwargs['mock'].patch(urljoin(mocker_url, restconf_path), text=response_text) + try: + output = connection.patch(restconf_path, payload, verbose=True).text + except AssertionError as e: + self.assertEqual(str(e), 'content_type parameter required when passing dict') + connection.disconnect() + + self.assertEqual(connection.connected, False) + + def test_patch_dict_payload_with_json_content_type(self, **kwargs): + + payload = { + "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { + "profile-name": "test-profile", + "description": "test-profile", + "hyperlocation": { + "hyperlocation-enable": True, + "pak-rssi-threshold-detection": -50 + }, + "halo-ble-entries": { + "halo-ble-entry": [ + { + "beacon-id": 0 + }, + { + "beacon-id": 1 + }, + { + "beacon-id": 2 + }, + { + "beacon-id": 3 + }, + { + "beacon-id": 4 + } + ] + } + } +} + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + # Set the mocker URL + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=default-ap-profile" + + kwargs['mock'].patch(urljoin(mocker_url, restconf_path), status_code=204) + try: + output = connection.patch(restconf_path, payload, content_type='json', verbose=True).text + except AssertionError as e: + self.assertEqual(str(e), 'content_type parameter required when passing dict') + connection.disconnect() + + self.assertEqual(connection.connected, False) + + def test_patch_dict_payload_with_xml_content_type(self, **kwargs): + + payload = { + "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { + "hyperlocation": { + "hyperlocation-enable": True, + "pak-rssi-threshold-detection": -50 + }, + "halo-ble-entries": { + "halo-ble-entry": [ + { + "beacon-id": 0 + }, + { + "beacon-id": 1 + }, + { + "beacon-id": 2 + }, + { + "beacon-id": 3 + }, + { + "beacon-id": 4 + } + ] + } + } +} + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + # Set the mocker URL + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=default-ap-profile" + + kwargs['mock'].patch(urljoin(mocker_url, restconf_path), status_code=204) + try: + output = connection.patch(restconf_path, payload, content_type='xml', verbose=True).text + except AssertionError as e: + self.assertEqual(str(e), 'content_type parameter required when passing dict') + connection.disconnect() + + self.assertEqual(connection.connected, False) + + + def test_put(self, **kwargs): + + payload = """{ + "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { + "profile-name": "test-profile", + "description": "test-profile", + "hyperlocation": { + "hyperlocation-enable": true, + "pak-rssi-threshold-detection": -50 + }, + "halo-ble-entries": { + "halo-ble-entry": [ + { + "beacon-id": 0 + }, + { + "beacon-id": 1 + }, + { + "beacon-id": 2 + }, + { + "beacon-id": 3 + }, + { + "beacon-id": 4 + } + ] + } + } +} +""" + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + # Set the mocker URL + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles" + + kwargs['mock'].put(urljoin(mocker_url, restconf_path), status_code=204) + output = connection.put(restconf_path, payload, content_type='json', verbose=True).text + self.assertEqual(output, '') + connection.disconnect() + + self.assertEqual(connection.connected, False) + + + def test_put_dict_payload_without_content_type(self, **kwargs): + + payload = { + "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { + "profile-name": "test-profile", + "description": "test-profile", + "hyperlocation": { + "hyperlocation-enable": True, + "pak-rssi-threshold-detection": -50 + }, + "halo-ble-entries": { + "halo-ble-entry": [ + { + "beacon-id": 0 + }, + { + "beacon-id": 1 + }, + { + "beacon-id": 2 + }, + { + "beacon-id": 3 + }, + { + "beacon-id": 4 + } + ] + } + } +} + response_text = "" + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + # Set the mocker URL + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles" + + kwargs['mock'].put(urljoin(mocker_url, restconf_path), text=response_text, status_code=204) + try: + output = connection.put(restconf_path, payload, verbose=True).text + except AssertionError as e: + self.assertEqual(str(e), 'content_type parameter required when passing dict') + connection.disconnect() + + self.assertEqual(connection.connected, False) + + def test_put_dict_payload_with_json_content_type(self, **kwargs): + + payload = { + "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { + "profile-name": "test-profile", + "description": "test-profile", + "hyperlocation": { + "hyperlocation-enable": True, + "pak-rssi-threshold-detection": -50 + }, + "halo-ble-entries": { + "halo-ble-entry": [ + { + "beacon-id": 0 + }, + { + "beacon-id": 1 + }, + { + "beacon-id": 2 + }, + { + "beacon-id": 3 + }, + { + "beacon-id": 4 + } + ] + } + } +} + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + # Set the mocker URL + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles" + + kwargs['mock'].put(urljoin(mocker_url, restconf_path), status_code=204) + try: + output = connection.put(restconf_path, payload, content_type='json', verbose=True).text + except AssertionError as e: + self.assertEqual(str(e), 'content_type parameter required when passing dict') + connection.disconnect() + + self.assertEqual(connection.connected, False) + + def test_put_dict_payload_with_xml_content_type(self, **kwargs): + + payload = { + "Cisco-IOS-XE-wireless-site-cfg:ap-cfg-profile": { + "profile-name": "test-profile", + "description": "test-profile", + "hyperlocation": { + "hyperlocation-enable": True, + "pak-rssi-threshold-detection": -50 + }, + "halo-ble-entries": { + "halo-ble-entry": [ + { + "beacon-id": 0 + }, + { + "beacon-id": 1 + }, + { + "beacon-id": 2 + }, + { + "beacon-id": 3 + }, + { + "beacon-id": 4 + } + ] + } + } +} + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + # Set the mocker URL + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles" + + kwargs['mock'].put(urljoin(mocker_url, restconf_path), status_code=204) + try: + output = connection.put(restconf_path, payload, content_type='xml', verbose=True).text + except AssertionError as e: + self.assertEqual(str(e), 'content_type parameter required when passing dict') + connection.disconnect() + + self.assertEqual(connection.connected, False) + + + def test_delete(self, **kwargs): + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + + # Set the mocker URL + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=test-profile" + + kwargs['mock'].delete(urljoin(mocker_url, restconf_path), status_code=204) + output = connection.delete(restconf_path, verbose=True).text + self.assertEqual(output, '') + connection.disconnect() + + self.assertEqual(connection.connected, False) + + def test_response_hook_raises_exception(self, **kwargs): + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=test-profile" + + kwargs['mock'].get(urljoin(mocker_url, restconf_path), status_code=401) + with self.assertRaises(RequestException): + connection.get(restconf_path, verbose=True, timeout=1) + + def test_additional_headers(self, **kwargs): + def return_client_headers(request, context): + """ + Return all received headers to the client in the response body. + """ + context.status_code = 200 + return json.dumps(dict(request.headers)) + + for connection_name in self.testbed_connection_names: + with self.subTest(connection_name=connection_name): + connection = self.generate_connection(connection_name=connection_name, **kwargs) + mocker_url = generate_mock_server_url(self.device, connection_name) + restconf_path = "/restconf/data/site-cfg-data/ap-cfg-profiles/ap-cfg-profile=test-profile" + extra_headers = {"X-NEW-HEADER": "rest-testing"} + + kwargs['mock'].get(urljoin(mocker_url, restconf_path), text=return_client_headers) + + response = connection.get(restconf_path, verbose=True, timeout=1, headers=extra_headers) + + # Is the authorization header present in the response? + self.assertEqual(response.request.headers["Authorization"], + response.json().get("Authorization"), + "Authorization header missing from response") + + self.assertEqual(response.request.headers["X-NEW-HEADER"], + response.json().get("X-NEW-HEADER"), + "Extra header missing from response") + + + self.assertEqual(response.request.headers, + response.json(), + "Returned headers do not match sent.\n" + "Expected: {response.request.headers}\n" + "Received: {response.json()}") + + +if __name__ == "__main__": + import sys + import logging + + logging.basicConfig(stream=sys.stderr, level=logging.WARNING, format="%(asctime)s [%(levelname)8s]: %(message)s") + logger = logging.getLogger('rest') + logger.setLevel(logging.DEBUG) + unittest.main() diff --git a/src/rest/connector/tests/testbed.yaml b/src/rest/connector/tests/testbed.yaml index b7d54a6..b261a73 100644 --- a/src/rest/connector/tests/testbed.yaml +++ b/src/rest/connector/tests/testbed.yaml @@ -38,6 +38,20 @@ devices: port: 443 username: cisco password: cisco + rest-ipv6: + class: rest.connector.Rest + protocol: https + ip: 2001:db8:c15:c0::64:3 + port: 443 + username: cisco + password: cisco + rest-fqdn: + class: rest.connector.Rest + protocol: https + host: test-ewlc.example.com + port: 443 + username: cisco + password: cisco apic: os: apic type: apic diff --git a/src/rest/connector/tests/utils.py b/src/rest/connector/tests/utils.py new file mode 100644 index 0000000..aa26f0b --- /dev/null +++ b/src/rest/connector/tests/utils.py @@ -0,0 +1,64 @@ +""" +Utility functions available to all tests +""" +import os +from logging import getLogger +import json +from ipaddress import IPv6Address + +from pyats.topology import loader + +log = getLogger(__name__) + +HERE = os.path.dirname(__file__) + +def generate_mock_server_url(testbed_device: object, + connection_name: str) -> str: + """ + Common function to generate a mock server base URL based on testbed + attributes (protocol, host, post) + + :param testbed_device: Testbed device object + :param connection_name: Test connection being tested + """ + # Test for IPv6, set the mocker URL accordingly + protocol = testbed_device.connections[connection_name].protocol + port = testbed_device.connections[connection_name].port + + try: + destination_host = testbed_device.connections[connection_name].host + except AttributeError: + destination_host = testbed_device.connections[connection_name].ip + if isinstance(destination_host, IPv6Address): + destination_host = f"[{IPv6Address(destination_host).exploded}]" + + mock_url = f"{protocol}://{destination_host}:{port}/" + return mock_url + +def get_testbed(testbed_file="testbed.yaml"): + """ + Load and return the testbed for unittests + + :param testbed_file: Filename of the testbed to return + """ + return loader.load(os.path.join(HERE, testbed_file)) + +def return_all_request_data_from_mocker(request, context): + """ + Use this function as the "text" parameter in requests_mock to return + the headers and message-body received from a request back to the client. + + Useful to verify that an encoded message-body matches the expected + (e.g. that a dict was actually encoded in XML or that an Authorization + header matches the expected value) + + Example usage: + + kwargs['mock'].get(url, text=return_all_request_data_from_mocker) + """ + response_body = { + "headers": dict(request.headers), + "body": request.text + } + context.status_code = 200 + return json.dumps(response_body) diff --git a/src/rest/connector/utils/__init__.py b/src/rest/connector/utils/__init__.py new file mode 100644 index 0000000..9e6161d --- /dev/null +++ b/src/rest/connector/utils/__init__.py @@ -0,0 +1,12 @@ +"""Supporting functions and classes available to all implementations""" + +# metadata +__version__ = '25.2' +__author__ = ['Jean-Benoit Aubin ', + 'Takashi Higashimura (tahigash) '] + +__contact__ = 'pyats-support@cisco.com' +__copyright__ = 'Cisco Systems, Inc. Cisco Confidential' + +from .utils import * +from .request_hooks import * diff --git a/src/rest/connector/utils/request_hooks.py b/src/rest/connector/utils/request_hooks.py new file mode 100644 index 0000000..eb1bc15 --- /dev/null +++ b/src/rest/connector/utils/request_hooks.py @@ -0,0 +1,29 @@ +""" +Collection of response hooks available for requests Session instances +across all implementations. +""" +from logging import getLogger + +from requests import Response +from requests.exceptions import RequestException + +log = getLogger(__name__) + +def default_response_hook(response: Response, *args, **kwargs) -> Response: + """ + Default response hook. Raise for status, catch any desired exceptions, + then raise a meaningful exception for the caller. + + :param response: Requests response object for the last call + :param args: Additional arguments to pass to the hook + :param kwargs: Additional keyword arguments passed to the hook + :return: Response object, assuming no exceptions re-raised + """ + try: + response.raise_for_status() + except RequestException as e: + log.critical("An error occurred while making request:\n%s\n", + e, + exc_info=True) + raise RequestException(e) from e + return response diff --git a/src/rest/connector/utils.py b/src/rest/connector/utils/utils.py similarity index 100% rename from src/rest/connector/utils.py rename to src/rest/connector/utils/utils.py