1
1
import json
2
2
from pathlib import Path
3
+ import unittest
3
4
4
5
from .. import case , formats
5
6
@@ -8,15 +9,20 @@ class TestSecConfig(case.TestCase):
8
9
requires_waf = True
9
10
10
11
def apply_config (self , conf_name ):
11
- conf_path = Path (__file__ ).parent / f' ./conf/http_{ conf_name } .conf'
12
+ conf_path = Path (__file__ ).parent / f" ./conf/http_{ conf_name } .conf"
12
13
conf_text = conf_path .read_text ()
13
14
status , log_lines = self .orch .nginx_replace_config (
14
15
conf_text , conf_path .name )
15
16
self .assertEqual (0 , status , log_lines )
16
17
18
+ def _test_config (self , conf_name ):
19
+ conf_path = Path (__file__ ).parent / f"./conf/http_{ conf_name } .conf"
20
+ conf_text = conf_path .read_text ()
21
+ return self .orch .nginx_replace_config (conf_text , conf_path .name )
22
+
17
23
def get_appsec_data (self ):
18
24
self .orch .reload_nginx ()
19
- log_lines = self .orch .sync_service (' agent' )
25
+ log_lines = self .orch .sync_service (" agent" )
20
26
entries = [
21
27
entry for entry in (formats .parse_trace (line )
22
28
for line in log_lines ) if entry is not None
@@ -25,141 +31,140 @@ def get_appsec_data(self):
25
31
for entry in entries :
26
32
for trace in entry :
27
33
for span in trace :
28
- if span .get (' meta' , {}).get (' _dd.appsec.json' ):
29
- return json .loads (span [' meta' ][ ' _dd.appsec.json' ])
30
- self .failureException (' No _dd.appsec.json found in traces' )
34
+ if span .get (" meta" , {}).get (" _dd.appsec.json" ):
35
+ return json .loads (span [" meta" ][ " _dd.appsec.json" ])
36
+ self .failureException (" No _dd.appsec.json found in traces" )
31
37
32
38
def test_custom_templates (self ):
33
- templ_json_path = Path (__file__ ).parent / ' ./conf/templ.json'
34
- templ_html_path = Path (__file__ ).parent / ' ./conf/templ.html'
35
- self .orch .nginx_replace_file (' /tmp/templ.json' ,
39
+ templ_json_path = Path (__file__ ).parent / " ./conf/templ.json"
40
+ templ_html_path = Path (__file__ ).parent / " ./conf/templ.html"
41
+ self .orch .nginx_replace_file (" /tmp/templ.json" ,
36
42
templ_json_path .read_text ())
37
- self .orch .nginx_replace_file (' /tmp/templ.html' ,
43
+ self .orch .nginx_replace_file (" /tmp/templ.html" ,
38
44
templ_html_path .read_text ())
39
45
40
- self .apply_config (' custom_blocking_templates' )
46
+ self .apply_config (" custom_blocking_templates" )
41
47
42
48
headers = {
43
- ' User-Agent' : ' dd-test-scanner-log-block' ,
44
- ' Accept' : ' text/html'
49
+ " User-Agent" : " dd-test-scanner-log-block" ,
50
+ " Accept" : " text/html"
45
51
}
46
52
status , headers , body = self .orch .send_nginx_http_request (
47
- ' /http' , 80 , headers )
53
+ " /http" , 80 , headers )
48
54
self .assertEqual (status , 403 )
49
55
# find content-type header:
50
56
ct = next ((v for k , v in headers if k .lower () == "content-type" ), None )
51
- self .assertEqual (ct , ' text/html;charset=utf-8' )
52
- self .assertTrue (' My custom blocking response' in body )
57
+ self .assertEqual (ct , " text/html;charset=utf-8" )
58
+ self .assertTrue (" My custom blocking response" in body )
53
59
54
60
headers = {
55
- ' User-Agent' : ' dd-test-scanner-log-block' ,
56
- ' Accept' : ' text/json'
61
+ " User-Agent" : " dd-test-scanner-log-block" ,
62
+ " Accept" : " text/json"
57
63
}
58
64
status , headers , body = self .orch .send_nginx_http_request (
59
- ' /http' , 80 , headers )
65
+ " /http" , 80 , headers )
60
66
self .assertEqual (status , 403 )
61
67
ct = next ((v for k , v in headers if k .lower () == "content-type" ), None )
62
- self .assertEqual (ct , ' application/json' )
68
+ self .assertEqual (ct , " application/json" )
63
69
self .assertEqual (
64
70
body ,
65
71
'{"error": "blocked", "details": "my custom json response"}\n ' )
66
72
67
73
def test_appsec_fully_disabled (self ):
68
- self .apply_config (' appsec_fully_disabled' )
74
+ self .apply_config (" appsec_fully_disabled" )
69
75
70
76
headers = {
71
- ' User-Agent' : ' dd-test-scanner-log-block' ,
72
- ' Accept' : ' text/json'
77
+ " User-Agent" : " dd-test-scanner-log-block" ,
78
+ " Accept" : " text/json"
73
79
}
74
- status , _ , _ = self .orch .send_nginx_http_request ('/' , 80 , headers )
80
+ status , _ , _ = self .orch .send_nginx_http_request ("/" , 80 , headers )
75
81
self .assertEqual (status , 200 )
76
82
77
83
def test_bad_custom_template (self ):
78
- self .apply_config ('bad_template_file' )
79
-
80
- msg = self .orch .wait_for_log_message (
81
- 'nginx' ,
82
- '.*Initialising security library failed.*' ,
83
- timeout_secs = 5 )
84
+ # We can't afford to shutdown workers
85
+ status , log_lines = self ._test_config ("bad_template_file" )
86
+ self .assertNotEqual (0 , status , log_lines )
84
87
self .assertTrue (
85
- 'Failed to open file: /file/that/does/not/exist' in msg )
88
+ any ('Failed to open file: "/file/that/does/not/exist"' in line
89
+ for line in log_lines ))
86
90
87
91
def test_bad_rules_file (self ):
88
- self .apply_config ('bad_rules_file' )
89
-
90
- msg = self .orch .wait_for_log_message (
91
- 'nginx' ,
92
- '.*Initialising security library failed.*' ,
93
- timeout_secs = 5 )
94
- self .assertTrue ('Failed to open file: /bad/rules/file' in msg )
92
+ status , log_lines = self ._test_config ("bad_rules_file" )
93
+ self .assertNotEqual (0 , status , log_lines )
94
+ self .assertTrue (
95
+ any ('Failed to open file: "/bad/rules/file' in line
96
+ for line in log_lines ))
95
97
96
98
def test_bad_pool_name (self ):
97
- conf_path = Path (__file__ ).parent / 'conf/http_bad_thread_pool.conf'
98
- conf_text = conf_path .read_text ()
99
- status , log_lines = self .orch .nginx_replace_config (
100
- conf_text , conf_path .name )
99
+ status , log_lines = self ._test_config ("bad_thread_pool" )
101
100
self .assertNotEqual (0 , status , log_lines )
102
101
103
102
self .assertTrue (
104
103
any ('datadog_waf_thread_pool_name: "bad_thread_pool" not found' in
105
104
line for line in log_lines ))
106
105
107
106
def test_multiple_pools (self ):
108
- self .apply_config (' multiple_thread_pools' )
107
+ self .apply_config (" multiple_thread_pools" )
109
108
110
- headers = {' User-Agent' : ' dd-test-scanner-log-block' }
109
+ headers = {" User-Agent" : " dd-test-scanner-log-block" }
111
110
status , _ , _ = self .orch .send_nginx_http_request (
112
- ' /http/a' , 80 , headers )
111
+ " /http/a" , 80 , headers )
113
112
self .assertEqual (status , 403 )
114
113
115
- headers = {' User-Agent' : ' dd-test-scanner-log-block' }
114
+ headers = {" User-Agent" : " dd-test-scanner-log-block" }
116
115
status , _ , _ = self .orch .send_nginx_http_request (
117
- ' /local/' , 80 , headers )
116
+ " /local/" , 80 , headers )
118
117
self .assertEqual (status , 403 )
119
118
120
- headers = {' User-Agent' : ' dd-test-scanner-log-block' }
119
+ headers = {" User-Agent" : " dd-test-scanner-log-block" }
121
120
status , _ , _ = self .orch .send_nginx_http_request (
122
- ' /unmonitored/index.html' , 80 , headers )
121
+ " /unmonitored/index.html" , 80 , headers )
123
122
self .assertEqual (status , 200 )
124
123
125
124
def test_custom_obfuscation (self ):
126
- waf_path = Path (__file__ ).parent / ' ./conf/waf.json'
125
+ waf_path = Path (__file__ ).parent / " ./conf/waf.json"
127
126
waf_text = waf_path .read_text ()
128
- self .orch .nginx_replace_file (' /tmp/waf.json' , waf_text )
127
+ self .orch .nginx_replace_file (" /tmp/waf.json" , waf_text )
129
128
130
- self .apply_config (' custom_obfuscation' )
129
+ self .apply_config (" custom_obfuscation" )
131
130
132
131
# Redaction by key
133
132
# datadog_appsec_obfuscation_key_regex my.special.key;
134
133
status , _ , _ = self .orch .send_nginx_http_request (
135
- ' /http/?my_special_key=matched+value' , 80 )
134
+ " /http/?my_special_key=matched+value" , 80 )
136
135
appsec_data = self .get_appsec_data ()
137
136
self .assertEqual (
138
- appsec_data ['triggers' ][0 ]['rule_matches' ][0 ]['parameters' ][0 ]
139
- ['value' ], '<Redacted>' )
137
+ appsec_data ["triggers" ][0 ]["rule_matches" ][0 ]["parameters" ][0 ]
138
+ ["value" ],
139
+ "<Redacted>" ,
140
+ )
140
141
141
142
# Redaction by value
142
143
# datadog_appsec_obfuscation_value_regex \Az.*;
143
144
status , _ , _ = self .orch .send_nginx_http_request (
144
- ' /http/?the+key=z_matched+value' , 80 )
145
+ " /http/?the+key=z_matched+value" , 80 )
145
146
appsec_data = self .get_appsec_data ()
146
147
self .assertEqual (
147
- appsec_data ['triggers' ][0 ]['rule_matches' ][0 ]['parameters' ][0 ]
148
- ['value' ], '<Redacted>' )
148
+ appsec_data ["triggers" ][0 ]["rule_matches" ][0 ]["parameters" ][0 ]
149
+ ["value" ],
150
+ "<Redacted>" ,
151
+ )
149
152
150
153
def test_no_obfuscation (self ):
151
- waf_path = Path (__file__ ).parent / ' ./conf/waf.json'
154
+ waf_path = Path (__file__ ).parent / " ./conf/waf.json"
152
155
waf_text = waf_path .read_text ()
153
- self .orch .nginx_replace_file (' /tmp/waf.json' , waf_text )
156
+ self .orch .nginx_replace_file (" /tmp/waf.json" , waf_text )
154
157
155
- self .apply_config (' no_obfuscation' )
158
+ self .apply_config (" no_obfuscation" )
156
159
157
- self .orch .sync_service (' agent' )
160
+ self .orch .sync_service (" agent" )
158
161
159
162
# No redaction by key
160
163
status , _ , _ = self .orch .send_nginx_http_request (
161
- ' /http/?password=matched+value' , 80 )
164
+ " /http/?password=matched+value" , 80 )
162
165
appsec_data = self .get_appsec_data ()
163
166
self .assertEqual (
164
- appsec_data ['triggers' ][0 ]['rule_matches' ][0 ]['parameters' ][0 ]
165
- ['value' ], 'matched value' )
167
+ appsec_data ["triggers" ][0 ]["rule_matches" ][0 ]["parameters" ][0 ]
168
+ ["value" ],
169
+ "matched value" ,
170
+ )
0 commit comments