-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrule_engine.py
168 lines (139 loc) · 5.25 KB
/
rule_engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from ast_structure import Node
def tokenize(rule_string):
    """Split a rule string into a flat list of token strings.

    Outside single quotes, whitespace separates tokens, each parenthesis is
    its own token, and ``>``, ``<``, ``=``, ``!`` are operator tokens (with
    ``>=``, ``<=`` and ``!=`` recognised as two-character operators).
    Text inside single quotes is kept as part of one token, quotes included;
    a doubled quote (``''``) inside a quoted run contributes one literal
    quote character.  Empty tokens are discarded.
    """
    two_char_ops = ('>=', '<=', '!=')
    result = []
    buffer = []
    quoted = False
    length = len(rule_string)

    def flush():
        # Emit the pending word (if any) and start collecting a new one.
        if buffer:
            result.append(''.join(buffer).strip())
            buffer.clear()

    idx = 0
    while idx < length:
        ch = rule_string[idx]
        if ch == "'":
            if quoted and idx + 1 < length and rule_string[idx + 1] == "'":
                # Doubled quote inside a literal: keep one, skip the other.
                buffer.append("'")
                idx += 2
                continue
            quoted = not quoted
            buffer.append(ch)
        elif quoted:
            # Everything inside quotes is taken verbatim.
            buffer.append(ch)
        elif ch in '()':
            flush()
            result.append(ch)
        elif ch.isspace():
            flush()
        elif ch in '><=!':
            flush()
            pair = rule_string[idx:idx + 2]
            if pair in two_char_ops:
                result.append(pair)
                idx += 1  # the second operator character is consumed too
            else:
                result.append(ch)
        else:
            buffer.append(ch)
        idx += 1

    flush()
    return [tok for tok in result if tok]
def parse_expression(tokens, pos=0):
    """Parse ``tokens`` (starting at ``pos``) into a tree of ``Node`` objects.

    The parser keeps an operand stack and an operator stack.  Three
    consecutive tokens ``field op value`` whose middle token is a comparison
    operator become one operand node holding the text ``"field op value"``;
    any other token becomes an operand node holding just that token.
    ``AND`` / ``OR`` (matched case-insensitively) become operator nodes, and
    a parenthesised group is parsed by a recursive call.

    ``AND`` binds tighter than ``OR`` and operators of equal precedence group
    to the left, so ``a OR b AND c`` parses as ``a OR (b AND c)``.

    Malformed input is handled leniently: an operator that never gets two
    operands is ignored instead of stalling the parser, and only the bottom
    entry of the operand stack is returned.

    Args:
        tokens: Sequence of token strings.
        pos: Index of the first token to read.

    Returns:
        A ``(node, position)`` tuple.  ``node`` is the root of the parsed
        tree, or ``None`` if no operand was read.  When parsing stops at a
        ``)`` the position is the index of that token; when the tokens run
        out it is the loop's final position plus one.
    """
    comparison_ops = ('>', '<', '>=', '<=', '=', '!=')
    # Larger number binds tighter.
    precedence = {'OR': 1, 'AND': 2}
    stack = []
    operators = []

    def apply_operator():
        # Fold the two newest operands under the newest operator.
        if len(stack) < 2 or not operators:
            return
        right = stack.pop()
        left = stack.pop()
        op = operators.pop()
        stack.append(Node("operator", value=op, left=left, right=right))

    def reduce_pending(min_precedence=0):
        # Apply stacked operators of at least ``min_precedence``.  The
        # operand-count guard guarantees termination: apply_operator() is
        # only called when it will actually pop an operator.
        while (operators and len(stack) >= 2
               and precedence[operators[-1]] >= min_precedence):
            apply_operator()

    while pos < len(tokens):
        token = tokens[pos]
        if token == '(':
            subexpr, new_pos = parse_expression(tokens, pos + 1)
            if subexpr:
                stack.append(subexpr)
            # new_pos is the matching ')' (or past the end); the shared
            # increment below steps over it.
            pos = new_pos
        elif token == ')':
            reduce_pending()
            return (stack[0] if stack else None), pos
        elif token.upper() in precedence:
            keyword = token.upper()
            # Reduce everything that binds at least as tightly as the
            # incoming operator before stacking it.
            reduce_pending(precedence[keyword])
            operators.append(keyword)
        else:
            is_comparison = (
                pos + 2 < len(tokens) and tokens[pos + 1] in comparison_ops
            )
            if is_comparison:
                condition = f"{token} {tokens[pos + 1]} {tokens[pos + 2]}"
                stack.append(Node("operand", value=condition))
                pos += 2  # operator and value tokens are consumed as well
            else:
                stack.append(Node("operand", value=token))
        pos += 1

    reduce_pending()
    return (stack[0] if stack else None), pos + 1
def create_rule(rule_string):
    """Build an AST from a rule string.

    The string is tokenized and parsed.  Any exception raised along the way
    is reported with ``print`` and turned into a ``None`` result.

    The text is passed to the tokenizer unmodified: the tokenizer already
    treats whitespace outside quotes as a separator, and collapsing
    whitespace up front would also rewrite the contents of quoted literals.

    Args:
        rule_string: The rule text.

    Returns:
        The root node produced by ``parse_expression``, or ``None`` when the
        input is not a string, yields no operand, or parsing raised.
    """
    if not isinstance(rule_string, str):
        print(
            "Error parsing rule: expected a string, "
            f"got {type(rule_string).__name__}"
        )
        return None
    try:
        tokens = tokenize(rule_string)
        tree, _ = parse_expression(tokens)
        return tree
    except Exception as e:
        print(f"Error parsing rule: {e}")
        return None
def _coerce_literal(text):
    """Turn the right-hand side text of a condition into a Python value.

    A value wrapped in a matching pair of single or double quotes becomes the
    unquoted string; an optionally signed decimal number becomes a ``float``;
    anything else is returned unchanged.
    """
    if len(text) >= 2 and text[0] == text[-1] and text[0] in ("'", '"'):
        return text[1:-1]
    unsigned = text[1:] if text[:1] in ('+', '-') else text
    if unsigned.replace('.', '', 1).isdigit():
        try:
            return float(text)
        except ValueError:
            # str.isdigit() accepts some characters float() rejects;
            # fall through and treat the text as a plain string.
            pass
    return text


def _compare(actual, op, expected):
    """Apply comparison ``op`` to ``actual`` and ``expected``.

    Returns ``False`` for an operator that is not recognised.
    """
    if op == '>':
        return actual > expected
    if op == '<':
        return actual < expected
    if op == '>=':
        return actual >= expected
    if op == '<=':
        return actual <= expected
    if op == '=':
        return actual == expected
    if op == '!=':
        return actual != expected
    return False


def evaluate_rule(rule_ast, data):
    """Evaluate a rule AST against a mapping of field values.

    Operand nodes hold a condition string ``"field op value"``.  The value
    is converted with ``_coerce_literal``; when it is numeric, the field's
    value is converted with ``float`` before comparing.  Operator nodes
    combine the results of their ``left`` and ``right`` children with
    ``AND`` / ``OR``.

    Args:
        rule_ast: Node with ``type`` and ``value`` attributes (plus ``left``
            and ``right`` on operator nodes), or a falsy value.
        data: Mapping from field name to the value to test.

    Returns:
        The result of the rule.  ``False`` is returned when ``rule_ast`` is
        falsy, the field is missing from ``data``, the field value cannot be
        converted to a number for a numeric comparison, the condition cannot
        be split into three parts, the two sides cannot be compared, or the
        node type / operator is not recognised.
    """
    if not rule_ast:
        return False

    if rule_ast.type == "operand":
        try:
            # maxsplit=2 keeps a quoted value that contains spaces intact.
            field, op, literal = rule_ast.value.split(None, 2)
        except ValueError as e:
            print(f"Error evaluating condition: {e}")
            return False

        if field not in data:
            return False
        expected = _coerce_literal(literal)

        try:
            actual = data[field]
            if isinstance(expected, float):
                try:
                    actual = float(actual)
                except (ValueError, TypeError):
                    return False
            # Ordering comparisons between unrelated types raise TypeError.
            return _compare(actual, op, expected)
        except (KeyError, TypeError) as e:
            print(f"Error evaluating condition: {e}")
            return False

    if rule_ast.type == "operator":
        # Both children are always evaluated (no short-circuit), so every
        # failing condition gets the chance to print its diagnostic.
        left_result = evaluate_rule(rule_ast.left, data)
        right_result = evaluate_rule(rule_ast.right, data)
        if rule_ast.value == "AND":
            return left_result and right_result
        if rule_ast.value == "OR":
            return left_result or right_result

    return False
def format_ast(node, level=0):
    """Render an AST as indented text, one node per line.

    Two node shapes are expanded recursively:

    * objects exposing ``operator`` and ``children`` attributes, and
    * objects whose ``type`` attribute equals ``"operator"``, using their
      ``value`` as the label and ``left`` / ``right`` as children.  This is
      the shape ``parse_expression`` builds and ``evaluate_rule`` reads.

    Anything else is treated as a leaf and rendered with ``str``.

    NOTE(review): ``Node`` is defined in ``ast_structure`` and is not visible
    here; confirm that it does not also expose an ``operator`` attribute,
    since that shape is checked first.

    Args:
        node: The node (or leaf value) to render.
        level: Nesting depth; each level indents by one space.

    Returns:
        The rendered text; every line ends with a newline.
    """
    indent = " " * level
    if hasattr(node, 'operator'):
        label = node.operator
        children = node.children
    elif getattr(node, 'type', None) == "operator":
        label = node.value
        # A missing side is skipped rather than printed as "None".
        children = [
            child for child in (node.left, node.right) if child is not None
        ]
    else:
        return f"{indent}{str(node)}\n"

    parts = [f"{indent}{label}\n"]
    parts.extend(format_ast(child, level + 1) for child in children)
    return ''.join(parts)