11import  json 
22from  pathlib  import  Path 
3+ import  unittest 
34
45from  .. import  case , formats 
56
@@ -8,15 +9,20 @@ class TestSecConfig(case.TestCase):
89    requires_waf  =  True 
910
1011    def  apply_config (self , conf_name ):
11-         conf_path  =  Path (__file__ ).parent  /  f' ./conf/http_{ conf_name } '  
12+         conf_path  =  Path (__file__ ).parent  /  f" ./conf/http_{ conf_name } "  
1213        conf_text  =  conf_path .read_text ()
1314        status , log_lines  =  self .orch .nginx_replace_config (
1415            conf_text , conf_path .name )
1516        self .assertEqual (0 , status , log_lines )
1617
18+     def  _test_config (self , conf_name ):
19+         conf_path  =  Path (__file__ ).parent  /  f"./conf/http_{ conf_name }  
20+         conf_text  =  conf_path .read_text ()
21+         return  self .orch .nginx_replace_config (conf_text , conf_path .name )
22+ 
1723    def  get_appsec_data (self ):
1824        self .orch .reload_nginx ()
19-         log_lines  =  self .orch .sync_service (' agent' 
25+         log_lines  =  self .orch .sync_service (" agent" 
2026        entries  =  [
2127            entry  for  entry  in  (formats .parse_trace (line )
2228                                for  line  in  log_lines ) if  entry  is  not None 
@@ -25,141 +31,140 @@ def get_appsec_data(self):
2531        for  entry  in  entries :
2632            for  trace  in  entry :
2733                for  span  in  trace :
28-                     if  span .get (' meta' get (' _dd.appsec.json' 
29-                         return  json .loads (span [' meta' ][ ' _dd.appsec.json' 
30-         self .failureException (' No _dd.appsec.json found in traces' 
34+                     if  span .get (" meta" get (" _dd.appsec.json" 
35+                         return  json .loads (span [" meta" ][ " _dd.appsec.json" 
36+         self .failureException (" No _dd.appsec.json found in traces" 
3137
3238    def  test_custom_templates (self ):
33-         templ_json_path  =  Path (__file__ ).parent  /  ' ./conf/templ.json' 
34-         templ_html_path  =  Path (__file__ ).parent  /  ' ./conf/templ.html' 
35-         self .orch .nginx_replace_file (' /tmp/templ.json' 
39+         templ_json_path  =  Path (__file__ ).parent  /  " ./conf/templ.json" 
40+         templ_html_path  =  Path (__file__ ).parent  /  " ./conf/templ.html" 
41+         self .orch .nginx_replace_file (" /tmp/templ.json" 
3642                                     templ_json_path .read_text ())
37-         self .orch .nginx_replace_file (' /tmp/templ.html' 
43+         self .orch .nginx_replace_file (" /tmp/templ.html" 
3844                                     templ_html_path .read_text ())
3945
40-         self .apply_config (' custom_blocking_templates' 
46+         self .apply_config (" custom_blocking_templates" 
4147
4248        headers  =  {
43-             ' User-Agent' :  ' dd-test-scanner-log-block' 
44-             ' Accept' :  ' text/html' 
49+             " User-Agent" :  " dd-test-scanner-log-block" 
50+             " Accept" :  " text/html" 
4551        }
4652        status , headers , body  =  self .orch .send_nginx_http_request (
47-             ' /http' 80 , headers )
53+             " /http" 80 , headers )
4854        self .assertEqual (status , 403 )
4955        # find content-type header: 
5056        ct  =  next ((v  for  k , v  in  headers  if  k .lower () ==  "content-type" ), None )
51-         self .assertEqual (ct , ' text/html;charset=utf-8' 
52-         self .assertTrue (' My custom blocking response' in  body )
57+         self .assertEqual (ct , " text/html;charset=utf-8" 
58+         self .assertTrue (" My custom blocking response" in  body )
5359
5460        headers  =  {
55-             ' User-Agent' :  ' dd-test-scanner-log-block' 
56-             ' Accept' :  ' text/json' 
61+             " User-Agent" :  " dd-test-scanner-log-block" 
62+             " Accept" :  " text/json" 
5763        }
5864        status , headers , body  =  self .orch .send_nginx_http_request (
59-             ' /http' 80 , headers )
65+             " /http" 80 , headers )
6066        self .assertEqual (status , 403 )
6167        ct  =  next ((v  for  k , v  in  headers  if  k .lower () ==  "content-type" ), None )
62-         self .assertEqual (ct , ' application/json' 
68+         self .assertEqual (ct , " application/json" 
6369        self .assertEqual (
6470            body ,
6571            '{"error": "blocked", "details": "my custom json response"}\n ' )
6672
6773    def  test_appsec_fully_disabled (self ):
68-         self .apply_config (' appsec_fully_disabled' 
74+         self .apply_config (" appsec_fully_disabled" 
6975
7076        headers  =  {
71-             ' User-Agent' :  ' dd-test-scanner-log-block' 
72-             ' Accept' :  ' text/json' 
77+             " User-Agent" :  " dd-test-scanner-log-block" 
78+             " Accept" :  " text/json" 
7379        }
74-         status , _ , _  =  self .orch .send_nginx_http_request ('/' , 80 , headers )
80+         status , _ , _  =  self .orch .send_nginx_http_request ("/" , 80 , headers )
7581        self .assertEqual (status , 200 )
7682
7783    def  test_bad_custom_template (self ):
78-         self .apply_config ('bad_template_file' )
79- 
80-         msg  =  self .orch .wait_for_log_message (
81-             'nginx' ,
82-             '.*Initialising security library failed.*' ,
83-             timeout_secs = 5 )
84+         # We can't afford to shutdown workers 
85+         status , log_lines  =  self ._test_config ("bad_template_file" )
86+         self .assertNotEqual (0 , status , log_lines )
8487        self .assertTrue (
85-             'Failed to open file: /file/that/does/not/exist'  in  msg )
88+             any ('Failed to open file: "/file/that/does/not/exist"'  in  line 
89+                 for  line  in  log_lines ))
8690
8791    def  test_bad_rules_file (self ):
88-         self .apply_config ('bad_rules_file' )
89- 
90-         msg  =  self .orch .wait_for_log_message (
91-             'nginx' ,
92-             '.*Initialising security library failed.*' ,
93-             timeout_secs = 5 )
94-         self .assertTrue ('Failed to open file: /bad/rules/file'  in  msg )
92+         status , log_lines  =  self ._test_config ("bad_rules_file" )
93+         self .assertNotEqual (0 , status , log_lines )
94+         self .assertTrue (
95+             any ('Failed to open file: "/bad/rules/file'  in  line 
96+                 for  line  in  log_lines ))
9597
9698    def  test_bad_pool_name (self ):
97-         conf_path  =  Path (__file__ ).parent  /  'conf/http_bad_thread_pool.conf' 
98-         conf_text  =  conf_path .read_text ()
99-         status , log_lines  =  self .orch .nginx_replace_config (
100-             conf_text , conf_path .name )
99+         status , log_lines  =  self ._test_config ("bad_thread_pool" )
101100        self .assertNotEqual (0 , status , log_lines )
102101
103102        self .assertTrue (
104103            any ('datadog_waf_thread_pool_name: "bad_thread_pool" not found'  in 
105104                line  for  line  in  log_lines ))
106105
107106    def  test_multiple_pools (self ):
108-         self .apply_config (' multiple_thread_pools' 
107+         self .apply_config (" multiple_thread_pools" 
109108
110-         headers  =  {' User-Agent' :  ' dd-test-scanner-log-block' 
109+         headers  =  {" User-Agent" :  " dd-test-scanner-log-block" 
111110        status , _ , _  =  self .orch .send_nginx_http_request (
112-             ' /http/a' 80 , headers )
111+             " /http/a" 80 , headers )
113112        self .assertEqual (status , 403 )
114113
115-         headers  =  {' User-Agent' :  ' dd-test-scanner-log-block' 
114+         headers  =  {" User-Agent" :  " dd-test-scanner-log-block" 
116115        status , _ , _  =  self .orch .send_nginx_http_request (
117-             ' /local/' 80 , headers )
116+             " /local/" 80 , headers )
118117        self .assertEqual (status , 403 )
119118
120-         headers  =  {' User-Agent' :  ' dd-test-scanner-log-block' 
119+         headers  =  {" User-Agent" :  " dd-test-scanner-log-block" 
121120        status , _ , _  =  self .orch .send_nginx_http_request (
122-             ' /unmonitored/index.html' 80 , headers )
121+             " /unmonitored/index.html" 80 , headers )
123122        self .assertEqual (status , 200 )
124123
125124    def  test_custom_obfuscation (self ):
126-         waf_path  =  Path (__file__ ).parent  /  ' ./conf/waf.json' 
125+         waf_path  =  Path (__file__ ).parent  /  " ./conf/waf.json" 
127126        waf_text  =  waf_path .read_text ()
128-         self .orch .nginx_replace_file (' /tmp/waf.json' waf_text )
127+         self .orch .nginx_replace_file (" /tmp/waf.json" waf_text )
129128
130-         self .apply_config (' custom_obfuscation' 
129+         self .apply_config (" custom_obfuscation" 
131130
132131        # Redaction by key 
133132        # datadog_appsec_obfuscation_key_regex my.special.key; 
134133        status , _ , _  =  self .orch .send_nginx_http_request (
135-             ' /http/?my_special_key=matched+value' 80 )
134+             " /http/?my_special_key=matched+value" 80 )
136135        appsec_data  =  self .get_appsec_data ()
137136        self .assertEqual (
138-             appsec_data ['triggers' ][0 ]['rule_matches' ][0 ]['parameters' ][0 ]
139-             ['value' ], '<Redacted>' )
137+             appsec_data ["triggers" ][0 ]["rule_matches" ][0 ]["parameters" ][0 ]
138+             ["value" ],
139+             "<Redacted>" ,
140+         )
140141
141142        # Redaction by value 
142143        # datadog_appsec_obfuscation_value_regex \Az.*; 
143144        status , _ , _  =  self .orch .send_nginx_http_request (
144-             ' /http/?the+key=z_matched+value' 80 )
145+             " /http/?the+key=z_matched+value" 80 )
145146        appsec_data  =  self .get_appsec_data ()
146147        self .assertEqual (
147-             appsec_data ['triggers' ][0 ]['rule_matches' ][0 ]['parameters' ][0 ]
148-             ['value' ], '<Redacted>' )
148+             appsec_data ["triggers" ][0 ]["rule_matches" ][0 ]["parameters" ][0 ]
149+             ["value" ],
150+             "<Redacted>" ,
151+         )
149152
150153    def  test_no_obfuscation (self ):
151-         waf_path  =  Path (__file__ ).parent  /  ' ./conf/waf.json' 
154+         waf_path  =  Path (__file__ ).parent  /  " ./conf/waf.json" 
152155        waf_text  =  waf_path .read_text ()
153-         self .orch .nginx_replace_file (' /tmp/waf.json' waf_text )
156+         self .orch .nginx_replace_file (" /tmp/waf.json" waf_text )
154157
155-         self .apply_config (' no_obfuscation' 
158+         self .apply_config (" no_obfuscation" 
156159
157-         self .orch .sync_service (' agent' 
160+         self .orch .sync_service (" agent" 
158161
159162        # No redaction by key 
160163        status , _ , _  =  self .orch .send_nginx_http_request (
161-             ' /http/?password=matched+value' 80 )
164+             " /http/?password=matched+value" 80 )
162165        appsec_data  =  self .get_appsec_data ()
163166        self .assertEqual (
164-             appsec_data ['triggers' ][0 ]['rule_matches' ][0 ]['parameters' ][0 ]
165-             ['value' ], 'matched value' )
167+             appsec_data ["triggers" ][0 ]["rule_matches" ][0 ]["parameters" ][0 ]
168+             ["value" ],
169+             "matched value" ,
170+         )
0 commit comments