-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patharris_scraper.py
138 lines (120 loc) · 5.18 KB
/
arris_scraper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python
# A library to scrape statistics from Arris CM820 and similar cable modems
# Inspired by https://gist.github.com/berg/2651577
import BeautifulSoup
import requests
import time
# strptime() format used to parse the time string read from the modem's
# status page, e.g. 'Tue 2013-01-15 20:14:05'.
cm_time_format = '%a %Y-%m-%d %H:%M:%S'
def _cell_text(cell):
    """Return the whitespace-stripped string content of a table cell."""
    return cell.string.strip()


def _leading_token(cell, convert):
    """Convert the first whitespace-separated token of a cell's text.

    Anything after the first token (e.g. a unit suffix) is ignored.
    """
    return convert(_cell_text(cell).split()[0])


def _trailing_int(text):
    """Return the integer formed by the run of digits that ends ``text``.

    The whole digit run is used rather than only the last character, so a
    label ending in '12' yields 12 instead of 2.  Raises ValueError when
    the text does not end in a digit.
    """
    stripped = text.strip()
    digits = stripped[len(stripped.rstrip('0123456789')):]
    return int(digits)


def _parse_downstream(rows):
    """Build one stats dict per downstream channel row."""
    stats = []
    for row in rows:
        cols = row.findAll('td')
        channel = {'modem_channel': _trailing_int(cols[0].string),
                   'dcid': int(_cell_text(cols[1])),
                   'frequency': _leading_token(cols[2], float),
                   'power': None,
                   'snr': None,
                   'modulation': None,
                   'octets': None,
                   'corrected_errors': None,
                   'uncorrectable_errors': None}
        # A power cell of '----' marks a channel without measurements; the
        # remaining columns are then left as None.
        if _cell_text(cols[3]) != '----':
            channel['power'] = _leading_token(cols[3], float)
            channel['snr'] = _leading_token(cols[4], float)
            channel['modulation'] = _cell_text(cols[5])
            channel['octets'] = int(_cell_text(cols[6]))
            channel['corrected_errors'] = int(_cell_text(cols[7]))
            channel['uncorrectable_errors'] = int(_cell_text(cols[8]))
        stats.append(channel)
    return stats


def _parse_upstream(rows):
    """Build one stats dict per upstream channel row."""
    stats = []
    for row in rows:
        cols = row.findAll('td')
        stats.append({
            'modem_channel': _trailing_int(cols[0].string),
            'ucid': int(_cell_text(cols[1])),
            'frequency': _leading_token(cols[2], float),
            'power': _leading_token(cols[3], float),
            'channel_type': _cell_text(cols[4]),
            # The page value is scaled by 1000 (presumably kSym/s ->
            # Sym/s; confirm against a real status page).
            'symbol_rate': _leading_token(cols[5], int) * 1000,
            'modulation': _cell_text(cols[6]),
        })
    return stats


def _parse_modem_status(rows):
    """Parse uptime, CPE counts, CM status and CM time from the status rows."""
    # Uptime is three ':'-separated fields, each starting with a number.
    uptime_fields = _cell_text(rows[0].findAll('td')[1]).split(':')
    days = int(uptime_fields[0].strip().split()[0])
    hours = int(uptime_fields[1].strip().split()[0])
    minutes = int(uptime_fields[2].strip().split()[0])
    uptime = (((days * 24) + hours) * 60 + minutes) * 60

    # Each comma-separated entry looks like '<type>CPE(<count>)'.
    cpelist = {}
    for entry in _cell_text(rows[1].findAll('td')[1]).split(','):
        pieces = entry.strip().split('CPE')
        cpelist[pieces[0]] = int(pieces[1].strip('()'))

    cm_status = _cell_text(rows[2].findAll('td')[1])
    cm_time_string = _cell_text(rows[3].findAll('td')[1])
    # mktime() interprets the parsed struct_time as local time.
    cm_time = time.mktime(time.strptime(cm_time_string, cm_time_format))

    return {'uptime': uptime,
            'cpe': cpelist,
            'cm_status': cm_status,
            'cm_time': cm_time}


def _parse_interfaces(rows):
    """Build one dict per row of the interface table."""
    interfaces = []
    for row in rows:
        cols = row.findAll('td')
        interfaces.append({'name': _cell_text(cols[0]),
                           'provisioned': _cell_text(cols[1]),
                           'state': _cell_text(cols[2]),
                           'speed': _cell_text(cols[3]),
                           'mac': _cell_text(cols[4])})
    return interfaces


def get_status(baseurl, timeout=None):
    """Fetch and parse the modem's ``status_cgi`` page.

    Arguments:
        baseurl: URL prefix of the modem's pages.  'status_cgi' is appended
            to it verbatim, so include the trailing slash.
        timeout: optional timeout passed straight to ``requests.get``.  The
            default of None keeps the previous behaviour (wait
            indefinitely).

    Returns a dict with the keys:
        'timestamp':  local ``time.time()`` taken right after the download.
        'status':     dict with 'uptime' (seconds), 'cpe' (type -> count),
                      'cm_status' and 'cm_time' (seconds since the epoch).
        'downstream': list of per-channel dicts ('modem_channel', 'dcid',
                      'frequency', 'power', 'snr', 'modulation', 'octets',
                      'corrected_errors', 'uncorrectable_errors').
        'upstream':   list of per-channel dicts ('modem_channel', 'ucid',
                      'frequency', 'power', 'channel_type', 'symbol_rate',
                      'modulation').
        'interfaces': list of dicts ('name', 'provisioned', 'state',
                      'speed', 'mac').

    No error handling is done here: network errors from ``requests`` and
    parsing errors caused by an unexpected page layout (e.g. IndexError,
    ValueError) propagate to the caller.
    """
    # Retrieve and process the page from the modem
    url = baseurl + 'status_cgi'
    pagedata = requests.get(url, timeout=timeout).content
    timestamp = time.time()  # Get the time immediately after retrieval
    bs = BeautifulSoup.BeautifulSoup(pagedata)

    # Look the tables up once; the data tables sit at the odd indices.
    tables = bs.findAll('table')
    downstream_rows = tables[1].findAll('tr')[1:]  # skip one header row
    upstream_rows = tables[3].findAll('tr')[2:]  # skip two leading rows
    status_rows = tables[5].findAll('tr')
    interface_rows = tables[7].findAll('tr')[1:]  # skip one header row

    downstream_stats = _parse_downstream(downstream_rows)
    upstream_stats = _parse_upstream(upstream_rows)
    modem_status = _parse_modem_status(status_rows)
    interfaces = _parse_interfaces(interface_rows)

    return {'timestamp': timestamp,
            'status': modem_status,
            'downstream': downstream_stats,
            'upstream': upstream_stats,
            'interfaces': interfaces}
def get_versions(baseurl):
    """Placeholder; not implemented yet.

    ``baseurl`` is accepted but unused.  Always raises NotImplementedError,
    with a message naming this function so the failure is identifiable in
    logs and tracebacks.
    """
    raise NotImplementedError('get_versions() is not implemented yet')
def get_eventlog(baseurl):
    """Placeholder; not implemented yet.

    ``baseurl`` is accepted but unused.  Always raises NotImplementedError,
    with a message naming this function so the failure is identifiable in
    logs and tracebacks.
    """
    raise NotImplementedError('get_eventlog() is not implemented yet')
def get_cmstate(baseurl):
    """Placeholder; not implemented yet.

    ``baseurl`` is accepted but unused.  Always raises NotImplementedError,
    with a message naming this function so the failure is identifiable in
    logs and tracebacks.
    """
    raise NotImplementedError('get_cmstate() is not implemented yet')
def get_productdetails(baseurl):
    """Placeholder; not implemented yet.

    ``baseurl`` is accepted but unused.  Always raises NotImplementedError,
    with a message naming this function so the failure is identifiable in
    logs and tracebacks.
    """
    raise NotImplementedError('get_productdetails() is not implemented yet')
def get_dhcpparams(baseurl):
    """Placeholder; not implemented yet.

    ``baseurl`` is accepted but unused.  Always raises NotImplementedError,
    with a message naming this function so the failure is identifiable in
    logs and tracebacks.
    """
    raise NotImplementedError('get_dhcpparams() is not implemented yet')
def get_qos(url):
    """Placeholder; not implemented yet.

    ``url`` is accepted but unused.  Always raises NotImplementedError,
    with a message naming this function so the failure is identifiable in
    logs and tracebacks.
    """
    raise NotImplementedError('get_qos() is not implemented yet')
def get_config(url):
    """Placeholder; not implemented yet.

    ``url`` is accepted but unused.  Always raises NotImplementedError,
    with a message naming this function so the failure is identifiable in
    logs and tracebacks.
    """
    raise NotImplementedError('get_config() is not implemented yet')