12
12
import time
13
13
from datetime import datetime
14
14
from pathlib import Path
15
- from typing import Dict
15
+ from typing import Dict , Optional
16
16
from uuid import uuid4
17
17
18
18
import click
22
22
from .rule import TOMLRule , TOMLRuleContents
23
23
from .rule_formatter import toml_write
24
24
from .rule_loader import RuleCollection
25
- from .schemas import all_versions
25
+ from .schemas import all_versions , definitions
26
26
from .utils import get_path , get_etc_path , clear_caches , load_dump , load_rule_contents
27
27
28
28
RULES_DIR = get_path ('rules' )
@@ -41,7 +41,7 @@ def root(ctx, debug):
41
41
42
42
43
43
@root .command ('create-rule' )
44
- @click .argument ('path' , type = click . Path ( dir_okay = False ) )
44
+ @click .argument ('path' , type = Path )
45
45
@click .option ('--config' , '-c' , type = click .Path (exists = True , dir_okay = False ), help = 'Rule or config file' )
46
46
@click .option ('--required-only' , is_flag = True , help = 'Only prompt for required fields' )
47
47
@click .option ('--rule-type' , '-t' , type = click .Choice (sorted (TOMLRuleContents .all_rule_types ())),
@@ -95,7 +95,7 @@ def import_rules(input_file, directory):
95
95
96
96
rule_contents = []
97
97
for rule_file in rule_files :
98
- rule_contents .extend (load_rule_contents (rule_file ))
98
+ rule_contents .extend (load_rule_contents (Path ( rule_file ) ))
99
99
100
100
if not rule_contents :
101
101
click .echo ('Must specify at least one file!' )
@@ -156,7 +156,7 @@ def mass_update(ctx, query, metadata, language, field):
156
156
157
157
158
158
@root .command ('view-rule' )
159
- @click .argument ('rule-file' )
159
+ @click .argument ('rule-file' , type = Path )
160
160
@click .option ('--api-format/--rule-format' , default = True , help = 'Print the rule in final api or rule format' )
161
161
@click .pass_context
162
162
def view_rule (ctx , rule_file , api_format ):
@@ -168,21 +168,57 @@ def view_rule(ctx, rule_file, api_format):
168
168
else :
169
169
click .echo (toml_write (rule .contents .to_dict ()))
170
170
171
+ return rule
172
+
173
+
174
def _export_rules(rules: RuleCollection, outfile: Path, downgrade_version: Optional[definitions.SemVer] = None,
                  verbose=True, skip_unsupported=False):
    """Export rules into a consolidated ndjson file."""
    # local import to avoid a circular dependency with .rule at module load time
    from .rule import downgrade_contents_from_rule

    outfile = outfile.with_suffix('.ndjson')
    unsupported = []

    if not downgrade_version:
        # no downgrade requested: serialize every rule in its current api format
        output_lines = [json.dumps(r.contents.to_api_format(), sort_keys=True) for r in rules]
    elif not skip_unsupported:
        # strict downgrade: an unsupported rule type raises straight out of the comprehension
        output_lines = [json.dumps(downgrade_contents_from_rule(r, downgrade_version), sort_keys=True)
                        for r in rules]
    else:
        # tolerant downgrade: record failures and keep exporting the rest
        output_lines = []
        for rule in rules:
            try:
                contents = downgrade_contents_from_rule(rule, downgrade_version)
            except ValueError as exc:
                unsupported.append(f'{exc}: {rule.id} - {rule.name}')
            else:
                output_lines.append(json.dumps(contents, sort_keys=True))

    # one json document per line, with a trailing newline (ndjson convention)
    outfile.write_text('\n'.join(output_lines) + '\n')

    if verbose:
        click.echo(f'Exported {len(rules) - len(unsupported)} rules into {outfile}')

    if skip_unsupported and unsupported:
        unsupported_str = '\n- '.join(unsupported)
        click.echo(f'Skipped {len(unsupported)} unsupported rules: \n- {unsupported_str}')
171
209
172
210
@root .command ('export-rules' )
173
211
@multi_collection
174
- @click .option ('--outfile' , '-o' , default = get_path ('exports' , f'{ time .strftime ("%Y%m%dT%H%M%SL" )} .ndjson' ),
175
- type = click . Path ( dir_okay = False ) , help = 'Name of file for exported rules' )
212
+ @click .option ('--outfile' , '-o' , default = Path ( get_path ('exports' , f'{ time .strftime ("%Y%m%dT%H%M%SL" )} .ndjson' ) ),
213
+ type = Path , help = 'Name of file for exported rules' )
176
214
@click .option ('--replace-id' , '-r' , is_flag = True , help = 'Replace rule IDs with new IDs before export' )
177
215
@click .option ('--stack-version' , type = click .Choice (all_versions ()),
178
216
help = 'Downgrade a rule version to be compatible with older instances of Kibana' )
179
217
@click .option ('--skip-unsupported' , '-s' , is_flag = True ,
180
218
help = 'If `--stack-version` is passed, skip rule types which are unsupported '
181
219
'(an error will be raised otherwise)' )
182
- def export_rules (rules , outfile , replace_id , stack_version , skip_unsupported ) -> RuleCollection :
220
+ def export_rules (rules , outfile : Path , replace_id , stack_version , skip_unsupported ) -> RuleCollection :
183
221
"""Export rule(s) into an importable ndjson file."""
184
- from .packaging import Package
185
-
186
222
assert len (rules ) > 0 , "No rules found"
187
223
188
224
if replace_id :
@@ -196,10 +232,11 @@ def export_rules(rules, outfile, replace_id, stack_version, skip_unsupported) ->
196
232
new_contents = dataclasses .replace (rule .contents , data = new_data )
197
233
rules .add_rule (TOMLRule (contents = new_contents ))
198
234
199
- Path (outfile ).parent .mkdir (exist_ok = True )
200
- package = Package (rules , '_' , verbose = False )
201
- package .export (outfile , downgrade_version = stack_version , skip_unsupported = skip_unsupported )
202
- return package .rules
235
+ outfile .parent .mkdir (exist_ok = True )
236
+ _export_rules (rules = rules , outfile = outfile , downgrade_version = stack_version ,
237
+ skip_unsupported = skip_unsupported )
238
+
239
+ return rules
203
240
204
241
205
242
@root .command ('validate-rule' )
@@ -231,13 +268,14 @@ def search_rules(query, columns, language, count, verbose=True, rules: Dict[str,
231
268
from eql .build import get_engine
232
269
from eql import parse_query
233
270
from eql .pipes import CountPipe
271
+ from .rule import get_unique_query_fields
234
272
235
273
flattened_rules = []
236
274
rules = rules or {str (rule .path ): rule for rule in RuleCollection .default ()}
237
275
238
- for file_name , rule_doc in rules .items ():
276
+ for file_name , rule in rules .items ():
239
277
flat : dict = {"file" : os .path .relpath (file_name )}
240
- flat .update (rule_doc .contents .to_dict ())
278
+ flat .update (rule .contents .to_dict ())
241
279
flat .update (flat ["metadata" ])
242
280
flat .update (flat ["rule" ])
243
281
@@ -254,8 +292,8 @@ def search_rules(query, columns, language, count, verbose=True, rules: Dict[str,
254
292
technique_ids .extend ([t ['id' ] for t in techniques ])
255
293
subtechnique_ids .extend ([st ['id' ] for t in techniques for st in t .get ('subtechnique' , [])])
256
294
257
- flat .update (techniques = technique_ids , tactics = tactic_names , subtechniques = subtechnique_ids )
258
- # unique_fields=TOMLRule. get_unique_query_fields(rule_doc[' rule'] ))
295
+ flat .update (techniques = technique_ids , tactics = tactic_names , subtechniques = subtechnique_ids ,
296
+ unique_fields = get_unique_query_fields (rule ))
259
297
flattened_rules .append (flat )
260
298
261
299
flattened_rules .sort (key = lambda dct : dct ["name" ])
0 commit comments