import pandas as pd
import re
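class LogParse:
    """Parser for Cisco ASA firewall syslog text.

    Two input shapes are handled:

    * ``parse_asa_logfile`` reads full syslog lines
      (``<timestamp> <host> : %<TYPE>-<sev>-<id>: <message>``) and returns
      one DataFrame row per matching line.
    * ``parse_syslog_file`` reads bare ``%<TYPE>-<sev>-<id>: <text>`` lines,
      indexes the DataFrame by message ID and post-processes known IDs via
      ``handle_message``.

    Log content is treated as untrusted input: lines that do not match the
    expected shape are skipped rather than raising, and per-message field
    extraction only writes a column when the field is actually present.
    """

    # Message IDs whose text ends in "... 4GE SSM I/O card (error <string>)".
    # They all share one extraction rule, so they are handled as a set rather
    # than one ``elif`` branch per ID (the original chain listed 114009 twice,
    # which left the second branch unreachable).
    _CARD_ERROR_IDS = frozenset({
        114001, 114006, 114009, 114010, 114011,
        114012, 114013, 114014, 114017,
    })
    _CARD_ERROR_RE = re.compile(r'card \(error (\w+)\)')
    # %ASA-1-103004: (Primary) Other firewall reports this firewall failed. Reason: reason-string.
    _REASON_PREFIX = 'Reason: '

    # Full ASA syslog line; groups: timestamp, host, type, severity, message ID, message.
    _ASA_LINE_RE = re.compile(
        r'^(\w+ \d+ \d+ \d+:\d+:\d+) (\w+) : %(\w+)-(\d)-(\d+): (.+)')
    # Dotted-quad inside "(error ...)". The dots are escaped so the pattern
    # cannot match arbitrary characters between the octets, which the
    # unescaped form did.
    _ERROR_IP_RE = re.compile(r'\(error (\d+\.\d+\.\d+\.\d+)\)')
    # Bare syslog message: %(Type)-(Severity)-(id): (Text)
    _SYSLOG_LINE_RE = re.compile(r'^%(\w+)-(\d)-(\d+): (.+)')

    def log_parse_id(self):
        """Return a fixed identifying string (used by tests)."""
        return 'LogParse'

    def handle_message(self, df, id):
        """Extract message-specific fields for the row indexed by ``id``.

        Args:
            df: DataFrame indexed by message ID with a ``Text`` column, as
                built by ``parse_syslog_file``. The index is assumed to be
                unique so that ``df.loc[id, 'Text']`` is a single string.
            id: Integer ASA message ID of the row to process.

        Returns:
            The same DataFrame. For 103004 the ``Reason`` column is set from
            the text after ``"Reason: "``; for the 4GE SSM card messages the
            ``Error`` column is set from ``"(error <string>)"``. Unknown IDs
            and text without the expected fragment leave ``df`` untouched.
        """
        if id == 103004:
            # ``partition`` never raises, unlike the two-way unpacking
            # ``split`` it replaces, so a message lacking the prefix is
            # simply left without a Reason instead of aborting the parse.
            _, sep, reason = df.loc[id, 'Text'].partition(self._REASON_PREFIX)
            if sep:
                df.loc[id, 'Reason'] = reason.rstrip()
        elif id in self._CARD_ERROR_IDS:
            m = self._CARD_ERROR_RE.search(df.loc[id, 'Text'])
            if m:
                df.loc[id, 'Error'] = m.group(1)
        return df

    def parse_asa_logfile(self, asa_lotfile):
        """Parse a full ASA syslog file into a DataFrame.

        Args:
            asa_lotfile: Path of the log file (UTF-8). The parameter name is
                kept as-is so keyword callers keep working.

        Returns:
            DataFrame with columns ``Date`` (converted with
            ``pd.to_datetime``), ``Host``, ``Type``, ``Severity``, ``ID``,
            ``Message`` and ``IP Address``: one row per line matching the
            ASA format, in file order. Non-matching lines are skipped.
            ``IP Address`` holds the dotted-quad found inside ``(error ...)``
            in the message, or ``None`` when there is none.

        Raises:
            OSError: if the file cannot be opened.
            ValueError: from ``pd.to_datetime`` if a matched timestamp
                cannot be converted.
        """
        columns = ('Date', 'Host', 'Type', 'Severity', 'ID', 'Message')
        data = {name: [] for name in (*columns, 'IP Address')}
        with open(asa_lotfile, encoding='utf-8') as f:
            for line in f:
                m = self._ASA_LINE_RE.search(line)
                if not m:
                    continue
                for name, value in zip(columns, m.groups()):
                    data[name].append(value)
                # Every column list must have the same length or the
                # DataFrame constructor raises. Previously the address was
                # appended only when found, so the first message without one
                # broke the whole parse; ``None`` keeps the lists aligned.
                m2 = self._ERROR_IP_RE.search(m.group(6))
                data['IP Address'].append(m2.group(1) if m2 else None)
        df = pd.DataFrame(data)
        df['Date'] = pd.to_datetime(df['Date'])
        return df

    def parse_syslog_file(self, syslog_file):
        """Parse a file of bare ``%TYPE-SEV-ID: text`` lines.

        Args:
            syslog_file: Path of the file (UTF-8).

        Returns:
            DataFrame indexed by integer message ID with columns ``Type``,
            ``Severity`` (int) and ``Text`` (right-stripped), plus whatever
            ``handle_message`` adds (``Reason``, ``Error``). Lines that do
            not match are skipped. Because the ID is the index, a repeated
            ID overwrites the earlier row: this is meant for a catalogue of
            example messages, not a production log with many events per ID.

        Raises:
            OSError: if the file cannot be opened.
        """
        df = pd.DataFrame()
        with open(syslog_file, encoding='utf-8') as f:
            for line in f:
                m = self._SYSLOG_LINE_RE.search(line)
                if not m:
                    continue
                # ``msg_id`` rather than ``id`` so the builtin is not shadowed.
                msg_id = int(m.group(3))
                df.loc[msg_id, 'Type'] = m.group(1)
                df.loc[msg_id, 'Severity'] = int(m.group(2))
                df.loc[msg_id, 'Text'] = m.group(4).rstrip()
                df = self.handle_message(df, msg_id)
        return df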