|
| 1 | +import urllib.request |
| 2 | + |
| 3 | +from urllib.parse import urlencode, urlsplit |
| 4 | +from json import loads |
| 5 | +from socket import timeout |
| 6 | +from ssl import _create_unverified_context |
| 7 | +from base64 import b64encode |
| 8 | + |
| 9 | + |
| 10 | +TIMEOUT = 20 |
| 11 | + |
| 12 | + |
| 13 | +def gen_auth(username, password): |
| 14 | + 'Generate basic authorization using username and password' |
| 15 | + return b64encode(f'{username}:{password}'.encode('utf-8')).decode() |
| 16 | + |
| 17 | +def make_url(base_url, endpoint, resource): |
| 18 | + 'Corsair creates URLs using this method' |
| 19 | + base_url = urlsplit(base_url) |
| 20 | + path = base_url.path + f'/{endpoint}/{resource}' |
| 21 | + path = path.replace('//', '/') |
| 22 | + path = path[:-1] if path.endswith('/') else path |
| 23 | + return base_url._replace(path=path).geturl() |
| 24 | + |
| 25 | + |
| 26 | +class Api(object): |
| 27 | + def __init__(self, base_url, username, password, tls_verify=True): |
| 28 | + self.base_url = base_url if base_url[-1] != '/' else base_url[:-1] |
| 29 | + self.auth = gen_auth(username, password) |
| 30 | + self.tls_verify = tls_verify |
| 31 | + self.credentials = (self.base_url, self.auth, self.tls_verify) |
| 32 | + |
| 33 | + self.data = Endpoint(self.credentials, 'data') |
| 34 | + self.op = Endpoint(self.credentials, 'op') |
| 35 | + |
| 36 | + |
| 37 | +class Endpoint(object): |
| 38 | + def __init__(self, credentials, endpoint): |
| 39 | + self.base_url = credentials[0] |
| 40 | + self.endpoint = endpoint |
| 41 | + self.resource = '' |
| 42 | + self.auth = credentials[1] |
| 43 | + self.tls_verify = credentials[2] |
| 44 | + |
| 45 | + def read(self, _resource, **filters): |
| 46 | + self.resource = f'{_resource}.json' # will only deal with JSON outputs |
| 47 | + first_result = 0 if 'firstResult' not in filters else filters['firstResult'] |
| 48 | + max_results = 1000 if 'maxResults' not in filters else filters['maxResults'] |
| 49 | + filters.update({'firstResult':first_result, 'maxResults':max_results}) |
| 50 | + req = Request(make_url(self.base_url, self.endpoint, self.resource), |
| 51 | + self.auth, self.tls_verify) |
| 52 | + try: |
| 53 | + res = req.get(**filters) |
| 54 | + except timeout: |
| 55 | + raise Exception('Operation timedout') |
| 56 | + return loads(res.read()) # test for possible Prime errors |
| 57 | + |
| 58 | + |
| 59 | +class Request(object): |
| 60 | + def __init__(self, url, auth, tls_verify): |
| 61 | + self.url = url |
| 62 | + self.auth = auth |
| 63 | + self.timeout = TIMEOUT |
| 64 | + self.context = None if tls_verify else _create_unverified_context() |
| 65 | + self.headers = { |
| 66 | + 'Content-Type': 'application/json', |
| 67 | + 'Authorization': f'Basic {self.auth}' |
| 68 | + } |
| 69 | + |
| 70 | + def get(self, **filters): |
| 71 | + url = f'{self.url}?{self.dotted_filters(**filters)}' if filters else self.url |
| 72 | + req = urllib.request.Request(url, headers=self.headers, method='GET') |
| 73 | + return urllib.request.urlopen(req, timeout=self.timeout, context=self.context) |
| 74 | + |
| 75 | + def dotted_filters(self, **filters): |
| 76 | + 'Prime filters start with a dot' |
| 77 | + if not filters: |
| 78 | + return '' |
| 79 | + else: |
| 80 | + return f'.{urlencode(filters).replace("&", "&.")}' |
| 81 | + |
| 82 | + |
| 83 | +class Prime(object): |
| 84 | + |
| 85 | + def __init__(self, address, username, password, tls_verify, unknown): |
| 86 | + self.prime = Api(address, username, password, tls_verify) |
| 87 | + self.unknown = unknown |
| 88 | + self.hosts = list() |
| 89 | + |
| 90 | + def run(self, access_points=False): |
| 91 | + '''Extracts devices from Cisco Prime |
| 92 | + access_points: if set to True, will try to get APs data |
| 93 | + Returns False for no errors or True if errors occurred |
| 94 | + ''' |
| 95 | + errors = False |
| 96 | + devices = self.get_devices('Devices') |
| 97 | + for device in devices: |
| 98 | + try: |
| 99 | + self.hosts.append(( |
| 100 | + device['devicesDTO']['ipAddress'], |
| 101 | + device['devicesDTO']['deviceName'] |
| 102 | + )) |
| 103 | + except KeyError: |
| 104 | + errors = True |
| 105 | + |
| 106 | + if access_points: |
| 107 | + aps = self.get_devices('AccessPoints') |
| 108 | + for ap in aps: |
| 109 | + try: |
| 110 | + self.hosts.append(( |
| 111 | + ap['accessPointsDTO']['ipAddress']['address'], |
| 112 | + ap['accessPointsDTO']['model'] |
| 113 | + )) |
| 114 | + except KeyError: |
| 115 | + errors = True |
| 116 | + return errors |
| 117 | + |
| 118 | + def get_devices(self, resource): |
| 119 | + 'This function is used to support run()' |
| 120 | + raw = list() |
| 121 | + res = self.prime.data.read(resource, full='true') |
| 122 | + count = res['queryResponse']['@count'] |
| 123 | + last = res['queryResponse']['@last'] |
| 124 | + raw.extend(res['queryResponse']['entity']) |
| 125 | + while last < count - 1: |
| 126 | + first_result = last + 1 |
| 127 | + last += 1000 |
| 128 | + res = self.prime.data.read( |
| 129 | + resource, |
| 130 | + full='true', |
| 131 | + firstResult=first_result |
| 132 | + ) |
| 133 | + raw.extend(res['queryResponse']['entity']) |
| 134 | + return raw |
| 135 | + |
0 commit comments