@@ -8,15 +8,20 @@ class TestSecConfig(case.TestCase):
8
8
requires_waf = True
9
9
10
10
def apply_config (self , conf_name ):
11
- conf_path = Path (__file__ ).parent / f' ./conf/http_{ conf_name } .conf'
11
+ conf_path = Path (__file__ ).parent / f" ./conf/http_{ conf_name } .conf"
12
12
conf_text = conf_path .read_text ()
13
13
status , log_lines = self .orch .nginx_replace_config (
14
14
conf_text , conf_path .name )
15
15
self .assertEqual (0 , status , log_lines )
16
16
17
+ def _test_config (self , conf_name ):
18
+ conf_path = Path (__file__ ).parent / f"./conf/http_{ conf_name } .conf"
19
+ conf_text = conf_path .read_text ()
20
+ return self .orch .nginx_replace_config (conf_text , conf_path .name )
21
+
17
22
def get_appsec_data (self ):
18
23
self .orch .reload_nginx ()
19
- log_lines = self .orch .sync_service (' agent' )
24
+ log_lines = self .orch .sync_service (" agent" )
20
25
entries = [
21
26
entry for entry in (formats .parse_trace (line )
22
27
for line in log_lines ) if entry is not None
@@ -25,141 +30,140 @@ def get_appsec_data(self):
25
30
for entry in entries :
26
31
for trace in entry :
27
32
for span in trace :
28
- if span .get (' meta' , {}).get (' _dd.appsec.json' ):
29
- return json .loads (span [' meta' ][ ' _dd.appsec.json' ])
30
- self .failureException (' No _dd.appsec.json found in traces' )
33
+ if span .get (" meta" , {}).get (" _dd.appsec.json" ):
34
+ return json .loads (span [" meta" ][ " _dd.appsec.json" ])
35
+ self .failureException (" No _dd.appsec.json found in traces" )
31
36
32
37
def test_custom_templates (self ):
33
- templ_json_path = Path (__file__ ).parent / ' ./conf/templ.json'
34
- templ_html_path = Path (__file__ ).parent / ' ./conf/templ.html'
35
- self .orch .nginx_replace_file (' /tmp/templ.json' ,
38
+ templ_json_path = Path (__file__ ).parent / " ./conf/templ.json"
39
+ templ_html_path = Path (__file__ ).parent / " ./conf/templ.html"
40
+ self .orch .nginx_replace_file (" /tmp/templ.json" ,
36
41
templ_json_path .read_text ())
37
- self .orch .nginx_replace_file (' /tmp/templ.html' ,
42
+ self .orch .nginx_replace_file (" /tmp/templ.html" ,
38
43
templ_html_path .read_text ())
39
44
40
- self .apply_config (' custom_blocking_templates' )
45
+ self .apply_config (" custom_blocking_templates" )
41
46
42
47
headers = {
43
- ' User-Agent' : ' dd-test-scanner-log-block' ,
44
- ' Accept' : ' text/html'
48
+ " User-Agent" : " dd-test-scanner-log-block" ,
49
+ " Accept" : " text/html"
45
50
}
46
51
status , headers , body = self .orch .send_nginx_http_request (
47
- ' /http' , 80 , headers )
52
+ " /http" , 80 , headers )
48
53
self .assertEqual (status , 403 )
49
54
# find content-type header:
50
55
ct = next ((v for k , v in headers if k .lower () == "content-type" ), None )
51
- self .assertEqual (ct , ' text/html;charset=utf-8' )
52
- self .assertTrue (' My custom blocking response' in body )
56
+ self .assertEqual (ct , " text/html;charset=utf-8" )
57
+ self .assertTrue (" My custom blocking response" in body )
53
58
54
59
headers = {
55
- ' User-Agent' : ' dd-test-scanner-log-block' ,
56
- ' Accept' : ' text/json'
60
+ " User-Agent" : " dd-test-scanner-log-block" ,
61
+ " Accept" : " text/json"
57
62
}
58
63
status , headers , body = self .orch .send_nginx_http_request (
59
- ' /http' , 80 , headers )
64
+ " /http" , 80 , headers )
60
65
self .assertEqual (status , 403 )
61
66
ct = next ((v for k , v in headers if k .lower () == "content-type" ), None )
62
- self .assertEqual (ct , ' application/json' )
67
+ self .assertEqual (ct , " application/json" )
63
68
self .assertEqual (
64
69
body ,
65
70
'{"error": "blocked", "details": "my custom json response"}\n ' )
66
71
67
72
def test_appsec_fully_disabled (self ):
68
- self .apply_config (' appsec_fully_disabled' )
73
+ self .apply_config (" appsec_fully_disabled" )
69
74
70
75
headers = {
71
- ' User-Agent' : ' dd-test-scanner-log-block' ,
72
- ' Accept' : ' text/json'
76
+ " User-Agent" : " dd-test-scanner-log-block" ,
77
+ " Accept" : " text/json"
73
78
}
74
- status , _ , _ = self .orch .send_nginx_http_request ('/' , 80 , headers )
79
+ status , _ , _ = self .orch .send_nginx_http_request ("/" , 80 , headers )
75
80
self .assertEqual (status , 200 )
76
81
77
82
def test_bad_custom_template (self ):
78
- self .apply_config ('bad_template_file' )
79
-
80
- msg = self .orch .wait_for_log_message (
81
- 'nginx' ,
82
- '.*Initialising security library failed.*' ,
83
- timeout_secs = 5 )
83
+ # We can't afford to shutdown workers
84
+ status , log_lines = self ._test_config ("bad_template_file" )
85
+ self .assertNotEqual (0 , status , log_lines )
84
86
self .assertTrue (
85
- 'Failed to open file: /file/that/does/not/exist' in msg )
87
+ any ('Failed to open file: "/file/that/does/not/exist"' in line
88
+ for line in log_lines ))
86
89
87
90
def test_bad_rules_file (self ):
88
- self .apply_config ('bad_rules_file' )
89
-
90
- msg = self .orch .wait_for_log_message (
91
- 'nginx' ,
92
- '.*Initialising security library failed.*' ,
93
- timeout_secs = 5 )
94
- self .assertTrue ('Failed to open file: /bad/rules/file' in msg )
91
+ status , log_lines = self ._test_config ("bad_rules_file" )
92
+ self .assertNotEqual (0 , status , log_lines )
93
+ self .assertTrue (
94
+ any ('Failed to open file: "/bad/rules/file' in line
95
+ for line in log_lines ))
95
96
96
97
def test_bad_pool_name (self ):
97
- conf_path = Path (__file__ ).parent / 'conf/http_bad_thread_pool.conf'
98
- conf_text = conf_path .read_text ()
99
- status , log_lines = self .orch .nginx_replace_config (
100
- conf_text , conf_path .name )
98
+ status , log_lines = self ._test_config ("bad_thread_pool" )
101
99
self .assertNotEqual (0 , status , log_lines )
102
100
103
101
self .assertTrue (
104
102
any ('datadog_waf_thread_pool_name: "bad_thread_pool" not found' in
105
103
line for line in log_lines ))
106
104
107
105
def test_multiple_pools (self ):
108
- self .apply_config (' multiple_thread_pools' )
106
+ self .apply_config (" multiple_thread_pools" )
109
107
110
- headers = {' User-Agent' : ' dd-test-scanner-log-block' }
108
+ headers = {" User-Agent" : " dd-test-scanner-log-block" }
111
109
status , _ , _ = self .orch .send_nginx_http_request (
112
- ' /http/a' , 80 , headers )
110
+ " /http/a" , 80 , headers )
113
111
self .assertEqual (status , 403 )
114
112
115
- headers = {' User-Agent' : ' dd-test-scanner-log-block' }
113
+ headers = {" User-Agent" : " dd-test-scanner-log-block" }
116
114
status , _ , _ = self .orch .send_nginx_http_request (
117
- ' /local/' , 80 , headers )
115
+ " /local/" , 80 , headers )
118
116
self .assertEqual (status , 403 )
119
117
120
- headers = {' User-Agent' : ' dd-test-scanner-log-block' }
118
+ headers = {" User-Agent" : " dd-test-scanner-log-block" }
121
119
status , _ , _ = self .orch .send_nginx_http_request (
122
- ' /unmonitored/index.html' , 80 , headers )
120
+ " /unmonitored/index.html" , 80 , headers )
123
121
self .assertEqual (status , 200 )
124
122
125
123
def test_custom_obfuscation (self ):
126
- waf_path = Path (__file__ ).parent / ' ./conf/waf.json'
124
+ waf_path = Path (__file__ ).parent / " ./conf/waf.json"
127
125
waf_text = waf_path .read_text ()
128
- self .orch .nginx_replace_file (' /tmp/waf.json' , waf_text )
126
+ self .orch .nginx_replace_file (" /tmp/waf.json" , waf_text )
129
127
130
- self .apply_config (' custom_obfuscation' )
128
+ self .apply_config (" custom_obfuscation" )
131
129
132
130
# Redaction by key
133
131
# datadog_appsec_obfuscation_key_regex my.special.key;
134
132
status , _ , _ = self .orch .send_nginx_http_request (
135
- ' /http/?my_special_key=matched+value' , 80 )
133
+ " /http/?my_special_key=matched+value" , 80 )
136
134
appsec_data = self .get_appsec_data ()
137
135
self .assertEqual (
138
- appsec_data ['triggers' ][0 ]['rule_matches' ][0 ]['parameters' ][0 ]
139
- ['value' ], '<Redacted>' )
136
+ appsec_data ["triggers" ][0 ]["rule_matches" ][0 ]["parameters" ][0 ]
137
+ ["value" ],
138
+ "<Redacted>" ,
139
+ )
140
140
141
141
# Redaction by value
142
142
# datadog_appsec_obfuscation_value_regex \Az.*;
143
143
status , _ , _ = self .orch .send_nginx_http_request (
144
- ' /http/?the+key=z_matched+value' , 80 )
144
+ " /http/?the+key=z_matched+value" , 80 )
145
145
appsec_data = self .get_appsec_data ()
146
146
self .assertEqual (
147
- appsec_data ['triggers' ][0 ]['rule_matches' ][0 ]['parameters' ][0 ]
148
- ['value' ], '<Redacted>' )
147
+ appsec_data ["triggers" ][0 ]["rule_matches" ][0 ]["parameters" ][0 ]
148
+ ["value" ],
149
+ "<Redacted>" ,
150
+ )
149
151
150
152
def test_no_obfuscation (self ):
151
- waf_path = Path (__file__ ).parent / ' ./conf/waf.json'
153
+ waf_path = Path (__file__ ).parent / " ./conf/waf.json"
152
154
waf_text = waf_path .read_text ()
153
- self .orch .nginx_replace_file (' /tmp/waf.json' , waf_text )
155
+ self .orch .nginx_replace_file (" /tmp/waf.json" , waf_text )
154
156
155
- self .apply_config (' no_obfuscation' )
157
+ self .apply_config (" no_obfuscation" )
156
158
157
- self .orch .sync_service (' agent' )
159
+ self .orch .sync_service (" agent" )
158
160
159
161
# No redaction by key
160
162
status , _ , _ = self .orch .send_nginx_http_request (
161
- ' /http/?password=matched+value' , 80 )
163
+ " /http/?password=matched+value" , 80 )
162
164
appsec_data = self .get_appsec_data ()
163
165
self .assertEqual (
164
- appsec_data ['triggers' ][0 ]['rule_matches' ][0 ]['parameters' ][0 ]
165
- ['value' ], 'matched value' )
166
+ appsec_data ["triggers" ][0 ]["rule_matches" ][0 ]["parameters" ][0 ]
167
+ ["value" ],
168
+ "matched value" ,
169
+ )
0 commit comments